Skip to main content

smol_workflow_engine/
events.rs

1//! Workflow event stream types.
2//!
3//! This module contains the shared Rust representation of the smol-workflows
4//! JSONL event envelope documented in `docs/usages/events.md`. The top-level
5//! envelope is owned by smol-workflows; individual event `data` payloads are
6//! owned by their event type. In particular, `workflow.agent_event` data is raw
7//! provider-owned data and should not be normalized into a common agent schema.
8
9use serde::{Deserialize, Deserializer, Serialize, Serializer};
10use serde_json::Value;
11
12/// Known workflow event types.
13///
14/// The JSON representation is still the string value documented for the event
15/// stream, such as `"workflow.started"`. The enum gives Rust producers and
16/// sinks type-safe matching for known event types while preserving forward
17/// compatibility through [`WorkflowEventType::Other`].
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub enum WorkflowEventType {
20    /// A workflow scope started.
21    ///
22    /// The root `workflow.started` is the first event in a stream. Child
23    /// workflows invoked with `workflow(...)` may emit additional started
24    /// events with `metadata.workflowDepth > 0`.
25    Started,
26    /// Workflow code called `phase(name)`.
27    Phase,
28    /// Workflow code called `log(...)`.
29    Log,
30    /// A workflow-owned `agent(...)` call started.
31    AgentStarted,
32    /// Raw provider event payload associated with an `agent(...)` call.
33    AgentEvent,
34    /// A workflow-owned `agent(...)` call completed successfully.
35    AgentCompleted,
36    /// A workflow-owned `agent(...)` call failed.
37    AgentFailed,
38    /// A workflow scope completed successfully.
39    Result,
40    /// A workflow scope failed after event streaming had started.
41    Error,
42    /// Unknown or future event type.
43    ///
44    /// Consumers should ignore unknown event types unless they explicitly
45    /// support them. Keeping the original string allows lossless round-tripping.
46    Other(String),
47}
48
49impl WorkflowEventType {
50    /// Return the JSON string representation of this event type.
51    pub fn as_str(&self) -> &str {
52        match self {
53            Self::Started => "workflow.started",
54            Self::Phase => "workflow.phase",
55            Self::Log => "workflow.log",
56            Self::AgentStarted => "workflow.agent_started",
57            Self::AgentEvent => "workflow.agent_event",
58            Self::AgentCompleted => "workflow.agent_completed",
59            Self::AgentFailed => "workflow.agent_failed",
60            Self::Result => "workflow.result",
61            Self::Error => "workflow.error",
62            Self::Other(event_type) => event_type.as_str(),
63        }
64    }
65}
66
67impl From<&str> for WorkflowEventType {
68    fn from(value: &str) -> Self {
69        match value {
70            "workflow.started" => Self::Started,
71            "workflow.phase" => Self::Phase,
72            "workflow.log" => Self::Log,
73            "workflow.agent_started" => Self::AgentStarted,
74            "workflow.agent_event" => Self::AgentEvent,
75            "workflow.agent_completed" => Self::AgentCompleted,
76            "workflow.agent_failed" => Self::AgentFailed,
77            "workflow.result" => Self::Result,
78            "workflow.error" => Self::Error,
79            value => Self::Other(value.to_string()),
80        }
81    }
82}
83
84impl From<String> for WorkflowEventType {
85    fn from(value: String) -> Self {
86        WorkflowEventType::from(value.as_str())
87    }
88}
89
90impl std::fmt::Display for WorkflowEventType {
91    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        formatter.write_str(self.as_str())
93    }
94}
95
96impl Serialize for WorkflowEventType {
97    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
98    where
99        S: Serializer,
100    {
101        serializer.serialize_str(self.as_str())
102    }
103}
104
105impl<'de> Deserialize<'de> for WorkflowEventType {
106    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
107    where
108        D: Deserializer<'de>,
109    {
110        let event_type = String::deserialize(deserializer)?;
111        Ok(Self::from(event_type))
112    }
113}
114
115/// Optional metadata for correlating workflow events.
116#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
117#[serde(rename_all = "camelCase")]
118pub struct WorkflowEventMetadata {
119    /// Durable workflow run ID when available.
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub run_id: Option<String>,
122    /// Opaque workflow step ID for an event associated with a runtime step.
123    ///
124    /// For example, `workflow.agent_event` uses this to identify the
125    /// `agent(...)` request. The value is intentionally opaque; consumers must
126    /// not infer ordering from it.
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub step_id: Option<String>,
129    /// Agent provider name for provider-owned events.
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub provider: Option<String>,
132    /// Provider session/thread/conversation ID when the provider exposes one.
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub session_id: Option<String>,
135    /// Workflow nesting depth for this event scope.
136    ///
137    /// The root workflow has depth `0`; a direct child workflow has depth `1`.
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub workflow_depth: Option<u32>,
140    /// Opaque parent `workflow(...)` step ID for nested workflow events.
141    #[serde(skip_serializing_if = "Option::is_none")]
142    pub parent_step_id: Option<String>,
143}
144
145/// One smol-workflows event stream envelope.
146///
147/// Serialized events are intended to be written as JSON Lines: one complete
148/// [`WorkflowEvent`] per line. The JSON field name for [`event_type`](Self::event_type)
149/// is `type`.
150#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
151#[serde(rename_all = "camelCase")]
152pub struct WorkflowEvent {
153    /// Event type discriminator, serialized as the top-level `type` field.
154    #[serde(rename = "type")]
155    pub event_type: WorkflowEventType,
156    /// Nanoseconds since the root workflow stream start.
157    ///
158    /// The root `workflow.started` event may omit this. All later events,
159    /// including child `workflow.started` events, should include it.
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub elapsed_nanos: Option<u64>,
162    /// Optional correlation metadata.
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub metadata: Option<WorkflowEventMetadata>,
165    /// Event payload.
166    ///
167    /// The payload shape is defined by the event type. For
168    /// `workflow.agent_event`, this is raw provider-owned data.
169    pub data: Value,
170}
171
172impl WorkflowEvent {
173    /// Construct a `workflow.started` event.
174    pub fn started(start_time: String) -> Self {
175        Self {
176            event_type: WorkflowEventType::Started,
177            elapsed_nanos: None,
178            metadata: None,
179            data: serde_json::json!({ "startTime": start_time }),
180        }
181    }
182
183    /// Construct a `workflow.log` event.
184    pub fn log(message: String) -> Self {
185        Self {
186            event_type: WorkflowEventType::Log,
187            elapsed_nanos: None,
188            metadata: None,
189            data: serde_json::json!({ "message": message }),
190        }
191    }
192
193    /// Construct a `workflow.phase` event.
194    pub fn phase(name: String, options: Option<Value>) -> Self {
195        let mut data = serde_json::Map::new();
196        data.insert("name".to_string(), Value::String(name));
197        if let Some(options) = options {
198            data.insert("options".to_string(), options);
199        }
200        Self {
201            event_type: WorkflowEventType::Phase,
202            elapsed_nanos: None,
203            metadata: None,
204            data: Value::Object(data),
205        }
206    }
207
208    /// Construct a `workflow.result` event.
209    pub fn result(
210        input_tokens: u64,
211        output_tokens: u64,
212        total_tokens: u64,
213        results: Value,
214    ) -> Self {
215        Self {
216            event_type: WorkflowEventType::Result,
217            elapsed_nanos: None,
218            metadata: None,
219            data: serde_json::json!({
220                "tokenUsage": {
221                    "inputTokens": input_tokens,
222                    "outputTokens": output_tokens,
223                    "totalTokens": total_tokens,
224                },
225                "results": results,
226            }),
227        }
228    }
229
230    /// Construct a `workflow.error` event.
231    pub fn error(message: String, details: Option<String>) -> Self {
232        let mut data = serde_json::Map::new();
233        data.insert("message".to_string(), Value::String(message));
234        if let Some(details) = details {
235            data.insert("details".to_string(), Value::String(details));
236        }
237        Self {
238            event_type: WorkflowEventType::Error,
239            elapsed_nanos: None,
240            metadata: None,
241            data: Value::Object(data),
242        }
243    }
244
245    /// Construct a `workflow.agent_started` event.
246    pub fn agent_started(data: Value, metadata: WorkflowEventMetadata) -> Self {
247        Self {
248            event_type: WorkflowEventType::AgentStarted,
249            elapsed_nanos: None,
250            metadata: Some(metadata),
251            data,
252        }
253    }
254
255    /// Construct a `workflow.agent_event` event from raw provider data.
256    pub fn agent_event(data: Value, metadata: WorkflowEventMetadata) -> Self {
257        Self {
258            event_type: WorkflowEventType::AgentEvent,
259            elapsed_nanos: None,
260            metadata: Some(metadata),
261            data,
262        }
263    }
264
265    /// Construct a `workflow.agent_completed` event.
266    pub fn agent_completed(data: Value, metadata: WorkflowEventMetadata) -> Self {
267        Self {
268            event_type: WorkflowEventType::AgentCompleted,
269            elapsed_nanos: None,
270            metadata: Some(metadata),
271            data,
272        }
273    }
274
275    /// Construct a `workflow.agent_failed` event.
276    pub fn agent_failed(data: Value, metadata: WorkflowEventMetadata) -> Self {
277        Self {
278            event_type: WorkflowEventType::AgentFailed,
279            elapsed_nanos: None,
280            metadata: Some(metadata),
281            data,
282        }
283    }
284}
285
286/// Async receiver for workflow events.
287///
288/// Implementations may render events for humans, write JSONL, forward events to
289/// another process, or collect them for tests. Returning an error makes workflow
290/// execution fail, which is useful for strict machine-readable streams such as
291/// CLI `--events` output.
292#[async_trait::async_trait]
293pub trait WorkflowEventSink: Send + Sync {
294    /// Emit one workflow event.
295    async fn emit(&self, event: WorkflowEvent) -> anyhow::Result<()>;
296}