use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use thiserror::Error;
use tokio::sync::Mutex;
use super::event::AgentEvent;
use super::run::RunId;
use super::stream::{RuntimeEventEnvelope, RuntimeEventId};
#[derive(Debug, Error)]
pub enum RuntimeEventStoreError {
#[error("runtime event store append failed: {message}")]
Append {
message: String,
},
#[error("runtime event store has no events for run {run_id}")]
NotFound {
run_id: RunId,
},
}
#[async_trait]
pub trait RuntimeEventStore: Send + Sync {
async fn append(
&self,
event: AgentEvent,
) -> Result<RuntimeEventEnvelope, RuntimeEventStoreError>;
async fn list_after(
&self,
run_id: RunId,
after_seq: Option<u64>,
limit: usize,
) -> Result<Vec<RuntimeEventEnvelope>, RuntimeEventStoreError>;
}
#[derive(Debug, Default)]
pub struct MemoryRuntimeEventStore {
events: Mutex<HashMap<RunId, Vec<RuntimeEventEnvelope>>>,
seq: Mutex<HashMap<RunId, u64>>,
sessions: Mutex<HashMap<RunId, Option<uuid::Uuid>>>,
}
impl MemoryRuntimeEventStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl RuntimeEventStore for MemoryRuntimeEventStore {
async fn append(
&self,
event: AgentEvent,
) -> Result<RuntimeEventEnvelope, RuntimeEventStoreError> {
let run_id = event.run_id();
let session_id = if let AgentEvent::RunStarted(started) = &event {
Some(started.session_id)
} else {
self.sessions.lock().await.get(&run_id).copied().flatten()
};
if let AgentEvent::RunStarted(started) = &event {
self.sessions
.lock()
.await
.insert(run_id, Some(started.session_id));
}
let next_seq = {
let mut counters = self.seq.lock().await;
let entry = counters.entry(run_id).or_default();
*entry += 1;
*entry
};
let envelope = RuntimeEventEnvelope {
event_id: RuntimeEventId::new(),
seq: next_seq,
run_id,
session_id,
event,
emitted_at: Utc::now(),
};
self.events
.lock()
.await
.entry(run_id)
.or_default()
.push(envelope.clone());
Ok(envelope)
}
async fn list_after(
&self,
run_id: RunId,
after_seq: Option<u64>,
limit: usize,
) -> Result<Vec<RuntimeEventEnvelope>, RuntimeEventStoreError> {
let events = self.events.lock().await;
let Some(run_events) = events.get(&run_id) else {
return Err(RuntimeEventStoreError::NotFound { run_id });
};
let filtered: Vec<RuntimeEventEnvelope> = run_events
.iter()
.filter(|env| match after_seq {
None => true,
Some(seq) => env.seq > seq,
})
.take(limit)
.cloned()
.collect();
Ok(filtered)
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct FailingRuntimeEventStore;
impl FailingRuntimeEventStore {
#[must_use]
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl RuntimeEventStore for FailingRuntimeEventStore {
async fn append(
&self,
_event: AgentEvent,
) -> Result<RuntimeEventEnvelope, RuntimeEventStoreError> {
Err(RuntimeEventStoreError::Append {
message: "failing runtime event store always rejects appends".to_owned(),
})
}
async fn list_after(
&self,
run_id: RunId,
_after_seq: Option<u64>,
_limit: usize,
) -> Result<Vec<RuntimeEventEnvelope>, RuntimeEventStoreError> {
Err(RuntimeEventStoreError::NotFound { run_id })
}
}
pub type DynRuntimeEventStore = Arc<dyn RuntimeEventStore>;
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::expect_used)]
use chrono::Utc;
use uuid::Uuid;
use super::*;
use crate::provider::{ModelName, ProviderId};
use crate::runtime::event::{RunCancelled, RunCompleted, RunFailed, RunStarted};
fn started(run_id: RunId, session_id: Uuid) -> AgentEvent {
AgentEvent::RunStarted(RunStarted {
run_id,
session_id,
provider: ProviderId::new("acme"),
model: ModelName::new("gpt-test"),
timestamp: Utc::now(),
})
}
fn terminal(run_id: RunId) -> AgentEvent {
AgentEvent::RunCompleted(RunCompleted {
run_id,
finish_reason: crate::provider::FinishReason::Stop,
iterations: 1,
timestamp: Utc::now(),
})
}
fn failed(run_id: RunId) -> AgentEvent {
AgentEvent::RunFailed(RunFailed {
run_id,
error: "boom".to_owned(),
timestamp: Utc::now(),
})
}
fn cancelled(run_id: RunId) -> AgentEvent {
AgentEvent::RunCancelled(RunCancelled {
run_id,
timestamp: Utc::now(),
})
}
#[tokio::test]
async fn append_assigns_monotonic_seq_per_run() {
let store = MemoryRuntimeEventStore::new();
let run = RunId::new();
let sid = Uuid::now_v7();
let e1 = store.append(started(run, sid)).await.unwrap();
let e2 = store.append(terminal(run)).await.unwrap();
let e3 = store.append(failed(run)).await.unwrap();
assert_eq!(e1.seq, 1);
assert_eq!(e2.seq, 2);
assert_eq!(e3.seq, 3);
}
#[tokio::test]
async fn append_propagates_session_id_from_run_started() {
let store = MemoryRuntimeEventStore::new();
let run = RunId::new();
let sid = Uuid::now_v7();
let started_env = store.append(started(run, sid)).await.unwrap();
assert_eq!(started_env.session_id, Some(sid));
let terminal_env = store.append(terminal(run)).await.unwrap();
assert_eq!(terminal_env.session_id, Some(sid));
}
#[tokio::test]
async fn list_after_filters_by_seq() {
let store = MemoryRuntimeEventStore::new();
let run = RunId::new();
let sid = Uuid::now_v7();
store.append(started(run, sid)).await.unwrap();
let e2 = store.append(terminal(run)).await.unwrap();
let e3 = store.append(failed(run)).await.unwrap();
let page = store.list_after(run, Some(e2.seq), 10).await.unwrap();
assert_eq!(page.len(), 1);
assert_eq!(page[0].seq, e3.seq);
}
#[tokio::test]
async fn list_after_respects_limit() {
let store = MemoryRuntimeEventStore::new();
let run = RunId::new();
let sid = Uuid::now_v7();
store.append(started(run, sid)).await.unwrap();
store.append(terminal(run)).await.unwrap();
store.append(failed(run)).await.unwrap();
let page = store.list_after(run, None, 2).await.unwrap();
assert_eq!(page.len(), 2);
}
#[tokio::test]
async fn list_after_unknown_run_is_not_found() {
let store = MemoryRuntimeEventStore::new();
let run = RunId::new();
let err = store.list_after(run, None, 10).await.unwrap_err();
assert!(matches!(err, RuntimeEventStoreError::NotFound { .. }));
}
#[tokio::test]
async fn envelope_is_terminal_recognizes_terminal_variants() {
let store = MemoryRuntimeEventStore::new();
let run = RunId::new();
let sid = Uuid::now_v7();
let non_terminal = store.append(started(run, sid)).await.unwrap();
assert!(!non_terminal.is_terminal());
let completed = store.append(terminal(run)).await.unwrap();
let failed_env = store.append(failed(run)).await.unwrap();
let cancelled_env = store.append(cancelled(run)).await.unwrap();
assert!(completed.is_terminal());
assert!(failed_env.is_terminal());
assert!(cancelled_env.is_terminal());
}
}