// simple_agents_workflow/yaml_runner/events.rs

use serde::{Deserialize, Serialize};
use serde_json::Value;
3
4/// Typed workflow event for streaming and observation.
5#[derive(Debug, Clone, Serialize, Deserialize)]
6#[serde(tag = "type", rename_all = "snake_case")]
7pub enum WorkflowEvent {
8    /// Workflow execution has begun.
9    WorkflowStarted { workflow_id: String },
10    /// A workflow node has started executing.
11    NodeStarted {
12        node_id: String,
13        node_type: NodeType,
14    },
15    /// A token delta from an LLM node (streaming).
16    LlmTokenDelta {
17        node_id: String,
18        token: String,
19        token_kind: TokenKind,
20    },
21    /// A workflow node has finished executing.
22    NodeCompleted { node_id: String, output: Value },
23    /// An LLM node requested a tool call.
24    ToolCallRequested {
25        node_id: String,
26        tool_name: String,
27        arguments: Value,
28    },
29    /// A tool call has finished.
30    ToolCallCompleted {
31        node_id: String,
32        tool_name: String,
33        output: Value,
34    },
35    /// Human input was requested; workflow is paused.
36    HumanInputRequested { node_id: String, request: Value },
37    /// Human input was received; workflow can resume.
38    HumanInputReceived { node_id: String, response: Value },
39    /// A node is retrying after failure.
40    NodeRetrying {
41        node_id: String,
42        attempt: u8,
43        error: String,
44    },
45    /// A node has failed permanently.
46    NodeFailed { node_id: String, error: String },
47    /// Workflow execution completed.
48    WorkflowCompleted {
49        output: Value,
50        metadata: Option<Value>,
51    },
52}
53
54/// The type of workflow node.
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
56#[serde(rename_all = "snake_case")]
57pub enum NodeType {
58    LlmCall,
59    Switch,
60    CustomWorker,
61    HumanInput,
62    End,
63    Unknown,
64}
65
66/// The kind of LLM token in a streaming delta.
67#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68#[serde(rename_all = "snake_case")]
69pub enum TokenKind {
70    Output,
71    Reasoning,
72}
73
74/// Trait for receiving workflow events.
75pub trait WorkflowEventSink: Send + Sync {
76    /// Called for each workflow event.
77    fn emit(&self, event: &WorkflowEvent);
78    /// Return true to request cancellation.
79    fn is_cancelled(&self) -> bool {
80        false
81    }
82}
83
84/// Wraps a closure as a WorkflowEventSink.
85pub struct CallbackSink<F: Fn(&WorkflowEvent) + Send + Sync>(pub F);
86
87impl<F: Fn(&WorkflowEvent) + Send + Sync> WorkflowEventSink for CallbackSink<F> {
88    fn emit(&self, event: &WorkflowEvent) {
89        (self.0)(event);
90    }
91}
92
93/// No-op sink that discards all events.
94pub struct NoopSink;
95impl WorkflowEventSink for NoopSink {
96    fn emit(&self, _event: &WorkflowEvent) {}
97}
98
99/// Built-in pretty-printer for workflow events.
100/// Streams LLM tokens to stdout, logs node lifecycle to stderr.
101pub struct DefaultEventPrinter;
102
103impl WorkflowEventSink for DefaultEventPrinter {
104    fn emit(&self, event: &WorkflowEvent) {
105        match event {
106            WorkflowEvent::WorkflowStarted { workflow_id } => {
107                eprintln!("[workflow] started: {workflow_id}");
108            }
109            WorkflowEvent::NodeStarted { node_id, node_type } => {
110                eprintln!("[node] {node_id} ({node_type:?}) started");
111            }
112            WorkflowEvent::LlmTokenDelta {
113                token, token_kind, ..
114            } => {
115                if *token_kind == TokenKind::Output {
116                    use std::io::Write;
117                    let _ = std::io::stdout().write_all(token.as_bytes());
118                    let _ = std::io::stdout().flush();
119                }
120            }
121            WorkflowEvent::NodeCompleted { node_id, .. } => {
122                eprintln!("[node] {node_id} completed");
123            }
124            WorkflowEvent::ToolCallRequested {
125                tool_name, node_id, ..
126            } => {
127                eprintln!("[tool] {node_id} calling {tool_name}");
128            }
129            WorkflowEvent::ToolCallCompleted {
130                tool_name, node_id, ..
131            } => {
132                eprintln!("[tool] {node_id} {tool_name} done");
133            }
134            WorkflowEvent::HumanInputRequested { node_id, .. } => {
135                eprintln!("[human] {node_id} waiting for response");
136            }
137            WorkflowEvent::HumanInputReceived { node_id, .. } => {
138                eprintln!("[human] {node_id} response received");
139            }
140            WorkflowEvent::NodeRetrying {
141                node_id,
142                attempt,
143                error,
144            } => {
145                eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
146            }
147            WorkflowEvent::NodeFailed { node_id, error } => {
148                eprintln!("[error] {node_id}: {error}");
149            }
150            WorkflowEvent::WorkflowCompleted { .. } => {
151                eprintln!("[workflow] completed");
152            }
153        }
154    }
155}
156
/// Test-only helper: forwards `event` to `sink` when one is provided and does
/// nothing when `sink` is `None`.
#[cfg(test)]
fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
    if let Some(target) = sink {
        target.emit(&event);
    }
}
163
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A `CallbackSink` hands every emitted event to its closure.
    #[test]
    fn test_callback_sink_collects_events() {
        let events: Arc<Mutex<Vec<WorkflowEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let collector = Arc::clone(&events);
        let sink = CallbackSink(move |e: &WorkflowEvent| {
            collector.lock().unwrap().push(e.clone());
        });
        emit(
            Some(&sink),
            WorkflowEvent::WorkflowStarted {
                workflow_id: "test".into(),
            },
        );

        let recorded = events.lock().unwrap();
        assert_eq!(recorded.len(), 1);
        // Check the payload as well, not just the number of events.
        assert!(matches!(
            &recorded[0],
            WorkflowEvent::WorkflowStarted { workflow_id } if workflow_id == "test"
        ));
    }

    /// Events serialize as internally tagged, snake_case JSON and round-trip.
    #[test]
    fn test_event_serialization() {
        let event = WorkflowEvent::NodeStarted {
            node_id: "classify".into(),
            node_type: NodeType::LlmCall,
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["type"], "node_started");
        assert_eq!(json["node_id"], "classify");
        assert_eq!(json["node_type"], "llm_call");

        // The same JSON must deserialize back into the original variant.
        let decoded: WorkflowEvent = serde_json::from_value(json).unwrap();
        assert!(matches!(
            &decoded,
            WorkflowEvent::NodeStarted { node_id, node_type: NodeType::LlmCall }
                if node_id == "classify"
        ));
    }

    /// `NoopSink` accepts events without panicking and never cancels.
    #[test]
    fn test_noop_sink_does_not_panic() {
        let sink = NoopSink;
        emit(
            Some(&sink),
            WorkflowEvent::WorkflowStarted {
                workflow_id: "x".into(),
            },
        );
        assert!(!sink.is_cancelled());
    }
}