impl ProtocolMachine {
/// Borrows the machine's [`SimClock`].
#[must_use]
pub fn clock(&self) -> &SimClock {
&self.clock
}
/// Reports whether the scheduler has neither ready nor blocked entries left.
#[must_use]
pub fn all_done(&self) -> bool {
    let sched = &self.sched;
    // The blocked count is only consulted once nothing is ready.
    if sched.ready_count() != 0 {
        return false;
    }
    sched.blocked_count() == 0
}
/// Looks up a coroutine by its id.
///
/// Returns `None` when the id cannot be resolved to a slot, or when the
/// resolved slot is outside the coroutine storage.
#[must_use]
pub fn coroutine(&self, id: usize) -> Option<&Coroutine> {
    self.coro_index(id)
        .and_then(|slot| self.coroutines.get(slot))
}
/// Returns the length of the program referenced by coroutine `id`.
///
/// Yields `None` when either the coroutine or the program it points at
/// cannot be found.
#[must_use]
pub fn coroutine_program_len(&self, id: usize) -> Option<usize> {
    let program_id = self.coroutine(id)?.program_id;
    let program = self.programs.get(program_id)?;
    Some(program.len())
}
/// Returns the number of entries held by the program store (`self.programs.len()`).
#[must_use]
pub fn unique_program_count(&self) -> usize {
self.programs.len()
}
/// Mutable counterpart of the by-id coroutine lookup.
///
/// Returns `None` when the id cannot be resolved to a slot, or when the
/// resolved slot is outside the coroutine storage.
pub fn coroutine_mut(&mut self, id: usize) -> Option<&mut Coroutine> {
    match self.coro_index(id) {
        Some(slot) => self.coroutines.get_mut(slot),
        None => None,
    }
}
/// Collects references to every coroutine whose `session_id` equals `sid`,
/// in storage order.
#[must_use]
pub fn session_coroutines(&self, sid: SessionId) -> Vec<&Coroutine> {
    let mut members = Vec::new();
    for coro in &self.coroutines {
        if coro.session_id == sid {
            members.push(coro);
        }
    }
    members
}
/// Borrows the machine's [`SessionStore`].
#[must_use]
pub fn sessions(&self) -> &SessionStore {
&self.sessions
}
/// Validates structural invariants of a machine whose state was just decoded.
///
/// Checks, in order: the configuration invariants, the coroutine and session
/// counts against the configured maxima, then per coroutine (unique id, id
/// below `next_coro_id`, referenced session exists) and per session (unique
/// id, id below `next_session_id`).
///
/// # Errors
///
/// Returns a description of the first violated invariant.
pub fn validate_post_decode(&self) -> Result<(), String> {
    self.config.validate_invariants()?;

    let limits = &self.config;
    let coroutine_count = self.coroutines.len();
    if coroutine_count > limits.max_coroutines {
        return Err(format!(
            "decoded coroutine count {} exceeds max_coroutines {}",
            coroutine_count, limits.max_coroutines
        ));
    }
    let session_count = self.sessions.iter().count();
    if session_count > limits.max_sessions {
        return Err(format!(
            "decoded session count {session_count} exceeds max_sessions {}",
            limits.max_sessions
        ));
    }

    // Coroutines: ids must be unique, already allocated, and tied to a live session.
    let mut seen_coro_ids = BTreeSet::new();
    self.coroutines
        .iter()
        .try_for_each(|coro| -> Result<(), String> {
            if !seen_coro_ids.insert(coro.id) {
                return Err(format!("decoded duplicate coroutine id {}", coro.id));
            }
            if coro.id >= self.next_coro_id {
                return Err(format!(
                    "decoded coroutine id {} is not below next_coro_id {}",
                    coro.id, self.next_coro_id
                ));
            }
            if self.sessions.get(coro.session_id).is_none() {
                return Err(format!(
                    "decoded coroutine {} references missing session {}",
                    coro.id, coro.session_id
                ));
            }
            Ok(())
        })?;

    // Sessions: ids must be unique and already allocated.
    let mut seen_session_ids = BTreeSet::new();
    self.sessions
        .iter()
        .try_for_each(|session| -> Result<(), String> {
            if !seen_session_ids.insert(session.sid) {
                return Err(format!("decoded duplicate session id {}", session.sid));
            }
            if session.sid >= self.next_session_id {
                return Err(format!(
                    "decoded session id {} is not below next_session_id {}",
                    session.sid, self.next_session_id
                ));
            }
            Ok(())
        })
}
/// Mutably borrows the machine's [`SessionStore`].
///
/// Hidden from the rendered documentation via `#[doc(hidden)]`.
#[doc(hidden)]
pub fn sessions_mut(&mut self) -> &mut SessionStore {
&mut self.sessions
}
/// Test-only helper: forwards to the program store's `replace_for_test`
/// with the given `program_id` and instruction list.
#[cfg(test)]
fn replace_program_for_test(&mut self, program_id: usize, program: Vec<Instr>) {
self.programs.replace_for_test(program_id, program);
}
/// Well-formedness check for a single coroutine.
///
/// Verifies that the coroutine's session and program exist, that `pc` does
/// not exceed the program length (`pc == len` is accepted), that the register
/// file has exactly `config.num_registers` slots, and that every owned
/// endpoint, progress token and knowledge fact names an existing session and
/// a role listed in that session.
///
/// # Errors
///
/// Returns a description of the first violation found.
#[allow(clippy::too_many_lines)]
fn wf_coroutine_state(&self, coro: &Coroutine) -> Result<(), String> {
    if self.sessions.get(coro.session_id).is_none() {
        return Err(format!(
            "coroutine {} references missing session {}",
            coro.id, coro.session_id
        ));
    }

    let program = match self.programs.get(coro.program_id) {
        Some(found) => found,
        None => return Err(format!("missing program for coroutine {}", coro.id)),
    };
    if coro.pc > program.len() {
        return Err(format!("pc out of bounds for coroutine {}", coro.id));
    }

    let expected_width = usize::from(self.config.num_registers);
    if coro.regs.len() != expected_width {
        return Err(format!("register width mismatch for coroutine {}", coro.id));
    }

    // Owned endpoints must point at a live session that declares the role.
    for ep in &coro.owned_endpoints {
        let role_known = match self.sessions.get(ep.sid) {
            Some(session) => session.roles.iter().any(|role| role == &ep.role),
            None => {
                return Err(format!(
                    "owned endpoint missing session {}:{}",
                    ep.sid, ep.role
                ));
            }
        };
        if !role_known {
            return Err(format!(
                "owned endpoint role not in session {}:{}",
                ep.sid, ep.role
            ));
        }
    }

    // Progress tokens: the session is looked up by `token.sid`, the role is
    // taken from `token.endpoint`.
    for token in &coro.progress_tokens {
        let role_known = match self.sessions.get(token.sid) {
            Some(session) => session
                .roles
                .iter()
                .any(|role| role == &token.endpoint.role),
            None => {
                return Err(format!(
                    "progress token missing session {} for coroutine {}",
                    token.sid, coro.id
                ));
            }
        };
        if !role_known {
            return Err(format!(
                "progress token role not in session {}:{}",
                token.sid, token.endpoint.role
            ));
        }
    }

    // Knowledge facts are validated through their endpoint.
    for fact in &coro.knowledge_set {
        let role_known = match self.sessions.get(fact.endpoint.sid) {
            Some(session) => session.roles.iter().any(|role| role == &fact.endpoint.role),
            None => {
                return Err(format!(
                    "knowledge fact missing session {}:{}",
                    fact.endpoint.sid, fact.endpoint.role
                ));
            }
        };
        if !role_known {
            return Err(format!(
                "knowledge fact role not in session {}:{}",
                fact.endpoint.sid, fact.endpoint.role
            ));
        }
    }

    Ok(())
}
/// Walks every stored session, validating per-session bookkeeping and
/// collecting two id sets.
///
/// The first returned set holds the id of every stored session; the second
/// holds the subset whose status is not `Closed`, `Cancelled` or `Faulted`.
///
/// # Errors
///
/// Fails when a local-type key or a buffer edge carries a session id other
/// than its owning session's, or when a buffer reports more elements than
/// its capacity.
fn wf_collect_session_sets(
    &self,
) -> Result<(BTreeSet<SessionId>, BTreeSet<SessionId>), String> {
    let mut known = BTreeSet::new();
    let mut needs_monitor = BTreeSet::new();

    for session in self.sessions.iter() {
        let sid = session.sid;
        known.insert(sid);

        let terminal = matches!(
            session.status,
            SessionStatus::Closed | SessionStatus::Cancelled | SessionStatus::Faulted { .. }
        );
        if !terminal {
            needs_monitor.insert(sid);
        }

        if let Some(ep) = session.local_types.keys().find(|ep| ep.sid != sid) {
            return Err(format!("local type sid mismatch for role {}", ep.role));
        }

        // Both buffer checks stay in one pass so the first offending entry
        // decides which error is reported.
        for (edge, buffer) in &session.buffers {
            if edge.sid != sid {
                return Err("buffer edge sid mismatch".to_string());
            }
            if buffer.len() > buffer.capacity() {
                return Err("buffer length exceeds capacity".to_string());
            }
        }
    }

    Ok((known, needs_monitor))
}
/// Cross-checks the monitor's per-session kinds against the session sets.
///
/// Every session id tracked in `monitor.session_kinds` must appear in
/// `active_sids`, and every id in `monitor_required_sids` must have an entry
/// in `monitor.session_kinds`.
///
/// # Errors
///
/// Returns a message naming the first offending session id.
fn wf_monitor_state(
    &self,
    active_sids: &BTreeSet<SessionId>,
    monitor_required_sids: &BTreeSet<SessionId>,
) -> Result<(), String> {
    let kinds = &self.monitor.session_kinds;

    if let Some(sid) = kinds.keys().find(|sid| !active_sids.contains(*sid)) {
        return Err(format!("monitor tracks unknown session {sid}"));
    }

    let untracked = monitor_required_sids
        .iter()
        .find(|sid| !kinds.contains_key(*sid));
    match untracked {
        Some(sid) => Err(format!("monitor missing kind for active session {sid}")),
        None => Ok(()),
    }
}
/// Runs the full well-formedness check over the machine state.
///
/// Checks every coroutine, then the per-session bookkeeping, then the
/// monitor's session tracking, and finally the arena invariants.
///
/// # Errors
///
/// Returns a description of the first violation found.
// NOTE(review): this function is short; the `too_many_lines` allow below
// looks like a leftover and can probably be dropped.
#[allow(clippy::too_many_lines)]
pub fn wf_vm_state(&self) -> Result<(), String> {
    self.coroutines
        .iter()
        .try_for_each(|coro| self.wf_coroutine_state(coro))?;

    let (active_sids, monitor_required_sids) = self.wf_collect_session_sets()?;
    self.wf_monitor_state(&active_sids, &monitor_required_sids)?;

    if self.arena.check_invariants() {
        Ok(())
    } else {
        Err("arena invariant violation".to_string())
    }
}
/// Sends `value` from role `from` to role `to` inside session `sid` by
/// calling the session's `send`.
///
/// # Errors
///
/// Returns `ProtocolMachineError::SessionNotFound(sid)` when the session does
/// not exist, and also when the session's `send` fails for any reason.
pub fn inject_message(
    &mut self,
    sid: SessionId,
    from: &str,
    to: &str,
    value: Value,
) -> Result<EnqueueResult, ProtocolMachineError> {
    let Some(session) = self.sessions.get_mut(sid) else {
        return Err(ProtocolMachineError::SessionNotFound(sid));
    };
    // NOTE(review): the underlying send error is discarded and reported as
    // `SessionNotFound`, which hides the real cause — confirm this is intended.
    match session.send(from, to, value) {
        Ok(outcome) => Ok(outcome),
        Err(_) => Err(ProtocolMachineError::SessionNotFound(sid)),
    }
}
/// Borrows all coroutines as a slice, in storage order.
#[must_use]
pub fn coroutines(&self) -> &[Coroutine] {
&self.coroutines
}
/// Marks `role` as paused.
///
/// Does nothing when the role is already paused. Otherwise every coroutine
/// id registered for the role in `role_coroutines` is added to
/// `paused_coro_ids` and has its ready eligibility re-synchronised. Debug
/// builds additionally assert that the paused-coroutine index is consistent.
pub fn pause_role(&mut self, role: &str) {
    let newly_paused = self.paused_roles.insert(role.to_string());
    if newly_paused {
        // Cloned so the id list does not keep `self` borrowed while mutating.
        let affected = self.role_coroutines.get(role).cloned().unwrap_or_default();
        for coro_id in affected {
            self.paused_coro_ids.insert(coro_id);
            self.sync_ready_eligibility_for(coro_id);
        }
        #[cfg(debug_assertions)]
        self.debug_assert_paused_role_index();
    }
}
/// Clears the paused mark on `role`.
///
/// Does nothing when the role was not paused. Otherwise every coroutine id
/// registered for the role in `role_coroutines` is removed from
/// `paused_coro_ids` and has its ready eligibility re-synchronised. Debug
/// builds additionally assert that the paused-coroutine index is consistent.
pub fn resume_role(&mut self, role: &str) {
    let was_paused = self.paused_roles.remove(role);
    if was_paused {
        // Cloned so the id list does not keep `self` borrowed while mutating.
        let affected = self.role_coroutines.get(role).cloned().unwrap_or_default();
        for coro_id in affected {
            self.paused_coro_ids.remove(&coro_id);
            self.sync_ready_eligibility_for(coro_id);
        }
        #[cfg(debug_assertions)]
        self.debug_assert_paused_role_index();
    }
}
/// Makes the set of paused roles equal to `roles`.
///
/// Both deltas are computed against the current state before anything is
/// changed; roles no longer wanted are resumed first, then newly requested
/// roles are paused.
pub fn set_paused_roles(&mut self, roles: &BTreeSet<String>) {
    let newly_requested: Vec<String> = roles
        .iter()
        .filter(|role| !self.paused_roles.contains(*role))
        .cloned()
        .collect();
    let no_longer_wanted: Vec<String> = self
        .paused_roles
        .iter()
        .filter(|role| !roles.contains(*role))
        .cloned()
        .collect();

    for role in &no_longer_wanted {
        self.resume_role(role);
    }
    for role in &newly_requested {
        self.pause_role(role);
    }
}
/// Borrows the set of currently paused role names.
#[must_use]
pub fn paused_roles(&self) -> &BTreeSet<String> {
&self.paused_roles
}
/// Resolves a coroutine id to its position in `self.coroutines`.
///
/// Tries three strategies in order: the `coro_slots` map, then the position
/// equal to `id` itself (accepted only if the coroutine stored there has that
/// id), and finally a linear scan over all coroutines.
fn coro_index(&self, id: usize) -> Option<usize> {
    self.coro_slots
        .get(&id)
        .copied()
        .or_else(|| {
            self.coroutines
                .get(id)
                .filter(|coro| coro.id == id)
                .map(|_| id)
        })
        .or_else(|| self.coroutines.iter().position(|coro| coro.id == id))
}
/// Rebuilds every coroutine-derived index from `self.coroutines`.
///
/// Clears and repopulates `coro_slots` (id to position), `role_coroutines`
/// (role to ids), `paused_coro_ids` (ids whose role is in `paused_roles`)
/// and `timed_out_coro_ids` (ids whose role is a key of `timed_out_sites`).
fn rebuild_coroutine_indexes(&mut self) {
    self.timed_out_coro_ids.clear();
    self.paused_coro_ids.clear();
    self.role_coroutines.clear();
    self.coro_slots.clear();

    for (slot, coro) in self.coroutines.iter().enumerate() {
        let id = coro.id;
        let role = &coro.role;

        self.coro_slots.insert(id, slot);
        self.role_coroutines
            .entry(role.clone())
            .or_default()
            .push(id);

        let role_is_paused = self.paused_roles.contains(role);
        if role_is_paused {
            self.paused_coro_ids.insert(id);
        }
        let role_timed_out = self.timed_out_sites.contains_key(role);
        if role_timed_out {
            self.timed_out_coro_ids.insert(id);
        }
    }
}
/// Debug-only consistency check: `paused_coro_ids` must equal the ids of all
/// coroutines whose role is currently in `paused_roles`.
#[cfg(debug_assertions)]
fn debug_assert_paused_role_index(&self) {
    let mut expected: BTreeSet<usize> = BTreeSet::new();
    for coro in &self.coroutines {
        if self.paused_roles.contains(&coro.role) {
            expected.insert(coro.id);
        }
    }
    debug_assert_eq!(self.paused_coro_ids, expected);
}
/// Reads register `reg` of the coroutine stored at position `coro_idx`.
///
/// Crate-visible wrapper that forwards to `read_reg_checked`.
pub(crate) fn read_reg(&self, coro_idx: usize, reg: u16) -> Result<Value, Fault> {
self.read_reg_checked(coro_idx, reg)
}
/// Stores `value` into register `reg` of `coro`.
///
/// # Errors
///
/// Returns `Fault::OutOfRegisters` when `reg` is outside the coroutine's
/// register file.
pub(crate) fn write_coro_reg(
    coro: &mut Coroutine,
    reg: u16,
    value: Value,
) -> Result<(), Fault> {
    match coro.regs.get_mut(usize::from(reg)) {
        Some(slot) => {
            *slot = value;
            Ok(())
        }
        None => Err(Fault::OutOfRegisters),
    }
}
/// Returns a clone of register `reg` of the coroutine at position `coro_idx`.
///
/// # Errors
///
/// Returns `Fault::OutOfRegisters` when `reg` is outside the register file.
///
/// # Panics
///
/// Indexes `self.coroutines` directly, so an out-of-range `coro_idx` panics.
fn read_reg_checked(&self, coro_idx: usize, reg: u16) -> Result<Value, Fault> {
    let regs = &self.coroutines[coro_idx].regs;
    match regs.get(usize::from(reg)) {
        Some(stored) => Ok(stored.clone()),
        None => Err(Fault::OutOfRegisters),
    }
}
/// Decodes an [`Endpoint`] from register `reg` of the coroutine at position
/// `coro_idx` by delegating to `decode_endpoint_from_reg`.
///
/// # Panics
///
/// Indexes `self.coroutines` directly, so an out-of-range `coro_idx` panics.
fn endpoint_from_reg(&self, coro_idx: usize, reg: u16) -> Result<Endpoint, Fault> {
decode_endpoint_from_reg(&self.coroutines[coro_idx], reg)
}
/// Forwards `value` to `decode_endpoint_fact` and returns its result.
// NOTE(review): the `String` half of the pair is presumably the fact text
// attached to the endpoint — confirm against `decode_endpoint_fact`.
fn decode_fact(value: Value) -> Option<(Endpoint, String)> {
decode_endpoint_fact(value)
}
/// Validates a message payload against the configured payload policy.
///
/// With `PayloadValidationMode::Off` nothing is checked. Otherwise the
/// payload's wire size is compared against `config.max_payload_bytes`, and
/// then its runtime type is compared against `expected_type` when one is
/// given. When no expected type is given, the payload is rejected only in
/// `StrictSchema` mode with `strict_requires_annotation` set.
///
/// `role`, `context` and `label` are only used to build fault messages.
///
/// # Errors
///
/// Returns `Fault::TypeViolation` for an oversized payload, a type mismatch,
/// or a missing annotation in strict mode.
fn validate_payload(
    &self,
    role: &str,
    context: &str,
    label: &str,
    expected_type: Option<&ValType>,
    value: &Value,
    strict_requires_annotation: bool,
) -> Result<(), Fault> {
    let mode = self.config.payload_validation_mode;
    if mode == PayloadValidationMode::Off {
        return Ok(());
    }

    let actual_type = runtime_value_val_type(value);
    let payload_bytes = runtime_value_wire_size_bytes(value);

    // The size limit applies whether or not a type annotation is present.
    if payload_bytes > self.config.max_payload_bytes {
        let message = format!(
            "{role}: {context} payload '{label}' exceeds max_payload_bytes={} (actual={payload_bytes})",
            self.config.max_payload_bytes
        );
        // Without a declared type, the fault reports the actual type on both sides.
        let expected = match expected_type {
            Some(declared) => declared.clone(),
            None => actual_type.clone(),
        };
        return Err(Fault::TypeViolation {
            expected,
            actual: actual_type,
            message,
        });
    }

    let Some(expected) = expected_type else {
        if mode == PayloadValidationMode::StrictSchema && strict_requires_annotation {
            return Err(Fault::TypeViolation {
                expected: ValType::Unit,
                actual: actual_type,
                message: format!(
                    "{role}: {context} payload '{label}' requires explicit ValType annotation in strict_schema mode"
                ),
            });
        }
        return Ok(());
    };

    if runtime_value_matches_val_type(value, expected) {
        return Ok(());
    }
    Err(Fault::TypeViolation {
        expected: expected.clone(),
        actual: actual_type,
        message: format!(
            "{role}: {context} payload '{label}' violated expected type {expected:?}"
        ),
    })
}
/// Destructures a `LocalTypeR::Recv`, returning its partner name and branches.
///
/// # Errors
///
/// Returns `Fault::TypeViolation` (with placeholder `Unit` types) when
/// `local_type` is any other variant; `role` only appears in the message.
fn expect_recv_type<'a>(
    local_type: &'a LocalTypeR,
    role: &str,
) -> Result<(&'a str, &'a BranchList), Fault> {
    if let LocalTypeR::Recv {
        partner, branches, ..
    } = local_type
    {
        return Ok((partner.as_str(), branches));
    }

    let other = local_type;
    Err(Fault::TypeViolation {
        expected: telltale_types::ValType::Unit,
        actual: telltale_types::ValType::Unit,
        message: format!("{role}: Choose expects Recv, got {other:?}"),
    })
}
fn monitor_precheck(
&mut self,
ep: &Endpoint,
role: &str,
instr: &crate::instr::Instr,
) -> Result<(), Fault> {
if self.config.monitor_mode == MonitorMode::Off {
return Ok(());
}
match instr {
crate::instr::Instr::Send { .. } | crate::instr::Instr::Offer { .. } => {
let local_type =
self.sessions
.lookup_type(ep)
.ok_or_else(|| Fault::TypeViolation {
expected: telltale_types::ValType::Unit,
actual: telltale_types::ValType::Unit,
message: format!("[monitor] {role}: no type registered"),
})?;
if matches!(local_type, LocalTypeR::Send { .. }) {
Ok(())
} else {
Err(Fault::TypeViolation {
expected: telltale_types::ValType::Unit,
actual: telltale_types::ValType::Unit,
message: format!(
"[monitor] {role}: expected Send state, got {local_type:?}"
),
})
}
}
crate::instr::Instr::Receive { .. } => {
let local_type =
self.sessions
.lookup_type(ep)
.ok_or_else(|| Fault::TypeViolation {
expected: telltale_types::ValType::Unit,
actual: telltale_types::ValType::Unit,
message: format!("[monitor] {role}: no type registered"),
})?;
if matches!(local_type, LocalTypeR::Recv { .. }) {
Ok(())
} else {
Err(Fault::TypeViolation {
expected: telltale_types::ValType::Unit,
actual: telltale_types::ValType::Unit,
message: format!(
"[monitor] {role}: expected Recv state, got {local_type:?}"
),
})
}
}
crate::instr::Instr::Choose { table, .. } => {
let mut labels = BTreeSet::new();
if !table
.iter()
.map(|(label, _)| label)
.all(|label| labels.insert(label.clone()))
{
return Err(Fault::Speculation {
message: "[monitor] structural precheck failed: duplicate choose labels"
.to_string(),
});
}
let local_type =
self.sessions
.lookup_type(ep)
.ok_or_else(|| Fault::TypeViolation {
expected: telltale_types::ValType::Unit,
actual: telltale_types::ValType::Unit,
message: format!("[monitor] {role}: no type registered"),
})?;
if matches!(local_type, LocalTypeR::Recv { .. }) {
Ok(())
} else {
Err(Fault::TypeViolation {
expected: telltale_types::ValType::Unit,
actual: telltale_types::ValType::Unit,
message: format!(
"[monitor] {role}: expected Recv state, got {local_type:?}"
),
})
}
}
crate::instr::Instr::Open { roles, dsts, .. } => {
if roles.len() == dsts.len() {
Ok(())
} else {
Err(Fault::Speculation {
message: "[monitor] structural precheck failed: open arity mismatch"
.to_string(),
})
}
}
_ => Ok(()),
}?;
self.monitor
.record(ep, &format!("{instr:?}"), self.clock.tick);
Ok(())
}
}