Skip to main content

simple_agents_workflow/yaml_runner/
events.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4/// Typed workflow event for streaming and observation.
5#[derive(Debug, Clone, Serialize, Deserialize)]
6#[serde(tag = "type", rename_all = "snake_case")]
7pub enum WorkflowEvent {
8    /// Workflow execution has begun.
9    WorkflowStarted { workflow_id: String },
10    /// A workflow node has started executing.
11    NodeStarted {
12        node_id: String,
13        node_type: NodeType,
14    },
15    /// A token delta from an LLM node (streaming).
16    LlmTokenDelta {
17        node_id: String,
18        token: String,
19        token_kind: TokenKind,
20    },
21    /// A workflow node has finished executing.
22    NodeCompleted { node_id: String, output: Value },
23    /// An LLM node requested a tool call.
24    ToolCallRequested {
25        node_id: String,
26        tool_name: String,
27        arguments: Value,
28    },
29    /// A tool call has finished.
30    ToolCallCompleted {
31        node_id: String,
32        tool_name: String,
33        output: Value,
34    },
35    /// A node is retrying after failure.
36    NodeRetrying {
37        node_id: String,
38        attempt: u8,
39        error: String,
40    },
41    /// A node has failed permanently.
42    NodeFailed { node_id: String, error: String },
43    /// Workflow execution completed.
44    WorkflowCompleted {
45        output: Value,
46        metadata: Option<Value>,
47    },
48}
49
50/// The type of workflow node.
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
52#[serde(rename_all = "snake_case")]
53pub enum NodeType {
54    LlmCall,
55    Switch,
56    CustomWorker,
57    End,
58    Unknown,
59}
60
61/// The kind of LLM token in a streaming delta.
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63#[serde(rename_all = "snake_case")]
64pub enum TokenKind {
65    Output,
66    Reasoning,
67}
68
69/// Trait for receiving workflow events.
70pub trait WorkflowEventSink: Send + Sync {
71    /// Called for each workflow event.
72    fn emit(&self, event: &WorkflowEvent);
73    /// Return true to request cancellation.
74    fn is_cancelled(&self) -> bool {
75        false
76    }
77}
78
79/// Wraps a closure as a WorkflowEventSink.
80pub struct CallbackSink<F: Fn(&WorkflowEvent) + Send + Sync>(pub F);
81
82impl<F: Fn(&WorkflowEvent) + Send + Sync> WorkflowEventSink for CallbackSink<F> {
83    fn emit(&self, event: &WorkflowEvent) {
84        (self.0)(event);
85    }
86}
87
88/// No-op sink that discards all events.
89pub struct NoopSink;
90impl WorkflowEventSink for NoopSink {
91    fn emit(&self, _event: &WorkflowEvent) {}
92}
93
94/// Built-in pretty-printer for workflow events.
95/// Streams LLM tokens to stdout, logs node lifecycle to stderr.
96pub struct DefaultEventPrinter;
97
98impl WorkflowEventSink for DefaultEventPrinter {
99    fn emit(&self, event: &WorkflowEvent) {
100        match event {
101            WorkflowEvent::WorkflowStarted { workflow_id } => {
102                eprintln!("[workflow] started: {workflow_id}");
103            }
104            WorkflowEvent::NodeStarted { node_id, node_type } => {
105                eprintln!("[node] {node_id} ({node_type:?}) started");
106            }
107            WorkflowEvent::LlmTokenDelta {
108                token, token_kind, ..
109            } => {
110                if *token_kind == TokenKind::Output {
111                    use std::io::Write;
112                    let _ = std::io::stdout().write_all(token.as_bytes());
113                    let _ = std::io::stdout().flush();
114                }
115            }
116            WorkflowEvent::NodeCompleted { node_id, .. } => {
117                eprintln!("[node] {node_id} completed");
118            }
119            WorkflowEvent::ToolCallRequested {
120                tool_name, node_id, ..
121            } => {
122                eprintln!("[tool] {node_id} calling {tool_name}");
123            }
124            WorkflowEvent::ToolCallCompleted {
125                tool_name, node_id, ..
126            } => {
127                eprintln!("[tool] {node_id} {tool_name} done");
128            }
129            WorkflowEvent::NodeRetrying {
130                node_id,
131                attempt,
132                error,
133            } => {
134                eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
135            }
136            WorkflowEvent::NodeFailed { node_id, error } => {
137                eprintln!("[error] {node_id}: {error}");
138            }
139            WorkflowEvent::WorkflowCompleted { .. } => {
140                eprintln!("[workflow] completed");
141            }
142        }
143    }
144}
145
146#[cfg(test)]
147fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
148    if let Some(s) = sink {
149        s.emit(&event);
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156    use std::sync::{Arc, Mutex};
157
158    #[test]
159    fn test_callback_sink_collects_events() {
160        let events: Arc<Mutex<Vec<WorkflowEvent>>> = Arc::new(Mutex::new(vec![]));
161        let events_clone = events.clone();
162        let sink = CallbackSink(move |e: &WorkflowEvent| {
163            events_clone.lock().unwrap().push(e.clone());
164        });
165        emit(
166            Some(&sink),
167            WorkflowEvent::WorkflowStarted {
168                workflow_id: "test".into(),
169            },
170        );
171        assert_eq!(events.lock().unwrap().len(), 1);
172    }
173
174    #[test]
175    fn test_event_serialization() {
176        let event = WorkflowEvent::NodeStarted {
177            node_id: "classify".into(),
178            node_type: NodeType::LlmCall,
179        };
180        let json = serde_json::to_value(&event).unwrap();
181        assert_eq!(json["type"], "node_started");
182        assert_eq!(json["node_id"], "classify");
183        assert_eq!(json["node_type"], "llm_call");
184    }
185
186    #[test]
187    fn test_noop_sink_does_not_panic() {
188        let sink = NoopSink;
189        emit(
190            Some(&sink),
191            WorkflowEvent::WorkflowStarted {
192                workflow_id: "x".into(),
193            },
194        );
195    }
196}