1#![allow(missing_docs)]
9
10use crate::output_condition::OutputConditionCheck;
11use crate::session::{
12 AuthorityArtifact, AuthorityAuditEvent, AuthorityAuditRecord, AuthorityWitnessId,
13 FragmentOwnerId, OwnershipEpoch, SessionId,
14};
15use crate::transfer_semantics::DelegationAuditRecord;
16use serde::{Deserialize, Serialize};
17use std::collections::BTreeMap;
18
/// Schema identifier written into, and required of, every serialized
/// `ProtocolMachineSemanticObjects` payload.
pub const SEMANTIC_OBJECTS_SCHEMA_VERSION: &str = "protocol_machine.semantic_objects.v1";

/// Returns an owned copy of [`SEMANTIC_OBJECTS_SCHEMA_VERSION`].
fn canonical_semantic_objects_schema_version() -> String {
    String::from(SEMANTIC_OBJECTS_SCHEMA_VERSION)
}
25
26fn deserialize_semantic_objects_schema_version<'de, D>(deserializer: D) -> Result<String, D::Error>
27where
28 D: serde::Deserializer<'de>,
29{
30 let version = String::deserialize(deserializer)?;
31 if version == SEMANTIC_OBJECTS_SCHEMA_VERSION {
32 Ok(version)
33 } else {
34 Err(serde::de::Error::custom(format!(
35 "unsupported schema_version '{version}'; expected '{SEMANTIC_OBJECTS_SCHEMA_VERSION}'"
36 )))
37 }
38}
39
/// Lifecycle phase stored in the `phase` field of an `OperationInstance`.
///
/// Serialized by serde with snake_case variant names (for example
/// `timed_out`). `ProgressState::expected_operation_phase` maps every progress
/// state onto one of these phases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationPhase {
    Pending,
    Blocked,
    /// `OperationInstance::commitment_aligned_with_agreement_state` accepts this
    /// phase only when the agreement state is `Finalized`.
    Succeeded,
    Failed,
    Cancelled,
    TimedOut,
    HandedOff,
}
52
/// Status carried by an `OutstandingEffect` and mirrored on an `ObservedRead`
/// (both have a `status` field of this type).
///
/// Serialized by serde with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OutstandingEffectStatus {
    Pending,
    Blocked,
    Succeeded,
    Failed,
    Cancelled,
    Invalidated,
}
64
/// Category of an `AuthoritativeRead` (its `kind` field).
///
/// Serialized by serde with snake_case variant names (for example
/// `output_condition`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuthoritativeReadKind {
    Readiness,
    Cancellation,
    Timeout,
    OutputCondition,
}
74
/// Lifecycle marker of an `AuthoritativeRead` (its `lifecycle` field).
///
/// Serialized by serde with snake_case variant names. The visible part of this
/// file stores the value but does not branch on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuthoritativeReadLifecycle {
    Issued,
    Consumed,
    Rejected,
    Verified,
}
84
/// Category of a `CanonicalHandle` (its `kind` field).
///
/// Serialized by serde with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CanonicalHandleKind {
    Materialization,
    Handoff,
}
92
/// Observer class recorded on a `PublicationEvent` (its `observer_class`
/// field).
///
/// Serialized by serde with snake_case variant names. The visible part of this
/// file stores the value but does not branch on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PublicationObserverClass {
    Canonical,
    Audit,
}
100
/// Outcome recorded on a `PublicationEvent` (its `status` field).
///
/// Serialized by serde with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PublicationStatus {
    /// A published event that also carries both a proof reference and a handle
    /// reference lets `path_for_operation` report the `Canonical` stage.
    Published,
    /// Rejected events are listed in `FinalizationPath::rejected_publication_ids`
    /// and make `path_for_operation` report the `Rejected` stage unless the
    /// operation was invalidated by a handoff.
    Rejected,
}
108
/// Progress state tracked by a `ProgressContract` and recorded as the endpoints
/// of a `ProgressTransition`.
///
/// Serialized by serde with snake_case variant names (for example
/// `no_progress`). `is_terminal` treats `Succeeded`, `Failed`, `Cancelled`,
/// `TimedOut` and `HandedOff` as terminal; the remaining variants are
/// non-terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProgressState {
    Pending,
    Blocked,
    /// Maps to `OperationPhase::Blocked` in `expected_operation_phase`.
    NoProgress,
    /// Maps to `OperationPhase::Blocked` in `expected_operation_phase`.
    Degraded,
    Succeeded,
    Failed,
    Cancelled,
    TimedOut,
    HandedOff,
}
123
/// Visibility policy attached to agreement profiles and contracts.
///
/// Serialized by serde with snake_case variant names (for example
/// `blocked_until_finalized`). `permits_use_at` defines the agreement level
/// each variant requires.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationVisibility {
    /// Usable at every agreement level.
    Immediate,
    /// Usable once the agreement level is at least `Provisional`.
    Pending,
    /// Usable only once the agreement level is `Finalized`.
    BlockedUntilFinalized,
}
132
/// Strength of agreement reached for an operation.
///
/// Serialized by serde with snake_case variant names (for example
/// `soft_safe`). The type does not derive `Ord`; levels are compared through
/// `rank` and `at_least`, which order the variants as listed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgreementLevel {
    /// Rank 0.
    None,
    /// Rank 1.
    Provisional,
    /// Rank 2.
    SoftSafe,
    /// Rank 3.
    Finalized,
}
142
/// Agreement rule named by an `AgreementProfile` or `AgreementContract`.
///
/// Serialized by serde with snake_case variant names. The visible part of this
/// file only compares rules for equality (`AgreementProfile::supports_contract`);
/// it does not evaluate them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgreementRule {
    NoAgreement,
    AnyParticipant,
    Unanimous,
    /// Carries the participant count in `required_participants`.
    Threshold { required_participants: u64 },
    /// Carries a rule name; presumably resolved elsewhere (not visible here).
    Named { rule_name: String },
}
153
/// Kind of an `AgreementEvidence` record, and the kind a profile or contract
/// demands through `required_evidence_kind`.
///
/// Serialized by serde with snake_case variant names (for example
/// `commit_fact`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgreementEvidenceKind {
    Witness,
    Certificate,
    CommitFact,
    /// The kind required by `PublicationEvent::supports_agreement_evidence`.
    Publication,
    /// The kind required by `MaterializationProof::supports_agreement_evidence`.
    Materialization,
}
164
/// Terminal outcome stored in `AgreementState::finalization`.
///
/// Serialized by serde with snake_case variant names (for example `timed_out`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FinalizationOutcome {
    /// Required by `AgreementContract::finalization_admissible`.
    Finalized,
    /// Blocks `AgreementContract::provisional_usable`.
    Aborted,
    /// Blocks `AgreementContract::provisional_usable`.
    Rejected,
    /// Blocks `AgreementContract::provisional_usable`.
    TimedOut,
}
174
/// Which kinds of reads a `FinalizationPath` contains.
///
/// Serialized by serde with snake_case variant names. `path_for_operation`
/// derives the value from whether the observed and authoritative read id lists
/// are empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FinalizationReadClass {
    /// Neither observed nor authoritative reads.
    None,
    /// Observed reads only.
    ObservedOnly,
    /// Authoritative reads only.
    AuthoritativeOnly,
    /// Both observed and authoritative reads.
    Mixed,
}
184
/// Stage reported in `FinalizationPath::stage`.
///
/// Serialized by serde with snake_case variant names. `path_for_operation`
/// picks the first matching stage in this precedence order: `Invalidated`,
/// `Rejected`, `Canonical`, `Materialized`, `Authoritative`, `Observed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FinalizationStage {
    /// Fallback when no other stage applies.
    Observed,
    /// At least one authoritative read id is associated with the operation.
    Authoritative,
    /// At least one proof id is associated with the operation.
    Materialized,
    /// A published publication carries both a proof and a handle reference.
    Canonical,
    /// At least one invalidating handoff id is associated with the operation.
    Invalidated,
    /// At least one publication of the operation was rejected.
    Rejected,
}
196
/// Scope carried by handoffs, transformation obligations and regions.
///
/// There is no `rename_all` attribute, so serde uses the variant names exactly
/// as written.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OwnershipScope {
    Session,
    /// Carries a list of strings; presumably fragment identifiers — confirm
    /// against the producer.
    Fragments(Vec<String>),
}
203
/// Status recorded on a `SemanticHandoff` and a `TransformationObligation`.
///
/// There is no `rename_all` attribute, so serde uses the variant names exactly
/// as written.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DelegationStatus {
    Committed,
    RolledBack,
}
210
/// Serde default for `ProgressContract::last_progress_tick`: no tick recorded.
fn default_progress_contract_last_progress_tick() -> Option<u64> {
    Option::None
}
214
/// Serde default for `ProgressContract::escalated_at_tick`: no tick recorded.
fn default_progress_contract_escalated_at_tick() -> Option<u64> {
    Option::None
}
218
/// Serde default for `ProgressContract::reason`: no reason recorded.
fn default_progress_contract_reason() -> Option<String> {
    Option::None
}
222
223impl AgreementLevel {
224 #[must_use]
225 pub const fn rank(self) -> u8 {
226 match self {
227 Self::None => 0,
228 Self::Provisional => 1,
229 Self::SoftSafe => 2,
230 Self::Finalized => 3,
231 }
232 }
233
234 #[must_use]
235 pub const fn at_least(self, required: Self) -> bool {
236 self.rank() >= required.rank()
237 }
238}
239
240impl OperationVisibility {
241 #[must_use]
242 pub const fn permits_use_at(self, level: AgreementLevel) -> bool {
243 match self {
244 Self::Immediate => true,
245 Self::Pending => level.at_least(AgreementLevel::Provisional),
246 Self::BlockedUntilFinalized => level.at_least(AgreementLevel::Finalized),
247 }
248 }
249}
250
/// One tracked operation.
///
/// Agreement contracts and states, prestate bindings, progress contracts,
/// publications and outstanding effects all refer back to an operation through
/// `operation_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationInstance {
    /// Identifier other records reference. When it starts with
    /// `materialization:`, `proof_ids_for_operation` treats the remainder as a
    /// proof id.
    pub operation_id: String,
    /// Compared for equality by the `tracks_operation` / `binds_operation`
    /// helpers.
    pub session: Option<SessionId>,
    /// Compared for equality by `AgreementContract::tracks_operation`.
    pub owner_id: Option<FragmentOwnerId>,
    /// Kind label; not interpreted in the visible part of this file.
    pub kind: String,
    /// Current phase; `ProgressContract::tracks_operation` checks it against the
    /// contract's progress state.
    pub phase: OperationPhase,
    pub handler_identity: Option<String>,
    pub effect_ids: Vec<u64>,
    pub dependent_operation_ids: Vec<String>,
    /// Any value makes the operation parity-critical; the exact value
    /// `handoff.committed` additionally requires explicit finalization.
    pub terminal_publication: Option<String>,
    pub budget_ticks: Option<u64>,
    /// When `true`, the operation is parity-critical and requires explicit
    /// finalization.
    pub requires_proof: bool,
}
266
/// An effect recorded for an operation.
///
/// Observed reads are linked to operations through this record: a read's
/// `effect_id` selects the effect, and the effect's `operation_id` selects the
/// operation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OutstandingEffect {
    /// Identifier matched against `ObservedRead::effect_id`.
    pub effect_id: u64,
    /// Identifier of the operation this effect belongs to.
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub owner_id: Option<FragmentOwnerId>,
    pub effect_interface: Option<String>,
    pub effect_operation: Option<String>,
    pub effect_kind: String,
    pub handler_identity: String,
    pub status: OutstandingEffectStatus,
    pub ordering_key: u64,
    pub budget_ticks: Option<u64>,
    pub retry_policy: String,
    pub invalidation_token: String,
    pub completed_at_tick: Option<u64>,
    /// Free-form JSON payload; not interpreted in the visible part of this file.
    pub inputs: serde_json::Value,
    /// Free-form JSON payload; not interpreted in the visible part of this file.
    pub outputs: serde_json::Value,
}
287
/// Record of a handoff between two coroutines within a session.
///
/// Unlike most records in this file, `session` is mandatory here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticHandoff {
    pub handoff_id: u64,
    pub session: SessionId,
    pub from_coro: usize,
    pub to_coro: usize,
    /// Defaults to the empty string when absent from the serialized form.
    #[serde(default)]
    pub revoked_owner_id: String,
    /// Defaults to the empty string when absent from the serialized form.
    /// `relocates_agreement_state` compares an agreement state's owner with
    /// this value.
    #[serde(default)]
    pub activated_owner_id: String,
    pub scope: OwnershipScope,
    pub status: DelegationStatus,
    pub tick: u64,
    pub reason: Option<String>,
}
304
/// Obligation attached to a handoff, listing what the handoff affected.
///
/// NOTE(review): `invalidated_handoff_ids_for_operation` reads these records,
/// but its body lies outside the visible part of the file, so the exact
/// matching rule is not documented here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransformationObligation {
    pub obligation_id: String,
    /// Identifier of the related handoff; presumably a
    /// `SemanticHandoff::handoff_id` — confirm against the producer.
    pub handoff_id: u64,
    pub session: SessionId,
    pub transformed_fragments: Vec<String>,
    pub affected_operation_ids: Vec<String>,
    pub affected_effect_ids: Vec<u64>,
    pub transported_effect_ids: Vec<u64>,
    pub invalidated_effect_ids: Vec<u64>,
    pub witness_policy: String,
    pub publication_revoked_from: String,
    pub publication_activated_to: String,
    pub scope: OwnershipScope,
    pub status: DelegationStatus,
    pub tick: u64,
    pub reason: Option<String>,
}
324
/// A read that `require_authoritative_read` accepts on a canonical path.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuthoritativeRead {
    /// Identifier looked up by `require_authoritative_read`.
    pub read_id: String,
    /// Must equal the operation's session for the read to be associated with
    /// that operation.
    pub session: Option<SessionId>,
    /// Associated with an operation when the owners are equal or when either
    /// side has no owner.
    pub owner_id: Option<FragmentOwnerId>,
    pub kind: AuthoritativeReadKind,
    pub lifecycle: AuthoritativeReadLifecycle,
    pub predicate_ref: Option<String>,
    pub witness_id: Option<AuthorityWitnessId>,
    pub generation: Option<OwnershipEpoch>,
    pub reason: Option<String>,
}
338
/// A read observed through an effect.
///
/// `require_authoritative_read` refuses an id that is only known as an observed
/// read.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ObservedRead {
    pub read_id: String,
    pub session: Option<SessionId>,
    /// Matched against `OutstandingEffect::effect_id` to find the owning
    /// operation.
    pub effect_id: u64,
    pub effect_interface: Option<String>,
    pub effect_operation: Option<String>,
    pub handler_identity: String,
    pub status: OutstandingEffectStatus,
}
350
/// Proof record that can back `Materialization` agreement evidence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MaterializationProof {
    /// Matched against `AgreementEvidence::reference`.
    pub proof_id: String,
    pub session: Option<SessionId>,
    pub predicate_ref: String,
    pub witness_ref: Option<String>,
    pub output_digest: String,
    /// Must equal the evidence's `authoritative` flag for the proof to support
    /// that evidence.
    pub passed: bool,
}
361
/// Handle looked up by `require_canonical_handle`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CanonicalHandle {
    /// Identifier used for lookup. A handle whose id equals an operation's id is
    /// associated with that operation by `canonical_handle_ids_for_operation`.
    pub handle_id: String,
    pub session: Option<SessionId>,
    pub owner_id: Option<FragmentOwnerId>,
    pub kind: CanonicalHandleKind,
    /// When this names one of an operation's proof ids, the handle is associated
    /// with that operation.
    pub proof_ref: Option<String>,
}
371
/// Publication recorded for an operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublicationEvent {
    /// Matched against `AgreementEvidence::reference`.
    pub publication_id: String,
    pub session: Option<SessionId>,
    /// Identifier of the operation this publication belongs to.
    pub operation_id: String,
    pub owner_id: Option<FragmentOwnerId>,
    pub publication: String,
    pub observer_class: PublicationObserverClass,
    pub status: PublicationStatus,
    /// Contributes a proof id to the operation's finalization path.
    pub proof_ref: Option<String>,
    /// Contributes a canonical handle id to the operation's finalization path.
    pub handle_ref: Option<String>,
    pub reason: Option<String>,
}
386
/// Binds an operation to a state digest.
///
/// `binds_operation` and `AgreementContract::finalization_admissible` match a
/// binding by `operation_id` and `session` only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PrestateBinding {
    pub binding_id: String,
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub state_digest: String,
    pub epoch_ref: Option<String>,
    pub participant_digest: Option<String>,
}
397
/// Named, reusable set of agreement parameters.
///
/// A contract is supported by a profile when it names the profile and repeats
/// every parameter exactly (see `supports_contract`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgreementProfile {
    pub profile_name: String,
    pub visibility: OperationVisibility,
    pub rule: AgreementRule,
    pub usable_at: AgreementLevel,
    pub finalized_at: AgreementLevel,
    pub required_evidence_kind: AgreementEvidenceKind,
}
408
/// Agreement parameters bound to one operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgreementContract {
    /// Matched against `AgreementState::contract_name`.
    pub contract_name: String,
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub owner_id: Option<FragmentOwnerId>,
    /// Name of the profile this contract was derived from, if any.
    pub profile_name: Option<String>,
    pub visibility: OperationVisibility,
    pub rule: AgreementRule,
    /// Minimum state level for `provisional_usable`.
    pub usable_at: AgreementLevel,
    /// Level the state must equal, and the evidence must reach, for
    /// `finalization_admissible`.
    pub finalized_at: AgreementLevel,
    /// Kind that satisfying evidence must have.
    pub required_evidence_kind: AgreementEvidenceKind,
}
423
/// A piece of evidence offered towards an operation's agreement.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgreementEvidence {
    pub evidence_id: String,
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub owner_id: Option<FragmentOwnerId>,
    pub level: AgreementLevel,
    pub kind: AgreementEvidenceKind,
    /// Identifier of the backing record; compared with a publication id, proof
    /// id or handle id by the `supports_agreement_evidence` helpers.
    pub reference: String,
    /// Only authoritative evidence can satisfy a contract.
    pub authoritative: bool,
}
436
/// Current agreement state of an operation under a named contract.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgreementState {
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub owner_id: Option<FragmentOwnerId>,
    /// Matched against `AgreementContract::contract_name`.
    pub contract_name: String,
    pub level: AgreementLevel,
    /// `Some(_)` once the state is terminal (see `is_terminal`).
    pub finalization: Option<FinalizationOutcome>,
    pub evidence_ids: Vec<String>,
    pub last_updated_tick: Option<u64>,
    pub reason: Option<String>,
}
450
/// Groups identifiers of operations, effects, authoritative reads, handles and
/// publications under one region id.
///
/// The visible part of this file stores regions but does not query them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Region {
    pub region_id: String,
    pub session: Option<SessionId>,
    pub owner_id: Option<FragmentOwnerId>,
    pub scope: OwnershipScope,
    pub operation_ids: Vec<String>,
    pub effect_ids: Vec<u64>,
    pub authoritative_read_ids: Vec<String>,
    pub handle_ids: Vec<String>,
    pub publication_ids: Vec<String>,
}
464
/// Progress tracking record for one operation.
///
/// The last four fields are optional in the serialized form and deserialize to
/// `None` when absent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProgressContract {
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub state: ProgressState,
    pub last_ordering_key: Option<u64>,
    /// A bounded contract is expected to carry `budget_ticks` (see
    /// `has_budget_discipline`) and degrades to `TimedOut` rather than `Failed`
    /// in `synthetic_step`.
    pub bounded: bool,
    #[serde(default)]
    pub budget_ticks: Option<u64>,
    /// Used by `synthetic_step` as the fallback for `escalated_at_tick`.
    #[serde(default = "default_progress_contract_last_progress_tick")]
    pub last_progress_tick: Option<u64>,
    #[serde(default = "default_progress_contract_escalated_at_tick")]
    pub escalated_at_tick: Option<u64>,
    #[serde(default = "default_progress_contract_reason")]
    pub reason: Option<String>,
}
482
/// Record of an operation moving from one progress state to another at a tick.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProgressTransition {
    pub operation_id: String,
    pub session: Option<SessionId>,
    pub from_state: ProgressState,
    pub to_state: ProgressState,
    pub tick: u64,
    pub reason: Option<String>,
}
493
/// Summary of everything associated with one operation's finalization, as
/// assembled by `ProtocolMachineFinalization::path_for_operation`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FinalizationPath {
    /// Copied from the operation.
    pub operation_id: String,
    /// Copied from the operation.
    pub session: Option<SessionId>,
    /// Copied from the operation.
    pub owner_id: Option<FragmentOwnerId>,
    /// Derived from which of the two read id lists are non-empty.
    pub read_class: FinalizationReadClass,
    /// Most significant stage that applies to the operation.
    pub stage: FinalizationStage,
    pub observed_read_ids: Vec<String>,
    pub authoritative_read_ids: Vec<String>,
    pub proof_ids: Vec<String>,
    pub canonical_handle_ids: Vec<String>,
    /// Ids of all publications of the operation, regardless of status.
    pub publication_ids: Vec<String>,
    /// Non-empty forces the `Invalidated` stage.
    pub invalidated_by_handoff_ids: Vec<u64>,
    /// Ids of the operation's publications whose status is `Rejected`.
    pub rejected_publication_ids: Vec<String>,
}
510
/// Copyable borrowed view over a `ProtocolMachineSemanticObjects` on which the
/// finalization queries are implemented.
///
/// Obtained through `ProtocolMachineSemanticObjects::finalization`.
#[derive(Debug, Clone, Copy)]
pub struct ProtocolMachineFinalization<'a> {
    /// The objects every query reads from.
    objects: &'a ProtocolMachineSemanticObjects,
}
516
517impl FinalizationPath {
518 #[must_use]
520 pub fn is_canonical(&self) -> bool {
521 self.stage == FinalizationStage::Canonical
522 }
523
524 #[must_use]
526 pub fn is_invalidated(&self) -> bool {
527 self.stage == FinalizationStage::Invalidated
528 }
529}
530
531impl PrestateBinding {
532 #[must_use]
533 pub fn binds_operation(&self, operation: &OperationInstance) -> bool {
534 self.operation_id == operation.operation_id && self.session == operation.session
535 }
536}
537
538impl AgreementProfile {
539 #[must_use]
540 pub fn supports_contract(&self, contract: &AgreementContract) -> bool {
541 contract.profile_name.as_deref() == Some(self.profile_name.as_str())
542 && contract.visibility == self.visibility
543 && contract.rule == self.rule
544 && contract.usable_at == self.usable_at
545 && contract.finalized_at == self.finalized_at
546 && contract.required_evidence_kind == self.required_evidence_kind
547 }
548}
549
550impl AgreementContract {
551 #[must_use]
552 pub fn tracks_operation(&self, operation: &OperationInstance) -> bool {
553 self.operation_id == operation.operation_id
554 && self.session == operation.session
555 && self.owner_id == operation.owner_id
556 }
557
558 #[must_use]
559 pub fn provisional_usable(&self, state: &AgreementState) -> bool {
560 state.tracks_contract(self)
561 && state.level.at_least(self.usable_at)
562 && self.visibility.permits_use_at(state.level)
563 && !matches!(
564 state.finalization,
565 Some(FinalizationOutcome::Aborted)
566 | Some(FinalizationOutcome::Rejected)
567 | Some(FinalizationOutcome::TimedOut)
568 )
569 }
570
571 #[must_use]
572 pub fn finalization_admissible(
573 &self,
574 binding: &PrestateBinding,
575 evidence: &AgreementEvidence,
576 state: &AgreementState,
577 ) -> bool {
578 state.tracks_contract(self)
579 && binding.operation_id == self.operation_id
580 && binding.session == self.session
581 && evidence.satisfies_contract(self)
582 && evidence.level.at_least(self.finalized_at)
583 && state.level == self.finalized_at
584 && state.finalization == Some(FinalizationOutcome::Finalized)
585 }
586
587 #[must_use]
588 pub fn aborted_state(&self, state: &AgreementState) -> bool {
589 state.tracks_contract(self) && state.finalization == Some(FinalizationOutcome::Aborted)
590 }
591}
592
593impl AgreementEvidence {
594 #[must_use]
595 pub fn satisfies_contract(&self, contract: &AgreementContract) -> bool {
596 self.operation_id == contract.operation_id
597 && self.session == contract.session
598 && self.owner_id == contract.owner_id
599 && self.kind == contract.required_evidence_kind
600 && self.authoritative
601 }
602}
603
604impl AgreementState {
605 #[must_use]
606 pub fn tracks_contract(&self, contract: &AgreementContract) -> bool {
607 self.operation_id == contract.operation_id
608 && self.session == contract.session
609 && self.owner_id == contract.owner_id
610 && self.contract_name == contract.contract_name
611 }
612
613 #[must_use]
614 pub fn is_terminal(&self) -> bool {
615 self.finalization.is_some()
616 }
617}
618
619impl PublicationEvent {
620 #[must_use]
621 pub fn supports_agreement_evidence(&self, evidence: &AgreementEvidence) -> bool {
622 evidence.kind == AgreementEvidenceKind::Publication
623 && evidence.reference == self.publication_id
624 && evidence.operation_id == self.operation_id
625 && evidence.session == self.session
626 && evidence.owner_id == self.owner_id
627 && evidence.authoritative == (self.proof_ref.is_some() && self.handle_ref.is_some())
628 }
629}
630
631impl MaterializationProof {
632 #[must_use]
633 pub fn supports_agreement_evidence(&self, evidence: &AgreementEvidence) -> bool {
634 evidence.kind == AgreementEvidenceKind::Materialization
635 && evidence.reference == self.proof_id
636 && evidence.session == self.session
637 && evidence.authoritative == self.passed
638 }
639}
640
641impl CanonicalHandle {
642 #[must_use]
643 pub fn supports_agreement_evidence(&self, evidence: &AgreementEvidence) -> bool {
644 evidence.reference == self.handle_id
645 && evidence.session == self.session
646 && evidence.owner_id == self.owner_id
647 }
648}
649
650impl SemanticHandoff {
651 #[must_use]
652 pub fn relocates_agreement_state(&self, state: &AgreementState) -> bool {
653 state.session == Some(self.session)
654 && state.owner_id.as_deref() == Some(self.activated_owner_id.as_str())
655 }
656}
657
658impl OperationInstance {
659 #[must_use]
661 pub fn is_parity_critical(&self) -> bool {
662 self.requires_proof || self.terminal_publication.is_some()
663 }
664
665 #[must_use]
667 pub fn requires_explicit_finalization(&self) -> bool {
668 self.requires_proof || self.terminal_publication.as_deref() == Some("handoff.committed")
669 }
670
671 #[must_use]
673 pub fn commitment_aligned_with_agreement_state(&self, state: &AgreementState) -> bool {
674 self.operation_id == state.operation_id
675 && self.session == state.session
676 && (self.phase != OperationPhase::Succeeded
677 || state.finalization == Some(FinalizationOutcome::Finalized))
678 && (state.finalization != Some(FinalizationOutcome::Finalized)
679 || matches!(
680 self.phase,
681 OperationPhase::Succeeded | OperationPhase::HandedOff
682 ))
683 && (state.finalization != Some(FinalizationOutcome::Aborted)
684 || matches!(
685 self.phase,
686 OperationPhase::Failed | OperationPhase::Cancelled | OperationPhase::TimedOut
687 ))
688 }
689}
690
691impl ProgressState {
692 #[must_use]
694 pub fn is_terminal(self) -> bool {
695 matches!(
696 self,
697 Self::Succeeded | Self::Failed | Self::Cancelled | Self::TimedOut | Self::HandedOff
698 )
699 }
700
701 #[must_use]
703 pub fn expected_operation_phase(self) -> OperationPhase {
704 match self {
705 Self::Pending => OperationPhase::Pending,
706 Self::Blocked | Self::NoProgress | Self::Degraded => OperationPhase::Blocked,
707 Self::Succeeded => OperationPhase::Succeeded,
708 Self::Failed => OperationPhase::Failed,
709 Self::Cancelled => OperationPhase::Cancelled,
710 Self::TimedOut => OperationPhase::TimedOut,
711 Self::HandedOff => OperationPhase::HandedOff,
712 }
713 }
714}
715
716impl ProgressContract {
717 #[must_use]
719 pub fn is_terminal(&self) -> bool {
720 self.state.is_terminal()
721 }
722
723 #[must_use]
725 pub fn has_budget_discipline(&self) -> bool {
726 !self.bounded || self.budget_ticks.is_some()
727 }
728
729 #[must_use]
731 pub fn tracks_operation(&self, operation: &OperationInstance) -> bool {
732 self.operation_id == operation.operation_id
733 && self.session == operation.session
734 && operation.phase == self.state.expected_operation_phase()
735 }
736
737 #[must_use]
739 pub fn progress_measure(&self) -> u64 {
740 match self.state {
741 ProgressState::Pending => 4,
742 ProgressState::Blocked => 3,
743 ProgressState::NoProgress => 2,
744 ProgressState::Degraded => 1,
745 ProgressState::Succeeded
746 | ProgressState::Failed
747 | ProgressState::Cancelled
748 | ProgressState::TimedOut
749 | ProgressState::HandedOff => 0,
750 }
751 }
752
753 #[must_use]
755 pub fn synthetic_step(&self) -> Option<Self> {
756 match self.state {
757 ProgressState::Pending => Some(Self {
758 state: ProgressState::Blocked,
759 ..self.clone()
760 }),
761 ProgressState::Blocked => Some(Self {
762 state: ProgressState::NoProgress,
763 escalated_at_tick: self.escalated_at_tick.or(self.last_progress_tick),
764 ..self.clone()
765 }),
766 ProgressState::NoProgress => Some(Self {
767 state: ProgressState::Degraded,
768 escalated_at_tick: self.escalated_at_tick.or(self.last_progress_tick),
769 ..self.clone()
770 }),
771 ProgressState::Degraded => Some(Self {
772 state: if self.bounded {
773 ProgressState::TimedOut
774 } else {
775 ProgressState::Failed
776 },
777 escalated_at_tick: self.escalated_at_tick.or(self.last_progress_tick),
778 ..self.clone()
779 }),
780 ProgressState::Succeeded
781 | ProgressState::Failed
782 | ProgressState::Cancelled
783 | ProgressState::TimedOut
784 | ProgressState::HandedOff => None,
785 }
786 }
787}
788
/// Top-level container for every semantic object record in this file.
///
/// Fields marked `#[serde(default)]` may be absent from the serialized form and
/// then deserialize to an empty list; the remaining fields are required.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProtocolMachineSemanticObjects {
    /// Must equal `SEMANTIC_OBJECTS_SCHEMA_VERSION`; any other value is rejected
    /// during deserialization.
    #[serde(deserialize_with = "deserialize_semantic_objects_schema_version")]
    pub schema_version: String,
    pub operation_instances: Vec<OperationInstance>,
    pub outstanding_effects: Vec<OutstandingEffect>,
    pub semantic_handoffs: Vec<SemanticHandoff>,
    #[serde(default)]
    pub transformation_obligations: Vec<TransformationObligation>,
    pub authoritative_reads: Vec<AuthoritativeRead>,
    pub observed_reads: Vec<ObservedRead>,
    pub materialization_proofs: Vec<MaterializationProof>,
    pub canonical_handles: Vec<CanonicalHandle>,
    #[serde(default)]
    pub publication_events: Vec<PublicationEvent>,
    #[serde(default)]
    pub prestate_bindings: Vec<PrestateBinding>,
    #[serde(default)]
    pub agreement_profiles: Vec<AgreementProfile>,
    #[serde(default)]
    pub agreement_contracts: Vec<AgreementContract>,
    #[serde(default)]
    pub agreement_evidence: Vec<AgreementEvidence>,
    #[serde(default)]
    pub agreement_states: Vec<AgreementState>,
    pub regions: Vec<Region>,
    pub progress_contracts: Vec<ProgressContract>,
    #[serde(default)]
    pub progress_transitions: Vec<ProgressTransition>,
}
820
821impl Default for ProtocolMachineSemanticObjects {
822 fn default() -> Self {
823 Self {
824 schema_version: canonical_semantic_objects_schema_version(),
825 operation_instances: Vec::new(),
826 outstanding_effects: Vec::new(),
827 semantic_handoffs: Vec::new(),
828 transformation_obligations: Vec::new(),
829 authoritative_reads: Vec::new(),
830 observed_reads: Vec::new(),
831 materialization_proofs: Vec::new(),
832 canonical_handles: Vec::new(),
833 publication_events: Vec::new(),
834 prestate_bindings: Vec::new(),
835 agreement_profiles: Vec::new(),
836 agreement_contracts: Vec::new(),
837 agreement_evidence: Vec::new(),
838 agreement_states: Vec::new(),
839 regions: Vec::new(),
840 progress_contracts: Vec::new(),
841 progress_transitions: Vec::new(),
842 }
843 }
844}
845
846impl ProtocolMachineSemanticObjects {
847 #[must_use]
849 pub const fn finalization(&self) -> ProtocolMachineFinalization<'_> {
850 ProtocolMachineFinalization { objects: self }
851 }
852
853 pub fn require_authoritative_read(&self, read_id: &str) -> Result<&AuthoritativeRead, String> {
859 self.finalization().require_authoritative_read(read_id)
860 }
861
862 pub fn require_canonical_handle(&self, handle_id: &str) -> Result<&CanonicalHandle, String> {
868 self.finalization().require_canonical_handle(handle_id)
869 }
870
871 #[must_use]
873 pub fn parity_critical_operations_have_canonical_handles(&self) -> bool {
874 self.operation_instances
875 .iter()
876 .filter(|operation| operation.requires_explicit_finalization())
877 .all(|operation| {
878 let path = self.finalization().path_for_operation(operation);
879 !path.canonical_handle_ids.is_empty()
880 || matches!(
881 path.stage,
882 FinalizationStage::Rejected | FinalizationStage::Invalidated
883 )
884 })
885 }
886
887 #[must_use]
889 pub fn parity_critical_operations_have_progress_contracts(&self) -> bool {
890 self.operation_instances
891 .iter()
892 .filter(|operation| operation.is_parity_critical())
893 .all(|operation| {
894 self.progress_contracts
895 .iter()
896 .any(|contract| contract.tracks_operation(operation))
897 })
898 }
899
900 #[must_use]
902 pub fn named_agreement_profile_available(&self, profile_name: &str) -> bool {
903 self.agreement_profiles
904 .iter()
905 .any(|profile| profile.profile_name == profile_name)
906 }
907
908 #[must_use]
910 pub fn agreement_contract_for_operation(&self, operation: &OperationInstance) -> bool {
911 self.agreement_contracts
912 .iter()
913 .any(|contract| contract.tracks_operation(operation))
914 }
915
916 #[must_use]
918 pub fn agreement_state_for_operation(&self, operation: &OperationInstance) -> bool {
919 self.agreement_states.iter().any(|state| {
920 state.operation_id == operation.operation_id && state.session == operation.session
921 })
922 }
923
924 #[must_use]
926 pub fn prestate_binding_for_operation(&self, operation: &OperationInstance) -> bool {
927 self.prestate_bindings
928 .iter()
929 .any(|binding| binding.binds_operation(operation))
930 }
931
932 #[must_use]
934 pub fn agreement_evidence_for_operation(&self, operation: &OperationInstance) -> bool {
935 self.agreement_evidence.iter().any(|evidence| {
936 evidence.operation_id == operation.operation_id && evidence.session == operation.session
937 })
938 }
939
940 #[must_use]
942 pub fn provisional_agreement_usable(&self, operation: &OperationInstance) -> bool {
943 self.agreement_contracts.iter().any(|contract| {
944 contract.tracks_operation(operation)
945 && self
946 .agreement_states
947 .iter()
948 .any(|state| contract.provisional_usable(state))
949 })
950 }
951
952 #[must_use]
954 pub fn finalization_backed(&self, operation: &OperationInstance) -> bool {
955 matches!(
956 self.finalization().path_for_operation(operation).stage,
957 FinalizationStage::Materialized
958 | FinalizationStage::Canonical
959 | FinalizationStage::Rejected
960 | FinalizationStage::Invalidated
961 ) && self.agreement_contracts.iter().any(|contract| {
962 contract.tracks_operation(operation)
963 && self.prestate_bindings.iter().any(|binding| {
964 self.agreement_evidence.iter().any(|evidence| {
965 self.agreement_states
966 .iter()
967 .any(|state| contract.finalization_admissible(binding, evidence, state))
968 })
969 })
970 })
971 }
972}
973
974impl<'a> ProtocolMachineFinalization<'a> {
975 pub fn require_authoritative_read(
981 self,
982 read_id: &str,
983 ) -> Result<&'a AuthoritativeRead, String> {
984 if let Some(read) = self
985 .objects
986 .authoritative_reads
987 .iter()
988 .find(|read| read.read_id == read_id)
989 {
990 return Ok(read);
991 }
992 if self
993 .objects
994 .observed_reads
995 .iter()
996 .any(|read| read.read_id == read_id)
997 {
998 return Err(format!(
999 "observed read `{read_id}` may not be consumed on a canonical semantic path; materialize it through authoritative evidence first"
1000 ));
1001 }
1002 Err(format!("semantic read `{read_id}` is unknown"))
1003 }
1004
1005 pub fn require_canonical_handle(self, handle_id: &str) -> Result<&'a CanonicalHandle, String> {
1011 self.objects
1012 .canonical_handles
1013 .iter()
1014 .find(|handle| handle.handle_id == handle_id)
1015 .ok_or_else(|| {
1016 format!("canonical handle `{handle_id}` is required on this parity-critical path")
1017 })
1018 }
1019
1020 #[must_use]
1022 pub fn observed_read_is_noncanonical(self, read_id: &str) -> bool {
1023 let Some(read) = self
1024 .objects
1025 .observed_reads
1026 .iter()
1027 .find(|candidate| candidate.read_id == read_id)
1028 else {
1029 return false;
1030 };
1031
1032 let operation_id = self
1033 .objects
1034 .outstanding_effects
1035 .iter()
1036 .find(|effect| effect.effect_id == read.effect_id)
1037 .map(|effect| effect.operation_id.as_str());
1038
1039 operation_id
1040 .and_then(|operation_id| self.path_for_operation_id(operation_id))
1041 .map_or(true, |path| !path.is_canonical())
1042 }
1043
1044 #[must_use]
1046 pub fn path_for_operation_id(self, operation_id: &str) -> Option<FinalizationPath> {
1047 self.objects
1048 .operation_instances
1049 .iter()
1050 .find(|operation| operation.operation_id == operation_id)
1051 .map(|operation| self.path_for_operation(operation))
1052 }
1053
1054 #[must_use]
1056 pub fn path_for_operation(self, operation: &OperationInstance) -> FinalizationPath {
1057 let observed_read_ids = self.observed_read_ids_for_operation(operation);
1058 let authoritative_read_ids = self.authoritative_read_ids_for_operation(operation);
1059 let publications = self.publications_for_operation(operation);
1060 let publication_ids = self.publication_ids(&publications);
1061 let rejected_publication_ids = self.rejected_publication_ids(&publications);
1062 let proof_ids = self.proof_ids_for_operation(operation, &publications);
1063 let canonical_handle_ids =
1064 self.canonical_handle_ids_for_operation(operation, &publications, &proof_ids);
1065 let invalidated_by_handoff_ids = self.invalidated_handoff_ids_for_operation(operation);
1066
1067 let read_class = match (
1068 observed_read_ids.is_empty(),
1069 authoritative_read_ids.is_empty(),
1070 ) {
1071 (true, true) => FinalizationReadClass::None,
1072 (false, true) => FinalizationReadClass::ObservedOnly,
1073 (true, false) => FinalizationReadClass::AuthoritativeOnly,
1074 (false, false) => FinalizationReadClass::Mixed,
1075 };
1076
1077 let stage = if !invalidated_by_handoff_ids.is_empty() {
1078 FinalizationStage::Invalidated
1079 } else if !rejected_publication_ids.is_empty() {
1080 FinalizationStage::Rejected
1081 } else if publications.iter().any(|publication| {
1082 publication.status == PublicationStatus::Published
1083 && publication.proof_ref.is_some()
1084 && publication.handle_ref.is_some()
1085 }) {
1086 FinalizationStage::Canonical
1087 } else if !proof_ids.is_empty() {
1088 FinalizationStage::Materialized
1089 } else if !authoritative_read_ids.is_empty() {
1090 FinalizationStage::Authoritative
1091 } else {
1092 FinalizationStage::Observed
1093 };
1094
1095 FinalizationPath {
1096 operation_id: operation.operation_id.clone(),
1097 session: operation.session,
1098 owner_id: operation.owner_id.clone(),
1099 read_class,
1100 stage,
1101 observed_read_ids,
1102 authoritative_read_ids,
1103 proof_ids,
1104 canonical_handle_ids,
1105 publication_ids,
1106 invalidated_by_handoff_ids,
1107 rejected_publication_ids,
1108 }
1109 }
1110
1111 fn observed_read_ids_for_operation(self, operation: &OperationInstance) -> Vec<String> {
1112 self.objects
1113 .observed_reads
1114 .iter()
1115 .filter(|read| {
1116 self.objects.outstanding_effects.iter().any(|effect| {
1117 effect.effect_id == read.effect_id
1118 && effect.operation_id == operation.operation_id
1119 })
1120 })
1121 .map(|read| read.read_id.clone())
1122 .collect()
1123 }
1124
1125 fn authoritative_read_ids_for_operation(self, operation: &OperationInstance) -> Vec<String> {
1126 self.objects
1127 .authoritative_reads
1128 .iter()
1129 .filter(|read| {
1130 read.session == operation.session
1131 && (read.owner_id == operation.owner_id
1132 || operation.owner_id.is_none()
1133 || read.owner_id.is_none())
1134 })
1135 .map(|read| read.read_id.clone())
1136 .collect()
1137 }
1138
1139 fn publications_for_operation(
1140 self,
1141 operation: &OperationInstance,
1142 ) -> Vec<&'a PublicationEvent> {
1143 self.objects
1144 .publication_events
1145 .iter()
1146 .filter(|publication| publication.operation_id == operation.operation_id)
1147 .collect()
1148 }
1149
1150 fn publication_ids(self, publications: &[&'a PublicationEvent]) -> Vec<String> {
1151 publications
1152 .iter()
1153 .map(|publication| publication.publication_id.clone())
1154 .collect()
1155 }
1156
1157 fn rejected_publication_ids(self, publications: &[&'a PublicationEvent]) -> Vec<String> {
1158 publications
1159 .iter()
1160 .filter(|publication| publication.status == PublicationStatus::Rejected)
1161 .map(|publication| publication.publication_id.clone())
1162 .collect()
1163 }
1164
1165 fn proof_ids_for_operation(
1166 self,
1167 operation: &OperationInstance,
1168 publications: &[&'a PublicationEvent],
1169 ) -> Vec<String> {
1170 let mut proof_ids: Vec<_> = publications
1171 .iter()
1172 .filter_map(|publication| publication.proof_ref.clone())
1173 .collect();
1174 if let Some(proof_id) = operation.operation_id.strip_prefix("materialization:") {
1175 push_unique(&mut proof_ids, proof_id.to_string());
1176 }
1177 proof_ids
1178 }
1179
1180 fn canonical_handle_ids_for_operation(
1181 self,
1182 operation: &OperationInstance,
1183 publications: &[&'a PublicationEvent],
1184 proof_ids: &[String],
1185 ) -> Vec<String> {
1186 let mut canonical_handle_ids: Vec<_> = publications
1187 .iter()
1188 .filter_map(|publication| publication.handle_ref.clone())
1189 .collect();
1190 for handle in &self.objects.canonical_handles {
1191 let proof_backed = handle
1192 .proof_ref
1193 .as_ref()
1194 .is_some_and(|proof_ref| proof_ids.contains(proof_ref));
1195 if proof_backed || handle.handle_id == operation.operation_id {
1196 push_unique(&mut canonical_handle_ids, handle.handle_id.clone());
1197 }
1198 }
1199 canonical_handle_ids
1200 }
1201
1202 fn invalidated_handoff_ids_for_operation(self, operation: &OperationInstance) -> Vec<u64> {
1203 self.objects
1204 .transformation_obligations
1205 .iter()
1206 .filter(|obligation| {
1207 obligation
1208 .affected_operation_ids
1209 .iter()
1210 .any(|operation_id| operation_id == &operation.operation_id)
1211 && (!obligation.invalidated_effect_ids.is_empty()
1212 || matches!(obligation.status, DelegationStatus::Committed))
1213 })
1214 .map(|obligation| obligation.handoff_id)
1215 .collect()
1216 }
1217}
1218
1219fn progress_state(status: OutstandingEffectStatus) -> ProgressState {
1220 match status {
1221 OutstandingEffectStatus::Pending => ProgressState::Pending,
1222 OutstandingEffectStatus::Blocked => ProgressState::Blocked,
1223 OutstandingEffectStatus::Succeeded => ProgressState::Succeeded,
1224 OutstandingEffectStatus::Failed => ProgressState::Failed,
1225 OutstandingEffectStatus::Cancelled => ProgressState::Cancelled,
1226 OutstandingEffectStatus::Invalidated => ProgressState::TimedOut,
1227 }
1228}
1229
1230fn progress_state_for_operation_phase(phase: OperationPhase) -> ProgressState {
1231 match phase {
1232 OperationPhase::Pending => ProgressState::Pending,
1233 OperationPhase::Blocked => ProgressState::Blocked,
1234 OperationPhase::Succeeded => ProgressState::Succeeded,
1235 OperationPhase::Failed => ProgressState::Failed,
1236 OperationPhase::Cancelled => ProgressState::Cancelled,
1237 OperationPhase::TimedOut => ProgressState::TimedOut,
1238 OperationPhase::HandedOff => ProgressState::HandedOff,
1239 }
1240}
1241
1242fn authority_read_lifecycle(event: AuthorityAuditEvent) -> Option<AuthoritativeReadLifecycle> {
1243 match event {
1244 AuthorityAuditEvent::Issued => Some(AuthoritativeReadLifecycle::Issued),
1245 AuthorityAuditEvent::Consumed => Some(AuthoritativeReadLifecycle::Consumed),
1246 AuthorityAuditEvent::Rejected => Some(AuthoritativeReadLifecycle::Rejected),
1247 AuthorityAuditEvent::Invalidated
1248 | AuthorityAuditEvent::Committed
1249 | AuthorityAuditEvent::RolledBack
1250 | AuthorityAuditEvent::Expired => None,
1251 }
1252}
1253
1254fn authority_read_from_record(record: &AuthorityAuditRecord) -> Option<AuthoritativeRead> {
1255 let lifecycle = authority_read_lifecycle(record.event)?;
1256 match &record.artifact {
1257 AuthorityArtifact::Readiness(witness) => Some(AuthoritativeRead {
1258 read_id: format!("readiness:{}:{:?}", witness.witness_id, record.event),
1259 session: Some(witness.session_id),
1260 owner_id: Some(witness.owner_id.clone()),
1261 kind: AuthoritativeReadKind::Readiness,
1262 lifecycle,
1263 predicate_ref: Some(witness.predicate_ref.clone()),
1264 witness_id: Some(witness.witness_id),
1265 generation: Some(witness.generation),
1266 reason: record.reason.clone(),
1267 }),
1268 AuthorityArtifact::Cancellation(witness) => Some(AuthoritativeRead {
1269 read_id: format!("cancellation:{}:{:?}", witness.witness_id, record.event),
1270 session: Some(witness.session_id),
1271 owner_id: Some(witness.owner_id.clone()),
1272 kind: AuthoritativeReadKind::Cancellation,
1273 lifecycle,
1274 predicate_ref: None,
1275 witness_id: Some(witness.witness_id),
1276 generation: Some(witness.generation),
1277 reason: record.reason.clone(),
1278 }),
1279 AuthorityArtifact::Timeout(witness) => Some(AuthoritativeRead {
1280 read_id: format!("timeout:{}:{:?}", witness.witness_id, record.event),
1281 session: None,
1282 owner_id: None,
1283 kind: AuthoritativeReadKind::Timeout,
1284 lifecycle,
1285 predicate_ref: None,
1286 witness_id: Some(witness.witness_id),
1287 generation: None,
1288 reason: record.reason.clone(),
1289 }),
1290 AuthorityArtifact::OwnershipCapability(_) | AuthorityArtifact::OwnershipReceipt(_) => None,
1291 }
1292}
1293
1294fn proof_id(check: &OutputConditionCheck) -> String {
1295 format!("{}:{}", check.meta.predicate_ref, check.meta.output_digest)
1296}
1297
/// Formats the owner identifier for a coroutine as `coro:<coro_id>`.
fn coro_owner_id(coro_id: usize) -> String {
    let mut owner_id = String::from("coro:");
    owner_id.push_str(&coro_id.to_string());
    owner_id
}
1301
1302fn transformed_fragments(scope: &OwnershipScope) -> Vec<String> {
1303 match scope {
1304 OwnershipScope::Session => vec!["session".to_string()],
1305 OwnershipScope::Fragments(fragments) => fragments.clone(),
1306 }
1307}
1308
1309fn semantic_ownership_scope(scope: &crate::session::OwnershipScope) -> OwnershipScope {
1310 match scope {
1311 crate::session::OwnershipScope::Session => OwnershipScope::Session,
1312 crate::session::OwnershipScope::Fragments(fragments) => {
1313 OwnershipScope::Fragments(fragments.iter().cloned().collect())
1314 }
1315 }
1316}
1317
1318fn semantic_delegation_status(
1319 status: &crate::transfer_semantics::DelegationStatus,
1320) -> DelegationStatus {
1321 match status {
1322 crate::transfer_semantics::DelegationStatus::Committed => DelegationStatus::Committed,
1323 crate::transfer_semantics::DelegationStatus::RolledBack => DelegationStatus::RolledBack,
1324 }
1325}
1326
1327fn publication_observer_class(publication: &str) -> PublicationObserverClass {
1328 if publication.starts_with("handoff.") || publication.starts_with("materialization.") {
1329 PublicationObserverClass::Audit
1330 } else {
1331 PublicationObserverClass::Canonical
1332 }
1333}
1334
1335fn progress_state_rank(state: ProgressState) -> u8 {
1336 match state {
1337 ProgressState::Pending => 0,
1338 ProgressState::Blocked => 1,
1339 ProgressState::NoProgress => 2,
1340 ProgressState::Degraded => 3,
1341 ProgressState::Succeeded => 4,
1342 ProgressState::Failed => 5,
1343 ProgressState::Cancelled => 6,
1344 ProgressState::TimedOut => 7,
1345 ProgressState::HandedOff => 8,
1346 }
1347}
1348
/// Returns the numeric rank of `level` by delegating to its `rank` method.
///
/// Used as a sort key component when ordering agreement states.
fn agreement_level_rank(level: AgreementLevel) -> u8 {
    level.rank()
}
1352
1353fn operation_visibility(operation: &OperationInstance) -> OperationVisibility {
1354 if operation.requires_proof {
1355 OperationVisibility::BlockedUntilFinalized
1356 } else if operation.terminal_publication.is_some() {
1357 OperationVisibility::Pending
1358 } else {
1359 OperationVisibility::Immediate
1360 }
1361}
1362
1363fn agreement_rule(operation: &OperationInstance) -> AgreementRule {
1364 if operation.requires_proof {
1365 AgreementRule::Named {
1366 rule_name: "proof_finalization".to_string(),
1367 }
1368 } else if !operation.dependent_operation_ids.is_empty() {
1369 AgreementRule::Named {
1370 rule_name: "dependent_work".to_string(),
1371 }
1372 } else {
1373 AgreementRule::NoAgreement
1374 }
1375}
1376
1377fn agreement_profile_name(operation: &OperationInstance) -> String {
1378 if operation.requires_proof {
1379 "ProofFinalization".to_string()
1380 } else if !operation.dependent_operation_ids.is_empty() {
1381 "DependentWork".to_string()
1382 } else if operation.terminal_publication.is_some() {
1383 "PendingPublication".to_string()
1384 } else {
1385 "ImmediateLocal".to_string()
1386 }
1387}
1388
1389fn required_agreement_evidence_kind(operation: &OperationInstance) -> AgreementEvidenceKind {
1390 if operation.requires_proof || operation.terminal_publication.is_some() {
1391 AgreementEvidenceKind::Publication
1392 } else {
1393 AgreementEvidenceKind::Witness
1394 }
1395}
1396
1397fn usable_agreement_level(operation: &OperationInstance) -> AgreementLevel {
1398 match operation_visibility(operation) {
1399 OperationVisibility::Immediate => AgreementLevel::None,
1400 OperationVisibility::Pending => AgreementLevel::Provisional,
1401 OperationVisibility::BlockedUntilFinalized => AgreementLevel::Finalized,
1402 }
1403}
1404
1405fn finalized_agreement_level(operation: &OperationInstance) -> AgreementLevel {
1406 match operation_visibility(operation) {
1407 OperationVisibility::Immediate => AgreementLevel::None,
1408 OperationVisibility::Pending | OperationVisibility::BlockedUntilFinalized => {
1409 AgreementLevel::Finalized
1410 }
1411 }
1412}
1413
/// Appends `item` to `items` unless an equal element is already present.
fn push_unique<T: PartialEq>(items: &mut Vec<T>, item: T) {
    if items.contains(&item) {
        return;
    }
    items.push(item);
}
1419
1420fn absorb_region_owner(region: &mut Region, owner_id: Option<&FragmentOwnerId>) {
1421 if region.owner_id.is_none() {
1422 region.owner_id = owner_id.cloned();
1423 }
1424}
1425
1426fn derive_regions(
1427 operation_instances: &[OperationInstance],
1428 outstanding_effects: &[OutstandingEffect],
1429 authoritative_reads: &[AuthoritativeRead],
1430 canonical_handles: &[CanonicalHandle],
1431 publication_events: &BTreeMap<String, PublicationEvent>,
1432 progress_contracts: &[ProgressContract],
1433) -> Vec<Region> {
1434 fn region_entry(regions: &mut BTreeMap<SessionId, Region>, session: SessionId) -> &mut Region {
1435 regions.entry(session).or_insert_with(|| Region {
1436 region_id: format!("session:{session}"),
1437 session: Some(session),
1438 owner_id: None,
1439 scope: OwnershipScope::Session,
1440 operation_ids: Vec::new(),
1441 effect_ids: Vec::new(),
1442 authoritative_read_ids: Vec::new(),
1443 handle_ids: Vec::new(),
1444 publication_ids: Vec::new(),
1445 })
1446 }
1447
1448 let mut regions = BTreeMap::<SessionId, Region>::new();
1449
1450 for operation in operation_instances {
1451 let Some(session) = operation.session else {
1452 continue;
1453 };
1454 let region = region_entry(&mut regions, session);
1455 absorb_region_owner(region, operation.owner_id.as_ref());
1456 push_unique(&mut region.operation_ids, operation.operation_id.clone());
1457 }
1458
1459 for effect in outstanding_effects {
1460 let Some(session) = effect.session else {
1461 continue;
1462 };
1463 let region = region_entry(&mut regions, session);
1464 absorb_region_owner(region, effect.owner_id.as_ref());
1465 push_unique(&mut region.effect_ids, effect.effect_id);
1466 push_unique(&mut region.operation_ids, effect.operation_id.clone());
1467 }
1468
1469 for read in authoritative_reads {
1470 let Some(session) = read.session else {
1471 continue;
1472 };
1473 let region = region_entry(&mut regions, session);
1474 absorb_region_owner(region, read.owner_id.as_ref());
1475 push_unique(&mut region.authoritative_read_ids, read.read_id.clone());
1476 }
1477
1478 for handle in canonical_handles {
1479 let Some(session) = handle.session else {
1480 continue;
1481 };
1482 let region = region_entry(&mut regions, session);
1483 absorb_region_owner(region, handle.owner_id.as_ref());
1484 push_unique(&mut region.handle_ids, handle.handle_id.clone());
1485 }
1486
1487 for publication in publication_events.values() {
1488 let Some(session) = publication.session else {
1489 continue;
1490 };
1491 let region = region_entry(&mut regions, session);
1492 absorb_region_owner(region, publication.owner_id.as_ref());
1493 push_unique(
1494 &mut region.publication_ids,
1495 publication.publication_id.clone(),
1496 );
1497 push_unique(&mut region.operation_ids, publication.operation_id.clone());
1498 }
1499
1500 for contract in progress_contracts {
1501 let Some(session) = contract.session else {
1502 continue;
1503 };
1504 let region = region_entry(&mut regions, session);
1505 push_unique(&mut region.operation_ids, contract.operation_id.clone());
1506 }
1507
1508 regions.into_values().collect()
1509}
1510
1511fn canonicalize(mut out: ProtocolMachineSemanticObjects) -> ProtocolMachineSemanticObjects {
1512 out.operation_instances
1513 .sort_by_key(|lhs| lhs.operation_id.clone());
1514 out.outstanding_effects.sort_by_key(|lhs| lhs.effect_id);
1515 out.semantic_handoffs.sort_by_key(|lhs| lhs.handoff_id);
1516 out.transformation_obligations
1517 .sort_by_key(|lhs| lhs.handoff_id);
1518 out.authoritative_reads
1519 .sort_by_key(|lhs| lhs.read_id.clone());
1520 out.observed_reads.sort_by_key(|lhs| lhs.read_id.clone());
1521 out.materialization_proofs
1522 .sort_by_key(|lhs| lhs.proof_id.clone());
1523 out.canonical_handles
1524 .sort_by_key(|lhs| lhs.handle_id.clone());
1525 out.publication_events
1526 .sort_by_key(|lhs| lhs.publication_id.clone());
1527 out.prestate_bindings
1528 .sort_by_key(|lhs| lhs.binding_id.clone());
1529 out.agreement_profiles
1530 .sort_by_key(|lhs| lhs.profile_name.clone());
1531 out.agreement_contracts
1532 .sort_by_key(|lhs| lhs.contract_name.clone());
1533 out.agreement_evidence
1534 .sort_by_key(|lhs| lhs.evidence_id.clone());
1535 out.agreement_states.sort_by(|lhs, rhs| {
1536 (
1537 &lhs.operation_id,
1538 &lhs.contract_name,
1539 agreement_level_rank(lhs.level),
1540 lhs.last_updated_tick.unwrap_or(0),
1541 )
1542 .cmp(&(
1543 &rhs.operation_id,
1544 &rhs.contract_name,
1545 agreement_level_rank(rhs.level),
1546 rhs.last_updated_tick.unwrap_or(0),
1547 ))
1548 });
1549 out.regions.sort_by_key(|lhs| lhs.region_id.clone());
1550 for region in &mut out.regions {
1551 region.operation_ids.sort();
1552 region.operation_ids.dedup();
1553 region.effect_ids.sort();
1554 region.effect_ids.dedup();
1555 region.authoritative_read_ids.sort();
1556 region.authoritative_read_ids.dedup();
1557 region.handle_ids.sort();
1558 region.handle_ids.dedup();
1559 region.publication_ids.sort();
1560 region.publication_ids.dedup();
1561 }
1562 out.progress_contracts
1563 .sort_by_key(|lhs| lhs.operation_id.clone());
1564 out.progress_transitions.sort_by(|lhs, rhs| {
1565 (
1566 lhs.tick,
1567 &lhs.operation_id,
1568 progress_state_rank(lhs.from_state),
1569 progress_state_rank(lhs.to_state),
1570 )
1571 .cmp(&(
1572 rhs.tick,
1573 &rhs.operation_id,
1574 progress_state_rank(rhs.from_state),
1575 progress_state_rank(rhs.to_state),
1576 ))
1577 });
1578 out
1579}
1580
/// Derives the complete semantic-object view from raw audit logs and snapshots.
///
/// The result contains the caller's operations and effects plus everything
/// synthesized from them: semantic handoffs and transformation obligations (from
/// the delegation audit log), authoritative reads (from the authority audit log
/// and the output-condition checks), observed reads (one per outstanding effect),
/// materialization proofs and their operations, canonical handles, publication
/// events, agreement profiles/contracts/evidence/states, prestate bindings,
/// progress contracts and regions. The assembled value is passed through
/// `canonicalize` before being returned.
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn protocol_machine_semantic_objects(
    authority_audit_log: &[AuthorityAuditRecord],
    delegation_audit_log: &[DelegationAuditRecord],
    operation_instances: &[OperationInstance],
    outstanding_effects: &[OutstandingEffect],
    output_condition_checks: &[OutputConditionCheck],
    progress_contracts: &[ProgressContract],
    progress_transitions: &[ProgressTransition],
) -> ProtocolMachineSemanticObjects {
    // Owned copies: the operation list is extended with synthesized operations below.
    let mut operation_instances = operation_instances.to_vec();
    let outstanding_effects = outstanding_effects.to_vec();

    // One semantic handoff per delegation audit record.
    let semantic_handoffs: Vec<_> = delegation_audit_log
        .iter()
        .map(|record| SemanticHandoff {
            handoff_id: record.receipt.receipt_id,
            session: record.receipt.session,
            from_coro: record.receipt.from_coro,
            to_coro: record.receipt.to_coro,
            revoked_owner_id: coro_owner_id(record.receipt.from_coro),
            activated_owner_id: coro_owner_id(record.receipt.to_coro),
            scope: semantic_ownership_scope(&record.receipt.scope),
            status: semantic_delegation_status(&record.status),
            tick: record.tick,
            reason: record.reason.clone(),
        })
        .collect();

    // One transformation obligation per delegation audit record. This runs before
    // any synthesized operation is appended, so "affected" operations are only the
    // caller-supplied ones in the record's session.
    let transformation_obligations: Vec<_> = delegation_audit_log
        .iter()
        .map(|record| {
            let affected_operation_ids: Vec<_> = operation_instances
                .iter()
                .filter(|operation| operation.session == Some(record.receipt.session))
                .map(|operation| operation.operation_id.clone())
                .collect();
            let affected_effect_ids: Vec<_> = outstanding_effects
                .iter()
                .filter(|effect| effect.session == Some(record.receipt.session))
                .map(|effect| effect.effect_id)
                .collect();
            // Effects still pending in the session are treated as transported.
            let transported_effect_ids: Vec<_> = outstanding_effects
                .iter()
                .filter(|effect| {
                    effect.session == Some(record.receipt.session)
                        && matches!(effect.status, OutstandingEffectStatus::Pending)
                })
                .map(|effect| effect.effect_id)
                .collect();
            let invalidated_effect_ids: Vec<_> = outstanding_effects
                .iter()
                .filter(|effect| {
                    effect.session == Some(record.receipt.session)
                        && matches!(effect.status, OutstandingEffectStatus::Invalidated)
                })
                .map(|effect| effect.effect_id)
                .collect();

            TransformationObligation {
                obligation_id: format!("handoff:{}", record.receipt.receipt_id),
                handoff_id: record.receipt.receipt_id,
                session: record.receipt.session,
                transformed_fragments: transformed_fragments(&semantic_ownership_scope(
                    &record.receipt.scope,
                )),
                affected_operation_ids,
                affected_effect_ids,
                transported_effect_ids,
                invalidated_effect_ids,
                // The policy label depends only on whether the delegation committed.
                witness_policy: if matches!(
                    semantic_delegation_status(&record.status),
                    DelegationStatus::Committed
                ) {
                    "transport_pending_invalidate_blocked".to_string()
                } else {
                    "rollback".to_string()
                },
                publication_revoked_from: coro_owner_id(record.receipt.from_coro),
                publication_activated_to: coro_owner_id(record.receipt.to_coro),
                scope: semantic_ownership_scope(&record.receipt.scope),
                status: semantic_delegation_status(&record.status),
                tick: record.tick,
                reason: record.reason.clone(),
            }
        })
        .collect();

    // Synthesize one operation per handoff.
    // NOTE(review): the phase and the "handoff.committed" publication are used for
    // every handoff regardless of its status — confirm this is intended for
    // rolled-back handoffs.
    operation_instances.extend(semantic_handoffs.iter().map(|handoff| OperationInstance {
        operation_id: format!("handoff:{}", handoff.handoff_id),
        session: Some(handoff.session),
        owner_id: None,
        kind: "semantic_handoff".to_string(),
        phase: OperationPhase::HandedOff,
        handler_identity: None,
        effect_ids: Vec::new(),
        dependent_operation_ids: Vec::new(),
        terminal_publication: Some("handoff.committed".to_string()),
        budget_ticks: Some(1),
        requires_proof: false,
    }));

    // Authoritative reads: witness-backed reads from the authority audit log
    // (records that do not describe a read are dropped) ...
    let mut authoritative_reads: Vec<_> = authority_audit_log
        .iter()
        .filter_map(authority_read_from_record)
        .collect();
    // ... plus one session-less read per output-condition check.
    authoritative_reads.extend(
        output_condition_checks
            .iter()
            .map(|check| AuthoritativeRead {
                read_id: format!("output_condition:{}", proof_id(check)),
                session: None,
                owner_id: None,
                kind: AuthoritativeReadKind::OutputCondition,
                lifecycle: if check.passed {
                    AuthoritativeReadLifecycle::Verified
                } else {
                    AuthoritativeReadLifecycle::Rejected
                },
                predicate_ref: Some(check.meta.predicate_ref.clone()),
                witness_id: None,
                generation: None,
                reason: (!check.passed).then(|| "output condition failed".to_string()),
            }),
    );

    // One observed read per outstanding effect.
    let observed_reads: Vec<_> = outstanding_effects
        .iter()
        .map(|effect| ObservedRead {
            read_id: format!("effect:{}", effect.effect_id),
            session: effect.session,
            effect_id: effect.effect_id,
            effect_interface: effect.effect_interface.clone(),
            effect_operation: effect.effect_operation.clone(),
            handler_identity: effect.handler_identity.clone(),
            status: effect.status,
        })
        .collect();

    // One materialization proof per output-condition check; proofs carry no session.
    let materialization_proofs: Vec<_> = output_condition_checks
        .iter()
        .map(|check| MaterializationProof {
            proof_id: proof_id(check),
            session: None,
            predicate_ref: check.meta.predicate_ref.clone(),
            witness_ref: check.meta.witness_ref.clone(),
            output_digest: check.meta.output_digest.clone(),
            passed: check.passed,
        })
        .collect();

    // Synthesize one proof-requiring operation per materialization proof.
    operation_instances.extend(
        materialization_proofs
            .iter()
            .map(|proof| OperationInstance {
                operation_id: format!("materialization:{}", proof.proof_id),
                session: proof.session,
                owner_id: None,
                kind: "materialization".to_string(),
                phase: if proof.passed {
                    OperationPhase::Succeeded
                } else {
                    OperationPhase::Failed
                },
                handler_identity: None,
                effect_ids: Vec::new(),
                dependent_operation_ids: Vec::new(),
                terminal_publication: Some(if proof.passed {
                    "materialization.succeeded".to_string()
                } else {
                    "materialization.failed".to_string()
                }),
                budget_ticks: Some(1),
                requires_proof: true,
            }),
    );

    // Canonical handles exist only for passed proofs (keyed by output digest) ...
    let mut canonical_handles: Vec<_> = materialization_proofs
        .iter()
        .filter(|proof| proof.passed)
        .map(|proof| CanonicalHandle {
            handle_id: format!("materialization:{}", proof.output_digest),
            session: proof.session,
            owner_id: None,
            kind: CanonicalHandleKind::Materialization,
            proof_ref: Some(proof.proof_id.clone()),
        })
        .collect();
    // ... and for committed handoffs, whose proof reference is the handle id itself.
    canonical_handles.extend(
        semantic_handoffs
            .iter()
            .filter(|handoff| matches!(handoff.status, DelegationStatus::Committed))
            .map(|handoff| CanonicalHandle {
                handle_id: format!("handoff:{}", handoff.handoff_id),
                session: Some(handoff.session),
                owner_id: None,
                kind: CanonicalHandleKind::Handoff,
                proof_ref: Some(format!("handoff:{}", handoff.handoff_id)),
            }),
    );

    // Lookup tables: materialization operation id -> proof id (passed proofs only),
    // and proof reference -> handle id.
    let proof_by_operation: BTreeMap<String, String> = materialization_proofs
        .iter()
        .filter(|proof| proof.passed)
        .map(|proof| {
            (
                format!("materialization:{}", proof.proof_id),
                proof.proof_id.clone(),
            )
        })
        .collect();
    let handle_by_proof: BTreeMap<String, String> = canonical_handles
        .iter()
        .filter_map(|handle| {
            handle
                .proof_ref
                .as_ref()
                .map(|proof_ref| (proof_ref.clone(), handle.handle_id.clone()))
        })
        .collect();

    // One publication event per operation that declares a terminal publication,
    // keyed by "<operation id>:<publication>".
    let mut publication_events = BTreeMap::<String, PublicationEvent>::new();
    for operation in &operation_instances {
        let Some(publication) = operation.terminal_publication.as_ref() else {
            continue;
        };
        // Proof reference: the passed proof of a materialization operation, or, for
        // an operation id starting with "handoff:", that id itself.
        let proof_ref = proof_by_operation
            .get(&operation.operation_id)
            .cloned()
            .or_else(|| {
                operation
                    .operation_id
                    .strip_prefix("handoff:")
                    .map(|suffix| format!("handoff:{suffix}"))
            });
        let handle_ref = proof_ref
            .as_ref()
            .and_then(|proof_ref| handle_by_proof.get(proof_ref).cloned());
        // A proof-requiring operation without a canonical handle cannot publish.
        let (status, reason) = if operation.requires_proof && handle_ref.is_none() {
            (
                PublicationStatus::Rejected,
                Some("proof-bearing success required".to_string()),
            )
        } else {
            (PublicationStatus::Published, None)
        };
        let publication_id = format!("{}:{publication}", operation.operation_id);
        // `or_insert` keeps the first event if two operations produce the same key.
        publication_events
            .entry(publication_id.clone())
            .or_insert(PublicationEvent {
                publication_id,
                session: operation.session,
                operation_id: operation.operation_id.clone(),
                owner_id: operation.owner_id.clone(),
                publication: publication.clone(),
                observer_class: publication_observer_class(publication),
                status,
                proof_ref,
                handle_ref,
                reason,
            });
    }

    // Agreement evidence: one entry per publication event ...
    let mut agreement_evidence: Vec<_> = publication_events
        .values()
        .map(|event| AgreementEvidence {
            evidence_id: format!("publication:{}", event.publication_id),
            operation_id: event.operation_id.clone(),
            session: event.session,
            owner_id: event.owner_id.clone(),
            level: if event.proof_ref.is_some() && event.handle_ref.is_some() {
                AgreementLevel::Finalized
            } else {
                AgreementLevel::Provisional
            },
            kind: AgreementEvidenceKind::Publication,
            reference: event.publication_id.clone(),
            // Publication evidence is authoritative only with an owner, a proof and
            // a handle all present.
            authoritative: event.owner_id.is_some()
                && event.proof_ref.is_some()
                && event.handle_ref.is_some(),
        })
        .collect();
    // ... plus one entry per materialization proof, authoritative when it passed.
    agreement_evidence.extend(
        materialization_proofs
            .iter()
            .map(|proof| AgreementEvidence {
                evidence_id: format!("materialization:{}", proof.proof_id),
                operation_id: format!("materialization:{}", proof.proof_id),
                session: proof.session,
                owner_id: None,
                level: if proof.passed {
                    AgreementLevel::Finalized
                } else {
                    AgreementLevel::Provisional
                },
                kind: AgreementEvidenceKind::Materialization,
                reference: proof.proof_id.clone(),
                authoritative: proof.passed,
            }),
    );

    let mut agreement_profiles_by_name = BTreeMap::<String, AgreementProfile>::new();
    let mut agreement_contracts = Vec::new();
    let mut agreement_states = Vec::new();
    let mut prestate_bindings = Vec::new();

    // Per operation: register its profile (first occurrence of each name wins),
    // and emit a contract, a prestate binding and an agreement state.
    for operation in &operation_instances {
        let visibility = operation_visibility(operation);
        let rule = agreement_rule(operation);
        let profile_name = agreement_profile_name(operation);
        let usable_at = usable_agreement_level(operation);
        let finalized_at = finalized_agreement_level(operation);
        let required_evidence_kind = required_agreement_evidence_kind(operation);

        agreement_profiles_by_name
            .entry(profile_name.clone())
            .or_insert_with(|| AgreementProfile {
                profile_name: profile_name.clone(),
                visibility,
                rule: rule.clone(),
                usable_at,
                finalized_at,
                required_evidence_kind,
            });

        let contract_name = format!("agreement:{}", operation.operation_id);
        agreement_contracts.push(AgreementContract {
            contract_name: contract_name.clone(),
            operation_id: operation.operation_id.clone(),
            session: operation.session,
            owner_id: operation.owner_id.clone(),
            profile_name: Some(profile_name),
            visibility,
            rule,
            usable_at,
            finalized_at,
            required_evidence_kind,
        });

        // The state digest is the textual triple "<kind>:<phase>:<publication|none>".
        prestate_bindings.push(PrestateBinding {
            binding_id: format!("prestate:{}", operation.operation_id),
            operation_id: operation.operation_id.clone(),
            session: operation.session,
            state_digest: format!(
                "{}:{:?}:{}",
                operation.kind,
                operation.phase,
                operation.terminal_publication.as_deref().unwrap_or("none")
            ),
            epoch_ref: operation.budget_ticks.map(|ticks| format!("ticks:{ticks}")),
            participant_digest: operation.owner_id.clone(),
        });

        // Evidence belongs to this operation when both operation id and session match.
        let evidence_ids: Vec<_> = agreement_evidence
            .iter()
            .filter(|evidence| {
                evidence.operation_id == operation.operation_id
                    && evidence.session == operation.session
            })
            .map(|evidence| evidence.evidence_id.clone())
            .collect();
        // Finalized means: some matching evidence is authoritative and reaches the
        // operation's finalization level.
        let finalized = agreement_evidence.iter().any(|evidence| {
            evidence.operation_id == operation.operation_id
                && evidence.session == operation.session
                && evidence.level.at_least(finalized_at)
                && evidence.authoritative
        });
        let (level, finalization) = match operation.phase {
            OperationPhase::Pending => (AgreementLevel::None, None),
            OperationPhase::Blocked => (AgreementLevel::Provisional, None),
            OperationPhase::Succeeded => {
                if finalized {
                    (
                        AgreementLevel::Finalized,
                        Some(FinalizationOutcome::Finalized),
                    )
                } else if matches!(visibility, OperationVisibility::Immediate) {
                    (AgreementLevel::None, None)
                } else {
                    (AgreementLevel::SoftSafe, None)
                }
            }
            OperationPhase::Failed | OperationPhase::Cancelled => (
                AgreementLevel::Provisional,
                Some(FinalizationOutcome::Aborted),
            ),
            OperationPhase::TimedOut => (
                AgreementLevel::Provisional,
                Some(FinalizationOutcome::TimedOut),
            ),
            OperationPhase::HandedOff => (AgreementLevel::SoftSafe, None),
        };

        agreement_states.push(AgreementState {
            operation_id: operation.operation_id.clone(),
            session: operation.session,
            owner_id: operation.owner_id.clone(),
            contract_name,
            level,
            finalization,
            evidence_ids,
            // The operation's tick budget is what gets recorded as the update tick.
            last_updated_tick: operation.budget_ticks,
            reason: None,
        });
    }

    let agreement_profiles = agreement_profiles_by_name.into_values().collect();

    // Progress contracts: start from the caller's contracts, then fill in missing
    // ones from effects, handoffs and operations, in that order. An existing
    // contract for the same operation is never replaced.
    let mut progress_contracts: Vec<_> = progress_contracts.to_vec();
    for effect in &outstanding_effects {
        if progress_contracts
            .iter()
            .any(|contract| contract.operation_id == effect.operation_id)
        {
            continue;
        }
        progress_contracts.push(ProgressContract {
            operation_id: effect.operation_id.clone(),
            session: effect.session,
            state: progress_state(effect.status),
            last_ordering_key: Some(effect.ordering_key),
            bounded: true,
            budget_ticks: effect.budget_ticks,
            // The effect's ordering key doubles as its last progress tick.
            last_progress_tick: Some(effect.ordering_key),
            escalated_at_tick: None,
            reason: None,
        });
    }
    for handoff in &semantic_handoffs {
        let operation_id = format!("handoff:{}", handoff.handoff_id);
        if progress_contracts
            .iter()
            .any(|contract| contract.operation_id == operation_id)
        {
            continue;
        }
        progress_contracts.push(ProgressContract {
            operation_id,
            session: Some(handoff.session),
            state: ProgressState::HandedOff,
            last_ordering_key: Some(handoff.tick),
            bounded: true,
            budget_ticks: Some(1),
            last_progress_tick: Some(handoff.tick),
            escalated_at_tick: None,
            reason: handoff.reason.clone(),
        });
    }
    // Remaining parity-critical operations that no contract tracks yet.
    for operation in &operation_instances {
        if !operation.is_parity_critical()
            || progress_contracts
                .iter()
                .any(|contract| contract.tracks_operation(operation))
        {
            continue;
        }
        progress_contracts.push(ProgressContract {
            operation_id: operation.operation_id.clone(),
            session: operation.session,
            state: progress_state_for_operation_phase(operation.phase),
            last_ordering_key: None,
            bounded: operation.budget_ticks.is_some(),
            budget_ticks: operation.budget_ticks,
            last_progress_tick: None,
            escalated_at_tick: None,
            reason: None,
        });
    }

    let regions = derive_regions(
        &operation_instances,
        &outstanding_effects,
        &authoritative_reads,
        &canonical_handles,
        &publication_events,
        &progress_contracts,
    );

    // Assemble and put every collection into its canonical order.
    canonicalize(ProtocolMachineSemanticObjects {
        schema_version: canonical_semantic_objects_schema_version(),
        operation_instances,
        outstanding_effects,
        semantic_handoffs,
        transformation_obligations,
        authoritative_reads,
        observed_reads,
        materialization_proofs,
        canonical_handles,
        publication_events: publication_events.into_values().collect(),
        prestate_bindings,
        agreement_profiles,
        agreement_contracts,
        agreement_evidence,
        agreement_states,
        regions,
        progress_contracts,
        progress_transitions: progress_transitions.to_vec(),
    })
}
2082
2083#[cfg(test)]
2084mod semantic_object_tests {
2085 use super::*;
2086
2087 fn agreement_semantics_fixture() -> ProtocolMachineSemanticObjects {
2088 let output_condition = OutputConditionCheck {
2089 meta: crate::output_condition::OutputConditionMeta {
2090 predicate_ref: "agreement.ready".to_string(),
2091 witness_ref: Some("accepted".to_string()),
2092 output_digest: "digest:ready".to_string(),
2093 },
2094 passed: true,
2095 };
2096 let operations = vec![
2097 operation_fixture("cancelled:op", "cancelled", OperationPhase::Cancelled)
2098 .with_publication("cancelled"),
2099 operation_fixture("timed_out:op", "timed_out", OperationPhase::TimedOut)
2100 .with_publication("timed_out")
2101 .with_budget(5),
2102 operation_fixture("degraded:op", "degraded", OperationPhase::Blocked)
2103 .with_publication("degraded")
2104 .with_budget(8)
2105 .with_dependencies(&["child:1"]),
2106 ];
2107 let progress_contracts = vec![ProgressContract {
2108 operation_id: "degraded:op".to_string(),
2109 session: Some(1),
2110 state: ProgressState::Degraded,
2111 last_ordering_key: Some(8),
2112 bounded: true,
2113 budget_ticks: Some(8),
2114 last_progress_tick: Some(6),
2115 escalated_at_tick: Some(8),
2116 reason: Some("timeout witness escalated".to_string()),
2117 }];
2118 let progress_transitions = vec![ProgressTransition {
2119 operation_id: "degraded:op".to_string(),
2120 session: Some(1),
2121 from_state: ProgressState::Blocked,
2122 to_state: ProgressState::Degraded,
2123 tick: 8,
2124 reason: Some("timeout witness escalated".to_string()),
2125 }];
2126
2127 protocol_machine_semantic_objects(
2128 &[],
2129 &[],
2130 &operations,
2131 &[],
2132 &[output_condition],
2133 &progress_contracts,
2134 &progress_transitions,
2135 )
2136 }
2137
2138 fn operation_fixture(
2139 operation_id: &str,
2140 kind: &str,
2141 phase: OperationPhase,
2142 ) -> OperationInstance {
2143 OperationInstance {
2144 operation_id: operation_id.to_string(),
2145 session: Some(1),
2146 owner_id: Some("owner/A".to_string()),
2147 kind: kind.to_string(),
2148 phase,
2149 handler_identity: None,
2150 effect_ids: Vec::new(),
2151 dependent_operation_ids: Vec::new(),
2152 terminal_publication: None,
2153 budget_ticks: Some(3),
2154 requires_proof: false,
2155 }
2156 }
2157
2158 trait OperationFixtureExt {
2159 fn with_publication(self, publication: &str) -> Self;
2160 fn with_budget(self, budget_ticks: u64) -> Self;
2161 fn with_dependencies(self, deps: &[&str]) -> Self;
2162 }
2163
2164 impl OperationFixtureExt for OperationInstance {
2165 fn with_publication(mut self, publication: &str) -> Self {
2166 self.terminal_publication = Some(publication.to_string());
2167 self
2168 }
2169
2170 fn with_budget(mut self, budget_ticks: u64) -> Self {
2171 self.budget_ticks = Some(budget_ticks);
2172 self
2173 }
2174
2175 fn with_dependencies(mut self, deps: &[&str]) -> Self {
2176 self.dependent_operation_ids = deps.iter().map(ToString::to_string).collect();
2177 self
2178 }
2179 }
2180
2181 fn agreement_state<'a>(
2182 objects: &'a ProtocolMachineSemanticObjects,
2183 operation_id: &str,
2184 ) -> &'a AgreementState {
2185 objects
2186 .agreement_states
2187 .iter()
2188 .find(|state| state.operation_id == operation_id)
2189 .expect("agreement state")
2190 }
2191
/// Feeds in a single succeeded `materialization:proof` operation with no
/// progress contracts and checks that the result contains a `Succeeded`
/// contract for it and exactly one region, `session:1`.
#[test]
fn parity_critical_operations_synthesize_progress_contracts() {
    let proof_operation = OperationInstance {
        operation_id: String::from("materialization:proof"),
        kind: String::from("materialization"),
        phase: OperationPhase::Succeeded,
        session: Some(1),
        owner_id: None,
        handler_identity: None,
        effect_ids: vec![],
        dependent_operation_ids: vec![],
        terminal_publication: Some(String::from("materialization.succeeded")),
        budget_ticks: Some(1),
        requires_proof: true,
    };

    // Only the operations slice is populated; in particular no progress
    // contract is supplied for the operation.
    let objects =
        protocol_machine_semantic_objects(&[], &[], &[proof_operation], &[], &[], &[], &[]);

    assert!(objects.parity_critical_operations_have_progress_contracts());

    let has_succeeded_contract = objects.progress_contracts.iter().any(|contract| {
        contract.operation_id == "materialization:proof"
            && contract.state == ProgressState::Succeeded
    });
    assert!(has_succeeded_contract);

    let regions = &objects.regions;
    assert_eq!(regions.len(), 1);
    assert_eq!(regions[0].region_id, "session:1");
}
2225
/// A blocked, bounded contract must yield a synthetic step that lands in
/// `NoProgress` and has a strictly smaller progress measure than its source.
#[test]
fn synthetic_step_descends_progress_measure() {
    let blocked = ProgressContract {
        operation_id: String::from("effect:1"),
        session: Some(1),
        state: ProgressState::Blocked,
        bounded: true,
        budget_ticks: Some(1),
        last_ordering_key: Some(1),
        last_progress_tick: Some(1),
        escalated_at_tick: None,
        reason: None,
    };

    let stepped = blocked
        .synthetic_step()
        .expect("blocked contract should take a synthetic step");
    assert_eq!(stepped.state, ProgressState::NoProgress);

    let measure_after = stepped.progress_measure();
    let measure_before = blocked.progress_measure();
    assert!(measure_after < measure_before);
}
2246
/// Checks the shared fixture end to end: one finalized agreement state, the
/// cancelled and timed-out operations as provisional states with their
/// respective finalization outcomes, and the degraded progress contract plus
/// its transition.
#[test]
fn agreement_and_progress_semantics_cover_finalized_timeout_cancelled_and_degraded_paths() {
    let objects = agreement_semantics_fixture();

    // The fixture supplies no operation with a `materialization:` id, so this
    // state is presumably derived from the passing output-condition check.
    let finalized = objects
        .agreement_states
        .iter()
        .find(|state| state.operation_id.starts_with("materialization:"))
        .expect("materialization agreement state");
    assert_eq!(finalized.level, AgreementLevel::Finalized);
    assert_eq!(finalized.finalization, Some(FinalizationOutcome::Finalized));

    // Both terminal-but-unfinished paths stay provisional and differ only in
    // the recorded outcome.
    for (operation_id, expected_outcome) in [
        ("cancelled:op", FinalizationOutcome::Aborted),
        ("timed_out:op", FinalizationOutcome::TimedOut),
    ] {
        let state = agreement_state(&objects, operation_id);
        assert_eq!(state.level, AgreementLevel::Provisional);
        assert_eq!(state.finalization, Some(expected_outcome));
    }

    let degraded_contract = objects
        .progress_contracts
        .iter()
        .find(|contract| contract.operation_id == "degraded:op")
        .expect("degraded progress contract");
    assert_eq!(degraded_contract.state, ProgressState::Degraded);
    assert_eq!(
        degraded_contract.reason.as_deref(),
        Some("timeout witness escalated")
    );

    let has_degraded_transition = objects.progress_transitions.iter().any(|transition| {
        transition.operation_id == "degraded:op"
            && transition.to_state == ProgressState::Degraded
    });
    assert!(has_degraded_transition);
}
2282}