use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::kernel::{
ExecutionError, ExecutionId, ParentLink, StepId, StepSourceType, StepType, TenantId,
};
use super::StorageBackend;
/// An execution event as held by an event store: the event payload plus the
/// bookkeeping needed to locate it (sequence, execution, tenant, timestamp).
///
/// The type is serde-(de)serializable so it can be written to and read back
/// from a storage backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
    /// Sequence number of this event.
    ///
    /// NOTE(review): presumably scoped per execution and increasing with each
    /// append; the starting value is not visible in this file, so confirm
    /// against the backend implementation.
    pub sequence: u64,

    /// Identifier of the execution this event belongs to.
    pub execution_id: ExecutionId,

    /// Identifier of the tenant associated with the event.
    pub tenant_id: TenantId,

    /// The event payload itself.
    pub event: ExecutionEventData,

    /// UTC timestamp attached to the event.
    ///
    /// NOTE(review): presumably the time the event was persisted; confirm.
    pub timestamp: DateTime<Utc>,
}
/// Payload of a single execution event.
///
/// Serialized as an internally tagged enum: the variant is carried in a
/// `"type"` field whose value is the variant name in `snake_case`
/// (for example `StepStarted` becomes `"step_started"`). Field names are
/// left untouched by the `rename_all` attribute on the enum.
///
/// The per-variant summaries below are derived from the variant and field
/// names only; consult the producers of these events for exact semantics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExecutionEventData {
    // ---- Execution / step lifecycle -------------------------------------

    /// An execution started, optionally linked to a parent and carrying
    /// free-form JSON metadata.
    ExecutionStarted {
        parent: Option<ParentLink>,
        metadata: Option<serde_json::Value>,
    },

    /// A step started.
    StepStarted {
        step_id: StepId,
        /// Enclosing step, if any.
        parent_step_id: Option<StepId>,
        step_type: StepType,
        name: String,
    },

    /// A step completed.
    StepCompleted {
        step_id: StepId,
        output: Option<String>,
        /// Duration in milliseconds, per the field name.
        duration_ms: u64,
    },

    /// A step failed with the given error.
    StepFailed {
        step_id: StepId,
        error: ExecutionError,
    },

    /// The execution was paused.
    ExecutionPaused {
        reason: String,
    },

    /// The execution was resumed. Carries no payload.
    ExecutionResumed,

    /// The execution completed.
    ExecutionCompleted {
        output: Option<String>,
        /// Duration in milliseconds, per the field name.
        duration_ms: u64,
    },

    /// The execution failed with the given error.
    ExecutionFailed {
        error: ExecutionError,
    },

    /// The execution was cancelled.
    ExecutionCancelled {
        reason: String,
    },

    // ---- Artifacts and discovery ----------------------------------------

    /// An artifact was stored for a step.
    ArtifactStored {
        artifact_id: String,
        step_id: StepId,
        /// NOTE(review): hash algorithm/encoding is not specified here.
        content_hash: String,
        size_bytes: u64,
    },

    /// A step was discovered, optionally attributed to another step.
    StepDiscovered {
        step_id: StepId,
        discovered_by: Option<StepId>,
        source_type: StepSourceType,
        reason: String,
        depth: u32,
    },

    // ---- Tool calls -----------------------------------------------------

    /// A tool call started.
    ToolCallStarted {
        step_id: Option<StepId>,
        tool_call_id: String,
        tool_name: String,
        input: serde_json::Value,
    },

    /// A tool call completed.
    ///
    /// NOTE(review): presumably paired with `ToolCallStarted` through
    /// `tool_call_id`; confirm with the emitting code.
    ToolCallCompleted {
        step_id: Option<StepId>,
        tool_call_id: String,
        output: serde_json::Value,
    },

    // ---- Checkpoints, goals, extensions ---------------------------------

    /// A checkpoint was saved.
    CheckpointSaved {
        step_id: Option<StepId>,
        checkpoint_id: String,
        state_hash: String,
    },

    /// A goal was evaluated.
    GoalEvaluated {
        step_id: Option<StepId>,
        goal_id: String,
        status: String,
        score: Option<f64>,
        reason: Option<String>,
    },

    /// An application-defined event with an arbitrary JSON payload.
    ///
    /// `event_type` is an ordinary field and is distinct from the serde
    /// `"type"` tag, which is `"custom"` for this variant.
    Custom {
        event_type: String,
        data: serde_json::Value,
    },
}
/// Asynchronous storage interface for execution events.
///
/// Implementors must also implement [`StorageBackend`]. Every method reports
/// failures through `anyhow::Result`.
#[async_trait]
pub trait EventStore: StorageBackend {
    /// Appends one event for `execution_id` under `tenant_id`.
    ///
    /// NOTE(review): the returned `u64` is presumably the sequence number
    /// assigned to the event; confirm against implementations.
    async fn append(
        &self,
        execution_id: &ExecutionId,
        tenant_id: &TenantId,
        event: ExecutionEventData,
    ) -> anyhow::Result<u64>;

    /// Appends several events for `execution_id` under `tenant_id`.
    ///
    /// NOTE(review): the meaning of the returned `u64` (likely the last
    /// assigned sequence) and the behaviour for an empty `events` vector are
    /// not defined in this file; confirm against implementations.
    async fn append_batch(
        &self,
        execution_id: &ExecutionId,
        tenant_id: &TenantId,
        events: Vec<ExecutionEventData>,
    ) -> anyhow::Result<u64>;

    /// Loads the stored events of `execution_id`.
    async fn load_events(&self, execution_id: &ExecutionId) -> anyhow::Result<Vec<StoredEvent>>;

    /// Loads the stored events of `execution_id` relative to `after_seq`.
    ///
    /// NOTE(review): presumably only events whose sequence is strictly
    /// greater than `after_seq`; confirm whether the bound is exclusive.
    async fn load_events_after(
        &self,
        execution_id: &ExecutionId,
        after_seq: u64,
    ) -> anyhow::Result<Vec<StoredEvent>>;

    /// Returns the latest sequence number recorded for `execution_id`, or
    /// `None` when there is none.
    async fn get_latest_sequence(&self, execution_id: &ExecutionId) -> anyhow::Result<Option<u64>>;

    /// Reports whether `execution_id` is known to the store.
    ///
    /// The default implementation answers `true` exactly when
    /// [`get_latest_sequence`](Self::get_latest_sequence) yields `Some`.
    async fn execution_exists(&self, execution_id: &ExecutionId) -> anyhow::Result<bool> {
        let latest = self.get_latest_sequence(execution_id).await?;
        Ok(latest.is_some())
    }

    /// Lists execution ids for `tenant_id`, paginated by `limit` and `offset`.
    ///
    /// NOTE(review): result ordering is not specified in this file.
    async fn list_executions(
        &self,
        tenant_id: &TenantId,
        limit: usize,
        offset: usize,
    ) -> anyhow::Result<Vec<ExecutionId>>;

    /// Returns the number of events for `execution_id`.
    ///
    /// The default implementation returns the latest sequence number, or `0`
    /// when there is none.
    ///
    /// NOTE(review): this equals the true event count only if sequence
    /// numbers start at 1 and are contiguous; backends using another scheme
    /// should override this method.
    async fn count_events(&self, execution_id: &ExecutionId) -> anyhow::Result<u64> {
        let latest = self.get_latest_sequence(execution_id).await?;
        match latest {
            Some(sequence) => Ok(sequence),
            None => Ok(0),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a `StepStarted` event through JSON and checks that the
    /// variant and its `name` field survive.
    #[test]
    fn test_event_serialization() {
        let original = ExecutionEventData::StepStarted {
            step_id: StepId::new(),
            parent_step_id: None,
            step_type: StepType::FunctionNode,
            name: "test_step".to_string(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ExecutionEventData = serde_json::from_str(&encoded).unwrap();

        // Any other variant means the `type` tag did not round-trip.
        if let ExecutionEventData::StepStarted { name, .. } = decoded {
            assert_eq!(name, "test_step");
        } else {
            panic!("Wrong event type");
        }
    }
}