use async_trait::async_trait;
use indexmap::IndexMap;
use meerkat_core::error::AgentError;
use meerkat_core::event::{AgentEvent, EventEnvelope};
use meerkat_core::image_content::{MissingBlobBehavior, hydrate_deferred_turn_state};
use meerkat_core::lifecycle::core_executor::{CoreApplyOutput, CoreApplyTerminal};
use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
use meerkat_core::service::{
AppendSystemContextRequest, AppendSystemContextResult, CreateSessionRequest,
DeferredPromptPolicy, MobToolAuthorityContext, SessionControlError, SessionError,
SessionHistoryPage, SessionHistoryQuery, SessionInfo, SessionQuery, SessionService,
SessionServiceCommsExt, SessionServiceControlExt, SessionServiceHistoryExt, SessionSummary,
SessionUsage, SessionView, StageToolResultsRequest, StageToolResultsResult, StartTurnRequest,
TurnToolOverlay,
};
use meerkat_core::time_compat::SystemTime;
use meerkat_core::types::{ContentInput, RunResult, SessionId, ToolResult, Usage};
use meerkat_core::{
InputId, PendingDeferredPrompt, PendingSystemContextAppend, PendingToolResultsMessage, RunId,
SessionDeferredTurnState, SessionLlmIdentity, SessionSystemContextState,
};
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(target_arch = "wasm32")]
use crate::tokio;
#[cfg(target_arch = "wasm32")]
use crate::tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore, mpsc, oneshot, watch};
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore, mpsc, oneshot, watch};
const EVENT_CHANNEL_CAPACITY: usize = 256;
const COMMAND_CHANNEL_CAPACITY: usize = 8;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionState {
Idle,
Running,
ShuttingDown,
}
#[derive(Debug, Clone)]
pub struct SessionSnapshot {
pub created_at: SystemTime,
pub updated_at: SystemTime,
pub message_count: usize,
pub total_tokens: u64,
pub usage: Usage,
pub last_assistant_text: Option<String>,
}
enum SessionCommand {
StartTurn {
prompt: meerkat_core::types::ContentInput,
render_metadata: Option<meerkat_core::types::RenderMetadata>,
handling_mode: meerkat_core::types::HandlingMode,
event_tx: Option<mpsc::Sender<EventEnvelope<AgentEvent>>>,
result_tx: oneshot::Sender<Result<RunResult, meerkat_core::error::AgentError>>,
skill_references: Option<Vec<meerkat_core::skills::SkillKey>>,
flow_tool_overlay: Option<TurnToolOverlay>,
},
ReplaceClient {
client: Arc<dyn meerkat_core::AgentLlmClient>,
reply_tx: oneshot::Sender<()>,
},
HotSwapLlmIdentity {
client: Arc<dyn meerkat_core::AgentLlmClient>,
identity: SessionLlmIdentity,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
StageToolFilter {
filter: meerkat_core::ToolFilter,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
ExportSession {
reply_tx: oneshot::Sender<meerkat_core::Session>,
},
ApplyRuntimeSystemContext {
appends: Vec<PendingSystemContextAppend>,
reply_tx: oneshot::Sender<()>,
},
UpdateKeepAlive {
keep_alive: bool,
reply_tx: oneshot::Sender<()>,
},
UpdateMobToolAuthority {
authority_context: Option<MobToolAuthorityContext>,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
UpdateSystemPrompt {
system_prompt: String,
reply_tx: oneshot::Sender<Result<(), meerkat_core::error::AgentError>>,
},
Shutdown,
}
#[derive(Clone)]
struct SessionSummaryCache {
updated_at: SystemTime,
message_count: usize,
total_tokens: u64,
usage: Usage,
last_assistant_text: Option<String>,
}
struct SessionHandle {
command_tx: mpsc::Sender<SessionCommand>,
state_rx: watch::Receiver<SessionState>,
summary_rx: watch::Receiver<SessionSummaryCache>,
llm_identity_rx: watch::Receiver<SessionLlmIdentity>,
turn_lock: Arc<AtomicBool>,
_capacity_permit: OwnedSemaphorePermit,
created_at: SystemTime,
labels: BTreeMap<String, String>,
event_injector: Option<Arc<dyn meerkat_core::EventInjector>>,
interaction_event_injector: Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>>,
comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
system_context_state: Arc<std::sync::Mutex<SessionSystemContextState>>,
deferred_turn_state: Arc<std::sync::Mutex<SessionDeferredTurnState>>,
interrupt_requested: Arc<AtomicBool>,
interrupt_notify: Arc<tokio::sync::Notify>,
session_event_tx: tokio::sync::broadcast::Sender<EventEnvelope<AgentEvent>>,
}
struct SessionTaskControl {
state_tx: watch::Sender<SessionState>,
summary_tx: watch::Sender<SessionSummaryCache>,
llm_identity_tx: watch::Sender<SessionLlmIdentity>,
turn_lock: Arc<AtomicBool>,
interrupt_requested: Arc<AtomicBool>,
interrupt_notify: Arc<tokio::sync::Notify>,
session_event_tx: tokio::sync::broadcast::Sender<EventEnvelope<AgentEvent>>,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionAgentBuilder: Send + Sync {
#[cfg(not(target_arch = "wasm32"))]
type Agent: SessionAgent + Send + 'static;
#[cfg(target_arch = "wasm32")]
type Agent: SessionAgent + 'static;
async fn build_agent(
&self,
req: &CreateSessionRequest,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<Self::Agent, SessionError>;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionAgent: Send {
async fn run_with_events(
&mut self,
prompt: meerkat_core::types::ContentInput,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError>;
async fn run_turn_with_events(
&mut self,
prompt: meerkat_core::types::ContentInput,
handling_mode: meerkat_core::types::HandlingMode,
render_metadata: Option<meerkat_core::types::RenderMetadata>,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
if handling_mode != meerkat_core::types::HandlingMode::Queue {
return Err(meerkat_core::error::AgentError::ConfigError(format!(
"handling_mode {handling_mode:?} requires a runtime-backed surface",
)));
}
if render_metadata.is_some() {
return Err(meerkat_core::error::AgentError::ConfigError(
"render_metadata requires a runtime-backed surface".to_string(),
));
}
self.run_with_events(prompt, event_tx).await
}
async fn run_pending_with_events(
&mut self,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"run_pending_with_events is not supported by this session agent".to_string(),
))
}
fn set_skill_references(&mut self, refs: Option<Vec<meerkat_core::skills::SkillKey>>);
fn set_flow_tool_overlay(
&mut self,
overlay: Option<TurnToolOverlay>,
) -> Result<(), meerkat_core::error::AgentError>;
fn apply_pending_tool_results(
&mut self,
results: Vec<meerkat_core::ToolResult>,
) -> Result<(), meerkat_core::error::AgentError> {
if results.is_empty() {
return Ok(());
}
Err(meerkat_core::error::AgentError::ConfigError(
"staged tool-result continuations are not supported by this session agent".to_string(),
))
}
fn replace_client(&mut self, _client: std::sync::Arc<dyn meerkat_core::AgentLlmClient>) {}
fn hot_swap_llm_identity(
&mut self,
client: std::sync::Arc<dyn meerkat_core::AgentLlmClient>,
identity: SessionLlmIdentity,
) -> Result<(), meerkat_core::error::AgentError>;
fn stage_external_tool_filter(
&mut self,
_filter: meerkat_core::ToolFilter,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn cancel(&mut self);
fn session_id(&self) -> SessionId;
fn snapshot(&self) -> SessionSnapshot;
fn session_clone(&self) -> meerkat_core::Session;
fn update_keep_alive(&mut self, _keep_alive: bool) {}
fn update_mob_tool_authority_context(
&mut self,
_authority_context: Option<MobToolAuthorityContext>,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"mob tool authority updates are not supported by this session agent".to_string(),
))
}
fn update_system_prompt(
&mut self,
_system_prompt: String,
) -> Result<(), meerkat_core::error::AgentError> {
Err(meerkat_core::error::AgentError::ConfigError(
"system_prompt override is not supported by this session agent".to_string(),
))
}
fn apply_runtime_system_context(&mut self, appends: &[PendingSystemContextAppend]);
fn system_context_state(
&self,
) -> Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>>;
fn event_injector(&self) -> Option<Arc<dyn meerkat_core::EventInjector>> {
None
}
#[doc(hidden)]
fn interaction_event_injector(
&self,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
None
}
fn comms_runtime(&self) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
None
}
}
pub struct EphemeralSessionService<B: SessionAgentBuilder> {
sessions: RwLock<IndexMap<SessionId, SessionHandle>>,
archived_views: RwLock<IndexMap<SessionId, SessionView>>,
builder: B,
max_sessions: usize,
session_capacity: Arc<Semaphore>,
session_registered: tokio::sync::Notify,
}
impl<B: SessionAgentBuilder + 'static> EphemeralSessionService<B> {
fn build_runtime_receipt(
run_id: RunId,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
session: &meerkat_core::Session,
) -> Result<RunBoundaryReceipt, SessionError> {
let encoded_messages = serde_json::to_vec(session.messages()).map_err(|err| {
SessionError::Agent(AgentError::InternalError(format!(
"failed to serialize session for runtime receipt digest: {err}"
)))
})?;
let digest = format!("{:x}", Sha256::digest(encoded_messages));
Ok(RunBoundaryReceipt {
run_id,
boundary,
contributing_input_ids,
conversation_digest: Some(digest),
message_count: session.messages().len(),
sequence: 0,
})
}
fn callback_pending_terminal(error: &SessionError) -> Option<CoreApplyTerminal> {
match error {
SessionError::Agent(AgentError::CallbackPending { tool_name, args }) => {
Some(CoreApplyTerminal::CallbackPending {
tool_name: tool_name.clone(),
args: args.clone(),
})
}
_ => None,
}
}
async fn build_runtime_output(
&self,
id: &SessionId,
run_id: RunId,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
terminal: Option<CoreApplyTerminal>,
) -> Result<CoreApplyOutput, SessionError> {
let session = self.export_session(id).await?;
let session_snapshot = serde_json::to_vec(&session).map_err(|err| {
SessionError::Agent(AgentError::InternalError(format!(
"failed to serialize session snapshot for runtime commit: {err}"
)))
})?;
let receipt =
Self::build_runtime_receipt(run_id, boundary, contributing_input_ids, &session)?;
Ok(match terminal {
Some(CoreApplyTerminal::RunResult(run_result)) => {
CoreApplyOutput::with_run_result(receipt, Some(session_snapshot), run_result)
}
Some(CoreApplyTerminal::CallbackPending { tool_name, args }) => {
CoreApplyOutput::with_callback_pending(
receipt,
Some(session_snapshot),
tool_name,
args,
)
}
None => CoreApplyOutput::without_terminal(receipt, Some(session_snapshot)),
})
}
fn provider_supports_inline_video(identity: &SessionLlmIdentity) -> bool {
identity.provider == meerkat_core::Provider::Gemini
}
fn validate_prompt_video_input(
prompt: &ContentInput,
identity: &SessionLlmIdentity,
) -> Result<(), SessionError> {
let blocks = match prompt {
ContentInput::Text(_) => return Ok(()),
ContentInput::Blocks(blocks) => blocks,
};
meerkat_core::validate_inline_video_blocks(blocks)
.map_err(|err| SessionError::Agent(AgentError::ConfigError(err)))?;
if meerkat_core::has_video(blocks) && !Self::provider_supports_inline_video(identity) {
return Err(SessionError::Agent(AgentError::ConfigError(format!(
"inline video input requires a Gemini model; current provider is '{}'",
identity.provider.as_str()
))));
}
Ok(())
}
fn validate_tool_result_video(results: &[ToolResult]) -> Result<(), SessionError> {
if results.iter().any(ToolResult::has_video) {
return Err(SessionError::Agent(AgentError::ConfigError(
"video blocks are not supported in tool results".to_string(),
)));
}
Ok(())
}
fn llm_identity_from_create_request(req: &CreateSessionRequest) -> SessionLlmIdentity {
let provider = req
.build
.as_ref()
.and_then(|build| build.provider)
.or_else(|| meerkat_core::Provider::infer_from_model(&req.model))
.unwrap_or(meerkat_core::Provider::Other);
let provider_params = req
.build
.as_ref()
.and_then(|build| build.provider_params.clone());
SessionLlmIdentity {
model: req.model.clone(),
provider,
provider_params,
}
}
pub fn new(builder: B, max_sessions: usize) -> Self {
Self {
sessions: RwLock::new(IndexMap::new()),
archived_views: RwLock::new(IndexMap::new()),
builder,
max_sessions,
session_capacity: Arc::new(Semaphore::new(max_sessions)),
session_registered: tokio::sync::Notify::new(),
}
}
fn archived_view_from_handle(id: &SessionId, handle: &SessionHandle) -> SessionView {
let cache = handle.summary_rx.borrow();
let llm_identity = handle.llm_identity_rx.borrow().clone();
SessionView {
state: SessionInfo {
session_id: id.clone(),
created_at: handle.created_at,
updated_at: cache.updated_at,
message_count: cache.message_count,
is_active: false,
model: llm_identity.model,
provider: llm_identity.provider,
last_assistant_text: cache.last_assistant_text.clone(),
labels: handle.labels.clone(),
},
billing: SessionUsage {
total_tokens: cache.total_tokens,
usage: cache.usage.clone(),
},
}
}
pub async fn export_session(
&self,
id: &SessionId,
) -> Result<meerkat_core::Session, SessionError> {
let (command_tx, deferred_turn_state) = {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
(
handle.command_tx.clone(),
Arc::clone(&handle.deferred_turn_state),
)
};
let (reply_tx, reply_rx) = oneshot::channel();
command_tx
.send(SessionCommand::ExportSession { reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
let mut session = reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})?;
let state = lock_deferred_turn_state(&deferred_turn_state).clone();
session.set_deferred_turn_state(state).map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to serialize deferred-turn state: {err}"
)))
})?;
Ok(session)
}
pub async fn deferred_turn_state(
&self,
session_id: &SessionId,
) -> Option<Arc<std::sync::Mutex<SessionDeferredTurnState>>> {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.map(|h| Arc::clone(&h.deferred_turn_state))
}
pub async fn discard_live_session(&self, id: &SessionId) -> Result<(), SessionError> {
let mut sessions = self.sessions.write().await;
let handle = sessions
.swap_remove(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
drop(sessions);
let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
Ok(())
}
pub async fn apply_runtime_system_context(
&self,
id: &SessionId,
appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::ApplyRuntimeSystemContext { appends, reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the reply channel".to_string(),
))
})
}
pub async fn apply_runtime_turn(
&self,
id: &SessionId,
run_id: RunId,
req: StartTurnRequest,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
match self.start_turn(id, req).await {
Ok(run_result) => {
self.build_runtime_output(
id,
run_id,
boundary,
contributing_input_ids,
Some(CoreApplyTerminal::RunResult(run_result)),
)
.await
}
Err(error) => {
if let Some(terminal) = Self::callback_pending_terminal(&error) {
self.build_runtime_output(
id,
run_id,
boundary,
contributing_input_ids,
Some(terminal),
)
.await
} else {
Err(error)
}
}
}
}
pub async fn apply_runtime_context_appends(
&self,
id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
self.apply_runtime_system_context(id, appends).await?;
self.build_runtime_output(
id,
run_id,
RunApplyBoundary::Immediate,
contributing_input_ids,
None,
)
.await
}
pub async fn event_injector(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.and_then(|h| h.event_injector.clone())
}
#[doc(hidden)]
pub async fn interaction_event_injector(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.and_then(|h| h.interaction_event_injector.clone())
}
pub async fn system_context_state(
&self,
session_id: &SessionId,
) -> Option<Arc<std::sync::Mutex<SessionSystemContextState>>> {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.map(|h| Arc::clone(&h.system_context_state))
}
pub async fn live_session_llm_identity(
&self,
session_id: &SessionId,
) -> Result<SessionLlmIdentity, SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(session_id)
.ok_or_else(|| SessionError::NotFound {
id: session_id.clone(),
})?;
Ok(handle.llm_identity_rx.borrow().clone())
}
pub async fn comms_runtime(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
let sessions = self.sessions.read().await;
sessions
.get(session_id)
.and_then(|h| h.comms_runtime.clone())
}
pub async fn wait_session_registered(&self) {
self.session_registered.notified().await;
}
pub async fn shutdown(&self) {
let mut sessions = self.sessions.write().await;
for (_id, handle) in sessions.drain(..) {
let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
}
}
pub async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| meerkat_core::comms::StreamError::NotFound(format!("session {id}")))?;
let rx = handle.session_event_tx.subscribe();
Ok(Box::pin(futures::stream::unfold(rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(event) => return Some((event, rx)),
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
}
}
})))
}
pub async fn subscribe_session_events_raw(
&self,
id: &SessionId,
) -> Result<
tokio::sync::broadcast::Receiver<EventEnvelope<AgentEvent>>,
meerkat_core::comms::StreamError,
> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| meerkat_core::comms::StreamError::NotFound(format!("session {id}")))?;
Ok(handle.session_event_tx.subscribe())
}
fn try_acquire_turn(id: &SessionId, handle: &SessionHandle) -> Result<(), SessionError> {
match handle
.turn_lock
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => Ok(()),
Err(_) => Err(SessionError::Busy { id: id.clone() }),
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionService for EphemeralSessionService<B> {
async fn create_session(&self, req: CreateSessionRequest) -> Result<RunResult, SessionError> {
let capacity_permit = match self.session_capacity.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
let active = self.sessions.read().await.len();
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(format!(
"Max sessions reached ({}/{})",
active, self.max_sessions
)),
));
}
};
let prompt = req.prompt.clone();
let caller_event_tx = req.event_tx.clone();
let defer_initial_turn =
req.initial_turn == meerkat_core::service::InitialTurnPolicy::Defer;
let llm_identity = Self::llm_identity_from_create_request(&req);
Self::validate_prompt_video_input(&prompt, &llm_identity)?;
let labels = req.labels.clone().unwrap_or_default();
let resumed_session = req
.build
.as_ref()
.and_then(|build| build.resume_session.as_ref());
let mut deferred_turn_state = resumed_session
.and_then(meerkat_core::Session::deferred_turn_state)
.unwrap_or_default();
let resumed_session_is_deferred_template = resumed_session.is_some_and(|session| {
session.messages().is_empty() && session.deferred_turn_state().is_none()
});
if let Some(blob_store) = req
.build
.as_ref()
.and_then(|build| build.blob_store_override.clone())
{
hydrate_deferred_turn_state(
blob_store.as_ref(),
&mut deferred_turn_state,
MissingBlobBehavior::HistoricalPlaceholder,
)
.await
.map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to hydrate deferred-turn state during session creation: {err}"
)))
})?;
}
if defer_initial_turn && (resumed_session.is_none() || resumed_session_is_deferred_template)
{
deferred_turn_state.mark_initial_turn_pending();
}
if defer_initial_turn && req.deferred_prompt_policy == DeferredPromptPolicy::Stage {
deferred_turn_state.stage_initial_prompt(prompt.clone(), SystemTime::now());
}
let deferred_turn_state = Arc::new(std::sync::Mutex::new(deferred_turn_state));
let (agent_event_tx, agent_event_rx) = mpsc::channel::<AgentEvent>(EVENT_CHANNEL_CAPACITY);
let agent = self
.builder
.build_agent(&req, agent_event_tx.clone())
.await?;
let session_id = agent.session_id();
let created_at = SystemTime::now();
let turn_lock = Arc::new(AtomicBool::new(false));
let event_injector = agent.event_injector();
let interaction_event_injector = agent.interaction_event_injector();
let comms_runtime = agent.comms_runtime();
let system_context_state = agent.system_context_state();
let (command_tx, command_rx) = mpsc::channel::<SessionCommand>(COMMAND_CHANNEL_CAPACITY);
let (state_tx, state_rx) = watch::channel(SessionState::Idle);
let (summary_tx, summary_rx) = watch::channel(SessionSummaryCache {
updated_at: created_at,
message_count: 0,
total_tokens: 0,
usage: Usage::default(),
last_assistant_text: None,
});
let (llm_identity_tx, llm_identity_rx) = watch::channel(llm_identity);
let (session_event_tx, session_event_rx) =
tokio::sync::broadcast::channel::<EventEnvelope<AgentEvent>>(EVENT_CHANNEL_CAPACITY);
drop(session_event_rx);
let interrupt_requested = Arc::new(AtomicBool::new(false));
let interrupt_notify = Arc::new(tokio::sync::Notify::new());
let task_turn_lock = turn_lock.clone();
tokio::spawn(session_task(
agent,
agent_event_tx,
agent_event_rx,
command_rx,
Arc::clone(&deferred_turn_state),
SessionTaskControl {
state_tx,
summary_tx,
llm_identity_tx,
turn_lock: task_turn_lock,
interrupt_requested: interrupt_requested.clone(),
interrupt_notify: interrupt_notify.clone(),
session_event_tx: session_event_tx.clone(),
},
));
let handle = SessionHandle {
command_tx: command_tx.clone(),
state_rx,
summary_rx,
llm_identity_rx,
turn_lock: turn_lock.clone(),
_capacity_permit: capacity_permit,
created_at,
labels,
event_injector,
interaction_event_injector,
comms_runtime,
system_context_state,
deferred_turn_state,
interrupt_requested,
interrupt_notify,
session_event_tx,
};
let inserted = {
let mut sessions = self.sessions.write().await;
if sessions.contains_key(&session_id) {
false
} else {
sessions.insert(session_id.clone(), handle);
self.session_registered.notify_waiters();
true
}
};
if !inserted {
let _ = command_tx.send(SessionCommand::Shutdown).await;
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(format!(
"Duplicate session ID generated: {session_id}"
)),
));
}
if defer_initial_turn {
return Ok(RunResult {
text: String::new(),
session_id,
turns: 0,
tool_calls: 0,
usage: Usage::default(),
structured_output: None,
schema_warnings: None,
skill_diagnostics: None,
});
}
turn_lock.store(true, Ordering::Release);
let (result_tx, result_rx) = oneshot::channel();
if command_tx
.send(SessionCommand::StartTurn {
prompt,
render_metadata: req.render_metadata,
handling_mode: meerkat_core::types::HandlingMode::Queue,
event_tx: caller_event_tx,
result_tx,
skill_references: req.skill_references,
flow_tool_overlay: None,
})
.await
.is_err()
{
turn_lock.store(false, Ordering::Release);
let mut sessions = self.sessions.write().await;
sessions.swap_remove(&session_id);
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"Session task exited before first turn".to_string(),
),
));
}
let result = match result_rx.await {
Ok(result) => result,
Err(_) => {
let mut sessions = self.sessions.write().await;
sessions.swap_remove(&session_id);
return Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"Session task dropped the result channel".to_string(),
),
));
}
};
result.map_err(SessionError::Agent)
}
async fn start_turn(
&self,
id: &SessionId,
req: StartTurnRequest,
) -> Result<RunResult, SessionError> {
let (result_tx, result_rx) = oneshot::channel();
let prompt: meerkat_core::types::ContentInput = match &req.additional_instructions {
Some(instructions) if !instructions.is_empty() => {
let mut prefix = String::new();
for instruction in instructions {
prefix.push_str("[SYSTEM NOTICE: ");
prefix.push_str(instruction);
prefix.push_str("]\n\n");
}
if req.prompt.has_non_text_content() {
let mut blocks = vec![meerkat_core::types::ContentBlock::Text { text: prefix }];
blocks.extend(req.prompt.clone().into_blocks());
meerkat_core::types::ContentInput::Blocks(blocks)
} else {
prefix.push_str(&req.prompt.text_content());
meerkat_core::types::ContentInput::Text(prefix)
}
}
_ => req.prompt.clone(),
};
{
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let identity = handle.llm_identity_rx.borrow().clone();
Self::validate_prompt_video_input(&prompt, &identity)?;
Self::try_acquire_turn(id, handle)?;
if let Some(system_prompt) = req.system_prompt {
let allows_override = {
let guard = lock_deferred_turn_state(&handle.deferred_turn_state);
guard.allows_initial_turn_overrides()
};
if !allows_override {
handle.turn_lock.store(false, Ordering::Release);
return Err(SessionError::Unsupported(
"system_prompt override is only allowed on a deferred session's first turn"
.to_string(),
));
}
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::UpdateSystemPrompt {
system_prompt,
reply_tx,
})
.await
.map_err(|_| {
handle.turn_lock.store(false, Ordering::Release);
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
let update_result = reply_rx.await.map_err(|_| {
handle.turn_lock.store(false, Ordering::Release);
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped reply channel".to_string(),
))
})?;
update_result.map_err(|error| {
handle.turn_lock.store(false, Ordering::Release);
SessionError::Agent(error)
})?;
}
handle
.command_tx
.send(SessionCommand::StartTurn {
prompt,
render_metadata: req.render_metadata,
handling_mode: req.handling_mode,
event_tx: req.event_tx,
result_tx,
skill_references: req.skill_references,
flow_tool_overlay: req.flow_tool_overlay,
})
.await
.map_err(|_| {
handle.turn_lock.store(false, Ordering::Release);
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
}
let result = result_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the result channel".to_string(),
))
})?;
result.map_err(SessionError::Agent)
}
async fn set_session_client(
&self,
id: &SessionId,
client: Arc<dyn meerkat_core::AgentLlmClient>,
) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::ReplaceClient { client, reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped reply channel".to_string(),
))
})
}
async fn hot_swap_session_llm_identity(
&self,
id: &SessionId,
client: Arc<dyn meerkat_core::AgentLlmClient>,
identity: SessionLlmIdentity,
) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::HotSwapLlmIdentity {
client,
identity,
reply_tx,
})
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped reply channel".to_string(),
))
})?
.map_err(SessionError::Agent)
}
async fn set_session_tool_filter(
&self,
id: &SessionId,
filter: meerkat_core::ToolFilter,
) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::StageToolFilter { filter, reply_tx })
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped reply channel".to_string(),
))
})?
.map_err(SessionError::Agent)
}
async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
if !handle.turn_lock.load(Ordering::Acquire) {
return Err(SessionError::NotRunning { id: id.clone() });
}
handle.interrupt_requested.store(true, Ordering::Release);
handle.interrupt_notify.notify_waiters();
Ok(())
}
async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
let sessions = self.sessions.read().await;
let handle = match sessions.get(id) {
Some(handle) => handle,
None => {
drop(sessions);
return self
.archived_views
.read()
.await
.get(id)
.cloned()
.ok_or_else(|| SessionError::NotFound { id: id.clone() });
}
};
let state = *handle.state_rx.borrow();
let summary = handle.summary_rx.borrow().clone();
Ok(SessionView {
state: SessionInfo {
session_id: id.clone(),
created_at: handle.created_at,
updated_at: summary.updated_at,
message_count: summary.message_count,
is_active: state == SessionState::Running,
model: handle.llm_identity_rx.borrow().model.clone(),
provider: handle.llm_identity_rx.borrow().provider,
last_assistant_text: summary.last_assistant_text,
labels: handle.labels.clone(),
},
billing: SessionUsage {
total_tokens: summary.total_tokens,
usage: summary.usage,
},
})
}
async fn list(&self, query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
let sessions = self.sessions.read().await;
let mut summaries: Vec<SessionSummary> = sessions
.iter()
.map(|(session_id, h)| {
let state = *h.state_rx.borrow();
let cache = h.summary_rx.borrow();
SessionSummary {
session_id: session_id.clone(),
created_at: h.created_at,
updated_at: cache.updated_at,
message_count: cache.message_count,
total_tokens: cache.total_tokens,
is_active: state == SessionState::Running,
labels: h.labels.clone(),
}
})
.collect();
if let Some(ref filter_labels) = query.labels {
summaries.retain(|s| {
filter_labels
.iter()
.all(|(k, v)| s.labels.get(k) == Some(v))
});
}
if let Some(offset) = query.offset {
if offset < summaries.len() {
summaries = summaries.split_off(offset);
} else {
summaries.clear();
}
}
if let Some(limit) = query.limit {
summaries.truncate(limit);
}
Ok(summaries)
}
async fn has_live_session(&self, id: &SessionId) -> Result<bool, SessionError> {
Ok(self.sessions.read().await.contains_key(id))
}
async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
let mut sessions = self.sessions.write().await;
let handle = sessions
.swap_remove(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let archived_view = Self::archived_view_from_handle(id, &handle);
drop(sessions);
self.archived_views
.write()
.await
.insert(id.clone(), archived_view);
let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
Ok(())
}
async fn update_session_keep_alive(
&self,
id: &SessionId,
keep_alive: bool,
) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::UpdateKeepAlive {
keep_alive,
reply_tx,
})
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped reply channel".to_string(),
))
})
}
async fn update_session_mob_authority_context(
&self,
id: &SessionId,
authority_context: Option<MobToolAuthorityContext>,
) -> Result<(), SessionError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let (reply_tx, reply_rx) = oneshot::channel();
handle
.command_tx
.send(SessionCommand::UpdateMobToolAuthority {
authority_context,
reply_tx,
})
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
reply_rx
.await
.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped reply channel".to_string(),
))
})?
.map_err(SessionError::Agent)
}
async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
EphemeralSessionService::<B>::subscribe_session_events(self, id).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceControlExt for EphemeralSessionService<B> {
async fn append_system_context(
&self,
id: &SessionId,
req: AppendSystemContextRequest,
) -> Result<AppendSystemContextResult, SessionControlError> {
let state = self
.system_context_state(id)
.await
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let status = {
let mut guard = match state.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::warn!(
session_id = %id,
"system-context state lock poisoned while staging append"
);
poisoned.into_inner()
}
};
guard
.stage_append(&req, SystemTime::now())
.map_err(|err| err.into_control_error(id))?
};
Ok(AppendSystemContextResult { status })
}
async fn stage_tool_results(
&self,
id: &SessionId,
req: StageToolResultsRequest,
) -> Result<StageToolResultsResult, SessionError> {
Self::validate_tool_result_video(&req.results)?;
let state = self
.deferred_turn_state(id)
.await
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
let accepted = {
let mut guard = lock_deferred_turn_state(&state);
guard.stage_tool_results(req.results, SystemTime::now())
};
Ok(StageToolResultsResult {
accepted_result_count: accepted,
})
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceCommsExt for EphemeralSessionService<B> {
async fn comms_runtime(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
EphemeralSessionService::<B>::comms_runtime(self, session_id).await
}
async fn event_injector(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
EphemeralSessionService::<B>::event_injector(self, session_id).await
}
async fn interaction_event_injector(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
EphemeralSessionService::<B>::interaction_event_injector(self, session_id).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceHistoryExt for EphemeralSessionService<B> {
async fn read_history(
&self,
id: &SessionId,
query: SessionHistoryQuery,
) -> Result<SessionHistoryPage, SessionError> {
match self.export_session(id).await {
Ok(session) => Ok(SessionHistoryPage::from_messages(
session.id().clone(),
session.messages(),
query,
)),
Err(SessionError::NotFound { .. }) => {
if self.archived_views.read().await.contains_key(id) {
Err(SessionError::PersistenceDisabled)
} else {
Err(SessionError::NotFound { id: id.clone() })
}
}
Err(err) => Err(err),
}
}
}
fn stamp_event_envelope(
next_seq: &mut u64,
source_id: &str,
event: AgentEvent,
) -> EventEnvelope<AgentEvent> {
*next_seq += 1;
EventEnvelope::new(source_id, *next_seq, None, event)
}
fn lock_deferred_turn_state(
state: &Arc<std::sync::Mutex<SessionDeferredTurnState>>,
) -> std::sync::MutexGuard<'_, SessionDeferredTurnState> {
match state.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::warn!("deferred-turn state lock poisoned; continuing with inner state");
poisoned.into_inner()
}
}
}
fn merge_content_inputs(
deferred: meerkat_core::types::ContentInput,
turn: meerkat_core::types::ContentInput,
) -> meerkat_core::types::ContentInput {
match (&deferred, &turn) {
(
meerkat_core::types::ContentInput::Text(deferred_text),
meerkat_core::types::ContentInput::Text(turn_text),
) => meerkat_core::types::ContentInput::Text(format!("{deferred_text}\n\n{turn_text}")),
_ => {
let mut blocks = deferred.into_blocks();
blocks.extend(turn.into_blocks());
meerkat_core::types::ContentInput::Blocks(blocks)
}
}
}
fn restore_deferred_turn_inputs(
deferred_turn_state: &Arc<std::sync::Mutex<SessionDeferredTurnState>>,
restore_first_turn_pending: bool,
pending_initial_prompt: Option<PendingDeferredPrompt>,
pending_tool_results: Vec<PendingToolResultsMessage>,
) {
if !restore_first_turn_pending
&& pending_initial_prompt.is_none()
&& pending_tool_results.is_empty()
{
return;
}
let mut guard = lock_deferred_turn_state(deferred_turn_state);
if restore_first_turn_pending {
guard.restore_initial_turn_pending();
}
if guard.pending_initial_prompt.is_none() {
guard.pending_initial_prompt = pending_initial_prompt;
}
if !pending_tool_results.is_empty() {
let mut restored = pending_tool_results;
restored.extend(std::mem::take(&mut guard.pending_tool_results));
guard.pending_tool_results = restored;
}
}
async fn session_task<A: SessionAgent>(
mut agent: A,
agent_event_tx: mpsc::Sender<AgentEvent>,
mut agent_event_rx: mpsc::Receiver<AgentEvent>,
mut commands: mpsc::Receiver<SessionCommand>,
deferred_turn_state: Arc<std::sync::Mutex<SessionDeferredTurnState>>,
control: SessionTaskControl,
) {
let mut next_seq: u64 = 0;
let source_id = format!("session:{}", agent.session_id());
loop {
let Some(cmd) = commands.recv().await else {
break;
};
match cmd {
SessionCommand::ReplaceClient { client, reply_tx } => {
agent.replace_client(client);
let _ = reply_tx.send(());
continue;
}
SessionCommand::HotSwapLlmIdentity {
client,
identity,
reply_tx,
} => {
let result = agent.hot_swap_llm_identity(client, identity.clone());
if result.is_ok() {
control.llm_identity_tx.send_replace(identity);
}
let _ = reply_tx.send(result);
continue;
}
SessionCommand::StageToolFilter { filter, reply_tx } => {
let _ = reply_tx.send(agent.stage_external_tool_filter(filter));
continue;
}
SessionCommand::StartTurn {
prompt,
render_metadata,
handling_mode,
event_tx,
result_tx,
skill_references,
flow_tool_overlay,
} => {
let (restore_first_turn_pending, pending_initial_prompt, pending_tool_results) = {
let mut guard = lock_deferred_turn_state(&deferred_turn_state);
(
guard.mark_initial_turn_started(),
guard.pending_initial_prompt.take(),
std::mem::take(&mut guard.pending_tool_results),
)
};
let prompt = match pending_initial_prompt.as_ref() {
Some(staged_prompt) => {
merge_content_inputs(staged_prompt.prompt.clone(), prompt)
}
None => prompt,
};
let flattened_tool_results = pending_tool_results
.iter()
.flat_map(|pending| pending.results.clone())
.collect::<Vec<_>>();
agent.set_skill_references(skill_references);
if let Err(error) = agent.set_flow_tool_overlay(flow_tool_overlay) {
restore_deferred_turn_inputs(
&deferred_turn_state,
restore_first_turn_pending,
pending_initial_prompt,
pending_tool_results,
);
control.turn_lock.store(false, Ordering::Release);
control.interrupt_requested.store(false, Ordering::Release);
let _ = result_tx.send(Err(error));
continue;
}
if let Err(error) = agent.apply_pending_tool_results(flattened_tool_results) {
let _ = agent.set_flow_tool_overlay(None);
restore_deferred_turn_inputs(
&deferred_turn_state,
restore_first_turn_pending,
pending_initial_prompt,
pending_tool_results,
);
control.turn_lock.store(false, Ordering::Release);
control.interrupt_requested.store(false, Ordering::Release);
let _ = result_tx.send(Err(error));
continue;
}
control.state_tx.send_replace(SessionState::Running);
let mut event_stream_open = true;
let result = {
#[cfg(not(target_arch = "wasm32"))]
type RunFut<'a> = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<RunResult, meerkat_core::error::AgentError>,
> + Send
+ 'a,
>,
>;
#[cfg(target_arch = "wasm32")]
type RunFut<'a> = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<RunResult, meerkat_core::error::AgentError>,
> + 'a,
>,
>;
let has_prompt =
prompt.has_images() || !prompt.text_content().trim().is_empty();
let run_fut: RunFut<'_> = if has_prompt {
Box::pin(agent.run_turn_with_events(
prompt,
handling_mode,
render_metadata,
agent_event_tx.clone(),
))
} else {
Box::pin(agent.run_pending_with_events(agent_event_tx.clone()))
};
let mut run_fut = run_fut;
let mut interrupted = false;
let r = loop {
let interrupt_wait = control.interrupt_notify.notified();
tokio::select! {
result = &mut run_fut => break result,
() = interrupt_wait => {
if control.interrupt_requested.swap(false, Ordering::AcqRel) {
interrupted = true;
break Err(meerkat_core::error::AgentError::Cancelled);
}
}
Some(event) = agent_event_rx.recv() => {
let envelope = stamp_event_envelope(&mut next_seq, &source_id, event);
let _ = control.session_event_tx.send(envelope.clone());
if event_stream_open
&& let Some(ref tx) = event_tx
&& tx.send(envelope).await.is_err()
{
event_stream_open = false;
tracing::warn!("session event stream receiver dropped; continuing without streaming events");
}
}
}
};
drop(run_fut);
if interrupted {
agent.cancel();
}
while let Ok(event) = agent_event_rx.try_recv() {
let envelope = stamp_event_envelope(&mut next_seq, &source_id, event);
let _ = control.session_event_tx.send(envelope.clone());
if event_stream_open
&& let Some(ref tx) = event_tx
&& tx.send(envelope).await.is_err()
{
event_stream_open = false;
tracing::warn!(
"session event stream receiver dropped while draining events"
);
}
}
r
};
let snap = agent.snapshot();
control.summary_tx.send_replace(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
control.state_tx.send_replace(SessionState::Idle);
let result = if let Err(error) = agent.set_flow_tool_overlay(None) {
tracing::error!(
error = %error,
"failed to clear flow tool overlay; failing turn to avoid stale scope"
);
Err(error)
} else {
result
};
control.turn_lock.store(false, Ordering::Release);
control.interrupt_requested.store(false, Ordering::Release);
let _ = result_tx.send(result);
}
SessionCommand::ExportSession { reply_tx } => {
let _ = reply_tx.send(agent.session_clone());
}
SessionCommand::ApplyRuntimeSystemContext { appends, reply_tx } => {
agent.apply_runtime_system_context(&appends);
let snap = agent.snapshot();
control.summary_tx.send_replace(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
usage: snap.usage,
last_assistant_text: snap.last_assistant_text,
});
let _ = reply_tx.send(());
}
SessionCommand::UpdateKeepAlive {
keep_alive,
reply_tx,
} => {
agent.update_keep_alive(keep_alive);
let _ = reply_tx.send(());
}
SessionCommand::UpdateMobToolAuthority {
authority_context,
reply_tx,
} => {
let _ = reply_tx.send(agent.update_mob_tool_authority_context(authority_context));
}
SessionCommand::UpdateSystemPrompt {
system_prompt,
reply_tx,
} => {
let _ = reply_tx.send(agent.update_system_prompt(system_prompt));
}
SessionCommand::Shutdown => {
control.state_tx.send_replace(SessionState::ShuttingDown);
break;
}
}
}
}