use std::collections::BTreeMap;
use crate::effect::EffectTraceEntry;
use crate::engine::ObsEvent;
use crate::trace::{normalize_trace, obs_session};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum EffectDeterminismTier {
#[default]
StrictDeterministic,
ReplayDeterministic,
EnvelopeBoundedNondeterministic,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum DeterminismMode {
Full,
ModuloEffects,
ModuloCommutativity,
Replay,
}
#[must_use]
pub fn replay_consistent(
mode: DeterminismMode,
baseline_trace: &[ObsEvent],
replay_trace: &[ObsEvent],
baseline_effect_trace: &[EffectTraceEntry],
replay_effect_trace: &[EffectTraceEntry],
) -> bool {
match mode {
DeterminismMode::Full => {
baseline_trace == replay_trace && baseline_effect_trace == replay_effect_trace
}
DeterminismMode::ModuloEffects => {
normalize_trace(baseline_trace) == normalize_trace(replay_trace)
}
DeterminismMode::ModuloCommutativity => {
commutativity_normalize(baseline_trace) == commutativity_normalize(replay_trace)
}
DeterminismMode::Replay => baseline_trace == replay_trace,
}
}
fn commutativity_normalize(trace: &[ObsEvent]) -> Vec<ObsEvent> {
let normalized = normalize_trace(trace);
let mut out = Vec::with_capacity(normalized.len());
let mut run = Vec::new();
for event in normalized {
if is_commutativity_eligible(&event) {
run.push(event);
} else {
flush_commutative_run(&mut out, &mut run);
out.push(event);
}
}
flush_commutative_run(&mut out, &mut run);
out
}
fn is_commutativity_eligible(event: &ObsEvent) -> bool {
obs_session(event).is_some()
}
fn flush_commutative_run(out: &mut Vec<ObsEvent>, run: &mut Vec<ObsEvent>) {
if run.is_empty() {
return;
}
let mut buckets: BTreeMap<usize, Vec<ObsEvent>> = BTreeMap::new();
for event in run.drain(..) {
if let Some(sid) = obs_session(&event) {
buckets.entry(sid).or_default().push(event);
} else {
out.push(event);
}
}
let mut cursors: BTreeMap<usize, usize> = buckets.keys().map(|sid| (*sid, 0)).collect();
loop {
let mut progressed = false;
for (sid, events) in &buckets {
if let Some(cursor) = cursors.get_mut(sid) {
if *cursor < events.len() {
out.push(events[*cursor].clone());
*cursor += 1;
progressed = true;
}
}
}
if !progressed {
break;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::session::Edge;
use serde_json::json;
fn entry(id: u64, kind: &str) -> EffectTraceEntry {
EffectTraceEntry {
effect_id: id,
effect_kind: kind.to_string(),
inputs: json!({}),
outputs: json!({}),
handler_identity: "h".to_string(),
effect_interface: None,
effect_operation: None,
ordering_key: id,
topology: None,
}
}
#[test]
fn full_mode_requires_exact_match() {
let trace = vec![ObsEvent::Halted {
tick: 1,
coro_id: 0,
}];
let effects = vec![entry(0, "send_decision")];
assert!(replay_consistent(
DeterminismMode::Full,
&trace,
&trace,
&effects,
&effects
));
assert!(!replay_consistent(
DeterminismMode::Full,
&trace,
&trace,
&effects,
&[]
));
}
#[test]
fn modulo_effects_ignores_effect_trace_differences() {
let left = vec![ObsEvent::Sent {
tick: 10,
edge: Edge::new(1, "A", "B"),
session: 1,
from: "A".to_string(),
to: "B".to_string(),
label: "m".to_string(),
}];
let right = vec![ObsEvent::Sent {
tick: 99,
edge: Edge::new(1, "A", "B"),
session: 1,
from: "A".to_string(),
to: "B".to_string(),
label: "m".to_string(),
}];
assert!(replay_consistent(
DeterminismMode::ModuloEffects,
&left,
&right,
&[entry(0, "send_decision")],
&[entry(9, "send_decision")]
));
}
#[test]
fn modulo_commutativity_ignores_cross_session_reorderings() {
let event_a = ObsEvent::Sent {
tick: 1,
edge: Edge::new(1, "A", "B"),
session: 1,
from: "A".to_string(),
to: "B".to_string(),
label: "x".to_string(),
};
let event_b = ObsEvent::Sent {
tick: 1,
edge: Edge::new(2, "C", "D"),
session: 2,
from: "C".to_string(),
to: "D".to_string(),
label: "y".to_string(),
};
let left = vec![event_a.clone(), event_b.clone()];
let right = vec![event_b, event_a];
assert!(replay_consistent(
DeterminismMode::ModuloCommutativity,
&left,
&right,
&[],
&[]
));
}
#[test]
fn modulo_commutativity_preserves_in_session_order() {
let a1 = ObsEvent::Sent {
tick: 1,
edge: Edge::new(1, "A", "B"),
session: 1,
from: "A".to_string(),
to: "B".to_string(),
label: "a1".to_string(),
};
let a2 = ObsEvent::Received {
tick: 2,
edge: Edge::new(1, "B", "A"),
session: 1,
from: "B".to_string(),
to: "A".to_string(),
label: "a2".to_string(),
};
let b1 = ObsEvent::Sent {
tick: 1,
edge: Edge::new(2, "C", "D"),
session: 2,
from: "C".to_string(),
to: "D".to_string(),
label: "b1".to_string(),
};
let baseline = vec![a1.clone(), b1, a2.clone()];
let invalid = vec![a2, a1];
assert!(!replay_consistent(
DeterminismMode::ModuloCommutativity,
&baseline,
&invalid,
&[],
&[]
));
}
#[test]
fn modulo_commutativity_keeps_non_session_barriers_fixed() {
let sent = ObsEvent::Sent {
tick: 1,
edge: Edge::new(1, "A", "B"),
session: 1,
from: "A".to_string(),
to: "B".to_string(),
label: "x".to_string(),
};
let barrier = ObsEvent::Halted {
tick: 2,
coro_id: 99,
};
let recv = ObsEvent::Received {
tick: 3,
edge: Edge::new(2, "C", "D"),
session: 2,
from: "C".to_string(),
to: "D".to_string(),
label: "y".to_string(),
};
let baseline = vec![sent.clone(), barrier.clone(), recv.clone()];
let reordered = vec![recv, barrier, sent];
assert!(!replay_consistent(
DeterminismMode::ModuloCommutativity,
&baseline,
&reordered,
&[],
&[]
));
}
#[test]
fn replay_mode_requires_exact_observation_trace() {
let left = vec![ObsEvent::Halted {
tick: 1,
coro_id: 0,
}];
let right = vec![ObsEvent::Halted {
tick: 2,
coro_id: 0,
}];
assert!(replay_consistent(
DeterminismMode::Replay,
&left,
&left,
&[],
&[]
));
assert!(!replay_consistent(
DeterminismMode::Replay,
&left,
&right,
&[],
&[]
));
}
}