use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::SystemTime;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
pub const DEFAULT_MAX_EVENTS_PER_SESSION: usize = 1000;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct EventRecord {
pub id: u64,
pub data: String,
pub timestamp: SystemTime,
}
impl EventRecord {
pub fn new(id: u64, data: impl Into<String>) -> Self {
Self {
id,
data: data.into(),
timestamp: SystemTime::now(),
}
}
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum EventStoreError {
#[error("encode error: {0}")]
Encode(String),
#[error("decode error: {0}")]
Decode(String),
#[error("backend error: {0}")]
Backend(String),
}
pub type Result<T> = std::result::Result<T, EventStoreError>;
#[async_trait]
pub trait EventStore: Send + Sync + 'static {
async fn append(&self, session_id: &str, event: EventRecord) -> Result<()>;
async fn replay_after(&self, session_id: &str, after_id: u64) -> Result<Vec<EventRecord>>;
async fn purge_session(&self, session_id: &str) -> Result<()>;
}
#[derive(Debug, Clone)]
pub struct MemoryEventStore {
inner: Arc<RwLock<HashMap<String, VecDeque<EventRecord>>>>,
max_events_per_session: usize,
}
impl Default for MemoryEventStore {
fn default() -> Self {
Self::with_capacity(DEFAULT_MAX_EVENTS_PER_SESSION)
}
}
impl MemoryEventStore {
pub fn new() -> Self {
Self::default()
}
pub fn with_capacity(max_events_per_session: usize) -> Self {
Self {
inner: Arc::new(RwLock::new(HashMap::new())),
max_events_per_session,
}
}
pub async fn total_events(&self) -> usize {
self.inner.read().await.values().map(|v| v.len()).sum()
}
pub async fn session_count(&self) -> usize {
self.inner.read().await.len()
}
}
#[async_trait]
impl EventStore for MemoryEventStore {
async fn append(&self, session_id: &str, event: EventRecord) -> Result<()> {
let mut map = self.inner.write().await;
let buf = map.entry(session_id.to_string()).or_default();
if buf.len() >= self.max_events_per_session {
buf.pop_front();
}
buf.push_back(event);
Ok(())
}
async fn replay_after(&self, session_id: &str, after_id: u64) -> Result<Vec<EventRecord>> {
let map = self.inner.read().await;
Ok(match map.get(session_id) {
Some(buf) => buf.iter().filter(|e| e.id > after_id).cloned().collect(),
None => Vec::new(),
})
}
async fn purge_session(&self, session_id: &str) -> Result<()> {
self.inner.write().await.remove(session_id);
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct CachingEventStore<Cache, Store> {
cache: Cache,
store: Store,
}
impl<Cache, Store> CachingEventStore<Cache, Store> {
pub fn new(cache: Cache, store: Store) -> Self {
Self { cache, store }
}
}
#[async_trait]
impl<Cache, Store> EventStore for CachingEventStore<Cache, Store>
where
Cache: EventStore,
Store: EventStore,
{
async fn append(&self, session_id: &str, event: EventRecord) -> Result<()> {
self.store.append(session_id, event.clone()).await?;
self.cache.append(session_id, event).await?;
Ok(())
}
async fn replay_after(&self, session_id: &str, after_id: u64) -> Result<Vec<EventRecord>> {
let cached = self.cache.replay_after(session_id, after_id).await?;
if !cached.is_empty() {
return Ok(cached);
}
let from_store = self.store.replay_after(session_id, after_id).await?;
for event in &from_store {
let _ = self.cache.append(session_id, event.clone()).await;
}
Ok(from_store)
}
async fn purge_session(&self, session_id: &str) -> Result<()> {
let cache_result = self.cache.purge_session(session_id).await;
let store_result = self.store.purge_session(session_id).await;
cache_result.and(store_result)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn memory_store_append_and_replay() {
let store = MemoryEventStore::new();
for i in 0..3 {
store
.append("s", EventRecord::new(i, format!("e{i}")))
.await
.unwrap();
}
let all = store.replay_after("s", 0).await.unwrap();
assert_eq!(all.len(), 2);
let none = store.replay_after("s", 5).await.unwrap();
assert!(none.is_empty());
let after_first = store.replay_after("s", 0).await.unwrap();
assert_eq!(after_first[0].id, 1);
assert_eq!(after_first[1].id, 2);
}
#[tokio::test]
async fn memory_store_respects_capacity() {
let store = MemoryEventStore::with_capacity(3);
for i in 0..5 {
store
.append("s", EventRecord::new(i, format!("e{i}")))
.await
.unwrap();
}
let events = store.replay_after("s", 0).await.unwrap();
assert_eq!(events.len(), 3);
assert_eq!(events[0].id, 2);
assert_eq!(events[2].id, 4);
}
#[tokio::test]
async fn memory_store_isolates_sessions() {
let store = MemoryEventStore::new();
store.append("a", EventRecord::new(0, "a0")).await.unwrap();
store.append("b", EventRecord::new(0, "b0")).await.unwrap();
let a = store.replay_after("a", 0).await.unwrap();
let b = store.replay_after("b", 0).await.unwrap();
assert!(a.is_empty() && b.is_empty(), "after_id filters out id 0");
store.append("a", EventRecord::new(1, "a1")).await.unwrap();
let a1 = store.replay_after("a", 0).await.unwrap();
assert_eq!(a1.len(), 1);
assert_eq!(a1[0].data, "a1");
}
#[tokio::test]
async fn memory_store_purge_removes_session() {
let store = MemoryEventStore::new();
store.append("s", EventRecord::new(0, "x")).await.unwrap();
assert_eq!(store.session_count().await, 1);
store.purge_session("s").await.unwrap();
assert_eq!(store.session_count().await, 0);
}
#[tokio::test]
async fn memory_store_purge_is_idempotent() {
MemoryEventStore::new()
.purge_session("nonexistent")
.await
.unwrap();
}
#[tokio::test]
async fn dyn_event_store_object_safe() {
let store: Arc<dyn EventStore> = Arc::new(MemoryEventStore::new());
store.append("s", EventRecord::new(0, "x")).await.unwrap();
}
#[tokio::test]
async fn caching_store_writes_to_both_tiers() {
let cache = MemoryEventStore::new();
let backend = MemoryEventStore::new();
let store = CachingEventStore::new(cache.clone(), backend.clone());
store.append("s", EventRecord::new(0, "x")).await.unwrap();
assert_eq!(cache.total_events().await, 1);
assert_eq!(backend.total_events().await, 1);
}
#[tokio::test]
async fn caching_store_reads_from_cache_first() {
let cache = MemoryEventStore::new();
let backend = MemoryEventStore::new();
backend
.append("s", EventRecord::new(0, "b0"))
.await
.unwrap();
backend
.append("s", EventRecord::new(1, "b1"))
.await
.unwrap();
let store = CachingEventStore::new(cache.clone(), backend);
let first = store.replay_after("s", 0).await.unwrap();
assert_eq!(first.len(), 1);
assert_eq!(cache.total_events().await, 1);
}
#[tokio::test]
async fn caching_store_purge_clears_both() {
let cache = MemoryEventStore::new();
let backend = MemoryEventStore::new();
let store = CachingEventStore::new(cache.clone(), backend.clone());
store.append("s", EventRecord::new(0, "x")).await.unwrap();
store.purge_session("s").await.unwrap();
assert_eq!(cache.total_events().await, 0);
assert_eq!(backend.total_events().await, 0);
}
}