1use std::collections::BTreeSet;
9use std::mem::size_of;
10use std::sync::Arc;
11
12use crate::determinism::DeterminismMode;
13use crate::effect::EffectHandler;
14use crate::engine::{ProtocolMachine, ProtocolMachineConfig, ProtocolMachineError};
15use crate::loader::CodeImage;
16use crate::output_condition::OutputConditionPolicy;
17use crate::runtime_contracts::{
18 enforce_protocol_machine_runtime_gates, execution_profile_supported,
19 validate_transport_contracts_for_execution_profile, ProtocolMachineAdmissibilityClass,
20 ProtocolMachineEscalationWindowClass, ProtocolMachineExecutionProfile,
21 ProtocolMachineFairnessAssumption, RuntimeContracts, RuntimeGateResult,
22 TransportContractGateError,
23};
24use crate::scheduler::SchedPolicy;
25use telltale_types::{
26 canonical_transport_boundaries, canonicalize_placement_observations,
27 CanonicalPublicationContinuity, PendingEffectTreatment, PlacementObservation,
28 RuntimeUpgradeArtifact, RuntimeUpgradeCompatibility, RuntimeUpgradeExecutionConstraint,
29 TransitionArtifactPhase, TransportBoundaryObservation,
30};
31
32type ValidatedPlanStepArtifacts = (
33 BTreeSet<String>,
34 Vec<PlacementObservation>,
35 Vec<TransportBoundaryObservation>,
36);
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum DeterminismCapability {
41 Full,
43 ModuloEffects,
45 ModuloCommutativity,
47 Replay,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum SchedulerCapability {
54 Cooperative,
56 RoundRobin,
58 Priority,
60 ProgressAware,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
66pub struct TheoremPackCapabilities {
67 pub determinism: Vec<DeterminismCapability>,
69 pub schedulers: Vec<SchedulerCapability>,
71 pub output_condition_gating: bool,
73}
74
75impl TheoremPackCapabilities {
76 #[must_use]
78 pub fn full() -> Self {
79 Self {
80 determinism: vec![
81 DeterminismCapability::Full,
82 DeterminismCapability::ModuloEffects,
83 DeterminismCapability::ModuloCommutativity,
84 DeterminismCapability::Replay,
85 ],
86 schedulers: vec![
87 SchedulerCapability::Cooperative,
88 SchedulerCapability::RoundRobin,
89 SchedulerCapability::Priority,
90 SchedulerCapability::ProgressAware,
91 ],
92 output_condition_gating: true,
93 }
94 }
95
96 #[must_use]
98 pub fn execution_profile(&self) -> ProtocolMachineExecutionProfile {
99 let mut fairness_assumptions: std::collections::BTreeSet<_> =
100 [ProtocolMachineFairnessAssumption::ScheduleConfluence]
101 .into_iter()
102 .collect();
103 if self.schedulers.iter().any(|scheduler| {
104 matches!(
105 scheduler,
106 SchedulerCapability::RoundRobin
107 | SchedulerCapability::Priority
108 | SchedulerCapability::ProgressAware
109 )
110 }) {
111 fairness_assumptions.insert(ProtocolMachineFairnessAssumption::StarvationFreedom);
112 }
113 if self
114 .schedulers
115 .iter()
116 .any(|scheduler| matches!(scheduler, SchedulerCapability::ProgressAware))
117 {
118 fairness_assumptions.insert(ProtocolMachineFairnessAssumption::LivenessFairness);
119 }
120
121 let mut admissibility_classes: std::collections::BTreeSet<_> = [
122 ProtocolMachineAdmissibilityClass::LocalEnvelope,
123 ProtocolMachineAdmissibilityClass::ShardedEnvelope,
124 ]
125 .into_iter()
126 .collect();
127 let mut escalation_window_classes: std::collections::BTreeSet<_> =
128 [ProtocolMachineEscalationWindowClass::ProgressContract]
129 .into_iter()
130 .collect();
131 if self.output_condition_gating {
132 admissibility_classes.insert(ProtocolMachineAdmissibilityClass::ProtocolEnvelopeBridge);
133 escalation_window_classes.insert(ProtocolMachineEscalationWindowClass::ProtocolBridge);
134 }
135 if self
136 .determinism
137 .iter()
138 .any(|capability| !matches!(capability, DeterminismCapability::Full))
139 {
140 escalation_window_classes
141 .insert(ProtocolMachineEscalationWindowClass::AdmissionComplexity);
142 }
143
144 ProtocolMachineExecutionProfile {
145 fairness_assumptions,
146 admissibility_classes,
147 escalation_window_classes,
148 theorem_pack_eligibility: vec![
149 ("protocol_machine_envelope_adherence".to_string(), true),
150 (
151 "protocol_machine_envelope_admission".to_string(),
152 self.determinism.iter().any(|capability| {
153 matches!(
154 capability,
155 DeterminismCapability::Full
156 | DeterminismCapability::Replay
157 | DeterminismCapability::ModuloEffects
158 | DeterminismCapability::ModuloCommutativity
159 )
160 }),
161 ),
162 (
163 "protocol_envelope_bridge".to_string(),
164 self.output_condition_gating,
165 ),
166 ],
167 }
168 }
169
170 #[must_use]
172 pub fn transport_requirements(&self) -> crate::runtime_contracts::TheoremTransportRequirements {
173 self.execution_profile().transport_requirements()
174 }
175}
176
177#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct CompositionCertificate {
180 pub artifact_id: String,
182 pub link_ok_full: bool,
184 pub theorem_pack: TheoremPackCapabilities,
186 pub runtime_contracts: Option<RuntimeContracts>,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
192pub struct ReconfigurationPolicy {
193 pub dynamic_membership: bool,
195 pub overlap_required: bool,
197}
198
199#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
201pub struct ReconfigurationEvent {
202 pub artifact_id: String,
204 pub epoch: u64,
206 pub previous_members: Vec<String>,
208 pub next_members: Vec<String>,
210 pub added_members: Vec<String>,
212 pub removed_members: Vec<String>,
214 pub overlap_preserved: bool,
216 pub dynamic_membership: bool,
218 pub overlap_required: bool,
220}
221
222#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
224pub struct ReconfigurationPlanStep {
225 pub step_id: String,
227 pub next_members: Vec<String>,
229 pub placements: Vec<PlacementObservation>,
231}
232
233#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
235pub struct ReconfigurationPlan {
236 pub plan_id: String,
238 pub steps: Vec<ReconfigurationPlanStep>,
240}
241
242#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
244pub struct ReconfigurationPhaseArtifact {
245 pub step_id: String,
247 pub event: ReconfigurationEvent,
249 pub placements: Vec<PlacementObservation>,
251 pub transport_boundaries: Vec<TransportBoundaryObservation>,
253}
254
255#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
257pub struct ReconfigurationPlanExecution {
258 pub artifact_id: String,
260 pub plan_id: String,
262 pub initial_members: Vec<String>,
264 pub phases: Vec<ReconfigurationPhaseArtifact>,
266 pub final_members: Vec<String>,
268}
269
270#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
272pub struct RuntimeUpgradeRequest {
273 pub upgrade_id: String,
275 pub plan: ReconfigurationPlan,
277 pub compatibility: RuntimeUpgradeCompatibility,
279 pub carried_publication_ids: Vec<String>,
281 pub invalidated_publication_ids: Vec<String>,
283 pub carried_obligation_ids: Vec<String>,
285 pub invalidated_obligation_ids: Vec<String>,
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
291pub struct RuntimeUpgradeExecution {
292 pub artifact_id: String,
294 pub upgrade_id: String,
296 pub artifacts: Vec<RuntimeUpgradeArtifact>,
298 pub final_members: Vec<String>,
300}
301
302#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
304pub struct ReconfigurationRuntimeSnapshot {
305 pub epoch: u64,
307 pub active_members: Vec<String>,
309 pub history: Vec<ReconfigurationEvent>,
311 pub plan_executions: Vec<ReconfigurationPlanExecution>,
313 pub runtime_upgrades: Vec<RuntimeUpgradeExecution>,
315}
316
317#[derive(Debug, Clone, Default)]
318struct ReconfigurationRuntimeState {
319 epoch: u64,
320 active_members: BTreeSet<String>,
321 history: Vec<ReconfigurationEvent>,
322 plan_executions: Vec<ReconfigurationPlanExecution>,
323 runtime_upgrades: Vec<RuntimeUpgradeExecution>,
324}
325
326#[derive(Debug, Clone)]
328pub struct ProtocolBundle {
329 pub code: Arc<CodeImage>,
331 pub certificate: CompositionCertificate,
333 pub reconfiguration_policy: Option<ReconfigurationPolicy>,
335}
336
337impl ProtocolBundle {
338 #[must_use]
340 pub fn new(code: Arc<CodeImage>, certificate: CompositionCertificate) -> Self {
341 Self {
342 code,
343 certificate,
344 reconfiguration_policy: None,
345 }
346 }
347
348 #[must_use]
350 pub fn with_reconfiguration_policy(mut self, policy: ReconfigurationPolicy) -> Self {
351 self.reconfiguration_policy = Some(policy);
352 self
353 }
354
355 #[must_use]
357 pub fn reconfiguration_policy(&self) -> Option<&ReconfigurationPolicy> {
358 self.reconfiguration_policy.as_ref()
359 }
360}
361
362#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct MemoryBudget {
365 pub max_bytes_per_coroutine: usize,
367 pub max_bytes_per_session: usize,
369 pub max_incremental_bytes_per_protocol: usize,
371}
372
373impl Default for MemoryBudget {
374 fn default() -> Self {
375 Self {
376 max_bytes_per_coroutine: 64 * 1024,
377 max_bytes_per_session: 256 * 1024,
378 max_incremental_bytes_per_protocol: 128 * 1024,
379 }
380 }
381}
382
383#[derive(Debug, Clone, Default, PartialEq, Eq)]
385pub struct MemoryUsage {
386 pub bytes_per_coroutine: usize,
388 pub bytes_per_session: usize,
390 pub incremental_bytes_per_protocol: usize,
392 pub protocol_count: usize,
394 pub session_count: usize,
396 pub coroutine_count: usize,
398}
399
400#[derive(Debug, thiserror::Error)]
402pub enum CompositionError {
403 #[error("bundle `{artifact_id}` rejected: missing LinkOKFull compatibility evidence")]
405 MissingCompatibilityProof {
406 artifact_id: String,
408 },
409 #[error("bundle `{artifact_id}` rejected: missing required capability `{capability}`")]
411 MissingCapability {
412 artifact_id: String,
414 capability: String,
416 },
417 #[error("bundle `{artifact_id}` rejected: missing ProtocolMachine runtime contracts for advanced mode")]
419 MissingRuntimeContracts {
420 artifact_id: String,
422 },
423 #[error(
425 "bundle `{artifact_id}` rejected: missing transport contracts for theorem-pack claims"
426 )]
427 MissingTransportContracts {
428 artifact_id: String,
430 },
431 #[error("bundle `{artifact_id}` rejected: transport `{transport_name}` does not satisfy required contract `{requirement}`")]
433 UnsatisfiedTransportContract {
434 artifact_id: String,
436 transport_name: String,
438 requirement: &'static str,
440 },
441 #[error("bundle index {bundle_idx} is out of range")]
443 InvalidBundleIndex {
444 bundle_idx: usize,
446 },
447 #[error("bundle `{artifact_id}` rejected reconfiguration request: bundle is not reconfiguration-enabled")]
449 ReconfigurationDisabled {
450 artifact_id: String,
452 },
453 #[error("bundle `{artifact_id}` rejected reconfiguration request: missing required capability `{capability}`")]
455 MissingReconfigurationCapability {
456 artifact_id: String,
458 capability: String,
460 },
461 #[error(
463 "bundle `{artifact_id}` rejected reconfiguration request: membership set may not be empty"
464 )]
465 EmptyMembership {
466 artifact_id: String,
468 },
469 #[error("bundle `{artifact_id}` rejected reconfiguration request: overlap_required but member sets are disjoint")]
471 OverlapRequiredViolation {
472 artifact_id: String,
474 },
475 #[error("bundle `{artifact_id}` rejected reconfiguration plan: {reason}")]
477 InvalidReconfigurationPlan {
478 artifact_id: String,
480 reason: String,
482 },
483 #[error("bundle `{artifact_id}` rejected: memory budget exceeded ({reason})")]
485 BudgetExceeded {
486 artifact_id: String,
488 reason: String,
490 },
491 #[error(transparent)]
493 Vm(#[from] ProtocolMachineError),
494}
495
496#[derive(Debug)]
498pub struct ComposedRuntime {
499 machine: ProtocolMachine,
500 bundles: Vec<ProtocolBundle>,
501 reconfiguration_states: Vec<Option<ReconfigurationRuntimeState>>,
502 budget: MemoryBudget,
503 usage: MemoryUsage,
504}
505
506impl ComposedRuntime {
507 #[must_use]
509 pub fn new(config: ProtocolMachineConfig, budget: MemoryBudget) -> Self {
510 let machine = ProtocolMachine::new(config);
511 Self {
512 machine,
513 bundles: Vec::new(),
514 reconfiguration_states: Vec::new(),
515 budget,
516 usage: MemoryUsage {
517 bytes_per_coroutine: size_of::<crate::coroutine::Coroutine>(),
518 bytes_per_session: size_of::<crate::session::SessionState>(),
519 incremental_bytes_per_protocol: size_of::<Arc<CodeImage>>(),
520 ..MemoryUsage::default()
521 },
522 }
523 }
524
525 pub fn admit_bundle(&mut self, bundle: ProtocolBundle) -> Result<(), CompositionError> {
531 if !bundle.certificate.link_ok_full {
532 return Err(CompositionError::MissingCompatibilityProof {
533 artifact_id: bundle.certificate.artifact_id,
534 });
535 }
536 self.require_capabilities(&bundle)?;
537
538 if self.usage.incremental_bytes_per_protocol
539 > self.budget.max_incremental_bytes_per_protocol
540 {
541 return Err(CompositionError::BudgetExceeded {
542 artifact_id: bundle.certificate.artifact_id,
543 reason: "incremental protocol overhead".to_string(),
544 });
545 }
546
547 self.bundles.push(bundle);
548 self.reconfiguration_states
549 .push(Some(ReconfigurationRuntimeState::default()));
550 self.usage.protocol_count = self.bundles.len();
551 Ok(())
552 }
553
554 pub fn load_bundle_session(&mut self, bundle_idx: usize) -> Result<usize, CompositionError> {
560 let bundle = self
561 .bundles
562 .get(bundle_idx)
563 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
564
565 let sid = self.machine.load_choreography(&bundle.code)?;
566 self.refresh_usage();
567 self.assert_budget(bundle_idx)?;
568 Ok(sid)
569 }
570
571 pub fn load_bundle_sessions(
577 &mut self,
578 bundle_idx: usize,
579 sessions: usize,
580 ) -> Result<Vec<usize>, CompositionError> {
581 let mut out = Vec::with_capacity(sessions);
582 for _ in 0..sessions {
583 out.push(self.load_bundle_session(bundle_idx)?);
584 }
585 Ok(out)
586 }
587
588 pub fn run(
594 &mut self,
595 handler: &dyn EffectHandler,
596 max_steps: usize,
597 ) -> Result<(), CompositionError> {
598 self.machine.run(handler, max_steps)?;
599 self.refresh_usage();
600 Ok(())
601 }
602
603 #[must_use]
605 pub fn memory_usage(&self) -> &MemoryUsage {
606 &self.usage
607 }
608
609 #[must_use]
611 pub fn bundles(&self) -> &[ProtocolBundle] {
612 &self.bundles
613 }
614
615 fn simulate_reconfiguration_transition(
616 artifact_id: &str,
617 policy: &ReconfigurationPolicy,
618 state: &mut ReconfigurationRuntimeState,
619 next_members: BTreeSet<String>,
620 ) -> Result<ReconfigurationEvent, CompositionError> {
621 if next_members.is_empty() {
622 return Err(CompositionError::EmptyMembership {
623 artifact_id: artifact_id.to_string(),
624 });
625 }
626
627 let overlap_preserved =
628 state.active_members.is_empty() || !state.active_members.is_disjoint(&next_members);
629 if policy.overlap_required && !overlap_preserved {
630 return Err(CompositionError::OverlapRequiredViolation {
631 artifact_id: artifact_id.to_string(),
632 });
633 }
634
635 let previous_members: Vec<String> = state.active_members.iter().cloned().collect();
636 let next_members_vec: Vec<String> = next_members.iter().cloned().collect();
637 let added_members: Vec<String> = next_members
638 .difference(&state.active_members)
639 .cloned()
640 .collect();
641 let removed_members: Vec<String> = state
642 .active_members
643 .difference(&next_members)
644 .cloned()
645 .collect();
646
647 state.epoch = state.epoch.saturating_add(1);
648 state.active_members = next_members;
649 let event = ReconfigurationEvent {
650 artifact_id: artifact_id.to_string(),
651 epoch: state.epoch,
652 previous_members,
653 next_members: next_members_vec,
654 added_members,
655 removed_members,
656 overlap_preserved,
657 dynamic_membership: policy.dynamic_membership,
658 overlap_required: policy.overlap_required,
659 };
660 state.history.push(event.clone());
661 Ok(event)
662 }
663
664 fn validate_plan_step(
665 artifact_id: &str,
666 step: &ReconfigurationPlanStep,
667 ) -> Result<ValidatedPlanStepArtifacts, CompositionError> {
668 let next_members = step.next_members.iter().cloned().collect::<BTreeSet<_>>();
669 if next_members.is_empty() {
670 return Err(CompositionError::InvalidReconfigurationPlan {
671 artifact_id: artifact_id.to_string(),
672 reason: format!(
673 "step `{}` must preserve a non-empty membership set",
674 step.step_id
675 ),
676 });
677 }
678 let placements =
679 canonicalize_placement_observations(&step.placements).map_err(|reason| {
680 CompositionError::InvalidReconfigurationPlan {
681 artifact_id: artifact_id.to_string(),
682 reason: format!("step `{}` has invalid placements: {reason}", step.step_id),
683 }
684 })?;
685 let placement_roles = placements
686 .iter()
687 .map(|placement| placement.role.clone())
688 .collect::<BTreeSet<_>>();
689 if placement_roles != next_members {
690 return Err(CompositionError::InvalidReconfigurationPlan {
691 artifact_id: artifact_id.to_string(),
692 reason: format!(
693 "step `{}` placements must match next_members exactly",
694 step.step_id
695 ),
696 });
697 }
698 let transport_boundaries =
699 canonical_transport_boundaries(&placements).map_err(|reason| {
700 CompositionError::InvalidReconfigurationPlan {
701 artifact_id: artifact_id.to_string(),
702 reason: format!(
703 "step `{}` has invalid transport boundaries: {reason}",
704 step.step_id
705 ),
706 }
707 })?;
708 Ok((next_members, placements, transport_boundaries))
709 }
710
711 fn validate_runtime_upgrade_request(
712 config: &ProtocolMachineConfig,
713 bundle: &ProtocolBundle,
714 state: &ReconfigurationRuntimeState,
715 request: &RuntimeUpgradeRequest,
716 ) -> Result<(), CompositionError> {
717 Self::validate_runtime_upgrade_nonempty(bundle, request)?;
718 Self::validate_runtime_upgrade_carried_sets(bundle, request)?;
719 Self::validate_runtime_upgrade_execution_constraint(config, bundle, request)?;
720 Self::validate_runtime_upgrade_ownership_continuity(bundle, state, request)?;
721 Self::validate_runtime_upgrade_pending_effects(bundle, request)?;
722 Self::validate_runtime_upgrade_publication_continuity(bundle, request)?;
723 Ok(())
724 }
725
726 fn invalid_runtime_upgrade_plan(
727 bundle: &ProtocolBundle,
728 upgrade_id: &str,
729 reason: impl Into<String>,
730 ) -> CompositionError {
731 CompositionError::InvalidReconfigurationPlan {
732 artifact_id: bundle.certificate.artifact_id.clone(),
733 reason: format!("runtime upgrade `{upgrade_id}` {}", reason.into()),
734 }
735 }
736
737 fn validate_runtime_upgrade_nonempty(
738 bundle: &ProtocolBundle,
739 request: &RuntimeUpgradeRequest,
740 ) -> Result<(), CompositionError> {
741 if request.plan.steps.is_empty() {
742 return Err(Self::invalid_runtime_upgrade_plan(
743 bundle,
744 &request.upgrade_id,
745 "must contain at least one plan step",
746 ));
747 }
748 Ok(())
749 }
750
751 fn validate_runtime_upgrade_carried_sets(
752 bundle: &ProtocolBundle,
753 request: &RuntimeUpgradeRequest,
754 ) -> Result<(), CompositionError> {
755 let duplicate_publication = request
756 .carried_publication_ids
757 .iter()
758 .any(|publication_id| request.invalidated_publication_ids.contains(publication_id));
759 if duplicate_publication {
760 return Err(Self::invalid_runtime_upgrade_plan(
761 bundle,
762 &request.upgrade_id,
763 "may not both carry and invalidate the same canonical publication",
764 ));
765 }
766
767 let duplicate_obligation = request
768 .carried_obligation_ids
769 .iter()
770 .any(|obligation_id| request.invalidated_obligation_ids.contains(obligation_id));
771 if duplicate_obligation {
772 return Err(Self::invalid_runtime_upgrade_plan(
773 bundle,
774 &request.upgrade_id,
775 "may not both carry and invalidate the same obligation",
776 ));
777 }
778 Ok(())
779 }
780
781 fn validate_runtime_upgrade_execution_constraint(
782 config: &ProtocolMachineConfig,
783 bundle: &ProtocolBundle,
784 request: &RuntimeUpgradeRequest,
785 ) -> Result<(), CompositionError> {
786 match request.compatibility.execution_constraint {
787 RuntimeUpgradeExecutionConstraint::PreserveBundleProfile => {
788 let profile = bundle.certificate.theorem_pack.execution_profile();
789 if execution_profile_supported(
790 &profile,
791 config,
792 bundle.certificate.runtime_contracts.as_ref(),
793 ) {
794 Ok(())
795 } else {
796 Err(Self::invalid_runtime_upgrade_plan(
797 bundle,
798 &request.upgrade_id,
799 "requires preserving the admitted execution profile",
800 ))
801 }
802 }
803 RuntimeUpgradeExecutionConstraint::MixedDeterminismAllowed => {
804 let Some(runtime_contracts) = bundle.certificate.runtime_contracts.as_ref() else {
805 return Err(Self::invalid_runtime_upgrade_plan(
806 bundle,
807 &request.upgrade_id,
808 "requires runtime contracts for mixed determinism",
809 ));
810 };
811 if runtime_contracts.can_use_mixed_determinism_profiles {
812 Ok(())
813 } else {
814 Err(Self::invalid_runtime_upgrade_plan(
815 bundle,
816 &request.upgrade_id,
817 "requires mixed-determinism admission",
818 ))
819 }
820 }
821 }
822 }
823
824 fn validate_runtime_upgrade_ownership_continuity(
825 bundle: &ProtocolBundle,
826 state: &ReconfigurationRuntimeState,
827 request: &RuntimeUpgradeRequest,
828 ) -> Result<(), CompositionError> {
829 if !request.compatibility.ownership_continuity_required {
830 return Ok(());
831 }
832
833 let first_members = request.plan.steps[0]
834 .next_members
835 .iter()
836 .cloned()
837 .collect::<BTreeSet<_>>();
838 if state.active_members.is_empty() || state.active_members.is_disjoint(&first_members) {
839 return Err(Self::invalid_runtime_upgrade_plan(
840 bundle,
841 &request.upgrade_id,
842 "requires ownership continuity across the first cutover",
843 ));
844 }
845 Ok(())
846 }
847
848 fn validate_runtime_upgrade_pending_effects(
849 bundle: &ProtocolBundle,
850 request: &RuntimeUpgradeRequest,
851 ) -> Result<(), CompositionError> {
852 let missing_pending_effect_policy = matches!(
853 request.compatibility.pending_effect_treatment,
854 PendingEffectTreatment::InvalidateBlocked
855 ) && request.carried_obligation_ids.is_empty()
856 && request.invalidated_obligation_ids.is_empty();
857 if missing_pending_effect_policy {
858 return Err(Self::invalid_runtime_upgrade_plan(
859 bundle,
860 &request.upgrade_id,
861 "must make pending-effect treatment explicit",
862 ));
863 }
864 Ok(())
865 }
866
867 fn validate_runtime_upgrade_publication_continuity(
868 bundle: &ProtocolBundle,
869 request: &RuntimeUpgradeRequest,
870 ) -> Result<(), CompositionError> {
871 let invalidates_canonical_publications =
872 matches!(
873 request.compatibility.canonical_publication_continuity,
874 CanonicalPublicationContinuity::PreserveCanonicalTruth
875 ) && !request.invalidated_publication_ids.is_empty();
876 if invalidates_canonical_publications {
877 return Err(Self::invalid_runtime_upgrade_plan(
878 bundle,
879 &request.upgrade_id,
880 "may not invalidate canonical publications when continuity is required",
881 ));
882 }
883 Ok(())
884 }
885
886 fn snapshot_from_state(state: &ReconfigurationRuntimeState) -> ReconfigurationRuntimeSnapshot {
887 ReconfigurationRuntimeSnapshot {
888 epoch: state.epoch,
889 active_members: state.active_members.iter().cloned().collect(),
890 history: state.history.clone(),
891 plan_executions: state.plan_executions.clone(),
892 runtime_upgrades: state.runtime_upgrades.clone(),
893 }
894 }
895
896 fn restore_snapshot_into_state(
897 artifact_id: &str,
898 state: &mut ReconfigurationRuntimeState,
899 snapshot: ReconfigurationRuntimeSnapshot,
900 ) -> Result<(), CompositionError> {
901 let active_members = snapshot
902 .active_members
903 .iter()
904 .cloned()
905 .collect::<BTreeSet<_>>();
906 if active_members.is_empty() {
907 return Err(CompositionError::InvalidReconfigurationPlan {
908 artifact_id: artifact_id.to_string(),
909 reason: "snapshot must preserve a non-empty active membership set".to_string(),
910 });
911 }
912 if snapshot
913 .history
914 .windows(2)
915 .any(|window| window[0].epoch >= window[1].epoch)
916 {
917 return Err(CompositionError::InvalidReconfigurationPlan {
918 artifact_id: artifact_id.to_string(),
919 reason: "snapshot history must have strictly increasing epochs".to_string(),
920 });
921 }
922 if let Some(last_event) = snapshot.history.last() {
923 let final_from_history = last_event
924 .next_members
925 .iter()
926 .cloned()
927 .collect::<BTreeSet<_>>();
928 if final_from_history != active_members {
929 return Err(CompositionError::InvalidReconfigurationPlan {
930 artifact_id: artifact_id.to_string(),
931 reason: "snapshot active membership must match the final history event"
932 .to_string(),
933 });
934 }
935 if last_event.epoch != snapshot.epoch {
936 return Err(CompositionError::InvalidReconfigurationPlan {
937 artifact_id: artifact_id.to_string(),
938 reason: "snapshot epoch must match the final history event epoch".to_string(),
939 });
940 }
941 } else if snapshot.epoch != 0 {
942 return Err(CompositionError::InvalidReconfigurationPlan {
943 artifact_id: artifact_id.to_string(),
944 reason: "empty snapshot history must use epoch 0".to_string(),
945 });
946 }
947 if snapshot
948 .plan_executions
949 .iter()
950 .any(|execution| execution.phases.is_empty())
951 {
952 return Err(CompositionError::InvalidReconfigurationPlan {
953 artifact_id: artifact_id.to_string(),
954 reason: "snapshot plan executions must contain at least one phase".to_string(),
955 });
956 }
957 if snapshot
958 .runtime_upgrades
959 .iter()
960 .any(|execution| execution.artifacts.is_empty())
961 {
962 return Err(CompositionError::InvalidReconfigurationPlan {
963 artifact_id: artifact_id.to_string(),
964 reason: "snapshot runtime upgrades must contain at least one artifact".to_string(),
965 });
966 }
967 state.epoch = snapshot.epoch;
968 state.active_members = active_members;
969 state.history = snapshot.history;
970 state.plan_executions = snapshot.plan_executions;
971 state.runtime_upgrades = snapshot.runtime_upgrades;
972 Ok(())
973 }
974
975 pub fn seed_bundle_membership<I, S>(
982 &mut self,
983 bundle_idx: usize,
984 members: I,
985 ) -> Result<(), CompositionError>
986 where
987 I: IntoIterator<Item = S>,
988 S: Into<String>,
989 {
990 let bundle = self
991 .bundles
992 .get(bundle_idx)
993 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
994 let state = self
995 .reconfiguration_states
996 .get_mut(bundle_idx)
997 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
998 .as_mut()
999 .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1000 artifact_id: bundle.certificate.artifact_id.clone(),
1001 })?;
1002 if bundle.reconfiguration_policy.is_none() {
1003 return Err(CompositionError::ReconfigurationDisabled {
1004 artifact_id: bundle.certificate.artifact_id.clone(),
1005 });
1006 }
1007 let members: BTreeSet<String> = members.into_iter().map(Into::into).collect();
1008 if members.is_empty() {
1009 return Err(CompositionError::EmptyMembership {
1010 artifact_id: bundle.certificate.artifact_id.clone(),
1011 });
1012 }
1013 state.active_members = members;
1014 state.epoch = 0;
1015 state.history.clear();
1016 state.plan_executions.clear();
1017 state.runtime_upgrades.clear();
1018 Ok(())
1019 }
1020
1021 pub fn reconfigure_bundle<I, S>(
1029 &mut self,
1030 bundle_idx: usize,
1031 next_members: I,
1032 ) -> Result<ReconfigurationEvent, CompositionError>
1033 where
1034 I: IntoIterator<Item = S>,
1035 S: Into<String>,
1036 {
1037 let bundle = self
1038 .bundles
1039 .get(bundle_idx)
1040 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1041 let Some(policy) = bundle.reconfiguration_policy.as_ref() else {
1042 return Err(CompositionError::ReconfigurationDisabled {
1043 artifact_id: bundle.certificate.artifact_id.clone(),
1044 });
1045 };
1046 if !policy.dynamic_membership {
1047 return Err(CompositionError::ReconfigurationDisabled {
1048 artifact_id: bundle.certificate.artifact_id.clone(),
1049 });
1050 }
1051
1052 let state = self
1053 .reconfiguration_states
1054 .get_mut(bundle_idx)
1055 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1056 .as_mut()
1057 .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1058 artifact_id: bundle.certificate.artifact_id.clone(),
1059 })?;
1060 let next_members: BTreeSet<String> = next_members.into_iter().map(Into::into).collect();
1061 Self::simulate_reconfiguration_transition(
1062 &bundle.certificate.artifact_id,
1063 policy,
1064 state,
1065 next_members,
1066 )
1067 }
1068
1069 #[must_use]
1071 pub fn bundle_members(&self, bundle_idx: usize) -> Option<&BTreeSet<String>> {
1072 self.reconfiguration_states
1073 .get(bundle_idx)
1074 .and_then(Option::as_ref)
1075 .map(|state| &state.active_members)
1076 }
1077
1078 #[must_use]
1080 pub fn bundle_reconfiguration_history(
1081 &self,
1082 bundle_idx: usize,
1083 ) -> Option<&[ReconfigurationEvent]> {
1084 self.reconfiguration_states
1085 .get(bundle_idx)
1086 .and_then(Option::as_ref)
1087 .map(|state| state.history.as_slice())
1088 }
1089
1090 pub fn execute_reconfiguration_plan(
1096 &mut self,
1097 bundle_idx: usize,
1098 plan: &ReconfigurationPlan,
1099 ) -> Result<ReconfigurationPlanExecution, CompositionError> {
1100 let bundle = self
1101 .bundles
1102 .get(bundle_idx)
1103 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1104 let Some(policy) = bundle.reconfiguration_policy.as_ref() else {
1105 return Err(CompositionError::ReconfigurationDisabled {
1106 artifact_id: bundle.certificate.artifact_id.clone(),
1107 });
1108 };
1109 if !policy.dynamic_membership {
1110 return Err(CompositionError::ReconfigurationDisabled {
1111 artifact_id: bundle.certificate.artifact_id.clone(),
1112 });
1113 }
1114 if plan.steps.is_empty() {
1115 return Err(CompositionError::InvalidReconfigurationPlan {
1116 artifact_id: bundle.certificate.artifact_id.clone(),
1117 reason: format!("plan `{}` must contain at least one step", plan.plan_id),
1118 });
1119 }
1120
1121 let state = self
1122 .reconfiguration_states
1123 .get_mut(bundle_idx)
1124 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1125 .as_mut()
1126 .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1127 artifact_id: bundle.certificate.artifact_id.clone(),
1128 })?;
1129 let initial_members = state.active_members.iter().cloned().collect::<Vec<_>>();
1130 let mut simulated = state.clone();
1131 let mut phases = Vec::new();
1132 for step in &plan.steps {
1133 let (next_members, placements, transport_boundaries) =
1134 Self::validate_plan_step(&bundle.certificate.artifact_id, step)?;
1135 let event = Self::simulate_reconfiguration_transition(
1136 &bundle.certificate.artifact_id,
1137 policy,
1138 &mut simulated,
1139 next_members,
1140 )?;
1141 phases.push(ReconfigurationPhaseArtifact {
1142 step_id: step.step_id.clone(),
1143 event,
1144 placements,
1145 transport_boundaries,
1146 });
1147 }
1148
1149 let execution = ReconfigurationPlanExecution {
1150 artifact_id: bundle.certificate.artifact_id.clone(),
1151 plan_id: plan.plan_id.clone(),
1152 initial_members,
1153 final_members: simulated.active_members.iter().cloned().collect(),
1154 phases,
1155 };
1156 simulated.plan_executions.push(execution.clone());
1157 *state = simulated;
1158 Ok(execution)
1159 }
1160
1161 #[must_use]
1163 pub fn bundle_reconfiguration_plan_executions(
1164 &self,
1165 bundle_idx: usize,
1166 ) -> Option<&[ReconfigurationPlanExecution]> {
1167 self.reconfiguration_states
1168 .get(bundle_idx)
1169 .and_then(Option::as_ref)
1170 .map(|state| state.plan_executions.as_slice())
1171 }
1172
1173 pub fn execute_runtime_upgrade(
1179 &mut self,
1180 bundle_idx: usize,
1181 request: &RuntimeUpgradeRequest,
1182 ) -> Result<RuntimeUpgradeExecution, CompositionError> {
1183 let bundle = self
1184 .bundles
1185 .get(bundle_idx)
1186 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1187 let policy = Self::require_reconfiguration_policy(bundle)?;
1188 let config = self.machine.config().clone();
1189 let state = self
1190 .reconfiguration_states
1191 .get_mut(bundle_idx)
1192 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1193 .as_mut()
1194 .ok_or_else(|| Self::reconfiguration_disabled(bundle))?;
1195 let initial_members = state.active_members.iter().cloned().collect::<Vec<_>>();
1196 let next_members = Self::runtime_upgrade_target_members(request);
1197 let staged = Self::runtime_upgrade_artifact(
1198 request,
1199 TransitionArtifactPhase::Staged,
1200 initial_members.clone(),
1201 next_members.clone(),
1202 None,
1203 );
1204
1205 if let Err(err) = Self::validate_runtime_upgrade_request(&config, bundle, state, request) {
1206 let execution = Self::failed_runtime_upgrade_execution(
1207 bundle,
1208 request,
1209 initial_members.clone(),
1210 vec![
1211 staged,
1212 Self::runtime_upgrade_artifact(
1213 request,
1214 TransitionArtifactPhase::Failed,
1215 initial_members.clone(),
1216 initial_members.clone(),
1217 Some(err.to_string()),
1218 ),
1219 ],
1220 );
1221 state.runtime_upgrades.push(execution);
1222 return Err(err);
1223 }
1224
1225 let admitted = Self::runtime_upgrade_artifact(
1226 request,
1227 TransitionArtifactPhase::Admitted,
1228 initial_members.clone(),
1229 next_members,
1230 None,
1231 );
1232
1233 let mut simulated = state.clone();
1234 let execution_result =
1235 Self::simulate_runtime_upgrade_steps(bundle, policy, &mut simulated, request);
1236
1237 match execution_result {
1238 Ok(()) => {
1239 let execution = Self::committed_runtime_upgrade_execution(
1240 bundle,
1241 request,
1242 &simulated,
1243 initial_members.clone(),
1244 staged,
1245 admitted,
1246 );
1247 simulated.runtime_upgrades.push(execution.clone());
1248 *state = simulated;
1249 Ok(execution)
1250 }
1251 Err(err) => {
1252 let execution = Self::failed_runtime_upgrade_execution(
1253 bundle,
1254 request,
1255 initial_members.clone(),
1256 vec![
1257 staged,
1258 admitted,
1259 Self::runtime_upgrade_artifact(
1260 request,
1261 TransitionArtifactPhase::RolledBack,
1262 initial_members.clone(),
1263 initial_members.clone(),
1264 Some(err.to_string()),
1265 ),
1266 ],
1267 );
1268 state.runtime_upgrades.push(execution);
1269 Err(err)
1270 }
1271 }
1272 }
1273
1274 fn reconfiguration_disabled(bundle: &ProtocolBundle) -> CompositionError {
1275 CompositionError::ReconfigurationDisabled {
1276 artifact_id: bundle.certificate.artifact_id.clone(),
1277 }
1278 }
1279
1280 fn require_reconfiguration_policy(
1281 bundle: &ProtocolBundle,
1282 ) -> Result<&ReconfigurationPolicy, CompositionError> {
1283 bundle
1284 .reconfiguration_policy
1285 .as_ref()
1286 .ok_or_else(|| Self::reconfiguration_disabled(bundle))
1287 }
1288
1289 fn runtime_upgrade_target_members(request: &RuntimeUpgradeRequest) -> Vec<String> {
1290 request
1291 .plan
1292 .steps
1293 .last()
1294 .map(|step| step.next_members.clone())
1295 .unwrap_or_default()
1296 }
1297
1298 fn simulate_runtime_upgrade_steps(
1299 bundle: &ProtocolBundle,
1300 policy: &ReconfigurationPolicy,
1301 simulated: &mut ReconfigurationRuntimeState,
1302 request: &RuntimeUpgradeRequest,
1303 ) -> Result<(), CompositionError> {
1304 for step in &request.plan.steps {
1305 let (next_members, _placements, _transport_boundaries) =
1306 Self::validate_plan_step(&bundle.certificate.artifact_id, step)?;
1307 Self::simulate_reconfiguration_transition(
1308 &bundle.certificate.artifact_id,
1309 policy,
1310 simulated,
1311 next_members,
1312 )?;
1313 }
1314 Ok(())
1315 }
1316
1317 fn committed_runtime_upgrade_execution(
1318 bundle: &ProtocolBundle,
1319 request: &RuntimeUpgradeRequest,
1320 simulated: &ReconfigurationRuntimeState,
1321 initial_members: Vec<String>,
1322 staged: RuntimeUpgradeArtifact,
1323 admitted: RuntimeUpgradeArtifact,
1324 ) -> RuntimeUpgradeExecution {
1325 let committed = Self::runtime_upgrade_artifact(
1326 request,
1327 TransitionArtifactPhase::CommittedCutover,
1328 initial_members,
1329 simulated.active_members.iter().cloned().collect(),
1330 None,
1331 );
1332 RuntimeUpgradeExecution {
1333 artifact_id: bundle.certificate.artifact_id.clone(),
1334 upgrade_id: request.upgrade_id.clone(),
1335 final_members: simulated.active_members.iter().cloned().collect(),
1336 artifacts: vec![staged, admitted, committed],
1337 }
1338 }
1339
1340 fn runtime_upgrade_artifact(
1341 request: &RuntimeUpgradeRequest,
1342 phase: TransitionArtifactPhase,
1343 previous_members: Vec<String>,
1344 next_members: Vec<String>,
1345 reason: Option<String>,
1346 ) -> RuntimeUpgradeArtifact {
1347 RuntimeUpgradeArtifact {
1348 upgrade_id: request.upgrade_id.clone(),
1349 phase,
1350 previous_members,
1351 next_members,
1352 compatibility: request.compatibility.clone(),
1353 carried_publication_ids: request.carried_publication_ids.clone(),
1354 invalidated_publication_ids: request.invalidated_publication_ids.clone(),
1355 carried_obligation_ids: request.carried_obligation_ids.clone(),
1356 invalidated_obligation_ids: request.invalidated_obligation_ids.clone(),
1357 reason,
1358 }
1359 }
1360
1361 fn failed_runtime_upgrade_execution(
1362 bundle: &ProtocolBundle,
1363 request: &RuntimeUpgradeRequest,
1364 final_members: Vec<String>,
1365 artifacts: Vec<RuntimeUpgradeArtifact>,
1366 ) -> RuntimeUpgradeExecution {
1367 RuntimeUpgradeExecution {
1368 artifact_id: bundle.certificate.artifact_id.clone(),
1369 upgrade_id: request.upgrade_id.clone(),
1370 final_members,
1371 artifacts,
1372 }
1373 }
1374
1375 #[must_use]
1377 pub fn bundle_runtime_upgrade_executions(
1378 &self,
1379 bundle_idx: usize,
1380 ) -> Option<&[RuntimeUpgradeExecution]> {
1381 self.reconfiguration_states
1382 .get(bundle_idx)
1383 .and_then(Option::as_ref)
1384 .map(|state| state.runtime_upgrades.as_slice())
1385 }
1386
1387 #[must_use]
1389 pub fn bundle_reconfiguration_snapshot(
1390 &self,
1391 bundle_idx: usize,
1392 ) -> Option<ReconfigurationRuntimeSnapshot> {
1393 self.reconfiguration_states
1394 .get(bundle_idx)
1395 .and_then(Option::as_ref)
1396 .map(Self::snapshot_from_state)
1397 }
1398
1399 pub fn restore_bundle_reconfiguration_snapshot(
1405 &mut self,
1406 bundle_idx: usize,
1407 snapshot: ReconfigurationRuntimeSnapshot,
1408 ) -> Result<(), CompositionError> {
1409 let bundle = self
1410 .bundles
1411 .get(bundle_idx)
1412 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1413 if bundle.reconfiguration_policy.is_none() {
1414 return Err(CompositionError::ReconfigurationDisabled {
1415 artifact_id: bundle.certificate.artifact_id.clone(),
1416 });
1417 }
1418 let state = self
1419 .reconfiguration_states
1420 .get_mut(bundle_idx)
1421 .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1422 .as_mut()
1423 .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1424 artifact_id: bundle.certificate.artifact_id.clone(),
1425 })?;
1426 Self::restore_snapshot_into_state(&bundle.certificate.artifact_id, state, snapshot)
1427 }
1428
1429 #[must_use]
1431 pub fn machine(&self) -> &ProtocolMachine {
1432 &self.machine
1433 }
1434
1435 fn refresh_usage(&mut self) {
1436 self.usage.session_count = self.machine.session_count();
1437 self.usage.coroutine_count = self.machine.coroutine_count();
1438 self.usage.protocol_count = self.bundles.len();
1439 }
1440
1441 fn assert_budget(&self, bundle_idx: usize) -> Result<(), CompositionError> {
1442 let per_coro = self.usage.bytes_per_coroutine;
1443 let per_sess = self.usage.bytes_per_session;
1444 let artifact_id = self
1445 .bundles
1446 .get(bundle_idx)
1447 .map(|b| b.certificate.artifact_id.clone())
1448 .unwrap_or_else(|| format!("bundle/{bundle_idx}"));
1449 if per_coro > self.budget.max_bytes_per_coroutine {
1450 return Err(CompositionError::BudgetExceeded {
1451 artifact_id,
1452 reason: "bytes_per_coroutine".to_string(),
1453 });
1454 }
1455 if per_sess > self.budget.max_bytes_per_session {
1456 return Err(CompositionError::BudgetExceeded {
1457 artifact_id,
1458 reason: "bytes_per_session".to_string(),
1459 });
1460 }
1461 Ok(())
1462 }
1463
1464 fn require_capabilities(&self, bundle: &ProtocolBundle) -> Result<(), CompositionError> {
1465 let cert = &bundle.certificate;
1466 let caps = &cert.theorem_pack;
1467 let runtime_contracts = cert.runtime_contracts.as_ref();
1468
1469 self.require_runtime_contracts(cert, runtime_contracts)?;
1470 self.require_scheduler_capability(cert, caps)?;
1471 self.require_determinism_capability(cert, caps)?;
1472 self.require_execution_profile(cert, caps, runtime_contracts)?;
1473 self.require_output_condition_gating(cert, caps)?;
1474 self.require_transport_contracts(cert, caps, runtime_contracts)?;
1475 self.require_reconfiguration_capabilities(bundle, runtime_contracts)?;
1476 Ok(())
1477 }
1478
1479 fn require_runtime_contracts(
1480 &self,
1481 cert: &CompositionCertificate,
1482 runtime_contracts: Option<&RuntimeContracts>,
1483 ) -> Result<(), CompositionError> {
1484 match enforce_protocol_machine_runtime_gates(self.machine.config(), runtime_contracts) {
1485 RuntimeGateResult::Admitted => Ok(()),
1486 RuntimeGateResult::RejectedMissingContracts => {
1487 Err(CompositionError::MissingRuntimeContracts {
1488 artifact_id: cert.artifact_id.clone(),
1489 })
1490 }
1491 RuntimeGateResult::RejectedUnsupportedDeterminismProfile => {
1492 Err(CompositionError::MissingCapability {
1493 artifact_id: cert.artifact_id.clone(),
1494 capability: format!(
1495 "determinism_profile::{:?}",
1496 self.machine.config().determinism_mode
1497 ),
1498 })
1499 }
1500 }
1501 }
1502
1503 fn require_scheduler_capability(
1504 &self,
1505 cert: &CompositionCertificate,
1506 caps: &TheoremPackCapabilities,
1507 ) -> Result<(), CompositionError> {
1508 let required_sched = match self.machine.config().sched_policy {
1509 SchedPolicy::Cooperative => SchedulerCapability::Cooperative,
1510 SchedPolicy::RoundRobin => SchedulerCapability::RoundRobin,
1511 SchedPolicy::Priority(_) => SchedulerCapability::Priority,
1512 SchedPolicy::ProgressAware => SchedulerCapability::ProgressAware,
1513 };
1514 if caps.schedulers.contains(&required_sched) {
1515 Ok(())
1516 } else {
1517 Err(CompositionError::MissingCapability {
1518 artifact_id: cert.artifact_id.clone(),
1519 capability: format!("scheduler::{required_sched:?}"),
1520 })
1521 }
1522 }
1523
1524 fn require_determinism_capability(
1525 &self,
1526 cert: &CompositionCertificate,
1527 caps: &TheoremPackCapabilities,
1528 ) -> Result<(), CompositionError> {
1529 let required_det = match self.machine.config().determinism_mode {
1530 DeterminismMode::Full => DeterminismCapability::Full,
1531 DeterminismMode::ModuloEffects => DeterminismCapability::ModuloEffects,
1532 DeterminismMode::ModuloCommutativity => DeterminismCapability::ModuloCommutativity,
1533 DeterminismMode::Replay => DeterminismCapability::Replay,
1534 };
1535 if caps.determinism.contains(&required_det) {
1536 Ok(())
1537 } else {
1538 Err(CompositionError::MissingCapability {
1539 artifact_id: cert.artifact_id.clone(),
1540 capability: format!("determinism::{required_det:?}"),
1541 })
1542 }
1543 }
1544
1545 fn require_execution_profile(
1546 &self,
1547 cert: &CompositionCertificate,
1548 caps: &TheoremPackCapabilities,
1549 runtime_contracts: Option<&RuntimeContracts>,
1550 ) -> Result<(), CompositionError> {
1551 if execution_profile_supported(
1552 &caps.execution_profile(),
1553 self.machine.config(),
1554 runtime_contracts,
1555 ) {
1556 Ok(())
1557 } else {
1558 Err(CompositionError::MissingCapability {
1559 artifact_id: cert.artifact_id.clone(),
1560 capability: "execution_profile".to_string(),
1561 })
1562 }
1563 }
1564
1565 fn require_output_condition_gating(
1566 &self,
1567 cert: &CompositionCertificate,
1568 caps: &TheoremPackCapabilities,
1569 ) -> Result<(), CompositionError> {
1570 let output_conditions_disabled = matches!(
1571 self.machine.config().output_condition_policy,
1572 OutputConditionPolicy::Disabled
1573 );
1574 if output_conditions_disabled || caps.output_condition_gating {
1575 Ok(())
1576 } else {
1577 Err(CompositionError::MissingCapability {
1578 artifact_id: cert.artifact_id.clone(),
1579 capability: "output_condition_gating".to_string(),
1580 })
1581 }
1582 }
1583
1584 fn require_transport_contracts(
1585 &self,
1586 cert: &CompositionCertificate,
1587 caps: &TheoremPackCapabilities,
1588 runtime_contracts: Option<&RuntimeContracts>,
1589 ) -> Result<(), CompositionError> {
1590 let profile = caps.execution_profile();
1591 let requirements = profile.transport_requirements();
1592 if requirements.is_empty() {
1593 return Ok(());
1594 }
1595 let Some(runtime_contracts) = runtime_contracts else {
1596 return Err(CompositionError::MissingTransportContracts {
1597 artifact_id: cert.artifact_id.clone(),
1598 });
1599 };
1600 validate_transport_contracts_for_execution_profile(
1601 &profile,
1602 &runtime_contracts.transport_contracts,
1603 )
1604 .map_err(|error| match error {
1605 TransportContractGateError::MissingTransportContracts => {
1606 CompositionError::MissingTransportContracts {
1607 artifact_id: cert.artifact_id.clone(),
1608 }
1609 }
1610 TransportContractGateError::UnsatisfiedTransportRequirement {
1611 transport_name,
1612 requirement,
1613 } => CompositionError::UnsatisfiedTransportContract {
1614 artifact_id: cert.artifact_id.clone(),
1615 transport_name,
1616 requirement,
1617 },
1618 })
1619 }
1620
1621 fn require_reconfiguration_capabilities(
1622 &self,
1623 bundle: &ProtocolBundle,
1624 runtime_contracts: Option<&RuntimeContracts>,
1625 ) -> Result<(), CompositionError> {
1626 let Some(policy) = &bundle.reconfiguration_policy else {
1627 return Ok(());
1628 };
1629 let cert = &bundle.certificate;
1630 let Some(contracts) = runtime_contracts else {
1631 return Err(CompositionError::MissingRuntimeContracts {
1632 artifact_id: cert.artifact_id.clone(),
1633 });
1634 };
1635 let capabilities = &contracts.capabilities;
1636 if !capabilities.contains(&crate::runtime_contracts::RuntimeCapability::PlacementRefinement)
1637 {
1638 return Err(CompositionError::MissingReconfigurationCapability {
1639 artifact_id: cert.artifact_id.clone(),
1640 capability: "reconfiguration::placement_refinement".to_string(),
1641 });
1642 }
1643 if policy.dynamic_membership
1644 && !capabilities
1645 .contains(&crate::runtime_contracts::RuntimeCapability::AutoscaleRepartition)
1646 {
1647 return Err(CompositionError::MissingReconfigurationCapability {
1648 artifact_id: cert.artifact_id.clone(),
1649 capability: "reconfiguration::dynamic_membership".to_string(),
1650 });
1651 }
1652 if policy.overlap_required
1653 && !capabilities.contains(&crate::runtime_contracts::RuntimeCapability::LiveMigration)
1654 {
1655 return Err(CompositionError::MissingReconfigurationCapability {
1656 artifact_id: cert.artifact_id.clone(),
1657 capability: "reconfiguration::overlap_required".to_string(),
1658 });
1659 }
1660 Ok(())
1661 }
1662}
1663
1664#[cfg(test)]
1665mod tests {
1666 use std::collections::BTreeMap;
1667
1668 use telltale_types::{GlobalType, Label, LocalTypeR};
1669
1670 use super::*;
1671 use crate::coroutine::Value;
1672 use crate::effect::{EffectFailure, EffectResult};
1673
1674 #[derive(Debug, Clone, Copy)]
1675 struct Noop;
1676
1677 impl EffectHandler for Noop {
1678 fn handle_send(
1679 &self,
1680 _role: &str,
1681 _partner: &str,
1682 label: &str,
1683 _state: &[Value],
1684 ) -> EffectResult<Value> {
1685 EffectResult::success(Value::Str(label.to_string()))
1686 }
1687
1688 fn handle_recv(
1689 &self,
1690 _role: &str,
1691 _partner: &str,
1692 _label: &str,
1693 _state: &mut Vec<Value>,
1694 _payload: &Value,
1695 ) -> EffectResult<()> {
1696 EffectResult::success(())
1697 }
1698
1699 fn handle_choose(
1700 &self,
1701 _role: &str,
1702 _partner: &str,
1703 labels: &[String],
1704 _state: &[Value],
1705 ) -> EffectResult<String> {
1706 match labels.first().cloned() {
1707 Some(label) => EffectResult::success(label),
1708 None => EffectResult::failure(EffectFailure::invalid_input("no labels available")),
1709 }
1710 }
1711
1712 fn step(&self, _role: &str, _state: &mut Vec<Value>) -> EffectResult<()> {
1713 EffectResult::success(())
1714 }
1715 }
1716
1717 fn image(label: &str) -> Arc<CodeImage> {
1718 let mut local_types = BTreeMap::new();
1719 local_types.insert(
1720 "A".to_string(),
1721 LocalTypeR::send("B", Label::new(label), LocalTypeR::End),
1722 );
1723 local_types.insert(
1724 "B".to_string(),
1725 LocalTypeR::recv("A", Label::new(label), LocalTypeR::End),
1726 );
1727 let global = GlobalType::send("A", "B", Label::new(label), GlobalType::End);
1728 Arc::new(CodeImage::from_local_types(&local_types, &global))
1729 }
1730
1731 #[test]
1732 fn proof_carrying_admission_rejects_missing_link_ok_full() {
1733 let mut runtime =
1734 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
1735 let bad = ProtocolBundle::new(
1736 image("m"),
1737 CompositionCertificate {
1738 artifact_id: "cert/bad".to_string(),
1739 link_ok_full: false,
1740 theorem_pack: TheoremPackCapabilities::full(),
1741 runtime_contracts: Some(RuntimeContracts::full()),
1742 },
1743 );
1744 assert!(matches!(
1745 runtime.admit_bundle(bad),
1746 Err(CompositionError::MissingCompatibilityProof { .. })
1747 ));
1748 }
1749
1750 #[test]
1751 fn immutable_code_artifacts_are_arc_shared() {
1752 let shared = image("m");
1753 let mut runtime =
1754 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
1755 let b1 = ProtocolBundle::new(
1756 Arc::clone(&shared),
1757 CompositionCertificate {
1758 artifact_id: "cert/1".to_string(),
1759 link_ok_full: true,
1760 theorem_pack: TheoremPackCapabilities::full(),
1761 runtime_contracts: Some(RuntimeContracts::full()),
1762 },
1763 );
1764 let b2 = ProtocolBundle::new(
1765 Arc::clone(&shared),
1766 CompositionCertificate {
1767 artifact_id: "cert/2".to_string(),
1768 link_ok_full: true,
1769 theorem_pack: TheoremPackCapabilities::full(),
1770 runtime_contracts: Some(RuntimeContracts::full()),
1771 },
1772 );
1773 runtime.admit_bundle(b1).expect("admit b1");
1774 runtime.admit_bundle(b2).expect("admit b2");
1775 assert!(
1776 Arc::strong_count(&shared) >= 3,
1777 "bundle admission should keep shared immutable artifacts in Arc"
1778 );
1779 }
1780
1781 #[test]
1782 fn composed_execution_runs_and_usage_grows_monotonically() {
1783 let mut runtime =
1784 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
1785 let b = ProtocolBundle::new(
1786 image("m"),
1787 CompositionCertificate {
1788 artifact_id: "cert/ok".to_string(),
1789 link_ok_full: true,
1790 theorem_pack: TheoremPackCapabilities::full(),
1791 runtime_contracts: Some(RuntimeContracts::full()),
1792 },
1793 );
1794 runtime.admit_bundle(b).expect("admit");
1795 runtime
1796 .load_bundle_sessions(0, 8)
1797 .expect("load composed sessions");
1798 let usage_before = runtime.memory_usage().clone();
1799 assert!(usage_before.session_count >= 8);
1800 assert!(usage_before.coroutine_count >= 16);
1801 runtime.run(&Noop, 512).expect("composed run");
1802 let usage_after = runtime.memory_usage().clone();
1803 assert!(usage_after.session_count >= usage_before.session_count);
1804 assert!(usage_after.coroutine_count >= usage_before.coroutine_count);
1805 }
1806
1807 #[test]
1808 fn admission_rejects_missing_scheduler_profile_capability() {
1809 let cfg = ProtocolMachineConfig {
1810 sched_policy: SchedPolicy::ProgressAware,
1811 ..ProtocolMachineConfig::default()
1812 };
1813 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1814 let bundle = ProtocolBundle::new(
1815 image("m"),
1816 CompositionCertificate {
1817 artifact_id: "cert/no-sched".to_string(),
1818 link_ok_full: true,
1819 theorem_pack: TheoremPackCapabilities {
1820 determinism: vec![
1821 DeterminismCapability::Full,
1822 DeterminismCapability::ModuloEffects,
1823 ],
1824 schedulers: vec![SchedulerCapability::Cooperative],
1825 output_condition_gating: true,
1826 },
1827 runtime_contracts: Some(RuntimeContracts::full()),
1828 },
1829 );
1830
1831 let err = runtime
1832 .admit_bundle(bundle)
1833 .expect_err("should reject bundle");
1834 assert!(matches!(
1835 err,
1836 CompositionError::MissingCapability { capability, .. }
1837 if capability == "scheduler::ProgressAware"
1838 ));
1839 }
1840
1841 #[test]
1842 fn admission_rejects_missing_determinism_capability() {
1843 let cfg = ProtocolMachineConfig {
1844 determinism_mode: DeterminismMode::ModuloCommutativity,
1845 ..ProtocolMachineConfig::default()
1846 };
1847 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1848 let bundle = ProtocolBundle::new(
1849 image("m"),
1850 CompositionCertificate {
1851 artifact_id: "cert/no-det".to_string(),
1852 link_ok_full: true,
1853 theorem_pack: TheoremPackCapabilities {
1854 determinism: vec![
1855 DeterminismCapability::Full,
1856 DeterminismCapability::ModuloEffects,
1857 ],
1858 schedulers: vec![SchedulerCapability::Cooperative],
1859 output_condition_gating: true,
1860 },
1861 runtime_contracts: Some(RuntimeContracts::full()),
1862 },
1863 );
1864
1865 let err = runtime
1866 .admit_bundle(bundle)
1867 .expect_err("should reject bundle");
1868 assert!(matches!(
1869 err,
1870 CompositionError::MissingCapability { capability, .. }
1871 if capability == "determinism::ModuloCommutativity"
1872 ));
1873 }
1874
1875 #[test]
1876 fn admission_rejects_missing_output_condition_capability() {
1877 let cfg = ProtocolMachineConfig {
1878 output_condition_policy: OutputConditionPolicy::AllowAll,
1879 ..ProtocolMachineConfig::default()
1880 };
1881 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1882 let bundle = ProtocolBundle::new(
1883 image("m"),
1884 CompositionCertificate {
1885 artifact_id: "cert/no-output-gate".to_string(),
1886 link_ok_full: true,
1887 theorem_pack: TheoremPackCapabilities {
1888 determinism: vec![DeterminismCapability::Full],
1889 schedulers: vec![SchedulerCapability::Cooperative],
1890 output_condition_gating: false,
1891 },
1892 runtime_contracts: Some(RuntimeContracts::full()),
1893 },
1894 );
1895
1896 let err = runtime
1897 .admit_bundle(bundle)
1898 .expect_err("should reject bundle");
1899 assert!(matches!(
1900 err,
1901 CompositionError::MissingCapability { capability, .. }
1902 if capability == "execution_profile"
1903 ));
1904 }
1905
1906 #[test]
1907 fn admission_accepts_when_required_capabilities_present() {
1908 let cfg = ProtocolMachineConfig {
1909 sched_policy: SchedPolicy::RoundRobin,
1910 determinism_mode: DeterminismMode::ModuloEffects,
1911 output_condition_policy: OutputConditionPolicy::PredicateAllowList(vec![
1912 "protocol_machine.observable_output".to_string(),
1913 ]),
1914 ..ProtocolMachineConfig::default()
1915 };
1916 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1917 let bundle = ProtocolBundle::new(
1918 image("m"),
1919 CompositionCertificate {
1920 artifact_id: "cert/full".to_string(),
1921 link_ok_full: true,
1922 theorem_pack: TheoremPackCapabilities::full(),
1923 runtime_contracts: Some(RuntimeContracts::full()),
1924 },
1925 );
1926 runtime
1927 .admit_bundle(bundle)
1928 .expect("bundle should be admitted");
1929 }
1930
1931 #[test]
1932 fn admission_accepts_minimal_required_capabilities_without_full_parity() {
1933 let cfg = ProtocolMachineConfig::default();
1934 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1935 let bundle = ProtocolBundle::new(
1936 image("m"),
1937 CompositionCertificate {
1938 artifact_id: "cert/minimal-required".to_string(),
1939 link_ok_full: true,
1940 theorem_pack: TheoremPackCapabilities {
1941 determinism: vec![
1942 DeterminismCapability::Full,
1943 DeterminismCapability::ModuloEffects,
1944 ],
1945 schedulers: vec![
1946 SchedulerCapability::Cooperative,
1947 SchedulerCapability::ProgressAware,
1948 ],
1949 output_condition_gating: true,
1950 },
1951 runtime_contracts: Some(RuntimeContracts::full()),
1952 },
1953 );
1954 runtime
1955 .admit_bundle(bundle)
1956 .expect("minimal required capabilities should be sufficient");
1957 }
1958
1959 #[test]
1960 fn admission_rejects_advanced_mode_without_runtime_contracts() {
1961 let cfg = ProtocolMachineConfig {
1962 sched_policy: SchedPolicy::RoundRobin,
1963 ..ProtocolMachineConfig::default()
1964 };
1965 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1966 let bundle = ProtocolBundle::new(
1967 image("m"),
1968 CompositionCertificate {
1969 artifact_id: "cert/no-runtime-contracts".to_string(),
1970 link_ok_full: true,
1971 theorem_pack: TheoremPackCapabilities::full(),
1972 runtime_contracts: None,
1973 },
1974 );
1975 let err = runtime
1976 .admit_bundle(bundle)
1977 .expect_err("advanced mode should reject missing runtime contracts");
1978 assert!(matches!(
1979 err,
1980 CompositionError::MissingRuntimeContracts { .. }
1981 ));
1982 }
1983
1984 #[test]
1985 fn admission_rejects_replay_profile_without_mixed_profile_gate() {
1986 let cfg = ProtocolMachineConfig {
1987 determinism_mode: DeterminismMode::Replay,
1988 ..ProtocolMachineConfig::default()
1989 };
1990 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1991 let mut contracts = RuntimeContracts::full();
1992 contracts.can_use_mixed_determinism_profiles = false;
1993 let bundle = ProtocolBundle::new(
1994 image("m"),
1995 CompositionCertificate {
1996 artifact_id: "cert/no-mixed-profile-gate".to_string(),
1997 link_ok_full: true,
1998 theorem_pack: TheoremPackCapabilities::full(),
1999 runtime_contracts: Some(contracts),
2000 },
2001 );
2002 let err = runtime
2003 .admit_bundle(bundle)
2004 .expect_err("replay profile should require mixed-profile gate");
2005 assert!(matches!(
2006 err,
2007 CompositionError::MissingCapability { capability, .. }
2008 if capability == "determinism_profile::Replay"
2009 ));
2010 }
2011
2012 #[test]
2013 fn admission_rejects_theorem_pack_without_transport_contracts() {
2014 let mut runtime =
2015 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2016 let bundle = ProtocolBundle::new(
2017 image("m"),
2018 CompositionCertificate {
2019 artifact_id: "cert/no-transport-contracts".to_string(),
2020 link_ok_full: true,
2021 theorem_pack: TheoremPackCapabilities::full(),
2022 runtime_contracts: None,
2023 },
2024 );
2025 let err = runtime
2026 .admit_bundle(bundle)
2027 .expect_err("theorem claims should require transport contracts");
2028 assert!(matches!(
2029 err,
2030 CompositionError::MissingTransportContracts { .. }
2031 ));
2032 }
2033
2034 #[test]
2035 fn admission_rejects_unauthenticated_transport_for_authenticated_theorem_claims() {
2036 let mut runtime =
2037 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2038 let unauthenticated = crate::runtime_contracts::RuntimeTransportContract::new(
2039 "UnauthenticatedTransport",
2040 "Network",
2041 )
2042 .with_role_addressed_routing(true)
2043 .with_per_peer_fifo_delivery(true)
2044 .with_fail_closed_unknown_role(true)
2045 .with_no_message_synthesis(true)
2046 .with_explicit_readiness_errors(true);
2047 let contracts = RuntimeContracts::full().with_transport_contracts([unauthenticated]);
2048 let bundle = ProtocolBundle::new(
2049 image("m"),
2050 CompositionCertificate {
2051 artifact_id: "cert/unauthenticated-transport".to_string(),
2052 link_ok_full: true,
2053 theorem_pack: TheoremPackCapabilities::full(),
2054 runtime_contracts: Some(contracts),
2055 },
2056 );
2057 let err = runtime.admit_bundle(bundle).expect_err(
2058 "unauthenticated transport should not satisfy authenticated theorem claims",
2059 );
2060 assert!(matches!(
2061 err,
2062 CompositionError::UnsatisfiedTransportContract {
2063 ref transport_name,
2064 requirement,
2065 ..
2066 } if transport_name == "UnauthenticatedTransport" && requirement == "authenticated_peers"
2067 ));
2068 }
2069
2070 #[test]
2071 fn admission_accepts_replay_profile_with_contracts_and_capability() {
2072 let cfg = ProtocolMachineConfig {
2073 determinism_mode: DeterminismMode::Replay,
2074 ..ProtocolMachineConfig::default()
2075 };
2076 let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
2077 let bundle = ProtocolBundle::new(
2078 image("m"),
2079 CompositionCertificate {
2080 artifact_id: "cert/replay-ok".to_string(),
2081 link_ok_full: true,
2082 theorem_pack: TheoremPackCapabilities::full(),
2083 runtime_contracts: Some(RuntimeContracts::full()),
2084 },
2085 );
2086 runtime
2087 .admit_bundle(bundle)
2088 .expect("replay profile should admit with matching contracts");
2089 }
2090
2091 #[test]
2092 fn reconfiguration_requires_runtime_capabilities_at_admission() {
2093 let mut runtime =
2094 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2095 let mut contracts = RuntimeContracts::full();
2096 contracts
2097 .capabilities
2098 .remove(&crate::runtime_contracts::RuntimeCapability::AutoscaleRepartition);
2099 let bundle = ProtocolBundle::new(
2100 image("m"),
2101 CompositionCertificate {
2102 artifact_id: "cert/reconfig-missing-cap".to_string(),
2103 link_ok_full: true,
2104 theorem_pack: TheoremPackCapabilities::full(),
2105 runtime_contracts: Some(contracts),
2106 },
2107 )
2108 .with_reconfiguration_policy(ReconfigurationPolicy {
2109 dynamic_membership: true,
2110 overlap_required: true,
2111 });
2112 let err = runtime
2113 .admit_bundle(bundle)
2114 .expect_err("dynamic membership should require explicit runtime capability");
2115 assert!(matches!(
2116 err,
2117 CompositionError::MissingReconfigurationCapability { capability, .. }
2118 if capability == "reconfiguration::dynamic_membership"
2119 ));
2120 }
2121
2122 #[test]
2123 fn reconfiguration_emits_deterministic_membership_events() {
2124 let mut runtime =
2125 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2126 let bundle = ProtocolBundle::new(
2127 image("m"),
2128 CompositionCertificate {
2129 artifact_id: "cert/reconfig-ok".to_string(),
2130 link_ok_full: true,
2131 theorem_pack: TheoremPackCapabilities::full(),
2132 runtime_contracts: Some(RuntimeContracts::full()),
2133 },
2134 )
2135 .with_reconfiguration_policy(ReconfigurationPolicy {
2136 dynamic_membership: true,
2137 overlap_required: true,
2138 });
2139 runtime.admit_bundle(bundle).expect("admit bundle");
2140 runtime
2141 .seed_bundle_membership(0, ["Alice", "Bob"])
2142 .expect("seed members");
2143
2144 let event = runtime
2145 .reconfigure_bundle(0, ["Bob", "Carol"])
2146 .expect("reconfigure");
2147 assert_eq!(event.epoch, 1);
2148 assert_eq!(event.previous_members, vec!["Alice", "Bob"]);
2149 assert_eq!(event.next_members, vec!["Bob", "Carol"]);
2150 assert_eq!(event.added_members, vec!["Carol"]);
2151 assert_eq!(event.removed_members, vec!["Alice"]);
2152 assert!(event.overlap_preserved);
2153 assert_eq!(
2154 runtime
2155 .bundle_members(0)
2156 .expect("members after reconfiguration"),
2157 &BTreeSet::from(["Bob".to_string(), "Carol".to_string()])
2158 );
2159 assert_eq!(
2160 runtime
2161 .bundle_reconfiguration_history(0)
2162 .expect("history")
2163 .last()
2164 .expect("event in history"),
2165 &event
2166 );
2167 }
2168
2169 #[test]
2170 fn reconfiguration_rejects_disjoint_membership_when_overlap_is_required() {
2171 let mut runtime =
2172 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2173 let bundle = ProtocolBundle::new(
2174 image("m"),
2175 CompositionCertificate {
2176 artifact_id: "cert/reconfig-overlap".to_string(),
2177 link_ok_full: true,
2178 theorem_pack: TheoremPackCapabilities::full(),
2179 runtime_contracts: Some(RuntimeContracts::full()),
2180 },
2181 )
2182 .with_reconfiguration_policy(ReconfigurationPolicy {
2183 dynamic_membership: true,
2184 overlap_required: true,
2185 });
2186 runtime.admit_bundle(bundle).expect("admit bundle");
2187 runtime
2188 .seed_bundle_membership(0, ["Alice", "Bob"])
2189 .expect("seed members");
2190 let err = runtime
2191 .reconfigure_bundle(0, ["Carol", "Dave"])
2192 .expect_err("overlap-required policy should reject disjoint membership");
2193 assert!(matches!(
2194 err,
2195 CompositionError::OverlapRequiredViolation { .. }
2196 ));
2197 }
2198
2199 #[test]
2200 fn reconfiguration_history_is_stable_across_recreated_runtimes() {
2201 fn drive_history() -> Vec<ReconfigurationEvent> {
2202 let mut runtime =
2203 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2204 let bundle = ProtocolBundle::new(
2205 image("m"),
2206 CompositionCertificate {
2207 artifact_id: "cert/reconfig-history".to_string(),
2208 link_ok_full: true,
2209 theorem_pack: TheoremPackCapabilities::full(),
2210 runtime_contracts: Some(RuntimeContracts::full()),
2211 },
2212 )
2213 .with_reconfiguration_policy(ReconfigurationPolicy {
2214 dynamic_membership: true,
2215 overlap_required: true,
2216 });
2217 runtime.admit_bundle(bundle).expect("admit bundle");
2218 runtime
2219 .seed_bundle_membership(0, ["Alice", "Bob"])
2220 .expect("seed members");
2221 runtime
2222 .reconfigure_bundle(0, ["Bob", "Carol"])
2223 .expect("first reconfiguration");
2224 runtime
2225 .reconfigure_bundle(0, ["Carol", "Dave"])
2226 .expect("second reconfiguration");
2227 runtime
2228 .bundle_reconfiguration_history(0)
2229 .expect("reconfiguration history")
2230 .to_vec()
2231 }
2232
2233 assert_eq!(drive_history(), drive_history());
2234 }
2235
2236 #[test]
2237 fn reconfiguration_plan_executes_atomically_with_phase_artifacts() {
2238 let mut runtime =
2239 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2240 let bundle = ProtocolBundle::new(
2241 image("m"),
2242 CompositionCertificate {
2243 artifact_id: "cert/reconfig-plan".to_string(),
2244 link_ok_full: true,
2245 theorem_pack: TheoremPackCapabilities::full(),
2246 runtime_contracts: Some(RuntimeContracts::full()),
2247 },
2248 )
2249 .with_reconfiguration_policy(ReconfigurationPolicy {
2250 dynamic_membership: true,
2251 overlap_required: true,
2252 });
2253 runtime.admit_bundle(bundle).expect("admit bundle");
2254 runtime
2255 .seed_bundle_membership(0, ["Alice", "Bob"])
2256 .expect("seed members");
2257
2258 let plan = ReconfigurationPlan {
2259 plan_id: "plan/blue-green".to_string(),
2260 steps: vec![
2261 ReconfigurationPlanStep {
2262 step_id: "prepare".to_string(),
2263 next_members: vec!["Bob".to_string(), "Carol".to_string(), "Dora".to_string()],
2264 placements: vec![
2265 PlacementObservation::local("Bob").with_region("eu_central_1"),
2266 PlacementObservation::colocated("Carol", "Bob").with_region("eu_central_1"),
2267 PlacementObservation::remote("Dora", "127.0.0.1:19821")
2268 .with_region("us_east_1"),
2269 ],
2270 },
2271 ReconfigurationPlanStep {
2272 step_id: "cutover".to_string(),
2273 next_members: vec!["Carol".to_string(), "Dora".to_string(), "Eve".to_string()],
2274 placements: vec![
2275 PlacementObservation::remote("Carol", "127.0.0.1:19822")
2276 .with_region("us_east_1"),
2277 PlacementObservation::remote("Dora", "127.0.0.1:19821")
2278 .with_region("us_east_1"),
2279 PlacementObservation::colocated("Eve", "Carol").with_region("us_east_1"),
2280 ],
2281 },
2282 ],
2283 };
2284
2285 let execution = runtime
2286 .execute_reconfiguration_plan(0, &plan)
2287 .expect("execute reconfiguration plan");
2288 assert_eq!(execution.plan_id, "plan/blue-green");
2289 assert_eq!(execution.initial_members, vec!["Alice", "Bob"]);
2290 assert_eq!(
2291 execution.final_members,
2292 vec!["Carol".to_string(), "Dora".to_string(), "Eve".to_string()]
2293 );
2294 assert_eq!(execution.phases.len(), 2);
2295 assert_eq!(execution.phases[0].event.epoch, 1);
2296 assert_eq!(execution.phases[1].event.epoch, 2);
2297 assert!(
2298 execution.phases[0]
2299 .transport_boundaries
2300 .iter()
2301 .any(|boundary| matches!(
2302 boundary.boundary,
2303 telltale_types::TransportBoundaryKind::SharedMemory
2304 )),
2305 "first phase should expose a colocated shared-memory boundary"
2306 );
2307 assert!(
2308 execution.phases[0]
2309 .transport_boundaries
2310 .iter()
2311 .any(|boundary| matches!(
2312 boundary.boundary,
2313 telltale_types::TransportBoundaryKind::Network
2314 )),
2315 "first phase should expose a remote network boundary"
2316 );
2317 assert_eq!(
2318 runtime
2319 .bundle_reconfiguration_history(0)
2320 .expect("history after plan")
2321 .len(),
2322 2
2323 );
2324 assert_eq!(
2325 runtime
2326 .bundle_reconfiguration_plan_executions(0)
2327 .expect("plan executions")
2328 .last(),
2329 Some(&execution)
2330 );
2331 }
2332
2333 #[test]
2334 fn invalid_reconfiguration_plan_rejects_without_partial_commit() {
2335 let mut runtime =
2336 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2337 let bundle = ProtocolBundle::new(
2338 image("m"),
2339 CompositionCertificate {
2340 artifact_id: "cert/reconfig-plan-invalid".to_string(),
2341 link_ok_full: true,
2342 theorem_pack: TheoremPackCapabilities::full(),
2343 runtime_contracts: Some(RuntimeContracts::full()),
2344 },
2345 )
2346 .with_reconfiguration_policy(ReconfigurationPolicy {
2347 dynamic_membership: true,
2348 overlap_required: true,
2349 });
2350 runtime.admit_bundle(bundle).expect("admit bundle");
2351 runtime
2352 .seed_bundle_membership(0, ["Alice", "Bob"])
2353 .expect("seed members");
2354
2355 let invalid_plan = ReconfigurationPlan {
2356 plan_id: "plan/invalid".to_string(),
2357 steps: vec![
2358 ReconfigurationPlanStep {
2359 step_id: "prepare".to_string(),
2360 next_members: vec!["Bob".to_string(), "Carol".to_string()],
2361 placements: vec![
2362 PlacementObservation::local("Bob"),
2363 PlacementObservation::local("Carol"),
2364 ],
2365 },
2366 ReconfigurationPlanStep {
2367 step_id: "split-brain".to_string(),
2368 next_members: vec!["Dora".to_string(), "Eve".to_string()],
2369 placements: vec![
2370 PlacementObservation::local("Dora"),
2371 PlacementObservation::local("Eve"),
2372 ],
2373 },
2374 ],
2375 };
2376
2377 let err = runtime
2378 .execute_reconfiguration_plan(0, &invalid_plan)
2379 .expect_err("invalid overlap-breaking plan must reject atomically");
2380 assert!(matches!(
2381 err,
2382 CompositionError::OverlapRequiredViolation { .. }
2383 ));
2384 assert_eq!(
2385 runtime.bundle_members(0).expect("members after rejection"),
2386 &BTreeSet::from(["Alice".to_string(), "Bob".to_string()])
2387 );
2388 assert!(
2389 runtime
2390 .bundle_reconfiguration_history(0)
2391 .expect("history after rejection")
2392 .is_empty(),
2393 "failed plan must not partially commit history"
2394 );
2395 assert!(
2396 runtime
2397 .bundle_reconfiguration_plan_executions(0)
2398 .expect("plan executions after rejection")
2399 .is_empty(),
2400 "failed plan must not record a plan execution"
2401 );
2402 }
2403
2404 #[test]
2405 fn reconfiguration_snapshot_restore_preserves_plan_execution_history() {
2406 fn configured_runtime(artifact_id: &str) -> ComposedRuntime {
2407 let mut runtime =
2408 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2409 let bundle = ProtocolBundle::new(
2410 image("m"),
2411 CompositionCertificate {
2412 artifact_id: artifact_id.to_string(),
2413 link_ok_full: true,
2414 theorem_pack: TheoremPackCapabilities::full(),
2415 runtime_contracts: Some(RuntimeContracts::full()),
2416 },
2417 )
2418 .with_reconfiguration_policy(ReconfigurationPolicy {
2419 dynamic_membership: true,
2420 overlap_required: true,
2421 });
2422 runtime.admit_bundle(bundle).expect("admit bundle");
2423 runtime
2424 }
2425
2426 let mut baseline = configured_runtime("cert/reconfig-snapshot");
2427 baseline
2428 .seed_bundle_membership(0, ["Alice", "Bob"])
2429 .expect("seed baseline members");
2430 let first_plan = ReconfigurationPlan {
2431 plan_id: "plan/prefix".to_string(),
2432 steps: vec![ReconfigurationPlanStep {
2433 step_id: "prepare".to_string(),
2434 next_members: vec!["Bob".to_string(), "Carol".to_string()],
2435 placements: vec![
2436 PlacementObservation::local("Bob").with_region("eu_central_1"),
2437 PlacementObservation::remote("Carol", "127.0.0.1:19831")
2438 .with_region("us_east_1"),
2439 ],
2440 }],
2441 };
2442 baseline
2443 .execute_reconfiguration_plan(0, &first_plan)
2444 .expect("execute first plan");
2445 let snapshot = serde_json::from_str::<ReconfigurationRuntimeSnapshot>(
2446 &serde_json::to_string(
2447 &baseline
2448 .bundle_reconfiguration_snapshot(0)
2449 .expect("snapshot after first plan"),
2450 )
2451 .expect("serialize snapshot"),
2452 )
2453 .expect("deserialize snapshot");
2454
2455 let second_plan = ReconfigurationPlan {
2456 plan_id: "plan/suffix".to_string(),
2457 steps: vec![ReconfigurationPlanStep {
2458 step_id: "cutover".to_string(),
2459 next_members: vec!["Carol".to_string(), "Dora".to_string()],
2460 placements: vec![
2461 PlacementObservation::remote("Carol", "127.0.0.1:19831")
2462 .with_region("us_east_1"),
2463 PlacementObservation::colocated("Dora", "Carol").with_region("us_east_1"),
2464 ],
2465 }],
2466 };
2467 let baseline_suffix = baseline
2468 .execute_reconfiguration_plan(0, &second_plan)
2469 .expect("execute second plan");
2470 let baseline_history = baseline
2471 .bundle_reconfiguration_history(0)
2472 .expect("baseline history")
2473 .to_vec();
2474 let baseline_executions = baseline
2475 .bundle_reconfiguration_plan_executions(0)
2476 .expect("baseline plan executions")
2477 .to_vec();
2478
2479 let mut restored = configured_runtime("cert/reconfig-snapshot");
2480 restored
2481 .restore_bundle_reconfiguration_snapshot(0, snapshot)
2482 .expect("restore reconfiguration snapshot");
2483 let restored_suffix = restored
2484 .execute_reconfiguration_plan(0, &second_plan)
2485 .expect("execute suffix after restore");
2486
2487 assert_eq!(restored_suffix, baseline_suffix);
2488 assert_eq!(
2489 restored
2490 .bundle_reconfiguration_history(0)
2491 .expect("restored history"),
2492 baseline_history.as_slice()
2493 );
2494 assert_eq!(
2495 restored
2496 .bundle_reconfiguration_plan_executions(0)
2497 .expect("restored plan executions"),
2498 baseline_executions.as_slice()
2499 );
2500 }
2501
2502 #[test]
2503 fn runtime_upgrade_emits_staged_admitted_and_committed_artifacts() {
2504 let mut runtime =
2505 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2506 let bundle = ProtocolBundle::new(
2507 image("m"),
2508 CompositionCertificate {
2509 artifact_id: "cert/runtime-upgrade".to_string(),
2510 link_ok_full: true,
2511 theorem_pack: TheoremPackCapabilities::full(),
2512 runtime_contracts: Some(RuntimeContracts::full()),
2513 },
2514 )
2515 .with_reconfiguration_policy(ReconfigurationPolicy {
2516 dynamic_membership: true,
2517 overlap_required: true,
2518 });
2519 runtime.admit_bundle(bundle).expect("admit bundle");
2520 runtime
2521 .seed_bundle_membership(0, ["Alice", "Bob"])
2522 .expect("seed members");
2523
2524 let request = RuntimeUpgradeRequest {
2525 upgrade_id: "upgrade/v2".to_string(),
2526 plan: ReconfigurationPlan {
2527 plan_id: "plan/upgrade".to_string(),
2528 steps: vec![ReconfigurationPlanStep {
2529 step_id: "cutover".to_string(),
2530 next_members: vec!["Bob".to_string(), "Carol".to_string()],
2531 placements: vec![
2532 PlacementObservation::local("Bob").with_region("eu_central_1"),
2533 PlacementObservation::remote("Carol", "127.0.0.1:19901")
2534 .with_region("us_east_1"),
2535 ],
2536 }],
2537 },
2538 compatibility: RuntimeUpgradeCompatibility {
2539 execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2540 ownership_continuity_required: true,
2541 pending_effect_treatment: PendingEffectTreatment::InvalidateBlocked,
2542 canonical_publication_continuity:
2543 CanonicalPublicationContinuity::PreserveCanonicalTruth,
2544 },
2545 carried_publication_ids: vec!["publication:accept#1".to_string()],
2546 invalidated_publication_ids: Vec::new(),
2547 carried_obligation_ids: vec!["obligation:pending#1".to_string()],
2548 invalidated_obligation_ids: Vec::new(),
2549 };
2550
2551 let execution = runtime
2552 .execute_runtime_upgrade(0, &request)
2553 .expect("execute runtime upgrade");
2554 assert_eq!(execution.upgrade_id, "upgrade/v2");
2555 assert_eq!(execution.artifacts.len(), 3);
2556 assert_eq!(
2557 execution.artifacts[0].phase,
2558 TransitionArtifactPhase::Staged
2559 );
2560 assert_eq!(
2561 execution.artifacts[1].phase,
2562 TransitionArtifactPhase::Admitted
2563 );
2564 assert_eq!(
2565 execution.artifacts[2].phase,
2566 TransitionArtifactPhase::CommittedCutover
2567 );
2568 assert_eq!(
2569 execution.artifacts[2].carried_publication_ids,
2570 vec!["publication:accept#1".to_string()]
2571 );
2572 assert_eq!(
2573 runtime
2574 .bundle_runtime_upgrade_executions(0)
2575 .expect("runtime upgrade executions")
2576 .last(),
2577 Some(&execution)
2578 );
2579 }
2580
2581 #[test]
2582 fn runtime_upgrade_rejects_when_canonical_continuity_is_violated() {
2583 let mut runtime =
2584 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2585 let bundle = ProtocolBundle::new(
2586 image("m"),
2587 CompositionCertificate {
2588 artifact_id: "cert/runtime-upgrade-reject".to_string(),
2589 link_ok_full: true,
2590 theorem_pack: TheoremPackCapabilities::full(),
2591 runtime_contracts: Some(RuntimeContracts::full()),
2592 },
2593 )
2594 .with_reconfiguration_policy(ReconfigurationPolicy {
2595 dynamic_membership: true,
2596 overlap_required: true,
2597 });
2598 runtime.admit_bundle(bundle).expect("admit bundle");
2599 runtime
2600 .seed_bundle_membership(0, ["Alice", "Bob"])
2601 .expect("seed members");
2602
2603 let request = RuntimeUpgradeRequest {
2604 upgrade_id: "upgrade/reject".to_string(),
2605 plan: ReconfigurationPlan {
2606 plan_id: "plan/reject".to_string(),
2607 steps: vec![ReconfigurationPlanStep {
2608 step_id: "cutover".to_string(),
2609 next_members: vec!["Bob".to_string(), "Carol".to_string()],
2610 placements: vec![
2611 PlacementObservation::local("Bob"),
2612 PlacementObservation::remote("Carol", "127.0.0.1:19902"),
2613 ],
2614 }],
2615 },
2616 compatibility: RuntimeUpgradeCompatibility {
2617 execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2618 ownership_continuity_required: true,
2619 pending_effect_treatment: PendingEffectTreatment::PreservePending,
2620 canonical_publication_continuity:
2621 CanonicalPublicationContinuity::PreserveCanonicalTruth,
2622 },
2623 carried_publication_ids: vec!["publication:accept#1".to_string()],
2624 invalidated_publication_ids: vec!["publication:accept#1".to_string()],
2625 carried_obligation_ids: Vec::new(),
2626 invalidated_obligation_ids: Vec::new(),
2627 };
2628
2629 let err = runtime
2630 .execute_runtime_upgrade(0, &request)
2631 .expect_err("continuity violation must reject");
2632 assert!(matches!(
2633 err,
2634 CompositionError::InvalidReconfigurationPlan { .. }
2635 ));
2636 let execution = runtime
2637 .bundle_runtime_upgrade_executions(0)
2638 .expect("runtime upgrade executions")
2639 .last()
2640 .expect("failed execution artifact");
2641 assert_eq!(
2642 execution.artifacts.last().expect("failure artifact").phase,
2643 TransitionArtifactPhase::Failed
2644 );
2645 }
2646
2647 #[test]
2648 fn runtime_upgrade_rollback_is_replay_visible_when_plan_breaks_overlap() {
2649 let mut runtime =
2650 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2651 let bundle = ProtocolBundle::new(
2652 image("m"),
2653 CompositionCertificate {
2654 artifact_id: "cert/runtime-upgrade-rollback".to_string(),
2655 link_ok_full: true,
2656 theorem_pack: TheoremPackCapabilities::full(),
2657 runtime_contracts: Some(RuntimeContracts::full()),
2658 },
2659 )
2660 .with_reconfiguration_policy(ReconfigurationPolicy {
2661 dynamic_membership: true,
2662 overlap_required: true,
2663 });
2664 runtime.admit_bundle(bundle).expect("admit bundle");
2665 runtime
2666 .seed_bundle_membership(0, ["Alice", "Bob"])
2667 .expect("seed members");
2668
2669 let request = RuntimeUpgradeRequest {
2670 upgrade_id: "upgrade/rollback".to_string(),
2671 plan: ReconfigurationPlan {
2672 plan_id: "plan/rollback".to_string(),
2673 steps: vec![ReconfigurationPlanStep {
2674 step_id: "cutover".to_string(),
2675 next_members: vec!["Carol".to_string(), "Dora".to_string()],
2676 placements: vec![
2677 PlacementObservation::local("Carol"),
2678 PlacementObservation::local("Dora"),
2679 ],
2680 }],
2681 },
2682 compatibility: RuntimeUpgradeCompatibility {
2683 execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2684 ownership_continuity_required: false,
2685 pending_effect_treatment: PendingEffectTreatment::InvalidateBlocked,
2686 canonical_publication_continuity:
2687 CanonicalPublicationContinuity::ReissueCanonicalTruth,
2688 },
2689 carried_publication_ids: Vec::new(),
2690 invalidated_publication_ids: vec!["publication:old#1".to_string()],
2691 carried_obligation_ids: vec!["obligation:pending#1".to_string()],
2692 invalidated_obligation_ids: vec!["obligation:blocked#1".to_string()],
2693 };
2694
2695 let err = runtime
2696 .execute_runtime_upgrade(0, &request)
2697 .expect_err("overlap failure must roll back");
2698 assert!(matches!(
2699 err,
2700 CompositionError::OverlapRequiredViolation { .. }
2701 ));
2702 let execution = runtime
2703 .bundle_runtime_upgrade_executions(0)
2704 .expect("runtime upgrade executions")
2705 .last()
2706 .expect("rolled-back execution");
2707 assert_eq!(
2708 execution.artifacts.last().expect("rollback artifact").phase,
2709 TransitionArtifactPhase::RolledBack
2710 );
2711 assert_eq!(
2712 runtime.bundle_members(0).expect("members after rollback"),
2713 &BTreeSet::from(["Alice".to_string(), "Bob".to_string()])
2714 );
2715 }
2716
2717 #[test]
2718 fn runtime_upgrade_snapshot_restore_preserves_upgrade_history() {
2719 let mut baseline =
2720 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2721 let bundle = ProtocolBundle::new(
2722 image("m"),
2723 CompositionCertificate {
2724 artifact_id: "cert/runtime-upgrade-snapshot".to_string(),
2725 link_ok_full: true,
2726 theorem_pack: TheoremPackCapabilities::full(),
2727 runtime_contracts: Some(RuntimeContracts::full()),
2728 },
2729 )
2730 .with_reconfiguration_policy(ReconfigurationPolicy {
2731 dynamic_membership: true,
2732 overlap_required: true,
2733 });
2734 baseline.admit_bundle(bundle).expect("admit bundle");
2735 baseline
2736 .seed_bundle_membership(0, ["Alice", "Bob"])
2737 .expect("seed members");
2738
2739 let request = RuntimeUpgradeRequest {
2740 upgrade_id: "upgrade/snapshot".to_string(),
2741 plan: ReconfigurationPlan {
2742 plan_id: "plan/snapshot".to_string(),
2743 steps: vec![ReconfigurationPlanStep {
2744 step_id: "cutover".to_string(),
2745 next_members: vec!["Bob".to_string(), "Carol".to_string()],
2746 placements: vec![
2747 PlacementObservation::local("Bob"),
2748 PlacementObservation::remote("Carol", "127.0.0.1:19903"),
2749 ],
2750 }],
2751 },
2752 compatibility: RuntimeUpgradeCompatibility {
2753 execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2754 ownership_continuity_required: true,
2755 pending_effect_treatment: PendingEffectTreatment::InvalidateBlocked,
2756 canonical_publication_continuity:
2757 CanonicalPublicationContinuity::PreserveCanonicalTruth,
2758 },
2759 carried_publication_ids: vec!["publication:stable#1".to_string()],
2760 invalidated_publication_ids: Vec::new(),
2761 carried_obligation_ids: vec!["obligation:stable#1".to_string()],
2762 invalidated_obligation_ids: Vec::new(),
2763 };
2764 let baseline_execution = baseline
2765 .execute_runtime_upgrade(0, &request)
2766 .expect("execute baseline upgrade");
2767 let snapshot = serde_json::from_str::<ReconfigurationRuntimeSnapshot>(
2768 &serde_json::to_string(
2769 &baseline
2770 .bundle_reconfiguration_snapshot(0)
2771 .expect("snapshot after upgrade"),
2772 )
2773 .expect("serialize snapshot"),
2774 )
2775 .expect("deserialize snapshot");
2776
2777 let mut restored =
2778 ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2779 let bundle = ProtocolBundle::new(
2780 image("m"),
2781 CompositionCertificate {
2782 artifact_id: "cert/runtime-upgrade-snapshot".to_string(),
2783 link_ok_full: true,
2784 theorem_pack: TheoremPackCapabilities::full(),
2785 runtime_contracts: Some(RuntimeContracts::full()),
2786 },
2787 )
2788 .with_reconfiguration_policy(ReconfigurationPolicy {
2789 dynamic_membership: true,
2790 overlap_required: true,
2791 });
2792 restored.admit_bundle(bundle).expect("admit bundle");
2793 restored
2794 .restore_bundle_reconfiguration_snapshot(0, snapshot)
2795 .expect("restore snapshot");
2796
2797 assert_eq!(
2798 restored
2799 .bundle_runtime_upgrade_executions(0)
2800 .expect("restored upgrades"),
2801 &[baseline_execution]
2802 );
2803 }
2804}