use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock as StdRwLock};
use chrono::Utc;
use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
use meerkat_core::types::HandlingMode;
use crate::accept::{
AcceptOutcome, AdmissionPlan, AdmissionQueueAction, ExistingQueuedAdmissionAction,
};
use crate::durability::{DurabilityError, validate_durability};
use crate::identifiers::LogicalRuntimeId;
use crate::ingress_types::{
ContentShape, RequestId, ReservationKey, RuntimeInputProjection, RuntimeInputSemantics,
};
use crate::input::Input;
use crate::input_ledger::InputLedger;
use crate::input_state::{
InputAbandonReason, InputLifecycleState, InputState, InputStateHistoryEntry, InputStateSeed,
InputTerminalOutcome, MAX_STAGE_ATTEMPTS, PolicySnapshot, StoredInputState,
};
use crate::meerkat_machine::dsl as mm_dsl;
use crate::policy::PolicyDecision;
use crate::queue::InputQueue;
use crate::runtime_event::{
InputLifecycleEvent, RuntimeEvent, RuntimeEventEnvelope, RuntimeStateChangeEvent,
};
use crate::runtime_state::RuntimeState;
use crate::traits::{RecoveryReport, ResetReport, RetireReport, RuntimeDriverError};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PostAdmissionSignal {
None,
WakeLoop,
InterruptYielding,
RequestImmediateProcessing,
}
impl PostAdmissionSignal {
pub fn should_wake(self) -> bool {
self >= Self::WakeLoop
}
pub fn should_interrupt_yielding(self) -> bool {
self >= Self::InterruptYielding
}
pub fn should_process_immediately(self) -> bool {
self == Self::RequestImmediateProcessing
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RuntimeControlProjection {
pub(crate) phase: RuntimeState,
pub(crate) current_run_id: Option<RunId>,
pub(crate) pre_run_phase: Option<RuntimeState>,
}
impl Default for RuntimeControlProjection {
fn default() -> Self {
Self {
phase: RuntimeState::Idle,
current_run_id: None,
pre_run_phase: None,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct ReplayQueuedContributorsPlan {
pub queue_work_ids: Vec<InputId>,
pub steer_work_ids: Vec<InputId>,
pub wake_runtime: bool,
pub notice_kind: &'static str,
}
#[derive(Clone)]
pub(crate) struct EphemeralDriverRollbackSnapshot {
control_projection: RuntimeControlProjection,
dsl_state: mm_dsl::MeerkatMachineState,
ledger: InputLedger,
queue: InputQueue,
steer_queue: InputQueue,
events: Vec<RuntimeEventEnvelope>,
post_admission_signal: PostAdmissionSignal,
silent_comms_intents: Vec<String>,
handling_mode: HashMap<InputId, HandlingMode>,
runtime_semantics: HashMap<InputId, RuntimeInputSemantics>,
primitive_projection: HashMap<InputId, RuntimeInputProjection>,
is_prompt_set: std::collections::HashSet<InputId>,
content_shape: HashMap<InputId, ContentShape>,
request_id: HashMap<InputId, Option<RequestId>>,
reservation_key: HashMap<InputId, Option<ReservationKey>>,
policy_snapshot: HashMap<InputId, PolicyDecision>,
admission_order: Vec<InputId>,
}
#[derive(Clone)]
pub struct EphemeralRuntimeDriver {
runtime_id: LogicalRuntimeId,
control: Arc<StdRwLock<RuntimeControlProjection>>,
ledger: InputLedger,
queue: InputQueue,
steer_queue: InputQueue,
events: Vec<RuntimeEventEnvelope>,
post_admission_signal: PostAdmissionSignal,
silent_comms_intents: Vec<String>,
dsl: DslAuthority,
handling_mode: HashMap<InputId, HandlingMode>,
runtime_semantics: HashMap<InputId, RuntimeInputSemantics>,
primitive_projection: HashMap<InputId, RuntimeInputProjection>,
is_prompt_set: std::collections::HashSet<InputId>,
content_shape: HashMap<InputId, ContentShape>,
request_id: HashMap<InputId, Option<RequestId>>,
reservation_key: HashMap<InputId, Option<ReservationKey>>,
policy_snapshot: HashMap<InputId, PolicyDecision>,
admission_order: Vec<InputId>,
}
pub(crate) type SharedIngressDslAuthority = Arc<Mutex<mm_dsl::MeerkatMachineAuthority>>;
struct DslAuthority(SharedIngressDslAuthority);
impl Clone for DslAuthority {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl DslAuthority {
fn lock(&self) -> std::sync::MutexGuard<'_, mm_dsl::MeerkatMachineAuthority> {
self.0
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
pub(crate) fn new_ingress_dsl_authority() -> SharedIngressDslAuthority {
let state = mm_dsl::MeerkatMachineState {
lifecycle_phase: mm_dsl::MeerkatPhase::Idle,
..mm_dsl::MeerkatMachineState::default()
};
Arc::new(Mutex::new(mm_dsl::MeerkatMachineAuthority::from_state(
state,
)))
}
impl EphemeralRuntimeDriver {
fn read_control_projection(&self) -> std::sync::RwLockReadGuard<'_, RuntimeControlProjection> {
match self.control.read() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::error!("runtime control projection lock poisoned");
poisoned.into_inner()
}
}
}
fn write_control_projection(
&self,
) -> std::sync::RwLockWriteGuard<'_, RuntimeControlProjection> {
match self.control.write() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::error!("runtime control projection lock poisoned");
poisoned.into_inner()
}
}
}
pub fn new(runtime_id: LogicalRuntimeId) -> Self {
Self::new_with_control_and_dsl(
runtime_id,
Arc::new(StdRwLock::new(RuntimeControlProjection::default())),
new_ingress_dsl_authority(),
)
}
pub(crate) fn new_with_control(
runtime_id: LogicalRuntimeId,
control: Arc<StdRwLock<RuntimeControlProjection>>,
) -> Self {
Self::new_with_control_and_dsl(runtime_id, control, new_ingress_dsl_authority())
}
pub(crate) fn new_with_control_and_dsl(
runtime_id: LogicalRuntimeId,
control: Arc<StdRwLock<RuntimeControlProjection>>,
dsl: SharedIngressDslAuthority,
) -> Self {
Self {
runtime_id,
control,
ledger: InputLedger::new(),
queue: InputQueue::new(),
steer_queue: InputQueue::new(),
events: Vec::new(),
post_admission_signal: PostAdmissionSignal::None,
silent_comms_intents: Vec::new(),
dsl: DslAuthority(dsl),
handling_mode: HashMap::new(),
runtime_semantics: HashMap::new(),
primitive_projection: HashMap::new(),
is_prompt_set: std::collections::HashSet::new(),
content_shape: HashMap::new(),
request_id: HashMap::new(),
reservation_key: HashMap::new(),
policy_snapshot: HashMap::new(),
admission_order: Vec::new(),
}
}
pub(crate) fn rollback_snapshot(&self) -> EphemeralDriverRollbackSnapshot {
EphemeralDriverRollbackSnapshot {
control_projection: self.read_control_projection().clone(),
dsl_state: self.with_dsl_state(Clone::clone),
ledger: self.ledger.clone(),
queue: self.queue.clone(),
steer_queue: self.steer_queue.clone(),
events: self.events.clone(),
post_admission_signal: self.post_admission_signal,
silent_comms_intents: self.silent_comms_intents.clone(),
handling_mode: self.handling_mode.clone(),
runtime_semantics: self.runtime_semantics.clone(),
primitive_projection: self.primitive_projection.clone(),
is_prompt_set: self.is_prompt_set.clone(),
content_shape: self.content_shape.clone(),
request_id: self.request_id.clone(),
reservation_key: self.reservation_key.clone(),
policy_snapshot: self.policy_snapshot.clone(),
admission_order: self.admission_order.clone(),
}
}
pub(crate) fn restore_rollback_snapshot(&mut self, snapshot: EphemeralDriverRollbackSnapshot) {
{
let mut control = self.write_control_projection();
*control = snapshot.control_projection;
}
{
let mut authority = self.dsl.lock();
authority.state = snapshot.dsl_state;
}
self.ledger = snapshot.ledger;
self.queue = snapshot.queue;
self.steer_queue = snapshot.steer_queue;
self.events = snapshot.events;
self.post_admission_signal = snapshot.post_admission_signal;
self.silent_comms_intents = snapshot.silent_comms_intents;
self.handling_mode = snapshot.handling_mode;
self.runtime_semantics = snapshot.runtime_semantics;
self.primitive_projection = snapshot.primitive_projection;
self.is_prompt_set = snapshot.is_prompt_set;
self.content_shape = snapshot.content_shape;
self.request_id = snapshot.request_id;
self.reservation_key = snapshot.reservation_key;
self.policy_snapshot = snapshot.policy_snapshot;
self.admission_order = snapshot.admission_order;
}
pub(crate) fn shared_dsl_authority(&self) -> SharedIngressDslAuthority {
Arc::clone(&self.dsl.0)
}
fn dsl_apply(
&mut self,
input: mm_dsl::MeerkatMachineInput,
context: &str,
) -> Result<(), RuntimeDriverError> {
let transition = {
let mut authority = self.dsl.lock();
mm_dsl::MeerkatMachineMutator::apply(&mut *authority, input).map_err(|err| {
RuntimeDriverError::Internal(format!("DSL rejected {context}: {err:?}"))
})?
};
self.absorb_dsl_effects(&transition.effects);
Ok(())
}
fn absorb_dsl_effects(&mut self, effects: &[mm_dsl::MeerkatMachineEffect]) {
for effect in effects {
if let mm_dsl::MeerkatMachineEffect::PostAdmissionSignal { signal } = effect {
let new_signal = match signal {
mm_dsl::PostAdmissionSignalKind::WakeLoop => PostAdmissionSignal::WakeLoop,
mm_dsl::PostAdmissionSignalKind::InterruptYielding => {
PostAdmissionSignal::InterruptYielding
}
mm_dsl::PostAdmissionSignalKind::RequestImmediateProcessing => {
PostAdmissionSignal::RequestImmediateProcessing
}
};
if new_signal > self.post_admission_signal {
self.post_admission_signal = new_signal;
}
}
}
}
pub(crate) fn absorb_post_admission_effects(
&mut self,
effects: &[mm_dsl::MeerkatMachineEffect],
) {
self.absorb_dsl_effects(effects);
}
fn dsl_key(input_id: &InputId) -> String {
input_id.to_string()
}
fn with_dsl_state<R>(&self, body: impl FnOnce(&mm_dsl::MeerkatMachineState) -> R) -> R {
let authority = self.dsl.lock();
body(&authority.state)
}
fn with_dsl_state_mut<R>(
&mut self,
body: impl FnOnce(&mut mm_dsl::MeerkatMachineState) -> R,
) -> R {
let mut authority = self.dsl.lock();
body(&mut authority.state)
}
fn dsl_queue_lane(&self) -> Vec<InputId> {
self.lane_in_admission_order(mm_dsl::InputLane::Queue)
}
fn dsl_steer_lane(&self) -> Vec<InputId> {
self.lane_in_admission_order(mm_dsl::InputLane::Steer)
}
fn lane_in_admission_order(&self, lane: mm_dsl::InputLane) -> Vec<InputId> {
let mut candidates: Vec<(u64, InputId)> = self.with_dsl_state(|state| {
self.admission_order
.iter()
.filter(|id| state.input_lane.get(&Self::dsl_key(id)).copied() == Some(lane))
.cloned()
.map(|id| {
let seq = state
.input_admission_seq
.get(&Self::dsl_key(&id))
.copied()
.unwrap_or(u64::MAX);
(seq, id)
})
.collect()
});
candidates.sort_by_key(|(seq, _)| *seq);
candidates.into_iter().map(|(_, id)| id).collect()
}
pub fn input_phase(&self, input_id: &InputId) -> Option<InputLifecycleState> {
let key = Self::dsl_key(input_id);
let phase = self.with_dsl_state(|state| state.input_phases.get(&key).copied())?;
Some(Self::input_phase_to_lifecycle(phase))
}
fn input_phase_to_lifecycle(phase: mm_dsl::InputPhase) -> InputLifecycleState {
match phase {
mm_dsl::InputPhase::Queued => InputLifecycleState::Queued,
mm_dsl::InputPhase::Staged => InputLifecycleState::Staged,
mm_dsl::InputPhase::Applied => InputLifecycleState::Applied,
mm_dsl::InputPhase::AppliedPendingConsumption => {
InputLifecycleState::AppliedPendingConsumption
}
mm_dsl::InputPhase::Consumed => InputLifecycleState::Consumed,
mm_dsl::InputPhase::Superseded => InputLifecycleState::Superseded,
mm_dsl::InputPhase::Coalesced => InputLifecycleState::Coalesced,
mm_dsl::InputPhase::Abandoned => InputLifecycleState::Abandoned,
}
}
pub fn input_last_run_id(&self, input_id: &InputId) -> Option<RunId> {
let key = Self::dsl_key(input_id);
let raw = self.with_dsl_state(|state| state.input_run_associations.get(&key).cloned())?;
raw.parse::<uuid::Uuid>().ok().map(RunId::from_uuid)
}
pub fn input_last_boundary_sequence(&self, input_id: &InputId) -> Option<u64> {
let key = Self::dsl_key(input_id);
self.with_dsl_state(|state| state.input_boundary_sequences.get(&key).copied())
}
pub fn input_terminal_outcome(&self, input_id: &InputId) -> Option<InputTerminalOutcome> {
let key = Self::dsl_key(input_id);
let kind = self.with_dsl_state(|state| state.input_terminal_kind.get(&key).copied())?;
match kind {
mm_dsl::InputTerminalKind::Consumed => Some(InputTerminalOutcome::Consumed),
mm_dsl::InputTerminalKind::Superseded => {
let raw =
self.with_dsl_state(|state| state.input_superseded_by.get(&key).cloned())?;
let id = raw.parse::<uuid::Uuid>().ok().map(InputId::from_uuid)?;
Some(InputTerminalOutcome::Superseded { superseded_by: id })
}
mm_dsl::InputTerminalKind::Coalesced => {
let raw =
self.with_dsl_state(|state| state.input_aggregate_id.get(&key).cloned())?;
let id = raw.parse::<uuid::Uuid>().ok().map(InputId::from_uuid)?;
Some(InputTerminalOutcome::Coalesced { aggregate_id: id })
}
mm_dsl::InputTerminalKind::Abandoned => {
let reason = match self
.with_dsl_state(|state| state.input_abandon_reason.get(&key).copied())?
{
mm_dsl::InputAbandonReason::Retired => InputAbandonReason::Retired,
mm_dsl::InputAbandonReason::Reset => InputAbandonReason::Reset,
mm_dsl::InputAbandonReason::Stopped => InputAbandonReason::Stopped,
mm_dsl::InputAbandonReason::Destroyed => InputAbandonReason::Destroyed,
mm_dsl::InputAbandonReason::Cancelled => InputAbandonReason::Cancelled,
mm_dsl::InputAbandonReason::MaxAttemptsExhausted => {
let attempts = self.with_dsl_state(|state| {
state
.input_abandon_attempt_count
.get(&key)
.copied()
.unwrap_or(0)
}) as u32;
InputAbandonReason::MaxAttemptsExhausted { attempts }
}
};
Some(InputTerminalOutcome::Abandoned { reason })
}
}
}
pub fn input_attempt_count(&self, input_id: &InputId) -> u32 {
let key = Self::dsl_key(input_id);
self.with_dsl_state(|state| state.input_attempt_counts.get(&key).copied().unwrap_or(0))
as u32
}
pub fn admission_order(&self) -> &[InputId] {
&self.admission_order
}
pub fn admitted_policy(&self, input_id: &InputId) -> Option<&PolicyDecision> {
self.policy_snapshot.get(input_id)
}
pub fn admitted_content_shape(&self, input_id: &InputId) -> Option<ContentShape> {
self.content_shape.get(input_id).copied()
}
pub fn admitted_request_id(&self, input_id: &InputId) -> Option<RequestId> {
self.request_id.get(input_id).cloned().flatten()
}
pub fn admitted_reservation_key(&self, input_id: &InputId) -> Option<ReservationKey> {
self.reservation_key.get(input_id).cloned().flatten()
}
pub fn admitted_handling_mode(&self, input_id: &InputId) -> Option<HandlingMode> {
self.handling_mode.get(input_id).copied()
}
pub fn admitted_runtime_semantics(&self, input_id: &InputId) -> Option<RuntimeInputSemantics> {
self.runtime_semantics.get(input_id).copied()
}
pub fn admitted_primitive_projection(
&self,
input_id: &InputId,
) -> Option<RuntimeInputProjection> {
self.primitive_projection.get(input_id).cloned()
}
pub fn admitted_is_prompt(&self, input_id: &InputId) -> bool {
self.is_prompt_set.contains(input_id)
}
pub fn queue_lane(&self) -> Vec<InputId> {
self.dsl_queue_lane()
}
pub fn steer_lane(&self) -> Vec<InputId> {
self.dsl_steer_lane()
}
pub fn ingress_lifecycle(&self, input_id: &InputId) -> Option<InputLifecycleState> {
self.input_phase(input_id)
}
pub(crate) fn control_handle(&self) -> Arc<StdRwLock<RuntimeControlProjection>> {
self.control.clone()
}
fn control_snapshot(&self) -> RuntimeControlProjection {
self.read_control_projection().clone()
}
pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
self.silent_comms_intents = intents;
}
pub fn silent_comms_intents(&self) -> Vec<String> {
self.silent_comms_intents.clone()
}
fn build_projection_queue(&self, ids: &[InputId], lane: &str) -> InputQueue {
let mut queue = InputQueue::new();
for input_id in ids {
match self
.ledger
.get(input_id)
.and_then(|state| state.persisted_input.clone())
{
Some(input) => queue.enqueue(input_id.clone(), input),
None => {
tracing::error!(
input_id = ?input_id,
lane,
"ingress queue references input without persisted payload"
);
debug_assert!(
false,
"ingress queue projection missing persisted payload for {input_id:?} in {lane}"
);
}
}
}
queue
}
fn rebuild_queue_projections(&mut self) {
let queue_ids = self.dsl_queue_lane();
let steer_ids = self.dsl_steer_lane();
self.queue = self.build_projection_queue(&queue_ids, "queue");
self.steer_queue = self.build_projection_queue(&steer_ids, "steer_queue");
}
pub(crate) fn rebuild_queue_projections_after_recovery(&mut self) {
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
}
fn debug_assert_queue_projection_alignment(&self) {
debug_assert_eq!(
self.queue.input_ids(),
self.dsl_queue_lane().as_slice(),
"physical queue must match DSL queue lane"
);
debug_assert_eq!(
self.steer_queue.input_ids(),
self.dsl_steer_lane().as_slice(),
"physical steer queue must match DSL steer lane"
);
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn admit_recovered_to_ingress(
&mut self,
work_id: InputId,
content_shape: ContentShape,
handling_mode: HandlingMode,
runtime_semantics: RuntimeInputSemantics,
primitive_projection: RuntimeInputProjection,
is_prompt: bool,
recovered_state: &InputState,
recovered_seed: &InputStateSeed,
policy: PolicyDecision,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
) -> Result<(), RuntimeDriverError> {
let persisted_input =
recovered_state.persisted_input.as_ref().ok_or_else(|| {
RuntimeDriverError::Internal(format!(
"store corruption: recovered input '{work_id}' has no persisted input; cannot validate recovered runtime semantics"
))
})?;
let expected = RuntimeInputSemantics::from_policy_and_input(&policy, persisted_input);
if runtime_semantics != expected {
return Err(RuntimeDriverError::Internal(format!(
"store corruption: recovered input '{work_id}' has runtime execution semantics stamp that does not match persisted input kind and admission policy; cannot recover with contradictory runtime-stamped execution kind"
)));
}
self.recover_input_lifecycle(
&work_id,
&content_shape,
handling_mode,
runtime_semantics,
primitive_projection,
is_prompt,
recovered_state,
recovered_seed,
&policy,
request_id.as_ref(),
reservation_key.as_ref(),
)
}
pub(crate) fn recover_terminal_input_lifecycle(
&mut self,
work_id: &InputId,
recovered_seed: &InputStateSeed,
) -> Result<(), RuntimeDriverError> {
debug_assert!(
recovered_seed.phase.is_terminal(),
"terminal recovery path must only be used for terminal input phases"
);
self.apply_recovered_lifecycle(work_id, recovered_seed, None)
}
fn lifecycle_to_input_phase(lifecycle: InputLifecycleState) -> mm_dsl::InputPhase {
match lifecycle {
InputLifecycleState::Accepted | InputLifecycleState::Queued => {
mm_dsl::InputPhase::Queued
}
InputLifecycleState::Staged => mm_dsl::InputPhase::Staged,
InputLifecycleState::Applied => mm_dsl::InputPhase::Applied,
InputLifecycleState::AppliedPendingConsumption => {
mm_dsl::InputPhase::AppliedPendingConsumption
}
InputLifecycleState::Consumed => mm_dsl::InputPhase::Consumed,
InputLifecycleState::Superseded => mm_dsl::InputPhase::Superseded,
InputLifecycleState::Coalesced => mm_dsl::InputPhase::Coalesced,
InputLifecycleState::Abandoned => mm_dsl::InputPhase::Abandoned,
}
}
#[allow(clippy::too_many_arguments)]
fn recover_input_lifecycle(
&mut self,
work_id: &InputId,
content_shape: &ContentShape,
handling_mode: HandlingMode,
runtime_semantics: RuntimeInputSemantics,
primitive_projection: RuntimeInputProjection,
is_prompt: bool,
_recovered_state: &InputState,
recovered_seed: &InputStateSeed,
policy: &PolicyDecision,
request_id: Option<&RequestId>,
reservation_key: Option<&ReservationKey>,
) -> Result<(), RuntimeDriverError> {
self.record_admission_metadata(
work_id,
content_shape,
handling_mode,
runtime_semantics,
primitive_projection,
is_prompt,
policy,
request_id,
reservation_key,
);
self.apply_recovered_lifecycle(work_id, recovered_seed, Some(handling_mode))
}
fn apply_recovered_lifecycle(
&mut self,
work_id: &InputId,
recovered_seed: &InputStateSeed,
handling_mode: Option<HandlingMode>,
) -> Result<(), RuntimeDriverError> {
let key = Self::dsl_key(work_id);
let lifecycle_state = recovered_seed.phase;
let (terminal_kind, superseded_by, aggregate_id, abandon_reason, abandon_attempt_count) =
match recovered_seed.terminal_outcome.clone() {
Some(InputTerminalOutcome::Consumed) => (
Some(mm_dsl::InputTerminalKind::Consumed),
None,
None,
None,
0,
),
Some(InputTerminalOutcome::Superseded { superseded_by }) => (
Some(mm_dsl::InputTerminalKind::Superseded),
Some(superseded_by.to_string()),
None,
None,
0,
),
Some(InputTerminalOutcome::Coalesced { aggregate_id }) => (
Some(mm_dsl::InputTerminalKind::Coalesced),
None,
Some(aggregate_id.to_string()),
None,
0,
),
Some(InputTerminalOutcome::Abandoned { reason }) => (
Some(mm_dsl::InputTerminalKind::Abandoned),
None,
None,
Some(mm_dsl::InputAbandonReason::from(&reason)),
u64::from(recovered_seed.attempt_count),
),
None => (None, None, None, None, 0),
};
let lane = matches!(lifecycle_state, InputLifecycleState::Queued)
.then(|| handling_mode.map(mm_dsl::InputLane::from))
.flatten();
self.dsl_apply(
mm_dsl::MeerkatMachineInput::RecoverInputLifecycle {
input_id: key,
phase: Self::lifecycle_to_input_phase(lifecycle_state),
terminal_kind,
superseded_by,
aggregate_id,
abandon_reason,
abandon_attempt_count,
attempt_count: u64::from(recovered_seed.attempt_count),
run_id: recovered_seed
.last_run_id
.as_ref()
.map(std::string::ToString::to_string),
boundary_sequence: recovered_seed.last_boundary_sequence,
lane,
},
"RecoverInputLifecycle",
)
}
#[allow(clippy::too_many_arguments)]
fn record_admission_metadata(
&mut self,
work_id: &InputId,
content_shape: &ContentShape,
handling_mode: HandlingMode,
runtime_semantics: RuntimeInputSemantics,
primitive_projection: RuntimeInputProjection,
is_prompt: bool,
policy: &PolicyDecision,
request_id: Option<&RequestId>,
reservation_key: Option<&ReservationKey>,
) {
if !self.admission_order.contains(work_id) {
self.admission_order.push(work_id.clone());
}
self.content_shape.insert(work_id.clone(), *content_shape);
self.handling_mode.insert(work_id.clone(), handling_mode);
self.runtime_semantics
.insert(work_id.clone(), runtime_semantics);
if let Some(state) = self.ledger.get_mut(work_id) {
state.runtime_semantics = Some(runtime_semantics);
}
self.primitive_projection
.insert(work_id.clone(), primitive_projection);
if is_prompt {
self.is_prompt_set.insert(work_id.clone());
} else {
self.is_prompt_set.remove(work_id);
}
self.request_id.insert(work_id.clone(), request_id.cloned());
self.reservation_key
.insert(work_id.clone(), reservation_key.cloned());
self.policy_snapshot.insert(work_id.clone(), policy.clone());
}
#[allow(clippy::too_many_arguments)]
fn apply_persist_and_queue(
&mut self,
input_id: &InputId,
input: &Input,
content_shape: &ContentShape,
handling_mode: HandlingMode,
runtime_semantics: RuntimeInputSemantics,
primitive_projection: RuntimeInputProjection,
is_prompt: bool,
policy: &PolicyDecision,
queue_action: AdmissionQueueAction,
existing_action: Option<&ExistingQueuedAdmissionAction>,
) -> Result<(), RuntimeDriverError> {
self.record_admission_metadata(
input_id,
content_shape,
handling_mode,
runtime_semantics,
primitive_projection,
is_prompt,
policy,
None,
None,
);
let now = Utc::now();
if let Some(s) = self.ledger.get_mut(input_id) {
s.persisted_input = Some(input.clone());
s.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Accepted,
to: InputLifecycleState::Queued,
reason: Some("QueueAccepted".into()),
});
s.updated_at = now;
}
let admission_lane = mm_dsl::InputLane::from(handling_mode);
let admission_key = Self::dsl_key(input_id);
let (admission_input, admission_label) = match admission_lane {
mm_dsl::InputLane::Queue => (
mm_dsl::MeerkatMachineInput::QueueAccepted {
input_id: admission_key,
},
"QueueAccepted",
),
mm_dsl::InputLane::Steer => (
mm_dsl::MeerkatMachineInput::SteerAccepted {
input_id: admission_key,
},
"SteerAccepted",
),
};
self.dsl_apply(admission_input, admission_label)?;
if let Some(action) = existing_action {
match action {
ExistingQueuedAdmissionAction::Coalesce { existing_id } => {
let existing_key = Self::dsl_key(existing_id);
let aggregate_key = Self::dsl_key(input_id);
let from_phase = self
.input_phase(existing_id)
.unwrap_or(InputLifecycleState::Queued);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::CoalesceInput {
input_id: existing_key,
aggregate_id: aggregate_key,
},
"CoalesceInput",
)?;
let _ = self.queue.remove(existing_id);
let _ = self.steer_queue.remove(existing_id);
if let Some(existing_state) = self.ledger.get_mut(existing_id) {
crate::coalescing::apply_coalescing(
existing_state,
from_phase,
input_id.clone(),
);
}
}
ExistingQueuedAdmissionAction::Supersede { existing_id } => {
let existing_key = Self::dsl_key(existing_id);
let superseded_by = Self::dsl_key(input_id);
let from_phase = self
.input_phase(existing_id)
.unwrap_or(InputLifecycleState::Queued);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::SupersedeInput {
input_id: existing_key,
superseded_by,
},
"SupersedeInput",
)?;
let _ = self.queue.remove(existing_id);
let _ = self.steer_queue.remove(existing_id);
if let Some(existing_state) = self.ledger.get_mut(existing_id) {
crate::coalescing::apply_supersession(
existing_state,
from_phase,
input_id.clone(),
);
}
}
}
}
match queue_action {
AdmissionQueueAction::None => {}
AdmissionQueueAction::EnqueueTo { target } => {
match target {
HandlingMode::Queue => self.queue.enqueue(input_id.clone(), input.clone()),
HandlingMode::Steer => {
self.steer_queue.enqueue(input_id.clone(), input.clone());
}
}
let target_lane = mm_dsl::InputLane::from(target);
if target_lane != admission_lane {
self.dsl_apply(
mm_dsl::MeerkatMachineInput::ChangeLane {
input_id: Self::dsl_key(input_id),
new_lane: target_lane,
},
"ChangeLane",
)?;
}
}
AdmissionQueueAction::EnqueueFront { target } => {
match target {
HandlingMode::Queue => {
self.queue.enqueue_front(input_id.clone(), input.clone());
}
HandlingMode::Steer => {
self.steer_queue
.enqueue_front(input_id.clone(), input.clone());
}
}
let key = Self::dsl_key(input_id);
self.with_dsl_state_mut(|state| {
let min_seq = state
.input_admission_seq
.values()
.min()
.copied()
.unwrap_or(0);
state
.input_admission_seq
.insert(key.clone(), min_seq.saturating_sub(1));
});
let target_lane = mm_dsl::InputLane::from(target);
if target_lane != admission_lane {
self.dsl_apply(
mm_dsl::MeerkatMachineInput::ChangeLane {
input_id: key,
new_lane: target_lane,
},
"ChangeLane",
)?;
}
}
}
self.emit_event(RuntimeEvent::InputLifecycle(InputLifecycleEvent::Queued {
input_id: input_id.clone(),
}));
Ok(())
}
pub fn is_idle(&self) -> bool {
self.runtime_phase_snapshot() == RuntimeState::Idle
}
pub fn is_idle_or_attached(&self) -> bool {
self.runtime_phase_snapshot().is_idle_or_attached()
}
pub fn phase(&self) -> RuntimeState {
self.runtime_phase_snapshot()
}
fn runtime_phase_snapshot(&self) -> RuntimeState {
let authority = self.shared_dsl_authority();
let authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
crate::meerkat_machine::dsl_authority::runtime_phase_from_authority(&authority)
}
pub fn current_run_id(&self) -> Option<RunId> {
let authority = self.shared_dsl_authority();
let authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
crate::meerkat_machine::dsl_authority::current_run_id_from_authority(&authority)
}
pub fn pre_run_phase(&self) -> Option<RuntimeState> {
let authority = self.shared_dsl_authority();
let authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
crate::meerkat_machine::dsl_authority::pre_run_phase_from_authority(&authority)
}
pub fn can_process_queue(&self) -> bool {
self.runtime_phase_snapshot().can_process_queue()
}
fn contract_session_authority_id(&self) -> mm_dsl::SessionId {
mm_dsl::SessionId::from(self.runtime_id.to_string())
}
fn ensure_contract_session_authority(
&mut self,
) -> Result<mm_dsl::SessionId, RuntimeDriverError> {
let existing = {
let authority = self.shared_dsl_authority();
let authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
authority.state.session_id.clone()
};
if let Some(session_id) = existing {
return Ok(session_id);
}
let session_id = self.contract_session_authority_id();
self.dsl_apply(
mm_dsl::MeerkatMachineInput::RegisterSession {
session_id: session_id.clone(),
},
"ContractRegisterSession",
)?;
self.sync_control_projection_from_dsl_authority();
Ok(session_id)
}
#[doc(hidden)]
pub fn contract_begin_run_authority(
&mut self,
run_id: RunId,
) -> Result<(), RuntimeDriverError> {
let from = self.runtime_phase_snapshot();
if from == RuntimeState::Running && self.current_run_id().as_ref() == Some(&run_id) {
return Ok(());
}
if self.current_run_id().is_some() {
return Err(RuntimeDriverError::Internal(
crate::runtime_state::RuntimeStateTransitionError {
from,
to: RuntimeState::Running,
}
.to_string(),
));
}
crate::runtime_state::run_start_pre_phase_from_phase(from)
.map_err(|err| RuntimeDriverError::Internal(err.to_string()))?;
let session_id = self.ensure_contract_session_authority()?;
if from == RuntimeState::Retired {
let authority = self.shared_dsl_authority();
let mut authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
authority
.apply_signal(mm_dsl::MeerkatMachineSignal::DrainQueuedRun {
run_id: mm_dsl::RunId::from_domain(&run_id),
})
.map(|_| ())
.map_err(|err| {
RuntimeDriverError::Internal(crate::meerkat_machine::dsl_authority::map_error(
err,
"ContractDrainQueuedRun",
))
})?;
} else {
self.dsl_apply(
mm_dsl::MeerkatMachineInput::Prepare {
session_id,
run_id: mm_dsl::RunId::from_domain(&run_id),
},
"ContractPrepareRun",
)?;
}
self.sync_control_projection_from_dsl_authority();
Ok(())
}
fn set_phase(&mut self, next_phase: RuntimeState) -> RuntimeState {
let mut control = self.write_control_projection();
let from_phase = control.phase;
control.phase = next_phase;
from_phase
}
fn transition_phase(&mut self, next_phase: RuntimeState) {
let from_phase = self.set_phase(next_phase);
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: from_phase,
to: next_phase,
}));
}
pub(crate) fn set_control_projection(
&mut self,
next_phase: RuntimeState,
current_run_id: Option<RunId>,
pre_run_phase: Option<RuntimeState>,
) {
if self.control_snapshot().phase == next_phase {
self.write_control_projection().phase = next_phase;
} else {
self.transition_phase(next_phase);
}
let mut control = self.write_control_projection();
control.current_run_id = current_run_id;
control.pre_run_phase = pre_run_phase;
}
pub(crate) fn sync_control_projection_from_dsl_authority(&mut self) {
let (phase, current_run_id, pre_run_phase) = {
let authority = self.shared_dsl_authority();
let authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
(
crate::meerkat_machine::dsl_authority::runtime_phase_from_authority(&authority),
crate::meerkat_machine::dsl_authority::current_run_id_from_authority(&authority),
crate::meerkat_machine::dsl_authority::pre_run_phase_from_authority(&authority),
)
};
self.set_control_projection(phase, current_run_id, pre_run_phase);
}
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn contract_force_runtime_authority(
&mut self,
next_phase: RuntimeState,
current_run_id: Option<RunId>,
pre_run_phase: Option<RuntimeState>,
) {
{
let authority = self.shared_dsl_authority();
let mut authority = authority
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
authority.state.lifecycle_phase =
crate::meerkat_machine::dsl_authority::project_phase(next_phase);
authority.state.current_run_id = current_run_id
.as_ref()
.map(crate::meerkat_machine::dsl::RunId::from_domain);
authority.state.pre_run_phase = pre_run_phase
.and_then(crate::meerkat_machine::dsl_authority::pre_run_phase_from_runtime_state);
}
self.set_control_projection(next_phase, current_run_id, pre_run_phase);
}
pub(crate) fn apply_runtime_executor_exited_authority(
&mut self,
) -> Result<(), RuntimeDriverError> {
self.dsl_apply(
mm_dsl::MeerkatMachineInput::RuntimeExecutorExited,
"RuntimeExecutorExited",
)
}
pub fn take_post_admission_signal(&mut self) -> PostAdmissionSignal {
std::mem::replace(&mut self.post_admission_signal, PostAdmissionSignal::None)
}
pub fn post_admission_signal(&self) -> PostAdmissionSignal {
self.post_admission_signal
}
pub fn take_wake_requested(&mut self) -> bool {
self.post_admission_signal.should_wake()
}
pub fn take_process_requested(&mut self) -> bool {
let signal = std::mem::replace(&mut self.post_admission_signal, PostAdmissionSignal::None);
signal.should_process_immediately()
}
pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
std::mem::take(&mut self.events)
}
pub fn queue(&self) -> &InputQueue {
&self.queue
}
pub fn steer_queue(&self) -> &InputQueue {
&self.steer_queue
}
#[cfg(test)]
pub fn queue_mut(&mut self) -> &mut InputQueue {
&mut self.queue
}
#[cfg(test)]
pub fn steer_queue_mut(&mut self) -> &mut InputQueue {
&mut self.steer_queue
}
#[cfg(test)]
pub(crate) fn clear_admitted_runtime_semantics_for_test(&mut self, input_id: &InputId) {
self.runtime_semantics.remove(input_id);
if let Some(state) = self.ledger.get_mut(input_id) {
state.runtime_semantics = None;
}
}
pub fn has_queued_input(&self, input_id: &InputId) -> bool {
let key = Self::dsl_key(input_id);
self.with_dsl_state(|state| state.input_lane.contains_key(&key))
}
pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
let excluded_keys: std::collections::HashSet<String> =
excluded.iter().map(Self::dsl_key).collect();
self.with_dsl_state(|state| {
state
.input_lane
.keys()
.any(|queued_key| !excluded_keys.contains(queued_key))
})
}
pub(crate) fn defer_queued_inputs_behind_backlog(&mut self, input_ids: &[InputId]) {
let keys: Vec<String> = input_ids.iter().map(Self::dsl_key).collect();
self.with_dsl_state_mut(|state| {
let mut next_seq = state
.input_admission_seq
.values()
.max()
.copied()
.unwrap_or(0)
.saturating_add(1);
for key in &keys {
if state.input_lane.contains_key(key) {
state.input_admission_seq.insert(key.clone(), next_seq);
next_seq = next_seq.saturating_add(1);
}
}
state.next_admission_seq = state.next_admission_seq.max(next_seq);
});
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
}
fn existing_superseded_input(
&self,
input: &Input,
) -> Option<(InputId, crate::coalescing::CoalescingResult)> {
let mut candidates = self.queue.input_ids();
candidates.extend(self.steer_queue.input_ids());
candidates.extend(self.dsl_queue_lane());
candidates.extend(self.dsl_steer_lane());
let mut seen = Vec::<InputId>::new();
candidates.retain(|id| {
if seen.contains(id) {
false
} else {
seen.push(id.clone());
true
}
});
candidates.into_iter().find_map(|queued_id| {
let existing = self.ledger.get(&queued_id)?.persisted_input.as_ref()?;
let result = crate::coalescing::check_supersession(input, existing, &self.runtime_id);
match result {
crate::coalescing::CoalescingResult::Supersedes { .. } => Some((queued_id, result)),
crate::coalescing::CoalescingResult::Standalone => None,
}
})
}
pub fn ledger(&self) -> &InputLedger {
&self.ledger
}
pub fn runtime_id(&self) -> &LogicalRuntimeId {
&self.runtime_id
}
pub(crate) fn ledger_mut(&mut self) -> &mut InputLedger {
&mut self.ledger
}
pub fn stored_input_state(&self, input_id: &InputId) -> Option<StoredInputState> {
let mut state = self.ledger.get(input_id)?.clone();
if state.runtime_semantics.is_none() {
state.runtime_semantics = self.admitted_runtime_semantics(input_id);
}
let phase = self
.input_phase(input_id)
.unwrap_or(InputLifecycleState::Accepted);
let seed = InputStateSeed {
phase,
last_run_id: self.input_last_run_id(input_id),
last_boundary_sequence: self.input_last_boundary_sequence(input_id),
terminal_outcome: self.input_terminal_outcome(input_id),
attempt_count: self.input_attempt_count(input_id),
};
Some(StoredInputState { state, seed })
}
pub fn stored_input_states_snapshot(&self) -> Vec<StoredInputState> {
self.ledger
.iter()
.map(|(input_id, state)| {
let mut state = state.clone();
if state.runtime_semantics.is_none() {
state.runtime_semantics = self.admitted_runtime_semantics(input_id);
}
let phase = self
.input_phase(input_id)
.unwrap_or(InputLifecycleState::Accepted);
let seed = InputStateSeed {
phase,
last_run_id: self.input_last_run_id(input_id),
last_boundary_sequence: self.input_last_boundary_sequence(input_id),
terminal_outcome: self.input_terminal_outcome(input_id),
attempt_count: self.input_attempt_count(input_id),
};
StoredInputState { state, seed }
})
.collect()
}
pub fn clear_queue_projections(&mut self) {
self.queue = InputQueue::new();
self.steer_queue = InputQueue::new();
}
pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
let queued = self
.steer_queue
.dequeue()
.or_else(|| self.queue.dequeue())?;
Some((queued.input_id, queued.input))
}
pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
self.steer_queue
.dequeue_by_id(input_id)
.or_else(|| self.queue.dequeue_by_id(input_id))
}
pub fn stage_input(
&mut self,
input_id: &InputId,
run_id: &RunId,
) -> Result<(), RuntimeDriverError> {
self.stage_batch(std::slice::from_ref(input_id), run_id)
}
pub fn stage_batch(
&mut self,
input_ids: &[InputId],
run_id: &RunId,
) -> Result<(), RuntimeDriverError> {
self.machine_realize_stage_batch(input_ids, run_id)
}
pub(crate) fn machine_realize_stage_batch(
&mut self,
input_ids: &[InputId],
run_id: &RunId,
) -> Result<(), RuntimeDriverError> {
for input_id in input_ids {
let key = Self::dsl_key(input_id);
let from_phase = self
.input_phase(input_id)
.unwrap_or(InputLifecycleState::Queued);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::StageForRun {
input_id: key,
run_id: run_id.to_string(),
},
"StageForRun",
)?;
self.dsl_apply(
mm_dsl::MeerkatMachineInput::IncrementAttemptCount {
input_id: Self::dsl_key(input_id),
},
"IncrementAttemptCount",
)?;
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: from_phase,
to: InputLifecycleState::Staged,
reason: Some(format!("StageForRun({run_id})")),
});
state.attempt_count = state.attempt_count.saturating_add(1);
state.updated_at = now;
}
self.emit_event(RuntimeEvent::InputLifecycle(InputLifecycleEvent::Staged {
input_id: input_id.clone(),
run_id: run_id.clone(),
}));
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(())
}
pub fn apply_input(
&mut self,
input_id: &InputId,
run_id: &RunId,
) -> Result<(), RuntimeDriverError> {
let key = Self::dsl_key(input_id);
let from_phase = self
.input_phase(input_id)
.unwrap_or(InputLifecycleState::Staged);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::MarkApplied {
input_id: key.clone(),
},
"MarkApplied",
)?;
self.dsl_apply(
mm_dsl::MeerkatMachineInput::MarkAppliedPendingConsumption { input_id: key },
"MarkAppliedPendingConsumption",
)?;
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: from_phase,
to: InputLifecycleState::Applied,
reason: Some(format!("MarkApplied({run_id})")),
});
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Applied,
to: InputLifecycleState::AppliedPendingConsumption,
reason: Some("MarkAppliedPendingConsumption(boundary_sequence=0)".into()),
});
state.updated_at = now;
}
self.emit_event(RuntimeEvent::InputLifecycle(InputLifecycleEvent::Applied {
input_id: input_id.clone(),
run_id: run_id.clone(),
}));
Ok(())
}
pub(crate) fn consume_inputs(
&mut self,
input_ids: &[InputId],
run_id: &RunId,
) -> Result<(), RuntimeDriverError> {
for input_id in input_ids {
let phase = self.input_phase(input_id);
if phase != Some(InputLifecycleState::AppliedPendingConsumption) {
continue;
}
let from_phase = phase.unwrap_or(InputLifecycleState::AppliedPendingConsumption);
let key = Self::dsl_key(input_id);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::ConsumeInput { input_id: key },
"ConsumeInput",
)?;
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: from_phase,
to: InputLifecycleState::Consumed,
reason: Some("Consume".into()),
});
state.terminal_outcome = Some(InputTerminalOutcome::Consumed);
state.updated_at = now;
}
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Consumed {
input_id: input_id.clone(),
run_id: run_id.clone(),
},
)));
}
Ok(())
}
pub(crate) fn next_live_boundary_context_sequence(&self, run_id: &RunId) -> u64 {
self.ledger
.iter()
.filter_map(|(input_id, _)| {
(self.input_last_run_id(input_id).as_ref() == Some(run_id))
.then(|| self.input_last_boundary_sequence(input_id))
.flatten()
})
.max()
.unwrap_or(0)
.saturating_add(1)
}
pub(crate) fn machine_realize_live_boundary_context_injected(
&mut self,
run_id: &RunId,
input_ids: &[InputId],
receipt: &RunBoundaryReceipt,
) -> Result<(), RuntimeDriverError> {
let checkpoint = self.rollback_snapshot();
if let Err(err) = self
.machine_realize_stage_batch(input_ids, run_id)
.and_then(|()| self.machine_realize_boundary_applied(run_id, receipt))
.and_then(|()| self.machine_realize_run_completed(run_id, input_ids))
{
self.restore_rollback_snapshot(checkpoint);
return Err(err);
}
Ok(())
}
pub fn rollback_staged(&mut self, input_ids: &[InputId]) -> Result<(), RuntimeDriverError> {
for input_id in input_ids {
if self.input_phase(input_id) != Some(InputLifecycleState::Staged) {
continue;
}
let Some(_state) = self.ledger.get(input_id) else {
continue;
};
let attempts = self.input_attempt_count(input_id);
let key = Self::dsl_key(input_id);
if attempts >= MAX_STAGE_ATTEMPTS {
let attempts_count = u64::from(attempts);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::AbandonInput {
input_id: key.clone(),
reason: mm_dsl::InputAbandonReason::MaxAttemptsExhausted,
attempt_count: attempts_count,
},
"AbandonInput(MaxAttemptsExhausted)",
)?;
let now = Utc::now();
tracing::warn!(
input_id = %input_id,
attempts,
"input abandoned after max stage attempts"
);
let outcome = InputTerminalOutcome::Abandoned {
reason: InputAbandonReason::MaxAttemptsExhausted { attempts },
};
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Staged,
to: InputLifecycleState::Abandoned,
reason: Some(format!("RollbackStaged→Abandon(attempts={attempts})")),
});
state.terminal_outcome = Some(outcome.clone());
state.updated_at = now;
}
self.events.push(
self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Abandoned {
input_id: input_id.clone(),
reason: mm_dsl::InputAbandonReason::MaxAttemptsExhausted
.as_str()
.to_string(),
},
)),
);
} else {
let lane = self
.handling_mode
.get(input_id)
.copied()
.map(mm_dsl::InputLane::from)
.unwrap_or(mm_dsl::InputLane::Queue);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::RollbackStaged {
input_id: key,
lane,
},
"RollbackStaged",
)?;
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Staged,
to: InputLifecycleState::Queued,
reason: Some("RollbackStaged".into()),
});
state.updated_at = now;
}
}
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(())
}
pub(crate) fn finalize_retire(&mut self) -> RetireReport {
let inputs_pending_drain = self
.ledger
.iter()
.filter(|(id, _)| {
self.input_phase(id)
.map(|phase| !phase.is_terminal())
.unwrap_or(false)
})
.count();
RetireReport {
inputs_abandoned: 0,
inputs_pending_drain,
}
}
pub(crate) fn reset_cleanup(&mut self) -> ResetReport {
let abandoned = self.abandon_all_non_terminal(InputAbandonReason::Reset);
self.queue.drain();
self.steer_queue.drain();
self.post_admission_signal = PostAdmissionSignal::None;
self.silent_comms_intents.clear();
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
ResetReport {
inputs_abandoned: abandoned,
}
}
pub(crate) fn destroy_cleanup(&mut self) -> usize {
let abandoned = self.abandon_all_non_terminal(InputAbandonReason::Destroyed);
self.queue.drain();
self.steer_queue.drain();
self.post_admission_signal = PostAdmissionSignal::None;
self.silent_comms_intents.clear();
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
abandoned
}
pub(crate) fn stop_runtime_cleanup(&mut self) {
self.abandon_all_non_terminal(InputAbandonReason::Stopped);
self.silent_comms_intents.clear();
self.queue.drain();
self.steer_queue.drain();
}
pub(crate) fn finalize_stop_runtime(&mut self) {
self.stop_runtime_cleanup();
}
pub fn recover_ephemeral(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
crate::meerkat_machine::machine_recover_ephemeral_driver(self)
}
pub(crate) fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
let transferred = self
.ledger
.iter()
.filter(|(id, _)| {
self.input_phase(id)
.map(|phase| !phase.is_terminal())
.unwrap_or(false)
})
.count();
let runtime_id = self.runtime_id.clone();
let silent_comms_intents = self.silent_comms_intents.clone();
let ledger = self.ledger.clone();
let preserved_dsl = self.dsl.clone();
let preserved_admission_order = std::mem::take(&mut self.admission_order);
let preserved_handling_mode = std::mem::take(&mut self.handling_mode);
let preserved_is_prompt = std::mem::take(&mut self.is_prompt_set);
let preserved_content_shape = std::mem::take(&mut self.content_shape);
let preserved_request_id = std::mem::take(&mut self.request_id);
let preserved_reservation_key = std::mem::take(&mut self.reservation_key);
let preserved_policy_snapshot = std::mem::take(&mut self.policy_snapshot);
let control = self.control.clone();
*self = Self::new_with_control(runtime_id, control);
self.silent_comms_intents = silent_comms_intents;
self.ledger = ledger;
self.dsl = preserved_dsl;
self.admission_order = preserved_admission_order;
self.handling_mode = preserved_handling_mode;
self.is_prompt_set = preserved_is_prompt;
self.content_shape = preserved_content_shape;
self.request_id = preserved_request_id;
self.reservation_key = preserved_reservation_key;
self.policy_snapshot = preserved_policy_snapshot;
self.recover_ephemeral()?;
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(transferred)
}
fn emit_event(&mut self, event: RuntimeEvent) {
self.events.push(self.make_envelope(event));
}
fn make_envelope(&self, event: RuntimeEvent) -> RuntimeEventEnvelope {
RuntimeEventEnvelope {
id: crate::identifiers::RuntimeEventId::new(),
timestamp: chrono::Utc::now(),
runtime_id: self.runtime_id.clone(),
event,
causation_id: None,
correlation_id: None,
}
}
pub(crate) fn resolve_admission_for_runtime_idle(
&self,
input: &Input,
runtime_idle: bool,
) -> crate::accept::ResolvedAdmission {
let existing_superseded_id = self.existing_superseded_input(input).map(|(id, _)| id);
crate::accept::resolve_admission(
input,
runtime_idle,
&self.silent_comms_intents,
existing_superseded_id,
)
}
pub(crate) fn resolve_admission(&self, input: &Input) -> crate::accept::ResolvedAdmission {
let runtime_idle = self.runtime_phase_snapshot().is_idle_or_attached();
self.resolve_admission_for_runtime_idle(input, runtime_idle)
}
pub(crate) async fn accept_resolved_input(
&mut self,
input: Input,
resolved: crate::accept::ResolvedAdmission,
) -> Result<AcceptOutcome, RuntimeDriverError> {
let runtime_phase = self.runtime_phase_snapshot();
match runtime_phase {
RuntimeState::Retired | RuntimeState::Stopped => {
return Err(RuntimeDriverError::NotReady {
state: runtime_phase,
});
}
RuntimeState::Destroyed => return Err(RuntimeDriverError::Destroyed),
RuntimeState::Initializing
| RuntimeState::Idle
| RuntimeState::Attached
| RuntimeState::Running => {}
}
if let Err(e) = validate_durability(&input) {
match e {
DurabilityError::DerivedForbidden { .. }
| DurabilityError::ExternalDerivedForbidden => {
return Ok(AcceptOutcome::Rejected {
reason: crate::accept::RejectReason::DurabilityViolation {
detail: e.to_string(),
},
});
}
}
}
if let Err(e) = crate::peer_handling_mode::validate_peer_handling_mode(&input) {
return Ok(AcceptOutcome::Rejected {
reason: crate::accept::RejectReason::PeerHandlingModeInvalid {
detail: e.to_string(),
},
});
}
if let Err(e) = crate::input::validate_peer_response_terminal_fact(&input) {
return Ok(AcceptOutcome::Rejected {
reason: crate::accept::RejectReason::PeerResponseTerminalInvalid {
detail: e.to_string(),
},
});
}
let input_id = input.id().clone();
let mut state = InputState::new_accepted(input_id.clone());
state.durability = Some(input.header().durability);
state.idempotency_key = input.header().idempotency_key.clone();
let active_idempotency_only = matches!(
&input,
Input::Continuation(continuation) if continuation.reason == "workgraph_attention"
);
if let Some(ref key) = input.header().idempotency_key {
let existing_id = if active_idempotency_only {
self.ledger
.accept_with_active_idempotency(state.clone(), key.clone())
} else {
self.ledger
.accept_with_idempotency(state.clone(), key.clone())
};
if let Some(existing_id) = existing_id {
tracing::debug!(
work_id = ?input_id,
existing_id = ?existing_id,
"input deduplicated"
);
self.emit_event(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Deduplicated {
input_id: input_id.clone(),
existing_id: existing_id.clone(),
},
));
return Ok(AcceptOutcome::Deduplicated {
input_id,
existing_id,
});
}
} else {
self.ledger.accept(state.clone());
}
if let Some(s) = self.ledger.get_mut(&input_id) {
s.policy = Some(PolicySnapshot {
version: resolved.policy.policy_version,
decision: resolved.policy.clone(),
});
}
self.emit_event(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Accepted {
input_id: input_id.clone(),
},
));
let runtime_idle = runtime_phase.is_idle_or_attached();
let handling_mode = resolved.handling_mode;
let content_shape = ContentShape::from_kind(input.kind());
let is_prompt = matches!(input, Input::Prompt(_));
match resolved.admission_plan {
AdmissionPlan::ConsumedOnAccept => {
self.record_admission_metadata(
&input_id,
&content_shape,
handling_mode,
resolved.runtime_semantics,
resolved.primitive_projection.clone(),
is_prompt,
&resolved.policy,
None,
None,
);
self.dsl_apply(
mm_dsl::MeerkatMachineInput::QueueAccepted {
input_id: Self::dsl_key(&input_id),
},
"QueueAccepted(consumed_on_accept)",
)?;
self.dsl_apply(
mm_dsl::MeerkatMachineInput::ConsumeOnAccept {
input_id: Self::dsl_key(&input_id),
},
"ConsumeOnAccept",
)?;
let now = Utc::now();
if let Some(s) = self.ledger.get_mut(&input_id) {
s.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Accepted,
to: InputLifecycleState::Consumed,
reason: Some("ConsumeOnAccept (Ignore+OnAccept)".into()),
});
s.terminal_outcome = Some(InputTerminalOutcome::Consumed);
s.updated_at = now;
}
tracing::debug!(work_id = ?input_id, "input consumed on accept");
}
AdmissionPlan::Queued {
persist_and_queue,
queue_action,
existing_action,
} => {
if persist_and_queue {
self.apply_persist_and_queue(
&input_id,
&input,
&content_shape,
handling_mode,
resolved.runtime_semantics,
resolved.primitive_projection,
is_prompt,
&resolved.policy,
queue_action,
existing_action.as_ref(),
)?;
}
}
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
if !runtime_idle
&& matches!(resolved.policy.wake_mode, crate::WakeMode::WakeIfIdle)
&& PostAdmissionSignal::WakeLoop > self.post_admission_signal
{
self.post_admission_signal = PostAdmissionSignal::WakeLoop;
}
let final_state = self.ledger.get(&input_id).cloned().unwrap_or(state);
Ok(AcceptOutcome::Accepted {
input_id,
policy: resolved.policy,
state: final_state,
})
}
pub fn abandon_all_non_terminal(&mut self, reason: InputAbandonReason) -> usize {
let non_terminal_ids: Vec<InputId> = self
.ledger
.iter()
.filter_map(|(id, _)| {
if self
.input_phase(id)
.map(|phase| !phase.is_terminal())
.unwrap_or(false)
{
Some(id.clone())
} else {
None
}
})
.collect();
let dsl_reason = mm_dsl::InputAbandonReason::from(&reason);
let reason_label = dsl_reason.as_str();
let mut count = 0;
for id in &non_terminal_ids {
let key = Self::dsl_key(id);
let attempt_count = u64::from(self.input_attempt_count(id));
let from_phase = self
.input_phase(id)
.unwrap_or(InputLifecycleState::Accepted);
if self
.dsl_apply(
mm_dsl::MeerkatMachineInput::AbandonInput {
input_id: key.clone(),
reason: dsl_reason,
attempt_count,
},
"AbandonInput",
)
.is_err()
{
continue;
}
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: from_phase,
to: InputLifecycleState::Abandoned,
reason: Some(format!("Abandon({reason_label})")),
});
state.terminal_outcome = Some(InputTerminalOutcome::Abandoned {
reason: reason.clone(),
});
state.updated_at = now;
}
count += 1;
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Abandoned {
input_id: id.clone(),
reason: reason_label.to_string(),
},
)));
}
count
}
pub(crate) fn abandon_staged_inputs(
&mut self,
input_ids: &[InputId],
reason: InputAbandonReason,
) -> Result<usize, RuntimeDriverError> {
let dsl_reason = mm_dsl::InputAbandonReason::from(&reason);
let reason_label = dsl_reason.as_str();
let mut count = 0;
for input_id in input_ids {
if self.input_phase(input_id) != Some(InputLifecycleState::Staged) {
continue;
}
let from_phase = self
.input_phase(input_id)
.unwrap_or(InputLifecycleState::Staged);
let attempt_count = u64::from(self.input_attempt_count(input_id));
self.dsl_apply(
mm_dsl::MeerkatMachineInput::AbandonInput {
input_id: Self::dsl_key(input_id),
reason: dsl_reason,
attempt_count,
},
"AbandonInput(CancelledRun)",
)?;
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: from_phase,
to: InputLifecycleState::Abandoned,
reason: Some(format!("Abandon({reason_label})")),
});
state.terminal_outcome = Some(InputTerminalOutcome::Abandoned {
reason: reason.clone(),
});
state.updated_at = now;
}
count += 1;
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Abandoned {
input_id: input_id.clone(),
reason: reason_label.to_string(),
},
)));
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(count)
}
pub(crate) fn abandon_pending_inputs(&mut self, reason: InputAbandonReason) -> usize {
let abandoned = self.abandon_all_non_terminal(reason);
self.queue.drain();
self.steer_queue.drain();
self.post_admission_signal = PostAdmissionSignal::None;
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
abandoned
}
pub(crate) fn machine_realize_run_completed(
&mut self,
run_id: &RunId,
consumed_input_ids: &[InputId],
) -> Result<(), RuntimeDriverError> {
self.consume_inputs(consumed_input_ids, run_id)
}
pub(crate) fn machine_realize_run_failed(
&mut self,
run_id: &RunId,
contributing_input_ids: &[InputId],
replay_plan: &ReplayQueuedContributorsPlan,
) -> Result<(), RuntimeDriverError> {
if replay_plan.wake_runtime && self.post_admission_signal < PostAdmissionSignal::WakeLoop {
self.post_admission_signal = PostAdmissionSignal::WakeLoop;
}
tracing::debug!(
run_id = ?run_id,
kind = replay_plan.notice_kind,
queue = replay_plan.queue_work_ids.len(),
steer = replay_plan.steer_work_ids.len(),
"runtime replayed queued contributors"
);
self.rollback_staged(contributing_input_ids)
}
pub(crate) fn machine_realize_run_cancelled(
&mut self,
run_id: &RunId,
contributing_input_ids: &[InputId],
) -> Result<(), RuntimeDriverError> {
tracing::debug!(
run_id = ?run_id,
contributors = contributing_input_ids.len(),
"runtime abandoned cancelled run contributors"
);
let _ =
self.abandon_staged_inputs(contributing_input_ids, InputAbandonReason::Cancelled)?;
Ok(())
}
pub(crate) fn machine_realize_boundary_applied(
&mut self,
run_id: &RunId,
receipt: &RunBoundaryReceipt,
) -> Result<(), RuntimeDriverError> {
tracing::debug!(
contributors = receipt.contributing_input_ids.len(),
sequence = receipt.sequence,
"runtime boundary applied"
);
for input_id in &receipt.contributing_input_ids {
let key = Self::dsl_key(input_id);
if !self.with_dsl_state(|state| state.input_phases.contains_key(&key)) {
continue;
}
if self.input_phase(input_id) != Some(InputLifecycleState::Staged) {
continue;
}
self.dsl_apply(
mm_dsl::MeerkatMachineInput::MarkApplied {
input_id: key.clone(),
},
"MarkApplied",
)?;
self.dsl_apply(
mm_dsl::MeerkatMachineInput::MarkAppliedPendingConsumption {
input_id: key.clone(),
},
"MarkAppliedPendingConsumption",
)?;
self.dsl_apply(
mm_dsl::MeerkatMachineInput::RecordBoundarySeq {
input_id: key,
seq: receipt.sequence,
},
"RecordBoundarySeq",
)?;
let now = Utc::now();
if let Some(state) = self.ledger.get_mut(input_id) {
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Staged,
to: InputLifecycleState::Applied,
reason: Some(format!("MarkApplied({run_id})")),
});
state.history.push(InputStateHistoryEntry {
timestamp: now,
from: InputLifecycleState::Applied,
to: InputLifecycleState::AppliedPendingConsumption,
reason: Some(format!(
"MarkAppliedPendingConsumption(boundary_sequence={})",
receipt.sequence
)),
});
state.updated_at = now;
}
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Applied {
input_id: input_id.clone(),
run_id: run_id.clone(),
},
)));
}
Ok(())
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl crate::traits::RuntimeDriver for EphemeralRuntimeDriver {
async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
let resolved = self.resolve_admission(&input);
self.accept_resolved_input(input, resolved).await
}
async fn on_runtime_event(
&mut self,
_event: RuntimeEventEnvelope,
) -> Result<(), RuntimeDriverError> {
Ok(())
}
async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
self.recover_ephemeral()
}
fn runtime_state(&self) -> RuntimeState {
self.runtime_phase_snapshot()
}
fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
self.ledger.get(input_id)
}
fn input_phase(&self, input_id: &InputId) -> Option<InputLifecycleState> {
EphemeralRuntimeDriver::input_phase(self, input_id)
}
fn input_last_run_id(&self, input_id: &InputId) -> Option<RunId> {
EphemeralRuntimeDriver::input_last_run_id(self, input_id)
}
fn input_last_boundary_sequence(&self, input_id: &InputId) -> Option<u64> {
EphemeralRuntimeDriver::input_last_boundary_sequence(self, input_id)
}
fn stored_input_state(&self, input_id: &InputId) -> Option<StoredInputState> {
EphemeralRuntimeDriver::stored_input_state(self, input_id)
}
fn active_input_ids(&self) -> Vec<InputId> {
self.ledger
.iter()
.filter_map(|(id, _)| match self.input_phase(id) {
Some(phase) if !phase.is_terminal() => Some(id.clone()),
_ => None,
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::EphemeralRuntimeDriver;
use crate::identifiers::{IdempotencyKey, LogicalRuntimeId, SupersessionKey};
use crate::input::{
ContinuationInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
PeerConvention, PeerInput,
};
use crate::input_state::{InputAbandonReason, InputLifecycleState};
use crate::traits::RuntimeDriver;
use crate::{RuntimeState, WakeMode};
use chrono::Utc;
use meerkat_core::lifecycle::{InputId, RunId};
use meerkat_core::types::HandlingMode;
fn peer_message_input() -> Input {
Input::Peer(PeerInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::Peer {
peer_id: "peer-1".into(),
display_identity: None,
runtime_id: None,
},
durability: InputDurability::Durable,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: None,
correlation_id: None,
},
convention: Some(PeerConvention::Message),
body: "peer body".into(),
payload: None,
blocks: None,
handling_mode: None,
})
}
fn continuation_input(supersession_key: Option<&str>) -> Input {
Input::Continuation(ContinuationInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::System,
durability: InputDurability::Ephemeral,
visibility: InputVisibility::default(),
idempotency_key: None,
supersession_key: supersession_key.map(SupersessionKey::new),
correlation_id: None,
},
reason: "test-continuation".to_string(),
handling_mode: HandlingMode::Steer,
request_id: None,
flow_tool_overlay: None,
context_append: None,
turn_append: None,
})
}
fn workgraph_attention_continuation_input(idempotency_key: &str) -> Input {
Input::Continuation(ContinuationInput {
header: InputHeader {
id: InputId::new(),
timestamp: Utc::now(),
source: InputOrigin::System,
durability: InputDurability::Durable,
visibility: InputVisibility {
transcript_eligible: false,
operator_eligible: false,
},
idempotency_key: Some(IdempotencyKey::new(idempotency_key)),
supersession_key: Some(SupersessionKey::new("workgraph_attention:binding")),
correlation_id: None,
},
reason: "workgraph_attention".to_string(),
handling_mode: HandlingMode::Steer,
request_id: Some("binding".to_string()),
flow_tool_overlay: None,
context_append: None,
turn_append: None,
})
}
fn force_control_shadow(
driver: &mut EphemeralRuntimeDriver,
phase: RuntimeState,
current_run_id: Option<RunId>,
pre_run_phase: Option<RuntimeState>,
) {
let mut control = driver.write_control_projection();
control.phase = phase;
control.current_run_id = current_run_id;
control.pre_run_phase = pre_run_phase;
}
#[test]
fn set_control_projection_does_not_write_dsl_authority() {
let mut driver = EphemeralRuntimeDriver::new(LogicalRuntimeId::new("projection-only"));
let run_id = RunId::new();
driver.set_control_projection(
RuntimeState::Running,
Some(run_id),
Some(RuntimeState::Attached),
);
assert_eq!(
driver.runtime_phase_snapshot(),
RuntimeState::Idle,
"control projection writes must not mutate DSL lifecycle truth",
);
assert_eq!(
driver.current_run_id(),
None,
"control projection writes must not mutate DSL run binding truth",
);
assert_eq!(
driver.control_snapshot().phase,
RuntimeState::Running,
"the shell projection still records the mechanical projection",
);
}
#[tokio::test]
async fn direct_accept_uses_dsl_phase_not_control_projection_shadow() {
let mut driver = EphemeralRuntimeDriver::new(LogicalRuntimeId::new("admission-shadow"));
force_control_shadow(&mut driver, RuntimeState::Stopped, None, None);
let outcome = driver.accept_input(peer_message_input()).await.unwrap();
assert!(
outcome.is_accepted(),
"direct RuntimeDriver admission should follow DSL phase, not a stale control shadow",
);
}
#[tokio::test]
async fn continuation_supersession_coalesces_only_in_flight_inputs() {
let mut driver =
EphemeralRuntimeDriver::new(LogicalRuntimeId::new("continuation-supersession"));
let first = continuation_input(Some("attention:key"));
let first_id = first.id().clone();
let first_outcome = driver.accept_input(first).await.unwrap();
assert!(first_outcome.is_accepted());
assert!(
driver
.ledger()
.get(&first_id)
.and_then(|state| state.persisted_input.as_ref())
.is_some(),
"accepted continuation should keep its persisted input for supersession"
);
let second = continuation_input(Some("attention:key"));
let existing = driver
.ledger()
.get(&first_id)
.and_then(|state| state.persisted_input.as_ref())
.expect("persisted input");
assert!(matches!(
crate::coalescing::check_supersession(&second, existing, driver.runtime_id()),
crate::coalescing::CoalescingResult::Supersedes { .. }
));
assert!(driver.existing_superseded_input(&second).is_some());
let resolved = driver.resolve_admission(&second);
assert!(matches!(
resolved.admission_plan,
crate::accept::AdmissionPlan::Queued {
existing_action: Some(
crate::accept::ExistingQueuedAdmissionAction::Supersede { .. }
),
..
}
));
let second_id = second.id().clone();
let second_outcome = driver.accept_input(second).await.unwrap();
assert!(second_outcome.is_accepted());
assert_eq!(
driver.input_phase(&first_id),
Some(InputLifecycleState::Superseded)
);
assert_eq!(
driver.input_phase(&second_id),
Some(InputLifecycleState::Queued)
);
assert_eq!(driver.dsl_steer_lane(), vec![second_id.clone()]);
driver.abandon_all_non_terminal(InputAbandonReason::Reset);
let third = continuation_input(Some("attention:key"));
let third_id = third.id().clone();
let third_outcome = driver.accept_input(third).await.unwrap();
assert!(third_outcome.is_accepted());
assert_eq!(
driver.input_phase(&third_id),
Some(InputLifecycleState::Queued)
);
assert_eq!(
driver.dsl_steer_lane(),
vec![third_id],
"terminal/superseded continuations must not dedupe future keep-alive turns",
);
}
#[tokio::test]
async fn workgraph_attention_idempotency_only_deduplicates_live_inputs() {
let mut driver =
EphemeralRuntimeDriver::new(LogicalRuntimeId::new("attention-idempotency"));
let key = "workgraph_attention:realm:namespace:binding:1:1:digest";
let first = workgraph_attention_continuation_input(key);
let first_id = first.id().clone();
let first_outcome = driver.accept_input(first).await.unwrap();
assert!(first_outcome.is_accepted());
let duplicate = workgraph_attention_continuation_input(key);
let duplicate_outcome = driver.accept_input(duplicate).await.unwrap();
assert!(
matches!(
duplicate_outcome,
crate::accept::AcceptOutcome::Deduplicated { ref existing_id, .. }
if existing_id == &first_id
),
"same attention projection should dedupe while the first input is live: {duplicate_outcome:?}"
);
driver.abandon_all_non_terminal(InputAbandonReason::Reset);
let retry = workgraph_attention_continuation_input(key);
let retry_id = retry.id().clone();
let retry_outcome = driver.accept_input(retry).await.unwrap();
assert!(
retry_outcome.is_accepted(),
"same attention projection should be admissible after the previous attempt is terminal"
);
assert_eq!(
driver.input_phase(&retry_id),
Some(InputLifecycleState::Queued)
);
}
#[test]
fn resolve_admission_for_runtime_idle_uses_explicit_machine_phase_not_control_projection() {
let mut driver = EphemeralRuntimeDriver::new(LogicalRuntimeId::new("phase-drift"));
force_control_shadow(
&mut driver,
RuntimeState::Running,
Some(RunId::new()),
Some(RuntimeState::Attached),
);
let input = peer_message_input();
let projected = driver.resolve_admission(&input);
assert_eq!(projected.policy.wake_mode, WakeMode::WakeIfIdle);
assert!(!projected.coarse_flags.interrupt_yielding);
let machine_owned = driver.resolve_admission_for_runtime_idle(&input, true);
assert_eq!(machine_owned.policy.wake_mode, WakeMode::WakeIfIdle);
assert!(!machine_owned.coarse_flags.interrupt_yielding);
assert!(!machine_owned.coarse_flags.request_immediate_processing);
}
}