use crate::event_store::EventStore;
use meerkat_core::event::AgentEvent;
use meerkat_core::types::SessionId;
use std::path::{Path, PathBuf};
pub struct SessionProjector {
output_dir: PathBuf,
}
impl SessionProjector {
pub fn new(output_dir: impl Into<PathBuf>) -> Self {
Self {
output_dir: output_dir.into(),
}
}
pub fn output_dir(&self) -> &Path {
&self.output_dir
}
fn session_dir(&self, session_id: &SessionId) -> PathBuf {
self.output_dir
.join("sessions")
.join(session_id.to_string())
}
pub async fn project<E: EventStore>(
&self,
event_store: &E,
session_id: &SessionId,
from_seq: u64,
) -> Result<u64, ProjectionError> {
let events = event_store
.read_from(session_id, from_seq)
.await
.map_err(|e| ProjectionError::EventStore(e.to_string()))?;
if events.is_empty() {
return Ok(from_seq.saturating_sub(1));
}
let dir = self.session_dir(session_id);
tokio::fs::create_dir_all(&dir)
.await
.map_err(ProjectionError::Io)?;
let events_path = dir.join("events.jsonl");
let mut lines = String::new();
let mut last_seq = from_seq.saturating_sub(1);
let mut last_assistant_text: Option<String> = None;
for stored in &events {
let line = serde_json::to_string(&stored)
.map_err(|e| ProjectionError::Serialization(e.to_string()))?;
lines.push_str(&line);
lines.push('\n');
last_seq = stored.seq;
if let AgentEvent::TextComplete { content } = &stored.event
&& !content.is_empty()
{
last_assistant_text = Some(content.clone());
}
}
use tokio::io::AsyncWriteExt;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&events_path)
.await
.map_err(ProjectionError::Io)?;
file.write_all(lines.as_bytes())
.await
.map_err(ProjectionError::Io)?;
if let Some(text) = last_assistant_text {
let summary_path = dir.join("summary.txt");
tokio::fs::write(&summary_path, text.as_bytes())
.await
.map_err(ProjectionError::Io)?;
}
let checkpoint_path = dir.join("checkpoint");
tokio::fs::write(&checkpoint_path, last_seq.to_string().as_bytes())
.await
.map_err(ProjectionError::Io)?;
Ok(last_seq)
}
pub async fn read_checkpoint(&self, session_id: &SessionId) -> u64 {
let path = self.session_dir(session_id).join("checkpoint");
match tokio::fs::read_to_string(&path).await {
Ok(s) => match s.trim().parse() {
Ok(seq) => seq,
Err(err) => {
tracing::warn!(
checkpoint = ?path,
"invalid checkpoint contents (defaulting to 0): {err}"
);
0
}
},
Err(err) if err.kind() == std::io::ErrorKind::NotFound => 0,
Err(err) => {
tracing::warn!(
checkpoint = ?path,
"failed to read checkpoint (defaulting to 0): {err}"
);
0
}
}
}
pub async fn replay<E: EventStore>(
&self,
event_store: &E,
session_id: &SessionId,
) -> Result<u64, ProjectionError> {
let dir = self.session_dir(session_id);
if dir.exists() {
tokio::fs::remove_dir_all(&dir)
.await
.map_err(ProjectionError::Io)?;
}
self.project(event_store, session_id, 1).await
}
pub async fn resume<E: EventStore>(
&self,
event_store: &E,
session_id: &SessionId,
) -> Result<u64, ProjectionError> {
let checkpoint = self.read_checkpoint(session_id).await;
self.project(event_store, session_id, checkpoint + 1).await
}
}
#[derive(Debug, thiserror::Error)]
pub enum ProjectionError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Event store error: {0}")]
EventStore(String),
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::event_store::{EVENT_SCHEMA_VERSION, EventStoreError, StoredEvent};
use meerkat_core::types::Usage;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;
use tempfile::TempDir;
struct MemEventStore {
events: Mutex<HashMap<String, Vec<StoredEvent>>>,
}
impl MemEventStore {
fn new() -> Self {
Self {
events: Mutex::new(HashMap::new()),
}
}
fn add_events(&self, session_id: &SessionId, agent_events: &[AgentEvent]) {
let mut map = self.events.lock().unwrap();
let entry = map.entry(session_id.to_string()).or_default();
let base_seq = entry.len() as u64;
for (i, event) in agent_events.iter().enumerate() {
entry.push(StoredEvent {
seq: base_seq + i as u64 + 1,
schema_version: EVENT_SCHEMA_VERSION,
timestamp: SystemTime::now(),
event: event.clone(),
});
}
}
}
#[async_trait::async_trait]
impl EventStore for MemEventStore {
async fn append(
&self,
session_id: &SessionId,
events: &[AgentEvent],
) -> Result<u64, EventStoreError> {
self.add_events(session_id, events);
self.last_seq(session_id).await
}
async fn read_from(
&self,
session_id: &SessionId,
from_seq: u64,
) -> Result<Vec<StoredEvent>, EventStoreError> {
let map = self.events.lock().unwrap();
let key = session_id.to_string();
Ok(map
.get(&key)
.map(|events| {
events
.iter()
.filter(|e| e.seq >= from_seq)
.cloned()
.collect()
})
.unwrap_or_default())
}
async fn last_seq(&self, session_id: &SessionId) -> Result<u64, EventStoreError> {
let map = self.events.lock().unwrap();
let key = session_id.to_string();
Ok(map
.get(&key)
.and_then(|events| events.last().map(|e| e.seq))
.unwrap_or(0))
}
}
#[tokio::test]
async fn test_projector_materializes_session_files() {
let dir = TempDir::new().unwrap();
let projector = SessionProjector::new(dir.path().join(".rkat"));
let store = MemEventStore::new();
let sid = SessionId::new();
store.add_events(
&sid,
&[
AgentEvent::RunStarted {
session_id: sid.clone(),
prompt: "Hello".to_string(),
},
AgentEvent::TextComplete {
content: "Hi there!".to_string(),
},
AgentEvent::RunCompleted {
session_id: sid.clone(),
result: "Hi there!".to_string(),
usage: Usage::default(),
},
],
);
let last_seq = projector.project(&store, &sid, 1).await.unwrap();
assert_eq!(last_seq, 3);
let session_dir = projector.session_dir(&sid);
assert!(session_dir.join("events.jsonl").exists());
assert!(session_dir.join("summary.txt").exists());
assert!(session_dir.join("checkpoint").exists());
let summary = std::fs::read_to_string(session_dir.join("summary.txt")).unwrap();
assert_eq!(summary, "Hi there!");
let checkpoint = std::fs::read_to_string(session_dir.join("checkpoint")).unwrap();
assert_eq!(checkpoint.trim(), "3");
}
#[tokio::test]
async fn test_projector_idempotent_replay() {
let dir = TempDir::new().unwrap();
let projector = SessionProjector::new(dir.path().join(".rkat"));
let store = MemEventStore::new();
let sid = SessionId::new();
store.add_events(
&sid,
&[
AgentEvent::TurnStarted { turn_number: 0 },
AgentEvent::TextComplete {
content: "Response 1".to_string(),
},
],
);
let seq1 = projector.replay(&store, &sid).await.unwrap();
let events_content_1 =
std::fs::read_to_string(projector.session_dir(&sid).join("events.jsonl")).unwrap();
let seq2 = projector.replay(&store, &sid).await.unwrap();
let events_content_2 =
std::fs::read_to_string(projector.session_dir(&sid).join("events.jsonl")).unwrap();
assert_eq!(seq1, seq2);
assert_eq!(events_content_1, events_content_2);
}
#[tokio::test]
async fn test_projector_resumes_from_checkpoint() {
let dir = TempDir::new().unwrap();
let projector = SessionProjector::new(dir.path().join(".rkat"));
let store = MemEventStore::new();
let sid = SessionId::new();
store.add_events(&sid, &[AgentEvent::TurnStarted { turn_number: 0 }]);
projector.project(&store, &sid, 1).await.unwrap();
let cp = projector.read_checkpoint(&sid).await;
assert_eq!(cp, 1);
store.add_events(
&sid,
&[
AgentEvent::TurnStarted { turn_number: 1 },
AgentEvent::TextComplete {
content: "Done".to_string(),
},
],
);
let seq = projector.resume(&store, &sid).await.unwrap();
assert_eq!(seq, 3);
let events_content =
std::fs::read_to_string(projector.session_dir(&sid).join("events.jsonl")).unwrap();
let lines: Vec<&str> = events_content.trim().split('\n').collect();
assert_eq!(lines.len(), 3);
}
}