Skip to main content

telltale_machine/engine/runtime_exec/
core.rs

1/// Approximate retained state for the live protocol-machine runtime.
2#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
3pub struct ProtocolMachineMemoryUsage {
4    /// Session-store retained state.
5    pub session_store: SessionStoreMemoryUsage,
6    /// Number of coroutine records still retained by the ProtocolMachine.
7    pub coroutine_records: usize,
8    /// Number of terminal coroutine records retained by the ProtocolMachine.
9    pub terminal_coroutines: usize,
10    /// Number of loaded immutable program records.
11    pub program_count: usize,
12    /// Total instruction count across loaded programs.
13    pub program_instruction_count: usize,
14    /// Number of retained observable events.
15    pub obs_events: usize,
16    /// Number of retained effect-trace entries.
17    pub effect_trace_entries: usize,
18    /// Number of retained delegation audit records.
19    pub delegation_audits: usize,
20    /// Number of retained authority witness audit records.
21    pub authority_audits: usize,
22    /// Number of retained replay-consumption artifacts.
23    pub communication_artifacts: usize,
24    /// Number of retained output-condition checks.
25    pub output_condition_checks: usize,
26    /// Estimated retained bytes by protocol-machine subsystem.
27    pub retained_bytes: ProtocolMachineRetainedBytes,
28}
29/// Estimated retained bytes for protocol-machine subsystems.
30#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
31pub struct ProtocolMachineRetainedBytes {
32    /// Session-store retained bytes.
33    pub session_store: usize,
34    /// Coroutine state.
35    pub coroutines: usize,
36    /// Immutable program storage.
37    pub programs: usize,
38    /// Resource-state storage.
39    pub resource_states: usize,
40    /// Observable/effect trace storage.
41    pub traces: usize,
42    /// Replay-state and replay-artifact storage.
43    pub replay: usize,
44    /// Output-condition diagnostics.
45    pub output_condition_checks: usize,
46    /// Scheduler and control-state bookkeeping.
47    pub scheduler_and_control: usize,
48    /// Symbol interning tables.
49    pub symbols: usize,
50    /// Guard-layer resources.
51    pub guard_layer: usize,
52    /// Session monitor metadata.
53    pub monitor: usize,
54    /// Arena slot storage.
55    pub arena: usize,
56    /// Aggregate retained bytes across ProtocolMachine subsystems.
57    pub total: usize,
58}
59
60fn serialized_byte_size<T: Serialize>(value: &T) -> usize {
61    crate::serialization::binary_size(value)
62}
63
64impl ProtocolMachine {
65    fn intern_load_plan_symbols(&mut self, plan: &crate::session::SessionOpenPlan, sid: SessionId) {
66        for role in plan.roles() {
67            let _: StringId = self.role_symbols.intern(role);
68        }
69        let _: StringId = self
70            .handler_symbols
71            .intern(crate::session::DEFAULT_HANDLER_ID);
72        let edge_handlers: Vec<_> = self
73            .sessions
74            .get(sid)
75            .map(|session| session.edge_handlers.keys().cloned().collect())
76            .unwrap_or_default();
77        for edge in edge_handlers {
78            let _: EdgeId = self.intern_edge(&edge);
79        }
80    }
81
82    /// Create a ProtocolMachine instance from configuration.
83    #[must_use]
84    pub fn new(config: ProtocolMachineConfig) -> Self {
85        Self::new_with_models(config)
86    }
87
88    fn bind_default_handlers_for_session(&mut self, sid: SessionId) {
89        self.sessions
90            .set_default_handler_for_session(sid, crate::session::DEFAULT_HANDLER_ID.to_string());
91        self.handler_symbols
92            .intern(crate::session::DEFAULT_HANDLER_ID);
93    }
94
95    fn ensure_session_capacity(&self) -> Result<(), ProtocolMachineError> {
96        if self.sessions.active_count() >= self.config.max_sessions {
97            return Err(ProtocolMachineError::TooManySessions {
98                max: self.config.max_sessions,
99            });
100        }
101        Ok(())
102    }
103
104    fn coroutine_runtime_eligible(&self, coro_id: usize) -> bool {
105        let Some(idx) = self.coro_index(coro_id) else {
106            return false;
107        };
108        let role = &self.coroutines[idx].role;
109        !(self.paused_coro_ids.contains(&coro_id)
110            || self.paused_roles.contains(role)
111            || self.crashed_sites.contains(role)
112            || self.timed_out_coro_ids.contains(&coro_id)
113            || self.timed_out_sites.contains_key(role))
114    }
115
116    fn mark_eligibility_dirty(&mut self) {
117        self.eligibility_dirty = true;
118    }
119
120    fn sync_ready_eligibility_for(&mut self, coro_id: usize) {
121        let eligible = self.sched.is_ready(coro_id) && self.coroutine_runtime_eligible(coro_id);
122        let eligibility = if eligible {
123            crate::scheduler::ReadyEligibility::Eligible
124        } else {
125            crate::scheduler::ReadyEligibility::Ineligible
126        };
127        self.sched.set_ready_eligibility(coro_id, eligibility);
128        #[cfg(debug_assertions)]
129        {
130            if eligible {
131                self.eligible_ready.insert(coro_id);
132            } else {
133                self.eligible_ready.remove(&coro_id);
134            }
135        }
136    }
137
138    fn refresh_ready_eligibility(&mut self) {
139        self.sched.clear_ready_eligibility();
140        #[cfg(debug_assertions)]
141        self.eligible_ready.clear();
142        for coro_id in self.sched.ready_set_snapshot() {
143            let eligible = self.coroutine_runtime_eligible(coro_id);
144            let eligibility = if eligible {
145                crate::scheduler::ReadyEligibility::Eligible
146            } else {
147                crate::scheduler::ReadyEligibility::Ineligible
148            };
149            self.sched.set_ready_eligibility(coro_id, eligibility);
150            #[cfg(debug_assertions)]
151            if eligible {
152                self.eligible_ready.insert(coro_id);
153            }
154        }
155        self.eligibility_dirty = false;
156    }
157
158    fn ensure_ready_eligibility(&mut self) {
159        if self.eligibility_dirty {
160            self.refresh_ready_eligibility();
161        }
162    }
163
164    #[cfg(debug_assertions)]
165    fn debug_assert_ready_eligibility_consistent(&self) {
166        for coro_id in &self.eligible_ready {
167            debug_assert!(self.sched.is_ready(*coro_id));
168            debug_assert!(self.coroutine_runtime_eligible(*coro_id));
169        }
170    }
171
172    fn sync_communication_consumption_mode(&mut self) {
173        self.communication_consumption
174            .set_mode(self.config.communication_replay_mode);
175    }
176
177    fn allocate_send_sequence(&mut self, edge: &Edge) -> u64 {
178        self.sync_communication_consumption_mode();
179        self.communication_consumption.allocate_send_sequence(edge)
180    }
181
182    fn consume_receive_identity(
183        &mut self,
184        identity: CommunicationIdentity,
185    ) -> Result<CommunicationConsumeResult, CommunicationReplayError> {
186        self.sync_communication_consumption_mode();
187        let result = self.communication_consumption.consume_receive(&identity)?;
188        self.communication_consumption_artifacts.push(
189            CommunicationConsumptionArtifact {
190                tick: self.clock.tick,
191                identity,
192                mode: result.mode,
193                pre_root: result.pre_root,
194                post_root: result.post_root,
195            },
196            &self.config.observability_retention,
197        );
198        Ok(result)
199    }
200
201    fn session_open_plan(&mut self, image: &CodeImage) -> &crate::session::SessionOpenPlan {
202        let key = format!("{image:p}");
203        self.session_open_plans.entry(key).or_insert_with(|| {
204            crate::session::SessionOpenPlan::new(&image.roles(), &image.local_types)
205        })
206    }
207
208    fn open_choreography_session(
209        &mut self,
210        plan: &crate::session::SessionOpenPlan,
211    ) -> (SessionId, Vec<String>) {
212        let sid = self.sessions.next_session_id();
213        let roles = plan.roles().to_vec();
214        self.sessions
215            .open_with_sid_from_plan(sid, plan, &self.config.buffer_config);
216        (sid, roles)
217    }
218
219    fn finalize_open_choreography_session(
220        &mut self,
221        sid: SessionId,
222        roles: &[String],
223        plan: &crate::session::SessionOpenPlan,
224    ) -> Result<(), ProtocolMachineError> {
225        self.next_session_id = self.sessions.next_session_id();
226        self.bind_default_handlers_for_session(sid);
227        self.intern_load_plan_symbols(plan, sid);
228        self.monitor.set_kind(sid, SessionKind::Peer);
229        self.resource_states.entry(sid).or_default();
230        self.apply_open_delta(sid)
231            .map_err(ProtocolMachineError::PersistenceError)?;
232        self.obs_trace.push(
233            ObsEvent::Opened {
234                tick: self.clock.tick,
235                session: sid,
236                roles: roles.to_vec(),
237            },
238            &self.config.observability_retention,
239        );
240        Ok(())
241    }
242
243    fn spawn_coroutine_for_role(
244        &mut self,
245        image: &CodeImage,
246        sid: SessionId,
247        role: &str,
248    ) -> Result<(), ProtocolMachineError> {
249        if self.coroutines.len() >= self.config.max_coroutines {
250            return Err(ProtocolMachineError::TooManyCoroutines {
251                max: self.config.max_coroutines,
252            });
253        }
254
255        let program_id = self
256            .programs
257            .intern(image.programs.get(role).cloned().unwrap_or_default());
258        if self.code.is_none() {
259            let program = self
260                .programs
261                .get(program_id)
262                .expect("interned program must exist")
263                .clone();
264            self.code = Some(program);
265        }
266
267        let coro_id = self.next_coro_id;
268        self.next_coro_id += 1;
269
270        let endpoint = Endpoint {
271            sid,
272            role: role.to_string(),
273        };
274        self.role_coroutines
275            .entry(role.to_string())
276            .or_default()
277            .push(coro_id);
278        if self.paused_roles.contains(role) {
279            self.paused_coro_ids.insert(coro_id);
280        }
281        if self.timed_out_sites.contains_key(role) {
282            self.timed_out_coro_ids.insert(coro_id);
283        }
284        let mut coro = Coroutine::new(
285            coro_id,
286            program_id,
287            sid,
288            role.to_string(),
289            self.config.num_registers,
290            self.config.initial_cost_budget,
291        );
292        coro.owned_endpoints.push(endpoint.clone());
293        if !coro.regs.is_empty() {
294            coro.regs[0] = Value::Endpoint(endpoint);
295        }
296        self.sched.add_ready(coro_id);
297        self.coroutines.push(coro);
298        self.coro_slots.insert(coro_id, self.coroutines.len() - 1);
299        self.sync_ready_eligibility_for(coro_id);
300        Ok(())
301    }
302
303    fn spawn_session_coroutines(
304        &mut self,
305        image: &CodeImage,
306        sid: SessionId,
307        roles: &[String],
308    ) -> Result<(), ProtocolMachineError> {
309        for role in roles {
310            self.spawn_coroutine_for_role(image, sid, role)?;
311        }
312        Ok(())
313    }
314
315    /// Runtime open primitive for a verified code image.
316    ///
317    /// Creates a session (with local types), spawns coroutines per role,
318    /// and returns the session ID. Type state is initialized in the
319    /// session store with no separate monitor object.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if session or coroutine limits are exceeded.
324    #[doc(hidden)]
325    pub fn load_choreography(&mut self, image: &CodeImage) -> Result<SessionId, ProtocolMachineError> {
326        self.ensure_session_capacity()?;
327        image
328            .validate_runtime_shape()
329            .map_err(|reason| ProtocolMachineError::InvalidCodeImage { reason })?;
330        let plan = self.session_open_plan(image).clone();
331        let (sid, roles) = self.open_choreography_session(&plan);
332        self.finalize_open_choreography_session(sid, &roles, &plan)?;
333        self.programs.reserve(image.programs.len());
334        self.coroutines.reserve(roles.len());
335        self.spawn_session_coroutines(image, sid, &roles)?;
336        Ok(sid)
337    }
338}