use std::collections::{BTreeSet, HashMap, HashSet};
use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
use meerkat_core::lifecycle::{InputId, RunId};
use meerkat_core::types::HandlingMode;
use crate::input_state::{InputLifecycleState, InputTerminalOutcome};
use crate::policy::PolicyDecision;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentShape(pub String);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReservationKey(pub String);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(pub String);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IngressPhase {
Active,
Retired,
Destroyed,
}
impl IngressPhase {
pub fn is_terminal(self) -> bool {
matches!(self, Self::Destroyed)
}
}
#[derive(Debug, Clone)]
pub enum RuntimeIngressInput {
AdmitQueued {
work_id: InputId,
content_shape: ContentShape,
handling_mode: HandlingMode,
is_prompt: bool,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
policy: PolicyDecision,
existing_superseded_id: Option<InputId>,
},
AdmitConsumedOnAccept {
work_id: InputId,
content_shape: ContentShape,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
policy: PolicyDecision,
},
AdmitDeduplicated {
work_id: InputId,
existing_id: InputId,
},
StageDrainSnapshot {
run_id: RunId,
contributing_work_ids: Vec<InputId>,
},
BoundaryApplied {
run_id: RunId,
boundary_sequence: u64,
},
RunCompleted { run_id: RunId },
RunFailed { run_id: RunId },
RunCancelled { run_id: RunId },
SupersedeQueuedInput {
new_work_id: InputId,
old_work_id: InputId,
},
CoalesceQueuedInputs {
aggregate_work_id: InputId,
source_work_ids: Vec<InputId>,
},
Retire,
Reset,
Destroy,
AdmitRecovered {
work_id: InputId,
content_shape: ContentShape,
handling_mode: HandlingMode,
lifecycle_state: InputLifecycleState,
policy: PolicyDecision,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
},
Recover,
SetSilentIntentOverrides { intents: BTreeSet<String> },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeIngressEffect {
IngressAccepted { work_id: InputId },
Deduplicated {
work_id: InputId,
existing_id: InputId,
},
ReadyForRun {
run_id: RunId,
contributing_work_ids: Vec<InputId>,
},
InputLifecycleNotice {
work_id: InputId,
new_state: InputLifecycleState,
},
WakeRuntime,
RequestImmediateProcessing,
CompletionResolved {
work_id: InputId,
outcome: InputTerminalOutcome,
},
IngressNotice { kind: String, detail: String },
SilentIntentApplied { work_id: InputId, intent: String },
PersistAndQueue { work_id: InputId },
EnqueueTo {
work_id: InputId,
target: HandlingMode,
},
EnqueueFront {
work_id: InputId,
target: HandlingMode,
},
RemoveFromQueues { work_id: InputId },
CoalesceExisting {
new_id: InputId,
existing_id: InputId,
},
SupersedeExisting {
new_id: InputId,
existing_id: InputId,
},
ConsumeOnAccept { work_id: InputId },
EmitQueuedEvent { work_id: InputId },
StageInput { work_id: InputId, run_id: RunId },
RecoverConsumeOnAccept { work_id: InputId },
RecoverRollback { work_id: InputId },
RecoverKeep { work_id: InputId },
}
#[derive(Debug)]
pub struct RuntimeIngressTransition {
pub next_phase: IngressPhase,
pub effects: Vec<RuntimeIngressEffect>,
}
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum RuntimeIngressError {
#[error("Invalid ingress transition: {from:?} via {input} (rejected)")]
InvalidTransition { from: IngressPhase, input: String },
#[error("Guard failed: {guard} (from {from:?})")]
GuardFailed { from: IngressPhase, guard: String },
#[error("Ingress is in terminal phase {phase:?}")]
TerminalPhase { phase: IngressPhase },
}
#[derive(Debug, Clone)]
struct RuntimeIngressFields {
admitted_inputs: HashSet<InputId>,
admission_order: Vec<InputId>,
content_shape: HashMap<InputId, ContentShape>,
request_id: HashMap<InputId, Option<RequestId>>,
reservation_key: HashMap<InputId, Option<ReservationKey>>,
policy_snapshot: HashMap<InputId, PolicyDecision>,
handling_mode: HashMap<InputId, HandlingMode>,
is_prompt: HashMap<InputId, bool>,
lifecycle: HashMap<InputId, InputLifecycleState>,
terminal_outcome: HashMap<InputId, Option<InputTerminalOutcome>>,
queue: Vec<InputId>,
steer_queue: Vec<InputId>,
current_run: Option<RunId>,
current_run_contributors: Vec<InputId>,
last_run: HashMap<InputId, Option<RunId>>,
last_boundary_sequence: HashMap<InputId, Option<u64>>,
wake_requested: bool,
process_requested: bool,
silent_intent_overrides: BTreeSet<String>,
}
impl RuntimeIngressFields {
fn new() -> Self {
Self {
admitted_inputs: HashSet::new(),
admission_order: Vec::new(),
content_shape: HashMap::new(),
request_id: HashMap::new(),
reservation_key: HashMap::new(),
policy_snapshot: HashMap::new(),
handling_mode: HashMap::new(),
is_prompt: HashMap::new(),
lifecycle: HashMap::new(),
terminal_outcome: HashMap::new(),
queue: Vec::new(),
steer_queue: Vec::new(),
current_run: None,
current_run_contributors: Vec::new(),
last_run: HashMap::new(),
last_boundary_sequence: HashMap::new(),
wake_requested: false,
process_requested: false,
silent_intent_overrides: BTreeSet::new(),
}
}
}
mod sealed {
pub trait Sealed {}
}
pub trait RuntimeIngressMutator: sealed::Sealed {
fn apply(
&mut self,
input: RuntimeIngressInput,
) -> Result<RuntimeIngressTransition, RuntimeIngressError>;
}
#[derive(Debug, Clone)]
pub struct RuntimeIngressAuthority {
phase: IngressPhase,
fields: RuntimeIngressFields,
}
impl sealed::Sealed for RuntimeIngressAuthority {}
impl Default for RuntimeIngressAuthority {
fn default() -> Self {
Self::new()
}
}
impl RuntimeIngressAuthority {
pub fn new() -> Self {
Self {
phase: IngressPhase::Active,
fields: RuntimeIngressFields::new(),
}
}
pub fn phase(&self) -> IngressPhase {
self.phase
}
pub fn is_terminal(&self) -> bool {
self.phase.is_terminal()
}
pub fn admitted_inputs(&self) -> &HashSet<InputId> {
&self.fields.admitted_inputs
}
pub fn admission_order(&self) -> &[InputId] {
&self.fields.admission_order
}
pub fn queue(&self) -> &[InputId] {
&self.fields.queue
}
pub fn steer_queue(&self) -> &[InputId] {
&self.fields.steer_queue
}
pub fn current_run(&self) -> Option<&RunId> {
self.fields.current_run.as_ref()
}
pub fn current_run_contributors(&self) -> &[InputId] {
&self.fields.current_run_contributors
}
pub fn lifecycle_state(&self, work_id: &InputId) -> Option<InputLifecycleState> {
self.fields.lifecycle.get(work_id).copied()
}
pub fn terminal_outcome(&self, work_id: &InputId) -> Option<&InputTerminalOutcome> {
self.fields
.terminal_outcome
.get(work_id)
.and_then(|o| o.as_ref())
}
pub fn policy_snapshot(&self, work_id: &InputId) -> Option<&PolicyDecision> {
self.fields.policy_snapshot.get(work_id)
}
pub fn wake_requested(&self) -> bool {
self.fields.wake_requested
}
pub fn process_requested(&self) -> bool {
self.fields.process_requested
}
pub fn silent_intent_overrides(&self) -> &BTreeSet<String> {
&self.fields.silent_intent_overrides
}
pub fn last_run(&self, work_id: &InputId) -> Option<&RunId> {
self.fields.last_run.get(work_id).and_then(|o| o.as_ref())
}
pub fn last_boundary_sequence(&self, work_id: &InputId) -> Option<u64> {
self.fields
.last_boundary_sequence
.get(work_id)
.and_then(|o| *o)
}
pub fn handling_mode(&self, work_id: &InputId) -> Option<HandlingMode> {
self.fields.handling_mode.get(work_id).copied()
}
pub fn is_prompt(&self, work_id: &InputId) -> bool {
self.fields.is_prompt.get(work_id).copied().unwrap_or(false)
}
pub fn input_boundary(&self, work_id: &InputId) -> RunApplyBoundary {
match self.fields.handling_mode.get(work_id) {
Some(HandlingMode::Steer) => RunApplyBoundary::RunCheckpoint,
_ => RunApplyBoundary::RunStart,
}
}
pub fn drain_next_batch<F>(&self, boundary_of: F) -> Vec<InputId>
where
F: Fn(&InputId) -> RunApplyBoundary,
{
if !self.fields.steer_queue.is_empty() {
let first = &self.fields.steer_queue[0];
let target_boundary = boundary_of(first);
return self
.fields
.steer_queue
.iter()
.take_while(|id| boundary_of(id) == target_boundary)
.cloned()
.collect();
}
if let Some(first) = self.fields.queue.first() {
if self.fields.is_prompt.get(first).copied().unwrap_or(false) {
return vec![first.clone()];
}
return self
.fields
.queue
.iter()
.take_while(|id| !self.fields.is_prompt.get(*id).copied().unwrap_or(false))
.cloned()
.collect();
}
Vec::new()
}
pub fn can_accept(&self, input: &RuntimeIngressInput) -> bool {
self.evaluate(input).is_ok()
}
#[allow(clippy::too_many_arguments)]
pub fn admit(
&mut self,
work_id: InputId,
content_shape: ContentShape,
handling_mode: HandlingMode,
is_prompt: bool,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
policy: PolicyDecision,
existing_superseded_id: Option<InputId>,
) -> Result<RuntimeIngressTransition, RuntimeIngressError> {
let consumed_on_accept = policy.apply_mode == crate::policy::ApplyMode::Ignore
&& policy.consume_point == crate::policy::ConsumePoint::OnAccept;
if consumed_on_accept {
self.apply(RuntimeIngressInput::AdmitConsumedOnAccept {
work_id,
content_shape,
request_id,
reservation_key,
policy,
})
} else {
self.apply(RuntimeIngressInput::AdmitQueued {
work_id,
content_shape,
handling_mode,
is_prompt,
request_id,
reservation_key,
policy,
existing_superseded_id,
})
}
}
pub fn take_wake_requested(&mut self) -> bool {
std::mem::take(&mut self.fields.wake_requested)
}
pub fn take_process_requested(&mut self) -> bool {
std::mem::take(&mut self.fields.process_requested)
}
fn evaluate(
&self,
input: &RuntimeIngressInput,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
let phase = self.phase;
if phase.is_terminal() {
return Err(RuntimeIngressError::TerminalPhase { phase });
}
match input {
RuntimeIngressInput::AdmitQueued {
work_id,
content_shape,
handling_mode,
is_prompt,
request_id,
reservation_key,
policy,
existing_superseded_id,
} => self.eval_admit_queued(
phase,
work_id,
content_shape,
*handling_mode,
*is_prompt,
request_id,
reservation_key,
policy,
existing_superseded_id,
),
RuntimeIngressInput::AdmitConsumedOnAccept {
work_id,
content_shape,
request_id,
reservation_key,
policy,
} => self.eval_admit_consumed_on_accept(
phase,
work_id,
content_shape,
request_id,
reservation_key,
policy,
),
RuntimeIngressInput::AdmitDeduplicated {
work_id,
existing_id,
} => self.eval_admit_deduplicated(phase, work_id, existing_id),
RuntimeIngressInput::StageDrainSnapshot {
run_id,
contributing_work_ids,
} => self.eval_stage_drain_snapshot(phase, run_id, contributing_work_ids),
RuntimeIngressInput::BoundaryApplied {
run_id,
boundary_sequence,
} => self.eval_boundary_applied(phase, run_id, *boundary_sequence),
RuntimeIngressInput::RunCompleted { run_id } => self.eval_run_completed(phase, run_id),
RuntimeIngressInput::RunFailed { run_id } => self.eval_run_failed(phase, run_id),
RuntimeIngressInput::RunCancelled { run_id } => self.eval_run_cancelled(phase, run_id),
RuntimeIngressInput::SupersedeQueuedInput {
new_work_id,
old_work_id,
} => self.eval_supersede(phase, new_work_id, old_work_id),
RuntimeIngressInput::CoalesceQueuedInputs {
aggregate_work_id,
source_work_ids,
} => self.eval_coalesce(phase, aggregate_work_id, source_work_ids),
RuntimeIngressInput::AdmitRecovered {
work_id,
content_shape,
handling_mode,
lifecycle_state,
policy,
request_id,
reservation_key,
} => self.eval_admit_recovered(
phase,
work_id.clone(),
content_shape.clone(),
*handling_mode,
*lifecycle_state,
policy.clone(),
request_id.clone(),
reservation_key.clone(),
),
RuntimeIngressInput::Retire => self.eval_retire(phase),
RuntimeIngressInput::Reset => self.eval_reset(phase),
RuntimeIngressInput::Destroy => self.eval_destroy(phase),
RuntimeIngressInput::Recover => self.eval_recover(phase),
RuntimeIngressInput::SetSilentIntentOverrides { intents } => {
self.eval_set_silent_intent_overrides(phase, intents)
}
}
}
#[allow(clippy::too_many_arguments)]
fn eval_admit_queued(
&self,
phase: IngressPhase,
work_id: &InputId,
content_shape: &ContentShape,
handling_mode: HandlingMode,
is_prompt: bool,
request_id: &Option<RequestId>,
reservation_key: &Option<ReservationKey>,
policy: &PolicyDecision,
existing_superseded_id: &Option<InputId>,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if phase != IngressPhase::Active {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "AdmitQueued".into(),
});
}
if self.fields.admitted_inputs.contains(work_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("input_is_new: {work_id:?} already admitted"),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
fields.admitted_inputs.insert(work_id.clone());
fields.admission_order.push(work_id.clone());
fields
.content_shape
.insert(work_id.clone(), content_shape.clone());
fields
.request_id
.insert(work_id.clone(), request_id.clone());
fields
.reservation_key
.insert(work_id.clone(), reservation_key.clone());
fields
.policy_snapshot
.insert(work_id.clone(), policy.clone());
fields.handling_mode.insert(work_id.clone(), handling_mode);
fields.is_prompt.insert(work_id.clone(), is_prompt);
fields
.lifecycle
.insert(work_id.clone(), InputLifecycleState::Queued);
fields.terminal_outcome.insert(work_id.clone(), None);
fields.last_run.insert(work_id.clone(), None);
fields.last_boundary_sequence.insert(work_id.clone(), None);
match handling_mode {
HandlingMode::Queue => fields.queue.push(work_id.clone()),
HandlingMode::Steer => fields.steer_queue.push(work_id.clone()),
}
fields.wake_requested = true;
if handling_mode == HandlingMode::Steer {
fields.process_requested = true;
}
effects.push(RuntimeIngressEffect::IngressAccepted {
work_id: work_id.clone(),
});
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: work_id.clone(),
new_state: InputLifecycleState::Queued,
});
match policy.wake_mode {
crate::WakeMode::WakeIfIdle => {
effects.push(RuntimeIngressEffect::WakeRuntime);
if handling_mode == HandlingMode::Steer {
effects.push(RuntimeIngressEffect::RequestImmediateProcessing);
}
}
crate::WakeMode::InterruptYielding => {
if handling_mode == HandlingMode::Steer {
effects.push(RuntimeIngressEffect::RequestImmediateProcessing);
}
}
crate::WakeMode::None => {}
}
match policy.apply_mode {
crate::policy::ApplyMode::Ignore => {
}
crate::policy::ApplyMode::InjectNow
| crate::policy::ApplyMode::StageRunStart
| crate::policy::ApplyMode::StageRunBoundary => {
effects.push(RuntimeIngressEffect::PersistAndQueue {
work_id: work_id.clone(),
});
match policy.queue_mode {
crate::policy::QueueMode::Coalesce => {
if let Some(existing_id) = existing_superseded_id {
effects.push(RuntimeIngressEffect::RemoveFromQueues {
work_id: existing_id.clone(),
});
effects.push(RuntimeIngressEffect::CoalesceExisting {
new_id: work_id.clone(),
existing_id: existing_id.clone(),
});
}
effects.push(RuntimeIngressEffect::EnqueueTo {
work_id: work_id.clone(),
target: handling_mode,
});
}
crate::policy::QueueMode::Supersede => {
if let Some(existing_id) = existing_superseded_id {
effects.push(RuntimeIngressEffect::RemoveFromQueues {
work_id: existing_id.clone(),
});
effects.push(RuntimeIngressEffect::SupersedeExisting {
new_id: work_id.clone(),
existing_id: existing_id.clone(),
});
}
effects.push(RuntimeIngressEffect::EnqueueTo {
work_id: work_id.clone(),
target: handling_mode,
});
}
crate::policy::QueueMode::Priority => {
effects.push(RuntimeIngressEffect::EnqueueFront {
work_id: work_id.clone(),
target: handling_mode,
});
}
crate::policy::QueueMode::Fifo | crate::policy::QueueMode::None => {
effects.push(RuntimeIngressEffect::EnqueueTo {
work_id: work_id.clone(),
target: handling_mode,
});
}
}
effects.push(RuntimeIngressEffect::EmitQueuedEvent {
work_id: work_id.clone(),
});
}
}
Ok((IngressPhase::Active, fields, effects))
}
fn eval_admit_deduplicated(
&self,
phase: IngressPhase,
work_id: &InputId,
existing_id: &InputId,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if phase != IngressPhase::Active {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "AdmitDeduplicated".into(),
});
}
let fields = self.fields.clone();
let effects = vec![RuntimeIngressEffect::Deduplicated {
work_id: work_id.clone(),
existing_id: existing_id.clone(),
}];
Ok((phase, fields, effects))
}
fn eval_admit_consumed_on_accept(
&self,
phase: IngressPhase,
work_id: &InputId,
content_shape: &ContentShape,
request_id: &Option<RequestId>,
reservation_key: &Option<ReservationKey>,
policy: &PolicyDecision,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if phase != IngressPhase::Active {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "AdmitConsumedOnAccept".into(),
});
}
if self.fields.admitted_inputs.contains(work_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("input_is_new: {work_id:?} already admitted"),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
fields.admitted_inputs.insert(work_id.clone());
fields.admission_order.push(work_id.clone());
fields
.content_shape
.insert(work_id.clone(), content_shape.clone());
fields
.request_id
.insert(work_id.clone(), request_id.clone());
fields
.reservation_key
.insert(work_id.clone(), reservation_key.clone());
fields
.policy_snapshot
.insert(work_id.clone(), policy.clone());
fields
.lifecycle
.insert(work_id.clone(), InputLifecycleState::Consumed);
fields
.terminal_outcome
.insert(work_id.clone(), Some(InputTerminalOutcome::Consumed));
fields.last_run.insert(work_id.clone(), None);
fields.last_boundary_sequence.insert(work_id.clone(), None);
effects.push(RuntimeIngressEffect::IngressAccepted {
work_id: work_id.clone(),
});
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: work_id.clone(),
new_state: InputLifecycleState::Consumed,
});
effects.push(RuntimeIngressEffect::CompletionResolved {
work_id: work_id.clone(),
outcome: InputTerminalOutcome::Consumed,
});
effects.push(RuntimeIngressEffect::ConsumeOnAccept {
work_id: work_id.clone(),
});
Ok((IngressPhase::Active, fields, effects))
}
fn eval_stage_drain_snapshot(
&self,
phase: IngressPhase,
run_id: &RunId,
contributing_work_ids: &[InputId],
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "StageDrainSnapshot".into(),
});
}
if self.fields.current_run.is_some() {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: "no_current_run: a run is already in progress".into(),
});
}
if contributing_work_ids.is_empty() {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: "contributors_non_empty: must have at least one contributor".into(),
});
}
for wid in contributing_work_ids {
let lifecycle = self.fields.lifecycle.get(wid);
if lifecycle != Some(&InputLifecycleState::Queued) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("all_contributors_are_queued: {wid:?} is {lifecycle:?}"),
});
}
}
if !self.fields.steer_queue.is_empty() {
if !seq_starts_with(&self.fields.steer_queue, contributing_work_ids) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: "contributors_match_current_drain_source: not a prefix of steer_queue"
.into(),
});
}
} else if !seq_starts_with(&self.fields.queue, contributing_work_ids) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: "contributors_match_current_drain_source: not a prefix of queue".into(),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
if fields.steer_queue.is_empty() {
seq_remove_all(&mut fields.queue, contributing_work_ids);
} else {
seq_remove_all(&mut fields.steer_queue, contributing_work_ids);
}
fields.current_run = Some(run_id.clone());
fields.current_run_contributors = contributing_work_ids.to_vec();
fields.wake_requested = false;
fields.process_requested = false;
for wid in contributing_work_ids {
fields.last_run.insert(wid.clone(), Some(run_id.clone()));
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Staged);
effects.push(RuntimeIngressEffect::StageInput {
work_id: wid.clone(),
run_id: run_id.clone(),
});
}
effects.push(RuntimeIngressEffect::ReadyForRun {
run_id: run_id.clone(),
contributing_work_ids: contributing_work_ids.to_vec(),
});
Ok((phase, fields, effects))
}
fn eval_boundary_applied(
&self,
phase: IngressPhase,
run_id: &RunId,
boundary_sequence: u64,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "BoundaryApplied".into(),
});
}
if self.fields.current_run.as_ref() != Some(run_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!(
"run_matches_current: expected {:?}, got {run_id:?}",
self.fields.current_run
),
});
}
for wid in &self.fields.current_run_contributors {
let lifecycle = self.fields.lifecycle.get(wid);
if lifecycle != Some(&InputLifecycleState::Staged) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("contributors_are_staged: {wid:?} is {lifecycle:?}"),
});
}
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
for wid in &fields.current_run_contributors.clone() {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::AppliedPendingConsumption);
fields
.last_boundary_sequence
.insert(wid.clone(), Some(boundary_sequence));
}
effects.push(RuntimeIngressEffect::IngressNotice {
kind: "BoundaryApplied".into(),
detail: "ContributorsPendingConsumption".into(),
});
Ok((phase, fields, effects))
}
fn eval_run_completed(
&self,
phase: IngressPhase,
run_id: &RunId,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "RunCompleted".into(),
});
}
if self.fields.current_run.as_ref() != Some(run_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!(
"run_matches_current: expected {:?}, got {run_id:?}",
self.fields.current_run
),
});
}
for wid in &self.fields.current_run_contributors {
let lifecycle = self.fields.lifecycle.get(wid);
if lifecycle != Some(&InputLifecycleState::AppliedPendingConsumption) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("contributors_pending_consumption: {wid:?} is {lifecycle:?}"),
});
}
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
for wid in &fields.current_run_contributors.clone() {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Consumed);
fields
.terminal_outcome
.insert(wid.clone(), Some(InputTerminalOutcome::Consumed));
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: wid.clone(),
new_state: InputLifecycleState::Consumed,
});
effects.push(RuntimeIngressEffect::CompletionResolved {
work_id: wid.clone(),
outcome: InputTerminalOutcome::Consumed,
});
}
fields.current_run = None;
fields.current_run_contributors = Vec::new();
Ok((phase, fields, effects))
}
fn eval_run_failed(
&self,
phase: IngressPhase,
run_id: &RunId,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "RunFailed".into(),
});
}
if self.fields.current_run.as_ref() != Some(run_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!(
"run_matches_current: expected {:?}, got {run_id:?}",
self.fields.current_run
),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
for wid in &fields.current_run_contributors.clone() {
let lifecycle = fields.lifecycle.get(wid);
if lifecycle == Some(&InputLifecycleState::Staged) {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Queued);
let hm = fields.handling_mode.get(wid).copied();
match hm {
Some(HandlingMode::Steer) => {
if !fields.steer_queue.contains(wid) {
fields.steer_queue.insert(0, wid.clone());
}
}
_ => {
if !fields.queue.contains(wid) {
fields.queue.insert(0, wid.clone());
}
}
}
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: wid.clone(),
new_state: InputLifecycleState::Queued,
});
}
}
fields.current_run = None;
fields.current_run_contributors = Vec::new();
if !fields.queue.is_empty() || !fields.steer_queue.is_empty() {
fields.wake_requested = true;
effects.push(RuntimeIngressEffect::WakeRuntime);
}
effects.push(RuntimeIngressEffect::IngressNotice {
kind: "RunFailed".into(),
detail: "StagedRolledBack".into(),
});
Ok((phase, fields, effects))
}
fn eval_run_cancelled(
&self,
phase: IngressPhase,
run_id: &RunId,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "RunCancelled".into(),
});
}
if self.fields.current_run.as_ref() != Some(run_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!(
"run_matches_current: expected {:?}, got {run_id:?}",
self.fields.current_run
),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
for wid in &fields.current_run_contributors.clone() {
let lifecycle = fields.lifecycle.get(wid);
if lifecycle == Some(&InputLifecycleState::Staged) {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Queued);
let hm = fields.handling_mode.get(wid).copied();
match hm {
Some(HandlingMode::Steer) => {
if !fields.steer_queue.contains(wid) {
fields.steer_queue.insert(0, wid.clone());
}
}
_ => {
if !fields.queue.contains(wid) {
fields.queue.insert(0, wid.clone());
}
}
}
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: wid.clone(),
new_state: InputLifecycleState::Queued,
});
}
}
fields.current_run = None;
fields.current_run_contributors = Vec::new();
if !fields.queue.is_empty() || !fields.steer_queue.is_empty() {
fields.wake_requested = true;
effects.push(RuntimeIngressEffect::WakeRuntime);
}
effects.push(RuntimeIngressEffect::IngressNotice {
kind: "RunCancelled".into(),
detail: "StagedRolledBack".into(),
});
Ok((phase, fields, effects))
}
fn eval_supersede(
&self,
phase: IngressPhase,
new_work_id: &InputId,
old_work_id: &InputId,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "SupersedeQueuedInput".into(),
});
}
let old_lifecycle = self.fields.lifecycle.get(old_work_id);
if old_lifecycle != Some(&InputLifecycleState::Queued) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("old_input_is_queued: {old_work_id:?} is {old_lifecycle:?}"),
});
}
if !self.fields.admitted_inputs.contains(new_work_id) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("new_input_is_admitted: {new_work_id:?} not found"),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
fields.queue.retain(|id| id != old_work_id);
fields.steer_queue.retain(|id| id != old_work_id);
fields
.lifecycle
.insert(old_work_id.clone(), InputLifecycleState::Superseded);
let outcome = InputTerminalOutcome::Superseded {
superseded_by: new_work_id.clone(),
};
fields
.terminal_outcome
.insert(old_work_id.clone(), Some(outcome.clone()));
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: old_work_id.clone(),
new_state: InputLifecycleState::Superseded,
});
effects.push(RuntimeIngressEffect::CompletionResolved {
work_id: old_work_id.clone(),
outcome,
});
Ok((phase, fields, effects))
}
fn eval_coalesce(
&self,
phase: IngressPhase,
aggregate_work_id: &InputId,
source_work_ids: &[InputId],
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "CoalesceQueuedInputs".into(),
});
}
for wid in source_work_ids {
let lifecycle = self.fields.lifecycle.get(wid);
if lifecycle != Some(&InputLifecycleState::Queued) {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: format!("all_sources_queued: {wid:?} is {lifecycle:?}"),
});
}
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
for wid in source_work_ids {
fields.queue.retain(|id| id != wid);
fields.steer_queue.retain(|id| id != wid);
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Coalesced);
let outcome = InputTerminalOutcome::Coalesced {
aggregate_id: aggregate_work_id.clone(),
};
fields
.terminal_outcome
.insert(wid.clone(), Some(outcome.clone()));
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: wid.clone(),
new_state: InputLifecycleState::Coalesced,
});
effects.push(RuntimeIngressEffect::CompletionResolved {
work_id: wid.clone(),
outcome,
});
}
Ok((phase, fields, effects))
}
fn eval_retire(
&self,
phase: IngressPhase,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if phase != IngressPhase::Active {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "Retire".into(),
});
}
let fields = self.fields.clone();
let effects = vec![RuntimeIngressEffect::IngressNotice {
kind: "Retire".into(),
detail: "IngressRetired".into(),
}];
Ok((IngressPhase::Retired, fields, effects))
}
fn eval_reset(
&self,
phase: IngressPhase,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "Reset".into(),
});
}
if self.fields.current_run.is_some() {
return Err(RuntimeIngressError::GuardFailed {
from: phase,
guard: "no_current_run: cannot reset while a run is in progress".into(),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
let non_terminal_ids: Vec<InputId> = fields
.lifecycle
.iter()
.filter(|(_, state)| !state.is_terminal())
.map(|(id, _)| id.clone())
.collect();
for wid in &non_terminal_ids {
let outcome = InputTerminalOutcome::Abandoned {
reason: crate::input_state::InputAbandonReason::Reset,
};
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Abandoned);
fields
.terminal_outcome
.insert(wid.clone(), Some(outcome.clone()));
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: wid.clone(),
new_state: InputLifecycleState::Abandoned,
});
effects.push(RuntimeIngressEffect::CompletionResolved {
work_id: wid.clone(),
outcome,
});
}
fields.queue.clear();
fields.steer_queue.clear();
fields.wake_requested = false;
fields.process_requested = false;
fields.current_run = None;
fields.current_run_contributors = Vec::new();
effects.push(RuntimeIngressEffect::IngressNotice {
kind: "Reset".into(),
detail: "IngressReset".into(),
});
Ok((IngressPhase::Active, fields, effects))
}
fn eval_destroy(
&self,
phase: IngressPhase,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "Destroy".into(),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
let non_terminal_ids: Vec<InputId> = fields
.lifecycle
.iter()
.filter(|(_, state)| !state.is_terminal())
.map(|(id, _)| id.clone())
.collect();
for wid in &non_terminal_ids {
let outcome = InputTerminalOutcome::Abandoned {
reason: crate::input_state::InputAbandonReason::Destroyed,
};
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Abandoned);
fields
.terminal_outcome
.insert(wid.clone(), Some(outcome.clone()));
effects.push(RuntimeIngressEffect::InputLifecycleNotice {
work_id: wid.clone(),
new_state: InputLifecycleState::Abandoned,
});
effects.push(RuntimeIngressEffect::CompletionResolved {
work_id: wid.clone(),
outcome,
});
}
fields.queue.clear();
fields.steer_queue.clear();
fields.wake_requested = false;
fields.process_requested = false;
fields.current_run = None;
fields.current_run_contributors = Vec::new();
effects.push(RuntimeIngressEffect::IngressNotice {
kind: "Destroy".into(),
detail: "IngressDestroyed".into(),
});
Ok((IngressPhase::Destroyed, fields, effects))
}
#[allow(clippy::too_many_arguments)]
fn eval_admit_recovered(
&self,
phase: IngressPhase,
work_id: InputId,
content_shape: ContentShape,
handling_mode: HandlingMode,
lifecycle_state: InputLifecycleState,
policy: PolicyDecision,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "AdmitRecovered".into(),
});
}
let mut fields = self.fields.clone();
fields.admitted_inputs.insert(work_id.clone());
fields.content_shape.insert(work_id.clone(), content_shape);
fields.handling_mode.insert(work_id.clone(), handling_mode);
fields.lifecycle.insert(work_id.clone(), lifecycle_state);
fields.policy_snapshot.insert(work_id.clone(), policy);
fields.request_id.insert(work_id.clone(), request_id);
fields
.reservation_key
.insert(work_id.clone(), reservation_key);
if lifecycle_state == InputLifecycleState::Queued {
match handling_mode {
HandlingMode::Steer => {
if !fields.steer_queue.contains(&work_id) {
fields.steer_queue.push(work_id);
}
}
HandlingMode::Queue => {
if !fields.queue.contains(&work_id) {
fields.queue.push(work_id);
}
}
}
}
Ok((phase, fields, vec![]))
}
fn eval_recover(
&self,
phase: IngressPhase,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "Recover".into(),
});
}
let mut fields = self.fields.clone();
let mut effects = Vec::new();
if fields.current_run.is_some() {
for wid in &fields.current_run_contributors.clone() {
let lifecycle = fields.lifecycle.get(wid);
if lifecycle == Some(&InputLifecycleState::Staged) {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Queued);
let hm = fields.handling_mode.get(wid).copied();
match hm {
Some(HandlingMode::Steer) => {
if !fields.steer_queue.contains(wid) {
fields.steer_queue.insert(0, wid.clone());
}
}
_ => {
if !fields.queue.contains(wid) {
fields.queue.insert(0, wid.clone());
}
}
}
}
}
fields.current_run = None;
fields.current_run_contributors = Vec::new();
}
let non_terminal_ids: Vec<InputId> = fields
.lifecycle
.iter()
.filter(|(_, state)| !state.is_terminal())
.map(|(id, _)| id.clone())
.collect();
for wid in &non_terminal_ids {
let lifecycle = fields.lifecycle.get(wid).copied();
match lifecycle {
Some(InputLifecycleState::Accepted) => {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Queued);
let hm = fields.handling_mode.get(wid).copied();
match hm {
Some(HandlingMode::Steer) => {
if !fields.steer_queue.contains(wid) {
fields.steer_queue.insert(0, wid.clone());
}
}
_ => {
if !fields.queue.contains(wid) {
fields.queue.insert(0, wid.clone());
}
}
}
let should_consume = fields
.policy_snapshot
.get(wid)
.map(|p| {
p.apply_mode == crate::policy::ApplyMode::Ignore
&& p.consume_point == crate::policy::ConsumePoint::OnAccept
})
.unwrap_or(false);
if should_consume {
effects.push(RuntimeIngressEffect::RecoverConsumeOnAccept {
work_id: wid.clone(),
});
} else {
effects.push(RuntimeIngressEffect::RecoverRollback {
work_id: wid.clone(),
});
}
}
Some(InputLifecycleState::Staged) => {
fields
.lifecycle
.insert(wid.clone(), InputLifecycleState::Queued);
let hm = fields.handling_mode.get(wid).copied();
match hm {
Some(HandlingMode::Steer) => {
if !fields.steer_queue.contains(wid) {
fields.steer_queue.insert(0, wid.clone());
}
}
_ => {
if !fields.queue.contains(wid) {
fields.queue.insert(0, wid.clone());
}
}
}
effects.push(RuntimeIngressEffect::RecoverRollback {
work_id: wid.clone(),
});
}
Some(
InputLifecycleState::Applied | InputLifecycleState::AppliedPendingConsumption,
) => {
effects.push(RuntimeIngressEffect::RecoverKeep {
work_id: wid.clone(),
});
}
Some(InputLifecycleState::Queued) => {
effects.push(RuntimeIngressEffect::RecoverKeep {
work_id: wid.clone(),
});
}
_ => {}
}
}
if !fields.queue.is_empty() || !fields.steer_queue.is_empty() {
fields.wake_requested = true;
effects.push(RuntimeIngressEffect::WakeRuntime);
}
effects.push(RuntimeIngressEffect::IngressNotice {
kind: "Recover".into(),
detail: "IngressRecovered".into(),
});
Ok((phase, fields, effects))
}
fn eval_set_silent_intent_overrides(
&self,
phase: IngressPhase,
intents: &BTreeSet<String>,
) -> Result<
(
IngressPhase,
RuntimeIngressFields,
Vec<RuntimeIngressEffect>,
),
RuntimeIngressError,
> {
if !matches!(phase, IngressPhase::Active | IngressPhase::Retired) {
return Err(RuntimeIngressError::InvalidTransition {
from: phase,
input: "SetSilentIntentOverrides".into(),
});
}
let mut fields = self.fields.clone();
fields.silent_intent_overrides = intents.clone();
let effects = vec![RuntimeIngressEffect::IngressNotice {
kind: "SetSilentIntentOverrides".into(),
detail: format!("{} intents configured", intents.len()),
}];
Ok((phase, fields, effects))
}
}
impl RuntimeIngressMutator for RuntimeIngressAuthority {
fn apply(
&mut self,
input: RuntimeIngressInput,
) -> Result<RuntimeIngressTransition, RuntimeIngressError> {
let (next_phase, next_fields, effects) = self.evaluate(&input)?;
self.phase = next_phase;
self.fields = next_fields;
Ok(RuntimeIngressTransition {
next_phase,
effects,
})
}
}
fn seq_starts_with(seq: &[InputId], prefix: &[InputId]) -> bool {
if prefix.len() > seq.len() {
return false;
}
seq[..prefix.len()] == *prefix
}
fn seq_remove_all(seq: &mut Vec<InputId>, values: &[InputId]) {
seq.retain(|id| !values.contains(id));
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::redundant_clone,
clippy::cloned_ref_to_slice_refs
)]
mod tests {
use super::*;
use crate::identifiers::PolicyVersion;
use crate::policy::{
ApplyMode, ConsumePoint, DrainPolicy, InterruptPolicy, QueueMode, RoutingDisposition,
WakeMode,
};
fn test_policy() -> PolicyDecision {
PolicyDecision {
apply_mode: ApplyMode::StageRunStart,
wake_mode: WakeMode::WakeIfIdle,
queue_mode: QueueMode::Fifo,
consume_point: ConsumePoint::OnRunComplete,
interrupt_policy: InterruptPolicy::None,
drain_policy: DrainPolicy::QueueNextTurn,
routing_disposition: RoutingDisposition::Queue,
record_transcript: true,
emit_operator_content: true,
policy_version: PolicyVersion(1),
}
}
fn admit_queued(
auth: &mut RuntimeIngressAuthority,
work_id: InputId,
mode: HandlingMode,
) -> RuntimeIngressTransition {
admit_queued_with_prompt(auth, work_id, mode, false)
}
fn admit_queued_with_prompt(
auth: &mut RuntimeIngressAuthority,
work_id: InputId,
mode: HandlingMode,
is_prompt: bool,
) -> RuntimeIngressTransition {
auth.apply(RuntimeIngressInput::AdmitQueued {
work_id,
content_shape: ContentShape("text".into()),
handling_mode: mode,
is_prompt,
request_id: None,
reservation_key: None,
policy: test_policy(),
existing_superseded_id: None,
})
.expect("admit should succeed")
}
fn admit_consumed(
auth: &mut RuntimeIngressAuthority,
work_id: InputId,
) -> RuntimeIngressTransition {
auth.apply(RuntimeIngressInput::AdmitConsumedOnAccept {
work_id,
content_shape: ContentShape("text".into()),
request_id: None,
reservation_key: None,
policy: test_policy(),
})
.expect("admit consumed should succeed")
}
#[test]
fn new_starts_active() {
let auth = RuntimeIngressAuthority::new();
assert_eq!(auth.phase(), IngressPhase::Active);
assert!(!auth.is_terminal());
}
#[test]
fn retire_transitions_to_retired() {
let mut auth = RuntimeIngressAuthority::new();
let t = auth
.apply(RuntimeIngressInput::Retire)
.expect("retire should succeed");
assert_eq!(t.next_phase, IngressPhase::Retired);
assert_eq!(auth.phase(), IngressPhase::Retired);
}
#[test]
fn destroy_transitions_to_destroyed() {
let mut auth = RuntimeIngressAuthority::new();
let t = auth
.apply(RuntimeIngressInput::Destroy)
.expect("destroy should succeed");
assert_eq!(t.next_phase, IngressPhase::Destroyed);
assert!(auth.is_terminal());
}
#[test]
fn destroy_from_retired() {
let mut auth = RuntimeIngressAuthority::new();
auth.apply(RuntimeIngressInput::Retire).unwrap();
let t = auth
.apply(RuntimeIngressInput::Destroy)
.expect("destroy from retired should succeed");
assert_eq!(t.next_phase, IngressPhase::Destroyed);
}
#[test]
fn destroyed_rejects_all() {
let mut auth = RuntimeIngressAuthority::new();
auth.apply(RuntimeIngressInput::Destroy).unwrap();
let result = auth.apply(RuntimeIngressInput::Retire);
assert!(matches!(
result,
Err(RuntimeIngressError::TerminalPhase { .. })
));
}
#[test]
fn reset_returns_to_active() {
let mut auth = RuntimeIngressAuthority::new();
auth.apply(RuntimeIngressInput::Retire).unwrap();
let t = auth
.apply(RuntimeIngressInput::Reset)
.expect("reset from retired");
assert_eq!(t.next_phase, IngressPhase::Active);
}
#[test]
fn admit_queued_registers_input() {
let mut auth = RuntimeIngressAuthority::new();
let wid = InputId::new();
let t = admit_queued(&mut auth, wid.clone(), HandlingMode::Queue);
assert_eq!(t.next_phase, IngressPhase::Active);
assert!(auth.admitted_inputs().contains(&wid));
assert_eq!(auth.queue(), &[wid.clone()]);
assert!(auth.steer_queue().is_empty());
assert_eq!(
auth.lifecycle_state(&wid),
Some(InputLifecycleState::Queued)
);
assert!(auth.wake_requested());
assert!(
t.effects
.iter()
.any(|e| matches!(e, RuntimeIngressEffect::IngressAccepted { .. }))
);
}
#[test]
fn admit_queued_steer_mode() {
let mut auth = RuntimeIngressAuthority::new();
let wid = InputId::new();
let t = admit_queued(&mut auth, wid.clone(), HandlingMode::Steer);
assert!(auth.queue().is_empty());
assert_eq!(auth.steer_queue(), &[wid.clone()]);
assert!(auth.process_requested());
assert!(
t.effects
.iter()
.any(|e| matches!(e, RuntimeIngressEffect::RequestImmediateProcessing))
);
}
#[test]
fn admit_queued_duplicate_rejected() {
let mut auth = RuntimeIngressAuthority::new();
let wid = InputId::new();
admit_queued(&mut auth, wid.clone(), HandlingMode::Queue);
let result = auth.apply(RuntimeIngressInput::AdmitQueued {
work_id: wid,
content_shape: ContentShape("text".into()),
handling_mode: HandlingMode::Queue,
is_prompt: false,
request_id: None,
reservation_key: None,
policy: test_policy(),
existing_superseded_id: None,
});
assert!(matches!(
result,
Err(RuntimeIngressError::GuardFailed { .. })
));
}
#[test]
fn admit_queued_rejected_from_retired() {
let mut auth = RuntimeIngressAuthority::new();
auth.apply(RuntimeIngressInput::Retire).unwrap();
let result = auth.apply(RuntimeIngressInput::AdmitQueued {
work_id: InputId::new(),
content_shape: ContentShape("text".into()),
handling_mode: HandlingMode::Queue,
is_prompt: false,
request_id: None,
reservation_key: None,
policy: test_policy(),
existing_superseded_id: None,
});
assert!(matches!(
result,
Err(RuntimeIngressError::InvalidTransition { .. })
));
}
#[test]
fn admit_consumed_on_accept() {
let mut auth = RuntimeIngressAuthority::new();
let wid = InputId::new();
let t = admit_consumed(&mut auth, wid.clone());
assert_eq!(
auth.lifecycle_state(&wid),
Some(InputLifecycleState::Consumed)
);
assert!(matches!(
auth.terminal_outcome(&wid),
Some(InputTerminalOutcome::Consumed)
));
assert!(auth.queue().is_empty());
assert!(
t.effects
.iter()
.any(|e| matches!(e, RuntimeIngressEffect::CompletionResolved { .. }))
);
}
#[test]
fn stage_drain_snapshot_happy_path() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
let w2 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
admit_queued(&mut auth, w2.clone(), HandlingMode::Queue);
let run_id = RunId::new();
let t = auth
.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone(), w2.clone()],
})
.expect("stage should succeed");
assert!(auth.queue().is_empty());
assert_eq!(auth.current_run(), Some(&run_id));
assert_eq!(auth.current_run_contributors(), &[w1.clone(), w2.clone()]);
assert_eq!(auth.lifecycle_state(&w1), Some(InputLifecycleState::Staged));
assert_eq!(auth.lifecycle_state(&w2), Some(InputLifecycleState::Staged));
assert!(
t.effects
.iter()
.any(|e| matches!(e, RuntimeIngressEffect::ReadyForRun { .. }))
);
}
#[test]
fn stage_drain_rejected_with_current_run() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
let w2 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
admit_queued(&mut auth, w2.clone(), HandlingMode::Queue);
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: RunId::new(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
let result = auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: RunId::new(),
contributing_work_ids: vec![w2],
});
assert!(matches!(
result,
Err(RuntimeIngressError::GuardFailed { .. })
));
}
#[test]
fn stage_drain_prefers_steer_queue() {
let mut auth = RuntimeIngressAuthority::new();
let w_queue = InputId::new();
let w_steer = InputId::new();
admit_queued(&mut auth, w_queue.clone(), HandlingMode::Queue);
admit_queued(&mut auth, w_steer.clone(), HandlingMode::Steer);
let result = auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: RunId::new(),
contributing_work_ids: vec![w_queue.clone()],
});
assert!(
result.is_err(),
"should reject staging from queue when steer_queue is non-empty"
);
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: RunId::new(),
contributing_work_ids: vec![w_steer.clone()],
})
.expect("steer staging should succeed");
}
#[test]
fn boundary_applied_transitions_to_pending_consumption() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
let t = auth
.apply(RuntimeIngressInput::BoundaryApplied {
run_id: run_id.clone(),
boundary_sequence: 42,
})
.expect("boundary applied should succeed");
assert_eq!(
auth.lifecycle_state(&w1),
Some(InputLifecycleState::AppliedPendingConsumption)
);
assert_eq!(auth.last_boundary_sequence(&w1), Some(42));
assert!(
t.effects
.iter()
.any(|e| matches!(e, RuntimeIngressEffect::IngressNotice { .. }))
);
}
#[test]
fn boundary_applied_wrong_run_rejected() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: RunId::new(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
let result = auth.apply(RuntimeIngressInput::BoundaryApplied {
run_id: RunId::new(), boundary_sequence: 1,
});
assert!(matches!(
result,
Err(RuntimeIngressError::GuardFailed { .. })
));
}
#[test]
fn run_completed_consumes_contributors() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::BoundaryApplied {
run_id: run_id.clone(),
boundary_sequence: 1,
})
.unwrap();
let t = auth
.apply(RuntimeIngressInput::RunCompleted {
run_id: run_id.clone(),
})
.expect("run completed should succeed");
assert_eq!(
auth.lifecycle_state(&w1),
Some(InputLifecycleState::Consumed)
);
assert!(auth.current_run().is_none());
assert!(auth.current_run_contributors().is_empty());
assert!(t.effects.iter().any(|e| matches!(
e,
RuntimeIngressEffect::CompletionResolved {
outcome: InputTerminalOutcome::Consumed,
..
}
)));
}
#[test]
fn run_failed_rolls_back_staged() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::RunFailed {
run_id: run_id.clone(),
})
.expect("run failed should succeed");
assert_eq!(auth.lifecycle_state(&w1), Some(InputLifecycleState::Queued));
assert!(auth.queue().contains(&w1));
assert!(auth.current_run().is_none());
assert!(auth.wake_requested());
}
#[test]
fn run_cancelled_rolls_back_staged() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::RunCancelled {
run_id: run_id.clone(),
})
.expect("run cancelled should succeed");
assert_eq!(auth.lifecycle_state(&w1), Some(InputLifecycleState::Queued));
assert!(auth.queue().contains(&w1));
}
#[test]
fn supersede_marks_old_as_superseded() {
let mut auth = RuntimeIngressAuthority::new();
let old = InputId::new();
let new = InputId::new();
admit_queued(&mut auth, old.clone(), HandlingMode::Queue);
admit_queued(&mut auth, new.clone(), HandlingMode::Queue);
auth.apply(RuntimeIngressInput::SupersedeQueuedInput {
new_work_id: new.clone(),
old_work_id: old.clone(),
})
.expect("supersede should succeed");
assert_eq!(
auth.lifecycle_state(&old),
Some(InputLifecycleState::Superseded)
);
assert!(!auth.queue().contains(&old));
assert!(auth.queue().contains(&new));
assert!(matches!(
auth.terminal_outcome(&old),
Some(InputTerminalOutcome::Superseded { .. })
));
}
#[test]
fn coalesce_marks_sources_as_coalesced() {
let mut auth = RuntimeIngressAuthority::new();
let s1 = InputId::new();
let s2 = InputId::new();
let agg = InputId::new();
admit_queued(&mut auth, s1.clone(), HandlingMode::Queue);
admit_queued(&mut auth, s2.clone(), HandlingMode::Queue);
admit_queued(&mut auth, agg.clone(), HandlingMode::Queue);
auth.apply(RuntimeIngressInput::CoalesceQueuedInputs {
aggregate_work_id: agg.clone(),
source_work_ids: vec![s1.clone(), s2.clone()],
})
.expect("coalesce should succeed");
assert_eq!(
auth.lifecycle_state(&s1),
Some(InputLifecycleState::Coalesced)
);
assert_eq!(
auth.lifecycle_state(&s2),
Some(InputLifecycleState::Coalesced)
);
assert!(!auth.queue().contains(&s1));
assert!(!auth.queue().contains(&s2));
}
#[test]
fn recover_rolls_back_in_flight_run() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::Recover)
.expect("recover should succeed");
assert_eq!(auth.lifecycle_state(&w1), Some(InputLifecycleState::Queued));
assert!(auth.current_run().is_none());
assert!(auth.queue().contains(&w1));
assert!(auth.wake_requested());
}
#[test]
fn recover_requeues_admitted_recovered_accepted_input() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
auth.apply(RuntimeIngressInput::AdmitRecovered {
work_id: w1.clone(),
content_shape: ContentShape("text".into()),
handling_mode: HandlingMode::Queue,
lifecycle_state: InputLifecycleState::Accepted,
policy: test_policy(),
request_id: None,
reservation_key: None,
})
.expect("admit recovered accepted input should succeed");
let transition = auth
.apply(RuntimeIngressInput::Recover)
.expect("recover should succeed");
assert_eq!(auth.lifecycle_state(&w1), Some(InputLifecycleState::Queued));
assert!(auth.queue().contains(&w1));
assert!(transition.effects.iter().any(|effect| matches!(
effect,
RuntimeIngressEffect::RecoverRollback { work_id } if work_id == &w1
)));
}
#[test]
fn recover_requeues_admitted_recovered_staged_input_without_current_run() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
auth.apply(RuntimeIngressInput::AdmitRecovered {
work_id: w1.clone(),
content_shape: ContentShape("text".into()),
handling_mode: HandlingMode::Steer,
lifecycle_state: InputLifecycleState::Staged,
policy: test_policy(),
request_id: None,
reservation_key: None,
})
.expect("admit recovered staged input should succeed");
let transition = auth
.apply(RuntimeIngressInput::Recover)
.expect("recover should succeed");
assert_eq!(auth.lifecycle_state(&w1), Some(InputLifecycleState::Queued));
assert!(auth.steer_queue().contains(&w1));
assert!(!auth.queue().contains(&w1));
assert!(transition.effects.iter().any(|effect| matches!(
effect,
RuntimeIngressEffect::RecoverRollback { work_id } if work_id == &w1
)));
}
#[test]
fn set_silent_intent_overrides() {
let mut auth = RuntimeIngressAuthority::new();
let intents: BTreeSet<String> =
["intent_a".into(), "intent_b".into()].into_iter().collect();
auth.apply(RuntimeIngressInput::SetSilentIntentOverrides {
intents: intents.clone(),
})
.expect("set overrides should succeed");
assert_eq!(auth.silent_intent_overrides(), &intents);
}
#[test]
fn full_lifecycle_happy_path() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
let w2 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
admit_queued(&mut auth, w2.clone(), HandlingMode::Queue);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone(), w2.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::BoundaryApplied {
run_id: run_id.clone(),
boundary_sequence: 1,
})
.unwrap();
auth.apply(RuntimeIngressInput::RunCompleted {
run_id: run_id.clone(),
})
.unwrap();
assert_eq!(
auth.lifecycle_state(&w1),
Some(InputLifecycleState::Consumed)
);
assert_eq!(
auth.lifecycle_state(&w2),
Some(InputLifecycleState::Consumed)
);
assert!(auth.current_run().is_none());
assert!(auth.queue().is_empty());
}
#[test]
fn destroy_abandons_queued_inputs() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
let w2 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
admit_queued(&mut auth, w2.clone(), HandlingMode::Queue);
let t = auth.apply(RuntimeIngressInput::Destroy).unwrap();
assert_eq!(
auth.lifecycle_state(&w1),
Some(InputLifecycleState::Abandoned)
);
assert_eq!(
auth.lifecycle_state(&w2),
Some(InputLifecycleState::Abandoned)
);
assert!(auth.queue().is_empty());
assert!(auth.is_terminal());
let completion_count = t
.effects
.iter()
.filter(|e| matches!(e, RuntimeIngressEffect::CompletionResolved { .. }))
.count();
assert_eq!(completion_count, 2);
}
#[test]
fn reset_abandons_and_returns_to_active() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
let w2 = InputId::new();
admit_consumed(&mut auth, w2.clone());
auth.apply(RuntimeIngressInput::Reset).unwrap();
assert_eq!(auth.phase(), IngressPhase::Active);
assert_eq!(
auth.lifecycle_state(&w1),
Some(InputLifecycleState::Abandoned)
);
assert_eq!(
auth.lifecycle_state(&w2),
Some(InputLifecycleState::Consumed)
);
assert!(auth.queue().is_empty());
}
#[test]
fn reset_rejected_during_run() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: RunId::new(),
contributing_work_ids: vec![w1],
})
.unwrap();
let result = auth.apply(RuntimeIngressInput::Reset);
assert!(matches!(
result,
Err(RuntimeIngressError::GuardFailed { .. })
));
}
#[test]
fn retired_can_stage_and_complete_drain() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Queue);
auth.apply(RuntimeIngressInput::Retire).unwrap();
assert_eq!(auth.phase(), IngressPhase::Retired);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::BoundaryApplied {
run_id: run_id.clone(),
boundary_sequence: 1,
})
.unwrap();
auth.apply(RuntimeIngressInput::RunCompleted {
run_id: run_id.clone(),
})
.unwrap();
assert_eq!(
auth.lifecycle_state(&w1),
Some(InputLifecycleState::Consumed)
);
assert_eq!(auth.phase(), IngressPhase::Retired);
}
#[test]
fn can_accept_probes_without_mutation() {
let auth = RuntimeIngressAuthority::new();
assert!(auth.can_accept(&RuntimeIngressInput::Retire));
assert!(auth.can_accept(&RuntimeIngressInput::Reset));
assert_eq!(auth.phase(), IngressPhase::Active);
}
#[test]
fn can_accept_reset_from_active() {
let auth = RuntimeIngressAuthority::new();
assert!(auth.can_accept(&RuntimeIngressInput::Reset));
assert_eq!(auth.phase(), IngressPhase::Active); }
#[test]
fn phase_unchanged_on_rejected_transition() {
let mut auth = RuntimeIngressAuthority::new();
auth.apply(RuntimeIngressInput::Retire).unwrap();
let result = auth.apply(RuntimeIngressInput::Retire);
assert!(result.is_err());
assert_eq!(auth.phase(), IngressPhase::Retired);
}
#[test]
fn run_failed_rolls_back_steer_to_steer_queue() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1.clone(), HandlingMode::Steer);
let run_id = RunId::new();
auth.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: vec![w1.clone()],
})
.unwrap();
auth.apply(RuntimeIngressInput::RunFailed {
run_id: run_id.clone(),
})
.unwrap();
assert!(auth.steer_queue().contains(&w1));
assert!(!auth.queue().contains(&w1));
}
#[test]
fn take_wake_requested_clears_flag() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1, HandlingMode::Queue);
assert!(auth.take_wake_requested());
assert!(!auth.take_wake_requested()); }
#[test]
fn take_process_requested_clears_flag() {
let mut auth = RuntimeIngressAuthority::new();
let w1 = InputId::new();
admit_queued(&mut auth, w1, HandlingMode::Steer);
assert!(auth.take_process_requested());
assert!(!auth.take_process_requested()); }
}