use async_trait::async_trait;
use indexmap::IndexMap;
#[cfg(test)]
use meerkat_core::SessionSystemContextState;
use meerkat_core::error::AgentError;
use meerkat_core::event::{AgentEvent, EventEnvelope, EventSourceIdentity};
use meerkat_core::image_content::{MissingBlobBehavior, hydrate_deferred_turn_state};
use meerkat_core::lifecycle::core_executor::{CoreApplyOutput, CoreApplyTerminal};
use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceiptDraft;
use meerkat_core::service::{
AppendSystemContextRequest, AppendSystemContextResult, CreateSessionRequest,
DeferredPromptPolicy, MobToolAuthorityContext, SessionControlError, SessionError,
SessionHistoryPage, SessionHistoryQuery, SessionInfo, SessionQuery, SessionService,
SessionServiceCommsExt, SessionServiceControlExt, SessionServiceHistoryExt, SessionSummary,
SessionUsage, SessionView, StageToolResultsRequest, StageToolResultsResult, StartTurnRequest,
TurnToolOverlay,
};
use meerkat_core::session_document::{
SessionDocumentEffect, SessionDocumentKey, SessionDocumentMachineAuthority,
};
use meerkat_core::time_compat::SystemTime;
use meerkat_core::types::{ContentInput, RunResult, SessionId, ToolResult, Usage};
use meerkat_core::{
CancelAfterBoundaryCommand, CancelAfterBoundarySender, ConsumedDeferredTurnInputs,
DeferredFirstTurnPhase, InputId, PendingSystemContextAppend, RealtimeTranscriptApplyOutcome,
RealtimeTranscriptEvent, RealtimeTranscriptMaterializedMessage, RunId,
SessionDeferredTurnState, SessionLlmIdentity, SnapshotProjectionError, SystemContextStateError,
TurnPhase, TurnStateHandle, TurnStateSnapshot,
};
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
use std::sync::Arc;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
#[cfg(target_arch = "wasm32")]
use crate::tokio::sync::{OwnedSemaphorePermit, RwLock, mpsc, oneshot, watch};
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::{OwnedSemaphorePermit, RwLock, mpsc, oneshot, watch};
#[cfg(test)]
use crate::staged_registry::MaterializationStatus;
use crate::staged_registry::{PromotionTicket, StagedSessionRegistry};
pub use crate::turn_admission::ObservedSessionTailKind;
use crate::turn_admission::{
RuntimeKeepAliveRequest, StartTurnDispatchAuthorization, StartTurnDisposition,
StartTurnPublicTerminal, TurnAdmissionPhase, TurnAdmissionProjection, TurnAdmissionSlot,
};
/// Bound for the agent-event channel.
/// NOTE(review): name-based; the channel it sizes is created outside this view — confirm.
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Bound for a session's `SessionCommand` channel (`SessionHandle::command_tx`).
/// NOTE(review): presumably applied where the command mpsc is created — confirm.
const COMMAND_CHANNEL_CAPACITY: usize = 8;
/// Per-session state published on `SessionHandle::state_tx`: the turn-admission projection.
type SessionState = TurnAdmissionProjection;
/// Summary figures returned by [`SessionAgent::snapshot`].
#[derive(Debug, Clone)]
pub struct SessionSnapshot {
/// Session creation timestamp.
pub created_at: SystemTime,
/// Timestamp of the most recent update.
pub updated_at: SystemTime,
/// Number of messages in the session transcript.
pub message_count: usize,
/// Total token count. NOTE(review): accumulation scope (whole session vs. last run) is not visible here.
pub total_tokens: u64,
/// Detailed usage accounting.
pub usage: Usage,
/// Text of the latest assistant output, if any.
pub last_assistant_text: Option<String>,
}
/// Identifies the in-flight turn boundary that staged work can be attached to.
///
/// Built by `EphemeralSessionService::active_turn_boundary_staging_token_from_snapshot`,
/// which only yields a token while the turn phase is `ApplyingPrimitive` or
/// `WaitingForOps` and an active run id is present.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ActiveTurnBoundaryStagingToken {
/// Run that was active when the token was taken.
pub(crate) active_run_id: RunId,
/// `TurnStateSnapshot::boundary_count` at the time the token was taken.
pub(crate) boundary_count: u64,
}
/// Requests sent from `EphemeralSessionService` to a session's task over
/// `SessionHandle::command_tx`.
///
/// Most variants carry a `oneshot` `reply_tx` through which the task answers;
/// callers map a closed reply channel to an internal error.
/// NOTE(review): the task loop that handles these variants is outside this view.
enum SessionCommand {
/// Run a turn for `prompt`; the run outcome is delivered on `result_tx`.
StartTurn {
prompt: meerkat_core::types::ContentInput,
runtime: Box<meerkat_core::service::StartTurnRuntimeSemantics>,
event_tx: Option<mpsc::Sender<EventEnvelope<AgentEvent>>>,
result_tx: oneshot::Sender<Result<RunResult, meerkat_core::error::AgentError>>,
/// Capacity admission held for the duration of the turn, if any.
active_admission: Option<RuntimeContextAdmissionGuard>,
},
/// Swap the agent's LLM client (presumably via `SessionAgent::replace_client`).
ReplaceClient {
client: Arc<dyn meerkat_core::AgentLlmClient>,
reply_tx: oneshot::Sender<()>,
},
/// Swap client, identity and request policy together (presumably via `SessionAgent::hot_swap_llm_identity`).
HotSwapLlmIdentity {
client: Arc<dyn meerkat_core::AgentLlmClient>,
identity: Box<SessionLlmIdentity>,
request_policy: Box<meerkat_core::SessionLlmRequestPolicy>,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Update the keep-alive flag (session-store builds only).
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
UpdateKeepAlive {
keep_alive: bool,
reply_tx: oneshot::Sender<()>,
},
/// Stage an external tool filter.
StageToolFilter {
filter: meerkat_core::ToolFilter,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Replace the tool-visibility state; sent by `set_session_tool_visibility_state`.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
SetToolVisibilityState {
state: Option<Box<meerkat_core::SessionToolVisibilityState>>,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Ask the agent to sync its system-context state.
SyncSystemContextState {
reply_tx: oneshot::Sender<Result<(), SystemContextStateError>>,
},
/// Overwrite the agent's session from a durable snapshot (session-store builds only).
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
SyncSessionFromDurableSnapshot {
session: Box<meerkat_core::Session>,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Return a copy of the session; sent by `export_session`.
ExportSession {
reply_tx: oneshot::Sender<Result<meerkat_core::Session, SystemContextStateError>>,
},
/// Return the agent's execution snapshot; sent by `execution_snapshot`.
ExecutionSnapshot {
reply_tx: oneshot::Sender<
Result<Option<meerkat_core::AgentExecutionSnapshot>, SnapshotProjectionError>,
>,
},
/// Return the tool-scope snapshot; sent by `tool_scope_snapshot`.
ToolScopeSnapshot {
reply_tx: oneshot::Sender<Option<meerkat_core::ToolScopeSnapshot>>,
},
/// Return the currently visible tool definitions; sent by `live_visible_tool_defs`.
VisibleToolDefs {
reply_tx: oneshot::Sender<Vec<meerkat_core::ToolDef>>,
},
/// Return the external tool-surface snapshot; sent by `external_tool_surface_snapshot`.
ExternalToolSurfaceSnapshot {
reply_tx: oneshot::Sender<Option<meerkat_core::ExternalToolSurfaceSnapshot>>,
},
/// Apply runtime system-context appends; sent by `apply_runtime_system_context`.
ApplyRuntimeSystemContext {
appends: Vec<PendingSystemContextAppend>,
reply_tx: oneshot::Sender<()>,
},
/// Apply runtime system-context appends scoped to a turn.
/// NOTE(review): exact difference from `ApplyRuntimeSystemContext` is defined by the task loop.
ApplyRuntimeSystemContextForTurn {
appends: Vec<PendingSystemContextAppend>,
reply_tx: oneshot::Sender<()>,
},
/// Publish events describing the given system-context appends.
PublishRuntimeSystemContextEvents {
appends: Vec<PendingSystemContextAppend>,
reply_tx: oneshot::Sender<()>,
},
/// Record a terminal error reported by a live adapter.
RecordLiveTerminalError {
cause: meerkat_core::live_adapter::LiveAdapterErrorCode,
reply_tx: oneshot::Sender<()>,
},
/// Record that `dropped` units of live output audio were lost.
RecordLiveOutputAudioDegraded {
dropped: u64,
reply_tx: oneshot::Sender<()>,
},
/// Append user content that originated outside a turn.
AppendExternalUserContent {
content: ContentInput,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Append assistant output that originated outside a turn.
AppendExternalAssistantOutput {
blocks: Vec<meerkat_core::types::AssistantBlock>,
stop_reason: meerkat_core::types::StopReason,
usage: Usage,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Apply one realtime transcript event.
AppendRealtimeTranscriptEvent {
event: RealtimeTranscriptEvent,
reply_tx: oneshot::Sender<
Result<RealtimeTranscriptApplyOutcome, meerkat_core::error::AgentError>,
>,
},
/// Dispatch an external tool call; sent by `dispatch_external_tool_call_with_timeout_policy`.
DispatchExternalToolCall {
call: meerkat_core::ToolCall,
timeout_policy: meerkat_core::ToolDispatchTimeoutPolicy,
reply_tx: oneshot::Sender<
Result<meerkat_core::ops::ToolDispatchOutcome, meerkat_core::error::AgentError>,
>,
},
/// Replace (or clear) the mob tool authority context.
UpdateMobToolAuthority {
authority_context: Option<MobToolAuthorityContext>,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Override the system prompt.
UpdateSystemPrompt {
system_prompt: String,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
/// Stop the session task; sent best-effort by `discard_live_session`.
Shutdown,
}
/// Summary values published by the session task on a `watch` channel
/// (`SessionTaskControl::summary_tx`), read back via `SessionHandle::summary_rx`,
/// e.g. when building an archived `SessionView`.
#[derive(Clone)]
struct SessionSummaryCache {
/// Last update time; also fed to the session-context watermark by `publish_summary`.
updated_at: SystemTime,
message_count: usize,
total_tokens: u64,
usage: Usage,
last_assistant_text: Option<String>,
}
/// Service-side handle to one live session and its background task.
struct SessionHandle {
/// Sends `SessionCommand`s to the session task.
command_tx: mpsc::Sender<SessionCommand>,
/// Publishes the turn-admission projection (used by `discard_live_session`).
state_tx: watch::Sender<SessionState>,
/// Observes the turn-admission projection.
state_rx: watch::Receiver<SessionState>,
/// Latest summary published by the session task.
summary_rx: watch::Receiver<SessionSummaryCache>,
/// Current LLM identity (model/provider) of the session.
llm_identity_rx: watch::Receiver<SessionLlmIdentity>,
/// Turn-admission slot shared with the session task; locked via `lock_turn_admission`.
turn_admission: Arc<std::sync::Mutex<TurnAdmissionSlot>>,
/// Creation timestamp reported in `SessionInfo::created_at`.
created_at: SystemTime,
/// User labels reported in `SessionInfo::labels`.
labels: BTreeMap<String, String>,
/// NOTE(review): presumably captured from `SessionAgent::event_injector` at creation — confirm.
event_injector: Option<Arc<dyn meerkat_core::EventInjector>>,
/// NOTE(review): presumably captured from `SessionAgent::interaction_event_injector` — confirm.
interaction_event_injector: Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>>,
/// NOTE(review): presumably captured from `SessionAgent::comms_runtime` — confirm.
comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
/// System-context state; its snapshot is written into sessions exported by `export_session`.
system_context_state: meerkat_core::SystemContextStateHandle,
/// Turn-state handle returned by `active_turn_state_handle`.
turn_state_handle: Option<Arc<dyn TurnStateHandle>>,
/// Deferred-turn state; copied into sessions exported by `export_session`.
deferred_turn_state: Arc<std::sync::Mutex<SessionDeferredTurnState>>,
/// Capacity lease shared with `RuntimeContextAdmissionGuard`s for this session.
active_capacity_lease: Arc<std::sync::Mutex<SessionActiveCapacityLease>>,
/// Interrupt signal. NOTE(review): presumably woken via `wake_interrupt_notify`.
interrupt_notify: Arc<tokio::sync::Notify>,
/// NOTE(review): presumably captured from `SessionAgent::cancel_after_boundary_handle`.
cancel_after_boundary_handle: Option<CancelAfterBoundarySender>,
/// Broadcast sender for session-level event envelopes.
session_event_tx: tokio::sync::broadcast::Sender<EventEnvelope<AgentEvent>>,
}
/// Holds session active capacity while runtime context is admitted.
///
/// Dropping the guard releases its share of `active_capacity_lease` (see its `Drop`
/// impl); `into_create_session_permit` instead hands the permit back to the caller.
pub struct RuntimeContextAdmissionGuard {
/// Shared per-session lease this guard participates in, if any.
active_capacity_lease: Option<Arc<std::sync::Mutex<SessionActiveCapacityLease>>>,
/// Directly held permit, used by `into_create_session_permit` when no lease is attached.
active_permit: Option<OwnedSemaphorePermit>,
}
/// Per-session capacity lease shared by all admission guards of that session.
#[derive(Default)]
struct SessionActiveCapacityLease {
/// Capacity permit backing the lease.
permit: Option<OwnedSemaphorePermit>,
/// Number of outstanding holders.
/// NOTE(review): maintained by the acquire/join/release helpers outside this view.
leases: usize,
/// Pending staged-session promotion, committed via `commit_promotion` or settled on release.
promotion: Option<PromotionTicket>,
}
/// What a lease release hands back; finalised by `settle` or consumed by
/// `RuntimeContextAdmissionGuard::into_create_session_permit`.
#[derive(Default)]
struct ActiveCapacityLeaseRelease {
/// Permit released by the lease, if any.
permit: Option<OwnedSemaphorePermit>,
/// Promotion ticket released by the lease, if any.
promotion: Option<PromotionTicket>,
}
impl ActiveCapacityLeaseRelease {
    /// Finalises a released lease.
    ///
    /// With a promotion ticket attached, the (possibly absent) permit is handed to
    /// `PromotionTicket::settle`; without one, the permit is simply dropped.
    fn settle(self) {
        let Self { permit, promotion } = self;
        if let Some(ticket) = promotion {
            ticket.settle(permit);
        } else {
            drop(permit);
        }
    }
}
impl Drop for RuntimeContextAdmissionGuard {
    /// Releases this guard's share of the session capacity lease, if it still holds
    /// one, and settles whatever the release returns.
    fn drop(&mut self) {
        let Some(lease) = self.active_capacity_lease.take() else {
            return;
        };
        release_active_capacity_lease(&lease).settle();
    }
}
impl RuntimeContextAdmissionGuard {
    /// Commits the promotion ticket stored on the shared lease, if there is one.
    ///
    /// The ticket is taken out while the lease mutex is held. It is committed only
    /// after that lock has been released.
    fn commit_promotion(&self) {
        if let Some(lease) = &self.active_capacity_lease {
            let pending = lock_active_capacity_lease(lease).promotion.take();
            if let Some(ticket) = pending {
                ticket.commit();
            }
        }
    }

    /// Consumes the guard and returns the permit that should back a newly created session.
    ///
    /// When the guard participates in a shared lease, that lease share is released.
    /// Any promotion ticket it yields is committed, and the released permit (if any)
    /// is returned. Otherwise the guard's directly held permit is returned.
    pub(crate) fn into_create_session_permit(mut self) -> Option<OwnedSemaphorePermit> {
        match self.active_capacity_lease.take() {
            Some(lease) => {
                let released = release_active_capacity_lease(&lease);
                let (permit, promotion) = (released.permit, released.promotion);
                if let Some(ticket) = promotion {
                    ticket.commit();
                }
                permit
            }
            None => self.active_permit.take(),
        }
    }
}
/// Task-side counterparts of the channels and shared state in `SessionHandle`.
struct SessionTaskControl {
/// Publishes the turn-admission projection.
state_tx: watch::Sender<SessionState>,
/// Publishes summary snapshots (see `publish_summary`).
summary_tx: watch::Sender<SessionSummaryCache>,
/// Publishes the current LLM identity.
llm_identity_tx: watch::Sender<SessionLlmIdentity>,
/// Turn-admission slot shared with the service handle.
turn_admission: Arc<std::sync::Mutex<TurnAdmissionSlot>>,
/// Interrupt signal shared with the service handle.
interrupt_notify: Arc<tokio::sync::Notify>,
/// Broadcast sender for session-level event envelopes.
session_event_tx: tokio::sync::broadcast::Sender<EventEnvelope<AgentEvent>>,
/// Optional context handle whose watermark is advanced when summaries are published.
session_context: Option<Arc<dyn meerkat_core::handles::SessionContextHandle>>,
}
impl SessionTaskControl {
    /// Moves the session-context watermark forward, if a context handle is attached.
    ///
    /// The proposed watermark is the larger of `observed_at` (ms since the epoch) and
    /// one past the current watermark, so every call strictly advances it. A rejection
    /// from the handle is only logged at debug level, tagged with `reason`.
    fn advance_session_context_at(&self, observed_at: SystemTime, reason: &'static str) {
        if let Some(context) = &self.session_context {
            let observed_ms = summary_updated_at_ms(observed_at);
            let next_after_current = context.current_watermark_ms().saturating_add(1);
            let proposed_ms = if observed_ms > next_after_current {
                observed_ms
            } else {
                next_after_current
            };
            if let Err(err) = context.context_advanced(proposed_ms) {
                tracing::debug!(
                    error = %err,
                    reason,
                    "AdvanceSessionContext rejected by DSL; projection refresh will rely on later ticks"
                );
            }
        }
    }

    /// Publishes `snapshot`, then advances the context watermark to its `updated_at`.
    fn publish_summary(&self, snapshot: SessionSummaryCache) {
        let observed_at = snapshot.updated_at;
        self.summary_tx.send_replace(snapshot);
        self.advance_session_context_at(observed_at, "summary");
    }

    /// Publishes `snapshot`, then advances the context watermark to the current
    /// wall-clock time rather than the snapshot's `updated_at`.
    fn publish_committed_runtime_context_summary(&self, snapshot: SessionSummaryCache) {
        self.summary_tx.send_replace(snapshot);
        let now = SystemTime::now();
        self.advance_session_context_at(now, "committed_runtime_system_context");
    }
}
/// Converts `updated_at` into whole milliseconds since the Unix epoch.
///
/// Instants before the epoch map to `0`. Millisecond counts that do not fit in a
/// `u64` saturate to `u64::MAX`.
fn summary_updated_at_ms(updated_at: SystemTime) -> u64 {
    match updated_at.duration_since(meerkat_core::time_compat::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis().try_into().unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
/// Factory for the [`SessionAgent`] backing a session (built from a `CreateSessionRequest`).
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionAgentBuilder: Send + Sync {
/// Agent type produced by `build_agent`; additionally `Send` on non-wasm32 targets.
#[cfg(not(target_arch = "wasm32"))]
type Agent: SessionAgent + Send + 'static;
/// wasm32 variant of `Agent`, without the `Send` bound.
#[cfg(target_arch = "wasm32")]
type Agent: SessionAgent + 'static;
/// Reports whether the model in `identity` accepts inline video input.
///
/// The default defers to `meerkat_core::model_profile::inline_video_support_for`.
/// `EphemeralSessionService` admits video only on `Some(true)`; it reports
/// `Some(false)` as "capability disabled" and `None` as "provider/model profile missing".
async fn model_supports_inline_video(&self, identity: &SessionLlmIdentity) -> Option<bool> {
meerkat_core::model_profile::inline_video_support_for(identity.provider, &identity.model)
}
/// Builds the agent for `req`.
/// NOTE(review): `event_tx` presumably receives the agent's events — confirm with implementors.
async fn build_agent(
&self,
req: &CreateSessionRequest,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<Self::Agent, SessionError>;
}
/// Agent driven by a session task.
///
/// Required methods must be implemented. Every other method has a default that is a
/// no-op, reports "nothing available" (`None` / empty), or rejects the request with
/// `AgentError::ConfigError`, as noted on each method.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionAgent: Send {
/// Runs a turn for `prompt`.
/// NOTE(review): `event_tx` presumably receives streamed agent events.
async fn run_with_events(
&mut self,
prompt: meerkat_core::types::ContentInput,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError>;
/// Runs a turn with runtime-level options.
///
/// The default supports only plain queued turns. It returns `ConfigError` for any
/// `handling_mode` other than `Queue`, any `render_metadata`, or non-empty
/// `typed_turn_appends`. It ignores `_execution_kind` and otherwise delegates to
/// `run_with_events`.
async fn run_turn_with_events(
&mut self,
prompt: meerkat_core::types::ContentInput,
handling_mode: meerkat_core::types::HandlingMode,
render_metadata: Option<meerkat_core::types::RenderMetadata>,
typed_turn_appends: Vec<meerkat_core::lifecycle::run_primitive::ConversationAppend>,
_execution_kind: Option<meerkat_core::lifecycle::RuntimeExecutionKind>,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
if handling_mode != meerkat_core::types::HandlingMode::Queue {
return Err(meerkat_core::error::AgentError::ConfigError(format!(
"handling_mode {handling_mode:?} requires a runtime-backed surface",
)));
}
if render_metadata.is_some() {
return Err(meerkat_core::error::AgentError::ConfigError(
"render_metadata requires a runtime-backed surface".to_string(),
));
}
if !typed_turn_appends.is_empty() {
return Err(meerkat_core::error::AgentError::ConfigError(
"typed turn appends require a runtime-backed surface".to_string(),
));
}
self.run_with_events(prompt, event_tx).await
}
/// Runs a turn over already-pending input, with no new prompt. Default: always `ConfigError`.
async fn run_pending_with_events(
&mut self,
_execution_kind: Option<meerkat_core::lifecycle::RuntimeExecutionKind>,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"run_pending_with_events is not supported by this session agent".to_string(),
))
}
/// Sets, or clears with `None`, the skill references used by the agent.
fn set_skill_references(&mut self, refs: Option<Vec<meerkat_core::skills::SkillKey>>);
/// Sets, or clears with `None`, the flow tool overlay.
fn set_flow_tool_overlay(
&mut self,
overlay: Option<TurnToolOverlay>,
) -> Result<(), meerkat_core::error::AgentError>;
/// Stages tool results for a continuation. Default: an empty list is a no-op; anything else is `ConfigError`.
fn apply_pending_tool_results(
&mut self,
results: Vec<meerkat_core::ToolResult>,
) -> Result<(), meerkat_core::error::AgentError> {
if results.is_empty() {
return Ok(());
}
Err(meerkat_core::error::AgentError::ConfigError(
"staged tool-result continuations are not supported by this session agent".to_string(),
))
}
/// Replaces the LLM client. Default: ignores the client.
fn replace_client(&mut self, _client: std::sync::Arc<dyn meerkat_core::AgentLlmClient>) {}
/// Replaces client, LLM identity and request policy together.
fn hot_swap_llm_identity(
&mut self,
client: std::sync::Arc<dyn meerkat_core::AgentLlmClient>,
identity: SessionLlmIdentity,
request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), meerkat_core::error::AgentError>;
/// Stages an external tool filter. Default: accepts and ignores it.
fn stage_external_tool_filter(
&mut self,
_filter: meerkat_core::ToolFilter,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
/// Replaces the tool-visibility state. Default: `ConfigError`.
fn set_tool_visibility_state(
&mut self,
_state: Option<meerkat_core::SessionToolVisibilityState>,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"tool visibility updates are not supported by this session agent".to_string(),
))
}
/// Dispatches a tool call that originated outside a turn. Default: `ConfigError`.
async fn dispatch_external_tool_call(
&mut self,
_call: meerkat_core::ToolCall,
) -> Result<meerkat_core::ops::ToolDispatchOutcome, meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"external live tool dispatch is not supported by this session agent".to_string(),
))
}
/// Like `dispatch_external_tool_call`, with a timeout policy.
/// Default: ignores the policy and delegates to `dispatch_external_tool_call`.
async fn dispatch_external_tool_call_with_timeout_policy(
&mut self,
call: meerkat_core::ToolCall,
_timeout_policy: meerkat_core::ToolDispatchTimeoutPolicy,
) -> Result<meerkat_core::ops::ToolDispatchOutcome, meerkat_core::error::AgentError> {
self.dispatch_external_tool_call(call).await
}
/// Cancels the agent's current work.
fn cancel(&mut self);
/// Sender for cancel-after-boundary commands. Default: `None`.
fn cancel_after_boundary_handle(&self) -> Option<CancelAfterBoundarySender> {
None
}
/// Turn-state handle. Default: `None`.
fn turn_state_handle(&self) -> Option<Arc<dyn TurnStateHandle>> {
None
}
/// Session-context handle. Default: `None`.
fn session_context_handle(
&self,
) -> Option<Arc<dyn meerkat_core::handles::SessionContextHandle>> {
None
}
/// Id of the session this agent serves.
fn session_id(&self) -> SessionId;
/// Current summary figures.
fn snapshot(&self) -> SessionSnapshot;
/// Execution snapshot. Default: `Ok(None)`.
fn execution_snapshot(
&self,
) -> Result<Option<meerkat_core::AgentExecutionSnapshot>, SnapshotProjectionError> {
Ok(None)
}
/// Tool-scope snapshot. Default: `None`.
fn tool_scope_snapshot(&self) -> Option<meerkat_core::ToolScopeSnapshot> {
None
}
/// Currently visible tool definitions. Default: empty.
fn visible_tool_defs(&self) -> Vec<meerkat_core::ToolDef> {
Vec::new()
}
/// External tool-surface snapshot. Default: `None`.
fn external_tool_surface_snapshot(&self) -> Option<meerkat_core::ExternalToolSurfaceSnapshot> {
None
}
/// Owned copy of the agent's session.
fn session_clone(&self) -> Result<meerkat_core::Session, SystemContextStateError>;
/// LLM identity suitable for durable persistence. Default: `None`.
fn durable_llm_identity(&self) -> Option<SessionLlmIdentity> {
None
}
/// Kind of the last item observed in the session transcript.
fn observed_session_tail(&self) -> ObservedSessionTailKind;
/// Updates the keep-alive flag. Default: no-op.
fn update_keep_alive(&mut self, _keep_alive: bool) {}
/// Replaces, or clears with `None`, the mob tool authority context. Default: `ConfigError`.
fn update_mob_tool_authority_context(
&mut self,
_authority_context: Option<MobToolAuthorityContext>,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"mob tool authority updates are not supported by this session agent".to_string(),
))
}
/// Overrides the system prompt. Default: `ConfigError`.
fn update_system_prompt(
&mut self,
_system_prompt: String,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"system_prompt override is not supported by this session agent".to_string(),
))
}
/// Applies runtime system-context appends.
fn apply_runtime_system_context(&mut self, appends: &[PendingSystemContextAppend]);
/// Appends user content that originated outside a turn. Default: `ConfigError`.
fn append_external_user_content(
&mut self,
_content: ContentInput,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"external user content append is not supported by this session agent".to_string(),
))
}
/// Appends assistant output that originated outside a turn. Default: `ConfigError`.
fn append_external_assistant_output(
&mut self,
_blocks: Vec<meerkat_core::types::AssistantBlock>,
_stop_reason: meerkat_core::types::StopReason,
_usage: Usage,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"external assistant output append is not supported by this session agent".to_string(),
))
}
/// Applies one realtime transcript event. Default: `ConfigError`.
fn append_realtime_transcript_event(
&mut self,
_event: RealtimeTranscriptEvent,
) -> Result<RealtimeTranscriptApplyOutcome, meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"realtime transcript append is not supported by this session agent".to_string(),
))
}
/// Handle to the agent's system-context state.
fn system_context_state(&self) -> meerkat_core::SystemContextStateHandle;
/// Syncs system-context state into the agent. Default: `Ok(())`.
fn sync_system_context_state(&mut self) -> Result<(), SystemContextStateError> {
Ok(())
}
/// Discards pending system-context appends for the active turn that were never applied.
///
/// Default: discards via the system-context state handle and, when anything was
/// discarded, re-runs `sync_system_context_state`. Returns the number discarded.
fn discard_unapplied_active_turn_system_context(
&mut self,
) -> Result<usize, SystemContextStateError> {
let discarded_count = {
let state = self.system_context_state();
state.discard_unapplied_active_turn_pending()
};
if discarded_count > 0 {
self.sync_system_context_state()?;
}
Ok(discarded_count)
}
/// Replaces in-memory session state from a durable snapshot.
/// Default: `AgentError::DurableSnapshotSyncUnsupported`.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
fn sync_session_from_durable_snapshot(
&mut self,
_session: meerkat_core::Session,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::DurableSnapshotSyncUnsupported)
}
/// Event injector. Default: `None`.
fn event_injector(&self) -> Option<Arc<dyn meerkat_core::EventInjector>> {
None
}
/// Subscribable interaction event injector. Default: `None`.
#[doc(hidden)]
fn interaction_event_injector(
&self,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
None
}
/// Comms runtime. Default: `None`.
fn comms_runtime(&self) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
None
}
}
/// Test-only, synchronous variant of prompt video validation with an explicit capability flag.
///
/// Text prompts always pass. Block prompts must pass
/// `meerkat_core::validate_inline_video_blocks`. If they contain video,
/// `supports_inline_video` must also be `true`.
///
/// # Errors
/// `SessionError::Agent(AgentError::ConfigError(..))` describing the violation.
#[cfg(test)]
fn validate_prompt_video_input_against_capability(
    prompt: &ContentInput,
    identity: &SessionLlmIdentity,
    supports_inline_video: bool,
) -> Result<(), SessionError> {
    let ContentInput::Blocks(blocks) = prompt else {
        return Ok(());
    };
    if let Err(reason) = meerkat_core::validate_inline_video_blocks(blocks) {
        return Err(SessionError::Agent(AgentError::ConfigError(reason)));
    }
    let rejected = meerkat_core::has_video(blocks) && !supports_inline_video;
    if !rejected {
        return Ok(());
    }
    let message = format!(
        "inline video input is not supported by model '{}' on provider '{}'",
        identity.model,
        identity.provider.as_str()
    );
    Err(SessionError::Agent(AgentError::ConfigError(message)))
}
/// Wakes every task currently waiting on `notify`, then calls `notify_one`.
///
/// With no waiter registered, `notify_one` stores a permit, so a task that starts
/// waiting afterwards is also released.
/// NOTE(review): presumably this covers an interruptible task that has not yet reached
/// its wait point — confirm intent. Call order matters: `notify_waiters` never stores a permit.
fn wake_interrupt_notify(notify: &tokio::sync::Notify) {
notify.notify_waiters();
notify.notify_one();
}
/// In-memory session service: one task per live session, driven through `SessionHandle`s.
pub struct EphemeralSessionService<B: SessionAgentBuilder> {
/// Live sessions keyed by id.
sessions: RwLock<IndexMap<SessionId, SessionHandle>>,
/// Views of sessions that are no longer live.
/// NOTE(review): presumably filled from `archived_view_from_handle` (which marks `is_active: false`).
archived_views: RwLock<IndexMap<SessionId, SessionView>>,
/// Factory for each session's agent.
builder: B,
/// Capacity / staging registry, bounded by `max_sessions` in `new`.
staged_registry: Arc<StagedSessionRegistry>,
/// NOTE(review): presumably notified when a session is registered — the notifier is outside this view.
session_registered: tokio::sync::Notify,
}
impl<B: SessionAgentBuilder + 'static> EphemeralSessionService<B> {
/// Builds a run-boundary receipt draft describing `session` at `boundary`.
///
/// The conversation digest is the lowercase-hex SHA-256 of the JSON encoding of
/// `session.messages()`; `message_count` is the number of those messages.
///
/// # Errors
/// `SessionError::Agent(AgentError::InternalError(..))` if the messages cannot be serialised.
fn build_runtime_receipt(
    run_id: RunId,
    boundary: RunApplyBoundary,
    contributing_input_ids: Vec<InputId>,
    session: &meerkat_core::Session,
) -> Result<RunBoundaryReceiptDraft, SessionError> {
    let messages = session.messages();
    let encoded = match serde_json::to_vec(messages) {
        Ok(bytes) => bytes,
        Err(err) => {
            return Err(SessionError::Agent(AgentError::InternalError(format!(
                "failed to serialize session for runtime receipt digest: {err}"
            ))));
        }
    };
    let conversation_digest = format!("{:x}", Sha256::digest(encoded));
    Ok(RunBoundaryReceiptDraft {
        run_id,
        boundary,
        contributing_input_ids,
        conversation_digest: Some(conversation_digest),
        message_count: messages.len(),
    })
}
fn callback_pending_terminal(error: &SessionError) -> Option<CoreApplyTerminal> {
match error {
SessionError::Agent(AgentError::CallbackPending { tool_name, args }) => {
Some(CoreApplyTerminal::CallbackPending {
tool_name: tool_name.clone(),
args: args.clone(),
})
}
_ => None,
}
}
/// Derives a staging token from a turn-state snapshot.
///
/// A token exists only while the turn is `ApplyingPrimitive` or `WaitingForOps` and the
/// snapshot carries an active run id. It records that run id and the boundary count.
pub(crate) fn active_turn_boundary_staging_token_from_snapshot(
    snapshot: &TurnStateSnapshot,
) -> Option<ActiveTurnBoundaryStagingToken> {
    let at_boundary = matches!(
        snapshot.turn_phase,
        TurnPhase::ApplyingPrimitive | TurnPhase::WaitingForOps
    );
    if !at_boundary {
        return None;
    }
    let active_run_id = snapshot.active_run_id.clone()?;
    Some(ActiveTurnBoundaryStagingToken {
        active_run_id,
        boundary_count: snapshot.boundary_count,
    })
}
/// Takes a fresh snapshot from `turn_state_handle` and derives a staging token from it.
pub(crate) fn active_turn_boundary_staging_token(
    turn_state_handle: &dyn TurnStateHandle,
) -> Option<ActiveTurnBoundaryStagingToken> {
    Self::active_turn_boundary_staging_token_from_snapshot(&turn_state_handle.snapshot())
}
/// Returns the turn-state handle of live session `id`, which may itself be absent.
///
/// # Errors
/// `SessionError::NotFound` if `id` is not a live session.
pub(crate) async fn active_turn_state_handle(
    &self,
    id: &SessionId,
) -> Result<Option<Arc<dyn TurnStateHandle>>, SessionError> {
    match self.sessions.read().await.get(id) {
        Some(handle) => Ok(handle.turn_state_handle.clone()),
        None => Err(SessionError::NotFound { id: id.clone() }),
    }
}
/// Exports session `id` and packages it, with a boundary receipt, as a core apply output.
///
/// The exported session is JSON-serialised as the output's snapshot. `terminal`
/// selects which `CoreApplyOutput` shape is produced.
///
/// # Errors
/// Propagates `export_session` failures, plus `InternalError` if the session
/// snapshot or the receipt digest cannot be serialised.
async fn build_runtime_output(
    &self,
    id: &SessionId,
    run_id: RunId,
    boundary: RunApplyBoundary,
    contributing_input_ids: Vec<InputId>,
    terminal: Option<CoreApplyTerminal>,
) -> Result<CoreApplyOutput, SessionError> {
    let session = self.export_session(id).await?;
    let snapshot_bytes = serde_json::to_vec(&session).map_err(|err| {
        SessionError::Agent(AgentError::InternalError(format!(
            "failed to serialize session snapshot for runtime commit: {err}"
        )))
    })?;
    let receipt =
        Self::build_runtime_receipt(run_id, boundary, contributing_input_ids, &session)?;
    let snapshot = Some(snapshot_bytes);
    let output = match terminal {
        None => CoreApplyOutput::without_terminal(receipt, snapshot),
        Some(CoreApplyTerminal::RunResult(run_result)) => {
            CoreApplyOutput::with_run_result(receipt, snapshot, *run_result)
        }
        Some(CoreApplyTerminal::CallbackPending { tool_name, args }) => {
            CoreApplyOutput::with_callback_pending(receipt, snapshot, tool_name, args)
        }
        Some(CoreApplyTerminal::NoPendingBoundary) => CoreApplyOutput {
            receipt,
            session_snapshot: snapshot,
            terminal: Some(CoreApplyTerminal::NoPendingBoundary),
        },
    };
    Ok(output)
}
async fn require_inline_video_support(
&self,
identity: &SessionLlmIdentity,
) -> Result<(), meerkat_core::UnsupportedModelCapabilityEvidence> {
match self.builder.model_supports_inline_video(identity).await {
Some(true) => Ok(()),
Some(false) => Err(
meerkat_core::UnsupportedModelCapabilityEvidence::inline_video(
identity.provider,
identity.model.clone(),
meerkat_core::UnsupportedModelCapabilityReason::CapabilityDisabled,
),
),
None => Err(
meerkat_core::UnsupportedModelCapabilityEvidence::inline_video(
identity.provider,
identity.model.clone(),
meerkat_core::UnsupportedModelCapabilityReason::ProviderModelProfileMissing,
),
),
}
}
/// Builds the `ConfigError` returned when `context` needs a durable LLM identity
/// that the session agent did not provide.
fn missing_durable_llm_identity_error(context: &str) -> SessionError {
    let message = format!("{context} requires durable LLM identity from the session agent");
    SessionError::Agent(AgentError::ConfigError(message))
}
/// Validates any inline video in `prompt` against the model described by `identity`.
///
/// Text prompts always pass. Block prompts must pass
/// `meerkat_core::validate_inline_video_blocks`. If they contain video, the builder
/// must confirm inline-video support for the model.
///
/// # Errors
/// `SessionError::Agent(AgentError::ConfigError(..))` carrying the validation failure
/// or the unsupported-capability evidence.
async fn validate_prompt_video_input(
    &self,
    prompt: &ContentInput,
    identity: &SessionLlmIdentity,
) -> Result<(), SessionError> {
    let ContentInput::Blocks(blocks) = prompt else {
        return Ok(());
    };
    meerkat_core::validate_inline_video_blocks(blocks)
        .map_err(|err| SessionError::Agent(AgentError::ConfigError(err)))?;
    if !meerkat_core::has_video(blocks) {
        return Ok(());
    }
    self.require_inline_video_support(identity)
        .await
        .map_err(|evidence| SessionError::Agent(AgentError::ConfigError(evidence.to_string())))
}
/// Rejects tool results that carry video blocks, which are not supported.
///
/// # Errors
/// `SessionError::Agent(AgentError::ConfigError(..))` if any result has video.
fn validate_tool_result_video(results: &[ToolResult]) -> Result<(), SessionError> {
    let carries_video = results.iter().any(|result| result.has_video());
    if carries_video {
        Err(SessionError::Agent(AgentError::ConfigError(
            "video blocks are not supported in tool results".to_string(),
        )))
    } else {
        Ok(())
    }
}
/// Creates an empty service whose staged registry admits at most `max_sessions`
/// (per `StagedSessionRegistry::bounded`).
pub fn new(builder: B, max_sessions: usize) -> Self {
    let registry = StagedSessionRegistry::bounded(max_sessions);
    Self {
        builder,
        staged_registry: Arc::new(registry),
        sessions: RwLock::new(IndexMap::new()),
        archived_views: RwLock::new(IndexMap::new()),
        session_registered: tokio::sync::Notify::new(),
    }
}
/// Reserves active-session capacity from the staged registry.
///
/// Returns whatever `StagedSessionRegistry::reserve_capacity` yields: possibly a
/// permit, or an error. NOTE(review): the meaning of `Ok(None)` is defined by the registry.
fn try_acquire_active_permit(&self) -> Result<Option<OwnedSemaphorePermit>, SessionError> {
    let registry = self.staged_registry.as_ref();
    registry.reserve_capacity()
}
/// Obtains a runtime-context admission guard for live session `id`.
///
/// Sources are tried in this order:
/// 1. A staged promotion for `id` (`begin_promotion`). Its permit seeds the session's
///    capacity lease. If the deferred first turn is still `Pending`, a
///    `PromotionTicket` rides on the lease, to be committed or settled later.
///    Otherwise the promotion is completed immediately.
/// 2. Joining the lease already held on the handle (`try_join_active_capacity_lease`).
/// 3. Reserving fresh capacity with `staged_registry.reserve(id)`. If that fails, the
///    join is attempted once more before the reservation error is returned.
///
/// # Errors
/// The `reserve` error when no capacity is available and no lease could be joined.
fn acquire_runtime_context_admission_for_handle(
&self,
id: &SessionId,
handle: &SessionHandle,
) -> Result<RuntimeContextAdmissionGuard, SessionError> {
if let Some(promotion) = self.staged_registry.begin_promotion(id) {
// The deferred-turn lock is scoped so it is released before the registry is touched again.
let pending_first_turn = {
let state = lock_deferred_turn_state(&handle.deferred_turn_state);
matches!(state.first_turn_phase(), DeferredFirstTurnPhase::Pending)
};
let ticket = if pending_first_turn {
// First turn still pending: keep the promotion open on the lease.
Some(PromotionTicket::new(
Arc::clone(&self.staged_registry),
id.clone(),
))
} else {
self.staged_registry.complete_promotion(id);
None
};
return Ok(acquire_active_capacity_lease(
Arc::clone(&handle.active_capacity_lease),
promotion.permit,
ticket,
));
}
if let Some(admission) =
try_join_active_capacity_lease(Arc::clone(&handle.active_capacity_lease))
{
return Ok(admission);
}
match self.staged_registry.reserve(id) {
Ok(outcome) => Ok(acquire_active_capacity_lease(
Arc::clone(&handle.active_capacity_lease),
outcome.permit,
None,
)),
Err(err) => {
// NOTE(review): the retry presumably covers a lease established concurrently
// between the first join attempt and the failed reservation — confirm.
if let Some(admission) =
try_join_active_capacity_lease(Arc::clone(&handle.active_capacity_lease))
{
Ok(admission)
} else {
Err(err)
}
}
}
}
/// Checks with the staged registry that another active session could be admitted.
///
/// # Errors
/// Whatever `StagedSessionRegistry::ensure_capacity_available` reports
/// (presumably when the `max_sessions` bound is reached).
pub fn ensure_active_capacity_available(&self) -> Result<(), SessionError> {
    let registry = self.staged_registry.as_ref();
    registry.ensure_capacity_available()
}
/// Test-only: the staged registry's materialization status for `id`, if tracked.
#[cfg(test)]
fn materialization_status(&self, id: &SessionId) -> Option<MaterializationStatus> {
    let registry = self.staged_registry.as_ref();
    registry.status(id)
}
/// Builds an inactive (`is_active: false`) view of a session.
///
/// The view combines the handle's latest published summary with its current LLM
/// identity and its static metadata.
fn archived_view_from_handle(id: &SessionId, handle: &SessionHandle) -> SessionView {
    let summary = handle.summary_rx.borrow();
    let identity = handle.llm_identity_rx.borrow().clone();
    let state = SessionInfo {
        session_id: id.clone(),
        created_at: handle.created_at,
        updated_at: summary.updated_at,
        message_count: summary.message_count,
        is_active: false,
        model: identity.model,
        provider: identity.provider,
        last_assistant_text: summary.last_assistant_text.clone(),
        labels: handle.labels.clone(),
    };
    let billing = SessionUsage {
        total_tokens: summary.total_tokens,
        usage: summary.usage.clone(),
    };
    SessionView { state, billing }
}
pub async fn export_session(
&self,
id: &SessionId,
) -> Result<meerkat_core::Session, SessionError> {
let (command_tx, deferred_turn_state, system_context_state) = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
(
handle.command_tx.clone(),
Arc::clone(&handle.deferred_turn_state),
handle.system_context_state.clone(),
)
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::ExportSession { reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
let mut session = reply_rx
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})?
.map_err(|e: SystemContextStateError| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
e.to_string(),
))
})?;
let state = lock_deferred_turn_state(&deferred_turn_state).clone();
session.set_deferred_turn_state(state).map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to serialize deferred-turn state: {err}"
)))
})?;
let system_context = system_context_state.snapshot();
session
.set_system_context_state(system_context)
.map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to serialize system-context state: {err}"
)))
})?;
Ok(session)
}
/// Replaces the tool-visibility state of the live session `id`.
///
/// Only the session's command sender is cloned while the sessions map is read-locked.
/// The guard is dropped before sending and awaiting the reply, as in the other command
/// round-trips on this service. Holding it across those awaits would let a busy session
/// task (or a full command queue) block writers such as `discard_live_session`. Because
/// tokio's `RwLock` is fair, every reader queued behind such a writer would block too.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has exited
///   or dropped the reply channel.
/// - `SessionError::Agent(..)` wrapping the agent's own rejection.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
pub(crate) async fn set_session_tool_visibility_state(
    &self,
    id: &SessionId,
    state: Option<meerkat_core::SessionToolVisibilityState>,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::SetToolVisibilityState {
            state: state.map(Box::new),
            reply_tx,
        })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped the reply channel".to_string(),
            ))
        })?
        .map_err(SessionError::Agent)
}
pub async fn execution_snapshot(
&self,
id: &SessionId,
) -> Result<Option<meerkat_core::AgentExecutionSnapshot>, SessionError> {
let command_tx = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
handle.command_tx.clone()
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::ExecutionSnapshot { reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})?
.map_err(|e: SnapshotProjectionError| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
e.to_string(),
))
})
}
pub async fn tool_scope_snapshot(
&self,
id: &SessionId,
) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
let command_tx = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
handle.command_tx.clone()
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::ToolScopeSnapshot { reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})
}
pub async fn live_visible_tool_defs(
&self,
id: &SessionId,
) -> Result<Vec<meerkat_core::ToolDef>, SessionError> {
let command_tx = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
handle.command_tx.clone()
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::VisibleToolDefs { reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})
}
pub async fn external_tool_surface_snapshot(
&self,
id: &SessionId,
) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
let command_tx = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
handle.command_tx.clone()
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::ExternalToolSurfaceSnapshot { reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})
}
/// Dispatches `call` through live session `id` with no dispatch timeout
/// (`ToolDispatchTimeoutPolicy::Disabled`).
///
/// # Errors
/// Same as `dispatch_external_tool_call_with_timeout_policy`.
pub async fn dispatch_external_tool_call(
    &self,
    id: &SessionId,
    call: meerkat_core::ToolCall,
) -> Result<meerkat_core::ops::ToolDispatchOutcome, SessionError> {
    let no_timeout = meerkat_core::ToolDispatchTimeoutPolicy::Disabled;
    self.dispatch_external_tool_call_with_timeout_policy(id, call, no_timeout)
        .await
}
pub async fn dispatch_external_tool_call_with_timeout_policy(
&self,
id: &SessionId,
call: meerkat_core::ToolCall,
timeout_policy: meerkat_core::ToolDispatchTimeoutPolicy,
) -> Result<meerkat_core::ops::ToolDispatchOutcome, SessionError> {
let command_tx = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
handle.command_tx.clone()
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::DispatchExternalToolCall {
call,
timeout_policy,
reply_tx,
})
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})?
.map_err(SessionError::Agent)
}
/// Returns a shared handle to the deferred-turn state of a live session.
///
/// Returns `None` when `session_id` is not a live session.
pub async fn deferred_turn_state(
    &self,
    session_id: &SessionId,
) -> Option<Arc<std::sync::Mutex<SessionDeferredTurnState>>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    Some(Arc::clone(&handle.deferred_turn_state))
}
/// Removes a live session from the service and asks its task to shut down.
///
/// Steps, in order:
/// 1. Remove the handle from the sessions map, releasing the write lock straight away.
/// 2. Forget the session in the staged registry.
/// 3. Request shutdown on the turn-admission slot. If that request succeeds, publish
///    the resulting projection on the state channel.
/// 4. Send `SessionCommand::Shutdown`, ignoring failures.
///
/// NOTE(review): `swap_remove` (the sessions map is presumably an `IndexMap`) moves
/// the last entry into the removed slot, so the order of the remaining sessions can
/// change.
///
/// # Errors
/// `SessionError::NotFound` if `id` is not a live session.
pub async fn discard_live_session(&self, id: &SessionId) -> Result<(), SessionError> {
    let removed = {
        let mut sessions = self.sessions.write().await;
        sessions.swap_remove(id)
    };
    let Some(handle) = removed else {
        return Err(SessionError::NotFound { id: id.clone() });
    };
    self.staged_registry.forget(id);
    let shutdown_projection = {
        let mut slot = lock_turn_admission(&handle.turn_admission);
        if slot.request_shutdown().is_ok() {
            Some(slot.projection())
        } else {
            None
        }
    };
    if let Some(projection) = shutdown_projection {
        handle.state_tx.send_replace(projection);
    }
    // Best effort: the session task may already have exited.
    let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
    Ok(())
}
/// Forwards runtime system-context appends to the live session task
/// (`SessionCommand::ApplyRuntimeSystemContext`) and waits for its acknowledgement.
///
/// The sessions read lock is held only while cloning the command sender. Holding it
/// across the send and the reply await would stall every writer of the sessions map
/// (create, discard, shutdown) for as long as the session task takes to answer.
/// Because tokio's `RwLock` queues fairly, readers that arrive behind such a writer
/// would be stalled too.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session. The admission-recovering
///   apply path matches on this variant.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
pub async fn apply_runtime_system_context(
    &self,
    id: &SessionId,
    appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::ApplyRuntimeSystemContext { appends, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })
}
/// Forwards turn-scoped runtime system-context appends to the live session task
/// (`SessionCommand::ApplyRuntimeSystemContextForTurn`) and waits for its
/// acknowledgement.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers (create, discard, shutdown) are not blocked while the session
/// task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
pub async fn apply_runtime_system_context_for_turn(
    &self,
    id: &SessionId,
    appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::ApplyRuntimeSystemContextForTurn { appends, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })
}
/// Stages runtime system-context appends against the currently active turn of a
/// live session, provided that turn is still run `expected_run_id`.
///
/// An empty `appends` is a no-op returning `Ok(())`. Otherwise the function derives
/// an active-turn boundary staging token and re-derives it after each await and after
/// staging. If the token changes at any point, the call fails with
/// `SessionError::NotRunning`. When the change is detected after staging, the function
/// first tries to restore the pre-staging snapshot.
///
/// # Errors
/// - `SessionError::NotRunning` if there is no active turn state, no staging token can
///   be derived, the active run id differs from `expected_run_id`, or the token changes
///   during the call.
/// - `SessionError::NotFound` if the session has no system-context state.
/// - Errors propagated from `active_turn_state_handle` and from staging (the latter
///   converted through `control_error_into_session_error`).
pub async fn stage_runtime_system_context_for_active_turn(
    &self,
    id: &SessionId,
    expected_run_id: &RunId,
    appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
    if appends.is_empty() {
        return Ok(());
    }
    let turn_state_handle = self.active_turn_state_handle(id).await?;
    let turn_state_handle =
        turn_state_handle.ok_or_else(|| SessionError::NotRunning { id: id.clone() })?;
    // Snapshot of the active turn's boundary identity before any further awaits.
    let initial_token = Self::active_turn_boundary_staging_token(turn_state_handle.as_ref())
        .ok_or_else(|| SessionError::NotRunning { id: id.clone() })?;
    if initial_token.active_run_id != *expected_run_id {
        return Err(SessionError::NotRunning { id: id.clone() });
    }
    let state = self
        .system_context_state(id)
        .await
        .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
    // Re-derive the token after the await above. If the turn moved on while the
    // system-context state was being fetched, refuse to stage. (Despite the name,
    // nothing visible here is locked.)
    let locked_token = Self::active_turn_boundary_staging_token(turn_state_handle.as_ref())
        .ok_or_else(|| SessionError::NotRunning { id: id.clone() })?;
    if locked_token != initial_token {
        return Err(SessionError::NotRunning { id: id.clone() });
    }
    // Pair each append request with its original acceptance time.
    let staged_appends = appends
        .into_iter()
        .map(|append| {
            (
                AppendSystemContextRequest {
                    content: append.content,
                    source: append.source,
                    idempotency_key: append.idempotency_key,
                    source_kind: append.source_kind,
                    peer_response_terminal: None,
                },
                append.accepted_at,
            )
        })
        .collect();
    // `snapshot_state` is the pre-staging state; `staged_state` includes the new appends.
    let (snapshot_state, staged_state) = state
        .stage_active_turn_appends_with_snapshot(staged_appends)
        .map_err(|err| crate::control_error_into_session_error(err.into_control_error(id)))?;
    // Final check. If the turn changed while staging, try to put the snapshot back.
    // The restore result is ignored. Presumably it only applies while the state still
    // matches `staged_state` (TODO confirm against replace_from_generated_restore_if_current).
    let staged_token = Self::active_turn_boundary_staging_token(turn_state_handle.as_ref())
        .ok_or_else(|| SessionError::NotRunning { id: id.clone() })?;
    if staged_token != initial_token {
        let _ = state.replace_from_generated_restore_if_current(&staged_state, snapshot_state);
        return Err(SessionError::NotRunning { id: id.clone() });
    }
    tracing::debug!(
        session_id = %id,
        pending_count = staged_state.pending().len(),
        active_turn_pending_count = staged_state.active_turn_pending_len(),
        "staged active-turn runtime system context"
    );
    Ok(())
}
/// Rolls back staged active-turn system-context appends identified by their
/// idempotency keys and returns how many entries were discarded.
///
/// An empty `idempotency_keys` returns `Ok(0)` without touching the session.
/// `expected_run_id` is only recorded in the debug log; it is not checked against the
/// active run.
///
/// # Errors
/// `SessionError::NotFound` if the session has no system-context state.
pub async fn discard_runtime_system_context_for_active_turn(
    &self,
    id: &SessionId,
    expected_run_id: &RunId,
    idempotency_keys: Vec<String>,
) -> Result<usize, SessionError> {
    if idempotency_keys.is_empty() {
        return Ok(0);
    }
    let Some(context_state) = self.system_context_state(id).await else {
        return Err(SessionError::NotFound { id: id.clone() });
    };
    let discarded_count = context_state
        .discard_active_turn_pending_by_keys(&idempotency_keys)
        .len();
    if discarded_count > 0 {
        tracing::debug!(
            session_id = %id,
            expected_run_id = %expected_run_id,
            discarded_count,
            "rolled back staged active-turn runtime system context"
        );
    }
    Ok(discarded_count)
}
/// Reports whether the active turn of a live session currently offers a boundary for
/// staging runtime system context.
///
/// Returns:
/// - `Ok(None)` when there is no active turn state handle.
/// - `Ok(Some(true))` when a boundary staging token can be derived from the current
///   turn snapshot.
/// - `Ok(Some(false))` otherwise.
///
/// The observed run id and turn phase are logged at debug level.
///
/// # Errors
/// Propagates errors from `active_turn_state_handle`.
pub async fn active_turn_system_context_boundary_available(
    &self,
    id: &SessionId,
) -> Result<Option<bool>, SessionError> {
    let Some(turn_handle) = self.active_turn_state_handle(id).await? else {
        return Ok(None);
    };
    let turn_snapshot = turn_handle.snapshot();
    let available =
        Self::active_turn_boundary_staging_token_from_snapshot(&turn_snapshot).is_some();
    tracing::debug!(
        session_id = %id,
        active_run_id = ?turn_snapshot.active_run_id,
        turn_phase = %turn_snapshot.turn_phase,
        available,
        "observed runtime turn phase for live system-context boundary"
    );
    Ok(Some(available))
}
/// Asks the live session task to publish events for the given runtime system-context
/// appends (`SessionCommand::PublishRuntimeSystemContextEvents`) and waits for its
/// acknowledgement.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
pub async fn publish_runtime_system_context_events(
    &self,
    id: &SessionId,
    appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::PublishRuntimeSystemContextEvents { appends, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })
}
/// Reports a terminal live-adapter error to the session task
/// (`SessionCommand::RecordLiveTerminalError`) and waits for its acknowledgement.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
pub async fn record_live_terminal_error(
    &self,
    id: &SessionId,
    cause: meerkat_core::live_adapter::LiveAdapterErrorCode,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::RecordLiveTerminalError { cause, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })
}
/// Reports that `dropped` live output-audio units were dropped
/// (`SessionCommand::RecordLiveOutputAudioDegraded`) and waits for the session task
/// to acknowledge.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
pub async fn record_live_output_audio_degraded(
    &self,
    id: &SessionId,
    dropped: u64,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::RecordLiveOutputAudioDegraded { dropped, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped reply".to_string(),
        ))
    })
}
/// Appends externally produced user content to a live session
/// (`SessionCommand::AppendExternalUserContent`) and waits for the session task's
/// result.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
/// - `SessionError::Agent(..)` with the agent error the session task replied with.
pub async fn append_external_user_content(
    &self,
    id: &SessionId,
    content: ContentInput,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::AppendExternalUserContent { content, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped the reply channel".to_string(),
            ))
        })?
        .map_err(SessionError::Agent)
}
/// Appends externally produced assistant output (blocks, stop reason and usage) to a
/// live session (`SessionCommand::AppendExternalAssistantOutput`) and waits for the
/// session task's result.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
/// - `SessionError::Agent(..)` with the agent error the session task replied with.
pub async fn append_external_assistant_output(
    &self,
    id: &SessionId,
    blocks: Vec<meerkat_core::types::AssistantBlock>,
    stop_reason: meerkat_core::types::StopReason,
    usage: Usage,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::AppendExternalAssistantOutput {
            blocks,
            stop_reason,
            usage,
            reply_tx,
        })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped the reply channel".to_string(),
            ))
        })?
        .map_err(SessionError::Agent)
}
/// Applies a realtime transcript event to a live session
/// (`SessionCommand::AppendRealtimeTranscriptEvent`) and returns the apply outcome
/// reported by the session task.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
/// - `SessionError::Agent(..)` with the agent error the session task replied with.
pub async fn append_realtime_transcript_event(
    &self,
    id: &SessionId,
    event: RealtimeTranscriptEvent,
) -> Result<RealtimeTranscriptApplyOutcome, SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::AppendRealtimeTranscriptEvent { event, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped the reply channel".to_string(),
            ))
        })?
        .map_err(SessionError::Agent)
}
/// Asks the live session task to synchronise its system-context state
/// (`SessionCommand::SyncSystemContextState`) and waits for the result.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel, or with the display text of the
///   `SystemContextStateError` the task replied with.
pub(crate) async fn sync_system_context_state(
    &self,
    id: &SessionId,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::SyncSystemContextState { reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped reply channel".to_string(),
            ))
        })?
        .map_err(|e: SystemContextStateError| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                e.to_string(),
            ))
        })
}
/// Sends a durable `Session` snapshot to the live session task
/// (`SessionCommand::SyncSessionFromDurableSnapshot`) and waits for its result.
///
/// The snapshot's id must equal `id`. The sessions read lock is held only while
/// cloning the command sender, so sessions-map writers are not blocked while the
/// session task handles the snapshot.
///
/// # Errors
/// - `SessionError::Agent(AgentError::InternalError(..))` if the snapshot id does not
///   match `id`, or if the session task has exited or dropped the reply channel.
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(..)` with the agent error the session task replied with.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
pub(crate) async fn sync_session_from_durable_snapshot(
    &self,
    id: &SessionId,
    session: meerkat_core::Session,
) -> Result<(), SessionError> {
    if session.id() != id {
        return Err(SessionError::Agent(
            meerkat_core::error::AgentError::InternalError(format!(
                "durable snapshot session id {} does not match live session {id}",
                session.id()
            )),
        ));
    }
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::SyncSessionFromDurableSnapshot {
            session: Box::new(session),
            reply_tx,
        })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped reply channel".to_string(),
            ))
        })?
        .map_err(SessionError::Agent)
}
pub async fn apply_runtime_turn(
&self,
id: &SessionId,
run_id: RunId,
req: StartTurnRequest,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
Self::require_runtime_execution_kind_stamp(&req)?;
match self.start_turn(id, req).await {
Ok(run_result) => {
self.build_runtime_output(
id,
run_id,
boundary,
contributing_input_ids,
Some(CoreApplyTerminal::RunResult(Box::new(run_result))),
)
.await
}
Err(SessionError::Agent(meerkat_core::error::AgentError::NoPendingBoundary)) => {
let terminal = self.resolve_no_pending_boundary_terminal(id).await?;
self.build_runtime_output(
id,
run_id,
boundary,
contributing_input_ids,
Some(terminal),
)
.await
}
Err(error) => {
if let Some(terminal) = Self::callback_pending_terminal(&error) {
self.build_runtime_output(
id,
run_id,
boundary,
contributing_input_ids,
Some(terminal),
)
.await
} else {
Err(error)
}
}
}
}
pub(crate) async fn resolve_no_pending_boundary_terminal(
&self,
id: &SessionId,
) -> Result<CoreApplyTerminal, SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let terminal = {
let mut slot = lock_turn_admission(&handle.turn_admission);
slot.resolve_last_start_turn_public_terminal()
}
.map_err(|error| {
SessionError::Agent(AgentError::InternalError(format!(
"generated turn authority did not confirm NoPendingBoundary terminal: {error}"
)))
})?;
match terminal {
StartTurnPublicTerminal::NoPendingBoundary => Ok(CoreApplyTerminal::NoPendingBoundary),
}
}
/// Ensures a runtime-backed turn request carries `RuntimeTurnMetadata.execution_kind`.
///
/// # Errors
/// `SessionError::Agent(AgentError::InternalError(..))` when the turn metadata, or its
/// `execution_kind`, is absent.
fn require_runtime_execution_kind_stamp(req: &StartTurnRequest) -> Result<(), SessionError> {
    let stamped = req
        .runtime
        .turn_metadata
        .as_ref()
        .is_some_and(|metadata| metadata.execution_kind.is_some());
    if stamped {
        Ok(())
    } else {
        Err(SessionError::Agent(
            meerkat_core::error::AgentError::InternalError(
                "runtime_execution_kind not set: runtime-backed turn did not stamp RuntimeTurnMetadata.execution_kind"
                    .to_string(),
            ),
        ))
    }
}
/// Applies runtime system-context appends at the `RunApplyBoundary::Immediate`
/// boundary and returns the resulting `CoreApplyOutput`.
///
/// This is shorthand for `apply_runtime_context_appends_with_boundary` and has the
/// same errors.
pub async fn apply_runtime_context_appends(
    &self,
    id: &SessionId,
    run_id: RunId,
    appends: Vec<PendingSystemContextAppend>,
    contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
    let immediate = RunApplyBoundary::Immediate;
    self.apply_runtime_context_appends_with_boundary(
        id,
        run_id,
        appends,
        immediate,
        contributing_input_ids,
    )
    .await
}
pub async fn apply_runtime_context_appends_with_boundary(
&self,
id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
self.apply_runtime_context_appends_with_admission(
id,
run_id,
appends,
boundary,
contributing_input_ids,
None,
)
.await
}
/// Applies runtime system-context appends, optionally under a caller-reserved
/// admission, and discards any admission that would have been handed back on a
/// not-found error.
pub(crate) async fn apply_runtime_context_appends_with_admission(
    &self,
    id: &SessionId,
    run_id: RunId,
    appends: Vec<PendingSystemContextAppend>,
    boundary: RunApplyBoundary,
    contributing_input_ids: Vec<InputId>,
    admission: Option<RuntimeContextAdmissionGuard>,
) -> Result<CoreApplyOutput, SessionError> {
    let outcome = self
        .apply_runtime_context_appends_with_admission_recovering_not_found(
            id,
            run_id,
            appends,
            boundary,
            contributing_input_ids,
            admission,
        )
        .await;
    match outcome {
        Ok(output) => Ok(output),
        Err((error, _recovered_admission)) => Err(error),
    }
}
pub(crate) async fn apply_runtime_context_appends_with_admission_recovering_not_found(
&self,
id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
admission: Option<RuntimeContextAdmissionGuard>,
) -> Result<CoreApplyOutput, (SessionError, Option<RuntimeContextAdmissionGuard>)> {
let preserve_reserved_admission = admission.is_some();
let active_guard = match admission {
Some(admission) => admission,
None => self
.acquire_runtime_context_admission(id)
.await
.map_err(|error| (error, None))?,
};
if let Err(error) = self.apply_runtime_system_context(id, appends).await {
let admission =
if preserve_reserved_admission && matches!(error, SessionError::NotFound { .. }) {
Some(active_guard)
} else {
None
};
return Err((error, admission));
}
self.build_runtime_output(id, run_id, boundary, contributing_input_ids, None)
.await
.map_err(|error| (error, None))
}
/// Acquires a runtime-context admission for a live session.
///
/// The sessions read lock is held while the admission is acquired for the handle.
///
/// # Errors
/// `SessionError::NotFound` if `id` is not a live session, plus any error from
/// `acquire_runtime_context_admission_for_handle`.
pub async fn acquire_runtime_context_admission(
    &self,
    id: &SessionId,
) -> Result<RuntimeContextAdmissionGuard, SessionError> {
    let sessions = self.sessions.read().await;
    let Some(handle) = sessions.get(id) else {
        return Err(SessionError::NotFound { id: id.clone() });
    };
    self.acquire_runtime_context_admission_for_handle(id, handle)
}
/// Tries to join the session's existing active-capacity lease.
///
/// Returns `Ok(None)` when `try_join_active_capacity_lease` yields no guard.
///
/// # Errors
/// `SessionError::NotFound` if `id` is not a live session.
pub async fn join_active_runtime_context_admission(
    &self,
    id: &SessionId,
) -> Result<Option<RuntimeContextAdmissionGuard>, SessionError> {
    let sessions = self.sessions.read().await;
    let Some(handle) = sessions.get(id) else {
        return Err(SessionError::NotFound { id: id.clone() });
    };
    let lease = Arc::clone(&handle.active_capacity_lease);
    Ok(try_join_active_capacity_lease(lease))
}
/// Reserves service-wide active capacity without tying it to a session lease.
///
/// The returned guard holds an active permit and has no active-capacity lease.
///
/// # Errors
/// Propagates the failure from `try_acquire_active_permit`.
#[cfg(feature = "session-store")]
pub(crate) async fn acquire_runtime_capacity_admission(
    &self,
) -> Result<RuntimeContextAdmissionGuard, SessionError> {
    let active_permit = self.try_acquire_active_permit()?;
    let guard = RuntimeContextAdmissionGuard {
        active_capacity_lease: None,
        active_permit,
    };
    Ok(guard)
}
/// Starts a turn on a live session using a pre-reserved runtime-context admission.
///
/// If the session is not found, any admission that would be handed back is dropped.
#[cfg(feature = "session-store")]
pub(crate) async fn start_turn_with_runtime_context_admission(
    &self,
    id: &SessionId,
    req: StartTurnRequest,
    admission: RuntimeContextAdmissionGuard,
) -> Result<RunResult, SessionError> {
    let reserved = Some(admission);
    match self
        .start_turn_with_admission_recovering_not_found(id, req, reserved)
        .await
    {
        Ok(run_result) => Ok(run_result),
        Err((error, _returned_admission)) => Err(error),
    }
}
/// Starts a turn on a live session using a pre-reserved runtime-context admission.
///
/// On `SessionError::NotFound` the admission is returned with the error so the
/// caller can reuse it.
#[cfg(feature = "session-store")]
pub(crate) async fn start_turn_with_runtime_context_admission_recovering_not_found(
    &self,
    id: &SessionId,
    req: StartTurnRequest,
    admission: RuntimeContextAdmissionGuard,
) -> Result<RunResult, (SessionError, Option<RuntimeContextAdmissionGuard>)> {
    let reserved = Some(admission);
    self.start_turn_with_admission_recovering_not_found(id, req, reserved)
        .await
}
/// Starts a turn on a live session, optionally using a reserved admission.
///
/// Any admission handed back on a not-found error is dropped.
async fn start_turn_with_admission(
    &self,
    id: &SessionId,
    req: StartTurnRequest,
    reserved_admission: Option<RuntimeContextAdmissionGuard>,
) -> Result<RunResult, SessionError> {
    let outcome = self
        .start_turn_with_admission_recovering_not_found(id, req, reserved_admission)
        .await;
    match outcome {
        Ok(run_result) => Ok(run_result),
        Err((error, _returned_admission)) => Err(error),
    }
}
/// Admits and starts a turn on a live session, handing a caller-reserved admission
/// back when the session is not found.
///
/// These steps run while the sessions read lock is held:
/// 1. Look up the handle. On a miss, return `SessionError::NotFound` together with
///    `reserved_admission` so the caller can reuse it.
/// 2. Validate video content in the prompt against the session's current LLM identity.
/// 3. Claim the turn-admission slot via `request_start_turn`, which yields
///    `SessionError::Busy` if the claim fails.
/// 4. If `req.system_prompt` is set, require that the deferred-turn state still allows
///    initial-turn overrides, then round-trip `SessionCommand::UpdateSystemPrompt`
///    with the session task.
/// 5. Use the reserved admission, or acquire one for the handle.
/// 6. Send `SessionCommand::StartTurn`.
///
/// The lock is released before the turn result is awaited.
///
/// Failures after the claim (step 3) and before `StartTurn` is delivered abort the
/// claim via `try_abort_admitted_turn`. Once `StartTurn` has been sent, result-channel
/// and turn errors are returned without aborting here. Apart from the not-found case,
/// every error is paired with `None`; a reserved admission that was not yet consumed
/// is dropped when the function returns.
async fn start_turn_with_admission_recovering_not_found(
    &self,
    id: &SessionId,
    req: StartTurnRequest,
    mut reserved_admission: Option<RuntimeContextAdmissionGuard>,
) -> Result<RunResult, (SessionError, Option<RuntimeContextAdmissionGuard>)> {
    let (result_tx, result_rx) = oneshot::channel();
    let prompt: meerkat_core::types::ContentInput = req.prompt.clone();
    // Scope of the sessions read lock: everything up to and including the StartTurn send.
    {
        let sessions = self.sessions.read().await;
        let handle = match sessions.get(id) {
            Some(handle) => handle,
            None => {
                // Only this path hands the reserved admission back to the caller.
                return Err((
                    SessionError::NotFound { id: id.clone() },
                    reserved_admission.take(),
                ));
            }
        };
        let identity = handle.llm_identity_rx.borrow().clone();
        // Validated before the claim, so there is nothing to abort on failure.
        self.validate_prompt_video_input(&prompt, &identity)
            .await
            .map_err(|error| (error, None))?;
        Self::request_start_turn(id, handle).map_err(|error| (error, None))?;
        if let Some(system_prompt) = req.system_prompt {
            let allows_override = {
                let guard = lock_deferred_turn_state(&handle.deferred_turn_state);
                guard.allows_initial_turn_overrides()
            };
            if !allows_override {
                Self::try_abort_admitted_turn(handle);
                return Err((
                    SessionError::Unsupported(
                        "system_prompt override is only allowed on a deferred session's first turn"
                            .to_string(),
                    ),
                    None,
                ));
            }
            // NOTE(review): the sessions read lock is still held across this
            // send/reply round-trip with the session task.
            let (reply_tx, reply_rx) = oneshot::channel();
            handle
                .command_tx
                .send(SessionCommand::UpdateSystemPrompt {
                    system_prompt,
                    reply_tx,
                })
                .await
                .map_err(|_| {
                    Self::try_abort_admitted_turn(handle);
                    (
                        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                            "Session task has exited".to_string(),
                        )),
                        None,
                    )
                })?;
            let update_result = reply_rx.await.map_err(|_| {
                Self::try_abort_admitted_turn(handle);
                (
                    SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                        "Session task dropped reply channel".to_string(),
                    )),
                    None,
                )
            })?;
            update_result.map_err(|error| {
                Self::try_abort_admitted_turn(handle);
                (SessionError::Agent(error), None)
            })?;
        }
        // Prefer the caller's reservation; otherwise acquire capacity for this handle.
        let active_admission = if let Some(admission) = reserved_admission.take() {
            admission
        } else {
            match self.acquire_runtime_context_admission_for_handle(id, handle) {
                Ok(admission) => admission,
                Err(err) => {
                    Self::try_abort_admitted_turn(handle);
                    return Err((err, None));
                }
            }
        };
        let command = SessionCommand::StartTurn {
            prompt,
            runtime: Box::new(req.runtime),
            event_tx: req.event_tx,
            result_tx,
            active_admission: Some(active_admission),
        };
        if handle.command_tx.send(command).await.is_err() {
            Self::try_abort_admitted_turn(handle);
            return Err((
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task has exited".to_string(),
                )),
                None,
            ));
        }
    }
    // The lock is released here; waiting for the turn result does not block the map.
    let result = result_rx.await.map_err(|_| {
        (
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped the result channel".to_string(),
            )),
            None,
        )
    })?;
    result.map_err(|error| (SessionError::Agent(error), None))
}
/// Returns the live session's event injector, if the session exists and has one.
pub async fn event_injector(
    &self,
    session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    handle.event_injector.clone()
}
/// Returns the live session's subscribable interaction event injector, if the session
/// exists and has one.
#[doc(hidden)]
pub async fn interaction_event_injector(
    &self,
    session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    handle.interaction_event_injector.clone()
}
/// Returns a clone of the live session's system-context state handle, or `None` if the
/// session is not live.
pub async fn system_context_state(
    &self,
    session_id: &SessionId,
) -> Option<meerkat_core::SystemContextStateHandle> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    Some(handle.system_context_state.clone())
}
/// Returns the LLM identity currently published by the live session.
///
/// # Errors
/// `SessionError::NotFound` if `session_id` is not a live session.
pub async fn live_session_llm_identity(
    &self,
    session_id: &SessionId,
) -> Result<SessionLlmIdentity, SessionError> {
    let sessions = self.sessions.read().await;
    match sessions.get(session_id) {
        Some(handle) => Ok(handle.llm_identity_rx.borrow().clone()),
        None => Err(SessionError::NotFound {
            id: session_id.clone(),
        }),
    }
}
/// Updates the live session's keep-alive flag (`SessionCommand::UpdateKeepAlive`) and
/// waits for the session task to acknowledge.
///
/// The sessions read lock is held only while cloning the command sender, so
/// sessions-map writers are not blocked while the session task handles the command.
///
/// # Errors
/// - `SessionError::NotFound` if `id` is not a live session.
/// - `SessionError::Agent(AgentError::InternalError(..))` if the session task has
///   exited or dropped the reply channel.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
pub(crate) async fn update_session_keep_alive(
    &self,
    id: &SessionId,
    keep_alive: bool,
) -> Result<(), SessionError> {
    let command_tx = {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        handle.command_tx.clone()
    };
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::UpdateKeepAlive {
            keep_alive,
            reply_tx,
        })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped reply channel".to_string(),
        ))
    })
}
/// Returns the live session's comms runtime, if the session exists and has one.
pub async fn comms_runtime(
    &self,
    session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    handle.comms_runtime.clone()
}
/// Waits for the next `notify_waiters` call on `session_registered`.
///
/// Session creation issues that notification after inserting a new handle into the
/// sessions map.
pub async fn wait_session_registered(&self) {
    let registered = self.session_registered.notified();
    registered.await;
}
/// Shuts down every live session and empties the sessions map.
///
/// All handles are drained while holding the write lock. The lock is then released
/// before any per-session work. For each drained session:
/// - its staged-registry entry is forgotten;
/// - shutdown is requested on its turn-admission slot, and on success the resulting
///   projection is published;
/// - `SessionCommand::Shutdown` is sent, ignoring failures.
///
/// Why the lock is released first: `send` on a session's bounded command channel can
/// wait for capacity. Awaiting it while holding the write lock would block every other
/// access to the sessions map until every session task made room. This matches
/// `discard_live_session`, which also drops the map lock before notifying the task.
pub async fn shutdown(&self) {
    let drained: Vec<_> = {
        let mut sessions = self.sessions.write().await;
        sessions.drain(..).collect()
    };
    for (id, handle) in drained {
        self.staged_registry.forget(&id);
        let projection = {
            let mut slot = lock_turn_admission(&handle.turn_admission);
            slot.request_shutdown().ok().map(|_| slot.projection())
        };
        if let Some(projection) = projection {
            handle.state_tx.send_replace(projection);
        }
        // Best effort: the session task may already have exited.
        let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
    }
}
pub async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| meerkat_core::comms::StreamError::NotFound(format!("session {id}")))?;
let rx = handle.session_event_tx.subscribe();
Ok(Box::pin(futures::stream::unfold(rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(event) => return Some((event, rx)),
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
}
}
})))
}
/// Waits until the session summary's `updated_at` is later than `after`, then returns
/// that timestamp.
///
/// Returns immediately if the current summary is already newer. The sessions lock is
/// released before waiting.
///
/// # Errors
/// - `StreamError::NotFound` if `id` is not a live session.
/// - `StreamError::Closed` if the summary channel closes before a newer value arrives.
pub async fn wait_for_session_mutation_after(
    &self,
    id: &SessionId,
    after: SystemTime,
) -> Result<SystemTime, meerkat_core::comms::StreamError> {
    let mut summary_rx = {
        let sessions = self.sessions.read().await;
        let Some(handle) = sessions.get(id) else {
            return Err(meerkat_core::comms::StreamError::NotFound(format!(
                "session {id}"
            )));
        };
        handle.summary_rx.clone()
    };
    loop {
        let updated_at = summary_rx.borrow().updated_at;
        if updated_at > after {
            break Ok(updated_at);
        }
        if summary_rx.changed().await.is_err() {
            break Err(meerkat_core::comms::StreamError::Closed);
        }
    }
}
/// Subscribes directly to a live session's agent event broadcast channel.
///
/// The raw receiver is returned, so the caller must handle `RecvError::Lagged` and
/// `RecvError::Closed` itself.
///
/// # Errors
/// `StreamError::NotFound` if `id` is not a live session.
pub async fn subscribe_session_events_raw(
    &self,
    id: &SessionId,
) -> Result<
    tokio::sync::broadcast::Receiver<EventEnvelope<AgentEvent>>,
    meerkat_core::comms::StreamError,
> {
    let sessions = self.sessions.read().await;
    match sessions.get(id) {
        Some(handle) => Ok(handle.session_event_tx.subscribe()),
        None => Err(meerkat_core::comms::StreamError::NotFound(format!(
            "session {id}"
        ))),
    }
}
fn is_session_state_active(state: SessionState) -> bool {
state.is_active
}
/// Claims the session's turn-admission slot and publishes the updated state projection.
///
/// # Errors
/// `SessionError::Busy` if the slot refuses the claim. Nothing is published in that case.
fn request_start_turn(id: &SessionId, handle: &SessionHandle) -> Result<(), SessionError> {
    let claimed_projection = {
        let mut slot = lock_turn_admission(&handle.turn_admission);
        if slot.claim().is_err() {
            return Err(SessionError::Busy { id: id.clone() });
        }
        slot.projection()
    };
    handle.state_tx.send_replace(claimed_projection);
    Ok(())
}
/// Releases a previously claimed turn-admission slot, if the slot accepts the abort.
///
/// The updated state projection is published only when the abort succeeds. A refused
/// abort is ignored.
fn try_abort_admitted_turn(handle: &SessionHandle) {
    let released_projection = {
        let mut slot = lock_turn_admission(&handle.turn_admission);
        if slot.abort_claim().is_ok() {
            Some(slot.projection())
        } else {
            None
        }
    };
    if let Some(projection) = released_projection {
        handle.state_tx.send_replace(projection);
    }
}
}
impl<B: SessionAgentBuilder + 'static> EphemeralSessionService<B> {
pub(crate) async fn create_session_with_admission(
&self,
req: CreateSessionRequest,
reserved_create_admission: Option<RuntimeContextAdmissionGuard>,
) -> Result<RunResult, SessionError> {
let prompt = req.prompt.clone();
let caller_event_tx = req.event_tx.clone();
let defer_initial_turn =
req.initial_turn == meerkat_core::service::InitialTurnPolicy::Defer;
let labels = req.labels.clone().unwrap_or_default();
let resumed_session = req
.build
.as_ref()
.and_then(|build| build.resume_session.as_ref());
let resumed_deferred_turn_state = resumed_session
.map(meerkat_core::Session::try_deferred_turn_state)
.transpose()
.map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"generated deferred-turn authority rejected session creation restore: {err}"
)))
})?
.flatten();
let mut deferred_turn_state = resumed_deferred_turn_state.clone().unwrap_or_default();
let resumed_session_is_deferred_template = resumed_session.is_some_and(|session| {
session.messages().is_empty() && resumed_deferred_turn_state.is_none()
});
if let Some(blob_store) = req
.build
.as_ref()
.and_then(|build| build.blob_store_override.clone())
{
hydrate_deferred_turn_state(
blob_store.as_ref(),
&mut deferred_turn_state,
MissingBlobBehavior::Error,
)
.await
.map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to hydrate deferred-turn state during session creation: {err}"
)))
})?;
}
if defer_initial_turn && (resumed_session.is_none() || resumed_session_is_deferred_template)
{
deferred_turn_state.mark_initial_turn_pending();
}
if defer_initial_turn && req.deferred_prompt_policy == DeferredPromptPolicy::Stage {
deferred_turn_state.stage_initial_prompt(prompt.clone(), SystemTime::now());
}
let deferred_turn_state = Arc::new(std::sync::Mutex::new(deferred_turn_state));
let create_capacity_permit = match reserved_create_admission {
Some(admission) => admission.into_create_session_permit(),
None => self.try_acquire_active_permit()?,
};
let (agent_event_tx, agent_event_rx) = mpsc::channel::<AgentEvent>(EVENT_CHANNEL_CAPACITY);
let agent = self
.builder
.build_agent(&req, agent_event_tx.clone())
.await?;
let llm_identity = agent
.durable_llm_identity()
.ok_or_else(|| Self::missing_durable_llm_identity_error("session creation"))?;
self.validate_prompt_video_input(&prompt, &llm_identity)
.await?;
let session_id = agent.session_id();
let created_at = SystemTime::now();
let turn_admission_slot = TurnAdmissionSlot::new();
let initial_session_state = turn_admission_slot.projection();
let turn_admission = Arc::new(std::sync::Mutex::new(turn_admission_slot));
let active_capacity_lease =
Arc::new(std::sync::Mutex::new(SessionActiveCapacityLease::default()));
let (eager_active_admission, staged_create_permit) = if defer_initial_turn {
(None, Some(create_capacity_permit))
} else {
(
Some(acquire_active_capacity_lease(
Arc::clone(&active_capacity_lease),
create_capacity_permit,
None,
)),
None,
)
};
let event_injector = agent.event_injector();
let interaction_event_injector = agent.interaction_event_injector();
let comms_runtime = agent.comms_runtime();
let cancel_after_boundary_handle = agent.cancel_after_boundary_handle();
let turn_state_handle = agent.turn_state_handle();
let system_context_state = agent.system_context_state();
let session_context = agent.session_context_handle();
let (command_tx, command_rx) = mpsc::channel::<SessionCommand>(COMMAND_CHANNEL_CAPACITY);
let (state_tx, state_rx) = watch::channel(initial_session_state);
let state_tx_handle = state_tx.clone();
let (summary_tx, summary_rx) = watch::channel(SessionSummaryCache {
updated_at: created_at,
message_count: 0,
total_tokens: 0,
usage: Usage::default(),
last_assistant_text: None,
});
let (llm_identity_tx, llm_identity_rx) = watch::channel(llm_identity);
let (session_event_tx, session_event_rx) =
tokio::sync::broadcast::channel::<EventEnvelope<AgentEvent>>(EVENT_CHANNEL_CAPACITY);
drop(session_event_rx);
let interrupt_notify = Arc::new(tokio::sync::Notify::new());
#[cfg(not(target_arch = "wasm32"))]
tokio::spawn(session_task(
agent,
agent_event_tx,
agent_event_rx,
command_rx,
Arc::clone(&deferred_turn_state),
SessionTaskControl {
state_tx,
summary_tx,
llm_identity_tx,
turn_admission: Arc::clone(&turn_admission),
interrupt_notify: interrupt_notify.clone(),
session_event_tx: session_event_tx.clone(),
session_context: session_context.clone(),
},
));
#[cfg(target_arch = "wasm32")]
tokio_with_wasm::alias::task::spawn(session_task(
agent,
agent_event_tx,
agent_event_rx,
command_rx,
Arc::clone(&deferred_turn_state),
SessionTaskControl {
state_tx,
summary_tx,
llm_identity_tx,
turn_admission: Arc::clone(&turn_admission),
interrupt_notify: interrupt_notify.clone(),
session_event_tx: session_event_tx.clone(),
session_context: session_context.clone(),
},
));
let handle = SessionHandle {
command_tx: command_tx.clone(),
state_tx: state_tx_handle,
state_rx,
summary_rx,
llm_identity_rx,
turn_admission: Arc::clone(&turn_admission),
created_at,
labels,
event_injector,
interaction_event_injector,
comms_runtime,
system_context_state,
turn_state_handle,
deferred_turn_state,
active_capacity_lease,
interrupt_notify,
cancel_after_boundary_handle,
session_event_tx,
};
let inserted = {
let mut sessions = self.sessions.write().await;
if sessions.contains_key(&session_id) {
false
} else {
sessions.insert(session_id.clone(), handle);
match staged_create_permit {
Some(permit) => self.staged_registry.record_staged(&session_id, permit),
None => self.staged_registry.record_active(&session_id),
}
self.session_registered.notify_waiters();
true
}
};
if !inserted {
let _ = command_tx.send(SessionCommand::Shutdown).await;
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(format!(
"Duplicate session ID generated: {session_id}"
)),
));
}
if defer_initial_turn {
return Ok(RunResult {
text: String::new(),
session_id,
turns: 0,
tool_calls: 0,
usage: Usage::default(),
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
});
}
{
let sessions = self.sessions.read().await;
let handle = sessions.get(&session_id).ok_or_else(|| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"fresh session handle missing for eager first turn: {session_id}"
)))
})?;
if let Err(error) = Self::request_start_turn(&session_id, handle) {
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(format!(
"fresh session failed to admit eager first turn: {error}"
)),
));
}
}
let initial_turn_metadata = req
.build
.as_ref()
.and_then(|build| build.initial_turn_metadata.as_ref())
.cloned();
let initial_handling_mode = initial_turn_metadata
.as_ref()
.and_then(|metadata| metadata.handling_mode)
.unwrap_or(meerkat_core::types::HandlingMode::Queue);
let initial_flow_tool_overlay = initial_turn_metadata
.as_ref()
.and_then(|metadata| metadata.flow_tool_overlay.clone());
let initial_runtime = meerkat_core::service::StartTurnRuntimeSemantics::new(
initial_handling_mode,
initial_flow_tool_overlay,
Vec::new(),
initial_turn_metadata,
);
let (result_tx, result_rx) = oneshot::channel();
if command_tx
.send(SessionCommand::StartTurn {
prompt,
runtime: Box::new(initial_runtime),
event_tx: caller_event_tx,
result_tx,
active_admission: eager_active_admission,
})
.await
.is_err()
{
let sessions = self.sessions.read().await;
if let Some(handle) = sessions.get(&session_id) {
Self::try_abort_admitted_turn(handle);
}
drop(sessions);
let mut sessions = self.sessions.write().await;
sessions.swap_remove(&session_id);
self.staged_registry.forget(&session_id);
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"Session task exited before first turn".to_string(),
),
));
}
let result = match result_rx.await {
Ok(result) => result,
Err(_) => {
let mut sessions = self.sessions.write().await;
sessions.swap_remove(&session_id);
self.staged_registry.forget(&session_id);
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"Session task dropped the result channel".to_string(),
),
));
}
};
result.map_err(SessionError::Agent)
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionService for EphemeralSessionService<B> {
    /// Creates a session without a pre-acquired admission (delegates to
    /// `create_session_with_admission`).
    async fn create_session(&self, req: CreateSessionRequest) -> Result<RunResult, SessionError> {
        self.create_session_with_admission(req, None).await
    }

    /// Starts a turn on `id` without a pre-acquired admission (delegates to
    /// `start_turn_with_admission`).
    async fn start_turn(
        &self,
        id: &SessionId,
        req: StartTurnRequest,
    ) -> Result<RunResult, SessionError> {
        self.start_turn_with_admission(id, req, None).await
    }

    /// Replaces the LLM client of the live session `id` via its session task.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle; `Agent(InternalError)` when the session task
    /// has exited or dropped the reply channel.
    async fn set_session_client(
        &self,
        id: &SessionId,
        client: Arc<dyn meerkat_core::AgentLlmClient>,
    ) -> Result<(), SessionError> {
        // Clone the sender and release the registry read lock before the round-trip: the
        // session task only drains commands between turns, so awaiting its reply while
        // holding `sessions` would block writers (create/archive) — and, with a fair
        // RwLock, every later reader — until an in-flight turn finishes.
        let command_tx = {
            let sessions = self.sessions.read().await;
            let handle = sessions
                .get(id)
                .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
            handle.command_tx.clone()
        };
        let (reply_tx, reply_rx) = oneshot::channel();
        command_tx
            .send(SessionCommand::ReplaceClient { client, reply_tx })
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task has exited".to_string(),
                ))
            })?;
        reply_rx.await.map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task dropped reply channel".to_string(),
            ))
        })
    }

    /// Swaps the LLM client, identity and request policy of the live session `id`.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle; `Agent(..)` when the session task has
    /// exited, dropped the reply channel, or rejected the swap.
    async fn hot_swap_session_llm_identity(
        &self,
        id: &SessionId,
        client: Arc<dyn meerkat_core::AgentLlmClient>,
        identity: SessionLlmIdentity,
        request_policy: meerkat_core::SessionLlmRequestPolicy,
    ) -> Result<(), SessionError> {
        // See `set_session_client`: never hold the registry lock across the round-trip.
        let command_tx = {
            let sessions = self.sessions.read().await;
            let handle = sessions
                .get(id)
                .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
            handle.command_tx.clone()
        };
        let (reply_tx, reply_rx) = oneshot::channel();
        command_tx
            .send(SessionCommand::HotSwapLlmIdentity {
                client,
                identity: Box::new(identity),
                request_policy: Box::new(request_policy),
                reply_tx,
            })
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task has exited".to_string(),
                ))
            })?;
        reply_rx
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task dropped reply channel".to_string(),
                ))
            })?
            .map_err(SessionError::Agent)
    }

    /// Delegates to the inherent `set_session_tool_visibility_state`.
    async fn set_session_tool_visibility_state(
        &self,
        id: &SessionId,
        state: Option<meerkat_core::SessionToolVisibilityState>,
    ) -> Result<(), SessionError> {
        Self::set_session_tool_visibility_state(self, id, state).await
    }

    /// Stages an external tool filter on the live session `id`.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle; `Agent(..)` when the session task has
    /// exited, dropped the reply channel, or rejected the filter.
    async fn set_session_tool_filter(
        &self,
        id: &SessionId,
        filter: meerkat_core::ToolFilter,
    ) -> Result<(), SessionError> {
        // See `set_session_client`: never hold the registry lock across the round-trip.
        let command_tx = {
            let sessions = self.sessions.read().await;
            let handle = sessions
                .get(id)
                .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
            handle.command_tx.clone()
        };
        let (reply_tx, reply_rx) = oneshot::channel();
        command_tx
            .send(SessionCommand::StageToolFilter { filter, reply_tx })
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task has exited".to_string(),
                ))
            })?;
        reply_rx
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task dropped reply channel".to_string(),
                ))
            })?
            .map_err(SessionError::Agent)
    }

    /// Requests an interrupt of the running turn of `id`.
    ///
    /// The interrupt notify is only woken when the admission slot reports that the request
    /// should wake the session task.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle; `NotRunning` when the admission slot
    /// rejects the interrupt request.
    async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        let woke = {
            let mut slot = lock_turn_admission(&handle.turn_admission);
            slot.request_interrupt()
                .map_err(|_| SessionError::NotRunning { id: id.clone() })?
        };
        if woke {
            wake_interrupt_notify(&handle.interrupt_notify);
        }
        Ok(())
    }

    /// Asks the running turn of `id` to cancel after its current boundary.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle; `Unsupported` when the session has no
    /// cancel-after-boundary channel; `NotRunning` when the admission slot rejects it.
    async fn cancel_after_boundary(&self, id: &SessionId) -> Result<(), SessionError> {
        let sessions = self.sessions.read().await;
        let handle = sessions
            .get(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        let Some(cancel_after_boundary_handle) = handle.cancel_after_boundary_handle.as_ref()
        else {
            return Err(SessionError::Unsupported(
                "cancel_after_boundary".to_string(),
            ));
        };
        {
            // The command is sent while the admission slot is still locked so the
            // authorization and the signal are observed together.
            let mut slot = lock_turn_admission(&handle.turn_admission);
            slot.authorize_cancel_after_boundary()
                .map_err(|_| SessionError::NotRunning { id: id.clone() })?;
            let _ = cancel_after_boundary_handle.send(CancelAfterBoundaryCommand);
        }
        wake_interrupt_notify(&handle.interrupt_notify);
        Ok(())
    }

    /// Reads the live view of `id`, falling back to the archived view snapshot.
    ///
    /// # Errors
    /// `NotFound` when `id` is neither live nor archived.
    async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
        let sessions = self.sessions.read().await;
        let handle = match sessions.get(id) {
            Some(handle) => handle,
            None => {
                // Release the live registry before touching the archive map.
                drop(sessions);
                return self
                    .archived_views
                    .read()
                    .await
                    .get(id)
                    .cloned()
                    .ok_or_else(|| SessionError::NotFound { id: id.clone() });
            }
        };
        let state = *handle.state_rx.borrow();
        let summary = handle.summary_rx.borrow().clone();
        let live_identity = handle.llm_identity_rx.borrow().clone();
        Ok(SessionView {
            state: SessionInfo {
                session_id: id.clone(),
                created_at: handle.created_at,
                updated_at: summary.updated_at,
                message_count: summary.message_count,
                is_active: Self::is_session_state_active(state),
                model: live_identity.model,
                provider: live_identity.provider,
                last_assistant_text: summary.last_assistant_text,
                labels: handle.labels.clone(),
            },
            billing: SessionUsage {
                total_tokens: summary.total_tokens,
                usage: summary.usage,
            },
        })
    }

    /// Lists live sessions in registry order, filtered by labels (all must match), then
    /// paginated by `offset` and `limit`.
    async fn list(&self, query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
        let sessions = self.sessions.read().await;
        let mut summaries: Vec<SessionSummary> = sessions
            .iter()
            .map(|(session_id, h)| {
                let state = *h.state_rx.borrow();
                let cache = h.summary_rx.borrow();
                SessionSummary {
                    session_id: session_id.clone(),
                    created_at: h.created_at,
                    updated_at: cache.updated_at,
                    message_count: cache.message_count,
                    total_tokens: cache.total_tokens,
                    is_active: Self::is_session_state_active(state),
                    labels: h.labels.clone(),
                }
            })
            .collect();
        if let Some(ref filter_labels) = query.labels {
            summaries.retain(|s| {
                filter_labels
                    .iter()
                    .all(|(k, v)| s.labels.get(k) == Some(v))
            });
        }
        if let Some(offset) = query.offset {
            if offset < summaries.len() {
                summaries = summaries.split_off(offset);
            } else {
                summaries.clear();
            }
        }
        if let Some(limit) = query.limit {
            summaries.truncate(limit);
        }
        Ok(summaries)
    }

    /// Reports whether `id` currently has a live handle in the registry.
    async fn has_live_session(&self, id: &SessionId) -> Result<bool, SessionError> {
        Ok(self.sessions.read().await.contains_key(id))
    }

    /// Removes `id` from the live registry, records its archived view, projects the
    /// shutdown state when the admission slot accepts it, and asks the task to shut down.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle.
    async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
        let mut sessions = self.sessions.write().await;
        let handle = sessions
            .swap_remove(id)
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        let archived_view = Self::archived_view_from_handle(id, &handle);
        drop(sessions);
        self.staged_registry.forget(id);
        self.archived_views
            .write()
            .await
            .insert(id.clone(), archived_view);
        let projection = {
            let mut slot = lock_turn_admission(&handle.turn_admission);
            slot.request_shutdown().ok().map(|_| slot.projection())
        };
        if let Some(projection) = projection {
            handle.state_tx.send_replace(projection);
        }
        let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
        Ok(())
    }

    /// Updates the mob tool authority context of the live session `id`.
    ///
    /// # Errors
    /// `NotFound` when `id` has no live handle; `Agent(..)` when the session task has
    /// exited, dropped the reply channel, or rejected the update.
    async fn update_session_mob_authority_context(
        &self,
        id: &SessionId,
        authority_context: Option<MobToolAuthorityContext>,
    ) -> Result<(), SessionError> {
        // See `set_session_client`: never hold the registry lock across the round-trip.
        let command_tx = {
            let sessions = self.sessions.read().await;
            let handle = sessions
                .get(id)
                .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
            handle.command_tx.clone()
        };
        let (reply_tx, reply_rx) = oneshot::channel();
        command_tx
            .send(SessionCommand::UpdateMobToolAuthority {
                authority_context,
                reply_tx,
            })
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task has exited".to_string(),
                ))
            })?;
        reply_rx
            .await
            .map_err(|_| {
                SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                    "Session task dropped reply channel".to_string(),
                ))
            })?
            .map_err(SessionError::Agent)
    }

    /// Delegates to the inherent `subscribe_session_events`.
    async fn subscribe_session_events(
        &self,
        id: &SessionId,
    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
        EphemeralSessionService::<B>::subscribe_session_events(self, id).await
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceControlExt for EphemeralSessionService<B> {
    /// Stages a system-context append on session `id`, then syncs that state into the
    /// session task.
    ///
    /// # Errors
    /// `NotFound` (converted into `SessionControlError`) when `id` has no system-context
    /// state. Staging errors are mapped via `into_control_error`. Sync failures are wrapped
    /// in `SessionControlError::Session`.
    async fn append_system_context(
        &self,
        id: &SessionId,
        req: AppendSystemContextRequest,
    ) -> Result<AppendSystemContextResult, SessionControlError> {
        let system_context = self
            .system_context_state(id)
            .await
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        // Only the status is needed; the remaining tuple parts are dropped right here.
        let (status, _, _) = system_context
            .stage_append_with_snapshot(&req, SystemTime::now())
            .map_err(|err| err.into_control_error(id))?;
        if let Err(sync_error) = self.sync_system_context_state(id).await {
            return Err(SessionControlError::Session(sync_error));
        }
        Ok(AppendSystemContextResult { status })
    }

    /// Validates and stages tool results into the deferred-turn state of session `id`.
    ///
    /// # Errors
    /// Propagates video validation failures. Returns `NotFound` when `id` has no
    /// deferred-turn state.
    async fn stage_tool_results(
        &self,
        id: &SessionId,
        req: StageToolResultsRequest,
    ) -> Result<StageToolResultsResult, SessionError> {
        Self::validate_tool_result_video(&req.results)?;
        let Some(deferred) = self.deferred_turn_state(id).await else {
            return Err(SessionError::NotFound { id: id.clone() });
        };
        let accepted_result_count = {
            let mut staged = lock_deferred_turn_state(&deferred);
            staged.stage_tool_results(req.results, SystemTime::now())
        };
        Ok(StageToolResultsResult {
            accepted_result_count,
        })
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceCommsExt for EphemeralSessionService<B> {
    /// Trait shim that forwards to the inherent `comms_runtime` lookup.
    async fn comms_runtime(
        &self,
        session_id: &SessionId,
    ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
        Self::comms_runtime(self, session_id).await
    }

    /// Trait shim that forwards to the inherent `event_injector` lookup.
    async fn event_injector(
        &self,
        session_id: &SessionId,
    ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
        Self::event_injector(self, session_id).await
    }

    /// Trait shim that forwards to the inherent `interaction_event_injector` lookup.
    async fn interaction_event_injector(
        &self,
        session_id: &SessionId,
    ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
        Self::interaction_event_injector(self, session_id).await
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceHistoryExt for EphemeralSessionService<B> {
    /// Pages through the message history of a live session.
    ///
    /// # Errors
    /// - `PersistenceDisabled` when `id` is archived. Ephemeral archives keep only a view,
    ///   not the transcript.
    /// - `NotFound` when `id` is neither live nor archived.
    /// - Any other `export_session` error, unchanged.
    async fn read_history(
        &self,
        id: &SessionId,
        query: SessionHistoryQuery,
    ) -> Result<SessionHistoryPage, SessionError> {
        let session = match self.export_session(id).await {
            Ok(session) => session,
            Err(SessionError::NotFound { .. }) => {
                let was_archived = self.archived_views.read().await.contains_key(id);
                return Err(if was_archived {
                    SessionError::PersistenceDisabled
                } else {
                    SessionError::NotFound { id: id.clone() }
                });
            }
            Err(other) => return Err(other),
        };
        Ok(SessionHistoryPage::from_messages(
            session.id().clone(),
            session.messages(),
            query,
        ))
    }
}
/// Wraps `event` in an envelope attributed to `source`.
///
/// The counter is advanced before stamping, so the first envelope carries sequence 1 and
/// `*next_seq` always holds the last sequence handed out.
fn stamp_event_envelope(
    next_seq: &mut u64,
    source: &EventSourceIdentity,
    event: AgentEvent,
) -> EventEnvelope<AgentEvent> {
    let seq = *next_seq + 1;
    *next_seq = seq;
    EventEnvelope::new_with_source(source.clone(), seq, None, event)
}
/// Renders a human-readable message for a terminal live-adapter error code.
///
/// Codes with a payload (`ConfigRejected`, `Other`) interpolate it. Every other code
/// maps to a fixed phrase.
fn render_live_terminal_error_message(
    cause: &meerkat_core::live_adapter::LiveAdapterErrorCode,
) -> String {
    use meerkat_core::live_adapter::LiveAdapterErrorCode as ErrorCode;
    let fixed = match cause {
        ErrorCode::ConfigRejected { reason } => {
            return format!("live channel configuration rejected: {reason}");
        }
        ErrorCode::Other { raw } => return format!("live channel error: {raw}"),
        ErrorCode::ConnectionFailed => "live channel connection failed",
        ErrorCode::ConnectionLost => "live channel connection lost",
        ErrorCode::ProviderError => "live channel provider error",
        ErrorCode::AuthenticationFailed => "live channel authentication failed",
        ErrorCode::InternalError => "live channel internal error",
        // Catch-all for codes not listed above (the enum is presumably non-exhaustive).
        _ => "live channel terminal error",
    };
    String::from(fixed)
}
/// Renders runtime system-context appends as a single prompt string for event
/// subscribers, or `None` when there is nothing to render.
///
/// Each append becomes a `[Runtime System Context]` section. The section has an optional
/// `source:` line, a blank line, and then the rendered content. Sections are joined with
/// `SYSTEM_CONTEXT_SEPARATOR`.
fn render_runtime_system_context_event_prompt(
    appends: &[PendingSystemContextAppend],
) -> Option<String> {
    let mut sections = Vec::with_capacity(appends.len());
    for append in appends {
        let mut section = String::from("[Runtime System Context]");
        if let Some(origin) = &append.source {
            section.push_str("\nsource: ");
            section.push_str(origin);
        }
        section.push_str("\n\n");
        section.push_str(&append.content.render_text());
        sections.push(section);
    }
    (!sections.is_empty()).then(|| sections.join(meerkat_core::SYSTEM_CONTEXT_SEPARATOR))
}
fn apply_runtime_system_context_and_publish<A: SessionAgent>(
agent: &mut A,
appends: &[PendingSystemContextAppend],
control: &SessionTaskControl,
next_seq: &mut u64,
source: &EventSourceIdentity,
) {
agent.apply_runtime_system_context(appends);
let snap = agent.snapshot();
control.publish_committed_runtime_context_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
if let Some(prompt) = render_runtime_system_context_event_prompt(appends) {
let session_id = agent.session_id();
let started = stamp_event_envelope(
next_seq,
source,
AgentEvent::RunStarted {
session_id: session_id.clone(),
input: meerkat_core::types::RunInput::Content {
content: ContentInput::Text(prompt),
},
},
);
let _ = control.session_event_tx.send(started);
let completed = stamp_event_envelope(
next_seq,
source,
AgentEvent::RunCompleted {
session_id,
result: String::new(),
structured_output: None,
extraction_required: false,
usage: Usage::default(),
terminal_cause_kind: None,
},
);
let _ = control.session_event_tx.send(completed);
}
}
fn publish_runtime_system_context_events<A: SessionAgent>(
agent: &A,
appends: &[PendingSystemContextAppend],
control: &SessionTaskControl,
next_seq: &mut u64,
source: &EventSourceIdentity,
) {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
if let Some(prompt) = render_runtime_system_context_event_prompt(appends) {
let session_id = agent.session_id();
let started = stamp_event_envelope(
next_seq,
source,
AgentEvent::RunStarted {
session_id: session_id.clone(),
input: meerkat_core::types::RunInput::Content {
content: ContentInput::Text(prompt),
},
},
);
let _ = control.session_event_tx.send(started);
let completed = stamp_event_envelope(
next_seq,
source,
AgentEvent::RunCompleted {
session_id,
result: String::new(),
structured_output: None,
extraction_required: false,
usage: Usage::default(),
terminal_cause_kind: None,
},
);
let _ = control.session_event_tx.send(completed);
}
if !appends.is_empty() {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
}
}
/// Locks the shared deferred-turn state.
///
/// A poisoned mutex is recovered rather than propagated: a warning is logged and the
/// inner state is used as-is.
fn lock_deferred_turn_state(
    state: &Arc<std::sync::Mutex<SessionDeferredTurnState>>,
) -> std::sync::MutexGuard<'_, SessionDeferredTurnState> {
    state.lock().unwrap_or_else(|poisoned| {
        tracing::warn!("deferred-turn state lock poisoned; continuing with inner state");
        poisoned.into_inner()
    })
}
/// Locks the active-capacity lease and silently recovers the inner value if the mutex
/// was poisoned.
fn lock_active_capacity_lease(
    lease: &Arc<std::sync::Mutex<SessionActiveCapacityLease>>,
) -> std::sync::MutexGuard<'_, SessionActiveCapacityLease> {
    match lease.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Joins (or starts) the shared active-capacity lease and returns a guard referencing it.
///
/// - First holder (`leases == 0`): the supplied permit and promotion are stored on the
///   lease.
/// - Later holders: the supplied permit is released immediately, because capacity is
///   already held. The promotion is stored only if the lease has none yet.
///
/// The lease count is incremented with saturation.
fn acquire_active_capacity_lease(
    active_capacity_lease: Arc<std::sync::Mutex<SessionActiveCapacityLease>>,
    permit: Option<OwnedSemaphorePermit>,
    promotion: Option<PromotionTicket>,
) -> RuntimeContextAdmissionGuard {
    {
        let mut shared = lock_active_capacity_lease(&active_capacity_lease);
        let first_holder = shared.leases == 0;
        if first_holder {
            shared.permit = permit;
            shared.promotion = promotion;
        } else {
            drop(permit);
            // An unused `promotion` is intentionally left to drop after the lock is released.
            if shared.promotion.is_none() {
                shared.promotion = promotion;
            }
        }
        shared.leases = shared.leases.saturating_add(1);
    }
    RuntimeContextAdmissionGuard {
        active_capacity_lease: Some(active_capacity_lease),
        active_permit: None,
    }
}
/// Joins the active-capacity lease only if someone already holds it.
///
/// Returns `None` when no lease is held. Otherwise it bumps the lease count (saturating)
/// and returns a guard referencing the lease.
fn try_join_active_capacity_lease(
    active_capacity_lease: Arc<std::sync::Mutex<SessionActiveCapacityLease>>,
) -> Option<RuntimeContextAdmissionGuard> {
    let joined = {
        let mut shared = lock_active_capacity_lease(&active_capacity_lease);
        if shared.leases == 0 {
            false
        } else {
            shared.leases = shared.leases.saturating_add(1);
            true
        }
    };
    joined.then(|| RuntimeContextAdmissionGuard {
        active_capacity_lease: Some(active_capacity_lease),
        active_permit: None,
    })
}
/// Drops one holder from the active-capacity lease.
///
/// When the last holder leaves, the stored permit and promotion are handed back to the
/// caller. In every other case (including an already-empty lease) an empty release is
/// returned.
fn release_active_capacity_lease(
    active_capacity_lease: &Arc<std::sync::Mutex<SessionActiveCapacityLease>>,
) -> ActiveCapacityLeaseRelease {
    let mut shared = lock_active_capacity_lease(active_capacity_lease);
    match shared.leases {
        0 => ActiveCapacityLeaseRelease::default(),
        1 => {
            shared.leases = 0;
            ActiveCapacityLeaseRelease {
                permit: shared.permit.take(),
                promotion: shared.promotion.take(),
            }
        }
        _ => {
            shared.leases -= 1;
            ActiveCapacityLeaseRelease::default()
        }
    }
}
/// Locks the turn-admission slot and silently recovers the inner value if the mutex was
/// poisoned.
fn lock_turn_admission(
    slot: &Arc<std::sync::Mutex<TurnAdmissionSlot>>,
) -> std::sync::MutexGuard<'_, TurnAdmissionSlot> {
    match slot.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Aborts the currently claimed turn admission and, if the slot accepted the abort,
/// publishes its new projection on the state watch.
///
/// The projection is captured under the admission lock. It is sent only after the lock
/// is released.
fn abort_admitted_turn(control: &SessionTaskControl) {
    let mut slot = lock_turn_admission(&control.turn_admission);
    if slot.abort_claim().is_err() {
        return;
    }
    let projection = slot.projection();
    drop(slot);
    control.state_tx.send_replace(projection);
}
/// Prepends a deferred (staged) prompt to the prompt of the current turn.
///
/// - Two text inputs are joined with a blank line between them.
/// - Any other combination is flattened into blocks: the deferred blocks come first,
///   followed by the turn's blocks.
fn merge_content_inputs(
    deferred: meerkat_core::types::ContentInput,
    turn: meerkat_core::types::ContentInput,
) -> meerkat_core::types::ContentInput {
    use meerkat_core::types::ContentInput as Input;
    if let (Input::Text(earlier), Input::Text(later)) = (&deferred, &turn) {
        return Input::Text(format!("{earlier}\n\n{later}"));
    }
    let mut merged = deferred.into_blocks();
    merged.extend(turn.into_blocks());
    Input::Blocks(merged)
}
/// Puts deferred-turn inputs that were consumed for a turn back into the shared state.
/// Callers use this when the turn does not proceed.
fn restore_deferred_turn_inputs(
    deferred_turn_state: &Arc<std::sync::Mutex<SessionDeferredTurnState>>,
    consumed: ConsumedDeferredTurnInputs,
) {
    lock_deferred_turn_state(deferred_turn_state).restore_consumed_turn_inputs(consumed);
}
async fn session_task<A: SessionAgent>(
mut agent: A,
agent_event_tx: mpsc::Sender<AgentEvent>,
mut agent_event_rx: mpsc::Receiver<AgentEvent>,
mut commands: mpsc::Receiver<SessionCommand>,
deferred_turn_state: Arc<std::sync::Mutex<SessionDeferredTurnState>>,
control: SessionTaskControl,
) {
let mut next_seq: u64 = 0;
let source = EventSourceIdentity::session(agent.session_id());
loop {
let Some(cmd) = commands.recv().await else {
break;
};
match cmd {
SessionCommand::ReplaceClient { client, reply_tx } => {
agent.replace_client(client);
let _ = reply_tx.send(());
continue;
}
SessionCommand::HotSwapLlmIdentity {
client,
identity,
request_policy,
reply_tx,
} => {
let result =
agent.hot_swap_llm_identity(client, (*identity).clone(), *request_policy);
if result.is_ok() {
control.llm_identity_tx.send_replace(*identity);
}
let _ = reply_tx.send(result);
continue;
}
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
SessionCommand::UpdateKeepAlive {
keep_alive,
reply_tx,
} => {
agent.update_keep_alive(keep_alive);
let _ = reply_tx.send(());
continue;
}
SessionCommand::StageToolFilter { filter, reply_tx } => {
let _ = reply_tx.send(agent.stage_external_tool_filter(filter));
continue;
}
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
SessionCommand::SetToolVisibilityState { state, reply_tx } => {
let _ = reply_tx.send(agent.set_tool_visibility_state(state.map(|state| *state)));
continue;
}
SessionCommand::SyncSystemContextState { reply_tx } => {
let _ = reply_tx.send(agent.sync_system_context_state());
continue;
}
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
SessionCommand::SyncSessionFromDurableSnapshot { session, reply_tx } => {
let durable_deferred_turn_state = session
.try_deferred_turn_state()
.map_err(|err| {
meerkat_core::error::AgentError::InternalError(format!(
"failed to restore durable deferred-turn state during live session sync: {err}"
))
});
let result = match durable_deferred_turn_state {
Ok(durable_deferred_turn_state) => {
let result = agent.sync_session_from_durable_snapshot(*session);
if result.is_ok()
&& let Some(durable_deferred_turn_state) = durable_deferred_turn_state
{
*lock_deferred_turn_state(&deferred_turn_state) =
durable_deferred_turn_state;
}
result
}
Err(err) => Err(err),
};
if result.is_ok() {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
}
let _ = reply_tx.send(result);
continue;
}
SessionCommand::StartTurn {
prompt,
runtime,
event_tx,
result_tx,
active_admission,
} => {
let runtime = *runtime;
let metadata = runtime.turn_metadata;
let render_metadata = metadata
.as_ref()
.and_then(|metadata| metadata.render_metadata.clone());
let handling_mode = metadata
.as_ref()
.and_then(|metadata| metadata.handling_mode)
.unwrap_or(runtime.handling_mode);
let skill_references = metadata
.as_ref()
.and_then(|metadata| metadata.skill_references.clone());
let flow_tool_overlay = metadata
.as_ref()
.and_then(|metadata| metadata.flow_tool_overlay.clone())
.or(runtime.flow_tool_overlay);
let keep_alive_request =
match metadata.as_ref().and_then(|metadata| metadata.keep_alive) {
Some(
meerkat_core::lifecycle::run_primitive::KeepAliveDirective::Enable(_),
) => RuntimeKeepAliveRequest::Enable,
Some(
meerkat_core::lifecycle::run_primitive::KeepAliveDirective::Disable,
) => RuntimeKeepAliveRequest::Disable,
None => RuntimeKeepAliveRequest::Preserve,
};
let pre_turn_context_appends = runtime.pre_turn_context_appends;
let typed_turn_appends = runtime.typed_turn_appends;
let prompt = if typed_turn_appends.is_empty() {
prompt
} else {
meerkat_core::lifecycle::run_primitive::model_projection_content_input_from_conversation_appends(
&typed_turn_appends,
)
};
let execution_kind = metadata
.as_ref()
.and_then(|metadata| metadata.execution_kind);
let dispatch_authorization = {
let mut slot = lock_turn_admission(&control.turn_admission);
slot.authorize_start_turn_dispatch()
};
match dispatch_authorization {
Ok(StartTurnDispatchAuthorization::Authorized) => {}
Ok(StartTurnDispatchAuthorization::Cancelled) => {
let _ = result_tx.send(Err(meerkat_core::error::AgentError::Cancelled));
continue;
}
Err(error) => {
let _ =
result_tx.send(Err(meerkat_core::error::AgentError::InternalError(
format!("generated turn authority rejected dispatch: {error}"),
)));
continue;
}
}
let consumed_deferred_inputs = {
let mut guard = lock_deferred_turn_state(&deferred_turn_state);
guard.consume_for_started_turn()
};
let prompt = match consumed_deferred_inputs.pending_initial_prompt() {
Some(staged_prompt) => {
merge_content_inputs(staged_prompt.prompt.clone(), prompt)
}
None => prompt,
};
let flattened_tool_results = consumed_deferred_inputs
.pending_tool_results()
.iter()
.flat_map(|pending| pending.results.clone())
.collect::<Vec<_>>();
let pending_tool_results_count =
u64::try_from(flattened_tool_results.len()).unwrap_or(u64::MAX);
let resolution = {
let mut slot = lock_turn_admission(&control.turn_admission);
slot.resolve_start_turn_disposition(
execution_kind,
&prompt,
agent.observed_session_tail(),
pending_tool_results_count,
)
};
let resolution = match resolution {
Ok(resolution) => resolution,
Err(error) => {
restore_deferred_turn_inputs(
&deferred_turn_state,
consumed_deferred_inputs,
);
abort_admitted_turn(&control);
let _ =
result_tx.send(Err(meerkat_core::error::AgentError::InternalError(
format!("illegal start-turn disposition transition: {error}"),
)));
continue;
}
};
let disposition = resolution.disposition;
if matches!(disposition, StartTurnDisposition::NoPendingBoundary) {
let terminal = resolution.public_terminal;
restore_deferred_turn_inputs(&deferred_turn_state, consumed_deferred_inputs);
abort_admitted_turn(&control);
let result = if terminal == Some(StartTurnPublicTerminal::NoPendingBoundary) {
Err(meerkat_core::error::AgentError::NoPendingBoundary)
} else {
Err(meerkat_core::error::AgentError::InternalError(
"generated turn authority omitted NoPendingBoundary terminal witness"
.to_string(),
))
};
let _ = result_tx.send(result);
continue;
}
if let Some(terminal) = resolution.public_terminal {
restore_deferred_turn_inputs(&deferred_turn_state, consumed_deferred_inputs);
abort_admitted_turn(&control);
let _ =
result_tx.send(Err(meerkat_core::error::AgentError::InternalError(
format!(
"generated turn authority emitted terminal {terminal:?} for runnable disposition {disposition:?}"
),
)));
continue;
}
let persist_runtime_keep_alive = {
let mut slot = lock_turn_admission(&control.turn_admission);
slot.resolve_runtime_keep_alive(keep_alive_request)
};
let persist_runtime_keep_alive = match persist_runtime_keep_alive {
Ok(persist_runtime_keep_alive) => persist_runtime_keep_alive,
Err(error) => {
restore_deferred_turn_inputs(
&deferred_turn_state,
consumed_deferred_inputs,
);
abort_admitted_turn(&control);
let _ = result_tx.send(Err(
meerkat_core::error::AgentError::InternalError(format!(
"generated turn authority rejected keep_alive intent: {error}"
)),
));
continue;
}
};
match agent.discard_unapplied_active_turn_system_context() {
Ok(discarded_stale_active_context) => {
if discarded_stale_active_context > 0 {
tracing::debug!(
discarded_stale_active_context,
"discarded stale active-turn system context before starting a new run"
);
}
}
Err(error) => {
restore_deferred_turn_inputs(
&deferred_turn_state,
consumed_deferred_inputs,
);
abort_admitted_turn(&control);
let _ = result_tx.send(Err(
meerkat_core::error::AgentError::InternalError(error.to_string()),
));
continue;
}
}
agent.set_skill_references(skill_references);
if let Err(error) = agent.set_flow_tool_overlay(flow_tool_overlay) {
restore_deferred_turn_inputs(&deferred_turn_state, consumed_deferred_inputs);
abort_admitted_turn(&control);
let _ = result_tx.send(Err(error));
continue;
}
let apply_authorization = {
let mut authority = SessionDocumentMachineAuthority::new();
authority
.apply_pending_tool_results(
SessionDocumentKey::new(agent.session_id().to_string()),
pending_tool_results_count,
)
.map_err(|err| {
meerkat_core::error::AgentError::InternalError(format!(
"generated session document authority rejected pending \
tool-results apply: {err}"
))
})
.and_then(|effects| {
effects
.iter()
.find_map(|effect| match effect {
SessionDocumentEffect::SessionToolResultsApplied {
applied_count,
..
} => Some(*applied_count),
_ => None,
})
.ok_or_else(|| {
meerkat_core::error::AgentError::InternalError(
"generated session document authority returned no \
pending tool-results apply verdict"
.to_string(),
)
})
})
.and_then(|applied_count| {
if applied_count == pending_tool_results_count {
Ok(())
} else {
Err(meerkat_core::error::AgentError::InternalError(format!(
"generated session document authority authorized \
{applied_count} pending tool-results but the staged \
disposition consumed {pending_tool_results_count}"
)))
}
})
};
let apply_result = match apply_authorization {
Ok(()) => agent.apply_pending_tool_results(flattened_tool_results),
Err(error) => Err(error),
};
if let Err(error) = apply_result {
let _ = agent.set_flow_tool_overlay(None);
restore_deferred_turn_inputs(&deferred_turn_state, consumed_deferred_inputs);
abort_admitted_turn(&control);
let _ = result_tx.send(Err(error));
continue;
}
match persist_runtime_keep_alive {
crate::turn_admission::RuntimeKeepAlivePersistenceDecision::PersistEnabled => {
agent.update_keep_alive(true);
}
crate::turn_admission::RuntimeKeepAlivePersistenceDecision::PersistDisabled => {
agent.update_keep_alive(false);
}
crate::turn_admission::RuntimeKeepAlivePersistenceDecision::PreserveExisting => {}
}
let begin_phase = {
let mut slot = lock_turn_admission(&control.turn_admission);
slot.begin().map(|_| slot.projection())
};
match begin_phase {
Ok(projection) => {
control.state_tx.send_replace(projection);
if let Some(admission) = active_admission.as_ref() {
admission.commit_promotion();
}
}
Err(error) => {
let _ = agent.set_flow_tool_overlay(None);
restore_deferred_turn_inputs(
&deferred_turn_state,
consumed_deferred_inputs,
);
let _ =
result_tx.send(Err(meerkat_core::error::AgentError::InternalError(
format!("illegal begin-run transition: {error}"),
)));
continue;
}
}
if !pre_turn_context_appends.is_empty() {
agent.apply_runtime_system_context(&pre_turn_context_appends);
}
let mut event_stream_open = true;
let (result, resolved_projection) = {
#[cfg(not(target_arch = "wasm32"))]
type RunFut<'a> = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<RunResult, meerkat_core::error::AgentError>,
> + Send
+ 'a,
>,
>;
#[cfg(target_arch = "wasm32")]
type RunFut<'a> = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<RunResult, meerkat_core::error::AgentError>,
> + 'a,
>,
>;
let run_fut: RunFut<'_> = match disposition {
StartTurnDisposition::RunContentTurn => {
Box::pin(agent.run_turn_with_events(
prompt,
handling_mode,
render_metadata,
typed_turn_appends,
execution_kind,
agent_event_tx.clone(),
))
}
StartTurnDisposition::RunPending => Box::pin(
agent.run_pending_with_events(execution_kind, agent_event_tx.clone()),
),
StartTurnDisposition::NoPendingBoundary => {
unreachable!("NoPendingBoundary handled before Running state")
}
};
let mut run_fut = run_fut;
let mut interrupted = false;
let mut resolved_projection = None;
let r = loop {
if lock_turn_admission(&control.turn_admission).interrupt_pending() {
interrupted = true;
break Err(meerkat_core::error::AgentError::Cancelled);
}
let interrupt_wait = control.interrupt_notify.notified();
tokio::pin!(interrupt_wait);
if lock_turn_admission(&control.turn_admission).interrupt_pending() {
interrupted = true;
break Err(meerkat_core::error::AgentError::Cancelled);
}
tokio::select! {
result = &mut run_fut => {
let mut slot = lock_turn_admission(&control.turn_admission);
if slot.interrupt_pending() {
interrupted = true;
break Err(meerkat_core::error::AgentError::Cancelled);
}
resolved_projection = slot.resolve().ok().map(|_| slot.projection());
break result;
}
() = &mut interrupt_wait => {
let interrupt_pending =
lock_turn_admission(&control.turn_admission).interrupt_pending();
if interrupt_pending {
interrupted = true;
break Err(meerkat_core::error::AgentError::Cancelled);
}
}
Some(event) = agent_event_rx.recv() => {
let envelope = stamp_event_envelope(&mut next_seq, &source, event);
let _ = control.session_event_tx.send(envelope.clone());
if event_stream_open
&& let Some(ref tx) = event_tx
&& tx.send(envelope).await.is_err()
{
event_stream_open = false;
tracing::warn!("session event stream receiver dropped; continuing without streaming events");
}
}
}
};
drop(run_fut);
if interrupted {
agent.cancel();
}
while let Ok(event) = agent_event_rx.try_recv() {
let envelope = stamp_event_envelope(&mut next_seq, &source, event);
let _ = control.session_event_tx.send(envelope.clone());
if event_stream_open
&& let Some(ref tx) = event_tx
&& tx.send(envelope).await.is_err()
{
event_stream_open = false;
tracing::warn!(
"session event stream receiver dropped while draining events"
);
}
}
(r, resolved_projection)
};
let discard_active_context_error = match agent
.discard_unapplied_active_turn_system_context()
{
Ok(discarded_active_context) => {
if discarded_active_context > 0 {
tracing::debug!(
discarded_active_context,
"discarded active-turn system context that missed the run boundary"
);
}
None
}
Err(error) => Some(error),
};
let resolve_projection = resolved_projection.or_else(|| {
let mut slot = lock_turn_admission(&control.turn_admission);
slot.resolve().ok().map(|_| slot.projection())
});
if let Some(projection) = resolve_projection {
control.state_tx.send_replace(projection);
}
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
let result = if let Err(error) = agent.set_flow_tool_overlay(None) {
tracing::error!(
error = %error,
"failed to clear flow tool overlay; failing turn to avoid stale scope"
);
Err(error)
} else if let Some(error) = discard_active_context_error {
tracing::error!(
error = %error,
"failed to sync system-context state while discarding stale active-turn \
context; failing turn to avoid stale canonical metadata"
);
Err(meerkat_core::error::AgentError::InternalError(
error.to_string(),
))
} else {
result
};
let finalize = {
let mut slot = lock_turn_admission(&control.turn_admission);
let finalize = slot.finalize();
finalize.map(|outcome| (outcome, slot.projection()))
};
let shutting_down = match finalize {
Ok((outcome, projection)) => {
control.state_tx.send_replace(projection);
outcome.next_phase == TurnAdmissionPhase::ShuttingDown
}
Err(error) => {
tracing::error!(
error = %error,
"failed to finalize session turn admission state"
);
false
}
};
drop(active_admission);
let _ = result_tx.send(result);
if shutting_down {
break;
}
}
SessionCommand::ExportSession { reply_tx } => {
let _ = reply_tx.send(agent.session_clone());
}
SessionCommand::ExecutionSnapshot { reply_tx } => {
let _ = reply_tx.send(agent.execution_snapshot());
}
SessionCommand::ToolScopeSnapshot { reply_tx } => {
let _ = reply_tx.send(agent.tool_scope_snapshot());
}
SessionCommand::VisibleToolDefs { reply_tx } => {
let _ = reply_tx.send(agent.visible_tool_defs());
}
SessionCommand::ExternalToolSurfaceSnapshot { reply_tx } => {
let _ = reply_tx.send(agent.external_tool_surface_snapshot());
}
SessionCommand::ApplyRuntimeSystemContext { appends, reply_tx } => {
apply_runtime_system_context_and_publish(
&mut agent,
&appends,
&control,
&mut next_seq,
&source,
);
let _ = reply_tx.send(());
}
SessionCommand::ApplyRuntimeSystemContextForTurn { appends, reply_tx } => {
agent.apply_runtime_system_context(&appends);
let _ = reply_tx.send(());
}
SessionCommand::PublishRuntimeSystemContextEvents { appends, reply_tx } => {
publish_runtime_system_context_events(
&agent,
&appends,
&control,
&mut next_seq,
&source,
);
let _ = reply_tx.send(());
}
SessionCommand::RecordLiveTerminalError { cause, reply_tx } => {
let message = render_live_terminal_error_message(&cause);
let failed = stamp_event_envelope(
&mut next_seq,
&source,
AgentEvent::RunFailed {
session_id: agent.session_id(),
terminal_cause_kind: None,
error_report: meerkat_core::event::AgentErrorReport {
class: meerkat_core::event::AgentErrorClass::Terminal,
reason: None,
message,
},
},
);
let _ = control.session_event_tx.send(failed);
let _ = reply_tx.send(());
}
SessionCommand::RecordLiveOutputAudioDegraded { dropped, reply_tx } => {
let truncated = stamp_event_envelope(
&mut next_seq,
&source,
AgentEvent::StreamTruncated {
reason: meerkat_core::event::StreamTruncationReason::OutputAudioDegraded {
dropped,
},
},
);
let _ = control.session_event_tx.send(truncated);
let _ = reply_tx.send(());
}
SessionCommand::AppendExternalUserContent { content, reply_tx } => {
let result = agent.append_external_user_content(content);
if result.is_ok() {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
}
let _ = reply_tx.send(result);
}
SessionCommand::AppendExternalAssistantOutput {
blocks,
stop_reason,
usage,
reply_tx,
} => {
let text_content = blocks
.iter()
.filter_map(|block| match block {
meerkat_core::types::AssistantBlock::Text { text, .. }
| meerkat_core::types::AssistantBlock::Transcript { text, .. } => {
Some(text.as_str())
}
_ => None,
})
.collect::<String>();
let usage_for_event = usage.clone();
let result = agent.append_external_assistant_output(blocks, stop_reason, usage);
if result.is_ok() {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
if !text_content.is_empty() {
let envelope = stamp_event_envelope(
&mut next_seq,
&source,
AgentEvent::TextComplete {
content: text_content,
},
);
let _ = control.session_event_tx.send(envelope);
}
let envelope = stamp_event_envelope(
&mut next_seq,
&source,
AgentEvent::TurnCompleted {
stop_reason,
usage: usage_for_event,
},
);
let _ = control.session_event_tx.send(envelope);
}
let _ = reply_tx.send(result);
}
SessionCommand::AppendRealtimeTranscriptEvent { event, reply_tx } => {
let result = agent.append_realtime_transcript_event(event);
if let Ok(outcome) = &result {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
for materialized in &outcome.materialized_messages {
if let RealtimeTranscriptMaterializedMessage::Assistant {
text,
stop_reason,
usage,
..
} = materialized
{
if !text.is_empty() {
let envelope = stamp_event_envelope(
&mut next_seq,
&source,
AgentEvent::TextComplete {
content: text.clone(),
},
);
let _ = control.session_event_tx.send(envelope);
}
let envelope = stamp_event_envelope(
&mut next_seq,
&source,
AgentEvent::TurnCompleted {
stop_reason: *stop_reason,
usage: usage.clone(),
},
);
let _ = control.session_event_tx.send(envelope);
}
}
}
let _ = reply_tx.send(result);
}
SessionCommand::DispatchExternalToolCall {
call,
timeout_policy,
reply_tx,
} => {
let result = agent
.dispatch_external_tool_call_with_timeout_policy(call, timeout_policy)
.await;
if result.is_ok() {
let snap = agent.snapshot();
control.publish_summary(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
}
let _ = reply_tx.send(result);
}
SessionCommand::UpdateMobToolAuthority {
authority_context,
reply_tx,
} => {
let _ = reply_tx.send(agent.update_mob_tool_authority_context(authority_context));
}
SessionCommand::UpdateSystemPrompt {
system_prompt,
reply_tx,
} => {
let _ = reply_tx.send(agent.update_system_prompt(system_prompt));
}
SessionCommand::Shutdown => {
let next_projection = {
let mut slot = lock_turn_admission(&control.turn_admission);
let next_phase = slot.request_shutdown().ok();
next_phase.map(|_| slot.projection())
};
if let Some(projection) = next_projection {
control.state_tx.send_replace(projection);
}
break;
}
}
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod runtime_turn_metadata_tests {
use super::*;
use async_trait::async_trait;
use meerkat_core::handles::SessionContextHandle;
use meerkat_core::handles::TurnStateHandle;
use meerkat_core::lifecycle::RuntimeExecutionKind;
use meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata;
use meerkat_core::service::{
DeferredPromptPolicy, InitialTurnPolicy, SessionBuildOptions, SessionService,
};
use meerkat_core::skills::{SkillKey, SkillName};
use meerkat_core::turn_execution_authority::{ContentShape, TurnPrimitiveKind};
use meerkat_runtime::handles::RuntimeTurnStateHandle;
use std::sync::{Arc, Mutex};
/// Builds the LLM identity every probe agent in this module reports: the
/// requested `model` on the OpenAI provider, with no self-hosted server id,
/// provider params, or auth binding.
fn test_llm_identity(model: &str) -> SessionLlmIdentity {
    let model = model.to_owned();
    SessionLlmIdentity {
        provider: meerkat_core::Provider::OpenAI,
        model,
        auth_binding: None,
        provider_params: None,
        self_hosted_server_id: None,
    }
}
/// Builder for [`MetadataProbeAgent`]. Every agent it builds shares these
/// recording sinks, so a test can inspect what the session service forwarded.
#[derive(Clone)]
struct MetadataProbeBuilder {
    /// When true, built agents reject any `Some` flow tool overlay.
    fail_flow_overlay_set: bool,
    /// Optional context handle returned from `session_context_handle()`.
    session_context_handle: Option<Arc<RecordingSessionContextHandle>>,
    /// One entry per `set_skill_references` call, in call order.
    observed_skill_references: Arc<Mutex<Vec<Option<Vec<SkillKey>>>>>,
    /// Rendered text of every runtime system-context append applied.
    observed_context_texts: Arc<Mutex<Vec<String>>>,
    /// Length of `observed_context_texts` at the start of each run.
    run_context_counts: Arc<Mutex<Vec<usize>>>,
}
/// Test agent that records the skill references and runtime system-context
/// appends the session service hands it; every run succeeds with "ok".
struct MetadataProbeAgent {
    identity: SessionLlmIdentity,
    session_id: SessionId,
    /// Live session returned from `session_clone()`.
    session: meerkat_core::Session,
    system_context_state: meerkat_core::SystemContextStateHandle,
    fail_flow_overlay_set: bool,
    session_context_handle: Option<Arc<RecordingSessionContextHandle>>,
    observed_skill_references: Arc<Mutex<Vec<Option<Vec<SkillKey>>>>>,
    observed_context_texts: Arc<Mutex<Vec<String>>>,
    run_context_counts: Arc<Mutex<Vec<usize>>>,
}
/// `SessionContextHandle` double that keeps every accepted watermark advance,
/// so tests can assert on the exact sequence of context ticks.
#[derive(Debug, Default)]
struct RecordingSessionContextHandle {
    /// Accepted `updated_at_ms` values; each is strictly greater than the last.
    ticks: Mutex<Vec<u64>>,
}
impl RecordingSessionContextHandle {
    /// Locks the tick log, panicking if a previous holder panicked.
    fn lock_ticks(&self) -> std::sync::MutexGuard<'_, Vec<u64>> {
        self.ticks
            .lock()
            .expect("session context ticks lock poisoned")
    }
    /// Returns a copy of every accepted tick, oldest first.
    fn ticks(&self) -> Vec<u64> {
        self.lock_ticks().to_vec()
    }
}
impl meerkat_core::handles::SessionContextHandle for RecordingSessionContextHandle {
    /// Records `updated_at_ms` and returns `Ok(true)` only when it is newer
    /// than the latest recorded tick; stale or repeated values return
    /// `Ok(false)` without being recorded.
    fn context_advanced(
        &self,
        updated_at_ms: u64,
    ) -> Result<bool, meerkat_core::handles::DslTransitionError> {
        let mut ticks = self.lock_ticks();
        let stale = matches!(ticks.last(), Some(&latest) if updated_at_ms <= latest);
        if stale {
            return Ok(false);
        }
        ticks.push(updated_at_ms);
        Ok(true)
    }
    /// Latest accepted tick, or 0 when nothing has been recorded yet.
    fn current_watermark_ms(&self) -> u64 {
        self.lock_ticks().last().copied().unwrap_or_default()
    }
    /// Observers are ignored by this double.
    fn install_observer(
        &self,
        _observer: Arc<dyn meerkat_core::handles::SessionContextAdvancedObserver>,
    ) {
    }
    /// Ignores the observer and reports the current watermark as the baseline.
    fn install_observer_with_baseline(
        &self,
        _observer: Arc<dyn meerkat_core::handles::SessionContextAdvancedObserver>,
    ) -> u64 {
        self.current_watermark_ms()
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgentBuilder for MetadataProbeBuilder {
    type Agent = MetadataProbeAgent;
    /// Builds a probe agent around the session named by the request's
    /// `resume_session` build option, or a default session when there is none.
    /// The agent's session id is taken from that session.
    async fn build_agent(
        &self,
        req: &CreateSessionRequest,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<Self::Agent, SessionError> {
        let resumed = req
            .build
            .as_ref()
            .and_then(|options| options.resume_session.clone());
        let session = resumed.unwrap_or_default();
        let system_context_state =
            meerkat_core::SystemContextStateHandle::new(Default::default())
                .expect("default system-context state should restore");
        let agent = MetadataProbeAgent {
            session_id: session.id().clone(),
            session,
            identity: test_llm_identity(&req.model),
            system_context_state,
            fail_flow_overlay_set: self.fail_flow_overlay_set,
            session_context_handle: self.session_context_handle.clone(),
            observed_skill_references: Arc::clone(&self.observed_skill_references),
            observed_context_texts: Arc::clone(&self.observed_context_texts),
            run_context_counts: Arc::clone(&self.run_context_counts),
        };
        Ok(agent)
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgent for MetadataProbeAgent {
    /// Records how many runtime context appends had been applied when the run
    /// began, then succeeds immediately with the text "ok".
    async fn run_with_events(
        &mut self,
        _prompt: ContentInput,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<RunResult, AgentError> {
        let mut counts = self
            .run_context_counts
            .lock()
            .expect("run context counts lock poisoned");
        let applied_so_far = self
            .observed_context_texts
            .lock()
            .expect("observed context texts lock poisoned")
            .len();
        counts.push(applied_so_far);
        Ok(RunResult {
            text: "ok".to_string(),
            session_id: self.session_id.clone(),
            usage: Usage::default(),
            turns: 1,
            tool_calls: 0,
            terminal_cause_kind: None,
            structured_output: None,
            extraction_error: None,
            schema_warnings: None,
            skill_diagnostics: None,
        })
    }
    /// Appends `refs` to the shared skill-reference log.
    fn set_skill_references(&mut self, refs: Option<Vec<SkillKey>>) {
        let mut observed = self
            .observed_skill_references
            .lock()
            .expect("observed skill references lock poisoned");
        observed.push(refs);
    }
    /// Fails with a synthetic config error for any `Some` overlay when the
    /// builder asked for it; clearing the overlay (`None`) always succeeds.
    fn set_flow_tool_overlay(
        &mut self,
        overlay: Option<TurnToolOverlay>,
    ) -> Result<(), AgentError> {
        match (self.fail_flow_overlay_set, overlay) {
            (true, Some(_)) => Err(AgentError::ConfigError(
                "synthetic flow overlay failure".to_string(),
            )),
            _ => Ok(()),
        }
    }
    fn hot_swap_llm_identity(
        &mut self,
        _client: Arc<dyn meerkat_core::AgentLlmClient>,
        _identity: SessionLlmIdentity,
        _request_policy: meerkat_core::SessionLlmRequestPolicy,
    ) -> Result<(), AgentError> {
        Ok(())
    }
    fn cancel(&mut self) {}
    fn session_id(&self) -> SessionId {
        self.session_id.clone()
    }
    /// Empty snapshot stamped with the current time.
    fn snapshot(&self) -> SessionSnapshot {
        let created_at = SystemTime::now();
        let updated_at = SystemTime::now();
        SessionSnapshot {
            created_at,
            updated_at,
            message_count: 0,
            total_tokens: 0,
            usage: Usage::default(),
            last_assistant_text: None,
        }
    }
    fn session_clone(&self) -> Result<meerkat_core::Session, SystemContextStateError> {
        let session = self.session.clone();
        Ok(session)
    }
    fn observed_session_tail(&self) -> ObservedSessionTailKind {
        ObservedSessionTailKind::Empty
    }
    fn durable_llm_identity(&self) -> Option<SessionLlmIdentity> {
        Some(self.identity.clone())
    }
    /// Records the rendered text of each append in the shared context log.
    fn apply_runtime_system_context(&mut self, appends: &[PendingSystemContextAppend]) {
        let mut texts = self
            .observed_context_texts
            .lock()
            .expect("observed context texts lock poisoned");
        for append in appends {
            texts.push(append.content.render_text());
        }
    }
    fn system_context_state(&self) -> meerkat_core::SystemContextStateHandle {
        self.system_context_state.clone()
    }
    /// Replaces the live session with `session`, rejecting a snapshot whose
    /// id differs from this agent's session id.
    #[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
    fn sync_session_from_durable_snapshot(
        &mut self,
        session: meerkat_core::Session,
    ) -> Result<(), AgentError> {
        if session.id() == &self.session_id {
            self.session = session;
            return Ok(());
        }
        Err(AgentError::InternalError(format!(
            "snapshot session id {} did not match live session {}",
            session.id(),
            self.session_id
        )))
    }
    /// Exposes the optional recording handle as a trait object.
    fn session_context_handle(
        &self,
    ) -> Option<Arc<dyn meerkat_core::handles::SessionContextHandle>> {
        let handle = self.session_context_handle.as_ref()?;
        Some(Arc::clone(handle) as Arc<dyn meerkat_core::handles::SessionContextHandle>)
    }
}
/// Builder for [`BlockingBoundaryAgent`]. The notifiers and the optional
/// turn-state handle are shared with every agent it builds.
#[derive(Clone)]
struct BlockingBoundaryBuilder {
    /// Optional runtime turn-state handle the agent drives during a run.
    turn_state: Option<Arc<RuntimeTurnStateHandle>>,
    /// Notified by the agent just before its run parks.
    started: Arc<tokio::sync::Notify>,
    /// Notified by the test to let a parked run continue.
    release: Arc<tokio::sync::Notify>,
}
/// Test agent whose run parks between `started` and `release`, letting a
/// test act while a turn is in flight.
struct BlockingBoundaryAgent {
    system_context_state: meerkat_core::SystemContextStateHandle,
    turn_state: Option<Arc<RuntimeTurnStateHandle>>,
    started: Arc<tokio::sync::Notify>,
    release: Arc<tokio::sync::Notify>,
    identity: SessionLlmIdentity,
    session_id: SessionId,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgentBuilder for BlockingBoundaryBuilder {
    type Agent = BlockingBoundaryAgent;
    /// Builds a blocking agent with a fresh session id that shares this
    /// builder's notifiers and turn-state handle.
    async fn build_agent(
        &self,
        req: &CreateSessionRequest,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<Self::Agent, SessionError> {
        let identity = test_llm_identity(&req.model);
        let system_context_state =
            meerkat_core::SystemContextStateHandle::new(Default::default())
                .expect("default system-context state should restore");
        Ok(BlockingBoundaryAgent {
            session_id: SessionId::new(),
            identity,
            system_context_state,
            turn_state: self.turn_state.clone(),
            started: Arc::clone(&self.started),
            release: Arc::clone(&self.release),
        })
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgent for BlockingBoundaryAgent {
    /// Runs a turn that parks mid-flight until the test notifies `release`.
    ///
    /// When a turn-state handle is configured, a conversation run is started
    /// on it before parking. After release, the run is advanced through
    /// `primitive_applied` and `llm_returned_terminal`. Any turn-state
    /// transition failure is returned as `AgentError::InternalError`. On
    /// success the run reports the text "released".
    async fn run_with_events(
        &mut self,
        _prompt: ContentInput,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<RunResult, AgentError> {
        let run_id = RunId::new();
        if let Some(turn_state) = self.turn_state.as_ref() {
            turn_state
                .start_conversation_run(
                    run_id.clone(),
                    TurnPrimitiveKind::ConversationTurn,
                    ContentShape::Conversation,
                    false,
                    false,
                    0,
                )
                .map_err(|err| AgentError::InternalError(err.to_string()))?;
        }
        // Subscribe to `release` BEFORE announcing `started`. A `Notified`
        // future receives `notify_waiters()` wakeups from the moment it is
        // created. A test that reacts to `started` by immediately calling
        // `release.notify_waiters()` (e.g. on a multi-threaded runtime)
        // therefore cannot fire before this agent is listening and strand it.
        let released = self.release.notified();
        self.started.notify_waiters();
        released.await;
        if let Some(turn_state) = self.turn_state.as_ref() {
            turn_state
                .primitive_applied(run_id.clone())
                .map_err(|err| AgentError::InternalError(err.to_string()))?;
            turn_state
                .llm_returned_terminal(run_id)
                .map_err(|err| AgentError::InternalError(err.to_string()))?;
        }
        Ok(RunResult {
            text: "released".to_string(),
            session_id: self.session_id.clone(),
            usage: Usage::default(),
            turns: 1,
            tool_calls: 0,
            terminal_cause_kind: None,
            structured_output: None,
            extraction_error: None,
            schema_warnings: None,
            skill_diagnostics: None,
        })
    }
    fn cancel(&mut self) {}
    fn set_skill_references(&mut self, _refs: Option<Vec<SkillKey>>) {}
    fn set_flow_tool_overlay(
        &mut self,
        _overlay: Option<TurnToolOverlay>,
    ) -> Result<(), AgentError> {
        Ok(())
    }
    fn hot_swap_llm_identity(
        &mut self,
        _client: Arc<dyn meerkat_core::AgentLlmClient>,
        _identity: SessionLlmIdentity,
        _request_policy: meerkat_core::SessionLlmRequestPolicy,
    ) -> Result<(), AgentError> {
        Ok(())
    }
    fn session_id(&self) -> SessionId {
        self.session_id.clone()
    }
    /// Empty snapshot stamped with the current time.
    fn snapshot(&self) -> SessionSnapshot {
        SessionSnapshot {
            created_at: SystemTime::now(),
            updated_at: SystemTime::now(),
            message_count: 0,
            total_tokens: 0,
            usage: Usage::default(),
            last_assistant_text: None,
        }
    }
    /// Builds a session carrying this agent's current system-context state.
    fn session_clone(&self) -> Result<meerkat_core::Session, SystemContextStateError> {
        // Use the live session id so the exported session agrees with
        // `session_id()`, as `BoundaryPhaseProbeAgent` does. `Session::new()`
        // presumably mints an unrelated fresh id.
        let mut session = meerkat_core::Session::with_id(self.session_id.clone());
        session
            .set_system_context_state(self.system_context_state.snapshot())
            .expect("test system context state should serialize");
        Ok(session)
    }
    fn observed_session_tail(&self) -> ObservedSessionTailKind {
        ObservedSessionTailKind::Empty
    }
    fn durable_llm_identity(&self) -> Option<SessionLlmIdentity> {
        Some(self.identity.clone())
    }
    /// Appends the blocks through a session round-trip, then replaces the
    /// agent's system-context state with the session's resulting state.
    fn apply_runtime_system_context(&mut self, appends: &[PendingSystemContextAppend]) {
        let mut session = self
            .session_clone()
            .expect("test session clone should succeed");
        session.append_system_context_blocks(appends);
        let state = session.system_context_state().unwrap_or_default();
        self.system_context_state
            .replace_from_generated_restore(state)
            .expect("test system context state should restore");
    }
    fn turn_state_handle(&self) -> Option<Arc<dyn TurnStateHandle>> {
        self.turn_state
            .as_ref()
            .map(|turn_state| Arc::clone(turn_state) as Arc<dyn TurnStateHandle>)
    }
    fn system_context_state(&self) -> meerkat_core::SystemContextStateHandle {
        self.system_context_state.clone()
    }
}
/// Builder for [`BoundaryPhaseProbeAgent`] that shares one turn-state handle
/// with every agent it builds.
#[derive(Clone)]
struct BoundaryPhaseProbeBuilder {
    /// Handle the test advances directly and the agent exposes to the service.
    turn_state: Arc<RuntimeTurnStateHandle>,
}
/// Test agent that exposes a shared turn-state handle and stages runtime
/// system-context appends into its own state handle; runs succeed with "ok".
struct BoundaryPhaseProbeAgent {
    turn_state: Arc<RuntimeTurnStateHandle>,
    system_context_state: meerkat_core::SystemContextStateHandle,
    session_id: SessionId,
    identity: SessionLlmIdentity,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgentBuilder for BoundaryPhaseProbeBuilder {
    type Agent = BoundaryPhaseProbeAgent;
    /// Builds a probe agent with a fresh session id that shares this
    /// builder's turn-state handle.
    async fn build_agent(
        &self,
        req: &CreateSessionRequest,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<Self::Agent, SessionError> {
        let identity = test_llm_identity(&req.model);
        let system_context_state =
            meerkat_core::SystemContextStateHandle::new(Default::default())
                .expect("default system-context state should restore");
        Ok(BoundaryPhaseProbeAgent {
            session_id: SessionId::new(),
            identity,
            turn_state: Arc::clone(&self.turn_state),
            system_context_state,
        })
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgent for BoundaryPhaseProbeAgent {
async fn run_with_events(
&mut self,
_prompt: ContentInput,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, AgentError> {
Ok(RunResult {
text: "ok".to_string(),
session_id: self.session_id.clone(),
usage: Usage::default(),
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
})
}
fn cancel(&mut self) {}
fn set_skill_references(&mut self, _refs: Option<Vec<SkillKey>>) {}
fn set_flow_tool_overlay(
&mut self,
_overlay: Option<TurnToolOverlay>,
) -> Result<(), AgentError> {
Ok(())
}
fn hot_swap_llm_identity(
&mut self,
_client: Arc<dyn meerkat_core::AgentLlmClient>,
_identity: SessionLlmIdentity,
_request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), AgentError> {
Ok(())
}
fn session_id(&self) -> SessionId {
self.session_id.clone()
}
fn snapshot(&self) -> SessionSnapshot {
SessionSnapshot {
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
message_count: 0,
total_tokens: 0,
usage: Usage::default(),
last_assistant_text: None,
}
}
fn session_clone(&self) -> Result<meerkat_core::Session, SystemContextStateError> {
Ok(meerkat_core::Session::with_id(self.session_id.clone()))
}
fn observed_session_tail(&self) -> ObservedSessionTailKind {
ObservedSessionTailKind::Empty
}
fn durable_llm_identity(&self) -> Option<SessionLlmIdentity> {
Some(self.identity.clone())
}
fn turn_state_handle(&self) -> Option<Arc<dyn TurnStateHandle>> {
let handle: Arc<dyn TurnStateHandle> = self.turn_state.clone();
Some(handle)
}
fn apply_runtime_system_context(&mut self, appends: &[PendingSystemContextAppend]) {
for append in appends {
self.system_context_state
.stage_append_with_snapshot(
&meerkat_core::service::AppendSystemContextRequest {
content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
append.content.render_text(),
),
source: append.source.clone(),
idempotency_key: append.idempotency_key.clone(),
source_kind: append.source_kind,
peer_response_terminal: None,
},
append.accepted_at,
)
.expect("test runtime system context should stage");
}
}
fn system_context_state(&self) -> meerkat_core::SystemContextStateHandle {
self.system_context_state.clone()
}
}
/// The active-turn boundary probe must follow the turn-state phase: it is
/// available only in the phases checked `Some(true)` below.
#[tokio::test]
async fn active_turn_boundary_probe_uses_turn_phase_not_running_latch() {
    let turn_state = Arc::new(RuntimeTurnStateHandle::ephemeral());
    let builder = BoundaryPhaseProbeBuilder {
        turn_state: Arc::clone(&turn_state),
    };
    let service = EphemeralSessionService::new(builder, 1);
    let request = CreateSessionRequest {
        model: "phase-probe".to_string(),
        prompt: ContentInput::Text("hello".to_string()),
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        max_tokens: None,
        event_tx: None,
        initial_turn: InitialTurnPolicy::Defer,
        deferred_prompt_policy: DeferredPromptPolicy::Discard,
        build: None,
        labels: None,
    };
    let created = service
        .create_session(request)
        .await
        .expect("create probe session");

    // Probes the service for the created session and asserts the result.
    macro_rules! assert_boundary {
        ($expected:expr, $why:literal) => {
            assert_eq!(
                service
                    .active_turn_system_context_boundary_available(&created.session_id)
                    .await
                    .expect("boundary probe should succeed"),
                $expected,
                $why
            )
        };
    }

    assert_boundary!(Some(false), "no active run means no active-turn boundary");

    let run_id = RunId::new();
    turn_state
        .start_conversation_run(
            run_id.clone(),
            TurnPrimitiveKind::ConversationTurn,
            ContentShape::Conversation,
            false,
            false,
            0,
        )
        .expect("start conversation run");
    assert_boundary!(
        Some(true),
        "pre-LLM applying primitive can still consume staged context"
    );

    turn_state
        .primitive_applied(run_id.clone())
        .expect("enter calling LLM");
    assert_boundary!(
        Some(false),
        "an in-flight model request has already built its context"
    );

    turn_state
        .llm_returned_tool_calls(run_id.clone(), 1)
        .expect("enter waiting for ops");
    assert_boundary!(
        Some(true),
        "tool execution phase has a deterministic post-tool model boundary"
    );

    turn_state
        .tool_calls_resolved(run_id.clone())
        .expect("drain boundary");
    assert_boundary!(
        Some(false),
        "draining boundary is not enough to prove another LLM request remains"
    );
}
/// The registry records a deferred session as `Staged`, enforces the
/// one-session capacity, and clears both the status and the slot on archive.
#[tokio::test]
async fn materialization_status_is_owned_by_registry_across_lifecycle() {
    let turn_state = Arc::new(RuntimeTurnStateHandle::ephemeral());
    let service = EphemeralSessionService::new(
        BoundaryPhaseProbeBuilder {
            turn_state: Arc::clone(&turn_state),
        },
        1,
    );
    // Both create requests are identical apart from the prompt text.
    let deferred_request = |text: &str| CreateSessionRequest {
        model: "phase-probe".to_string(),
        prompt: ContentInput::Text(text.to_string()),
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        max_tokens: None,
        event_tx: None,
        initial_turn: InitialTurnPolicy::Defer,
        deferred_prompt_policy: DeferredPromptPolicy::Discard,
        build: None,
        labels: None,
    };

    let created = service
        .create_session(deferred_request("hello"))
        .await
        .expect("create deferred session");
    assert_eq!(
        service.materialization_status(&created.session_id),
        Some(MaterializationStatus::Staged),
        "deferred session is recorded Staged by the registry"
    );

    let rejection = service
        .create_session(deferred_request("again"))
        .await
        .expect_err("capacity gate must reject the second session");
    let rendered = rejection.to_string();
    assert!(
        rendered.contains("Max sessions reached"),
        "expected typed capacity-exhaustion error, got: {rejection}"
    );

    service
        .archive(&created.session_id)
        .await
        .expect("archive deferred session");
    assert_eq!(
        service.materialization_status(&created.session_id),
        None,
        "archive clears the registry status with the handle"
    );
    service
        .ensure_active_capacity_available()
        .expect("capacity freed after archive");
}
/// An eager (`RunImmediately`) first turn must hand the agent the skill
/// references from the initial runtime turn metadata.
#[tokio::test]
async fn eager_initial_turn_forwards_full_runtime_metadata_carrier() {
    let observed_skill_references = Arc::new(Mutex::new(Vec::new()));
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::clone(&observed_skill_references),
        observed_context_texts: Arc::new(Mutex::new(Vec::new())),
        run_context_counts: Arc::new(Mutex::new(Vec::new())),
        fail_flow_overlay_set: false,
        session_context_handle: None,
    };
    let service = EphemeralSessionService::new(builder, 1);

    let skill = SkillKey::builtin(SkillName::parse("metadata-probe").expect("valid skill"));
    let metadata = RuntimeTurnMetadata {
        execution_kind: Some(RuntimeExecutionKind::ContentTurn),
        skill_references: Some(vec![skill.clone()]),
        ..Default::default()
    };
    let request = CreateSessionRequest {
        model: "metadata-probe-model".to_string(),
        prompt: ContentInput::Text("hello".to_string()),
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        max_tokens: None,
        event_tx: None,
        initial_turn: InitialTurnPolicy::RunImmediately,
        deferred_prompt_policy: DeferredPromptPolicy::Discard,
        build: Some(SessionBuildOptions {
            initial_turn_metadata: Some(metadata),
            ..Default::default()
        }),
        labels: None,
    };
    service
        .create_session(request)
        .await
        .expect("eager first turn should run");

    let recorded = observed_skill_references
        .lock()
        .expect("observed skill references lock poisoned");
    assert_eq!(
        *recorded,
        vec![Some(vec![skill])],
        "eager first turn must forward the full runtime metadata carrier"
    );
}
/// Syncing from a durable snapshot that carries no deferred-turn authority
/// state must leave the live staged deferred prompt untouched.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
#[tokio::test]
async fn durable_snapshot_sync_preserves_live_deferred_turn_handle_without_authority_state() {
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::new(Mutex::new(Vec::new())),
        observed_context_texts: Arc::new(Mutex::new(Vec::new())),
        run_context_counts: Arc::new(Mutex::new(Vec::new())),
        fail_flow_overlay_set: false,
        session_context_handle: None,
    };
    let service = EphemeralSessionService::new(builder, 1);
    let request = CreateSessionRequest {
        model: "metadata-probe-model".to_string(),
        prompt: ContentInput::Text("staged".to_string()),
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        max_tokens: None,
        event_tx: None,
        initial_turn: InitialTurnPolicy::Defer,
        deferred_prompt_policy: DeferredPromptPolicy::Stage,
        build: None,
        labels: None,
    };
    let created = service
        .create_session(request)
        .await
        .expect("deferred session should create");

    let deferred_state = service
        .deferred_turn_state(&created.session_id)
        .await
        .expect("deferred-turn state handle should exist");
    let phase_before_sync = lock_deferred_turn_state(&deferred_state).first_turn_phase();
    assert_eq!(phase_before_sync, DeferredFirstTurnPhase::Pending);

    // A bare session: no deferred-turn authority state to apply.
    let durable = meerkat_core::Session::with_id(created.session_id.clone());
    service
        .sync_session_from_durable_snapshot(&created.session_id, durable)
        .await
        .expect("durable sync should succeed");

    let guard = lock_deferred_turn_state(&deferred_state);
    assert_eq!(guard.first_turn_phase(), DeferredFirstTurnPhase::Pending);
    assert!(
        guard.pending_initial_prompt().is_some(),
        "missing durable authority state must not replace staged live deferred prompt"
    );
}
/// Syncing from a durable snapshot that carries explicit deferred-turn
/// authority state must overwrite the live deferred-turn handle.
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
#[tokio::test]
async fn durable_snapshot_sync_updates_live_deferred_turn_handle_from_authority_state() {
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::new(Mutex::new(Vec::new())),
        observed_context_texts: Arc::new(Mutex::new(Vec::new())),
        run_context_counts: Arc::new(Mutex::new(Vec::new())),
        fail_flow_overlay_set: false,
        session_context_handle: None,
    };
    let service = EphemeralSessionService::new(builder, 1);
    let request = CreateSessionRequest {
        model: "metadata-probe-model".to_string(),
        prompt: ContentInput::Text("staged".to_string()),
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        max_tokens: None,
        event_tx: None,
        initial_turn: InitialTurnPolicy::Defer,
        deferred_prompt_policy: DeferredPromptPolicy::Stage,
        build: None,
        labels: None,
    };
    let created = service
        .create_session(request)
        .await
        .expect("deferred session should create");

    let deferred_state = service
        .deferred_turn_state(&created.session_id)
        .await
        .expect("deferred-turn state handle should exist");
    let phase_before_sync = lock_deferred_turn_state(&deferred_state).first_turn_phase();
    assert_eq!(phase_before_sync, DeferredFirstTurnPhase::Pending);

    // The durable session explicitly carries the default deferred-turn state.
    let mut durable = meerkat_core::Session::with_id(created.session_id.clone());
    durable
        .set_deferred_turn_state(SessionDeferredTurnState::default())
        .expect("generated deferred-turn authority should authorize default state");
    service
        .sync_session_from_durable_snapshot(&created.session_id, durable)
        .await
        .expect("durable sync should succeed");

    let guard = lock_deferred_turn_state(&deferred_state);
    assert_eq!(guard.first_turn_phase(), DeferredFirstTurnPhase::Inactive);
    assert!(
        guard.pending_initial_prompt().is_none(),
        "explicit generated durable state should replace staged live deferred prompt"
    );
}
/// `start_turn` must take skill references solely from the request's
/// `RuntimeTurnMetadata` and forward them to the agent exactly once.
#[tokio::test]
async fn start_turn_runtime_metadata_is_sole_skill_carrier() {
    let observed_skill_references = Arc::new(Mutex::new(Vec::new()));
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::clone(&observed_skill_references),
        observed_context_texts: Arc::new(Mutex::new(Vec::new())),
        run_context_counts: Arc::new(Mutex::new(Vec::new())),
        fail_flow_overlay_set: false,
        session_context_handle: None,
    };
    let service = EphemeralSessionService::new(builder, 1);
    let canonical =
        SkillKey::builtin(SkillName::parse("runtime-canonical-skill").expect("valid skill"));

    let create_request = CreateSessionRequest {
        model: "metadata-probe-model".to_string(),
        prompt: ContentInput::Text("defer".to_string()),
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        max_tokens: None,
        event_tx: None,
        initial_turn: InitialTurnPolicy::Defer,
        deferred_prompt_policy: DeferredPromptPolicy::Discard,
        build: Some(SessionBuildOptions::default()),
        labels: None,
    };
    let result = service
        .create_session(create_request)
        .await
        .expect("deferred session should create");

    let metadata = RuntimeTurnMetadata {
        execution_kind: Some(RuntimeExecutionKind::ContentTurn),
        skill_references: Some(vec![canonical.clone()]),
        ..Default::default()
    };
    let runtime = meerkat_core::service::StartTurnRuntimeSemantics::new(
        meerkat_core::types::HandlingMode::Queue,
        None,
        Vec::new(),
        Some(metadata),
    );
    let turn = StartTurnRequest {
        prompt: ContentInput::Text("go".to_string()),
        system_prompt: None,
        event_tx: None,
        runtime,
    };
    service
        .start_turn(&result.session_id, turn)
        .await
        .expect("turn should run with canonical runtime metadata");

    let recorded = observed_skill_references
        .lock()
        .expect("observed skill references lock poisoned");
    assert_eq!(
        *recorded,
        vec![Some(vec![canonical])],
        "canonical RuntimeTurnMetadata must be the only skill carrier once present"
    );
}
#[tokio::test]
async fn start_turn_applies_pre_turn_context_before_run() {
    // Context texts and per-run context counts are inspected after the turn.
    let observed_context_texts = Arc::new(Mutex::new(Vec::new()));
    let run_context_counts = Arc::new(Mutex::new(Vec::new()));
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::new(Mutex::new(Vec::new())),
        observed_context_texts: Arc::clone(&observed_context_texts),
        run_context_counts: Arc::clone(&run_context_counts),
        fail_flow_overlay_set: false,
        session_context_handle: None,
    };
    let service = EphemeralSessionService::new(builder, 1);

    let created = service
        .create_session(CreateSessionRequest {
            model: "metadata-probe-model".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred session should create");

    // One pre-turn system-context append delivered alongside the turn.
    let pre_turn_context = PendingSystemContextAppend {
        content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
            "terminal peer context".to_string(),
        ),
        source: Some("peer_response_terminal:test:req".to_string()),
        idempotency_key: Some("peer_response_terminal:test:req".to_string()),
        source_kind: meerkat_core::session::SystemContextSource::Normal,
        accepted_at: meerkat_core::time_compat::SystemTime::now(),
        peer_response_terminal: None,
    };
    let runtime = meerkat_core::service::StartTurnRuntimeSemantics::new(
        meerkat_core::types::HandlingMode::Queue,
        None,
        vec![pre_turn_context],
        Some(RuntimeTurnMetadata {
            execution_kind: Some(RuntimeExecutionKind::ContentTurn),
            ..Default::default()
        }),
    );
    let turn = StartTurnRequest {
        prompt: ContentInput::Text("reaction".to_string()),
        system_prompt: None,
        event_tx: None,
        runtime,
    };
    service
        .start_turn(&created.session_id, turn)
        .await
        .expect("pre-turn context turn should run");

    {
        let texts = observed_context_texts
            .lock()
            .expect("observed context texts lock poisoned");
        assert_eq!(*texts, vec!["terminal peer context".to_string()]);
    }
    {
        let counts = run_context_counts
            .lock()
            .expect("run context counts lock poisoned");
        assert_eq!(
            *counts,
            vec![1],
            "pre-turn context must be applied before the agent run starts"
        );
    }
}
#[tokio::test]
async fn committed_runtime_context_events_advance_session_context_after_apply() {
    // The recording handle captures every realtime projection tick.
    let session_context_handle = Arc::new(RecordingSessionContextHandle::default());
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::new(Mutex::new(Vec::new())),
        observed_context_texts: Arc::new(Mutex::new(Vec::new())),
        run_context_counts: Arc::new(Mutex::new(Vec::new())),
        fail_flow_overlay_set: false,
        session_context_handle: Some(Arc::clone(&session_context_handle)),
    };
    let service = EphemeralSessionService::new(builder, 1);

    let created = service
        .create_session(CreateSessionRequest {
            model: "metadata-probe-model".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred session should create");
    let session_id = &created.session_id;

    let terminal_context = PendingSystemContextAppend {
        content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
            "Peer terminal response from test\nRequest ID: req\nStatus: completed\ntoken birch seventeen".to_string()
        ),
        source: Some("peer_response_terminal:test:req".to_string()),
        idempotency_key: Some("peer_response_terminal:test:req".to_string()),
        source_kind: meerkat_core::session::SystemContextSource::Normal,
        accepted_at: meerkat_core::time_compat::SystemTime::now(),
        peer_response_terminal: None,
    };
    let appends = vec![terminal_context];

    // Phase 1: applying context before commit must not tick the projection.
    let baseline_ticks = session_context_handle.ticks().len();
    service
        .apply_runtime_system_context_for_turn(session_id, appends.clone())
        .await
        .expect("pre-turn context apply should succeed");
    assert_eq!(
        session_context_handle.ticks().len(),
        baseline_ticks,
        "pre-commit context apply must not advance realtime projection freshness"
    );

    // Push the projection watermark one minute past the session's updated_at,
    // so the live-session timestamp is stale relative to the projection.
    let precommit_session = service
        .export_session(session_id)
        .await
        .expect("pre-commit context session should export");
    let stale_runtime_context_ms =
        summary_updated_at_ms(precommit_session.updated_at()).saturating_add(60_000);
    session_context_handle
        .context_advanced(stale_runtime_context_ms)
        .expect("synthetic later watermark should apply");

    // Phase 2: publishing the committed events must still move freshness forward.
    service
        .publish_runtime_system_context_events(session_id, appends)
        .await
        .expect("post-commit context event publish should succeed");
    let ticks = session_context_handle.ticks();
    assert!(
        ticks.len() > baseline_ticks + 1,
        "committed runtime context event publication must advance realtime projection freshness even when the live-session updated_at is stale"
    );
    let latest_tick = ticks.last().copied().unwrap_or_default();
    assert!(
        latest_tick > stale_runtime_context_ms,
        "post-commit runtime context tick must move past the current projection watermark"
    );
}
#[tokio::test]
async fn active_turn_context_staging_does_not_wait_for_session_task_mailbox() {
    // `started` is signalled by the blocking agent; `release` lets it finish.
    let started = Arc::new(tokio::sync::Notify::new());
    let release = Arc::new(tokio::sync::Notify::new());
    let turn_state = Arc::new(RuntimeTurnStateHandle::ephemeral());
    let builder = BlockingBoundaryBuilder {
        started: Arc::clone(&started),
        release: Arc::clone(&release),
        turn_state: Some(Arc::clone(&turn_state)),
    };
    let service = Arc::new(EphemeralSessionService::new(builder, 1));

    let created = service
        .create_session(CreateSessionRequest {
            model: "blocking-boundary-model".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred blocking session should create");
    let session_id = &created.session_id;

    // Run the turn on its own task so it can sit inside the blocking agent.
    let run = tokio::spawn({
        let run_service = Arc::clone(&service);
        let run_session_id = session_id.clone();
        async move {
            let runtime = meerkat_core::service::StartTurnRuntimeSemantics::new(
                meerkat_core::types::HandlingMode::Queue,
                None,
                Vec::new(),
                Some(RuntimeTurnMetadata {
                    execution_kind: Some(RuntimeExecutionKind::ContentTurn),
                    ..Default::default()
                }),
            );
            let turn = StartTurnRequest {
                prompt: ContentInput::Text("hold the turn open".to_string()),
                system_prompt: None,
                event_tx: None,
                runtime,
            };
            run_service.start_turn(&run_session_id, turn).await
        }
    });

    let enter_deadline = std::time::Duration::from_secs(1);
    tokio::time::timeout(enter_deadline, started.notified())
        .await
        .expect("turn should enter the blocking agent");
    let active_run_id = turn_state
        .snapshot()
        .active_run_id
        .expect("blocking turn should expose active run id");

    let steer = PendingSystemContextAppend {
        content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
            "active-turn steer context".to_string(),
        ),
        source: Some("console:steer:test".to_string()),
        idempotency_key: Some("console:steer:test".to_string()),
        source_kind: meerkat_core::session::SystemContextSource::Normal,
        accepted_at: meerkat_core::time_compat::SystemTime::now(),
        peer_response_terminal: None,
    };

    // Staging must complete while the session task is still busy with the turn.
    let staging = service.stage_runtime_system_context_for_active_turn(
        session_id,
        &active_run_id,
        vec![steer],
    );
    tokio::time::timeout(std::time::Duration::from_millis(200), staging)
        .await
        .expect("active-turn staging must not wait for the busy session task")
        .expect("active-turn staging should succeed");

    let state = service
        .system_context_state(session_id)
        .await
        .expect("session state should exist");
    let pending = state.snapshot().pending().to_vec();
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[0].content.render_text(), "active-turn steer context");

    release.notify_waiters();
    run.await
        .expect("start_turn task should join")
        .expect("blocked turn should complete after release");

    // The staged context never reached an LLM boundary, so it is dropped cleanly.
    let after = state.snapshot();
    assert!(
        after.pending().is_empty(),
        "active-turn context that missed the run boundary must not leak into the next run"
    );
    assert!(
        after.applied().is_empty(),
        "missed active-turn context was never applied at an LLM boundary"
    );
    assert!(
        after.seen().is_empty(),
        "discarded active-turn context should not reserve idempotency keys"
    );
}
#[tokio::test]
async fn active_turn_context_staging_revalidates_after_positive_probe() {
    // The test drives the run phase directly through the shared turn-state handle.
    let turn_state = Arc::new(RuntimeTurnStateHandle::ephemeral());
    let builder = BoundaryPhaseProbeBuilder {
        turn_state: Arc::clone(&turn_state),
    };
    let service = EphemeralSessionService::new(builder, 1);

    let created = service
        .create_session(CreateSessionRequest {
            model: "boundary-phase-probe".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred boundary probe session should create");
    let session_id = &created.session_id;

    let run_id = RunId::new();
    turn_state
        .start_conversation_run(
            run_id.clone(),
            TurnPrimitiveKind::ConversationTurn,
            ContentShape::Conversation,
            false,
            false,
            0,
        )
        .expect("start active run");

    // The probe first reports that a boundary is available ...
    let boundary_available = service
        .active_turn_system_context_boundary_available(session_id)
        .await
        .expect("boundary probe should succeed");
    assert_eq!(boundary_available, Some(true));

    // ... then the run moves past it before staging is attempted.
    turn_state
        .primitive_applied(run_id.clone())
        .expect("advance past boundary");

    let stale_steer = PendingSystemContextAppend {
        content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
            "STALE_ACTIVE_STEER_SHOULD_NOT_STAGE".to_string(),
        ),
        source: Some("console:steer:stale".to_string()),
        idempotency_key: Some("console:steer:stale".to_string()),
        source_kind: meerkat_core::session::SystemContextSource::Normal,
        accepted_at: meerkat_core::time_compat::SystemTime::now(),
        peer_response_terminal: None,
    };
    let err = service
        .stage_runtime_system_context_for_active_turn(session_id, &run_id, vec![stale_steer])
        .await
        .expect_err("staging must revalidate the active turn phase");
    assert!(
        matches!(err, SessionError::NotRunning { .. }),
        "unexpected error: {err}"
    );

    // The rejected append must leave no trace in system-context state.
    let state = service
        .system_context_state(session_id)
        .await
        .expect("session state should exist");
    let snapshot = state.snapshot();
    assert!(snapshot.pending().is_empty());
    assert!(snapshot.applied().is_empty());
    assert!(snapshot.active_turn_pending_keys().is_empty());
    assert!(
        snapshot.seen().is_empty(),
        "discarded stale active-turn context should not reserve idempotency keys"
    );
}
#[tokio::test]
async fn active_turn_context_staging_rejects_wrong_run_token() {
    let turn_state = Arc::new(RuntimeTurnStateHandle::ephemeral());
    let builder = BoundaryPhaseProbeBuilder {
        turn_state: Arc::clone(&turn_state),
    };
    let service = EphemeralSessionService::new(builder, 1);

    let created = service
        .create_session(CreateSessionRequest {
            model: "boundary-phase-probe".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred boundary probe session should create");
    let session_id = &created.session_id;

    // Start a real active run, then stage against a different, unrelated run id.
    turn_state
        .start_conversation_run(
            RunId::new(),
            TurnPrimitiveKind::ConversationTurn,
            ContentShape::Conversation,
            false,
            false,
            0,
        )
        .expect("start active run");
    let wrong_run_id = RunId::new();

    let misdirected_steer = PendingSystemContextAppend {
        content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
            "WRONG_RUN_STEER_SHOULD_NOT_STAGE".to_string(),
        ),
        source: Some("console:steer:wrong-run".to_string()),
        idempotency_key: Some("console:steer:wrong-run".to_string()),
        source_kind: meerkat_core::session::SystemContextSource::Normal,
        accepted_at: meerkat_core::time_compat::SystemTime::now(),
        peer_response_terminal: None,
    };
    let err = service
        .stage_runtime_system_context_for_active_turn(
            session_id,
            &wrong_run_id,
            vec![misdirected_steer],
        )
        .await
        .expect_err("staging must be bound to the active run token");
    assert!(
        matches!(err, SessionError::NotRunning { .. }),
        "unexpected error: {err}"
    );

    let state = service
        .system_context_state(session_id)
        .await
        .expect("session state should exist");
    let snapshot = state.snapshot();
    assert!(snapshot.pending().is_empty());
    assert!(snapshot.active_turn_pending_keys().is_empty());
    assert!(snapshot.seen().is_empty());
}
#[tokio::test]
async fn active_turn_context_staging_is_atomic_on_batch_conflict() {
    // The run phase is driven directly through the shared turn-state handle.
    let turn_state = Arc::new(RuntimeTurnStateHandle::ephemeral());
    let service = EphemeralSessionService::new(
        BoundaryPhaseProbeBuilder {
            turn_state: Arc::clone(&turn_state),
        },
        1,
    );
    // Use the same model label and expectation wording as the sibling
    // `BoundaryPhaseProbeBuilder` tests; this test does not use the blocking builder.
    let result = service
        .create_session(CreateSessionRequest {
            model: "boundary-phase-probe".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred boundary probe session should create");
    let run_id = RunId::new();
    turn_state
        .start_conversation_run(
            run_id.clone(),
            TurnPrimitiveKind::ConversationTurn,
            ContentShape::Conversation,
            false,
            false,
            0,
        )
        .expect("start active run");
    // Two appends share one idempotency key but carry different content, so
    // the batch conflicts with itself and must be rejected as a whole.
    let accepted_at = meerkat_core::time_compat::SystemTime::now();
    let err = service
        .stage_runtime_system_context_for_active_turn(
            &result.session_id,
            &run_id,
            vec![
                PendingSystemContextAppend {
                    content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
                        "first staged context".to_string(),
                    ),
                    source: Some("console:steer:test".to_string()),
                    idempotency_key: Some("console:steer:conflict".to_string()),
                    source_kind: meerkat_core::session::SystemContextSource::Normal,
                    accepted_at,
                    peer_response_terminal: None,
                },
                PendingSystemContextAppend {
                    content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
                        "conflicting staged context".to_string(),
                    ),
                    source: Some("console:steer:test".to_string()),
                    idempotency_key: Some("console:steer:conflict".to_string()),
                    source_kind: meerkat_core::session::SystemContextSource::Normal,
                    accepted_at,
                    peer_response_terminal: None,
                },
            ],
        )
        .await
        .expect_err("conflicting batch should fail");
    assert!(
        err.to_string().contains("idempotency conflict"),
        "unexpected error: {err}"
    );
    let state = service
        .system_context_state(&result.session_id)
        .await
        .expect("session state should exist");
    let guard = state.snapshot();
    // Atomicity: no part of the rejected batch may survive in any view of the
    // state (same checks the wrong-run and stale-phase tests make).
    assert!(
        guard.pending().is_empty(),
        "failed active-turn staging batch must not leave partial pending context"
    );
    assert!(
        guard.applied().is_empty(),
        "failed active-turn staging batch must not apply any context"
    );
    assert!(
        guard.active_turn_pending_keys().is_empty(),
        "failed active-turn staging batch must not leave active-turn pending keys"
    );
    assert!(
        guard.seen().is_empty(),
        "failed active-turn staging batch must not reserve idempotency keys"
    );
}
#[tokio::test]
async fn start_turn_does_not_apply_pre_turn_context_when_setup_fails() {
    // `fail_flow_overlay_set` makes the probe reject any flow-tool overlay.
    let observed_context_texts = Arc::new(Mutex::new(Vec::new()));
    let run_context_counts = Arc::new(Mutex::new(Vec::new()));
    let builder = MetadataProbeBuilder {
        observed_skill_references: Arc::new(Mutex::new(Vec::new())),
        observed_context_texts: Arc::clone(&observed_context_texts),
        run_context_counts: Arc::clone(&run_context_counts),
        fail_flow_overlay_set: true,
        session_context_handle: None,
    };
    let service = EphemeralSessionService::new(builder, 1);

    let created = service
        .create_session(CreateSessionRequest {
            model: "metadata-probe-model".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build: Some(SessionBuildOptions::default()),
            labels: None,
        })
        .await
        .expect("deferred session should create");

    let overlay = TurnToolOverlay {
        allowed_tools: Some(vec!["flow_tool".into()]),
        blocked_tools: None,
        dispatch_context: Default::default(),
    };
    let guarded_context = PendingSystemContextAppend {
        content: meerkat_core::lifecycle::run_primitive::CoreRenderable::text(
            "must not leak before setup succeeds".to_string(),
        ),
        source: Some("peer_response_terminal:test:req".to_string()),
        idempotency_key: Some("peer_response_terminal:test:req".to_string()),
        source_kind: meerkat_core::session::SystemContextSource::Normal,
        accepted_at: meerkat_core::time_compat::SystemTime::now(),
        peer_response_terminal: None,
    };
    let runtime = meerkat_core::service::StartTurnRuntimeSemantics::new(
        meerkat_core::types::HandlingMode::Queue,
        Some(overlay),
        vec![guarded_context],
        Some(RuntimeTurnMetadata {
            execution_kind: Some(RuntimeExecutionKind::ContentTurn),
            ..Default::default()
        }),
    );
    let turn = StartTurnRequest {
        prompt: ContentInput::Text("reaction".to_string()),
        system_prompt: None,
        event_tx: None,
        runtime,
    };
    let error = service
        .start_turn(&created.session_id, turn)
        .await
        .expect_err("flow overlay setup should fail");
    assert!(
        error.to_string().contains("synthetic flow overlay failure"),
        "unexpected error: {error}"
    );

    // Neither the context nor the agent run may be observed after the failure.
    let no_context_seen = observed_context_texts
        .lock()
        .expect("observed context texts lock poisoned")
        .is_empty();
    assert!(
        no_context_seen,
        "pre-turn context must not be visible when setup fails before run"
    );
    let no_run_started = run_context_counts
        .lock()
        .expect("run context counts lock poisoned")
        .is_empty();
    assert!(
        no_run_started,
        "agent run must not start after setup failure"
    );
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod admission_window_tests {
    //! Interrupt and boundary-cancel handling around turn admission: requests
    //! that land after a turn has been admitted but before (or while) the probe
    //! agent runs it.

    use super::*;
    use async_trait::async_trait;
    use meerkat_core::service::{
        InitialTurnPolicy, SessionBuildOptions, SessionService, StartTurnRequest,
    };
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    /// Cell through which a test hands a session's admission slot to the probe agent.
    type AdmissionSlotCell = Arc<Mutex<Option<Arc<Mutex<TurnAdmissionSlot>>>>>;

    /// OpenAI identity for `model` with every optional field left unset.
    fn test_llm_identity(model: &str) -> SessionLlmIdentity {
        let model = model.to_owned();
        SessionLlmIdentity {
            model,
            provider: meerkat_core::Provider::OpenAI,
            self_hosted_server_id: None,
            provider_params: None,
            auth_binding: None,
        }
    }

    /// Builder that shares its counters, boundary-cancel sender and admission
    /// cell with every agent it builds.
    #[derive(Clone)]
    struct AdmissionProbeBuilder {
        run_calls: Arc<AtomicUsize>,
        cancel_calls: Arc<AtomicUsize>,
        cancel_after_boundary_tx: CancelAfterBoundarySender,
        turn_admission_for_run: AdmissionSlotCell,
        interrupt_before_success: bool,
    }

    /// Agent that counts runs and hard cancels. When `interrupt_before_success`
    /// is set, it requests an interrupt on the installed admission slot
    /// before returning its successful result.
    struct AdmissionProbeAgent {
        session_id: SessionId,
        identity: SessionLlmIdentity,
        run_calls: Arc<AtomicUsize>,
        cancel_calls: Arc<AtomicUsize>,
        cancel_after_boundary_tx: CancelAfterBoundarySender,
        turn_admission_for_run: AdmissionSlotCell,
        interrupt_before_success: bool,
        system_context_state: meerkat_core::SystemContextStateHandle,
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl SessionAgentBuilder for AdmissionProbeBuilder {
        type Agent = AdmissionProbeAgent;

        async fn build_agent(
            &self,
            req: &CreateSessionRequest,
            _event_tx: mpsc::Sender<AgentEvent>,
        ) -> Result<Self::Agent, SessionError> {
            let system_context_state = meerkat_core::SystemContextStateHandle::new(
                SessionSystemContextState::default(),
            )
            .expect("default system-context state should restore");
            Ok(AdmissionProbeAgent {
                session_id: SessionId::new(),
                identity: test_llm_identity(&req.model),
                run_calls: Arc::clone(&self.run_calls),
                cancel_calls: Arc::clone(&self.cancel_calls),
                cancel_after_boundary_tx: self.cancel_after_boundary_tx.clone(),
                turn_admission_for_run: Arc::clone(&self.turn_admission_for_run),
                interrupt_before_success: self.interrupt_before_success,
                system_context_state,
            })
        }
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl SessionAgent for AdmissionProbeAgent {
        async fn run_with_events(
            &mut self,
            _prompt: ContentInput,
            _event_tx: mpsc::Sender<AgentEvent>,
        ) -> Result<RunResult, AgentError> {
            if self.interrupt_before_success {
                // Raise an interrupt while the run is still "in flight".
                let installed = self
                    .turn_admission_for_run
                    .lock()
                    .expect("turn admission probe lock poisoned")
                    .clone();
                let admission = installed.expect("turn admission probe installed");
                let mut admission_slot = lock_turn_admission(&admission);
                admission_slot
                    .request_interrupt()
                    .expect("running turn should accept interrupt probe");
            }
            self.run_calls.fetch_add(1, Ordering::SeqCst);
            let session_id = self.session_id.clone();
            Ok(RunResult {
                text: String::from("ran"),
                session_id,
                usage: Usage::default(),
                turns: 1,
                tool_calls: 0,
                terminal_cause_kind: None,
                structured_output: None,
                extraction_error: None,
                schema_warnings: None,
                skill_diagnostics: None,
            })
        }

        fn set_skill_references(&mut self, _refs: Option<Vec<meerkat_core::skills::SkillKey>>) {}

        fn set_flow_tool_overlay(
            &mut self,
            _overlay: Option<TurnToolOverlay>,
        ) -> Result<(), AgentError> {
            Ok(())
        }

        fn hot_swap_llm_identity(
            &mut self,
            _client: Arc<dyn meerkat_core::AgentLlmClient>,
            _identity: SessionLlmIdentity,
            _request_policy: meerkat_core::SessionLlmRequestPolicy,
        ) -> Result<(), AgentError> {
            Ok(())
        }

        /// Hard cancel: only counted, so tests can assert how often it fired.
        fn cancel(&mut self) {
            self.cancel_calls.fetch_add(1, Ordering::SeqCst);
        }

        /// Exposes the test-owned boundary-cancel sender so the receiver side
        /// can observe forwarded commands.
        fn cancel_after_boundary_handle(&self) -> Option<CancelAfterBoundarySender> {
            let sender = self.cancel_after_boundary_tx.clone();
            Some(sender)
        }

        fn session_id(&self) -> SessionId {
            self.session_id.clone()
        }

        fn snapshot(&self) -> SessionSnapshot {
            SessionSnapshot {
                created_at: SystemTime::now(),
                updated_at: SystemTime::now(),
                message_count: 0,
                total_tokens: 0,
                usage: Usage::default(),
                last_assistant_text: None,
            }
        }

        fn session_clone(&self) -> Result<meerkat_core::Session, SystemContextStateError> {
            Ok(meerkat_core::Session::new())
        }

        fn observed_session_tail(&self) -> ObservedSessionTailKind {
            ObservedSessionTailKind::Empty
        }

        fn durable_llm_identity(&self) -> Option<SessionLlmIdentity> {
            Some(self.identity.clone())
        }

        fn apply_runtime_system_context(&mut self, _appends: &[PendingSystemContextAppend]) {}

        fn system_context_state(&self) -> meerkat_core::SystemContextStateHandle {
            self.system_context_state.clone()
        }
    }

    /// Deferred-session request whose initial prompt is discarded.
    fn create_request() -> CreateSessionRequest {
        let build = Some(SessionBuildOptions::default());
        CreateSessionRequest {
            model: "admission-window-test".to_string(),
            prompt: ContentInput::Text("defer".to_string()),
            system_prompt: meerkat_core::SystemPromptOverride::Inherit,
            max_tokens: None,
            event_tx: None,
            initial_turn: InitialTurnPolicy::Defer,
            deferred_prompt_policy: DeferredPromptPolicy::Discard,
            build,
            labels: None,
        }
    }

    /// Plain "go" turn with default runtime semantics.
    fn start_turn_request() -> StartTurnRequest {
        let runtime = meerkat_core::service::StartTurnRuntimeSemantics::default();
        StartTurnRequest {
            prompt: ContentInput::Text("go".to_string()),
            system_prompt: None,
            event_tx: None,
            runtime,
        }
    }

    /// Builder with an empty admission cell and no pre-success interrupt.
    fn probe_builder(
        run_calls: Arc<AtomicUsize>,
        cancel_calls: Arc<AtomicUsize>,
        cancel_after_boundary_tx: CancelAfterBoundarySender,
    ) -> AdmissionProbeBuilder {
        let turn_admission_for_run: AdmissionSlotCell = Arc::new(Mutex::new(None));
        AdmissionProbeBuilder {
            run_calls,
            cancel_calls,
            cancel_after_boundary_tx,
            turn_admission_for_run,
            interrupt_before_success: false,
        }
    }

    /// Creates a deferred session and admits a turn on it without delivering
    /// the start-turn command; returns the session id and its command sender.
    async fn create_admitted_session(
        service: &EphemeralSessionService<AdmissionProbeBuilder>,
    ) -> (SessionId, mpsc::Sender<SessionCommand>) {
        let created = service
            .create_session(create_request())
            .await
            .expect("create deferred session");
        let command_tx = {
            let sessions = service.sessions.read().await;
            let session_handle = sessions
                .get(&created.session_id)
                .expect("session handle");
            EphemeralSessionService::<AdmissionProbeBuilder>::request_start_turn(
                &created.session_id,
                session_handle,
            )
            .expect("admit turn before command delivery");
            session_handle.command_tx.clone()
        };
        (created.session_id, command_tx)
    }

    /// Sends a `StartTurn` command straight to the session task and waits for
    /// its result.
    async fn deliver_start_turn(
        command_tx: mpsc::Sender<SessionCommand>,
    ) -> Result<RunResult, AgentError> {
        let StartTurnRequest {
            prompt,
            runtime,
            event_tx,
            ..
        } = start_turn_request();
        let (result_tx, result_rx) = oneshot::channel();
        let command = SessionCommand::StartTurn {
            prompt,
            runtime: Box::new(runtime),
            event_tx,
            result_tx,
            active_admission: None,
        };
        command_tx.send(command).await.expect("send start turn");
        result_rx.await.expect("receive start turn result")
    }

    #[tokio::test]
    async fn hard_interrupt_during_admission_cancels_before_agent_poll() {
        let run_calls = Arc::new(AtomicUsize::new(0));
        let cancel_calls = Arc::new(AtomicUsize::new(0));
        // Bind the receiver so the channel stays open for the whole test.
        let (boundary_tx, _boundary_rx) = tokio::sync::mpsc::unbounded_channel();
        let builder = probe_builder(
            Arc::clone(&run_calls),
            Arc::clone(&cancel_calls),
            boundary_tx,
        );
        let service = EphemeralSessionService::new(builder, 1);
        let (session_id, command_tx) = create_admitted_session(&service).await;

        service
            .interrupt(&session_id)
            .await
            .expect("admitted turn accepts hard interrupt");
        let outcome = deliver_start_turn(command_tx).await;

        assert!(matches!(outcome, Err(AgentError::Cancelled)));
        assert_eq!(run_calls.load(Ordering::SeqCst), 0);
        assert_eq!(cancel_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn boundary_cancel_during_admission_delivers_command_to_agent() {
        let run_calls = Arc::new(AtomicUsize::new(0));
        let (boundary_tx, mut boundary_rx) = tokio::sync::mpsc::unbounded_channel();
        let builder = probe_builder(
            Arc::clone(&run_calls),
            Arc::new(AtomicUsize::new(0)),
            boundary_tx,
        );
        let service = EphemeralSessionService::new(builder, 1);
        let (session_id, command_tx) = create_admitted_session(&service).await;

        service
            .cancel_after_boundary(&session_id)
            .await
            .expect("admitted turn accepts boundary cancel");
        // The command is already queued once `cancel_after_boundary` returns ...
        assert!(matches!(
            boundary_rx.try_recv(),
            Ok(CancelAfterBoundaryCommand)
        ));

        let run = deliver_start_turn(command_tx)
            .await
            .expect("start turn should run cooperatively");
        assert_eq!(run.text, "ran");
        assert_eq!(run_calls.load(Ordering::SeqCst), 1);
        // ... and nothing further is queued after the turn runs.
        assert!(matches!(
            boundary_rx.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        ));
    }

    #[tokio::test]
    async fn hard_interrupt_pending_when_run_result_is_ready_wins_over_success() {
        let run_calls = Arc::new(AtomicUsize::new(0));
        let cancel_calls = Arc::new(AtomicUsize::new(0));
        let turn_admission_for_run: AdmissionSlotCell = Arc::new(Mutex::new(None));
        let (boundary_tx, _boundary_rx) = tokio::sync::mpsc::unbounded_channel();
        let builder = AdmissionProbeBuilder {
            turn_admission_for_run: Arc::clone(&turn_admission_for_run),
            interrupt_before_success: true,
            ..probe_builder(
                Arc::clone(&run_calls),
                Arc::clone(&cancel_calls),
                boundary_tx,
            )
        };
        let service = EphemeralSessionService::new(builder, 1);
        let created = service
            .create_session(create_request())
            .await
            .expect("create deferred session");

        // Give the agent access to this session's admission slot.
        {
            let sessions = service.sessions.read().await;
            let session_handle = sessions
                .get(&created.session_id)
                .expect("session handle");
            let mut installed = turn_admission_for_run
                .lock()
                .expect("turn admission probe lock poisoned");
            *installed = Some(Arc::clone(&session_handle.turn_admission));
        }

        let outcome = service
            .start_turn(&created.session_id, start_turn_request())
            .await;
        assert!(matches!(
            outcome,
            Err(SessionError::Agent(AgentError::Cancelled))
        ));
        assert_eq!(run_calls.load(Ordering::SeqCst), 1);
        assert_eq!(cancel_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn boundary_cancel_on_aborted_admission_delivers_command_once() {
        let run_calls = Arc::new(AtomicUsize::new(0));
        let (boundary_tx, mut boundary_rx) = tokio::sync::mpsc::unbounded_channel();
        let builder = probe_builder(
            Arc::clone(&run_calls),
            Arc::new(AtomicUsize::new(0)),
            boundary_tx,
        );
        let service = EphemeralSessionService::new(builder, 1);
        let (session_id, _command_tx) = create_admitted_session(&service).await;

        service
            .cancel_after_boundary(&session_id)
            .await
            .expect("admitted turn accepts boundary cancel");
        assert!(matches!(
            boundary_rx.try_recv(),
            Ok(CancelAfterBoundaryCommand)
        ));

        // Aborting the admission must not re-send the boundary cancel.
        {
            let sessions = service.sessions.read().await;
            let session_handle = sessions.get(&session_id).expect("session handle");
            EphemeralSessionService::<AdmissionProbeBuilder>::try_abort_admitted_turn(
                session_handle,
            );
        }
        assert!(matches!(
            boundary_rx.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        ));

        let next_run = service
            .start_turn(&session_id, start_turn_request())
            .await
            .expect("next turn should run");
        assert_eq!(next_run.text, "ran");
        assert_eq!(run_calls.load(Ordering::SeqCst), 1);
    }
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod inline_video_admission_tests {
use super::*;
use async_trait::async_trait;
use meerkat_core::Provider;
use meerkat_core::service::{
DeferredPromptPolicy, InitialTurnPolicy, SessionBuildOptions, SessionService,
StartTurnRequest,
};
use meerkat_core::types::{ContentBlock, VideoData};
use std::sync::{Arc, Mutex};
/// Builds a [`SessionLlmIdentity`] for `provider` and `model`, leaving every
/// optional field unset.
fn identity(provider: Provider, model: &str) -> SessionLlmIdentity {
    let model = String::from(model);
    SessionLlmIdentity {
        provider,
        model,
        self_hosted_server_id: None,
        provider_params: None,
        auth_binding: None,
    }
}
/// Two-block prompt: a text instruction followed by a 1-second (`duration_ms`
/// 1_000) `video/mp4` clip carried inline.
fn inline_video_prompt() -> ContentInput {
    let instruction = ContentBlock::Text {
        text: "describe this".to_string(),
    };
    let clip = ContentBlock::Video {
        media_type: "video/mp4".to_string(),
        duration_ms: 1_000,
        data: VideoData::Inline {
            data: "AAAA".to_string(),
        },
    };
    ContentInput::Blocks(vec![instruction, clip])
}
/// Agent builder that records every identity it is asked about for inline
/// video and reports support only for its configured `identity`.
struct BuilderIdentityProbe {
    /// Every identity passed to the inline-video capability query, in call order.
    validated_identities: Arc<Mutex<Vec<SessionLlmIdentity>>>,
    /// Identity given to built agents; the only identity reported as video-capable.
    identity: Option<SessionLlmIdentity>,
}
/// Minimal agent whose durable identity is whatever the builder set or the
/// most recent hot swap installed.
struct BuilderIdentityAgent {
    identity: Option<SessionLlmIdentity>,
    system_context_state: meerkat_core::SystemContextStateHandle,
    session_id: SessionId,
}
/// LLM client stub that only reports a model name; streaming always errors.
struct NoopAgentLlmClient {
    model: String,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl meerkat_core::AgentLlmClient for NoopAgentLlmClient {
async fn stream_response(
&self,
_messages: &[meerkat_core::Message],
_tools: &[Arc<meerkat_core::ToolDef>],
_max_tokens: u32,
_temperature: Option<f32>,
_provider_params: Option<
&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
>,
) -> Result<meerkat_core::LlmStreamResult, AgentError> {
Err(AgentError::ConfigError(
"noop test client should not be called".to_string(),
))
}
fn provider(&self) -> meerkat_core::Provider {
meerkat_core::Provider::Other
}
fn model(&self) -> &str {
&self.model
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgentBuilder for BuilderIdentityProbe {
    type Agent = BuilderIdentityAgent;

    /// Records `identity`, then answers `Some(true)` only when it equals the
    /// configured identity (`Some(false)` otherwise, including when none is set).
    async fn model_supports_inline_video(&self, identity: &SessionLlmIdentity) -> Option<bool> {
        {
            let mut seen = self
                .validated_identities
                .lock()
                .expect("validated identities lock poisoned");
            seen.push(identity.clone());
        }
        let supported = match self.identity.as_ref() {
            Some(expected) => identity == expected,
            None => false,
        };
        Some(supported)
    }

    /// Builds an agent carrying the configured identity and a fresh, default
    /// system-context state.
    async fn build_agent(
        &self,
        _req: &CreateSessionRequest,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<Self::Agent, SessionError> {
        let system_context_state =
            meerkat_core::SystemContextStateHandle::new(Default::default())
                .expect("default system-context state should restore");
        let agent = BuilderIdentityAgent {
            session_id: SessionId::new(),
            identity: self.identity.clone(),
            system_context_state,
        };
        Ok(agent)
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl SessionAgent for BuilderIdentityAgent {
    /// Returns a fixed single-turn "ok" result without calling any model.
    async fn run_with_events(
        &mut self,
        _prompt: ContentInput,
        _event_tx: mpsc::Sender<AgentEvent>,
    ) -> Result<RunResult, AgentError> {
        let session_id = self.session_id.clone();
        Ok(RunResult {
            text: String::from("ok"),
            session_id,
            usage: Usage::default(),
            turns: 1,
            tool_calls: 0,
            terminal_cause_kind: None,
            structured_output: None,
            extraction_error: None,
            schema_warnings: None,
            skill_diagnostics: None,
        })
    }

    fn set_skill_references(&mut self, _refs: Option<Vec<meerkat_core::skills::SkillKey>>) {}

    fn set_flow_tool_overlay(
        &mut self,
        _overlay: Option<TurnToolOverlay>,
    ) -> Result<(), AgentError> {
        Ok(())
    }

    /// Adopts the swapped-in identity; the client and request policy are ignored.
    fn hot_swap_llm_identity(
        &mut self,
        _client: Arc<dyn meerkat_core::AgentLlmClient>,
        identity: SessionLlmIdentity,
        _request_policy: meerkat_core::SessionLlmRequestPolicy,
    ) -> Result<(), AgentError> {
        self.identity = Some(identity);
        Ok(())
    }

    fn cancel(&mut self) {}

    fn session_id(&self) -> SessionId {
        self.session_id.clone()
    }

    fn snapshot(&self) -> SessionSnapshot {
        SessionSnapshot {
            created_at: SystemTime::now(),
            updated_at: SystemTime::now(),
            message_count: 0,
            total_tokens: 0,
            usage: Usage::default(),
            last_assistant_text: None,
        }
    }

    fn session_clone(&self) -> Result<meerkat_core::Session, SystemContextStateError> {
        Ok(meerkat_core::Session::new())
    }

    fn observed_session_tail(&self) -> ObservedSessionTailKind {
        ObservedSessionTailKind::Empty
    }

    /// Reports the builder-assigned or most recently hot-swapped identity.
    fn durable_llm_identity(&self) -> Option<SessionLlmIdentity> {
        self.identity.clone()
    }

    fn apply_runtime_system_context(&mut self, _appends: &[PendingSystemContextAppend]) {}

    fn system_context_state(&self) -> meerkat_core::SystemContextStateHandle {
        self.system_context_state.clone()
    }
}
/// Builds a `CreateSessionRequest` for the `providerless-video-alias` model
/// with the given prompt and initial-turn policy. The request inherits the
/// system prompt, discards deferred prompts, and uses default build options.
fn create_request(
    prompt: ContentInput,
    initial_turn: InitialTurnPolicy,
) -> CreateSessionRequest {
    let model = String::from("providerless-video-alias");
    let build = Some(SessionBuildOptions::default());
    CreateSessionRequest {
        model,
        prompt,
        initial_turn,
        build,
        system_prompt: meerkat_core::SystemPromptOverride::Inherit,
        deferred_prompt_policy: DeferredPromptPolicy::Discard,
        max_tokens: None,
        event_tx: None,
        labels: None,
    }
}
/// Builds a `StartTurnRequest` for `prompt` with no system-prompt override, no
/// event channel, and default runtime semantics.
fn start_turn_request(prompt: ContentInput) -> StartTurnRequest {
    let runtime = meerkat_core::service::StartTurnRuntimeSemantics::default();
    StartTurnRequest {
        runtime,
        prompt,
        event_tx: None,
        system_prompt: None,
    }
}
/// When the model capability is `false`, inline video must be rejected with a
/// config error naming the unsupported model, even for a Gemini identity.
#[test]
fn provider_gemini_capability_false_rejects_inline_video() {
    let result = validate_prompt_video_input_against_capability(
        &inline_video_prompt(),
        &identity(Provider::Gemini, "gemini-3.5-flash"),
        false,
    );
    // Match explicitly so a failure reports what actually came back instead of
    // a bare `assertion failed`.
    let message = match result {
        Err(SessionError::Agent(AgentError::ConfigError(message))) => message,
        Err(other) => panic!("expected AgentError::ConfigError, got {other:?}"),
        Ok(_) => panic!("inline video should be rejected when the capability is false"),
    };
    assert!(
        message.contains("not supported by model"),
        "unexpected rejection message: {message}"
    );
}
/// When the model capability is `true`, inline video is accepted, even for a
/// non-Gemini identity.
#[test]
fn provider_not_gemini_capability_true_accepts_inline_video() {
    let result = validate_prompt_video_input_against_capability(
        &inline_video_prompt(),
        &identity(Provider::OpenAI, "openai-video-capable-test-model"),
        true,
    );
    // Surface the rejection reason rather than a bare `is_ok()` failure.
    if let Err(error) = result {
        panic!("capability true should accept inline video, got {error:?}");
    }
}
/// Creating a session with an inline-video prompt validates the video against
/// the identity the builder stamped on the agent. That identity also becomes
/// the session's live identity.
#[tokio::test]
async fn create_session_validates_initial_video_against_builder_identity() {
    let durable_identity = identity(Provider::Gemini, "providerless-video-alias");
    let validated_identities = Arc::new(Mutex::new(Vec::new()));
    let probe = BuilderIdentityProbe {
        identity: Some(durable_identity.clone()),
        validated_identities: Arc::clone(&validated_identities),
    };
    let service = EphemeralSessionService::new(probe, 1);

    let request = create_request(inline_video_prompt(), InitialTurnPolicy::Defer);
    let created = service
        .create_session(request)
        .await
        .expect("builder-owned Gemini identity should allow inline video");

    let live_identity = service
        .live_session_llm_identity(&created.session_id)
        .await
        .expect("live identity");
    assert_eq!(live_identity, durable_identity);

    let recorded = validated_identities
        .lock()
        .expect("validated identities lock poisoned")
        .clone();
    assert_eq!(recorded, vec![durable_identity]);
}
/// Without a builder-provided identity, session creation must fail with a
/// config error about the missing durable LLM identity, and no identity is
/// ever validated.
#[tokio::test]
async fn create_session_rejects_without_builder_identity() {
    let validated_identities = Arc::new(Mutex::new(Vec::new()));
    let service = EphemeralSessionService::new(
        BuilderIdentityProbe {
            identity: None,
            validated_identities: Arc::clone(&validated_identities),
        },
        1,
    );
    let result = service
        .create_session(create_request(
            ContentInput::Text("defer".to_string()),
            InitialTurnPolicy::Defer,
        ))
        .await;
    // Match explicitly so a failure reports the error that actually came back.
    let message = match result {
        Err(SessionError::Agent(AgentError::ConfigError(message))) => message,
        Err(other) => panic!("expected AgentError::ConfigError, got {other:?}"),
        Ok(_) => panic!("session creation should fail without a builder identity"),
    };
    assert!(
        message.contains("durable LLM identity"),
        "unexpected rejection message: {message}"
    );
    let recorded = validated_identities
        .lock()
        .expect("validated identities lock poisoned");
    assert!(
        recorded.is_empty(),
        "no identity should be validated, got {:?}",
        *recorded
    );
}
/// A later inline-video turn on a deferred session is validated against the
/// live identity the builder seeded at creation.
#[tokio::test]
async fn start_turn_validates_video_against_builder_seeded_live_identity() {
    let durable_identity = identity(Provider::Gemini, "providerless-video-alias");
    let validated_identities = Arc::new(Mutex::new(Vec::new()));
    let probe = BuilderIdentityProbe {
        identity: Some(durable_identity.clone()),
        validated_identities: Arc::clone(&validated_identities),
    };
    let service = EphemeralSessionService::new(probe, 1);

    let defer_prompt = ContentInput::Text("defer".to_string());
    let created = service
        .create_session(create_request(defer_prompt, InitialTurnPolicy::Defer))
        .await
        .expect("create session");

    // Drop anything logged during creation so only the turn's check remains.
    validated_identities
        .lock()
        .expect("validated identities lock poisoned")
        .clear();

    let turn = start_turn_request(inline_video_prompt());
    service
        .start_turn(&created.session_id, turn)
        .await
        .expect("builder-seeded live identity should allow inline video turn");

    let recorded = validated_identities
        .lock()
        .expect("validated identities lock poisoned")
        .clone();
    assert_eq!(recorded, vec![durable_identity]);
}
/// Hot-swapping a session's LLM identity replaces the identity the builder
/// seeded. Afterwards the live identity reports the swap target.
#[tokio::test]
async fn hot_swap_replaces_builder_seeded_live_identity() {
    let durable_identity = identity(Provider::Gemini, "providerless-video-alias");
    let validated_identities = Arc::new(Mutex::new(Vec::new()));
    let service = EphemeralSessionService::new(
        BuilderIdentityProbe {
            identity: Some(durable_identity.clone()),
            validated_identities,
        },
        1,
    );
    let result = service
        .create_session(create_request(
            ContentInput::Text("defer".to_string()),
            InitialTurnPolicy::Defer,
        ))
        .await
        .expect("create session");

    // Pin the pre-swap state. Without it, the test would only observe the
    // final value and could not show that a replacement happened.
    let seeded_identity = service
        .live_session_llm_identity(&result.session_id)
        .await
        .expect("seeded live identity");
    assert_eq!(seeded_identity, durable_identity);

    let target_identity = identity(Provider::OpenAI, "gpt-5.4");
    assert_ne!(
        target_identity, durable_identity,
        "swap target must differ from the seeded identity"
    );
    service
        .hot_swap_session_llm_identity(
            &result.session_id,
            Arc::new(NoopAgentLlmClient {
                model: target_identity.model.clone(),
            }),
            target_identity.clone(),
            meerkat_core::SessionLlmRequestPolicy {
                model: target_identity.model.clone(),
                provider_params: None,
                provider_tool_defaults: None,
            },
        )
        .await
        .expect("hot-swap should update the live identity watch");
    let live_identity = service
        .live_session_llm_identity(&result.session_id)
        .await
        .expect("live identity");
    assert_eq!(live_identity, target_identity);
}
}