use crate::kernel::{ExecutionEvent, ExecutionId};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// An [`ExecutionEvent`] paired with the sequence number assigned to it by a store.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EventLogEntry {
/// Sequence number of this entry. The in-memory store in this file assigns
/// the entry's position in its log, so the first appended event gets 0.
pub sequence: u64,
/// The recorded execution event.
pub event: ExecutionEvent,
}
impl EventLogEntry {
pub fn new(sequence: u64, event: ExecutionEvent) -> Self {
Self { sequence, event }
}
}
/// Storage backend for execution events.
///
/// Implementations must be `Send + Sync` so a single store can be shared
/// (for example behind an `Arc`) across tasks.
#[async_trait]
pub trait EventStore: Send + Sync {
/// Stores `event` and returns the resulting entry, including the sequence
/// number the store assigned to it.
async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry>;
/// Returns the entries recorded for the given execution id.
///
/// The in-memory implementation returns them in append order and yields an
/// empty vector for an unknown id.
async fn get_by_execution(
&self,
execution_id: &ExecutionId,
) -> anyhow::Result<Vec<EventLogEntry>>;
/// Returns at most `limit` entries whose sequence number is strictly
/// greater than `after_sequence` (e.g. `get_after(2, 10)` on a log holding
/// sequences 0..=4 yields sequences 3 and 4).
async fn get_after(
&self,
after_sequence: u64,
limit: usize,
) -> anyhow::Result<Vec<EventLogEntry>>;
/// Returns the sequence number of the most recently appended entry.
///
/// NOTE(review): the in-memory implementation returns 0 for an empty store,
/// which is indistinguishable from a store holding exactly one entry.
async fn latest_sequence(&self) -> anyhow::Result<u64>;
}
/// [`EventStore`] implementation that keeps every entry in process memory.
#[derive(Debug, Default)]
pub struct InMemoryEventStore {
/// All entries in append order; an entry's index equals its sequence number.
events: RwLock<Vec<EventLogEntry>>,
/// Maps an execution id (as a string) to the indices in `events` of the
/// entries appended for that execution.
by_execution: RwLock<HashMap<String, Vec<usize>>>,
}
impl InMemoryEventStore {
pub fn new() -> Self {
Self {
events: RwLock::new(Vec::new()),
by_execution: RwLock::new(HashMap::new()),
}
}
}
#[async_trait]
impl EventStore for InMemoryEventStore {
    /// Appends `event` to the log and indexes it under its execution id.
    ///
    /// The assigned sequence number is the entry's position in the log, so the
    /// first appended event gets sequence 0.
    ///
    /// # Errors
    /// Returns an error if either internal lock is poisoned. In that case
    /// nothing is stored.
    async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
        // Acquire both guards before mutating anything so that a poisoned index
        // lock cannot leave an entry stored but unindexed. The lock order
        // (events, then by_execution) matches `get_by_execution`.
        let mut events = self
            .events
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        let mut by_execution = self
            .by_execution
            .write()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        let position = events.len();
        // Extract the index key first so the event itself can be moved into
        // the entry instead of being cloned.
        let key = event.context.execution_id.as_str().to_string();
        let entry = EventLogEntry::new(position as u64, event);

        events.push(entry.clone());
        by_execution.entry(key).or_default().push(position);
        Ok(entry)
    }

    /// Returns every entry recorded for `execution_id`, in append order.
    ///
    /// An unknown execution id yields an empty vector.
    ///
    /// # Errors
    /// Returns an error if either internal lock is poisoned.
    async fn get_by_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        let events = self
            .events
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        let by_execution = self
            .by_execution
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        Ok(by_execution
            .get(execution_id.as_str())
            .into_iter()
            .flatten()
            .filter_map(|&index| events.get(index).cloned())
            .collect())
    }

    /// Returns up to `limit` entries whose sequence number is strictly greater
    /// than `after_sequence`, in ascending sequence order.
    ///
    /// An `after_sequence` at or beyond the last stored sequence (including
    /// `u64::MAX`) yields an empty vector.
    ///
    /// # Errors
    /// Returns an error if the internal lock is poisoned.
    async fn get_after(
        &self,
        after_sequence: u64,
        limit: usize,
    ) -> anyhow::Result<Vec<EventLogEntry>> {
        let events = self
            .events
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;

        // An entry with sequence `s` lives at index `s`, so the result starts
        // at index `s + 1`. Do the arithmetic in `u64` with saturation and clamp
        // to the log length: this avoids overflow for `u64::MAX` and makes the
        // narrowing cast lossless (the value never exceeds `events.len()`).
        let len = events.len() as u64;
        let start = after_sequence.saturating_add(1).min(len) as usize;

        Ok(events.iter().skip(start).take(limit).cloned().collect())
    }

    /// Returns the sequence number of the most recently appended entry.
    ///
    /// An empty store reports 0, which is the same value reported when exactly
    /// one entry is stored; callers that must distinguish the two cases need
    /// another signal (e.g. `get_after`).
    ///
    /// # Errors
    /// Returns an error if the internal lock is poisoned.
    async fn latest_sequence(&self) -> anyhow::Result<u64> {
        let events = self
            .events
            .read()
            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
        Ok(events.len().saturating_sub(1) as u64)
    }
}
/// Thin wrapper that forwards every operation to a shared [`EventStore`].
pub struct EventLog {
/// The backing store; held behind an `Arc` so it can be shared with other owners.
store: Arc<dyn EventStore>,
}
impl EventLog {
pub fn new(store: Arc<dyn EventStore>) -> Self {
Self { store }
}
pub fn in_memory() -> Self {
Self::new(Arc::new(InMemoryEventStore::new()))
}
pub async fn append(&self, event: ExecutionEvent) -> anyhow::Result<EventLogEntry> {
self.store.append(event).await
}
pub async fn get_by_execution(
&self,
execution_id: &ExecutionId,
) -> anyhow::Result<Vec<EventLogEntry>> {
self.store.get_by_execution(execution_id).await
}
pub async fn get_after(
&self,
after_sequence: u64,
limit: usize,
) -> anyhow::Result<Vec<EventLogEntry>> {
self.store.get_after(after_sequence, limit).await
}
pub async fn latest_sequence(&self) -> anyhow::Result<u64> {
self.store.latest_sequence().await
}
pub fn store(&self) -> &Arc<dyn EventStore> {
&self.store
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::{ExecutionContext, ExecutionEventType};

    /// Builds an `ExecutionStart` event bound to the given execution id.
    fn start_event(exec_id: ExecutionId) -> ExecutionEvent {
        let ctx = ExecutionContext::new(exec_id);
        ExecutionEvent::new(ExecutionEventType::ExecutionStart, ctx)
    }

    /// The first appended event gets sequence 0 and is retrievable by its execution id.
    #[tokio::test]
    async fn test_event_log_append_and_retrieve() {
        let log = EventLog::in_memory();
        let exec_id = ExecutionId::new();

        let appended = log.append(start_event(exec_id.clone())).await.unwrap();
        assert_eq!(appended.sequence, 0);

        let stored = log.get_by_execution(&exec_id).await.unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].sequence, 0);
    }

    /// `get_after(2, ..)` on five events returns exactly sequences 3 and 4.
    #[tokio::test]
    async fn test_event_log_get_after() {
        let log = EventLog::in_memory();
        for _ in 0..5 {
            log.append(start_event(ExecutionId::new())).await.unwrap();
        }

        let tail = log.get_after(2, 10).await.unwrap();
        let sequences: Vec<u64> = tail.iter().map(|entry| entry.sequence).collect();
        assert_eq!(sequences, vec![3, 4]);
    }
}