1use std::collections::BTreeMap;
9
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use telltale_types::de_bruijn::LocalTypeRDB;
13use thiserror::Error;
14
15use crate::coroutine::{BlockReason, CoroStatus, Coroutine};
16use crate::output_condition::OutputConditionCheck;
17use crate::scheduler::Scheduler;
18use crate::session::{SessionState, SessionStatus, SessionStore};
19use crate::{
20 protocol_machine_semantic_objects, semantic_audit_log_v1, DelegationAuditRecord,
21 EffectExchangeRecord, ObsEvent, OperationInstance, OutstandingEffect, ProgressContract,
22 ProgressTransition, ProtocolMachineSemanticObjects, SemanticAuditRecord,
23};
24
// Compile-time guard: the build fails on any target where `usize` is wider
// than `u64`. Presumably this backs the `usize` -> `u64` conversions done by
// `checked_u64` in this module.
const _: () = {
    assert!(usize::BITS <= u64::BITS);
};
28
29#[derive(Debug, Error, Clone, PartialEq, Eq)]
31pub enum RefinementSliceError {
32 #[error("refinement slice field '{field}' value {value} exceeds u64")]
34 CountOverflow {
35 field: &'static str,
37 value: usize,
39 },
40}
41
42fn checked_u64(field: &'static str, value: usize) -> Result<u64, RefinementSliceError> {
43 u64::try_from(value).map_err(|_| RefinementSliceError::CountOverflow { field, value })
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48pub struct CoroutineRefinementSlice {
49 pub coro_id: u64,
51 pub session_id: u64,
53 pub pc: u64,
55 pub status: String,
57 pub owned_endpoints: u64,
59 pub progress_tokens: u64,
61}
62
63impl CoroutineRefinementSlice {
64 pub(crate) fn from_coroutine(coro: &Coroutine) -> Result<Self, RefinementSliceError> {
65 Ok(Self {
66 coro_id: checked_u64("coroutine.id", coro.id)?,
67 session_id: checked_u64("coroutine.session_id", coro.session_id)?,
68 pc: checked_u64("coroutine.pc", coro.pc)?,
69 status: coro_status_tag(&coro.status).to_string(),
70 owned_endpoints: checked_u64("coroutine.owned_endpoints", coro.owned_endpoints.len())?,
71 progress_tokens: checked_u64("coroutine.progress_tokens", coro.progress_tokens.len())?,
72 })
73 }
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78pub struct SessionRefinementSlice {
79 pub sid: u64,
81 pub role_count: u64,
83 pub local_type_entries: u64,
85 pub buffer_edges: u64,
87 pub buffered_messages: u64,
89 pub status: String,
91 pub epoch: u64,
93}
94
95impl SessionRefinementSlice {
96 pub(crate) fn from_session(session: &SessionState) -> Result<Self, RefinementSliceError> {
97 let buffered_messages = session.buffers.values().try_fold(0_u64, |acc, buffer| {
98 Ok::<_, RefinementSliceError>(
99 acc + checked_u64("session.buffered_messages", buffer.len())?,
100 )
101 })?;
102 Ok(Self {
103 sid: checked_u64("session.sid", session.sid)?,
104 role_count: checked_u64("session.role_count", session.roles.len())?,
105 local_type_entries: checked_u64(
106 "session.local_type_entries",
107 session.local_types.len(),
108 )?,
109 buffer_edges: checked_u64("session.buffer_edges", session.buffers.len())?,
110 buffered_messages,
111 status: session_status_tag(&session.status).to_string(),
112 epoch: checked_u64("session.epoch", session.epoch)?,
113 })
114 }
115}
116
117#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
119pub struct SchedulerRefinementSlice {
120 pub ready_queue: Vec<u64>,
122 pub blocked: BTreeMap<u64, String>,
124 pub step_count: u64,
126}
127
128impl SchedulerRefinementSlice {
129 pub(crate) fn from_scheduler(scheduler: &Scheduler) -> Result<Self, RefinementSliceError> {
130 let ready_queue = scheduler
131 .ready_snapshot()
132 .into_iter()
133 .map(|id| checked_u64("scheduler.ready_queue", id))
134 .collect::<Result<Vec<_>, _>>()?;
135 let blocked = scheduler
136 .blocked_snapshot()
137 .into_iter()
138 .map(|(id, reason)| {
139 Ok::<_, RefinementSliceError>((
140 checked_u64("scheduler.blocked", id)?,
141 block_reason_tag(&reason).to_string(),
142 ))
143 })
144 .collect::<Result<BTreeMap<_, _>, _>>()?;
145 Ok(Self {
146 ready_queue,
147 blocked,
148 step_count: checked_u64("scheduler.step_count", scheduler.step_count())?,
149 })
150 }
151}
152
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
155pub struct TransitionRefinementSummary {
156 pub selected_coro: Option<u64>,
158 pub selected_pc: Option<u64>,
160 pub selected_type: Option<Value>,
162 pub exec_status: Option<String>,
164 pub session_type_counts: BTreeMap<u64, u64>,
166 pub buffered_message_counts: BTreeMap<u64, u64>,
168 pub ready_queue: Vec<u64>,
170 pub blocked: BTreeMap<u64, String>,
172}
173
174impl TransitionRefinementSummary {
175 pub(crate) fn from_runtime_state(
176 coroutines: &[Coroutine],
177 sessions: &SessionStore,
178 scheduler: &Scheduler,
179 last_sched_step: Option<&crate::SchedStepDebug>,
180 ) -> Result<Self, RefinementSliceError> {
181 let session_slices = sessions
182 .iter()
183 .map(SessionRefinementSlice::from_session)
184 .collect::<Result<Vec<_>, _>>()?;
185 let scheduler_slice = SchedulerRefinementSlice::from_scheduler(scheduler)?;
186 let session_type_counts = session_slices
187 .iter()
188 .map(|session| (session.sid, session.local_type_entries))
189 .collect();
190 let buffered_message_counts = session_slices
191 .iter()
192 .map(|session| (session.sid, session.buffered_messages))
193 .collect();
194 Ok(Self {
195 selected_coro: last_sched_step
196 .map(|step| checked_u64("transition.selected_coro", step.selected_coro))
197 .transpose()?,
198 selected_pc: selected_pc(coroutines, last_sched_step)?,
199 selected_type: selected_type_json(coroutines, sessions, last_sched_step)?,
200 exec_status: last_sched_step
201 .map(|step| sched_exec_status_tag(step.exec_status).to_string()),
202 session_type_counts,
203 buffered_message_counts,
204 ready_queue: scheduler_slice.ready_queue,
205 blocked: scheduler_slice.blocked,
206 })
207 }
208}
209
210#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
212pub struct ClaimedRuntimeCoreBundle {
213 pub state: ProtocolMachineRefinementSlice,
215 pub transition: TransitionRefinementSummary,
217}
218
219impl ClaimedRuntimeCoreBundle {
220 pub(crate) fn from_runtime_state(
221 coroutines: &[Coroutine],
222 sessions: &SessionStore,
223 scheduler: &Scheduler,
224 last_sched_step: Option<&crate::SchedStepDebug>,
225 ) -> Result<Self, RefinementSliceError> {
226 let state = cooperative_refinement_slice(coroutines, sessions, scheduler)?;
227 let transition = TransitionRefinementSummary::from_runtime_state(
228 coroutines,
229 sessions,
230 scheduler,
231 last_sched_step,
232 )?;
233 Ok(Self { state, transition })
234 }
235}
236
237#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
243pub struct RuntimeObservationBundle {
244 pub state: ProtocolMachineRefinementSlice,
246 pub transition: TransitionRefinementSummary,
248 pub semantic_audit: Vec<SemanticAuditRecord>,
250 pub effect_exchanges: Vec<EffectExchangeRecord>,
252 pub output_condition_checks: Vec<OutputConditionCheck>,
254 pub semantic_objects: ProtocolMachineSemanticObjects,
256}
257
258impl RuntimeObservationBundle {
259 #[allow(clippy::too_many_arguments)]
260 pub(crate) fn from_runtime_state(
261 coroutines: &[Coroutine],
262 sessions: &SessionStore,
263 scheduler: &Scheduler,
264 last_sched_step: Option<&crate::SchedStepDebug>,
265 authority_audit_log: &[crate::AuthorityAuditRecord],
266 delegation_audit_log: &[DelegationAuditRecord],
267 operation_instances: &[OperationInstance],
268 obs_trace: &[ObsEvent],
269 outstanding_effects: &[OutstandingEffect],
270 output_condition_checks: &[OutputConditionCheck],
271 progress_contracts: &[ProgressContract],
272 progress_transitions: &[ProgressTransition],
273 effect_exchanges: &[EffectExchangeRecord],
274 ) -> Result<Self, RefinementSliceError> {
275 let core = ClaimedRuntimeCoreBundle::from_runtime_state(
276 coroutines,
277 sessions,
278 scheduler,
279 last_sched_step,
280 )?;
281 let semantic_audit = semantic_audit_log_v1(
282 authority_audit_log,
283 delegation_audit_log,
284 operation_instances,
285 obs_trace,
286 outstanding_effects,
287 progress_contracts,
288 progress_transitions,
289 );
290 let semantic_objects = protocol_machine_semantic_objects(
291 authority_audit_log,
292 delegation_audit_log,
293 operation_instances,
294 outstanding_effects,
295 output_condition_checks,
296 progress_contracts,
297 progress_transitions,
298 );
299 Ok(Self {
300 state: core.state,
301 transition: core.transition,
302 semantic_audit,
303 effect_exchanges: effect_exchanges.to_vec(),
304 output_condition_checks: output_condition_checks.to_vec(),
305 semantic_objects,
306 })
307 }
308}
309
310fn selected_pc(
311 coroutines: &[Coroutine],
312 last_sched_step: Option<&crate::SchedStepDebug>,
313) -> Result<Option<u64>, RefinementSliceError> {
314 let Some(step) = last_sched_step else {
315 return Ok(None);
316 };
317 coroutines
318 .iter()
319 .find(|coro| coro.id == step.selected_coro)
320 .map(|coro| checked_u64("transition.selected_pc", coro.pc))
321 .transpose()
322}
323
324fn selected_type_json(
325 coroutines: &[Coroutine],
326 sessions: &SessionStore,
327 last_sched_step: Option<&crate::SchedStepDebug>,
328) -> Result<Option<Value>, RefinementSliceError> {
329 let Some(step) = last_sched_step else {
330 return Ok(None);
331 };
332 let Some(coro) = coroutines.iter().find(|coro| coro.id == step.selected_coro) else {
333 return Ok(None);
334 };
335 let Some(endpoint) = coro.owned_endpoints.first() else {
336 return Ok(None);
337 };
338 let Some(session) = sessions.get(endpoint.sid) else {
339 return Ok(None);
340 };
341 let Some(entry) = session.local_types.get(endpoint) else {
342 return Ok(None);
343 };
344 Ok(Some(Value::String(runtime_local_type_repr(&entry.current))))
345}
346
347fn runtime_local_type_repr(local_type: &telltale_types::LocalTypeR) -> String {
348 fn render(db: &LocalTypeRDB) -> String {
349 match db {
350 LocalTypeRDB::End => "LocalType.end_".to_string(),
351 LocalTypeRDB::Send { partner, branches } => format!(
352 "LocalType.select {:?} [{}]",
353 partner,
354 branches
355 .iter()
356 .map(|(label, _, cont)| format!("({:?}, {})", label.name, render(cont)))
357 .collect::<Vec<_>>()
358 .join(", ")
359 ),
360 LocalTypeRDB::Recv { partner, branches } => format!(
361 "LocalType.branch {:?} [{}]",
362 partner,
363 branches
364 .iter()
365 .map(|(label, _, cont)| format!("({:?}, {})", label.name, render(cont)))
366 .collect::<Vec<_>>()
367 .join(", ")
368 ),
369 LocalTypeRDB::Rec(body) => format!("LocalType.mu {}", render(body)),
370 LocalTypeRDB::Var(index) => format!("LocalType.var {index}"),
371 }
372 }
373
374 render(&LocalTypeRDB::from(local_type))
375}
376
377#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
379pub struct ProtocolMachineRefinementSlice {
380 pub coroutines: Vec<CoroutineRefinementSlice>,
382 pub sessions: Vec<SessionRefinementSlice>,
384 pub scheduler: SchedulerRefinementSlice,
386}
387
388pub(crate) fn block_reason_tag(reason: &BlockReason) -> &'static str {
389 match reason {
390 BlockReason::Recv { .. } => "recv_wait",
391 BlockReason::Send { .. } => "send_wait",
392 BlockReason::Invoke { .. } => "invoke_wait",
393 BlockReason::AcquireDenied { .. } => "acquire_denied",
394 BlockReason::Consensus { .. } => "consensus_wait",
395 BlockReason::Spawn => "spawn_wait",
396 BlockReason::Close { .. } => "close_wait",
397 }
398}
399
400pub(crate) fn coro_status_tag(status: &CoroStatus) -> &'static str {
401 match status {
402 CoroStatus::Ready => "ready",
403 CoroStatus::Blocked(_) => "blocked",
404 CoroStatus::Done => "done",
405 CoroStatus::Faulted(_) => "faulted",
406 CoroStatus::Speculating => "speculating",
407 }
408}
409
410pub(crate) fn session_status_tag(status: &SessionStatus) -> &'static str {
411 match status {
412 SessionStatus::Active => "active",
413 SessionStatus::Draining => "draining",
414 SessionStatus::Closed => "closed",
415 SessionStatus::Cancelled => "cancelled",
416 SessionStatus::Faulted { .. } => "faulted",
417 }
418}
419
420pub(crate) fn sched_exec_status_tag(status: crate::SchedExecStatus) -> &'static str {
421 match status {
422 crate::SchedExecStatus::Continue => "continue",
423 crate::SchedExecStatus::Yielded => "yielded",
424 crate::SchedExecStatus::Blocked => "blocked",
425 crate::SchedExecStatus::Halted => "halted",
426 crate::SchedExecStatus::Faulted => "faulted",
427 }
428}
429
430pub(crate) fn cooperative_refinement_slice(
431 coroutines: &[Coroutine],
432 sessions: &SessionStore,
433 scheduler: &Scheduler,
434) -> Result<ProtocolMachineRefinementSlice, RefinementSliceError> {
435 let coroutines = coroutines
436 .iter()
437 .map(CoroutineRefinementSlice::from_coroutine)
438 .collect::<Result<Vec<_>, _>>()?;
439 let sessions = sessions
440 .iter()
441 .map(SessionRefinementSlice::from_session)
442 .collect::<Result<Vec<_>, _>>()?;
443 let scheduler = SchedulerRefinementSlice::from_scheduler(scheduler)?;
444 Ok(ProtocolMachineRefinementSlice {
445 coroutines,
446 sessions,
447 scheduler,
448 })
449}
450
#[cfg(test)]
mod tests {
    use super::runtime_local_type_repr;
    use telltale_types::{Label, LocalTypeR};

    /// A receive with a single payload-free branch renders as a
    /// `LocalType.branch` containing only the label name and continuation.
    #[test]
    fn runtime_local_type_repr_erases_payloads_into_lean_shape() {
        let expected = r#"LocalType.branch "B" [("pong", LocalType.end_)]"#;

        let branches = vec![(Label::new("pong"), None, LocalTypeR::End)];
        let local = LocalTypeR::Recv {
            partner: "B".to_string(),
            branches,
        };

        let rendered = runtime_local_type_repr(&local);
        assert_eq!(rendered, expected);
    }

    /// A named recursion variable renders as a numeric de Bruijn index.
    #[test]
    fn runtime_local_type_repr_uses_de_bruijn_recursion_indices() {
        let expected = r#"LocalType.mu LocalType.select "Peer" [("tick", LocalType.var 0)]"#;

        let body = LocalTypeR::send("Peer", Label::new("tick"), LocalTypeR::var("Loop"));
        let local = LocalTypeR::mu("Loop", body);

        let rendered = runtime_local_type_repr(&local);
        assert_eq!(rendered, expected);
    }
}