1use std::collections::BTreeMap;
4
5use crate::effect::EffectTraceEntry;
6use crate::engine::ObsEvent;
7use crate::trace::{normalize_trace, obs_session};
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
12#[serde(rename_all = "snake_case")]
13pub enum EffectDeterminismTier {
14 #[default]
16 StrictDeterministic,
17 ReplayDeterministic,
19 EnvelopeBoundedNondeterministic,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
25pub enum DeterminismMode {
26 Full,
28 ModuloEffects,
30 ModuloCommutativity,
32 Replay,
34}
35
36#[must_use]
38pub fn replay_consistent(
39 mode: DeterminismMode,
40 baseline_trace: &[ObsEvent],
41 replay_trace: &[ObsEvent],
42 baseline_effect_trace: &[EffectTraceEntry],
43 replay_effect_trace: &[EffectTraceEntry],
44) -> bool {
45 match mode {
46 DeterminismMode::Full => {
47 baseline_trace == replay_trace && baseline_effect_trace == replay_effect_trace
48 }
49 DeterminismMode::ModuloEffects => {
50 normalize_trace(baseline_trace) == normalize_trace(replay_trace)
51 }
52 DeterminismMode::ModuloCommutativity => {
53 commutativity_normalize(baseline_trace) == commutativity_normalize(replay_trace)
54 }
55 DeterminismMode::Replay => baseline_trace == replay_trace,
56 }
57}
58
59fn commutativity_normalize(trace: &[ObsEvent]) -> Vec<ObsEvent> {
60 let normalized = normalize_trace(trace);
64 let mut out = Vec::with_capacity(normalized.len());
65 let mut run = Vec::new();
66
67 for event in normalized {
68 if is_commutativity_eligible(&event) {
69 run.push(event);
70 } else {
71 flush_commutative_run(&mut out, &mut run);
72 out.push(event);
73 }
74 }
75 flush_commutative_run(&mut out, &mut run);
76 out
77}
78
79fn is_commutativity_eligible(event: &ObsEvent) -> bool {
80 obs_session(event).is_some()
81}
82
83fn flush_commutative_run(out: &mut Vec<ObsEvent>, run: &mut Vec<ObsEvent>) {
84 if run.is_empty() {
85 return;
86 }
87 let mut buckets: BTreeMap<usize, Vec<ObsEvent>> = BTreeMap::new();
88 for event in run.drain(..) {
89 if let Some(sid) = obs_session(&event) {
90 buckets.entry(sid).or_default().push(event);
91 } else {
92 out.push(event);
93 }
94 }
95 let mut cursors: BTreeMap<usize, usize> = buckets.keys().map(|sid| (*sid, 0)).collect();
96
97 loop {
98 let mut progressed = false;
100 for (sid, events) in &buckets {
101 if let Some(cursor) = cursors.get_mut(sid) {
102 if *cursor < events.len() {
103 out.push(events[*cursor].clone());
104 *cursor += 1;
105 progressed = true;
106 }
107 }
108 }
109 if !progressed {
110 break;
111 }
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::Edge;
    use serde_json::json;

    /// Builds an effect trace entry whose `effect_id` and `ordering_key` are
    /// both `id`; every other field is a fixed placeholder.
    fn entry(id: u64, kind: &str) -> EffectTraceEntry {
        EffectTraceEntry {
            effect_id: id,
            effect_kind: kind.to_string(),
            inputs: json!({}),
            outputs: json!({}),
            handler_identity: "h".to_string(),
            effect_interface: None,
            effect_operation: None,
            ordering_key: id,
            topology: None,
        }
    }

    #[test]
    fn full_mode_requires_exact_match() {
        let trace = vec![ObsEvent::Halted {
            tick: 1,
            coro_id: 0,
        }];
        let effects = vec![entry(0, "send_decision")];
        assert!(replay_consistent(
            DeterminismMode::Full,
            &trace,
            &trace,
            &effects,
            &effects
        ));
        // Same observations, different effect traces: `Full` must reject.
        assert!(!replay_consistent(
            DeterminismMode::Full,
            &trace,
            &trace,
            &effects,
            &[]
        ));
    }

    #[test]
    fn modulo_effects_ignores_effect_trace_differences() {
        // The two events differ only in `tick`, and the effect traces differ
        // in `effect_id`/`ordering_key`.
        let left = vec![ObsEvent::Sent {
            tick: 10,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "m".to_string(),
        }];
        let right = vec![ObsEvent::Sent {
            tick: 99,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "m".to_string(),
        }];
        assert!(replay_consistent(
            DeterminismMode::ModuloEffects,
            &left,
            &right,
            &[entry(0, "send_decision")],
            &[entry(9, "send_decision")]
        ));
    }

    #[test]
    fn modulo_commutativity_ignores_cross_session_reorderings() {
        let event_a = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "x".to_string(),
        };
        let event_b = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(2, "C", "D"),
            session: 2,
            from: "C".to_string(),
            to: "D".to_string(),
            label: "y".to_string(),
        };
        let left = vec![event_a.clone(), event_b.clone()];
        let right = vec![event_b, event_a];
        assert!(replay_consistent(
            DeterminismMode::ModuloCommutativity,
            &left,
            &right,
            &[],
            &[]
        ));
    }

    #[test]
    fn modulo_commutativity_preserves_in_session_order() {
        let a1 = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "a1".to_string(),
        };
        let a2 = ObsEvent::Received {
            tick: 2,
            edge: Edge::new(1, "B", "A"),
            session: 1,
            from: "B".to_string(),
            to: "A".to_string(),
            label: "a2".to_string(),
        };
        let b1 = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(2, "C", "D"),
            session: 2,
            from: "C".to_string(),
            to: "D".to_string(),
            label: "b1".to_string(),
        };

        let baseline = vec![a1.clone(), b1.clone(), a2.clone()];

        // Original check: the session-2 event is missing and the session-1
        // events are swapped.
        let truncated = vec![a2.clone(), a1.clone()];
        assert!(!replay_consistent(
            DeterminismMode::ModuloCommutativity,
            &baseline,
            &truncated,
            &[],
            &[]
        ));

        // The check above already fails on length alone. This replay has the
        // same events as the baseline and differs only in the order of the
        // two session-1 events, so it isolates in-session ordering.
        let swapped_in_session = vec![a2, b1, a1];
        assert!(!replay_consistent(
            DeterminismMode::ModuloCommutativity,
            &baseline,
            &swapped_in_session,
            &[],
            &[]
        ));
    }

    #[test]
    fn modulo_commutativity_keeps_non_session_barriers_fixed() {
        let sent = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "x".to_string(),
        };
        // `Halted` is expected to act as a barrier between the session events.
        let barrier = ObsEvent::Halted {
            tick: 2,
            coro_id: 99,
        };
        let recv = ObsEvent::Received {
            tick: 3,
            edge: Edge::new(2, "C", "D"),
            session: 2,
            from: "C".to_string(),
            to: "D".to_string(),
            label: "y".to_string(),
        };

        let baseline = vec![sent.clone(), barrier.clone(), recv.clone()];
        // Moving events across the barrier must not be treated as a harmless
        // cross-session reordering.
        let reordered = vec![recv, barrier, sent];
        assert!(!replay_consistent(
            DeterminismMode::ModuloCommutativity,
            &baseline,
            &reordered,
            &[],
            &[]
        ));
    }

    #[test]
    fn replay_mode_requires_exact_observation_trace() {
        let left = vec![ObsEvent::Halted {
            tick: 1,
            coro_id: 0,
        }];
        // Differs from `left` only in `tick`.
        let right = vec![ObsEvent::Halted {
            tick: 2,
            coro_id: 0,
        }];
        assert!(replay_consistent(
            DeterminismMode::Replay,
            &left,
            &left,
            &[],
            &[]
        ));
        assert!(!replay_consistent(
            DeterminismMode::Replay,
            &left,
            &right,
            &[],
            &[]
        ));
    }
}