use async_trait::async_trait;
use indexmap::IndexSet;
#[allow(unused_imports)] use meerkat_core::Session;
use meerkat_core::SessionSystemContextState;
use meerkat_core::service::{
AppendSystemContextRequest, AppendSystemContextResult, CreateSessionRequest,
SessionControlError, SessionError, SessionInfo, SessionQuery, SessionService,
SessionServiceCommsExt, SessionServiceControlExt, SessionSummary, SessionUsage, SessionView,
StartTurnRequest,
};
use meerkat_core::types::{RunResult, SessionId};
use meerkat_store::SessionStore;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::SESSION_LABELS_KEY;
use crate::ephemeral::{EphemeralSessionService, SessionAgentBuilder};
fn write_system_context_state(
session: &mut Session,
state: SessionSystemContextState,
) -> Result<(), SessionControlError> {
session.set_system_context_state(state).map_err(|err| {
SessionControlError::Session(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(format!(
"failed to serialize system-context state: {err}"
)),
))
})
}
/// Per-session flag consulted by `StoreCheckpointer::checkpoint` before it
/// writes to the store.
///
/// `archive` and `cancel_all_checkpointers` set the flag to `true`;
/// `rearm_all_checkpointers` resets it to `false`.
struct CheckpointerGate {
// While `true`, `StoreCheckpointer::checkpoint` returns without saving.
cancelled: Mutex<bool>,
}
/// Checkpointer that saves session snapshots into a `SessionStore`.
struct StoreCheckpointer {
// Store that receives the saved sessions.
store: Arc<dyn SessionStore>,
// Gate whose lock is taken for the whole duration of each checkpoint.
gate: Arc<CheckpointerGate>,
// Message count recorded after the most recent successful save; used to
// skip checkpoints when the message count has not changed.
last_saved_len: std::sync::atomic::AtomicUsize,
}
#[async_trait]
impl meerkat_core::checkpoint::SessionCheckpointer for StoreCheckpointer {
    /// Saves `session` to the store unless the gate is cancelled or the
    /// message count equals the count recorded at the last successful save.
    ///
    /// A failed save is logged and otherwise ignored; the recorded count is
    /// only advanced when the save succeeds.
    async fn checkpoint(&self, session: &Session) {
        use std::sync::atomic::Ordering;

        // The gate lock stays held until this function returns, so the flag
        // cannot be flipped while the save below is in flight.
        let cancelled = self.gate.cancelled.lock().await;
        if *cancelled {
            return;
        }

        let message_count = session.messages().len();
        if self.last_saved_len.load(Ordering::Acquire) == message_count {
            return;
        }

        match self.store.save(session).await {
            Ok(()) => self.last_saved_len.store(message_count, Ordering::Release),
            Err(e) => tracing::warn!("Host-mode checkpoint failed: {e}"),
        }
    }
}
/// Session service that wraps an `EphemeralSessionService` and additionally
/// saves sessions to a `SessionStore`.
pub struct PersistentSessionService<B: SessionAgentBuilder> {
// Live (in-memory) session service that all runtime operations delegate to.
inner: EphemeralSessionService<B>,
// Backing store for persisted session rows.
store: Arc<dyn SessionStore>,
// Checkpointer gates keyed by session id. Code that needs both this map
// lock and a gate lock never waits for the map while holding a gate.
checkpointer_gates: Mutex<HashMap<SessionId, Arc<CheckpointerGate>>>,
}
/// Reads the label map stored under `SESSION_LABELS_KEY` in `metadata`.
///
/// Returns an empty map when the key is absent. When the value is present but
/// cannot be deserialized into a string-to-string map, a warning is logged and
/// an empty map is returned.
fn extract_labels_from_metadata(
    metadata: &serde_json::Map<String, serde_json::Value>,
) -> BTreeMap<String, String> {
    let Some(raw) = metadata.get(SESSION_LABELS_KEY) else {
        return BTreeMap::new();
    };

    serde_json::from_value::<BTreeMap<String, String>>(raw.clone()).unwrap_or_else(|e| {
        tracing::warn!(
            key = SESSION_LABELS_KEY,
            error = %e,
            "failed to deserialize session labels from metadata"
        );
        BTreeMap::new()
    })
}
impl<B: SessionAgentBuilder + 'static> PersistentSessionService<B> {
    /// Builds a service around a fresh `EphemeralSessionService` constructed
    /// from `builder` and `max_sessions`, persisting into `store`.
    pub fn new(builder: B, max_sessions: usize, store: Arc<dyn SessionStore>) -> Self {
        let inner = EphemeralSessionService::new(builder, max_sessions);
        let checkpointer_gates = Mutex::new(HashMap::new());
        Self {
            inner,
            store,
            checkpointer_gates,
        }
    }

    /// Returns the gate registered for `id`, registering a new un-cancelled
    /// gate first when none exists.
    async fn gate_for_session(&self, id: &SessionId) -> Arc<CheckpointerGate> {
        let mut registry = self.checkpointer_gates.lock().await;
        let gate = registry.entry(id.clone()).or_insert_with(|| {
            Arc::new(CheckpointerGate {
                cancelled: Mutex::new(false),
            })
        });
        Arc::clone(gate)
    }

    /// Returns the gate registered for `id`, or `None` without registering one.
    async fn existing_gate_for_session(&self, id: &SessionId) -> Option<Arc<CheckpointerGate>> {
        let registry = self.checkpointer_gates.lock().await;
        registry.get(id).map(Arc::clone)
    }
}
#[async_trait]
impl<B: SessionAgentBuilder + 'static> SessionService for PersistentSessionService<B> {
async fn create_session(
&self,
mut req: CreateSessionRequest,
) -> Result<RunResult, SessionError> {
let req_labels = req.labels.clone();
let gate = Arc::new(CheckpointerGate {
cancelled: Mutex::new(false),
});
let checkpointer = Arc::new(StoreCheckpointer {
store: Arc::clone(&self.store),
gate: Arc::clone(&gate),
last_saved_len: std::sync::atomic::AtomicUsize::new(0),
});
let build = req.build.get_or_insert_with(Default::default);
build.checkpointer = Some(checkpointer.clone());
let result = self.inner.create_session(req).await?;
{
self.checkpointer_gates
.lock()
.await
.insert(result.session_id.clone(), gate);
}
let saved_len = self.persist_full_session(&result.session_id).await?;
checkpointer
.last_saved_len
.store(saved_len, std::sync::atomic::Ordering::Release);
if let Some(ref labels) = req_labels
&& !labels.is_empty()
&& let Ok(Some(mut session)) = self.store.load(&result.session_id).await
&& let Ok(labels_value) = serde_json::to_value(labels)
{
session.set_metadata(SESSION_LABELS_KEY, labels_value);
if let Err(e) = self.store.save(&session).await {
tracing::warn!(
session_id = %result.session_id,
error = %e,
"failed to persist session labels"
);
}
}
Ok(result)
}
/// Runs a turn on the live session `id`, then saves the full session.
///
/// # Errors
///
/// Returns the inner service's error if the turn fails (nothing is saved in
/// that case), or the persistence error if the subsequent save fails.
async fn start_turn(
    &self,
    id: &SessionId,
    req: StartTurnRequest,
) -> Result<RunResult, SessionError> {
    let outcome = self.inner.start_turn(id, req).await?;
    // The saved message count is not needed here.
    self.persist_full_session(id).await?;
    Ok(outcome)
}
/// Forwards the interrupt request for `id` to the inner live service and
/// returns its result unchanged. Nothing is written to the store.
async fn interrupt(&self, id: &SessionId) -> Result<(), SessionError> {
self.inner.interrupt(id).await
}
/// Returns a view of session `id`.
///
/// The live service is asked first. Only when it reports `NotFound` is the
/// store consulted; a view built from a stored row always has
/// `is_active: false`.
///
/// # Errors
///
/// Any non-`NotFound` error from the live service is returned as-is. A store
/// failure becomes `SessionError::Store`, and a missing row becomes
/// `SessionError::NotFound`.
async fn read(&self, id: &SessionId) -> Result<SessionView, SessionError> {
    match self.inner.read(id).await {
        Err(SessionError::NotFound { .. }) => {}
        other => return other,
    }

    let stored = self
        .store
        .load(id)
        .await
        .map_err(|e| SessionError::Store(Box::new(e)))?;
    let Some(session) = stored else {
        return Err(SessionError::NotFound { id: id.clone() });
    };

    let labels = extract_labels_from_metadata(session.metadata());
    let state = SessionInfo {
        session_id: session.id().clone(),
        created_at: session.created_at(),
        updated_at: session.updated_at(),
        message_count: session.messages().len(),
        is_active: false,
        last_assistant_text: session.last_assistant_text(),
        labels,
    };
    let billing = SessionUsage {
        total_tokens: session.total_tokens(),
        usage: session.total_usage(),
    };
    Ok(SessionView { state, billing })
}
async fn list(&self, query: SessionQuery) -> Result<Vec<SessionSummary>, SessionError> {
let mut summaries = self.inner.list(SessionQuery::default()).await?;
let live_ids: IndexSet<_> = summaries.iter().map(|s| s.session_id.clone()).collect();
let stored = self
.store
.list(meerkat_store::SessionFilter::default())
.await
.map_err(|e| SessionError::Store(Box::new(e)))?;
for meta in stored {
if !live_ids.contains(&meta.id) {
let labels = extract_labels_from_metadata(&meta.metadata);
summaries.push(SessionSummary {
session_id: meta.id,
created_at: meta.created_at,
updated_at: meta.updated_at,
message_count: meta.message_count,
total_tokens: meta.total_tokens,
is_active: false,
labels,
});
}
}
if let Some(ref filter_labels) = query.labels {
summaries.retain(|s| {
filter_labels
.iter()
.all(|(k, v)| s.labels.get(k) == Some(v))
});
}
if let Some(offset) = query.offset {
if offset < summaries.len() {
summaries = summaries.split_off(offset);
} else {
summaries.clear();
}
}
if let Some(limit) = query.limit {
summaries.truncate(limit);
}
Ok(summaries)
}
/// Archives session `id`: asks the live service to archive it and deletes
/// its row from the store if one exists.
///
/// Returns `Ok(())` when the live archive succeeded or a store row existed;
/// otherwise the live service's error is returned. Store failures from
/// `exists`/`delete` are returned as `SessionError::Store`.
async fn archive(&self, id: &SessionId) -> Result<(), SessionError> {
// Only an already-registered gate is used; none is created here.
let gate = self.existing_gate_for_session(id).await;
// Mark the gate cancelled and keep its lock held across the live archive
// and the store delete. `StoreCheckpointer::checkpoint` takes the same lock
// and returns early once the flag is set.
let mut gate_guard = if let Some(ref gate) = gate {
let mut guard = gate.cancelled.lock().await;
*guard = true;
Some(guard)
} else {
None
};
// The live result is inspected only at the end, so a live failure does not
// prevent the store row from being removed.
let live_result = self.inner.archive(id).await;
let in_store = self
.store
.exists(id)
.await
.map_err(|e| SessionError::Store(Box::new(e)))?;
if in_store {
self.store
.delete(id)
.await
.map_err(|e| SessionError::Store(Box::new(e)))?;
}
// Release the gate lock before taking the gate-map lock.
drop(gate_guard.take());
self.checkpointer_gates.lock().await.remove(id);
// Success if the session was archived live or a stored row was found.
match (&live_result, in_store) {
(Ok(()), _) | (_, true) => Ok(()),
_ => live_result,
}
}
/// Forwards the event-stream subscription for `id` to the inner live service
/// and returns its result unchanged.
async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
self.inner.subscribe_session_events(id).await
}
}
#[async_trait]
impl<B: SessionAgentBuilder + 'static> SessionServiceCommsExt for PersistentSessionService<B> {
async fn comms_runtime(
&self,
session_id: &SessionId,
) -> Option<std::sync::Arc<dyn meerkat_core::agent::CommsRuntime>> {
self.inner.comms_runtime(session_id).await
}
async fn event_injector(
&self,
session_id: &SessionId,
) -> Option<std::sync::Arc<dyn meerkat_core::EventInjector>> {
self.inner.event_injector(session_id).await
}
async fn interaction_event_injector(
&self,
session_id: &SessionId,
) -> Option<std::sync::Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
self.inner.interaction_event_injector(session_id).await
}
}
#[async_trait]
impl<B: SessionAgentBuilder + 'static> SessionServiceControlExt for PersistentSessionService<B> {
/// Stages a system-context append for session `id` and saves the result.
///
/// Two paths exist:
/// - Live session (the inner service returns a system-context state): the
///   append is staged on a copy of the live state, the exported session is
///   saved with that copy, and only then is the live state updated. If the
///   live state changed in the meantime, the whole sequence is retried, up
///   to 8 attempts.
/// - No live state: the stored row is loaded, updated and saved directly.
///
/// # Errors
///
/// Returns `NotFound` when the session's gate is cancelled or no stored row
/// exists, a store error when saving/loading fails, the converted
/// `stage_append` error, or an internal error after 8 conflicting attempts.
async fn append_system_context(
&self,
id: &SessionId,
req: AppendSystemContextRequest,
) -> Result<AppendSystemContextResult, SessionControlError> {
let existing_gate = self.existing_gate_for_session(id).await;
if let Some(state_arc) = self.inner.system_context_state(id).await {
// Remember whether this call registers the gate, so it can be removed
// again if the session turns out not to exist.
let created_gate = existing_gate.is_none();
let gate = match existing_gate {
Some(gate) => gate,
None => self.gate_for_session(id).await,
};
// Held for the rest of the live path; released explicitly before the
// gate-map lock is taken or on successful return.
let gate_guard = gate.cancelled.lock().await;
if *gate_guard {
return Err(SessionControlError::Session(SessionError::NotFound {
id: id.clone(),
}));
}
// Captured once so every retry stages with the same timestamp.
let accepted_at = meerkat_core::time_compat::SystemTime::now();
let mut attempts = 0usize;
loop {
attempts += 1;
// Snapshot the live state and stage the append on a clone; the live
// state itself is not modified in this step.
let (status, snapshot_state, persisted_state) = {
let guard = match state_arc.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::warn!(
session_id = %id,
"system-context state lock poisoned while snapshotting live append"
);
poisoned.into_inner()
}
};
let snapshot_state = guard.clone();
let mut candidate = snapshot_state.clone();
let status = candidate
.stage_append(&req, accepted_at)
.map_err(|err| err.into_control_error(id))?;
(status, snapshot_state, candidate)
};
let mut session = match self.inner.export_session(id).await {
Ok(session) => session,
Err(err) => {
// Undo the gate registration made by this call when the session
// does not exist. The gate lock is released first.
if created_gate && matches!(err, SessionError::NotFound { .. }) {
drop(gate_guard);
self.checkpointer_gates.lock().await.remove(id);
}
return Err(SessionControlError::Session(err));
}
};
// Save before touching live state, so a store failure leaves the live
// state unchanged.
write_system_context_state(&mut session, persisted_state)?;
self.store
.save(&session)
.await
.map_err(|e| SessionControlError::Session(SessionError::Store(Box::new(e))))?;
// Apply to the live state only if it still equals the snapshot taken
// above; otherwise signal a retry with `None`.
let commit_result = {
let mut guard = match state_arc.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::warn!(
session_id = %id,
"system-context state lock poisoned while committing live append"
);
poisoned.into_inner()
}
};
if *guard == snapshot_state {
let live_status = guard
.stage_append(&req, accepted_at)
.map_err(|err| err.into_control_error(id))?;
Some(live_status)
} else {
None
}
};
if let Some(live_status) = commit_result {
debug_assert_eq!(live_status, status);
drop(gate_guard);
return Ok(AppendSystemContextResult { status });
}
// Give up after 8 attempts in which the live state kept changing.
if attempts >= 8 {
return Err(SessionControlError::Session(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"system-context state changed repeatedly while staging append"
.to_string(),
),
)));
}
}
}
// No live state: operate on the stored row. A cancelled gate is reported
// as NotFound; the gate lock is released before the store is touched.
if let Some(gate) = existing_gate {
let gate_guard = gate.cancelled.lock().await;
if *gate_guard {
return Err(SessionControlError::Session(SessionError::NotFound {
id: id.clone(),
}));
}
drop(gate_guard);
}
let mut session = match self
.store
.load(id)
.await
.map_err(|e| SessionError::Store(Box::new(e)))?
{
Some(session) => session,
None => {
// No stored row: remove any gate entry for this id and report NotFound.
self.checkpointer_gates.lock().await.remove(id);
return Err(SessionControlError::Session(SessionError::NotFound {
id: id.clone(),
}));
}
};
// Falls back to the default state when the session has none.
let mut state = session.system_context_state().unwrap_or_default();
let status = state
.stage_append(&req, meerkat_core::time_compat::SystemTime::now())
.map_err(|err| err.into_control_error(id))?;
write_system_context_state(&mut session, state)?;
self.store
.save(&session)
.await
.map_err(|e| SessionControlError::Session(SessionError::Store(Box::new(e))))?;
Ok(AppendSystemContextResult { status })
}
}
impl<B: SessionAgentBuilder + 'static> PersistentSessionService<B> {
pub async fn event_injector(
&self,
session_id: &SessionId,
) -> Option<std::sync::Arc<dyn meerkat_core::EventInjector>> {
self.inner.event_injector(session_id).await
}
#[doc(hidden)]
pub async fn interaction_event_injector(
&self,
session_id: &SessionId,
) -> Option<std::sync::Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
self.inner.interaction_event_injector(session_id).await
}
pub async fn comms_runtime(
&self,
session_id: &SessionId,
) -> Option<std::sync::Arc<dyn meerkat_core::agent::CommsRuntime>> {
self.inner.comms_runtime(session_id).await
}
pub async fn wait_session_registered(&self) {
self.inner.wait_session_registered().await;
}
pub async fn shutdown(&self) {
self.inner.shutdown().await;
}
pub async fn cancel_all_checkpointers(&self) {
let gates = self.checkpointer_gates.lock().await;
for gate in gates.values() {
let mut cancelled = gate.cancelled.lock().await;
*cancelled = true;
}
}
pub async fn rearm_all_checkpointers(&self) {
let gates = self.checkpointer_gates.lock().await;
for gate in gates.values() {
let mut cancelled = gate.cancelled.lock().await;
*cancelled = false;
}
}
pub async fn subscribe_session_events(
&self,
id: &SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
self.inner.subscribe_session_events(id).await
}
pub async fn load_persisted(&self, id: &SessionId) -> Result<Option<Session>, SessionError> {
self.store
.load(id)
.await
.map_err(|e| SessionError::Store(Box::new(e)))
}
async fn persist_full_session(&self, id: &SessionId) -> Result<usize, SessionError> {
let session = self.inner.export_session(id).await?;
let message_count = session.messages().len();
self.store
.save(&session)
.await
.map_err(|e| SessionError::Store(Box::new(e)))?;
Ok(message_count)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::ephemeral::{SessionAgent, SessionAgentBuilder, SessionSnapshot};
use meerkat_store::MemoryStore;
use meerkat_store::StoreError;
use std::sync::atomic::{AtomicBool, Ordering};
// Test store that wraps a `MemoryStore` and can be switched to fail `save`.
struct FailSaveStore {
// Real store that handles every call that is not forced to fail.
inner: MemoryStore,
// When `true`, `save` returns an error instead of delegating.
fail_save: AtomicBool,
}
impl FailSaveStore {
// Creates a store with save failures disabled.
fn new() -> Self {
Self {
inner: MemoryStore::new(),
fail_save: AtomicBool::new(false),
}
}
// Turns forced `save` failures on (`true`) or off (`false`).
fn set_fail_save(&self, fail: bool) {
self.fail_save.store(fail, Ordering::Release);
}
}
#[async_trait::async_trait]
impl SessionStore for FailSaveStore {
    // Fails with a fixed internal error while the failure flag is set;
    // otherwise saves into the wrapped store.
    async fn save(&self, session: &Session) -> Result<(), StoreError> {
        let should_fail = self.fail_save.load(Ordering::Acquire);
        if should_fail {
            Err(StoreError::Internal("forced save failure".to_string()))
        } else {
            self.inner.save(session).await
        }
    }

    // Plain delegation to the wrapped store.
    async fn load(&self, id: &SessionId) -> Result<Option<Session>, StoreError> {
        let loaded = self.inner.load(id).await;
        loaded
    }

    // Plain delegation to the wrapped store.
    async fn list(
        &self,
        filter: meerkat_store::SessionFilter,
    ) -> Result<Vec<meerkat_core::SessionMeta>, StoreError> {
        let listed = self.inner.list(filter).await;
        listed
    }

    // Plain delegation to the wrapped store.
    async fn delete(&self, id: &SessionId) -> Result<(), StoreError> {
        let deleted = self.inner.delete(id).await;
        deleted
    }
}
// Minimal test agent: each run appends one user and one assistant message
// to its in-memory session.
struct DummyAgent {
// Session mutated by `run_with_events` and cloned by `session_clone`.
session: Arc<std::sync::Mutex<Session>>,
// Shared state handed out by `system_context_state`.
system_context_state: Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>>,
}
#[async_trait::async_trait]
impl SessionAgent for DummyAgent {
    // Appends the prompt as a user message plus a fixed "ok" assistant reply
    // and reports a single-turn result. No events are sent.
    async fn run_with_events(
        &mut self,
        prompt: String,
        _event_tx: tokio::sync::mpsc::Sender<meerkat_core::event::AgentEvent>,
    ) -> Result<RunResult, meerkat_core::error::AgentError> {
        // Resolve the id first: `session_id` takes the same (non-reentrant) lock.
        let session_id = self.session_id();
        let mut session = self
            .session
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let user_turn = meerkat_core::types::UserMessage { content: prompt };
        session.push(meerkat_core::types::Message::User(user_turn));

        let assistant_turn = meerkat_core::types::AssistantMessage {
            content: "ok".to_string(),
            tool_calls: Vec::new(),
            stop_reason: meerkat_core::types::StopReason::EndTurn,
            usage: meerkat_core::types::Usage::default(),
        };
        session.push(meerkat_core::types::Message::Assistant(assistant_turn));

        Ok(RunResult {
            text: "ok".to_string(),
            session_id,
            usage: meerkat_core::types::Usage::default(),
            turns: 1,
            tool_calls: 0,
            structured_output: None,
            schema_warnings: None,
            skill_diagnostics: None,
        })
    }

    // Host mode behaves like a normal run with a throwaway event channel.
    async fn run_host_mode(
        &mut self,
        prompt: String,
    ) -> Result<RunResult, meerkat_core::error::AgentError> {
        self.run_with_events(prompt, tokio::sync::mpsc::channel(1).0)
            .await
    }

    // Skill references are ignored by the dummy agent.
    fn set_skill_references(&mut self, _refs: Option<Vec<meerkat_core::skills::SkillKey>>) {}

    // Tool overlays are accepted and ignored.
    fn set_flow_tool_overlay(
        &mut self,
        _overlay: Option<meerkat_core::service::TurnToolOverlay>,
    ) -> Result<(), meerkat_core::error::AgentError> {
        Ok(())
    }

    // Cancellation is a no-op.
    fn cancel(&mut self) {}

    // Id of the wrapped session; a poisoned lock is still read.
    fn session_id(&self) -> SessionId {
        let session = self
            .session
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        session.id().clone()
    }

    // Summary fields read straight from the wrapped session.
    fn snapshot(&self) -> SessionSnapshot {
        let session = self
            .session
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        SessionSnapshot {
            created_at: session.created_at(),
            updated_at: session.updated_at(),
            message_count: session.messages().len(),
            total_tokens: session.total_tokens(),
            usage: session.total_usage(),
            last_assistant_text: session.last_assistant_text(),
        }
    }

    // Full copy of the wrapped session; a poisoned lock is still read.
    fn session_clone(&self) -> Session {
        let session = self
            .session
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        session.clone()
    }

    // Shared handle to the agent's system-context state.
    fn system_context_state(
        &self,
    ) -> Arc<std::sync::Mutex<meerkat_core::SessionSystemContextState>> {
        Arc::clone(&self.system_context_state)
    }
}
// Stateless test builder that produces `DummyAgent` instances.
struct DummyBuilder;
#[async_trait::async_trait]
impl SessionAgentBuilder for DummyBuilder {
    type Agent = DummyAgent;

    // Builds an agent around a brand-new session and a default
    // system-context state; the request and event sender are ignored.
    async fn build_agent(
        &self,
        _req: &CreateSessionRequest,
        _event_tx: tokio::sync::mpsc::Sender<meerkat_core::event::AgentEvent>,
    ) -> Result<Self::Agent, SessionError> {
        let session = Arc::new(std::sync::Mutex::new(Session::new()));
        let system_context_state = Arc::new(std::sync::Mutex::new(
            meerkat_core::SessionSystemContextState::default(),
        ));
        Ok(DummyAgent {
            session,
            system_context_state,
        })
    }
}
#[tokio::test]
async fn test_persistent_load_persisted_returns_stored_session() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let session = Session::new();
let id = session.id().clone();
store.save(&session).await.unwrap();
let loaded = store.load(&id).await.unwrap();
assert!(loaded.is_some());
assert_eq!(loaded.unwrap().id(), &id);
}
#[tokio::test]
async fn test_persistent_load_persisted_returns_none_for_unknown() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let unknown = SessionId::new();
let loaded = store.load(&unknown).await.unwrap();
assert!(loaded.is_none());
}
#[tokio::test]
async fn test_persistent_archive_deletes_from_store() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let session = Session::new();
let id = session.id().clone();
store.save(&session).await.unwrap();
assert!(store.load(&id).await.unwrap().is_some());
store.delete(&id).await.unwrap();
assert!(store.load(&id).await.unwrap().is_none());
}
// A checkpoint on a session with one message writes that session to the store.
#[tokio::test]
async fn test_store_checkpointer_saves_session() {
    use meerkat_core::checkpoint::SessionCheckpointer;

    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let checkpointer = super::StoreCheckpointer {
        store: Arc::clone(&store),
        gate: Arc::new(super::CheckpointerGate {
            cancelled: tokio::sync::Mutex::new(false),
        }),
        last_saved_len: std::sync::atomic::AtomicUsize::new(0),
    };

    let greeting = meerkat_core::types::UserMessage {
        content: "hello".to_string(),
    };
    let mut session = Session::new();
    session.push(meerkat_core::types::Message::User(greeting));

    checkpointer.checkpoint(&session).await;

    let persisted = store.load(session.id()).await.unwrap();
    assert!(
        persisted.is_some(),
        "session should be persisted after checkpoint"
    );
    let persisted = persisted.unwrap();
    assert_eq!(persisted.id(), session.id());
    assert_eq!(persisted.messages().len(), session.messages().len());
}
// Once the gate is cancelled, further checkpoints must not write the session
// back, even when the session has gained messages.
#[tokio::test]
async fn test_store_checkpointer_suppressed_after_cancellation() {
    use meerkat_core::checkpoint::SessionCheckpointer;

    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let gate = Arc::new(super::CheckpointerGate {
        cancelled: tokio::sync::Mutex::new(false),
    });
    let checkpointer = super::StoreCheckpointer {
        store: Arc::clone(&store),
        gate: Arc::clone(&gate),
        last_saved_len: std::sync::atomic::AtomicUsize::new(0),
    };

    let mut session = Session::new();
    let first = meerkat_core::types::UserMessage {
        content: "hello".to_string(),
    };
    session.push(meerkat_core::types::Message::User(first));
    checkpointer.checkpoint(&session).await;
    assert!(store.load(session.id()).await.unwrap().is_some());

    // Cancel and delete while holding the gate lock, then release it so the
    // next checkpoint can observe the flag.
    let mut cancelled = gate.cancelled.lock().await;
    *cancelled = true;
    store.delete(session.id()).await.unwrap();
    drop(cancelled);

    let second = meerkat_core::types::UserMessage {
        content: "world".to_string(),
    };
    session.push(meerkat_core::types::Message::User(second));
    checkpointer.checkpoint(&session).await;

    let after_cancel = store.load(session.id()).await.unwrap();
    assert!(
        after_cancel.is_none(),
        "cancelled checkpointer should not write session back"
    );
}
// A checkpoint is skipped while the message count is unchanged since the
// last successful save, and resumes once a message is added.
#[tokio::test]
async fn test_store_checkpointer_skips_unchanged_session() {
    use meerkat_core::checkpoint::SessionCheckpointer;

    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let checkpointer = super::StoreCheckpointer {
        store: Arc::clone(&store),
        gate: Arc::new(super::CheckpointerGate {
            cancelled: tokio::sync::Mutex::new(false),
        }),
        last_saved_len: std::sync::atomic::AtomicUsize::new(0),
    };

    let mut session = Session::new();
    let first = meerkat_core::types::UserMessage {
        content: "hello".to_string(),
    };
    session.push(meerkat_core::types::Message::User(first));
    checkpointer.checkpoint(&session).await;
    assert!(store.load(session.id()).await.unwrap().is_some());

    // Remove the row; a checkpoint with no new messages must not restore it.
    store.delete(session.id()).await.unwrap();
    checkpointer.checkpoint(&session).await;
    let after_unchanged = store.load(session.id()).await.unwrap();
    assert!(
        after_unchanged.is_none(),
        "unchanged session should not be re-saved"
    );

    let second = meerkat_core::types::UserMessage {
        content: "world".to_string(),
    };
    session.push(meerkat_core::types::Message::User(second));
    checkpointer.checkpoint(&session).await;
    let after_change = store.load(session.id()).await.unwrap();
    assert!(after_change.is_some(), "changed session should be saved");
}
#[tokio::test]
async fn test_persistent_archive_store_only_session_succeeds() {
let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
let session = Session::new();
let id = session.id().clone();
store.save(&session).await.unwrap();
assert!(store.load(&id).await.unwrap().is_some());
store.delete(&id).await.unwrap();
assert!(store.load(&id).await.unwrap().is_none());
}
// After a stored session is archived, appending system context must fail
// with SESSION_NOT_FOUND and must not bring the row back.
#[tokio::test]
async fn test_append_system_context_does_not_recreate_archived_store_row() {
    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let service = PersistentSessionService::new(DummyBuilder, 4, Arc::clone(&store));

    let stored_session = Session::new();
    let id = stored_session.id().clone();
    store.save(&stored_session).await.unwrap();
    service.archive(&id).await.unwrap();
    assert!(store.load(&id).await.unwrap().is_none());

    let request = AppendSystemContextRequest {
        text: "runtime notice".to_string(),
        source: Some("mob".to_string()),
        idempotency_key: Some("ctx-persistent-archive".to_string()),
    };
    let err = service
        .append_system_context(&id, request)
        .await
        .expect_err("archived session must not be recreated by append");
    assert_eq!(err.code(), "SESSION_NOT_FOUND");

    let row_after_append = store.load(&id).await.unwrap();
    assert!(
        row_after_append.is_none(),
        "append after archive must not recreate the store row"
    );
}
// For a live session whose stored row has been removed, an append must save
// the live snapshot again, including the newly staged context.
#[tokio::test]
async fn test_append_system_context_repersist_live_session_when_store_row_missing() {
    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let service = PersistentSessionService::new(DummyBuilder, 4, Arc::clone(&store));

    let create_request = CreateSessionRequest {
        model: "test".to_string(),
        prompt: "hello".to_string(),
        system_prompt: None,
        max_tokens: None,
        event_tx: None,
        host_mode: false,
        skill_references: None,
        initial_turn: meerkat_core::service::InitialTurnPolicy::RunImmediately,
        build: None,
        labels: None,
    };
    let created = service
        .create_session(create_request)
        .await
        .expect("create_session should succeed");
    let id = created.session_id;

    store
        .delete(&id)
        .await
        .expect("test should be able to remove persisted row");
    let row_before_append = store.load(&id).await.unwrap();
    assert!(
        row_before_append.is_none(),
        "store row should be absent before append"
    );

    let append_request = AppendSystemContextRequest {
        text: "runtime notice".to_string(),
        source: Some("mob".to_string()),
        idempotency_key: Some("ctx-persistent-live".to_string()),
    };
    let appended = service
        .append_system_context(&id, append_request)
        .await
        .expect("live append should repersist from the live session snapshot");
    assert_eq!(
        appended.status,
        meerkat_core::AppendSystemContextStatus::Staged
    );

    let restored = store
        .load(&id)
        .await
        .expect("load should succeed")
        .expect("append should restore the persisted row");
    let state = restored
        .system_context_state()
        .expect("restored row should contain pending system-context state");
    assert_eq!(state.pending.len(), 1);
    assert_eq!(state.pending[0].text, "runtime notice");
}
// When the store rejects the save during a live append, the error is
// surfaced and the live system-context state stays untouched.
#[tokio::test]
async fn test_append_system_context_live_save_failure_does_not_mutate_runtime_state() {
    let store = Arc::new(FailSaveStore::new());
    let dyn_store = Arc::clone(&store) as Arc<dyn SessionStore>;
    let service = PersistentSessionService::new(DummyBuilder, 4, dyn_store);

    let create_request = CreateSessionRequest {
        model: "test".to_string(),
        prompt: "hello".to_string(),
        system_prompt: None,
        max_tokens: None,
        event_tx: None,
        host_mode: false,
        skill_references: None,
        initial_turn: meerkat_core::service::InitialTurnPolicy::RunImmediately,
        build: None,
        labels: None,
    };
    let created = service
        .create_session(create_request)
        .await
        .expect("create_session should succeed");
    let id = created.session_id;

    // Saves are only forced to fail after the session has been created.
    store.set_fail_save(true);
    let append_request = AppendSystemContextRequest {
        text: "runtime notice".to_string(),
        source: Some("mob".to_string()),
        idempotency_key: Some("ctx-save-failure".to_string()),
    };
    let err = service
        .append_system_context(&id, append_request)
        .await
        .expect_err("append should surface the store failure");
    assert_eq!(err.code(), "SESSION_STORE_ERROR");

    let live_state = service
        .inner
        .system_context_state(&id)
        .await
        .expect("live session should still exist");
    let live = live_state.lock().expect("system-context state lock");
    assert!(
        live.pending.is_empty(),
        "failed append must not mutate live runtime state"
    );
    assert!(
        !live.seen.contains_key("ctx-save-failure"),
        "failed append must not reserve the idempotency key in live state"
    );
}
// An append for an id that is neither live nor stored fails with
// SESSION_NOT_FOUND and leaves the gate map empty.
#[tokio::test]
async fn test_append_system_context_unknown_session_does_not_allocate_gate() {
    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let service = PersistentSessionService::new(DummyBuilder, 4, Arc::clone(&store));

    let unknown = SessionId::new();
    let request = AppendSystemContextRequest {
        text: "runtime notice".to_string(),
        source: Some("mob".to_string()),
        idempotency_key: Some("ctx-unknown".to_string()),
    };
    let err = service
        .append_system_context(&unknown, request)
        .await
        .expect_err("unknown session must fail");
    assert_eq!(err.code(), "SESSION_NOT_FOUND");

    let gates_empty = service.checkpointer_gates.lock().await.is_empty();
    assert!(
        gates_empty,
        "unknown-session append must not allocate a checkpointer gate"
    );
}
}