chronicle_proxy/
workflow_events.rs

1use std::fmt::Debug;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use crate::{ProxyRequestInternalMetadata, ProxyRequestMetadata};
8
9/// Type-specific data for an event.
10#[derive(Debug, Deserialize)]
11#[serde(tag = "type", rename_all = "snake_case")]
12pub enum WorkflowEvent {
13    #[serde(rename = "run:start")]
14    RunStart(RunStartEvent),
15    #[serde(rename = "run:update")]
16    RunUpdate(RunUpdateEvent),
17    /// Event data for the start of a step.
18    #[serde(rename = "step:start")]
19    StepStart(StepEventData<StepStartData>),
20    /// Event data for the end of a step.
21    #[serde(rename = "step:end")]
22    StepEnd(StepEventData<StepEndData>),
23    /// Event data for a step error.
24    #[serde(rename = "step:error")]
25    StepError(StepEventData<ErrorData>),
26    /// Event data for a DAG node state change.
27    #[serde(rename = "step:state")]
28    StepState(StepEventData<StepStateData>),
29    #[serde(untagged)]
30    Event(EventPayload),
31}
32
33#[derive(Deserialize, Debug)]
34pub struct EventPayload {
35    #[serde(rename = "type")]
36    pub typ: String,
37    pub data: Option<serde_json::Value>,
38    pub error: Option<serde_json::Value>,
39    pub run_id: Uuid,
40    pub step_id: Uuid,
41    pub time: Option<DateTime<Utc>>,
42    #[serde(skip_deserializing)]
43    pub internal_metadata: Option<ProxyRequestInternalMetadata>,
44}
45
46/// An event that starts a run in a workflow.
47#[derive(Debug, Serialize, Deserialize)]
48pub struct RunStartEvent {
49    pub id: Uuid,
50    pub name: String,
51    pub description: Option<String>,
52    pub application: Option<String>,
53    pub environment: Option<String>,
54    pub input: Option<serde_json::Value>,
55    pub trace_id: Option<String>,
56    pub span_id: Option<String>,
57    /// A status to start with. If omitted, 'started' is used.
58    pub status: Option<String>,
59    #[serde(default)]
60    pub tags: Vec<String>,
61    pub info: Option<serde_json::Value>,
62    pub time: Option<DateTime<chrono::Utc>>,
63}
64
65impl RunStartEvent {
66    /// Merge metadata into the event.
67    pub fn merge_metadata(&mut self, other: &ProxyRequestMetadata) {
68        if self.application.is_none() {
69            self.application = other.application.clone();
70        }
71        if self.environment.is_none() {
72            self.environment = other.environment.clone();
73        }
74
75        // Create info if it doesn't exist
76        if self.info.is_none() {
77            self.info = Some(serde_json::Value::Object(serde_json::Map::new()));
78        }
79
80        // Get a mutable reference to the info object
81        let info = self.info.as_mut().unwrap().as_object_mut().unwrap();
82
83        // Add other fields to info
84        if let Some(org_id) = &other.organization_id {
85            info.insert(
86                "organization_id".to_string(),
87                serde_json::Value::String(org_id.clone()),
88            );
89        }
90        if let Some(project_id) = &other.project_id {
91            info.insert(
92                "project_id".to_string(),
93                serde_json::Value::String(project_id.clone()),
94            );
95        }
96        if let Some(user_id) = &other.user_id {
97            info.insert(
98                "user_id".to_string(),
99                serde_json::Value::String(user_id.clone()),
100            );
101        }
102        if let Some(workflow_id) = &other.workflow_id {
103            info.insert(
104                "workflow_id".to_string(),
105                serde_json::Value::String(workflow_id.clone()),
106            );
107        }
108        if let Some(workflow_name) = &other.workflow_name {
109            info.insert(
110                "workflow_name".to_string(),
111                serde_json::Value::String(workflow_name.clone()),
112            );
113        }
114        if let Some(step_index) = &other.step_index {
115            info.insert(
116                "step_index".to_string(),
117                serde_json::Value::Number((*step_index).into()),
118            );
119        }
120        if let Some(prompt_id) = &other.prompt_id {
121            info.insert(
122                "prompt_id".to_string(),
123                serde_json::Value::String(prompt_id.clone()),
124            );
125        }
126        if let Some(prompt_version) = &other.prompt_version {
127            info.insert(
128                "prompt_version".to_string(),
129                serde_json::Value::Number((*prompt_version).into()),
130            );
131        }
132
133        // Merge extra fields
134        if let Some(extra) = &other.extra {
135            for (key, value) in extra {
136                info.insert(key.clone(), value.clone());
137            }
138        }
139    }
140}
141
142/// An event that updates a run in a workflow.
143#[derive(Debug, Serialize, Deserialize)]
144pub struct RunUpdateEvent {
145    /// The run ID
146    pub id: Uuid,
147    /// The new status value for the run.
148    pub status: Option<String>,
149    pub output: Option<serde_json::Value>,
150    /// Extra info for the run. This is merged with any existing info.
151    pub info: Option<serde_json::Value>,
152    pub time: Option<DateTime<chrono::Utc>>,
153}
154
155/// An event that updates a run or step in a workflow.
156#[derive(Debug, Serialize, Deserialize)]
157pub struct StepEventData<DATA> {
158    /// A UUIDv7 identifying the step the event belongs to
159    pub step_id: Uuid,
160    /// A UUIDv7 for the entire run
161    pub run_id: Uuid,
162    /// The event's type and data
163    pub data: DATA,
164    pub time: Option<DateTime<chrono::Utc>>,
165}
166
167/// Data structure for the start of a step.
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct StepStartData {
170    #[serde(rename = "type")]
171    pub typ: String,
172    /// A human-readable name for this step
173    pub name: Option<String>,
174    /// UUID of the parent step, if any.
175    pub parent_step: Option<Uuid>,
176    /// Span ID for tracing purposes.
177    pub span_id: Option<String>,
178    /// Tags associated with the step.
179    #[serde(default)]
180    pub tags: Vec<String>,
181    /// Additional information about the step.
182    pub info: Option<serde_json::Value>,
183    /// Input data for the step.
184    #[serde(default)]
185    pub input: serde_json::Value,
186}
187
188/// Data structure for the end of a step.
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct StepEndData {
191    /// Output data from the step.
192    pub output: serde_json::Value,
193    /// Additional information about the step completion.
194    pub info: Option<serde_json::Value>,
195}
196
197/// Data structure for error information.
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct ErrorData {
200    /// Error message or description.
201    pub error: serde_json::Value,
202}
203
204/// Data structure for DAG node state information.
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct StepStateData {
207    /// Current state of the DAG node.
208    pub state: String,
209}
210
211#[cfg(test)]
212mod tests {
213    use serde_json::json;
214
215    use super::*;
216
217    #[test]
218    fn test_workflow_event_step_start_deserialization() {
219        let json_data = json!({
220            "type": "step:start",
221            "data": {
222                "parent_step": "01234567-89ab-cdef-0123-456789abcdef",
223                "type": "a_step",
224                "span_id": "span-456",
225                "tags": ["dag", "node"],
226                "info": {"node_type": "task"},
227                "input": {"task_param": "value"},
228                "name": "main_workflow",
229                "context": {"dag_context": "some_context"}
230            },
231            "run_id": "01234567-89ab-cdef-0123-456789abcdef",
232            "step_id": "fedcba98-7654-3210-fedc-ba9876543210",
233            "time": "2023-06-27T12:34:56Z"
234        });
235
236        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
237
238        let WorkflowEvent::StepStart(event) = event else {
239            panic!("Expected StepStart event");
240        };
241
242        assert_eq!(
243            event.run_id.to_string(),
244            "01234567-89ab-cdef-0123-456789abcdef"
245        );
246        assert_eq!(
247            event.step_id.to_string(),
248            "fedcba98-7654-3210-fedc-ba9876543210"
249        );
250        assert_eq!(
251            event.time.unwrap().to_rfc3339(),
252            "2023-06-27T12:34:56+00:00"
253        );
254
255        assert_eq!(
256            event.data.parent_step.unwrap().to_string(),
257            "01234567-89ab-cdef-0123-456789abcdef"
258        );
259        assert_eq!(event.data.typ, "a_step");
260        assert_eq!(event.data.name.unwrap(), "main_workflow");
261        assert_eq!(event.data.span_id.unwrap(), "span-456");
262        assert_eq!(event.data.tags, vec!["dag", "node"]);
263        assert_eq!(event.data.info.unwrap(), json!({"node_type": "task"}));
264        assert_eq!(event.data.input, json!({"task_param": "value"}));
265    }
266
267    #[test]
268    fn test_workflow_event_step_end_deserialization() {
269        let json_data = json!({
270            "type": "step:end",
271            "data": {
272                "output": {"result": "success"},
273                "info": {"duration": 1000}
274            },
275            "run_id": "01234567-89ab-cdef-0123-456789abcdef",
276            "step_id": "fedcba98-7654-3210-fedc-ba9876543210",
277            "time": "2023-06-27T12:34:56Z"
278        });
279
280        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
281        let WorkflowEvent::StepEnd(event) = event else {
282            panic!("Expected StepEnd event");
283        };
284
285        assert_eq!(
286            event.run_id.to_string(),
287            "01234567-89ab-cdef-0123-456789abcdef"
288        );
289        assert_eq!(
290            event.step_id.to_string(),
291            "fedcba98-7654-3210-fedc-ba9876543210"
292        );
293        assert_eq!(
294            event.time.unwrap().to_rfc3339(),
295            "2023-06-27T12:34:56+00:00"
296        );
297
298        assert_eq!(event.data.output, json!({"result": "success"}));
299        assert_eq!(event.data.info.unwrap(), json!({"duration": 1000}));
300    }
301
302    #[test]
303    fn test_workflow_event_step_error_deserialization() {
304        let json_data = json!({
305            "type": "step:error",
306            "data": {
307                "error": "Step execution failed"
308            },
309            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
310            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
311            "time": "2023-06-27T17:00:00Z"
312        });
313
314        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
315        let WorkflowEvent::StepError(event) = event else {
316            panic!("Expected StepEnd event");
317        };
318
319        assert_eq!(
320            event.run_id.to_string(),
321            "12345678-90ab-cdef-1234-567890abcdef"
322        );
323        assert_eq!(
324            event.step_id.to_string(),
325            "abcdef01-2345-6789-abcd-ef0123456789"
326        );
327        assert_eq!(
328            event.time.unwrap().to_rfc3339(),
329            "2023-06-27T17:00:00+00:00"
330        );
331
332        assert_eq!(event.data.error, "Step execution failed");
333    }
334
335    #[test]
336    fn test_workflow_event_run_start_deserialization() {
337        let json_data = json!({
338            "type": "run:start",
339            "id": "01234567-89ab-cdef-0123-456789abcdef",
340            "name": "Test Run",
341            "description": "A test run",
342            "application": "TestApp",
343            "environment": "staging",
344            "input": {"param": "value"},
345            "trace_id": "trace-123",
346            "span_id": "span-456",
347            "tags": ["test", "run"],
348            "info": {"extra": "info"},
349            "time": "2023-06-28T10:00:00Z"
350        });
351
352        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
353        let WorkflowEvent::RunStart(event) = event else {
354            panic!("Expected RunStart event");
355        };
356
357        assert_eq!(event.id.to_string(), "01234567-89ab-cdef-0123-456789abcdef");
358        assert_eq!(event.name, "Test Run");
359        assert_eq!(event.description, Some("A test run".to_string()));
360        assert_eq!(event.application, Some("TestApp".to_string()));
361        assert_eq!(event.environment, Some("staging".to_string()));
362        assert_eq!(event.input, Some(json!({"param": "value"})));
363        assert_eq!(event.trace_id, Some("trace-123".to_string()));
364        assert_eq!(event.span_id, Some("span-456".to_string()));
365        assert_eq!(event.tags, vec!["test", "run"]);
366        assert_eq!(event.info, Some(json!({"extra": "info"})));
367        assert_eq!(
368            event.time.unwrap().to_rfc3339(),
369            "2023-06-28T10:00:00+00:00"
370        );
371    }
372
373    #[test]
374    fn test_workflow_event_run_update_deserialization() {
375        let json_data = json!({
376            "type": "run:update",
377            "id": "fedcba98-7654-3210-fedc-ba9876543210",
378            "status": "completed",
379            "output": {"result": "success"},
380            "info": {"duration": 2000},
381            "time": "2023-06-28T11:00:00Z"
382        });
383
384        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
385        let WorkflowEvent::RunUpdate(event) = event else {
386            panic!("Expected RunUpdate event");
387        };
388
389        assert_eq!(event.id.to_string(), "fedcba98-7654-3210-fedc-ba9876543210");
390        assert_eq!(event.status, Some("completed".to_string()));
391        assert_eq!(event.output, Some(json!({"result": "success"})));
392        assert_eq!(event.info, Some(json!({"duration": 2000})));
393        assert_eq!(
394            event.time.unwrap().to_rfc3339(),
395            "2023-06-28T11:00:00+00:00"
396        );
397    }
398
399    #[test]
400    fn test_workflow_event_step_state_deserialization() {
401        let json_data = json!({
402            "type": "step:state",
403            "data": {
404                "state": "running"
405            },
406            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
407            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
408            "time": "2023-06-28T12:00:00Z"
409        });
410
411        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
412        let WorkflowEvent::StepState(event) = event else {
413            panic!("Expected StepState event");
414        };
415
416        assert_eq!(
417            event.run_id.to_string(),
418            "12345678-90ab-cdef-1234-567890abcdef"
419        );
420        assert_eq!(
421            event.step_id.to_string(),
422            "abcdef01-2345-6789-abcd-ef0123456789"
423        );
424        assert_eq!(
425            event.time.unwrap().to_rfc3339(),
426            "2023-06-28T12:00:00+00:00"
427        );
428        assert_eq!(event.data.state, "running");
429    }
430
431    #[test]
432    fn test_workflow_event_untagged_deserialization() {
433        let json_data = json!({
434            "type": "custom_event",
435            "data": {
436                "custom_field": "custom_value"
437            },
438
439            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
440            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
441            "time": "2023-06-28T12:00:00Z"
442        });
443
444        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
445        let WorkflowEvent::Event(event) = event else {
446            panic!("Expected untagged Event");
447        };
448
449        assert_eq!(event.typ, "custom_event");
450        assert_eq!(event.data, Some(json!({"custom_field": "custom_value"})));
451        assert_eq!(
452            event.run_id.to_string(),
453            "12345678-90ab-cdef-1234-567890abcdef"
454        );
455        assert_eq!(
456            event.step_id.to_string(),
457            "abcdef01-2345-6789-abcd-ef0123456789"
458        );
459        assert_eq!(
460            event.time.unwrap().to_rfc3339(),
461            "2023-06-28T12:00:00+00:00"
462        );
463    }
464}