use std::collections::BTreeSet;
use meerkat_core::lifecycle::{InputId, RunEvent, RunId};
use meerkat_core::types::HandlingMode;
use crate::accept::AcceptOutcome;
use crate::durability::{DurabilityError, validate_durability};
use crate::identifiers::LogicalRuntimeId;
use crate::input::Input;
use crate::input_ledger::InputLedger;
use crate::input_lifecycle_authority::{InputLifecycleError, InputLifecycleInput};
use crate::input_state::{InputAbandonReason, InputLifecycleState, InputState, PolicySnapshot};
use crate::policy::{PolicyDecision, RoutingDisposition};
use crate::policy_table::DefaultPolicyTable;
use crate::queue::InputQueue;
use crate::runtime_control_authority::{
RuntimeControlAuthority, RuntimeControlInput, RuntimeControlMutator,
};
use crate::runtime_event::{
InputLifecycleEvent, RuntimeEvent, RuntimeEventEnvelope, RuntimeStateChangeEvent,
};
use crate::runtime_ingress_authority::{
ContentShape, RequestId, ReservationKey, RuntimeIngressAuthority, RuntimeIngressEffect,
RuntimeIngressInput, RuntimeIngressMutator,
};
use crate::runtime_state::RuntimeState;
use crate::traits::{
DestroyReport, RecoveryReport, ResetReport, RetireReport, RuntimeControlCommand,
RuntimeDriverError,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PostAdmissionSignal {
None,
WakeLoop,
RequestImmediateProcessing,
}
impl PostAdmissionSignal {
pub fn should_wake(self) -> bool {
self >= Self::WakeLoop
}
pub fn should_process_immediately(self) -> bool {
self == Self::RequestImmediateProcessing
}
}
pub(crate) fn handling_mode_from_policy(policy: &crate::policy::PolicyDecision) -> HandlingMode {
match policy.routing_disposition {
RoutingDisposition::Steer => HandlingMode::Steer,
_ => HandlingMode::Queue,
}
}
pub(crate) fn requests_immediate_processing(input: &Input) -> bool {
matches!(input.handling_mode(), Some(HandlingMode::Steer))
}
#[derive(Clone)]
pub struct EphemeralRuntimeDriver {
runtime_id: LogicalRuntimeId,
control: RuntimeControlAuthority,
ledger: InputLedger,
queue: InputQueue,
steer_queue: InputQueue,
events: Vec<RuntimeEventEnvelope>,
post_admission_signal: PostAdmissionSignal,
silent_comms_intents: Vec<String>,
ingress: RuntimeIngressAuthority,
}
impl EphemeralRuntimeDriver {
pub fn new(runtime_id: LogicalRuntimeId) -> Self {
Self {
runtime_id,
control: RuntimeControlAuthority::from_state(RuntimeState::Idle),
ledger: InputLedger::new(),
queue: InputQueue::new(),
steer_queue: InputQueue::new(),
events: Vec::new(),
post_admission_signal: PostAdmissionSignal::None,
silent_comms_intents: Vec::new(),
ingress: RuntimeIngressAuthority::new(),
}
}
pub fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
let overrides = intents.into_iter().collect::<BTreeSet<_>>();
match self
.ingress
.apply(RuntimeIngressInput::SetSilentIntentOverrides { intents: overrides })
{
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
self.silent_comms_intents = self
.ingress
.silent_intent_overrides()
.iter()
.cloned()
.collect();
}
Err(err) => {
tracing::warn!(
error = %err,
"ingress authority rejected SetSilentIntentOverrides"
);
}
}
}
pub fn silent_comms_intents(&self) -> Vec<String> {
self.silent_comms_intents.clone()
}
pub fn ingress(&self) -> &RuntimeIngressAuthority {
&self.ingress
}
fn build_projection_queue(&self, ids: &[InputId], lane: &str) -> InputQueue {
let mut queue = InputQueue::new();
for input_id in ids {
match self
.ledger
.get(input_id)
.and_then(|state| state.persisted_input.clone())
{
Some(input) => queue.enqueue(input_id.clone(), input),
None => {
tracing::error!(
input_id = ?input_id,
lane,
"ingress queue references input without persisted payload"
);
debug_assert!(
false,
"ingress queue projection missing persisted payload for {input_id:?} in {lane}"
);
}
}
}
queue
}
fn rebuild_queue_projections(&mut self) {
self.queue = self.build_projection_queue(self.ingress.queue(), "queue");
self.steer_queue = self.build_projection_queue(self.ingress.steer_queue(), "steer_queue");
}
fn debug_assert_queue_projection_alignment(&self) {
debug_assert_eq!(
self.queue.input_ids(),
self.ingress.queue(),
"physical queue must match canonical ingress queue lane"
);
debug_assert_eq!(
self.steer_queue.input_ids(),
self.ingress.steer_queue(),
"physical steer queue must match canonical ingress steer lane"
);
}
#[allow(clippy::too_many_arguments)]
pub fn admit_recovered_to_ingress(
&mut self,
work_id: InputId,
content_shape: ContentShape,
handling_mode: HandlingMode,
lifecycle_state: InputLifecycleState,
policy: PolicyDecision,
request_id: Option<RequestId>,
reservation_key: Option<ReservationKey>,
) -> Result<(), RuntimeDriverError> {
match self.ingress.apply(RuntimeIngressInput::AdmitRecovered {
work_id,
content_shape,
handling_mode,
lifecycle_state,
policy,
request_id,
reservation_key,
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
Ok(())
}
Err(err) => Err(RuntimeDriverError::Internal(format!(
"ingress AdmitRecovered failed: {err}"
))),
}
}
fn process_ingress_effects(&mut self, effects: &[RuntimeIngressEffect]) {
for effect in effects {
match effect {
RuntimeIngressEffect::WakeRuntime => {
if self.post_admission_signal < PostAdmissionSignal::WakeLoop {
self.post_admission_signal = PostAdmissionSignal::WakeLoop;
}
}
RuntimeIngressEffect::RequestImmediateProcessing => {
self.post_admission_signal = PostAdmissionSignal::RequestImmediateProcessing;
}
RuntimeIngressEffect::IngressAccepted { work_id } => {
tracing::debug!(
work_id = ?work_id,
"ingress authority: input accepted"
);
}
RuntimeIngressEffect::Deduplicated {
work_id,
existing_id,
} => {
tracing::debug!(
work_id = ?work_id,
existing_id = ?existing_id,
"ingress authority: input deduplicated"
);
}
RuntimeIngressEffect::CompletionResolved { work_id, outcome } => {
tracing::debug!(
work_id = ?work_id,
outcome = ?outcome,
"ingress authority: completion resolved"
);
}
RuntimeIngressEffect::InputLifecycleNotice { work_id, new_state } => {
tracing::trace!(
work_id = ?work_id,
new_state = ?new_state,
"ingress authority: lifecycle notice"
);
}
RuntimeIngressEffect::ReadyForRun {
run_id,
contributing_work_ids,
} => {
tracing::debug!(
run_id = ?run_id,
contributors = contributing_work_ids.len(),
"ingress authority: ready for run"
);
}
RuntimeIngressEffect::IngressNotice { kind, detail } => {
tracing::debug!(
kind = %kind,
detail = %detail,
"ingress authority: notice"
);
}
RuntimeIngressEffect::SilentIntentApplied { work_id, intent } => {
tracing::debug!(
work_id = ?work_id,
intent = %intent,
"ingress authority: silent intent applied"
);
}
RuntimeIngressEffect::StageInput { work_id, run_id } => {
if let Some(state) = self.ledger.get_mut(work_id)
&& let Err(e) = state.apply(InputLifecycleInput::StageForRun {
run_id: run_id.clone(),
})
{
tracing::warn!(
work_id = ?work_id,
run_id = ?run_id,
error = %e,
"per-input StageForRun failed (driven by ingress StageInput effect)"
);
}
self.emit_event(RuntimeEvent::InputLifecycle(InputLifecycleEvent::Staged {
input_id: work_id.clone(),
run_id: run_id.clone(),
}));
}
RuntimeIngressEffect::PersistAndQueue { .. }
| RuntimeIngressEffect::EnqueueTo { .. }
| RuntimeIngressEffect::EnqueueFront { .. }
| RuntimeIngressEffect::RemoveFromQueues { .. }
| RuntimeIngressEffect::CoalesceExisting { .. }
| RuntimeIngressEffect::SupersedeExisting { .. }
| RuntimeIngressEffect::ConsumeOnAccept { .. }
| RuntimeIngressEffect::EmitQueuedEvent { .. } => {
tracing::trace!(
effect = ?effect,
"ingress authority: accept-phase effect (handled in accept path)"
);
}
RuntimeIngressEffect::RecoverConsumeOnAccept { work_id }
| RuntimeIngressEffect::RecoverRollback { work_id }
| RuntimeIngressEffect::RecoverKeep { work_id } => {
tracing::trace!(
work_id = ?work_id,
effect = ?effect,
"ingress authority: recovery effect"
);
}
}
}
}
fn process_accept_effects(&mut self, effects: &[RuntimeIngressEffect], input: &Input) {
for effect in effects {
match effect {
RuntimeIngressEffect::PersistAndQueue { work_id } => {
if let Some(s) = self.ledger.get_mut(work_id) {
s.persisted_input = Some(input.clone());
let _ = s.apply(InputLifecycleInput::QueueAccepted);
}
}
RuntimeIngressEffect::EnqueueTo { work_id, target } => {
self.enqueue_to(*target, work_id.clone(), input.clone());
}
RuntimeIngressEffect::EnqueueFront { work_id, target } => {
self.enqueue_front_to(*target, work_id.clone(), input.clone());
}
RuntimeIngressEffect::RemoveFromQueues { work_id } => {
let _ = self.queue.remove(work_id);
let _ = self.steer_queue.remove(work_id);
}
RuntimeIngressEffect::CoalesceExisting {
new_id,
existing_id,
} => {
if let Some(existing_state) = self.ledger.get_mut(existing_id) {
let _ = crate::coalescing::apply_coalescing(existing_state, new_id.clone());
}
}
RuntimeIngressEffect::SupersedeExisting {
new_id,
existing_id,
} => {
if let Some(existing_state) = self.ledger.get_mut(existing_id) {
let _ =
crate::coalescing::apply_supersession(existing_state, new_id.clone());
}
}
RuntimeIngressEffect::ConsumeOnAccept { work_id } => {
if let Some(s) = self.ledger.get_mut(work_id) {
let _ = s.apply(InputLifecycleInput::ConsumeOnAccept);
}
}
RuntimeIngressEffect::EmitQueuedEvent { work_id } => {
self.emit_event(RuntimeEvent::InputLifecycle(InputLifecycleEvent::Queued {
input_id: work_id.clone(),
}));
}
other => {
self.process_ingress_effects(std::slice::from_ref(other));
}
}
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
}
pub fn is_idle(&self) -> bool {
self.control.is_idle()
}
pub fn is_idle_or_attached(&self) -> bool {
self.control.phase().is_idle_or_attached()
}
pub fn attach(&mut self) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
let transition = self.control.apply(RuntimeControlInput::AttachExecutor)?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
Ok(())
}
pub fn detach(
&mut self,
) -> Result<Option<RuntimeState>, crate::runtime_state::RuntimeStateTransitionError> {
if self.control.is_attached() {
let transition = self.control.apply(RuntimeControlInput::DetachExecutor)?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
Ok(Some(RuntimeState::Attached))
} else {
Ok(None)
}
}
pub fn start_run(
&mut self,
run_id: RunId,
) -> Result<(), crate::runtime_state::RuntimeStateTransitionError> {
let _transition = self
.control
.apply(RuntimeControlInput::BeginRun { run_id })?;
Ok(())
}
pub fn complete_run(
&mut self,
) -> Result<RunId, crate::runtime_state::RuntimeStateTransitionError> {
let run_id = self.control.current_run_id().cloned().ok_or(
crate::runtime_state::RuntimeStateTransitionError {
from: self.control.phase(),
to: RuntimeState::Idle,
},
)?;
let _transition = self.control.apply(RuntimeControlInput::RunCompleted {
run_id: run_id.clone(),
})?;
Ok(run_id)
}
pub fn take_post_admission_signal(&mut self) -> PostAdmissionSignal {
std::mem::replace(&mut self.post_admission_signal, PostAdmissionSignal::None)
}
pub fn take_wake_requested(&mut self) -> bool {
self.post_admission_signal.should_wake()
}
pub fn take_process_requested(&mut self) -> bool {
let signal = std::mem::replace(&mut self.post_admission_signal, PostAdmissionSignal::None);
signal.should_process_immediately()
}
pub fn drain_events(&mut self) -> Vec<RuntimeEventEnvelope> {
std::mem::take(&mut self.events)
}
pub fn queue(&self) -> &InputQueue {
&self.queue
}
pub fn steer_queue(&self) -> &InputQueue {
&self.steer_queue
}
#[cfg(test)]
pub fn queue_mut(&mut self) -> &mut InputQueue {
&mut self.queue
}
#[cfg(test)]
pub fn steer_queue_mut(&mut self) -> &mut InputQueue {
&mut self.steer_queue
}
fn enqueue_to(&mut self, mode: HandlingMode, input_id: InputId, input: Input) {
match mode {
HandlingMode::Steer => self.steer_queue.enqueue(input_id, input),
HandlingMode::Queue => self.queue.enqueue(input_id, input),
}
}
fn enqueue_front_to(&mut self, mode: HandlingMode, input_id: InputId, input: Input) {
match mode {
HandlingMode::Steer => self.steer_queue.enqueue_front(input_id, input),
HandlingMode::Queue => self.queue.enqueue_front(input_id, input),
}
}
pub fn has_queued_input(&self, input_id: &InputId) -> bool {
self.ingress
.queue()
.iter()
.any(|queued_id| queued_id == input_id)
|| self
.ingress
.steer_queue()
.iter()
.any(|queued_id| queued_id == input_id)
}
pub fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
self.ingress
.queue()
.iter()
.chain(self.ingress.steer_queue().iter())
.any(|queued_id| !excluded.iter().any(|excluded_id| excluded_id == queued_id))
}
fn existing_superseded_input(
&self,
input: &Input,
) -> Option<(InputId, crate::coalescing::CoalescingResult)> {
self.ingress
.queue()
.iter()
.chain(self.ingress.steer_queue().iter())
.find_map(|queued_id| {
let existing = self.ledger.get(queued_id)?.persisted_input.as_ref()?;
let result =
crate::coalescing::check_supersession(input, existing, &self.runtime_id);
match result {
crate::coalescing::CoalescingResult::Supersedes { .. } => {
Some((queued_id.clone(), result))
}
crate::coalescing::CoalescingResult::Standalone => None,
}
})
}
pub fn ledger(&self) -> &InputLedger {
&self.ledger
}
pub(crate) fn ledger_mut(&mut self) -> &mut InputLedger {
&mut self.ledger
}
pub fn input_states_snapshot(&self) -> Vec<InputState> {
self.ledger.iter().map(|(_, state)| state.clone()).collect()
}
pub fn control(&self) -> &RuntimeControlAuthority {
&self.control
}
pub fn control_mut(&mut self) -> &mut RuntimeControlAuthority {
&mut self.control
}
pub fn clear_queue_projections(&mut self) {
self.queue = InputQueue::new();
self.steer_queue = InputQueue::new();
}
pub fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
let queued = self
.steer_queue
.dequeue()
.or_else(|| self.queue.dequeue())?;
Some((queued.input_id, queued.input))
}
pub fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
self.steer_queue
.dequeue_by_id(input_id)
.or_else(|| self.queue.dequeue_by_id(input_id))
}
#[allow(dead_code)] pub fn persisted_input(&self, input_id: &InputId) -> Option<&Input> {
self.ledger
.get(input_id)
.and_then(|state| state.persisted_input.as_ref())
}
pub fn stage_input(
&mut self,
input_id: &InputId,
run_id: &RunId,
) -> Result<(), InputLifecycleError> {
self.stage_batch(std::slice::from_ref(input_id), run_id)
}
pub fn stage_batch(
&mut self,
input_ids: &[InputId],
run_id: &RunId,
) -> Result<(), InputLifecycleError> {
match self.ingress.apply(RuntimeIngressInput::StageDrainSnapshot {
run_id: run_id.clone(),
contributing_work_ids: input_ids.to_vec(),
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
}
Err(err) => {
tracing::warn!(
input_ids = ?input_ids,
run_id = ?run_id,
error = %err,
"ingress authority rejected StageDrainSnapshot"
);
return Err(InputLifecycleError::InvalidTransition {
from: InputLifecycleState::Queued,
input: format!("StageDrainSnapshot rejected: {err}"),
});
}
}
Ok(())
}
pub fn apply_input(
&mut self,
input_id: &InputId,
run_id: &RunId,
) -> Result<(), InputLifecycleError> {
let state =
self.ledger
.get_mut(input_id)
.ok_or(InputLifecycleError::InvalidTransition {
from: InputLifecycleState::Staged,
input: "MarkApplied (input not found)".into(),
})?;
state.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})?;
state.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: 0,
})?;
self.emit_event(RuntimeEvent::InputLifecycle(InputLifecycleEvent::Applied {
input_id: input_id.clone(),
run_id: run_id.clone(),
}));
Ok(())
}
pub fn consume_inputs(
&mut self,
input_ids: &[InputId],
run_id: &RunId,
) -> Result<(), InputLifecycleError> {
for input_id in input_ids {
if let Some(state) = self.ledger.get_mut(input_id) {
match state.apply(InputLifecycleInput::Consume) {
Ok(_) => {
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Consumed {
input_id: input_id.clone(),
run_id: run_id.clone(),
},
)));
}
Err(
InputLifecycleError::InvalidTransition { .. }
| InputLifecycleError::TerminalState { .. },
) => {}
Err(err) => return Err(err),
}
}
}
Ok(())
}
pub fn rollback_staged(&mut self, input_ids: &[InputId]) -> Result<(), InputLifecycleError> {
for input_id in input_ids {
let mut requeue_input = None;
let mut is_steer = false;
if let Some(state) = self.ledger.get_mut(input_id) {
match state.apply(InputLifecycleInput::RollbackStaged) {
Ok(transition) => {
if transition.next_phase == crate::input_state::InputLifecycleState::Queued
{
requeue_input = state.persisted_input.clone();
is_steer = requeue_input
.as_ref()
.and_then(super::super::input::Input::handling_mode)
== Some(HandlingMode::Steer);
} else {
tracing::warn!(
input_id = %input_id,
next_phase = ?transition.next_phase,
"input abandoned after max stage attempts"
);
}
}
Err(
InputLifecycleError::InvalidTransition { .. }
| InputLifecycleError::TerminalState { .. },
) => {}
Err(err) => return Err(err),
}
}
if !self.has_queued_input(input_id)
&& let Some(input) = requeue_input
{
if is_steer {
self.steer_queue.enqueue(input_id.clone(), input);
} else {
self.queue.enqueue(input_id.clone(), input);
}
}
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(())
}
pub fn retire(&mut self) -> Result<RetireReport, RuntimeDriverError> {
match self.ingress.apply(RuntimeIngressInput::Retire) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
error = %err,
"ingress authority rejected Retire"
);
}
}
let transition = self
.control
.apply(RuntimeControlInput::RetireRequested)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
let inputs_pending_drain = self.ledger.iter().filter(|(_, s)| !s.is_terminal()).count();
Ok(RetireReport {
inputs_abandoned: 0,
inputs_pending_drain,
})
}
pub fn reset(&mut self) -> Result<ResetReport, RuntimeDriverError> {
match self.ingress.apply(RuntimeIngressInput::Reset) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
error = %err,
"ingress authority rejected Reset"
);
}
}
let abandoned = self.abandon_all_non_terminal(InputAbandonReason::Reset);
self.queue.drain();
self.steer_queue.drain();
self.post_admission_signal = PostAdmissionSignal::None;
match self.control.apply(RuntimeControlInput::ResetRequested) {
Ok(transition) => {
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
}
Err(err) if err.from == RuntimeState::Idle => {}
Err(err) => {
return Err(RuntimeDriverError::Internal(err.to_string()));
}
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(ResetReport {
inputs_abandoned: abandoned,
})
}
pub fn destroy(&mut self) -> Result<usize, RuntimeDriverError> {
match self.ingress.apply(RuntimeIngressInput::Destroy) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
error = %err,
"ingress authority rejected Destroy"
);
}
}
let transition = self
.control
.apply(RuntimeControlInput::DestroyRequested)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
let abandoned = self.abandon_all_non_terminal(InputAbandonReason::Destroyed);
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(abandoned)
}
pub fn recover_ephemeral(&mut self) -> RecoveryReport {
let recovery_effects = match self.ingress.apply(RuntimeIngressInput::Recover) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
transition.effects
}
Err(err) => {
tracing::warn!(
error = %err,
"ingress authority rejected Recover"
);
Vec::new()
}
};
let mut recovered = 0;
let mut abandoned = 0;
let mut requeued = 0;
for effect in &recovery_effects {
match effect {
RuntimeIngressEffect::RecoverConsumeOnAccept { work_id } => {
if let Some(state) = self.ledger.get_mut(work_id) {
let _ = state.apply(InputLifecycleInput::ConsumeOnAccept);
abandoned += 1;
recovered += 1;
}
}
RuntimeIngressEffect::RecoverRollback { work_id } => {
if let Some(state) = self.ledger.get_mut(work_id) {
match state.current_state() {
InputLifecycleState::Accepted => {
let _ = state.apply(InputLifecycleInput::QueueAccepted);
}
InputLifecycleState::Staged => {
let _ = state.apply(InputLifecycleInput::RollbackStaged);
}
_ => {}
}
requeued += 1;
recovered += 1;
}
}
RuntimeIngressEffect::RecoverKeep { work_id } => {
if self.ledger.get(work_id).is_some() {
recovered += 1;
}
}
_ => {}
}
}
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
RecoveryReport {
inputs_recovered: recovered,
inputs_abandoned: abandoned,
inputs_requeued: requeued,
details: Vec::new(),
}
}
pub fn recycle_preserving_work(&mut self) -> Result<usize, RuntimeDriverError> {
let transition = self
.control
.apply(RuntimeControlInput::RecycleRequested)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
let transferred = self.ledger.active_input_ids().len();
let runtime_id = self.runtime_id.clone();
let silent_comms_intents = self.silent_comms_intents.clone();
let ledger = self.ledger.clone();
let ingress = self.ingress.clone();
let control = self.control.clone();
*self = Self::new(runtime_id);
self.silent_comms_intents = silent_comms_intents;
self.ledger = ledger;
self.ingress = ingress;
self.control = control;
let _ = self.recover_ephemeral();
let transition = self
.control
.apply(RuntimeControlInput::RecycleSucceeded)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
Ok(transferred)
}
fn emit_event(&mut self, event: RuntimeEvent) {
self.events.push(self.make_envelope(event));
}
fn make_envelope(&self, event: RuntimeEvent) -> RuntimeEventEnvelope {
RuntimeEventEnvelope {
id: crate::identifiers::RuntimeEventId::new(),
timestamp: chrono::Utc::now(),
runtime_id: self.runtime_id.clone(),
event,
causation_id: None,
correlation_id: None,
}
}
pub fn abandon_all_non_terminal(&mut self, reason: InputAbandonReason) -> usize {
let non_terminal_ids: Vec<InputId> = self
.ledger
.iter()
.filter(|(_, s)| !s.is_terminal())
.map(|(id, _)| id.clone())
.collect();
let mut count = 0;
for id in &non_terminal_ids {
if let Some(state) = self.ledger.get_mut(id)
&& state
.apply(InputLifecycleInput::Abandon {
reason: reason.clone(),
})
.is_ok()
{
count += 1;
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Abandoned {
input_id: id.clone(),
reason: format!("{reason:?}"),
},
)));
}
}
count
}
pub fn abandon_pending_inputs(&mut self, reason: InputAbandonReason) -> usize {
let abandoned = self.abandon_all_non_terminal(reason);
self.queue.drain();
self.steer_queue.drain();
self.post_admission_signal = PostAdmissionSignal::None;
self.rebuild_queue_projections();
self.debug_assert_queue_projection_alignment();
abandoned
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl crate::traits::RuntimeDriver for EphemeralRuntimeDriver {
async fn accept_input(&mut self, input: Input) -> Result<AcceptOutcome, RuntimeDriverError> {
let control_phase = self.control.phase();
match control_phase.can_accept_input() {
true => {}
false => {
return Err(RuntimeDriverError::NotReady {
state: control_phase,
});
}
}
if let Err(e) = validate_durability(&input) {
match e {
DurabilityError::DerivedForbidden { .. }
| DurabilityError::ExternalDerivedForbidden => {
return Ok(AcceptOutcome::Rejected {
reason: crate::accept::RejectReason::DurabilityViolation {
detail: e.to_string(),
},
});
}
}
}
if let Err(e) = crate::peer_handling_mode::validate_peer_handling_mode(&input) {
return Ok(AcceptOutcome::Rejected {
reason: crate::accept::RejectReason::PeerHandlingModeInvalid {
detail: e.to_string(),
},
});
}
let input_id = input.id().clone();
let mut state = InputState::new_accepted(input_id.clone());
state.durability = Some(input.header().durability);
state.idempotency_key = input.header().idempotency_key.clone();
if let Some(ref key) = input.header().idempotency_key {
if let Some(existing_id) = self
.ledger
.accept_with_idempotency(state.clone(), key.clone())
{
match self.ingress.apply(RuntimeIngressInput::AdmitDeduplicated {
work_id: input_id.clone(),
existing_id: existing_id.clone(),
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
input_id = ?input_id,
existing_id = ?existing_id,
error = %err,
"ingress authority rejected AdmitDeduplicated"
);
}
}
self.emit_event(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Deduplicated {
input_id: input_id.clone(),
existing_id: existing_id.clone(),
},
));
return Ok(AcceptOutcome::Deduplicated {
input_id,
existing_id,
});
}
} else {
self.ledger.accept(state.clone());
}
let runtime_idle = self.control.phase().is_idle_or_attached();
let mut policy = DefaultPolicyTable::resolve(&input, runtime_idle);
crate::silent_intent::apply_silent_intent_override(
&input,
&self.silent_comms_intents,
&mut policy,
);
if let Some(s) = self.ledger.get_mut(&input_id) {
s.policy = Some(PolicySnapshot {
version: policy.policy_version,
decision: policy.clone(),
});
}
self.emit_event(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Accepted {
input_id: input_id.clone(),
},
));
let handling_mode = handling_mode_from_policy(&policy);
let request_immediate_processing = requests_immediate_processing(&input);
let content_shape = ContentShape(input.kind_id().to_string());
let is_prompt = matches!(input, Input::Prompt(_));
let existing_superseded_id = self.existing_superseded_input(&input).map(|(id, _)| id);
match self.ingress.admit(
input_id.clone(),
content_shape,
handling_mode,
request_immediate_processing,
is_prompt,
None, None, policy.clone(),
existing_superseded_id,
) {
Ok(transition) => {
self.process_accept_effects(&transition.effects, &input);
}
Err(err) => {
tracing::warn!(
input_id = ?input_id,
error = %err,
"ingress authority rejected accept_input"
);
}
}
let final_state = self.ledger.get(&input_id).cloned().unwrap_or_else(|| state);
Ok(AcceptOutcome::Accepted {
input_id,
policy,
state: final_state,
})
}
async fn on_runtime_event(
&mut self,
_event: RuntimeEventEnvelope,
) -> Result<(), RuntimeDriverError> {
Ok(())
}
async fn on_run_event(&mut self, event: RunEvent) -> Result<(), RuntimeDriverError> {
match event {
RunEvent::RunCompleted {
run_id,
consumed_input_ids,
} => {
match self.ingress.apply(RuntimeIngressInput::RunCompleted {
run_id: run_id.clone(),
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
run_id = ?run_id,
error = %err,
"ingress authority rejected RunCompleted"
);
}
}
self.consume_inputs(&consumed_input_ids, &run_id)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.control
.apply(RuntimeControlInput::RunCompleted {
run_id: run_id.clone(),
})
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
}
RunEvent::RunFailed { ref run_id, .. } => {
match self.ingress.apply(RuntimeIngressInput::RunFailed {
run_id: run_id.clone(),
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
run_id = ?run_id,
error = %err,
"ingress authority rejected RunFailed"
);
}
}
let staged_ids: Vec<InputId> = self
.ledger
.iter()
.filter(|(_, s)| s.current_state() == InputLifecycleState::Staged)
.map(|(id, _)| id.clone())
.collect();
self.rollback_staged(&staged_ids)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.control
.apply(RuntimeControlInput::RunFailed {
run_id: run_id.clone(),
})
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
}
RunEvent::RunCancelled { ref run_id, .. } => {
match self.ingress.apply(RuntimeIngressInput::RunCancelled {
run_id: run_id.clone(),
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
run_id = ?run_id,
error = %err,
"ingress authority rejected RunCancelled"
);
}
}
let staged_ids: Vec<InputId> = self
.ledger
.iter()
.filter(|(_, s)| s.current_state() == InputLifecycleState::Staged)
.map(|(id, _)| id.clone())
.collect();
self.rollback_staged(&staged_ids)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.control
.apply(RuntimeControlInput::RunCancelled {
run_id: run_id.clone(),
})
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
}
RunEvent::RunStarted { .. } => {}
RunEvent::BoundaryApplied {
run_id, receipt, ..
} => {
if self.ingress.current_run() == Some(&run_id) {
match self.ingress.apply(RuntimeIngressInput::BoundaryApplied {
run_id: run_id.clone(),
boundary_sequence: receipt.sequence,
}) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
run_id = ?run_id,
boundary_sequence = receipt.sequence,
error = %err,
"ingress authority rejected BoundaryApplied"
);
}
}
}
for input_id in &receipt.contributing_input_ids {
if let Some(state) = self.ledger.get_mut(input_id) {
let applied = state
.apply(InputLifecycleInput::MarkApplied {
run_id: run_id.clone(),
})
.is_ok();
let _ = state.apply(InputLifecycleInput::MarkAppliedPendingConsumption {
boundary_sequence: receipt.sequence,
});
if applied {
self.events
.push(self.make_envelope(RuntimeEvent::InputLifecycle(
InputLifecycleEvent::Applied {
input_id: input_id.clone(),
run_id: run_id.clone(),
},
)));
}
}
}
}
_ => {}
}
Ok(())
}
async fn on_runtime_control(
&mut self,
command: RuntimeControlCommand,
) -> Result<(), RuntimeDriverError> {
match command {
RuntimeControlCommand::Stop => {
match self.ingress.apply(RuntimeIngressInput::Destroy) {
Ok(transition) => {
self.process_ingress_effects(&transition.effects);
}
Err(err) => {
tracing::warn!(
error = %err,
"ingress authority rejected Destroy on Stop"
);
}
}
let transition = self
.control
.apply(RuntimeControlInput::StopRequested)
.map_err(|e| RuntimeDriverError::Internal(e.to_string()))?;
self.emit_event(RuntimeEvent::RuntimeStateChange(RuntimeStateChangeEvent {
from: transition.from_phase,
to: transition.next_phase,
}));
self.abandon_all_non_terminal(InputAbandonReason::Destroyed);
self.queue.drain();
self.steer_queue.drain();
}
RuntimeControlCommand::Resume => {
match self.control.apply(RuntimeControlInput::ResumeRequested) {
Ok(_) => {}
Err(err) if err.from != RuntimeState::Recovering => {}
Err(err) => {
return Err(RuntimeDriverError::Internal(err.to_string()));
}
}
}
}
Ok(())
}
async fn recover(&mut self) -> Result<RecoveryReport, RuntimeDriverError> {
Ok(self.recover_ephemeral())
}
fn runtime_state(&self) -> RuntimeState {
self.control.phase()
}
async fn retire(&mut self) -> Result<RetireReport, RuntimeDriverError> {
EphemeralRuntimeDriver::retire(self)
}
async fn reset(&mut self) -> Result<ResetReport, RuntimeDriverError> {
EphemeralRuntimeDriver::reset(self)
}
async fn destroy(&mut self) -> Result<DestroyReport, RuntimeDriverError> {
let abandoned = EphemeralRuntimeDriver::destroy(self)?;
Ok(DestroyReport {
inputs_abandoned: abandoned,
})
}
fn input_state(&self, input_id: &InputId) -> Option<&InputState> {
self.ledger.get(input_id)
}
fn active_input_ids(&self) -> Vec<InputId> {
self.ledger.active_input_ids()
}
}