use std::collections::HashMap;
use std::time::{Duration, Instant};
use sha2::{Digest, Sha256};
/// Default retention window for replay entries: 30 days.
///
/// Used by `AxonendpointReplayLog::default()` as the age beyond which
/// `reap_expired` drops an entry.
pub const DEFAULT_RETENTION: Duration = {
    const SECS_PER_DAY: u64 = 24 * 60 * 60;
    Duration::from_secs(30 * SECS_PER_DAY)
};
/// Audit record describing a single step; carried in
/// [`AxonendpointReplayEntry::step_audit`].
///
/// Derives `Default`, so a fresh record starts with zeroed numbers, empty
/// strings, `false`, and `None`. The four `tool_*` fields are optional on the
/// wire: serde omits them from the serialized form when they are `None` and
/// fills in `None` when they are absent from the input.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct StepAuditRecord {
/// Name of the step.
pub step_name: String,
/// Index of the step (presumably its position within the flow — confirm with the producer).
pub step_index: usize,
/// Whether the step succeeded.
pub success: bool,
/// Number of tokens emitted by the step.
pub tokens_emitted: u64,
/// Hex-encoded hash of the step output (hash algorithm is not visible here — TODO confirm).
pub output_hash_hex: String,
/// Effect policy applied to the step, if any (policy names are defined elsewhere).
pub effect_policy_applied: Option<String>,
/// Count of dropped chunks.
pub chunks_dropped: u64,
/// Count of degraded chunks.
pub chunks_degraded: u64,
/// Timestamp in milliseconds (reference point is not visible here — TODO confirm).
pub timestamp_ms: u64,
/// Tool name, when set. Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
/// Number of chunks emitted by the tool, when set. Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_chunks_emitted: Option<u64>,
/// Hex-encoded hash of the tool output, when set. Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_output_hash_hex: Option<String>,
/// Kind of terminator reported for the tool, when set (valid values are defined elsewhere).
/// Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_terminator_kind: Option<String>,
}
/// One recorded request/response exchange stored in an
/// [`AxonendpointReplayLog`], which keys entries by `trace_id`.
///
/// Serializable with serde. `step_audit` and `runtime_warnings` are left out
/// of the serialized form when empty and default to empty vectors when absent
/// from the input.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AxonendpointReplayEntry {
/// Trace identifier; the replay log uses it as the lookup key.
pub trace_id: String,
/// Timestamp in milliseconds (reference point is not visible here — TODO confirm).
pub timestamp_ms: u64,
/// Name of the endpoint.
pub endpoint_name: String,
/// Name of the flow.
pub flow_name: String,
/// Request method; `resolve_replay_enabled` compares methods against `"POST"` / `"PUT"`.
pub method: String,
/// Request path.
pub path: String,
/// Client identifier.
pub client_id: String,
/// Capabilities used (naming scheme is defined elsewhere).
pub capabilities_used: Vec<String>,
/// Hex-encoded hash of the request body; the unit tests fill this with
/// `AxonendpointReplayLog::hash_body_hex(request_body)`.
pub request_body_hash_hex: String,
/// Raw request body bytes.
pub request_body: Vec<u8>,
/// Response status code.
pub response_status: u16,
/// Hex-encoded hash of the response body; the unit tests fill this with
/// `AxonendpointReplayLog::hash_body_hex(response_body)`.
pub response_body_hash_hex: String,
/// Content type of the response.
pub response_content_type: String,
/// Raw response body bytes.
pub response_body: Vec<u8>,
/// Model version string.
pub model_version: String,
/// Determinism flag (presumably derived from `is_backend_deterministic` — confirm with the producer).
pub deterministic: bool,
/// Per-step audit records. Left out of the serialized form when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub step_audit: Vec<StepAuditRecord>,
/// Runtime warnings attached to this entry. Left out of the serialized form when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub runtime_warnings: Vec<crate::runtime_warnings::RuntimeWarning>,
}
/// In-memory store of [`AxonendpointReplayEntry`] values keyed by trace id,
/// bounded by an entry count (`capacity`) and an age limit (`retention`).
#[derive(Debug)]
pub struct AxonendpointReplayLog {
// Stored entries, keyed by `trace_id`.
entries: HashMap<String, AxonendpointReplayEntry>,
// Insertion instant for each key in `entries`; `append` and `reap_expired`
// update both maps together so their key sets stay identical.
inserted_at: HashMap<String, Instant>,
// Entry count at which `append` evicts the oldest entry before adding a new trace id.
capacity: usize,
// Age beyond which `reap_expired` removes an entry.
retention: Duration,
}
impl Default for AxonendpointReplayLog {
    /// Builds an empty log with room for 10,000 entries and the
    /// [`DEFAULT_RETENTION`] age limit.
    fn default() -> Self {
        const DEFAULT_CAPACITY: usize = 10_000;
        Self::new(DEFAULT_CAPACITY, DEFAULT_RETENTION)
    }
}
impl AxonendpointReplayLog {
    /// Creates an empty log.
    ///
    /// `capacity` is the maximum number of entries kept at once; `retention`
    /// is the age beyond which [`reap_expired`](Self::reap_expired) removes an
    /// entry.
    pub fn new(capacity: usize, retention: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            inserted_at: HashMap::new(),
            capacity,
            retention,
        }
    }

    /// Returns the number of entries currently stored.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no entries are stored.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Returns the SHA-256 digest of `body` as 64 lowercase hex characters.
    pub fn hash_body_hex(body: &[u8]) -> String {
        const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

        let mut hasher = Sha256::new();
        hasher.update(body);
        let digest = hasher.finalize();

        // Two hex characters per digest byte. A nibble lookup avoids the
        // temporary `String` that `format!` would allocate for every byte.
        let mut hex = String::with_capacity(64);
        for &byte in digest.iter() {
            hex.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            hex.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
        }
        hex
    }

    /// Stores `entry` under its `trace_id`.
    ///
    /// * An existing entry with the same trace id is overwritten in place and
    ///   its insertion time is refreshed; nothing is evicted.
    /// * When the log is full and the trace id is new, the entry with the
    ///   oldest insertion time is evicted first. Finding it is a linear scan
    ///   over all stored entries.
    /// * A log created with `capacity == 0` stores nothing: the entry is
    ///   dropped so the capacity bound is never exceeded.
    pub fn append(&mut self, entry: AxonendpointReplayEntry) {
        if self.capacity == 0 {
            // Nothing can be evicted to make room, so inserting would leave
            // the log permanently above its bound.
            return;
        }

        let key = entry.trace_id.clone();
        let is_new_key = !self.entries.contains_key(&key);
        if is_new_key && self.entries.len() >= self.capacity {
            let oldest = self
                .inserted_at
                .iter()
                .min_by_key(|(_, inserted)| **inserted)
                .map(|(trace_id, _)| trace_id.clone());
            if let Some(oldest_key) = oldest {
                self.entries.remove(&oldest_key);
                self.inserted_at.remove(&oldest_key);
            }
        }

        self.inserted_at.insert(key.clone(), Instant::now());
        self.entries.insert(key, entry);
    }

    /// Looks up the entry stored under `trace_id`, if any.
    pub fn get(&self, trace_id: &str) -> Option<&AxonendpointReplayEntry> {
        self.entries.get(trace_id)
    }

    /// Removes every entry whose age is strictly greater than the retention
    /// window and returns how many entries were removed.
    pub fn reap_expired(&mut self) -> usize {
        let now = Instant::now();
        let retention = self.retention;
        let before = self.entries.len();

        // Borrow `entries` separately so the `retain` closure can drop the
        // matching entry while `inserted_at` is being filtered, keeping the
        // two maps in sync without cloning any keys.
        let entries = &mut self.entries;
        self.inserted_at.retain(|trace_id, inserted| {
            let keep = now.duration_since(*inserted) <= retention;
            if !keep {
                entries.remove(trace_id);
            }
            keep
        });

        before - self.entries.len()
    }
}
/// Decides whether replay recording is enabled for a request.
///
/// When `replay_explicit` is `true` the caller-supplied `replay` value wins
/// outright. Otherwise `replay` is ignored and the default applies: enabled
/// only for the exact (case-sensitive) methods `"POST"` and `"PUT"`.
pub fn resolve_replay_enabled(method: &str, replay_explicit: bool, replay: bool) -> bool {
    match (replay_explicit, method) {
        (true, _) => replay,
        (false, "POST" | "PUT") => true,
        (false, _) => false,
    }
}
/// Reports whether `backend` is treated as deterministic.
///
/// Only the exact name `"stub"` qualifies; every other backend name returns
/// `false`.
pub fn is_backend_deterministic(backend: &str) -> bool {
    matches!(backend, "stub")
}
#[cfg(test)]
mod tests {
    //! Unit tests for the replay defaults, the backend determinism check, and
    //! the in-memory replay log (append / get / eviction / expiry / hashing).
    use super::*;

    #[test]
    fn default_post_replay_enabled() {
        assert!(resolve_replay_enabled("POST", false, false));
    }

    #[test]
    fn default_put_replay_enabled() {
        assert!(resolve_replay_enabled("PUT", false, false));
    }

    #[test]
    fn default_get_replay_disabled() {
        assert!(!resolve_replay_enabled("GET", false, false));
    }

    #[test]
    fn default_delete_replay_disabled() {
        assert!(!resolve_replay_enabled("DELETE", false, false));
    }

    #[test]
    fn explicit_false_overrides_post_default() {
        assert!(!resolve_replay_enabled("POST", true, false));
    }

    #[test]
    fn explicit_true_overrides_get_default() {
        assert!(resolve_replay_enabled("GET", true, true));
    }

    #[test]
    fn stub_backend_is_deterministic() {
        assert!(is_backend_deterministic("stub"));
    }

    #[test]
    fn llm_backend_is_not_deterministic_by_default() {
        assert!(!is_backend_deterministic("anthropic"));
        assert!(!is_backend_deterministic("openai"));
    }

    /// Builds a minimal, self-consistent entry for `trace_id`: the stored
    /// hashes match the stored request/response bodies.
    fn make_entry(trace_id: &str) -> AxonendpointReplayEntry {
        AxonendpointReplayEntry {
            trace_id: trace_id.to_string(),
            timestamp_ms: 0,
            endpoint_name: "E".to_string(),
            flow_name: "F".to_string(),
            method: "POST".to_string(),
            path: "/p".to_string(),
            client_id: "anon".to_string(),
            capabilities_used: vec![],
            request_body_hash_hex: AxonendpointReplayLog::hash_body_hex(b"{}"),
            request_body: b"{}".to_vec(),
            response_status: 200,
            response_body_hash_hex: AxonendpointReplayLog::hash_body_hex(b"ok"),
            response_content_type: "application/json".to_string(),
            response_body: b"ok".to_vec(),
            model_version: "axon.runtime.dynamic_route.v1".to_string(),
            deterministic: true,
            step_audit: Vec::new(),
            runtime_warnings: Vec::new(),
        }
    }

    #[test]
    fn log_append_and_get_round_trip() {
        let mut log = AxonendpointReplayLog::default();
        let e = make_entry("t1");
        log.append(e);
        let got = log.get("t1").expect("entry must be present");
        assert_eq!(got.trace_id, "t1");
        assert_eq!(got.response_body, b"ok");
    }

    #[test]
    fn get_unknown_trace_id_returns_none() {
        let log = AxonendpointReplayLog::default();
        assert!(log.get("nope").is_none());
    }

    #[test]
    fn same_trace_id_overwrite_in_place() {
        let mut log = AxonendpointReplayLog::default();
        let mut e1 = make_entry("t1");
        e1.response_body = b"first".to_vec();
        log.append(e1);
        let mut e2 = make_entry("t1");
        e2.response_body = b"second".to_vec();
        log.append(e2);
        assert_eq!(log.len(), 1);
        assert_eq!(log.get("t1").unwrap().response_body, b"second");
    }

    #[test]
    fn capacity_eviction_drops_oldest() {
        let mut log = AxonendpointReplayLog::new(2, DEFAULT_RETENTION);
        log.append(make_entry("a"));
        // Short sleeps guarantee strictly increasing insertion instants.
        std::thread::sleep(Duration::from_millis(1));
        log.append(make_entry("b"));
        std::thread::sleep(Duration::from_millis(1));
        log.append(make_entry("c"));
        assert_eq!(log.len(), 2);
        assert!(log.get("a").is_none(), "oldest must be evicted");
        assert!(log.get("b").is_some(), "only the oldest may be evicted");
        assert!(log.get("c").is_some());
    }

    #[test]
    fn overwrite_at_capacity_does_not_evict() {
        let mut log = AxonendpointReplayLog::new(2, DEFAULT_RETENTION);
        log.append(make_entry("a"));
        log.append(make_entry("b"));
        // Re-appending a known trace id replaces it without making room.
        log.append(make_entry("a"));
        assert_eq!(log.len(), 2);
        assert!(log.get("a").is_some());
        assert!(log.get("b").is_some());
    }

    #[test]
    fn reap_expired_removes_old_entries() {
        let mut log = AxonendpointReplayLog::new(10, Duration::from_millis(0));
        log.append(make_entry("t1"));
        log.append(make_entry("t2"));
        std::thread::sleep(Duration::from_millis(2));
        assert_eq!(log.reap_expired(), 2);
        assert!(log.is_empty());
    }

    #[test]
    fn reap_expired_keeps_entries_within_retention() {
        let mut log = AxonendpointReplayLog::new(10, DEFAULT_RETENTION);
        log.append(make_entry("t1"));
        assert_eq!(log.reap_expired(), 0);
        assert_eq!(log.len(), 1);
        assert!(log.get("t1").is_some());
    }

    #[test]
    fn hash_body_hex_is_64_chars_lowercase() {
        let h = AxonendpointReplayLog::hash_body_hex(b"hello");
        assert_eq!(h.len(), 64);
        for c in h.chars() {
            assert!(c.is_ascii_hexdigit() && !c.is_ascii_uppercase());
        }
    }

    #[test]
    fn hash_body_hex_deterministic() {
        let a = AxonendpointReplayLog::hash_body_hex(b"{\"x\":1}");
        let b = AxonendpointReplayLog::hash_body_hex(b"{\"x\":1}");
        assert_eq!(a, b);
    }

    #[test]
    fn hash_body_hex_matches_known_sha256_vectors() {
        // Standard SHA-256 known-answer vectors for "" and "abc".
        assert_eq!(
            AxonendpointReplayLog::hash_body_hex(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
        assert_eq!(
            AxonendpointReplayLog::hash_body_hex(b"abc"),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }
}