1#[derive(Debug, thiserror::Error)]
3pub enum ProtocolMachineError {
4 #[error("coroutine {coro_id} faulted: {fault}")]
6 Fault {
7 coro_id: usize,
9 fault: Fault,
11 },
12 #[error("max sessions ({max}) exceeded")]
14 TooManySessions {
15 max: usize,
17 },
18 #[error("max coroutines ({max}) exceeded")]
20 TooManyCoroutines {
21 max: usize,
23 },
24 #[error("session {0} not found")]
26 SessionNotFound(SessionId),
27 #[error("effect handler error: {0}")]
29 HandlerError(EffectFailure),
30 #[error("persistence error: {0}")]
32 PersistenceError(String),
33 #[error("invalid concurrency level: {n}")]
35 InvalidConcurrency {
36 n: usize,
38 },
39 #[error("invalid ProtocolMachine config: {reason}")]
41 InvalidConfig {
42 reason: String,
44 },
45 #[error("thread pool build failed: {message}")]
47 ThreadPoolBuild {
48 message: String,
50 },
51 #[error("invalid code image: {reason}")]
53 InvalidCodeImage {
54 reason: String,
56 },
57 #[error("ownership contract error: {0}")]
59 OwnershipContract(String),
60}
61
62pub(crate) enum CoroUpdate {
66 AdvancePc,
68 SetPc(PC),
70 Block(BlockReason),
72 AdvancePcBlock(BlockReason),
74 Halt,
76 AdvancePcWriteReg { reg: u16, val: Value },
78}
79
80pub(crate) enum TypeUpdate {
82 Advance(LocalTypeR),
84 AdvanceWithOriginal(LocalTypeR, LocalTypeR),
86 Remove,
88}
89
90pub(crate) fn resolve_type_update(
92 cont: &LocalTypeR,
93 original: &LocalTypeR,
94 ep: &Endpoint,
95) -> (LocalTypeR, Option<(Endpoint, TypeUpdate)>) {
96 let (resolved, new_scope) = unfold_if_var_with_scope(cont, original);
97 let update = if let Some(mu) = new_scope {
98 Some((
99 ep.clone(),
100 TypeUpdate::AdvanceWithOriginal(resolved.clone(), mu),
101 ))
102 } else {
103 Some((ep.clone(), TypeUpdate::Advance(resolved.clone())))
104 };
105 (resolved, update)
106}
107
108pub(crate) struct StepPack {
113 pub(crate) coro_update: CoroUpdate,
115 pub(crate) type_update: Option<(Endpoint, TypeUpdate)>,
117 pub(crate) events: Vec<ObsEvent>,
119}
120
121#[derive(Clone, Copy)]
122pub(crate) struct GuardAcquireInput<'a> {
123 pub coro_idx: usize,
124 pub endpoint: &'a Endpoint,
125 pub role: &'a str,
126 pub sid: SessionId,
127 pub layer: &'a str,
128 pub dst: u16,
129}
130
131#[derive(Clone, Copy)]
132pub(crate) struct GuardReleaseInput<'a> {
133 pub coro_idx: usize,
134 pub endpoint: &'a Endpoint,
135 pub role: &'a str,
136 pub sid: SessionId,
137 pub layer: &'a str,
138 pub evidence: u16,
139}
140
141pub(crate) enum ExecOutcome {
143 Continue,
145 Blocked(BlockReason),
147 Halted,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
155#[serde(transparent)]
156pub(crate) struct RetainedLog<T>(Vec<T>);
157
/// Always returns `true`.
///
/// Referenced by name from `#[serde(default = "default_true")]`, which needs a function
/// path rather than a literal.
fn default_true() -> bool {
    true
}
161
162impl<T> Default for RetainedLog<T> {
163 fn default() -> Self {
164 Self(Vec::new())
165 }
166}
167
168impl<T> RetainedLog<T> {
169 fn push(&mut self, item: T, config: &ObservabilityRetentionConfig) {
170 match config.mode {
171 ObservabilityRetentionMode::Disabled => {}
172 ObservabilityRetentionMode::Full => self.0.push(item),
173 ObservabilityRetentionMode::Capped => {
174 self.0.push(item);
175 self.trim_to_capacity(config.capacity);
176 }
177 }
178 }
179
180 fn extend<I>(&mut self, iter: I, config: &ObservabilityRetentionConfig)
181 where
182 I: IntoIterator<Item = T>,
183 {
184 match config.mode {
185 ObservabilityRetentionMode::Disabled => {}
186 ObservabilityRetentionMode::Full => self.0.extend(iter),
187 ObservabilityRetentionMode::Capped => {
188 self.0.extend(iter);
189 self.trim_to_capacity(config.capacity);
190 }
191 }
192 }
193
194 fn as_slice(&self) -> &[T] {
195 &self.0
196 }
197
198 fn as_mut_slice(&mut self) -> &mut [T] {
199 &mut self.0
200 }
201
202 fn len(&self) -> usize {
203 self.0.len()
204 }
205
206 fn drain(&mut self) -> Vec<T> {
207 self.0.drain(..).collect()
208 }
209
210 fn trim_to_capacity(&mut self, capacity: usize) {
211 if self.0.len() > capacity {
212 let overflow = self.0.len() - capacity;
213 self.0.drain(0..overflow);
214 }
215 }
216}
217
218impl<T> std::ops::Deref for RetainedLog<T> {
219 type Target = [T];
220
221 fn deref(&self) -> &Self::Target {
222 self.as_slice()
223 }
224}
225
226#[derive(Debug, Serialize, Deserialize)]
232pub struct ProtocolMachine<I = (), G = (), P = NoopPersistence, Nu = DefaultVerificationModel>
233where
234 P: PersistenceModel,
235{
236 config: ProtocolMachineConfig,
237 code: Option<Program>,
238 programs: ProgramStore,
239 identity_model: PhantomData<I>,
240 guard_model: PhantomData<G>,
241 persistence_model: PhantomData<P>,
242 persistent: P::PState,
243 verification: Nu,
244 #[serde(default)]
245 communication_consumption: DefaultCommunicationConsumption,
246 #[serde(default)]
247 communication_consumption_artifacts: RetainedLog<CommunicationConsumptionArtifact>,
248 coroutines: Vec<Coroutine>,
249 sessions: SessionStore,
250 arena: Arena,
251 resource_states: BTreeMap<ScopeId, ResourceState>,
252 sched: Scheduler,
253 monitor: SessionMonitor,
254 obs_trace: RetainedLog<ObsEvent>,
255 role_symbols: SymbolTable,
256 label_symbols: SymbolTable,
257 handler_symbols: SymbolTable,
258 edge_symbols: EdgeSymbolTable,
259 clock: SimClock,
260 next_coro_id: usize,
261 next_session_id: SessionId,
262 paused_roles: BTreeSet<String>,
263 #[serde(skip, default)]
264 coro_slots: BTreeMap<usize, usize>,
265 #[serde(skip, default)]
266 role_coroutines: BTreeMap<String, Vec<usize>>,
267 #[serde(skip, default)]
268 paused_coro_ids: BTreeSet<usize>,
269 #[serde(skip, default)]
270 timed_out_coro_ids: BTreeSet<usize>,
271 #[serde(skip, default)]
272 session_open_plans: BTreeMap<String, crate::session::SessionOpenPlan>,
273 #[serde(skip, default)]
274 eligible_ready: BTreeSet<usize>,
275 #[serde(skip, default = "default_true")]
276 eligibility_dirty: bool,
277 guard_layer: InMemoryGuardLayer,
278 effect_trace: RetainedLog<EffectTraceEntry>,
279 effect_exchanges: RetainedLog<EffectExchangeRecord>,
280 operation_instances: RetainedLog<OperationInstance>,
281 outstanding_effects: RetainedLog<OutstandingEffect>,
282 progress_contracts: RetainedLog<ProgressContract>,
283 progress_transitions: RetainedLog<ProgressTransition>,
284 next_effect_id: u64,
285 output_condition_checks: RetainedLog<OutputConditionCheck>,
286 delegation_audit_log: RetainedLog<DelegationAuditRecord>,
287 next_delegation_receipt_id: u64,
288 authority_audit_log: RetainedLog<AuthorityAuditRecord>,
289 next_authority_witness_id: u64,
290 crashed_sites: BTreeSet<SiteId>,
291 partitioned_edges: BTreeSet<(SiteId, SiteId)>,
292 corrupted_edges: BTreeMap<(SiteId, SiteId), CorruptionType>,
293 timed_out_sites: BTreeMap<SiteId, TimeoutWitness>,
294 last_sched_step: Option<SchedStepDebug>,
295 #[serde(skip, default)]
296 last_pre_dispatch_state: Option<crate::refinement::ProtocolMachineRefinementSlice>,
297 handler_identity_anchor: Option<String>,
298}
299
300pub type ProtocolMachineState<I = (), G = (), P = NoopPersistence, Nu = DefaultVerificationModel> =
302 ProtocolMachine<I, G, P, Nu>;
303
304impl<I, G, P, Nu> ProtocolMachine<I, G, P, Nu>
305where
306 P: PersistenceModel,
307{
308 #[must_use]
310 pub fn new_with_models(config: ProtocolMachineConfig) -> Self
311 where
312 P::PState: Default,
313 Nu: VerificationModel + Default,
314 {
315 config.assert_invariants();
316 let tick_duration = config.tick_duration;
317 let communication_replay_mode = config.communication_replay_mode;
318 let sched = Scheduler::new(config.sched_policy.clone());
319 let mut guard_resources = BTreeMap::new();
320 for layer in &config.guard_layers {
321 guard_resources.insert(layer.id.clone(), Value::Unit);
322 }
323 Self {
324 config,
325 code: None,
326 programs: ProgramStore::new(),
327 identity_model: PhantomData,
328 guard_model: PhantomData,
329 persistence_model: PhantomData,
330 persistent: P::PState::default(),
331 verification: Nu::default(),
332 communication_consumption: DefaultCommunicationConsumption::new(
333 communication_replay_mode,
334 ),
335 communication_consumption_artifacts: RetainedLog::default(),
336 coroutines: Vec::new(),
337 sessions: SessionStore::new(),
338 arena: Arena::default(),
339 resource_states: BTreeMap::new(),
340 sched,
341 monitor: SessionMonitor::default(),
342 obs_trace: RetainedLog::default(),
343 role_symbols: SymbolTable::new(),
344 label_symbols: SymbolTable::new(),
345 handler_symbols: SymbolTable::new(),
346 edge_symbols: EdgeSymbolTable::new(),
347 clock: SimClock::new(tick_duration),
348 next_coro_id: 0,
349 next_session_id: 0,
350 paused_roles: BTreeSet::new(),
351 coro_slots: BTreeMap::new(),
352 role_coroutines: BTreeMap::new(),
353 paused_coro_ids: BTreeSet::new(),
354 timed_out_coro_ids: BTreeSet::new(),
355 session_open_plans: BTreeMap::new(),
356 eligible_ready: BTreeSet::new(),
357 eligibility_dirty: true,
358 guard_layer: InMemoryGuardLayer {
359 resources: guard_resources
360 .into_iter()
361 .map(|(k, v)| (LayerId(k), v))
362 .collect(),
363 },
364 effect_trace: RetainedLog::default(),
365 effect_exchanges: RetainedLog::default(),
366 operation_instances: RetainedLog::default(),
367 outstanding_effects: RetainedLog::default(),
368 progress_contracts: RetainedLog::default(),
369 progress_transitions: RetainedLog::default(),
370 next_effect_id: 0,
371 output_condition_checks: RetainedLog::default(),
372 delegation_audit_log: RetainedLog::default(),
373 next_delegation_receipt_id: 0,
374 authority_audit_log: RetainedLog::default(),
375 next_authority_witness_id: 0,
376 crashed_sites: BTreeSet::new(),
377 partitioned_edges: BTreeSet::new(),
378 corrupted_edges: BTreeMap::new(),
379 timed_out_sites: BTreeMap::new(),
380 last_sched_step: None,
381 last_pre_dispatch_state: None,
382 handler_identity_anchor: None,
383 }
384 }
385
386 #[must_use]
388 pub fn persistent_state(&self) -> &P::PState {
389 &self.persistent
390 }
391
392 pub fn persistent_state_mut(&mut self) -> &mut P::PState {
394 &mut self.persistent
395 }
396
397 fn apply_open_delta(&mut self, sid: SessionId) -> Result<(), String> {
398 let delta = P::open_delta(sid);
399 P::apply(&mut self.persistent, &delta)
400 }
401
402 fn apply_close_delta(&mut self, sid: SessionId) -> Result<(), String> {
403 let delta = P::close_delta(sid);
404 P::apply(&mut self.persistent, &delta)
405 }
406
407 fn apply_invoke_delta(&mut self, sid: SessionId, action: &str) -> Result<(), String> {
408 if let Some(delta) = P::invoke_delta(sid, action) {
409 P::apply(&mut self.persistent, &delta)?;
410 }
411 Ok(())
412 }
413
414 #[must_use]
416 pub fn bridge_guard_layer_for_participant<B>(
417 &self,
418 bridge: &B,
419 participant: &I::ParticipantId,
420 ) -> LayerId
421 where
422 I: IdentityModel,
423 G: GuardLayer,
424 B: IdentityGuardBridge<I, G>,
425 {
426 bridge.guard_layer_for_participant(participant)
427 }
428
429 #[must_use]
431 pub fn bridge_verifying_key_for_participant<B>(
432 &self,
433 bridge: &B,
434 participant: &I::ParticipantId,
435 ) -> Nu::VerifyingKey
436 where
437 I: IdentityModel,
438 Nu: VerificationModel,
439 B: IdentityVerificationBridge<I, Nu>,
440 {
441 bridge.verification_key_for_participant(participant)
442 }
443}