use std::fmt::Debug;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{ProxyRequestInternalMetadata, ProxyRequestMetadata};
/// An incoming workflow event, selected by the `type` field of the payload.
///
/// Every typed variant carries an explicit `rename`, so the accepted `type` strings are exactly
/// the ones listed on the variants below. A payload that does not deserialize as one of the
/// typed variants (e.g. an unrecognized `type` such as `custom_event`) is tried against the
/// untagged [`WorkflowEvent::Event`] variant instead.
///
/// NOTE(review): serde appears to fall back to the untagged variant whenever the tagged attempt
/// fails, so a malformed `step:*` payload may also end up as `Event` — confirm if that matters.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkflowEvent {
/// `"type": "run:start"` — carries a [`RunStartEvent`].
#[serde(rename = "run:start")]
RunStart(RunStartEvent),
/// `"type": "run:update"` — carries a [`RunUpdateEvent`].
#[serde(rename = "run:update")]
RunUpdate(RunUpdateEvent),
/// `"type": "step:start"` — step envelope whose `data` is a [`StepStartData`].
#[serde(rename = "step:start")]
StepStart(StepEventData<StepStartData>),
/// `"type": "step:end"` — step envelope whose `data` is a [`StepEndData`].
#[serde(rename = "step:end")]
StepEnd(StepEventData<StepEndData>),
/// `"type": "step:error"` — step envelope whose `data` is an [`ErrorData`].
#[serde(rename = "step:error")]
StepError(StepEventData<ErrorData>),
/// `"type": "step:state"` — step envelope whose `data` is a [`StepStateData`].
#[serde(rename = "step:state")]
StepState(StepEventData<StepStateData>),
/// Fallback for any other event; the original `type` string is kept in [`EventPayload::typ`].
#[serde(untagged)]
Event(EventPayload),
}
/// Generic event payload used by the untagged [`WorkflowEvent::Event`] fallback.
///
/// Unlike the typed step events, `data` is kept as free-form JSON.
#[derive(Deserialize, Debug)]
pub struct EventPayload {
/// The event's `type` string, read from the `type` key of the payload.
#[serde(rename = "type")]
pub typ: String,
/// Free-form event data, if any was sent.
pub data: Option<serde_json::Value>,
/// Free-form error value, if any was sent.
pub error: Option<serde_json::Value>,
/// ID of the run this event belongs to.
pub run_id: Uuid,
/// ID of the step this event belongs to.
pub step_id: Uuid,
/// Timestamp supplied with the event, if any.
pub time: Option<DateTime<Utc>>,
/// Never read from the payload (`skip_deserializing`), so it is `None` right after
/// deserialization. Presumably filled in by the server afterwards — verify against callers.
#[serde(skip_deserializing)]
pub internal_metadata: Option<ProxyRequestInternalMetadata>,
}
/// Payload of a `run:start` event.
///
/// Only `id` and `name` are non-optional; `tags` falls back to an empty list when omitted.
#[derive(Debug, Serialize, Deserialize)]
pub struct RunStartEvent {
/// ID of the run.
pub id: Uuid,
/// Name of the run.
pub name: String,
/// Optional description of the run.
pub description: Option<String>,
/// Application name; `merge_metadata` fills this from request metadata when unset.
pub application: Option<String>,
/// Environment name; `merge_metadata` fills this from request metadata when unset.
pub environment: Option<String>,
/// Free-form input of the run.
pub input: Option<serde_json::Value>,
/// Optional trace ID supplied by the client.
pub trace_id: Option<String>,
/// Optional span ID supplied by the client.
pub span_id: Option<String>,
/// Optional status string supplied by the client.
pub status: Option<String>,
/// Tags attached to the run; empty when the key is missing.
#[serde(default)]
pub tags: Vec<String>,
/// Free-form extra information. `merge_metadata` writes request metadata into this value and
/// expects it to be a JSON object.
pub info: Option<serde_json::Value>,
/// Timestamp supplied with the event, if any.
pub time: Option<DateTime<chrono::Utc>>,
}
impl RunStartEvent {
    /// Fills in this event from the request-level metadata in `other`.
    ///
    /// * `application` and `environment` are copied from `other` only when the event did not
    ///   set them itself.
    /// * Every other metadata field that is present (`organization_id`, `project_id`,
    ///   `user_id`, `workflow_id`, `workflow_name`, `step_index`, `prompt_id`,
    ///   `prompt_version`) is written into the `info` object, replacing any existing key of
    ///   the same name.
    /// * Entries of `other.extra` are written last, so they win over everything else.
    ///
    /// `info` is created as an empty object when the event has none. If the client supplied an
    /// `info` value that is not a JSON object (for example an array or a string), it is left
    /// untouched and no metadata is merged into it; this case used to panic.
    pub fn merge_metadata(&mut self, other: &ProxyRequestMetadata) {
        if self.application.is_none() {
            self.application = other.application.clone();
        }
        if self.environment.is_none() {
            self.environment = other.environment.clone();
        }

        // `info` comes straight from the client, so it may be any JSON value. Only merge into
        // it when it is (or can be created as) an object.
        let info = self
            .info
            .get_or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
        let Some(info) = info.as_object_mut() else {
            return;
        };

        // Insertion order is kept the same as the field order below, which matters when
        // serde_json is built with `preserve_order`.
        let leading_strings = [
            ("organization_id", &other.organization_id),
            ("project_id", &other.project_id),
            ("user_id", &other.user_id),
            ("workflow_id", &other.workflow_id),
            ("workflow_name", &other.workflow_name),
        ];
        for (key, value) in leading_strings {
            if let Some(value) = value {
                info.insert(key.to_string(), serde_json::Value::String(value.clone()));
            }
        }

        if let Some(step_index) = &other.step_index {
            info.insert(
                "step_index".to_string(),
                serde_json::Value::Number((*step_index).into()),
            );
        }
        if let Some(prompt_id) = &other.prompt_id {
            info.insert(
                "prompt_id".to_string(),
                serde_json::Value::String(prompt_id.clone()),
            );
        }
        if let Some(prompt_version) = &other.prompt_version {
            info.insert(
                "prompt_version".to_string(),
                serde_json::Value::Number((*prompt_version).into()),
            );
        }

        // Caller-provided extras go last so they override the well-known keys above.
        if let Some(extra) = &other.extra {
            for (key, value) in extra {
                info.insert(key.clone(), value.clone());
            }
        }
    }
}
/// Payload of a `run:update` event. Everything except `id` is optional.
#[derive(Debug, Serialize, Deserialize)]
pub struct RunUpdateEvent {
/// ID of the run being updated.
pub id: Uuid,
/// New status string, if supplied.
pub status: Option<String>,
/// Free-form output of the run, if supplied.
pub output: Option<serde_json::Value>,
/// Free-form extra information, if supplied.
pub info: Option<serde_json::Value>,
/// Timestamp supplied with the event, if any.
pub time: Option<DateTime<chrono::Utc>>,
}
/// Envelope shared by the typed `step:*` events.
///
/// `DATA` is the event-specific body found under the `data` key
/// ([`StepStartData`], [`StepEndData`], [`ErrorData`] or [`StepStateData`]).
#[derive(Debug, Serialize, Deserialize)]
pub struct StepEventData<DATA> {
/// ID of the step this event refers to.
pub step_id: Uuid,
/// ID of the run the step belongs to.
pub run_id: Uuid,
/// Event-specific body.
pub data: DATA,
/// Timestamp supplied with the event, if any.
pub time: Option<DateTime<chrono::Utc>>,
}
/// Body of a `step:start` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepStartData {
/// The step's own type string, read from the `type` key inside `data`
/// (distinct from the event-level `type` tag).
#[serde(rename = "type")]
pub typ: String,
/// Optional name of the step.
pub name: Option<String>,
/// ID of the parent step, if supplied.
pub parent_step: Option<Uuid>,
/// Optional span ID supplied by the client.
pub span_id: Option<String>,
/// Tags attached to the step; empty when the key is missing.
#[serde(default)]
pub tags: Vec<String>,
/// Free-form extra information, if supplied.
pub info: Option<serde_json::Value>,
/// Free-form step input; falls back to `Value`'s default (JSON `null`) when the key is missing.
#[serde(default)]
pub input: serde_json::Value,
}
/// Body of a `step:end` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepEndData {
/// Free-form output of the step.
pub output: serde_json::Value,
/// Free-form extra information, if supplied.
pub info: Option<serde_json::Value>,
}
/// Body of a `step:error` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorData {
/// The error as free-form JSON; the tests send a plain string, but any JSON value is accepted.
pub error: serde_json::Value,
}
/// Body of a `step:state` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepStateData {
/// The reported state as a free-form string (e.g. `"running"` in the tests).
pub state: String,
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Deserializes `payload` as a [`WorkflowEvent`], panicking if it is rejected.
    fn parse(payload: serde_json::Value) -> WorkflowEvent {
        serde_json::from_value(payload).expect("payload should deserialize as a WorkflowEvent")
    }

    /// Asserts that `time` is present and renders as `expected` in RFC 3339 form.
    fn assert_time(time: Option<chrono::DateTime<chrono::Utc>>, expected: &str) {
        assert_eq!(
            time.expect("event should carry a time").to_rfc3339(),
            expected
        );
    }

    /// Asserts the run and step IDs of a step-level event.
    fn assert_ids(run_id: Uuid, step_id: Uuid, expected_run: &str, expected_step: &str) {
        assert_eq!(run_id.to_string(), expected_run);
        assert_eq!(step_id.to_string(), expected_step);
    }

    #[test]
    fn test_workflow_event_step_start_deserialization() {
        // `context` is not a field of `StepStartData`; it must be ignored, not rejected.
        let event = parse(json!({
            "type": "step:start",
            "data": {
                "parent_step": "01234567-89ab-cdef-0123-456789abcdef",
                "type": "a_step",
                "span_id": "span-456",
                "tags": ["dag", "node"],
                "info": {"node_type": "task"},
                "input": {"task_param": "value"},
                "name": "main_workflow",
                "context": {"dag_context": "some_context"}
            },
            "run_id": "01234567-89ab-cdef-0123-456789abcdef",
            "step_id": "fedcba98-7654-3210-fedc-ba9876543210",
            "time": "2023-06-27T12:34:56Z"
        }));

        let WorkflowEvent::StepStart(event) = event else {
            panic!("Expected StepStart event");
        };

        assert_ids(
            event.run_id,
            event.step_id,
            "01234567-89ab-cdef-0123-456789abcdef",
            "fedcba98-7654-3210-fedc-ba9876543210",
        );
        assert_time(event.time, "2023-06-27T12:34:56+00:00");
        assert_eq!(
            event.data.parent_step.unwrap().to_string(),
            "01234567-89ab-cdef-0123-456789abcdef"
        );
        assert_eq!(event.data.typ, "a_step");
        assert_eq!(event.data.name.unwrap(), "main_workflow");
        assert_eq!(event.data.span_id.unwrap(), "span-456");
        assert_eq!(event.data.tags, vec!["dag", "node"]);
        assert_eq!(event.data.info.unwrap(), json!({"node_type": "task"}));
        assert_eq!(event.data.input, json!({"task_param": "value"}));
    }

    #[test]
    fn test_workflow_event_step_end_deserialization() {
        let event = parse(json!({
            "type": "step:end",
            "data": {
                "output": {"result": "success"},
                "info": {"duration": 1000}
            },
            "run_id": "01234567-89ab-cdef-0123-456789abcdef",
            "step_id": "fedcba98-7654-3210-fedc-ba9876543210",
            "time": "2023-06-27T12:34:56Z"
        }));

        let WorkflowEvent::StepEnd(event) = event else {
            panic!("Expected StepEnd event");
        };

        assert_ids(
            event.run_id,
            event.step_id,
            "01234567-89ab-cdef-0123-456789abcdef",
            "fedcba98-7654-3210-fedc-ba9876543210",
        );
        assert_time(event.time, "2023-06-27T12:34:56+00:00");
        assert_eq!(event.data.output, json!({"result": "success"}));
        assert_eq!(event.data.info.unwrap(), json!({"duration": 1000}));
    }

    #[test]
    fn test_workflow_event_step_error_deserialization() {
        let event = parse(json!({
            "type": "step:error",
            "data": {
                "error": "Step execution failed"
            },
            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
            "time": "2023-06-27T17:00:00Z"
        }));

        let WorkflowEvent::StepError(event) = event else {
            panic!("Expected StepError event");
        };

        assert_ids(
            event.run_id,
            event.step_id,
            "12345678-90ab-cdef-1234-567890abcdef",
            "abcdef01-2345-6789-abcd-ef0123456789",
        );
        assert_time(event.time, "2023-06-27T17:00:00+00:00");
        assert_eq!(event.data.error, "Step execution failed");
    }

    #[test]
    fn test_workflow_event_run_start_deserialization() {
        let event = parse(json!({
            "type": "run:start",
            "id": "01234567-89ab-cdef-0123-456789abcdef",
            "name": "Test Run",
            "description": "A test run",
            "application": "TestApp",
            "environment": "staging",
            "input": {"param": "value"},
            "trace_id": "trace-123",
            "span_id": "span-456",
            "tags": ["test", "run"],
            "info": {"extra": "info"},
            "time": "2023-06-28T10:00:00Z"
        }));

        let WorkflowEvent::RunStart(event) = event else {
            panic!("Expected RunStart event");
        };

        assert_eq!(event.id.to_string(), "01234567-89ab-cdef-0123-456789abcdef");
        assert_eq!(event.name, "Test Run");
        assert_eq!(event.description, Some("A test run".to_string()));
        assert_eq!(event.application, Some("TestApp".to_string()));
        assert_eq!(event.environment, Some("staging".to_string()));
        assert_eq!(event.input, Some(json!({"param": "value"})));
        assert_eq!(event.trace_id, Some("trace-123".to_string()));
        assert_eq!(event.span_id, Some("span-456".to_string()));
        assert_eq!(event.tags, vec!["test", "run"]);
        assert_eq!(event.info, Some(json!({"extra": "info"})));
        assert_time(event.time, "2023-06-28T10:00:00+00:00");
    }

    #[test]
    fn test_workflow_event_run_start_defaults() {
        // Only the non-optional fields are sent; everything else must fall back to
        // `None` / an empty tag list.
        let event = parse(json!({
            "type": "run:start",
            "id": "01234567-89ab-cdef-0123-456789abcdef",
            "name": "Minimal Run"
        }));

        let WorkflowEvent::RunStart(event) = event else {
            panic!("Expected RunStart event");
        };

        assert_eq!(event.id.to_string(), "01234567-89ab-cdef-0123-456789abcdef");
        assert_eq!(event.name, "Minimal Run");
        assert!(event.description.is_none());
        assert!(event.application.is_none());
        assert!(event.environment.is_none());
        assert!(event.input.is_none());
        assert!(event.info.is_none());
        assert!(event.time.is_none());
        assert!(event.tags.is_empty());
    }

    #[test]
    fn test_workflow_event_run_update_deserialization() {
        let event = parse(json!({
            "type": "run:update",
            "id": "fedcba98-7654-3210-fedc-ba9876543210",
            "status": "completed",
            "output": {"result": "success"},
            "info": {"duration": 2000},
            "time": "2023-06-28T11:00:00Z"
        }));

        let WorkflowEvent::RunUpdate(event) = event else {
            panic!("Expected RunUpdate event");
        };

        assert_eq!(event.id.to_string(), "fedcba98-7654-3210-fedc-ba9876543210");
        assert_eq!(event.status, Some("completed".to_string()));
        assert_eq!(event.output, Some(json!({"result": "success"})));
        assert_eq!(event.info, Some(json!({"duration": 2000})));
        assert_time(event.time, "2023-06-28T11:00:00+00:00");
    }

    #[test]
    fn test_workflow_event_step_state_deserialization() {
        let event = parse(json!({
            "type": "step:state",
            "data": {
                "state": "running"
            },
            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
            "time": "2023-06-28T12:00:00Z"
        }));

        let WorkflowEvent::StepState(event) = event else {
            panic!("Expected StepState event");
        };

        assert_ids(
            event.run_id,
            event.step_id,
            "12345678-90ab-cdef-1234-567890abcdef",
            "abcdef01-2345-6789-abcd-ef0123456789",
        );
        assert_time(event.time, "2023-06-28T12:00:00+00:00");
        assert_eq!(event.data.state, "running");
    }

    #[test]
    fn test_workflow_event_untagged_deserialization() {
        // An unknown `type` must land in the untagged fallback with the tag preserved.
        let event = parse(json!({
            "type": "custom_event",
            "data": {
                "custom_field": "custom_value"
            },
            "run_id": "12345678-90ab-cdef-1234-567890abcdef",
            "step_id": "abcdef01-2345-6789-abcd-ef0123456789",
            "time": "2023-06-28T12:00:00Z"
        }));

        let WorkflowEvent::Event(event) = event else {
            panic!("Expected untagged Event");
        };

        assert_eq!(event.typ, "custom_event");
        assert_eq!(event.data, Some(json!({"custom_field": "custom_value"})));
        assert!(event.error.is_none());
        // `internal_metadata` is `skip_deserializing`, so it can never come from the payload.
        assert!(event.internal_metadata.is_none());
        assert_ids(
            event.run_id,
            event.step_id,
            "12345678-90ab-cdef-1234-567890abcdef",
            "abcdef01-2345-6789-abcd-ef0123456789",
        );
        assert_time(event.time, "2023-06-28T12:00:00+00:00");
    }
}