use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use meerkat_core::BlobStore;
use meerkat_core::comms_drain_lifecycle_authority::{
CommsDrainLifecycleAuthority, CommsDrainLifecycleEffect, CommsDrainMode, DrainExitReason,
};
use meerkat_core::generated::{protocol_comms_drain_abort, protocol_comms_drain_spawn};
use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::{InputId, RunId};
use meerkat_core::types::SessionId;
use crate::accept::AcceptOutcome;
use crate::driver::ephemeral::EphemeralRuntimeDriver;
use crate::driver::persistent::PersistentRuntimeDriver;
use crate::identifiers::LogicalRuntimeId;
use crate::input::Input;
use crate::input_lifecycle_authority::InputLifecycleError;
use crate::input_state::InputState;
use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
use crate::store::RuntimeStore;
use crate::tokio;
use crate::tokio::sync::{Mutex, RwLock, mpsc};
use crate::traits::{
DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
RuntimeControlPlaneError, RuntimeDriver, RuntimeDriverError,
};
/// Shared, mutex-protected handle to a session's runtime driver.
pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
/// The concrete runtime driver backing one session.
///
/// `RuntimeSessionAdapter::make_driver` picks `Persistent` only when both a runtime
/// store and a blob store are configured, and `Ephemeral` otherwise.
pub(crate) enum DriverEntry {
/// Driver built from just a logical runtime id (no store handles).
Ephemeral(EphemeralRuntimeDriver),
/// Driver built with a `RuntimeStore` and a `BlobStore`.
Persistent(PersistentRuntimeDriver),
}
/// Forwarding helpers: every method dispatches to whichever driver variant is held.
impl DriverEntry {
    /// Shared view of the wrapped driver through the `RuntimeDriver` trait.
    pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
        match self {
            Self::Ephemeral(inner) => inner,
            Self::Persistent(inner) => inner,
        }
    }

    /// Mutable view of the wrapped driver through the `RuntimeDriver` trait.
    pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
        match self {
            Self::Ephemeral(inner) => inner,
            Self::Persistent(inner) => inner,
        }
    }

    /// Replaces the set of comms intents the driver treats as silent.
    pub(crate) fn set_silent_comms_intents(&mut self, intents: Vec<String>) {
        match self {
            Self::Ephemeral(inner) => inner.set_silent_comms_intents(intents),
            Self::Persistent(inner) => inner.set_silent_comms_intents(intents),
        }
    }

    /// Whether the driver's runtime state is idle or attached.
    pub(crate) fn is_idle_or_attached(&self) -> bool {
        match self {
            Self::Ephemeral(inner) => inner.is_idle_or_attached(),
            Self::Persistent(inner) => inner.is_idle_or_attached(),
        }
    }

    /// Transitions the driver into the attached state.
    pub(crate) fn attach(&mut self) -> Result<(), RuntimeStateTransitionError> {
        match self {
            Self::Ephemeral(inner) => inner.attach(),
            Self::Persistent(inner) => inner.attach(),
        }
    }

    /// Transitions the driver out of the attached state.
    pub(crate) fn detach(
        &mut self,
    ) -> Result<Option<crate::runtime_state::RuntimeState>, RuntimeStateTransitionError> {
        match self {
            Self::Ephemeral(inner) => inner.detach(),
            Self::Persistent(inner) => inner.detach(),
        }
    }

    /// Whether the driver's run control currently allows queue processing.
    pub(crate) fn can_process_queue(&self) -> bool {
        match self {
            Self::Ephemeral(inner) => inner.control().can_process_queue(),
            // The persistent driver exposes its control through the wrapped inner driver.
            Self::Persistent(inner) => inner.inner_ref().control().can_process_queue(),
        }
    }

    /// Takes (and clears) the driver's pending wake request flag.
    pub(crate) fn take_wake_requested(&mut self) -> bool {
        match self {
            Self::Ephemeral(inner) => inner.take_wake_requested(),
            Self::Persistent(inner) => inner.take_wake_requested(),
        }
    }

    /// Takes (and clears) the driver's pending process-now request flag.
    pub(crate) fn take_process_requested(&mut self) -> bool {
        match self {
            Self::Ephemeral(inner) => inner.take_process_requested(),
            Self::Persistent(inner) => inner.take_process_requested(),
        }
    }

    /// Pops the next queued input, if any.
    pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
        match self {
            Self::Ephemeral(inner) => inner.dequeue_next(),
            Self::Persistent(inner) => inner.dequeue_next(),
        }
    }

    /// Removes the queued input with the given id, if present.
    pub(crate) fn dequeue_by_id(&mut self, input_id: &InputId) -> Option<(InputId, Input)> {
        match self {
            Self::Ephemeral(inner) => inner.dequeue_by_id(input_id),
            Self::Persistent(inner) => inner.dequeue_by_id(input_id),
        }
    }

    /// Borrows the driver's ingress authority.
    pub(crate) fn ingress(&self) -> &crate::runtime_ingress_authority::RuntimeIngressAuthority {
        match self {
            Self::Ephemeral(inner) => inner.ingress(),
            Self::Persistent(inner) => inner.inner_ref().ingress(),
        }
    }

    /// Whether any queued input exists whose id is not in `excluded`.
    pub(crate) fn has_queued_input_outside(&self, excluded: &[InputId]) -> bool {
        match self {
            Self::Ephemeral(inner) => inner.has_queued_input_outside(excluded),
            Self::Persistent(inner) => inner.has_queued_input_outside(excluded),
        }
    }

    /// Marks `run_id` as the driver's current run.
    pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
        match self {
            Self::Ephemeral(inner) => inner.start_run(run_id),
            Self::Persistent(inner) => inner.start_run(run_id),
        }
    }

    /// Finishes the driver's current run and returns its id.
    pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
        match self {
            Self::Ephemeral(inner) => inner.complete_run(),
            Self::Persistent(inner) => inner.complete_run(),
        }
    }

    /// Stages a single input for the given run.
    pub(crate) fn stage_input(
        &mut self,
        input_id: &InputId,
        run_id: &RunId,
    ) -> Result<(), InputLifecycleError> {
        match self {
            Self::Ephemeral(inner) => inner.stage_input(input_id, run_id),
            Self::Persistent(inner) => inner.stage_input(input_id, run_id),
        }
    }

    /// Stages a batch of inputs for the given run.
    pub(crate) fn stage_batch(
        &mut self,
        input_ids: &[InputId],
        run_id: &RunId,
    ) -> Result<(), InputLifecycleError> {
        match self {
            Self::Ephemeral(inner) => inner.stage_batch(input_ids, run_id),
            Self::Persistent(inner) => inner.stage_batch(input_ids, run_id),
        }
    }

    /// Rolls previously staged inputs back.
    pub(crate) fn rollback_staged(
        &mut self,
        input_ids: &[InputId],
    ) -> Result<(), InputLifecycleError> {
        match self {
            Self::Ephemeral(inner) => inner.rollback_staged(input_ids),
            Self::Persistent(inner) => inner.rollback_staged(input_ids),
        }
    }

    /// Abandons all pending inputs with `reason`, returning how many were abandoned.
    pub(crate) async fn abandon_pending_inputs(
        &mut self,
        reason: crate::input_state::InputAbandonReason,
    ) -> Result<usize, RuntimeDriverError> {
        match self {
            // The ephemeral driver abandons synchronously and returns a plain count.
            Self::Ephemeral(inner) => Ok(inner.abandon_pending_inputs(reason)),
            Self::Persistent(inner) => inner.abandon_pending_inputs(reason).await,
        }
    }
}
/// Shared, mutex-protected completion registry for one session.
pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
/// Per-session state kept by `RuntimeSessionAdapter`.
struct RuntimeSessionEntry {
/// The session's runtime driver; the same `Arc` is handed to the runtime loop.
driver: SharedDriver,
/// Ops lifecycle registry, exposed through `ops_lifecycle_registry`.
ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
/// Completion handles registered for this session's inputs.
completions: SharedCompletionRegistry,
/// The published runtime loop, if `ensure_session_with_executor` has wired one.
attachment: Option<RuntimeLoopAttachment>,
}
/// Channels and task handle of a spawned runtime loop.
struct RuntimeLoopAttachment {
/// Receives a unit value to wake the loop (e.g. after input is accepted).
wake_tx: mpsc::Sender<()>,
/// Delivers run-control commands such as cancel/stop to the loop.
control_tx: mpsc::Sender<RunControlCommand>,
/// Held only to own the loop task. NOTE(review): assuming `crate::tokio` re-exports
/// tokio, dropping this handle detaches the task rather than aborting it.
_loop_handle: tokio::task::JoinHandle<()>,
}
/// Loop-attachment bookkeeping for a session entry.
impl RuntimeSessionEntry {
    /// Whether a loop attachment is published and neither of its channels reports
    /// `is_closed()`.
    fn attachment_is_live(&self) -> bool {
        self.attachment.as_ref().is_some_and(|published| {
            !(published.wake_tx.is_closed() || published.control_tx.is_closed())
        })
    }

    /// Same as [`Self::attachment_is_live`]: a published but dead attachment does not count.
    fn has_attachment(&self) -> bool {
        self.attachment_is_live()
    }

    /// Publishes a runtime loop's channels and task handle, replacing any previous one.
    fn attach_runtime_loop(
        &mut self,
        wake_tx: mpsc::Sender<()>,
        control_tx: mpsc::Sender<RunControlCommand>,
        loop_handle: tokio::task::JoinHandle<()>,
    ) {
        let published = RuntimeLoopAttachment {
            wake_tx,
            control_tx,
            _loop_handle: loop_handle,
        };
        self.attachment = Some(published);
    }

    /// Drops an attachment whose channels have closed; returns `true` if one was dropped.
    fn clear_dead_attachment(&mut self) -> bool {
        let is_stale = self.attachment.is_some() && !self.attachment_is_live();
        if is_stale {
            self.attachment = None;
        }
        is_stale
    }

    /// A clone of the wake sender, or `None` unless the attachment is live.
    fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
        self.attachment
            .as_ref()
            .filter(|_| self.attachment_is_live())
            .map(|published| published.wake_tx.clone())
    }

    /// A clone of the control sender, or `None` unless the attachment is live.
    fn control_sender(&self) -> Option<mpsc::Sender<RunControlCommand>> {
        if self.attachment_is_live() {
            self.attachment
                .as_ref()
                .map(|published| published.control_tx.clone())
        } else {
            None
        }
    }
}
/// Per-session comms drain bookkeeping: the lifecycle authority that decides
/// transitions, plus the handle of the currently spawned drain task (if any).
struct CommsDrainSlot {
    authority: CommsDrainLifecycleAuthority,
    handle: Option<tokio::task::JoinHandle<()>>,
}
impl CommsDrainSlot {
    /// A slot with a fresh authority and no spawned task.
    fn new() -> Self {
        let authority = CommsDrainLifecycleAuthority::new();
        Self {
            authority,
            handle: None,
        }
    }
}
/// Applies the authority effects this module handles directly.
///
/// Only `AbortDrainTask` is acted on here: it aborts the slot's task handle if one is
/// held. Every other effect is ignored by this helper.
fn apply_runtime_drain_effects(slot: &mut CommsDrainSlot, effects: &[CommsDrainLifecycleEffect]) {
    let abort_requested = effects
        .iter()
        .any(|effect| matches!(effect, CommsDrainLifecycleEffect::AbortDrainTask));
    // The handle is taken on the first abort, so repeated abort effects are no-ops.
    if abort_requested && let Some(task) = slot.handle.take() {
        task.abort();
    }
}
/// Asks the slot's authority to stop the drain and applies the resulting effects.
///
/// If the authority rejects the stop request, the task handle (if any) is still
/// aborted directly, so the drain task does not keep running after this call.
fn abort_slot(slot: &mut CommsDrainSlot) {
    let Ok(transition) = protocol_comms_drain_abort::execute_stop_requested(&mut slot.authority)
    else {
        if let Some(task) = slot.handle.take() {
            task.abort();
        }
        return;
    };
    apply_runtime_drain_effects(slot, &transition.effects);
    // The stop obligation is deliberately discarded; it is not tracked further here.
    let _ = transition.obligation;
}
/// Maps sessions to runtime drivers: one driver entry per registered session, plus an
/// optional comms drain slot per session.
pub struct RuntimeSessionAdapter {
/// Registered sessions and their driver / completion / loop-attachment state.
sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
/// Runtime mode reported to callers; both constructors here set `V9Compliant`.
mode: RuntimeMode,
/// Runtime store; together with `blob_store` it selects persistent drivers.
store: Option<Arc<dyn RuntimeStore>>,
/// Blob store paired with `store` for persistent drivers.
blob_store: Option<Arc<dyn BlobStore>>,
/// Comms drain lifecycle slots keyed by session.
comms_drain_slots: RwLock<HashMap<SessionId, CommsDrainSlot>>,
}
impl RuntimeSessionAdapter {
/// Creates an adapter with no runtime or blob store, so every session gets an
/// ephemeral driver.
pub fn ephemeral() -> Self {
    Self {
        mode: RuntimeMode::V9Compliant,
        store: None,
        blob_store: None,
        sessions: RwLock::new(HashMap::new()),
        comms_drain_slots: RwLock::new(HashMap::new()),
    }
}
/// Creates an adapter whose sessions get persistent drivers backed by `store` and
/// `blob_store`.
pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
    let (store, blob_store) = (Some(store), Some(blob_store));
    Self {
        sessions: RwLock::new(HashMap::new()),
        comms_drain_slots: RwLock::new(HashMap::new()),
        mode: RuntimeMode::V9Compliant,
        store,
        blob_store,
    }
}
fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
let runtime_id = LogicalRuntimeId::new(session_id.to_string());
match (&self.store, &self.blob_store) {
(Some(store), Some(blob_store)) => DriverEntry::Persistent(
PersistentRuntimeDriver::new(runtime_id, store.clone(), blob_store.clone()),
),
(Some(_store), None) => {
tracing::warn!(
%session_id,
"persistent runtime store present but blob store missing; \
falling back to ephemeral driver"
);
DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id))
}
_ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
}
}
pub async fn register_session(&self, session_id: SessionId) {
{
let mut sessions = self.sessions.write().await;
if let Some(existing) = sessions.get_mut(&session_id) {
existing.clear_dead_attachment();
return;
}
}
let mut entry = self.make_driver(&session_id);
if let Err(err) = entry.as_driver_mut().recover().await {
tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
return;
}
let session_entry = RuntimeSessionEntry {
driver: Arc::new(Mutex::new(entry)),
ops_lifecycle: Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
completions: Arc::new(Mutex::new(crate::completion::CompletionRegistry::new())),
attachment: None,
};
let mut sessions = self.sessions.write().await;
if let Some(existing) = sessions.get_mut(&session_id) {
existing.clear_dead_attachment();
} else {
sessions.insert(session_id, session_entry);
}
}
/// Sets the comms intents that the session's driver treats as silent.
///
/// Unknown sessions are ignored. The session-map read lock is released before the
/// driver mutex is awaited, as the adapter's other accessors do. Otherwise a queued
/// writer to the session map (register/unregister) would be held up behind whoever
/// currently owns the driver lock.
pub async fn set_session_silent_intents(&self, session_id: &SessionId, intents: Vec<String>) {
    let driver = {
        let sessions = self.sessions.read().await;
        match sessions.get(session_id) {
            Some(entry) => entry.driver.clone(),
            None => return,
        }
    };
    let mut driver = driver.lock().await;
    driver.set_silent_comms_intents(intents);
}
pub async fn register_session_with_executor(
&self,
session_id: SessionId,
executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
) {
self.ensure_session_with_executor(session_id, executor)
.await;
}
/// Ensures `session_id` is registered and driven by a live runtime loop running `executor`.
///
/// Locks are released between phases, so each phase re-checks the session entry:
/// 1. Reuse the existing entry (pruning a dead attachment), or recover a fresh driver and
///    insert it. Returns early when a live loop attachment is already published; the
///    passed `executor` is then dropped unused.
/// 2. Attach the driver. An `Attached -> Attached` transition error (the driver is still
///    attached without a live published loop) is repaired with one detach + re-attach.
/// 3. Spawn the runtime loop and publish its channels on the entry. This is skipped if
///    another caller published first or the entry was removed or replaced meanwhile.
///    An unpublished loop is aborted. The driver is also detached, except when another
///    caller's live loop now owns it.
/// 4. Wake the new loop if the driver already had active inputs.
///
/// Failures are logged through `tracing`; nothing is returned to the caller.
pub async fn ensure_session_with_executor(
&self,
session_id: SessionId,
executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
) {
// Phase 1: look up an existing entry, pruning a dead attachment first.
let existing = {
let mut sessions = self.sessions.write().await;
sessions.get_mut(&session_id).map(|entry| {
entry.clear_dead_attachment();
(
entry.has_attachment(),
entry.driver.clone(),
entry.completions.clone(),
)
})
};
let (driver, completions) = if let Some((has_attachment, driver, completions)) = existing {
// A live loop is already wired; nothing to do.
if has_attachment {
return;
}
(driver, completions)
} else {
// Recover a new driver without holding the session map lock.
let mut recovered_entry = self.make_driver(&session_id);
if let Err(err) = recovered_entry.as_driver_mut().recover().await {
tracing::error!(
%session_id,
error = %err,
"failed to recover runtime driver during registration"
);
return;
}
let mut sessions = self.sessions.write().await;
// Re-check: another caller may have registered the session during recovery,
// in which case its entry wins and `recovered_entry` is dropped.
if let Some(entry) = sessions.get_mut(&session_id) {
entry.clear_dead_attachment();
if entry.has_attachment() {
return;
}
(entry.driver.clone(), entry.completions.clone())
} else {
let driver = Arc::new(Mutex::new(recovered_entry));
let completions =
Arc::new(Mutex::new(crate::completion::CompletionRegistry::new()));
sessions.insert(
session_id.clone(),
RuntimeSessionEntry {
driver: driver.clone(),
ops_lifecycle: Arc::new(
crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new(),
),
completions: completions.clone(),
attachment: None,
},
);
(driver, completions)
}
};
// Phase 2: attach the driver. Also record whether it already holds active inputs,
// so the loop can be woken once it is published.
let should_wake = {
let mut driver_guard = driver.lock().await;
if let Err(error) = driver_guard.attach() {
// Only an Attached -> Attached failure is treated as repairable stale state.
let repaired = if error.from == RuntimeState::Attached
&& error.to == RuntimeState::Attached
{
tracing::warn!(
%session_id,
error = %error,
"runtime driver remained attached without a live published loop; detaching and retrying attachment"
);
match driver_guard.detach() {
Ok(_) => match driver_guard.attach() {
Ok(()) => true,
Err(retry_error) => {
tracing::warn!(
%session_id,
error = %retry_error,
"failed to re-attach runtime driver after repairing stale attachment state"
);
false
}
},
Err(detach_error) => {
tracing::warn!(
%session_id,
error = %detach_error,
"failed to detach stale attached runtime driver before retrying attachment"
);
false
}
}
} else {
false
};
if !repaired {
tracing::warn!(
%session_id,
error = %error,
"failed to attach runtime driver before publishing loop attachment"
);
return;
}
}
!driver_guard.as_driver().active_input_ids().is_empty()
};
// Phase 3: spawn the loop (channel capacity 16 each) and try to publish it.
let (wake_tx, wake_rx) = mpsc::channel(16);
let (control_tx, control_rx) = mpsc::channel(16);
let mut pending_loop_handle =
Some(crate::runtime_loop::spawn_runtime_loop_with_completions(
driver.clone(),
executor,
wake_rx,
control_rx,
Some(completions.clone()),
));
// `detach_after_abort` is true when the entry is gone or no longer owns this driver.
// It stays false when another caller's live loop now uses the driver.
let (published, detach_after_abort) = {
let mut sessions = self.sessions.write().await;
match sessions.get_mut(&session_id) {
None => (false, true),
Some(entry) => {
entry.clear_dead_attachment();
if entry.has_attachment() {
(false, false)
} else if !Arc::ptr_eq(&entry.driver, &driver)
|| !Arc::ptr_eq(&entry.completions, &completions)
{
tracing::warn!(
%session_id,
"runtime session entry changed while wiring executor; aborting stale loop attachment"
);
(false, true)
} else {
// Defensive: the handle is only taken in this arm or after a failed publish.
match pending_loop_handle.take() {
Some(loop_handle) => {
entry.attach_runtime_loop(wake_tx.clone(), control_tx, loop_handle);
(true, false)
}
None => {
tracing::error!(
%session_id,
"runtime loop handle missing during attachment publish"
);
(false, true)
}
}
}
}
}
};
if !published {
if let Some(loop_handle) = pending_loop_handle.take() {
loop_handle.abort();
}
if detach_after_abort {
let mut driver_guard = driver.lock().await;
let _ = driver_guard.detach();
}
return;
}
// Phase 4: wake the new loop (best-effort) so it picks up pre-existing active inputs.
if should_wake {
let _ = wake_tx.try_send(());
}
}
/// Removes `session_id`, tearing down its comms drain and runtime state.
///
/// While holding both map write locks (sessions first, then drain slots), the drain
/// slot is removed and aborted and the session entry is removed. Afterwards the driver
/// is detached (errors ignored) and every pending completion is resolved as terminated.
/// For an unknown session only the drain cleanup happens.
pub async fn unregister_session(&self, session_id: &SessionId) {
    let removed = {
        let mut sessions = self.sessions.write().await;
        let mut slots = self.comms_drain_slots.write().await;
        if let Some(mut drain) = slots.remove(session_id) {
            abort_slot(&mut drain);
        }
        sessions.remove(session_id)
    };
    let Some(removed) = removed else {
        return;
    };
    {
        let mut driver_guard = removed.driver.lock().await;
        // Best effort: the session is going away regardless of the detach outcome.
        let _ = driver_guard.detach();
    }
    let mut completions_guard = removed.completions.lock().await;
    completions_guard.resolve_all_terminated("runtime session unregistered");
}
/// Reports whether `session_id` currently has a registered entry.
pub async fn contains_session(&self, session_id: &SessionId) -> bool {
    let sessions = self.sessions.read().await;
    sessions.contains_key(session_id)
}
/// Asks the session's live runtime loop to cancel its current run.
///
/// # Errors
/// - `NotReady { Destroyed }` if the session is not registered.
/// - `NotReady { <current state> }` if no live loop attachment exists.
/// - `Internal` if the cancel command cannot be delivered.
pub async fn interrupt_current_run(
    &self,
    session_id: &SessionId,
) -> Result<(), RuntimeDriverError> {
    let (driver, control_tx) = {
        let sessions = self.sessions.read().await;
        match sessions.get(session_id) {
            Some(entry) => (entry.driver.clone(), entry.control_sender()),
            None => {
                return Err(RuntimeDriverError::NotReady {
                    state: RuntimeState::Destroyed,
                });
            }
        }
    };
    match control_tx {
        Some(control_tx) => {
            let cancel = RunControlCommand::CancelCurrentRun {
                reason: "mob interrupt".to_string(),
            };
            control_tx.send(cancel).await.map_err(|err| {
                RuntimeDriverError::Internal(format!("failed to send interrupt: {err}"))
            })
        }
        None => {
            let state = driver.lock().await.as_driver().runtime_state();
            Err(RuntimeDriverError::NotReady { state })
        }
    }
}
/// Sends `command` to the session's runtime loop, with a direct-stop fallback.
///
/// If a live loop accepts the command, this returns immediately. Otherwise a
/// `StopRuntimeExecutor` command is applied directly: the driver receives
/// `RuntimeControlCommand::Stop`, all completions are resolved as terminated, and a
/// dead loop attachment is pruned. Any other command fails with `Internal`.
pub async fn stop_runtime_executor(
    &self,
    session_id: &SessionId,
    command: RunControlCommand,
) -> Result<(), RuntimeDriverError> {
    let (driver, completions, control_tx) = {
        let sessions = self.sessions.read().await;
        let Some(entry) = sessions.get(session_id) else {
            return Err(RuntimeDriverError::NotReady {
                state: RuntimeState::Destroyed,
            });
        };
        (
            entry.driver.clone(),
            entry.completions.clone(),
            entry.control_sender(),
        )
    };
    let delivered = match control_tx {
        Some(sender) => sender.send(command.clone()).await.is_ok(),
        None => false,
    };
    if delivered {
        return Ok(());
    }
    if !matches!(command, RunControlCommand::StopRuntimeExecutor { .. }) {
        return Err(RuntimeDriverError::Internal(
            "failed to send stop: runtime loop is unavailable".into(),
        ));
    }
    // No live loop took the command: stop the driver ourselves.
    {
        let mut guard = driver.lock().await;
        guard
            .as_driver_mut()
            .on_runtime_control(crate::traits::RuntimeControlCommand::Stop)
            .await?;
    }
    completions
        .lock()
        .await
        .resolve_all_terminated("runtime stopped");
    if let Some(entry) = self.sessions.write().await.get_mut(session_id) {
        entry.clear_dead_attachment();
    }
    Ok(())
}
/// Accepts `input` and executes it inline through `op`, bypassing the runtime loop.
///
/// The driver must be idle-or-attached and have no active inputs; otherwise this returns
/// `NotReady`. The error is `ValidationFailed` instead if an active input has the same
/// idempotency key. Under the driver lock it accepts the input, dequeues it, starts a
/// fresh run and stages the input. The lock is then released while
/// `op(run_id, primitive)` executes.
///
/// On success, the boundary receipt/snapshot and then the run completion are recorded on
/// the driver. On `op` failure, a recoverable `RunFailed` event is recorded. If recording
/// the completion or failure event fails, the session is unregistered and `Internal` is
/// returned.
///
/// # Errors
/// `NotReady`, `ValidationFailed`, `Internal`, or the error returned by `op`.
pub async fn accept_input_and_run<T, F, Fut>(
&self,
session_id: &SessionId,
input: Input,
op: F,
) -> Result<T, RuntimeDriverError>
where
F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
{
let driver = {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?
.driver
.clone()
};
// Admission + staging all happen under one driver lock.
let (input_id, run_id, primitive) = {
let mut driver = driver.lock().await;
if !driver.is_idle_or_attached() {
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
// Inline execution requires exclusive ownership: refuse while anything is active,
// with a more specific error when the active input shares this input's idempotency key.
let active_input_ids = driver.as_driver().active_input_ids();
if !active_input_ids.is_empty() {
let duplicate_active_input =
input.header().idempotency_key.as_ref().and_then(|key| {
active_input_ids.iter().find(|active_id| {
driver
.as_driver()
.input_state(active_id)
.and_then(|state| state.idempotency_key.as_ref())
== Some(key)
})
});
if let Some(existing_id) = duplicate_active_input {
return Err(RuntimeDriverError::ValidationFailed {
reason: format!(
"accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
),
});
}
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
let outcome = driver.as_driver_mut().accept_input(input).await?;
let input_id = match outcome {
AcceptOutcome::Accepted { input_id, .. } => input_id,
AcceptOutcome::Deduplicated { existing_id, .. } => {
return Err(RuntimeDriverError::ValidationFailed {
reason: format!(
"accept_input_and_run does not support deduplicated admission; existing input {existing_id} already owns execution"
),
});
}
AcceptOutcome::Rejected { reason } => {
return Err(RuntimeDriverError::ValidationFailed { reason });
}
};
// NOTE(review): if this re-check fails, the just-accepted input stays queued while the
// caller receives NotReady — confirm that is intended.
if !driver.is_idle_or_attached() {
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
RuntimeDriverError::Internal("accepted input was not queued for execution".into())
})?;
// NOTE(review): on a mismatch the dequeued input is neither staged nor re-queued here;
// the empty-active-inputs check above presumably makes this unreachable — confirm.
if dequeued_id != input_id {
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
let run_id = RunId::new();
// NOTE(review): if stage_input fails after start_run succeeded, the started run is not
// unwound here — confirm the driver recovers from that state.
driver.start_run(run_id.clone()).map_err(|err| {
RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
})?;
driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
})?;
let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
(input_id, run_id, primitive)
};
// The driver lock is released while the caller's operation runs.
match op(run_id.clone(), primitive.clone()).await {
Ok((result, output)) => {
let mut driver = driver.lock().await;
if let Err(err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
run_id: run_id.clone(),
receipt: output.receipt,
session_snapshot: output.session_snapshot,
})
.await
{
// Boundary commit failed: try to unwind the run with a recoverable RunFailed.
// Unlike the completion/failure paths below, the session stays registered.
if let Err(unwind_err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
run_id,
error: format!("boundary commit failed: {err}"),
recoverable: true,
})
.await
{
return Err(RuntimeDriverError::Internal(format!(
"runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
)));
}
return Err(RuntimeDriverError::Internal(format!(
"runtime boundary commit failed: {err}"
)));
}
if let Err(err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
run_id,
consumed_input_ids: vec![input_id],
})
.await
{
// Release the driver lock before unregistering, which locks the driver again.
drop(driver);
self.unregister_session(session_id).await;
return Err(RuntimeDriverError::Internal(format!(
"failed to persist runtime completion snapshot: {err}"
)));
}
Ok(result)
}
Err(err) => {
let mut driver = driver.lock().await;
if let Err(run_err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
run_id,
error: err.to_string(),
recoverable: true,
})
.await
{
// Release the driver lock before unregistering, which locks the driver again.
drop(driver);
self.unregister_session(session_id).await;
return Err(RuntimeDriverError::Internal(format!(
"failed to persist runtime failure snapshot: {run_err}"
)));
}
Err(err)
}
}
}
/// Accepts `input` and, unless it is already terminal, registers a completion handle for it.
///
/// - `Accepted`: registers a handle for the new input unless its state is terminal or
///   missing. The runtime loop (if live) is woken when the driver requested a wake or
///   immediate processing.
/// - `Deduplicated`: registers a handle for the existing input unless it is terminal or
///   missing; the loop is not woken.
/// - `Rejected`: converted into `RuntimeDriverError::ValidationFailed`.
///
/// The completion registry is locked while the driver lock is still held.
pub async fn accept_input_with_completion(
&self,
session_id: &SessionId,
input: Input,
) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
{
let (driver, completions, wake_tx) = {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
(
entry.driver.clone(),
entry.completions.clone(),
entry.wake_sender(),
)
};
let (outcome, should_wake, should_process, handle) = {
let mut driver = driver.lock().await;
let result = driver.as_driver_mut().accept_input(input).await?;
match &result {
AcceptOutcome::Accepted { input_id, .. } => {
// A missing input state counts as terminal, so no handle is registered for it.
let is_terminal = driver
.as_driver()
.input_state(input_id)
.map(|state| state.current_state().is_terminal())
.unwrap_or(true);
let handle = if is_terminal {
None
} else {
Some({
let mut completions = completions.lock().await;
completions.register(input_id.clone())
})
};
// Consume (take) the driver's wake and process-now flags.
let wake = driver.take_wake_requested();
let process_now = driver.take_process_requested();
(result, wake, process_now, handle)
}
AcceptOutcome::Deduplicated { existing_id, .. } => {
let existing_state = driver.as_driver().input_state(existing_id);
let is_terminal = existing_state
.map(|s| s.current_state().is_terminal())
.unwrap_or(true);
if is_terminal {
(result, false, false, None)
} else {
// Attach the caller to the already-admitted input's completion.
let handle = {
let mut completions = completions.lock().await;
completions.register(existing_id.clone())
};
(result, false, false, Some(handle))
}
}
AcceptOutcome::Rejected { reason } => {
return Err(RuntimeDriverError::ValidationFailed {
reason: reason.clone(),
});
}
}
};
// Best-effort wake; a full channel or missing loop is not an error here.
if (should_wake || should_process)
&& let Some(ref wake_tx) = wake_tx
{
let _ = wake_tx.try_send(());
}
Ok((outcome, handle))
}
/// Accepts `input` into the session's queue without waking the runtime loop.
///
/// Any wake request raised by the admission is consumed and discarded. Admission is not
/// expected to request immediate processing; that is checked only in debug builds.
pub async fn accept_input_without_wake(
    &self,
    session_id: &SessionId,
    input: Input,
) -> Result<AcceptOutcome, RuntimeDriverError> {
    let shared = {
        let sessions = self.sessions.read().await;
        match sessions.get(session_id) {
            Some(entry) => entry.driver.clone(),
            None => {
                return Err(RuntimeDriverError::NotReady {
                    state: RuntimeState::Destroyed,
                });
            }
        }
    };
    let mut guard = shared.lock().await;
    let outcome = guard.as_driver_mut().accept_input(input).await?;
    // Deliberately swallow the wake flag: this caller opted out of waking the loop.
    let _ = guard.take_wake_requested();
    let process_requested = guard.take_process_requested();
    debug_assert!(
        !process_requested,
        "queue-only admission unexpectedly requested immediate processing"
    );
    Ok(outcome)
}
/// Returns the session's ops lifecycle registry, or `None` for an unknown session.
pub async fn ops_lifecycle_registry(
    &self,
    session_id: &SessionId,
) -> Option<Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>> {
    let sessions = self.sessions.read().await;
    let entry = sessions.get(session_id)?;
    Some(Arc::clone(&entry.ops_lifecycle))
}
/// Ensures a comms drain task is running for `session_id` when `keep_alive` is set.
///
/// - `keep_alive == false`: aborts any existing drain and returns `false`.
/// - Missing `comms_runtime`, an unregistered session, an authority rejection of
///   `EnsureRunning`, or a spawn transition without an obligation: returns `false`.
///
/// Otherwise it asks the drain authority to `EnsureRunning` in `PersistentHost` mode and
/// applies the resulting effects (spawning or aborting the task). It then reports the
/// spawn back to the authority and returns `true`, even if that report is rejected.
///
/// The session-map read lock is held for the rest of the call once taken, so
/// `unregister_session` (which needs the write lock) cannot run mid-spawn. Lock order
/// matches it: sessions, then drain slots.
pub async fn maybe_spawn_comms_drain(
self: &Arc<Self>,
session_id: &SessionId,
keep_alive: bool,
comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
) -> bool {
if !keep_alive {
self.abort_comms_drain(session_id).await;
return false;
}
// Only the persistent-host mode is requested from here.
let mode = CommsDrainMode::PersistentHost;
let comms = match comms_runtime {
Some(c) => c,
None => return false,
};
let sessions = self.sessions.read().await;
if !sessions.contains_key(session_id) {
tracing::warn!(
%session_id,
"refusing to spawn comms drain for unregistered session"
);
return false;
}
let mut slots = self.comms_drain_slots.write().await;
let slot = slots
.entry(session_id.clone())
.or_insert_with(CommsDrainSlot::new);
let result =
match protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode) {
Ok(r) => r,
Err(e) => {
tracing::trace!(error = %e, "comms drain authority rejected EnsureRunning");
return false;
}
};
for effect in &result.effects {
match effect {
CommsDrainLifecycleEffect::SpawnDrainTask { mode: spawn_mode } => {
// NOTE(review): PersistentHost passes `Duration::MAX` and Timed passes `None`.
// Presumably `spawn_comms_drain` interprets these (and avoids overflow when
// adding MAX to an Instant) — confirm.
let idle_timeout = match spawn_mode {
CommsDrainMode::PersistentHost => Some(std::time::Duration::MAX),
CommsDrainMode::Timed => None,
};
let handle = crate::comms_drain::spawn_comms_drain(
Arc::clone(self),
session_id.clone(),
comms.clone(),
idle_timeout,
);
// NOTE(review): any previous handle is overwritten (detached, not aborted);
// the authority presumably never asks to spawn while a task is held.
slot.handle = Some(handle);
}
CommsDrainLifecycleEffect::AbortDrainTask => {
if let Some(handle) = slot.handle.take() {
handle.abort();
}
}
}
}
let Some(obligation) = result.obligation else {
tracing::warn!(
%session_id,
"comms drain spawn transition emitted no obligation"
);
return false;
};
// Confirm the spawn back to the authority; a rejection is only traced.
match protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation) {
Ok(_effects) => {}
Err(e) => {
tracing::trace!(error = %e, "comms drain authority rejected TaskSpawned");
}
}
true
}
/// Records that the drain task for `session_id` exited for `reason`.
///
/// Presumably called by the drain task itself. The slot's handle is dropped, not
/// aborted, and the authority is told the task exited. Authority rejections are logged;
/// effects returned by the transition are not applied here.
pub async fn notify_comms_drain_exited(&self, session_id: &SessionId, reason: DrainExitReason) {
    let mut slots = self.comms_drain_slots.write().await;
    let Some(slot) = slots.get_mut(session_id) else {
        return;
    };
    // The task is exiting on its own; forget its handle rather than aborting it.
    drop(slot.handle.take());
    if let Err(e) = protocol_comms_drain_spawn::notify_task_exited(&mut slot.authority, reason) {
        tracing::warn!(error = %e, "comms drain authority rejected TaskExited");
    }
}
pub async fn abort_comms_drains(&self) {
let mut slots = self.comms_drain_slots.write().await;
for (_, slot) in slots.iter_mut() {
abort_slot(slot);
}
}
/// Aborts the comms drain of `session_id`, if the session has a drain slot.
pub async fn abort_comms_drain(&self, session_id: &SessionId) {
    let mut slots = self.comms_drain_slots.write().await;
    let Some(slot) = slots.get_mut(session_id) else {
        return;
    };
    abort_slot(slot);
}
/// Waits for the current comms drain task of `session_id` to finish.
///
/// The task handle is taken out of the slot and awaited with the slot lock released.
/// Afterwards, the authority may still report `Running` while no newer handle has been
/// installed. In that case the task exited without notifying the authority (e.g. it
/// panicked), so a `Failed` exit is submitted as a safety net and its effects applied.
///
/// A handle present at that point belongs to a drain spawned while this call was
/// waiting. Its `Running` phase is legitimate, so the safety net must not fail (and
/// possibly abort) that new task.
pub async fn wait_comms_drain(&self, session_id: &SessionId) {
    let handle = {
        let mut slots = self.comms_drain_slots.write().await;
        slots
            .get_mut(session_id)
            .and_then(|slot| slot.handle.take())
    };
    if let Some(handle) = handle {
        let _ = handle.await;
    }
    let mut slots = self.comms_drain_slots.write().await;
    // `handle.is_none()` rules out a drain that was (re)spawned while we were waiting.
    if let Some(slot) = slots.get_mut(session_id)
        && slot.handle.is_none()
        && slot.authority.phase()
            == meerkat_core::comms_drain_lifecycle_authority::CommsDrainPhase::Running
    {
        tracing::warn!(
            %session_id,
            "comms_drain: task exited without notifying authority (likely panicked), \
             submitting Failed safety net"
        );
        match protocol_comms_drain_spawn::notify_task_exited(
            &mut slot.authority,
            DrainExitReason::Failed,
        ) {
            Ok(effects) => {
                apply_runtime_drain_effects(slot, &effects);
            }
            Err(e) => {
                tracing::warn!(error = %e, "comms drain authority rejected safety-net TaskExited");
            }
        }
    }
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
/// Returns the runtime mode stored at construction (both constructors set `V9Compliant`).
fn runtime_mode(&self) -> RuntimeMode {
self.mode
}
async fn accept_input(
&self,
session_id: &SessionId,
input: Input,
) -> Result<AcceptOutcome, RuntimeDriverError> {
let (driver, wake_tx) = {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
(entry.driver.clone(), entry.wake_sender())
};
let (outcome, should_wake, should_process) = {
let mut driver = driver.lock().await;
let result = driver.as_driver_mut().accept_input(input).await?;
let wake = driver.take_wake_requested();
let process_now = driver.take_process_requested();
(result, wake, process_now)
};
if (should_wake || should_process)
&& let Some(ref wake_tx) = wake_tx
{
let _ = wake_tx.try_send(());
}
Ok(outcome)
}
/// Delegates to the inherent `RuntimeSessionAdapter::accept_input_with_completion`.
///
/// The explicit type path resolves to the inherent method (inherent methods take
/// precedence), so this does not recurse into the trait method.
async fn accept_input_with_completion(
&self,
session_id: &SessionId,
input: Input,
) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
{
RuntimeSessionAdapter::accept_input_with_completion(self, session_id, input).await
}
/// Current runtime state of the session's driver.
///
/// An unknown session is reported as `NotReady { Destroyed }`.
async fn runtime_state(
    &self,
    session_id: &SessionId,
) -> Result<RuntimeState, RuntimeDriverError> {
    let found = self
        .sessions
        .read()
        .await
        .get(session_id)
        .map(|entry| entry.driver.clone());
    let Some(driver) = found else {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed,
        });
    };
    let guard = driver.lock().await;
    Ok(guard.as_driver().runtime_state())
}
/// Retires the session's runtime.
///
/// Retirement may leave inputs pending drain. The live runtime loop is woken to drain
/// them; if no loop is attached or the wake cannot be delivered, the pending inputs are
/// abandoned with reason `Retired` and every completion is resolved as terminated.
async fn retire_runtime(
    &self,
    session_id: &SessionId,
) -> Result<RetireReport, RuntimeDriverError> {
    let (shared, completions, wake_tx) = {
        let sessions = self.sessions.read().await;
        let Some(entry) = sessions.get(session_id) else {
            return Err(RuntimeDriverError::NotReady {
                state: RuntimeState::Destroyed,
            });
        };
        (
            entry.driver.clone(),
            entry.completions.clone(),
            entry.wake_sender(),
        )
    };
    let mut guard = shared.lock().await;
    let mut report = guard.as_driver_mut().retire().await?;
    drop(guard);
    if report.inputs_pending_drain > 0 {
        let handed_to_loop = match &wake_tx {
            Some(tx) => tx.send(()).await.is_ok(),
            None => false,
        };
        if !handed_to_loop {
            // Nobody will drain what is left, so abandon it here.
            let abandoned = shared
                .lock()
                .await
                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
                .await?;
            completions
                .lock()
                .await
                .resolve_all_terminated("retired without runtime loop");
            report.inputs_abandoned += abandoned;
            report.inputs_pending_drain = 0;
        }
    }
    Ok(report)
}
/// Resets the session's runtime and resolves all completions as terminated.
///
/// Refused with `NotReady { Running }` while a run is in progress.
async fn reset_runtime(
    &self,
    session_id: &SessionId,
) -> Result<ResetReport, RuntimeDriverError> {
    let (shared, completions) = {
        let sessions = self.sessions.read().await;
        let Some(entry) = sessions.get(session_id) else {
            return Err(RuntimeDriverError::NotReady {
                state: RuntimeState::Destroyed,
            });
        };
        (entry.driver.clone(), entry.completions.clone())
    };
    let mut guard = shared.lock().await;
    let current = guard.as_driver().runtime_state();
    if matches!(current, RuntimeState::Running) {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Running,
        });
    }
    let report = guard.as_driver_mut().reset().await?;
    drop(guard);
    completions
        .lock()
        .await
        .resolve_all_terminated("runtime reset");
    Ok(report)
}
/// Snapshot (clone) of one input's lifecycle state, or `None` if the driver does not
/// know the input.
async fn input_state(
    &self,
    session_id: &SessionId,
    input_id: &InputId,
) -> Result<Option<InputState>, RuntimeDriverError> {
    let shared = match self.sessions.read().await.get(session_id) {
        Some(entry) => entry.driver.clone(),
        None => {
            return Err(RuntimeDriverError::NotReady {
                state: RuntimeState::Destroyed,
            });
        }
    };
    let guard = shared.lock().await;
    let snapshot = guard.as_driver().input_state(input_id).cloned();
    Ok(snapshot)
}
async fn list_active_inputs(
&self,
session_id: &SessionId,
) -> Result<Vec<InputId>, RuntimeDriverError> {
let driver = {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
entry.driver.clone()
};
let driver = driver.lock().await;
Ok(driver.as_driver().active_input_ids())
}
}
impl RuntimeSessionAdapter {
fn resolve_session_id(
runtime_id: &LogicalRuntimeId,
) -> Result<SessionId, RuntimeControlPlaneError> {
runtime_id
.0
.parse::<uuid::Uuid>()
.map(SessionId)
.map_err(|_| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
}
async fn lookup_entry(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<
(
SessionId,
SharedDriver,
SharedCompletionRegistry,
Option<mpsc::Sender<()>>,
),
RuntimeControlPlaneError,
> {
let session_id = Self::resolve_session_id(runtime_id)?;
let sessions = self.sessions.read().await;
let entry = sessions
.get(&session_id)
.ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
Ok((
session_id,
entry.driver.clone(),
entry.completions.clone(),
entry.wake_sender(),
))
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl crate::traits::RuntimeControlPlane for RuntimeSessionAdapter {
async fn ingest(
&self,
runtime_id: &LogicalRuntimeId,
input: Input,
) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
let (session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
let _ = session_id;
let (outcome, should_wake, should_process) = {
let mut drv = driver.lock().await;
let result = drv
.as_driver_mut()
.accept_input(input)
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
let wake = drv.take_wake_requested();
let process_now = drv.take_process_requested();
(result, wake, process_now)
};
if (should_wake || should_process)
&& let Some(ref tx) = wake_tx
{
let _ = tx.try_send(());
}
Ok(outcome)
}
async fn publish_event(
&self,
event: crate::runtime_event::RuntimeEventEnvelope,
) -> Result<(), RuntimeControlPlaneError> {
let runtime_id = event.runtime_id.clone();
let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(&runtime_id).await?;
let mut drv = driver.lock().await;
drv.as_driver_mut()
.on_runtime_event(event)
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))
}
async fn retire(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<RetireReport, RuntimeControlPlaneError> {
let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
let _ = session_id;
let mut drv = driver.lock().await;
let mut report = drv
.as_driver_mut()
.retire()
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
drop(drv);
if report.inputs_pending_drain > 0 {
if let Some(ref tx) = wake_tx
&& tx.send(()).await.is_ok()
{
return Ok(report);
}
let mut drv = driver.lock().await;
let abandoned = drv
.abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
drop(drv);
let mut comp = completions.lock().await;
comp.resolve_all_terminated("retired without runtime loop");
report.inputs_abandoned += abandoned;
report.inputs_pending_drain = 0;
}
Ok(report)
}
async fn recycle(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<RecycleReport, RuntimeControlPlaneError> {
let (_session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
let (transferred, active_after_recycle) = {
let mut drv = driver.lock().await;
let state = drv.as_driver().runtime_state();
if matches!(state, RuntimeState::Running) {
return Err(RuntimeControlPlaneError::InvalidState { state });
}
let should_restore_attached = matches!(state, RuntimeState::Attached);
let transferred = match &mut *drv {
DriverEntry::Ephemeral(driver) => driver
.recycle_preserving_work()
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
DriverEntry::Persistent(driver) => driver
.recycle_preserving_work()
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?,
};
if should_restore_attached
&& matches!(drv.as_driver().runtime_state(), RuntimeState::Idle)
{
drv.attach()
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
}
let active_after_recycle = drv.as_driver().active_input_ids();
(transferred, active_after_recycle)
};
{
let pending_after: HashSet<InputId> = active_after_recycle.into_iter().collect();
let mut comp = completions.lock().await;
comp.resolve_not_pending(
|input_id| pending_after.contains(input_id),
"recycled input no longer pending",
);
}
if let Some(ref tx) = wake_tx {
let _ = tx.try_send(());
}
Ok(RecycleReport {
inputs_transferred: transferred,
})
}
async fn reset(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
let mut drv = driver.lock().await;
if matches!(drv.as_driver().runtime_state(), RuntimeState::Running) {
return Err(RuntimeControlPlaneError::InvalidState {
state: RuntimeState::Running,
});
}
let report = drv
.as_driver_mut()
.reset()
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
drop(drv);
let mut comp = completions.lock().await;
comp.resolve_all_terminated("runtime reset");
Ok(report)
}
async fn recover(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<RecoveryReport, RuntimeControlPlaneError> {
let (_session_id, driver, _completions, wake_tx) = self.lookup_entry(runtime_id).await?;
let mut drv = driver.lock().await;
let report = drv
.as_driver_mut()
.recover()
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
drop(drv);
if let Some(ref tx) = wake_tx {
let _ = tx.try_send(());
}
Ok(report)
}
async fn destroy(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<DestroyReport, RuntimeControlPlaneError> {
let (_session_id, driver, completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
let mut drv = driver.lock().await;
let report = drv
.as_driver_mut()
.destroy()
.await
.map_err(|e| RuntimeControlPlaneError::Internal(e.to_string()))?;
drop(drv);
let mut comp = completions.lock().await;
comp.resolve_all_terminated("runtime destroyed");
Ok(report)
}
async fn runtime_state(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<RuntimeState, RuntimeControlPlaneError> {
let (_session_id, driver, _completions, _wake_tx) = self.lookup_entry(runtime_id).await?;
let drv = driver.lock().await;
Ok(drv.as_driver().runtime_state())
}
async fn load_boundary_receipt(
&self,
runtime_id: &LogicalRuntimeId,
run_id: &RunId,
sequence: u64,
) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
match &self.store {
Some(store) => store
.load_boundary_receipt(runtime_id, run_id, sequence)
.await
.map_err(|e| RuntimeControlPlaneError::StoreError(e.to_string())),
None => {
Ok(None)
}
}
}
}
// Tests for the comms-drain slot bookkeeping on `RuntimeSessionAdapter`:
// drain exit paths (dismiss, idle timeout) and session unregistration.
#[cfg(test)]
// Tests use expect/unwrap/panic to fail fast on broken setup.
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use meerkat_core::agent::{CommsCapabilityError, CommsRuntime};
use meerkat_core::comms_drain_lifecycle_authority::{CommsDrainMode, CommsDrainPhase};
use tokio::sync::Notify;
// Test double for `CommsRuntime`. It never yields messages or classified
// interactions, exposes its own `Notify` as the inbox signal, and reports
// `dismiss_received()` from a fixed flag.
struct FakeDrainRuntime {
notify: Arc<Notify>,
dismiss: AtomicBool,
}
impl FakeDrainRuntime {
// Runtime whose `dismiss_received()` returns true.
fn dismissing() -> Self {
Self {
notify: Arc::new(Notify::new()),
dismiss: AtomicBool::new(true),
}
}
// Runtime whose `dismiss_received()` returns false. The drain is presumably
// expected to stop via its idle timeout instead (NOTE(review): confirm in
// `comms_drain`).
fn idle() -> Self {
Self {
notify: Arc::new(Notify::new()),
dismiss: AtomicBool::new(false),
}
}
}
#[async_trait::async_trait]
impl CommsRuntime for FakeDrainRuntime {
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> Arc<Notify> {
Arc::clone(&self.notify)
}
fn dismiss_received(&self) -> bool {
self.dismiss.load(Ordering::Acquire)
}
async fn drain_classified_inbox_interactions(
&self,
) -> Result<Vec<meerkat_core::interaction::ClassifiedInboxInteraction>, CommsCapabilityError>
{
Ok(Vec::new())
}
}
// Registers `session_id` and drives the drain-spawn protocol for it by hand:
// - run `execute_ensure_running` on the session's slot (created if missing);
// - spawn the real drain task for each `SpawnDrainTask` effect;
// - report the spawn back via `submit_task_spawned`;
// - apply the effects of both protocol steps to the slot.
// The `comms_drain_slots` write guard is held until this function returns.
async fn spawn_test_comms_drain(
adapter: &Arc<RuntimeSessionAdapter>,
session_id: &SessionId,
mode: CommsDrainMode,
comms_runtime: Arc<dyn CommsRuntime>,
idle_timeout: Duration,
) {
adapter.register_session(session_id.clone()).await;
let mut slots = adapter.comms_drain_slots.write().await;
let slot = slots
.entry(session_id.clone())
.or_insert_with(CommsDrainSlot::new);
let result = protocol_comms_drain_spawn::execute_ensure_running(&mut slot.authority, mode)
.expect("ensure running");
let obligation = result
.obligation
.expect("spawn obligation should be present");
apply_runtime_drain_effects(slot, &result.effects);
for effect in &result.effects {
if let CommsDrainLifecycleEffect::SpawnDrainTask { .. } = effect {
slot.handle = Some(crate::comms_drain::spawn_comms_drain(
Arc::clone(adapter),
session_id.clone(),
Arc::clone(&comms_runtime),
Some(idle_timeout),
));
}
}
let feedback_effects =
protocol_comms_drain_spawn::submit_task_spawned(&mut slot.authority, obligation)
.expect("task spawned");
apply_runtime_drain_effects(slot, &feedback_effects);
}
// Returns the drain authority's phase for the session, or `None` when the
// session has no drain slot.
async fn current_phase(
adapter: &Arc<RuntimeSessionAdapter>,
session_id: &SessionId,
) -> Option<CommsDrainPhase> {
let slots = adapter.comms_drain_slots.read().await;
slots.get(session_id).map(|slot| slot.authority.phase())
}
// True when the session's drain slot exists and still stores a task handle.
async fn handle_present(adapter: &Arc<RuntimeSessionAdapter>, session_id: &SessionId) -> bool {
let slots = adapter.comms_drain_slots.read().await;
slots
.get(session_id)
.and_then(|slot| slot.handle.as_ref())
.is_some()
}
// Polls every 5 ms until the session reaches `expected`. Panics if that does
// not happen within 1 second.
async fn wait_for_phase(
adapter: &Arc<RuntimeSessionAdapter>,
session_id: &SessionId,
expected: CommsDrainPhase,
) {
tokio::time::timeout(Duration::from_secs(1), async {
loop {
if current_phase(adapter, session_id).await == Some(expected) {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
})
.await
.expect("phase transition");
}
// A dismissing runtime must drive the drain to `Stopped` and clear the slot's
// handle on its own, before `wait_comms_drain` is called. The phase must still
// be `Stopped` after the join.
#[tokio::test]
async fn dismiss_exit_updates_authority_before_join() {
let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
let session_id = SessionId::new();
let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::dismissing());
spawn_test_comms_drain(
&adapter,
&session_id,
CommsDrainMode::PersistentHost,
comms_runtime,
Duration::from_millis(25),
)
.await;
wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
assert!(
!handle_present(&adapter, &session_id).await,
"drain task should clear its slot before wait_comms_drain joins"
);
adapter.wait_comms_drain(&session_id).await;
assert_eq!(
current_phase(&adapter, &session_id).await,
Some(CommsDrainPhase::Stopped)
);
}
// Same expectations as the dismiss case, but for a `Timed` drain over a
// non-dismissing runtime with a 25 ms idle timeout.
#[tokio::test]
async fn idle_timeout_updates_authority_before_join() {
let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
let session_id = SessionId::new();
let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());
spawn_test_comms_drain(
&adapter,
&session_id,
CommsDrainMode::Timed,
comms_runtime,
Duration::from_millis(25),
)
.await;
wait_for_phase(&adapter, &session_id, CommsDrainPhase::Stopped).await;
assert!(
!handle_present(&adapter, &session_id).await,
"drain task should clear its slot before wait_comms_drain joins"
);
adapter.wait_comms_drain(&session_id).await;
assert_eq!(
current_phase(&adapter, &session_id).await,
Some(CommsDrainPhase::Stopped)
);
}
// With a long-lived (60 s idle timeout) running drain, `unregister_session`
// must remove the session's drain slot entirely.
#[tokio::test]
async fn unregister_session_aborts_and_removes_drain_slot() {
let adapter = Arc::new(RuntimeSessionAdapter::ephemeral());
let session_id = SessionId::new();
let comms_runtime: Arc<dyn CommsRuntime> = Arc::new(FakeDrainRuntime::idle());
// NOTE(review): `spawn_test_comms_drain` also calls `register_session`, so
// this call looks redundant. It relies on registration being idempotent.
adapter.register_session(session_id.clone()).await;
spawn_test_comms_drain(
&adapter,
&session_id,
CommsDrainMode::PersistentHost,
comms_runtime,
Duration::from_secs(60),
)
.await;
assert_eq!(
current_phase(&adapter, &session_id).await,
Some(CommsDrainPhase::Running)
);
assert!(handle_present(&adapter, &session_id).await);
adapter.unregister_session(&session_id).await;
let slots = adapter.comms_drain_slots.read().await;
assert!(
!slots.contains_key(&session_id),
"unregister must remove the comms drain slot entirely"
);
}
}