1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize};
6
7use crate::determinism::EffectDeterminismTier;
8use crate::engine::ObsEvent;
9use crate::serialization::CanonicalReplayFragmentV1;
10use crate::trace::normalize_trace;
11use crate::trace::obs_session;
12use crate::verification::{DefaultVerificationModel, HashTag, VerificationModel};
13
14pub const ENVELOPE_DIFF_SCHEMA_VERSION: &str = "protocol_machine.envelope_diff.v1";
16
17fn canonical_schema_version() -> String {
18 ENVELOPE_DIFF_SCHEMA_VERSION.to_string()
19}
20
21fn deserialize_envelope_schema_version<'de, D>(deserializer: D) -> Result<String, D::Error>
22where
23 D: serde::Deserializer<'de>,
24{
25 let version = String::deserialize(deserializer)?;
26 if version == ENVELOPE_DIFF_SCHEMA_VERSION {
27 Ok(version)
28 } else {
29 Err(serde::de::Error::custom(format!(
30 "unsupported schema_version '{version}'; expected '{ENVELOPE_DIFF_SCHEMA_VERSION}'"
31 )))
32 }
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37pub enum SchedulerPermutationClass {
38 Exact,
40 SessionNormalizedPermutation,
42 EnvelopeBounded,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48pub enum EffectOrderingClass {
49 Exact,
51 ReplayDeterministic,
53 EnvelopeBounded,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
59pub enum FailureVisibleDiffClass {
60 Exact,
62 EnvelopeBounded,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct WaveWidthBound {
69 pub baseline_max_wave_width: usize,
71 pub candidate_max_wave_width: usize,
73 pub declared_upper_bound: usize,
75}
76
77impl WaveWidthBound {
78 #[must_use]
80 pub fn within_declared_bound(&self) -> bool {
81 self.candidate_max_wave_width <= self.declared_upper_bound
82 }
83}
84
85#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87pub struct EnvelopeDiff {
88 #[serde(deserialize_with = "deserialize_envelope_schema_version")]
90 pub schema_version: String,
91 pub baseline_engine: String,
93 pub candidate_engine: String,
95 pub scheduler_permutation_class: SchedulerPermutationClass,
97 pub wave_width_bound: WaveWidthBound,
99 pub effect_ordering_class: EffectOrderingClass,
101 pub failure_visible_diff_class: FailureVisibleDiffClass,
103 pub effect_determinism_tier: EffectDeterminismTier,
105}
106
107impl EnvelopeDiff {
108 #[must_use]
110 pub fn from_replay_fragments(
111 baseline_engine: impl Into<String>,
112 candidate_engine: impl Into<String>,
113 baseline: &CanonicalReplayFragmentV1,
114 candidate: &CanonicalReplayFragmentV1,
115 baseline_max_wave_width: usize,
116 candidate_max_wave_width: usize,
117 declared_upper_bound: usize,
118 effect_determinism_tier: EffectDeterminismTier,
119 ) -> Self {
120 let scheduler_permutation_class =
121 classify_scheduler_permutation(&baseline.obs_trace, &candidate.obs_trace);
122 let effect_ordering_class =
123 classify_effect_ordering(baseline, candidate, scheduler_permutation_class);
124 let failure_visible_diff_class = classify_failure_visible(baseline, candidate);
125
126 Self {
127 schema_version: canonical_schema_version(),
128 baseline_engine: baseline_engine.into(),
129 candidate_engine: candidate_engine.into(),
130 scheduler_permutation_class,
131 wave_width_bound: WaveWidthBound {
132 baseline_max_wave_width,
133 candidate_max_wave_width,
134 declared_upper_bound,
135 },
136 effect_ordering_class,
137 failure_visible_diff_class,
138 effect_determinism_tier,
139 }
140 }
141
142 pub fn canonical_json(&self) -> Result<Vec<u8>, serde_json::Error> {
148 serde_json::to_vec(self)
149 }
150
151 #[must_use]
153 pub fn stable_hash_hex(&self) -> String {
154 stable_hash_hex_from_serializable(self)
155 }
156}
157
158#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
160pub struct EnvelopeDiffArtifactV1 {
161 #[serde(deserialize_with = "deserialize_envelope_schema_version")]
163 pub schema_version: String,
164 pub envelope_diff: EnvelopeDiff,
166 pub baseline_fragment_hash: String,
168 pub candidate_fragment_hash: String,
170 pub envelope_diff_hash: String,
172}
173
174impl EnvelopeDiffArtifactV1 {
175 #[must_use]
177 pub fn from_replay_fragments(
178 baseline_engine: impl Into<String>,
179 candidate_engine: impl Into<String>,
180 baseline: &CanonicalReplayFragmentV1,
181 candidate: &CanonicalReplayFragmentV1,
182 baseline_max_wave_width: usize,
183 candidate_max_wave_width: usize,
184 declared_upper_bound: usize,
185 effect_determinism_tier: EffectDeterminismTier,
186 ) -> Self {
187 let envelope_diff = EnvelopeDiff::from_replay_fragments(
188 baseline_engine,
189 candidate_engine,
190 baseline,
191 candidate,
192 baseline_max_wave_width,
193 candidate_max_wave_width,
194 declared_upper_bound,
195 effect_determinism_tier,
196 );
197 let baseline_fragment_hash = stable_hash_hex_from_serializable(baseline);
198 let candidate_fragment_hash = stable_hash_hex_from_serializable(candidate);
199 let envelope_diff_hash = envelope_diff.stable_hash_hex();
200 Self {
201 schema_version: canonical_schema_version(),
202 envelope_diff,
203 baseline_fragment_hash,
204 candidate_fragment_hash,
205 envelope_diff_hash,
206 }
207 }
208}
209
210fn classify_scheduler_permutation(
211 baseline_trace: &[ObsEvent],
212 candidate_trace: &[ObsEvent],
213) -> SchedulerPermutationClass {
214 if baseline_trace == candidate_trace {
215 return SchedulerPermutationClass::Exact;
216 }
217 let baseline_normalized = normalize_trace(baseline_trace);
218 let candidate_normalized = normalize_trace(candidate_trace);
219 if per_session_projection(&baseline_normalized) == per_session_projection(&candidate_normalized)
220 {
221 return SchedulerPermutationClass::SessionNormalizedPermutation;
222 }
223 SchedulerPermutationClass::EnvelopeBounded
224}
225
226fn per_session_projection(trace: &[ObsEvent]) -> BTreeMap<usize, Vec<ObsEvent>> {
227 let mut out: BTreeMap<usize, Vec<ObsEvent>> = BTreeMap::new();
228 for event in trace {
229 if let Some(sid) = obs_session(event) {
230 out.entry(sid).or_default().push(event.clone());
231 }
232 }
233 out
234}
235
236fn classify_effect_ordering(
237 baseline: &CanonicalReplayFragmentV1,
238 candidate: &CanonicalReplayFragmentV1,
239 scheduler_permutation_class: SchedulerPermutationClass,
240) -> EffectOrderingClass {
241 if baseline.effect_trace == candidate.effect_trace {
242 return EffectOrderingClass::Exact;
243 }
244 match scheduler_permutation_class {
245 SchedulerPermutationClass::Exact
246 | SchedulerPermutationClass::SessionNormalizedPermutation => {
247 EffectOrderingClass::ReplayDeterministic
248 }
249 SchedulerPermutationClass::EnvelopeBounded => EffectOrderingClass::EnvelopeBounded,
250 }
251}
252
253fn classify_failure_visible(
254 baseline: &CanonicalReplayFragmentV1,
255 candidate: &CanonicalReplayFragmentV1,
256) -> FailureVisibleDiffClass {
257 if baseline.crashed_sites == candidate.crashed_sites
258 && baseline.partitioned_edges == candidate.partitioned_edges
259 && baseline.corrupted_edges == candidate.corrupted_edges
260 && baseline.timed_out_sites == candidate.timed_out_sites
261 {
262 FailureVisibleDiffClass::Exact
263 } else {
264 FailureVisibleDiffClass::EnvelopeBounded
265 }
266}
267
268fn stable_hash_hex_from_serializable<T: Serialize>(value: &T) -> String {
269 let bytes = serde_json::to_vec(value).unwrap_or_else(|_| b"{}".to_vec());
270 let digest = DefaultVerificationModel::hash(HashTag::Value, &bytes);
271 bytes_to_hex(&digest.0)
272}
273
274#[allow(clippy::as_conversions)]
275fn bytes_to_hex(bytes: &[u8]) -> String {
276 const HEX: &[u8; 16] = b"0123456789abcdef";
277 let mut out = String::with_capacity(bytes.len() * 2);
278 for byte in bytes {
279 out.push(HEX[(byte >> 4) as usize] as char);
281 out.push(HEX[(byte & 0x0f) as usize] as char);
283 }
284 out
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use crate::session::Edge;
291
292 fn sent(session: usize, tick: u64) -> ObsEvent {
293 ObsEvent::Sent {
294 tick,
295 edge: Edge::new(session, "A", "B"),
296 session,
297 from: "A".to_string(),
298 to: "B".to_string(),
299 label: "m".to_string(),
300 }
301 }
302
303 fn fragment(trace: Vec<ObsEvent>) -> CanonicalReplayFragmentV1 {
304 CanonicalReplayFragmentV1 {
305 schema_version: crate::serialization::SERIALIZATION_SCHEMA_VERSION.to_string(),
306 obs_trace: trace,
307 effect_trace: Vec::new(),
308 crashed_sites: Vec::new(),
309 partitioned_edges: Vec::new(),
310 corrupted_edges: Vec::new(),
311 timed_out_sites: Vec::new(),
312 effect_determinism_tier: EffectDeterminismTier::StrictDeterministic,
313 communication_replay_mode: crate::communication_replay::CommunicationReplayMode::Off,
314 communication_replay_root: None,
315 communication_consumption_artifacts: Vec::new(),
316 semantic_audit_log: Vec::new(),
317 semantic_objects: crate::semantic_objects::ProtocolMachineSemanticObjects::default(),
318 }
319 }
320
321 #[test]
322 fn scheduler_class_detects_session_permutation() {
323 let baseline = fragment(vec![sent(1, 1), sent(2, 2)]);
324 let candidate = fragment(vec![sent(2, 3), sent(1, 4)]);
325 let diff = EnvelopeDiff::from_replay_fragments(
326 "canonical",
327 "threaded",
328 &baseline,
329 &candidate,
330 1,
331 2,
332 2,
333 EffectDeterminismTier::EnvelopeBoundedNondeterministic,
334 );
335 assert_eq!(
336 diff.scheduler_permutation_class,
337 SchedulerPermutationClass::SessionNormalizedPermutation
338 );
339 }
340
341 #[test]
342 fn envelope_hash_is_stable_for_equal_payloads() {
343 let baseline = fragment(vec![sent(1, 1)]);
344 let candidate = fragment(vec![sent(1, 1)]);
345 let left = EnvelopeDiff::from_replay_fragments(
346 "a",
347 "b",
348 &baseline,
349 &candidate,
350 1,
351 1,
352 1,
353 EffectDeterminismTier::StrictDeterministic,
354 );
355 let right = EnvelopeDiff::from_replay_fragments(
356 "a",
357 "b",
358 &baseline,
359 &candidate,
360 1,
361 1,
362 1,
363 EffectDeterminismTier::StrictDeterministic,
364 );
365 assert_eq!(left.stable_hash_hex(), right.stable_hash_hex());
366 }
367
368 #[test]
369 fn artifact_hash_tracks_envelope_payload() {
370 let baseline = fragment(vec![sent(1, 1)]);
371 let candidate = fragment(vec![sent(1, 1)]);
372 let artifact = EnvelopeDiffArtifactV1::from_replay_fragments(
373 "canonical",
374 "threaded",
375 &baseline,
376 &candidate,
377 1,
378 1,
379 1,
380 EffectDeterminismTier::StrictDeterministic,
381 );
382 assert!(!artifact.envelope_diff_hash.is_empty());
383 assert_eq!(
384 artifact.envelope_diff_hash,
385 artifact.envelope_diff.stable_hash_hex()
386 );
387 }
388
389 #[test]
390 fn numeric_schema_version_is_rejected() {
391 let payload = serde_json::json!({
392 "schema_version": 1,
393 "baseline_engine": "lean",
394 "candidate_engine": "threaded",
395 "scheduler_permutation_class": "Exact",
396 "wave_width_bound": {
397 "baseline_max_wave_width": 1,
398 "candidate_max_wave_width": 1,
399 "declared_upper_bound": 1
400 },
401 "effect_ordering_class": "Exact",
402 "failure_visible_diff_class": "Exact",
403 "effect_determinism_tier": "strict_deterministic"
404 });
405 serde_json::from_value::<EnvelopeDiff>(payload)
406 .expect_err("numeric schema version should be rejected");
407 }
408}