simple_agents_workflow/yaml_runner/
events.rs1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
6#[serde(tag = "type", rename_all = "snake_case")]
7pub enum WorkflowEvent {
8 WorkflowStarted { workflow_id: String },
10 NodeStarted {
12 node_id: String,
13 node_type: NodeType,
14 },
15 LlmTokenDelta {
17 node_id: String,
18 token: String,
19 token_kind: TokenKind,
20 },
21 NodeCompleted { node_id: String, output: Value },
23 ToolCallRequested {
25 node_id: String,
26 tool_name: String,
27 arguments: Value,
28 },
29 ToolCallCompleted {
31 node_id: String,
32 tool_name: String,
33 output: Value,
34 },
35 HumanInputRequested { node_id: String, request: Value },
37 HumanInputReceived { node_id: String, response: Value },
39 NodeRetrying {
41 node_id: String,
42 attempt: u8,
43 error: String,
44 },
45 NodeFailed { node_id: String, error: String },
47 WorkflowCompleted {
49 output: Value,
50 metadata: Option<Value>,
51 },
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
56#[serde(rename_all = "snake_case")]
57pub enum NodeType {
58 LlmCall,
59 Switch,
60 CustomWorker,
61 HumanInput,
62 End,
63 Unknown,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68#[serde(rename_all = "snake_case")]
69pub enum TokenKind {
70 Output,
71 Reasoning,
72}
73
74pub trait WorkflowEventSink: Send + Sync {
76 fn emit(&self, event: &WorkflowEvent);
78 fn is_cancelled(&self) -> bool {
80 false
81 }
82}
83
84pub struct CallbackSink<F: Fn(&WorkflowEvent) + Send + Sync>(pub F);
86
87impl<F: Fn(&WorkflowEvent) + Send + Sync> WorkflowEventSink for CallbackSink<F> {
88 fn emit(&self, event: &WorkflowEvent) {
89 (self.0)(event);
90 }
91}
92
93pub struct NoopSink;
95impl WorkflowEventSink for NoopSink {
96 fn emit(&self, _event: &WorkflowEvent) {}
97}
98
99pub struct DefaultEventPrinter;
102
103impl WorkflowEventSink for DefaultEventPrinter {
104 fn emit(&self, event: &WorkflowEvent) {
105 match event {
106 WorkflowEvent::WorkflowStarted { workflow_id } => {
107 eprintln!("[workflow] started: {workflow_id}");
108 }
109 WorkflowEvent::NodeStarted { node_id, node_type } => {
110 eprintln!("[node] {node_id} ({node_type:?}) started");
111 }
112 WorkflowEvent::LlmTokenDelta {
113 token, token_kind, ..
114 } => {
115 if *token_kind == TokenKind::Output {
116 use std::io::Write;
117 let _ = std::io::stdout().write_all(token.as_bytes());
118 let _ = std::io::stdout().flush();
119 }
120 }
121 WorkflowEvent::NodeCompleted { node_id, .. } => {
122 eprintln!("[node] {node_id} completed");
123 }
124 WorkflowEvent::ToolCallRequested {
125 tool_name, node_id, ..
126 } => {
127 eprintln!("[tool] {node_id} calling {tool_name}");
128 }
129 WorkflowEvent::ToolCallCompleted {
130 tool_name, node_id, ..
131 } => {
132 eprintln!("[tool] {node_id} {tool_name} done");
133 }
134 WorkflowEvent::HumanInputRequested { node_id, .. } => {
135 eprintln!("[human] {node_id} waiting for response");
136 }
137 WorkflowEvent::HumanInputReceived { node_id, .. } => {
138 eprintln!("[human] {node_id} response received");
139 }
140 WorkflowEvent::NodeRetrying {
141 node_id,
142 attempt,
143 error,
144 } => {
145 eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
146 }
147 WorkflowEvent::NodeFailed { node_id, error } => {
148 eprintln!("[error] {node_id}: {error}");
149 }
150 WorkflowEvent::WorkflowCompleted { .. } => {
151 eprintln!("[workflow] completed");
152 }
153 }
154 }
155}
156
/// Test-only helper: forwards `event` to `sink` when one is supplied and
/// drops the event otherwise.
#[cfg(test)]
fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
    let Some(target) = sink else {
        return;
    };
    target.emit(&event);
}
163
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A `CallbackSink` passes each emitted event to its closure.
    #[test]
    fn test_callback_sink_collects_events() {
        let collected = Arc::new(Mutex::new(Vec::<WorkflowEvent>::new()));
        let recorder = Arc::clone(&collected);
        let sink = CallbackSink(move |event: &WorkflowEvent| {
            recorder.lock().unwrap().push(event.clone());
        });

        let started = WorkflowEvent::WorkflowStarted {
            workflow_id: "test".into(),
        };
        emit(Some(&sink), started);

        let seen = collected.lock().unwrap().len();
        assert_eq!(seen, 1);
    }

    /// Events serialize with a snake_case `type` tag and snake_case enum
    /// values.
    #[test]
    fn test_event_serialization() {
        let node_started = WorkflowEvent::NodeStarted {
            node_id: "classify".into(),
            node_type: NodeType::LlmCall,
        };
        let json = serde_json::to_value(&node_started).unwrap();

        let expected_fields = [
            ("type", "node_started"),
            ("node_id", "classify"),
            ("node_type", "llm_call"),
        ];
        for (key, expected) in expected_fields {
            assert_eq!(json[key], expected);
        }
    }

    /// Emitting into a `NoopSink` is accepted and does nothing.
    #[test]
    fn test_noop_sink_does_not_panic() {
        let started = WorkflowEvent::WorkflowStarted {
            workflow_id: "x".into(),
        };
        emit(Some(&NoopSink), started);
    }
}