1use std::fmt::Debug;
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use crate::{ProxyRequestInternalMetadata, ProxyRequestMetadata};
8
9#[derive(Debug, Deserialize)]
11#[serde(tag = "type", rename_all = "snake_case")]
12pub enum WorkflowEvent {
13 #[serde(rename = "run:start")]
14 RunStart(RunStartEvent),
15 #[serde(rename = "run:update")]
16 RunUpdate(RunUpdateEvent),
17 #[serde(rename = "step:start")]
19 StepStart(StepEventData<StepStartData>),
20 #[serde(rename = "step:end")]
22 StepEnd(StepEventData<StepEndData>),
23 #[serde(rename = "step:error")]
25 StepError(StepEventData<ErrorData>),
26 #[serde(rename = "step:state")]
28 StepState(StepEventData<StepStateData>),
29 #[serde(untagged)]
30 Event(EventPayload),
31}
32
33#[derive(Deserialize, Debug)]
34pub struct EventPayload {
35 #[serde(rename = "type")]
36 pub typ: String,
37 pub data: Option<serde_json::Value>,
38 pub error: Option<serde_json::Value>,
39 pub run_id: Uuid,
40 pub step_id: Uuid,
41 pub time: Option<DateTime<Utc>>,
42 #[serde(skip_deserializing)]
43 pub internal_metadata: Option<ProxyRequestInternalMetadata>,
44}
45
46#[derive(Debug, Serialize, Deserialize)]
48pub struct RunStartEvent {
49 pub id: Uuid,
50 pub name: String,
51 pub description: Option<String>,
52 pub application: Option<String>,
53 pub environment: Option<String>,
54 pub input: Option<serde_json::Value>,
55 pub trace_id: Option<String>,
56 pub span_id: Option<String>,
57 pub status: Option<String>,
59 #[serde(default)]
60 pub tags: Vec<String>,
61 pub info: Option<serde_json::Value>,
62 pub time: Option<DateTime<chrono::Utc>>,
63}
64
65impl RunStartEvent {
66 pub fn merge_metadata(&mut self, other: &ProxyRequestMetadata) {
68 if self.application.is_none() {
69 self.application = other.application.clone();
70 }
71 if self.environment.is_none() {
72 self.environment = other.environment.clone();
73 }
74
75 if self.info.is_none() {
77 self.info = Some(serde_json::Value::Object(serde_json::Map::new()));
78 }
79
80 let info = self.info.as_mut().unwrap().as_object_mut().unwrap();
82
83 if let Some(org_id) = &other.organization_id {
85 info.insert(
86 "organization_id".to_string(),
87 serde_json::Value::String(org_id.clone()),
88 );
89 }
90 if let Some(project_id) = &other.project_id {
91 info.insert(
92 "project_id".to_string(),
93 serde_json::Value::String(project_id.clone()),
94 );
95 }
96 if let Some(user_id) = &other.user_id {
97 info.insert(
98 "user_id".to_string(),
99 serde_json::Value::String(user_id.clone()),
100 );
101 }
102 if let Some(workflow_id) = &other.workflow_id {
103 info.insert(
104 "workflow_id".to_string(),
105 serde_json::Value::String(workflow_id.clone()),
106 );
107 }
108 if let Some(workflow_name) = &other.workflow_name {
109 info.insert(
110 "workflow_name".to_string(),
111 serde_json::Value::String(workflow_name.clone()),
112 );
113 }
114 if let Some(step_index) = &other.step_index {
115 info.insert(
116 "step_index".to_string(),
117 serde_json::Value::Number((*step_index).into()),
118 );
119 }
120 if let Some(prompt_id) = &other.prompt_id {
121 info.insert(
122 "prompt_id".to_string(),
123 serde_json::Value::String(prompt_id.clone()),
124 );
125 }
126 if let Some(prompt_version) = &other.prompt_version {
127 info.insert(
128 "prompt_version".to_string(),
129 serde_json::Value::Number((*prompt_version).into()),
130 );
131 }
132
133 if let Some(extra) = &other.extra {
135 for (key, value) in extra {
136 info.insert(key.clone(), value.clone());
137 }
138 }
139 }
140}
141
142#[derive(Debug, Serialize, Deserialize)]
144pub struct RunUpdateEvent {
145 pub id: Uuid,
147 pub status: Option<String>,
149 pub output: Option<serde_json::Value>,
150 pub info: Option<serde_json::Value>,
152 pub time: Option<DateTime<chrono::Utc>>,
153}
154
155#[derive(Debug, Serialize, Deserialize)]
157pub struct StepEventData<DATA> {
158 pub step_id: Uuid,
160 pub run_id: Uuid,
162 pub data: DATA,
164 pub time: Option<DateTime<chrono::Utc>>,
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct StepStartData {
170 #[serde(rename = "type")]
171 pub typ: String,
172 pub name: Option<String>,
174 pub parent_step: Option<Uuid>,
176 pub span_id: Option<String>,
178 #[serde(default)]
180 pub tags: Vec<String>,
181 pub info: Option<serde_json::Value>,
183 #[serde(default)]
185 pub input: serde_json::Value,
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct StepEndData {
191 pub output: serde_json::Value,
193 pub info: Option<serde_json::Value>,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct ErrorData {
200 pub error: serde_json::Value,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct StepStateData {
207 pub state: String,
209}
210
/// Deserialization tests: one per `type` string, plus the untagged fallback.
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// `step:start` maps to `StepStart`; the unknown `context` key inside
    /// `data` is ignored.
    #[test]
    fn test_workflow_event_step_start_deserialization() {
        let json_data = json!({
            "type": "step:start",
            "data": {
                "parent_step": "01234567-89ab-cdef-0123-456789abcdef",
                "type": "a_step",
                "span_id": "span-456",
                "tags": ["dag", "node"],
                "info": {"node_type": "task"},
                "input": {"task_param": "value"},
                "name": "main_workflow",
                "context": {"dag_context": "some_context"}
            },
            "run_id": "01234567-89ab-cdef-0123-456789abcdef",
            "step_id": "fedcba98-7654-3210-fedc-ba9876543210",
            "time": "2023-06-27T12:34:56Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();

        let WorkflowEvent::StepStart(event) = event else {
            panic!("Expected StepStart event");
        };

        assert_eq!(
            event.run_id.to_string(),
            "01234567-89ab-cdef-0123-456789abcdef"
        );
        assert_eq!(
            event.step_id.to_string(),
            "fedcba98-7654-3210-fedc-ba9876543210"
        );
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-27T12:34:56+00:00"
        );

        assert_eq!(
            event.data.parent_step.unwrap().to_string(),
            "01234567-89ab-cdef-0123-456789abcdef"
        );
        assert_eq!(event.data.typ, "a_step");
        assert_eq!(event.data.name.unwrap(), "main_workflow");
        assert_eq!(event.data.span_id.unwrap(), "span-456");
        assert_eq!(event.data.tags, vec!["dag", "node"]);
        assert_eq!(event.data.info.unwrap(), json!({"node_type": "task"}));
        assert_eq!(event.data.input, json!({"task_param": "value"}));
    }

    /// `step:end` maps to `StepEnd`.
    #[test]
    fn test_workflow_event_step_end_deserialization() {
        let json_data = json!({
            "type": "step:end",
            "data": {
                "output": {"result": "success"},
                "info": {"duration": 1000}
            },
            "run_id": "01234567-89ab-cdef-0123-456789abcdef",
            "step_id": "fedcba98-7654-3210-fedc-ba9876543210",
            "time": "2023-06-27T12:34:56Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
        let WorkflowEvent::StepEnd(event) = event else {
            panic!("Expected StepEnd event");
        };

        assert_eq!(
            event.run_id.to_string(),
            "01234567-89ab-cdef-0123-456789abcdef"
        );
        assert_eq!(
            event.step_id.to_string(),
            "fedcba98-7654-3210-fedc-ba9876543210"
        );
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-27T12:34:56+00:00"
        );

        assert_eq!(event.data.output, json!({"result": "success"}));
        assert_eq!(event.data.info.unwrap(), json!({"duration": 1000}));
    }

    /// `step:error` maps to `StepError`.
    #[test]
    fn test_workflow_event_step_error_deserialization() {
        let json_data = json!({
            "type": "step:error",
            "data": {
                "error": "Step execution failed"
            },
            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
            "time": "2023-06-27T17:00:00Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
        let WorkflowEvent::StepError(event) = event else {
            panic!("Expected StepError event");
        };

        assert_eq!(
            event.run_id.to_string(),
            "12345678-90ab-cdef-1234-567890abcdef"
        );
        assert_eq!(
            event.step_id.to_string(),
            "abcdef01-2345-6789-abcd-ef0123456789"
        );
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-27T17:00:00+00:00"
        );

        assert_eq!(event.data.error, "Step execution failed");
    }

    /// `run:start` maps to `RunStart` with every optional field populated.
    #[test]
    fn test_workflow_event_run_start_deserialization() {
        let json_data = json!({
            "type": "run:start",
            "id": "01234567-89ab-cdef-0123-456789abcdef",
            "name": "Test Run",
            "description": "A test run",
            "application": "TestApp",
            "environment": "staging",
            "input": {"param": "value"},
            "trace_id": "trace-123",
            "span_id": "span-456",
            "tags": ["test", "run"],
            "info": {"extra": "info"},
            "time": "2023-06-28T10:00:00Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
        let WorkflowEvent::RunStart(event) = event else {
            panic!("Expected RunStart event");
        };

        assert_eq!(event.id.to_string(), "01234567-89ab-cdef-0123-456789abcdef");
        assert_eq!(event.name, "Test Run");
        assert_eq!(event.description, Some("A test run".to_string()));
        assert_eq!(event.application, Some("TestApp".to_string()));
        assert_eq!(event.environment, Some("staging".to_string()));
        assert_eq!(event.input, Some(json!({"param": "value"})));
        assert_eq!(event.trace_id, Some("trace-123".to_string()));
        assert_eq!(event.span_id, Some("span-456".to_string()));
        assert_eq!(event.tags, vec!["test", "run"]);
        assert_eq!(event.info, Some(json!({"extra": "info"})));
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-28T10:00:00+00:00"
        );
    }

    /// `run:update` maps to `RunUpdate`.
    #[test]
    fn test_workflow_event_run_update_deserialization() {
        let json_data = json!({
            "type": "run:update",
            "id": "fedcba98-7654-3210-fedc-ba9876543210",
            "status": "completed",
            "output": {"result": "success"},
            "info": {"duration": 2000},
            "time": "2023-06-28T11:00:00Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
        let WorkflowEvent::RunUpdate(event) = event else {
            panic!("Expected RunUpdate event");
        };

        assert_eq!(event.id.to_string(), "fedcba98-7654-3210-fedc-ba9876543210");
        assert_eq!(event.status, Some("completed".to_string()));
        assert_eq!(event.output, Some(json!({"result": "success"})));
        assert_eq!(event.info, Some(json!({"duration": 2000})));
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-28T11:00:00+00:00"
        );
    }

    /// `step:state` maps to `StepState`.
    #[test]
    fn test_workflow_event_step_state_deserialization() {
        let json_data = json!({
            "type": "step:state",
            "data": {
                "state": "running"
            },
            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
            "time": "2023-06-28T12:00:00Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
        let WorkflowEvent::StepState(event) = event else {
            panic!("Expected StepState event");
        };

        assert_eq!(
            event.run_id.to_string(),
            "12345678-90ab-cdef-1234-567890abcdef"
        );
        assert_eq!(
            event.step_id.to_string(),
            "abcdef01-2345-6789-abcd-ef0123456789"
        );
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-28T12:00:00+00:00"
        );
        assert_eq!(event.data.state, "running");
    }

    /// An unrecognised `type` falls back to the untagged `Event` variant and
    /// keeps the original `type` string.
    #[test]
    fn test_workflow_event_untagged_deserialization() {
        let json_data = json!({
            "type": "custom_event",
            "data": {
                "custom_field": "custom_value"
            },
            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
            "time": "2023-06-28T12:00:00Z"
        });

        let event: WorkflowEvent = serde_json::from_value(json_data).unwrap();
        let WorkflowEvent::Event(event) = event else {
            panic!("Expected untagged Event");
        };

        assert_eq!(event.typ, "custom_event");
        assert_eq!(event.data, Some(json!({"custom_field": "custom_value"})));
        assert_eq!(
            event.run_id.to_string(),
            "12345678-90ab-cdef-1234-567890abcdef"
        );
        assert_eq!(
            event.step_id.to_string(),
            "abcdef01-2345-6789-abcd-ef0123456789"
        );
        assert_eq!(
            event.time.unwrap().to_rfc3339(),
            "2023-06-28T12:00:00+00:00"
        );
    }
}