use crate::evidence::{content_hash, sort_findings};
use crate::store::append::{CompactionConfig, CompactionStrategy};
use crate::store::segment::{CompactionOutcome, CompactionResult};
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Schema version stamped into every compaction report body and into the
/// structural fingerprint that is hashed to derive the compaction id.
pub const COMPACTION_REPORT_SCHEMA_VERSION: u16 = 1;
/// 32-byte digest type used for the `body_hash` of a
/// [`CompactionEvidenceReport`].
pub type CompactionEvidenceHash = [u8; 32];
/// Payload-free mirror of `CompactionStrategy`: records which strategy
/// variant a compaction was configured with, without that variant's
/// parameters (see [`compaction_strategy_shape`]).
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so reordering
/// the variants changes comparisons (and possibly the serialized form).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum CompactionStrategyShape {
/// Mapped from `CompactionStrategy::Merge`.
Merge,
/// Mapped from `CompactionStrategy::Retention(_)`; the payload is dropped.
Retention,
/// Mapped from `CompactionStrategy::Tombstone(_)`; the payload is dropped.
Tombstone,
}
/// Noteworthy conditions attached to a compaction report.
///
/// Findings are sorted with `sort_findings` before a report body is
/// returned or hashed, so their insertion order is not significant.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum CompactionReportFinding {
/// Recorded when the run's outcome is `CompactionOutcome::Failed`;
/// `reason` is a copy of the failure reason carried by that outcome.
PreSwapRollback {
reason: String,
},
/// Recorded when a `Performed` run could not be given an output hash:
/// either no merged segment path was supplied, or reading the merged
/// segment failed. `reason` describes which of the two happened.
OutputSegmentHashUnavailable {
reason: String,
},
}
/// Structural summary of a compaction run that is serialized and hashed to
/// produce the report's `compaction_id` (see `compaction_id_digest`).
///
/// It carries the same structural fields as [`CompactionReportBody`] but
/// deliberately omits the id itself, the low/high segment bounds, the output
/// bytes hash and the findings, so those do not influence the id.
///
/// NOTE(review): the encoding is presumably order-sensitive (the error type
/// suggests MessagePack via `rmp_serde`), so adding, removing or reordering
/// fields would change every derived id — confirm before editing.
#[derive(Serialize)]
struct CompactionStructuralFingerprint {
// Report schema version (`COMPACTION_REPORT_SCHEMA_VERSION`).
schema_version: u16,
// Which strategy variant was configured, without its payload.
strategy_shape: CompactionStrategyShape,
// Copied from `CompactionConfig::min_segments`.
min_segments_threshold: usize,
// Id of the active segment, as passed in by the caller.
active_segment_id: u64,
// Number of entries in the caller's `sealed` slice.
sealed_segment_count: usize,
// Ids of the sealed segments, sorted ascending.
source_segment_ids_sorted: Vec<u64>,
// Id of the merged segment, if any (always `None` for skipped runs).
merged_segment_id: Option<u64>,
// Outcome of the run.
outcome: CompactionOutcome,
// Taken from the `CompactionResult` (0 for skipped runs).
segments_removed: usize,
// Taken from the `CompactionResult` (0 for skipped runs).
bytes_reclaimed: u64,
}
/// Derives the compaction id for a structural fingerprint.
///
/// The fingerprint is serialized with `crate::encoding::to_bytes` and the
/// resulting bytes are passed through `content_hash`.
///
/// # Errors
///
/// Returns the encoder's error if the fingerprint cannot be serialized.
fn compaction_id_digest(
    fp: &CompactionStructuralFingerprint,
) -> Result<[u8; 32], rmp_serde::encode::Error> {
    let encoded = crate::encoding::to_bytes(fp)?;
    let digest = content_hash(&encoded);
    Ok(digest)
}
/// Returns the first and last id of `ids` as `(low, high)`.
///
/// Both components are `None` for an empty slice. The slice is not sorted
/// or scanned here: callers are expected to pass ids already in ascending
/// order so that the first/last elements are the true bounds.
fn segment_id_bounds(ids: &[u64]) -> (Option<u64>, Option<u64>) {
    let lowest = ids.first().copied();
    let highest = ids.last().copied();
    (lowest, highest)
}
/// The content of a compaction evidence report, as produced by
/// [`report_skipped`] and [`report_for_run`].
///
/// The whole body (with findings sorted) is what `body_hash` serializes and
/// hashes, so every field here contributes to the report's body hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactionReportBody {
/// Report schema version (`COMPACTION_REPORT_SCHEMA_VERSION`).
pub schema_version: u16,
/// Hash of the run's structural fingerprint; it does not cover the
/// findings or the output bytes hash.
pub compaction_id: [u8; 32],
/// First entry of `source_segment_ids_sorted`, or `None` when empty.
pub input_segment_id_low: Option<u64>,
/// Last entry of `source_segment_ids_sorted`, or `None` when empty.
pub input_segment_id_high: Option<u64>,
/// Which strategy variant was configured, without its payload.
pub strategy_shape: CompactionStrategyShape,
/// Copied from `CompactionConfig::min_segments`.
pub min_segments_threshold: usize,
/// Id of the active segment, as passed in by the caller.
pub active_segment_id: u64,
/// Number of sealed segments the caller supplied.
pub sealed_segment_count: usize,
/// Ids of the sealed segments, sorted ascending.
pub source_segment_ids_sorted: Vec<u64>,
/// Id of the merged segment, if any (always `None` for skipped runs).
pub merged_segment_id: Option<u64>,
/// `content_hash` of the merged segment file's bytes. Only populated for
/// `Performed` runs whose merged segment could be read; otherwise `None`.
pub output_segment_bytes_hash: Option<[u8; 32]>,
/// Outcome of the run.
pub outcome: CompactionOutcome,
/// Taken from the `CompactionResult` (0 for skipped runs).
pub segments_removed: usize,
/// Taken from the `CompactionResult` (0 for skipped runs).
pub bytes_reclaimed: u64,
/// Findings recorded while building the report (empty for skipped runs).
pub findings: Vec<CompactionReportFinding>,
}
impl CompactionReportBody {
    /// Computes the content hash of this body.
    ///
    /// Hashing works on a copy whose findings have been put through
    /// `sort_findings`, so the result does not depend on the order in which
    /// findings were recorded; `self` is left untouched.
    ///
    /// # Errors
    ///
    /// Returns the encoder's error if the body cannot be serialized.
    pub fn body_hash(&self) -> Result<[u8; 32], rmp_serde::encode::Error> {
        let canonical = {
            let mut copy = self.clone();
            sort_findings(&mut copy.findings);
            copy
        };
        let encoded = crate::encoding::to_bytes(&canonical)?;
        Ok(content_hash(&encoded))
    }
}
/// A compaction report body paired with a hash of that body.
///
/// `from_body` is the constructor that keeps the two in agreement. Both
/// fields are public and nothing in this type re-validates the hash, so a
/// value that was built or mutated by hand may carry a stale `body_hash`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactionEvidenceReport {
/// The report content.
pub body: CompactionReportBody,
/// Result of `CompactionReportBody::body_hash` at construction time.
pub body_hash: CompactionEvidenceHash,
}
impl CompactionEvidenceReport {
    /// Wraps `body` together with its freshly computed body hash.
    ///
    /// # Errors
    ///
    /// Propagates the serialization error from
    /// [`CompactionReportBody::body_hash`].
    pub fn from_body(body: CompactionReportBody) -> Result<Self, rmp_serde::encode::Error> {
        body.body_hash().map(|body_hash| Self { body, body_hash })
    }
}
pub fn compaction_strategy_shape(strategy: &CompactionStrategy) -> CompactionStrategyShape {
match strategy {
CompactionStrategy::Merge => CompactionStrategyShape::Merge,
CompactionStrategy::Retention(_) => CompactionStrategyShape::Retention,
CompactionStrategy::Tombstone(_) => CompactionStrategyShape::Tombstone,
}
}
/// Builds the report body for a compaction that was skipped.
///
/// The body records the configured strategy shape and threshold, the active
/// segment id and the sealed segment ids (sorted ascending), with outcome
/// `CompactionOutcome::Skipped`, no merged segment, no output hash, zero
/// segments removed / bytes reclaimed, and no findings.
///
/// # Errors
///
/// Returns the encoder's error if the structural fingerprint used to derive
/// the compaction id cannot be serialized.
pub fn report_skipped(
    config: &CompactionConfig,
    active_segment_id: u64,
    sealed: &[(u64, std::path::PathBuf)],
) -> Result<CompactionReportBody, rmp_serde::encode::Error> {
    let mut ids: Vec<u64> = sealed.iter().map(|entry| entry.0).collect();
    ids.sort_unstable();
    // Bounds are read before the id list is moved into the fingerprint.
    let (input_segment_id_low, input_segment_id_high) = segment_id_bounds(&ids);

    let fingerprint = CompactionStructuralFingerprint {
        schema_version: COMPACTION_REPORT_SCHEMA_VERSION,
        strategy_shape: compaction_strategy_shape(&config.strategy),
        min_segments_threshold: config.min_segments,
        active_segment_id,
        sealed_segment_count: sealed.len(),
        source_segment_ids_sorted: ids,
        merged_segment_id: None,
        outcome: CompactionOutcome::Skipped,
        segments_removed: 0,
        bytes_reclaimed: 0,
    };
    let compaction_id = compaction_id_digest(&fingerprint)?;

    // The fingerprint has served its purpose; hand its fields to the body.
    let CompactionStructuralFingerprint {
        schema_version,
        strategy_shape,
        min_segments_threshold,
        active_segment_id,
        sealed_segment_count,
        source_segment_ids_sorted,
        merged_segment_id,
        outcome,
        segments_removed,
        bytes_reclaimed,
    } = fingerprint;

    Ok(CompactionReportBody {
        schema_version,
        compaction_id,
        input_segment_id_low,
        input_segment_id_high,
        strategy_shape,
        min_segments_threshold,
        active_segment_id,
        sealed_segment_count,
        source_segment_ids_sorted,
        merged_segment_id,
        output_segment_bytes_hash: None,
        outcome,
        segments_removed,
        bytes_reclaimed,
        findings: Vec::new(),
    })
}
/// Appends a `PreSwapRollback` finding carrying the failure reason when
/// `outcome` is `CompactionOutcome::Failed`; any other outcome leaves
/// `findings` untouched.
fn push_failed_finding(findings: &mut Vec<CompactionReportFinding>, outcome: &CompactionOutcome) {
    match outcome {
        CompactionOutcome::Failed { reason } => {
            let rollback = CompactionReportFinding::PreSwapRollback {
                reason: reason.clone(),
            };
            findings.push(rollback);
        }
        _ => {}
    }
}
/// Builds the report body for a compaction that actually ran.
///
/// * A `Failed` outcome adds a `PreSwapRollback` finding with the failure
///   reason.
/// * A `Performed` outcome attempts to hash the merged segment file at
///   `merged_segment_path_for_hash`. If no path was supplied or the file
///   cannot be read, the hash is left as `None` and an
///   `OutputSegmentHashUnavailable` finding is recorded instead of failing
///   the report.
/// * Any other outcome leaves the output hash as `None` with no finding.
///
/// The sealed segment ids are sorted ascending, and the findings are passed
/// through `sort_findings` before the body is returned.
///
/// # Errors
///
/// Returns the encoder's error if the structural fingerprint used to derive
/// the compaction id cannot be serialized.
pub fn report_for_run(
    config: &CompactionConfig,
    active_segment_id: u64,
    sealed: &[(u64, std::path::PathBuf)],
    merged_segment_id: Option<u64>,
    result: &CompactionResult,
    merged_segment_path_for_hash: Option<&Path>,
) -> Result<CompactionReportBody, rmp_serde::encode::Error> {
    let mut ids: Vec<u64> = sealed.iter().map(|entry| entry.0).collect();
    ids.sort_unstable();
    // Bounds are read before the id list is moved into the fingerprint.
    let (input_segment_id_low, input_segment_id_high) = segment_id_bounds(&ids);

    let mut findings = Vec::new();
    push_failed_finding(&mut findings, &result.outcome);

    // Only a performed compaction has an output segment worth hashing.
    let output_segment_bytes_hash = if matches!(&result.outcome, CompactionOutcome::Performed) {
        // Either a digest, or the reason why one could not be produced.
        let attempt = match merged_segment_path_for_hash {
            Some(path) => crate::store::platform::fs::read(path)
                .map(|bytes| content_hash(&bytes))
                .map_err(|err| format!("read merged segment for evidence hash: {err}")),
            None => Err(String::from(
                "merged segment path unavailable for evidence hash",
            )),
        };
        match attempt {
            Ok(digest) => Some(digest),
            Err(reason) => {
                findings.push(CompactionReportFinding::OutputSegmentHashUnavailable { reason });
                None
            }
        }
    } else {
        None
    };

    let fingerprint = CompactionStructuralFingerprint {
        schema_version: COMPACTION_REPORT_SCHEMA_VERSION,
        strategy_shape: compaction_strategy_shape(&config.strategy),
        min_segments_threshold: config.min_segments,
        active_segment_id,
        sealed_segment_count: sealed.len(),
        source_segment_ids_sorted: ids,
        merged_segment_id,
        outcome: result.outcome.clone(),
        segments_removed: result.segments_removed,
        bytes_reclaimed: result.bytes_reclaimed,
    };
    let compaction_id = compaction_id_digest(&fingerprint)?;
    sort_findings(&mut findings);

    // The fingerprint has served its purpose; hand its fields to the body.
    let CompactionStructuralFingerprint {
        schema_version,
        strategy_shape,
        min_segments_threshold,
        active_segment_id,
        sealed_segment_count,
        source_segment_ids_sorted,
        merged_segment_id,
        outcome,
        segments_removed,
        bytes_reclaimed,
    } = fingerprint;

    Ok(CompactionReportBody {
        schema_version,
        compaction_id,
        input_segment_id_low,
        input_segment_id_high,
        strategy_shape,
        min_segments_threshold,
        active_segment_id,
        sealed_segment_count,
        source_segment_ids_sorted,
        merged_segment_id,
        output_segment_bytes_hash,
        outcome,
        segments_removed,
        bytes_reclaimed,
        findings,
    })
}