use super::{
compaction_id_digest, compaction_strategy_shape, report_for_run, report_skipped,
segment_id_bounds, CompactionEvidenceReport, CompactionReportBody, CompactionReportFinding,
CompactionStrategyShape, CompactionStructuralFingerprint, COMPACTION_REPORT_SCHEMA_VERSION,
};
use crate::evidence::content_hash;
use crate::store::segment::{CompactionOutcome, CompactionResult};
use crate::store::{CompactionConfig, CompactionStrategy};
use std::path::{Path, PathBuf};
/// Builds a [`CompactionConfig`] that selects [`CompactionStrategy::Merge`]
/// and carries the supplied `min_segments` value unchanged.
fn merge_config(min_segments: usize) -> CompactionConfig {
    let strategy = CompactionStrategy::Merge;
    CompactionConfig {
        strategy,
        min_segments,
    }
}
/// Returns two `(segment id, path)` pairs used as the sealed-segment input of
/// the report tests.
///
/// The pair with id 7 is listed before the pair with id 2, so the ids are not
/// in ascending order in the returned vector.
fn sealed_pair() -> Vec<(u64, PathBuf)> {
    [(7u64, "/d/000007.fbat"), (2u64, "/d/000002.fbat")]
        .iter()
        .map(|&(id, location)| (id, PathBuf::from(location)))
        .collect()
}
/// Pins the `(low, high)` pair that `segment_id_bounds` returns for a
/// three-element slice, a one-element slice, and an empty slice.
#[test]
fn segment_id_bounds_takes_first_as_low_and_last_as_high() {
    // First element becomes the low bound, last element the high bound.
    let several = segment_id_bounds(&[3, 7, 9]);
    assert_eq!(several, (Some(3), Some(9)));

    // A single id is both bounds at once.
    let single = segment_id_bounds(&[42]);
    assert_eq!(single, (Some(42), Some(42)));

    let empty = segment_id_bounds(&[]);
    assert_eq!(empty, (None, None), "an empty id set carries no bounds");
}
/// Checks that `compaction_strategy_shape` maps the `Merge`, `Retention`, and
/// `Tombstone` strategies to the shape variant of the same name.
#[test]
fn compaction_strategy_shape_maps_each_live_strategy_to_its_shape() {
    let merge = CompactionStrategy::Merge;
    assert_eq!(
        compaction_strategy_shape(&merge),
        CompactionStrategyShape::Merge
    );

    // The predicate bodies are irrelevant here; only the variant is inspected.
    let retention = CompactionStrategy::Retention(Box::new(|_| true));
    assert_eq!(
        compaction_strategy_shape(&retention),
        CompactionStrategyShape::Retention
    );

    let tombstone = CompactionStrategy::Tombstone(Box::new(|_| true));
    assert_eq!(
        compaction_strategy_shape(&tombstone),
        CompactionStrategyShape::Tombstone
    );
}
/// Builds a skip report from the two-segment fixture (ids 7 and 2, active
/// segment 11, threshold 4) and pins every column the test cares about.
#[test]
fn report_skipped_sorts_ids_counts_sealed_and_zeroes_the_reclaim_columns() {
    let sources = sealed_pair();
    let cfg = merge_config(4);
    let report = report_skipped(&cfg, 11, &sources).expect("skip report encodes");

    // Outcome and source-segment bookkeeping.
    assert_eq!(report.outcome, CompactionOutcome::Skipped);
    assert_eq!(
        report.sealed_segment_count, 2,
        "both sealed sources are counted; kills the `-> 0`/`-> 1` count mutants"
    );
    assert_eq!(
        report.source_segment_ids_sorted,
        vec![2, 7],
        "ids are sorted ascending, not left in scan order; kills the `.sort()` removal"
    );
    assert_eq!(report.input_segment_id_low, Some(2));
    assert_eq!(report.input_segment_id_high, Some(7));

    // Values echoed from the call arguments and the config.
    assert_eq!(report.active_segment_id, 11);
    assert_eq!(
        report.min_segments_threshold, 4,
        "the min-segments threshold is echoed from config"
    );

    // Nothing was merged, removed, reclaimed, or hashed.
    assert_eq!(report.merged_segment_id, None);
    assert_eq!(report.segments_removed, 0);
    assert_eq!(report.bytes_reclaimed, 0);
    assert_eq!(report.schema_version, COMPACTION_REPORT_SCHEMA_VERSION);
    assert!(report.output_segment_bytes_hash.is_none());
    assert!(
        report.findings.is_empty(),
        "a pure skip carries no structural findings"
    );
}
/// Feeds `report_for_run` a `Failed` result with no merged path and checks
/// that the report carries a `PreSwapRollback` finding with the same reason,
/// no `OutputSegmentHashUnavailable` finding, no output hash, and the
/// `Failed` outcome itself.
#[test]
fn report_for_run_flags_a_failed_run_with_preswap_rollback_and_no_output_hash() {
    // Built twice (input and expected value), so keep one source of truth.
    let failed_outcome = || CompactionOutcome::Failed {
        reason: "rebuild aborted".to_string(),
    };

    let sources = sealed_pair();
    let cfg = merge_config(1);
    let run = CompactionResult {
        outcome: failed_outcome(),
        segments_removed: 0,
        bytes_reclaimed: 0,
    };
    let report = report_for_run(&cfg, 5, &sources, Some(2), &run, None)
        .expect("failed run report encodes");

    let rollback_recorded = report.findings.iter().any(|finding| {
        matches!(
            finding,
            CompactionReportFinding::PreSwapRollback { reason } if reason == "rebuild aborted"
        )
    });
    assert!(
        rollback_recorded,
        "a Failed outcome must record PreSwapRollback with the engine reason"
    );

    let hash_unavailable_recorded = report.findings.iter().any(|finding| {
        matches!(
            finding,
            CompactionReportFinding::OutputSegmentHashUnavailable { .. }
        )
    });
    assert!(
        !hash_unavailable_recorded,
        "a Failed run reads no merged bytes, so no output-hash finding"
    );

    assert_eq!(report.output_segment_bytes_hash, None);
    assert_eq!(report.outcome, failed_outcome());
}
/// Writes a small merged-segment fixture into a temporary directory, runs
/// `report_for_run` for a `Performed` result pointing at that file, and checks
/// the output hash against `content_hash` of the same bytes, along with the
/// pass-through count columns.
#[test]
fn report_for_run_hashes_a_readable_merged_segment_and_passes_counts_through() {
    use std::io::Write;

    let scratch = tempfile::TempDir::new().expect("temp dir");
    let merged_path = scratch.path().join("000002.fbat");
    let payload = b"merged-segment-evidence-bytes";

    // The fixture is written via the store's own atomic-write helper.
    crate::store::platform::fs::write_file_atomically(
        scratch.path(),
        &merged_path,
        "merged segment fixture",
        |file| {
            file.write_all(payload)
                .map_err(crate::store::StoreError::Io)
        },
    )
    .expect("write merged fixture through the platform seam");

    let sources = sealed_pair();
    let cfg = merge_config(1);
    let run = CompactionResult {
        outcome: CompactionOutcome::Performed,
        segments_removed: 2,
        bytes_reclaimed: 4096,
    };
    let report = report_for_run(&cfg, 5, &sources, Some(2), &run, Some(&merged_path))
        .expect("performed run report encodes");

    assert_eq!(
        report.output_segment_bytes_hash,
        Some(content_hash(payload)),
        "the output hash is exactly blake3 over the merged segment bytes"
    );

    let rollback_or_hash_gap = report.findings.iter().any(|finding| {
        matches!(
            finding,
            CompactionReportFinding::OutputSegmentHashUnavailable { .. }
                | CompactionReportFinding::PreSwapRollback { .. }
        )
    });
    assert!(
        !rollback_or_hash_gap,
        "a readable Performed run has neither a rollback nor a hash-unavailable finding"
    );

    // Columns copied straight from the run result and the call arguments.
    assert_eq!(report.segments_removed, 2);
    assert_eq!(report.bytes_reclaimed, 4096);
    assert_eq!(report.merged_segment_id, Some(2));
    assert_eq!(report.source_segment_ids_sorted, vec![2, 7]);
    assert_eq!(report.outcome, CompactionOutcome::Performed);
}
/// Runs `report_for_run` for a `Performed` result twice — once with no merged
/// path and once with a path that is expected not to exist — and checks that
/// both reports have no output hash and carry an
/// `OutputSegmentHashUnavailable` finding.
#[test]
fn report_for_run_flags_a_performed_run_whose_merged_bytes_cannot_be_hashed() {
    let sources = sealed_pair();
    let cfg = merge_config(1);
    let run = CompactionResult {
        outcome: CompactionOutcome::Performed,
        segments_removed: 1,
        bytes_reclaimed: 10,
    };

    // Case 1: the caller supplies no merged path at all.
    let without_path = report_for_run(&cfg, 5, &sources, Some(2), &run, None)
        .expect("performed/none report encodes");
    assert_eq!(without_path.output_segment_bytes_hash, None);
    let flagged_without_path = without_path.findings.iter().any(|finding| {
        matches!(
            finding,
            CompactionReportFinding::OutputSegmentHashUnavailable { .. }
        )
    });
    assert!(
        flagged_without_path,
        "a Performed run with no merged path records the hash-unavailable finding"
    );

    // Case 2: a merged path is supplied but points at a file that cannot be read.
    let absent = Path::new("/nonexistent/batpak/merged/never.fbat");
    let with_absent_path = report_for_run(&cfg, 5, &sources, Some(2), &run, Some(absent))
        .expect("performed/unreadable report encodes");
    assert_eq!(with_absent_path.output_segment_bytes_hash, None);
    let flagged_with_absent_path = with_absent_path.findings.iter().any(|finding| {
        matches!(
            finding,
            CompactionReportFinding::OutputSegmentHashUnavailable { .. }
        )
    });
    assert!(
        flagged_with_absent_path,
        "a Performed run whose merged file cannot be read records the hash-unavailable finding"
    );
}
/// Exercises `CompactionReportBody::body_hash` and
/// `CompactionEvidenceReport::from_body` on the body fixture:
/// the hash ignores finding order, is not all zeroes, changes when
/// `bytes_reclaimed` changes, and is what `from_body` stores in `body_hash`.
#[test]
fn body_hash_is_finding_order_independent_nonzero_and_bound_by_from_body() {
    let rollback = CompactionReportFinding::PreSwapRollback {
        reason: "a".to_string(),
    };
    let hash_unavailable = CompactionReportFinding::OutputSegmentHashUnavailable {
        reason: "b".to_string(),
    };
    let ordered = body_fixture(vec![rollback.clone(), hash_unavailable.clone()]);
    let reordered = body_fixture(vec![hash_unavailable, rollback]);

    let ordered_hash = ordered.body_hash().expect("hash a");
    let reordered_hash = reordered.body_hash().expect("hash b");
    assert_eq!(
        ordered_hash, reordered_hash,
        "finding order must not change the body hash (findings are sorted first)"
    );
    assert_ne!(
        ordered_hash, [0u8; 32],
        "a populated body never hashes to the all-zero digest; kills `-> Ok(Default::default())`"
    );

    // Perturb one column on a copy so `ordered` itself stays pristine.
    let mut bumped = ordered.clone();
    bumped.bytes_reclaimed += 1;
    assert_ne!(
        bumped.body_hash().expect("mutated hash"),
        ordered_hash,
        "changing a real column changes the body hash"
    );

    let report = CompactionEvidenceReport::from_body(ordered.clone()).expect("evidence report");
    assert_eq!(
        report.body_hash,
        ordered.body_hash().expect("hash a"),
        "from_body must bind body_hash to the body's own digest"
    );
}
/// Checks three properties of `compaction_id_digest` over the fingerprint
/// fixture: the digest is not all zeroes, it differs when `active_segment_id`
/// differs (9 vs 10), and it repeats for an identical fingerprint.
#[test]
fn compaction_id_digest_is_stable_and_column_sensitive() {
    // Digest of the fixture fingerprint for a given active segment id.
    let digest_for = |active_segment_id| compaction_id_digest(&fingerprint_fixture(active_segment_id));

    let nine = digest_for(9).expect("digest nine");
    let ten = digest_for(10).expect("digest ten");

    assert_ne!(
        nine, [0u8; 32],
        "a real fingerprint never digests to the all-zero value"
    );
    assert_ne!(
        nine, ten,
        "a changed structural column (active_segment_id) yields a different compaction id"
    );

    let nine_again = digest_for(9).expect("digest nine again");
    assert_eq!(
        nine, nine_again,
        "the digest is a deterministic function of the fingerprint"
    );
}
/// Builds a `Performed` merge fingerprint over source segments `[2, 7]`
/// (merged into segment 2) in which only `active_segment_id` varies between
/// calls.
fn fingerprint_fixture(active_segment_id: u64) -> CompactionStructuralFingerprint {
    let source_segment_ids_sorted = vec![2, 7];
    let merged_segment_id = Some(2);

    CompactionStructuralFingerprint {
        schema_version: COMPACTION_REPORT_SCHEMA_VERSION,
        strategy_shape: CompactionStrategyShape::Merge,
        min_segments_threshold: 2,
        active_segment_id,
        sealed_segment_count: 2,
        source_segment_ids_sorted,
        merged_segment_id,
        outcome: CompactionOutcome::Performed,
        segments_removed: 2,
        bytes_reclaimed: 4096,
    }
}
/// Builds a `Performed` merge report body with fixed columns (compaction id of
/// all `1` bytes, sources `[2, 7]`, active segment 9, no output hash) and the
/// caller-supplied `findings`.
fn body_fixture(findings: Vec<CompactionReportFinding>) -> CompactionReportBody {
    let compaction_id = [1u8; 32];
    let source_segment_ids_sorted = vec![2, 7];
    let merged_segment_id = Some(2);

    CompactionReportBody {
        schema_version: COMPACTION_REPORT_SCHEMA_VERSION,
        compaction_id,
        input_segment_id_low: Some(2),
        input_segment_id_high: Some(7),
        strategy_shape: CompactionStrategyShape::Merge,
        min_segments_threshold: 2,
        active_segment_id: 9,
        sealed_segment_count: 2,
        source_segment_ids_sorted,
        merged_segment_id,
        output_segment_bytes_hash: None,
        outcome: CompactionOutcome::Performed,
        segments_removed: 2,
        bytes_reclaimed: 4096,
        findings,
    }
}