use async_trait::async_trait;
use indexmap::IndexMap;
use meerkat_core::event::{AgentEvent, EventEnvelope};
use meerkat_core::service::{
AppendSystemContextRequest, AppendSystemContextResult, CreateSessionRequest,
SessionControlError, SessionError, SessionHistoryPage, SessionHistoryQuery, SessionInfo,
SessionQuery, SessionService, SessionServiceCommsExt, SessionServiceControlExt,
SessionServiceHistoryExt, SessionSummary, SessionUsage, SessionView, StartTurnRequest,
TurnToolOverlay,
};
use meerkat_core::time_compat::SystemTime;
use meerkat_core::types::{RunResult, SessionId, Usage};
use meerkat_core::{PendingSystemContextAppend, SessionSystemContextState};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(target_arch = "wasm32")]
use crate::tokio;
#[cfg(target_arch = "wasm32")]
use crate::tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore, mpsc, oneshot, watch};
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore, mpsc, oneshot, watch};
/// Buffer size used for both the per-session agent-event `mpsc` channel and the
/// per-session `broadcast` channel created in `create_session`.
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Buffer size of the `mpsc` channel that carries `SessionCommand`s to a session task.
const COMMAND_CHANNEL_CAPACITY: usize = 8;
/// Lifecycle state that a session task publishes over its `watch` channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionState {
/// No turn is executing; this is also the initial value of the channel.
Idle,
/// Published by the session task while a turn is executing.
Running,
/// Published when the task receives `SessionCommand::Shutdown`, right before it exits.
ShuttingDown,
}
/// Point-in-time view of a session as reported by `SessionAgent::snapshot`.
///
/// `read` copies every field into the returned `SessionView`; the session task
/// copies `updated_at`, `message_count` and `total_tokens` into the summary cache.
#[derive(Debug, Clone)]
pub struct SessionSnapshot {
/// Reported as `SessionInfo::created_at` by `read`.
pub created_at: SystemTime,
/// Reported as `SessionInfo::updated_at` by `read`.
pub updated_at: SystemTime,
/// Reported as `SessionInfo::message_count` by `read`.
pub message_count: usize,
/// Reported as `SessionUsage::total_tokens` by `read`.
pub total_tokens: u64,
/// Reported as `SessionUsage::usage` by `read`.
pub usage: Usage,
/// Reported as `SessionInfo::last_assistant_text` by `read`.
pub last_assistant_text: Option<String>,
}
/// Messages sent from the service to a session task over its command channel.
enum SessionCommand {
/// Run one turn and report the outcome on `result_tx`.
StartTurn {
/// Prompt text handed to the agent's run method.
prompt: String,
/// When `true` the task calls `run_host_mode`; otherwise `run_with_events`.
host_mode: bool,
/// Optional per-turn sink that receives the stamped event envelopes.
event_tx: Option<mpsc::Sender<EventEnvelope<AgentEvent>>>,
/// Receives the final result of the turn.
result_tx: oneshot::Sender<Result<RunResult, meerkat_core::error::AgentError>>,
/// Passed to `SessionAgent::set_skill_references` before the run.
skill_references: Option<Vec<meerkat_core::skills::SkillKey>>,
/// Passed to `SessionAgent::set_flow_tool_overlay` before the run; the task
/// resets the overlay to `None` once the turn is over.
flow_tool_overlay: Option<TurnToolOverlay>,
},
/// Reply with the agent's current `SessionSnapshot`.
ReadSnapshot {
/// Receives the snapshot.
reply_tx: oneshot::Sender<SessionSnapshot>,
},
/// Reply with the value returned by `SessionAgent::session_clone`.
ExportSession {
/// Receives the exported session.
reply_tx: oneshot::Sender<meerkat_core::Session>,
},
/// Forward `appends` to `SessionAgent::apply_runtime_system_context`, refresh
/// the summary cache, then acknowledge.
ApplyRuntimeSystemContext {
/// Appends to hand to the agent.
appends: Vec<PendingSystemContextAppend>,
/// Signalled once the appends have been applied.
reply_tx: oneshot::Sender<()>,
},
/// Publish `SessionState::ShuttingDown` and leave the command loop.
Shutdown,
}
/// Subset of snapshot data published over a `watch` channel so that `list` and
/// `archive` can read it without sending a command to the session task.
///
/// The session task refreshes it after every turn and after applying runtime
/// system context.
struct SessionSummaryCache {
/// `updated_at` of the most recently published snapshot (initially the creation time).
updated_at: SystemTime,
/// `message_count` of the most recently published snapshot (initially 0).
message_count: usize,
/// `total_tokens` of the most recently published snapshot (initially 0).
total_tokens: u64,
}
/// Service-side record for one live session, stored in the session map.
struct SessionHandle {
/// Sending half of the command channel consumed by the session task.
command_tx: mpsc::Sender<SessionCommand>,
/// Latest `SessionState` published by the session task.
state_rx: watch::Receiver<SessionState>,
/// Latest `SessionSummaryCache` published by the session task.
summary_rx: watch::Receiver<SessionSummaryCache>,
/// `true` while a turn has been claimed; claimed via compare-exchange in
/// `try_acquire_turn` and released by the session task when the turn ends.
turn_lock: Arc<AtomicBool>,
/// Permit taken from the service's capacity semaphore; never read, only kept
/// alive for as long as the handle exists.
_capacity_permit: OwnedSemaphorePermit,
/// Wall-clock time captured when the session was created.
created_at: SystemTime,
/// Labels copied from the create request (empty when none were given).
labels: BTreeMap<String, String>,
/// Value of `SessionAgent::event_injector` captured at creation time.
event_injector: Option<Arc<dyn meerkat_core::EventInjector>>,
/// Value of `SessionAgent::interaction_event_injector` captured at creation time.
interaction_event_injector: Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>>,
/// Value of `SessionAgent::comms_runtime` captured at creation time.
comms_runtime: Option<Arc<dyn meerkat_core::agent::CommsRuntime>>,
/// Value of `SessionAgent::system_context_state` captured at creation time.
system_context_state: Arc<std::sync::Mutex<SessionSystemContextState>>,
/// Set by `interrupt`; consumed by the session task's turn loop.
interrupt_requested: Arc<AtomicBool>,
/// Signalled by `interrupt` (via `notify_waiters`) after the flag is set.
interrupt_notify: Arc<tokio::sync::Notify>,
/// Broadcast sender used to create subscribers for the session's event stream.
session_event_tx: tokio::sync::broadcast::Sender<EventEnvelope<AgentEvent>>,
}
/// Task-side counterparts of the shared state held in `SessionHandle`,
/// bundled so they can be passed to `session_task` as one argument.
struct SessionTaskControl {
/// Publishes `SessionState` transitions.
state_tx: watch::Sender<SessionState>,
/// Publishes refreshed `SessionSummaryCache` values.
summary_tx: watch::Sender<SessionSummaryCache>,
/// Shared turn flag; the task stores `false` when a turn finishes or is rejected.
turn_lock: Arc<AtomicBool>,
/// Shared interrupt flag; swapped to `false` when the task acts on it.
interrupt_requested: Arc<AtomicBool>,
/// Notification the turn loop waits on to learn about interrupts.
interrupt_notify: Arc<tokio::sync::Notify>,
/// Broadcast sender on which every stamped event envelope is published.
session_event_tx: tokio::sync::broadcast::Sender<EventEnvelope<AgentEvent>>,
}
/// Factory the service uses to construct the agent behind each new session.
///
/// `create_session` calls `build_agent` once per request.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionAgentBuilder: Send + Sync {
/// Agent type produced by this builder (must be `Send` on non-wasm targets).
#[cfg(not(target_arch = "wasm32"))]
type Agent: SessionAgent + Send + 'static;
/// Agent type produced by this builder (no `Send` requirement on wasm32).
#[cfg(target_arch = "wasm32")]
type Agent: SessionAgent + 'static;
/// Builds the agent for `req`.
///
/// `event_tx` is a clone of the sender for the session's agent-event channel;
/// the session task owns the receiving half.
///
/// # Errors
/// Any `SessionError` returned here is propagated unchanged by `create_session`.
async fn build_agent(
&self,
req: &CreateSessionRequest,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<Self::Agent, SessionError>;
}
/// Operations the session task needs from the agent it drives.
///
/// All `&mut self` methods are called from the single task that owns the agent.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait SessionAgent: Send {
/// Runs one turn for `prompt`. The session task passes a clone of the
/// session's agent-event sender as `event_tx`.
async fn run_with_events(
&mut self,
prompt: String,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError>;
/// Runs one turn for `prompt`; selected instead of `run_with_events` when the
/// turn request has `host_mode` set.
async fn run_host_mode(
&mut self,
prompt: String,
) -> Result<RunResult, meerkat_core::error::AgentError>;
/// Called at the start of every turn with that turn's skill references.
fn set_skill_references(&mut self, refs: Option<Vec<meerkat_core::skills::SkillKey>>);
/// Called with the turn's overlay before the run and with `None` afterwards.
///
/// An error before the run aborts the turn; an error while clearing replaces
/// the turn's result.
fn set_flow_tool_overlay(
&mut self,
overlay: Option<TurnToolOverlay>,
) -> Result<(), meerkat_core::error::AgentError>;
/// Called by the session task after an interrupted turn, once the in-flight
/// run future has been dropped.
fn cancel(&mut self);
/// Identifier of the session; used as the key in the service's session map.
fn session_id(&self) -> SessionId;
/// Returns the current `SessionSnapshot`.
fn snapshot(&self) -> SessionSnapshot;
/// Returns an owned `meerkat_core::Session`; used to answer export requests.
fn session_clone(&self) -> meerkat_core::Session;
/// Applies the given runtime system-context appends to the agent.
fn apply_runtime_system_context(&mut self, appends: &[PendingSystemContextAppend]);
/// Shared system-context state; read once at creation and stored in the
/// session handle.
fn system_context_state(
&self,
) -> Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>>;
/// Optional event injector exposed through the service; defaults to `None`.
fn event_injector(&self) -> Option<Arc<dyn meerkat_core::EventInjector>> {
None
}
/// Optional subscribable injector exposed through the service; defaults to `None`.
#[doc(hidden)]
fn interaction_event_injector(
&self,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
None
}
/// Optional comms runtime exposed through the service; defaults to `None`.
fn comms_runtime(&self) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
None
}
}
/// In-memory session service: each session is a spawned task that owns its agent.
pub struct EphemeralSessionService<B: SessionAgentBuilder> {
/// Live sessions keyed by id.
sessions: RwLock<IndexMap<SessionId, SessionHandle>>,
/// Views retained for sessions removed through `archive`.
archived_views: RwLock<IndexMap<SessionId, SessionView>>,
/// Factory used to build the agent for each new session.
builder: B,
/// Configured session limit; used in the "max sessions" error message.
max_sessions: usize,
/// Semaphore with `max_sessions` permits; one permit is held per live session.
session_capacity: Arc<Semaphore>,
/// Signalled (via `notify_waiters`) whenever a session is inserted into `sessions`.
session_registered: tokio::sync::Notify,
}
impl<B: SessionAgentBuilder + 'static> EphemeralSessionService<B> {
/// Creates an empty service that uses `builder` to construct agents.
///
/// The capacity semaphore is created with `max_sessions` permits, so at most
/// that many sessions can be live at once.
pub fn new(builder: B, max_sessions: usize) -> Self {
    let session_capacity = Arc::new(Semaphore::new(max_sessions));
    let session_registered = tokio::sync::Notify::new();
    let sessions = RwLock::new(IndexMap::new());
    let archived_views = RwLock::new(IndexMap::new());
    Self {
        sessions,
        archived_views,
        builder,
        max_sessions,
        session_capacity,
        session_registered,
    }
}
/// Builds the read-only view retained for a session that is being archived.
///
/// Only data available on the handle is used: the cached summary, the creation
/// time and the labels. The view is always inactive, carries no assistant text
/// and reports default `Usage`.
fn archived_view_from_handle(id: &SessionId, handle: &SessionHandle) -> SessionView {
    let summary = handle.summary_rx.borrow();
    let billing = SessionUsage {
        total_tokens: summary.total_tokens,
        usage: Usage::default(),
    };
    let state = SessionInfo {
        session_id: id.clone(),
        created_at: handle.created_at,
        updated_at: summary.updated_at,
        message_count: summary.message_count,
        is_active: false,
        last_assistant_text: None,
        labels: handle.labels.clone(),
    };
    SessionView { state, billing }
}
/// Asks the session task for an owned copy of the session.
///
/// The session-map guard is released before talking to the task. The task does
/// not service commands while a turn is running, so the reply can take as long
/// as the turn; holding the guard for that time would stall every writer of the
/// map and, because the lock is write-preferring, every reader queued behind it.
///
/// # Errors
/// * `SessionError::NotFound` when `id` is not a live session.
/// * `SessionError::Agent(InternalError)` when the session task has exited or
///   drops the reply channel.
pub async fn export_session(
    &self,
    id: &SessionId,
) -> Result<meerkat_core::Session, SessionError> {
    let sessions = self.sessions.read().await;
    let command_tx = sessions
        .get(id)
        .ok_or_else(|| SessionError::NotFound { id: id.clone() })?
        .command_tx
        .clone();
    // Release the map before any await that depends on the session task.
    drop(sessions);
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::ExportSession { reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })
}
/// Removes a live session without keeping an archived view and asks its task
/// to shut down.
///
/// The map lock is released before the shutdown command is sent; a failed send
/// is ignored.
///
/// # Errors
/// `SessionError::NotFound` when `id` is not a live session.
pub async fn discard_live_session(&self, id: &SessionId) -> Result<(), SessionError> {
    let mut sessions = self.sessions.write().await;
    let removed = sessions.swap_remove(id);
    drop(sessions);
    match removed {
        Some(handle) => {
            let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
            Ok(())
        }
        None => Err(SessionError::NotFound { id: id.clone() }),
    }
}
/// Sends `appends` to the session task and waits until they have been applied.
///
/// The session-map guard is released before talking to the task. The task does
/// not service commands while a turn is running, so the acknowledgement can
/// take as long as the turn; holding the guard for that time would stall every
/// writer of the map and every reader queued behind a waiting writer.
///
/// # Errors
/// * `SessionError::NotFound` when `id` is not a live session.
/// * `SessionError::Agent(InternalError)` when the session task has exited or
///   drops the reply channel.
pub async fn apply_runtime_system_context(
    &self,
    id: &SessionId,
    appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
    let sessions = self.sessions.read().await;
    let command_tx = sessions
        .get(id)
        .ok_or_else(|| SessionError::NotFound { id: id.clone() })?
        .command_tx
        .clone();
    // Release the map before any await that depends on the session task.
    drop(sessions);
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::ApplyRuntimeSystemContext { appends, reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })
}
/// Returns the event injector captured for `session_id`, or `None` when the
/// session is not live or its agent did not provide one.
pub async fn event_injector(
    &self,
    session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    handle.event_injector.clone()
}
/// Returns the subscribable injector captured for `session_id`, or `None` when
/// the session is not live or its agent did not provide one.
#[doc(hidden)]
pub async fn interaction_event_injector(
    &self,
    session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    handle.interaction_event_injector.clone()
}
/// Returns a new `Arc` handle to the system-context state of `session_id`, or
/// `None` when the session is not live.
pub async fn system_context_state(
    &self,
    session_id: &SessionId,
) -> Option<Arc<std::sync::Mutex<SessionSystemContextState>>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    Some(Arc::clone(&handle.system_context_state))
}
/// Returns the comms runtime captured for `session_id`, or `None` when the
/// session is not live or its agent did not provide one.
pub async fn comms_runtime(
    &self,
    session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
    let sessions = self.sessions.read().await;
    let handle = sessions.get(session_id)?;
    handle.comms_runtime.clone()
}
/// Waits for the next signal on `session_registered`.
///
/// `create_session` signals with `notify_waiters`, so a registration that
/// happened before this call is not observed.
pub async fn wait_session_registered(&self) {
    let registration = self.session_registered.notified();
    registration.await;
}
pub async fn shutdown(&self) {
let mut sessions = self.sessions.write().await;
for (_id, handle) in sessions.drain(..) {
let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
}
}
pub async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| meerkat_core::comms::StreamError::NotFound(format!("session {id}")))?;
let rx = handle.session_event_tx.subscribe();
Ok(Box::pin(futures::stream::unfold(rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(event) => return Some((event, rx)),
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
}
}
})))
}
/// Subscribes to a live session's broadcast channel and returns the raw
/// receiver, leaving `Lagged`/`Closed` handling to the caller.
///
/// # Errors
/// `StreamError::NotFound` when `id` is not a live session.
pub async fn subscribe_session_events_raw(
    &self,
    id: &SessionId,
) -> Result<
    tokio::sync::broadcast::Receiver<EventEnvelope<AgentEvent>>,
    meerkat_core::comms::StreamError,
> {
    let sessions = self.sessions.read().await;
    match sessions.get(id) {
        Some(handle) => Ok(handle.session_event_tx.subscribe()),
        None => Err(meerkat_core::comms::StreamError::NotFound(format!(
            "session {id}"
        ))),
    }
}
/// Attempts to claim the session's turn flag by flipping it from `false` to `true`.
///
/// # Errors
/// `SessionError::Busy` when the flag is already `true`.
fn try_acquire_turn(id: &SessionId, handle: &SessionHandle) -> Result<(), SessionError> {
    handle
        .turn_lock
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .map(|_| ())
        .map_err(|_| SessionError::Busy { id: id.clone() })
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionService for EphemeralSessionService<B> {
/// Creates a session, spawns its task and, unless deferred, runs the first turn.
///
/// Steps:
/// 1. Reserve a capacity permit (fails immediately when none is free).
/// 2. Build the agent and create the channels shared with the session task.
/// 3. Spawn the session task and register the handle in the session map.
/// 4. If the request defers the initial turn, return an empty `RunResult`;
///    otherwise send the first `StartTurn` and wait for its result.
///
/// The turn flag is created already claimed when an initial turn will run.
/// Claiming it only after the handle is visible in the map would let a
/// concurrent `start_turn` acquire the flag in between and queue a second turn
/// under the same claim.
///
/// # Errors
/// * `SessionError::Agent(InternalError)` when the session limit is reached, the
///   agent reports an id that is already registered, or the session task goes
///   away before delivering the first result (the session is removed again in
///   the latter case).
/// * Any error from `SessionAgentBuilder::build_agent`.
/// * `SessionError::Agent` wrapping the error of a failed first turn.
async fn create_session(&self, req: CreateSessionRequest) -> Result<RunResult, SessionError> {
    let capacity_permit = match self.session_capacity.clone().try_acquire_owned() {
        Ok(permit) => permit,
        Err(_) => {
            let active = self.sessions.read().await.len();
            return Err(SessionError::Agent(
                meerkat_core::error::AgentError::InternalError(format!(
                    "Max sessions reached ({}/{})",
                    active, self.max_sessions
                )),
            ));
        }
    };
    let prompt = req.prompt.clone();
    let caller_event_tx = req.event_tx.clone();
    let defer_initial_turn =
        req.initial_turn == meerkat_core::service::InitialTurnPolicy::Defer;
    let labels = req.labels.clone().unwrap_or_default();
    let (agent_event_tx, agent_event_rx) = mpsc::channel::<AgentEvent>(EVENT_CHANNEL_CAPACITY);
    let agent = self
        .builder
        .build_agent(&req, agent_event_tx.clone())
        .await?;
    let session_id = agent.session_id();
    let created_at = SystemTime::now();
    // Pre-claim the turn for the initial run so nobody can grab it between
    // registration and the first `StartTurn`.
    let turn_lock = Arc::new(AtomicBool::new(!defer_initial_turn));
    let event_injector = agent.event_injector();
    let interaction_event_injector = agent.interaction_event_injector();
    let comms_runtime = agent.comms_runtime();
    let system_context_state = agent.system_context_state();
    let (command_tx, command_rx) = mpsc::channel::<SessionCommand>(COMMAND_CHANNEL_CAPACITY);
    let (state_tx, state_rx) = watch::channel(SessionState::Idle);
    let (summary_tx, summary_rx) = watch::channel(SessionSummaryCache {
        updated_at: created_at,
        message_count: 0,
        total_tokens: 0,
    });
    let (session_event_tx, session_event_rx) =
        tokio::sync::broadcast::channel::<EventEnvelope<AgentEvent>>(EVENT_CHANNEL_CAPACITY);
    // Subscribers are created on demand from the sender; the initial receiver
    // is not needed.
    drop(session_event_rx);
    let interrupt_requested = Arc::new(AtomicBool::new(false));
    let interrupt_notify = Arc::new(tokio::sync::Notify::new());
    let task_turn_lock = turn_lock.clone();
    tokio::spawn(session_task(
        agent,
        agent_event_tx,
        agent_event_rx,
        command_rx,
        SessionTaskControl {
            state_tx,
            summary_tx,
            turn_lock: task_turn_lock,
            interrupt_requested: interrupt_requested.clone(),
            interrupt_notify: interrupt_notify.clone(),
            session_event_tx: session_event_tx.clone(),
        },
    ));
    let handle = SessionHandle {
        command_tx: command_tx.clone(),
        state_rx,
        summary_rx,
        turn_lock: turn_lock.clone(),
        _capacity_permit: capacity_permit,
        created_at,
        labels,
        event_injector,
        interaction_event_injector,
        comms_runtime,
        system_context_state,
        interrupt_requested,
        interrupt_notify,
        session_event_tx,
    };
    let inserted = {
        let mut sessions = self.sessions.write().await;
        if sessions.contains_key(&session_id) {
            false
        } else {
            sessions.insert(session_id.clone(), handle);
            self.session_registered.notify_waiters();
            true
        }
    };
    if !inserted {
        // The freshly spawned task is not reachable through the map; stop it.
        let _ = command_tx.send(SessionCommand::Shutdown).await;
        return Err(SessionError::Agent(
            meerkat_core::error::AgentError::InternalError(format!(
                "Duplicate session ID generated: {session_id}"
            )),
        ));
    }
    if defer_initial_turn {
        return Ok(RunResult {
            text: String::new(),
            session_id,
            turns: 0,
            tool_calls: 0,
            usage: Usage::default(),
            structured_output: None,
            schema_warnings: None,
            skill_diagnostics: None,
        });
    }
    let host_mode = req.host_mode;
    let (result_tx, result_rx) = oneshot::channel();
    if command_tx
        .send(SessionCommand::StartTurn {
            prompt,
            host_mode,
            event_tx: caller_event_tx,
            result_tx,
            skill_references: req.skill_references,
            flow_tool_overlay: None,
        })
        .await
        .is_err()
    {
        turn_lock.store(false, Ordering::Release);
        let mut sessions = self.sessions.write().await;
        sessions.swap_remove(&session_id);
        return Err(SessionError::Agent(
            meerkat_core::error::AgentError::InternalError(
                "Session task exited before first turn".to_string(),
            ),
        ));
    }
    let result = match result_rx.await {
        Ok(result) => result,
        Err(_) => {
            let mut sessions = self.sessions.write().await;
            sessions.swap_remove(&session_id);
            return Err(SessionError::Agent(
                meerkat_core::error::AgentError::InternalError(
                    "Session task dropped the result channel".to_string(),
                ),
            ));
        }
    };
    result.map_err(SessionError::Agent)
}
async fn start_turn(
&self,
id: &SessionId,
req: StartTurnRequest,
) -> Result<RunResult, SessionError> {
let (result_tx, result_rx) = oneshot::channel();
let prompt = match &req.additional_instructions {
Some(instructions) if !instructions.is_empty() => {
let mut prefix = String::new();
for instruction in instructions {
prefix.push_str("[SYSTEM NOTICE: ");
prefix.push_str(instruction);
prefix.push_str("]\n\n");
}
prefix.push_str(&req.prompt);
prefix
}
_ => req.prompt,
};
{
let sessions = self.sessions.read().await;
let handle = sessions
.get(id)
.ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
Self::try_acquire_turn(id, handle)?;
handle
.command_tx
.send(SessionCommand::StartTurn {
prompt,
host_mode: req.host_mode,
event_tx: req.event_tx,
result_tx,
skill_references: req.skill_references,
flow_tool_overlay: req.flow_tool_overlay,
})
.await
.map_err(|_| {
handle.turn_lock.store(false, Ordering::Release);
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task has exited".to_string(),
))
})?;
}
let result = result_rx.await.map_err(|_| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(
"Session task dropped the result channel".to_string(),
))
})?;
result.map_err(SessionError::Agent)
}
/// Requests cancellation of the turn currently claimed on `id`.
///
/// Sets the interrupt flag and then signals the session task with
/// `notify_waiters`.
///
/// # Errors
/// * `SessionError::NotFound` when `id` is not a live session.
/// * `SessionError::NotRunning` when the turn flag is not set.
async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
    let sessions = self.sessions.read().await;
    let Some(handle) = sessions.get(id) else {
        return Err(SessionError::NotFound { id: id.clone() });
    };
    if handle.turn_lock.load(Ordering::Acquire) {
        handle.interrupt_requested.store(true, Ordering::Release);
        handle.interrupt_notify.notify_waiters();
        Ok(())
    } else {
        Err(SessionError::NotRunning { id: id.clone() })
    }
}
/// Returns the current view of a session.
///
/// For a live session the snapshot is requested from the session task; for a
/// session that is no longer live the archived view (if any) is returned.
///
/// The session-map guard is released before talking to the task. The task does
/// not service commands while a turn is running, so the reply can take as long
/// as the turn; holding the guard for that time would stall every writer of the
/// map and every reader queued behind a waiting writer, `interrupt` included.
///
/// # Errors
/// * `SessionError::NotFound` when `id` is neither live nor archived.
/// * `SessionError::Agent(InternalError)` when the session task has exited or
///   drops the reply channel.
async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
    let sessions = self.sessions.read().await;
    let handle = match sessions.get(id) {
        Some(handle) => handle,
        None => {
            drop(sessions);
            return self
                .archived_views
                .read()
                .await
                .get(id)
                .cloned()
                .ok_or_else(|| SessionError::NotFound { id: id.clone() });
        }
    };
    // Copy what is needed from the handle so the map can be released before
    // waiting on the session task.
    let command_tx = handle.command_tx.clone();
    let state_rx = handle.state_rx.clone();
    let labels = handle.labels.clone();
    drop(sessions);
    let (reply_tx, reply_rx) = oneshot::channel();
    command_tx
        .send(SessionCommand::ReadSnapshot { reply_tx })
        .await
        .map_err(|_| {
            SessionError::Agent(meerkat_core::error::AgentError::InternalError(
                "Session task has exited".to_string(),
            ))
        })?;
    let snapshot = reply_rx.await.map_err(|_| {
        SessionError::Agent(meerkat_core::error::AgentError::InternalError(
            "Session task dropped the reply channel".to_string(),
        ))
    })?;
    let is_active = *state_rx.borrow() == SessionState::Running;
    Ok(SessionView {
        state: SessionInfo {
            session_id: id.clone(),
            created_at: snapshot.created_at,
            updated_at: snapshot.updated_at,
            message_count: snapshot.message_count,
            is_active,
            last_assistant_text: snapshot.last_assistant_text,
            labels,
        },
        billing: SessionUsage {
            total_tokens: snapshot.total_tokens,
            usage: snapshot.usage,
        },
    })
}
async fn list(&self, query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
let sessions = self.sessions.read().await;
let mut summaries: Vec<SessionSummary> = sessions
.iter()
.map(|(session_id, h)| {
let state = *h.state_rx.borrow();
let cache = h.summary_rx.borrow();
SessionSummary {
session_id: session_id.clone(),
created_at: h.created_at,
updated_at: cache.updated_at,
message_count: cache.message_count,
total_tokens: cache.total_tokens,
is_active: state == SessionState::Running,
labels: h.labels.clone(),
}
})
.collect();
if let Some(ref filter_labels) = query.labels {
summaries.retain(|s| {
filter_labels
.iter()
.all(|(k, v)| s.labels.get(k) == Some(v))
});
}
if let Some(offset) = query.offset {
if offset < summaries.len() {
summaries = summaries.split_off(offset);
} else {
summaries.clear();
}
}
if let Some(limit) = query.limit {
summaries.truncate(limit);
}
Ok(summaries)
}
/// Removes a live session, stores a read-only view of it and asks its task to
/// shut down.
///
/// The archived view is built from the handle's cached summary. The session
/// map lock is released before the view is stored and before the shutdown
/// command is sent; a failed send is ignored.
///
/// # Errors
/// `SessionError::NotFound` when `id` is not a live session.
async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
    let mut sessions = self.sessions.write().await;
    let Some(handle) = sessions.swap_remove(id) else {
        return Err(SessionError::NotFound { id: id.clone() });
    };
    let view = Self::archived_view_from_handle(id, &handle);
    drop(sessions);
    let mut archived = self.archived_views.write().await;
    archived.insert(id.clone(), view);
    drop(archived);
    let _ = handle.command_tx.send(SessionCommand::Shutdown).await;
    Ok(())
}
/// Trait entry point for event subscription; forwards to the inherent
/// `EphemeralSessionService::subscribe_session_events`.
async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
// Fully qualified so the call names the inherent method rather than this one.
EphemeralSessionService::<B>::subscribe_session_events(self, id).await
}
}
/// Control-plane extension: staging of system-context appends.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceControlExt for EphemeralSessionService<B> {
    /// Stages `req` on the session's shared system-context state and returns the
    /// resulting status.
    ///
    /// A poisoned state lock is logged and then used anyway.
    ///
    /// # Errors
    /// * `SessionError::NotFound` (converted) when `id` is not a live session.
    /// * The control error produced from a rejected `stage_append`.
    async fn append_system_context(
        &self,
        id: &SessionId,
        req: AppendSystemContextRequest,
    ) -> Result<AppendSystemContextResult, SessionControlError> {
        let state = self
            .system_context_state(id)
            .await
            .ok_or_else(|| SessionError::NotFound { id: id.clone() })?;
        let status = {
            let mut guard = state.lock().unwrap_or_else(|poisoned| {
                tracing::warn!(
                    session_id = %id,
                    "system-context state lock poisoned while staging append"
                );
                poisoned.into_inner()
            });
            guard
                .stage_append(&req, SystemTime::now())
                .map_err(|err| err.into_control_error(id))?
        };
        Ok(AppendSystemContextResult { status })
    }
}
/// Comms extension: each method forwards to the inherent method of the same
/// name on `EphemeralSessionService`. The calls are fully qualified so they
/// name the inherent methods rather than these trait methods.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceCommsExt for EphemeralSessionService<B> {
/// Comms runtime of a live session, if its agent provided one.
async fn comms_runtime(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
EphemeralSessionService::<B>::comms_runtime(self, session_id).await
}
/// Event injector of a live session, if its agent provided one.
async fn event_injector(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
EphemeralSessionService::<B>::event_injector(self, session_id).await
}
/// Subscribable injector of a live session, if its agent provided one.
async fn interaction_event_injector(
&self,
session_id: &SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
EphemeralSessionService::<B>::interaction_event_injector(self, session_id).await
}
}
/// History extension: pages through the messages of a live session.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<B: SessionAgentBuilder + 'static> SessionServiceHistoryExt for EphemeralSessionService<B> {
    /// Exports the live session and builds a history page from its messages.
    ///
    /// # Errors
    /// * `SessionError::PersistenceDisabled` when the session is not live but an
    ///   archived view exists for it.
    /// * `SessionError::NotFound` when the session is neither live nor archived.
    /// * Any other error returned by `export_session`.
    async fn read_history(
        &self,
        id: &SessionId,
        query: SessionHistoryQuery,
    ) -> Result<SessionHistoryPage, SessionError> {
        let session = match self.export_session(id).await {
            Ok(session) => session,
            Err(SessionError::NotFound { .. }) => {
                let archived = self.archived_views.read().await.contains_key(id);
                return Err(if archived {
                    SessionError::PersistenceDisabled
                } else {
                    SessionError::NotFound { id: id.clone() }
                });
            }
            Err(other) => return Err(other),
        };
        Ok(SessionHistoryPage::from_messages(
            session.id().clone(),
            session.messages(),
            query,
        ))
    }
}
/// Wraps `event` in an envelope carrying the next sequence number.
///
/// `next_seq` is advanced by one before use, so the first envelope produced
/// from a counter starting at 0 has sequence number 1.
fn stamp_event_envelope(
    next_seq: &mut u64,
    source_id: &str,
    event: AgentEvent,
) -> EventEnvelope<AgentEvent> {
    let seq = *next_seq + 1;
    *next_seq = seq;
    EventEnvelope::new(source_id, seq, None, event)
}
async fn session_task<A: SessionAgent>(
mut agent: A,
agent_event_tx: mpsc::Sender<AgentEvent>,
mut agent_event_rx: mpsc::Receiver<AgentEvent>,
mut commands: mpsc::Receiver<SessionCommand>,
control: SessionTaskControl,
) {
let mut next_seq: u64 = 0;
let source_id = format!("session:{}", agent.session_id());
while let Some(cmd) = commands.recv().await {
match cmd {
SessionCommand::StartTurn {
prompt,
host_mode,
event_tx,
result_tx,
skill_references,
flow_tool_overlay,
} => {
agent.set_skill_references(skill_references);
if let Err(error) = agent.set_flow_tool_overlay(flow_tool_overlay) {
control.turn_lock.store(false, Ordering::Release);
control.interrupt_requested.store(false, Ordering::Release);
let _ = result_tx.send(Err(error));
continue;
}
control.state_tx.send_replace(SessionState::Running);
let mut event_stream_open = true;
let result = {
#[cfg(not(target_arch = "wasm32"))]
type RunFut<'a> = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<RunResult, meerkat_core::error::AgentError>,
> + Send
+ 'a,
>,
>;
#[cfg(target_arch = "wasm32")]
type RunFut<'a> = std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<RunResult, meerkat_core::error::AgentError>,
> + 'a,
>,
>;
let run_fut: RunFut<'_> = if host_mode {
Box::pin(agent.run_host_mode(prompt))
} else {
Box::pin(agent.run_with_events(prompt, agent_event_tx.clone()))
};
let mut run_fut = run_fut;
let mut interrupted = false;
let r = loop {
let interrupt_wait = control.interrupt_notify.notified();
tokio::select! {
result = &mut run_fut => break result,
() = interrupt_wait => {
if control.interrupt_requested.swap(false, Ordering::AcqRel) {
interrupted = true;
break Err(meerkat_core::error::AgentError::Cancelled);
}
}
Some(event) = agent_event_rx.recv() => {
let envelope = stamp_event_envelope(&mut next_seq, &source_id, event);
let _ = control.session_event_tx.send(envelope.clone());
if event_stream_open
&& let Some(ref tx) = event_tx
&& tx.send(envelope).await.is_err()
{
event_stream_open = false;
tracing::warn!("session event stream receiver dropped; continuing without streaming events");
}
}
}
};
drop(run_fut);
if interrupted {
agent.cancel();
}
while let Ok(event) = agent_event_rx.try_recv() {
let envelope = stamp_event_envelope(&mut next_seq, &source_id, event);
let _ = control.session_event_tx.send(envelope.clone());
if event_stream_open
&& let Some(ref tx) = event_tx
&& tx.send(envelope).await.is_err()
{
event_stream_open = false;
tracing::warn!(
"session event stream receiver dropped while draining events"
);
}
}
r
};
let snap = agent.snapshot();
control.summary_tx.send_replace(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
});
control.state_tx.send_replace(SessionState::Idle);
let result = if let Err(error) = agent.set_flow_tool_overlay(None) {
tracing::error!(
error = %error,
"failed to clear flow tool overlay; failing turn to avoid stale scope"
);
Err(error)
} else {
result
};
control.turn_lock.store(false, Ordering::Release);
control.interrupt_requested.store(false, Ordering::Release);
let _ = result_tx.send(result);
}
SessionCommand::ReadSnapshot { reply_tx } => {
let _ = reply_tx.send(agent.snapshot());
}
SessionCommand::ExportSession { reply_tx } => {
let _ = reply_tx.send(agent.session_clone());
}
SessionCommand::ApplyRuntimeSystemContext { appends, reply_tx } => {
agent.apply_runtime_system_context(&appends);
let snap = agent.snapshot();
control.summary_tx.send_replace(SessionSummaryCache {
updated_at: snap.updated_at,
message_count: snap.message_count,
total_tokens: snap.total_tokens,
});
let _ = reply_tx.send(());
}
SessionCommand::Shutdown => {
control.state_tx.send_replace(SessionState::ShuttingDown);
break;
}
}
}
}