simple_agents_workflow/yaml_runner/
events.rs1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
6#[serde(tag = "type", rename_all = "snake_case")]
7pub enum WorkflowEvent {
8 WorkflowStarted { workflow_id: String },
10 NodeStarted {
12 node_id: String,
13 node_type: NodeType,
14 },
15 LlmTokenDelta {
17 node_id: String,
18 token: String,
19 token_kind: TokenKind,
20 },
21 NodeCompleted { node_id: String, output: Value },
23 ToolCallRequested {
25 node_id: String,
26 tool_name: String,
27 arguments: Value,
28 },
29 ToolCallCompleted {
31 node_id: String,
32 tool_name: String,
33 output: Value,
34 },
35 NodeRetrying {
37 node_id: String,
38 attempt: u8,
39 error: String,
40 },
41 NodeFailed { node_id: String, error: String },
43 WorkflowCompleted {
45 output: Value,
46 metadata: Option<Value>,
47 },
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
52#[serde(rename_all = "snake_case")]
53pub enum NodeType {
54 LlmCall,
55 Switch,
56 CustomWorker,
57 End,
58 Unknown,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63#[serde(rename_all = "snake_case")]
64pub enum TokenKind {
65 Output,
66 Reasoning,
67}
68
69pub trait WorkflowEventSink: Send + Sync {
71 fn emit(&self, event: &WorkflowEvent);
73 fn is_cancelled(&self) -> bool {
75 false
76 }
77}
78
79pub struct CallbackSink<F: Fn(&WorkflowEvent) + Send + Sync>(pub F);
81
82impl<F: Fn(&WorkflowEvent) + Send + Sync> WorkflowEventSink for CallbackSink<F> {
83 fn emit(&self, event: &WorkflowEvent) {
84 (self.0)(event);
85 }
86}
87
88pub struct NoopSink;
90impl WorkflowEventSink for NoopSink {
91 fn emit(&self, _event: &WorkflowEvent) {}
92}
93
94pub struct DefaultEventPrinter;
97
98impl WorkflowEventSink for DefaultEventPrinter {
99 fn emit(&self, event: &WorkflowEvent) {
100 match event {
101 WorkflowEvent::WorkflowStarted { workflow_id } => {
102 eprintln!("[workflow] started: {workflow_id}");
103 }
104 WorkflowEvent::NodeStarted { node_id, node_type } => {
105 eprintln!("[node] {node_id} ({node_type:?}) started");
106 }
107 WorkflowEvent::LlmTokenDelta {
108 token, token_kind, ..
109 } => {
110 if *token_kind == TokenKind::Output {
111 use std::io::Write;
112 let _ = std::io::stdout().write_all(token.as_bytes());
113 let _ = std::io::stdout().flush();
114 }
115 }
116 WorkflowEvent::NodeCompleted { node_id, .. } => {
117 eprintln!("[node] {node_id} completed");
118 }
119 WorkflowEvent::ToolCallRequested {
120 tool_name, node_id, ..
121 } => {
122 eprintln!("[tool] {node_id} calling {tool_name}");
123 }
124 WorkflowEvent::ToolCallCompleted {
125 tool_name, node_id, ..
126 } => {
127 eprintln!("[tool] {node_id} {tool_name} done");
128 }
129 WorkflowEvent::NodeRetrying {
130 node_id,
131 attempt,
132 error,
133 } => {
134 eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
135 }
136 WorkflowEvent::NodeFailed { node_id, error } => {
137 eprintln!("[error] {node_id}: {error}");
138 }
139 WorkflowEvent::WorkflowCompleted { .. } => {
140 eprintln!("[workflow] completed");
141 }
142 }
143 }
144}
145
146#[cfg(test)]
147fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
148 if let Some(s) = sink {
149 s.emit(&event);
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use std::sync::{Arc, Mutex};
157
158 #[test]
159 fn test_callback_sink_collects_events() {
160 let events: Arc<Mutex<Vec<WorkflowEvent>>> = Arc::new(Mutex::new(vec![]));
161 let events_clone = events.clone();
162 let sink = CallbackSink(move |e: &WorkflowEvent| {
163 events_clone.lock().unwrap().push(e.clone());
164 });
165 emit(
166 Some(&sink),
167 WorkflowEvent::WorkflowStarted {
168 workflow_id: "test".into(),
169 },
170 );
171 assert_eq!(events.lock().unwrap().len(), 1);
172 }
173
174 #[test]
175 fn test_event_serialization() {
176 let event = WorkflowEvent::NodeStarted {
177 node_id: "classify".into(),
178 node_type: NodeType::LlmCall,
179 };
180 let json = serde_json::to_value(&event).unwrap();
181 assert_eq!(json["type"], "node_started");
182 assert_eq!(json["node_id"], "classify");
183 assert_eq!(json["node_type"], "llm_call");
184 }
185
186 #[test]
187 fn test_noop_sink_does_not_panic() {
188 let sink = NoopSink;
189 emit(
190 Some(&sink),
191 WorkflowEvent::WorkflowStarted {
192 workflow_id: "x".into(),
193 },
194 );
195 }
196}