use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Progress notifications produced while a workflow runs.
///
/// Serialized by serde as an internally tagged object: the variant name, in
/// `snake_case`, is stored under the `"type"` key next to the variant's own
/// fields (for example `{"type": "node_started", "node_id": ..., "node_type": ...}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkflowEvent {
    /// A workflow run has started.
    WorkflowStarted {
        /// Identifier of the workflow.
        workflow_id: String,
    },
    /// A node has started.
    NodeStarted {
        /// Identifier of the node.
        node_id: String,
        /// Kind of the node.
        node_type: NodeType,
    },
    /// A token was streamed for an LLM node.
    LlmTokenDelta {
        /// Identifier of the node the token belongs to.
        node_id: String,
        /// Text of the streamed token.
        token: String,
        /// Whether the token is output or reasoning text.
        token_kind: TokenKind,
    },
    /// A node has completed.
    NodeCompleted {
        /// Identifier of the node.
        node_id: String,
        /// JSON output associated with the node.
        output: Value,
    },
    /// A tool call was requested.
    ToolCallRequested {
        /// Identifier of the node associated with the call.
        node_id: String,
        /// Name of the tool.
        tool_name: String,
        /// JSON arguments for the call.
        arguments: Value,
    },
    /// A tool call has completed.
    ToolCallCompleted {
        /// Identifier of the node associated with the call.
        node_id: String,
        /// Name of the tool.
        tool_name: String,
        /// JSON output of the call.
        output: Value,
    },
    /// Human input was requested.
    HumanInputRequested {
        /// Identifier of the node.
        node_id: String,
        /// JSON payload describing the request.
        request: Value,
    },
    /// Human input was received.
    HumanInputReceived {
        /// Identifier of the node.
        node_id: String,
        /// JSON payload carrying the response.
        response: Value,
    },
    /// A node is being retried.
    NodeRetrying {
        /// Identifier of the node.
        node_id: String,
        /// Attempt number.
        // NOTE(review): whether this counts from 0 or 1 is not visible here.
        attempt: u8,
        /// Error message associated with the retry.
        error: String,
    },
    /// A node has failed.
    NodeFailed {
        /// Identifier of the node.
        node_id: String,
        /// Error message.
        error: String,
    },
    /// The workflow run has completed.
    WorkflowCompleted {
        /// JSON output of the workflow.
        output: Value,
        /// Optional JSON metadata.
        metadata: Option<Value>,
    },
}
/// Kind of a workflow node, as carried by [`WorkflowEvent::NodeStarted`].
///
/// Serialized as a `snake_case` string (for example `LlmCall` becomes
/// `"llm_call"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum NodeType {
    /// Serialized as `"llm_call"`.
    LlmCall,
    /// Serialized as `"switch"`.
    Switch,
    /// Serialized as `"custom_worker"`.
    CustomWorker,
    /// Serialized as `"human_input"`.
    HumanInput,
    /// Serialized as `"end"`.
    End,
    /// Serialized as `"unknown"`.
    // NOTE(review): this variant is not marked `#[serde(other)]`, so an
    // unrecognized string fails to deserialize instead of mapping here.
    // Confirm that is intended.
    Unknown,
}
/// Classification of a streamed token in [`WorkflowEvent::LlmTokenDelta`].
///
/// Serialized as a `snake_case` string.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum TokenKind {
    /// Output token; the only kind `DefaultEventPrinter` writes to stdout.
    Output,
    /// Reasoning token; `DefaultEventPrinter` does not print these.
    Reasoning,
}
/// Receiver for [`WorkflowEvent`]s.
///
/// Implementors must be `Send + Sync`.
pub trait WorkflowEventSink: Send + Sync {
    /// Handles one event. The event is borrowed, so implementations that
    /// need to keep it must clone it.
    fn emit(&self, event: &WorkflowEvent);

    /// Reports whether the sink is cancelled; the default is always `false`.
    // NOTE(review): presumably polled by the workflow runner to stop early.
    // No caller is visible in this file, so confirm against the runner.
    fn is_cancelled(&self) -> bool {
        false
    }
}
/// Sink that forwards every event to the wrapped closure.
///
/// The closure must be `Send + Sync` so the sink satisfies the
/// [`WorkflowEventSink`] supertraits.
pub struct CallbackSink<F>(pub F)
where
    F: Fn(&WorkflowEvent) + Send + Sync;

impl<F> WorkflowEventSink for CallbackSink<F>
where
    F: Fn(&WorkflowEvent) + Send + Sync,
{
    /// Invokes the wrapped closure with `event`.
    fn emit(&self, event: &WorkflowEvent) {
        let CallbackSink(callback) = self;
        callback(event);
    }
}
/// Sink that discards every event.
pub struct NoopSink;

impl WorkflowEventSink for NoopSink {
    /// Does nothing with the event.
    fn emit(&self, _: &WorkflowEvent) {}
}
/// Sink that prints a human-readable trace of workflow events.
///
/// Every event except `LlmTokenDelta` is written to stderr as a single line.
/// `LlmTokenDelta` tokens of kind [`TokenKind::Output`] are written to stdout
/// without a trailing newline and flushed immediately; other token kinds are
/// not printed.
pub struct DefaultEventPrinter;

impl WorkflowEventSink for DefaultEventPrinter {
    /// Prints `event` as described on [`DefaultEventPrinter`].
    ///
    /// I/O errors while writing tokens to stdout are ignored.
    fn emit(&self, event: &WorkflowEvent) {
        match event {
            WorkflowEvent::WorkflowStarted { workflow_id } => {
                eprintln!("[workflow] started: {workflow_id}");
            }
            WorkflowEvent::NodeStarted { node_id, node_type } => {
                eprintln!("[node] {node_id} ({node_type:?}) started");
            }
            WorkflowEvent::LlmTokenDelta {
                token, token_kind, ..
            } => {
                if *token_kind == TokenKind::Output {
                    use std::io::Write;
                    // Take the stdout lock once for both the write and the
                    // flush: a single acquisition per token, and no other
                    // thread can interleave output between the two calls.
                    let stdout = std::io::stdout();
                    let mut out = stdout.lock();
                    // Results are discarded, so a failed write or flush is
                    // silently ignored.
                    let _ = out.write_all(token.as_bytes());
                    let _ = out.flush();
                }
            }
            WorkflowEvent::NodeCompleted { node_id, .. } => {
                eprintln!("[node] {node_id} completed");
            }
            WorkflowEvent::ToolCallRequested {
                tool_name, node_id, ..
            } => {
                eprintln!("[tool] {node_id} calling {tool_name}");
            }
            WorkflowEvent::ToolCallCompleted {
                tool_name, node_id, ..
            } => {
                eprintln!("[tool] {node_id} {tool_name} done");
            }
            WorkflowEvent::HumanInputRequested { node_id, .. } => {
                eprintln!("[human] {node_id} waiting for response");
            }
            WorkflowEvent::HumanInputReceived { node_id, .. } => {
                eprintln!("[human] {node_id} response received");
            }
            WorkflowEvent::NodeRetrying {
                node_id,
                attempt,
                error,
            } => {
                eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
            }
            WorkflowEvent::NodeFailed { node_id, error } => {
                eprintln!("[error] {node_id}: {error}");
            }
            WorkflowEvent::WorkflowCompleted { .. } => {
                eprintln!("[workflow] completed");
            }
        }
    }
}
/// Test helper: forwards `event` to `sink` when one is provided, otherwise
/// drops the event.
#[cfg(test)]
fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
    if let Some(target) = sink {
        WorkflowEventSink::emit(target, &event);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A `CallbackSink` passes each emitted event to its closure.
    #[test]
    fn test_callback_sink_collects_events() {
        let collected: Arc<Mutex<Vec<WorkflowEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&collected);
        let sink = CallbackSink(move |event: &WorkflowEvent| {
            recorder.lock().unwrap().push(event.clone());
        });

        let started = WorkflowEvent::WorkflowStarted {
            workflow_id: String::from("test"),
        };
        emit(Some(&sink), started);

        let seen = collected.lock().unwrap().len();
        assert_eq!(seen, 1);
    }

    /// Events serialize with a snake_case `type` tag next to their fields.
    #[test]
    fn test_event_serialization() {
        let event = WorkflowEvent::NodeStarted {
            node_id: String::from("classify"),
            node_type: NodeType::LlmCall,
        };

        let json = serde_json::to_value(&event).unwrap();

        let expected_fields = [
            ("type", "node_started"),
            ("node_id", "classify"),
            ("node_type", "llm_call"),
        ];
        for (field, expected) in expected_fields {
            assert_eq!(json[field], expected);
        }
    }

    /// Emitting into a `NoopSink` must not panic.
    #[test]
    fn test_noop_sink_does_not_panic() {
        let sink = NoopSink;
        let started = WorkflowEvent::WorkflowStarted {
            workflow_id: String::from("x"),
        };
        emit(Some(&sink), started);
    }
}