simple_agents_workflow/yaml_runner/
events.rs1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
6#[serde(tag = "type", rename_all = "snake_case")]
7pub enum WorkflowEvent {
8 WorkflowStarted { workflow_id: String },
10 NodeStarted {
12 node_id: String,
13 node_type: NodeType,
14 },
15 LlmTokenDelta {
17 node_id: String,
18 token: String,
19 token_kind: TokenKind,
20 },
21 NodeCompleted { node_id: String, output: Value },
23 ToolCallRequested {
25 node_id: String,
26 tool_name: String,
27 arguments: Value,
28 },
29 ToolCallCompleted {
31 node_id: String,
32 tool_name: String,
33 output: Value,
34 },
35 NodeRetrying {
37 node_id: String,
38 attempt: u8,
39 error: String,
40 },
41 NodeFailed { node_id: String, error: String },
43 WorkflowCompleted {
45 output: Value,
46 metadata: Option<Value>,
47 },
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
52#[serde(rename_all = "snake_case")]
53pub enum NodeType {
54 LlmCall,
55 Switch,
56 End,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
61#[serde(rename_all = "snake_case")]
62pub enum TokenKind {
63 Output,
64 Reasoning,
65}
66
67pub trait WorkflowEventSink: Send + Sync {
69 fn emit(&self, event: &WorkflowEvent);
71 fn is_cancelled(&self) -> bool {
73 false
74 }
75}
76
77pub struct CallbackSink<F: Fn(&WorkflowEvent) + Send + Sync>(pub F);
79
80impl<F: Fn(&WorkflowEvent) + Send + Sync> WorkflowEventSink for CallbackSink<F> {
81 fn emit(&self, event: &WorkflowEvent) {
82 (self.0)(event);
83 }
84}
85
86pub struct NoopSink;
88impl WorkflowEventSink for NoopSink {
89 fn emit(&self, _event: &WorkflowEvent) {}
90}
91
92pub struct DefaultEventPrinter;
95
96impl WorkflowEventSink for DefaultEventPrinter {
97 fn emit(&self, event: &WorkflowEvent) {
98 match event {
99 WorkflowEvent::WorkflowStarted { workflow_id } => {
100 eprintln!("[workflow] started: {workflow_id}");
101 }
102 WorkflowEvent::NodeStarted { node_id, node_type } => {
103 eprintln!("[node] {node_id} ({node_type:?}) started");
104 }
105 WorkflowEvent::LlmTokenDelta {
106 token, token_kind, ..
107 } => {
108 if *token_kind == TokenKind::Output {
109 use std::io::Write;
110 let _ = std::io::stdout().write_all(token.as_bytes());
111 let _ = std::io::stdout().flush();
112 }
113 }
114 WorkflowEvent::NodeCompleted { node_id, .. } => {
115 eprintln!("[node] {node_id} completed");
116 }
117 WorkflowEvent::ToolCallRequested {
118 tool_name, node_id, ..
119 } => {
120 eprintln!("[tool] {node_id} calling {tool_name}");
121 }
122 WorkflowEvent::ToolCallCompleted {
123 tool_name, node_id, ..
124 } => {
125 eprintln!("[tool] {node_id} {tool_name} done");
126 }
127 WorkflowEvent::NodeRetrying {
128 node_id,
129 attempt,
130 error,
131 } => {
132 eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
133 }
134 WorkflowEvent::NodeFailed { node_id, error } => {
135 eprintln!("[error] {node_id}: {error}");
136 }
137 WorkflowEvent::WorkflowCompleted { .. } => {
138 eprintln!("[workflow] completed");
139 }
140 }
141 }
142}
143
144#[cfg(test)]
145fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
146 if let Some(s) = sink {
147 s.emit(&event);
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use std::sync::{Arc, Mutex};
155
156 #[test]
157 fn test_callback_sink_collects_events() {
158 let events: Arc<Mutex<Vec<WorkflowEvent>>> = Arc::new(Mutex::new(vec![]));
159 let events_clone = events.clone();
160 let sink = CallbackSink(move |e: &WorkflowEvent| {
161 events_clone.lock().unwrap().push(e.clone());
162 });
163 emit(
164 Some(&sink),
165 WorkflowEvent::WorkflowStarted {
166 workflow_id: "test".into(),
167 },
168 );
169 assert_eq!(events.lock().unwrap().len(), 1);
170 }
171
172 #[test]
173 fn test_event_serialization() {
174 let event = WorkflowEvent::NodeStarted {
175 node_id: "classify".into(),
176 node_type: NodeType::LlmCall,
177 };
178 let json = serde_json::to_value(&event).unwrap();
179 assert_eq!(json["type"], "node_started");
180 assert_eq!(json["node_id"], "classify");
181 assert_eq!(json["node_type"], "llm_call");
182 }
183
184 #[test]
185 fn test_noop_sink_does_not_panic() {
186 let sink = NoopSink;
187 emit(
188 Some(&sink),
189 WorkflowEvent::WorkflowStarted {
190 workflow_id: "x".into(),
191 },
192 );
193 }
194}