use crate::evidence::{content_hash, sort_findings};
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Schema version stamped into reports built by `fork_evidence_report`; also the
/// newest version that `decode_fork_evidence_wire` will accept.
pub const FORK_EVIDENCE_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte digest. This is the type `content_hash` yields wherever this file uses it:
/// fork ids, report body hashes and destination-path digests.
pub type ForkEvidenceHash = [u8; 32];
/// Copy strategy attached to a `ForkFinding::FileCopied` entry and tallied by
/// `ForkStrategyCounts::record_copy`.
///
/// The derived `PartialOrd`/`Ord` compare variants in declaration order, so reordering
/// the variants changes comparison results.
/// NOTE(review): the serialized form may also depend on variant order or names —
/// confirm against `crate::encoding` before changing either.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ForkCopyStrategy {
Reflink,
Hardlink,
DeepCopy,
}
/// Caller-selected preference carried in `ForkOptions::copy_preference`.
///
/// `ReflinkThenHardlink` is the `Default`. The code that interprets each variant is
/// not part of this file.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CopyPreference {
#[default]
ReflinkThenHardlink,
HardlinkOnly,
DeepCopyOnly,
}
/// Policy carried in `ForkOptions::keyset_policy`.
///
/// `Refuse` is the `Default`.
/// NOTE(review): presumably governs how a fork treats key material (compare
/// `ForkFinding::KeysExcluded`); the consuming code is not visible here — confirm.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum KeysetPolicy {
#[default]
Refuse,
ExcludeKeys,
}
/// Options bundle for a fork operation.
///
/// The hand-written `Default` impl sets `exclude_caches` to `true` and uses the
/// default variant of each enum field.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub struct ForkOptions {
/// Which copy strategies are preferred; see `CopyPreference`.
pub copy_preference: CopyPreference,
/// Cache-exclusion flag; `true` by default.
pub exclude_caches: bool,
/// See `KeysetPolicy`.
pub keyset_policy: KeysetPolicy,
}
impl Default for ForkOptions {
fn default() -> Self {
Self {
copy_preference: CopyPreference::default(),
exclude_caches: true,
keyset_policy: KeysetPolicy::default(),
}
}
}
/// Per-outcome tallies for a fork. All counters start at zero via the derived
/// `Default` and are only incremented through the `record_*` methods in this file.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkStrategyCounts {
/// Incremented by `record_copy(ForkCopyStrategy::Reflink)`.
pub reflink: usize,
/// Incremented by `record_copy(ForkCopyStrategy::Hardlink)`.
pub hardlink: usize,
/// Incremented by `record_copy(ForkCopyStrategy::DeepCopy)`.
pub deep_copy: usize,
/// Incremented by `record_cache_regenerable`.
pub cache_regenerable: usize,
/// Incremented by `record_excluded`.
pub excluded: usize,
}
impl ForkStrategyCounts {
    /// Adds one to the counter that corresponds to `strategy`.
    pub(crate) fn record_copy(&mut self, strategy: ForkCopyStrategy) {
        // Pick the slot first, then bump it in a single place.
        let counter = match strategy {
            ForkCopyStrategy::Reflink => &mut self.reflink,
            ForkCopyStrategy::Hardlink => &mut self.hardlink,
            ForkCopyStrategy::DeepCopy => &mut self.deep_copy,
        };
        *counter += 1;
    }

    /// Adds one to the `cache_regenerable` counter.
    pub(crate) fn record_cache_regenerable(&mut self) {
        self.cache_regenerable += 1;
    }

    /// Adds one to the `excluded` counter.
    pub(crate) fn record_excluded(&mut self) {
        self.excluded += 1;
    }
}
/// One entry in `ForkReportBody::findings`.
///
/// The derived `PartialOrd`/`Ord` compare by variant declaration order first and then
/// by the variant's fields in order, so reordering variants or fields changes
/// comparison results.
/// NOTE(review): the variant meanings below are read off their names; the code that
/// emits them is not in this file — confirm before relying on them.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ForkFinding {
/// Carries a count of artifacts.
DestinationCleared {
artifact_count: usize,
},
FenceTokenCancelled,
/// Names a single file.
CacheRegenerableExcluded {
file_name: String,
},
/// Names a single file together with a free-form reason string.
FileExcluded {
file_name: String,
reason: String,
},
/// Names a single file together with the strategy recorded for it.
FileCopied {
file_name: String,
strategy: ForkCopyStrategy,
},
KeysExcluded,
}
/// Private, serialize-only view that `fork_id_digest` encodes and hashes to obtain a
/// report's `fork_id`.
///
/// It carries the same fields as `ForkReportBody` except `fork_id` itself and
/// `findings`, so neither of those feeds into the id.
/// NOTE(review): field order and names likely determine the encoded bytes and thus
/// the resulting id — confirm against `crate::encoding` before rearranging.
#[derive(Serialize)]
struct ForkStructuralFingerprint {
schema_version: u16,
fence_token: crate::store::SnapshotFenceTokenRef,
source_watermark: crate::store::SnapshotWatermarkRef,
active_segment_id: u64,
shared_segment_ids_sorted: Vec<u64>,
deep_copied_segment_ids_sorted: Vec<u64>,
strategy_counts: ForkStrategyCounts,
copied_visibility_ranges_present: bool,
copied_pending_compaction_marker_present: bool,
copied_idempotency_store_present: bool,
destination_path_digest: ForkEvidenceHash,
}
/// Derives a fork id from a structural fingerprint.
///
/// The fingerprint is encoded with `crate::encoding::to_bytes` and the resulting
/// bytes are passed to `content_hash`.
///
/// # Errors
///
/// Propagates the encoder's error when the fingerprint cannot be serialized.
fn fork_id_digest(
    fp: &ForkStructuralFingerprint,
) -> Result<ForkEvidenceHash, rmp_serde::encode::Error> {
    let encoded = crate::encoding::to_bytes(fp)?;
    let digest = content_hash(&encoded);
    Ok(digest)
}
/// The hashed and wire-encoded portion of a fork report.
///
/// `fork_report_body_hash` hashes this struct, and the `encode_fork_evidence_wire` /
/// `decode_fork_evidence_wire` pair frames it for transport.
/// NOTE(review): field order and names likely affect the encoded bytes (and so both
/// the hash and the wire form) — confirm against `crate::encoding` before rearranging.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkReportBody {
/// Schema version of this body; `fork_evidence_report` writes
/// `FORK_EVIDENCE_REPORT_SCHEMA_VERSION`.
pub schema_version: u16,
/// Digest of the structural fields below (everything except this field and
/// `findings`), as computed by `fork_evidence_report`.
pub fork_id: ForkEvidenceHash,
pub fence_token: crate::store::SnapshotFenceTokenRef,
pub source_watermark: crate::store::SnapshotWatermarkRef,
pub active_segment_id: u64,
/// Sorted ascending by `fork_evidence_report`.
pub shared_segment_ids_sorted: Vec<u64>,
/// Sorted ascending by `fork_evidence_report`.
pub deep_copied_segment_ids_sorted: Vec<u64>,
pub strategy_counts: ForkStrategyCounts,
pub copied_visibility_ranges_present: bool,
pub copied_pending_compaction_marker_present: bool,
pub copied_idempotency_store_present: bool,
/// Digest of the destination path; see `destination_path_digest`.
pub destination_path_digest: ForkEvidenceHash,
/// Findings attached to the report; passed through `sort_findings` by
/// `fork_evidence_report`.
pub findings: Vec<ForkFinding>,
}
impl ForkReportBody {
/// Method-call form of `fork_report_body_hash(self)`.
///
/// # Errors
///
/// Returns whatever encode error `fork_report_body_hash` reports.
pub fn body_hash(&self) -> Result<ForkEvidenceHash, rmp_serde::encode::Error> {
fork_report_body_hash(self)
}
}
/// A report body plus its hash and some optional metadata.
///
/// `body_hash` is computed from `body` alone, so the remaining fields are not covered
/// by it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkReport {
pub body: ForkReportBody,
/// Result of `fork_report_body_hash(&body)` at construction time.
pub body_hash: ForkEvidenceHash,
/// Left as `None` by `fork_evidence_report`.
pub generated_at_unix_ms: Option<u64>,
/// Left as `None` by `fork_evidence_report`.
pub batpak_version: Option<String>,
/// Left empty by `fork_evidence_report`.
pub diagnostics: Vec<String>,
}
/// Computes the digest of a report body.
///
/// The body is cloned, the clone's findings are run through `sort_findings`, and the
/// clone is then encoded with `crate::encoding::to_bytes` and hashed with
/// `content_hash`. The caller's `body` is never modified.
///
/// # Errors
///
/// Propagates the encoder's error when the body cannot be serialized.
pub fn fork_report_body_hash(
    body: &ForkReportBody,
) -> Result<ForkEvidenceHash, rmp_serde::encode::Error> {
    // Work on a private copy so sorting does not leak back to the caller.
    let mut canonical = body.clone();
    sort_findings(&mut canonical.findings);

    let encoded = crate::encoding::to_bytes(&canonical)?;
    Ok(content_hash(&encoded))
}
/// Hashes a destination path into a `ForkEvidenceHash`.
///
/// The digest is taken over the path's `OsStr` encoded bytes exactly as given; this
/// function does no canonicalisation or normalisation of `dest`.
pub(crate) fn destination_path_digest(dest: &Path) -> ForkEvidenceHash {
    let raw_path = dest.as_os_str().as_encoded_bytes();
    content_hash(raw_path)
}
/// Raw inputs consumed by `fork_evidence_report`.
///
/// The two segment-id vectors are sorted again inside `fork_evidence_report` and the
/// findings are passed through `sort_findings` there, so callers need not pre-order them.
pub(crate) struct ForkReportInput {
/// Wrapped into a `SnapshotFenceTokenRef` as its `token`.
pub(crate) fence_token: u64,
/// Becomes `SnapshotWatermarkRef::segment_id`.
pub(crate) source_watermark_segment_id: u64,
/// Becomes `SnapshotWatermarkRef::offset`.
pub(crate) source_watermark_offset: u64,
pub(crate) active_segment_id: u64,
pub(crate) shared_segment_ids_sorted: Vec<u64>,
pub(crate) deep_copied_segment_ids_sorted: Vec<u64>,
pub(crate) strategy_counts: ForkStrategyCounts,
pub(crate) copied_visibility_ranges_present: bool,
pub(crate) copied_pending_compaction_marker_present: bool,
pub(crate) copied_idempotency_store_present: bool,
pub(crate) destination_path_digest: ForkEvidenceHash,
pub(crate) findings: Vec<ForkFinding>,
}
pub(crate) fn fork_evidence_report(
input: ForkReportInput,
) -> Result<ForkReport, rmp_serde::encode::Error> {
let fence_token = crate::store::SnapshotFenceTokenRef {
token: input.fence_token,
};
let source_watermark = crate::store::SnapshotWatermarkRef {
segment_id: input.source_watermark_segment_id,
offset: input.source_watermark_offset,
};
let mut shared_segment_ids_sorted = input.shared_segment_ids_sorted;
shared_segment_ids_sorted.sort_unstable();
let mut deep_copied_segment_ids_sorted = input.deep_copied_segment_ids_sorted;
deep_copied_segment_ids_sorted.sort_unstable();
let mut findings = input.findings;
sort_findings(&mut findings);
let fp = ForkStructuralFingerprint {
schema_version: FORK_EVIDENCE_REPORT_SCHEMA_VERSION,
fence_token,
source_watermark,
active_segment_id: input.active_segment_id,
shared_segment_ids_sorted: shared_segment_ids_sorted.clone(),
deep_copied_segment_ids_sorted: deep_copied_segment_ids_sorted.clone(),
strategy_counts: input.strategy_counts,
copied_visibility_ranges_present: input.copied_visibility_ranges_present,
copied_pending_compaction_marker_present: input.copied_pending_compaction_marker_present,
copied_idempotency_store_present: input.copied_idempotency_store_present,
destination_path_digest: input.destination_path_digest,
};
let fork_id = fork_id_digest(&fp)?;
let body = ForkReportBody {
schema_version: FORK_EVIDENCE_REPORT_SCHEMA_VERSION,
fork_id,
fence_token,
source_watermark,
active_segment_id: input.active_segment_id,
shared_segment_ids_sorted,
deep_copied_segment_ids_sorted,
strategy_counts: input.strategy_counts,
copied_visibility_ranges_present: input.copied_visibility_ranges_present,
copied_pending_compaction_marker_present: input.copied_pending_compaction_marker_present,
copied_idempotency_store_present: input.copied_idempotency_store_present,
destination_path_digest: input.destination_path_digest,
findings,
};
let body_hash = fork_report_body_hash(&body)?;
Ok(ForkReport {
body,
body_hash,
generated_at_unix_ms: None,
batpak_version: None,
diagnostics: Vec::new(),
})
}
/// Six-byte magic prefix that opens every frame written by `encode_fork_evidence_wire`
/// and is required by `decode_fork_evidence_wire`.
pub(crate) const FORK_EVIDENCE_WIRE_MAGIC: &[u8; 6] = b"FBATFE";
/// Frames a report body for transport or storage.
///
/// Frame layout (12-byte header followed by the payload):
///
/// | bytes   | content                                        |
/// |---------|------------------------------------------------|
/// | 0..6    | `FORK_EVIDENCE_WIRE_MAGIC`                     |
/// | 6..8    | `body.schema_version`, little-endian `u16`     |
/// | 8..12   | CRC-32 of the payload, little-endian `u32`     |
/// | 12..    | `body` encoded with `crate::encoding::to_bytes` |
///
/// # Errors
///
/// Returns `StoreError::Serialization` when the body cannot be encoded.
pub fn encode_fork_evidence_wire(body: &ForkReportBody) -> Result<Vec<u8>, StoreError> {
    let payload = crate::encoding::to_bytes(body)
        .map_err(|error| StoreError::Serialization(Box::new(error)))?;

    let version_field = body.schema_version.to_le_bytes();
    let crc_field = crc32fast::hash(&payload).to_le_bytes();

    // Header pieces and payload are concatenated in wire order.
    let frame = [
        &FORK_EVIDENCE_WIRE_MAGIC[..],
        &version_field[..],
        &crc_field[..],
        &payload[..],
    ]
    .concat();
    Ok(frame)
}
/// Parses a frame produced by `encode_fork_evidence_wire` back into a report body.
///
/// Checks run in this order:
/// 1. the frame is at least 12 bytes long and starts with `FORK_EVIDENCE_WIRE_MAGIC`;
/// 2. the header version (bytes 6..8, little-endian) is not newer than
///    `FORK_EVIDENCE_REPORT_SCHEMA_VERSION`;
/// 3. the CRC-32 of the payload (bytes 12..) equals the header CRC (bytes 8..12,
///    little-endian);
/// 4. the payload deserializes into a `ForkReportBody`;
/// 5. the body's own `schema_version` is not newer than the supported version.
///
/// The header version and the body's `schema_version` are each compared with the
/// supported maximum; they are not compared with each other.
///
/// # Errors
///
/// * `StoreError::Configuration` — bad length/magic, or CRC mismatch.
/// * `StoreError::ForkEvidenceFutureVersion` — header or body version too new.
/// * `StoreError::Serialization` — the payload could not be deserialized.
pub fn decode_fork_evidence_wire(bytes: &[u8]) -> Result<ForkReportBody, StoreError> {
    // Shared by the header check and the post-deserialization body check.
    fn ensure_supported(found: u16) -> Result<(), StoreError> {
        if found > FORK_EVIDENCE_REPORT_SCHEMA_VERSION {
            Err(StoreError::ForkEvidenceFutureVersion {
                found,
                supported: FORK_EVIDENCE_REPORT_SCHEMA_VERSION,
            })
        } else {
            Ok(())
        }
    }

    // Length is verified first so the fixed-offset slices below are always in bounds.
    let has_valid_framing = bytes.len() >= 12 && bytes.starts_with(FORK_EVIDENCE_WIRE_MAGIC);
    if !has_valid_framing {
        return Err(StoreError::Configuration(
            "fork evidence wire framing is invalid".into(),
        ));
    }

    let version_field: [u8; 2] = bytes[6..8]
        .try_into()
        .map_err(|_| StoreError::Configuration("fork evidence version slice".into()))?;
    ensure_supported(u16::from_le_bytes(version_field))?;

    let crc_field: [u8; 4] = bytes[8..12]
        .try_into()
        .map_err(|_| StoreError::Configuration("fork evidence crc slice".into()))?;
    let payload = &bytes[12..];
    if crc32fast::hash(payload) != u32::from_le_bytes(crc_field) {
        return Err(StoreError::Configuration(
            "fork evidence wire crc mismatch".into(),
        ));
    }

    let body: ForkReportBody = crate::encoding::from_bytes(payload)
        .map_err(|error| StoreError::Serialization(Box::new(error)))?;
    ensure_supported(body.schema_version)?;
    Ok(body)
}
// Additional test module kept in its own file (`fork_report_mutation_kill.rs`, located
// via `#[path]`); it is compiled only for test builds.
#[cfg(test)]
#[path = "fork_report_mutation_kill.rs"]
mod fork_report_mutation_kill;
#[cfg(test)]
mod tests {
    use super::{decode_fork_evidence_wire, FORK_EVIDENCE_WIRE_MAGIC};
    use crate::store::StoreError;

    /// A frame shorter than the 12-byte header must fail the length check, even when
    /// its magic is valid and its version field would otherwise read as "too new".
    #[test]
    fn decode_rejects_a_short_frame_before_reading_its_version() {
        // Valid magic (6) + version u16::MAX (2) + only two of the four CRC bytes = 10 bytes.
        let version_field = u16::MAX.to_le_bytes();
        let partial_crc = [0u8; 2];
        let frame = [
            &FORK_EVIDENCE_WIRE_MAGIC[..],
            &version_field[..],
            &partial_crc[..],
        ]
        .concat();

        let err = decode_fork_evidence_wire(&frame)
            .expect_err("a 10-byte frame is too short to be a valid fork-evidence wire");
        assert!(
            matches!(err, StoreError::Configuration(_)),
            "a sub-12-byte frame must be rejected on LENGTH as Configuration, not \
             routed to a later branch; got {err:?}"
        );
    }
}