1use crate::communication_replay::{CommunicationConsumptionArtifact, CommunicationReplayMode};
4use crate::determinism::EffectDeterminismTier;
5use crate::effect::{CorruptionType, EffectTraceEntry};
6use crate::engine::{ObsEvent, SessionTerminalReason};
7use crate::output_condition::OutputConditionCheck;
8use crate::semantic_objects::{
9 protocol_machine_semantic_objects, OperationInstance, OutstandingEffect, ProgressContract,
10 ProgressTransition, ProtocolMachineSemanticObjects, PublicationEvent, TransformationObligation,
11};
12use crate::session::{
13 AuthorityArtifact, AuthorityAuditEvent, AuthorityAuditRecord, AuthorityWitnessId,
14 FragmentOwnerId, OwnershipTerminalReason, SessionId,
15};
16use crate::trace::normalize_trace;
17use crate::transfer_semantics::{DelegationAuditRecord, DelegationReceipt, DelegationStatus};
18use crate::verification::Hash;
19use serde::{de::DeserializeOwned, Deserialize, Serialize};
20use serde_json::Value as JsonValue;
21
/// Schema version string stamped into the canonical payloads built in this
/// file and required verbatim when those payloads are deserialized.
pub const SERIALIZATION_SCHEMA_VERSION: &str = "machine.serialization.v1";
24
25fn canonical_serialization_schema_version() -> String {
26 SERIALIZATION_SCHEMA_VERSION.to_string()
27}
28
29pub fn binary_encode<T: Serialize + ?Sized>(value: &T) -> Result<Vec<u8>, bincode::Error> {
39 bincode::serialize(value)
40}
41
42pub fn binary_decode<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, bincode::Error> {
52 bincode::deserialize(bytes)
53}
54
55#[must_use]
57pub fn binary_size<T: Serialize + ?Sized>(value: &T) -> usize {
58 bincode::serialized_size(value)
59 .ok()
60 .and_then(|bytes| usize::try_from(bytes).ok())
61 .unwrap_or(0)
62}
63
64fn deserialize_serialization_schema_version<'de, D>(deserializer: D) -> Result<String, D::Error>
65where
66 D: serde::Deserializer<'de>,
67{
68 let version = String::deserialize(deserializer)?;
69 if version == SERIALIZATION_SCHEMA_VERSION {
70 Ok(version)
71 } else {
72 Err(serde::de::Error::custom(format!(
73 "unsupported schema_version '{version}'; expected '{SERIALIZATION_SCHEMA_VERSION}'"
74 )))
75 }
76}
77
/// Versioned payload wrapping an observable event trace.
///
/// Built by [`canonical_trace_v1`], which stores the result of
/// `normalize_trace` in `events`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CanonicalTraceV1 {
    /// Schema version tag; deserialization fails unless it equals
    /// [`SERIALIZATION_SCHEMA_VERSION`].
    #[serde(deserialize_with = "deserialize_serialization_schema_version")]
    pub schema_version: String,
    /// Events carried by the payload.
    pub events: Vec<ObsEvent>,
}
87
/// Versioned replay fragment payload.
///
/// Built by [`canonical_replay_fragment_v1`]. Fields marked
/// `#[serde(default)]` fall back to their `Default` value when absent from the
/// serialized input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CanonicalReplayFragmentV1 {
    /// Schema version tag; deserialization fails unless it equals
    /// [`SERIALIZATION_SCHEMA_VERSION`].
    #[serde(deserialize_with = "deserialize_serialization_schema_version")]
    pub schema_version: String,
    /// Observable events (the constructor stores the normalized trace here).
    pub obs_trace: Vec<ObsEvent>,
    /// Effect trace entries (the constructor stores them in the order produced
    /// by [`canonical_effect_trace`]).
    pub effect_trace: Vec<EffectTraceEntry>,
    /// Crashed site names (sorted and deduplicated by the constructor).
    pub crashed_sites: Vec<String>,
    /// Partitioned edges as string pairs (sorted and deduplicated by the
    /// constructor).
    pub partitioned_edges: Vec<(String, String)>,
    /// Corrupted edges paired with their corruption type (sorted by edge by
    /// the constructor).
    pub corrupted_edges: Vec<((String, String), CorruptionType)>,
    /// Timed-out sites, each paired with a `u64` value (sorted by the
    /// constructor).
    pub timed_out_sites: Vec<(String, u64)>,
    /// Effect determinism tier recorded for the fragment.
    #[serde(default)]
    pub effect_determinism_tier: EffectDeterminismTier,
    /// Communication replay mode recorded for the fragment.
    #[serde(default)]
    pub communication_replay_mode: CommunicationReplayMode,
    /// Optional communication replay root hash.
    #[serde(default)]
    pub communication_replay_root: Option<Hash>,
    /// Communication consumption artifacts, stored as supplied.
    #[serde(default)]
    pub communication_consumption_artifacts: Vec<CommunicationConsumptionArtifact>,
    /// Semantic audit records (the constructor fills this from
    /// [`semantic_audit_log_v1`]).
    #[serde(default)]
    pub semantic_audit_log: Vec<SemanticAuditRecord>,
    /// Semantic objects (the constructor fills this from
    /// [`canonicalize_protocol_machine_semantic_objects`]).
    #[serde(default)]
    pub semantic_objects: ProtocolMachineSemanticObjects,
}
126
/// One entry of the semantic audit log assembled by [`semantic_audit_log_v1`].
///
/// Records are ordered by [`canonical_semantic_audit_log`] using
/// `semantic_tick`, then `semantic_rank`, then their JSON rendering.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SemanticAuditRecord {
    /// Copied from an authority audit record; `session` is derived from the
    /// artifact and is `None` for timeout artifacts.
    Authority {
        // A missing tick is treated as 0 when ordering records.
        tick: Option<u64>,
        session: Option<SessionId>,
        artifact: AuthorityArtifact,
        event: AuthorityAuditEvent,
        reason: Option<String>,
    },
    /// Copied from a delegation audit record; `session` is the receipt's
    /// session.
    Delegation {
        tick: u64,
        session: SessionId,
        receipt: DelegationReceipt,
        status: DelegationStatus,
        reason: Option<String>,
    },
    /// Wraps a transformation obligation; `tick` and `session` are copied
    /// from the obligation itself.
    TransformationObligation {
        tick: u64,
        session: SessionId,
        obligation: TransformationObligation,
    },
    /// Wraps a publication event; `tick` is computed by `publication_tick`.
    Publication {
        tick: u64,
        session: Option<SessionId>,
        event: PublicationEvent,
    },
    /// Wraps a progress transition; `tick` and `session` are copied from the
    /// transition itself.
    ProgressTransition {
        tick: u64,
        session: Option<SessionId>,
        transition: ProgressTransition,
    },
    /// Mirrors `ObsEvent::FailureBranchEntered`.
    FailureBranch {
        tick: u64,
        session: SessionId,
        coro_id: usize,
        fault: crate::coroutine::Fault,
    },
    /// Mirrors `ObsEvent::TimeoutIssued`.
    TimeoutIssued {
        tick: u64,
        site: String,
        until_tick: u64,
        witness_id: AuthorityWitnessId,
    },
    /// Mirrors `ObsEvent::CancellationRequested`.
    CancellationRequested {
        tick: u64,
        session: SessionId,
        witness_id: AuthorityWitnessId,
        owner_id: FragmentOwnerId,
        reason: OwnershipTerminalReason,
    },
    /// Mirrors `ObsEvent::Cancelled`.
    Cancelled {
        tick: u64,
        session: SessionId,
        witness_id: AuthorityWitnessId,
        reason: OwnershipTerminalReason,
    },
    /// Mirrors `ObsEvent::SessionTerminal`.
    SessionTerminal {
        tick: u64,
        session: SessionId,
        reason: SessionTerminalReason,
    },
    /// Copied from an outstanding effect. This variant has no `tick`;
    /// `ordering_key` takes its place when records are ordered.
    EffectObservation {
        effect_id: u64,
        ordering_key: u64,
        session: Option<SessionId>,
        effect_kind: String,
        effect_interface: Option<String>,
        effect_operation: Option<String>,
        handler_identity: String,
        inputs: JsonValue,
        outputs: JsonValue,
    },
}
261
262#[must_use]
264pub fn canonical_trace_v1(trace: &[ObsEvent]) -> CanonicalTraceV1 {
265 CanonicalTraceV1 {
266 schema_version: canonical_serialization_schema_version(),
267 events: normalize_trace(trace),
268 }
269}
270
271#[must_use]
273pub fn canonical_effect_trace(trace: &[EffectTraceEntry]) -> Vec<EffectTraceEntry> {
274 let mut out = trace.to_vec();
275 out.sort_by(|lhs, rhs| {
276 (lhs.ordering_key, lhs.effect_id, &lhs.effect_kind).cmp(&(
277 rhs.ordering_key,
278 rhs.effect_id,
279 &rhs.effect_kind,
280 ))
281 });
282 out
283}
284
285fn authority_artifact_session(artifact: &AuthorityArtifact) -> Option<SessionId> {
286 match artifact {
287 AuthorityArtifact::OwnershipCapability(capability) => Some(capability.session_id),
288 AuthorityArtifact::OwnershipReceipt(receipt) => Some(receipt.session_id),
289 AuthorityArtifact::Readiness(witness) => Some(witness.session_id),
290 AuthorityArtifact::Cancellation(witness) => Some(witness.session_id),
291 AuthorityArtifact::Timeout(_) => None,
292 }
293}
294
295fn semantic_rank(record: &SemanticAuditRecord) -> u8 {
296 match record {
297 SemanticAuditRecord::Authority { .. } => 0,
298 SemanticAuditRecord::Delegation { .. } => 1,
299 SemanticAuditRecord::TransformationObligation { .. } => 2,
300 SemanticAuditRecord::Publication { .. } => 3,
301 SemanticAuditRecord::ProgressTransition { .. } => 4,
302 SemanticAuditRecord::FailureBranch { .. } => 5,
303 SemanticAuditRecord::TimeoutIssued { .. } => 6,
304 SemanticAuditRecord::CancellationRequested { .. } => 7,
305 SemanticAuditRecord::Cancelled { .. } => 8,
306 SemanticAuditRecord::SessionTerminal { .. } => 9,
307 SemanticAuditRecord::EffectObservation { .. } => 10,
308 }
309}
310
311fn semantic_tick(record: &SemanticAuditRecord) -> u64 {
312 match record {
313 SemanticAuditRecord::Authority { tick, .. } => tick.unwrap_or(0),
314 SemanticAuditRecord::Delegation { tick, .. }
315 | SemanticAuditRecord::TransformationObligation { tick, .. }
316 | SemanticAuditRecord::Publication { tick, .. }
317 | SemanticAuditRecord::ProgressTransition { tick, .. }
318 | SemanticAuditRecord::FailureBranch { tick, .. }
319 | SemanticAuditRecord::TimeoutIssued { tick, .. }
320 | SemanticAuditRecord::CancellationRequested { tick, .. }
321 | SemanticAuditRecord::Cancelled { tick, .. }
322 | SemanticAuditRecord::SessionTerminal { tick, .. } => *tick,
323 SemanticAuditRecord::EffectObservation { ordering_key, .. } => *ordering_key,
324 }
325}
326
327fn publication_tick(
328 event: &PublicationEvent,
329 operation_instances: &[OperationInstance],
330 outstanding_effects: &[OutstandingEffect],
331) -> u64 {
332 operation_instances
333 .iter()
334 .find(|operation| operation.operation_id == event.operation_id)
335 .and_then(|operation| {
336 operation.effect_ids.iter().find_map(|effect_id| {
337 outstanding_effects
338 .iter()
339 .find(|effect| effect.effect_id == *effect_id)
340 .map(|effect| effect.completed_at_tick.unwrap_or(effect.ordering_key))
341 })
342 })
343 .unwrap_or(0)
344}
345
346#[must_use]
348pub fn canonical_semantic_audit_log(records: &[SemanticAuditRecord]) -> Vec<SemanticAuditRecord> {
349 let mut out = records.to_vec();
350 out.sort_by(|lhs, rhs| {
351 let lhs_key = (
352 semantic_tick(lhs),
353 semantic_rank(lhs),
354 serde_json::to_string(lhs).unwrap_or_default(),
355 );
356 let rhs_key = (
357 semantic_tick(rhs),
358 semantic_rank(rhs),
359 serde_json::to_string(rhs).unwrap_or_default(),
360 );
361 lhs_key.cmp(&rhs_key)
362 });
363 out
364}
365
366#[must_use]
368pub fn canonicalize_protocol_machine_semantic_objects(
369 objects: &ProtocolMachineSemanticObjects,
370) -> ProtocolMachineSemanticObjects {
371 let mut out = objects.clone();
372 out.operation_instances
373 .sort_by_key(|lhs| lhs.operation_id.clone());
374 out.outstanding_effects.sort_by_key(|lhs| lhs.effect_id);
375 out.semantic_handoffs.sort_by_key(|lhs| lhs.handoff_id);
376 out.authoritative_reads
377 .sort_by_key(|lhs| lhs.read_id.clone());
378 out.observed_reads.sort_by_key(|lhs| lhs.read_id.clone());
379 out.materialization_proofs
380 .sort_by_key(|lhs| lhs.proof_id.clone());
381 out.canonical_handles
382 .sort_by_key(|lhs| lhs.handle_id.clone());
383 out.publication_events
384 .sort_by_key(|lhs| lhs.publication_id.clone());
385 out.prestate_bindings
386 .sort_by_key(|lhs| lhs.binding_id.clone());
387 out.agreement_profiles
388 .sort_by_key(|lhs| lhs.profile_name.clone());
389 out.agreement_contracts
390 .sort_by_key(|lhs| lhs.contract_name.clone());
391 out.agreement_evidence
392 .sort_by_key(|lhs| lhs.evidence_id.clone());
393 out.agreement_states.sort_by(|lhs, rhs| {
394 (
395 &lhs.operation_id,
396 &lhs.contract_name,
397 lhs.last_updated_tick.unwrap_or(0),
398 )
399 .cmp(&(
400 &rhs.operation_id,
401 &rhs.contract_name,
402 rhs.last_updated_tick.unwrap_or(0),
403 ))
404 });
405 out.progress_contracts
406 .sort_by_key(|lhs| lhs.operation_id.clone());
407 out
408}
409
410#[must_use]
413#[allow(clippy::too_many_lines)]
414pub fn semantic_audit_log_v1(
415 authority_audit_log: &[AuthorityAuditRecord],
416 delegation_audit_log: &[DelegationAuditRecord],
417 operation_instances: &[OperationInstance],
418 obs_trace: &[ObsEvent],
419 outstanding_effects: &[OutstandingEffect],
420 progress_contracts: &[ProgressContract],
421 progress_transitions: &[ProgressTransition],
422) -> Vec<SemanticAuditRecord> {
423 let mut records = Vec::new();
424
425 records.extend(authority_audit_log.iter().cloned().map(|record| {
426 SemanticAuditRecord::Authority {
427 tick: record.tick,
428 session: authority_artifact_session(&record.artifact),
429 artifact: record.artifact,
430 event: record.event,
431 reason: record.reason,
432 }
433 }));
434
435 records.extend(delegation_audit_log.iter().cloned().map(|record| {
436 SemanticAuditRecord::Delegation {
437 tick: record.tick,
438 session: record.receipt.session,
439 receipt: record.receipt,
440 status: record.status,
441 reason: record.reason,
442 }
443 }));
444
445 let semantic_objects = protocol_machine_semantic_objects(
446 authority_audit_log,
447 delegation_audit_log,
448 operation_instances,
449 outstanding_effects,
450 &[],
451 progress_contracts,
452 progress_transitions,
453 );
454 let obligations = semantic_objects.transformation_obligations.clone();
455 records.extend(obligations.into_iter().map(|obligation| {
456 SemanticAuditRecord::TransformationObligation {
457 tick: obligation.tick,
458 session: obligation.session,
459 obligation,
460 }
461 }));
462 records.extend(
463 semantic_objects
464 .publication_events
465 .into_iter()
466 .map(|event| SemanticAuditRecord::Publication {
467 tick: publication_tick(&event, operation_instances, outstanding_effects),
468 session: event.session,
469 event,
470 }),
471 );
472 records.extend(progress_transitions.iter().cloned().map(|transition| {
473 SemanticAuditRecord::ProgressTransition {
474 tick: transition.tick,
475 session: transition.session,
476 transition,
477 }
478 }));
479
480 records.extend(obs_trace.iter().filter_map(|event| match event {
481 ObsEvent::FailureBranchEntered {
482 tick,
483 session,
484 coro_id,
485 fault,
486 } => Some(SemanticAuditRecord::FailureBranch {
487 tick: *tick,
488 session: *session,
489 coro_id: *coro_id,
490 fault: fault.clone(),
491 }),
492 ObsEvent::TimeoutIssued {
493 tick,
494 site,
495 until_tick,
496 witness_id,
497 } => Some(SemanticAuditRecord::TimeoutIssued {
498 tick: *tick,
499 site: site.clone(),
500 until_tick: *until_tick,
501 witness_id: *witness_id,
502 }),
503 ObsEvent::CancellationRequested {
504 tick,
505 session,
506 witness_id,
507 owner_id,
508 reason,
509 } => Some(SemanticAuditRecord::CancellationRequested {
510 tick: *tick,
511 session: *session,
512 witness_id: *witness_id,
513 owner_id: owner_id.clone(),
514 reason: reason.clone(),
515 }),
516 ObsEvent::Cancelled {
517 tick,
518 session,
519 witness_id,
520 reason,
521 } => Some(SemanticAuditRecord::Cancelled {
522 tick: *tick,
523 session: *session,
524 witness_id: *witness_id,
525 reason: reason.clone(),
526 }),
527 ObsEvent::SessionTerminal {
528 tick,
529 session,
530 reason,
531 } => Some(SemanticAuditRecord::SessionTerminal {
532 tick: *tick,
533 session: *session,
534 reason: reason.clone(),
535 }),
536 _ => None,
537 }));
538
539 records.extend(outstanding_effects.iter().cloned().map(|effect| {
540 SemanticAuditRecord::EffectObservation {
541 effect_id: effect.effect_id,
542 ordering_key: effect.ordering_key,
543 session: effect.session,
544 effect_kind: effect.effect_kind,
545 effect_interface: effect.effect_interface,
546 effect_operation: effect.effect_operation,
547 handler_identity: effect.handler_identity,
548 inputs: effect.inputs,
549 outputs: effect.outputs,
550 }
551 }));
552
553 canonical_semantic_audit_log(&records)
554}
555
556#[must_use]
558#[allow(clippy::too_many_arguments)]
559pub fn canonical_replay_fragment_v1(
560 obs_trace: &[ObsEvent],
561 effect_trace: &[EffectTraceEntry],
562 authority_audit_log: &[AuthorityAuditRecord],
563 delegation_audit_log: &[DelegationAuditRecord],
564 operation_instances: &[OperationInstance],
565 outstanding_effects: &[OutstandingEffect],
566 output_condition_checks: &[OutputConditionCheck],
567 progress_contracts: &[ProgressContract],
568 progress_transitions: &[ProgressTransition],
569 mut crashed_sites: Vec<String>,
570 mut partitioned_edges: Vec<(String, String)>,
571 mut corrupted_edges: Vec<((String, String), CorruptionType)>,
572 mut timed_out_sites: Vec<(String, u64)>,
573 effect_determinism_tier: EffectDeterminismTier,
574 communication_replay_mode: CommunicationReplayMode,
575 communication_replay_root: Option<Hash>,
576 communication_consumption_artifacts: Vec<CommunicationConsumptionArtifact>,
577) -> CanonicalReplayFragmentV1 {
578 crashed_sites.sort_unstable();
579 crashed_sites.dedup();
580
581 partitioned_edges.sort_unstable();
582 partitioned_edges.dedup();
583
584 corrupted_edges.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
585 corrupted_edges.dedup();
586
587 timed_out_sites.sort_unstable();
588
589 CanonicalReplayFragmentV1 {
590 schema_version: canonical_serialization_schema_version(),
591 obs_trace: canonical_trace_v1(obs_trace).events,
592 effect_trace: canonical_effect_trace(effect_trace),
593 crashed_sites,
594 partitioned_edges,
595 corrupted_edges,
596 timed_out_sites,
597 effect_determinism_tier,
598 communication_replay_mode,
599 communication_replay_root,
600 communication_consumption_artifacts,
601 semantic_audit_log: semantic_audit_log_v1(
602 authority_audit_log,
603 delegation_audit_log,
604 operation_instances,
605 obs_trace,
606 outstanding_effects,
607 progress_contracts,
608 progress_transitions,
609 ),
610 semantic_objects: canonicalize_protocol_machine_semantic_objects(
611 &protocol_machine_semantic_objects(
612 authority_audit_log,
613 delegation_audit_log,
614 operation_instances,
615 outstanding_effects,
616 output_condition_checks,
617 progress_contracts,
618 progress_transitions,
619 ),
620 ),
621 }
622}
623
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::Edge;

    /// Entries supplied out of order come back ordered by `ordering_key`.
    #[test]
    fn canonical_effect_trace_is_stably_sorted() {
        // Builds an entry that differs only in id, kind and ordering key.
        let entry = |effect_id, effect_kind: &str, ordering_key| EffectTraceEntry {
            effect_id,
            effect_kind: effect_kind.to_string(),
            inputs: serde_json::json!({}),
            outputs: serde_json::json!({}),
            handler_identity: "h".to_string(),
            effect_interface: None,
            effect_operation: None,
            ordering_key,
            topology: None,
        };
        let trace = vec![entry(2, "b", 3), entry(1, "a", 2)];

        let sorted = canonical_effect_trace(&trace);

        let first = &sorted[0];
        let second = &sorted[1];
        assert_eq!(first.effect_id, 1);
        assert_eq!(second.effect_id, 2);
    }

    /// The payload carries the current schema version and keeps the event.
    #[test]
    fn canonical_trace_payload_has_version() {
        let sent = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "m".to_string(),
        };
        let trace = vec![sent];

        let payload = canonical_trace_v1(&trace);

        assert_eq!(payload.schema_version, SERIALIZATION_SCHEMA_VERSION);
        assert_eq!(payload.events.len(), 1);
    }

    /// A non-string `schema_version` must fail deserialization.
    #[test]
    fn numeric_schema_version_is_rejected() {
        let payload = serde_json::json!({
            "schema_version": 1,
            "events": []
        });

        let outcome = serde_json::from_value::<CanonicalTraceV1>(payload);

        outcome.expect_err("numeric schema version should be rejected");
    }
}
685}