use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::{InputId, RunId};
use meerkat_core::types::SessionId;
use crate::accept::AcceptOutcome;
use crate::driver::ephemeral::EphemeralRuntimeDriver;
use crate::driver::persistent::PersistentRuntimeDriver;
use crate::identifiers::LogicalRuntimeId;
use crate::input::Input;
use crate::input_machine::InputStateMachineError;
use crate::input_state::InputState;
use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
use crate::store::RuntimeStore;
use crate::tokio;
use crate::tokio::sync::{Mutex, RwLock, mpsc};
use crate::traits::{ResetReport, RetireReport, RuntimeDriver, RuntimeDriverError};
/// A [`DriverEntry`] shared behind an `Arc` and guarded by an async mutex.
pub(crate) type SharedDriver = Arc<Mutex<DriverEntry>>;
/// The concrete runtime driver backing one session.
///
/// Wraps either driver flavour so the adapter can handle both through a single type.
pub(crate) enum DriverEntry {
/// Driver built without a runtime store (see `RuntimeSessionAdapter::make_driver`).
Ephemeral(EphemeralRuntimeDriver),
/// Driver built with a handle to a `RuntimeStore`.
Persistent(PersistentRuntimeDriver),
}
impl DriverEntry {
pub(crate) fn as_driver(&self) -> &dyn RuntimeDriver {
match self {
DriverEntry::Ephemeral(d) => d,
DriverEntry::Persistent(d) => d,
}
}
pub(crate) fn as_driver_mut(&mut self) -> &mut dyn RuntimeDriver {
match self {
DriverEntry::Ephemeral(d) => d,
DriverEntry::Persistent(d) => d,
}
}
pub(crate) fn is_idle(&self) -> bool {
match self {
DriverEntry::Ephemeral(d) => d.is_idle(),
DriverEntry::Persistent(d) => d.is_idle(),
}
}
pub(crate) fn can_process_queue(&self) -> bool {
match self {
DriverEntry::Ephemeral(d) => d.state_machine_ref().can_process_queue(),
DriverEntry::Persistent(d) => d.inner_ref().state_machine_ref().can_process_queue(),
}
}
pub(crate) fn take_wake_requested(&mut self) -> bool {
match self {
DriverEntry::Ephemeral(d) => d.take_wake_requested(),
DriverEntry::Persistent(d) => d.take_wake_requested(),
}
}
pub(crate) fn take_process_requested(&mut self) -> bool {
match self {
DriverEntry::Ephemeral(d) => d.take_process_requested(),
DriverEntry::Persistent(d) => d.take_process_requested(),
}
}
pub(crate) fn dequeue_next(&mut self) -> Option<(InputId, Input)> {
match self {
DriverEntry::Ephemeral(d) => d.dequeue_next(),
DriverEntry::Persistent(d) => d.dequeue_next(),
}
}
pub(crate) fn start_run(&mut self, run_id: RunId) -> Result<(), RuntimeStateTransitionError> {
match self {
DriverEntry::Ephemeral(d) => d.start_run(run_id),
DriverEntry::Persistent(d) => d.start_run(run_id),
}
}
pub(crate) fn complete_run(&mut self) -> Result<RunId, RuntimeStateTransitionError> {
match self {
DriverEntry::Ephemeral(d) => d.complete_run(),
DriverEntry::Persistent(d) => d.complete_run(),
}
}
pub(crate) fn stage_input(
&mut self,
input_id: &InputId,
run_id: &RunId,
) -> Result<(), InputStateMachineError> {
match self {
DriverEntry::Ephemeral(d) => d.stage_input(input_id, run_id),
DriverEntry::Persistent(d) => d.stage_input(input_id, run_id),
}
}
#[allow(dead_code)]
pub(crate) fn apply_input(
&mut self,
input_id: &InputId,
run_id: &RunId,
) -> Result<(), InputStateMachineError> {
match self {
DriverEntry::Ephemeral(d) => d.apply_input(input_id, run_id),
DriverEntry::Persistent(d) => d.apply_input(input_id, run_id),
}
}
#[allow(dead_code)]
pub(crate) fn consume_inputs(
&mut self,
input_ids: &[InputId],
run_id: &RunId,
) -> Result<(), InputStateMachineError> {
match self {
DriverEntry::Ephemeral(d) => d.consume_inputs(input_ids, run_id),
DriverEntry::Persistent(d) => d.consume_inputs(input_ids, run_id),
}
}
#[allow(dead_code)]
pub(crate) fn rollback_staged(
&mut self,
input_ids: &[InputId],
) -> Result<(), InputStateMachineError> {
match self {
DriverEntry::Ephemeral(d) => d.rollback_staged(input_ids),
DriverEntry::Persistent(d) => d.rollback_staged(input_ids),
}
}
}
/// A `CompletionRegistry` shared behind an `Arc` and guarded by an async mutex.
pub(crate) type SharedCompletionRegistry = Arc<Mutex<crate::completion::CompletionRegistry>>;
/// Per-session bookkeeping stored in the adapter's session map.
struct RuntimeSessionEntry {
/// The session's runtime driver.
driver: SharedDriver,
/// Registry of completion handles handed out for this session's inputs.
completions: SharedCompletionRegistry,
/// Sender used to wake the runtime loop; `None` until a loop has been attached.
wake_tx: Option<mpsc::Sender<()>>,
/// Sender for run-control commands to the runtime loop; `None` until a loop has been attached.
control_tx: Option<mpsc::Sender<RunControlCommand>>,
/// Join handle of the spawned runtime loop task; stored but never read in this file.
_loop_handle: Option<tokio::task::JoinHandle<()>>,
}
/// Maps session ids to runtime drivers (and, optionally, their runtime loops).
pub struct RuntimeSessionAdapter {
/// Registered sessions, keyed by session id.
sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
/// Mode reported by `runtime_mode()`.
mode: RuntimeMode,
/// When `Some`, new sessions get a persistent driver backed by this store; otherwise an
/// ephemeral driver.
store: Option<Arc<dyn RuntimeStore>>,
}
impl RuntimeSessionAdapter {
/// Creates an adapter with no runtime store, so every session it registers gets an
/// ephemeral driver.
pub fn ephemeral() -> Self {
    let sessions = RwLock::new(HashMap::new());
    Self {
        sessions,
        mode: RuntimeMode::V9Compliant,
        store: None,
    }
}
/// Creates an adapter whose sessions get persistent drivers backed by `store`.
pub fn persistent(store: Arc<dyn RuntimeStore>) -> Self {
    let sessions = RwLock::new(HashMap::new());
    let store = Some(store);
    Self {
        sessions,
        mode: RuntimeMode::V9Compliant,
        store,
    }
}
fn make_driver(&self, session_id: &SessionId) -> DriverEntry {
let runtime_id = LogicalRuntimeId::new(session_id.to_string());
match &self.store {
Some(store) => {
DriverEntry::Persistent(PersistentRuntimeDriver::new(runtime_id, store.clone()))
}
None => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new(runtime_id)),
}
}
/// Registers `session_id` with a recovered driver and no runtime loop.
///
/// Does nothing when the session is already present. If driver recovery fails the error
/// is logged and the session is left unregistered.
pub async fn register_session(&self, session_id: SessionId) {
    let already_registered = self.contains_session(&session_id).await;
    if already_registered {
        return;
    }

    let mut driver_entry = self.make_driver(&session_id);
    match driver_entry.as_driver_mut().recover().await {
        Ok(_) => {}
        Err(err) => {
            tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
            return;
        }
    }

    let completions = Arc::new(Mutex::new(crate::completion::CompletionRegistry::new()));
    let session_entry = RuntimeSessionEntry {
        driver: Arc::new(Mutex::new(driver_entry)),
        completions,
        wake_tx: None,
        control_tx: None,
        _loop_handle: None,
    };

    // `or_insert` keeps an entry that a concurrent caller inserted while we were recovering.
    self.sessions
        .write()
        .await
        .entry(session_id)
        .or_insert(session_entry);
}
/// Registers `session_id` with a runtime loop driven by `executor`.
///
/// Thin alias for [`Self::ensure_session_with_executor`].
pub async fn register_session_with_executor(
    &self,
    session_id: SessionId,
    executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
) {
    let registration = self.ensure_session_with_executor(session_id, executor);
    registration.await
}
/// Ensures `session_id` has a driver entry with a runtime loop attached.
///
/// - Entry exists with both wake and control senders set: nothing to do.
/// - Entry exists without a loop: spawns a loop around the existing driver and completion
///   registry, then stores the senders and join handle on the entry.
/// - No entry: builds and recovers a new driver, inserts an entry, spawns a loop, and
///   attaches it (re-checking the map after each lock re-acquisition for concurrent changes).
///
/// After attaching a loop, one wake is sent (best effort) if the driver reports active inputs.
/// Failures are logged with `tracing::error!`; nothing is returned to the caller.
pub async fn ensure_session_with_executor(
&self,
session_id: SessionId,
executor: Box<dyn meerkat_core::lifecycle::CoreExecutor>,
) {
// Held in an Option so it can be moved out on exactly one of the two paths below.
let mut executor = Some(executor);
// Path 1: session already registered. Done entirely under the write lock.
let upgrade = {
let mut sessions = self.sessions.write().await;
if let Some(entry) = sessions.get_mut(&session_id) {
// Loop already attached.
if entry.wake_tx.is_some() && entry.control_tx.is_some() {
return;
}
let driver = Arc::clone(&entry.driver);
let (wake_tx, wake_rx) = mpsc::channel(16);
let (control_tx, control_rx) = mpsc::channel(16);
let Some(executor) = executor.take() else {
tracing::error!(%session_id, "executor missing while upgrading existing runtime session");
return;
};
let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
driver.clone(),
executor,
wake_rx,
control_rx,
Some(entry.completions.clone()),
);
entry.wake_tx = Some(wake_tx.clone());
entry.control_tx = Some(control_tx);
entry._loop_handle = Some(handle);
Some((driver, wake_tx))
} else {
None
}
};
if let Some((driver, wake_tx)) = upgrade {
// Inputs accepted before the loop existed need an initial wake.
let should_wake = {
let driver = driver.lock().await;
!driver.as_driver().active_input_ids().is_empty()
};
if should_wake {
// Best effort: a failed try_send is ignored.
let _ = wake_tx.try_send(());
}
return;
}
// Path 2: session not registered. Recovery runs without holding the map lock.
let mut recovered_entry = self.make_driver(&session_id);
if let Err(err) = recovered_entry.as_driver_mut().recover().await {
tracing::error!(%session_id, error = %err, "failed to recover runtime driver during registration");
return;
}
let driver = {
let mut sessions = self.sessions.write().await;
if let Some(entry) = sessions.get(&session_id) {
// Someone registered the session while we were recovering.
if entry.wake_tx.is_some() && entry.control_tx.is_some() {
return;
}
// Use the already-registered driver; `recovered_entry` is discarded.
entry.driver.clone()
} else {
let driver = Arc::new(Mutex::new(recovered_entry));
sessions.insert(
session_id.clone(),
RuntimeSessionEntry {
driver: driver.clone(),
completions: Arc::new(Mutex::new(
crate::completion::CompletionRegistry::new(),
)),
wake_tx: None,
control_tx: None,
_loop_handle: None,
},
);
driver
}
};
let (wake_tx, wake_rx) = mpsc::channel(16);
let (control_tx, control_rx) = mpsc::channel(16);
let Some(executor) = executor.take() else {
tracing::error!(%session_id, "executor missing while registering runtime session");
return;
};
// `None` here means the entry was removed since the write lock above was released.
let completions = {
let sessions = self.sessions.read().await;
sessions.get(&session_id).map(|e| e.completions.clone())
};
let handle = crate::runtime_loop::spawn_runtime_loop_with_completions(
driver.clone(),
executor,
wake_rx,
control_rx,
completions,
);
let mut sessions = self.sessions.write().await;
// NOTE(review): on either early return below, the loop spawned above is left with its
// handle and both senders dropped; presumably it exits once its channels close — confirm.
let Some(entry) = sessions.get_mut(&session_id) else {
return;
};
if entry.wake_tx.is_some() && entry.control_tx.is_some() {
return;
}
entry.wake_tx = Some(wake_tx.clone());
entry.control_tx = Some(control_tx);
entry._loop_handle = Some(handle);
// Release the map lock before taking the driver lock.
drop(sessions);
let should_wake = {
let driver = driver.lock().await;
!driver.as_driver().active_input_ids().is_empty()
};
if should_wake {
let _ = wake_tx.try_send(());
}
}
/// Removes `session_id` from the session map, dropping the adapter's entry for it.
/// A missing session is not an error.
pub async fn unregister_session(&self, session_id: &SessionId) {
    let mut sessions = self.sessions.write().await;
    sessions.remove(session_id);
}
/// Returns `true` when `session_id` is currently registered with this adapter.
pub async fn contains_session(&self, session_id: &SessionId) -> bool {
    let sessions = self.sessions.read().await;
    sessions.contains_key(session_id)
}
/// Asks the session's runtime loop to cancel the current run.
///
/// Sends `RunControlCommand::CancelCurrentRun` (reason `"mob interrupt"`) on the session's
/// control channel.
///
/// # Errors
///
/// - `RuntimeDriverError::NotReady { state: Destroyed }` when the session is not registered
///   or has no control channel (no runtime loop attached).
/// - `RuntimeDriverError::Internal` when the control channel rejects the command.
pub async fn interrupt_current_run(
    &self,
    session_id: &SessionId,
) -> Result<(), RuntimeDriverError> {
    // Clone the sender and release the session-map lock before awaiting the send: the
    // control channel is bounded, so a full queue would otherwise keep the read lock held
    // and stall every register/unregister call until the loop drains it.
    let control_tx = {
        let sessions = self.sessions.read().await;
        sessions
            .get(session_id)
            .and_then(|entry| entry.control_tx.clone())
            .ok_or(RuntimeDriverError::NotReady {
                state: RuntimeState::Destroyed,
            })?
    };
    control_tx
        .send(RunControlCommand::CancelCurrentRun {
            reason: "mob interrupt".to_string(),
        })
        .await
        .map_err(|err| RuntimeDriverError::Internal(format!("failed to send interrupt: {err}")))
}
pub async fn accept_input_and_run<T, F, Fut>(
&self,
session_id: &SessionId,
input: Input,
op: F,
) -> Result<T, RuntimeDriverError>
where
F: FnOnce(RunId, meerkat_core::lifecycle::run_primitive::RunPrimitive) -> Fut,
Fut: Future<Output = Result<(T, CoreApplyOutput), RuntimeDriverError>>,
{
let driver = {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?
.driver
.clone()
};
let (input_id, run_id, primitive) = {
let mut driver = driver.lock().await;
if !driver.is_idle() || !driver.as_driver().active_input_ids().is_empty() {
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
let outcome = driver.as_driver_mut().accept_input(input).await?;
let input_id = match outcome {
AcceptOutcome::Accepted { input_id, .. } => input_id,
AcceptOutcome::Deduplicated { existing_id, .. } => existing_id,
AcceptOutcome::Rejected { reason } => {
return Err(RuntimeDriverError::ValidationFailed { reason });
}
};
if !driver.is_idle() {
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
let (dequeued_id, dequeued_input) = driver.dequeue_next().ok_or_else(|| {
RuntimeDriverError::Internal("accepted input was not queued for execution".into())
})?;
if dequeued_id != input_id {
return Err(RuntimeDriverError::NotReady {
state: driver.as_driver().runtime_state(),
});
}
let run_id = RunId::new();
driver.start_run(run_id.clone()).map_err(|err| {
RuntimeDriverError::Internal(format!("failed to start runtime run: {err}"))
})?;
driver.stage_input(&dequeued_id, &run_id).map_err(|err| {
RuntimeDriverError::Internal(format!("failed to stage accepted input: {err}"))
})?;
let primitive = crate::runtime_loop::input_to_primitive(&dequeued_input, dequeued_id);
(input_id, run_id, primitive)
};
match op(run_id.clone(), primitive.clone()).await {
Ok((result, output)) => {
let mut driver = driver.lock().await;
if let Err(err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::BoundaryApplied {
run_id: run_id.clone(),
receipt: output.receipt,
session_snapshot: output.session_snapshot,
})
.await
{
if let Err(unwind_err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
run_id,
error: format!("boundary commit failed: {err}"),
recoverable: true,
})
.await
{
return Err(RuntimeDriverError::Internal(format!(
"runtime boundary commit failed: {err}; additionally failed to unwind runtime state: {unwind_err}"
)));
}
return Err(RuntimeDriverError::Internal(format!(
"runtime boundary commit failed: {err}"
)));
}
if let Err(err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::RunCompleted {
run_id,
consumed_input_ids: vec![input_id],
})
.await
{
drop(driver);
self.unregister_session(session_id).await;
return Err(RuntimeDriverError::Internal(format!(
"failed to persist runtime completion snapshot: {err}"
)));
}
Ok(result)
}
Err(err) => {
let mut driver = driver.lock().await;
if let Err(run_err) = driver
.as_driver_mut()
.on_run_event(meerkat_core::lifecycle::RunEvent::RunFailed {
run_id,
error: err.to_string(),
recoverable: true,
})
.await
{
drop(driver);
self.unregister_session(session_id).await;
return Err(RuntimeDriverError::Internal(format!(
"failed to persist runtime failure snapshot: {run_err}"
)));
}
Err(err)
}
}
}
pub async fn accept_input_with_completion(
&self,
session_id: &SessionId,
input: Input,
) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
{
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
let (outcome, should_wake, should_process, handle) = {
let mut driver = entry.driver.lock().await;
let result = driver.as_driver_mut().accept_input(input).await?;
match &result {
AcceptOutcome::Accepted { input_id, .. } => {
let handle = {
let mut completions = entry.completions.lock().await;
completions.register(input_id.clone())
};
let wake = driver.take_wake_requested();
let process_now = driver.take_process_requested();
(result, wake, process_now, Some(handle))
}
AcceptOutcome::Deduplicated { existing_id, .. } => {
let existing_state = driver.as_driver().input_state(existing_id);
let is_terminal = existing_state
.map(|s| s.current_state.is_terminal())
.unwrap_or(true);
if is_terminal {
(result, false, false, None)
} else {
let handle = {
let mut completions = entry.completions.lock().await;
completions.register(existing_id.clone())
};
(result, false, false, Some(handle))
}
}
AcceptOutcome::Rejected { reason } => {
return Err(RuntimeDriverError::ValidationFailed {
reason: reason.clone(),
});
}
}
};
if (should_wake || should_process)
&& let Some(ref wake_tx) = entry.wake_tx
{
let _ = wake_tx.try_send(());
}
Ok((outcome, handle))
}
#[allow(dead_code)]
pub(crate) async fn completion_registry(
&self,
session_id: &SessionId,
) -> Option<SharedCompletionRegistry> {
let sessions = self.sessions.read().await;
sessions.get(session_id).map(|e| e.completions.clone())
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl SessionServiceRuntimeExt for RuntimeSessionAdapter {
/// Returns the runtime mode this adapter was constructed with.
fn runtime_mode(&self) -> RuntimeMode {
self.mode
}
async fn accept_input(
&self,
session_id: &SessionId,
input: Input,
) -> Result<AcceptOutcome, RuntimeDriverError> {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
let (outcome, should_wake, should_process) = {
let mut driver = entry.driver.lock().await;
let result = driver.as_driver_mut().accept_input(input).await?;
let wake = driver.take_wake_requested();
let process_now = driver.take_process_requested();
(result, wake, process_now)
};
if (should_wake || should_process)
&& let Some(ref wake_tx) = entry.wake_tx
{
let _ = wake_tx.try_send(());
}
Ok(outcome)
}
/// Reports the driver's current runtime state for `session_id`.
///
/// Returns `NotReady { state: Destroyed }` when the session is not registered.
async fn runtime_state(
    &self,
    session_id: &SessionId,
) -> Result<RuntimeState, RuntimeDriverError> {
    let sessions = self.sessions.read().await;
    let Some(entry) = sessions.get(session_id) else {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed,
        });
    };
    let guard = entry.driver.lock().await;
    let state = guard.as_driver().runtime_state();
    Ok(state)
}
async fn retire_runtime(
&self,
session_id: &SessionId,
) -> Result<RetireReport, RuntimeDriverError> {
let sessions = self.sessions.read().await;
let entry = sessions
.get(session_id)
.ok_or(RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed,
})?;
let mut driver = entry.driver.lock().await;
let report = driver.as_driver_mut().retire().await?;
drop(driver);
if report.inputs_pending_drain > 0 {
if let Some(ref wake_tx) = entry.wake_tx {
let _ = wake_tx.send(()).await;
}
}
if entry.wake_tx.is_none() {
let mut completions = entry.completions.lock().await;
completions.resolve_all_terminated("retired without runtime loop");
}
Ok(report)
}
/// Resets the session's driver and resolves every registered completion as terminated
/// with reason `"runtime reset"`.
///
/// Returns `NotReady { state: Destroyed }` when the session is not registered and
/// `NotReady { state: Running }` when a run is in progress.
async fn reset_runtime(
    &self,
    session_id: &SessionId,
) -> Result<ResetReport, RuntimeDriverError> {
    let sessions = self.sessions.read().await;
    let Some(entry) = sessions.get(session_id) else {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed,
        });
    };

    let mut guard = entry.driver.lock().await;
    let running = matches!(guard.as_driver().runtime_state(), RuntimeState::Running);
    if running {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Running,
        });
    }

    let report = guard.as_driver_mut().reset().await?;
    // The driver stays locked while completions are resolved.
    entry
        .completions
        .lock()
        .await
        .resolve_all_terminated("runtime reset");
    Ok(report)
}
/// Returns a copy of the driver's state for `input_id`, or `None` when the driver does
/// not know that input.
///
/// Returns `NotReady { state: Destroyed }` when the session is not registered.
async fn input_state(
    &self,
    session_id: &SessionId,
    input_id: &InputId,
) -> Result<Option<InputState>, RuntimeDriverError> {
    let sessions = self.sessions.read().await;
    let Some(entry) = sessions.get(session_id) else {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed,
        });
    };
    let guard = entry.driver.lock().await;
    let snapshot = guard.as_driver().input_state(input_id).cloned();
    Ok(snapshot)
}
/// Lists the ids the driver currently reports as active for `session_id`.
///
/// Returns `NotReady { state: Destroyed }` when the session is not registered.
async fn list_active_inputs(
    &self,
    session_id: &SessionId,
) -> Result<Vec<InputId>, RuntimeDriverError> {
    let sessions = self.sessions.read().await;
    let Some(entry) = sessions.get(session_id) else {
        return Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed,
        });
    };
    let guard = entry.driver.lock().await;
    let active = guard.as_driver().active_input_ids();
    Ok(active)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::input::*;
use crate::input_state::InputState;
use crate::runtime_state::RuntimeState;
use crate::store::{RuntimeStore, RuntimeStoreError, SessionDelta};
use chrono::Utc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
/// Builds a durable, operator-sourced prompt input carrying `text`, with a fresh id and
/// the current timestamp.
fn make_prompt(text: &str) -> Input {
    let header = InputHeader {
        id: InputId::new(),
        timestamp: Utc::now(),
        source: InputOrigin::Operator,
        durability: InputDurability::Durable,
        visibility: InputVisibility::default(),
        idempotency_key: None,
        supersession_key: None,
        correlation_id: None,
    };
    let prompt = PromptInput {
        header,
        text: text.into(),
        turn_metadata: None,
    };
    Input::Prompt(prompt)
}
/// Test double around `InMemoryRuntimeStore` that can inject failures and delays.
struct HarnessRuntimeStore {
// Real store that every non-failing call is forwarded to.
inner: crate::store::InMemoryRuntimeStore,
// When true, `atomic_apply` always fails.
fail_atomic_apply: bool,
// `atomic_lifecycle_commit` fails once its zero-based call index reaches this value.
fail_atomic_lifecycle_commit_after: Option<usize>,
// Number of `atomic_lifecycle_commit` calls seen so far.
atomic_lifecycle_commit_calls: AtomicUsize,
// Sleep inserted before `load_input_states` forwards; zero disables it.
load_input_states_delay: Duration,
// `persist_input_state` fails once its zero-based call index reaches this value.
fail_persist_input_state_after: Option<usize>,
// Number of `persist_input_state` calls seen so far.
persist_input_state_calls: AtomicUsize,
}
impl HarnessRuntimeStore {
    /// Baseline harness: fresh in-memory store, no injected failures, no delay.
    fn passthrough() -> Self {
        Self {
            inner: crate::store::InMemoryRuntimeStore::new(),
            fail_atomic_apply: false,
            fail_atomic_lifecycle_commit_after: None,
            atomic_lifecycle_commit_calls: AtomicUsize::new(0),
            load_input_states_delay: Duration::ZERO,
            fail_persist_input_state_after: None,
            persist_input_state_calls: AtomicUsize::new(0),
        }
    }

    /// Harness whose `atomic_apply` always fails.
    fn failing_atomic_apply() -> Self {
        Self {
            fail_atomic_apply: true,
            ..Self::passthrough()
        }
    }

    /// Harness whose `load_input_states` sleeps for `delay` before answering.
    fn delayed_recover(delay: Duration) -> Self {
        Self {
            load_input_states_delay: delay,
            ..Self::passthrough()
        }
    }

    /// Harness whose `atomic_lifecycle_commit` succeeds once and fails from the second
    /// call onwards.
    fn failing_terminal_snapshot() -> Self {
        Self {
            fail_atomic_lifecycle_commit_after: Some(1),
            ..Self::passthrough()
        }
    }
}
/// Forwards every call to the wrapped in-memory store, except where a configured failure
/// or delay applies.
#[async_trait::async_trait]
impl RuntimeStore for HarnessRuntimeStore {
    /// Plain pass-through.
    async fn commit_session_boundary(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        session_delta: SessionDelta,
        run_id: RunId,
        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
        contributing_input_ids: Vec<InputId>,
        input_updates: Vec<InputState>,
    ) -> Result<meerkat_core::lifecycle::RunBoundaryReceipt, RuntimeStoreError> {
        let store = &self.inner;
        store
            .commit_session_boundary(
                runtime_id,
                session_delta,
                run_id,
                boundary,
                contributing_input_ids,
                input_updates,
            )
            .await
    }

    /// Fails with a synthetic write error when `fail_atomic_apply` is set; otherwise
    /// passes through.
    async fn atomic_apply(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        session_delta: Option<SessionDelta>,
        receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
        input_updates: Vec<InputState>,
        session_store_key: Option<meerkat_core::types::SessionId>,
    ) -> Result<(), RuntimeStoreError> {
        if self.fail_atomic_apply {
            let message = String::from("synthetic atomic_apply failure");
            return Err(RuntimeStoreError::WriteFailed(message));
        }
        let store = &self.inner;
        store
            .atomic_apply(
                runtime_id,
                session_delta,
                receipt,
                input_updates,
                session_store_key,
            )
            .await
    }

    /// Sleeps for the configured delay (if non-zero), then passes through.
    async fn load_input_states(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
    ) -> Result<Vec<InputState>, RuntimeStoreError> {
        let delay = self.load_input_states_delay;
        if delay > Duration::ZERO {
            tokio::time::sleep(delay).await;
        }
        self.inner.load_input_states(runtime_id).await
    }

    /// Plain pass-through.
    async fn load_boundary_receipt(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        run_id: &RunId,
        sequence: u64,
    ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeStoreError>
    {
        let store = &self.inner;
        store
            .load_boundary_receipt(runtime_id, run_id, sequence)
            .await
    }

    /// Plain pass-through.
    async fn load_session_snapshot(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
    ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
        let store = &self.inner;
        store.load_session_snapshot(runtime_id).await
    }

    /// Counts the call, then fails once the zero-based call index has reached
    /// `fail_persist_input_state_after`; otherwise passes through.
    async fn persist_input_state(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        state: &InputState,
    ) -> Result<(), RuntimeStoreError> {
        let nth_call = self
            .persist_input_state_calls
            .fetch_add(1, Ordering::SeqCst);
        let should_fail = matches!(
            self.fail_persist_input_state_after,
            Some(fail_after) if nth_call >= fail_after
        );
        if should_fail {
            let message = String::from("synthetic persist_input_state failure");
            return Err(RuntimeStoreError::WriteFailed(message));
        }
        self.inner.persist_input_state(runtime_id, state).await
    }

    /// Plain pass-through.
    async fn load_input_state(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        input_id: &InputId,
    ) -> Result<Option<InputState>, RuntimeStoreError> {
        let store = &self.inner;
        store.load_input_state(runtime_id, input_id).await
    }

    /// Plain pass-through.
    async fn persist_runtime_state(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        state: RuntimeState,
    ) -> Result<(), RuntimeStoreError> {
        let store = &self.inner;
        store.persist_runtime_state(runtime_id, state).await
    }

    /// Plain pass-through.
    async fn load_runtime_state(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
    ) -> Result<Option<RuntimeState>, RuntimeStoreError> {
        let store = &self.inner;
        store.load_runtime_state(runtime_id).await
    }

    /// Counts the call, then fails once the zero-based call index has reached
    /// `fail_atomic_lifecycle_commit_after`; otherwise passes through.
    async fn atomic_lifecycle_commit(
        &self,
        runtime_id: &crate::identifiers::LogicalRuntimeId,
        runtime_state: RuntimeState,
        input_states: &[InputState],
    ) -> Result<(), RuntimeStoreError> {
        let nth_call = self
            .atomic_lifecycle_commit_calls
            .fetch_add(1, Ordering::SeqCst);
        let should_fail = matches!(
            self.fail_atomic_lifecycle_commit_after,
            Some(fail_after) if nth_call >= fail_after
        );
        if should_fail {
            let message = String::from("synthetic atomic_lifecycle_commit failure");
            return Err(RuntimeStoreError::WriteFailed(message));
        }
        self.inner
            .atomic_lifecycle_commit(runtime_id, runtime_state, input_states)
            .await
    }
}
/// Without a runtime loop, an accepted input stays active and the runtime stays idle.
#[tokio::test]
async fn ephemeral_adapter_accept_and_query() {
    let adapter = RuntimeSessionAdapter::ephemeral();
    let session = SessionId::new();
    adapter.register_session(session.clone()).await;

    let prompt = make_prompt("hello");
    let accepted = adapter.accept_input(&session, prompt).await.unwrap();
    assert!(accepted.is_accepted());

    let runtime_state = adapter.runtime_state(&session).await.unwrap();
    assert_eq!(runtime_state, RuntimeState::Idle);

    let active_ids = adapter.list_active_inputs(&session).await.unwrap();
    assert_eq!(active_ids.len(), 1);
}
/// A store-backed adapter accepts input for a registered session.
#[tokio::test]
async fn persistent_adapter_accept() {
    let backing_store = Arc::new(crate::store::InMemoryRuntimeStore::new());
    let adapter = RuntimeSessionAdapter::persistent(backing_store);
    let session = SessionId::new();
    adapter.register_session(session.clone()).await;

    let prompt = make_prompt("hello");
    let accepted = adapter.accept_input(&session, prompt).await.unwrap();
    assert!(accepted.is_accepted());
}
/// Accepting input for a session that was never registered is an error.
#[tokio::test]
async fn unregistered_session_errors() {
    let adapter = RuntimeSessionAdapter::ephemeral();
    let unknown_session = SessionId::new();

    let prompt = make_prompt("hi");
    let attempt = adapter.accept_input(&unknown_session, prompt).await;
    assert!(attempt.is_err());
}
/// After unregistering, state queries for the session fail.
#[tokio::test]
async fn unregister_removes_driver() {
    let adapter = RuntimeSessionAdapter::ephemeral();
    let session = SessionId::new();

    adapter.register_session(session.clone()).await;
    adapter.unregister_session(&session).await;

    let lookup = adapter.runtime_state(&session).await;
    assert!(lookup.is_err());
}
// Registering with an executor attaches a runtime loop: an accepted input should reach
// `CoreExecutor::apply` and leave the runtime idle with no active inputs.
#[tokio::test]
async fn accept_with_executor_triggers_loop() {
use meerkat_core::lifecycle::RunId;
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorError,
};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
use std::sync::atomic::{AtomicBool, Ordering};
// Shared flag the executor sets when `apply` is invoked.
let apply_called = Arc::new(AtomicBool::new(false));
let apply_called_clone = apply_called.clone();
// Executor that records the call and returns a minimal successful output.
struct TestExecutor {
called: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl CoreExecutor for TestExecutor {
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
self.called.store(true, Ordering::SeqCst);
Ok(CoreApplyOutput {
receipt: RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
run_result: None,
})
}
async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
Ok(())
}
}
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
let executor = Box::new(TestExecutor {
called: apply_called_clone,
});
adapter
.register_session_with_executor(sid.clone(), executor)
.await;
let input = make_prompt("hello from executor test");
let outcome = adapter.accept_input(&sid, input).await.unwrap();
assert!(outcome.is_accepted());
// Give the background loop time to pick up and process the input.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(
apply_called.load(Ordering::SeqCst),
"CoreExecutor::apply() should have been called by the RuntimeLoop"
);
let state = adapter.runtime_state(&sid).await.unwrap();
assert_eq!(state, RuntimeState::Idle);
let active = adapter.list_active_inputs(&sid).await.unwrap();
assert!(active.is_empty(), "All inputs should be consumed");
}
// When the executor's `apply` fails, the runtime should return to idle and the input
// should end up back in the `Queued` state.
#[tokio::test]
async fn failed_executor_requeues_input() {
use crate::input_state::InputLifecycleState;
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorError,
};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::RunPrimitive;
// Executor whose `apply` always returns an error.
struct FailingExecutor;
#[async_trait::async_trait]
impl CoreExecutor for FailingExecutor {
async fn apply(
&mut self,
_run_id: RunId,
_primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
Err(CoreExecutorError::ApplyFailed {
reason: "LLM error".into(),
})
}
async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
Ok(())
}
}
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
adapter
.register_session_with_executor(sid.clone(), Box::new(FailingExecutor))
.await;
let input = make_prompt("hello failing");
// Keep the id so the input's state can be queried after it is moved into the adapter.
let input_id = input.id().clone();
adapter.accept_input(&sid, input).await.unwrap();
// Give the background loop time to attempt the run.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let state = adapter.runtime_state(&sid).await.unwrap();
assert_eq!(state, RuntimeState::Idle);
let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
assert_eq!(
is.current_state,
InputLifecycleState::Queued,
"Failed execution should roll input back to Queued, not strand in AppliedPendingConsumption"
);
}
// A failed first run must not stall the loop: a second input queued behind it should
// still be consumed, and the first input should be either re-queued or consumed.
#[tokio::test]
async fn failed_executor_continues_processing_backlog() {
use crate::input_state::InputLifecycleState;
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorError,
};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
// Executor that fails its first `apply` call and succeeds on every later one.
struct FailThenSucceedExecutor {
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl CoreExecutor for FailThenSucceedExecutor {
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
// `fetch_add` returns the previous value, so the first call sees 0.
let call = self.calls.fetch_add(1, Ordering::SeqCst);
// Each run takes 50ms, leaving time for the second input to arrive mid-run.
tokio::time::sleep(Duration::from_millis(50)).await;
if call == 0 {
return Err(CoreExecutorError::ApplyFailed {
reason: "first run fails".into(),
});
}
Ok(CoreApplyOutput {
receipt: RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
run_result: None,
})
}
async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
Ok(())
}
}
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
let calls = Arc::new(AtomicUsize::new(0));
adapter
.register_session_with_executor(
sid.clone(),
Box::new(FailThenSucceedExecutor {
calls: Arc::clone(&calls),
}),
)
.await;
let first = make_prompt("first");
let first_id = first.id().clone();
let second = make_prompt("second");
let second_id = second.id().clone();
adapter.accept_input(&sid, first).await.unwrap();
// Short pause before submitting the second input.
tokio::time::sleep(Duration::from_millis(10)).await;
adapter.accept_input(&sid, second).await.unwrap();
// Wait long enough for several 50ms runs to finish.
tokio::time::sleep(Duration::from_millis(220)).await;
let second_state = adapter
.input_state(&sid, &second_id)
.await
.unwrap()
.unwrap();
assert_eq!(second_state.current_state, InputLifecycleState::Consumed);
assert_eq!(
adapter.runtime_state(&sid).await.unwrap(),
RuntimeState::Idle
);
assert!(
calls.load(Ordering::SeqCst) >= 2,
"the runtime loop should keep draining queued backlog after a failed run"
);
let first_state = adapter.input_state(&sid, &first_id).await.unwrap().unwrap();
assert!(
matches!(
first_state.current_state,
InputLifecycleState::Queued | InputLifecycleState::Consumed
),
"the initially failed input should have been safely rolled back or retried after the backlog drained"
);
}
// A session registered without a loop, with an input already queued, should have that
// input processed once `ensure_session_with_executor` attaches a loop.
#[tokio::test]
async fn ensure_session_with_executor_upgrades_registered_session() {
use crate::input_state::InputLifecycleState;
use meerkat_core::lifecycle::RunId;
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorError,
};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
use std::sync::atomic::{AtomicBool, Ordering};
// Executor that records the call and returns a minimal successful output.
struct SuccessExecutor {
called: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl CoreExecutor for SuccessExecutor {
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
self.called.store(true, Ordering::SeqCst);
Ok(CoreApplyOutput {
receipt: RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
run_result: None,
})
}
async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
Ok(())
}
}
let apply_called = Arc::new(AtomicBool::new(false));
let adapter = RuntimeSessionAdapter::ephemeral();
let sid = SessionId::new();
// Register first WITHOUT an executor, so no loop is running yet.
adapter.register_session(sid.clone()).await;
let input = make_prompt("upgrade me");
let input_id = input.id().clone();
let outcome = adapter.accept_input(&sid, input).await.unwrap();
assert!(outcome.is_accepted());
// Now attach the loop to the already-registered session.
adapter
.ensure_session_with_executor(
sid.clone(),
Box::new(SuccessExecutor {
called: Arc::clone(&apply_called),
}),
)
.await;
// Give the newly attached loop time to drain the queued input.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(
apply_called.load(Ordering::SeqCst),
"upgrading an already-registered session should attach a live loop"
);
let state = adapter.runtime_state(&sid).await.unwrap();
assert_eq!(state, RuntimeState::Idle);
let active = adapter.list_active_inputs(&sid).await.unwrap();
assert!(active.is_empty(), "queued work should drain after upgrade");
let is = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
assert_eq!(
is.current_state,
InputLifecycleState::Consumed,
"the pre-upgrade queued input should be processed once the loop is attached"
);
}
/// Races `ensure_session_with_executor` against `register_session` for the
/// same session id on a persistent adapter whose store is built with
/// `HarnessRuntimeStore::delayed_recover(75ms)`.
///
/// The `ensure_session_with_executor` call is spawned first; `register_session`
/// is issued 10ms later (presumably while the spawned call is still waiting on
/// the delayed store -- confirm against `HarnessRuntimeStore`). After both
/// finish, an accepted input is expected to reach the executor and be
/// `Consumed`.
#[tokio::test]
async fn ensure_session_with_executor_upgrades_racy_registration() {
use crate::input_state::InputLifecycleState;
use meerkat_core::lifecycle::RunId;
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorError,
};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
// Executor that always succeeds and records that `apply` was invoked.
struct SuccessExecutor {
called: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl CoreExecutor for SuccessExecutor {
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
self.called.store(true, Ordering::SeqCst);
Ok(CoreApplyOutput {
receipt: RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
run_result: None,
})
}
async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
Ok(())
}
}
let store = Arc::new(HarnessRuntimeStore::delayed_recover(Duration::from_millis(
75,
)));
// The adapter is shared through an `Arc` so the spawned task can own a handle to it.
let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
let sid = SessionId::new();
let apply_called = Arc::new(AtomicBool::new(false));
// Start `ensure_session_with_executor` on a separate task so it can overlap
// with the `register_session` call below.
let ensure_task = {
let adapter = Arc::clone(&adapter);
let sid = sid.clone();
let apply_called = Arc::clone(&apply_called);
tokio::spawn(async move {
adapter
.ensure_session_with_executor(
sid,
Box::new(SuccessExecutor {
called: apply_called,
}),
)
.await;
})
};
// Give the spawned task a head start before issuing the competing registration.
tokio::time::sleep(Duration::from_millis(10)).await;
adapter.register_session(sid.clone()).await;
// `unwrap` surfaces a panic from inside the spawned task as a test failure.
ensure_task.await.unwrap();
let input = make_prompt("race upgrade");
let input_id = input.id().clone();
adapter.accept_input(&sid, input).await.unwrap();
// Fixed grace period for the input to be processed.
tokio::time::sleep(Duration::from_millis(120)).await;
assert!(
apply_called.load(Ordering::SeqCst),
"the racy registration path should still attach a live runtime loop"
);
let state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
assert_eq!(state.current_state, InputLifecycleState::Consumed);
}
/// Drives `accept_input_and_run` on a persistent adapter whose store comes
/// from `HarnessRuntimeStore::failing_atomic_apply()`. The call is expected
/// to fail with an error mentioning "runtime boundary commit failed", after
/// which the runtime must report `Idle` and the input must be `Queued`.
#[tokio::test]
async fn boundary_commit_failure_unwinds_sync_runtime_state() {
    use crate::input_state::InputLifecycleState;
    use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
    use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

    let failing_store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
    let adapter = RuntimeSessionAdapter::persistent(failing_store);
    let sid = SessionId::new();
    adapter.register_session(sid.clone()).await;

    let input = make_prompt("sync boundary failure");
    let input_id = input.id().clone();

    // The run closure itself succeeds; the failure is expected from the store.
    let result = adapter
        .accept_input_and_run(&sid, input, move |run_id, primitive| async move {
            let receipt = RunBoundaryReceipt {
                run_id,
                boundary: RunApplyBoundary::RunStart,
                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                conversation_digest: None,
                message_count: 0,
                sequence: 0,
            };
            let output = CoreApplyOutput {
                receipt,
                session_snapshot: None,
                run_result: None,
            };
            Ok(((), output))
        })
        .await;

    assert!(result.is_err(), "boundary commit failure should surface");
    let err = match result {
        Err(err) => err,
        Ok(_) => unreachable!("asserted runtime boundary commit failure above"),
    };
    let message = err.to_string();
    assert!(
        message.contains("runtime boundary commit failed"),
        "unexpected error: {err}"
    );

    let runtime_after = adapter.runtime_state(&sid).await.unwrap();
    assert_eq!(runtime_after, RuntimeState::Idle);

    let input_after = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
    assert_eq!(input_after.current_state, InputLifecycleState::Queued);
}
/// Executor-driven counterpart of the boundary-commit failure check: with a
/// store from `HarnessRuntimeStore::failing_atomic_apply()`, an accepted
/// input is expected to result in a `StopRuntimeExecutor` control command,
/// an `Idle` runtime, and the input being left `Queued`.
#[tokio::test]
async fn boundary_commit_failure_unwinds_runtime_loop_state() {
    use crate::input_state::InputLifecycleState;
    use meerkat_core::lifecycle::core_executor::{
        CoreApplyOutput, CoreExecutor, CoreExecutorError,
    };
    use meerkat_core::lifecycle::run_control::RunControlCommand;
    use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

    /// Executor whose `apply` always succeeds and which records receipt of a
    /// `StopRuntimeExecutor` command.
    struct SuccessExecutor {
        stop_called: Arc<AtomicBool>,
    }

    #[async_trait::async_trait]
    impl CoreExecutor for SuccessExecutor {
        async fn apply(
            &mut self,
            run_id: RunId,
            primitive: RunPrimitive,
        ) -> Result<CoreApplyOutput, CoreExecutorError> {
            let receipt = RunBoundaryReceipt {
                run_id,
                boundary: RunApplyBoundary::RunStart,
                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                conversation_digest: None,
                message_count: 0,
                sequence: 0,
            };
            Ok(CoreApplyOutput {
                receipt,
                session_snapshot: None,
                run_result: None,
            })
        }

        async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
            if let RunControlCommand::StopRuntimeExecutor { .. } = cmd {
                self.stop_called.store(true, Ordering::SeqCst);
            }
            Ok(())
        }
    }

    let failing_store = Arc::new(HarnessRuntimeStore::failing_atomic_apply());
    let adapter = RuntimeSessionAdapter::persistent(failing_store);
    let sid = SessionId::new();

    let stop_flag = Arc::new(AtomicBool::new(false));
    let executor = Box::new(SuccessExecutor {
        stop_called: Arc::clone(&stop_flag),
    });
    adapter
        .register_session_with_executor(sid.clone(), executor)
        .await;

    let input = make_prompt("loop boundary failure");
    let input_id = input.id().clone();
    adapter.accept_input(&sid, input).await.unwrap();

    // Fixed grace period for the background processing to run and fail.
    let settle = Duration::from_millis(120);
    tokio::time::sleep(settle).await;

    assert!(
        stop_flag.load(Ordering::SeqCst),
        "boundary commit failures should stop the dead executor path"
    );

    let runtime_after = adapter.runtime_state(&sid).await.unwrap();
    assert_eq!(runtime_after, RuntimeState::Idle);

    let input_after = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
    assert_eq!(input_after.current_state, InputLifecycleState::Queued);
}
/// With a store from `HarnessRuntimeStore::failing_terminal_snapshot()`, an
/// accepted input is expected to lead to a `StopRuntimeExecutor` command
/// being delivered to the executor. The test executor reacts to that command
/// by unregistering its own session, so a later `runtime_state` lookup must
/// fail with `NotReady { state: Destroyed }`.
#[tokio::test]
async fn terminal_snapshot_failure_unregisters_runtime_loop_session() {
use meerkat_core::lifecycle::core_executor::{
CoreApplyOutput, CoreExecutor, CoreExecutorError,
};
use meerkat_core::lifecycle::run_control::RunControlCommand;
use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
// Executor that always succeeds in `apply`; it holds a handle to the adapter
// and its own session id so that `control` can unregister the session.
struct SuccessExecutor {
adapter: Arc<RuntimeSessionAdapter>,
session_id: SessionId,
stop_called: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl CoreExecutor for SuccessExecutor {
async fn apply(
&mut self,
run_id: RunId,
primitive: RunPrimitive,
) -> Result<CoreApplyOutput, CoreExecutorError> {
Ok(CoreApplyOutput {
receipt: RunBoundaryReceipt {
run_id,
boundary: RunApplyBoundary::RunStart,
contributing_input_ids: primitive.contributing_input_ids().to_vec(),
conversation_digest: None,
message_count: 0,
sequence: 0,
},
session_snapshot: None,
run_result: None,
})
}
async fn control(&mut self, cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
// Only the stop command is acted on; every other command is acknowledged as-is.
if matches!(cmd, RunControlCommand::StopRuntimeExecutor { .. }) {
self.stop_called.store(true, Ordering::SeqCst);
// Re-enters the adapter from inside the executor to drop this session.
self.adapter.unregister_session(&self.session_id).await;
}
Ok(())
}
}
let store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
// Wrapped in `Arc` because the executor keeps its own handle to the adapter.
let adapter = Arc::new(RuntimeSessionAdapter::persistent(store));
let sid = SessionId::new();
let stop_called = Arc::new(AtomicBool::new(false));
adapter
.register_session_with_executor(
sid.clone(),
Box::new(SuccessExecutor {
adapter: Arc::clone(&adapter),
session_id: sid.clone(),
stop_called: Arc::clone(&stop_called),
}),
)
.await;
adapter
.accept_input(&sid, make_prompt("terminal snapshot failure"))
.await
.unwrap();
// Fixed grace period for the input to be processed and the stop command delivered.
tokio::time::sleep(Duration::from_millis(120)).await;
assert!(
stop_called.load(Ordering::SeqCst),
"terminal snapshot persistence failures should stop the runtime loop"
);
let state_result = adapter.runtime_state(&sid).await;
assert!(
state_result.is_err(),
"stopped runtime sessions should be unregistered"
);
// The assertion above guarantees the `Err` arm; this only extracts the error value.
let Err(err) = state_result else {
unreachable!("asserted stopped runtime unregistration above");
};
assert!(matches!(
err,
RuntimeDriverError::NotReady {
state: RuntimeState::Destroyed
}
));
}
/// Runs `accept_input_and_run` against a store from
/// `HarnessRuntimeStore::failing_terminal_snapshot()`. The call must return
/// an error carrying one of the two known persistence-failure messages, and
/// a later `runtime_state` lookup must yield
/// `NotReady { state: Destroyed }`.
#[tokio::test]
async fn terminal_snapshot_failure_unregisters_sync_runtime_session() {
    use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
    use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

    let failing_store = Arc::new(HarnessRuntimeStore::failing_terminal_snapshot());
    let adapter = RuntimeSessionAdapter::persistent(failing_store);
    let sid = SessionId::new();
    adapter.register_session(sid.clone()).await;

    // The run closure itself succeeds; the failure is expected from the store.
    let result = adapter
        .accept_input_and_run(
            &sid,
            make_prompt("sync terminal snapshot failure"),
            move |run_id, primitive| async move {
                let receipt = RunBoundaryReceipt {
                    run_id,
                    boundary: RunApplyBoundary::RunStart,
                    contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                    conversation_digest: None,
                    message_count: 0,
                    sequence: 0,
                };
                let output = CoreApplyOutput {
                    receipt,
                    session_snapshot: None,
                    run_result: None,
                };
                Ok(((), output))
            },
        )
        .await;

    assert!(
        result.is_err(),
        "terminal snapshot persistence failure should surface"
    );
    let err = match result {
        Err(err) => err,
        Ok(_) => unreachable!("asserted terminal snapshot failure above"),
    };

    // Either wording is accepted for the persistence failure.
    let message = err.to_string();
    let is_expected_failure = message.contains("terminal event persist failed")
        || message.contains("failed to persist runtime completion snapshot");
    assert!(is_expected_failure, "unexpected error: {err}");

    let lookup = adapter.runtime_state(&sid).await;
    let session_destroyed = matches!(
        lookup,
        Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed
        })
    );
    assert!(
        session_destroyed,
        "sync path should unregister the broken runtime session"
    );
}
/// Submits two prompts sharing one idempotency key, the second only after
/// the first has completed. The second submission must be reported as
/// deduplicated and must come back without a completion handle.
#[tokio::test]
async fn dedup_terminal_input_returns_none_handle() {
    use crate::identifiers::IdempotencyKey;
    use meerkat_core::lifecycle::core_executor::{
        CoreApplyOutput, CoreExecutor, CoreExecutorError,
    };
    use meerkat_core::lifecycle::run_control::RunControlCommand;
    use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
    use meerkat_core::types::{RunResult, Usage};

    /// Executor that immediately succeeds with a fixed "done" run result.
    struct ResultExecutor;

    #[async_trait::async_trait]
    impl CoreExecutor for ResultExecutor {
        async fn apply(
            &mut self,
            run_id: RunId,
            primitive: RunPrimitive,
        ) -> Result<CoreApplyOutput, CoreExecutorError> {
            let receipt = RunBoundaryReceipt {
                run_id,
                boundary: RunApplyBoundary::RunStart,
                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                conversation_digest: None,
                message_count: 0,
                sequence: 0,
            };
            let run_result = RunResult {
                text: "done".into(),
                session_id: SessionId::new(),
                usage: Usage::default(),
                turns: 1,
                tool_calls: 0,
                structured_output: None,
                schema_warnings: None,
                skill_diagnostics: None,
            };
            Ok(CoreApplyOutput {
                receipt,
                session_snapshot: None,
                run_result: Some(run_result),
            })
        }

        async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
            Ok(())
        }
    }

    let adapter = RuntimeSessionAdapter::ephemeral();
    let sid = SessionId::new();
    adapter
        .register_session_with_executor(sid.clone(), Box::new(ResultExecutor))
        .await;

    let key = IdempotencyKey::new("gate-a2");

    // First submission: accepted, and awaited until it completes.
    let mut original_input = make_prompt("first");
    if let Input::Prompt(prompt) = &mut original_input {
        prompt.header.idempotency_key = Some(key.clone());
    }
    let (outcome1, original_handle) = adapter
        .accept_input_with_completion(&sid, original_input)
        .await
        .unwrap();
    assert!(outcome1.is_accepted());
    assert!(
        original_handle.is_some(),
        "accepted input should have a handle"
    );
    let first_completion = original_handle.unwrap().wait().await;
    let first_completed = matches!(
        first_completion,
        crate::completion::CompletionOutcome::Completed(_)
    );
    assert!(first_completed, "first input should complete successfully");

    // Second submission reuses the key after the first one has finished.
    let mut duplicate_input = make_prompt("duplicate");
    if let Input::Prompt(prompt) = &mut duplicate_input {
        prompt.header.idempotency_key = Some(key);
    }
    let (duplicate_outcome, duplicate_handle) = adapter
        .accept_input_with_completion(&sid, duplicate_input)
        .await
        .unwrap();
    assert!(
        duplicate_outcome.is_deduplicated(),
        "second input with same key should be deduplicated"
    );
    assert!(
        duplicate_handle.is_none(),
        "dedup on terminal input should return None handle"
    );
}
/// Submits a duplicate (same idempotency key) 50ms after the original, while
/// the executor's `apply` sleeps for 200ms. The duplicate must be reported as
/// deduplicated yet still receive a handle, and both handles must resolve to
/// `Completed` with the text "slow done".
#[tokio::test]
async fn dedup_inflight_input_returns_handle_that_resolves() {
    use crate::identifiers::IdempotencyKey;
    use meerkat_core::lifecycle::core_executor::{
        CoreApplyOutput, CoreExecutor, CoreExecutorError,
    };
    use meerkat_core::lifecycle::run_control::RunControlCommand;
    use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
    use meerkat_core::types::{RunResult, Usage};

    /// Executor that sleeps for 200ms before succeeding with "slow done".
    struct SlowExecutor;

    #[async_trait::async_trait]
    impl CoreExecutor for SlowExecutor {
        async fn apply(
            &mut self,
            run_id: RunId,
            primitive: RunPrimitive,
        ) -> Result<CoreApplyOutput, CoreExecutorError> {
            tokio::time::sleep(Duration::from_millis(200)).await;
            let receipt = RunBoundaryReceipt {
                run_id,
                boundary: RunApplyBoundary::RunStart,
                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                conversation_digest: None,
                message_count: 0,
                sequence: 0,
            };
            let run_result = RunResult {
                text: "slow done".into(),
                session_id: SessionId::new(),
                usage: Usage::default(),
                turns: 1,
                tool_calls: 0,
                structured_output: None,
                schema_warnings: None,
                skill_diagnostics: None,
            };
            Ok(CoreApplyOutput {
                receipt,
                session_snapshot: None,
                run_result: Some(run_result),
            })
        }

        async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
            Ok(())
        }
    }

    let adapter = RuntimeSessionAdapter::ephemeral();
    let sid = SessionId::new();
    adapter
        .register_session_with_executor(sid.clone(), Box::new(SlowExecutor))
        .await;

    let key = IdempotencyKey::new("gate-a3");

    let mut original_input = make_prompt("original");
    if let Input::Prompt(prompt) = &mut original_input {
        prompt.header.idempotency_key = Some(key.clone());
    }
    let (outcome1, original_handle) = adapter
        .accept_input_with_completion(&sid, original_input)
        .await
        .unwrap();
    assert!(outcome1.is_accepted());

    // Wait less than the executor's 200ms sleep before sending the duplicate.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let mut duplicate_input = make_prompt("duplicate");
    if let Input::Prompt(prompt) = &mut duplicate_input {
        prompt.header.idempotency_key = Some(key);
    }
    let (duplicate_outcome, duplicate_handle) = adapter
        .accept_input_with_completion(&sid, duplicate_input)
        .await
        .unwrap();
    assert!(
        duplicate_outcome.is_deduplicated(),
        "second input should be deduplicated"
    );
    assert!(
        duplicate_handle.is_some(),
        "dedup on in-flight input should return Some(handle)"
    );

    let original_result = original_handle.unwrap().wait().await;
    let duplicate_result = duplicate_handle.unwrap().wait().await;

    let original_ok = matches!(
        original_result,
        crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"
    );
    assert!(original_ok, "original handle should complete with result");

    let duplicate_ok = matches!(
        duplicate_result,
        crate::completion::CompletionOutcome::Completed(ref r) if r.text == "slow done"
    );
    assert!(
        duplicate_ok,
        "duplicate handle should also complete with same result"
    );
}
/// When the executor's `apply` succeeds with `run_result: None`, the
/// completion handle must resolve to `CompletedWithoutResult`.
#[tokio::test]
async fn completion_handle_resolves_without_result() {
    use meerkat_core::lifecycle::core_executor::{
        CoreApplyOutput, CoreExecutor, CoreExecutorError,
    };
    use meerkat_core::lifecycle::run_control::RunControlCommand;
    use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

    /// Executor that succeeds but never produces a run result.
    struct NoResultExecutor;

    #[async_trait::async_trait]
    impl CoreExecutor for NoResultExecutor {
        async fn apply(
            &mut self,
            run_id: RunId,
            primitive: RunPrimitive,
        ) -> Result<CoreApplyOutput, CoreExecutorError> {
            let receipt = RunBoundaryReceipt {
                run_id,
                boundary: RunApplyBoundary::RunStart,
                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                conversation_digest: None,
                message_count: 0,
                sequence: 0,
            };
            Ok(CoreApplyOutput {
                receipt,
                session_snapshot: None,
                run_result: None,
            })
        }

        async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
            Ok(())
        }
    }

    let adapter = RuntimeSessionAdapter::ephemeral();
    let sid = SessionId::new();
    adapter
        .register_session_with_executor(sid.clone(), Box::new(NoResultExecutor))
        .await;

    let (outcome, completion) = adapter
        .accept_input_with_completion(&sid, make_prompt("context append"))
        .await
        .unwrap();
    assert!(outcome.is_accepted());

    let result = completion.unwrap().wait().await;
    let resolved_without_result = matches!(
        result,
        crate::completion::CompletionOutcome::CompletedWithoutResult
    );
    assert!(
        resolved_without_result,
        "executor returning run_result: None should resolve as CompletedWithoutResult, got {result:?}"
    );
}
/// An input accepted on a session registered via `register_session` gets a
/// completion handle; after `reset_runtime`, that handle must resolve to
/// `RuntimeTerminated`.
#[tokio::test]
async fn reset_runtime_resolves_pending_waiters() {
    let adapter = RuntimeSessionAdapter::ephemeral();
    let sid = SessionId::new();
    adapter.register_session(sid.clone()).await;

    let (outcome, handle) = adapter
        .accept_input_with_completion(&sid, make_prompt("pending"))
        .await
        .unwrap();
    assert!(outcome.is_accepted());
    assert!(handle.is_some());

    adapter.reset_runtime(&sid).await.unwrap();

    let result = handle.unwrap().wait().await;
    let terminated = matches!(
        result,
        crate::completion::CompletionOutcome::RuntimeTerminated(_)
    );
    assert!(
        terminated,
        "reset should resolve pending waiters as terminated, got {result:?}"
    );
}
/// An input accepted on a session registered via `register_session` gets a
/// completion handle; after `retire_runtime`, that handle must resolve to
/// `RuntimeTerminated`.
#[tokio::test]
async fn retire_without_loop_resolves_waiters() {
    let adapter = RuntimeSessionAdapter::ephemeral();
    let sid = SessionId::new();
    adapter.register_session(sid.clone()).await;

    let (outcome, handle) = adapter
        .accept_input_with_completion(&sid, make_prompt("will be retired"))
        .await
        .unwrap();
    assert!(outcome.is_accepted());
    assert!(handle.is_some());

    adapter.retire_runtime(&sid).await.unwrap();

    let result = handle.unwrap().wait().await;
    let terminated = matches!(
        result,
        crate::completion::CompletionOutcome::RuntimeTerminated(_)
    );
    assert!(
        terminated,
        "retire without loop should resolve pending waiters as terminated, got {result:?}"
    );
}
/// After an input is accepted on a session whose executor always succeeds,
/// the input must end up `Consumed` and the runtime must report `Idle`.
#[tokio::test]
async fn successful_execution_fires_boundary_applied() {
    use crate::input_state::InputLifecycleState;
    use meerkat_core::lifecycle::RunId;
    use meerkat_core::lifecycle::core_executor::{
        CoreApplyOutput, CoreExecutor, CoreExecutorError,
    };
    use meerkat_core::lifecycle::run_control::RunControlCommand;
    use meerkat_core::lifecycle::run_primitive::{RunApplyBoundary, RunPrimitive};
    use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;

    /// Executor that always succeeds without a run result.
    struct SuccessExecutor;

    #[async_trait::async_trait]
    impl CoreExecutor for SuccessExecutor {
        async fn apply(
            &mut self,
            run_id: RunId,
            primitive: RunPrimitive,
        ) -> Result<CoreApplyOutput, CoreExecutorError> {
            let receipt = RunBoundaryReceipt {
                run_id,
                boundary: RunApplyBoundary::RunStart,
                contributing_input_ids: primitive.contributing_input_ids().to_vec(),
                conversation_digest: None,
                message_count: 0,
                sequence: 0,
            };
            Ok(CoreApplyOutput {
                receipt,
                session_snapshot: None,
                run_result: None,
            })
        }

        async fn control(&mut self, _cmd: RunControlCommand) -> Result<(), CoreExecutorError> {
            Ok(())
        }
    }

    let adapter = RuntimeSessionAdapter::ephemeral();
    let sid = SessionId::new();
    adapter
        .register_session_with_executor(sid.clone(), Box::new(SuccessExecutor))
        .await;

    let input = make_prompt("hello success");
    let input_id = input.id().clone();
    adapter.accept_input(&sid, input).await.unwrap();

    // Fixed grace period for the input to be processed.
    let settle = std::time::Duration::from_millis(100);
    tokio::time::sleep(settle).await;

    let final_input_state = adapter.input_state(&sid, &input_id).await.unwrap().unwrap();
    assert_eq!(
        final_input_state.current_state,
        InputLifecycleState::Consumed,
        "Successful execution should consume the input"
    );
    assert_eq!(
        adapter.runtime_state(&sid).await.unwrap(),
        RuntimeState::Idle
    );
}
}