telltale_machine/engine/runtime_exec/
core.rs1#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
/// Memory-usage report for a protocol machine: per-category entry counts plus a
/// retained-byte breakdown.
///
/// NOTE(review): the code that fills in this report is outside this view; the
/// per-field notes below are inferred from field names and types only — confirm
/// them against the producer.
pub struct ProtocolMachineMemoryUsage {
    /// Usage summary reported for the session store.
    pub session_store: SessionStoreMemoryUsage,
    /// Presumably the number of coroutine records held by the machine.
    pub coroutine_records: usize,
    /// Presumably how many of those coroutines are in a terminal state.
    pub terminal_coroutines: usize,
    /// Presumably the number of distinct programs stored.
    pub program_count: usize,
    /// Presumably the instruction count summed over stored programs.
    pub program_instruction_count: usize,
    /// Presumably the number of retained observability events.
    pub obs_events: usize,
    /// Presumably the number of retained effect-trace entries.
    pub effect_trace_entries: usize,
    /// Presumably the number of retained delegation audit records.
    pub delegation_audits: usize,
    /// Presumably the number of retained authority audit records.
    pub authority_audits: usize,
    /// Presumably the number of retained communication artifacts.
    pub communication_artifacts: usize,
    /// Presumably the number of retained output-condition check records.
    pub output_condition_checks: usize,
    /// Byte-level breakdown accompanying the counts above.
    pub retained_bytes: ProtocolMachineRetainedBytes,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
/// Retained-byte figures per machine component, plus an overall `total`.
///
/// NOTE(review): how each figure is measured (and whether `total` is exactly the
/// sum of the other fields) is decided by code outside this view; the per-field
/// notes are inferred from field names only — confirm against the producer.
pub struct ProtocolMachineRetainedBytes {
    /// Bytes attributed to the session store.
    pub session_store: usize,
    /// Bytes attributed to coroutine state.
    pub coroutines: usize,
    /// Bytes attributed to stored programs.
    pub programs: usize,
    /// Bytes attributed to resource states.
    pub resource_states: usize,
    /// Bytes attributed to retained traces.
    pub traces: usize,
    /// Bytes attributed to replay state.
    pub replay: usize,
    /// Bytes attributed to output-condition check records.
    pub output_condition_checks: usize,
    /// Bytes attributed to scheduler and control state.
    pub scheduler_and_control: usize,
    /// Bytes attributed to symbol tables.
    pub symbols: usize,
    /// Bytes attributed to the guard layer.
    pub guard_layer: usize,
    /// Bytes attributed to the monitor.
    pub monitor: usize,
    /// Bytes attributed to the arena.
    pub arena: usize,
    /// Overall retained-byte figure.
    pub total: usize,
}
59
60fn serialized_byte_size<T: Serialize>(value: &T) -> usize {
61 crate::serialization::binary_size(value)
62}
63
64impl ProtocolMachine {
65 fn intern_load_plan_symbols(&mut self, plan: &crate::session::SessionOpenPlan, sid: SessionId) {
66 for role in plan.roles() {
67 let _: StringId = self.role_symbols.intern(role);
68 }
69 let _: StringId = self
70 .handler_symbols
71 .intern(crate::session::DEFAULT_HANDLER_ID);
72 let edge_handlers: Vec<_> = self
73 .sessions
74 .get(sid)
75 .map(|session| session.edge_handlers.keys().cloned().collect())
76 .unwrap_or_default();
77 for edge in edge_handlers {
78 let _: EdgeId = self.intern_edge(&edge);
79 }
80 }
81
#[must_use]
/// Creates a protocol machine from `config` by delegating to `Self::new_with_models`.
pub fn new(config: ProtocolMachineConfig) -> Self {
    Self::new_with_models(config)
}
87
88 fn bind_default_handlers_for_session(&mut self, sid: SessionId) {
89 self.sessions
90 .set_default_handler_for_session(sid, crate::session::DEFAULT_HANDLER_ID.to_string());
91 self.handler_symbols
92 .intern(crate::session::DEFAULT_HANDLER_ID);
93 }
94
95 fn ensure_session_capacity(&self) -> Result<(), ProtocolMachineError> {
96 if self.sessions.active_count() >= self.config.max_sessions {
97 return Err(ProtocolMachineError::TooManySessions {
98 max: self.config.max_sessions,
99 });
100 }
101 Ok(())
102 }
103
104 fn coroutine_runtime_eligible(&self, coro_id: usize) -> bool {
105 let Some(idx) = self.coro_index(coro_id) else {
106 return false;
107 };
108 let role = &self.coroutines[idx].role;
109 !(self.paused_coro_ids.contains(&coro_id)
110 || self.paused_roles.contains(role)
111 || self.crashed_sites.contains(role)
112 || self.timed_out_coro_ids.contains(&coro_id)
113 || self.timed_out_sites.contains_key(role))
114 }
115
/// Sets the `eligibility_dirty` flag; `ensure_ready_eligibility` rebuilds the
/// ready-eligibility state when it finds the flag set.
fn mark_eligibility_dirty(&mut self) {
    self.eligibility_dirty = true;
}
119
120 fn sync_ready_eligibility_for(&mut self, coro_id: usize) {
121 let eligible = self.sched.is_ready(coro_id) && self.coroutine_runtime_eligible(coro_id);
122 let eligibility = if eligible {
123 crate::scheduler::ReadyEligibility::Eligible
124 } else {
125 crate::scheduler::ReadyEligibility::Ineligible
126 };
127 self.sched.set_ready_eligibility(coro_id, eligibility);
128 #[cfg(debug_assertions)]
129 {
130 if eligible {
131 self.eligible_ready.insert(coro_id);
132 } else {
133 self.eligible_ready.remove(&coro_id);
134 }
135 }
136 }
137
138 fn refresh_ready_eligibility(&mut self) {
139 self.sched.clear_ready_eligibility();
140 #[cfg(debug_assertions)]
141 self.eligible_ready.clear();
142 for coro_id in self.sched.ready_set_snapshot() {
143 let eligible = self.coroutine_runtime_eligible(coro_id);
144 let eligibility = if eligible {
145 crate::scheduler::ReadyEligibility::Eligible
146 } else {
147 crate::scheduler::ReadyEligibility::Ineligible
148 };
149 self.sched.set_ready_eligibility(coro_id, eligibility);
150 #[cfg(debug_assertions)]
151 if eligible {
152 self.eligible_ready.insert(coro_id);
153 }
154 }
155 self.eligibility_dirty = false;
156 }
157
158 fn ensure_ready_eligibility(&mut self) {
159 if self.eligibility_dirty {
160 self.refresh_ready_eligibility();
161 }
162 }
163
164 #[cfg(debug_assertions)]
165 fn debug_assert_ready_eligibility_consistent(&self) {
166 for coro_id in &self.eligible_ready {
167 debug_assert!(self.sched.is_ready(*coro_id));
168 debug_assert!(self.coroutine_runtime_eligible(*coro_id));
169 }
170 }
171
172 fn sync_communication_consumption_mode(&mut self) {
173 self.communication_consumption
174 .set_mode(self.config.communication_replay_mode);
175 }
176
177 fn allocate_send_sequence(&mut self, edge: &Edge) -> u64 {
178 self.sync_communication_consumption_mode();
179 self.communication_consumption.allocate_send_sequence(edge)
180 }
181
182 fn consume_receive_identity(
183 &mut self,
184 identity: CommunicationIdentity,
185 ) -> Result<CommunicationConsumeResult, CommunicationReplayError> {
186 self.sync_communication_consumption_mode();
187 let result = self.communication_consumption.consume_receive(&identity)?;
188 self.communication_consumption_artifacts.push(
189 CommunicationConsumptionArtifact {
190 tick: self.clock.tick,
191 identity,
192 mode: result.mode,
193 pre_root: result.pre_root,
194 post_root: result.post_root,
195 },
196 &self.config.observability_retention,
197 );
198 Ok(result)
199 }
200
201 fn session_open_plan(&mut self, image: &CodeImage) -> &crate::session::SessionOpenPlan {
202 let key = format!("{image:p}");
203 self.session_open_plans.entry(key).or_insert_with(|| {
204 crate::session::SessionOpenPlan::new(&image.roles(), &image.local_types)
205 })
206 }
207
208 fn open_choreography_session(
209 &mut self,
210 plan: &crate::session::SessionOpenPlan,
211 ) -> (SessionId, Vec<String>) {
212 let sid = self.sessions.next_session_id();
213 let roles = plan.roles().to_vec();
214 self.sessions
215 .open_with_sid_from_plan(sid, plan, &self.config.buffer_config);
216 (sid, roles)
217 }
218
219 fn finalize_open_choreography_session(
220 &mut self,
221 sid: SessionId,
222 roles: &[String],
223 plan: &crate::session::SessionOpenPlan,
224 ) -> Result<(), ProtocolMachineError> {
225 self.next_session_id = self.sessions.next_session_id();
226 self.bind_default_handlers_for_session(sid);
227 self.intern_load_plan_symbols(plan, sid);
228 self.monitor.set_kind(sid, SessionKind::Peer);
229 self.resource_states.entry(sid).or_default();
230 self.apply_open_delta(sid)
231 .map_err(ProtocolMachineError::PersistenceError)?;
232 self.obs_trace.push(
233 ObsEvent::Opened {
234 tick: self.clock.tick,
235 session: sid,
236 roles: roles.to_vec(),
237 },
238 &self.config.observability_retention,
239 );
240 Ok(())
241 }
242
243 fn spawn_coroutine_for_role(
244 &mut self,
245 image: &CodeImage,
246 sid: SessionId,
247 role: &str,
248 ) -> Result<(), ProtocolMachineError> {
249 if self.coroutines.len() >= self.config.max_coroutines {
250 return Err(ProtocolMachineError::TooManyCoroutines {
251 max: self.config.max_coroutines,
252 });
253 }
254
255 let program_id = self
256 .programs
257 .intern(image.programs.get(role).cloned().unwrap_or_default());
258 if self.code.is_none() {
259 let program = self
260 .programs
261 .get(program_id)
262 .expect("interned program must exist")
263 .clone();
264 self.code = Some(program);
265 }
266
267 let coro_id = self.next_coro_id;
268 self.next_coro_id += 1;
269
270 let endpoint = Endpoint {
271 sid,
272 role: role.to_string(),
273 };
274 self.role_coroutines
275 .entry(role.to_string())
276 .or_default()
277 .push(coro_id);
278 if self.paused_roles.contains(role) {
279 self.paused_coro_ids.insert(coro_id);
280 }
281 if self.timed_out_sites.contains_key(role) {
282 self.timed_out_coro_ids.insert(coro_id);
283 }
284 let mut coro = Coroutine::new(
285 coro_id,
286 program_id,
287 sid,
288 role.to_string(),
289 self.config.num_registers,
290 self.config.initial_cost_budget,
291 );
292 coro.owned_endpoints.push(endpoint.clone());
293 if !coro.regs.is_empty() {
294 coro.regs[0] = Value::Endpoint(endpoint);
295 }
296 self.sched.add_ready(coro_id);
297 self.coroutines.push(coro);
298 self.coro_slots.insert(coro_id, self.coroutines.len() - 1);
299 self.sync_ready_eligibility_for(coro_id);
300 Ok(())
301 }
302
303 fn spawn_session_coroutines(
304 &mut self,
305 image: &CodeImage,
306 sid: SessionId,
307 roles: &[String],
308 ) -> Result<(), ProtocolMachineError> {
309 for role in roles {
310 self.spawn_coroutine_for_role(image, sid, role)?;
311 }
312 Ok(())
313 }
314
315 #[doc(hidden)]
325 pub fn load_choreography(&mut self, image: &CodeImage) -> Result<SessionId, ProtocolMachineError> {
326 self.ensure_session_capacity()?;
327 image
328 .validate_runtime_shape()
329 .map_err(|reason| ProtocolMachineError::InvalidCodeImage { reason })?;
330 let plan = self.session_open_plan(image).clone();
331 let (sid, roles) = self.open_choreography_session(&plan);
332 self.finalize_open_choreography_session(sid, &roles, &plan)?;
333 self.programs.reserve(image.programs.len());
334 self.coroutines.reserve(roles.len());
335 self.spawn_session_coroutines(image, sid, &roles)?;
336 Ok(sid)
337 }
338}