Skip to main content

telltale_machine/
serialization.rs

1//! Canonical serialization helpers for deterministic replay/testing artifacts.
2
3use crate::communication_replay::{CommunicationConsumptionArtifact, CommunicationReplayMode};
4use crate::determinism::EffectDeterminismTier;
5use crate::effect::{CorruptionType, EffectTraceEntry};
6use crate::engine::{ObsEvent, SessionTerminalReason};
7use crate::output_condition::OutputConditionCheck;
8use crate::semantic_objects::{
9    protocol_machine_semantic_objects, OperationInstance, OutstandingEffect, ProgressContract,
10    ProgressTransition, ProtocolMachineSemanticObjects, PublicationEvent, TransformationObligation,
11};
12use crate::session::{
13    AuthorityArtifact, AuthorityAuditEvent, AuthorityAuditRecord, AuthorityWitnessId,
14    FragmentOwnerId, OwnershipTerminalReason, SessionId,
15};
16use crate::trace::normalize_trace;
17use crate::transfer_semantics::{DelegationAuditRecord, DelegationReceipt, DelegationStatus};
18use crate::verification::Hash;
19use serde::{de::DeserializeOwned, Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
/// Canonical schema version identifier for ProtocolMachine replay/trace payloads.
///
/// The canonical payload builders in this module stamp this value into
/// `schema_version`, and deserialization of those payloads rejects any other
/// string.
pub const SERIALIZATION_SCHEMA_VERSION: &str = "machine.serialization.v1";
24
25fn canonical_serialization_schema_version() -> String {
26    SERIALIZATION_SCHEMA_VERSION.to_string()
27}
28
29/// Serialize one value through the canonical ProtocolMachine binary codec.
30///
31/// This wrapper keeps binary-serialization policy centralized inside the ProtocolMachine
32/// crate instead of scattering direct `bincode` calls through runtime code.
33///
34/// # Errors
35///
36/// Returns a `bincode::Error` if the value cannot be serialized by the
37/// canonical binary codec.
38pub fn binary_encode<T: Serialize + ?Sized>(value: &T) -> Result<Vec<u8>, bincode::Error> {
39    bincode::serialize(value)
40}
41
42/// Deserialize one value through the canonical ProtocolMachine binary codec.
43///
44/// This wrapper keeps binary-serialization policy centralized inside the ProtocolMachine
45/// crate instead of scattering direct `bincode` calls through runtime code.
46///
47/// # Errors
48///
49/// Returns a `bincode::Error` if the bytes do not decode as the requested type
50/// under the canonical binary codec.
51pub fn binary_decode<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, bincode::Error> {
52    bincode::deserialize(bytes)
53}
54
55/// Return the binary-encoded size for one value, saturating to `usize`.
56#[must_use]
57pub fn binary_size<T: Serialize + ?Sized>(value: &T) -> usize {
58    bincode::serialized_size(value)
59        .ok()
60        .and_then(|bytes| usize::try_from(bytes).ok())
61        .unwrap_or(0)
62}
63
64fn deserialize_serialization_schema_version<'de, D>(deserializer: D) -> Result<String, D::Error>
65where
66    D: serde::Deserializer<'de>,
67{
68    let version = String::deserialize(deserializer)?;
69    if version == SERIALIZATION_SCHEMA_VERSION {
70        Ok(version)
71    } else {
72        Err(serde::de::Error::custom(format!(
73            "unsupported schema_version '{version}'; expected '{SERIALIZATION_SCHEMA_VERSION}'"
74        )))
75    }
76}
77
/// Versioned canonical trace payload used for cross-target normalization.
///
/// Built by [`canonical_trace_v1`]. Deserialization fails unless
/// `schema_version` equals [`SERIALIZATION_SCHEMA_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CanonicalTraceV1 {
    /// Schema version for canonical trace serialization.
    #[serde(deserialize_with = "deserialize_serialization_schema_version")]
    pub schema_version: String,
    /// Canonically normalized observable events.
    pub events: Vec<ObsEvent>,
}
87
/// Versioned canonical replay-state fragment used by tests and replay checks.
///
/// Built by [`canonical_replay_fragment_v1`]. Deserialization fails unless
/// `schema_version` equals [`SERIALIZATION_SCHEMA_VERSION`]. Fields marked
/// `#[serde(default)]` may be absent from a serialized payload, in which case
/// they take their type's `Default` value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CanonicalReplayFragmentV1 {
    /// Schema version for canonical replay serialization.
    #[serde(deserialize_with = "deserialize_serialization_schema_version")]
    pub schema_version: String,
    /// Canonically normalized observable trace.
    pub obs_trace: Vec<ObsEvent>,
    /// Canonically sorted effect trace.
    pub effect_trace: Vec<EffectTraceEntry>,
    /// Sorted crashed sites.
    pub crashed_sites: Vec<String>,
    /// Sorted directed partition edges.
    pub partitioned_edges: Vec<(String, String)>,
    /// Sorted directed corruption edges with policies.
    pub corrupted_edges: Vec<((String, String), CorruptionType)>,
    /// Sorted timeout horizons keyed by site.
    pub timed_out_sites: Vec<(String, u64)>,
    /// Declared effect determinism tier for this run.
    #[serde(default)]
    pub effect_determinism_tier: EffectDeterminismTier,
    /// Active communication replay mode.
    #[serde(default)]
    pub communication_replay_mode: CommunicationReplayMode,
    /// Deterministic communication replay-state root.
    #[serde(default)]
    pub communication_replay_root: Option<Hash>,
    /// Proof-friendly receive consumption artifacts.
    #[serde(default)]
    pub communication_consumption_artifacts: Vec<CommunicationConsumptionArtifact>,
    /// Canonical semantic audit records derived from authority/failure/effect surfaces.
    #[serde(default)]
    pub semantic_audit_log: Vec<SemanticAuditRecord>,
    /// Canonical semantic objects derived from authority, delegation,
    /// effect, and proof surfaces.
    #[serde(default)]
    pub semantic_objects: ProtocolMachineSemanticObjects,
}
126
/// Replay-stable semantic record derived from authority, delegation, effect, and
/// failure-visible runtime artifacts.
///
/// [`canonical_semantic_audit_log`] orders records by tick first
/// (`EffectObservation` uses its `ordering_key`; an `Authority` record without
/// a tick sorts as tick `0`), then by a per-variant rank that follows the
/// declaration order below, then by the record's JSON encoding.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SemanticAuditRecord {
    /// Authority witness issuance/consumption/rejection.
    Authority {
        /// Scheduler tick associated with the authority artifact, when present.
        tick: Option<u64>,
        /// Session referenced by the authority artifact, when session-scoped.
        session: Option<SessionId>,
        /// Authority witness or receipt artifact carried by the audit record.
        artifact: AuthorityArtifact,
        /// Audit event kind recorded for the authority artifact.
        event: AuthorityAuditEvent,
        /// Optional rejection or failure reason associated with the audit record.
        reason: Option<String>,
    },
    /// Delegation/transfer completion or rollback.
    Delegation {
        /// Scheduler tick at which the delegation audit record was emitted.
        tick: u64,
        /// Session being delegated.
        session: SessionId,
        /// Delegation receipt proving the sanctioned transfer path.
        receipt: DelegationReceipt,
        /// Final delegation status for the receipt.
        status: DelegationStatus,
        /// Optional rollback or rejection reason for the transfer.
        reason: Option<String>,
    },
    /// Transformation-local obligation bundle emitted for one handoff.
    TransformationObligation {
        /// Scheduler tick at which the obligation became canonical.
        tick: u64,
        /// Session whose fragments were transformed.
        session: SessionId,
        /// Explicit obligation bundle tied to the handoff.
        obligation: TransformationObligation,
    },
    /// Canonical semantic publication event.
    Publication {
        /// Stable publication ordering tick.
        tick: u64,
        /// Session whose lifecycle/publication became visible, when session-scoped.
        session: Option<SessionId>,
        /// Canonical publication event.
        event: PublicationEvent,
    },
    /// Replay-visible progress-contract transition.
    ProgressTransition {
        /// Stable tick at which the transition became visible.
        tick: u64,
        /// Session scoped by the progress contract, when available.
        session: Option<SessionId>,
        /// Canonical progress transition.
        transition: ProgressTransition,
    },
    /// Explicit typed failure branch entry.
    FailureBranch {
        /// Scheduler tick at which the failure branch became visible.
        tick: u64,
        /// Session containing the failing coroutine.
        session: SessionId,
        /// Coroutine entering the failure branch.
        coro_id: usize,
        /// Typed fault surfaced by the branch.
        fault: crate::coroutine::Fault,
    },
    /// Explicit timeout activation and timeout witness issuance.
    TimeoutIssued {
        /// Scheduler tick at which the timeout became active.
        tick: u64,
        /// Site for which timeout was issued.
        site: String,
        /// Tick horizon until which the timeout remains active.
        until_tick: u64,
        /// Issued timeout witness identifier.
        witness_id: AuthorityWitnessId,
    },
    /// Explicit cancellation request.
    CancellationRequested {
        /// Scheduler tick at which cancellation was requested.
        tick: u64,
        /// Session being cancelled.
        session: SessionId,
        /// Cancellation witness authorizing the request.
        witness_id: AuthorityWitnessId,
        /// Owner capability active when cancellation was requested.
        owner_id: FragmentOwnerId,
        /// Terminal ownership reason causing the cancellation request.
        reason: OwnershipTerminalReason,
    },
    /// Explicit cancellation completion.
    Cancelled {
        /// Scheduler tick at which cancellation completed.
        tick: u64,
        /// Session that was cancelled.
        session: SessionId,
        /// Cancellation witness consumed by completion.
        witness_id: AuthorityWitnessId,
        /// Terminal ownership reason recorded for the cancellation.
        reason: OwnershipTerminalReason,
    },
    /// Explicit session terminal reason.
    SessionTerminal {
        /// Scheduler tick at which terminal state became visible.
        tick: u64,
        /// Session that reached terminal state.
        session: SessionId,
        /// Deterministic terminal reason recorded by the runtime.
        reason: SessionTerminalReason,
    },
    /// Structured effect/interface observation.
    EffectObservation {
        /// Stable effect identifier assigned by the runtime.
        effect_id: u64,
        /// Deterministic ordering key used for canonical replay comparison.
        ordering_key: u64,
        /// Session referenced by the effect observation, when derivable.
        session: Option<SessionId>,
        /// Raw runtime effect kind tag.
        effect_kind: String,
        /// Nominal effect interface classification, when known.
        effect_interface: Option<String>,
        /// Nominal effect operation classification, when known.
        effect_operation: Option<String>,
        /// Stable handler identity attached to the observation.
        handler_identity: String,
        /// Serialized effect inputs.
        inputs: JsonValue,
        /// Serialized effect outputs.
        outputs: JsonValue,
    },
}
261
262/// Normalize an observable trace into the canonical versioned format.
263#[must_use]
264pub fn canonical_trace_v1(trace: &[ObsEvent]) -> CanonicalTraceV1 {
265    CanonicalTraceV1 {
266        schema_version: canonical_serialization_schema_version(),
267        events: normalize_trace(trace),
268    }
269}
270
271/// Canonicalize effect-trace ordering for deterministic replay diffs.
272#[must_use]
273pub fn canonical_effect_trace(trace: &[EffectTraceEntry]) -> Vec<EffectTraceEntry> {
274    let mut out = trace.to_vec();
275    out.sort_by(|lhs, rhs| {
276        (lhs.ordering_key, lhs.effect_id, &lhs.effect_kind).cmp(&(
277            rhs.ordering_key,
278            rhs.effect_id,
279            &rhs.effect_kind,
280        ))
281    });
282    out
283}
284
285fn authority_artifact_session(artifact: &AuthorityArtifact) -> Option<SessionId> {
286    match artifact {
287        AuthorityArtifact::OwnershipCapability(capability) => Some(capability.session_id),
288        AuthorityArtifact::OwnershipReceipt(receipt) => Some(receipt.session_id),
289        AuthorityArtifact::Readiness(witness) => Some(witness.session_id),
290        AuthorityArtifact::Cancellation(witness) => Some(witness.session_id),
291        AuthorityArtifact::Timeout(_) => None,
292    }
293}
294
295fn semantic_rank(record: &SemanticAuditRecord) -> u8 {
296    match record {
297        SemanticAuditRecord::Authority { .. } => 0,
298        SemanticAuditRecord::Delegation { .. } => 1,
299        SemanticAuditRecord::TransformationObligation { .. } => 2,
300        SemanticAuditRecord::Publication { .. } => 3,
301        SemanticAuditRecord::ProgressTransition { .. } => 4,
302        SemanticAuditRecord::FailureBranch { .. } => 5,
303        SemanticAuditRecord::TimeoutIssued { .. } => 6,
304        SemanticAuditRecord::CancellationRequested { .. } => 7,
305        SemanticAuditRecord::Cancelled { .. } => 8,
306        SemanticAuditRecord::SessionTerminal { .. } => 9,
307        SemanticAuditRecord::EffectObservation { .. } => 10,
308    }
309}
310
311fn semantic_tick(record: &SemanticAuditRecord) -> u64 {
312    match record {
313        SemanticAuditRecord::Authority { tick, .. } => tick.unwrap_or(0),
314        SemanticAuditRecord::Delegation { tick, .. }
315        | SemanticAuditRecord::TransformationObligation { tick, .. }
316        | SemanticAuditRecord::Publication { tick, .. }
317        | SemanticAuditRecord::ProgressTransition { tick, .. }
318        | SemanticAuditRecord::FailureBranch { tick, .. }
319        | SemanticAuditRecord::TimeoutIssued { tick, .. }
320        | SemanticAuditRecord::CancellationRequested { tick, .. }
321        | SemanticAuditRecord::Cancelled { tick, .. }
322        | SemanticAuditRecord::SessionTerminal { tick, .. } => *tick,
323        SemanticAuditRecord::EffectObservation { ordering_key, .. } => *ordering_key,
324    }
325}
326
327fn publication_tick(
328    event: &PublicationEvent,
329    operation_instances: &[OperationInstance],
330    outstanding_effects: &[OutstandingEffect],
331) -> u64 {
332    operation_instances
333        .iter()
334        .find(|operation| operation.operation_id == event.operation_id)
335        .and_then(|operation| {
336            operation.effect_ids.iter().find_map(|effect_id| {
337                outstanding_effects
338                    .iter()
339                    .find(|effect| effect.effect_id == *effect_id)
340                    .map(|effect| effect.completed_at_tick.unwrap_or(effect.ordering_key))
341            })
342        })
343        .unwrap_or(0)
344}
345
346/// Canonicalize semantic audit ordering for deterministic replay diffs.
347#[must_use]
348pub fn canonical_semantic_audit_log(records: &[SemanticAuditRecord]) -> Vec<SemanticAuditRecord> {
349    let mut out = records.to_vec();
350    out.sort_by(|lhs, rhs| {
351        let lhs_key = (
352            semantic_tick(lhs),
353            semantic_rank(lhs),
354            serde_json::to_string(lhs).unwrap_or_default(),
355        );
356        let rhs_key = (
357            semantic_tick(rhs),
358            semantic_rank(rhs),
359            serde_json::to_string(rhs).unwrap_or_default(),
360        );
361        lhs_key.cmp(&rhs_key)
362    });
363    out
364}
365
366/// Canonicalize semantic-object ordering for deterministic replay diffs.
367#[must_use]
368pub fn canonicalize_protocol_machine_semantic_objects(
369    objects: &ProtocolMachineSemanticObjects,
370) -> ProtocolMachineSemanticObjects {
371    let mut out = objects.clone();
372    out.operation_instances
373        .sort_by_key(|lhs| lhs.operation_id.clone());
374    out.outstanding_effects.sort_by_key(|lhs| lhs.effect_id);
375    out.semantic_handoffs.sort_by_key(|lhs| lhs.handoff_id);
376    out.authoritative_reads
377        .sort_by_key(|lhs| lhs.read_id.clone());
378    out.observed_reads.sort_by_key(|lhs| lhs.read_id.clone());
379    out.materialization_proofs
380        .sort_by_key(|lhs| lhs.proof_id.clone());
381    out.canonical_handles
382        .sort_by_key(|lhs| lhs.handle_id.clone());
383    out.publication_events
384        .sort_by_key(|lhs| lhs.publication_id.clone());
385    out.prestate_bindings
386        .sort_by_key(|lhs| lhs.binding_id.clone());
387    out.agreement_profiles
388        .sort_by_key(|lhs| lhs.profile_name.clone());
389    out.agreement_contracts
390        .sort_by_key(|lhs| lhs.contract_name.clone());
391    out.agreement_evidence
392        .sort_by_key(|lhs| lhs.evidence_id.clone());
393    out.agreement_states.sort_by(|lhs, rhs| {
394        (
395            &lhs.operation_id,
396            &lhs.contract_name,
397            lhs.last_updated_tick.unwrap_or(0),
398        )
399            .cmp(&(
400                &rhs.operation_id,
401                &rhs.contract_name,
402                rhs.last_updated_tick.unwrap_or(0),
403            ))
404    });
405    out.progress_contracts
406        .sort_by_key(|lhs| lhs.operation_id.clone());
407    out
408}
409
410/// Build canonical semantic audit records from authority, delegation,
411/// failure-visible observable events, and effect/interface observations.
412#[must_use]
413#[allow(clippy::too_many_lines)]
414pub fn semantic_audit_log_v1(
415    authority_audit_log: &[AuthorityAuditRecord],
416    delegation_audit_log: &[DelegationAuditRecord],
417    operation_instances: &[OperationInstance],
418    obs_trace: &[ObsEvent],
419    outstanding_effects: &[OutstandingEffect],
420    progress_contracts: &[ProgressContract],
421    progress_transitions: &[ProgressTransition],
422) -> Vec<SemanticAuditRecord> {
423    let mut records = Vec::new();
424
425    records.extend(authority_audit_log.iter().cloned().map(|record| {
426        SemanticAuditRecord::Authority {
427            tick: record.tick,
428            session: authority_artifact_session(&record.artifact),
429            artifact: record.artifact,
430            event: record.event,
431            reason: record.reason,
432        }
433    }));
434
435    records.extend(delegation_audit_log.iter().cloned().map(|record| {
436        SemanticAuditRecord::Delegation {
437            tick: record.tick,
438            session: record.receipt.session,
439            receipt: record.receipt,
440            status: record.status,
441            reason: record.reason,
442        }
443    }));
444
445    let semantic_objects = protocol_machine_semantic_objects(
446        authority_audit_log,
447        delegation_audit_log,
448        operation_instances,
449        outstanding_effects,
450        &[],
451        progress_contracts,
452        progress_transitions,
453    );
454    let obligations = semantic_objects.transformation_obligations.clone();
455    records.extend(obligations.into_iter().map(|obligation| {
456        SemanticAuditRecord::TransformationObligation {
457            tick: obligation.tick,
458            session: obligation.session,
459            obligation,
460        }
461    }));
462    records.extend(
463        semantic_objects
464            .publication_events
465            .into_iter()
466            .map(|event| SemanticAuditRecord::Publication {
467                tick: publication_tick(&event, operation_instances, outstanding_effects),
468                session: event.session,
469                event,
470            }),
471    );
472    records.extend(progress_transitions.iter().cloned().map(|transition| {
473        SemanticAuditRecord::ProgressTransition {
474            tick: transition.tick,
475            session: transition.session,
476            transition,
477        }
478    }));
479
480    records.extend(obs_trace.iter().filter_map(|event| match event {
481        ObsEvent::FailureBranchEntered {
482            tick,
483            session,
484            coro_id,
485            fault,
486        } => Some(SemanticAuditRecord::FailureBranch {
487            tick: *tick,
488            session: *session,
489            coro_id: *coro_id,
490            fault: fault.clone(),
491        }),
492        ObsEvent::TimeoutIssued {
493            tick,
494            site,
495            until_tick,
496            witness_id,
497        } => Some(SemanticAuditRecord::TimeoutIssued {
498            tick: *tick,
499            site: site.clone(),
500            until_tick: *until_tick,
501            witness_id: *witness_id,
502        }),
503        ObsEvent::CancellationRequested {
504            tick,
505            session,
506            witness_id,
507            owner_id,
508            reason,
509        } => Some(SemanticAuditRecord::CancellationRequested {
510            tick: *tick,
511            session: *session,
512            witness_id: *witness_id,
513            owner_id: owner_id.clone(),
514            reason: reason.clone(),
515        }),
516        ObsEvent::Cancelled {
517            tick,
518            session,
519            witness_id,
520            reason,
521        } => Some(SemanticAuditRecord::Cancelled {
522            tick: *tick,
523            session: *session,
524            witness_id: *witness_id,
525            reason: reason.clone(),
526        }),
527        ObsEvent::SessionTerminal {
528            tick,
529            session,
530            reason,
531        } => Some(SemanticAuditRecord::SessionTerminal {
532            tick: *tick,
533            session: *session,
534            reason: reason.clone(),
535        }),
536        _ => None,
537    }));
538
539    records.extend(outstanding_effects.iter().cloned().map(|effect| {
540        SemanticAuditRecord::EffectObservation {
541            effect_id: effect.effect_id,
542            ordering_key: effect.ordering_key,
543            session: effect.session,
544            effect_kind: effect.effect_kind,
545            effect_interface: effect.effect_interface,
546            effect_operation: effect.effect_operation,
547            handler_identity: effect.handler_identity,
548            inputs: effect.inputs,
549            outputs: effect.outputs,
550        }
551    }));
552
553    canonical_semantic_audit_log(&records)
554}
555
556/// Build a canonical replay-state fragment from runtime snapshots.
557#[must_use]
558#[allow(clippy::too_many_arguments)]
559pub fn canonical_replay_fragment_v1(
560    obs_trace: &[ObsEvent],
561    effect_trace: &[EffectTraceEntry],
562    authority_audit_log: &[AuthorityAuditRecord],
563    delegation_audit_log: &[DelegationAuditRecord],
564    operation_instances: &[OperationInstance],
565    outstanding_effects: &[OutstandingEffect],
566    output_condition_checks: &[OutputConditionCheck],
567    progress_contracts: &[ProgressContract],
568    progress_transitions: &[ProgressTransition],
569    mut crashed_sites: Vec<String>,
570    mut partitioned_edges: Vec<(String, String)>,
571    mut corrupted_edges: Vec<((String, String), CorruptionType)>,
572    mut timed_out_sites: Vec<(String, u64)>,
573    effect_determinism_tier: EffectDeterminismTier,
574    communication_replay_mode: CommunicationReplayMode,
575    communication_replay_root: Option<Hash>,
576    communication_consumption_artifacts: Vec<CommunicationConsumptionArtifact>,
577) -> CanonicalReplayFragmentV1 {
578    crashed_sites.sort_unstable();
579    crashed_sites.dedup();
580
581    partitioned_edges.sort_unstable();
582    partitioned_edges.dedup();
583
584    corrupted_edges.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
585    corrupted_edges.dedup();
586
587    timed_out_sites.sort_unstable();
588
589    CanonicalReplayFragmentV1 {
590        schema_version: canonical_serialization_schema_version(),
591        obs_trace: canonical_trace_v1(obs_trace).events,
592        effect_trace: canonical_effect_trace(effect_trace),
593        crashed_sites,
594        partitioned_edges,
595        corrupted_edges,
596        timed_out_sites,
597        effect_determinism_tier,
598        communication_replay_mode,
599        communication_replay_root,
600        communication_consumption_artifacts,
601        semantic_audit_log: semantic_audit_log_v1(
602            authority_audit_log,
603            delegation_audit_log,
604            operation_instances,
605            obs_trace,
606            outstanding_effects,
607            progress_contracts,
608            progress_transitions,
609        ),
610        semantic_objects: canonicalize_protocol_machine_semantic_objects(
611            &protocol_machine_semantic_objects(
612                authority_audit_log,
613                delegation_audit_log,
614                operation_instances,
615                outstanding_effects,
616                output_condition_checks,
617                progress_contracts,
618                progress_transitions,
619            ),
620        ),
621    }
622}
623
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::Edge;

    /// Entries supplied out of order come back ordered by `ordering_key`.
    #[test]
    fn canonical_effect_trace_is_stably_sorted() {
        let later = EffectTraceEntry {
            effect_id: 2,
            effect_kind: "b".to_string(),
            inputs: serde_json::json!({}),
            outputs: serde_json::json!({}),
            handler_identity: "h".to_string(),
            effect_interface: None,
            effect_operation: None,
            ordering_key: 3,
            topology: None,
        };
        // Same payload as `later`, differing only in the three sort keys.
        let earlier = EffectTraceEntry {
            effect_id: 1,
            effect_kind: "a".to_string(),
            ordering_key: 2,
            ..later.clone()
        };

        let sorted = canonical_effect_trace(&[later, earlier]);
        let ids: Vec<_> = sorted.iter().map(|entry| entry.effect_id).collect();
        assert_eq!(ids, vec![1, 2]);
    }

    /// The canonical trace payload carries the current schema version.
    #[test]
    fn canonical_trace_payload_has_version() {
        let sent = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "m".to_string(),
        };
        let payload = canonical_trace_v1(&[sent]);
        assert_eq!(payload.schema_version, SERIALIZATION_SCHEMA_VERSION);
        assert_eq!(payload.events.len(), 1);
    }

    /// A non-string `schema_version` must fail deserialization.
    #[test]
    fn numeric_schema_version_is_rejected() {
        let payload = serde_json::json!({
            "schema_version": 1,
            "events": []
        });
        let outcome = serde_json::from_value::<CanonicalTraceV1>(payload);
        assert!(
            outcome.is_err(),
            "numeric schema version should be rejected"
        );
    }
}
685}