use crate::kernel::{ExecutionEvent, ExecutionId};
use crate::streaming::event_logger::{EventLogEntry, EventStore};
use async_trait::async_trait;
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::RwLock;
use tokio::fs;
use tracing::debug;
pub struct JsonlEventStore {
base_dir: PathBuf,
sequence: RwLock<u64>,
index: RwLock<HashMap<String, Vec<u64>>>,
}
impl JsonlEventStore {
pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
fs::create_dir_all(&base_dir).await?;
let store = Self {
base_dir,
sequence: RwLock::new(0),
index: RwLock::new(HashMap::new()),
};
store.rebuild_index().await?;
Ok(store)
}
fn execution_file(&self, execution_id: &str) -> PathBuf {
self.base_dir.join(format!("{}.jsonl", execution_id))
}
async fn rebuild_index(&self) -> anyhow::Result<()> {
let mut max_sequence = 0u64;
let mut index = HashMap::new();
let mut entries = match fs::read_dir(&self.base_dir).await {
Ok(e) => e,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => return Err(e.into()),
};
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
let execution_id = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
if let Ok(file) = std::fs::File::open(&path) {
let reader = BufReader::new(file);
let mut sequences = Vec::new();
for line in reader.lines().map_while(Result::ok) {
if let Ok(entry) = serde_json::from_str::<EventLogEntry>(&line) {
sequences.push(entry.sequence);
if entry.sequence > max_sequence {
max_sequence = entry.sequence;
}
}
}
if !sequences.is_empty() {
index.insert(execution_id, sequences);
}
}
}
}
let num_executions = index.len();
*self
.sequence
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = max_sequence;
*self
.index
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))? = index;
debug!(
"Rebuilt JSONL event store index: {} executions, max_sequence={}",
num_executions, max_sequence
);
Ok(())
}
fn read_execution_file(&self, execution_id: &str) -> anyhow::Result<Vec<EventLogEntry>> {
let path = self.execution_file(execution_id);
if !path.exists() {
return Ok(Vec::new());
}
let file = std::fs::File::open(&path)?;
let reader = BufReader::new(file);
let mut entries = Vec::new();
for line in reader.lines() {
let line = line?;
if !line.trim().is_empty() {
let entry: EventLogEntry = serde_json::from_str(&line)?;
entries.push(entry);
}
}
Ok(entries)
}
}
impl std::fmt::Debug for JsonlEventStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JsonlEventStore")
.field("base_dir", &self.base_dir)
.finish()
}
}
#[async_trait]
impl EventStore for JsonlEventStore {
async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
let execution_id = event.context.execution_id.as_str().to_string();
let sequence = {
let mut seq = self
.sequence
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
*seq += 1;
*seq
};
let entry = EventLogEntry::new(sequence, event);
let json = serde_json::to_string(&entry)?;
let path = self.execution_file(&execution_id);
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)?;
writeln!(file, "{}", json)?;
file.flush()?;
{
let mut index = self
.index
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
index
.entry(execution_id)
.or_insert_with(Vec::new)
.push(sequence);
}
debug!("Appended event {} to JSONL store", sequence);
Ok(entry)
}
async fn get_by_execution(
&self,
execution_id: &ExecutionId,
) -> anyhow::Result<Vec<EventLogEntry>> {
self.read_execution_file(execution_id.as_str())
}
async fn get_after(
&self,
after_sequence: u64,
limit: usize,
) -> anyhow::Result<Vec<EventLogEntry>> {
let index = self
.index
.read()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
let mut all_entries = Vec::new();
for execution_id in index.keys() {
let entries = self.read_execution_file(execution_id)?;
for entry in entries {
if entry.sequence > after_sequence {
all_entries.push(entry);
}
}
}
all_entries.sort_by_key(|e| e.sequence);
all_entries.truncate(limit);
Ok(all_entries)
}
async fn latest_sequence(&self) -> anyhow::Result<u64> {
Ok(*self
.sequence
.read()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::{ExecutionContext, ExecutionEventType};
use tempfile::tempdir;
#[tokio::test]
async fn test_jsonl_store_append_and_retrieve() {
let dir = tempdir().unwrap();
let store = JsonlEventStore::new(dir.path().to_path_buf())
.await
.unwrap();
let exec_id = ExecutionId::new();
let ctx = ExecutionContext::new(exec_id.clone());
let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
let entry = store.append(event).await.unwrap();
assert_eq!(entry.sequence, 1);
let events = store.get_by_execution(&exec_id).await.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].sequence, 1);
}
#[tokio::test]
async fn test_jsonl_store_persistence() {
let dir = tempdir().unwrap();
let exec_id = ExecutionId::new();
{
let store = JsonlEventStore::new(dir.path().to_path_buf())
.await
.unwrap();
let ctx = ExecutionContext::new(exec_id.clone());
let event = ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx);
store.append(event).await.unwrap();
}
{
let store = JsonlEventStore::new(dir.path().to_path_buf())
.await
.unwrap();
let events = store.get_by_execution(&exec_id).await.unwrap();
assert_eq!(events.len(), 1);
}
}
}