use crate::lab::crashpack::evidence_ledger::{AtpEvidenceEntry, AtpEvidenceLedger};
use crate::lab::crashpack::{
ATP_CRASHPACK_SCHEMA_VERSION, AtpCrashpack, TransferOracleResult, TransferViolation,
ViolationSeverity, atp_specialized_log, is_atp_path_event, is_atp_quic_event,
is_atp_repair_event,
};
use crate::lab::oracle::evidence::EvidenceStrength;
use crate::lab::oracle::{OracleEntryReport, OracleReport, OracleStats};
use crate::trace::{TraceData, TraceEvent, TraceEventKind};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use thiserror::Error;
/// Coordinates replay of a captured ATP crashpack.
///
/// Bundles the crashpack being replayed with the configuration handed to the
/// trace minimizer when `replay` runs.
#[derive(Debug)]
pub struct AtpReplayCoordinator {
/// Crashpack whose oracle results and trace events are replayed.
pub crashpack: AtpCrashpack,
/// Minimizer settings; `replay` only minimizes when `enabled` is set.
pub minimizer_config: TraceMinimizerConfig,
}
impl AtpReplayCoordinator {
/// Creates a coordinator for `crashpack` with the default minimizer configuration.
pub fn new(crashpack: AtpCrashpack) -> Self {
    let minimizer_config = TraceMinimizerConfig::default();
    Self {
        crashpack,
        minimizer_config,
    }
}
/// Builder-style setter: swaps in `config` and returns the updated coordinator.
pub fn with_minimizer_config(self, config: TraceMinimizerConfig) -> Self {
    let mut coordinator = self;
    coordinator.minimizer_config = config;
    coordinator
}
/// Validates the crashpack and produces a replay result.
///
/// The replay counts as successful when the crashpack recorded no violations
/// or when the trace contains at least one failure witness. When minimization
/// is enabled the trace is minimized and the resulting length and statistics
/// are reported; otherwise the original trace length and default statistics
/// are used.
///
/// # Errors
/// Propagates failures from crashpack validation and from the minimizer.
pub fn replay(&self) -> Result<AtpReplayResult, ReplayError> {
    let trace = &self.crashpack.trace_events;
    let original_violations = self.count_original_violations();
    self.validate_crashpack_for_replay(original_violations)?;

    let replay_successful =
        original_violations == 0 || !failure_witness_counts(trace).is_empty();
    let oracle_results = vec![self.replay_oracle_report()];

    let (minimized_trace_length, minimization_stats) = if self.minimizer_config.enabled {
        let outcome = TraceMinimizer::new(self.minimizer_config.clone()).minimize(trace)?;
        (outcome.minimized_events.len(), outcome.stats)
    } else {
        (trace.len(), MinimizationStats::default())
    };

    Ok(AtpReplayResult {
        original_violations,
        minimized_trace_length,
        replay_successful,
        oracle_results,
        minimization_stats,
    })
}
/// Renders a bash script that re-runs the replay against artifacts in `output_dir`.
///
/// The script exports one `ATP_SEED_*` variable per crashpack seed and one
/// `ATP_ORACLE_*` variable per oracle result, then invokes
/// `asupersync atp replay` with each artifact path passed through
/// `shell_path_arg`. Minimizer flags are appended only when minimization is
/// enabled. As written, this function never returns `Err`.
pub fn generate_replay_command(&self, output_dir: &Path) -> Result<String, ReplayError> {
    let mut script = String::from("#!/bin/bash\n");
    script += "# ATP Replay Command - Generated by crashpack replay coordinator\n";
    script += "# This command reproduces the captured failure deterministically\n\n";

    for (name, seed) in &self.crashpack.seeds {
        let export = format!("export ATP_SEED_{}={}\n", env_suffix(name), seed);
        script += &export;
    }

    script += "\n# Oracle configuration\n";
    for oracle in &self.crashpack.oracle_results {
        let export = format!(
            "export ATP_ORACLE_{}=enabled\n",
            env_suffix(&oracle.oracle_name)
        );
        script += &export;
    }

    // The local names below are captured by the invocation format string.
    let trace_file = shell_path_arg(&output_dir.join("transfer.atp-trace"));
    let manifest = shell_path_arg(&output_dir.join("manifest"));
    let journal_digest = shell_path_arg(&output_dir.join("journal.digest"));
    let evidence_ledger = shell_path_arg(&output_dir.join("evidence-ledger.json"));
    let pathlog = shell_path_arg(&output_dir.join("pathlog"));
    let quiclog = shell_path_arg(&output_dir.join("quiclog"));
    let repairlog = shell_path_arg(&output_dir.join("repairlog"));

    script += "\n# Replay execution\n";
    let invocation = format!(
        "asupersync atp replay --trace-file {trace_file} --manifest {manifest} --journal-digest {journal_digest} --evidence-ledger {evidence_ledger} --pathlog {pathlog} --quiclog {quiclog} --repairlog {repairlog} --validate-oracles"
    );
    script += &invocation;

    if self.minimizer_config.enabled {
        let minimize_flags = format!(
            " --minimize --reduction-target {}",
            self.minimizer_config.reduction_target
        );
        script += &minimize_flags;
    }
    script.push('\n');
    Ok(script)
}
/// Checks that the artifact bundle in `output_dir` is complete and internally
/// consistent, returning a summary report on success.
///
/// The checks run in a fixed order and the first failure is returned:
/// required files exist, specialized logs match the trace projection, the
/// journal digest and byte count match `journal.digest`, the manifest agrees
/// with the journal and evidence ledger, and the replay command references
/// the expected artifacts.
///
/// # Errors
/// Returns `ReplayError::ReplayValidationFailed` describing the first check
/// that did not hold.
pub fn validate_replay_artifacts(
output_dir: &Path,
) -> Result<AtpReplayArtifactReport, ReplayError> {
// Every required artifact must exist as a regular file.
for artifact in REQUIRED_REPLAY_ARTIFACTS {
let path = output_dir.join(artifact);
if !path.is_file() {
return Err(replay_validation_failed(format!(
"missing replay artifact {}",
path.display()
)));
}
}
// Each specialized log must equal the projection of the trace for its event filter.
let trace_events = read_trace_events(output_dir)?;
validate_specialized_replay_log(output_dir, "pathlog", &trace_events, is_atp_path_event)?;
validate_specialized_replay_log(output_dir, "quiclog", &trace_events, is_atp_quic_event)?;
validate_specialized_replay_log(
output_dir,
"repairlog",
&trace_events,
is_atp_repair_event,
)?;
// Recompute the journal digest and compare it with the recorded one.
let journal_path = output_dir.join("journal");
let journal = read_replay_artifact(&journal_path)?;
let computed_journal_digest = journal_digest_ref(&journal);
let digest_path = output_dir.join("journal.digest");
let digest_file = read_replay_artifact(&digest_path)?;
let recorded_journal_digest = keyed_value(&digest_file, "digest")
.ok_or_else(|| replay_validation_failed("journal.digest is missing digest field"))?;
if recorded_journal_digest != computed_journal_digest {
return Err(replay_validation_failed(format!(
"journal.digest mismatch: recorded {recorded_journal_digest}, computed {computed_journal_digest}"
)));
}
// The recorded byte count must match the journal length in bytes.
let recorded_journal_bytes = keyed_value(&digest_file, "bytes")
.ok_or_else(|| replay_validation_failed("journal.digest is missing bytes field"))?
.parse::<usize>()
.map_err(|err| {
replay_validation_failed(format!("journal.digest bytes field is invalid: {err}"))
})?;
if recorded_journal_bytes != journal.len() {
return Err(replay_validation_failed(format!(
"journal.digest byte count mismatch: recorded {recorded_journal_bytes}, computed {}",
journal.len()
)));
}
// Manifest fields must reference the expected schema version, digest and artifact names.
let manifest_path = output_dir.join("manifest");
let manifest = read_replay_artifact(&manifest_path)?;
validate_manifest_reference(
&manifest,
"schema_version",
&ATP_CRASHPACK_SCHEMA_VERSION.to_string(),
)?;
validate_manifest_reference(&manifest, "journal_digest", &computed_journal_digest)?;
validate_manifest_reference(&manifest, "journal_digest_artifact", "journal.digest")?;
validate_manifest_reference(&manifest, "evidence_ledger", "evidence-ledger.json")?;
let manifest_violations = keyed_value(&manifest, "violations")
.ok_or_else(|| replay_validation_failed("manifest is missing violations field"))?
.parse::<usize>()
.map_err(|err| {
replay_validation_failed(format!("manifest violations field is invalid: {err}"))
})?;
// The evidence ledger must use the supported schema and agree with the manifest.
let ledger = read_evidence_ledger(output_dir)?;
if ledger.schema_version != ATP_CRASHPACK_SCHEMA_VERSION {
return Err(replay_validation_failed(format!(
"unsupported evidence ledger schema version {}",
ledger.schema_version
)));
}
validate_manifest_seed_and_metadata(&manifest, &ledger)?;
// The ledger must list every required artifact path.
for artifact in REQUIRED_LEDGER_ARTIFACT_PATHS {
let artifact = PathBuf::from(artifact);
if !ledger.artifact_paths.contains(&artifact) {
return Err(replay_validation_failed(format!(
"evidence ledger is missing artifact path {}",
artifact.display()
)));
}
}
// Cross-check the oracle results parsed from the journal against manifest and ledger.
let oracle_results = parse_journal_oracle_results(&journal)?;
validate_manifest_violation_count(manifest_violations, &oracle_results)?;
validate_journal_matches_ledger(&ledger, &oracle_results)?;
let violation_entries = ledger.violation_entries().len();
if manifest_violations < violation_entries {
return Err(replay_validation_failed(format!(
"manifest understates evidence ledger violations: manifest {manifest_violations}, ledger {violation_entries}"
)));
}
// Violation evidence without any failure witness in the trace is rejected.
if violation_entries > 0 && failure_witness_counts(&trace_events).is_empty() {
return Err(replay_validation_failed(
"replay artifact has violation evidence but no trace failure witnesses",
));
}
// Finally, the replay script must reference the expected artifacts.
let replay_command = read_replay_artifact(&output_dir.join("replay_command.sh"))?;
validate_replay_command(output_dir, &replay_command)?;
Ok(AtpReplayArtifactReport {
trace_events: trace_events.len(),
ledger_entries: ledger.entries.len(),
violation_entries,
artifact_paths: ledger.artifact_paths,
journal_digest: computed_journal_digest,
replay_ready: true,
})
}
/// Replays the artifact bundle in `output_dir` using the default minimizer configuration.
pub fn replay_from_artifacts(output_dir: &Path) -> Result<AtpReplayResult, ReplayError> {
    let default_config = TraceMinimizerConfig::default();
    Self::replay_from_artifacts_with_config(output_dir, default_config)
}
/// Validates the artifact bundle in `output_dir`, rebuilds a crashpack from it
/// and replays that crashpack with `minimizer_config`.
///
/// After the bundle-level validation, the trace, ledger and journal are read
/// again and the journal is re-checked against the manifest and ledger before
/// the crashpack is assembled.
///
/// # Errors
/// Returns the first validation, parsing or replay error encountered.
pub fn replay_from_artifacts_with_config(
    output_dir: &Path,
    minimizer_config: TraceMinimizerConfig,
) -> Result<AtpReplayResult, ReplayError> {
    Self::validate_replay_artifacts(output_dir)?;

    let trace_events = read_trace_events(output_dir)?;
    let ledger = read_evidence_ledger(output_dir)?;
    let journal_path = output_dir.join("journal");
    let journal = read_replay_artifact(&journal_path)?;
    let oracle_results = parse_journal_oracle_results(&journal)?;

    validate_journal_matches_manifest(output_dir, &oracle_results)?;
    validate_journal_matches_ledger(&ledger, &oracle_results)?;

    let coordinator = Self::new(AtpCrashpack {
        schema_version: ledger.schema_version,
        oracle_results,
        trace_events,
        seeds: ledger.seeds,
        // Crashpack paths are strings; non-UTF-8 components are converted lossily.
        artifact_paths: ledger
            .artifact_paths
            .iter()
            .map(|artifact| artifact.to_string_lossy().into_owned())
            .collect(),
        metadata: ledger.metadata,
    });
    coordinator.with_minimizer_config(minimizer_config).replay()
}
/// Total number of violations across all oracle results in the crashpack.
fn count_original_violations(&self) -> usize {
    let mut total = 0;
    for result in &self.crashpack.oracle_results {
        total += result.violations.len();
    }
    total
}
fn validate_crashpack_for_replay(&self, original_violations: usize) -> Result<(), ReplayError> {
if self.crashpack.schema_version != ATP_CRASHPACK_SCHEMA_VERSION {
return Err(ReplayError::ReplayValidationFailed(format!(
"unsupported ATP crashpack schema version {}",
self.crashpack.schema_version
)));
}
if original_violations == 0 {
return Ok(());
}
if self.crashpack.trace_events.is_empty() {
return Err(ReplayError::ReplayValidationFailed(
"crashpack has oracle violations but no trace events".to_string(),
));
}
let missing_witnesses = self.missing_violation_witnesses();
if !missing_witnesses.is_empty() {
return Err(ReplayError::ReplayValidationFailed(format!(
"crashpack has oracle violations without matching trace failure witnesses: {}",
missing_witnesses.join(", ")
)));
}
Ok(())
}
/// Labels of all violations for which no trace event acts as a witness,
/// in oracle-result order.
fn missing_violation_witnesses(&self) -> Vec<String> {
    let trace = &self.crashpack.trace_events;
    self.crashpack
        .oracle_results
        .iter()
        .flat_map(|result| {
            result
                .violations
                .iter()
                .map(move |violation| (&result.oracle_name, violation))
        })
        .filter(|&(oracle_name, violation)| {
            !trace
                .iter()
                .any(|event| event_witnesses_violation(event, oracle_name, violation))
        })
        .map(|(oracle_name, violation)| violation_label(oracle_name, violation))
        .collect()
}
/// Converts the crashpack's oracle results into an `OracleReport`.
///
/// Each result becomes one entry; `check_time_nanos` is always reported as 0.
fn replay_oracle_report(&self) -> OracleReport {
    let results = &self.crashpack.oracle_results;
    let mut entries = Vec::with_capacity(results.len());
    let mut passed = 0;
    for result in results {
        if result.passed {
            passed += 1;
        }
        entries.push(OracleEntryReport {
            invariant: result.oracle_name.clone(),
            passed: result.passed,
            violation: summarize_transfer_violations(&result.violations),
            stats: result.stats.clone(),
        });
    }
    let total = entries.len();
    let failed = total.saturating_sub(passed);
    OracleReport {
        entries,
        total,
        passed,
        failed,
        check_time_nanos: 0,
    }
}
}
/// Settings that control how `TraceMinimizer` shrinks a trace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceMinimizerConfig {
/// When false, minimization returns the trace unchanged.
pub enabled: bool,
/// Fraction of the trace to remove; the minimizer targets a length of
/// `original_length * (1.0 - reduction_target)`.
pub reduction_target: f64,
/// Upper bound on removal attempts made by the minimizer loop.
pub max_attempts: usize,
/// When true, structural events and failure witnesses are never removal candidates.
pub preserve_oracle_events: bool,
/// When true, every event is treated as essential, so nothing is removed.
pub preserve_timing: bool,
}
impl Default for TraceMinimizerConfig {
/// Default configuration: minimization enabled, 0.3 reduction target, at most
/// 100 attempts, oracle events preserved, timing not preserved.
fn default() -> Self {
Self {
enabled: true,
reduction_target: 0.3, max_attempts: 100,
preserve_oracle_events: true,
preserve_timing: false,
}
}
}
/// Shrinks a trace by removing events one at a time, keeping a removal only
/// when the failure witnesses of the original trace are still present.
#[derive(Debug)]
pub struct TraceMinimizer {
/// Settings that govern the minimization loop.
config: TraceMinimizerConfig,
}
impl TraceMinimizer {
pub fn new(config: TraceMinimizerConfig) -> Self {
Self { config }
}
/// Attempts to shrink `events` toward the configured reduction target.
///
/// Each attempt removes the removable event with the highest index and keeps
/// the removal only if the failure witnesses are preserved; otherwise that
/// event's identity is protected from further attempts. The loop ends when the
/// target length is reached, `max_attempts` attempts were made, or no
/// candidate remains. `reduction_percentage` is reported as a fraction of the
/// original length (0.0 to 1.0).
///
/// When minimization is disabled or `events` is empty, the input is returned
/// unchanged with zero attempts.
pub fn minimize(&self, events: &[TraceEvent]) -> Result<MinimizationResult, ReplayError> {
    let original_length = events.len();
    if !self.config.enabled || events.is_empty() {
        return Ok(MinimizationResult {
            minimized_events: events.to_vec(),
            stats: MinimizationStats {
                original_length,
                minimized_length: original_length,
                reduction_percentage: 0.0,
                attempts_made: 0,
            },
        });
    }

    // The float-to-integer cast truncates toward zero and saturates at the bounds.
    let target_length =
        (original_length as f64 * (1.0 - self.config.reduction_target)) as usize;
    let essential = self.identify_essential_events(events);
    let mut protected = BTreeSet::new();
    let mut working = events.to_vec();
    let mut attempts_made = 0;

    while working.len() > target_length && attempts_made < self.config.max_attempts {
        let candidates = self.find_removal_candidates(&working, &essential, &protected);
        let Some(&index) = candidates.first() else {
            break;
        };
        let mut trial = working.clone();
        trial.remove(index);
        if self.validate_trace_preserves_failure(&trial, events) {
            working = trial;
        } else {
            // Removing this event loses a witness; never try it again.
            protected.insert(event_identity(&working[index]));
        }
        attempts_made += 1;
    }

    let minimized_length = working.len();
    let removed = original_length - minimized_length;
    Ok(MinimizationResult {
        minimized_events: working,
        stats: MinimizationStats {
            original_length,
            minimized_length,
            reduction_percentage: removed as f64 / original_length as f64,
            attempts_made,
        },
    })
}
fn identify_essential_events(&self, events: &[TraceEvent]) -> BTreeSet<String> {
let mut essential = BTreeSet::new();
for event in events {
if self.config.preserve_oracle_events
&& (is_structural_replay_event(event) || failure_witness_signature(event).is_some())
{
essential.insert(event_identity(event));
}
}
if self.config.preserve_timing {
for event in events {
essential.insert(event_identity(event));
}
}
essential
}
fn find_removal_candidates(
&self,
events: &[TraceEvent],
essential: &BTreeSet<String>,
protected: &BTreeSet<String>,
) -> Vec<usize> {
let mut candidates = Vec::new();
for (idx, event) in events.iter().enumerate() {
let identity = event_identity(event);
if !essential.contains(&identity) && !protected.contains(&identity) {
candidates.push(idx);
}
}
candidates.sort_by(|a, b| b.cmp(a));
candidates
}
/// Returns true when `test_trace` still exhibits the failure of `original_trace`.
///
/// Every failure-witness signature of the original must occur at least as often
/// in the test trace. If the original has no witnesses, only a trace equal to
/// the original is accepted.
fn validate_trace_preserves_failure(
    &self,
    test_trace: &[TraceEvent],
    original_trace: &[TraceEvent],
) -> bool {
    let expected = failure_witness_counts(original_trace);
    if expected.is_empty() {
        return test_trace == original_trace;
    }
    let observed = failure_witness_counts(test_trace);
    for (signature, required) in &expected {
        let seen = observed.get(signature).copied().unwrap_or(0);
        if seen < *required {
            return false;
        }
    }
    true
}
}
/// True for spawn/complete, cancellation, region-close and obligation events.
fn is_structural_replay_event(event: &TraceEvent) -> bool {
    match event.kind {
        TraceEventKind::Spawn
        | TraceEventKind::Complete
        | TraceEventKind::CancelRequest
        | TraceEventKind::CancelAck
        | TraceEventKind::RegionCancelled
        | TraceEventKind::RegionCloseBegin
        | TraceEventKind::RegionCloseComplete
        | TraceEventKind::ObligationReserve
        | TraceEventKind::ObligationCommit
        | TraceEventKind::ObligationAbort
        | TraceEventKind::ObligationLeak => true,
        _ => false,
    }
}
fn failure_witness_counts(events: &[TraceEvent]) -> BTreeMap<String, usize> {
let mut counts = BTreeMap::new();
for signature in events.iter().filter_map(failure_witness_signature) {
*counts.entry(signature).or_insert(0) += 1;
}
counts
}
/// Signature identifying `event` as a failure witness, or `None` if it is not one.
///
/// Obligation leaks, futurelock detections, I/O errors and chaos injections
/// are always witnesses. A user trace is a witness only when it carries a
/// message that marks a failure.
fn failure_witness_signature(event: &TraceEvent) -> Option<String> {
    match (&event.kind, &event.data) {
        (
            TraceEventKind::ObligationLeak
            | TraceEventKind::FuturelockDetected
            | TraceEventKind::IoError
            | TraceEventKind::ChaosInjection,
            data,
        ) => Some(format!("{}:{:?}", event.kind.stable_name(), data)),
        (TraceEventKind::UserTrace, TraceData::Message(message))
            if message_marks_failure(message) =>
        {
            Some(format!("{}:{message}", event.kind.stable_name()))
        }
        _ => None,
    }
}
/// True when `event` is evidence for `violation` reported by `oracle_name`.
///
/// An event qualifies either structurally (by its kind) or because it carries
/// a failure-marking message that, lowercased, contains one of the violation's
/// aliases.
fn event_witnesses_violation(
    event: &TraceEvent,
    oracle_name: &str,
    violation: &TransferViolation,
) -> bool {
    structural_event_witnesses_violation(event, oracle_name, violation)
        || match &event.data {
            TraceData::Message(message) if message_marks_failure(message) => {
                let haystack = message.to_ascii_lowercase();
                violation_aliases(oracle_name, violation)
                    .iter()
                    .any(|alias| haystack.contains(alias.as_str()))
            }
            _ => false,
        }
}
/// True when the kind of `event` alone supports the violation.
///
/// Each witness kind is associated with a set of keywords; the event matches
/// when any alias of the violation contains one of them. Other event kinds
/// never match.
fn structural_event_witnesses_violation(
    event: &TraceEvent,
    oracle_name: &str,
    violation: &TransferViolation,
) -> bool {
    let keywords: &[&str] = match event.kind {
        TraceEventKind::ObligationLeak => &["obligation", "leak"],
        TraceEventKind::FuturelockDetected => &["quiescence", "pending", "obligation", "leak"],
        TraceEventKind::IoError => &["path", "journal"],
        TraceEventKind::ChaosInjection => &["crash", "journal"],
        _ => return false,
    };
    violation_aliases(oracle_name, violation)
        .iter()
        .any(|alias| keywords.iter().any(|keyword| alias.contains(*keyword)))
}
/// Lowercase search terms under which a violation may appear in trace messages.
///
/// The oracle name, violation type and description contribute their normalized
/// forms. In addition, well-known families (manifest, journal, quiescence,
/// obligation/leak, path, proof) add fixed aliases when the normalized oracle
/// name or violation type mentions them.
fn violation_aliases(oracle_name: &str, violation: &TransferViolation) -> BTreeSet<String> {
    // (trigger substrings, aliases added when any trigger is present)
    const ALIAS_FAMILIES: &[(&[&str], &[&str])] = &[
        (
            &["manifest"],
            &["manifest", "manifest integrity", "manifest corruption", "hash"],
        ),
        (&["journal"], &["journal", "journal consistency", "journal gap"]),
        (
            &["quiescence"],
            &["quiescence", "non quiescence", "pending operation"],
        ),
        (
            &["obligation", "leak"],
            &["obligation", "obligation leak", "leak"],
        ),
        (&["path"], &["path", "path consistency", "route"]),
        (
            &["proof"],
            &["proof", "proof bundle", "invalid proof", "proof invalid"],
        ),
    ];

    let mut aliases = BTreeSet::new();
    push_alias(&mut aliases, oracle_name);
    push_alias(&mut aliases, &violation.violation_type);
    push_alias(&mut aliases, &violation.description);

    // Family detection looks only at the oracle name and violation type.
    let known = format!(
        "{} {}",
        normalized_identifier(oracle_name),
        normalized_identifier(&violation.violation_type)
    );
    for (triggers, extras) in ALIAS_FAMILIES {
        if triggers.iter().any(|trigger| known.contains(*trigger)) {
            aliases.extend(extras.iter().map(|extra| extra.to_string()));
        }
    }
    aliases
}
/// Adds the normalized form of `raw` to `aliases`, both with underscores and
/// with underscores replaced by spaces. Input that normalizes to nothing is ignored.
fn push_alias(aliases: &mut BTreeSet<String>, raw: &str) {
    let underscored = normalized_identifier(raw);
    if !underscored.is_empty() {
        let spaced = underscored.replace('_', " ");
        aliases.insert(underscored);
        aliases.insert(spaced);
    }
}
/// Lowercases `raw` and reduces it to ASCII alphanumeric words joined by single
/// underscores. Every run of other characters acts as one separator, and
/// leading or trailing separators are dropped.
fn normalized_identifier(raw: &str) -> String {
    raw.split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|word| !word.is_empty())
        .map(str::to_ascii_lowercase)
        .collect::<Vec<_>>()
        .join("_")
}
/// True when `message` contains any failure marker, compared ASCII-case-insensitively.
fn message_marks_failure(message: &str) -> bool {
    const FAILURE_MARKERS: [&str; 8] = [
        "atp violation",
        "oracle violation",
        "violation",
        "failure",
        "panic",
        "corrupt",
        "leak",
        "error",
    ];
    let lowered = message.to_ascii_lowercase();
    for marker in FAILURE_MARKERS {
        if lowered.contains(marker) {
            return true;
        }
    }
    false
}
/// Identity string for `event`, built from its sequence number, stable kind
/// name and the debug rendering of its data.
fn event_identity(event: &TraceEvent) -> String {
    let kind_name = event.kind.stable_name();
    format!("{}:{}:{:?}", event.seq, kind_name, event.data)
}
/// Human-readable label for a violation: `oracle/type`, or just the oracle name
/// when the type is empty or repeats the oracle name.
fn violation_label(oracle_name: &str, violation: &TransferViolation) -> String {
    let needs_qualifier =
        !(violation.violation_type.is_empty() || violation.violation_type == oracle_name);
    if needs_qualifier {
        format!("{}/{}", oracle_name, violation.violation_type)
    } else {
        oracle_name.to_string()
    }
}
/// Joins all violations into one `; `-separated summary, or `None` when there are none.
fn summarize_transfer_violations(violations: &[TransferViolation]) -> Option<String> {
    let mut parts = Vec::with_capacity(violations.len());
    for violation in violations {
        parts.push(format!(
            "{} [{:?}]: {}",
            violation.violation_type, violation.severity, violation.description
        ));
    }
    if parts.is_empty() {
        None
    } else {
        Some(parts.join("; "))
    }
}
/// Turns `name` into an environment-variable suffix: ASCII alphanumerics are
/// uppercased, every other character becomes `_`, and leading or trailing
/// underscores are stripped. Falls back to `VALUE` when nothing remains.
fn env_suffix(name: &str) -> String {
    let mapped: String = name
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() {
                ch.to_ascii_uppercase()
            } else {
                '_'
            }
        })
        .collect();
    match mapped.trim_matches('_') {
        "" => "VALUE".to_string(),
        trimmed => trimmed.to_string(),
    }
}
/// Files that must exist in a replay output directory.
const REQUIRED_REPLAY_ARTIFACTS: &[&str] = &[
"transfer.atp-trace",
"manifest",
"journal",
"journal.digest",
"evidence-ledger.json",
"pathlog",
"quiclog",
"repairlog",
"replay_command.sh",
];
/// Artifact paths that the evidence ledger must list. Currently the same set as
/// `REQUIRED_REPLAY_ARTIFACTS`, but maintained separately.
const REQUIRED_LEDGER_ARTIFACT_PATHS: &[&str] = &[
"transfer.atp-trace",
"manifest",
"journal",
"journal.digest",
"evidence-ledger.json",
"pathlog",
"quiclog",
"repairlog",
"replay_command.sh",
];
/// `(flag, artifact)` pairs that the replay command must contain; each flag's
/// value has to refer to the paired artifact.
const REQUIRED_REPLAY_COMMAND_ARTIFACT_FLAGS: &[(&str, &str)] = &[
("--trace-file", "transfer.atp-trace"),
("--manifest", "manifest"),
("--journal-digest", "journal.digest"),
("--evidence-ledger", "evidence-ledger.json"),
("--pathlog", "pathlog"),
("--quiclog", "quiclog"),
("--repairlog", "repairlog"),
];
/// Reads the artifact at `path` as UTF-8 text, mapping I/O failures to a
/// replay validation error that names the path.
fn read_replay_artifact(path: &Path) -> Result<String, ReplayError> {
    match std::fs::read_to_string(path) {
        Ok(contents) => Ok(contents),
        Err(err) => Err(replay_validation_failed(format!(
            "failed to read replay artifact {}: {err}",
            path.display()
        ))),
    }
}
/// Loads `transfer.atp-trace` from `output_dir` and parses it as a JSON list of trace events.
fn read_trace_events(output_dir: &Path) -> Result<Vec<TraceEvent>, ReplayError> {
    let trace_json = read_replay_artifact(&output_dir.join("transfer.atp-trace"))?;
    match serde_json::from_str::<Vec<TraceEvent>>(&trace_json) {
        Ok(events) => Ok(events),
        Err(err) => Err(replay_validation_failed(format!(
            "transfer.atp-trace is not a TraceEvent list: {err}"
        ))),
    }
}
/// Verifies that the specialized log `artifact` in `output_dir` equals the log
/// projected from `trace_events` with the `include` filter.
fn validate_specialized_replay_log(
    output_dir: &Path,
    artifact: &str,
    trace_events: &[TraceEvent],
    include: impl Fn(&TraceEvent) -> bool,
) -> Result<(), ReplayError> {
    let artifact_path = output_dir.join(artifact);
    let actual = read_replay_artifact(&artifact_path)?;
    let expected = atp_specialized_log(trace_events, include);
    if actual != expected {
        Err(replay_validation_failed(format!(
            "{artifact} mismatch: emitted log does not match transfer.atp-trace projection"
        )))
    } else {
        Ok(())
    }
}
/// Loads and imports `evidence-ledger.json` from `output_dir`.
fn read_evidence_ledger(output_dir: &Path) -> Result<AtpEvidenceLedger, ReplayError> {
    let ledger_json = read_replay_artifact(&output_dir.join("evidence-ledger.json"))?;
    match AtpEvidenceLedger::import_json(&ledger_json) {
        Ok(ledger) => Ok(ledger),
        Err(err) => Err(replay_validation_failed(format!(
            "evidence-ledger.json is invalid: {err}"
        ))),
    }
}
/// SHA-256 digest of the journal text, rendered as `sha256:<lowercase hex>`.
fn journal_digest_ref(journal_data: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(journal_data.as_bytes());
    let hex_digest = hex::encode(hasher.finalize());
    format!("sha256:{}", hex_digest)
}
/// Finds the first `key: value` line in `text` whose trimmed key equals `key`
/// and returns the trimmed value. Only the first `:` on a line separates key
/// from value, so values may themselves contain colons.
fn keyed_value<'a>(text: &'a str, key: &str) -> Option<&'a str> {
    for line in text.lines() {
        if let Some((candidate, value)) = line.split_once(':') {
            if candidate.trim() == key {
                return Some(value.trim());
            }
        }
    }
    None
}
/// Requires the manifest to contain `key` with exactly the value `expected`.
fn validate_manifest_reference(
    manifest: &str,
    key: &str,
    expected: &str,
) -> Result<(), ReplayError> {
    match keyed_value(manifest, key) {
        None => Err(replay_validation_failed(format!(
            "manifest is missing {key} field"
        ))),
        Some(actual) if actual != expected => Err(replay_validation_failed(format!(
            "manifest {key} mismatch: recorded {actual}, expected {expected}"
        ))),
        Some(_) => Ok(()),
    }
}
/// Requires the metadata and seeds recorded in the manifest to equal those in
/// the evidence ledger. Metadata is checked before seeds.
fn validate_manifest_seed_and_metadata(
    manifest: &str,
    ledger: &AtpEvidenceLedger,
) -> Result<(), ReplayError> {
    let manifest_metadata = parse_manifest_metadata(manifest)?;
    let metadata_agrees = manifest_metadata == ledger.metadata;
    if !metadata_agrees {
        return Err(replay_validation_failed(format!(
            "manifest metadata mismatch: manifest {:?}, evidence ledger {:?}",
            manifest_metadata, ledger.metadata
        )));
    }

    let manifest_seeds = parse_manifest_seeds(manifest)?;
    let seeds_agree = manifest_seeds == ledger.seeds;
    if !seeds_agree {
        return Err(replay_validation_failed(format!(
            "manifest seed mismatch: manifest {:?}, evidence ledger {:?}",
            manifest_seeds, ledger.seeds
        )));
    }
    Ok(())
}
/// Collects all `metadata.<key>: <value>` lines of the manifest into a map.
///
/// # Errors
/// Fails on a metadata line without `:`, with an empty key, or with a key that
/// was already seen.
fn parse_manifest_metadata(manifest: &str) -> Result<BTreeMap<String, String>, ReplayError> {
    let mut metadata = BTreeMap::new();
    for (line_number, raw_line) in (1usize..).zip(manifest.lines()) {
        let line = raw_line.trim();
        // Blank lines and `#` comments can never carry the prefix, so this single
        // test also skips them.
        let Some(rest) = line.strip_prefix("metadata.") else {
            continue;
        };
        let Some((key, value)) = rest.split_once(':') else {
            return Err(replay_validation_failed(format!(
                "manifest metadata field is invalid at line {line_number}: {line}"
            )));
        };
        let key = key.trim();
        if key.is_empty() {
            return Err(replay_validation_failed(format!(
                "manifest metadata field has empty key at line {line_number}"
            )));
        }
        if metadata.contains_key(key) {
            return Err(replay_validation_failed(format!(
                "manifest metadata field {key} is duplicated"
            )));
        }
        metadata.insert(key.to_string(), value.trim().to_string());
    }
    Ok(metadata)
}
/// Parses the indented `name: value` entries below the top-level `seeds:` line.
///
/// Any other unindented, non-comment line ends the seeds section. Blank lines
/// and `#` comments are ignored wherever they appear.
///
/// # Errors
/// Fails on an entry without `:`, with an empty name, with a value that is not
/// a `u64`, or with a repeated name.
fn parse_manifest_seeds(manifest: &str) -> Result<BTreeMap<String, u64>, ReplayError> {
    let mut seeds = BTreeMap::new();
    let mut inside_seed_section = false;
    for (line_number, raw_line) in (1usize..).zip(manifest.lines()) {
        let line = raw_line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // A line is top-level when it has no leading ASCII whitespace.
        let is_top_level = !raw_line.starts_with(|ch: char| ch.is_ascii_whitespace());
        if is_top_level {
            inside_seed_section = line == "seeds:";
            continue;
        }
        if !inside_seed_section {
            continue;
        }

        let Some((name, raw_seed)) = line.split_once(':') else {
            return Err(replay_validation_failed(format!(
                "manifest seed field is invalid at line {line_number}: {line}"
            )));
        };
        let name = name.trim();
        if name.is_empty() {
            return Err(replay_validation_failed(format!(
                "manifest seed field has empty name at line {line_number}"
            )));
        }
        let seed = match raw_seed.trim().parse::<u64>() {
            Ok(seed) => seed,
            Err(err) => {
                return Err(replay_validation_failed(format!(
                    "manifest seed {name} is invalid at line {line_number}: {err}"
                )));
            }
        };
        if seeds.contains_key(name) {
            return Err(replay_validation_failed(format!(
                "manifest seed {name} is duplicated"
            )));
        }
        seeds.insert(name.to_string(), seed);
    }
    Ok(seeds)
}
/// Checks that the replay script invokes `asupersync atp replay` with every
/// required artifact flag pointing at the expected artifact and with
/// `--validate-oracles` present.
fn validate_replay_command(output_dir: &Path, replay_command: &str) -> Result<(), ReplayError> {
    let tokens = replay_invocation_tokens(replay_command)?;

    for (flag, artifact) in REQUIRED_REPLAY_COMMAND_ARTIFACT_FLAGS {
        let actual = replay_command_flag_value(&tokens, flag)?;
        let refers_to_artifact =
            replay_command_arg_matches_artifact(&actual, output_dir, artifact);
        if !refers_to_artifact {
            return Err(replay_validation_failed(format!(
                "replay command flag {flag} mismatch: recorded {actual}, expected {artifact}"
            )));
        }
    }

    let oracle_validation_missing = tokens.iter().all(|token| token != "--validate-oracles");
    if oracle_validation_missing {
        return Err(replay_validation_failed(
            "replay command is missing --validate-oracles",
        ));
    }
    Ok(())
}
/// Returns the tokens of the first `asupersync atp replay` invocation in the
/// script, starting at `asupersync`.
///
/// Blank lines, `#` comments and `export ` lines are skipped. Every other line
/// is tokenized until an invocation is found.
///
/// # Errors
/// Fails when a scanned line cannot be tokenized or when no invocation exists.
fn replay_invocation_tokens(replay_command: &str) -> Result<Vec<String>, ReplayError> {
    let candidate_lines = replay_command
        .lines()
        .map(str::trim)
        .filter(|line| !(line.is_empty() || line.starts_with('#') || line.starts_with("export ")));

    for line in candidate_lines {
        let mut tokens = shell_tokens(line)?;
        let invocation_start = tokens.windows(3).position(|window| {
            window[0] == "asupersync" && window[1] == "atp" && window[2] == "replay"
        });
        if let Some(start) = invocation_start {
            // Keep only the invocation and its arguments.
            return Ok(tokens.split_off(start));
        }
    }
    Err(replay_validation_failed(
        "replay command is missing asupersync atp replay invocation",
    ))
}
/// Returns the value following the single occurrence of `flag` in `tokens`.
///
/// # Errors
/// Fails when the flag is absent, appears more than once, or any occurrence is
/// not followed by a value (end of input or another `--` token).
fn replay_command_flag_value(tokens: &[String], flag: &str) -> Result<String, ReplayError> {
    let mut values = Vec::new();
    for (index, token) in tokens.iter().enumerate() {
        if token != flag {
            continue;
        }
        match tokens.get(index + 1) {
            Some(next) if !next.starts_with("--") => values.push(next),
            _ => {
                return Err(replay_validation_failed(format!(
                    "replay command flag {flag} is missing a value"
                )));
            }
        }
    }
    match values.as_slice() {
        [] => Err(replay_validation_failed(format!(
            "replay command is missing {flag}"
        ))),
        [only] => Ok((*only).clone()),
        _ => Err(replay_validation_failed(format!(
            "replay command has duplicate {flag} flags"
        ))),
    }
}
/// True when `actual` is either the bare artifact name or the displayed form of
/// `output_dir` joined with the artifact name.
fn replay_command_arg_matches_artifact(actual: &str, output_dir: &Path, artifact: &str) -> bool {
    if actual == artifact {
        return true;
    }
    let full_path = output_dir.join(artifact);
    full_path.display().to_string() == actual
}
/// Splits one shell line into tokens.
///
/// ASCII whitespace separates tokens outside quotes. Single and double quotes
/// group text verbatim and are removed; adjacent quoted and unquoted pieces
/// join into one token, and an empty pair of quotes yields an empty token.
/// Backslashes get no special treatment.
///
/// # Errors
/// Fails when a quote is still open at the end of the line.
fn shell_tokens(line: &str) -> Result<Vec<String>, ReplayError> {
    #[derive(Clone, Copy)]
    enum Quote {
        Outside,
        Single,
        Double,
    }

    let mut tokens = Vec::new();
    let mut current = String::new();
    let mut token_started = false;
    let mut state = Quote::Outside;

    for ch in line.chars() {
        state = match (state, ch) {
            // A matching quote character closes the quoted section.
            (Quote::Single, '\'') | (Quote::Double, '"') => Quote::Outside,
            // Anything else inside quotes is taken literally.
            (quoted @ (Quote::Single | Quote::Double), literal) => {
                current.push(literal);
                quoted
            }
            (Quote::Outside, '\'') => {
                token_started = true;
                Quote::Single
            }
            (Quote::Outside, '"') => {
                token_started = true;
                Quote::Double
            }
            (Quote::Outside, space) if space.is_ascii_whitespace() => {
                if token_started {
                    tokens.push(std::mem::take(&mut current));
                    token_started = false;
                }
                Quote::Outside
            }
            (Quote::Outside, literal) => {
                token_started = true;
                current.push(literal);
                Quote::Outside
            }
        };
    }

    if !matches!(state, Quote::Outside) {
        return Err(replay_validation_failed(
            "replay command contains unterminated shell quote",
        ));
    }
    if token_started {
        tokens.push(current);
    }
    Ok(tokens)
}
/// Builds a `ReplayError::ReplayValidationFailed` from any string-like message.
fn replay_validation_failed(message: impl Into<String>) -> ReplayError {
    let message: String = message.into();
    ReplayError::ReplayValidationFailed(message)
}
/// Parses the line-oriented journal text into one result per `oracle:` section.
///
/// Blank lines and lines starting with `#` are ignored. Every other line must
/// follow an `oracle:` header. Lines are classified by indentation and prefix
/// in the order of the branches below, so the branch order is significant.
///
/// # Errors
/// Returns `ReplayError::ReplayValidationFailed` for an empty oracle name, a
/// field before the first header, an unparsable field, or an unsupported line.
/// Errors from the pending-oracle helpers are propagated.
fn parse_journal_oracle_results(journal: &str) -> Result<Vec<TransferOracleResult>, ReplayError> {
let mut results = Vec::new();
let mut pending_oracle: Option<PendingJournalOracle> = None;
for (line_index, raw_line) in journal.lines().enumerate() {
// Line numbers in error messages are 1-based.
let line_number = line_index + 1;
// Indentation is measured in leading ASCII whitespace characters.
let indent = raw_line
.chars()
.take_while(|ch| ch.is_ascii_whitespace())
.count();
let line = raw_line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
// A new `oracle:` header finalizes the section collected so far.
if let Some(name) = line.strip_prefix("oracle:") {
if let Some(oracle) = pending_oracle.take() {
results.push(oracle.into_result()?);
}
let name = name.trim();
if name.is_empty() {
return Err(replay_validation_failed(format!(
"journal has empty oracle name at line {line_number}"
)));
}
pending_oracle = Some(PendingJournalOracle::new(name));
continue;
}
let oracle = pending_oracle.as_mut().ok_or_else(|| {
replay_validation_failed(format!(
"journal field appeared before oracle header at line {line_number}"
))
})?;
// Lines indented by 8 or more are always evidence key/value pairs, whatever their key.
if indent >= 8 {
let (key, value) = line.split_once(':').ok_or_else(|| {
replay_validation_failed(format!(
"journal evidence field is invalid at line {line_number}: {line}"
))
})?;
let violation = oracle.current_violation_mut(line_number)?;
violation
.evidence
.insert(key.trim().to_string(), value.trim().to_string());
} else if let Some(value) = line.strip_prefix("events_recorded:") {
oracle.events_recorded = Some(parse_usize_journal_field(
"events_recorded",
value,
line_number,
)?);
} else if let Some(value) = line.strip_prefix("entities_tracked:") {
oracle.entities_tracked = Some(parse_usize_journal_field(
"entities_tracked",
value,
line_number,
)?);
} else if let Some(value) = line.strip_prefix("passed:") {
oracle.passed = Some(parse_bool_journal_field("passed", value, line_number)?);
} else if line == "violations:" || line == "evidence:" {
// Section markers carry no data of their own.
} else if let Some(value) = line.strip_prefix("- type:") {
// NOTE(review): `finish_violation` presumably stores the violation in progress
// before a new one starts; its definition is outside this view.
oracle.finish_violation()?;
oracle.current_violation = Some(PendingJournalViolation::new(value.trim()));
} else if let Some(value) = line.strip_prefix("severity:") {
let violation = oracle.current_violation_mut(line_number)?;
violation.severity = Some(parse_violation_severity(value.trim(), line_number)?);
} else if let Some(value) = line.strip_prefix("description:") {
let violation = oracle.current_violation_mut(line_number)?;
violation.description = Some(value.trim().to_string());
} else if let Some((key, value)) = line.split_once(':') {
// Any other shallow `key: value` line is recorded as evidence of the current violation.
let violation = oracle.current_violation_mut(line_number)?;
violation
.evidence
.insert(key.trim().to_string(), value.trim().to_string());
} else {
return Err(replay_validation_failed(format!(
"unsupported journal line {line_number}: {line}"
)));
}
}
// Finalize the last section, if any.
if let Some(oracle) = pending_oracle.take() {
results.push(oracle.into_result()?);
}
Ok(results)
}
/// Parses the trimmed `value` of journal field `field` as a `usize`.
fn parse_usize_journal_field(
    field: &str,
    value: &str,
    line_number: usize,
) -> Result<usize, ReplayError> {
    match value.trim().parse::<usize>() {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(replay_validation_failed(format!(
            "journal {field} field is invalid at line {line_number}: {err}"
        ))),
    }
}
/// Parses the trimmed `value` of journal field `field` as a `bool`.
fn parse_bool_journal_field(
    field: &str,
    value: &str,
    line_number: usize,
) -> Result<bool, ReplayError> {
    match value.trim().parse::<bool>() {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(replay_validation_failed(format!(
            "journal {field} field is invalid at line {line_number}: {err}"
        ))),
    }
}
/// Maps the exact strings `Low`, `Medium`, `High` and `Critical` to their
/// severity; anything else is a validation error.
fn parse_violation_severity(
    value: &str,
    line_number: usize,
) -> Result<ViolationSeverity, ReplayError> {
    let severity = match value {
        "Low" => ViolationSeverity::Low,
        "Medium" => ViolationSeverity::Medium,
        "High" => ViolationSeverity::High,
        "Critical" => ViolationSeverity::Critical,
        _ => {
            return Err(replay_validation_failed(format!(
                "journal severity field is invalid at line {line_number}: {value}"
            )));
        }
    };
    Ok(severity)
}
/// Reads the manifest from `output_dir` and requires its `violations` count to
/// equal the number of violations in `oracle_results`.
fn validate_journal_matches_manifest(
    output_dir: &Path,
    oracle_results: &[TransferOracleResult],
) -> Result<(), ReplayError> {
    let manifest_path = output_dir.join("manifest");
    let manifest = read_replay_artifact(&manifest_path)?;
    let Some(raw_count) = keyed_value(&manifest, "violations") else {
        return Err(replay_validation_failed(
            "manifest is missing violations field",
        ));
    };
    let manifest_violations = match raw_count.parse::<usize>() {
        Ok(count) => count,
        Err(err) => {
            return Err(replay_validation_failed(format!(
                "manifest violations field is invalid: {err}"
            )));
        }
    };
    validate_manifest_violation_count(manifest_violations, oracle_results)
}
/// Verifies that the journal and the manifest agree on the number of violations.
///
/// The journal count is the total number of violations across every entry of
/// `oracle_results`.
///
/// # Errors
///
/// Returns a replay-validation error naming both counts when they differ.
fn validate_manifest_violation_count(
    manifest_violations: usize,
    oracle_results: &[TransferOracleResult],
) -> Result<(), ReplayError> {
    let mut journal_violations = 0usize;
    for oracle in oracle_results {
        journal_violations += oracle.violations.len();
    }

    if journal_violations == manifest_violations {
        return Ok(());
    }
    Err(replay_validation_failed(format!(
        "journal violation count mismatch: journal {journal_violations}, manifest {manifest_violations}"
    )))
}
/// Verifies that the evidence ledger and the journal describe the same oracles.
///
/// Every ledger entry must have a journal result with the same oracle name
/// and must pass `validate_ledger_entry_matches_journal` against it; after
/// that, every journal result must be named by at least one ledger entry.
/// Ledger entries are checked first, in ledger order, and the first problem
/// found is returned.
///
/// When several journal results share an oracle name, the last one is the
/// one ledger entries are compared against.
fn validate_journal_matches_ledger(
    ledger: &AtpEvidenceLedger,
    oracle_results: &[TransferOracleResult],
) -> Result<(), ReplayError> {
    // Index journal results by oracle name; later duplicates replace earlier ones.
    let mut journal_index: BTreeMap<&str, &TransferOracleResult> = BTreeMap::new();
    for journal_result in oracle_results {
        journal_index.insert(journal_result.oracle_name.as_str(), journal_result);
    }

    // Pass 1: ledger -> journal.
    for ledger_entry in &ledger.entries {
        let Some(journal_result) = journal_index.get(ledger_entry.oracle_name.as_str()) else {
            return Err(replay_validation_failed(format!(
                "evidence ledger oracle {} is missing from journal",
                ledger_entry.oracle_name
            )));
        };
        validate_ledger_entry_matches_journal(ledger_entry, journal_result)?;
    }

    // Pass 2: journal -> ledger.
    let mut ledger_names: BTreeSet<&str> = BTreeSet::new();
    for ledger_entry in &ledger.entries {
        ledger_names.insert(ledger_entry.oracle_name.as_str());
    }
    let orphan = oracle_results
        .iter()
        .find(|journal_result| !ledger_names.contains(journal_result.oracle_name.as_str()));
    if let Some(journal_result) = orphan {
        return Err(replay_validation_failed(format!(
            "journal oracle {} is missing from evidence ledger",
            journal_result.oracle_name
        )));
    }

    Ok(())
}
/// Cross-checks one evidence-ledger entry against the journal result for the
/// same oracle.
///
/// The checks run in this order and the first failure is returned:
///
/// 1. the entry's evidence invariant equals its oracle name;
/// 2. the entry records an artifact path;
/// 3. that path is exactly `transfer.atp-trace`;
/// 4. the ledger and journal agree on the `passed` flag;
/// 5. the ledger's Bayes-factor strength reports a violation exactly when the
///    journal oracle failed.
fn validate_ledger_entry_matches_journal(
    entry: &AtpEvidenceEntry,
    result: &TransferOracleResult,
) -> Result<(), ReplayError> {
    if entry.evidence.invariant != entry.oracle_name {
        return Err(replay_validation_failed(format!(
            "evidence ledger entry {} has mismatched invariant {}",
            entry.oracle_name, entry.evidence.invariant
        )));
    }

    let recorded_path = entry.artifact_path.as_deref().ok_or_else(|| {
        replay_validation_failed(format!(
            "evidence ledger entry {} is missing artifact path",
            entry.oracle_name
        ))
    })?;
    let expected_path = Path::new("transfer.atp-trace");
    if recorded_path != expected_path {
        return Err(replay_validation_failed(format!(
            "evidence ledger entry {} artifact path mismatch: recorded {}, expected transfer.atp-trace",
            entry.oracle_name,
            recorded_path.display()
        )));
    }

    if entry.evidence.passed != result.passed {
        return Err(replay_validation_failed(format!(
            "evidence ledger oracle {} passed status mismatch: ledger {}, journal {}",
            entry.oracle_name, entry.evidence.passed, result.passed
        )));
    }

    let ledger_flags_violation =
        evidence_strength_reports_violation(&entry.evidence.bayes_factor.strength);
    match (result.passed, ledger_flags_violation) {
        // Journal says the oracle passed, yet the ledger strength reports a violation.
        (true, true) => Err(replay_validation_failed(format!(
            "evidence ledger oracle {} reports violation strength for passed journal oracle",
            entry.oracle_name
        ))),
        // Journal says the oracle failed, yet the ledger strength does not reflect it.
        (false, false) => Err(replay_validation_failed(format!(
            "evidence ledger oracle {} does not record the journal oracle failure",
            entry.oracle_name
        ))),
        (true, false) | (false, true) => Ok(()),
    }
}
fn evidence_strength_reports_violation(strength: &EvidenceStrength) -> bool {
matches!(
strength,
EvidenceStrength::Positive | EvidenceStrength::Strong | EvidenceStrength::VeryStrong
)
}
/// Accumulator for one oracle section of the journal while it is being parsed.
///
/// `new` starts every optional field as `None`; `into_result` turns the
/// accumulator into a `TransferOracleResult` and fails if any of
/// `passed`, `entities_tracked` or `events_recorded` is still unset.
struct PendingJournalOracle {
// Oracle name; becomes `TransferOracleResult::oracle_name`.
oracle_name: String,
// Becomes `OracleStats::entities_tracked`; required by `into_result`.
entities_tracked: Option<usize>,
// Becomes `OracleStats::events_recorded`; required by `into_result`.
events_recorded: Option<usize>,
// Pass/fail flag; required by `into_result` and checked there against `violations`.
passed: Option<bool>,
// Violations that have already been completed via `finish_violation`.
violations: Vec<TransferViolation>,
// Violation currently being filled in, if any; moved into `violations` by `finish_violation`.
current_violation: Option<PendingJournalViolation>,
}
impl PendingJournalOracle {
    /// Creates an empty accumulator for the oracle called `name`.
    fn new(name: &str) -> Self {
        Self {
            oracle_name: String::from(name),
            entities_tracked: None,
            events_recorded: None,
            passed: None,
            violations: Vec::new(),
            current_violation: None,
        }
    }

    /// Returns the violation currently being filled in.
    ///
    /// # Errors
    ///
    /// Fails, naming `line_number`, when no violation has been started yet.
    fn current_violation_mut(
        &mut self,
        line_number: usize,
    ) -> Result<&mut PendingJournalViolation, ReplayError> {
        match self.current_violation.as_mut() {
            Some(violation) => Ok(violation),
            None => Err(replay_validation_failed(format!(
                "journal violation field appeared before violation type at line {line_number}"
            ))),
        }
    }

    /// Finalizes the in-progress violation, if there is one, and appends it
    /// to `violations`. Does nothing when no violation is in progress.
    ///
    /// # Errors
    ///
    /// Propagates the error from `PendingJournalViolation::into_violation`;
    /// the in-progress slot has already been cleared in that case.
    fn finish_violation(&mut self) -> Result<(), ReplayError> {
        let Some(pending) = self.current_violation.take() else {
            return Ok(());
        };
        let finished = pending.into_violation()?;
        self.violations.push(finished);
        Ok(())
    }

    /// Consumes the accumulator and builds the final `TransferOracleResult`.
    ///
    /// Any in-progress violation is finalized first. The required fields are
    /// then checked in the order `passed`, `entities_tracked`,
    /// `events_recorded`, followed by the consistency rule that an oracle is
    /// marked passed exactly when it has no violations.
    ///
    /// # Errors
    ///
    /// Returns the first failed check as a replay-validation error.
    fn into_result(mut self) -> Result<TransferOracleResult, ReplayError> {
        self.finish_violation()?;

        let Some(passed) = self.passed else {
            return Err(replay_validation_failed(format!(
                "journal oracle {} is missing passed field",
                self.oracle_name
            )));
        };
        let Some(entities_tracked) = self.entities_tracked else {
            return Err(replay_validation_failed(format!(
                "journal oracle {} is missing entities_tracked field",
                self.oracle_name
            )));
        };
        let Some(events_recorded) = self.events_recorded else {
            return Err(replay_validation_failed(format!(
                "journal oracle {} is missing events_recorded field",
                self.oracle_name
            )));
        };

        let has_violations = !self.violations.is_empty();
        match (passed, has_violations) {
            (true, true) => {
                return Err(replay_validation_failed(format!(
                    "journal oracle {} is marked passed but contains violations",
                    self.oracle_name
                )));
            }
            (false, false) => {
                return Err(replay_validation_failed(format!(
                    "journal oracle {} is marked failed but contains no violations",
                    self.oracle_name
                )));
            }
            (true, false) | (false, true) => {}
        }

        let stats = OracleStats {
            entities_tracked,
            events_recorded,
        };
        Ok(TransferOracleResult {
            oracle_name: self.oracle_name,
            violations: self.violations,
            stats,
            passed,
        })
    }
}
/// Accumulator for one violation record of the journal while it is being parsed.
///
/// `new` sets only the violation type; `into_violation` requires `severity`
/// and `description` to have been filled in afterwards.
struct PendingJournalViolation {
// Violation type; `into_violation` rejects an empty string.
violation_type: String,
// Severity of the violation; required by `into_violation`.
severity: Option<ViolationSeverity>,
// Human-readable description; required by `into_violation`.
description: Option<String>,
// Additional `key: value` pairs attached to the violation; may be empty.
evidence: BTreeMap<String, String>,
}
impl PendingJournalViolation {
    /// Starts a violation of the given type with no severity, description or
    /// evidence recorded yet.
    fn new(violation_type: &str) -> Self {
        Self {
            violation_type: String::from(violation_type),
            severity: None,
            description: None,
            evidence: BTreeMap::new(),
        }
    }

    /// Consumes the accumulator and builds the final `TransferViolation`.
    ///
    /// # Errors
    ///
    /// Fails, in this order, when the violation type is empty, the severity
    /// is missing, or the description is missing.
    fn into_violation(self) -> Result<TransferViolation, ReplayError> {
        let Self {
            violation_type,
            severity,
            description,
            evidence,
        } = self;

        if violation_type.is_empty() {
            return Err(replay_validation_failed(
                "journal violation is missing violation type",
            ));
        }
        let Some(severity) = severity else {
            return Err(replay_validation_failed(format!(
                "journal violation {} is missing severity",
                violation_type
            )));
        };
        let Some(description) = description else {
            return Err(replay_validation_failed(format!(
                "journal violation {} is missing description",
                violation_type
            )));
        };

        Ok(TransferViolation {
            violation_type,
            description,
            severity,
            evidence,
        })
    }
}
/// Renders `path` with `Path::display` and passes the text through
/// `shell_arg`, returning the result.
fn shell_path_arg(path: &Path) -> String {
    let rendered = path.display().to_string();
    shell_arg(&rendered)
}
fn shell_arg(raw: &str) -> String {
if !raw.is_empty() && raw.bytes().all(shell_safe_byte) {
return raw.to_string();
}
let mut quoted = String::with_capacity(raw.len() + 2);
quoted.push('\'');
for ch in raw.chars() {
if ch == '\'' {
quoted.push_str("'\"'\"'");
} else {
quoted.push(ch);
}
}
quoted.push('\'');
quoted
}
/// Returns `true` when `byte` is an ASCII letter, an ASCII digit, or one of
/// the punctuation bytes `_ - . / : @ % + = ,`.
fn shell_safe_byte(byte: u8) -> bool {
    const SAFE_PUNCTUATION: &[u8] = b"_-./:@%+=,";

    byte.is_ascii_alphanumeric() || SAFE_PUNCTUATION.contains(&byte)
}
/// Outcome returned by `AtpReplayCoordinator::replay`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtpReplayResult {
/// Violation count reported by the coordinator's `count_original_violations`.
pub original_violations: usize,
/// Number of events in the crashpack trace, or in the minimized trace when
/// the minimizer is enabled.
pub minimized_trace_length: usize,
/// `true` when there were no original violations, or when
/// `failure_witness_counts` over the crashpack trace is non-empty.
pub replay_successful: bool,
/// Oracle reports produced by the replay; `replay` stores a single report here.
pub oracle_results: Vec<OracleReport>,
/// Statistics from the trace minimizer; stays at `MinimizationStats::default()`
/// when minimization is disabled.
pub minimization_stats: MinimizationStats,
}
/// Summary of a set of replay artifacts.
///
/// NOTE(review): the code that fills this struct is not visible from here;
/// the field notes below are inferred from the field names and types and
/// should be confirmed against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtpReplayArtifactReport {
/// Presumably the number of trace events found in the artifacts.
pub trace_events: usize,
/// Presumably the number of evidence-ledger entries.
pub ledger_entries: usize,
/// Presumably the number of violation entries.
pub violation_entries: usize,
/// Presumably the paths of the artifacts covered by this report.
pub artifact_paths: Vec<PathBuf>,
/// Presumably a digest of the journal artifact, as text — TODO confirm algorithm and encoding.
pub journal_digest: String,
/// Presumably whether the artifacts are sufficient to run a replay.
pub replay_ready: bool,
}
/// A minimized event sequence together with statistics about the minimization.
///
/// NOTE(review): `AtpReplayCoordinator::replay` reads `minimized_events` and
/// `stats` from the value returned by `TraceMinimizer::minimize`, which is
/// presumably this type — confirm against the minimizer.
#[derive(Debug, Clone)]
pub struct MinimizationResult {
/// The trace events kept after minimization.
pub minimized_events: Vec<TraceEvent>,
/// Statistics describing the minimization run.
pub stats: MinimizationStats,
}
/// Size statistics for one trace-minimization run.
///
/// The `Default` value has every field set to zero, which is what a replay
/// result carries when minimization is disabled.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MinimizationStats {
    /// Number of events before minimization.
    pub original_length: usize,
    /// Number of events after minimization.
    pub minimized_length: usize,
    /// Size reduction achieved by minimization.
    ///
    /// NOTE(review): `summary_text` multiplies this by 100 before printing it
    /// with a `%` sign, so it is presumably stored as a fraction rather than
    /// a percentage — confirm against the minimizer.
    pub reduction_percentage: f64,
    /// Number of minimization attempts made.
    pub attempts_made: usize,
}
impl MinimizationStats {
    /// Formats the statistics as a one-line human-readable summary.
    ///
    /// `reduction_percentage` is multiplied by 100 and printed with one
    /// decimal place.
    pub fn summary_text(&self) -> String {
        let Self {
            original_length,
            minimized_length,
            reduction_percentage,
            attempts_made,
        } = self;
        let reduction_display = *reduction_percentage * 100.0;

        format!(
            "Minimization: {} → {} events ({:.1}% reduction) in {} attempts",
            original_length, minimized_length, reduction_display, attempts_made
        )
    }
}
/// Errors returned by the replay, validation and trace-minimization code in
/// this module. Each `String` payload is the detail shown after the variant's
/// fixed message prefix.
#[derive(Debug, Error)]
pub enum ReplayError {
/// Trace minimization could not be completed.
#[error("Trace minimization failed: {0}")]
MinimizationFailed(String),
/// Replay input or artifacts failed a validation check.
#[error("Replay validation failed: {0}")]
ReplayValidationFailed(String),
/// An I/O operation failed; `#[from]` lets `?` convert `std::io::Error`
/// into this variant.
#[error("IO error during replay: {0}")]
Io(#[from] std::io::Error),
/// Oracle validation failed.
#[error("Oracle validation failed: {0}")]
OracleValidationFailed(String),
}