use crate::communication_replay::{CommunicationConsumptionArtifact, CommunicationReplayMode};
use crate::determinism::EffectDeterminismTier;
use crate::effect::{CorruptionType, EffectTraceEntry};
use crate::engine::{ObsEvent, SessionTerminalReason};
use crate::output_condition::OutputConditionCheck;
use crate::semantic_objects::{
protocol_machine_semantic_objects, OperationInstance, OutstandingEffect, ProgressContract,
ProgressTransition, ProtocolMachineSemanticObjects, PublicationEvent, TransformationObligation,
};
use crate::session::{
AuthorityArtifact, AuthorityAuditEvent, AuthorityAuditRecord, AuthorityWitnessId,
FragmentOwnerId, OwnershipTerminalReason, SessionId,
};
use crate::trace::normalize_trace;
use crate::transfer_semantics::{DelegationAuditRecord, DelegationReceipt, DelegationStatus};
use crate::verification::Hash;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value as JsonValue;
/// Schema version tag written into the canonical payloads defined in this module
/// and required by their `schema_version` deserializer.
pub const SERIALIZATION_SCHEMA_VERSION: &str = "machine.serialization.v1";
/// Returns an owned copy of [`SERIALIZATION_SCHEMA_VERSION`] for embedding in a payload.
fn canonical_serialization_schema_version() -> String {
    String::from(SERIALIZATION_SCHEMA_VERSION)
}
/// Encodes `value` into bytes with `bincode::serialize`.
///
/// # Errors
///
/// Returns the `bincode::Error` produced by the encoder.
pub fn binary_encode<T: Serialize + ?Sized>(value: &T) -> Result<Vec<u8>, bincode::Error> {
bincode::serialize(value)
}
/// Decodes a `T` from `bytes` with `bincode::deserialize`.
///
/// # Errors
///
/// Returns the `bincode::Error` produced by the decoder.
pub fn binary_decode<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, bincode::Error> {
bincode::deserialize(bytes)
}
/// Reports the size `bincode::serialized_size` computes for `value`, in bytes.
///
/// Returns `0` when the size cannot be computed or does not fit in `usize`,
/// so a result of `0` does not distinguish "empty" from "failed".
#[must_use]
pub fn binary_size<T: Serialize + ?Sized>(value: &T) -> usize {
    match bincode::serialized_size(value) {
        Ok(bytes) => usize::try_from(bytes).unwrap_or(0),
        Err(_) => 0,
    }
}
fn deserialize_serialization_schema_version<'de, D>(deserializer: D) -> Result<String, D::Error>
where
D: serde::Deserializer<'de>,
{
let version = String::deserialize(deserializer)?;
if version == SERIALIZATION_SCHEMA_VERSION {
Ok(version)
} else {
Err(serde::de::Error::custom(format!(
"unsupported schema_version '{version}'; expected '{SERIALIZATION_SCHEMA_VERSION}'"
)))
}
}
/// Versioned container for an observable trace.
///
/// Deserialization fails unless `schema_version` equals
/// [`SERIALIZATION_SCHEMA_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CanonicalTraceV1 {
/// Schema tag; validated by `deserialize_serialization_schema_version` on input.
#[serde(deserialize_with = "deserialize_serialization_schema_version")]
pub schema_version: String,
/// Trace events; `canonical_trace_v1` fills this with the output of `normalize_trace`.
pub events: Vec<ObsEvent>,
}
/// Versioned replay fragment bundling traces, fault state, and semantic audit data.
///
/// Deserialization fails unless `schema_version` equals
/// [`SERIALIZATION_SCHEMA_VERSION`]. Fields marked `#[serde(default)]` fall back to
/// their `Default` value when absent from the serialized input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CanonicalReplayFragmentV1 {
/// Schema tag; validated by `deserialize_serialization_schema_version` on input.
#[serde(deserialize_with = "deserialize_serialization_schema_version")]
pub schema_version: String,
/// Observable events; `canonical_replay_fragment_v1` stores the normalized trace here.
pub obs_trace: Vec<ObsEvent>,
/// Effect trace entries; `canonical_replay_fragment_v1` stores them in
/// `canonical_effect_trace` order.
pub effect_trace: Vec<EffectTraceEntry>,
/// Crashed site names; sorted by `canonical_replay_fragment_v1`.
pub crashed_sites: Vec<String>,
/// Partitioned edges as string pairs; sorted by `canonical_replay_fragment_v1`.
pub partitioned_edges: Vec<(String, String)>,
/// Corrupted edges with their corruption type; ordered by edge by
/// `canonical_replay_fragment_v1`.
pub corrupted_edges: Vec<((String, String), CorruptionType)>,
/// Pairs of site name and a `u64` value; sorted by `canonical_replay_fragment_v1`.
// NOTE(review): the u64 is presumably the timeout's `until_tick` — confirm against callers.
pub timed_out_sites: Vec<(String, u64)>,
/// Effect determinism tier passed through unchanged by `canonical_replay_fragment_v1`.
#[serde(default)]
pub effect_determinism_tier: EffectDeterminismTier,
/// Communication replay mode passed through unchanged by `canonical_replay_fragment_v1`.
#[serde(default)]
pub communication_replay_mode: CommunicationReplayMode,
/// Optional communication replay root hash, passed through unchanged.
#[serde(default)]
pub communication_replay_root: Option<Hash>,
/// Communication consumption artifacts, passed through unchanged.
#[serde(default)]
pub communication_consumption_artifacts: Vec<CommunicationConsumptionArtifact>,
/// Semantic audit records; `canonical_replay_fragment_v1` fills this from
/// `semantic_audit_log_v1`.
#[serde(default)]
pub semantic_audit_log: Vec<SemanticAuditRecord>,
/// Semantic objects; `canonical_replay_fragment_v1` stores the output of
/// `canonicalize_protocol_machine_semantic_objects` here.
#[serde(default)]
pub semantic_objects: ProtocolMachineSemanticObjects,
}
/// One entry of the semantic audit log assembled by `semantic_audit_log_v1`.
///
/// `canonical_semantic_audit_log` orders records by the value `semantic_tick`
/// extracts, then by `semantic_rank`, then by their JSON encoding.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SemanticAuditRecord {
/// Copied from an `AuthorityAuditRecord`. `semantic_audit_log_v1` derives `session`
/// from the artifact, which yields `None` for timeout artifacts.
Authority {
tick: Option<u64>,
session: Option<SessionId>,
artifact: AuthorityArtifact,
event: AuthorityAuditEvent,
reason: Option<String>,
},
/// Copied from a `DelegationAuditRecord`; `semantic_audit_log_v1` takes `session`
/// from the receipt.
Delegation {
tick: u64,
session: SessionId,
receipt: DelegationReceipt,
status: DelegationStatus,
reason: Option<String>,
},
/// Wraps a transformation obligation; `semantic_audit_log_v1` copies `tick` and
/// `session` from the obligation itself.
TransformationObligation {
tick: u64,
session: SessionId,
obligation: TransformationObligation,
},
/// Wraps a publication event; `semantic_audit_log_v1` computes `tick` with
/// `publication_tick`, which yields `0` when no matching effect is found.
Publication {
tick: u64,
session: Option<SessionId>,
event: PublicationEvent,
},
/// Wraps a progress transition; `semantic_audit_log_v1` copies `tick` and `session`
/// from the transition itself.
ProgressTransition {
tick: u64,
session: Option<SessionId>,
transition: ProgressTransition,
},
/// Built by `semantic_audit_log_v1` from `ObsEvent::FailureBranchEntered`.
FailureBranch {
tick: u64,
session: SessionId,
coro_id: usize,
fault: crate::coroutine::Fault,
},
/// Built by `semantic_audit_log_v1` from `ObsEvent::TimeoutIssued`.
TimeoutIssued {
tick: u64,
site: String,
until_tick: u64,
witness_id: AuthorityWitnessId,
},
/// Built by `semantic_audit_log_v1` from `ObsEvent::CancellationRequested`.
CancellationRequested {
tick: u64,
session: SessionId,
witness_id: AuthorityWitnessId,
owner_id: FragmentOwnerId,
reason: OwnershipTerminalReason,
},
/// Built by `semantic_audit_log_v1` from `ObsEvent::Cancelled`.
Cancelled {
tick: u64,
session: SessionId,
witness_id: AuthorityWitnessId,
reason: OwnershipTerminalReason,
},
/// Built by `semantic_audit_log_v1` from `ObsEvent::SessionTerminal`.
SessionTerminal {
tick: u64,
session: SessionId,
reason: SessionTerminalReason,
},
/// Field-by-field copy of an `OutstandingEffect` made by `semantic_audit_log_v1`.
/// There is no `tick` field; `semantic_tick` uses `ordering_key` instead.
EffectObservation {
effect_id: u64,
ordering_key: u64,
session: Option<SessionId>,
effect_kind: String,
effect_interface: Option<String>,
effect_operation: Option<String>,
handler_identity: String,
inputs: JsonValue,
outputs: JsonValue,
},
}
/// Wraps `trace` in a [`CanonicalTraceV1`] tagged with the current schema version.
///
/// The events are passed through `normalize_trace` before being stored.
#[must_use]
pub fn canonical_trace_v1(trace: &[ObsEvent]) -> CanonicalTraceV1 {
    let schema_version = canonical_serialization_schema_version();
    let events = normalize_trace(trace);
    CanonicalTraceV1 {
        schema_version,
        events,
    }
}
/// Returns a copy of `trace` ordered by `ordering_key`, then `effect_id`, then
/// `effect_kind`.
///
/// The sort is stable, so entries that tie on all three fields keep their input order.
#[must_use]
pub fn canonical_effect_trace(trace: &[EffectTraceEntry]) -> Vec<EffectTraceEntry> {
    let mut sorted = trace.to_vec();
    sorted.sort_by(|lhs, rhs| {
        lhs.ordering_key
            .cmp(&rhs.ordering_key)
            .then_with(|| lhs.effect_id.cmp(&rhs.effect_id))
            .then_with(|| lhs.effect_kind.cmp(&rhs.effect_kind))
    });
    sorted
}
/// Extracts the session id carried by an authority artifact.
///
/// Timeout artifacts yield `None`; every other variant yields its `session_id`.
fn authority_artifact_session(artifact: &AuthorityArtifact) -> Option<SessionId> {
    let session_id = match artifact {
        AuthorityArtifact::Timeout(_) => return None,
        AuthorityArtifact::OwnershipCapability(capability) => capability.session_id,
        AuthorityArtifact::OwnershipReceipt(receipt) => receipt.session_id,
        AuthorityArtifact::Readiness(witness) => witness.session_id,
        AuthorityArtifact::Cancellation(witness) => witness.session_id,
    };
    Some(session_id)
}
/// Fixed rank of each record kind, used by `canonical_semantic_audit_log` to order
/// records that share the same sort tick.
fn semantic_rank(record: &SemanticAuditRecord) -> u8 {
    let rank: u8 = match *record {
        SemanticAuditRecord::Authority { .. } => 0,
        SemanticAuditRecord::Delegation { .. } => 1,
        SemanticAuditRecord::TransformationObligation { .. } => 2,
        SemanticAuditRecord::Publication { .. } => 3,
        SemanticAuditRecord::ProgressTransition { .. } => 4,
        SemanticAuditRecord::FailureBranch { .. } => 5,
        SemanticAuditRecord::TimeoutIssued { .. } => 6,
        SemanticAuditRecord::CancellationRequested { .. } => 7,
        SemanticAuditRecord::Cancelled { .. } => 8,
        SemanticAuditRecord::SessionTerminal { .. } => 9,
        SemanticAuditRecord::EffectObservation { .. } => 10,
    };
    rank
}
/// Primary sort position of a record in the canonical semantic audit log.
///
/// Effect observations have no tick and use their `ordering_key`; authority records
/// without a tick sort at `0`; every other variant uses its `tick` field.
fn semantic_tick(record: &SemanticAuditRecord) -> u64 {
    match record {
        SemanticAuditRecord::EffectObservation { ordering_key, .. } => *ordering_key,
        SemanticAuditRecord::Authority { tick, .. } => (*tick).unwrap_or_default(),
        SemanticAuditRecord::Delegation { tick, .. }
        | SemanticAuditRecord::TransformationObligation { tick, .. }
        | SemanticAuditRecord::Publication { tick, .. }
        | SemanticAuditRecord::ProgressTransition { tick, .. }
        | SemanticAuditRecord::FailureBranch { tick, .. }
        | SemanticAuditRecord::TimeoutIssued { tick, .. }
        | SemanticAuditRecord::CancellationRequested { tick, .. }
        | SemanticAuditRecord::Cancelled { tick, .. }
        | SemanticAuditRecord::SessionTerminal { tick, .. } => *tick,
    }
}
/// Derives the tick to attribute to a publication event.
///
/// Looks up the operation whose `operation_id` matches the event, then walks that
/// operation's `effect_ids` in order and takes the first one present in
/// `outstanding_effects`. The result is that effect's `completed_at_tick`, falling
/// back to its `ordering_key`. Returns `0` when no operation or no effect matches.
fn publication_tick(
    event: &PublicationEvent,
    operation_instances: &[OperationInstance],
    outstanding_effects: &[OutstandingEffect],
) -> u64 {
    let owning_operation = operation_instances
        .iter()
        .find(|operation| operation.operation_id == event.operation_id);
    let first_known_effect = owning_operation.and_then(|operation| {
        operation.effect_ids.iter().find_map(|effect_id| {
            outstanding_effects
                .iter()
                .find(|effect| effect.effect_id == *effect_id)
        })
    });
    first_known_effect.map_or(0, |effect| {
        effect.completed_at_tick.unwrap_or(effect.ordering_key)
    })
}
/// Returns a copy of `records` in canonical order.
///
/// Records are ordered by `semantic_tick`, then `semantic_rank`, then by their JSON
/// encoding as a final tie-breaker. A record that fails to serialize contributes an
/// empty string as its tie-breaker. The sort is stable.
#[must_use]
pub fn canonical_semantic_audit_log(records: &[SemanticAuditRecord]) -> Vec<SemanticAuditRecord> {
    let mut out = records.to_vec();
    // The JSON tie-breaker is expensive to build, so compute each key once per
    // record instead of re-serializing both sides on every comparison.
    out.sort_by_cached_key(|record| {
        (
            semantic_tick(record),
            semantic_rank(record),
            serde_json::to_string(record).unwrap_or_default(),
        )
    });
    out
}
/// Returns a copy of `objects` with its collections sorted into a stable order.
///
/// Each collection listed below is sorted by its identifying field; agreement states
/// are sorted by `(operation_id, contract_name, last_updated_tick)` with a missing
/// tick treated as `0`. Collections not listed keep the order they had in `objects`.
/// All sorts are stable.
#[must_use]
pub fn canonicalize_protocol_machine_semantic_objects(
    objects: &ProtocolMachineSemanticObjects,
) -> ProtocolMachineSemanticObjects {
    let mut out = objects.clone();
    // Keys that must be cloned are cached so each is cloned once per element rather
    // than once per comparison; keys returned by value keep plain `sort_by_key`.
    out.operation_instances
        .sort_by_cached_key(|entry| entry.operation_id.clone());
    out.outstanding_effects.sort_by_key(|entry| entry.effect_id);
    out.semantic_handoffs.sort_by_key(|entry| entry.handoff_id);
    out.authoritative_reads
        .sort_by_cached_key(|entry| entry.read_id.clone());
    out.observed_reads
        .sort_by_cached_key(|entry| entry.read_id.clone());
    out.materialization_proofs
        .sort_by_cached_key(|entry| entry.proof_id.clone());
    out.canonical_handles
        .sort_by_cached_key(|entry| entry.handle_id.clone());
    out.publication_events
        .sort_by_cached_key(|entry| entry.publication_id.clone());
    out.prestate_bindings
        .sort_by_cached_key(|entry| entry.binding_id.clone());
    out.agreement_profiles
        .sort_by_cached_key(|entry| entry.profile_name.clone());
    out.agreement_contracts
        .sort_by_cached_key(|entry| entry.contract_name.clone());
    out.agreement_evidence
        .sort_by_cached_key(|entry| entry.evidence_id.clone());
    out.agreement_states.sort_by(|lhs, rhs| {
        (
            &lhs.operation_id,
            &lhs.contract_name,
            lhs.last_updated_tick.unwrap_or(0),
        )
            .cmp(&(
                &rhs.operation_id,
                &rhs.contract_name,
                rhs.last_updated_tick.unwrap_or(0),
            ))
    });
    out.progress_contracts
        .sort_by_cached_key(|entry| entry.operation_id.clone());
    out
}
/// Maps the observable events that have a semantic audit counterpart to a record.
///
/// Handles failure-branch entry, timeout issuance, cancellation request, cancellation,
/// and session termination; returns `None` for every other [`ObsEvent`] variant.
fn semantic_record_from_obs_event(event: &ObsEvent) -> Option<SemanticAuditRecord> {
    match event {
        ObsEvent::FailureBranchEntered {
            tick,
            session,
            coro_id,
            fault,
        } => Some(SemanticAuditRecord::FailureBranch {
            tick: *tick,
            session: *session,
            coro_id: *coro_id,
            fault: fault.clone(),
        }),
        ObsEvent::TimeoutIssued {
            tick,
            site,
            until_tick,
            witness_id,
        } => Some(SemanticAuditRecord::TimeoutIssued {
            tick: *tick,
            site: site.clone(),
            until_tick: *until_tick,
            witness_id: *witness_id,
        }),
        ObsEvent::CancellationRequested {
            tick,
            session,
            witness_id,
            owner_id,
            reason,
        } => Some(SemanticAuditRecord::CancellationRequested {
            tick: *tick,
            session: *session,
            witness_id: *witness_id,
            owner_id: owner_id.clone(),
            reason: reason.clone(),
        }),
        ObsEvent::Cancelled {
            tick,
            session,
            witness_id,
            reason,
        } => Some(SemanticAuditRecord::Cancelled {
            tick: *tick,
            session: *session,
            witness_id: *witness_id,
            reason: reason.clone(),
        }),
        ObsEvent::SessionTerminal {
            tick,
            session,
            reason,
        } => Some(SemanticAuditRecord::SessionTerminal {
            tick: *tick,
            session: *session,
            reason: reason.clone(),
        }),
        _ => None,
    }
}

/// Builds the semantic audit log for a run and returns it in canonical order.
///
/// Records are gathered from, in this order: the authority audit log, the delegation
/// audit log, the transformation obligations and publication events derived by
/// `protocol_machine_semantic_objects`, the progress transitions, the semantically
/// relevant events of `obs_trace`, and the outstanding effects. The combined list is
/// then ordered by `canonical_semantic_audit_log`.
///
/// The semantic objects are derived with an empty output-condition-check slice.
/// Publication records take their tick from `publication_tick`.
#[must_use]
pub fn semantic_audit_log_v1(
    authority_audit_log: &[AuthorityAuditRecord],
    delegation_audit_log: &[DelegationAuditRecord],
    operation_instances: &[OperationInstance],
    obs_trace: &[ObsEvent],
    outstanding_effects: &[OutstandingEffect],
    progress_contracts: &[ProgressContract],
    progress_transitions: &[ProgressTransition],
) -> Vec<SemanticAuditRecord> {
    let mut records = Vec::new();
    records.extend(authority_audit_log.iter().cloned().map(|record| {
        SemanticAuditRecord::Authority {
            tick: record.tick,
            // Read the session before `artifact` is moved into the record.
            session: authority_artifact_session(&record.artifact),
            artifact: record.artifact,
            event: record.event,
            reason: record.reason,
        }
    }));
    records.extend(delegation_audit_log.iter().cloned().map(|record| {
        SemanticAuditRecord::Delegation {
            tick: record.tick,
            session: record.receipt.session,
            receipt: record.receipt,
            status: record.status,
            reason: record.reason,
        }
    }));
    let semantic_objects = protocol_machine_semantic_objects(
        authority_audit_log,
        delegation_audit_log,
        operation_instances,
        outstanding_effects,
        &[],
        progress_contracts,
        progress_transitions,
    );
    // `semantic_objects` is owned, so its collections are moved out field by field
    // instead of being cloned.
    records.extend(
        semantic_objects
            .transformation_obligations
            .into_iter()
            .map(|obligation| SemanticAuditRecord::TransformationObligation {
                tick: obligation.tick,
                session: obligation.session,
                obligation,
            }),
    );
    records.extend(
        semantic_objects
            .publication_events
            .into_iter()
            .map(|event| SemanticAuditRecord::Publication {
                tick: publication_tick(&event, operation_instances, outstanding_effects),
                session: event.session,
                event,
            }),
    );
    records.extend(progress_transitions.iter().cloned().map(|transition| {
        SemanticAuditRecord::ProgressTransition {
            tick: transition.tick,
            session: transition.session,
            transition,
        }
    }));
    records.extend(obs_trace.iter().filter_map(semantic_record_from_obs_event));
    records.extend(outstanding_effects.iter().cloned().map(|effect| {
        SemanticAuditRecord::EffectObservation {
            effect_id: effect.effect_id,
            ordering_key: effect.ordering_key,
            session: effect.session,
            effect_kind: effect.effect_kind,
            effect_interface: effect.effect_interface,
            effect_operation: effect.effect_operation,
            handler_identity: effect.handler_identity,
            inputs: effect.inputs,
            outputs: effect.outputs,
        }
    }));
    canonical_semantic_audit_log(&records)
}
/// Assembles a [`CanonicalReplayFragmentV1`] tagged with the current schema version.
///
/// Canonicalization applied here:
/// - `obs_trace` is normalized via `canonical_trace_v1`.
/// - `effect_trace` is ordered via `canonical_effect_trace`.
/// - `crashed_sites`, `partitioned_edges`, and `timed_out_sites` are sorted and
///   deduplicated.
/// - `corrupted_edges` is stably sorted by edge, and repeated `(edge, corruption)`
///   pairs are removed. Distinct corruption types recorded for the same edge keep
///   their relative input order.
/// - The semantic audit log is built by `semantic_audit_log_v1`, and the semantic
///   objects are derived with `output_condition_checks` and then canonicalized.
///
/// The determinism tier, communication replay mode and root, and the communication
/// consumption artifacts are stored unchanged.
#[must_use]
// One argument per fragment input; grouping them would change the public signature.
#[allow(clippy::too_many_arguments)]
pub fn canonical_replay_fragment_v1(
    obs_trace: &[ObsEvent],
    effect_trace: &[EffectTraceEntry],
    authority_audit_log: &[AuthorityAuditRecord],
    delegation_audit_log: &[DelegationAuditRecord],
    operation_instances: &[OperationInstance],
    outstanding_effects: &[OutstandingEffect],
    output_condition_checks: &[OutputConditionCheck],
    progress_contracts: &[ProgressContract],
    progress_transitions: &[ProgressTransition],
    mut crashed_sites: Vec<String>,
    mut partitioned_edges: Vec<(String, String)>,
    mut corrupted_edges: Vec<((String, String), CorruptionType)>,
    mut timed_out_sites: Vec<(String, u64)>,
    effect_determinism_tier: EffectDeterminismTier,
    communication_replay_mode: CommunicationReplayMode,
    communication_replay_root: Option<Hash>,
    communication_consumption_artifacts: Vec<CommunicationConsumptionArtifact>,
) -> CanonicalReplayFragmentV1 {
    crashed_sites.sort_unstable();
    crashed_sites.dedup();
    partitioned_edges.sort_unstable();
    partitioned_edges.dedup();

    // The sort key is the edge alone, so equal `(edge, corruption)` pairs are not
    // guaranteed to be adjacent afterwards and `Vec::dedup` could miss them.
    // Instead, compare each entry against everything already kept for its edge.
    corrupted_edges.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    let mut unique_corrupted_edges: Vec<((String, String), CorruptionType)> =
        Vec::with_capacity(corrupted_edges.len());
    for entry in corrupted_edges {
        let already_recorded = unique_corrupted_edges
            .iter()
            .rev()
            .take_while(|seen| seen.0 == entry.0)
            .any(|seen| seen.1 == entry.1);
        if !already_recorded {
            unique_corrupted_edges.push(entry);
        }
    }

    timed_out_sites.sort_unstable();
    // Deduplicate like the other fault collections so repeated inputs do not
    // produce distinct fragments.
    timed_out_sites.dedup();

    CanonicalReplayFragmentV1 {
        schema_version: canonical_serialization_schema_version(),
        obs_trace: canonical_trace_v1(obs_trace).events,
        effect_trace: canonical_effect_trace(effect_trace),
        crashed_sites,
        partitioned_edges,
        corrupted_edges: unique_corrupted_edges,
        timed_out_sites,
        effect_determinism_tier,
        communication_replay_mode,
        communication_replay_root,
        communication_consumption_artifacts,
        semantic_audit_log: semantic_audit_log_v1(
            authority_audit_log,
            delegation_audit_log,
            operation_instances,
            obs_trace,
            outstanding_effects,
            progress_contracts,
            progress_transitions,
        ),
        semantic_objects: canonicalize_protocol_machine_semantic_objects(
            &protocol_machine_semantic_objects(
                authority_audit_log,
                delegation_audit_log,
                operation_instances,
                outstanding_effects,
                output_condition_checks,
                progress_contracts,
                progress_transitions,
            ),
        ),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::Edge;

    /// Entries given out of order come back ordered by `ordering_key`.
    #[test]
    fn canonical_effect_trace_is_stably_sorted() {
        let later = EffectTraceEntry {
            effect_id: 2,
            effect_kind: "b".to_string(),
            inputs: serde_json::json!({}),
            outputs: serde_json::json!({}),
            handler_identity: "h".to_string(),
            effect_interface: None,
            effect_operation: None,
            ordering_key: 3,
            topology: None,
        };
        let earlier = EffectTraceEntry {
            effect_id: 1,
            effect_kind: "a".to_string(),
            inputs: serde_json::json!({}),
            outputs: serde_json::json!({}),
            handler_identity: "h".to_string(),
            effect_interface: None,
            effect_operation: None,
            ordering_key: 2,
            topology: None,
        };

        let sorted = canonical_effect_trace(&[later, earlier]);

        let effect_ids: Vec<_> = sorted.iter().map(|entry| entry.effect_id).collect();
        assert_eq!(effect_ids, vec![1, 2]);
    }

    /// The canonical trace payload carries the schema version and keeps the event.
    #[test]
    fn canonical_trace_payload_has_version() {
        let sent = ObsEvent::Sent {
            tick: 1,
            edge: Edge::new(1, "A", "B"),
            session: 1,
            from: "A".to_string(),
            to: "B".to_string(),
            label: "m".to_string(),
        };

        let payload = canonical_trace_v1(&[sent]);

        assert_eq!(payload.schema_version, SERIALIZATION_SCHEMA_VERSION);
        assert_eq!(payload.events.len(), 1);
    }

    /// A numeric `schema_version` must not deserialize.
    #[test]
    fn numeric_schema_version_is_rejected() {
        let payload = serde_json::json!({
            "schema_version": 1,
            "events": []
        });

        let decoded = serde_json::from_value::<CanonicalTraceV1>(payload);

        assert!(
            decoded.is_err(),
            "numeric schema version should be rejected"
        );
    }
}