use chrono::{DateTime, Utc};
use distri_types::TaskStatus;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDefinition {
pub id: String,
pub steps: Vec<WorkflowStep>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input_schema: Option<serde_json::Value>,
#[serde(default)]
pub checkpoint: CheckpointStrategy,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub entry_points: Vec<EntryPoint>,
}
pub const BUILTIN_CHANNEL_COMMANDS: &[&str] = &[
"/start",
"/stop",
"/disconnect",
"/reset",
"/new",
"/newsession",
"/newthread",
"/status",
"/debug",
"/verbose",
"/help",
"/switch",
"/workspace",
"/context",
"/ctx",
];
impl WorkflowDefinition {
pub fn new(steps: Vec<WorkflowStep>) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
steps,
input_schema: None,
checkpoint: CheckpointStrategy::default(),
entry_points: vec![],
}
}
pub fn with_id(mut self, id: &str) -> Self {
self.id = id.to_string();
self
}
pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
self.checkpoint = strategy;
self
}
pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
self.entry_points = entry_points;
self
}
pub fn entry_point(&self, id: &str) -> Option<&EntryPoint> {
self.entry_points.iter().find(|ep| ep.id == id)
}
pub fn reachable_from(&self, start_step_id: &str) -> std::collections::HashSet<String> {
use std::collections::{HashSet, VecDeque};
let mut reachable = HashSet::new();
let mut queue = VecDeque::new();
queue.push_back(start_step_id.to_string());
while let Some(current) = queue.pop_front() {
if !reachable.insert(current.clone()) {
continue;
}
for step in &self.steps {
if step.depends_on.contains(¤t) && !reachable.contains(&step.id) {
queue.push_back(step.id.clone());
}
}
}
reachable
}
pub fn validate_channel_surface(&self) -> Result<(), String> {
use distri_types::channel_commands::ChannelTrigger;
use std::collections::HashSet;
let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
let mut slash_names: HashSet<String> = HashSet::new();
let mut callback_ids: HashSet<String> = HashSet::new();
let mut message_count = 0usize;
for ep in &self.entry_points {
if !step_ids.contains(ep.starts_at.as_str()) {
return Err(format!(
"entry point '{}' starts_at unknown step '{}'",
ep.id, ep.starts_at
));
}
let Some(trigger) = &ep.trigger else { continue };
match trigger {
ChannelTrigger::Slash { name, aliases, .. } => {
for n in std::iter::once(name).chain(aliases.iter()) {
let lower = n.to_lowercase();
if BUILTIN_CHANNEL_COMMANDS.contains(&lower.as_str()) {
return Err(format!("slash command '{n}' shadows a built-in command"));
}
if !slash_names.insert(lower.clone()) {
return Err(format!(
"entry point '{}': slash command '{}' is already declared",
ep.id, n
));
}
}
}
ChannelTrigger::Callback { id, .. } => {
if !callback_ids.insert(id.clone()) {
return Err(format!(
"entry point '{}': callback id '{}' is already declared",
ep.id, id
));
}
}
ChannelTrigger::Message {} => message_count += 1,
}
}
if message_count > 1 {
return Err(format!(
"workflow declares {message_count} message catch-all entry \
points; at most one is allowed"
));
}
for step in &self.steps {
if let StepKind::Reply {
buttons_from,
button_template,
..
} = &step.kind
{
if button_template.is_some() != buttons_from.is_some() {
return Err(format!(
"reply step '{}': button_template and buttons_from \
must be set together",
step.id
));
}
}
}
Ok(())
}
pub fn detect_cycles(&self) -> Result<(), String> {
use std::collections::{HashMap, HashSet};
let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
for step in &self.steps {
adj.insert(
step.id.as_str(),
step.depends_on.iter().map(|s| s.as_str()).collect(),
);
}
let mut visited = HashSet::new();
let mut in_stack = HashSet::new();
fn dfs<'a>(
node: &'a str,
adj: &HashMap<&'a str, Vec<&'a str>>,
visited: &mut HashSet<&'a str>,
in_stack: &mut HashSet<&'a str>,
path: &mut Vec<&'a str>,
) -> Result<(), String> {
visited.insert(node);
in_stack.insert(node);
path.push(node);
if let Some(deps) = adj.get(node) {
for &dep in deps {
if !visited.contains(dep) {
dfs(dep, adj, visited, in_stack, path)?;
} else if in_stack.contains(dep) {
let cycle_start = path.iter().position(|&n| n == dep).unwrap();
let cycle: Vec<&str> = path[cycle_start..].to_vec();
return Err(format!(
"Circular dependency detected: {} → {}",
cycle.join(" → "),
dep
));
}
}
}
in_stack.remove(node);
path.pop();
Ok(())
}
let mut path = Vec::new();
for step in &self.steps {
if !visited.contains(step.id.as_str()) {
dfs(
step.id.as_str(),
&adj,
&mut visited,
&mut in_stack,
&mut path,
)?;
}
}
for step in &self.steps {
for dep in &step.depends_on {
if !step_ids.contains(dep.as_str()) {
return Err(format!(
"Step '{}' depends on '{}' which does not exist",
step.id, dep
));
}
}
}
Ok(())
}
}
fn default_empty_object() -> serde_json::Value {
serde_json::json!({})
}
fn default_now() -> DateTime<Utc> {
Utc::now()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
#[serde(flatten)]
pub definition: WorkflowDefinition,
#[serde(default)]
pub status: WorkflowStatus,
#[serde(default)]
pub current_step: usize,
#[serde(default = "default_empty_object")]
pub context: serde_json::Value,
#[serde(default)]
pub notes: Vec<WorkflowNote>,
#[serde(default)]
pub step_runs: Vec<WorkflowStepRun>,
#[serde(default = "default_now")]
pub created_at: DateTime<Utc>,
#[serde(default = "default_now")]
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowStepRun {
pub step_id: String,
#[serde(default)]
pub status: StepStatus,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at: Option<DateTime<Utc>>,
}
impl WorkflowRun {
pub fn new(definition: WorkflowDefinition) -> Self {
let step_runs = definition
.steps
.iter()
.map(|s| WorkflowStepRun {
step_id: s.id.clone(),
..Default::default()
})
.collect();
Self {
definition,
status: WorkflowStatus::Pending,
current_step: 0,
context: serde_json::json!({}),
notes: vec![],
step_runs,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
pub fn from_steps(steps: Vec<WorkflowStep>) -> Self {
Self::new(WorkflowDefinition::new(steps))
}
pub fn with_context(mut self, context: serde_json::Value) -> Self {
self.context = context;
self
}
pub fn with_id(mut self, id: &str) -> Self {
self.definition.id = id.to_string();
self
}
pub fn with_checkpoint(mut self, strategy: CheckpointStrategy) -> Self {
self.definition.checkpoint = strategy;
self
}
pub fn with_entry_points(mut self, entry_points: Vec<EntryPoint>) -> Self {
self.definition.entry_points = entry_points;
self
}
pub fn id(&self) -> &str {
&self.definition.id
}
pub fn steps(&self) -> &[WorkflowStep] {
&self.definition.steps
}
pub fn step(&self, idx: usize) -> &WorkflowStep {
&self.definition.steps[idx]
}
pub fn step_run(&self, idx: usize) -> &WorkflowStepRun {
&self.step_runs[idx]
}
pub fn step_run_mut(&mut self, idx: usize) -> &mut WorkflowStepRun {
&mut self.step_runs[idx]
}
pub fn step_run_by_id(&self, step_id: &str) -> Option<&WorkflowStepRun> {
self.step_runs.iter().find(|s| s.step_id == step_id)
}
pub fn step_run_by_id_mut(&mut self, step_id: &str) -> Option<&mut WorkflowStepRun> {
self.step_runs.iter_mut().find(|s| s.step_id == step_id)
}
pub fn apply_entry_point(mut self, entry_point_id: &str) -> Result<Self, String> {
let ep = self
.definition
.entry_points
.iter()
.find(|ep| ep.id == entry_point_id)
.ok_or_else(|| format!("Entry point '{}' not found", entry_point_id))?
.clone();
if !self.definition.steps.iter().any(|s| s.id == ep.starts_at) {
return Err(format!(
"Entry point '{}' references step '{}' which does not exist",
entry_point_id, ep.starts_at
));
}
let reachable = self.definition.reachable_from(&ep.starts_at);
for (i, step) in self.definition.steps.iter().enumerate() {
if !reachable.contains(&step.id) {
self.step_runs[i].status = StepStatus::Skipped;
if let Some(result) = ep.preset_results.get(&step.id) {
self.step_runs[i].result = Some(result.clone());
}
}
}
if let Some(ctx) = self.context.as_object_mut() {
let steps = ctx
.entry("steps")
.or_insert(serde_json::json!({}))
.as_object_mut()
.expect("steps must be an object");
for (step_id, result) in &ep.preset_results {
steps.insert(step_id.clone(), result.clone());
}
}
Ok(self)
}
pub fn with_input(mut self, input: serde_json::Value) -> Result<Self, String> {
if let Some(ref schema_value) = self.definition.input_schema {
let validator = jsonschema::validator_for(schema_value)
.map_err(|e| format!("Invalid input_schema: {e}"))?;
if !validator.is_valid(&input) {
let errors: Vec<String> = validator
.iter_errors(&input)
.map(|e| format!("{}", e))
.collect();
return Err(format!("Input validation failed: {}", errors.join("; ")));
}
}
if let (Some(ctx), Some(inp)) = (self.context.as_object_mut(), input.as_object()) {
for (k, v) in inp {
ctx.insert(k.clone(), v.clone());
}
ctx.insert("input".to_string(), input.clone());
}
self.status = WorkflowStatus::Running;
self.updated_at = Utc::now();
Ok(self)
}
pub fn next_pending_step(&self) -> Option<(usize, &WorkflowStep)> {
self.step_runs
.iter()
.enumerate()
.find(|(_, s)| s.status == StepStatus::Pending)
.map(|(i, _)| (i, &self.definition.steps[i]))
}
pub fn runnable_steps(&self) -> Vec<(usize, &WorkflowStep)> {
let mut runnable = vec![];
for (i, step) in self.definition.steps.iter().enumerate() {
if self.step_runs[i].status != StepStatus::Pending {
continue;
}
let deps_met = step.depends_on.iter().all(|dep_id| {
self.definition
.steps
.iter()
.zip(self.step_runs.iter())
.any(|(s, sr)| {
&s.id == dep_id
&& matches!(sr.status, StepStatus::Done | StepStatus::Skipped)
})
});
if deps_met {
runnable.push((i, step));
}
}
runnable
}
pub fn is_complete(&self) -> bool {
self.step_runs.iter().all(|s| {
matches!(
s.status,
StepStatus::Done | StepStatus::Skipped | StepStatus::Blocked
)
})
}
pub fn is_waiting_for_input(&self) -> bool {
self.step_runs
.iter()
.any(|s| s.status == StepStatus::WaitingForInput)
}
pub fn waiting_step(&self) -> Option<(usize, &WorkflowStep)> {
self.step_runs
.iter()
.enumerate()
.find(|(_, s)| s.status == StepStatus::WaitingForInput)
.map(|(i, _)| (i, &self.definition.steps[i]))
}
pub fn resume_step(
&mut self,
step_id: &str,
result: serde_json::Value,
) -> Result<usize, String> {
let idx = self
.step_runs
.iter()
.position(|s| s.step_id == step_id && s.status == StepStatus::WaitingForInput)
.ok_or_else(|| {
format!(
"Step '{}' not found or not in waiting_for_input state",
step_id
)
})?;
self.step_runs[idx].status = StepStatus::Done;
self.step_runs[idx].result = Some(result.clone());
self.step_runs[idx].completed_at = Some(Utc::now());
if let Some(ctx) = self.context.as_object_mut() {
let steps = ctx
.entry("steps")
.or_insert(serde_json::json!({}))
.as_object_mut()
.expect("steps must be an object");
steps.insert(step_id.to_string(), result);
}
self.status = WorkflowStatus::Running;
self.updated_at = Utc::now();
Ok(idx)
}
pub fn is_stuck(&self) -> bool {
let has_blocked = self
.step_runs
.iter()
.any(|s| s.status == StepStatus::Blocked);
let has_pending = self
.step_runs
.iter()
.any(|s| s.status == StepStatus::Pending);
let has_running = self
.step_runs
.iter()
.any(|s| s.status == StepStatus::Running);
if !has_blocked || has_running {
return false;
}
if !has_pending {
return true;
}
!self
.definition
.steps
.iter()
.zip(self.step_runs.iter())
.any(|(step, run)| {
run.status == StepStatus::Pending
&& step.depends_on.iter().all(|dep_id| {
self.definition
.steps
.iter()
.zip(self.step_runs.iter())
.any(|(s, sr)| {
&s.id == dep_id
&& matches!(
sr.status,
StepStatus::Done
| StepStatus::Pending
| StepStatus::Running
)
})
})
})
}
pub fn has_failed(&self) -> bool {
self.step_runs
.iter()
.any(|s| s.status == StepStatus::Failed)
}
pub fn add_note(&mut self, step_id: &str, message: &str) {
self.notes.push(WorkflowNote {
step_id: step_id.to_string(),
message: message.to_string(),
at: Utc::now(),
});
self.updated_at = Utc::now();
}
pub fn detect_cycles(&self) -> Result<(), String> {
self.definition.detect_cycles()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntryPoint {
pub id: String,
pub label: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub starts_at: String,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub preset_results: HashMap<String, serde_json::Value>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub required_inputs: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub trigger: Option<distri_types::channel_commands::ChannelTrigger>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStep {
pub id: String,
pub label: String,
pub kind: StepKind,
#[serde(default)]
pub depends_on: Vec<String>,
#[serde(default)]
pub execution: StepExecution,
#[serde(default)]
pub requires: Vec<StepRequirement>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub skip_if: Option<String>,
}
impl WorkflowStep {
fn new_step(id: &str, label: &str, kind: StepKind) -> Self {
Self {
id: id.to_string(),
label: label.to_string(),
kind,
depends_on: vec![],
execution: StepExecution::Sequential,
requires: vec![],
input: None,
skip_if: None,
}
}
pub fn api_call(id: &str, label: &str, method: &str, url: &str) -> Self {
Self::new_step(
id,
label,
StepKind::ApiCall {
method: method.to_string(),
url: url.to_string(),
body: None,
headers: None,
},
)
}
pub fn agent_run(id: &str, label: &str, agent_id: &str, prompt: &str) -> Self {
Self::new_step(
id,
label,
StepKind::AgentRun {
agent_id: agent_id.to_string(),
prompt: prompt.to_string(),
tools: vec![],
skills: vec![],
model: None,
max_iterations: None,
},
)
}
pub fn script(id: &str, label: &str, command: &str) -> Self {
Self::new_step(
id,
label,
StepKind::Script {
command: command.to_string(),
args: vec![],
cwd: None,
env: None,
timeout_secs: None,
output_format: None,
shell: None,
},
)
}
pub fn tool_call(id: &str, label: &str, tool_name: &str, input: serde_json::Value) -> Self {
Self::new_step(
id,
label,
StepKind::ToolCall {
tool_name: tool_name.to_string(),
input,
agent_id: None,
},
)
}
pub fn condition(
id: &str,
label: &str,
expression: &str,
if_true: StepKind,
if_false: Option<StepKind>,
) -> Self {
Self::new_step(
id,
label,
StepKind::Condition {
expression: expression.to_string(),
if_true: Box::new(if_true),
if_false: if_false.map(Box::new),
},
)
}
pub fn checkpoint(id: &str, label: &str, message: &str) -> Self {
Self::new_step(
id,
label,
StepKind::Checkpoint {
message: message.to_string(),
},
)
}
pub fn wait_for_input(id: &str, label: &str, message: &str) -> Self {
Self::new_step(
id,
label,
StepKind::WaitForInput {
message: message.to_string(),
schema: None,
},
)
}
pub fn with_body(mut self, body: serde_json::Value) -> Self {
if let StepKind::ApiCall {
body: ref mut b, ..
} = self.kind
{
*b = Some(body);
}
self
}
pub fn with_depends_on(mut self, deps: Vec<&str>) -> Self {
self.depends_on = deps.into_iter().map(|s| s.to_string()).collect();
self
}
pub fn parallel(mut self) -> Self {
self.execution = StepExecution::Parallel;
self
}
pub fn with_requires(mut self, requires: Vec<StepRequirement>) -> Self {
self.requires = requires;
self
}
pub fn with_cwd(mut self, cwd: &str) -> Self {
if let StepKind::Script { cwd: ref mut c, .. } = self.kind {
*c = Some(cwd.to_string());
}
self
}
pub fn with_timeout(mut self, secs: u64) -> Self {
if let StepKind::Script {
timeout_secs: ref mut t,
..
} = self.kind
{
*t = Some(secs);
}
self
}
pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
if let StepKind::Script { env: ref mut e, .. } = self.kind {
*e = Some(env);
}
self
}
pub fn with_input_mapping(mut self, input: serde_json::Value) -> Self {
self.input = Some(input);
self
}
pub fn with_skip_if(mut self, expression: &str) -> Self {
self.skip_if = Some(expression.to_string());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepKind {
ApiCall {
method: String,
url: String,
#[serde(skip_serializing_if = "Option::is_none")]
body: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
headers: Option<HashMap<String, String>>,
},
Script {
command: String,
#[serde(default)]
args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
cwd: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
env: Option<HashMap<String, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
timeout_secs: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
output_format: Option<ScriptOutputFormat>,
#[serde(default, skip_serializing_if = "Option::is_none")]
shell: Option<ShellType>,
},
AgentRun {
agent_id: String,
prompt: String,
#[serde(default)]
tools: Vec<String>,
#[serde(default)]
skills: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
model: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
max_iterations: Option<u32>,
},
ToolCall {
tool_name: String,
input: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
agent_id: Option<String>,
},
Condition {
expression: String,
if_true: Box<StepKind>,
#[serde(skip_serializing_if = "Option::is_none")]
if_false: Option<Box<StepKind>>,
},
Checkpoint { message: String },
WaitForInput {
message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
schema: Option<serde_json::Value>,
},
Reply {
text: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
buttons: Vec<Vec<distri_types::channel_commands::ReplyButtonSpec>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
buttons_from: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
button_template: Option<distri_types::channel_commands::ReplyButtonSpec>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StepRequirement {
pub skill: String,
#[serde(default)]
pub permissions: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config: Option<serde_json::Value>,
}
impl StepRequirement {
pub fn native(skill: &str) -> Self {
Self {
skill: format!("native:{}", skill),
permissions: vec![],
config: None,
}
}
pub fn connection(provider: &str, service: &str) -> Self {
Self {
skill: format!("{}:{}", provider, service),
permissions: vec![],
config: None,
}
}
pub fn with_permissions(mut self, perms: Vec<&str>) -> Self {
self.permissions = perms.into_iter().map(|s| s.to_string()).collect();
self
}
pub fn namespace(&self) -> Option<&str> {
self.skill.split(':').next()
}
pub fn skill_name(&self) -> Option<&str> {
self.skill.split(':').nth(1)
}
pub fn is_native(&self) -> bool {
self.skill.starts_with("native:")
}
pub fn validate(&self) -> Result<(), String> {
if !self.skill.contains(':') {
return Err(format!(
"Invalid skill identifier '{}': must be namespaced (e.g., 'native:shell', 'google:drive')",
self.skill
));
}
if self.is_native() {
let known = ["shell", "browser", "network", "agent", "tool"];
if let Some(name) = self.skill_name() {
if !known.contains(&name) {
return Err(format!(
"Unknown native skill '{}'. Known: {:?}",
name, known
));
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CheckpointStrategy {
Internal {
#[serde(default, skip_serializing_if = "Option::is_none")]
ttl_secs: Option<u64>,
},
External { tool_name: String },
}
impl Default for CheckpointStrategy {
fn default() -> Self {
CheckpointStrategy::Internal { ttl_secs: None }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMeta {
pub checkpoint_id: String,
pub workflow_id: String,
pub step_id: String,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowStatus {
#[default]
Pending,
Running,
Paused,
Completed,
Failed,
Blocked,
}
impl From<WorkflowStatus> for TaskStatus {
fn from(s: WorkflowStatus) -> Self {
match s {
WorkflowStatus::Pending => TaskStatus::Pending,
WorkflowStatus::Running => TaskStatus::Running,
WorkflowStatus::Paused => TaskStatus::InputRequired,
WorkflowStatus::Completed => TaskStatus::Completed,
WorkflowStatus::Failed => TaskStatus::Failed,
WorkflowStatus::Blocked => TaskStatus::Failed,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
#[default]
Pending,
Blocked,
Running,
Done,
Failed,
Skipped,
WaitingForInput,
}
impl From<StepStatus> for TaskStatus {
fn from(s: StepStatus) -> Self {
match s {
StepStatus::Pending => TaskStatus::Pending,
StepStatus::Blocked => TaskStatus::Failed,
StepStatus::Running => TaskStatus::Running,
StepStatus::Done => TaskStatus::Completed,
StepStatus::Failed => TaskStatus::Failed,
StepStatus::Skipped => TaskStatus::Canceled,
StepStatus::WaitingForInput => TaskStatus::InputRequired,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StepExecution {
#[default]
Sequential,
Parallel,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ScriptOutputFormat {
Text,
Json,
Stream,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShellType {
Bash,
Sh,
Zsh,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
pub status: StepStatus,
pub result: Option<serde_json::Value>,
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub context_updates: Option<serde_json::Value>,
}
impl StepResult {
pub fn done(result: serde_json::Value) -> Self {
Self {
status: StepStatus::Done,
result: Some(result),
error: None,
context_updates: None,
}
}
pub fn done_with_context(result: serde_json::Value, updates: serde_json::Value) -> Self {
Self {
status: StepStatus::Done,
result: Some(result),
error: None,
context_updates: Some(updates),
}
}
pub fn failed(error: &str) -> Self {
Self {
status: StepStatus::Failed,
result: None,
error: Some(error.to_string()),
context_updates: None,
}
}
pub fn skipped() -> Self {
Self {
status: StepStatus::Skipped,
result: None,
error: None,
context_updates: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowNote {
pub step_id: String,
pub message: String,
pub at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepSummary {
pub id: String,
pub label: String,
pub status: TaskStatus,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRunSummary {
pub workflow_id: String,
pub status: TaskStatus,
pub steps: Vec<WorkflowStepSummary>,
}
impl WorkflowRunSummary {
pub fn from_run(run: &WorkflowRun, status: WorkflowStatus) -> Self {
let steps = run
.steps()
.iter()
.zip(run.step_runs.iter())
.map(|(step, sr)| WorkflowStepSummary {
id: step.id.clone(),
label: step.label.clone(),
status: sr.status.into(),
result: sr.result.clone(),
error: sr.error.clone(),
})
.collect();
Self {
workflow_id: run.id().to_string(),
status: status.into(),
steps,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum WorkflowEvent {
WorkflowStarted {
workflow_id: String,
total_steps: usize,
},
StepStarted {
workflow_id: String,
step_id: String,
step_label: String,
},
StepCompleted {
workflow_id: String,
step_id: String,
step_label: String,
result: Option<serde_json::Value>,
},
StepFailed {
workflow_id: String,
step_id: String,
step_label: String,
error: String,
},
StepWaiting {
workflow_id: String,
step_id: String,
step_label: String,
message: String,
schema: Option<serde_json::Value>,
},
WorkflowCompleted {
workflow_id: String,
status: WorkflowStatus,
steps_done: usize,
steps_failed: usize,
},
}