Skip to main content

simple_agents_workflow/yaml_runner/
events.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4/// Typed workflow event for streaming and observation.
5#[derive(Debug, Clone, Serialize, Deserialize)]
6#[serde(tag = "type", rename_all = "snake_case")]
7pub enum WorkflowEvent {
8    /// Workflow execution has begun.
9    WorkflowStarted { workflow_id: String },
10    /// A workflow node has started executing.
11    NodeStarted {
12        node_id: String,
13        node_type: NodeType,
14    },
15    /// A token delta from an LLM node (streaming).
16    LlmTokenDelta {
17        node_id: String,
18        token: String,
19        token_kind: TokenKind,
20    },
21    /// A workflow node has finished executing.
22    NodeCompleted { node_id: String, output: Value },
23    /// An LLM node requested a tool call.
24    ToolCallRequested {
25        node_id: String,
26        tool_name: String,
27        arguments: Value,
28    },
29    /// A tool call has finished.
30    ToolCallCompleted {
31        node_id: String,
32        tool_name: String,
33        output: Value,
34    },
35    /// A node is retrying after failure.
36    NodeRetrying {
37        node_id: String,
38        attempt: u8,
39        error: String,
40    },
41    /// A node has failed permanently.
42    NodeFailed { node_id: String, error: String },
43    /// Workflow execution completed.
44    WorkflowCompleted {
45        output: Value,
46        metadata: Option<Value>,
47    },
48}
49
50/// The type of workflow node.
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
52#[serde(rename_all = "snake_case")]
53pub enum NodeType {
54    LlmCall,
55    Switch,
56    End,
57}
58
59/// The kind of LLM token in a streaming delta.
60#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
61#[serde(rename_all = "snake_case")]
62pub enum TokenKind {
63    Output,
64    Reasoning,
65}
66
67/// Trait for receiving workflow events.
68pub trait WorkflowEventSink: Send + Sync {
69    /// Called for each workflow event.
70    fn emit(&self, event: &WorkflowEvent);
71    /// Return true to request cancellation.
72    fn is_cancelled(&self) -> bool {
73        false
74    }
75}
76
77/// Wraps a closure as a WorkflowEventSink.
78pub struct CallbackSink<F: Fn(&WorkflowEvent) + Send + Sync>(pub F);
79
80impl<F: Fn(&WorkflowEvent) + Send + Sync> WorkflowEventSink for CallbackSink<F> {
81    fn emit(&self, event: &WorkflowEvent) {
82        (self.0)(event);
83    }
84}
85
86/// No-op sink that discards all events.
87pub struct NoopSink;
88impl WorkflowEventSink for NoopSink {
89    fn emit(&self, _event: &WorkflowEvent) {}
90}
91
92/// Built-in pretty-printer for workflow events.
93/// Streams LLM tokens to stdout, logs node lifecycle to stderr.
94pub struct DefaultEventPrinter;
95
96impl WorkflowEventSink for DefaultEventPrinter {
97    fn emit(&self, event: &WorkflowEvent) {
98        match event {
99            WorkflowEvent::WorkflowStarted { workflow_id } => {
100                eprintln!("[workflow] started: {workflow_id}");
101            }
102            WorkflowEvent::NodeStarted { node_id, node_type } => {
103                eprintln!("[node] {node_id} ({node_type:?}) started");
104            }
105            WorkflowEvent::LlmTokenDelta {
106                token, token_kind, ..
107            } => {
108                if *token_kind == TokenKind::Output {
109                    use std::io::Write;
110                    let _ = std::io::stdout().write_all(token.as_bytes());
111                    let _ = std::io::stdout().flush();
112                }
113            }
114            WorkflowEvent::NodeCompleted { node_id, .. } => {
115                eprintln!("[node] {node_id} completed");
116            }
117            WorkflowEvent::ToolCallRequested {
118                tool_name, node_id, ..
119            } => {
120                eprintln!("[tool] {node_id} calling {tool_name}");
121            }
122            WorkflowEvent::ToolCallCompleted {
123                tool_name, node_id, ..
124            } => {
125                eprintln!("[tool] {node_id} {tool_name} done");
126            }
127            WorkflowEvent::NodeRetrying {
128                node_id,
129                attempt,
130                error,
131            } => {
132                eprintln!("[retry] {node_id} attempt #{attempt}: {error}");
133            }
134            WorkflowEvent::NodeFailed { node_id, error } => {
135                eprintln!("[error] {node_id}: {error}");
136            }
137            WorkflowEvent::WorkflowCompleted { .. } => {
138                eprintln!("[workflow] completed");
139            }
140        }
141    }
142}
143
/// Test helper: forwards `event` to `sink` when one is supplied, otherwise
/// drops the event.
#[cfg(test)]
fn emit(sink: Option<&dyn WorkflowEventSink>, event: WorkflowEvent) {
    let target = match sink {
        Some(target) => target,
        None => return,
    };
    target.emit(&event);
}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A `CallbackSink` hands every emitted event to its closure.
    #[test]
    fn test_callback_sink_collects_events() {
        let collected = Arc::new(Mutex::new(Vec::<WorkflowEvent>::new()));
        let recorder = Arc::clone(&collected);
        let sink = CallbackSink(move |event: &WorkflowEvent| {
            let mut guard = recorder.lock().unwrap();
            guard.push(event.clone());
        });

        let started = WorkflowEvent::WorkflowStarted {
            workflow_id: "test".into(),
        };
        emit(Some(&sink), started);

        let seen = collected.lock().unwrap().len();
        assert_eq!(seen, 1);
    }

    /// Events serialize with a snake_case `type` tag next to their fields.
    #[test]
    fn test_event_serialization() {
        let started = WorkflowEvent::NodeStarted {
            node_id: "classify".into(),
            node_type: NodeType::LlmCall,
        };
        let json = serde_json::to_value(&started).unwrap();

        let expected_fields = [
            ("type", "node_started"),
            ("node_id", "classify"),
            ("node_type", "llm_call"),
        ];
        for (key, expected) in expected_fields {
            assert_eq!(json[key], expected);
        }
    }

    /// Emitting into a `NoopSink` completes without panicking.
    #[test]
    fn test_noop_sink_does_not_panic() {
        let started = WorkflowEvent::WorkflowStarted {
            workflow_id: "x".into(),
        };
        emit(Some(&NoopSink), started);
    }
}