use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// A single timestamped record tied to a thread, carrying a typed payload.
///
/// Instances are normally produced through [`Event::new`] or one of the
/// dedicated constructors on [`Event`], which fill in `id` and `timestamp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Identifier of this event; [`Event::new`] sets it to a random v4 UUID string.
    pub id: String,
    /// UTC instant recorded for the event; [`Event::new`] uses the current time.
    pub timestamp: DateTime<Utc>,
    /// Caller-supplied identifier of the thread the event is reported for.
    pub thread_id: String,
    /// The payload describing what happened.
    pub kind: EventKind,
}
impl Event {
pub fn new(thread_id: impl Into<String>, kind: EventKind) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
timestamp: Utc::now(),
thread_id: thread_id.into(),
kind,
}
}
pub fn graph_started(thread_id: impl Into<String>, graph_name: Option<String>, entry_point: String) -> Self {
Self::new(thread_id, EventKind::Graph(GraphEvent::Started {
graph_name,
entry_point,
}))
}
pub fn graph_completed(thread_id: impl Into<String>, iterations: u32, duration: Duration) -> Self {
Self::new(thread_id, EventKind::Graph(GraphEvent::Completed {
iterations,
duration_ms: duration.as_millis() as u64,
}))
}
pub fn graph_error(thread_id: impl Into<String>, error: String) -> Self {
Self::new(thread_id, EventKind::Graph(GraphEvent::Error { error }))
}
pub fn node_entered(thread_id: impl Into<String>, node_id: String, iteration: u32) -> Self {
Self::new(thread_id, EventKind::Node(NodeEvent::Entered {
node_id,
iteration,
}))
}
pub fn node_exited(
thread_id: impl Into<String>,
node_id: String,
next_node: Option<String>,
duration: Duration,
) -> Self {
Self::new(thread_id, EventKind::Node(NodeEvent::Exited {
node_id,
next_node,
duration_ms: duration.as_millis() as u64,
}))
}
pub fn node_error(thread_id: impl Into<String>, node_id: String, error: String) -> Self {
Self::new(thread_id, EventKind::Node(NodeEvent::Error { node_id, error }))
}
pub fn checkpoint_saved(
thread_id: impl Into<String>,
checkpoint_id: String,
node_id: String,
) -> Self {
Self::new(thread_id, EventKind::Checkpoint(CheckpointEvent::Saved {
checkpoint_id,
node_id,
}))
}
pub fn checkpoint_restored(
thread_id: impl Into<String>,
checkpoint_id: String,
node_id: String,
) -> Self {
Self::new(thread_id, EventKind::Checkpoint(CheckpointEvent::Restored {
checkpoint_id,
node_id,
}))
}
pub fn state_updated(
thread_id: impl Into<String>,
node_id: String,
keys_changed: Vec<String>,
) -> Self {
Self::new(thread_id, EventKind::State(StateEvent::Updated {
node_id,
keys_changed,
}))
}
pub fn is_graph_event(&self) -> bool {
matches!(self.kind, EventKind::Graph(_))
}
pub fn is_node_event(&self) -> bool {
matches!(self.kind, EventKind::Node(_))
}
pub fn is_checkpoint_event(&self) -> bool {
matches!(self.kind, EventKind::Checkpoint(_))
}
pub fn node_id(&self) -> Option<&str> {
match &self.kind {
EventKind::Node(NodeEvent::Entered { node_id, .. }) => Some(node_id),
EventKind::Node(NodeEvent::Exited { node_id, .. }) => Some(node_id),
EventKind::Node(NodeEvent::Error { node_id, .. }) => Some(node_id),
EventKind::Checkpoint(CheckpointEvent::Saved { node_id, .. }) => Some(node_id),
EventKind::Checkpoint(CheckpointEvent::Restored { node_id, .. }) => Some(node_id),
EventKind::State(StateEvent::Updated { node_id, .. }) => Some(node_id),
_ => None,
}
}
}
/// Payload of an [`Event`], grouped by category.
///
/// Serialized in serde's adjacently tagged form: the variant name is written
/// under the `"category"` key and the variant's content under `"data"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "category", content = "data")]
pub enum EventKind {
    /// A graph-level event.
    Graph(GraphEvent),
    /// A node-level event.
    Node(NodeEvent),
    /// A checkpoint-related event.
    Checkpoint(CheckpointEvent),
    /// A state-related event.
    State(StateEvent),
    /// A free-form event identified by `name` with an arbitrary JSON payload.
    Custom {
        /// Name identifying the custom event.
        name: String,
        /// Arbitrary JSON value attached to the event.
        payload: serde_json::Value,
    },
}
/// Graph-level events.
///
/// Serialized in serde's internally tagged form: the variant name is stored
/// under the `"type"` key alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum GraphEvent {
    /// The graph started.
    Started {
        /// Optional name of the graph.
        graph_name: Option<String>,
        /// Entry point supplied by the emitter.
        entry_point: String,
    },
    /// The graph completed.
    Completed {
        /// Iteration count supplied by the emitter.
        iterations: u32,
        /// Duration in milliseconds.
        duration_ms: u64,
    },
    /// The graph reported an error.
    Error {
        /// Error text.
        error: String,
    },
    /// The graph was interrupted.
    Interrupted {
        /// Reason given for the interruption.
        reason: String,
        /// Node id associated with the interruption.
        node_id: String,
    },
}
/// Node-level events.
///
/// Serialized in serde's internally tagged form: the variant name is stored
/// under the `"type"` key alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum NodeEvent {
    /// A node was entered.
    Entered {
        /// Id of the node.
        node_id: String,
        /// Iteration number supplied by the emitter.
        iteration: u32,
    },
    /// A node was exited.
    Exited {
        /// Id of the node.
        node_id: String,
        /// Id of the following node, when there is one.
        next_node: Option<String>,
        /// Duration in milliseconds.
        duration_ms: u64,
    },
    /// A node reported an error.
    Error {
        /// Id of the node.
        node_id: String,
        /// Error text.
        error: String,
    },
    /// A node is being retried.
    Retrying {
        /// Id of the node.
        node_id: String,
        /// Attempt number supplied by the emitter.
        attempt: u32,
        /// Delay in milliseconds.
        delay_ms: u64,
    },
}
/// Checkpoint-related events.
///
/// Serialized in serde's internally tagged form: the variant name is stored
/// under the `"type"` key alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum CheckpointEvent {
    /// A checkpoint was saved.
    Saved {
        /// Id of the checkpoint.
        checkpoint_id: String,
        /// Node id associated with the checkpoint.
        node_id: String,
    },
    /// A checkpoint was restored.
    Restored {
        /// Id of the checkpoint.
        checkpoint_id: String,
        /// Node id associated with the checkpoint.
        node_id: String,
    },
    /// A checkpoint was deleted.
    Deleted {
        /// Id of the checkpoint.
        checkpoint_id: String,
    },
}
/// State-related events.
///
/// Serialized in serde's internally tagged form: the variant name is stored
/// under the `"type"` key alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StateEvent {
    /// State was updated.
    Updated {
        /// Node id associated with the update.
        node_id: String,
        /// Keys reported as changed.
        keys_changed: Vec<String>,
    },
    /// A message was added.
    MessageAdded {
        /// Role attached to the message.
        role: String,
        /// Length of the message content (unit not specified here).
        content_length: usize,
    },
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A graph-started event keeps its thread id and is classified as a graph
    /// event, not a node event.
    #[test]
    fn test_event_creation() {
        let graph_name = Some(String::from("my_graph"));
        let started = Event::graph_started("thread-1", graph_name, String::from("start"));

        assert_eq!(started.thread_id, "thread-1");
        assert!(started.is_graph_event());
        assert!(!started.is_node_event());
    }

    /// A node-entered event is classified as a node event and exposes its node id.
    #[test]
    fn test_node_event() {
        let entered = Event::node_entered("thread-1", String::from("my_node"), 5);

        assert!(entered.is_node_event());
        assert_eq!(entered.node_id(), Some("my_node"));
    }

    /// An event survives a JSON round trip with its thread id intact.
    #[test]
    fn test_event_serialization() {
        let elapsed = Duration::from_millis(150);
        let original = Event::node_exited(
            "thread-1",
            String::from("processor"),
            Some(String::from("next")),
            elapsed,
        );

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Event = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.thread_id, original.thread_id);
    }
}