Skip to main content

telltale_machine/
composition.rs

1//! Protocol composition API for running many protocols in one ProtocolMachine instance.
2//!
3//! This layer provides:
4//! - proof-carrying admission (`LinkOKFull`-style certificate flag),
5//! - shared immutable protocol artifacts (`Arc<CodeImage>`),
6//! - memory-budget accounting for composed workloads.
7
8use std::collections::BTreeSet;
9use std::mem::size_of;
10use std::sync::Arc;
11
12use crate::determinism::DeterminismMode;
13use crate::effect::EffectHandler;
14use crate::engine::{ProtocolMachine, ProtocolMachineConfig, ProtocolMachineError};
15use crate::loader::CodeImage;
16use crate::output_condition::OutputConditionPolicy;
17use crate::runtime_contracts::{
18    enforce_protocol_machine_runtime_gates, execution_profile_supported,
19    validate_transport_contracts_for_execution_profile, ProtocolMachineAdmissibilityClass,
20    ProtocolMachineEscalationWindowClass, ProtocolMachineExecutionProfile,
21    ProtocolMachineFairnessAssumption, RuntimeContracts, RuntimeGateResult,
22    TransportContractGateError,
23};
24use crate::scheduler::SchedPolicy;
25use telltale_types::{
26    canonical_transport_boundaries, canonicalize_placement_observations,
27    CanonicalPublicationContinuity, PendingEffectTreatment, PlacementObservation,
28    RuntimeUpgradeArtifact, RuntimeUpgradeCompatibility, RuntimeUpgradeExecutionConstraint,
29    TransitionArtifactPhase, TransportBoundaryObservation,
30};
31
32type ValidatedPlanStepArtifacts = (
33    BTreeSet<String>,
34    Vec<PlacementObservation>,
35    Vec<TransportBoundaryObservation>,
36);
37
38/// Determinism capability required to admit a protocol bundle.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum DeterminismCapability {
41    /// Supports full determinism for fixed initial state/policy/effect trace.
42    Full,
43    /// Supports determinism modulo effect traces.
44    ModuloEffects,
45    /// Supports determinism modulo admissible commutativity.
46    ModuloCommutativity,
47    /// Supports replay determinism profile.
48    Replay,
49}
50
51/// Scheduler profile capability required to admit a protocol bundle.
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum SchedulerCapability {
54    /// Cooperative scheduler profile.
55    Cooperative,
56    /// Round-robin scheduler profile.
57    RoundRobin,
58    /// Priority scheduler profile.
59    Priority,
60    /// Progress-aware scheduler profile.
61    ProgressAware,
62}
63
64/// Theorem-pack capabilities surfaced by a proof artifact.
65#[derive(Debug, Clone, PartialEq, Eq)]
66pub struct TheoremPackCapabilities {
67    /// Determinism profiles certified by the proof artifact.
68    pub determinism: Vec<DeterminismCapability>,
69    /// Scheduler profiles certified by the proof artifact.
70    pub schedulers: Vec<SchedulerCapability>,
71    /// Whether output-condition commit-gating is certified.
72    pub output_condition_gating: bool,
73}
74
75impl TheoremPackCapabilities {
76    /// Capability set that allows all currently supported advanced runtime modes.
77    #[must_use]
78    pub fn full() -> Self {
79        Self {
80            determinism: vec![
81                DeterminismCapability::Full,
82                DeterminismCapability::ModuloEffects,
83                DeterminismCapability::ModuloCommutativity,
84                DeterminismCapability::Replay,
85            ],
86            schedulers: vec![
87                SchedulerCapability::Cooperative,
88                SchedulerCapability::RoundRobin,
89                SchedulerCapability::Priority,
90                SchedulerCapability::ProgressAware,
91            ],
92            output_condition_gating: true,
93        }
94    }
95
96    /// Proof-carrying execution profile derived from theorem-pack capabilities.
97    #[must_use]
98    pub fn execution_profile(&self) -> ProtocolMachineExecutionProfile {
99        let mut fairness_assumptions: std::collections::BTreeSet<_> =
100            [ProtocolMachineFairnessAssumption::ScheduleConfluence]
101                .into_iter()
102                .collect();
103        if self.schedulers.iter().any(|scheduler| {
104            matches!(
105                scheduler,
106                SchedulerCapability::RoundRobin
107                    | SchedulerCapability::Priority
108                    | SchedulerCapability::ProgressAware
109            )
110        }) {
111            fairness_assumptions.insert(ProtocolMachineFairnessAssumption::StarvationFreedom);
112        }
113        if self
114            .schedulers
115            .iter()
116            .any(|scheduler| matches!(scheduler, SchedulerCapability::ProgressAware))
117        {
118            fairness_assumptions.insert(ProtocolMachineFairnessAssumption::LivenessFairness);
119        }
120
121        let mut admissibility_classes: std::collections::BTreeSet<_> = [
122            ProtocolMachineAdmissibilityClass::LocalEnvelope,
123            ProtocolMachineAdmissibilityClass::ShardedEnvelope,
124        ]
125        .into_iter()
126        .collect();
127        let mut escalation_window_classes: std::collections::BTreeSet<_> =
128            [ProtocolMachineEscalationWindowClass::ProgressContract]
129                .into_iter()
130                .collect();
131        if self.output_condition_gating {
132            admissibility_classes.insert(ProtocolMachineAdmissibilityClass::ProtocolEnvelopeBridge);
133            escalation_window_classes.insert(ProtocolMachineEscalationWindowClass::ProtocolBridge);
134        }
135        if self
136            .determinism
137            .iter()
138            .any(|capability| !matches!(capability, DeterminismCapability::Full))
139        {
140            escalation_window_classes
141                .insert(ProtocolMachineEscalationWindowClass::AdmissionComplexity);
142        }
143
144        ProtocolMachineExecutionProfile {
145            fairness_assumptions,
146            admissibility_classes,
147            escalation_window_classes,
148            theorem_pack_eligibility: vec![
149                ("protocol_machine_envelope_adherence".to_string(), true),
150                (
151                    "protocol_machine_envelope_admission".to_string(),
152                    self.determinism.iter().any(|capability| {
153                        matches!(
154                            capability,
155                            DeterminismCapability::Full
156                                | DeterminismCapability::Replay
157                                | DeterminismCapability::ModuloEffects
158                                | DeterminismCapability::ModuloCommutativity
159                        )
160                    }),
161                ),
162                (
163                    "protocol_envelope_bridge".to_string(),
164                    self.output_condition_gating,
165                ),
166            ],
167        }
168    }
169
170    /// Transport semantics required by this theorem-pack capability set.
171    #[must_use]
172    pub fn transport_requirements(&self) -> crate::runtime_contracts::TheoremTransportRequirements {
173        self.execution_profile().transport_requirements()
174    }
175}
176
177/// Proof/certificate artifact required for composed protocol admission.
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct CompositionCertificate {
180    /// Stable identifier for the certificate/proof artifact.
181    pub artifact_id: String,
182    /// Whether this artifact witnesses `LinkOKFull`-style compatibility.
183    pub link_ok_full: bool,
184    /// Theorem-pack capabilities proven by this certificate.
185    pub theorem_pack: TheoremPackCapabilities,
186    /// Optional runtime contract bundle required for advanced runtime modes.
187    pub runtime_contracts: Option<RuntimeContracts>,
188}
189
190/// Reconfiguration policy admitted for one protocol bundle.
191#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
192pub struct ReconfigurationPolicy {
193    /// Whether the runtime may change the active member set over time.
194    pub dynamic_membership: bool,
195    /// Whether consecutive member sets must overlap.
196    pub overlap_required: bool,
197}
198
199/// Deterministic audit artifact emitted for one accepted reconfiguration.
200#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
201pub struct ReconfigurationEvent {
202    /// Stable bundle/certificate identifier.
203    pub artifact_id: String,
204    /// Monotonic epoch after the transition.
205    pub epoch: u64,
206    /// Canonical sorted member set before the transition.
207    pub previous_members: Vec<String>,
208    /// Canonical sorted member set after the transition.
209    pub next_members: Vec<String>,
210    /// Canonical sorted members added by the transition.
211    pub added_members: Vec<String>,
212    /// Canonical sorted members removed by the transition.
213    pub removed_members: Vec<String>,
214    /// Whether overlap was preserved on this transition.
215    pub overlap_preserved: bool,
216    /// Policy flag carried by the admitted bundle.
217    pub dynamic_membership: bool,
218    /// Policy flag carried by the admitted bundle.
219    pub overlap_required: bool,
220}
221
222/// Deterministic placement-aware reconfiguration step.
223#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
224pub struct ReconfigurationPlanStep {
225    /// Stable step identifier within the plan.
226    pub step_id: String,
227    /// Canonical sorted membership set after this step.
228    pub next_members: Vec<String>,
229    /// Canonical placement observations for the target membership set.
230    pub placements: Vec<PlacementObservation>,
231}
232
233/// Deterministic multi-step reconfiguration plan.
234#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
235pub struct ReconfigurationPlan {
236    /// Stable plan identifier.
237    pub plan_id: String,
238    /// Canonical ordered steps for the plan.
239    pub steps: Vec<ReconfigurationPlanStep>,
240}
241
242/// Canonical semantic artifact for one executed reconfiguration phase.
243#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
244pub struct ReconfigurationPhaseArtifact {
245    /// Stable step identifier.
246    pub step_id: String,
247    /// Accepted membership transition event.
248    pub event: ReconfigurationEvent,
249    /// Canonical placement observations for the active membership after the step.
250    pub placements: Vec<PlacementObservation>,
251    /// Canonical transport-observable boundaries implied by the placements.
252    pub transport_boundaries: Vec<TransportBoundaryObservation>,
253}
254
255/// Canonical semantic artifact for one executed reconfiguration plan.
256#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
257pub struct ReconfigurationPlanExecution {
258    /// Stable certificate artifact id.
259    pub artifact_id: String,
260    /// Stable plan identifier.
261    pub plan_id: String,
262    /// Canonical initial membership set.
263    pub initial_members: Vec<String>,
264    /// Executed phases in order.
265    pub phases: Vec<ReconfigurationPhaseArtifact>,
266    /// Canonical final membership set.
267    pub final_members: Vec<String>,
268}
269
270/// Specialized runtime-upgrade request carried through the reconfiguration layer.
271#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
272pub struct RuntimeUpgradeRequest {
273    /// Stable upgrade identifier.
274    pub upgrade_id: String,
275    /// Underlying deterministic reconfiguration plan.
276    pub plan: ReconfigurationPlan,
277    /// Compatibility contract required for the cutover.
278    pub compatibility: RuntimeUpgradeCompatibility,
279    /// Canonical publication ids carried into the upgraded runtime.
280    pub carried_publication_ids: Vec<String>,
281    /// Canonical publication ids invalidated by the upgrade.
282    pub invalidated_publication_ids: Vec<String>,
283    /// Stable obligation ids carried into the upgraded runtime.
284    pub carried_obligation_ids: Vec<String>,
285    /// Stable obligation ids invalidated by the upgrade.
286    pub invalidated_obligation_ids: Vec<String>,
287}
288
289/// Canonical semantic artifact for one executed runtime-upgrade request.
290#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
291pub struct RuntimeUpgradeExecution {
292    /// Stable certificate artifact id.
293    pub artifact_id: String,
294    /// Stable upgrade identifier.
295    pub upgrade_id: String,
296    /// Executed runtime-upgrade artifacts in order.
297    pub artifacts: Vec<RuntimeUpgradeArtifact>,
298    /// Canonical final membership after the upgrade.
299    pub final_members: Vec<String>,
300}
301
302/// Serializable snapshot of the reconfiguration state for one admitted bundle.
303#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
304pub struct ReconfigurationRuntimeSnapshot {
305    /// Current epoch.
306    pub epoch: u64,
307    /// Canonical sorted active members.
308    pub active_members: Vec<String>,
309    /// Deterministic accepted transition history.
310    pub history: Vec<ReconfigurationEvent>,
311    /// Deterministic executed plan artifacts.
312    pub plan_executions: Vec<ReconfigurationPlanExecution>,
313    /// Deterministic runtime-upgrade executions.
314    pub runtime_upgrades: Vec<RuntimeUpgradeExecution>,
315}
316
317#[derive(Debug, Clone, Default)]
318struct ReconfigurationRuntimeState {
319    epoch: u64,
320    active_members: BTreeSet<String>,
321    history: Vec<ReconfigurationEvent>,
322    plan_executions: Vec<ReconfigurationPlanExecution>,
323    runtime_upgrades: Vec<RuntimeUpgradeExecution>,
324}
325
326/// Immutable protocol bundle loaded by the composition API.
327#[derive(Debug, Clone)]
328pub struct ProtocolBundle {
329    /// Shared immutable code image.
330    pub code: Arc<CodeImage>,
331    /// Certificate checked at admission time.
332    pub certificate: CompositionCertificate,
333    /// Optional runtime reconfiguration policy admitted for this bundle.
334    pub reconfiguration_policy: Option<ReconfigurationPolicy>,
335}
336
337impl ProtocolBundle {
338    /// Construct a bundle from a code image and certificate.
339    #[must_use]
340    pub fn new(code: Arc<CodeImage>, certificate: CompositionCertificate) -> Self {
341        Self {
342            code,
343            certificate,
344            reconfiguration_policy: None,
345        }
346    }
347
348    /// Attach a deterministic reconfiguration policy to the bundle.
349    #[must_use]
350    pub fn with_reconfiguration_policy(mut self, policy: ReconfigurationPolicy) -> Self {
351        self.reconfiguration_policy = Some(policy);
352        self
353    }
354
355    /// The admitted reconfiguration policy for this bundle, if any.
356    #[must_use]
357    pub fn reconfiguration_policy(&self) -> Option<&ReconfigurationPolicy> {
358        self.reconfiguration_policy.as_ref()
359    }
360}
361
362/// Memory budget policy for composed execution.
363#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct MemoryBudget {
365    /// Max bytes allowed per coroutine.
366    pub max_bytes_per_coroutine: usize,
367    /// Max bytes allowed per session.
368    pub max_bytes_per_session: usize,
369    /// Max incremental bytes allowed per additional protocol.
370    pub max_incremental_bytes_per_protocol: usize,
371}
372
373impl Default for MemoryBudget {
374    fn default() -> Self {
375        Self {
376            max_bytes_per_coroutine: 64 * 1024,
377            max_bytes_per_session: 256 * 1024,
378            max_incremental_bytes_per_protocol: 128 * 1024,
379        }
380    }
381}
382
383/// Composition memory usage snapshot.
384#[derive(Debug, Clone, Default, PartialEq, Eq)]
385pub struct MemoryUsage {
386    /// Estimated bytes per coroutine.
387    pub bytes_per_coroutine: usize,
388    /// Estimated bytes per session.
389    pub bytes_per_session: usize,
390    /// Estimated incremental overhead per protocol.
391    pub incremental_bytes_per_protocol: usize,
392    /// Number of loaded protocol bundles.
393    pub protocol_count: usize,
394    /// Number of live sessions.
395    pub session_count: usize,
396    /// Number of live coroutines.
397    pub coroutine_count: usize,
398}
399
400/// Errors emitted by the composition API.
401#[derive(Debug, thiserror::Error)]
402pub enum CompositionError {
403    /// Missing/invalid proof artifact.
404    #[error("bundle `{artifact_id}` rejected: missing LinkOKFull compatibility evidence")]
405    MissingCompatibilityProof {
406        /// Certificate artifact id that failed admission.
407        artifact_id: String,
408    },
409    /// The certificate does not expose required theorem/capability evidence.
410    #[error("bundle `{artifact_id}` rejected: missing required capability `{capability}`")]
411    MissingCapability {
412        /// Certificate artifact id that failed admission.
413        artifact_id: String,
414        /// Human-readable capability key.
415        capability: String,
416    },
417    /// Advanced runtime mode requires runtime contract evidence.
418    #[error("bundle `{artifact_id}` rejected: missing ProtocolMachine runtime contracts for advanced mode")]
419    MissingRuntimeContracts {
420        /// Certificate artifact id that failed admission.
421        artifact_id: String,
422    },
423    /// Theorem-backed admission requires transport contracts.
424    #[error(
425        "bundle `{artifact_id}` rejected: missing transport contracts for theorem-pack claims"
426    )]
427    MissingTransportContracts {
428        /// Certificate artifact id that failed admission.
429        artifact_id: String,
430    },
431    /// A selected transport does not satisfy theorem-pack transport requirements.
432    #[error("bundle `{artifact_id}` rejected: transport `{transport_name}` does not satisfy required contract `{requirement}`")]
433    UnsatisfiedTransportContract {
434        /// Certificate artifact id that failed admission.
435        artifact_id: String,
436        /// Transport profile that failed admission.
437        transport_name: String,
438        /// Required semantic field.
439        requirement: &'static str,
440    },
441    /// Bundle index does not exist.
442    #[error("bundle index {bundle_idx} is out of range")]
443    InvalidBundleIndex {
444        /// The bundle index that was requested.
445        bundle_idx: usize,
446    },
447    /// Reconfiguration was requested for a bundle that did not admit it.
448    #[error("bundle `{artifact_id}` rejected reconfiguration request: bundle is not reconfiguration-enabled")]
449    ReconfigurationDisabled {
450        /// Certificate artifact id associated with the rejected bundle.
451        artifact_id: String,
452    },
453    /// Reconfiguration requires a runtime capability that was not supplied.
454    #[error("bundle `{artifact_id}` rejected reconfiguration request: missing required capability `{capability}`")]
455    MissingReconfigurationCapability {
456        /// Certificate artifact id associated with the rejected bundle.
457        artifact_id: String,
458        /// Human-readable capability key.
459        capability: String,
460    },
461    /// Reconfiguration attempted to produce an empty membership set.
462    #[error(
463        "bundle `{artifact_id}` rejected reconfiguration request: membership set may not be empty"
464    )]
465    EmptyMembership {
466        /// Certificate artifact id associated with the rejected bundle.
467        artifact_id: String,
468    },
469    /// Reconfiguration violated the admitted overlap policy.
470    #[error("bundle `{artifact_id}` rejected reconfiguration request: overlap_required but member sets are disjoint")]
471    OverlapRequiredViolation {
472        /// Certificate artifact id associated with the rejected bundle.
473        artifact_id: String,
474    },
475    /// Reconfiguration plan was internally inconsistent.
476    #[error("bundle `{artifact_id}` rejected reconfiguration plan: {reason}")]
477    InvalidReconfigurationPlan {
478        /// Certificate artifact id associated with the rejected bundle.
479        artifact_id: String,
480        /// Human-readable validation reason.
481        reason: String,
482    },
483    /// Admission would violate memory budget.
484    #[error("bundle `{artifact_id}` rejected: memory budget exceeded ({reason})")]
485    BudgetExceeded {
486        /// Certificate artifact id associated with the rejected bundle.
487        artifact_id: String,
488        /// Human-readable budget reason.
489        reason: String,
490    },
491    /// ProtocolMachine-layer load/run failure.
492    #[error(transparent)]
493    Vm(#[from] ProtocolMachineError),
494}
495
496/// Runtime wrapper for composed protocol execution.
497#[derive(Debug)]
498pub struct ComposedRuntime {
499    machine: ProtocolMachine,
500    bundles: Vec<ProtocolBundle>,
501    reconfiguration_states: Vec<Option<ReconfigurationRuntimeState>>,
502    budget: MemoryBudget,
503    usage: MemoryUsage,
504}
505
506impl ComposedRuntime {
507    /// Create an empty composed runtime.
508    #[must_use]
509    pub fn new(config: ProtocolMachineConfig, budget: MemoryBudget) -> Self {
510        let machine = ProtocolMachine::new(config);
511        Self {
512            machine,
513            bundles: Vec::new(),
514            reconfiguration_states: Vec::new(),
515            budget,
516            usage: MemoryUsage {
517                bytes_per_coroutine: size_of::<crate::coroutine::Coroutine>(),
518                bytes_per_session: size_of::<crate::session::SessionState>(),
519                incremental_bytes_per_protocol: size_of::<Arc<CodeImage>>(),
520                ..MemoryUsage::default()
521            },
522        }
523    }
524
525    /// Admit a protocol bundle after certificate + budget checks.
526    ///
527    /// # Errors
528    ///
529    /// Returns a `CompositionError` when proof or budget checks fail.
530    pub fn admit_bundle(&mut self, bundle: ProtocolBundle) -> Result<(), CompositionError> {
531        if !bundle.certificate.link_ok_full {
532            return Err(CompositionError::MissingCompatibilityProof {
533                artifact_id: bundle.certificate.artifact_id,
534            });
535        }
536        self.require_capabilities(&bundle)?;
537
538        if self.usage.incremental_bytes_per_protocol
539            > self.budget.max_incremental_bytes_per_protocol
540        {
541            return Err(CompositionError::BudgetExceeded {
542                artifact_id: bundle.certificate.artifact_id,
543                reason: "incremental protocol overhead".to_string(),
544            });
545        }
546
547        self.bundles.push(bundle);
548        self.reconfiguration_states
549            .push(Some(ReconfigurationRuntimeState::default()));
550        self.usage.protocol_count = self.bundles.len();
551        Ok(())
552    }
553
554    /// Load one session instance from an admitted bundle index.
555    ///
556    /// # Errors
557    ///
558    /// Returns a `CompositionError` when index is invalid or ProtocolMachine loading fails.
559    pub fn load_bundle_session(&mut self, bundle_idx: usize) -> Result<usize, CompositionError> {
560        let bundle = self
561            .bundles
562            .get(bundle_idx)
563            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
564
565        let sid = self.machine.load_choreography(&bundle.code)?;
566        self.refresh_usage();
567        self.assert_budget(bundle_idx)?;
568        Ok(sid)
569    }
570
571    /// Load `sessions` instances from a bundle index.
572    ///
573    /// # Errors
574    ///
575    /// Returns a `CompositionError` when any session load fails.
576    pub fn load_bundle_sessions(
577        &mut self,
578        bundle_idx: usize,
579        sessions: usize,
580    ) -> Result<Vec<usize>, CompositionError> {
581        let mut out = Vec::with_capacity(sessions);
582        for _ in 0..sessions {
583            out.push(self.load_bundle_session(bundle_idx)?);
584        }
585        Ok(out)
586    }
587
588    /// Run the composed runtime.
589    ///
590    /// # Errors
591    ///
592    /// Returns ProtocolMachine-layer errors as composition errors.
593    pub fn run(
594        &mut self,
595        handler: &dyn EffectHandler,
596        max_steps: usize,
597    ) -> Result<(), CompositionError> {
598        self.machine.run(handler, max_steps)?;
599        self.refresh_usage();
600        Ok(())
601    }
602
603    /// Access current memory usage snapshot.
604    #[must_use]
605    pub fn memory_usage(&self) -> &MemoryUsage {
606        &self.usage
607    }
608
609    /// Access admitted bundles.
610    #[must_use]
611    pub fn bundles(&self) -> &[ProtocolBundle] {
612        &self.bundles
613    }
614
615    fn simulate_reconfiguration_transition(
616        artifact_id: &str,
617        policy: &ReconfigurationPolicy,
618        state: &mut ReconfigurationRuntimeState,
619        next_members: BTreeSet<String>,
620    ) -> Result<ReconfigurationEvent, CompositionError> {
621        if next_members.is_empty() {
622            return Err(CompositionError::EmptyMembership {
623                artifact_id: artifact_id.to_string(),
624            });
625        }
626
627        let overlap_preserved =
628            state.active_members.is_empty() || !state.active_members.is_disjoint(&next_members);
629        if policy.overlap_required && !overlap_preserved {
630            return Err(CompositionError::OverlapRequiredViolation {
631                artifact_id: artifact_id.to_string(),
632            });
633        }
634
635        let previous_members: Vec<String> = state.active_members.iter().cloned().collect();
636        let next_members_vec: Vec<String> = next_members.iter().cloned().collect();
637        let added_members: Vec<String> = next_members
638            .difference(&state.active_members)
639            .cloned()
640            .collect();
641        let removed_members: Vec<String> = state
642            .active_members
643            .difference(&next_members)
644            .cloned()
645            .collect();
646
647        state.epoch = state.epoch.saturating_add(1);
648        state.active_members = next_members;
649        let event = ReconfigurationEvent {
650            artifact_id: artifact_id.to_string(),
651            epoch: state.epoch,
652            previous_members,
653            next_members: next_members_vec,
654            added_members,
655            removed_members,
656            overlap_preserved,
657            dynamic_membership: policy.dynamic_membership,
658            overlap_required: policy.overlap_required,
659        };
660        state.history.push(event.clone());
661        Ok(event)
662    }
663
664    fn validate_plan_step(
665        artifact_id: &str,
666        step: &ReconfigurationPlanStep,
667    ) -> Result<ValidatedPlanStepArtifacts, CompositionError> {
668        let next_members = step.next_members.iter().cloned().collect::<BTreeSet<_>>();
669        if next_members.is_empty() {
670            return Err(CompositionError::InvalidReconfigurationPlan {
671                artifact_id: artifact_id.to_string(),
672                reason: format!(
673                    "step `{}` must preserve a non-empty membership set",
674                    step.step_id
675                ),
676            });
677        }
678        let placements =
679            canonicalize_placement_observations(&step.placements).map_err(|reason| {
680                CompositionError::InvalidReconfigurationPlan {
681                    artifact_id: artifact_id.to_string(),
682                    reason: format!("step `{}` has invalid placements: {reason}", step.step_id),
683                }
684            })?;
685        let placement_roles = placements
686            .iter()
687            .map(|placement| placement.role.clone())
688            .collect::<BTreeSet<_>>();
689        if placement_roles != next_members {
690            return Err(CompositionError::InvalidReconfigurationPlan {
691                artifact_id: artifact_id.to_string(),
692                reason: format!(
693                    "step `{}` placements must match next_members exactly",
694                    step.step_id
695                ),
696            });
697        }
698        let transport_boundaries =
699            canonical_transport_boundaries(&placements).map_err(|reason| {
700                CompositionError::InvalidReconfigurationPlan {
701                    artifact_id: artifact_id.to_string(),
702                    reason: format!(
703                        "step `{}` has invalid transport boundaries: {reason}",
704                        step.step_id
705                    ),
706                }
707            })?;
708        Ok((next_members, placements, transport_boundaries))
709    }
710
711    fn validate_runtime_upgrade_request(
712        config: &ProtocolMachineConfig,
713        bundle: &ProtocolBundle,
714        state: &ReconfigurationRuntimeState,
715        request: &RuntimeUpgradeRequest,
716    ) -> Result<(), CompositionError> {
717        Self::validate_runtime_upgrade_nonempty(bundle, request)?;
718        Self::validate_runtime_upgrade_carried_sets(bundle, request)?;
719        Self::validate_runtime_upgrade_execution_constraint(config, bundle, request)?;
720        Self::validate_runtime_upgrade_ownership_continuity(bundle, state, request)?;
721        Self::validate_runtime_upgrade_pending_effects(bundle, request)?;
722        Self::validate_runtime_upgrade_publication_continuity(bundle, request)?;
723        Ok(())
724    }
725
726    fn invalid_runtime_upgrade_plan(
727        bundle: &ProtocolBundle,
728        upgrade_id: &str,
729        reason: impl Into<String>,
730    ) -> CompositionError {
731        CompositionError::InvalidReconfigurationPlan {
732            artifact_id: bundle.certificate.artifact_id.clone(),
733            reason: format!("runtime upgrade `{upgrade_id}` {}", reason.into()),
734        }
735    }
736
737    fn validate_runtime_upgrade_nonempty(
738        bundle: &ProtocolBundle,
739        request: &RuntimeUpgradeRequest,
740    ) -> Result<(), CompositionError> {
741        if request.plan.steps.is_empty() {
742            return Err(Self::invalid_runtime_upgrade_plan(
743                bundle,
744                &request.upgrade_id,
745                "must contain at least one plan step",
746            ));
747        }
748        Ok(())
749    }
750
751    fn validate_runtime_upgrade_carried_sets(
752        bundle: &ProtocolBundle,
753        request: &RuntimeUpgradeRequest,
754    ) -> Result<(), CompositionError> {
755        let duplicate_publication = request
756            .carried_publication_ids
757            .iter()
758            .any(|publication_id| request.invalidated_publication_ids.contains(publication_id));
759        if duplicate_publication {
760            return Err(Self::invalid_runtime_upgrade_plan(
761                bundle,
762                &request.upgrade_id,
763                "may not both carry and invalidate the same canonical publication",
764            ));
765        }
766
767        let duplicate_obligation = request
768            .carried_obligation_ids
769            .iter()
770            .any(|obligation_id| request.invalidated_obligation_ids.contains(obligation_id));
771        if duplicate_obligation {
772            return Err(Self::invalid_runtime_upgrade_plan(
773                bundle,
774                &request.upgrade_id,
775                "may not both carry and invalidate the same obligation",
776            ));
777        }
778        Ok(())
779    }
780
781    fn validate_runtime_upgrade_execution_constraint(
782        config: &ProtocolMachineConfig,
783        bundle: &ProtocolBundle,
784        request: &RuntimeUpgradeRequest,
785    ) -> Result<(), CompositionError> {
786        match request.compatibility.execution_constraint {
787            RuntimeUpgradeExecutionConstraint::PreserveBundleProfile => {
788                let profile = bundle.certificate.theorem_pack.execution_profile();
789                if execution_profile_supported(
790                    &profile,
791                    config,
792                    bundle.certificate.runtime_contracts.as_ref(),
793                ) {
794                    Ok(())
795                } else {
796                    Err(Self::invalid_runtime_upgrade_plan(
797                        bundle,
798                        &request.upgrade_id,
799                        "requires preserving the admitted execution profile",
800                    ))
801                }
802            }
803            RuntimeUpgradeExecutionConstraint::MixedDeterminismAllowed => {
804                let Some(runtime_contracts) = bundle.certificate.runtime_contracts.as_ref() else {
805                    return Err(Self::invalid_runtime_upgrade_plan(
806                        bundle,
807                        &request.upgrade_id,
808                        "requires runtime contracts for mixed determinism",
809                    ));
810                };
811                if runtime_contracts.can_use_mixed_determinism_profiles {
812                    Ok(())
813                } else {
814                    Err(Self::invalid_runtime_upgrade_plan(
815                        bundle,
816                        &request.upgrade_id,
817                        "requires mixed-determinism admission",
818                    ))
819                }
820            }
821        }
822    }
823
824    fn validate_runtime_upgrade_ownership_continuity(
825        bundle: &ProtocolBundle,
826        state: &ReconfigurationRuntimeState,
827        request: &RuntimeUpgradeRequest,
828    ) -> Result<(), CompositionError> {
829        if !request.compatibility.ownership_continuity_required {
830            return Ok(());
831        }
832
833        let first_members = request.plan.steps[0]
834            .next_members
835            .iter()
836            .cloned()
837            .collect::<BTreeSet<_>>();
838        if state.active_members.is_empty() || state.active_members.is_disjoint(&first_members) {
839            return Err(Self::invalid_runtime_upgrade_plan(
840                bundle,
841                &request.upgrade_id,
842                "requires ownership continuity across the first cutover",
843            ));
844        }
845        Ok(())
846    }
847
848    fn validate_runtime_upgrade_pending_effects(
849        bundle: &ProtocolBundle,
850        request: &RuntimeUpgradeRequest,
851    ) -> Result<(), CompositionError> {
852        let missing_pending_effect_policy = matches!(
853            request.compatibility.pending_effect_treatment,
854            PendingEffectTreatment::InvalidateBlocked
855        ) && request.carried_obligation_ids.is_empty()
856            && request.invalidated_obligation_ids.is_empty();
857        if missing_pending_effect_policy {
858            return Err(Self::invalid_runtime_upgrade_plan(
859                bundle,
860                &request.upgrade_id,
861                "must make pending-effect treatment explicit",
862            ));
863        }
864        Ok(())
865    }
866
867    fn validate_runtime_upgrade_publication_continuity(
868        bundle: &ProtocolBundle,
869        request: &RuntimeUpgradeRequest,
870    ) -> Result<(), CompositionError> {
871        let invalidates_canonical_publications =
872            matches!(
873                request.compatibility.canonical_publication_continuity,
874                CanonicalPublicationContinuity::PreserveCanonicalTruth
875            ) && !request.invalidated_publication_ids.is_empty();
876        if invalidates_canonical_publications {
877            return Err(Self::invalid_runtime_upgrade_plan(
878                bundle,
879                &request.upgrade_id,
880                "may not invalidate canonical publications when continuity is required",
881            ));
882        }
883        Ok(())
884    }
885
886    fn snapshot_from_state(state: &ReconfigurationRuntimeState) -> ReconfigurationRuntimeSnapshot {
887        ReconfigurationRuntimeSnapshot {
888            epoch: state.epoch,
889            active_members: state.active_members.iter().cloned().collect(),
890            history: state.history.clone(),
891            plan_executions: state.plan_executions.clone(),
892            runtime_upgrades: state.runtime_upgrades.clone(),
893        }
894    }
895
896    fn restore_snapshot_into_state(
897        artifact_id: &str,
898        state: &mut ReconfigurationRuntimeState,
899        snapshot: ReconfigurationRuntimeSnapshot,
900    ) -> Result<(), CompositionError> {
901        let active_members = snapshot
902            .active_members
903            .iter()
904            .cloned()
905            .collect::<BTreeSet<_>>();
906        if active_members.is_empty() {
907            return Err(CompositionError::InvalidReconfigurationPlan {
908                artifact_id: artifact_id.to_string(),
909                reason: "snapshot must preserve a non-empty active membership set".to_string(),
910            });
911        }
912        if snapshot
913            .history
914            .windows(2)
915            .any(|window| window[0].epoch >= window[1].epoch)
916        {
917            return Err(CompositionError::InvalidReconfigurationPlan {
918                artifact_id: artifact_id.to_string(),
919                reason: "snapshot history must have strictly increasing epochs".to_string(),
920            });
921        }
922        if let Some(last_event) = snapshot.history.last() {
923            let final_from_history = last_event
924                .next_members
925                .iter()
926                .cloned()
927                .collect::<BTreeSet<_>>();
928            if final_from_history != active_members {
929                return Err(CompositionError::InvalidReconfigurationPlan {
930                    artifact_id: artifact_id.to_string(),
931                    reason: "snapshot active membership must match the final history event"
932                        .to_string(),
933                });
934            }
935            if last_event.epoch != snapshot.epoch {
936                return Err(CompositionError::InvalidReconfigurationPlan {
937                    artifact_id: artifact_id.to_string(),
938                    reason: "snapshot epoch must match the final history event epoch".to_string(),
939                });
940            }
941        } else if snapshot.epoch != 0 {
942            return Err(CompositionError::InvalidReconfigurationPlan {
943                artifact_id: artifact_id.to_string(),
944                reason: "empty snapshot history must use epoch 0".to_string(),
945            });
946        }
947        if snapshot
948            .plan_executions
949            .iter()
950            .any(|execution| execution.phases.is_empty())
951        {
952            return Err(CompositionError::InvalidReconfigurationPlan {
953                artifact_id: artifact_id.to_string(),
954                reason: "snapshot plan executions must contain at least one phase".to_string(),
955            });
956        }
957        if snapshot
958            .runtime_upgrades
959            .iter()
960            .any(|execution| execution.artifacts.is_empty())
961        {
962            return Err(CompositionError::InvalidReconfigurationPlan {
963                artifact_id: artifact_id.to_string(),
964                reason: "snapshot runtime upgrades must contain at least one artifact".to_string(),
965            });
966        }
967        state.epoch = snapshot.epoch;
968        state.active_members = active_members;
969        state.history = snapshot.history;
970        state.plan_executions = snapshot.plan_executions;
971        state.runtime_upgrades = snapshot.runtime_upgrades;
972        Ok(())
973    }
974
975    /// Seed the active membership set for one admitted reconfiguration-enabled bundle.
976    ///
977    /// # Errors
978    ///
979    /// Returns a `CompositionError` when the bundle index is invalid, reconfiguration is disabled,
980    /// or the membership set is empty.
981    pub fn seed_bundle_membership<I, S>(
982        &mut self,
983        bundle_idx: usize,
984        members: I,
985    ) -> Result<(), CompositionError>
986    where
987        I: IntoIterator<Item = S>,
988        S: Into<String>,
989    {
990        let bundle = self
991            .bundles
992            .get(bundle_idx)
993            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
994        let state = self
995            .reconfiguration_states
996            .get_mut(bundle_idx)
997            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
998            .as_mut()
999            .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1000                artifact_id: bundle.certificate.artifact_id.clone(),
1001            })?;
1002        if bundle.reconfiguration_policy.is_none() {
1003            return Err(CompositionError::ReconfigurationDisabled {
1004                artifact_id: bundle.certificate.artifact_id.clone(),
1005            });
1006        }
1007        let members: BTreeSet<String> = members.into_iter().map(Into::into).collect();
1008        if members.is_empty() {
1009            return Err(CompositionError::EmptyMembership {
1010                artifact_id: bundle.certificate.artifact_id.clone(),
1011            });
1012        }
1013        state.active_members = members;
1014        state.epoch = 0;
1015        state.history.clear();
1016        state.plan_executions.clear();
1017        state.runtime_upgrades.clear();
1018        Ok(())
1019    }
1020
1021    /// Apply one deterministic reconfiguration transition to an admitted bundle.
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns a `CompositionError` when the bundle index is invalid, the bundle does not admit
1026    /// reconfiguration, the transition violates required overlap, or the new membership set is
1027    /// empty.
1028    pub fn reconfigure_bundle<I, S>(
1029        &mut self,
1030        bundle_idx: usize,
1031        next_members: I,
1032    ) -> Result<ReconfigurationEvent, CompositionError>
1033    where
1034        I: IntoIterator<Item = S>,
1035        S: Into<String>,
1036    {
1037        let bundle = self
1038            .bundles
1039            .get(bundle_idx)
1040            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1041        let Some(policy) = bundle.reconfiguration_policy.as_ref() else {
1042            return Err(CompositionError::ReconfigurationDisabled {
1043                artifact_id: bundle.certificate.artifact_id.clone(),
1044            });
1045        };
1046        if !policy.dynamic_membership {
1047            return Err(CompositionError::ReconfigurationDisabled {
1048                artifact_id: bundle.certificate.artifact_id.clone(),
1049            });
1050        }
1051
1052        let state = self
1053            .reconfiguration_states
1054            .get_mut(bundle_idx)
1055            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1056            .as_mut()
1057            .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1058                artifact_id: bundle.certificate.artifact_id.clone(),
1059            })?;
1060        let next_members: BTreeSet<String> = next_members.into_iter().map(Into::into).collect();
1061        Self::simulate_reconfiguration_transition(
1062            &bundle.certificate.artifact_id,
1063            policy,
1064            state,
1065            next_members,
1066        )
1067    }
1068
1069    /// The canonical sorted active member set for one bundle, if configured.
1070    #[must_use]
1071    pub fn bundle_members(&self, bundle_idx: usize) -> Option<&BTreeSet<String>> {
1072        self.reconfiguration_states
1073            .get(bundle_idx)
1074            .and_then(Option::as_ref)
1075            .map(|state| &state.active_members)
1076    }
1077
1078    /// The deterministic reconfiguration history for one bundle, if configured.
1079    #[must_use]
1080    pub fn bundle_reconfiguration_history(
1081        &self,
1082        bundle_idx: usize,
1083    ) -> Option<&[ReconfigurationEvent]> {
1084        self.reconfiguration_states
1085            .get(bundle_idx)
1086            .and_then(Option::as_ref)
1087            .map(|state| state.history.as_slice())
1088    }
1089
1090    /// Execute one deterministic multi-step reconfiguration plan atomically.
1091    ///
1092    /// # Errors
1093    ///
1094    /// Returns a `CompositionError` when any plan step is invalid or violates the admitted policy.
1095    pub fn execute_reconfiguration_plan(
1096        &mut self,
1097        bundle_idx: usize,
1098        plan: &ReconfigurationPlan,
1099    ) -> Result<ReconfigurationPlanExecution, CompositionError> {
1100        let bundle = self
1101            .bundles
1102            .get(bundle_idx)
1103            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1104        let Some(policy) = bundle.reconfiguration_policy.as_ref() else {
1105            return Err(CompositionError::ReconfigurationDisabled {
1106                artifact_id: bundle.certificate.artifact_id.clone(),
1107            });
1108        };
1109        if !policy.dynamic_membership {
1110            return Err(CompositionError::ReconfigurationDisabled {
1111                artifact_id: bundle.certificate.artifact_id.clone(),
1112            });
1113        }
1114        if plan.steps.is_empty() {
1115            return Err(CompositionError::InvalidReconfigurationPlan {
1116                artifact_id: bundle.certificate.artifact_id.clone(),
1117                reason: format!("plan `{}` must contain at least one step", plan.plan_id),
1118            });
1119        }
1120
1121        let state = self
1122            .reconfiguration_states
1123            .get_mut(bundle_idx)
1124            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1125            .as_mut()
1126            .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1127                artifact_id: bundle.certificate.artifact_id.clone(),
1128            })?;
1129        let initial_members = state.active_members.iter().cloned().collect::<Vec<_>>();
1130        let mut simulated = state.clone();
1131        let mut phases = Vec::new();
1132        for step in &plan.steps {
1133            let (next_members, placements, transport_boundaries) =
1134                Self::validate_plan_step(&bundle.certificate.artifact_id, step)?;
1135            let event = Self::simulate_reconfiguration_transition(
1136                &bundle.certificate.artifact_id,
1137                policy,
1138                &mut simulated,
1139                next_members,
1140            )?;
1141            phases.push(ReconfigurationPhaseArtifact {
1142                step_id: step.step_id.clone(),
1143                event,
1144                placements,
1145                transport_boundaries,
1146            });
1147        }
1148
1149        let execution = ReconfigurationPlanExecution {
1150            artifact_id: bundle.certificate.artifact_id.clone(),
1151            plan_id: plan.plan_id.clone(),
1152            initial_members,
1153            final_members: simulated.active_members.iter().cloned().collect(),
1154            phases,
1155        };
1156        simulated.plan_executions.push(execution.clone());
1157        *state = simulated;
1158        Ok(execution)
1159    }
1160
1161    /// The deterministic reconfiguration plan executions for one bundle, if configured.
1162    #[must_use]
1163    pub fn bundle_reconfiguration_plan_executions(
1164        &self,
1165        bundle_idx: usize,
1166    ) -> Option<&[ReconfigurationPlanExecution]> {
1167        self.reconfiguration_states
1168            .get(bundle_idx)
1169            .and_then(Option::as_ref)
1170            .map(|state| state.plan_executions.as_slice())
1171    }
1172
1173    /// Execute one deterministic runtime-upgrade request as a specialized reconfiguration.
1174    ///
1175    /// # Errors
1176    ///
1177    /// Returns a `CompositionError` when compatibility checks or any plan step fails.
1178    pub fn execute_runtime_upgrade(
1179        &mut self,
1180        bundle_idx: usize,
1181        request: &RuntimeUpgradeRequest,
1182    ) -> Result<RuntimeUpgradeExecution, CompositionError> {
1183        let bundle = self
1184            .bundles
1185            .get(bundle_idx)
1186            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1187        let policy = Self::require_reconfiguration_policy(bundle)?;
1188        let config = self.machine.config().clone();
1189        let state = self
1190            .reconfiguration_states
1191            .get_mut(bundle_idx)
1192            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1193            .as_mut()
1194            .ok_or_else(|| Self::reconfiguration_disabled(bundle))?;
1195        let initial_members = state.active_members.iter().cloned().collect::<Vec<_>>();
1196        let next_members = Self::runtime_upgrade_target_members(request);
1197        let staged = Self::runtime_upgrade_artifact(
1198            request,
1199            TransitionArtifactPhase::Staged,
1200            initial_members.clone(),
1201            next_members.clone(),
1202            None,
1203        );
1204
1205        if let Err(err) = Self::validate_runtime_upgrade_request(&config, bundle, state, request) {
1206            let execution = Self::failed_runtime_upgrade_execution(
1207                bundle,
1208                request,
1209                initial_members.clone(),
1210                vec![
1211                    staged,
1212                    Self::runtime_upgrade_artifact(
1213                        request,
1214                        TransitionArtifactPhase::Failed,
1215                        initial_members.clone(),
1216                        initial_members.clone(),
1217                        Some(err.to_string()),
1218                    ),
1219                ],
1220            );
1221            state.runtime_upgrades.push(execution);
1222            return Err(err);
1223        }
1224
1225        let admitted = Self::runtime_upgrade_artifact(
1226            request,
1227            TransitionArtifactPhase::Admitted,
1228            initial_members.clone(),
1229            next_members,
1230            None,
1231        );
1232
1233        let mut simulated = state.clone();
1234        let execution_result =
1235            Self::simulate_runtime_upgrade_steps(bundle, policy, &mut simulated, request);
1236
1237        match execution_result {
1238            Ok(()) => {
1239                let execution = Self::committed_runtime_upgrade_execution(
1240                    bundle,
1241                    request,
1242                    &simulated,
1243                    initial_members.clone(),
1244                    staged,
1245                    admitted,
1246                );
1247                simulated.runtime_upgrades.push(execution.clone());
1248                *state = simulated;
1249                Ok(execution)
1250            }
1251            Err(err) => {
1252                let execution = Self::failed_runtime_upgrade_execution(
1253                    bundle,
1254                    request,
1255                    initial_members.clone(),
1256                    vec![
1257                        staged,
1258                        admitted,
1259                        Self::runtime_upgrade_artifact(
1260                            request,
1261                            TransitionArtifactPhase::RolledBack,
1262                            initial_members.clone(),
1263                            initial_members.clone(),
1264                            Some(err.to_string()),
1265                        ),
1266                    ],
1267                );
1268                state.runtime_upgrades.push(execution);
1269                Err(err)
1270            }
1271        }
1272    }
1273
1274    fn reconfiguration_disabled(bundle: &ProtocolBundle) -> CompositionError {
1275        CompositionError::ReconfigurationDisabled {
1276            artifact_id: bundle.certificate.artifact_id.clone(),
1277        }
1278    }
1279
1280    fn require_reconfiguration_policy(
1281        bundle: &ProtocolBundle,
1282    ) -> Result<&ReconfigurationPolicy, CompositionError> {
1283        bundle
1284            .reconfiguration_policy
1285            .as_ref()
1286            .ok_or_else(|| Self::reconfiguration_disabled(bundle))
1287    }
1288
1289    fn runtime_upgrade_target_members(request: &RuntimeUpgradeRequest) -> Vec<String> {
1290        request
1291            .plan
1292            .steps
1293            .last()
1294            .map(|step| step.next_members.clone())
1295            .unwrap_or_default()
1296    }
1297
1298    fn simulate_runtime_upgrade_steps(
1299        bundle: &ProtocolBundle,
1300        policy: &ReconfigurationPolicy,
1301        simulated: &mut ReconfigurationRuntimeState,
1302        request: &RuntimeUpgradeRequest,
1303    ) -> Result<(), CompositionError> {
1304        for step in &request.plan.steps {
1305            let (next_members, _placements, _transport_boundaries) =
1306                Self::validate_plan_step(&bundle.certificate.artifact_id, step)?;
1307            Self::simulate_reconfiguration_transition(
1308                &bundle.certificate.artifact_id,
1309                policy,
1310                simulated,
1311                next_members,
1312            )?;
1313        }
1314        Ok(())
1315    }
1316
1317    fn committed_runtime_upgrade_execution(
1318        bundle: &ProtocolBundle,
1319        request: &RuntimeUpgradeRequest,
1320        simulated: &ReconfigurationRuntimeState,
1321        initial_members: Vec<String>,
1322        staged: RuntimeUpgradeArtifact,
1323        admitted: RuntimeUpgradeArtifact,
1324    ) -> RuntimeUpgradeExecution {
1325        let committed = Self::runtime_upgrade_artifact(
1326            request,
1327            TransitionArtifactPhase::CommittedCutover,
1328            initial_members,
1329            simulated.active_members.iter().cloned().collect(),
1330            None,
1331        );
1332        RuntimeUpgradeExecution {
1333            artifact_id: bundle.certificate.artifact_id.clone(),
1334            upgrade_id: request.upgrade_id.clone(),
1335            final_members: simulated.active_members.iter().cloned().collect(),
1336            artifacts: vec![staged, admitted, committed],
1337        }
1338    }
1339
1340    fn runtime_upgrade_artifact(
1341        request: &RuntimeUpgradeRequest,
1342        phase: TransitionArtifactPhase,
1343        previous_members: Vec<String>,
1344        next_members: Vec<String>,
1345        reason: Option<String>,
1346    ) -> RuntimeUpgradeArtifact {
1347        RuntimeUpgradeArtifact {
1348            upgrade_id: request.upgrade_id.clone(),
1349            phase,
1350            previous_members,
1351            next_members,
1352            compatibility: request.compatibility.clone(),
1353            carried_publication_ids: request.carried_publication_ids.clone(),
1354            invalidated_publication_ids: request.invalidated_publication_ids.clone(),
1355            carried_obligation_ids: request.carried_obligation_ids.clone(),
1356            invalidated_obligation_ids: request.invalidated_obligation_ids.clone(),
1357            reason,
1358        }
1359    }
1360
1361    fn failed_runtime_upgrade_execution(
1362        bundle: &ProtocolBundle,
1363        request: &RuntimeUpgradeRequest,
1364        final_members: Vec<String>,
1365        artifacts: Vec<RuntimeUpgradeArtifact>,
1366    ) -> RuntimeUpgradeExecution {
1367        RuntimeUpgradeExecution {
1368            artifact_id: bundle.certificate.artifact_id.clone(),
1369            upgrade_id: request.upgrade_id.clone(),
1370            final_members,
1371            artifacts,
1372        }
1373    }
1374
1375    /// Deterministic runtime-upgrade executions for one bundle, if configured.
1376    #[must_use]
1377    pub fn bundle_runtime_upgrade_executions(
1378        &self,
1379        bundle_idx: usize,
1380    ) -> Option<&[RuntimeUpgradeExecution]> {
1381        self.reconfiguration_states
1382            .get(bundle_idx)
1383            .and_then(Option::as_ref)
1384            .map(|state| state.runtime_upgrades.as_slice())
1385    }
1386
1387    /// Export a deterministic snapshot of the reconfiguration state for one bundle.
1388    #[must_use]
1389    pub fn bundle_reconfiguration_snapshot(
1390        &self,
1391        bundle_idx: usize,
1392    ) -> Option<ReconfigurationRuntimeSnapshot> {
1393        self.reconfiguration_states
1394            .get(bundle_idx)
1395            .and_then(Option::as_ref)
1396            .map(Self::snapshot_from_state)
1397    }
1398
1399    /// Restore a previously exported reconfiguration snapshot for one admitted bundle.
1400    ///
1401    /// # Errors
1402    ///
1403    /// Returns a `CompositionError` when the snapshot is inconsistent with the admitted policy.
1404    pub fn restore_bundle_reconfiguration_snapshot(
1405        &mut self,
1406        bundle_idx: usize,
1407        snapshot: ReconfigurationRuntimeSnapshot,
1408    ) -> Result<(), CompositionError> {
1409        let bundle = self
1410            .bundles
1411            .get(bundle_idx)
1412            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?;
1413        if bundle.reconfiguration_policy.is_none() {
1414            return Err(CompositionError::ReconfigurationDisabled {
1415                artifact_id: bundle.certificate.artifact_id.clone(),
1416            });
1417        }
1418        let state = self
1419            .reconfiguration_states
1420            .get_mut(bundle_idx)
1421            .ok_or(CompositionError::InvalidBundleIndex { bundle_idx })?
1422            .as_mut()
1423            .ok_or_else(|| CompositionError::ReconfigurationDisabled {
1424                artifact_id: bundle.certificate.artifact_id.clone(),
1425            })?;
1426        Self::restore_snapshot_into_state(&bundle.certificate.artifact_id, state, snapshot)
1427    }
1428
1429    /// Access underlying ProtocolMachine.
1430    #[must_use]
1431    pub fn machine(&self) -> &ProtocolMachine {
1432        &self.machine
1433    }
1434
1435    fn refresh_usage(&mut self) {
1436        self.usage.session_count = self.machine.session_count();
1437        self.usage.coroutine_count = self.machine.coroutine_count();
1438        self.usage.protocol_count = self.bundles.len();
1439    }
1440
1441    fn assert_budget(&self, bundle_idx: usize) -> Result<(), CompositionError> {
1442        let per_coro = self.usage.bytes_per_coroutine;
1443        let per_sess = self.usage.bytes_per_session;
1444        let artifact_id = self
1445            .bundles
1446            .get(bundle_idx)
1447            .map(|b| b.certificate.artifact_id.clone())
1448            .unwrap_or_else(|| format!("bundle/{bundle_idx}"));
1449        if per_coro > self.budget.max_bytes_per_coroutine {
1450            return Err(CompositionError::BudgetExceeded {
1451                artifact_id,
1452                reason: "bytes_per_coroutine".to_string(),
1453            });
1454        }
1455        if per_sess > self.budget.max_bytes_per_session {
1456            return Err(CompositionError::BudgetExceeded {
1457                artifact_id,
1458                reason: "bytes_per_session".to_string(),
1459            });
1460        }
1461        Ok(())
1462    }
1463
1464    fn require_capabilities(&self, bundle: &ProtocolBundle) -> Result<(), CompositionError> {
1465        let cert = &bundle.certificate;
1466        let caps = &cert.theorem_pack;
1467        let runtime_contracts = cert.runtime_contracts.as_ref();
1468
1469        self.require_runtime_contracts(cert, runtime_contracts)?;
1470        self.require_scheduler_capability(cert, caps)?;
1471        self.require_determinism_capability(cert, caps)?;
1472        self.require_execution_profile(cert, caps, runtime_contracts)?;
1473        self.require_output_condition_gating(cert, caps)?;
1474        self.require_transport_contracts(cert, caps, runtime_contracts)?;
1475        self.require_reconfiguration_capabilities(bundle, runtime_contracts)?;
1476        Ok(())
1477    }
1478
1479    fn require_runtime_contracts(
1480        &self,
1481        cert: &CompositionCertificate,
1482        runtime_contracts: Option<&RuntimeContracts>,
1483    ) -> Result<(), CompositionError> {
1484        match enforce_protocol_machine_runtime_gates(self.machine.config(), runtime_contracts) {
1485            RuntimeGateResult::Admitted => Ok(()),
1486            RuntimeGateResult::RejectedMissingContracts => {
1487                Err(CompositionError::MissingRuntimeContracts {
1488                    artifact_id: cert.artifact_id.clone(),
1489                })
1490            }
1491            RuntimeGateResult::RejectedUnsupportedDeterminismProfile => {
1492                Err(CompositionError::MissingCapability {
1493                    artifact_id: cert.artifact_id.clone(),
1494                    capability: format!(
1495                        "determinism_profile::{:?}",
1496                        self.machine.config().determinism_mode
1497                    ),
1498                })
1499            }
1500        }
1501    }
1502
1503    fn require_scheduler_capability(
1504        &self,
1505        cert: &CompositionCertificate,
1506        caps: &TheoremPackCapabilities,
1507    ) -> Result<(), CompositionError> {
1508        let required_sched = match self.machine.config().sched_policy {
1509            SchedPolicy::Cooperative => SchedulerCapability::Cooperative,
1510            SchedPolicy::RoundRobin => SchedulerCapability::RoundRobin,
1511            SchedPolicy::Priority(_) => SchedulerCapability::Priority,
1512            SchedPolicy::ProgressAware => SchedulerCapability::ProgressAware,
1513        };
1514        if caps.schedulers.contains(&required_sched) {
1515            Ok(())
1516        } else {
1517            Err(CompositionError::MissingCapability {
1518                artifact_id: cert.artifact_id.clone(),
1519                capability: format!("scheduler::{required_sched:?}"),
1520            })
1521        }
1522    }
1523
1524    fn require_determinism_capability(
1525        &self,
1526        cert: &CompositionCertificate,
1527        caps: &TheoremPackCapabilities,
1528    ) -> Result<(), CompositionError> {
1529        let required_det = match self.machine.config().determinism_mode {
1530            DeterminismMode::Full => DeterminismCapability::Full,
1531            DeterminismMode::ModuloEffects => DeterminismCapability::ModuloEffects,
1532            DeterminismMode::ModuloCommutativity => DeterminismCapability::ModuloCommutativity,
1533            DeterminismMode::Replay => DeterminismCapability::Replay,
1534        };
1535        if caps.determinism.contains(&required_det) {
1536            Ok(())
1537        } else {
1538            Err(CompositionError::MissingCapability {
1539                artifact_id: cert.artifact_id.clone(),
1540                capability: format!("determinism::{required_det:?}"),
1541            })
1542        }
1543    }
1544
1545    fn require_execution_profile(
1546        &self,
1547        cert: &CompositionCertificate,
1548        caps: &TheoremPackCapabilities,
1549        runtime_contracts: Option<&RuntimeContracts>,
1550    ) -> Result<(), CompositionError> {
1551        if execution_profile_supported(
1552            &caps.execution_profile(),
1553            self.machine.config(),
1554            runtime_contracts,
1555        ) {
1556            Ok(())
1557        } else {
1558            Err(CompositionError::MissingCapability {
1559                artifact_id: cert.artifact_id.clone(),
1560                capability: "execution_profile".to_string(),
1561            })
1562        }
1563    }
1564
1565    fn require_output_condition_gating(
1566        &self,
1567        cert: &CompositionCertificate,
1568        caps: &TheoremPackCapabilities,
1569    ) -> Result<(), CompositionError> {
1570        let output_conditions_disabled = matches!(
1571            self.machine.config().output_condition_policy,
1572            OutputConditionPolicy::Disabled
1573        );
1574        if output_conditions_disabled || caps.output_condition_gating {
1575            Ok(())
1576        } else {
1577            Err(CompositionError::MissingCapability {
1578                artifact_id: cert.artifact_id.clone(),
1579                capability: "output_condition_gating".to_string(),
1580            })
1581        }
1582    }
1583
1584    fn require_transport_contracts(
1585        &self,
1586        cert: &CompositionCertificate,
1587        caps: &TheoremPackCapabilities,
1588        runtime_contracts: Option<&RuntimeContracts>,
1589    ) -> Result<(), CompositionError> {
1590        let profile = caps.execution_profile();
1591        let requirements = profile.transport_requirements();
1592        if requirements.is_empty() {
1593            return Ok(());
1594        }
1595        let Some(runtime_contracts) = runtime_contracts else {
1596            return Err(CompositionError::MissingTransportContracts {
1597                artifact_id: cert.artifact_id.clone(),
1598            });
1599        };
1600        validate_transport_contracts_for_execution_profile(
1601            &profile,
1602            &runtime_contracts.transport_contracts,
1603        )
1604        .map_err(|error| match error {
1605            TransportContractGateError::MissingTransportContracts => {
1606                CompositionError::MissingTransportContracts {
1607                    artifact_id: cert.artifact_id.clone(),
1608                }
1609            }
1610            TransportContractGateError::UnsatisfiedTransportRequirement {
1611                transport_name,
1612                requirement,
1613            } => CompositionError::UnsatisfiedTransportContract {
1614                artifact_id: cert.artifact_id.clone(),
1615                transport_name,
1616                requirement,
1617            },
1618        })
1619    }
1620
1621    fn require_reconfiguration_capabilities(
1622        &self,
1623        bundle: &ProtocolBundle,
1624        runtime_contracts: Option<&RuntimeContracts>,
1625    ) -> Result<(), CompositionError> {
1626        let Some(policy) = &bundle.reconfiguration_policy else {
1627            return Ok(());
1628        };
1629        let cert = &bundle.certificate;
1630        let Some(contracts) = runtime_contracts else {
1631            return Err(CompositionError::MissingRuntimeContracts {
1632                artifact_id: cert.artifact_id.clone(),
1633            });
1634        };
1635        let capabilities = &contracts.capabilities;
1636        if !capabilities.contains(&crate::runtime_contracts::RuntimeCapability::PlacementRefinement)
1637        {
1638            return Err(CompositionError::MissingReconfigurationCapability {
1639                artifact_id: cert.artifact_id.clone(),
1640                capability: "reconfiguration::placement_refinement".to_string(),
1641            });
1642        }
1643        if policy.dynamic_membership
1644            && !capabilities
1645                .contains(&crate::runtime_contracts::RuntimeCapability::AutoscaleRepartition)
1646        {
1647            return Err(CompositionError::MissingReconfigurationCapability {
1648                artifact_id: cert.artifact_id.clone(),
1649                capability: "reconfiguration::dynamic_membership".to_string(),
1650            });
1651        }
1652        if policy.overlap_required
1653            && !capabilities.contains(&crate::runtime_contracts::RuntimeCapability::LiveMigration)
1654        {
1655            return Err(CompositionError::MissingReconfigurationCapability {
1656                artifact_id: cert.artifact_id.clone(),
1657                capability: "reconfiguration::overlap_required".to_string(),
1658            });
1659        }
1660        Ok(())
1661    }
1662}
1663
1664#[cfg(test)]
1665mod tests {
1666    use std::collections::BTreeMap;
1667
1668    use telltale_types::{GlobalType, Label, LocalTypeR};
1669
1670    use super::*;
1671    use crate::coroutine::Value;
1672    use crate::effect::{EffectFailure, EffectResult};
1673
1674    #[derive(Debug, Clone, Copy)]
1675    struct Noop;
1676
1677    impl EffectHandler for Noop {
1678        fn handle_send(
1679            &self,
1680            _role: &str,
1681            _partner: &str,
1682            label: &str,
1683            _state: &[Value],
1684        ) -> EffectResult<Value> {
1685            EffectResult::success(Value::Str(label.to_string()))
1686        }
1687
1688        fn handle_recv(
1689            &self,
1690            _role: &str,
1691            _partner: &str,
1692            _label: &str,
1693            _state: &mut Vec<Value>,
1694            _payload: &Value,
1695        ) -> EffectResult<()> {
1696            EffectResult::success(())
1697        }
1698
1699        fn handle_choose(
1700            &self,
1701            _role: &str,
1702            _partner: &str,
1703            labels: &[String],
1704            _state: &[Value],
1705        ) -> EffectResult<String> {
1706            match labels.first().cloned() {
1707                Some(label) => EffectResult::success(label),
1708                None => EffectResult::failure(EffectFailure::invalid_input("no labels available")),
1709            }
1710        }
1711
1712        fn step(&self, _role: &str, _state: &mut Vec<Value>) -> EffectResult<()> {
1713            EffectResult::success(())
1714        }
1715    }
1716
1717    fn image(label: &str) -> Arc<CodeImage> {
1718        let mut local_types = BTreeMap::new();
1719        local_types.insert(
1720            "A".to_string(),
1721            LocalTypeR::send("B", Label::new(label), LocalTypeR::End),
1722        );
1723        local_types.insert(
1724            "B".to_string(),
1725            LocalTypeR::recv("A", Label::new(label), LocalTypeR::End),
1726        );
1727        let global = GlobalType::send("A", "B", Label::new(label), GlobalType::End);
1728        Arc::new(CodeImage::from_local_types(&local_types, &global))
1729    }
1730
1731    #[test]
1732    fn proof_carrying_admission_rejects_missing_link_ok_full() {
1733        let mut runtime =
1734            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
1735        let bad = ProtocolBundle::new(
1736            image("m"),
1737            CompositionCertificate {
1738                artifact_id: "cert/bad".to_string(),
1739                link_ok_full: false,
1740                theorem_pack: TheoremPackCapabilities::full(),
1741                runtime_contracts: Some(RuntimeContracts::full()),
1742            },
1743        );
1744        assert!(matches!(
1745            runtime.admit_bundle(bad),
1746            Err(CompositionError::MissingCompatibilityProof { .. })
1747        ));
1748    }
1749
1750    #[test]
1751    fn immutable_code_artifacts_are_arc_shared() {
1752        let shared = image("m");
1753        let mut runtime =
1754            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
1755        let b1 = ProtocolBundle::new(
1756            Arc::clone(&shared),
1757            CompositionCertificate {
1758                artifact_id: "cert/1".to_string(),
1759                link_ok_full: true,
1760                theorem_pack: TheoremPackCapabilities::full(),
1761                runtime_contracts: Some(RuntimeContracts::full()),
1762            },
1763        );
1764        let b2 = ProtocolBundle::new(
1765            Arc::clone(&shared),
1766            CompositionCertificate {
1767                artifact_id: "cert/2".to_string(),
1768                link_ok_full: true,
1769                theorem_pack: TheoremPackCapabilities::full(),
1770                runtime_contracts: Some(RuntimeContracts::full()),
1771            },
1772        );
1773        runtime.admit_bundle(b1).expect("admit b1");
1774        runtime.admit_bundle(b2).expect("admit b2");
1775        assert!(
1776            Arc::strong_count(&shared) >= 3,
1777            "bundle admission should keep shared immutable artifacts in Arc"
1778        );
1779    }
1780
1781    #[test]
1782    fn composed_execution_runs_and_usage_grows_monotonically() {
1783        let mut runtime =
1784            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
1785        let b = ProtocolBundle::new(
1786            image("m"),
1787            CompositionCertificate {
1788                artifact_id: "cert/ok".to_string(),
1789                link_ok_full: true,
1790                theorem_pack: TheoremPackCapabilities::full(),
1791                runtime_contracts: Some(RuntimeContracts::full()),
1792            },
1793        );
1794        runtime.admit_bundle(b).expect("admit");
1795        runtime
1796            .load_bundle_sessions(0, 8)
1797            .expect("load composed sessions");
1798        let usage_before = runtime.memory_usage().clone();
1799        assert!(usage_before.session_count >= 8);
1800        assert!(usage_before.coroutine_count >= 16);
1801        runtime.run(&Noop, 512).expect("composed run");
1802        let usage_after = runtime.memory_usage().clone();
1803        assert!(usage_after.session_count >= usage_before.session_count);
1804        assert!(usage_after.coroutine_count >= usage_before.coroutine_count);
1805    }
1806
1807    #[test]
1808    fn admission_rejects_missing_scheduler_profile_capability() {
1809        let cfg = ProtocolMachineConfig {
1810            sched_policy: SchedPolicy::ProgressAware,
1811            ..ProtocolMachineConfig::default()
1812        };
1813        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1814        let bundle = ProtocolBundle::new(
1815            image("m"),
1816            CompositionCertificate {
1817                artifact_id: "cert/no-sched".to_string(),
1818                link_ok_full: true,
1819                theorem_pack: TheoremPackCapabilities {
1820                    determinism: vec![
1821                        DeterminismCapability::Full,
1822                        DeterminismCapability::ModuloEffects,
1823                    ],
1824                    schedulers: vec![SchedulerCapability::Cooperative],
1825                    output_condition_gating: true,
1826                },
1827                runtime_contracts: Some(RuntimeContracts::full()),
1828            },
1829        );
1830
1831        let err = runtime
1832            .admit_bundle(bundle)
1833            .expect_err("should reject bundle");
1834        assert!(matches!(
1835            err,
1836            CompositionError::MissingCapability { capability, .. }
1837            if capability == "scheduler::ProgressAware"
1838        ));
1839    }
1840
1841    #[test]
1842    fn admission_rejects_missing_determinism_capability() {
1843        let cfg = ProtocolMachineConfig {
1844            determinism_mode: DeterminismMode::ModuloCommutativity,
1845            ..ProtocolMachineConfig::default()
1846        };
1847        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1848        let bundle = ProtocolBundle::new(
1849            image("m"),
1850            CompositionCertificate {
1851                artifact_id: "cert/no-det".to_string(),
1852                link_ok_full: true,
1853                theorem_pack: TheoremPackCapabilities {
1854                    determinism: vec![
1855                        DeterminismCapability::Full,
1856                        DeterminismCapability::ModuloEffects,
1857                    ],
1858                    schedulers: vec![SchedulerCapability::Cooperative],
1859                    output_condition_gating: true,
1860                },
1861                runtime_contracts: Some(RuntimeContracts::full()),
1862            },
1863        );
1864
1865        let err = runtime
1866            .admit_bundle(bundle)
1867            .expect_err("should reject bundle");
1868        assert!(matches!(
1869            err,
1870            CompositionError::MissingCapability { capability, .. }
1871            if capability == "determinism::ModuloCommutativity"
1872        ));
1873    }
1874
1875    #[test]
1876    fn admission_rejects_missing_output_condition_capability() {
1877        let cfg = ProtocolMachineConfig {
1878            output_condition_policy: OutputConditionPolicy::AllowAll,
1879            ..ProtocolMachineConfig::default()
1880        };
1881        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1882        let bundle = ProtocolBundle::new(
1883            image("m"),
1884            CompositionCertificate {
1885                artifact_id: "cert/no-output-gate".to_string(),
1886                link_ok_full: true,
1887                theorem_pack: TheoremPackCapabilities {
1888                    determinism: vec![DeterminismCapability::Full],
1889                    schedulers: vec![SchedulerCapability::Cooperative],
1890                    output_condition_gating: false,
1891                },
1892                runtime_contracts: Some(RuntimeContracts::full()),
1893            },
1894        );
1895
1896        let err = runtime
1897            .admit_bundle(bundle)
1898            .expect_err("should reject bundle");
1899        assert!(matches!(
1900            err,
1901            CompositionError::MissingCapability { capability, .. }
1902            if capability == "execution_profile"
1903        ));
1904    }
1905
1906    #[test]
1907    fn admission_accepts_when_required_capabilities_present() {
1908        let cfg = ProtocolMachineConfig {
1909            sched_policy: SchedPolicy::RoundRobin,
1910            determinism_mode: DeterminismMode::ModuloEffects,
1911            output_condition_policy: OutputConditionPolicy::PredicateAllowList(vec![
1912                "protocol_machine.observable_output".to_string(),
1913            ]),
1914            ..ProtocolMachineConfig::default()
1915        };
1916        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1917        let bundle = ProtocolBundle::new(
1918            image("m"),
1919            CompositionCertificate {
1920                artifact_id: "cert/full".to_string(),
1921                link_ok_full: true,
1922                theorem_pack: TheoremPackCapabilities::full(),
1923                runtime_contracts: Some(RuntimeContracts::full()),
1924            },
1925        );
1926        runtime
1927            .admit_bundle(bundle)
1928            .expect("bundle should be admitted");
1929    }
1930
1931    #[test]
1932    fn admission_accepts_minimal_required_capabilities_without_full_parity() {
1933        let cfg = ProtocolMachineConfig::default();
1934        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1935        let bundle = ProtocolBundle::new(
1936            image("m"),
1937            CompositionCertificate {
1938                artifact_id: "cert/minimal-required".to_string(),
1939                link_ok_full: true,
1940                theorem_pack: TheoremPackCapabilities {
1941                    determinism: vec![
1942                        DeterminismCapability::Full,
1943                        DeterminismCapability::ModuloEffects,
1944                    ],
1945                    schedulers: vec![
1946                        SchedulerCapability::Cooperative,
1947                        SchedulerCapability::ProgressAware,
1948                    ],
1949                    output_condition_gating: true,
1950                },
1951                runtime_contracts: Some(RuntimeContracts::full()),
1952            },
1953        );
1954        runtime
1955            .admit_bundle(bundle)
1956            .expect("minimal required capabilities should be sufficient");
1957    }
1958
1959    #[test]
1960    fn admission_rejects_advanced_mode_without_runtime_contracts() {
1961        let cfg = ProtocolMachineConfig {
1962            sched_policy: SchedPolicy::RoundRobin,
1963            ..ProtocolMachineConfig::default()
1964        };
1965        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1966        let bundle = ProtocolBundle::new(
1967            image("m"),
1968            CompositionCertificate {
1969                artifact_id: "cert/no-runtime-contracts".to_string(),
1970                link_ok_full: true,
1971                theorem_pack: TheoremPackCapabilities::full(),
1972                runtime_contracts: None,
1973            },
1974        );
1975        let err = runtime
1976            .admit_bundle(bundle)
1977            .expect_err("advanced mode should reject missing runtime contracts");
1978        assert!(matches!(
1979            err,
1980            CompositionError::MissingRuntimeContracts { .. }
1981        ));
1982    }
1983
1984    #[test]
1985    fn admission_rejects_replay_profile_without_mixed_profile_gate() {
1986        let cfg = ProtocolMachineConfig {
1987            determinism_mode: DeterminismMode::Replay,
1988            ..ProtocolMachineConfig::default()
1989        };
1990        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
1991        let mut contracts = RuntimeContracts::full();
1992        contracts.can_use_mixed_determinism_profiles = false;
1993        let bundle = ProtocolBundle::new(
1994            image("m"),
1995            CompositionCertificate {
1996                artifact_id: "cert/no-mixed-profile-gate".to_string(),
1997                link_ok_full: true,
1998                theorem_pack: TheoremPackCapabilities::full(),
1999                runtime_contracts: Some(contracts),
2000            },
2001        );
2002        let err = runtime
2003            .admit_bundle(bundle)
2004            .expect_err("replay profile should require mixed-profile gate");
2005        assert!(matches!(
2006            err,
2007            CompositionError::MissingCapability { capability, .. }
2008            if capability == "determinism_profile::Replay"
2009        ));
2010    }
2011
2012    #[test]
2013    fn admission_rejects_theorem_pack_without_transport_contracts() {
2014        let mut runtime =
2015            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2016        let bundle = ProtocolBundle::new(
2017            image("m"),
2018            CompositionCertificate {
2019                artifact_id: "cert/no-transport-contracts".to_string(),
2020                link_ok_full: true,
2021                theorem_pack: TheoremPackCapabilities::full(),
2022                runtime_contracts: None,
2023            },
2024        );
2025        let err = runtime
2026            .admit_bundle(bundle)
2027            .expect_err("theorem claims should require transport contracts");
2028        assert!(matches!(
2029            err,
2030            CompositionError::MissingTransportContracts { .. }
2031        ));
2032    }
2033
2034    #[test]
2035    fn admission_rejects_unauthenticated_transport_for_authenticated_theorem_claims() {
2036        let mut runtime =
2037            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2038        let unauthenticated = crate::runtime_contracts::RuntimeTransportContract::new(
2039            "UnauthenticatedTransport",
2040            "Network",
2041        )
2042        .with_role_addressed_routing(true)
2043        .with_per_peer_fifo_delivery(true)
2044        .with_fail_closed_unknown_role(true)
2045        .with_no_message_synthesis(true)
2046        .with_explicit_readiness_errors(true);
2047        let contracts = RuntimeContracts::full().with_transport_contracts([unauthenticated]);
2048        let bundle = ProtocolBundle::new(
2049            image("m"),
2050            CompositionCertificate {
2051                artifact_id: "cert/unauthenticated-transport".to_string(),
2052                link_ok_full: true,
2053                theorem_pack: TheoremPackCapabilities::full(),
2054                runtime_contracts: Some(contracts),
2055            },
2056        );
2057        let err = runtime.admit_bundle(bundle).expect_err(
2058            "unauthenticated transport should not satisfy authenticated theorem claims",
2059        );
2060        assert!(matches!(
2061            err,
2062            CompositionError::UnsatisfiedTransportContract {
2063                ref transport_name,
2064                requirement,
2065                ..
2066            } if transport_name == "UnauthenticatedTransport" && requirement == "authenticated_peers"
2067        ));
2068    }
2069
2070    #[test]
2071    fn admission_accepts_replay_profile_with_contracts_and_capability() {
2072        let cfg = ProtocolMachineConfig {
2073            determinism_mode: DeterminismMode::Replay,
2074            ..ProtocolMachineConfig::default()
2075        };
2076        let mut runtime = ComposedRuntime::new(cfg, MemoryBudget::default());
2077        let bundle = ProtocolBundle::new(
2078            image("m"),
2079            CompositionCertificate {
2080                artifact_id: "cert/replay-ok".to_string(),
2081                link_ok_full: true,
2082                theorem_pack: TheoremPackCapabilities::full(),
2083                runtime_contracts: Some(RuntimeContracts::full()),
2084            },
2085        );
2086        runtime
2087            .admit_bundle(bundle)
2088            .expect("replay profile should admit with matching contracts");
2089    }
2090
2091    #[test]
2092    fn reconfiguration_requires_runtime_capabilities_at_admission() {
2093        let mut runtime =
2094            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2095        let mut contracts = RuntimeContracts::full();
2096        contracts
2097            .capabilities
2098            .remove(&crate::runtime_contracts::RuntimeCapability::AutoscaleRepartition);
2099        let bundle = ProtocolBundle::new(
2100            image("m"),
2101            CompositionCertificate {
2102                artifact_id: "cert/reconfig-missing-cap".to_string(),
2103                link_ok_full: true,
2104                theorem_pack: TheoremPackCapabilities::full(),
2105                runtime_contracts: Some(contracts),
2106            },
2107        )
2108        .with_reconfiguration_policy(ReconfigurationPolicy {
2109            dynamic_membership: true,
2110            overlap_required: true,
2111        });
2112        let err = runtime
2113            .admit_bundle(bundle)
2114            .expect_err("dynamic membership should require explicit runtime capability");
2115        assert!(matches!(
2116            err,
2117            CompositionError::MissingReconfigurationCapability { capability, .. }
2118            if capability == "reconfiguration::dynamic_membership"
2119        ));
2120    }
2121
2122    #[test]
2123    fn reconfiguration_emits_deterministic_membership_events() {
2124        let mut runtime =
2125            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2126        let bundle = ProtocolBundle::new(
2127            image("m"),
2128            CompositionCertificate {
2129                artifact_id: "cert/reconfig-ok".to_string(),
2130                link_ok_full: true,
2131                theorem_pack: TheoremPackCapabilities::full(),
2132                runtime_contracts: Some(RuntimeContracts::full()),
2133            },
2134        )
2135        .with_reconfiguration_policy(ReconfigurationPolicy {
2136            dynamic_membership: true,
2137            overlap_required: true,
2138        });
2139        runtime.admit_bundle(bundle).expect("admit bundle");
2140        runtime
2141            .seed_bundle_membership(0, ["Alice", "Bob"])
2142            .expect("seed members");
2143
2144        let event = runtime
2145            .reconfigure_bundle(0, ["Bob", "Carol"])
2146            .expect("reconfigure");
2147        assert_eq!(event.epoch, 1);
2148        assert_eq!(event.previous_members, vec!["Alice", "Bob"]);
2149        assert_eq!(event.next_members, vec!["Bob", "Carol"]);
2150        assert_eq!(event.added_members, vec!["Carol"]);
2151        assert_eq!(event.removed_members, vec!["Alice"]);
2152        assert!(event.overlap_preserved);
2153        assert_eq!(
2154            runtime
2155                .bundle_members(0)
2156                .expect("members after reconfiguration"),
2157            &BTreeSet::from(["Bob".to_string(), "Carol".to_string()])
2158        );
2159        assert_eq!(
2160            runtime
2161                .bundle_reconfiguration_history(0)
2162                .expect("history")
2163                .last()
2164                .expect("event in history"),
2165            &event
2166        );
2167    }
2168
2169    #[test]
2170    fn reconfiguration_rejects_disjoint_membership_when_overlap_is_required() {
2171        let mut runtime =
2172            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2173        let bundle = ProtocolBundle::new(
2174            image("m"),
2175            CompositionCertificate {
2176                artifact_id: "cert/reconfig-overlap".to_string(),
2177                link_ok_full: true,
2178                theorem_pack: TheoremPackCapabilities::full(),
2179                runtime_contracts: Some(RuntimeContracts::full()),
2180            },
2181        )
2182        .with_reconfiguration_policy(ReconfigurationPolicy {
2183            dynamic_membership: true,
2184            overlap_required: true,
2185        });
2186        runtime.admit_bundle(bundle).expect("admit bundle");
2187        runtime
2188            .seed_bundle_membership(0, ["Alice", "Bob"])
2189            .expect("seed members");
2190        let err = runtime
2191            .reconfigure_bundle(0, ["Carol", "Dave"])
2192            .expect_err("overlap-required policy should reject disjoint membership");
2193        assert!(matches!(
2194            err,
2195            CompositionError::OverlapRequiredViolation { .. }
2196        ));
2197    }
2198
2199    #[test]
2200    fn reconfiguration_history_is_stable_across_recreated_runtimes() {
2201        fn drive_history() -> Vec<ReconfigurationEvent> {
2202            let mut runtime =
2203                ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2204            let bundle = ProtocolBundle::new(
2205                image("m"),
2206                CompositionCertificate {
2207                    artifact_id: "cert/reconfig-history".to_string(),
2208                    link_ok_full: true,
2209                    theorem_pack: TheoremPackCapabilities::full(),
2210                    runtime_contracts: Some(RuntimeContracts::full()),
2211                },
2212            )
2213            .with_reconfiguration_policy(ReconfigurationPolicy {
2214                dynamic_membership: true,
2215                overlap_required: true,
2216            });
2217            runtime.admit_bundle(bundle).expect("admit bundle");
2218            runtime
2219                .seed_bundle_membership(0, ["Alice", "Bob"])
2220                .expect("seed members");
2221            runtime
2222                .reconfigure_bundle(0, ["Bob", "Carol"])
2223                .expect("first reconfiguration");
2224            runtime
2225                .reconfigure_bundle(0, ["Carol", "Dave"])
2226                .expect("second reconfiguration");
2227            runtime
2228                .bundle_reconfiguration_history(0)
2229                .expect("reconfiguration history")
2230                .to_vec()
2231        }
2232
2233        assert_eq!(drive_history(), drive_history());
2234    }
2235
2236    #[test]
2237    fn reconfiguration_plan_executes_atomically_with_phase_artifacts() {
2238        let mut runtime =
2239            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2240        let bundle = ProtocolBundle::new(
2241            image("m"),
2242            CompositionCertificate {
2243                artifact_id: "cert/reconfig-plan".to_string(),
2244                link_ok_full: true,
2245                theorem_pack: TheoremPackCapabilities::full(),
2246                runtime_contracts: Some(RuntimeContracts::full()),
2247            },
2248        )
2249        .with_reconfiguration_policy(ReconfigurationPolicy {
2250            dynamic_membership: true,
2251            overlap_required: true,
2252        });
2253        runtime.admit_bundle(bundle).expect("admit bundle");
2254        runtime
2255            .seed_bundle_membership(0, ["Alice", "Bob"])
2256            .expect("seed members");
2257
2258        let plan = ReconfigurationPlan {
2259            plan_id: "plan/blue-green".to_string(),
2260            steps: vec![
2261                ReconfigurationPlanStep {
2262                    step_id: "prepare".to_string(),
2263                    next_members: vec!["Bob".to_string(), "Carol".to_string(), "Dora".to_string()],
2264                    placements: vec![
2265                        PlacementObservation::local("Bob").with_region("eu_central_1"),
2266                        PlacementObservation::colocated("Carol", "Bob").with_region("eu_central_1"),
2267                        PlacementObservation::remote("Dora", "127.0.0.1:19821")
2268                            .with_region("us_east_1"),
2269                    ],
2270                },
2271                ReconfigurationPlanStep {
2272                    step_id: "cutover".to_string(),
2273                    next_members: vec!["Carol".to_string(), "Dora".to_string(), "Eve".to_string()],
2274                    placements: vec![
2275                        PlacementObservation::remote("Carol", "127.0.0.1:19822")
2276                            .with_region("us_east_1"),
2277                        PlacementObservation::remote("Dora", "127.0.0.1:19821")
2278                            .with_region("us_east_1"),
2279                        PlacementObservation::colocated("Eve", "Carol").with_region("us_east_1"),
2280                    ],
2281                },
2282            ],
2283        };
2284
2285        let execution = runtime
2286            .execute_reconfiguration_plan(0, &plan)
2287            .expect("execute reconfiguration plan");
2288        assert_eq!(execution.plan_id, "plan/blue-green");
2289        assert_eq!(execution.initial_members, vec!["Alice", "Bob"]);
2290        assert_eq!(
2291            execution.final_members,
2292            vec!["Carol".to_string(), "Dora".to_string(), "Eve".to_string()]
2293        );
2294        assert_eq!(execution.phases.len(), 2);
2295        assert_eq!(execution.phases[0].event.epoch, 1);
2296        assert_eq!(execution.phases[1].event.epoch, 2);
2297        assert!(
2298            execution.phases[0]
2299                .transport_boundaries
2300                .iter()
2301                .any(|boundary| matches!(
2302                    boundary.boundary,
2303                    telltale_types::TransportBoundaryKind::SharedMemory
2304                )),
2305            "first phase should expose a colocated shared-memory boundary"
2306        );
2307        assert!(
2308            execution.phases[0]
2309                .transport_boundaries
2310                .iter()
2311                .any(|boundary| matches!(
2312                    boundary.boundary,
2313                    telltale_types::TransportBoundaryKind::Network
2314                )),
2315            "first phase should expose a remote network boundary"
2316        );
2317        assert_eq!(
2318            runtime
2319                .bundle_reconfiguration_history(0)
2320                .expect("history after plan")
2321                .len(),
2322            2
2323        );
2324        assert_eq!(
2325            runtime
2326                .bundle_reconfiguration_plan_executions(0)
2327                .expect("plan executions")
2328                .last(),
2329            Some(&execution)
2330        );
2331    }
2332
2333    #[test]
2334    fn invalid_reconfiguration_plan_rejects_without_partial_commit() {
2335        let mut runtime =
2336            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2337        let bundle = ProtocolBundle::new(
2338            image("m"),
2339            CompositionCertificate {
2340                artifact_id: "cert/reconfig-plan-invalid".to_string(),
2341                link_ok_full: true,
2342                theorem_pack: TheoremPackCapabilities::full(),
2343                runtime_contracts: Some(RuntimeContracts::full()),
2344            },
2345        )
2346        .with_reconfiguration_policy(ReconfigurationPolicy {
2347            dynamic_membership: true,
2348            overlap_required: true,
2349        });
2350        runtime.admit_bundle(bundle).expect("admit bundle");
2351        runtime
2352            .seed_bundle_membership(0, ["Alice", "Bob"])
2353            .expect("seed members");
2354
2355        let invalid_plan = ReconfigurationPlan {
2356            plan_id: "plan/invalid".to_string(),
2357            steps: vec![
2358                ReconfigurationPlanStep {
2359                    step_id: "prepare".to_string(),
2360                    next_members: vec!["Bob".to_string(), "Carol".to_string()],
2361                    placements: vec![
2362                        PlacementObservation::local("Bob"),
2363                        PlacementObservation::local("Carol"),
2364                    ],
2365                },
2366                ReconfigurationPlanStep {
2367                    step_id: "split-brain".to_string(),
2368                    next_members: vec!["Dora".to_string(), "Eve".to_string()],
2369                    placements: vec![
2370                        PlacementObservation::local("Dora"),
2371                        PlacementObservation::local("Eve"),
2372                    ],
2373                },
2374            ],
2375        };
2376
2377        let err = runtime
2378            .execute_reconfiguration_plan(0, &invalid_plan)
2379            .expect_err("invalid overlap-breaking plan must reject atomically");
2380        assert!(matches!(
2381            err,
2382            CompositionError::OverlapRequiredViolation { .. }
2383        ));
2384        assert_eq!(
2385            runtime.bundle_members(0).expect("members after rejection"),
2386            &BTreeSet::from(["Alice".to_string(), "Bob".to_string()])
2387        );
2388        assert!(
2389            runtime
2390                .bundle_reconfiguration_history(0)
2391                .expect("history after rejection")
2392                .is_empty(),
2393            "failed plan must not partially commit history"
2394        );
2395        assert!(
2396            runtime
2397                .bundle_reconfiguration_plan_executions(0)
2398                .expect("plan executions after rejection")
2399                .is_empty(),
2400            "failed plan must not record a plan execution"
2401        );
2402    }
2403
2404    #[test]
2405    fn reconfiguration_snapshot_restore_preserves_plan_execution_history() {
2406        fn configured_runtime(artifact_id: &str) -> ComposedRuntime {
2407            let mut runtime =
2408                ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2409            let bundle = ProtocolBundle::new(
2410                image("m"),
2411                CompositionCertificate {
2412                    artifact_id: artifact_id.to_string(),
2413                    link_ok_full: true,
2414                    theorem_pack: TheoremPackCapabilities::full(),
2415                    runtime_contracts: Some(RuntimeContracts::full()),
2416                },
2417            )
2418            .with_reconfiguration_policy(ReconfigurationPolicy {
2419                dynamic_membership: true,
2420                overlap_required: true,
2421            });
2422            runtime.admit_bundle(bundle).expect("admit bundle");
2423            runtime
2424        }
2425
2426        let mut baseline = configured_runtime("cert/reconfig-snapshot");
2427        baseline
2428            .seed_bundle_membership(0, ["Alice", "Bob"])
2429            .expect("seed baseline members");
2430        let first_plan = ReconfigurationPlan {
2431            plan_id: "plan/prefix".to_string(),
2432            steps: vec![ReconfigurationPlanStep {
2433                step_id: "prepare".to_string(),
2434                next_members: vec!["Bob".to_string(), "Carol".to_string()],
2435                placements: vec![
2436                    PlacementObservation::local("Bob").with_region("eu_central_1"),
2437                    PlacementObservation::remote("Carol", "127.0.0.1:19831")
2438                        .with_region("us_east_1"),
2439                ],
2440            }],
2441        };
2442        baseline
2443            .execute_reconfiguration_plan(0, &first_plan)
2444            .expect("execute first plan");
2445        let snapshot = serde_json::from_str::<ReconfigurationRuntimeSnapshot>(
2446            &serde_json::to_string(
2447                &baseline
2448                    .bundle_reconfiguration_snapshot(0)
2449                    .expect("snapshot after first plan"),
2450            )
2451            .expect("serialize snapshot"),
2452        )
2453        .expect("deserialize snapshot");
2454
2455        let second_plan = ReconfigurationPlan {
2456            plan_id: "plan/suffix".to_string(),
2457            steps: vec![ReconfigurationPlanStep {
2458                step_id: "cutover".to_string(),
2459                next_members: vec!["Carol".to_string(), "Dora".to_string()],
2460                placements: vec![
2461                    PlacementObservation::remote("Carol", "127.0.0.1:19831")
2462                        .with_region("us_east_1"),
2463                    PlacementObservation::colocated("Dora", "Carol").with_region("us_east_1"),
2464                ],
2465            }],
2466        };
2467        let baseline_suffix = baseline
2468            .execute_reconfiguration_plan(0, &second_plan)
2469            .expect("execute second plan");
2470        let baseline_history = baseline
2471            .bundle_reconfiguration_history(0)
2472            .expect("baseline history")
2473            .to_vec();
2474        let baseline_executions = baseline
2475            .bundle_reconfiguration_plan_executions(0)
2476            .expect("baseline plan executions")
2477            .to_vec();
2478
2479        let mut restored = configured_runtime("cert/reconfig-snapshot");
2480        restored
2481            .restore_bundle_reconfiguration_snapshot(0, snapshot)
2482            .expect("restore reconfiguration snapshot");
2483        let restored_suffix = restored
2484            .execute_reconfiguration_plan(0, &second_plan)
2485            .expect("execute suffix after restore");
2486
2487        assert_eq!(restored_suffix, baseline_suffix);
2488        assert_eq!(
2489            restored
2490                .bundle_reconfiguration_history(0)
2491                .expect("restored history"),
2492            baseline_history.as_slice()
2493        );
2494        assert_eq!(
2495            restored
2496                .bundle_reconfiguration_plan_executions(0)
2497                .expect("restored plan executions"),
2498            baseline_executions.as_slice()
2499        );
2500    }
2501
2502    #[test]
2503    fn runtime_upgrade_emits_staged_admitted_and_committed_artifacts() {
2504        let mut runtime =
2505            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2506        let bundle = ProtocolBundle::new(
2507            image("m"),
2508            CompositionCertificate {
2509                artifact_id: "cert/runtime-upgrade".to_string(),
2510                link_ok_full: true,
2511                theorem_pack: TheoremPackCapabilities::full(),
2512                runtime_contracts: Some(RuntimeContracts::full()),
2513            },
2514        )
2515        .with_reconfiguration_policy(ReconfigurationPolicy {
2516            dynamic_membership: true,
2517            overlap_required: true,
2518        });
2519        runtime.admit_bundle(bundle).expect("admit bundle");
2520        runtime
2521            .seed_bundle_membership(0, ["Alice", "Bob"])
2522            .expect("seed members");
2523
2524        let request = RuntimeUpgradeRequest {
2525            upgrade_id: "upgrade/v2".to_string(),
2526            plan: ReconfigurationPlan {
2527                plan_id: "plan/upgrade".to_string(),
2528                steps: vec![ReconfigurationPlanStep {
2529                    step_id: "cutover".to_string(),
2530                    next_members: vec!["Bob".to_string(), "Carol".to_string()],
2531                    placements: vec![
2532                        PlacementObservation::local("Bob").with_region("eu_central_1"),
2533                        PlacementObservation::remote("Carol", "127.0.0.1:19901")
2534                            .with_region("us_east_1"),
2535                    ],
2536                }],
2537            },
2538            compatibility: RuntimeUpgradeCompatibility {
2539                execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2540                ownership_continuity_required: true,
2541                pending_effect_treatment: PendingEffectTreatment::InvalidateBlocked,
2542                canonical_publication_continuity:
2543                    CanonicalPublicationContinuity::PreserveCanonicalTruth,
2544            },
2545            carried_publication_ids: vec!["publication:accept#1".to_string()],
2546            invalidated_publication_ids: Vec::new(),
2547            carried_obligation_ids: vec!["obligation:pending#1".to_string()],
2548            invalidated_obligation_ids: Vec::new(),
2549        };
2550
2551        let execution = runtime
2552            .execute_runtime_upgrade(0, &request)
2553            .expect("execute runtime upgrade");
2554        assert_eq!(execution.upgrade_id, "upgrade/v2");
2555        assert_eq!(execution.artifacts.len(), 3);
2556        assert_eq!(
2557            execution.artifacts[0].phase,
2558            TransitionArtifactPhase::Staged
2559        );
2560        assert_eq!(
2561            execution.artifacts[1].phase,
2562            TransitionArtifactPhase::Admitted
2563        );
2564        assert_eq!(
2565            execution.artifacts[2].phase,
2566            TransitionArtifactPhase::CommittedCutover
2567        );
2568        assert_eq!(
2569            execution.artifacts[2].carried_publication_ids,
2570            vec!["publication:accept#1".to_string()]
2571        );
2572        assert_eq!(
2573            runtime
2574                .bundle_runtime_upgrade_executions(0)
2575                .expect("runtime upgrade executions")
2576                .last(),
2577            Some(&execution)
2578        );
2579    }
2580
2581    #[test]
2582    fn runtime_upgrade_rejects_when_canonical_continuity_is_violated() {
2583        let mut runtime =
2584            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2585        let bundle = ProtocolBundle::new(
2586            image("m"),
2587            CompositionCertificate {
2588                artifact_id: "cert/runtime-upgrade-reject".to_string(),
2589                link_ok_full: true,
2590                theorem_pack: TheoremPackCapabilities::full(),
2591                runtime_contracts: Some(RuntimeContracts::full()),
2592            },
2593        )
2594        .with_reconfiguration_policy(ReconfigurationPolicy {
2595            dynamic_membership: true,
2596            overlap_required: true,
2597        });
2598        runtime.admit_bundle(bundle).expect("admit bundle");
2599        runtime
2600            .seed_bundle_membership(0, ["Alice", "Bob"])
2601            .expect("seed members");
2602
2603        let request = RuntimeUpgradeRequest {
2604            upgrade_id: "upgrade/reject".to_string(),
2605            plan: ReconfigurationPlan {
2606                plan_id: "plan/reject".to_string(),
2607                steps: vec![ReconfigurationPlanStep {
2608                    step_id: "cutover".to_string(),
2609                    next_members: vec!["Bob".to_string(), "Carol".to_string()],
2610                    placements: vec![
2611                        PlacementObservation::local("Bob"),
2612                        PlacementObservation::remote("Carol", "127.0.0.1:19902"),
2613                    ],
2614                }],
2615            },
2616            compatibility: RuntimeUpgradeCompatibility {
2617                execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2618                ownership_continuity_required: true,
2619                pending_effect_treatment: PendingEffectTreatment::PreservePending,
2620                canonical_publication_continuity:
2621                    CanonicalPublicationContinuity::PreserveCanonicalTruth,
2622            },
2623            carried_publication_ids: vec!["publication:accept#1".to_string()],
2624            invalidated_publication_ids: vec!["publication:accept#1".to_string()],
2625            carried_obligation_ids: Vec::new(),
2626            invalidated_obligation_ids: Vec::new(),
2627        };
2628
2629        let err = runtime
2630            .execute_runtime_upgrade(0, &request)
2631            .expect_err("continuity violation must reject");
2632        assert!(matches!(
2633            err,
2634            CompositionError::InvalidReconfigurationPlan { .. }
2635        ));
2636        let execution = runtime
2637            .bundle_runtime_upgrade_executions(0)
2638            .expect("runtime upgrade executions")
2639            .last()
2640            .expect("failed execution artifact");
2641        assert_eq!(
2642            execution.artifacts.last().expect("failure artifact").phase,
2643            TransitionArtifactPhase::Failed
2644        );
2645    }
2646
2647    #[test]
2648    fn runtime_upgrade_rollback_is_replay_visible_when_plan_breaks_overlap() {
2649        let mut runtime =
2650            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2651        let bundle = ProtocolBundle::new(
2652            image("m"),
2653            CompositionCertificate {
2654                artifact_id: "cert/runtime-upgrade-rollback".to_string(),
2655                link_ok_full: true,
2656                theorem_pack: TheoremPackCapabilities::full(),
2657                runtime_contracts: Some(RuntimeContracts::full()),
2658            },
2659        )
2660        .with_reconfiguration_policy(ReconfigurationPolicy {
2661            dynamic_membership: true,
2662            overlap_required: true,
2663        });
2664        runtime.admit_bundle(bundle).expect("admit bundle");
2665        runtime
2666            .seed_bundle_membership(0, ["Alice", "Bob"])
2667            .expect("seed members");
2668
2669        let request = RuntimeUpgradeRequest {
2670            upgrade_id: "upgrade/rollback".to_string(),
2671            plan: ReconfigurationPlan {
2672                plan_id: "plan/rollback".to_string(),
2673                steps: vec![ReconfigurationPlanStep {
2674                    step_id: "cutover".to_string(),
2675                    next_members: vec!["Carol".to_string(), "Dora".to_string()],
2676                    placements: vec![
2677                        PlacementObservation::local("Carol"),
2678                        PlacementObservation::local("Dora"),
2679                    ],
2680                }],
2681            },
2682            compatibility: RuntimeUpgradeCompatibility {
2683                execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2684                ownership_continuity_required: false,
2685                pending_effect_treatment: PendingEffectTreatment::InvalidateBlocked,
2686                canonical_publication_continuity:
2687                    CanonicalPublicationContinuity::ReissueCanonicalTruth,
2688            },
2689            carried_publication_ids: Vec::new(),
2690            invalidated_publication_ids: vec!["publication:old#1".to_string()],
2691            carried_obligation_ids: vec!["obligation:pending#1".to_string()],
2692            invalidated_obligation_ids: vec!["obligation:blocked#1".to_string()],
2693        };
2694
2695        let err = runtime
2696            .execute_runtime_upgrade(0, &request)
2697            .expect_err("overlap failure must roll back");
2698        assert!(matches!(
2699            err,
2700            CompositionError::OverlapRequiredViolation { .. }
2701        ));
2702        let execution = runtime
2703            .bundle_runtime_upgrade_executions(0)
2704            .expect("runtime upgrade executions")
2705            .last()
2706            .expect("rolled-back execution");
2707        assert_eq!(
2708            execution.artifacts.last().expect("rollback artifact").phase,
2709            TransitionArtifactPhase::RolledBack
2710        );
2711        assert_eq!(
2712            runtime.bundle_members(0).expect("members after rollback"),
2713            &BTreeSet::from(["Alice".to_string(), "Bob".to_string()])
2714        );
2715    }
2716
2717    #[test]
2718    fn runtime_upgrade_snapshot_restore_preserves_upgrade_history() {
2719        let mut baseline =
2720            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2721        let bundle = ProtocolBundle::new(
2722            image("m"),
2723            CompositionCertificate {
2724                artifact_id: "cert/runtime-upgrade-snapshot".to_string(),
2725                link_ok_full: true,
2726                theorem_pack: TheoremPackCapabilities::full(),
2727                runtime_contracts: Some(RuntimeContracts::full()),
2728            },
2729        )
2730        .with_reconfiguration_policy(ReconfigurationPolicy {
2731            dynamic_membership: true,
2732            overlap_required: true,
2733        });
2734        baseline.admit_bundle(bundle).expect("admit bundle");
2735        baseline
2736            .seed_bundle_membership(0, ["Alice", "Bob"])
2737            .expect("seed members");
2738
2739        let request = RuntimeUpgradeRequest {
2740            upgrade_id: "upgrade/snapshot".to_string(),
2741            plan: ReconfigurationPlan {
2742                plan_id: "plan/snapshot".to_string(),
2743                steps: vec![ReconfigurationPlanStep {
2744                    step_id: "cutover".to_string(),
2745                    next_members: vec!["Bob".to_string(), "Carol".to_string()],
2746                    placements: vec![
2747                        PlacementObservation::local("Bob"),
2748                        PlacementObservation::remote("Carol", "127.0.0.1:19903"),
2749                    ],
2750                }],
2751            },
2752            compatibility: RuntimeUpgradeCompatibility {
2753                execution_constraint: RuntimeUpgradeExecutionConstraint::PreserveBundleProfile,
2754                ownership_continuity_required: true,
2755                pending_effect_treatment: PendingEffectTreatment::InvalidateBlocked,
2756                canonical_publication_continuity:
2757                    CanonicalPublicationContinuity::PreserveCanonicalTruth,
2758            },
2759            carried_publication_ids: vec!["publication:stable#1".to_string()],
2760            invalidated_publication_ids: Vec::new(),
2761            carried_obligation_ids: vec!["obligation:stable#1".to_string()],
2762            invalidated_obligation_ids: Vec::new(),
2763        };
2764        let baseline_execution = baseline
2765            .execute_runtime_upgrade(0, &request)
2766            .expect("execute baseline upgrade");
2767        let snapshot = serde_json::from_str::<ReconfigurationRuntimeSnapshot>(
2768            &serde_json::to_string(
2769                &baseline
2770                    .bundle_reconfiguration_snapshot(0)
2771                    .expect("snapshot after upgrade"),
2772            )
2773            .expect("serialize snapshot"),
2774        )
2775        .expect("deserialize snapshot");
2776
2777        let mut restored =
2778            ComposedRuntime::new(ProtocolMachineConfig::default(), MemoryBudget::default());
2779        let bundle = ProtocolBundle::new(
2780            image("m"),
2781            CompositionCertificate {
2782                artifact_id: "cert/runtime-upgrade-snapshot".to_string(),
2783                link_ok_full: true,
2784                theorem_pack: TheoremPackCapabilities::full(),
2785                runtime_contracts: Some(RuntimeContracts::full()),
2786            },
2787        )
2788        .with_reconfiguration_policy(ReconfigurationPolicy {
2789            dynamic_membership: true,
2790            overlap_required: true,
2791        });
2792        restored.admit_bundle(bundle).expect("admit bundle");
2793        restored
2794            .restore_bundle_reconfiguration_snapshot(0, snapshot)
2795            .expect("restore snapshot");
2796
2797        assert_eq!(
2798            restored
2799                .bundle_runtime_upgrade_executions(0)
2800                .expect("restored upgrades"),
2801            &[baseline_execution]
2802        );
2803    }
2804}