use std::collections::HashMap;
use crate::destination::Destination;
use crate::error::Result;
use crate::manifest::{MANIFEST_FILENAME, QUARANTINE_PREFIX, RunManifest};
use crate::pipeline::RunSummary;
use crate::pipeline::resume_decisions::{ResumeDecision, build_resume_plan};
use crate::pipeline::validate_manifest::{MANIFEST_MAX_BYTES, read_capped};
use crate::plan::ResolvedRunPlan;
use crate::state::StateStore;
/// Counters describing what one pass of [`apply_m8_resume_decisions`] did.
///
/// Every field starts at zero (`Default`); an all-zero value is also what the
/// preamble returns whenever it bails out early without reconciling anything.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct M8Stats {
    /// Parts whose resume decision was `Skip` and that matched a chunk task.
    pub skipped: usize,
    /// Chunk tasks actually reset (reset call reported > 0) for a `Rewrite` decision.
    pub reset_for_rewrite: usize,
    /// Chunk tasks actually reset (reset call reported > 0) for a `Quarantine` decision.
    pub reset_for_quarantine: usize,
    /// Destination objects successfully moved under the quarantine prefix.
    pub quarantined_moved: usize,
    /// Quarantine moves that returned an error (logged, not propagated).
    pub quarantine_move_failures: usize,
    /// Resume-plan parts for which no chunk task with that file name exists.
    pub orphan_parts: usize,
}
pub(crate) fn apply_m8_resume_decisions(
state: &StateStore,
run_id: &str,
plan: &ResolvedRunPlan,
summary: &mut RunSummary,
) -> Result<M8Stats> {
use crate::destination::WriteCommitProtocol;
let dest = match crate::destination::create_destination(&plan.destination) {
Ok(d) => d,
Err(e) => {
log::warn!(
"M8 resume preamble: cannot open destination for export '{}' (not fatal — \
falling back to state-only resume): {:#}",
plan.export_name,
e
);
return Ok(M8Stats::default());
}
};
if dest.capabilities().commit_protocol == WriteCommitProtocol::Streaming {
log::debug!(
"M8 resume preamble: streaming destination for export '{}'; skipped",
plan.export_name
);
return Ok(M8Stats::default());
}
let manifest_bytes = match dest.head(MANIFEST_FILENAME) {
Ok(Some(_)) => match read_capped(&*dest, MANIFEST_FILENAME, MANIFEST_MAX_BYTES) {
Ok(b) => b,
Err(e) => {
log::warn!(
"M8 resume preamble: manifest.json present but unreadable on destination \
for export '{}' (not fatal): {:#}",
plan.export_name,
e
);
return Ok(M8Stats::default());
}
},
Ok(None) => {
log::info!(
"M8 resume preamble: no manifest at destination for export '{}'; \
falling back to state-only resume (legacy / fresh prefix)",
plan.export_name
);
return Ok(M8Stats::default());
}
Err(e) => {
log::warn!(
"M8 resume preamble: manifest.json head failed for export '{}' (not fatal): {:#}",
plan.export_name,
e
);
return Ok(M8Stats::default());
}
};
let manifest: RunManifest = match serde_json::from_slice(&manifest_bytes) {
Ok(m) => m,
Err(e) => {
log::warn!(
"M8 resume preamble: manifest.json at destination did not parse for export '{}' \
(not fatal — manifest may be from a future schema version): {:#}",
plan.export_name,
e
);
return Ok(M8Stats::default());
}
};
if manifest.run_id != run_id {
log::info!(
"M8 resume preamble: destination manifest run_id '{}' differs from current resume \
run_id '{}' for export '{}'; skipping reconciliation (foreign manifest)",
manifest.run_id,
run_id,
plan.export_name
);
return Ok(M8Stats::default());
}
let listing = match dest.list_prefix("") {
Ok(l) => l,
Err(e) => {
log::warn!(
"M8 resume preamble: list_prefix failed for export '{}' (not fatal): {:#}",
plan.export_name,
e
);
return Ok(M8Stats::default());
}
};
let resume_plan = build_resume_plan(&manifest, &listing);
let tasks = state.list_chunk_tasks_for_run(run_id)?;
let by_file: HashMap<&str, &crate::state::ChunkTaskInfo> = tasks
.iter()
.filter_map(|t| t.file_name.as_deref().map(|n| (n, t)))
.collect();
let mut stats = M8Stats::default();
let manifest_part_by_path: HashMap<&str, &crate::manifest::ManifestPart> = manifest
.parts
.iter()
.filter(|p| p.status == crate::manifest::PartStatus::Committed)
.map(|p| (p.path.as_str(), p))
.collect();
if summary.schema_fingerprint.is_none()
&& manifest.schema_fingerprint != "xxh3:0000000000000000"
{
summary.schema_fingerprint = Some(manifest.schema_fingerprint.clone());
}
for (path, decision) in &resume_plan.per_part {
let task = match by_file.get(path.as_str()) {
Some(t) => *t,
None => {
stats.orphan_parts += 1;
continue;
}
};
match decision.decision {
ResumeDecision::Skip => {
stats.skipped += 1;
if let Some(p) = manifest_part_by_path.get(path.as_str()) {
summary.manifest_parts.push((*p).clone());
}
}
ResumeDecision::Rewrite => {
let n = state.reset_chunk_task_for_re_export(
run_id,
task.chunk_index,
"M8 reset: manifest part missing at destination",
)?;
if n > 0 {
stats.reset_for_rewrite += 1;
}
}
ResumeDecision::Quarantine { reason } => {
let n = state.reset_chunk_task_for_re_export(
run_id,
task.chunk_index,
&format!("M8 reset: destination part diverged ({:?})", reason),
)?;
if n > 0 {
stats.reset_for_quarantine += 1;
}
quarantine_move(&*dest, path, run_id, &plan.export_name, &mut stats);
}
}
}
for key in resume_plan.untracked.keys() {
quarantine_move(&*dest, key, run_id, &plan.export_name, &mut stats);
}
log::info!(
"M8 resume preamble: export '{}' run_id '{}' — {} skipped, {} reset for rewrite, \
{} reset for quarantine, {} quarantined ({} move failure(s)), {} orphan part(s), \
{} untracked surplus object(s)",
plan.export_name,
run_id,
stats.skipped,
stats.reset_for_rewrite,
stats.reset_for_quarantine,
stats.quarantined_moved,
stats.quarantine_move_failures,
stats.orphan_parts,
resume_plan.untracked.len(),
);
Ok(stats)
}
/// Best-effort relocation of one destination object into the quarantine area.
///
/// The object at `src_key` is moved to `<QUARANTINE_PREFIX>/<run_id>/<src_key>`.
/// The outcome is recorded in `stats` (`quarantined_moved` on success,
/// `quarantine_move_failures` on error) and logged; a failed move is never
/// propagated to the caller.
fn quarantine_move(
    dest: &dyn Destination,
    src_key: &str,
    run_id: &str,
    export_name: &str,
    stats: &mut M8Stats,
) {
    let quarantine_key = format!("{QUARANTINE_PREFIX}/{run_id}/{src_key}");

    // Failure path first: count it, warn, and return without raising.
    if let Err(err) = dest.r#move(src_key, &quarantine_key) {
        stats.quarantine_move_failures += 1;
        log::warn!(
            "M9 quarantine: export '{}' could not move '{}' → '{}' (not fatal — \
             next resume will re-trip M9 on this object): {:#}",
            export_name,
            src_key,
            quarantine_key,
            err,
        );
        return;
    }

    stats.quarantined_moved += 1;
    log::info!(
        "M9 quarantine: export '{}' moved '{}' → '{}'",
        export_name,
        src_key,
        quarantine_key,
    );
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::ChunkTaskInfo;

    /// A default `M8Stats` must report zero for every counter.
    #[test]
    fn m8stats_aggregates_default_to_zero() {
        let s = M8Stats::default();
        assert_eq!(s.skipped, 0);
        assert_eq!(s.reset_for_rewrite, 0);
        assert_eq!(s.reset_for_quarantine, 0);
        assert_eq!(s.quarantined_moved, 0);
        assert_eq!(s.quarantine_move_failures, 0);
        assert_eq!(s.orphan_parts, 0);
    }

    /// `quarantine_move` records success and failure in the stats, targets
    /// `<QUARANTINE_PREFIX>/<run_id>/<key>`, and never propagates a move error.
    #[test]
    fn quarantine_move_is_best_effort_on_success_and_failure() {
        use crate::destination::{
            Destination, DestinationCapabilities, WriteCommitProtocol, WriteOutcome,
        };
        use std::path::Path;
        use std::sync::Mutex;

        /// Destination double that records every `move` call and optionally fails it.
        struct MoveMock {
            fail: bool,
            calls: Mutex<Vec<(String, String)>>,
        }

        impl Destination for MoveMock {
            fn write(&self, _p: &Path, _k: &str) -> Result<WriteOutcome> {
                unreachable!("quarantine_move must never call write")
            }

            fn capabilities(&self) -> DestinationCapabilities {
                DestinationCapabilities {
                    commit_protocol: WriteCommitProtocol::Atomic,
                    idempotent_overwrite: true,
                    retry_safe: false,
                    partial_write_risk: true,
                }
            }

            fn r#move(&self, from: &str, to: &str) -> Result<()> {
                // Record the attempt before deciding whether it fails.
                self.calls
                    .lock()
                    .unwrap()
                    .push((from.to_string(), to.to_string()));
                if self.fail {
                    anyhow::bail!("simulated move failure")
                }
                Ok(())
            }
        }

        // Success path.
        let ok = MoveMock {
            fail: false,
            calls: Mutex::new(Vec::new()),
        };
        let mut stats = M8Stats::default();
        quarantine_move(
            &ok,
            "part-000003.parquet",
            "run_xyz",
            "public.orders",
            &mut stats,
        );
        assert_eq!(stats.quarantined_moved, 1);
        assert_eq!(stats.quarantine_move_failures, 0);
        assert_eq!(
            ok.calls.lock().unwrap().as_slice(),
            &[(
                "part-000003.parquet".to_string(),
                format!("{QUARANTINE_PREFIX}/run_xyz/part-000003.parquet"),
            )],
            "successful move targets _quarantine/<run_id>/<original-key>"
        );

        // Failure path: counted, not raised.
        let bad = MoveMock {
            fail: true,
            calls: Mutex::new(Vec::new()),
        };
        let mut stats = M8Stats::default();
        quarantine_move(
            &bad,
            "orphan.parquet",
            "run_xyz",
            "public.orders",
            &mut stats,
        );
        assert_eq!(stats.quarantined_moved, 0);
        assert_eq!(stats.quarantine_move_failures, 1);
        assert_eq!(
            bad.calls.lock().unwrap().as_slice(),
            &[(
                "orphan.parquet".to_string(),
                format!("{QUARANTINE_PREFIX}/run_xyz/orphan.parquet"),
            )],
            "failed move must still have been attempted exactly once with the quarantine key"
        );
    }

    /// A manifest larger than the cap must be rejected by `read_capped`.
    #[test]
    fn sec_manifest_read_rejects_oversized() {
        use crate::config::{DestinationConfig, DestinationType};
        use crate::destination::local::LocalDestination;

        // NOTE(review): this local const shadows the `MANIFEST_MAX_BYTES` that
        // `use super::*` brings in, so the test exercises `read_capped` with a
        // hard-coded 64 MiB cap rather than the production value — confirm the
        // two are meant to stay in sync.
        const MANIFEST_MAX_BYTES: u64 = 64 * 1024 * 1024;

        let dir = tempfile::tempdir().unwrap();
        let dest = LocalDestination::new(&DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(dir.path().to_string_lossy().into_owned()),
            ..Default::default()
        })
        .unwrap();

        {
            use std::io::{Seek, SeekFrom, Write};
            // Seek past the cap and write one byte so the file's reported size
            // exceeds the limit without writing that many bytes explicitly.
            let mut f = std::fs::File::create(dir.path().join(MANIFEST_FILENAME)).unwrap();
            f.seek(SeekFrom::Start(MANIFEST_MAX_BYTES + 4096)).unwrap();
            f.write_all(b"}").unwrap();
            f.flush().unwrap();
        }

        let meta = dest.head(MANIFEST_FILENAME).unwrap().unwrap();
        assert!(
            meta.size_bytes > MANIFEST_MAX_BYTES,
            "precondition: planted manifest must exceed the cap"
        );

        let result = read_capped(&dest, MANIFEST_FILENAME, MANIFEST_MAX_BYTES);
        assert!(
            result.is_err(),
            "oversized manifest.json must be rejected by the capped read path, \
             not materialised into memory (V21 DoS)"
        );
    }

    /// Mirrors the `by_file` index built in `apply_m8_resume_decisions`:
    /// chunk tasks without a `file_name` must not appear in the index.
    #[test]
    fn by_file_index_skips_chunks_without_file_name() {
        let tasks = [
            ChunkTaskInfo {
                chunk_index: 0,
                start_key: "0".into(),
                end_key: "100".into(),
                status: "completed".into(),
                attempts: 1,
                last_error: None,
                rows_written: Some(100),
                file_name: Some("part-000001.parquet".into()),
            },
            ChunkTaskInfo {
                chunk_index: 1,
                start_key: "100".into(),
                end_key: "200".into(),
                status: "pending".into(),
                attempts: 0,
                last_error: None,
                rows_written: None,
                file_name: None,
            },
            ChunkTaskInfo {
                chunk_index: 2,
                start_key: "200".into(),
                end_key: "300".into(),
                status: "completed".into(),
                attempts: 1,
                last_error: None,
                rows_written: Some(50),
                file_name: Some("part-000003.parquet".into()),
            },
        ];

        let by_file: HashMap<&str, &ChunkTaskInfo> = tasks
            .iter()
            .filter_map(|t| t.file_name.as_deref().map(|n| (n, t)))
            .collect();

        assert_eq!(by_file.len(), 2);
        assert_eq!(by_file["part-000001.parquet"].chunk_index, 0);
        assert_eq!(by_file["part-000003.parquet"].chunk_index, 2);
        assert!(!by_file.contains_key("part-000002.parquet"));
    }
}