use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
use crate::db::models::Event;
use crate::error::{AppError, AppResult};
use crate::playbook::types::{Playbook, Step};
use super::commands::{Command, CommandBuilder};
use super::evaluator::ConditionEvaluator;
use super::state::{ExecutionState, WorkflowState};
/// Outcome of one orchestrator call: the execution state to report, the
/// commands to dispatch, and the events the caller is expected to emit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationResult {
/// Execution state reported by this pass.
pub state: ExecutionState,
/// Commands built during this pass; empty when nothing is dispatched.
pub commands: Vec<Command>,
/// `true` when the orchestrator decided the workflow should be completed.
pub should_complete: bool,
/// Final status details. Within this module it is `Some` exactly when
/// `should_complete` is `true`; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub completion_status: Option<CompletionStatus>,
/// Events produced by this pass, in the order they were generated.
pub events_to_emit: Vec<EventToEmit>,
}
/// Final status attached to an [`OrchestrationResult`] that completes a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletionStatus {
/// Status label; this module writes either `"COMPLETED"` or `"FAILED"`.
pub status: String,
/// Human-readable failure description; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Names of the steps considered failed; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub failed_steps: Option<Vec<String>>,
}
/// Description of an event the orchestrator wants the caller to record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventToEmit {
/// Event type label; this module produces `"step.enter"` and `"step_failed"`.
pub event_type: String,
/// Name of the step the event refers to, if any.
pub node_name: Option<String>,
/// Status label; this module produces `"ENTERED"` and `"FAILED"`.
pub status: String,
/// Optional JSON context; for step transitions this module stores the
/// transition's `with_params` here. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub context: Option<serde_json::Value>,
/// Optional JSON result payload (never populated by the code in this file).
/// Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
/// Optional error message; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Decides, from an execution's events and its playbook, which commands to
/// dispatch next and whether the workflow should complete.
pub struct WorkflowOrchestrator {
/// Evaluates a step's `next` transitions against the current context.
evaluator: ConditionEvaluator,
/// Builds the commands handed back to the caller for dispatch.
command_builder: CommandBuilder,
}
impl Default for WorkflowOrchestrator {
fn default() -> Self {
Self::new()
}
}
impl WorkflowOrchestrator {
/// Creates an orchestrator backed by a fresh [`ConditionEvaluator`] and
/// [`CommandBuilder`].
pub fn new() -> Self {
    let evaluator = ConditionEvaluator::new();
    let command_builder = CommandBuilder::new();
    Self {
        evaluator,
        command_builder,
    }
}
pub fn evaluate(
&self,
events: &[Event],
playbook: &Playbook,
trigger_event_type: Option<&str>,
) -> AppResult<OrchestrationResult> {
let state = WorkflowState::from_events(events)
.ok_or_else(|| AppError::Validation("No events found for execution".to_string()))?;
debug!(
"Evaluating execution {}, state: {}, trigger: {:?}",
state.execution_id, state.state, trigger_event_type
);
if matches!(
state.state,
ExecutionState::Completed | ExecutionState::Failed | ExecutionState::Cancelled
) {
return Ok(OrchestrationResult {
state: state.state,
commands: vec![],
should_complete: false,
completion_status: None,
events_to_emit: vec![],
});
}
if let Some(event_type) = trigger_event_type {
if matches!(event_type, "step_started" | "step_running") {
debug!("Skipping orchestration for progress marker event");
return Ok(OrchestrationResult {
state: state.state,
commands: vec![],
should_complete: false,
completion_status: None,
events_to_emit: vec![],
});
}
}
let context = value_to_hashmap(&state.build_context());
let steps: HashMap<&str, &Step> = playbook
.workflow
.iter()
.map(|s| (s.step.as_str(), s))
.collect();
match state.state {
ExecutionState::Initial => {
self.dispatch_initial_steps(&state, playbook, &context)
}
ExecutionState::InProgress => {
if state.steps.is_empty() {
return self.dispatch_initial_steps(&state, playbook, &context);
}
self.process_in_progress(&state, &steps, &context, trigger_event_type)
}
_ => Ok(OrchestrationResult {
state: state.state,
commands: vec![],
should_complete: false,
completion_status: None,
events_to_emit: vec![],
}),
}
}
/// Dispatches the playbook's entry step.
///
/// Looks up the step named `start`, prepares a `step.enter` event for it and
/// builds its command from `context`. The result always reports
/// `ExecutionState::InProgress` with exactly one command and one event.
///
/// # Errors
/// Returns `AppError::Validation` if the playbook has no `start` step, and
/// propagates any error from the command builder.
fn dispatch_initial_steps(
    &self,
    state: &WorkflowState,
    playbook: &Playbook,
    context: &HashMap<String, serde_json::Value>,
) -> AppResult<OrchestrationResult> {
    let entry = playbook
        .get_step("start")
        .ok_or_else(|| AppError::Validation("Start step 'start' not found".to_string()))?;
    info!("Dispatching initial step: {}", entry.step);

    let enter_event = EventToEmit {
        event_type: "step.enter".to_string(),
        node_name: Some(entry.step.clone()),
        status: "ENTERED".to_string(),
        context: None,
        result: None,
        error: None,
    };

    // The two literal `0` arguments and the trailing `None` are passed through
    // as-is; their meaning is defined by `CommandBuilder::build_command`.
    let command = self.command_builder.build_command(
        0,
        state.execution_id,
        state.catalog_id,
        0,
        entry,
        context,
        None,
    )?;

    Ok(OrchestrationResult {
        state: ExecutionState::InProgress,
        commands: vec![command],
        should_complete: false,
        completion_status: None,
        events_to_emit: vec![enter_event],
    })
}
/// Advances an in-progress execution after a completion-type trigger.
///
/// Only the triggers `command.completed`, `action_completed`, `step.exit`,
/// `step_completed` and `iterator_completed` are acted on; any other trigger
/// yields an empty `InProgress` result.
///
/// For every completed step that exists in `steps`, its `next` transitions are
/// evaluated. Each matched target that is not already done, not already
/// running, and not already queued earlier in this same pass gets a
/// `step.enter` event and a command (built from `context` overlaid with the
/// transition's `with_params`). A matched transition to `end` completes the
/// workflow immediately with status `COMPLETED`.
///
/// After all transitions are processed, `check_completion` decides whether the
/// workflow is finished; if so the status is `FAILED` when any recorded step
/// carries an error, otherwise `COMPLETED`.
///
/// # Errors
/// Propagates failures from the condition evaluator, the command builder and
/// `check_completion`.
fn process_in_progress(
    &self,
    state: &WorkflowState,
    steps: &HashMap<&str, &Step>,
    context: &HashMap<String, serde_json::Value>,
    trigger_event_type: Option<&str>,
) -> AppResult<OrchestrationResult> {
    let mut commands = Vec::new();
    let mut events_to_emit = Vec::new();

    if !matches!(
        trigger_event_type,
        Some("command.completed")
            | Some("action_completed")
            | Some("step.exit")
            | Some("step_completed")
            | Some("iterator_completed")
    ) {
        return Ok(OrchestrationResult {
            state: ExecutionState::InProgress,
            commands,
            should_complete: false,
            completion_status: None,
            events_to_emit,
        });
    }

    // `state` is immutable for the whole pass, so compute this once instead of
    // once per candidate transition.
    let running = state.running_steps();
    // Targets already queued during this pass. The done/running checks only see
    // persisted state, so without this two completed steps that both point at
    // the same successor (fan-in) would dispatch it twice.
    let mut queued: std::collections::HashSet<String> = std::collections::HashSet::new();

    for step_name in state.steps.keys() {
        if !state.is_step_completed(step_name) {
            continue;
        }
        // Steps recorded in state but absent from the playbook are ignored.
        let step = match steps.get(step_name.as_str()) {
            Some(s) => *s,
            None => continue,
        };

        let eval_results = self.evaluator.evaluate_next(step, context)?;
        for result in eval_results {
            if !result.matched {
                continue;
            }
            let next_step_name = match &result.next_step {
                Some(name) => name,
                None => continue,
            };

            if next_step_name == "end" {
                info!("Reached 'end' step, workflow completing");
                // NOTE(review): commands queued earlier in this pass are
                // dropped here while their `step.enter` events are still
                // returned. Kept as-is; confirm whether that is intended.
                return Ok(OrchestrationResult {
                    state: ExecutionState::InProgress,
                    commands: vec![],
                    should_complete: true,
                    completion_status: Some(CompletionStatus {
                        status: "COMPLETED".to_string(),
                        error: None,
                        failed_steps: None,
                    }),
                    events_to_emit,
                });
            }

            let next_step = match steps.get(next_step_name.as_str()) {
                Some(s) => *s,
                None => {
                    warn!("Next step '{}' not found in workflow", next_step_name);
                    continue;
                }
            };

            if state.is_step_done(next_step_name) {
                debug!("Step '{}' already done, skipping", next_step_name);
                continue;
            }
            if running.contains(&next_step_name.as_str()) {
                debug!("Step '{}' already running, skipping", next_step_name);
                continue;
            }
            // `insert` returns false when the name was already present.
            if !queued.insert(next_step_name.clone()) {
                debug!(
                    "Step '{}' already queued in this pass, skipping",
                    next_step_name
                );
                continue;
            }

            // Transition parameters override same-named context entries.
            let mut step_context = context.clone();
            if let Some(serde_json::Value::Object(params)) = &result.with_params {
                for (k, v) in params {
                    step_context.insert(k.clone(), v.clone());
                }
            }

            info!("Transitioning to step: {}", next_step_name);
            events_to_emit.push(EventToEmit {
                event_type: "step.enter".to_string(),
                node_name: Some(next_step_name.clone()),
                status: "ENTERED".to_string(),
                context: result.with_params.clone(),
                result: None,
                error: None,
            });

            let command = self.command_builder.build_command(
                0,
                state.execution_id,
                state.catalog_id,
                0,
                next_step,
                &step_context,
                None,
            )?;
            commands.push(command);
        }
    }

    let should_complete = self.check_completion(state, steps)?;
    let completion_status = if should_complete {
        // Any step with a recorded error marks the whole workflow as failed.
        let failed_steps: Vec<String> = state
            .steps
            .iter()
            .filter(|(_, info)| info.error.is_some())
            .map(|(name, _)| name.clone())
            .collect();
        if failed_steps.is_empty() {
            Some(CompletionStatus {
                status: "COMPLETED".to_string(),
                error: None,
                failed_steps: None,
            })
        } else {
            Some(CompletionStatus {
                status: "FAILED".to_string(),
                error: Some(format!("Failed steps: {}", failed_steps.join(", "))),
                failed_steps: Some(failed_steps),
            })
        }
    } else {
        None
    };

    Ok(OrchestrationResult {
        state: ExecutionState::InProgress,
        commands,
        should_complete,
        completion_status,
        events_to_emit,
    })
}
/// Reports whether the workflow has nothing left to run.
///
/// Returns `Ok(false)` while any step is still running. Otherwise the workflow
/// counts as finished when the `end` step is completed, or when any step
/// without a `next` transition is completed.
///
/// The `Result` wrapper is part of the signature; this implementation never
/// produces an error itself.
fn check_completion(
    &self,
    state: &WorkflowState,
    steps: &HashMap<&str, &Step>,
) -> AppResult<bool> {
    if state.has_running_steps() {
        return Ok(false);
    }

    let finished = state.is_step_completed("end")
        || steps
            .iter()
            .any(|(name, step)| step.next.is_none() && state.is_step_completed(name));
    Ok(finished)
}
/// Builds the result for a step failure that terminates the workflow.
///
/// The result reports `ExecutionState::Failed`, carries no commands, asks for
/// completion with status `FAILED`, lists `step_name` as the only failed step
/// and includes a single `step_failed` event carrying `error`.
///
/// `_state` is accepted for interface symmetry but is not read.
pub fn handle_failure(
    &self,
    _state: &WorkflowState,
    step_name: &str,
    error: &str,
) -> AppResult<OrchestrationResult> {
    info!("Handling failure for step '{}': {}", step_name, error);

    let failed_step = step_name.to_owned();
    let message = error.to_owned();

    let failure_event = EventToEmit {
        event_type: "step_failed".to_string(),
        node_name: Some(failed_step.clone()),
        status: "FAILED".to_string(),
        context: None,
        result: None,
        error: Some(message.clone()),
    };
    let completion = CompletionStatus {
        status: "FAILED".to_string(),
        error: Some(message),
        failed_steps: Some(vec![failed_step]),
    };

    Ok(OrchestrationResult {
        state: ExecutionState::Failed,
        commands: Vec::new(),
        should_complete: true,
        completion_status: Some(completion),
        events_to_emit: vec![failure_event],
    })
}
}
/// Copies the entries of a JSON object into an owned `HashMap`.
///
/// Any value that is not a JSON object produces an empty map.
fn value_to_hashmap(value: &serde_json::Value) -> HashMap<String, serde_json::Value> {
    let mut out = HashMap::new();
    if let serde_json::Value::Object(entries) = value {
        for (key, item) in entries {
            out.insert(key.clone(), item.clone());
        }
    }
    out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::playbook::types::{Metadata, NextSpec, ToolDefinition, ToolKind, ToolSpec};
use chrono::Utc;
/// Test helper: builds a minimal Python-tool step called `name` whose only
/// populated tool field is `code`, optionally pointing at `next`.
fn make_step(name: &str, next: Option<&str>) -> Step {
    let python_tool = ToolSpec {
        kind: ToolKind::Python,
        eval: None,
        auth: None,
        libs: None,
        args: None,
        code: Some(String::from("return {}")),
        url: None,
        method: None,
        query: None,
        command: None,
        connection: None,
        params: None,
        headers: None,
        output_select: None,
        extra: HashMap::new(),
    };
    let next_spec = next.map(|target| NextSpec::Single(target.to_owned()));

    Step {
        step: name.to_owned(),
        desc: None,
        spec: None,
        when: None,
        args: None,
        vars: None,
        r#loop: None,
        tool: ToolDefinition::Single(python_tool),
        next: next_spec,
    }
}
/// Test helper: builds an event of the given type for execution `12345` /
/// catalog `67890`, with every optional field except `node_name` left unset.
fn make_event(event_type: &str, node_name: Option<&str>) -> Event {
    let node_name = node_name.map(str::to_owned);
    let event_type = event_type.to_owned();
    Event {
        id: 1,
        execution_id: 12345,
        catalog_id: 67890,
        event_id: 1,
        parent_event_id: None,
        parent_execution_id: None,
        event_type,
        node_id: None,
        node_name,
        node_type: None,
        status: String::new(),
        context: None,
        meta: None,
        result: None,
        worker_id: None,
        attempt: None,
        created_at: Utc::now(),
    }
}
/// Evaluating an execution that has only a `playbook_started` event should
/// report `InProgress` and produce at least one command and one event.
#[test]
fn test_evaluate_initial_state() {
    // A lone `playbook_started` event carrying the initial context.
    let mut started = make_event("playbook_started", None);
    started.context = Some(serde_json::json!({
        "workload": {},
        "path": "test",
        "version": "1"
    }));
    let events = vec![started];

    // start -> step2 -> end
    let workflow = vec![
        make_step("start", Some("step2")),
        make_step("step2", Some("end")),
        make_step("end", None),
    ];
    let metadata = Metadata {
        name: "test_playbook".to_string(),
        path: Some("test/path".to_string()),
        description: None,
        labels: None,
        extra: HashMap::new(),
    };
    let playbook = Playbook {
        api_version: "noetl.io/v2".to_string(),
        kind: "Playbook".to_string(),
        metadata,
        workload: None,
        vars: None,
        keychain: None,
        workbook: None,
        workflow,
    };

    let orchestrator = WorkflowOrchestrator::new();
    let result = orchestrator.evaluate(&events, &playbook, None).unwrap();

    assert_eq!(result.state, ExecutionState::InProgress);
    assert!(!result.commands.is_empty());
    assert!(!result.events_to_emit.is_empty());
}
/// `handle_failure` should report a failed, completing execution with a
/// `FAILED` completion status that carries an error message.
#[test]
fn test_handle_failure() {
    let state = WorkflowState::new(12345, 67890);
    let outcome = WorkflowOrchestrator::new()
        .handle_failure(&state, "failed_step", "Something went wrong")
        .unwrap();

    assert_eq!(outcome.state, ExecutionState::Failed);
    assert!(outcome.should_complete);
    assert!(outcome.completion_status.is_some());

    let completion = outcome.completion_status.unwrap();
    assert_eq!(completion.status, "FAILED");
    assert!(completion.error.is_some());
}
/// Serializing an in-progress result should produce JSON that contains the
/// text `in_progress`.
#[test]
fn test_orchestration_result_serialization() {
    let empty_result = OrchestrationResult {
        state: ExecutionState::InProgress,
        commands: Vec::new(),
        should_complete: false,
        completion_status: None,
        events_to_emit: Vec::new(),
    };

    let serialized = serde_json::to_string(&empty_result).unwrap();
    assert!(serialized.contains("in_progress"));
}
}