pub type LaneId = usize;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneSelection {
pub tick: u64,
pub wave: u64,
pub coro_id: usize,
pub session: SessionId,
pub lane: LaneId,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneHandoff {
pub handoff_id: u64,
pub tick: u64,
pub session: SessionId,
pub endpoint_role: String,
pub from_coro: usize,
pub to_coro: usize,
pub from_lane: LaneId,
pub to_lane: LaneId,
pub receipt: DelegationReceipt,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneSchedulerState {
pub lane_count: usize,
pub lane_queues: BTreeMap<LaneId, Vec<usize>>,
pub blocked: BTreeMap<usize, BlockReason>,
pub step_count: usize,
}
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct WaveCertificate {
pub waves: Vec<Vec<usize>>,
pub planner_step: usize,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContentionMetrics {
pub lock_contention_events: u64,
pub mutex_lock_contention_events: u64,
pub planner_conflict_events: u64,
pub max_ready_queue_depth: usize,
pub max_wave_width: usize,
pub cross_lane_transfer_count: u64,
pub handoff_applied_count: u64,
}
impl ContentionMetrics {
fn observe_ready_depth(&mut self, depth: usize) {
self.max_ready_queue_depth = self.max_ready_queue_depth.max(depth);
}
fn observe_wave_width(&mut self, width: usize) {
self.max_wave_width = self.max_wave_width.max(width);
}
}
pub struct ThreadedProtocolMachine {
config: ProtocolMachineConfig,
programs: ProgramStore,
coroutines: Vec<Arc<Mutex<Coroutine>>>,
sessions: ThreadedSessionStore,
scheduler: Scheduler,
trace: Vec<ObsEvent>,
role_symbols: SymbolTable,
label_symbols: SymbolTable,
handler_symbols: SymbolTable,
edge_symbols: EdgeSymbolTable,
clock: SimClock,
next_coro_id: usize,
non_terminal_coroutines: usize,
pool: ThreadPool,
workers: usize,
lane_count: usize,
guard_resources: Arc<Mutex<BTreeMap<String, Value>>>,
resource_states: Arc<Mutex<BTreeMap<SessionId, ResourceState>>>,
communication_consumption: Arc<Mutex<DefaultCommunicationConsumption>>,
communication_consumption_artifacts: Arc<Mutex<Vec<CommunicationConsumptionArtifact>>>,
effect_trace: Vec<EffectTraceEntry>,
effect_exchanges: Vec<EffectExchangeRecord>,
operation_instances: Vec<OperationInstance>,
outstanding_effects: Vec<OutstandingEffect>,
progress_contracts: Vec<ProgressContract>,
progress_transitions: Vec<ProgressTransition>,
next_effect_id: u64,
output_condition_checks: Vec<OutputConditionCheck>,
crashed_sites: BTreeSet<String>,
paused_roles: BTreeSet<String>,
partitioned_edges: BTreeSet<(String, String)>,
corrupted_edges: BTreeMap<(String, String), CorruptionType>,
timed_out_sites: BTreeMap<String, u64>,
lane_trace: Vec<LaneSelection>,
pending_handoffs: VecDeque<LaneHandoff>,
handoff_trace_log: Vec<LaneHandoff>,
next_handoff_id: u64,
delegation_audit_log: Vec<DelegationAuditRecord>,
next_delegation_receipt_id: u64,
contention_metrics: ContentionMetrics,
force_invalid_wave_certificate_once: bool,
handler_identity_anchor: Option<String>,
}
impl KernelMachine for ThreadedProtocolMachine {
fn kernel_step_round(
&mut self,
handler: &dyn EffectHandler,
n: usize,
) -> Result<StepResult, ProtocolMachineError> {
ThreadedProtocolMachine::kernel_step_round(self, handler, n)
}
}
#[allow(dead_code)]
#[derive(Debug, Default)]
pub(crate) struct SessionLock {
locks: BTreeMap<SessionId, Mutex<()>>,
}
#[allow(dead_code)]
impl SessionLock {
#[must_use]
pub(crate) fn new() -> Self {
Self::default()
}
pub(crate) fn ensure(&mut self, sid: SessionId) {
self.locks.entry(sid).or_insert_with(|| Mutex::new(()));
}
}
#[derive(Debug, Default)]
struct ThreadedSessionStore {
sessions: RwLock<BTreeMap<SessionId, Arc<Mutex<SessionState>>>>,
next_id: AtomicUsize,
}
impl ThreadedSessionStore {
fn new() -> Self {
Self::default()
}
#[allow(clippy::needless_pass_by_value)]
fn open(
&self,
roles: Vec<String>,
buffer_config: &BufferConfig,
initial_types: &BTreeMap<String, LocalTypeR>,
) -> SessionId {
let plan = SessionOpenPlan::new(&roles, initial_types);
self.open_from_plan(&plan, buffer_config)
}
fn open_from_plan(&self, plan: &SessionOpenPlan, buffer_config: &BufferConfig) -> SessionId {
let sid = self.next_id.fetch_add(1, Ordering::SeqCst);
let state = SessionState::from_open_plan(sid, plan, buffer_config);
let mut sessions = self.sessions.write().expect("threaded ProtocolMachine lock poisoned");
sessions.insert(sid, Arc::new(Mutex::new(state)));
sid
}
fn get(&self, sid: SessionId) -> Option<Arc<Mutex<SessionState>>> {
self.sessions
.read()
.expect("threaded ProtocolMachine lock poisoned")
.get(&sid)
.cloned()
}
fn active_count(&self) -> usize {
let sessions = self.sessions.read().expect("threaded ProtocolMachine lock poisoned");
sessions
.values()
.filter(|session| {
session.lock().expect("threaded ProtocolMachine lock poisoned").status == SessionStatus::Active
})
.count()
}
fn claim_ownership(
&self,
sid: SessionId,
owner_id: impl Into<String>,
scope: OwnershipScope,
) -> Result<OwnershipCapability, OwnershipError> {
let session = self
.get(sid)
.ok_or(OwnershipError::SessionNotFound { session_id: sid })?;
let mut session = session.lock().expect("threaded ProtocolMachine lock poisoned");
if let Some(reason) = session.ownership().terminal_reason.clone() {
return Err(OwnershipError::Terminal {
session_id: sid,
reason,
});
}
if let Some(current) = session.ownership().current.as_ref() {
return Err(OwnershipError::AlreadyClaimed {
session_id: sid,
current_owner_id: current.owner_id.clone(),
});
}
if let Some(pending) = session.ownership().pending_transfer.as_ref() {
return Err(OwnershipError::TransferPending {
session_id: sid,
claim_id: pending.receipt.claim_id,
});
}
let capability = OwnershipCapability {
session_id: sid,
owner_id: owner_id.into(),
generation: 0,
scope,
};
session.ownership_mut().current = Some(capability.clone());
Ok(capability)
}
}
struct Picked {
coro_id: usize,
sid: SessionId,
lane: LaneId,
coro: Arc<Mutex<Coroutine>>,
session: Arc<Mutex<SessionState>>,
}
struct PlannedWave {
picks: Vec<Picked>,
stop_after_wave: bool,
}
fn write_coro_reg(coro: &mut Coroutine, reg: u16, value: Value) -> Result<(), Fault> {
let slot = coro
.regs
.get_mut(usize::from(reg))
.ok_or(Fault::OutOfRegisters)?;
*slot = value;
Ok(())
}
enum CoroUpdate {
AdvancePc,
SetPc(PC),
Block(BlockReason),
Halt,
AdvancePcWriteReg { reg: u16, val: Value },
AdvancePcSpawnChild { target: PC, args: Vec<u16> },
}
enum TypeUpdate {
Advance(LocalTypeR),
AdvanceWithOriginal(LocalTypeR, LocalTypeR),
Remove,
}
fn resolve_type_update(
cont: &LocalTypeR,
original: &LocalTypeR,
ep: &Endpoint,
) -> (LocalTypeR, Option<(Endpoint, TypeUpdate)>) {
let (resolved, new_scope) = unfold_if_var_with_scope(cont, original);
let update = if let Some(mu) = new_scope {
Some((
ep.clone(),
TypeUpdate::AdvanceWithOriginal(resolved.clone(), mu),
))
} else {
Some((ep.clone(), TypeUpdate::Advance(resolved.clone())))
};
(resolved, update)
}
fn coro_has_progress(coros: &[Arc<Mutex<Coroutine>>], coro_id: usize) -> bool {
coros
.get(coro_id)
.map(|coro| {
!coro
.lock()
.expect("threaded ProtocolMachine lock poisoned")
.progress_tokens
.is_empty()
})
.unwrap_or(false)
}
struct StepPack {
coro_update: CoroUpdate,
type_update: Option<(Endpoint, TypeUpdate)>,
events: Vec<ObsEvent>,
}
enum ExecOutcome {
Continue,
Blocked(BlockReason),
Halted,
}
struct WavePlannerState {
used_sessions: BTreeSet<SessionId>,
used_lanes: BTreeSet<usize>,
used_footprints: BTreeSet<(SessionId, u64)>,
conflict_events: u64,
lane_count: usize,
allow_same_session_disjoint: bool,
}
impl WavePlannerState {
fn new(lane_count: usize, allow_same_session_disjoint: bool) -> Self {
Self {
used_sessions: BTreeSet::new(),
used_lanes: BTreeSet::new(),
used_footprints: BTreeSet::new(),
conflict_events: 0,
lane_count,
allow_same_session_disjoint,
}
}
fn note_conflict(&mut self) {
self.conflict_events = self.conflict_events.saturating_add(1);
}
}