use agent_sdk_foundation::events::AgentEventEnvelope;
use agent_sdk_foundation::llm;
use agent_sdk_foundation::types::{AgentState, ThreadId, ToolExecution};
use anyhow::{Context, Result};
use async_trait::async_trait;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::RwLock as AsyncRwLock;
/// Storage interface for the message history of a conversation thread.
///
/// Every operation is async and fallible. Implementors must be `Send + Sync`.
#[async_trait]
pub trait MessageStore: Send + Sync {
    /// Adds `message` to the history kept for `thread_id`.
    async fn append(&self, thread_id: &ThreadId, message: llm::Message) -> Result<()>;

    /// Returns the messages kept for `thread_id`.
    async fn get_history(&self, thread_id: &ThreadId) -> Result<Vec<llm::Message>>;

    /// Discards the history kept for `thread_id`.
    async fn clear(&self, thread_id: &ThreadId) -> Result<()>;

    /// Returns how many messages are kept for `thread_id`.
    ///
    /// The provided implementation fetches the whole history through
    /// [`MessageStore::get_history`] and reports its length, so implementors
    /// with a cheaper way to count may want to override it.
    async fn count(&self, thread_id: &ThreadId) -> Result<usize> {
        let history = self.get_history(thread_id).await?;
        Ok(history.len())
    }

    /// Swaps the entire history of `thread_id` for `messages`.
    async fn replace_history(
        &self,
        thread_id: &ThreadId,
        messages: Vec<llm::Message>,
    ) -> Result<()>;
}
/// Storage interface for per-thread [`AgentState`] snapshots.
///
/// Every operation is async and fallible. Implementors must be `Send + Sync`.
#[async_trait]
pub trait StateStore: Send + Sync {
/// Persists `state`. No thread id is passed separately; the in-memory
/// implementation in this file keys the snapshot by `state.thread_id`.
async fn save(&self, state: &AgentState) -> Result<()>;
/// Loads the snapshot kept for `thread_id`, or `None` when there is none.
async fn load(&self, thread_id: &ThreadId) -> Result<Option<AgentState>>;
/// Removes the snapshot kept for `thread_id`.
async fn delete(&self, thread_id: &ThreadId) -> Result<()>;
}
/// The events recorded for one turn of a thread, plus its completion flag.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct StoredTurnEvents {
/// Turn number this record belongs to.
pub turn: usize,
/// Event envelopes appended to this turn, in append order.
pub events: Vec<AgentEventEnvelope>,
/// Whether the turn has been finished via `EventStore::finish_turn`.
pub finished: bool,
}
/// Storage interface for agent events, grouped by thread and by turn.
///
/// Only the turn-level primitives are required. The flat views
/// ([`EventStore::get_events`], [`EventStore::event_count`],
/// [`EventStore::get_events_since`]) have provided implementations built on
/// [`EventStore::get_turns`]; implementors may override them with something
/// cheaper.
#[async_trait]
pub trait EventStore: Send + Sync {
    /// Records `envelope` under turn `turn` of `thread_id`.
    async fn append(
        &self,
        thread_id: &ThreadId,
        turn: usize,
        envelope: AgentEventEnvelope,
    ) -> Result<()>;

    /// Marks turn `turn` of `thread_id` as finished.
    async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()>;

    /// Returns the stored record of turn `turn`, or `None` if there is none.
    async fn get_turn(
        &self,
        thread_id: &ThreadId,
        turn: usize,
    ) -> Result<Option<StoredTurnEvents>>;

    /// Returns every stored turn of `thread_id`.
    async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>>;

    /// Returns all events of `thread_id` as one flat list.
    ///
    /// The provided implementation concatenates the `events` of each turn in
    /// the order [`EventStore::get_turns`] yields the turns.
    async fn get_events(&self, thread_id: &ThreadId) -> Result<Vec<AgentEventEnvelope>> {
        let mut events = Vec::new();
        for stored in self.get_turns(thread_id).await? {
            events.extend(stored.events);
        }
        Ok(events)
    }

    /// Returns the number of events stored for `thread_id`.
    ///
    /// The provided implementation materialises the flat event list and
    /// reports its length.
    async fn event_count(&self, thread_id: &ThreadId) -> Result<usize> {
        let events = self.get_events(thread_id).await?;
        Ok(events.len())
    }

    /// Returns the flat event list of `thread_id` without its first `offset`
    /// entries. An `offset` at or past the end yields an empty list.
    async fn get_events_since(
        &self,
        thread_id: &ThreadId,
        offset: usize,
    ) -> Result<Vec<AgentEventEnvelope>> {
        let mut events = self.get_events(thread_id).await?;
        // Clamp so that an offset beyond the end yields an empty tail
        // instead of tripping `split_off`'s bounds check.
        let start = offset.min(events.len());
        Ok(events.split_off(start))
    }

    /// Removes everything stored for `thread_id`.
    async fn clear(&self, thread_id: &ThreadId) -> Result<()>;
}
/// Storage interface for [`ToolExecution`] records.
///
/// Records can be looked up by tool call id or by operation id.
#[async_trait]
pub trait ToolExecutionStore: Send + Sync {
/// Looks up the execution recorded for `tool_call_id`, if any.
async fn get_execution(&self, tool_call_id: &str) -> Result<Option<ToolExecution>>;
/// Stores `execution`.
async fn record_execution(&self, execution: ToolExecution) -> Result<()>;
/// Stores the new version of `execution`.
// NOTE(review): the trait does not say whether the record must already
// exist; confirm the intended contract against the callers.
async fn update_execution(&self, execution: ToolExecution) -> Result<()>;
/// Looks up the execution associated with `operation_id`, if any.
async fn get_execution_by_operation_id(
&self,
operation_id: &str,
) -> Result<Option<ToolExecution>>;
}
/// Shared state behind [`InMemoryStore`]: two independently locked maps.
#[derive(Default)]
struct InMemoryStoreInner {
/// Message history per thread, keyed by the thread id's inner string.
messages: RwLock<HashMap<String, Vec<llm::Message>>>,
/// Latest saved agent state per thread, keyed by the thread id's inner string.
states: RwLock<HashMap<String, AgentState>>,
}
/// In-memory implementation of [`MessageStore`] and [`StateStore`].
///
/// The data lives behind an [`Arc`], so a clone is another handle to the same
/// maps rather than a copy of them.
#[derive(Clone, Default)]
pub struct InMemoryStore {
/// Shared, lock-protected storage.
inner: Arc<InMemoryStoreInner>,
}
impl InMemoryStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
/// Shared state behind [`InMemoryEventStore`].
#[derive(Default)]
struct InMemoryEventStoreInner {
/// Turns per thread (outer key: the thread id's inner string). The inner
/// `BTreeMap` keeps each thread's turns ordered by turn number.
turns: AsyncRwLock<HashMap<String, BTreeMap<usize, StoredTurnEvents>>>,
}
/// In-memory implementation of [`EventStore`].
///
/// The data lives behind an [`Arc`], so a clone is another handle to the same
/// turns rather than a copy of them.
#[derive(Clone, Default)]
pub struct InMemoryEventStore {
/// Shared, lock-protected storage.
inner: Arc<InMemoryEventStoreInner>,
}
impl InMemoryEventStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
async fn update_turn(
&self,
thread_id: &ThreadId,
turn: usize,
update: impl FnOnce(&mut StoredTurnEvents) -> Result<()>,
) -> Result<()> {
let mut turns = self.inner.turns.write().await;
let stored_turn = turns
.entry(thread_id.0.clone())
.or_default()
.entry(turn)
.or_insert_with(|| StoredTurnEvents {
turn,
events: Vec::new(),
finished: false,
});
let result = update(stored_turn);
drop(turns);
result
}
}
/// [`MessageStore`] backed by the `messages` map of the shared inner state.
///
/// Each method takes the `std::sync::RwLock` for the length of one map
/// operation and fails with "lock poisoned" if the lock is poisoned.
#[async_trait]
impl MessageStore for InMemoryStore {
    /// Pushes `message` onto the thread's history, creating the history if the
    /// thread has none yet.
    async fn append(&self, thread_id: &ThreadId, message: llm::Message) -> Result<()> {
        let mut by_thread = self.inner.messages.write().ok().context("lock poisoned")?;
        by_thread
            .entry(thread_id.0.clone())
            .or_default()
            .push(message);
        drop(by_thread);
        Ok(())
    }

    /// Returns a copy of the thread's history; an unknown thread yields an
    /// empty list.
    async fn get_history(&self, thread_id: &ThreadId) -> Result<Vec<llm::Message>> {
        let by_thread = self.inner.messages.read().ok().context("lock poisoned")?;
        Ok(by_thread
            .get(&thread_id.0)
            .map_or_else(Vec::new, |history| history.clone()))
    }

    /// Drops the thread's history; a no-op for an unknown thread.
    async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
        let mut by_thread = self.inner.messages.write().ok().context("lock poisoned")?;
        by_thread.remove(&thread_id.0);
        drop(by_thread);
        Ok(())
    }

    /// Installs `messages` as the thread's history, discarding any previous one.
    async fn replace_history(
        &self,
        thread_id: &ThreadId,
        messages: Vec<llm::Message>,
    ) -> Result<()> {
        let mut by_thread = self.inner.messages.write().ok().context("lock poisoned")?;
        by_thread.insert(thread_id.0.clone(), messages);
        drop(by_thread);
        Ok(())
    }
}
/// [`StateStore`] backed by the `states` map of the shared inner state.
///
/// Each method takes the `std::sync::RwLock` for the length of one map
/// operation and fails with "lock poisoned" if the lock is poisoned.
#[async_trait]
impl StateStore for InMemoryStore {
    /// Stores a clone of `state` under its own thread id, replacing any
    /// previously saved state for that thread.
    async fn save(&self, state: &AgentState) -> Result<()> {
        let key = state.thread_id.0.clone();
        let snapshot = state.clone();
        let mut states = self.inner.states.write().ok().context("lock poisoned")?;
        states.insert(key, snapshot);
        drop(states);
        Ok(())
    }

    /// Returns a clone of the state saved for `thread_id`, or `None`.
    async fn load(&self, thread_id: &ThreadId) -> Result<Option<AgentState>> {
        let states = self.inner.states.read().ok().context("lock poisoned")?;
        Ok(states.get(&thread_id.0).map(|saved| saved.clone()))
    }

    /// Removes the state saved for `thread_id`; a no-op if there is none.
    async fn delete(&self, thread_id: &ThreadId) -> Result<()> {
        let mut states = self.inner.states.write().ok().context("lock poisoned")?;
        states.remove(&thread_id.0);
        drop(states);
        Ok(())
    }
}
/// Lets an `Arc<T>` (including `Arc<dyn MessageStore>`) be used wherever a
/// [`MessageStore`] is expected. Every method, `count` included, forwards to
/// the wrapped store, so an override of `count` on `T` is honoured.
#[async_trait]
impl<T: MessageStore + ?Sized> MessageStore for Arc<T> {
    async fn append(&self, thread_id: &ThreadId, message: llm::Message) -> Result<()> {
        <T as MessageStore>::append(&**self, thread_id, message).await
    }

    async fn get_history(&self, thread_id: &ThreadId) -> Result<Vec<llm::Message>> {
        <T as MessageStore>::get_history(&**self, thread_id).await
    }

    async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
        <T as MessageStore>::clear(&**self, thread_id).await
    }

    async fn count(&self, thread_id: &ThreadId) -> Result<usize> {
        <T as MessageStore>::count(&**self, thread_id).await
    }

    async fn replace_history(
        &self,
        thread_id: &ThreadId,
        messages: Vec<llm::Message>,
    ) -> Result<()> {
        <T as MessageStore>::replace_history(&**self, thread_id, messages).await
    }
}
/// Lets an `Arc<T>` (including `Arc<dyn StateStore>`) be used wherever a
/// [`StateStore`] is expected by forwarding every method to the wrapped store.
#[async_trait]
impl<T: StateStore + ?Sized> StateStore for Arc<T> {
    async fn save(&self, state: &AgentState) -> Result<()> {
        <T as StateStore>::save(&**self, state).await
    }

    async fn load(&self, thread_id: &ThreadId) -> Result<Option<AgentState>> {
        <T as StateStore>::load(&**self, thread_id).await
    }

    async fn delete(&self, thread_id: &ThreadId) -> Result<()> {
        <T as StateStore>::delete(&**self, thread_id).await
    }
}
/// [`EventStore`] backed by the async-locked `turns` map.
///
/// `event_count` and `get_events_since` are overridden so they work on the
/// stored data in place rather than cloning every turn first.
#[async_trait]
impl EventStore for InMemoryEventStore {
    /// Appends `envelope` to the turn, creating the turn record if needed.
    ///
    /// # Errors
    /// Fails when the turn has already been finished.
    async fn append(
        &self,
        thread_id: &ThreadId,
        turn: usize,
        envelope: AgentEventEnvelope,
    ) -> Result<()> {
        self.update_turn(thread_id, turn, |slot| {
            if slot.finished {
                anyhow::bail!("cannot append to finished turn {turn}");
            }
            slot.events.push(envelope);
            Ok(())
        })
        .await
    }

    /// Marks the turn as finished, creating an empty finished record if the
    /// turn had no events yet.
    ///
    /// # Errors
    /// Fails when the turn is already finished.
    async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
        self.update_turn(thread_id, turn, |slot| {
            if slot.finished {
                anyhow::bail!("turn {turn} is already finished");
            }
            slot.finished = true;
            Ok(())
        })
        .await
    }

    /// Returns a clone of the turn record, or `None` when the thread or the
    /// turn is unknown.
    async fn get_turn(
        &self,
        thread_id: &ThreadId,
        turn: usize,
    ) -> Result<Option<StoredTurnEvents>> {
        let guard = self.inner.turns.read().await;
        Ok(guard
            .get(&thread_id.0)
            .and_then(|thread_turns| thread_turns.get(&turn))
            .cloned())
    }

    /// Returns clones of all turn records of the thread in ascending turn
    /// order; an unknown thread yields an empty list.
    async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
        let guard = self.inner.turns.read().await;
        let Some(thread_turns) = guard.get(&thread_id.0) else {
            return Ok(Vec::new());
        };
        Ok(thread_turns.values().cloned().collect())
    }

    /// Sums the event counts of all turns without cloning any event.
    async fn event_count(&self, thread_id: &ThreadId) -> Result<usize> {
        let guard = self.inner.turns.read().await;
        let Some(thread_turns) = guard.get(&thread_id.0) else {
            return Ok(0);
        };
        Ok(thread_turns
            .values()
            .map(|stored| stored.events.len())
            .sum())
    }

    /// Returns the thread's events in turn order, minus the first `offset`.
    /// Only the returned events are cloned.
    async fn get_events_since(
        &self,
        thread_id: &ThreadId,
        offset: usize,
    ) -> Result<Vec<AgentEventEnvelope>> {
        let guard = self.inner.turns.read().await;
        let Some(thread_turns) = guard.get(&thread_id.0) else {
            return Ok(Vec::new());
        };
        Ok(thread_turns
            .values()
            .flat_map(|stored| stored.events.iter())
            .skip(offset)
            .cloned()
            .collect())
    }

    /// Removes every turn of the thread; a no-op for an unknown thread.
    async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
        // The guard is a temporary, so the lock is released at the end of
        // this statement.
        self.inner.turns.write().await.remove(&thread_id.0);
        Ok(())
    }
}
/// An event store wrapper that pairs an inner store with an observer callback.
///
/// The `EventStore` implementation for this type calls the observer on every
/// `append` and delegates all operations to the inner store.
pub struct ObservingEventStore<S, F> {
/// The store that actually persists and serves the events.
inner: S,
/// Callback invoked with a reference to each appended envelope.
observer: F,
}
impl<S, F> ObservingEventStore<S, F>
where
S: EventStore,
F: Fn(&AgentEventEnvelope) + Send + Sync,
{
/// Wraps `inner` so that `observer` is notified of appended events.
#[must_use]
pub const fn new(inner: S, observer: F) -> Self {
Self { inner, observer }
}
/// Returns a reference to the wrapped store.
#[must_use]
pub const fn inner(&self) -> &S {
&self.inner
}
}
/// [`EventStore`] that notifies the observer on `append` and otherwise
/// forwards to the wrapped store.
///
/// The methods with provided implementations are forwarded too, so overrides
/// on the wrapped store are used.
#[async_trait]
impl<S, F> EventStore for ObservingEventStore<S, F>
where
    S: EventStore,
    F: Fn(&AgentEventEnvelope) + Send + Sync,
{
    /// Calls the observer with `envelope`, then appends it to the inner store.
    ///
    /// The observer runs before the inner store is asked to persist the
    /// event, so it also sees envelopes whose append then fails.
    async fn append(
        &self,
        thread_id: &ThreadId,
        turn: usize,
        envelope: AgentEventEnvelope,
    ) -> Result<()> {
        let notify = &self.observer;
        notify(&envelope);
        S::append(&self.inner, thread_id, turn, envelope).await
    }

    async fn finish_turn(&self, thread_id: &ThreadId, turn: usize) -> Result<()> {
        S::finish_turn(&self.inner, thread_id, turn).await
    }

    async fn get_turn(
        &self,
        thread_id: &ThreadId,
        turn: usize,
    ) -> Result<Option<StoredTurnEvents>> {
        S::get_turn(&self.inner, thread_id, turn).await
    }

    async fn get_turns(&self, thread_id: &ThreadId) -> Result<Vec<StoredTurnEvents>> {
        S::get_turns(&self.inner, thread_id).await
    }

    async fn get_events(&self, thread_id: &ThreadId) -> Result<Vec<AgentEventEnvelope>> {
        S::get_events(&self.inner, thread_id).await
    }

    async fn event_count(&self, thread_id: &ThreadId) -> Result<usize> {
        S::event_count(&self.inner, thread_id).await
    }

    async fn get_events_since(
        &self,
        thread_id: &ThreadId,
        offset: usize,
    ) -> Result<Vec<AgentEventEnvelope>> {
        S::get_events_since(&self.inner, thread_id, offset).await
    }

    async fn clear(&self, thread_id: &ThreadId) -> Result<()> {
        S::clear(&self.inner, thread_id).await
    }
}
/// In-memory implementation of [`ToolExecutionStore`].
#[derive(Default)]
pub struct InMemoryExecutionStore {
/// Execution records keyed by tool call id.
executions: RwLock<HashMap<String, ToolExecution>>,
/// Secondary index: operation id -> tool call id of the owning execution.
operation_index: RwLock<HashMap<String, String>>,
}
impl InMemoryExecutionStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl InMemoryExecutionStore {
    /// Inserts or replaces `execution` and brings `operation_index` in line
    /// with it. Shared by `record_execution` and `update_execution` so both
    /// write paths maintain the index the same way.
    ///
    /// Index maintenance:
    /// - if the record being replaced carried a different operation id, that
    ///   id is removed from the index, but only while it still points at this
    ///   tool call, so an id that another call has since claimed stays intact;
    /// - if `execution` carries an operation id, it is (re)pointed at this
    ///   tool call.
    ///
    /// # Errors
    /// Fails with "lock poisoned" if either lock is poisoned.
    fn upsert_execution(&self, execution: ToolExecution) -> Result<()> {
        let tool_call_id = execution.tool_call_id.clone();
        let new_operation_id = execution.operation_id.clone();
        // Lock order is always `executions` before `operation_index`, matching
        // the lookup path, so the two locks cannot be taken in opposite order.
        let mut executions = self.executions.write().ok().context("lock poisoned")?;
        let stale_op_id = executions
            .get(&tool_call_id)
            .and_then(|prev| prev.operation_id.clone())
            .filter(|prev| Some(prev) != new_operation_id.as_ref());
        if stale_op_id.is_some() || new_operation_id.is_some() {
            let mut op_index = self.operation_index.write().ok().context("lock poisoned")?;
            // Only retire the old id if this call still owns its index entry.
            let owned_stale =
                stale_op_id.filter(|stale| op_index.get(stale) == Some(&tool_call_id));
            if let Some(stale) = owned_stale {
                op_index.remove(&stale);
            }
            if let Some(op_id) = new_operation_id {
                op_index.insert(op_id, tool_call_id.clone());
            }
        }
        executions.insert(tool_call_id, execution);
        drop(executions);
        Ok(())
    }
}

/// [`ToolExecutionStore`] backed by two `std::sync::RwLock`-protected maps:
/// the records themselves and an operation-id index.
#[async_trait]
impl ToolExecutionStore for InMemoryExecutionStore {
    /// Returns a clone of the execution recorded for `tool_call_id`, if any.
    async fn get_execution(&self, tool_call_id: &str) -> Result<Option<ToolExecution>> {
        let executions = self.executions.read().ok().context("lock poisoned")?;
        Ok(executions.get(tool_call_id).cloned())
    }

    /// Stores `execution` under its tool call id and indexes its operation id
    /// (when present) right away.
    ///
    /// Recording over an existing tool call id replaces the record. An
    /// operation id that only the replaced record carried stops resolving.
    async fn record_execution(&self, execution: ToolExecution) -> Result<()> {
        self.upsert_execution(execution)
    }

    /// Replaces the stored record with `execution` and re-indexes its
    /// operation id; a superseded operation id stops resolving.
    ///
    /// The record is inserted even when no previous version exists.
    async fn update_execution(&self, execution: ToolExecution) -> Result<()> {
        self.upsert_execution(execution)
    }

    /// Resolves `operation_id` through the index and returns a clone of the
    /// owning execution, or `None` when the id is not indexed.
    async fn get_execution_by_operation_id(
        &self,
        operation_id: &str,
    ) -> Result<Option<ToolExecution>> {
        // Take `executions` first (same order as the write path) and hold it
        // across the index lookup so a concurrent writer cannot change the
        // record between the two reads.
        let executions = self.executions.read().ok().context("lock poisoned")?;
        let tool_call_id = {
            let op_index = self.operation_index.read().ok().context("lock poisoned")?;
            op_index.get(operation_id).cloned()
        };
        let Some(tool_call_id) = tool_call_id else {
            return Ok(None);
        };
        Ok(executions.get(&tool_call_id).cloned())
    }
}
// Unit tests for the in-memory stores, the `Arc` forwarding impls and the
// observing event-store wrapper.
#[cfg(test)]
mod tests {
use super::*;
use agent_sdk_foundation::events::{AgentEvent, AgentEventEnvelope, SequenceCounter};
use agent_sdk_foundation::llm::Message;
use agent_sdk_foundation::types::ToolResult;
// Message store round trip: empty at first, two appends are visible through
// both `get_history` and `count`, and `clear` empties the thread again.
#[tokio::test]
async fn test_in_memory_message_store() -> Result<()> {
let store = InMemoryStore::new();
let thread_id = ThreadId::new();
let history = store.get_history(&thread_id).await?;
assert!(history.is_empty());
store.append(&thread_id, Message::user("Hello")).await?;
store
.append(&thread_id, Message::assistant("Hi there!"))
.await?;
let history = store.get_history(&thread_id).await?;
assert_eq!(history.len(), 2);
let count = store.count(&thread_id).await?;
assert_eq!(count, 2);
store.clear(&thread_id).await?;
let history = store.get_history(&thread_id).await?;
assert!(history.is_empty());
Ok(())
}
// `replace_history` swaps a three-message history for a two-message one.
#[tokio::test]
async fn test_replace_history() -> Result<()> {
let store = InMemoryStore::new();
let thread_id = ThreadId::new();
store.append(&thread_id, Message::user("Hello")).await?;
store
.append(&thread_id, Message::assistant("Hi there!"))
.await?;
store
.append(&thread_id, Message::user("How are you?"))
.await?;
let history = store.get_history(&thread_id).await?;
assert_eq!(history.len(), 3);
let new_history = vec![
Message::user("[Summary] Previous conversation about greetings"),
Message::assistant("I understand the context. Continuing..."),
];
store.replace_history(&thread_id, new_history).await?;
let history = store.get_history(&thread_id).await?;
assert_eq!(history.len(), 2);
Ok(())
}
// State store round trip: load of an unknown thread is `None`, a saved state
// loads back with the same thread id, and `delete` removes it.
#[tokio::test]
async fn test_in_memory_state_store() -> Result<()> {
let store = InMemoryStore::new();
let thread_id = ThreadId::new();
let state = store.load(&thread_id).await?;
assert!(state.is_none());
let state = AgentState::new(thread_id.clone());
store.save(&state).await?;
let loaded = store.load(&thread_id).await?;
assert!(loaded.is_some());
if let Some(loaded_state) = loaded {
assert_eq!(loaded_state.thread_id, thread_id);
}
store.delete(&thread_id).await?;
let state = store.load(&thread_id).await?;
assert!(state.is_none());
Ok(())
}
// Events are grouped per turn; a turn is unfinished until `finish_turn` is
// called, and `get_turns` returns the turns in ascending turn order.
#[tokio::test]
async fn test_in_memory_event_store_tracks_turns_and_finish_barrier() -> Result<()> {
let store = InMemoryEventStore::new();
let thread_id = ThreadId::new();
let seq = SequenceCounter::new();
store
.append(
&thread_id,
1,
AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "hello"), &seq),
)
.await?;
store
.append(
&thread_id,
2,
AgentEventEnvelope::wrap(AgentEvent::text("msg_2", "world"), &seq),
)
.await?;
let turn_1 = store
.get_turn(&thread_id, 1)
.await?
.context("missing turn 1")?;
assert_eq!(turn_1.turn, 1);
assert_eq!(turn_1.events.len(), 1);
assert!(!turn_1.finished);
store.finish_turn(&thread_id, 1).await?;
store.finish_turn(&thread_id, 2).await?;
let turn_1 = store
.get_turn(&thread_id, 1)
.await?
.context("missing finished turn 1")?;
let turn_2 = store
.get_turn(&thread_id, 2)
.await?
.context("missing finished turn 2")?;
assert!(turn_1.finished);
assert!(turn_2.finished);
let turns = store.get_turns(&thread_id).await?;
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].turn, 1);
assert_eq!(turns[1].turn, 2);
Ok(())
}
// Finishing a turn that never received an event creates an empty, finished
// record; `clear` then removes it.
#[tokio::test]
async fn test_in_memory_event_store_finish_turn_without_events_creates_finished_turn()
-> Result<()> {
let store = InMemoryEventStore::new();
let thread_id = ThreadId::new();
store.finish_turn(&thread_id, 3).await?;
let turn = store
.get_turn(&thread_id, 3)
.await?
.context("missing empty finished turn")?;
assert_eq!(turn.turn, 3);
assert!(turn.events.is_empty());
assert!(turn.finished);
store.clear(&thread_id).await?;
assert!(store.get_turns(&thread_id).await?.is_empty());
Ok(())
}
// Appending to a finished turn is rejected with a descriptive error.
#[tokio::test]
async fn test_in_memory_event_store_rejects_append_after_finish() -> Result<()> {
let store = InMemoryEventStore::new();
let thread_id = ThreadId::new();
let seq = SequenceCounter::new();
store.finish_turn(&thread_id, 1).await?;
let error = store
.append(
&thread_id,
1,
AgentEventEnvelope::wrap(AgentEvent::text("msg_1", "late"), &seq),
)
.await
.expect_err("append after finish should fail");
assert!(error.to_string().contains("cannot append to finished turn"));
Ok(())
}
// Finishing the same turn twice is rejected with a descriptive error.
#[tokio::test]
async fn test_in_memory_event_store_rejects_duplicate_finish() -> Result<()> {
let store = InMemoryEventStore::new();
let thread_id = ThreadId::new();
store.finish_turn(&thread_id, 1).await?;
let error = store
.finish_turn(&thread_id, 1)
.await
.expect_err("duplicate finish should fail");
assert!(error.to_string().contains("already finished"));
Ok(())
}
// Execution store basics: unknown id yields `None`; a recorded in-flight
// execution can be fetched by its tool call id.
#[tokio::test]
async fn test_execution_store_basic_operations() -> Result<()> {
let store = InMemoryExecutionStore::new();
let thread_id = ThreadId::new();
let execution = store.get_execution("tool_call_123").await?;
assert!(execution.is_none());
let execution = ToolExecution::new_in_flight(
"tool_call_123",
thread_id.clone(),
"my_tool",
"My Tool",
serde_json::json!({"param": "value"}),
time::OffsetDateTime::now_utc(),
);
store.record_execution(execution).await?;
let loaded = store.get_execution("tool_call_123").await?;
assert!(loaded.is_some());
let loaded = loaded.expect("execution should exist");
assert_eq!(loaded.tool_call_id, "tool_call_123");
assert_eq!(loaded.tool_name, "my_tool");
assert!(loaded.is_in_flight());
Ok(())
}
// Updating a recorded execution with its completed version makes the
// completed state (and a successful result) visible on lookup.
#[tokio::test]
async fn test_execution_store_complete_execution() -> Result<()> {
let store = InMemoryExecutionStore::new();
let thread_id = ThreadId::new();
let mut execution = ToolExecution::new_in_flight(
"tool_call_456",
thread_id.clone(),
"my_tool",
"My Tool",
serde_json::json!({}),
time::OffsetDateTime::now_utc(),
);
store.record_execution(execution.clone()).await?;
execution.complete(ToolResult::success("Done!"));
store.update_execution(execution).await?;
let loaded = store.get_execution("tool_call_456").await?;
let loaded = loaded.expect("execution should exist");
assert!(loaded.is_completed());
assert!(loaded.result.is_some());
assert!(loaded.result.as_ref().is_some_and(|r| r.success));
Ok(())
}
// An execution with an operation id can be found through that id after
// record + update; an unknown operation id yields `None`.
#[tokio::test]
async fn test_execution_store_operation_id_lookup() -> Result<()> {
let store = InMemoryExecutionStore::new();
let thread_id = ThreadId::new();
let mut execution = ToolExecution::new_in_flight(
"tool_call_789",
thread_id.clone(),
"async_tool",
"Async Tool",
serde_json::json!({}),
time::OffsetDateTime::now_utc(),
);
execution.set_operation_id("op_abc123");
store.record_execution(execution.clone()).await?;
store.update_execution(execution).await?;
let loaded = store.get_execution_by_operation_id("op_abc123").await?;
assert!(loaded.is_some());
let loaded = loaded.expect("execution should exist");
assert_eq!(loaded.tool_call_id, "tool_call_789");
assert_eq!(loaded.operation_id, Some("op_abc123".to_string()));
let not_found = store.get_execution_by_operation_id("nonexistent").await?;
assert!(not_found.is_none());
Ok(())
}
// A clone of the store is a second handle to the same data.
#[tokio::test]
async fn in_memory_store_clone_shares_history() -> Result<()> {
let store = InMemoryStore::new();
let handle = store.clone();
let thread_id = ThreadId::new();
store.append(&thread_id, Message::user("hello")).await?;
let history = handle.get_history(&thread_id).await?;
assert_eq!(
history.len(),
1,
"clone must observe appends via the original"
);
Ok(())
}
// The `Arc<T>` impls of both store traits forward to the wrapped store.
#[tokio::test]
async fn arc_store_blanket_impls_forward() -> Result<()> {
let store: Arc<InMemoryStore> = Arc::new(InMemoryStore::new());
let thread_id = ThreadId::new();
MessageStore::append(&store, &thread_id, Message::user("hi")).await?;
assert_eq!(MessageStore::count(&store, &thread_id).await?, 1);
let state = AgentState::new(thread_id.clone());
StateStore::save(&store, &state).await?;
assert!(StateStore::load(&store, &thread_id).await?.is_some());
// Plain method-call syntax on the `Arc` must work as well.
assert_eq!(store.get_history(&thread_id).await?.len(), 1);
Ok(())
}
// `event_count` counts across turns and `get_events_since` skips the first
// `offset` events of the flattened list.
#[tokio::test]
async fn event_count_and_get_events_since_are_incremental() -> Result<()> {
let store = InMemoryEventStore::new();
let thread_id = ThreadId::new();
let seq = SequenceCounter::new();
assert_eq!(store.event_count(&thread_id).await?, 0);
// Two events in turn 1, one in turn 2.
for (turn, (id, text)) in [(1, ("m1", "a")), (1, ("m2", "b")), (2, ("m3", "c"))] {
store
.append(
&thread_id,
turn,
AgentEventEnvelope::wrap(AgentEvent::text(id, text), &seq),
)
.await?;
}
assert_eq!(store.event_count(&thread_id).await?, 3);
let tail = store.get_events_since(&thread_id, 1).await?;
assert_eq!(tail.len(), 2, "should skip the first event");
let all = store.get_events(&thread_id).await?;
assert_eq!(all.len(), 3);
Ok(())
}
// `record_execution` alone (no follow-up update) must make the operation id
// resolvable.
#[tokio::test]
async fn record_execution_indexes_operation_id_immediately() -> Result<()> {
let store = InMemoryExecutionStore::new();
let thread_id = ThreadId::new();
let mut execution = ToolExecution::new_in_flight(
"call_1",
thread_id,
"async_tool",
"Async Tool",
serde_json::json!({}),
time::OffsetDateTime::now_utc(),
);
execution.set_operation_id("op_1");
store.record_execution(execution).await?;
let loaded = store.get_execution_by_operation_id("op_1").await?;
assert_eq!(
loaded
.context("write-ahead operation_id must resolve")?
.tool_call_id,
"call_1"
);
Ok(())
}
// Changing an execution's operation id through `update_execution` retires
// the old id and indexes the new one.
#[tokio::test]
async fn update_execution_removes_stale_operation_id() -> Result<()> {
let store = InMemoryExecutionStore::new();
let thread_id = ThreadId::new();
let mut execution = ToolExecution::new_in_flight(
"call_2",
thread_id,
"async_tool",
"Async Tool",
serde_json::json!({}),
time::OffsetDateTime::now_utc(),
);
execution.set_operation_id("op_old");
store.record_execution(execution.clone()).await?;
execution.set_operation_id("op_new");
store.update_execution(execution).await?;
assert!(
store
.get_execution_by_operation_id("op_old")
.await?
.is_none(),
"superseded operation_id must stop resolving"
);
let loaded = store.get_execution_by_operation_id("op_new").await?;
assert_eq!(
loaded
.context("new operation_id must resolve")?
.tool_call_id,
"call_2"
);
Ok(())
}
// The observing wrapper calls its callback once per append and still stores
// the events in the wrapped store.
#[tokio::test]
async fn observing_event_store_invokes_callback_and_delegates() -> Result<()> {
use std::sync::atomic::{AtomicUsize, Ordering};
let seen = Arc::new(AtomicUsize::new(0));
let seen_for_cb = Arc::clone(&seen);
let store = ObservingEventStore::new(InMemoryEventStore::new(), move |_envelope| {
seen_for_cb.fetch_add(1, Ordering::SeqCst);
});
let thread_id = ThreadId::new();
let seq = SequenceCounter::new();
store
.append(
&thread_id,
1,
AgentEventEnvelope::wrap(AgentEvent::text("m1", "hi"), &seq),
)
.await?;
store
.append(
&thread_id,
1,
AgentEventEnvelope::wrap(AgentEvent::text("m2", "yo"), &seq),
)
.await?;
assert_eq!(seen.load(Ordering::SeqCst), 2, "observer runs per append");
// Both the wrapper and the wrapped store report the stored events.
assert_eq!(store.get_events(&thread_id).await?.len(), 2);
assert_eq!(store.inner().get_events(&thread_id).await?.len(), 2);
Ok(())
}
}