#![allow(clippy::unwrap_used, clippy::expect_used)]
use async_trait::async_trait;
use futures::StreamExt;
use meerkat_core::AgentExecutionSnapshot;
use meerkat_core::event::AgentEvent;
use meerkat_core::lifecycle::RunId;
use meerkat_core::ops::OperationId;
use meerkat_core::service::{
AppendSystemContextRequest, AppendSystemContextStatus, CreateSessionRequest,
DeferredPromptPolicy, InitialTurnPolicy, SessionError, SessionQuery, SessionService,
SessionServiceControlExt, StartTurnRequest, TurnToolOverlay,
};
use meerkat_core::types::{
HandlingMode, RenderClass, RenderMetadata, RenderSalience, RunResult, SessionId, Usage,
};
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
use meerkat_session::PersistentSessionService;
use meerkat_session::ephemeral::SessionSnapshot;
use meerkat_session::{EphemeralSessionService, SessionAgent, SessionAgentBuilder};
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
use meerkat_store::{MemoryStore, SessionStore};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc;
struct MockAgent {
session_id: SessionId,
message_count: usize,
delay_ms: Option<u64>,
should_fail: bool,
total_input_tokens: u64,
total_output_tokens: u64,
system_context_state: Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct RecordedTurnMetadata {
handling_mode: HandlingMode,
render_metadata: Option<RenderMetadata>,
}
struct RecordingTurnAgent {
session_id: SessionId,
recorded: Arc<std::sync::Mutex<Vec<RecordedTurnMetadata>>>,
system_context_state: Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>>,
}
struct SnapshotAgent {
session_id: SessionId,
execution_snapshot: AgentExecutionSnapshot,
tool_scope_snapshot: meerkat_core::ToolScopeSnapshot,
external_tool_surface_snapshot: Option<meerkat_core::ExternalToolSurfaceSnapshot>,
system_context_state: Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>>,
}
#[async_trait]
impl SessionAgent for MockAgent {
async fn run_with_events(
&mut self,
_prompt: meerkat_core::types::ContentInput,
event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
if let Some(delay) = self.delay_ms {
tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
}
if self.should_fail {
let _ = event_tx
.send(AgentEvent::RunFailed {
session_id: self.session_id.clone(),
error_class: meerkat_core::event::AgentErrorClass::Internal,
error: "simulated failure".to_string(),
error_report: None,
terminal_cause_kind: None,
})
.await;
return Err(meerkat_core::error::AgentError::InternalError(
"simulated agent failure".to_string(),
));
}
let _ = event_tx
.send(AgentEvent::RunStarted {
session_id: self.session_id.clone(),
prompt: meerkat_core::ContentInput::Text("test".to_string()),
})
.await;
self.message_count += 2; self.total_input_tokens += 10;
self.total_output_tokens += 5;
let usage = Usage {
input_tokens: 10,
output_tokens: 5,
cache_creation_tokens: None,
cache_read_tokens: None,
};
let _ = event_tx
.send(AgentEvent::RunCompleted {
session_id: self.session_id.clone(),
result: "Hello from mock".to_string(),
structured_output: None,
extraction_required: false,
usage: usage.clone(),
terminal_cause_kind: None,
})
.await;
Ok(RunResult {
text: "Hello from mock".to_string(),
session_id: self.session_id.clone(),
usage,
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
})
}
fn set_skill_references(&mut self, _refs: Option<Vec<meerkat_core::skills::SkillKey>>) {}
fn set_flow_tool_overlay(
&mut self,
_overlay: Option<TurnToolOverlay>,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn cancel(&mut self) {}
fn hot_swap_llm_identity(
&mut self,
_client: Arc<dyn meerkat_core::AgentLlmClient>,
_identity: meerkat_core::SessionLlmIdentity,
_request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn session_id(&self) -> SessionId {
self.session_id.clone()
}
fn snapshot(&self) -> SessionSnapshot {
SessionSnapshot {
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
message_count: self.message_count,
total_tokens: self.total_input_tokens + self.total_output_tokens,
usage: Usage {
input_tokens: self.total_input_tokens,
output_tokens: self.total_output_tokens,
cache_creation_tokens: None,
cache_read_tokens: None,
},
last_assistant_text: Some("Hello from mock".to_string()),
}
}
fn session_clone(&self) -> meerkat_core::Session {
let mut session = meerkat_core::Session::with_id(self.session_id.clone());
session
.set_system_context_state(
self.system_context_state
.lock()
.expect("system-context lock poisoned")
.clone(),
)
.expect("serialize system-context state");
session
}
fn apply_runtime_system_context(
&mut self,
appends: &[meerkat_core::PendingSystemContextAppend],
) {
let mut session = self.session_clone();
session.append_system_context_blocks(appends);
self.message_count = session.messages().len();
}
fn system_context_state(
&self,
) -> Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>> {
Arc::clone(&self.system_context_state)
}
}
#[async_trait]
impl SessionAgent for SnapshotAgent {
async fn run_with_events(
&mut self,
_prompt: meerkat_core::types::ContentInput,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
Ok(RunResult {
text: "snapshot".to_string(),
session_id: self.session_id.clone(),
usage: Usage::default(),
turns: 0,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
})
}
fn set_skill_references(&mut self, _refs: Option<Vec<meerkat_core::skills::SkillKey>>) {}
fn set_flow_tool_overlay(
&mut self,
_overlay: Option<TurnToolOverlay>,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn cancel(&mut self) {}
fn hot_swap_llm_identity(
&mut self,
_client: Arc<dyn meerkat_core::AgentLlmClient>,
_identity: meerkat_core::SessionLlmIdentity,
_request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn session_id(&self) -> SessionId {
self.session_id.clone()
}
fn snapshot(&self) -> SessionSnapshot {
SessionSnapshot {
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
message_count: 0,
total_tokens: 0,
usage: Usage::default(),
last_assistant_text: None,
}
}
fn execution_snapshot(&self) -> Option<AgentExecutionSnapshot> {
Some(self.execution_snapshot.clone())
}
fn tool_scope_snapshot(&self) -> Option<meerkat_core::ToolScopeSnapshot> {
Some(self.tool_scope_snapshot.clone())
}
fn external_tool_surface_snapshot(&self) -> Option<meerkat_core::ExternalToolSurfaceSnapshot> {
self.external_tool_surface_snapshot.clone()
}
fn session_clone(&self) -> meerkat_core::Session {
let mut session = meerkat_core::Session::with_id(self.session_id.clone());
session
.set_system_context_state(
self.system_context_state
.lock()
.expect("system-context lock poisoned")
.clone(),
)
.expect("serialize system-context state");
session
}
fn apply_runtime_system_context(
&mut self,
_appends: &[meerkat_core::PendingSystemContextAppend],
) {
}
fn system_context_state(
&self,
) -> Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>> {
Arc::clone(&self.system_context_state)
}
}
struct MockAgentBuilder;
#[async_trait]
impl SessionAgentBuilder for MockAgentBuilder {
type Agent = MockAgent;
async fn build_agent(
&self,
_req: &CreateSessionRequest,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<MockAgent, SessionError> {
Ok(MockAgent {
session_id: SessionId::new(),
message_count: 0,
delay_ms: None,
should_fail: false,
total_input_tokens: 0,
total_output_tokens: 0,
system_context_state: Arc::new(std::sync::Mutex::new(Default::default())),
})
}
}
struct SnapshotAgentBuilder {
execution_snapshot: AgentExecutionSnapshot,
tool_scope_snapshot: meerkat_core::ToolScopeSnapshot,
external_tool_surface_snapshot: Option<meerkat_core::ExternalToolSurfaceSnapshot>,
}
#[async_trait]
impl SessionAgentBuilder for SnapshotAgentBuilder {
type Agent = SnapshotAgent;
async fn build_agent(
&self,
_req: &CreateSessionRequest,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<SnapshotAgent, SessionError> {
Ok(SnapshotAgent {
session_id: SessionId::new(),
execution_snapshot: self.execution_snapshot.clone(),
tool_scope_snapshot: self.tool_scope_snapshot.clone(),
external_tool_surface_snapshot: self.external_tool_surface_snapshot.clone(),
system_context_state: Arc::new(std::sync::Mutex::new(Default::default())),
})
}
}
struct SlowMockAgentBuilder {
delay_ms: u64,
}
#[async_trait]
impl SessionAgentBuilder for SlowMockAgentBuilder {
type Agent = MockAgent;
async fn build_agent(
&self,
_req: &CreateSessionRequest,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<MockAgent, SessionError> {
Ok(MockAgent {
session_id: SessionId::new(),
message_count: 0,
delay_ms: Some(self.delay_ms),
should_fail: false,
total_input_tokens: 0,
total_output_tokens: 0,
system_context_state: Arc::new(std::sync::Mutex::new(Default::default())),
})
}
}
struct FailingMockAgentBuilder;
#[async_trait]
impl SessionAgentBuilder for FailingMockAgentBuilder {
type Agent = MockAgent;
async fn build_agent(
&self,
_req: &CreateSessionRequest,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<MockAgent, SessionError> {
Ok(MockAgent {
session_id: SessionId::new(),
message_count: 0,
delay_ms: None,
should_fail: true,
total_input_tokens: 0,
total_output_tokens: 0,
system_context_state: Arc::new(std::sync::Mutex::new(Default::default())),
})
}
}
struct RecordingTurnAgentBuilder {
recorded: Arc<std::sync::Mutex<Vec<RecordedTurnMetadata>>>,
}
#[async_trait]
impl SessionAgentBuilder for RecordingTurnAgentBuilder {
type Agent = RecordingTurnAgent;
async fn build_agent(
&self,
_req: &CreateSessionRequest,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RecordingTurnAgent, SessionError> {
Ok(RecordingTurnAgent {
session_id: SessionId::new(),
recorded: Arc::clone(&self.recorded),
system_context_state: Arc::new(std::sync::Mutex::new(Default::default())),
})
}
}
#[async_trait]
impl SessionAgent for RecordingTurnAgent {
async fn run_with_events(
&mut self,
_prompt: meerkat_core::types::ContentInput,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
Ok(RunResult {
text: "recorded".to_string(),
session_id: self.session_id.clone(),
usage: Usage::default(),
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
})
}
async fn run_turn_with_events(
&mut self,
_prompt: meerkat_core::types::ContentInput,
handling_mode: HandlingMode,
render_metadata: Option<RenderMetadata>,
_typed_turn_appends: Vec<meerkat_core::lifecycle::run_primitive::ConversationAppend>,
_execution_kind: Option<meerkat_core::lifecycle::RuntimeExecutionKind>,
_event_tx: mpsc::Sender<AgentEvent>,
) -> Result<RunResult, meerkat_core::error::AgentError> {
self.recorded
.lock()
.expect("recorded-turn lock poisoned")
.push(RecordedTurnMetadata {
handling_mode,
render_metadata,
});
Ok(RunResult {
text: "recorded".to_string(),
session_id: self.session_id.clone(),
usage: Usage::default(),
turns: 1,
tool_calls: 0,
terminal_cause_kind: None,
structured_output: None,
extraction_error: None,
schema_warnings: None,
skill_diagnostics: None,
})
}
fn set_skill_references(&mut self, _refs: Option<Vec<meerkat_core::skills::SkillKey>>) {}
fn set_flow_tool_overlay(
&mut self,
_overlay: Option<TurnToolOverlay>,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn cancel(&mut self) {}
fn hot_swap_llm_identity(
&mut self,
_client: Arc<dyn meerkat_core::AgentLlmClient>,
_identity: meerkat_core::SessionLlmIdentity,
_request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), meerkat_core::error::AgentError> {
Ok(())
}
fn session_id(&self) -> SessionId {
self.session_id.clone()
}
fn snapshot(&self) -> SessionSnapshot {
SessionSnapshot {
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
message_count: 0,
total_tokens: 0,
usage: Usage::default(),
last_assistant_text: Some("recorded".to_string()),
}
}
fn session_clone(&self) -> meerkat_core::Session {
let mut session = meerkat_core::Session::with_id(self.session_id.clone());
session
.set_system_context_state(
self.system_context_state
.lock()
.expect("system-context lock poisoned")
.clone(),
)
.expect("serialize system-context state");
session
}
fn apply_runtime_system_context(
&mut self,
_appends: &[meerkat_core::PendingSystemContextAppend],
) {
}
fn system_context_state(
&self,
) -> Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>> {
Arc::clone(&self.system_context_state)
}
}
fn make_service(builder: MockAgentBuilder) -> Arc<EphemeralSessionService<MockAgentBuilder>> {
Arc::new(EphemeralSessionService::new(builder, 10))
}
fn make_slow_service(delay_ms: u64) -> Arc<EphemeralSessionService<SlowMockAgentBuilder>> {
Arc::new(EphemeralSessionService::new(
SlowMockAgentBuilder { delay_ms },
10,
))
}
fn make_failing_service() -> Arc<EphemeralSessionService<FailingMockAgentBuilder>> {
Arc::new(EphemeralSessionService::new(FailingMockAgentBuilder, 10))
}
fn make_recording_service(
recorded: Arc<std::sync::Mutex<Vec<RecordedTurnMetadata>>>,
) -> Arc<EphemeralSessionService<RecordingTurnAgentBuilder>> {
Arc::new(EphemeralSessionService::new(
RecordingTurnAgentBuilder { recorded },
10,
))
}
fn make_snapshot_service(
execution_snapshot: AgentExecutionSnapshot,
tool_scope_snapshot: meerkat_core::ToolScopeSnapshot,
external_tool_surface_snapshot: Option<meerkat_core::ExternalToolSurfaceSnapshot>,
) -> Arc<EphemeralSessionService<SnapshotAgentBuilder>> {
Arc::new(EphemeralSessionService::new(
SnapshotAgentBuilder {
execution_snapshot,
tool_scope_snapshot,
external_tool_surface_snapshot,
},
10,
))
}
fn create_req(prompt: &str) -> CreateSessionRequest {
CreateSessionRequest {
model: "mock".to_string(),
prompt: prompt.to_string().into(),
render_metadata: None,
system_prompt: None,
max_tokens: None,
event_tx: None,
skill_references: None,
initial_turn: InitialTurnPolicy::RunImmediately,
deferred_prompt_policy: DeferredPromptPolicy::Discard,
build: None,
labels: None,
}
}
fn create_req_deferred(prompt: &str) -> CreateSessionRequest {
CreateSessionRequest {
initial_turn: InitialTurnPolicy::Defer,
deferred_prompt_policy: DeferredPromptPolicy::Stage,
..create_req(prompt)
}
}
fn create_req_with_labels(prompt: &str, labels: BTreeMap<String, String>) -> CreateSessionRequest {
CreateSessionRequest {
labels: Some(labels),
..create_req(prompt)
}
}
fn turn_req(prompt: &str) -> StartTurnRequest {
StartTurnRequest {
prompt: prompt.to_string().into(),
system_prompt: None,
event_tx: None,
runtime: meerkat_core::service::StartTurnRuntimeSemantics::default(),
}
}
#[tokio::test]
async fn full_lifecycle_create_turns_read_archive_gone() {
let service = make_service(MockAgentBuilder);
let r1 = service.create_session(create_req("Hello")).await.unwrap();
let sid = r1.session_id.clone();
let _r2 = service
.start_turn(&sid, turn_req("Follow up"))
.await
.unwrap();
let view = service.read(&sid).await.unwrap();
assert!(
view.state.message_count >= 4,
"expected >= 4 messages after 2 turns, got {}",
view.state.message_count
);
service.archive(&sid).await.unwrap();
let sessions = service.list(SessionQuery::default()).await.unwrap();
assert!(
sessions.is_empty(),
"archived session should not appear in list"
);
let archived_view = service.read(&sid).await;
assert!(
archived_view.is_ok(),
"read after archive should return the archived view"
);
let archived_view = archived_view.unwrap();
assert!(!archived_view.state.is_active);
}
#[tokio::test]
async fn read_returns_all_session_view_fields() {
let service = make_service(MockAgentBuilder);
let result = service
.create_session(create_req("Check fields"))
.await
.unwrap();
let sid = result.session_id.clone();
let view = service.read(&sid).await.unwrap();
assert_eq!(view.state.session_id, sid);
assert!(view.state.created_at <= SystemTime::now());
assert!(view.state.updated_at <= SystemTime::now());
assert!(
view.state.message_count > 0,
"message_count should be > 0 after a turn"
);
assert!(
!view.state.is_active,
"session should be idle after turn completes"
);
assert!(
view.state.last_assistant_text.is_some(),
"last_assistant_text should be set after a turn"
);
assert!(view.billing.total_tokens > 0, "total_tokens should be > 0");
assert!(
view.billing.usage.input_tokens > 0,
"input_tokens should be > 0"
);
assert!(
view.billing.usage.output_tokens > 0,
"output_tokens should be > 0"
);
}
#[tokio::test]
async fn usage_accumulates_across_turns() {
let service = make_service(MockAgentBuilder);
let r1 = service.create_session(create_req("Turn 1")).await.unwrap();
let sid = r1.session_id.clone();
let view1 = service.read(&sid).await.unwrap();
let tokens_after_1 = view1.billing.total_tokens;
assert!(tokens_after_1 > 0, "should have tokens after first turn");
let _r2 = service.start_turn(&sid, turn_req("Turn 2")).await.unwrap();
let view2 = service.read(&sid).await.unwrap();
let tokens_after_2 = view2.billing.total_tokens;
assert!(
tokens_after_2 > tokens_after_1,
"tokens should accumulate: {tokens_after_2} should be > {tokens_after_1}"
);
}
#[tokio::test]
async fn interrupt_during_slow_turn_returns_promptly() {
let service = make_slow_service(500);
let created = service
.create_session(create_req_deferred("slow"))
.await
.unwrap();
let sid = created.session_id;
let svc = service.clone();
let sid2 = sid.clone();
let turn_handle =
tokio::spawn(async move { svc.start_turn(&sid2, turn_req("Slow turn")).await });
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let start = tokio::time::Instant::now();
let interrupt_result = service.interrupt(&sid).await;
let elapsed = start.elapsed();
assert!(interrupt_result.is_ok(), "interrupt should succeed");
assert!(
elapsed < tokio::time::Duration::from_millis(200),
"interrupt should return promptly, took {elapsed:?}"
);
let turn_result = turn_handle.await.unwrap();
assert!(turn_result.is_err(), "interrupted turn should return error");
}
#[tokio::test]
async fn interrupt_idle_session_returns_not_running() {
let service = make_service(MockAgentBuilder);
let result = service
.create_session(create_req("Idle test"))
.await
.unwrap();
let sid = result.session_id;
let err = service.interrupt(&sid).await.unwrap_err();
assert_eq!(err.code(), "SESSION_NOT_RUNNING");
}
#[tokio::test]
async fn concurrent_turn_on_busy_session_returns_busy() {
let service = make_slow_service(200);
let created = service
.create_session(create_req_deferred("busy"))
.await
.unwrap();
let sid = created.session_id;
let svc = service.clone();
let sid2 = sid.clone();
let _handle = tokio::spawn(async move { svc.start_turn(&sid2, turn_req("Slow")).await });
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let result = service.start_turn(&sid, turn_req("Fast")).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), "SESSION_BUSY");
}
#[tokio::test]
async fn read_during_active_turn_returns_cached_view() {
let service = make_slow_service(200);
let created = service
.create_session(create_req_deferred("read during active turn"))
.await
.unwrap();
let sid = created.session_id;
let svc = service.clone();
let sid2 = sid.clone();
let turn = tokio::spawn(async move { svc.start_turn(&sid2, turn_req("Slow")).await });
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let view = tokio::time::timeout(tokio::time::Duration::from_millis(50), service.read(&sid))
.await
.expect("read should not wait for the running turn")
.expect("read should succeed while the turn is active");
assert!(
view.state.is_active,
"live read should reflect that the session is currently running",
);
turn.await.unwrap().unwrap();
}
#[tokio::test]
async fn archive_during_running_turn_gracefully_drains_current_turn() {
let service = make_slow_service(150);
let created = service
.create_session(create_req_deferred("archive during run"))
.await
.unwrap();
let sid = created.session_id;
let svc = service.clone();
let sid2 = sid.clone();
let turn = tokio::spawn(async move { svc.start_turn(&sid2, turn_req("Slow")).await });
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
service.archive(&sid).await.unwrap();
let turn_result = turn.await.unwrap();
assert!(
turn_result.is_ok(),
"archive should gracefully drain the admitted/running turn"
);
let archived = service.read(&sid).await.unwrap();
assert!(
!archived.state.is_active,
"archived session must be inactive"
);
let err = service
.start_turn(&sid, turn_req("after archive"))
.await
.unwrap_err();
assert_eq!(err.code(), "SESSION_NOT_FOUND");
}
#[tokio::test]
async fn event_stream_emits_run_started_and_completed() {
let service = make_service(MockAgentBuilder);
let created = service
.create_session(create_req_deferred("event test"))
.await
.unwrap();
let sid = created.session_id;
let mut stream = service
.subscribe_session_events(&sid)
.await
.expect("should subscribe to session events");
service
.start_turn(&sid, turn_req("trigger events"))
.await
.unwrap();
let mut events = Vec::new();
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
while let Ok(Some(envelope)) = tokio::time::timeout_at(deadline, stream.next()).await {
events.push(envelope);
}
let has_run_started = events
.iter()
.any(|e| matches!(e.payload, AgentEvent::RunStarted { .. }));
let has_run_completed = events
.iter()
.any(|e| matches!(e.payload, AgentEvent::RunCompleted { .. }));
assert!(
has_run_started,
"events should contain RunStarted: {events:?}"
);
assert!(
has_run_completed,
"events should contain RunCompleted: {events:?}"
);
}
#[tokio::test]
async fn event_ordering_run_started_before_completed() {
let service = make_service(MockAgentBuilder);
let created = service
.create_session(create_req_deferred("order test"))
.await
.unwrap();
let sid = created.session_id;
let mut stream = service
.subscribe_session_events(&sid)
.await
.expect("should subscribe to session events");
service.start_turn(&sid, turn_req("trigger")).await.unwrap();
let mut events = Vec::new();
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
while let Ok(Some(envelope)) = tokio::time::timeout_at(deadline, stream.next()).await {
events.push(envelope);
}
let started_idx = events
.iter()
.position(|e| matches!(e.payload, AgentEvent::RunStarted { .. }));
let completed_idx = events
.iter()
.position(|e| matches!(e.payload, AgentEvent::RunCompleted { .. }));
assert!(started_idx.is_some(), "RunStarted should be present");
assert!(completed_idx.is_some(), "RunCompleted should be present");
assert!(
started_idx.unwrap() < completed_idx.unwrap(),
"RunStarted (idx={}) should come before RunCompleted (idx={})",
started_idx.unwrap(),
completed_idx.unwrap()
);
if let (Some(si), Some(ci)) = (started_idx, completed_idx) {
assert!(
events[si].seq < events[ci].seq,
"RunStarted seq ({}) should be < RunCompleted seq ({})",
events[si].seq,
events[ci].seq
);
}
}
#[tokio::test]
async fn concurrent_creates_produce_distinct_sessions() {
let service = Arc::new(EphemeralSessionService::new(MockAgentBuilder, 10));
let mut handles = Vec::new();
for i in 0..5 {
let svc = service.clone();
handles.push(tokio::spawn(async move {
svc.create_session(create_req(&format!("concurrent-{i}")))
.await
}));
}
let mut session_ids = Vec::new();
for handle in handles {
let result = handle.await.unwrap().unwrap();
session_ids.push(result.session_id);
}
let unique: std::collections::HashSet<_> = session_ids.iter().collect();
assert_eq!(
unique.len(),
5,
"all 5 sessions should have distinct IDs, got: {session_ids:?}"
);
let sessions = service.list(SessionQuery::default()).await.unwrap();
assert_eq!(sessions.len(), 5);
}
#[tokio::test]
async fn labels_create_and_filter() {
let service = make_service(MockAgentBuilder);
let mut test_labels = BTreeMap::new();
test_labels.insert("env".to_string(), "test".to_string());
let _ = service
.create_session(create_req_with_labels("test session", test_labels))
.await
.unwrap();
let mut prod_labels = BTreeMap::new();
prod_labels.insert("env".to_string(), "prod".to_string());
let _ = service
.create_session(create_req_with_labels("prod session", prod_labels))
.await
.unwrap();
let mut filter = BTreeMap::new();
filter.insert("env".to_string(), "test".to_string());
let results = service
.list(SessionQuery {
limit: None,
offset: None,
labels: Some(filter),
})
.await
.unwrap();
assert_eq!(results.len(), 1, "should only return the test session");
assert_eq!(results[0].labels.get("env").unwrap(), "test");
}
#[tokio::test]
async fn labels_preserved_in_read() {
let service = make_service(MockAgentBuilder);
let mut labels = BTreeMap::new();
labels.insert("team".to_string(), "platform".to_string());
labels.insert("tier".to_string(), "critical".to_string());
let result = service
.create_session(create_req_with_labels("labeled session", labels.clone()))
.await
.unwrap();
let sid = result.session_id;
let view = service.read(&sid).await.unwrap();
assert_eq!(
view.state.labels, labels,
"labels should be preserved in read"
);
}
#[tokio::test]
async fn inject_context_applied_when_idle() {
let service = make_service(MockAgentBuilder);
let created = service
.create_session(create_req_deferred("context test"))
.await
.unwrap();
let sid = created.session_id;
let result = service
.append_system_context(
&sid,
AppendSystemContextRequest {
text: "New runtime context".to_string(),
source: Some("test".to_string()),
idempotency_key: Some("ctx-idle-1".to_string()),
},
)
.await
.expect("append should succeed on idle session");
assert_eq!(
result.status,
AppendSystemContextStatus::Staged,
"context on idle session should be staged"
);
}
#[tokio::test]
async fn inject_context_duplicate_idempotent() {
let service = make_service(MockAgentBuilder);
let created = service
.create_session(create_req_deferred("dedup test"))
.await
.unwrap();
let sid = created.session_id;
let req = AppendSystemContextRequest {
text: "Idempotent context".to_string(),
source: Some("test".to_string()),
idempotency_key: Some("ctx-dedup-1".to_string()),
};
let first = service
.append_system_context(&sid, req.clone())
.await
.expect("first append should succeed");
assert_eq!(first.status, AppendSystemContextStatus::Staged);
let second = service
.append_system_context(&sid, req.clone())
.await
.expect("duplicate append should succeed");
assert_eq!(
second.status,
AppendSystemContextStatus::Duplicate,
"second append with same key should be duplicate"
);
}
#[tokio::test]
async fn start_turn_forwards_handling_mode_and_render_metadata() {
let recorded = Arc::new(std::sync::Mutex::new(Vec::<RecordedTurnMetadata>::new()));
let service = make_recording_service(Arc::clone(&recorded));
let created = service
.create_session(create_req_deferred("record"))
.await
.expect("create session");
service
.start_turn(
&created.session_id,
StartTurnRequest {
prompt: "steer me".into(),
system_prompt: None,
event_tx: None,
runtime: meerkat_core::service::StartTurnRuntimeSemantics::new(
Some(RenderMetadata {
class: RenderClass::ExternalEvent,
salience: RenderSalience::Urgent,
}),
HandlingMode::Steer,
None,
None,
Vec::new(),
None,
),
},
)
.await
.expect("start turn");
let recorded = recorded.lock().expect("recorded-turn lock poisoned");
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0].handling_mode, HandlingMode::Steer);
assert_eq!(
recorded[0].render_metadata,
Some(RenderMetadata {
class: RenderClass::ExternalEvent,
salience: RenderSalience::Urgent,
})
);
}
#[tokio::test]
async fn failed_turn_returns_agent_error() {
let service = make_failing_service();
let created = service
.create_session(create_req_deferred("will fail"))
.await
.unwrap();
let sid = created.session_id;
let result = service.start_turn(&sid, turn_req("fail")).await;
assert!(result.is_err(), "failing agent should produce an error");
let err = result.unwrap_err();
assert_eq!(
err.code(),
"AGENT_ERROR",
"error code should be AGENT_ERROR, got: {}",
err.code()
);
}
#[tokio::test]
async fn execution_snapshot_returns_live_agent_execution_state() {
let expected = AgentExecutionSnapshot {
loop_state: meerkat_core::state::LoopState::WaitingForOps,
turn_phase: meerkat_core::turn_execution_authority::TurnPhase::WaitingForOps,
active_run_id: Some(RunId::new()),
primitive_kind: meerkat_core::turn_execution_authority::TurnPrimitiveKind::ConversationTurn,
admitted_content_shape: Some(
meerkat_core::turn_execution_authority::ContentShape::Conversation,
),
vision_enabled: true,
image_tool_results_enabled: false,
tool_calls_pending: 2,
pending_operation_ids: Some(vec![OperationId::new(), OperationId::new()]),
barrier_operation_ids: vec![OperationId::new()],
has_barrier_ops: true,
barrier_satisfied: false,
boundary_count: 1,
cancel_after_boundary: true,
terminal_outcome: meerkat_core::turn_execution_authority::TurnTerminalOutcome::Cancelled,
terminal_cause_kind: None,
extraction_attempts: 1,
max_extraction_retries: 3,
applied_cursor: 17,
};
let expected_tool_scope = meerkat_core::ToolScopeSnapshot {
known_base_names: vec!["alpha".to_string(), "beta".to_string()],
visible_names: vec!["beta".to_string()],
base_filter: meerkat_core::ToolFilter::All,
active_external_filter: meerkat_core::ToolFilter::Deny(
["alpha".to_string()].into_iter().collect(),
),
active_turn_allow: Some(vec!["beta".to_string()]),
active_turn_deny: Vec::new(),
active_revision: meerkat_core::ToolScopeRevision(3),
staged_external_filter: meerkat_core::ToolFilter::Deny(
["alpha".to_string()].into_iter().collect(),
),
staged_revision: meerkat_core::ToolScopeRevision(3),
capability_base_filter: meerkat_core::ToolFilter::All,
};
let service = make_snapshot_service(expected.clone(), expected_tool_scope, None);
let created = service
.create_session(create_req_deferred("snapshot"))
.await
.expect("create snapshot-backed session");
let actual = service
.execution_snapshot(&created.session_id)
.await
.expect("execution snapshot query should succeed")
.expect("snapshot should be present");
assert_eq!(actual, expected);
}
#[tokio::test]
async fn tool_scope_snapshot_returns_live_agent_tool_scope_state() {
let expected_execution = AgentExecutionSnapshot {
loop_state: meerkat_core::state::LoopState::CallingLlm,
turn_phase: meerkat_core::turn_execution_authority::TurnPhase::Ready,
active_run_id: None,
primitive_kind: meerkat_core::turn_execution_authority::TurnPrimitiveKind::ConversationTurn,
admitted_content_shape: None,
vision_enabled: false,
image_tool_results_enabled: false,
tool_calls_pending: 0,
pending_operation_ids: None,
barrier_operation_ids: Vec::new(),
has_barrier_ops: false,
barrier_satisfied: true,
boundary_count: 0,
cancel_after_boundary: false,
terminal_outcome: meerkat_core::turn_execution_authority::TurnTerminalOutcome::None,
terminal_cause_kind: None,
extraction_attempts: 0,
max_extraction_retries: 2,
applied_cursor: 0,
};
let expected = meerkat_core::ToolScopeSnapshot {
known_base_names: vec![
"read_file".to_string(),
"search".to_string(),
"write_file".to_string(),
],
visible_names: vec!["read_file".to_string(), "search".to_string()],
base_filter: meerkat_core::ToolFilter::All,
active_external_filter: meerkat_core::ToolFilter::Deny(
["write_file".to_string()].into_iter().collect(),
),
active_turn_allow: None,
active_turn_deny: vec!["write_file".to_string()],
active_revision: meerkat_core::ToolScopeRevision(4),
staged_external_filter: meerkat_core::ToolFilter::Allow(
["read_file".to_string(), "search".to_string()]
.into_iter()
.collect(),
),
staged_revision: meerkat_core::ToolScopeRevision(5),
capability_base_filter: meerkat_core::ToolFilter::All,
};
let service = make_snapshot_service(expected_execution, expected.clone(), None);
let created = service
.create_session(create_req_deferred("tool-scope snapshot"))
.await
.expect("create snapshot-backed session");
let actual = service
.tool_scope_snapshot(&created.session_id)
.await
.expect("tool-scope snapshot query should succeed")
.expect("tool-scope snapshot should be present");
assert_eq!(actual, expected);
}
#[tokio::test]
async fn external_tool_surface_snapshot_returns_live_agent_tool_surface_state() {
let expected_execution = AgentExecutionSnapshot {
loop_state: meerkat_core::state::LoopState::CallingLlm,
turn_phase: meerkat_core::turn_execution_authority::TurnPhase::Ready,
active_run_id: None,
primitive_kind: meerkat_core::turn_execution_authority::TurnPrimitiveKind::ConversationTurn,
admitted_content_shape: None,
vision_enabled: false,
image_tool_results_enabled: false,
tool_calls_pending: 0,
pending_operation_ids: None,
barrier_operation_ids: Vec::new(),
has_barrier_ops: false,
barrier_satisfied: true,
boundary_count: 0,
cancel_after_boundary: false,
terminal_outcome: meerkat_core::turn_execution_authority::TurnTerminalOutcome::None,
terminal_cause_kind: None,
extraction_attempts: 0,
max_extraction_retries: 0,
applied_cursor: 0,
};
let expected_tool_scope = meerkat_core::ToolScopeSnapshot {
known_base_names: vec!["mcp__planner".to_string()],
visible_names: vec!["mcp__planner".to_string()],
base_filter: meerkat_core::ToolFilter::All,
active_external_filter: meerkat_core::ToolFilter::All,
active_turn_allow: None,
active_turn_deny: Vec::new(),
active_revision: meerkat_core::ToolScopeRevision(0),
staged_external_filter: meerkat_core::ToolFilter::All,
staged_revision: meerkat_core::ToolScopeRevision(0),
capability_base_filter: meerkat_core::ToolFilter::All,
};
let expected_surface = meerkat_core::ExternalToolSurfaceSnapshot {
phase: meerkat_core::ExternalToolSurfaceGlobalPhase::Operating,
snapshot_epoch: 0,
snapshot_aligned_epoch: 0,
entries: vec![meerkat_core::ExternalToolSurfaceEntrySnapshot {
surface_id: "planner".to_string(),
visible: false,
base_state: meerkat_core::ExternalToolSurfaceBaseState::Absent,
has_removal_timing: false,
pending_op: meerkat_core::ExternalToolSurfacePendingOp::None,
staged_op: meerkat_core::ExternalToolSurfaceStagedOp::Add,
staged_intent_sequence: 1,
pending_task_sequence: 0,
pending_lineage_sequence: 0,
inflight_call_count: 0,
last_delta_operation: meerkat_core::ExternalToolSurfaceDeltaOperation::None,
last_delta_phase: meerkat_core::ExternalToolSurfaceDeltaPhase::None,
}],
};
let service = make_snapshot_service(
expected_execution,
expected_tool_scope,
Some(expected_surface.clone()),
);
let created = service
.create_session(create_req_deferred("tool-surface snapshot"))
.await
.expect("create snapshot-backed session");
let actual = service
.external_tool_surface_snapshot(&created.session_id)
.await
.expect("tool-surface snapshot query should succeed")
.expect("tool-surface snapshot should be present");
assert_eq!(actual, expected_surface);
}
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
#[tokio::test]
async fn persistent_start_turn_recovers_after_discarding_stale_live_session() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let service = PersistentSessionService::new(
MockAgentBuilder,
4,
Arc::clone(&store),
None,
Arc::new(meerkat_store::MemoryBlobStore::new()),
);
let created = service
.create_session(create_req_deferred("seed"))
.await
.expect("create deferred session");
let session_id = created.session_id.clone();
let mut stored = store
.load(&session_id)
.await
.expect("load should succeed")
.expect("persisted session should exist");
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
stored.touch();
store.save(&stored).await.expect("save updated session");
let result = service
.start_turn(&session_id, turn_req("follow up"))
.await
.expect("start_turn should recover a stale discarded live session");
assert_eq!(result.session_id, session_id);
assert!(
service
.has_live_session(&session_id)
.await
.expect("live session query should succeed"),
"recovered session should be live again after start_turn",
);
}
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
#[tokio::test]
async fn persistent_start_turn_rejects_archived_stored_only_session() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let service = PersistentSessionService::new(
MockAgentBuilder,
4,
Arc::clone(&store),
None,
Arc::new(meerkat_store::MemoryBlobStore::new()),
);
let created = service
.create_session(create_req_deferred("seed"))
.await
.expect("create deferred session");
let session_id = created.session_id.clone();
service
.archive(&session_id)
.await
.expect("archive should succeed");
let err = service
.start_turn(&session_id, turn_req("after archive"))
.await
.expect_err("archived stored-only sessions must stay read-only");
assert_eq!(err.code(), "SESSION_NOT_FOUND");
assert!(
!service
.has_live_session(&session_id)
.await
.expect("live session query should succeed"),
"archived session must not be recreated as live",
);
}
#[cfg(all(feature = "session-store", not(target_arch = "wasm32")))]
#[tokio::test]
async fn persistent_concurrent_start_turn_recovery_never_surfaces_duplicate_id() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let service = Arc::new(PersistentSessionService::new(
MockAgentBuilder,
4,
Arc::clone(&store),
None,
Arc::new(meerkat_store::MemoryBlobStore::new()),
));
let created = service
.create_session(create_req_deferred("seed"))
.await
.expect("create deferred session");
let session_id = created.session_id.clone();
let mut stored = store
.load(&session_id)
.await
.expect("load should succeed")
.expect("persisted session should exist");
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
stored.touch();
store.save(&stored).await.expect("save updated session");
let service_a = Arc::clone(&service);
let session_id_a = session_id.clone();
let first =
tokio::spawn(async move { service_a.start_turn(&session_id_a, turn_req("first")).await });
let service_b = Arc::clone(&service);
let session_id_b = session_id.clone();
let second = tokio::spawn(async move {
service_b
.start_turn(&session_id_b, turn_req("second"))
.await
});
let outcomes = [
first.await.expect("first start_turn task should join"),
second.await.expect("second start_turn task should join"),
];
let outcome_codes: Vec<String> = outcomes
.iter()
.map(|result| match result {
Ok(_) => "OK".to_string(),
Err(err) => err.code().to_string(),
})
.collect();
let outcome_debug: Vec<String> = outcomes
.iter()
.map(|result| match result {
Ok(run) => format!("OK({})", run.session_id),
Err(err) => format!("{err:?}"),
})
.collect();
assert!(
outcomes.iter().any(Result::is_ok),
"at least one concurrent start_turn should succeed after recovery; got {outcome_codes:?} {outcome_debug:?}",
);
for result in outcomes {
if let Err(err) = result {
assert_eq!(
err.code(),
"SESSION_BUSY",
"concurrent recovery must degrade to normal busy semantics; got {outcome_codes:?} {outcome_debug:?}",
);
}
}
}