Skip to main content

telltale_machine/engine/
protocol_machine_error_and_step_pack.rs

1/// Errors from ProtocolMachine operations.
2#[derive(Debug, thiserror::Error)]
3pub enum ProtocolMachineError {
4    /// A coroutine faulted.
5    #[error("coroutine {coro_id} faulted: {fault}")]
6    Fault {
7        /// Coroutine ID.
8        coro_id: usize,
9        /// The fault.
10        fault: Fault,
11    },
12    /// Session limit exceeded.
13    #[error("max sessions ({max}) exceeded")]
14    TooManySessions {
15        /// Maximum allowed.
16        max: usize,
17    },
18    /// Coroutine limit exceeded.
19    #[error("max coroutines ({max}) exceeded")]
20    TooManyCoroutines {
21        /// Maximum allowed.
22        max: usize,
23    },
24    /// Session not found.
25    #[error("session {0} not found")]
26    SessionNotFound(SessionId),
27    /// Effect handler error.
28    #[error("effect handler error: {0}")]
29    HandlerError(EffectFailure),
30    /// Persistence model lifecycle error.
31    #[error("persistence error: {0}")]
32    PersistenceError(String),
33    /// Invalid concurrency parameter.
34    #[error("invalid concurrency level: {n}")]
35    InvalidConcurrency {
36        /// Requested concurrency.
37        n: usize,
38    },
39    /// ProtocolMachine configuration violates required runtime invariants.
40    #[error("invalid ProtocolMachine config: {reason}")]
41    InvalidConfig {
42        /// Validation failure details.
43        reason: String,
44    },
45    /// Thread-pool initialization failed.
46    #[error("thread pool build failed: {message}")]
47    ThreadPoolBuild {
48        /// Build error details.
49        message: String,
50    },
51    /// Code image failed runtime validation checks.
52    #[error("invalid code image: {reason}")]
53    InvalidCodeImage {
54        /// Validation failure details.
55        reason: String,
56    },
57    /// Ownership contract violation surfaced by a preferred host integration path.
58    #[error("ownership contract error: {0}")]
59    OwnershipContract(String),
60}
61
62// ---- StepPack: atomic instruction result (matches Lean StepPack) ----
63
64/// How to update the coroutine after an instruction.
65pub(crate) enum CoroUpdate {
66    /// Advance PC by 1, status = Ready.
67    AdvancePc,
68    /// Set PC to target (for Jmp), status = Ready.
69    SetPc(PC),
70    /// Block with given reason. PC unchanged.
71    Block(BlockReason),
72    /// Advance PC by 1 and set blocked status.
73    AdvancePcBlock(BlockReason),
74    /// Halt (Done). PC unchanged.
75    Halt,
76    /// Advance PC by 1, write a value to a register, status = Ready.
77    AdvancePcWriteReg { reg: u16, val: Value },
78}
79
80/// Type update action for commit.
81pub(crate) enum TypeUpdate {
82    /// Advance to a new local type.
83    Advance(LocalTypeR),
84    /// Advance to a new local type and update the original (for Mu unfolding).
85    AdvanceWithOriginal(LocalTypeR, LocalTypeR),
86    /// Remove the type entry (endpoint completed).
87    Remove,
88}
89
90/// Resolve a continuation and build the appropriate `TypeUpdate`.
91pub(crate) fn resolve_type_update(
92    cont: &LocalTypeR,
93    original: &LocalTypeR,
94    ep: &Endpoint,
95) -> (LocalTypeR, Option<(Endpoint, TypeUpdate)>) {
96    let (resolved, new_scope) = unfold_if_var_with_scope(cont, original);
97    let update = if let Some(mu) = new_scope {
98        Some((
99            ep.clone(),
100            TypeUpdate::AdvanceWithOriginal(resolved.clone(), mu),
101        ))
102    } else {
103        Some((ep.clone(), TypeUpdate::Advance(resolved.clone())))
104    };
105    (resolved, update)
106}
107
108/// Atomic result of executing one instruction.
109///
110/// Matches the Lean `StepPack` pattern: bundles all mutations so the
111/// caller commits them together via `commit_pack`.
112pub(crate) struct StepPack {
113    /// How to update the coroutine.
114    pub(crate) coro_update: CoroUpdate,
115    /// Type advancement, if any. `None` means no type change (e.g., block, control flow).
116    pub(crate) type_update: Option<(Endpoint, TypeUpdate)>,
117    /// Observable events to emit.
118    pub(crate) events: Vec<ObsEvent>,
119}
120
121#[derive(Clone, Copy)]
122pub(crate) struct GuardAcquireInput<'a> {
123    pub coro_idx: usize,
124    pub endpoint: &'a Endpoint,
125    pub role: &'a str,
126    pub sid: SessionId,
127    pub layer: &'a str,
128    pub dst: u16,
129}
130
131#[derive(Clone, Copy)]
132pub(crate) struct GuardReleaseInput<'a> {
133    pub coro_idx: usize,
134    pub endpoint: &'a Endpoint,
135    pub role: &'a str,
136    pub sid: SessionId,
137    pub layer: &'a str,
138    pub evidence: u16,
139}
140
141/// Internal outcome after committing a `StepPack`.
142pub(crate) enum ExecOutcome {
143    /// Instruction completed, coroutine continues.
144    Continue,
145    /// Coroutine blocked on a resource.
146    Blocked(BlockReason),
147    /// Coroutine halted normally.
148    Halted,
149}
150
151// ---- The ProtocolMachine ----
152
153/// Retained observability artifacts with optional bounded storage.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155#[serde(transparent)]
156pub(crate) struct RetainedLog<T>(Vec<T>);
157
158fn default_true() -> bool {
159    true
160}
161
162impl<T> Default for RetainedLog<T> {
163    fn default() -> Self {
164        Self(Vec::new())
165    }
166}
167
168impl<T> RetainedLog<T> {
169    fn push(&mut self, item: T, config: &ObservabilityRetentionConfig) {
170        match config.mode {
171            ObservabilityRetentionMode::Disabled => {}
172            ObservabilityRetentionMode::Full => self.0.push(item),
173            ObservabilityRetentionMode::Capped => {
174                self.0.push(item);
175                self.trim_to_capacity(config.capacity);
176            }
177        }
178    }
179
180    fn extend<I>(&mut self, iter: I, config: &ObservabilityRetentionConfig)
181    where
182        I: IntoIterator<Item = T>,
183    {
184        match config.mode {
185            ObservabilityRetentionMode::Disabled => {}
186            ObservabilityRetentionMode::Full => self.0.extend(iter),
187            ObservabilityRetentionMode::Capped => {
188                self.0.extend(iter);
189                self.trim_to_capacity(config.capacity);
190            }
191        }
192    }
193
194    fn as_slice(&self) -> &[T] {
195        &self.0
196    }
197
198    fn as_mut_slice(&mut self) -> &mut [T] {
199        &mut self.0
200    }
201
202    fn len(&self) -> usize {
203        self.0.len()
204    }
205
206    fn drain(&mut self) -> Vec<T> {
207        self.0.drain(..).collect()
208    }
209
210    fn trim_to_capacity(&mut self, capacity: usize) {
211        if self.0.len() > capacity {
212            let overflow = self.0.len() - capacity;
213            self.0.drain(0..overflow);
214        }
215    }
216}
217
218impl<T> std::ops::Deref for RetainedLog<T> {
219    type Target = [T];
220
221    fn deref(&self) -> &Self::Target {
222        self.as_slice()
223    }
224}
225
226/// The choreographic ProtocolMachine.
227///
228/// Manages coroutines, sessions (which own type state), and a scheduler.
229/// Multiple choreographies can be loaded into a single ProtocolMachine, each in its
230/// own session namespace — justified by separation logic.
231#[derive(Debug, Serialize, Deserialize)]
232pub struct ProtocolMachine<I = (), G = (), P = NoopPersistence, Nu = DefaultVerificationModel>
233where
234    P: PersistenceModel,
235{
236    config: ProtocolMachineConfig,
237    code: Option<Program>,
238    programs: ProgramStore,
239    identity_model: PhantomData<I>,
240    guard_model: PhantomData<G>,
241    persistence_model: PhantomData<P>,
242    persistent: P::PState,
243    verification: Nu,
244    #[serde(default)]
245    communication_consumption: DefaultCommunicationConsumption,
246    #[serde(default)]
247    communication_consumption_artifacts: RetainedLog<CommunicationConsumptionArtifact>,
248    coroutines: Vec<Coroutine>,
249    sessions: SessionStore,
250    arena: Arena,
251    resource_states: BTreeMap<ScopeId, ResourceState>,
252    sched: Scheduler,
253    monitor: SessionMonitor,
254    obs_trace: RetainedLog<ObsEvent>,
255    role_symbols: SymbolTable,
256    label_symbols: SymbolTable,
257    handler_symbols: SymbolTable,
258    edge_symbols: EdgeSymbolTable,
259    clock: SimClock,
260    next_coro_id: usize,
261    next_session_id: SessionId,
262    paused_roles: BTreeSet<String>,
263    #[serde(skip, default)]
264    coro_slots: BTreeMap<usize, usize>,
265    #[serde(skip, default)]
266    role_coroutines: BTreeMap<String, Vec<usize>>,
267    #[serde(skip, default)]
268    paused_coro_ids: BTreeSet<usize>,
269    #[serde(skip, default)]
270    timed_out_coro_ids: BTreeSet<usize>,
271    #[serde(skip, default)]
272    session_open_plans: BTreeMap<String, crate::session::SessionOpenPlan>,
273    #[serde(skip, default)]
274    eligible_ready: BTreeSet<usize>,
275    #[serde(skip, default = "default_true")]
276    eligibility_dirty: bool,
277    guard_layer: InMemoryGuardLayer,
278    effect_trace: RetainedLog<EffectTraceEntry>,
279    effect_exchanges: RetainedLog<EffectExchangeRecord>,
280    operation_instances: RetainedLog<OperationInstance>,
281    outstanding_effects: RetainedLog<OutstandingEffect>,
282    progress_contracts: RetainedLog<ProgressContract>,
283    progress_transitions: RetainedLog<ProgressTransition>,
284    next_effect_id: u64,
285    output_condition_checks: RetainedLog<OutputConditionCheck>,
286    delegation_audit_log: RetainedLog<DelegationAuditRecord>,
287    next_delegation_receipt_id: u64,
288    authority_audit_log: RetainedLog<AuthorityAuditRecord>,
289    next_authority_witness_id: u64,
290    crashed_sites: BTreeSet<SiteId>,
291    partitioned_edges: BTreeSet<(SiteId, SiteId)>,
292    corrupted_edges: BTreeMap<(SiteId, SiteId), CorruptionType>,
293    timed_out_sites: BTreeMap<SiteId, TimeoutWitness>,
294    last_sched_step: Option<SchedStepDebug>,
295    #[serde(skip, default)]
296    last_pre_dispatch_state: Option<crate::refinement::ProtocolMachineRefinementSlice>,
297    handler_identity_anchor: Option<String>,
298}
299
300/// Lean-aligned ProtocolMachine state alias.
301pub type ProtocolMachineState<I = (), G = (), P = NoopPersistence, Nu = DefaultVerificationModel> =
302    ProtocolMachine<I, G, P, Nu>;
303
304impl<I, G, P, Nu> ProtocolMachine<I, G, P, Nu>
305where
306    P: PersistenceModel,
307{
308    /// Create a ProtocolMachine for arbitrary persistence/verification model parameters.
309    #[must_use]
310    pub fn new_with_models(config: ProtocolMachineConfig) -> Self
311    where
312        P::PState: Default,
313        Nu: VerificationModel + Default,
314    {
315        config.assert_invariants();
316        let tick_duration = config.tick_duration;
317        let communication_replay_mode = config.communication_replay_mode;
318        let sched = Scheduler::new(config.sched_policy.clone());
319        let mut guard_resources = BTreeMap::new();
320        for layer in &config.guard_layers {
321            guard_resources.insert(layer.id.clone(), Value::Unit);
322        }
323        Self {
324            config,
325            code: None,
326            programs: ProgramStore::new(),
327            identity_model: PhantomData,
328            guard_model: PhantomData,
329            persistence_model: PhantomData,
330            persistent: P::PState::default(),
331            verification: Nu::default(),
332            communication_consumption: DefaultCommunicationConsumption::new(
333                communication_replay_mode,
334            ),
335            communication_consumption_artifacts: RetainedLog::default(),
336            coroutines: Vec::new(),
337            sessions: SessionStore::new(),
338            arena: Arena::default(),
339            resource_states: BTreeMap::new(),
340            sched,
341            monitor: SessionMonitor::default(),
342            obs_trace: RetainedLog::default(),
343            role_symbols: SymbolTable::new(),
344            label_symbols: SymbolTable::new(),
345            handler_symbols: SymbolTable::new(),
346            edge_symbols: EdgeSymbolTable::new(),
347            clock: SimClock::new(tick_duration),
348            next_coro_id: 0,
349            next_session_id: 0,
350            paused_roles: BTreeSet::new(),
351            coro_slots: BTreeMap::new(),
352            role_coroutines: BTreeMap::new(),
353            paused_coro_ids: BTreeSet::new(),
354            timed_out_coro_ids: BTreeSet::new(),
355            session_open_plans: BTreeMap::new(),
356            eligible_ready: BTreeSet::new(),
357            eligibility_dirty: true,
358            guard_layer: InMemoryGuardLayer {
359                resources: guard_resources
360                    .into_iter()
361                    .map(|(k, v)| (LayerId(k), v))
362                    .collect(),
363            },
364            effect_trace: RetainedLog::default(),
365            effect_exchanges: RetainedLog::default(),
366            operation_instances: RetainedLog::default(),
367            outstanding_effects: RetainedLog::default(),
368            progress_contracts: RetainedLog::default(),
369            progress_transitions: RetainedLog::default(),
370            next_effect_id: 0,
371            output_condition_checks: RetainedLog::default(),
372            delegation_audit_log: RetainedLog::default(),
373            next_delegation_receipt_id: 0,
374            authority_audit_log: RetainedLog::default(),
375            next_authority_witness_id: 0,
376            crashed_sites: BTreeSet::new(),
377            partitioned_edges: BTreeSet::new(),
378            corrupted_edges: BTreeMap::new(),
379            timed_out_sites: BTreeMap::new(),
380            last_sched_step: None,
381            last_pre_dispatch_state: None,
382            handler_identity_anchor: None,
383        }
384    }
385
386    /// Borrow the persistent state tracked by the configured persistence model.
387    #[must_use]
388    pub fn persistent_state(&self) -> &P::PState {
389        &self.persistent
390    }
391
392    /// Mutably borrow persistent state.
393    pub fn persistent_state_mut(&mut self) -> &mut P::PState {
394        &mut self.persistent
395    }
396
397    fn apply_open_delta(&mut self, sid: SessionId) -> Result<(), String> {
398        let delta = P::open_delta(sid);
399        P::apply(&mut self.persistent, &delta)
400    }
401
402    fn apply_close_delta(&mut self, sid: SessionId) -> Result<(), String> {
403        let delta = P::close_delta(sid);
404        P::apply(&mut self.persistent, &delta)
405    }
406
407    fn apply_invoke_delta(&mut self, sid: SessionId, action: &str) -> Result<(), String> {
408        if let Some(delta) = P::invoke_delta(sid, action) {
409            P::apply(&mut self.persistent, &delta)?;
410        }
411        Ok(())
412    }
413
414    /// Resolve guard-layer capability for a participant via bridge binding.
415    #[must_use]
416    pub fn bridge_guard_layer_for_participant<B>(
417        &self,
418        bridge: &B,
419        participant: &I::ParticipantId,
420    ) -> LayerId
421    where
422        I: IdentityModel,
423        G: GuardLayer,
424        B: IdentityGuardBridge<I, G>,
425    {
426        bridge.guard_layer_for_participant(participant)
427    }
428
429    /// Resolve participant verification key via bridge binding.
430    #[must_use]
431    pub fn bridge_verifying_key_for_participant<B>(
432        &self,
433        bridge: &B,
434        participant: &I::ParticipantId,
435    ) -> Nu::VerifyingKey
436    where
437        I: IdentityModel,
438        Nu: VerificationModel,
439        B: IdentityVerificationBridge<I, Nu>,
440    {
441        bridge.verification_key_for_participant(participant)
442    }
443}