#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolMachineMemoryUsage {
pub session_store: SessionStoreMemoryUsage,
pub coroutine_records: usize,
pub terminal_coroutines: usize,
pub program_count: usize,
pub program_instruction_count: usize,
pub obs_events: usize,
pub effect_trace_entries: usize,
pub delegation_audits: usize,
pub authority_audits: usize,
pub communication_artifacts: usize,
pub output_condition_checks: usize,
pub retained_bytes: ProtocolMachineRetainedBytes,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolMachineRetainedBytes {
pub session_store: usize,
pub coroutines: usize,
pub programs: usize,
pub resource_states: usize,
pub traces: usize,
pub replay: usize,
pub output_condition_checks: usize,
pub scheduler_and_control: usize,
pub symbols: usize,
pub guard_layer: usize,
pub monitor: usize,
pub arena: usize,
pub total: usize,
}
fn serialized_byte_size<T: Serialize>(value: &T) -> usize {
crate::serialization::binary_size(value)
}
impl ProtocolMachine {
fn intern_load_plan_symbols(&mut self, plan: &crate::session::SessionOpenPlan, sid: SessionId) {
for role in plan.roles() {
let _: StringId = self.role_symbols.intern(role);
}
let _: StringId = self
.handler_symbols
.intern(crate::session::DEFAULT_HANDLER_ID);
let edge_handlers: Vec<_> = self
.sessions
.get(sid)
.map(|session| session.edge_handlers.keys().cloned().collect())
.unwrap_or_default();
for edge in edge_handlers {
let _: EdgeId = self.intern_edge(&edge);
}
}
#[must_use]
pub fn new(config: ProtocolMachineConfig) -> Self {
Self::new_with_models(config)
}
fn bind_default_handlers_for_session(&mut self, sid: SessionId) {
self.sessions
.set_default_handler_for_session(sid, crate::session::DEFAULT_HANDLER_ID.to_string());
self.handler_symbols
.intern(crate::session::DEFAULT_HANDLER_ID);
}
fn ensure_session_capacity(&self) -> Result<(), ProtocolMachineError> {
if self.sessions.active_count() >= self.config.max_sessions {
return Err(ProtocolMachineError::TooManySessions {
max: self.config.max_sessions,
});
}
Ok(())
}
fn coroutine_runtime_eligible(&self, coro_id: usize) -> bool {
let Some(idx) = self.coro_index(coro_id) else {
return false;
};
let role = &self.coroutines[idx].role;
!(self.paused_coro_ids.contains(&coro_id)
|| self.paused_roles.contains(role)
|| self.crashed_sites.contains(role)
|| self.timed_out_coro_ids.contains(&coro_id)
|| self.timed_out_sites.contains_key(role))
}
fn mark_eligibility_dirty(&mut self) {
self.eligibility_dirty = true;
}
fn sync_ready_eligibility_for(&mut self, coro_id: usize) {
let eligible = self.sched.is_ready(coro_id) && self.coroutine_runtime_eligible(coro_id);
let eligibility = if eligible {
crate::scheduler::ReadyEligibility::Eligible
} else {
crate::scheduler::ReadyEligibility::Ineligible
};
self.sched.set_ready_eligibility(coro_id, eligibility);
#[cfg(debug_assertions)]
{
if eligible {
self.eligible_ready.insert(coro_id);
} else {
self.eligible_ready.remove(&coro_id);
}
}
}
fn refresh_ready_eligibility(&mut self) {
self.sched.clear_ready_eligibility();
#[cfg(debug_assertions)]
self.eligible_ready.clear();
for coro_id in self.sched.ready_set_snapshot() {
let eligible = self.coroutine_runtime_eligible(coro_id);
let eligibility = if eligible {
crate::scheduler::ReadyEligibility::Eligible
} else {
crate::scheduler::ReadyEligibility::Ineligible
};
self.sched.set_ready_eligibility(coro_id, eligibility);
#[cfg(debug_assertions)]
if eligible {
self.eligible_ready.insert(coro_id);
}
}
self.eligibility_dirty = false;
}
fn ensure_ready_eligibility(&mut self) {
if self.eligibility_dirty {
self.refresh_ready_eligibility();
}
}
#[cfg(debug_assertions)]
fn debug_assert_ready_eligibility_consistent(&self) {
for coro_id in &self.eligible_ready {
debug_assert!(self.sched.is_ready(*coro_id));
debug_assert!(self.coroutine_runtime_eligible(*coro_id));
}
}
fn sync_communication_consumption_mode(&mut self) {
self.communication_consumption
.set_mode(self.config.communication_replay_mode);
}
fn allocate_send_sequence(&mut self, edge: &Edge) -> u64 {
self.sync_communication_consumption_mode();
self.communication_consumption.allocate_send_sequence(edge)
}
fn consume_receive_identity(
&mut self,
identity: CommunicationIdentity,
) -> Result<CommunicationConsumeResult, CommunicationReplayError> {
self.sync_communication_consumption_mode();
let result = self.communication_consumption.consume_receive(&identity)?;
self.communication_consumption_artifacts.push(
CommunicationConsumptionArtifact {
tick: self.clock.tick,
identity,
mode: result.mode,
pre_root: result.pre_root,
post_root: result.post_root,
},
&self.config.observability_retention,
);
Ok(result)
}
fn session_open_plan(&mut self, image: &CodeImage) -> &crate::session::SessionOpenPlan {
let key = format!("{image:p}");
self.session_open_plans.entry(key).or_insert_with(|| {
crate::session::SessionOpenPlan::new(&image.roles(), &image.local_types)
})
}
fn open_choreography_session(
&mut self,
plan: &crate::session::SessionOpenPlan,
) -> (SessionId, Vec<String>) {
let sid = self.sessions.next_session_id();
let roles = plan.roles().to_vec();
self.sessions
.open_with_sid_from_plan(sid, plan, &self.config.buffer_config);
(sid, roles)
}
fn finalize_open_choreography_session(
&mut self,
sid: SessionId,
roles: &[String],
plan: &crate::session::SessionOpenPlan,
) -> Result<(), ProtocolMachineError> {
self.next_session_id = self.sessions.next_session_id();
self.bind_default_handlers_for_session(sid);
self.intern_load_plan_symbols(plan, sid);
self.monitor.set_kind(sid, SessionKind::Peer);
self.resource_states.entry(sid).or_default();
self.apply_open_delta(sid)
.map_err(ProtocolMachineError::PersistenceError)?;
self.obs_trace.push(
ObsEvent::Opened {
tick: self.clock.tick,
session: sid,
roles: roles.to_vec(),
},
&self.config.observability_retention,
);
Ok(())
}
fn spawn_coroutine_for_role(
&mut self,
image: &CodeImage,
sid: SessionId,
role: &str,
) -> Result<(), ProtocolMachineError> {
if self.coroutines.len() >= self.config.max_coroutines {
return Err(ProtocolMachineError::TooManyCoroutines {
max: self.config.max_coroutines,
});
}
let program_id = self
.programs
.intern(image.programs.get(role).cloned().unwrap_or_default());
if self.code.is_none() {
let program = self
.programs
.get(program_id)
.expect("interned program must exist")
.clone();
self.code = Some(program);
}
let coro_id = self.next_coro_id;
self.next_coro_id += 1;
let endpoint = Endpoint {
sid,
role: role.to_string(),
};
self.role_coroutines
.entry(role.to_string())
.or_default()
.push(coro_id);
if self.paused_roles.contains(role) {
self.paused_coro_ids.insert(coro_id);
}
if self.timed_out_sites.contains_key(role) {
self.timed_out_coro_ids.insert(coro_id);
}
let mut coro = Coroutine::new(
coro_id,
program_id,
sid,
role.to_string(),
self.config.num_registers,
self.config.initial_cost_budget,
);
coro.owned_endpoints.push(endpoint.clone());
if !coro.regs.is_empty() {
coro.regs[0] = Value::Endpoint(endpoint);
}
self.sched.add_ready(coro_id);
self.coroutines.push(coro);
self.coro_slots.insert(coro_id, self.coroutines.len() - 1);
self.sync_ready_eligibility_for(coro_id);
Ok(())
}
fn spawn_session_coroutines(
&mut self,
image: &CodeImage,
sid: SessionId,
roles: &[String],
) -> Result<(), ProtocolMachineError> {
for role in roles {
self.spawn_coroutine_for_role(image, sid, role)?;
}
Ok(())
}
#[doc(hidden)]
pub fn load_choreography(&mut self, image: &CodeImage) -> Result<SessionId, ProtocolMachineError> {
self.ensure_session_capacity()?;
image
.validate_runtime_shape()
.map_err(|reason| ProtocolMachineError::InvalidCodeImage { reason })?;
let plan = self.session_open_plan(image).clone();
let (sid, roles) = self.open_choreography_session(&plan);
self.finalize_open_choreography_session(sid, &roles, &plan)?;
self.programs.reserve(image.programs.len());
self.coroutines.reserve(roles.len());
self.spawn_session_coroutines(image, sid, &roles)?;
Ok(sid)
}
}