use std::fs;
use std::time::Instant;
use chrono::Utc;
use crate::catalog::reliability_evidence_report_uri;
use crate::error::Result;
use crate::evidence::{
ReliabilityEvidenceInput, accumulate_replay_report, build_reliability_evidence_checks,
evidence_status,
};
use crate::models::{
MetadataFilter, QueueDiagnostics, ReliabilityEvidenceReport, ReliabilityQueueDelta,
ReliabilityReplayPlan, ReliabilityReplayProgress, ReliabilitySearchProbe, ReplayReport,
};
use crate::uri::{AxiomUri, Scope};
use super::AxiomSync;
struct ReliabilityRuntimeState {
replay_totals: ReplayReport,
replay_cycles: u32,
queue_after_replay: QueueDiagnostics,
queue_after_restart: QueueDiagnostics,
replay_hit_uri: Option<String>,
restart_hit_uri: Option<String>,
final_checkpoint: Option<i64>,
final_dead_letter: u64,
}
impl AxiomSync {
pub fn collect_reliability_evidence(
&self,
replay_limit: usize,
max_cycles: u32,
) -> Result<ReliabilityEvidenceReport> {
let request_id = uuid::Uuid::new_v4().to_string();
let started = Instant::now();
let replay_limit = replay_limit.max(1);
let max_cycles = max_cycles.max(1);
let mut seeded_probe_uri: Option<String> = None;
let output =
self.build_reliability_evidence_report(replay_limit, max_cycles, &mut seeded_probe_uri);
if let Some(uri) = seeded_probe_uri.as_deref()
&& let Err(cleanup_err) = self.cleanup_reliability_probe(uri)
{
self.log_request_warning(
request_id.clone(),
"reliability.evidence.cleanup",
started,
Some(uri.to_string()),
"reliability probe cleanup failed",
Some(serde_json::json!({
"error": cleanup_err.to_string(),
})),
);
}
match output {
Ok(report) => {
self.log_request_status(
request_id,
"reliability.evidence",
report.status.as_str(),
started,
Some(report.search_probe.queued_root_uri.clone()),
Some(serde_json::json!({
"replay_limit": report.replay_plan.replay_limit,
"max_cycles": report.replay_plan.max_cycles,
"replay_cycles": report.replay_progress.replay_cycles,
"passed": report.passed,
"report_uri": report.report_uri,
"replay_done": report.replay_progress.replay_totals.done,
"replay_dead_letter": report.replay_progress.replay_totals.dead_letter,
})),
);
Ok(report)
}
Err(err) => {
self.log_request_error(
request_id,
"reliability.evidence",
started,
None,
&err,
Some(serde_json::json!({
"replay_limit": replay_limit,
"max_cycles": max_cycles,
})),
);
Err(err)
}
}
}
fn build_reliability_evidence_report(
&self,
replay_limit: usize,
max_cycles: u32,
seeded_probe_uri: &mut Option<String>,
) -> Result<ReliabilityEvidenceReport> {
let baseline_queue = self.queue_diagnostics()?;
let baseline_dead_letter = baseline_queue.counts.dead_letter;
let baseline_checkpoint = self.state.get_checkpoint("replay")?;
let report_id = uuid::Uuid::new_v4().to_string();
let query = format!("axiomsync reliability evidence {report_id}");
let queued_root_uri_str = self.seed_reliability_probe(&report_id, &query)?;
*seeded_probe_uri = Some(queued_root_uri_str.clone());
let runtime = self.execute_reliability_runtime_state(
&query,
&queued_root_uri_str,
replay_limit,
max_cycles,
)?;
let checks = build_reliability_evidence_checks(&ReliabilityEvidenceInput {
replay_totals: &runtime.replay_totals,
queue_after_replay: &runtime.queue_after_replay,
baseline_dead_letter,
final_dead_letter: runtime.final_dead_letter,
baseline_checkpoint,
final_checkpoint: runtime.final_checkpoint,
replay_hit_uri: runtime.replay_hit_uri.as_deref(),
restart_hit_uri: runtime.restart_hit_uri.as_deref(),
});
let passed = checks.iter().all(|check| check.passed);
let status = evidence_status(passed);
let report_uri = reliability_evidence_report_uri(&report_id)?;
let report = ReliabilityEvidenceReport {
report_id,
created_at: Utc::now().to_rfc3339(),
passed,
status,
replay_plan: ReliabilityReplayPlan {
replay_limit,
max_cycles,
},
replay_progress: ReliabilityReplayProgress {
replay_cycles: runtime.replay_cycles,
replay_totals: runtime.replay_totals,
},
queue_delta: ReliabilityQueueDelta {
baseline_dead_letter,
final_dead_letter: runtime.final_dead_letter,
baseline_checkpoint,
final_checkpoint: runtime.final_checkpoint,
},
search_probe: ReliabilitySearchProbe {
queued_root_uri: queued_root_uri_str,
query,
replay_hit_uri: runtime.replay_hit_uri,
restart_hit_uri: runtime.restart_hit_uri,
},
queue: runtime.queue_after_restart,
checks,
report_uri: report_uri.to_string(),
};
self.fs
.write(&report_uri, &serde_json::to_string_pretty(&report)?, true)?;
Ok(report)
}
fn seed_reliability_probe(&self, report_id: &str, query: &str) -> Result<String> {
let source_path =
std::env::temp_dir().join(format!("axiomsync_reliability_{report_id}.txt"));
let source_path_str = source_path.to_string_lossy().to_string();
fs::write(&source_path, format!("{query}\n"))?;
let queued_root_uri = AxiomUri::root(Scope::Resources)
.join("reliability")?
.join("evidence")?
.join(report_id)?;
let queued_root_uri_str = queued_root_uri.to_string();
let add_result = self.add_resource(
&source_path_str,
Some(&queued_root_uri_str),
None,
None,
false,
None,
);
let _ = fs::remove_file(&source_path);
add_result?;
Ok(queued_root_uri_str)
}
fn cleanup_reliability_probe(&self, probe_root_uri: &str) -> Result<()> {
let probe_uri = AxiomUri::parse(probe_root_uri)?;
if !self.fs.exists(&probe_uri) {
return Ok(());
}
self.fs.rm(&probe_uri, true, true)?;
self.prune_index_prefix_from_memory(&probe_uri)?;
self.state
.remove_search_documents_with_prefix(probe_root_uri)?;
self.state.remove_index_state_with_prefix(probe_root_uri)?;
Ok(())
}
fn execute_reliability_runtime_state(
&self,
query: &str,
queued_root_uri_str: &str,
replay_limit: usize,
max_cycles: u32,
) -> Result<ReliabilityRuntimeState> {
let replay_app = Self::new(self.fs.root())?;
let mut replay_totals = ReplayReport::default();
let mut replay_cycles = 0u32;
for _ in 0..max_cycles {
replay_cycles = replay_cycles.saturating_add(1);
let report = replay_app.replay_outbox(replay_limit, false)?;
accumulate_replay_report(&mut replay_totals, &report);
let queue_state = replay_app.queue_diagnostics()?;
if queue_state.counts.new_due == 0 && queue_state.counts.processing == 0 {
break;
}
}
let queue_after_replay = replay_app.queue_diagnostics()?;
let replay_find = replay_app.find(
query,
Some(queued_root_uri_str),
Some(5),
None,
None::<MetadataFilter>,
)?;
let replay_hit_uri = replay_find.query_results.first().map(|hit| hit.uri.clone());
let restarted_app = Self::new(self.fs.root())?;
restarted_app.initialize()?;
let restart_find = restarted_app.find(
query,
Some(queued_root_uri_str),
Some(5),
None,
None::<MetadataFilter>,
)?;
let restart_hit_uri = restart_find
.query_results
.first()
.map(|hit| hit.uri.clone());
let queue_after_restart = restarted_app.queue_diagnostics()?;
let final_checkpoint = restarted_app.state.get_checkpoint("replay")?;
let final_dead_letter = queue_after_replay.counts.dead_letter;
Ok(ReliabilityRuntimeState {
replay_totals,
replay_cycles,
queue_after_replay,
queue_after_restart,
replay_hit_uri,
restart_hit_uri,
final_checkpoint,
final_dead_letter,
})
}
}