#![allow(clippy::significant_drop_tightening)]
use std::collections::HashMap;
use async_trait::async_trait;
use entelix_core::{Result, ThreadKey};
use parking_lot::Mutex;
use crate::event::GraphEvent;
#[async_trait]
pub trait SessionLog: Send + Sync + 'static {
async fn append(&self, key: &ThreadKey, events: &[GraphEvent]) -> Result<u64>;
async fn load_since(&self, key: &ThreadKey, cursor: u64) -> Result<Vec<GraphEvent>>;
async fn archive_before(&self, key: &ThreadKey, watermark: u64) -> Result<usize>;
}
#[derive(Default)]
pub struct InMemorySessionLog {
inner: Mutex<HashMap<ThreadKey, ThreadLog>>,
}
#[derive(Default)]
struct ThreadLog {
events: Vec<GraphEvent>,
archival_watermark: u64,
}
impl InMemorySessionLog {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl SessionLog for InMemorySessionLog {
async fn append(&self, key: &ThreadKey, events: &[GraphEvent]) -> Result<u64> {
let len = {
let mut guard = self.inner.lock();
let log = guard.entry(key.clone()).or_default();
log.events.extend(events.iter().cloned());
log.events.len()
};
Ok(len as u64)
}
async fn load_since(&self, key: &ThreadKey, cursor: u64) -> Result<Vec<GraphEvent>> {
let snapshot = {
let guard = self.inner.lock();
let Some(log) = guard.get(key) else {
return Ok(Vec::new());
};
let effective_start = cursor.max(log.archival_watermark);
let start = usize::try_from(effective_start).unwrap_or(usize::MAX);
log.events.get(start..).unwrap_or(&[]).to_vec()
};
Ok(snapshot)
}
async fn archive_before(&self, key: &ThreadKey, watermark: u64) -> Result<usize> {
let archived = {
let mut guard = self.inner.lock();
let Some(log) = guard.get_mut(key) else {
return Ok(0);
};
if watermark <= log.archival_watermark {
return Ok(0);
}
let prior = log.archival_watermark;
log.archival_watermark = watermark;
watermark.saturating_sub(prior)
};
Ok(usize::try_from(archived).unwrap_or(usize::MAX))
}
}