use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Lifecycle state of a workflow instance.
///
/// Variant names are (de)serialized in `snake_case` (for example `pending`,
/// `cancelled`). [`WorkflowStatus::Pending`] is the [`Default`] value.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowStatus {
    /// Initial state assigned by `WorkflowContext::new`; also the default.
    #[default]
    Pending,
    /// Set by `WorkflowContext::start` and `WorkflowContext::resume`.
    Running,
    /// Set by `WorkflowContext::pause`.
    Paused,
    /// Set by `WorkflowContext::complete`.
    Completed,
    /// Set by `WorkflowContext::fail`, which also records an error message.
    Failed,
    /// Set by `WorkflowContext::cancel`.
    Cancelled,
}
/// Execution state of a single node inside a workflow run.
///
/// Variant names are (de)serialized in `snake_case`.
/// [`NodeStatus::Pending`] is the [`Default`] value.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeStatus {
    /// Initial state assigned by `NodeExecution::new`; also the default.
    #[default]
    Pending,
    /// Set by `NodeExecution::start`.
    Running,
    /// Set by `NodeExecution::complete`.
    Completed,
    /// Set by `NodeExecution::fail`, which also records an error message.
    Failed,
    /// Set by `NodeExecution::skip`.
    Skipped,
}
/// Execution record for one workflow node: its status, timing, retry
/// counter, and the error or output it produced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeExecution {
/// Identifier of the node this record belongs to.
pub node_id: String,
/// Current state; starts as `NodeStatus::Pending`.
pub status: NodeStatus,
/// Time recorded by `start`; `None` until the node has been started.
pub started_at: Option<DateTime<Utc>>,
/// Time recorded by `complete`, `fail`, or `skip`.
pub finished_at: Option<DateTime<Utc>>,
/// Number of times `increment_retry` has been called; starts at 0.
pub retry_count: u32,
/// Error message stored by `fail`.
pub error: Option<String>,
/// Output value stored by `complete`.
pub output: Option<serde_json::Value>,
}
impl NodeExecution {
    /// Creates a record for `node_id` in the [`NodeStatus::Pending`] state
    /// with no timestamps, no error, no output, and a retry count of zero.
    pub fn new(node_id: String) -> Self {
        Self {
            node_id,
            status: NodeStatus::Pending,
            started_at: None,
            finished_at: None,
            retry_count: 0,
            error: None,
            output: None,
        }
    }

    /// Marks the node as running and records the current time as its start.
    ///
    /// Any end time left over from a previous attempt is cleared, so a node
    /// that is started again (for example after `fail` + `increment_retry`)
    /// never reports a `finished_at` that precedes its `started_at`.
    /// `error` and `output` from the earlier attempt are left untouched.
    pub fn start(&mut self) {
        self.status = NodeStatus::Running;
        self.started_at = Some(Utc::now());
        // A running attempt has not finished yet; without this reset
        // `duration_ms` would return a stale, negative value during a re-run.
        self.finished_at = None;
    }

    /// Marks the node as completed, records the finish time, and stores
    /// `output` (replacing any previously stored output, including with `None`).
    pub fn complete(&mut self, output: Option<serde_json::Value>) {
        self.status = NodeStatus::Completed;
        self.finished_at = Some(Utc::now());
        self.output = output;
    }

    /// Marks the node as failed, records the finish time, and stores `error`.
    pub fn fail(&mut self, error: String) {
        self.status = NodeStatus::Failed;
        self.finished_at = Some(Utc::now());
        self.error = Some(error);
    }

    /// Marks the node as skipped and records the finish time.
    pub fn skip(&mut self) {
        self.status = NodeStatus::Skipped;
        self.finished_at = Some(Utc::now());
    }

    /// Bumps the retry counter by one.
    ///
    /// The counter saturates at `u32::MAX` instead of panicking (debug
    /// builds) or wrapping to zero (release builds) on overflow.
    pub fn increment_retry(&mut self) {
        self.retry_count = self.retry_count.saturating_add(1);
    }

    /// Returns the time between `started_at` and `finished_at` in whole
    /// milliseconds, or `None` when either timestamp is missing.
    pub fn duration_ms(&self) -> Option<i64> {
        let (begin, end) = self.started_at.zip(self.finished_at)?;
        Some((end - begin).num_milliseconds())
    }
}
/// Mutable state of one workflow run: status, inputs, variables, per-node
/// execution records, and timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowContext {
/// Identifier of this run; `new` generates it as `inst_` followed by a v4 UUID.
pub instance_id: String,
/// Workflow identifier passed to `new`.
pub workflow_id: String,
/// Current lifecycle state; starts as `WorkflowStatus::Pending`.
pub status: WorkflowStatus,
/// Node id most recently passed to `set_current_node`, if any.
pub current_node_id: Option<String>,
/// Input values passed to `new`; entries can be added or replaced via `set_input`.
pub inputs: HashMap<String, serde_json::Value>,
/// Values written via `set_variable`; empty on creation.
pub variables: HashMap<String, serde_json::Value>,
/// Per-node execution records, keyed by node id.
pub node_executions: HashMap<String, NodeExecution>,
/// Node ids in the order they were passed to `set_current_node`
/// (a node id appears once per call, so repeats are possible).
pub execution_path: Vec<String>,
/// Time at which `new` created this context.
pub created_at: DateTime<Utc>,
/// Time of the latest change made through the setters, status transitions,
/// or `set_current_node`.
pub updated_at: DateTime<Utc>,
/// Time recorded by `start`; `None` until then.
pub started_at: Option<DateTime<Utc>>,
/// Time recorded by `complete`, `fail`, or `cancel`.
pub finished_at: Option<DateTime<Utc>>,
/// Error message stored by `fail`.
pub error: Option<String>,
}
impl WorkflowContext {
    /// Creates a pending workflow run for `workflow_id` with the given
    /// `inputs`.
    ///
    /// A fresh `instance_id` of the form `inst_<uuid-v4>` is generated, and
    /// `created_at` / `updated_at` are both set to the same current instant.
    pub fn new(workflow_id: String, inputs: HashMap<String, serde_json::Value>) -> Self {
        let now = Utc::now();
        Self {
            instance_id: format!("inst_{}", uuid::Uuid::new_v4()),
            workflow_id,
            status: WorkflowStatus::Pending,
            current_node_id: None,
            inputs,
            variables: HashMap::new(),
            node_executions: HashMap::new(),
            execution_path: Vec::new(),
            created_at: now,
            updated_at: now,
            started_at: None,
            finished_at: None,
            error: None,
        }
    }

    /// Stores `value` under `key` in the workflow variables, replacing any
    /// existing entry, and refreshes `updated_at`.
    pub fn set_variable(&mut self, key: String, value: serde_json::Value) {
        self.variables.insert(key, value);
        self.updated_at = Utc::now();
    }

    /// Returns the variable stored under `key`, if any.
    pub fn get_variable(&self, key: &str) -> Option<&serde_json::Value> {
        self.variables.get(key)
    }

    /// Stores `value` under `key` in the workflow inputs, replacing any
    /// existing entry, and refreshes `updated_at`.
    pub fn set_input(&mut self, key: String, value: serde_json::Value) {
        self.inputs.insert(key, value);
        self.updated_at = Utc::now();
    }

    /// Returns the input stored under `key`, if any.
    pub fn get_input(&self, key: &str) -> Option<&serde_json::Value> {
        self.inputs.get(key)
    }

    /// Moves the workflow to [`WorkflowStatus::Running`] and records the
    /// start time.
    ///
    /// `started_at` and `updated_at` receive the same instant, matching how
    /// `new` stamps `created_at` / `updated_at` from a single clock read.
    pub fn start(&mut self) {
        let now = Utc::now();
        self.status = WorkflowStatus::Running;
        self.started_at = Some(now);
        self.updated_at = now;
    }

    /// Moves the workflow to [`WorkflowStatus::Completed`] and records the
    /// finish time (`finished_at` and `updated_at` share one instant).
    pub fn complete(&mut self) {
        let now = Utc::now();
        self.status = WorkflowStatus::Completed;
        self.finished_at = Some(now);
        self.updated_at = now;
    }

    /// Moves the workflow to [`WorkflowStatus::Failed`], stores `error`, and
    /// records the finish time (`finished_at` and `updated_at` share one
    /// instant).
    pub fn fail(&mut self, error: String) {
        let now = Utc::now();
        self.status = WorkflowStatus::Failed;
        self.error = Some(error);
        self.finished_at = Some(now);
        self.updated_at = now;
    }

    /// Moves the workflow to [`WorkflowStatus::Cancelled`] and records the
    /// finish time (`finished_at` and `updated_at` share one instant).
    pub fn cancel(&mut self) {
        let now = Utc::now();
        self.status = WorkflowStatus::Cancelled;
        self.finished_at = Some(now);
        self.updated_at = now;
    }

    /// Moves the workflow to [`WorkflowStatus::Paused`] and refreshes
    /// `updated_at`. The previous status is not checked.
    pub fn pause(&mut self) {
        self.status = WorkflowStatus::Paused;
        self.updated_at = Utc::now();
    }

    /// Moves the workflow back to [`WorkflowStatus::Running`] and refreshes
    /// `updated_at`. The previous status is not checked and `started_at` is
    /// left as it was.
    pub fn resume(&mut self) {
        self.status = WorkflowStatus::Running;
        self.updated_at = Utc::now();
    }

    /// Records `node_id` as the current node, appends it to
    /// `execution_path`, and refreshes `updated_at`.
    pub fn set_current_node(&mut self, node_id: String) {
        self.current_node_id = Some(node_id.clone());
        self.execution_path.push(node_id);
        self.updated_at = Utc::now();
    }

    /// Returns a mutable reference to the execution record for `node_id`,
    /// inserting a new pending record first when none exists.
    ///
    /// This does not touch `updated_at`.
    pub fn get_or_create_node_execution(&mut self, node_id: &str) -> &mut NodeExecution {
        self.node_executions
            .entry(node_id.to_string())
            .or_insert_with(|| NodeExecution::new(node_id.to_string()))
    }

    /// Returns the execution record for `node_id`, if one has been created.
    pub fn get_node_execution(&self, node_id: &str) -> Option<&NodeExecution> {
        self.node_executions.get(node_id)
    }

    /// Returns the time between `started_at` and `finished_at` in whole
    /// milliseconds, or `None` when either timestamp is missing.
    pub fn total_duration_ms(&self) -> Option<i64> {
        let (begin, end) = self.started_at.zip(self.finished_at)?;
        Some((end - begin).num_milliseconds())
    }

    /// Returns `true` while the workflow is pending or running, and `false`
    /// for every other status (paused, completed, failed, cancelled).
    pub fn can_continue(&self) -> bool {
        matches!(
            self.status,
            WorkflowStatus::Pending | WorkflowStatus::Running
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the context shared by the workflow tests: workflow id
    /// `test-workflow` with no inputs.
    fn fresh_context() -> WorkflowContext {
        WorkflowContext::new("test-workflow".to_string(), HashMap::new())
    }

    /// A new context is pending and has no current node.
    #[test]
    fn test_workflow_context_new() {
        let context = fresh_context();

        assert_eq!(context.status, WorkflowStatus::Pending);
        assert_eq!(context.current_node_id, None);
    }

    /// start -> set_current_node -> complete walks through the expected states.
    #[test]
    fn test_workflow_context_lifecycle() {
        let mut context = fresh_context();

        context.start();
        assert_eq!(context.status, WorkflowStatus::Running);

        context.set_current_node("node1".to_string());
        assert_eq!(context.current_node_id.as_deref(), Some("node1"));

        context.complete();
        assert_eq!(context.status, WorkflowStatus::Completed);
        assert!(context.finished_at.is_some());
    }

    /// A node record goes pending -> running -> completed and keeps its output.
    #[test]
    fn test_node_execution() {
        let mut record = NodeExecution::new("node1".to_string());
        assert_eq!(record.status, NodeStatus::Pending);

        record.start();
        assert_eq!(record.status, NodeStatus::Running);

        let payload = serde_json::json!({"result": "ok"});
        record.complete(Some(payload));
        assert_eq!(record.status, NodeStatus::Completed);
        assert!(record.output.is_some());
    }

    /// Pausing blocks `can_continue`; resuming restores it.
    #[test]
    fn test_pause_resume() {
        let mut context = fresh_context();

        context.start();
        assert_eq!(context.status, WorkflowStatus::Running);
        assert!(context.can_continue());

        context.pause();
        assert_eq!(context.status, WorkflowStatus::Paused);
        assert!(!context.can_continue());

        context.resume();
        assert_eq!(context.status, WorkflowStatus::Running);
        assert!(context.can_continue());
    }
}