use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use chrono::Utc;
use serde_json::Value;
use tracing::{error, info};
use uuid::Uuid;
#[cfg(feature = "prometheus")]
use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
use ironflow_core::provider::AgentProvider;
use ironflow_store::error::StoreError;
use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
use ironflow_store::store::RunStore;
#[cfg(feature = "prometheus")]
use metrics::{counter, gauge, histogram};
use crate::context::WorkflowContext;
use crate::error::EngineError;
use crate::handler::{WorkflowHandler, WorkflowInfo};
use crate::notify::{Event, EventPublisher, EventSubscriber};
/// Workflow execution engine: registers [`WorkflowHandler`]s, creates and
/// executes runs against a [`RunStore`], and fans out lifecycle events.
pub struct Engine {
    /// Persistence backend for runs and their steps.
    store: Arc<dyn RunStore>,
    /// Agent provider handed to each workflow context.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by `WorkflowHandler::name()`.
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher for run status / failure events.
    event_publisher: EventPublisher,
}
impl Engine {
pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
Self {
store,
provider,
handlers: HashMap::new(),
event_publisher: EventPublisher::new(),
}
}
pub fn store(&self) -> &Arc<dyn RunStore> {
&self.store
}
pub fn provider(&self) -> &Arc<dyn AgentProvider> {
&self.provider
}
fn build_context(&self, run_id: Uuid) -> WorkflowContext {
let handlers = self.handlers.clone();
let resolver: crate::context::HandlerResolver =
Arc::new(move |name: &str| handlers.get(name).cloned());
WorkflowContext::with_handler_resolver(
run_id,
self.store.clone(),
self.provider.clone(),
resolver,
)
}
pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
let name = handler.name().to_string();
if self.handlers.contains_key(&name) {
return Err(EngineError::InvalidWorkflow(format!(
"handler '{}' already registered",
name
)));
}
self.handlers.insert(name, Arc::new(handler));
Ok(())
}
pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
let name = handler.name().to_string();
if self.handlers.contains_key(&name) {
return Err(EngineError::InvalidWorkflow(format!(
"handler '{}' already registered",
name
)));
}
self.handlers.insert(name, Arc::from(handler));
Ok(())
}
pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
self.handlers.get(name)
}
pub fn handler_names(&self) -> Vec<&str> {
self.handlers.keys().map(|s| s.as_str()).collect()
}
pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
self.handlers.get(name).map(|h| h.describe())
}
pub fn subscribe(
&mut self,
subscriber: impl EventSubscriber + 'static,
event_types: &[&'static str],
) {
self.event_publisher.subscribe(subscriber, event_types);
}
pub fn event_publisher(&self) -> &EventPublisher {
&self.event_publisher
}
#[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
pub async fn run_handler(
&self,
handler_name: &str,
trigger: TriggerKind,
payload: Value,
) -> Result<Run, EngineError> {
let handler = self
.handlers
.get(handler_name)
.ok_or_else(|| {
EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
})?
.clone();
let run = self
.store
.create_run(NewRun {
workflow_name: handler_name.to_string(),
trigger,
payload,
max_retries: 0,
})
.await?;
let run_id = run.id;
info!(run_id = %run_id, "run created");
self.store
.update_run_status(run_id, RunStatus::Running)
.await?;
#[cfg(feature = "prometheus")]
gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
let run_start = Instant::now();
let mut ctx = self.build_context(run_id);
let result = handler.execute(&mut ctx).await;
self.finalize_run(run_id, handler_name, result, &ctx, run_start)
.await
}
#[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
pub async fn enqueue_handler(
&self,
handler_name: &str,
trigger: TriggerKind,
payload: Value,
max_retries: u32,
) -> Result<Run, EngineError> {
if !self.handlers.contains_key(handler_name) {
return Err(EngineError::InvalidWorkflow(format!(
"no handler registered: {handler_name}"
)));
}
let run = self
.store
.create_run(NewRun {
workflow_name: handler_name.to_string(),
trigger,
payload,
max_retries,
})
.await?;
info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
Ok(run)
}
#[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
let run = self
.store
.get_run(run_id)
.await?
.ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
let handler = self
.handlers
.get(&run.workflow_name)
.ok_or_else(|| {
EngineError::InvalidWorkflow(format!(
"no handler registered: {}",
run.workflow_name
))
})?
.clone();
#[cfg(feature = "prometheus")]
gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
let run_start = Instant::now();
let mut ctx = self.build_context(run_id);
let result = handler.execute(&mut ctx).await;
self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
.await
}
#[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
self.execute_handler_run(run_id).await
}
#[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
let run = self
.store
.get_run(run_id)
.await?
.ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
let handler = self
.handlers
.get(&run.workflow_name)
.ok_or_else(|| {
EngineError::InvalidWorkflow(format!(
"no handler registered: {}",
run.workflow_name
))
})?
.clone();
info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
let run_start = Instant::now();
let mut ctx = self.build_context(run_id);
ctx.load_replay_steps().await?;
let result = handler.execute(&mut ctx).await;
self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
.await
}
async fn finalize_run(
&self,
run_id: Uuid,
workflow_name: &str,
result: Result<(), EngineError>,
ctx: &WorkflowContext,
run_start: Instant,
) -> Result<Run, EngineError> {
let total_duration = run_start.elapsed().as_millis() as u64;
let completed_at = Utc::now();
let final_status;
let final_run;
match result {
Ok(()) => {
final_status = RunStatus::Completed;
final_run = self
.store
.update_run_returning(
run_id,
RunUpdate {
status: Some(RunStatus::Completed),
cost_usd: Some(ctx.total_cost_usd()),
duration_ms: Some(total_duration),
completed_at: Some(completed_at),
..RunUpdate::default()
},
)
.await?;
info!(
run_id = %run_id,
cost_usd = %ctx.total_cost_usd(),
duration_ms = total_duration,
"run completed"
);
}
Err(EngineError::ApprovalRequired {
run_id: approval_run_id,
step_id,
ref message,
}) => {
final_status = RunStatus::AwaitingApproval;
final_run = self
.store
.update_run_returning(
run_id,
RunUpdate {
status: Some(RunStatus::AwaitingApproval),
cost_usd: Some(ctx.total_cost_usd()),
duration_ms: Some(total_duration),
..RunUpdate::default()
},
)
.await?;
info!(
run_id = %approval_run_id,
step_id = %step_id,
message = %message,
"run awaiting approval"
);
}
Err(err) => {
final_status = RunStatus::Failed;
if let Err(store_err) = self
.store
.update_run(
run_id,
RunUpdate {
status: Some(RunStatus::Failed),
error: Some(err.to_string()),
cost_usd: Some(ctx.total_cost_usd()),
duration_ms: Some(total_duration),
completed_at: Some(completed_at),
..RunUpdate::default()
},
)
.await
{
error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
}
error!(run_id = %run_id, error = %err, "run failed");
self.publish_run_status_changed(
workflow_name,
run_id,
final_status,
Some(err.to_string()),
ctx,
total_duration,
);
#[cfg(feature = "prometheus")]
self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
return Err(err);
}
}
self.publish_run_status_changed(
workflow_name,
run_id,
final_status,
None,
ctx,
total_duration,
);
#[cfg(feature = "prometheus")]
self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
Ok(final_run)
}
#[cfg(feature = "prometheus")]
fn emit_run_metrics(
&self,
workflow_name: &str,
status: RunStatus,
duration_ms: u64,
ctx: &WorkflowContext,
) {
let status_str = status.to_string();
let wf = workflow_name.to_string();
counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
.record(duration_ms as f64 / 1000.0);
histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
ctx.total_cost_usd()
.to_string()
.parse::<f64>()
.unwrap_or(0.0),
);
gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
}
fn publish_run_status_changed(
&self,
workflow_name: &str,
run_id: Uuid,
to: RunStatus,
error: Option<String>,
ctx: &WorkflowContext,
duration_ms: u64,
) {
let now = Utc::now();
let cost_usd = ctx.total_cost_usd();
let wf = workflow_name.to_string();
self.event_publisher.publish(Event::RunStatusChanged {
run_id,
workflow_name: wf.clone(),
from: RunStatus::Running,
to,
error: error.clone(),
cost_usd,
duration_ms,
at: now,
});
if to == RunStatus::Failed {
self.event_publisher.publish(Event::RunFailed {
run_id,
workflow_name: wf,
error,
cost_usd,
duration_ms,
at: now,
});
}
}
}
impl fmt::Debug for Engine {
    /// Shows only the registered handler names; the store, provider, and
    /// publisher fields are elided (hence `finish_non_exhaustive`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let handler_names: Vec<&String> = self.handlers.keys().collect();
        let mut builder = f.debug_struct("Engine");
        builder.field("handlers", &handler_names);
        builder.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ShellConfig;
    use crate::handler::{HandlerFuture, WorkflowHandler};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::StepStatus;
    use serde_json::json;

    /// Minimal handler: one shell step that echoes "hello".
    struct EchoWorkflow;
    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }
        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
            }
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    /// Handler whose single shell step exits non-zero, failing the run.
    struct FailingWorkflow;
    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }

    /// Builds an engine on an in-memory store with a replay-mode provider,
    /// so tests never hit a live agent backend.
    fn create_test_engine() -> Engine {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        Engine::new(store, provider)
    }

    /// A new engine starts with no registered handlers.
    #[test]
    fn engine_new_creates_instance() {
        let engine = create_test_engine();
        assert_eq!(engine.handler_names().len(), 0);
    }

    /// Registering a handler makes it visible via `handler_names`.
    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
        assert!(engine.handler_names().contains(&"echo-workflow"));
    }

    /// Registering the same handler name twice is rejected.
    #[test]
    fn engine_register_duplicate_returns_error() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_err());
    }

    /// `get_handler` returns Some for a registered name.
    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let handler = engine.get_handler("echo-workflow");
        assert!(handler.is_some());
    }

    /// `get_handler` returns None for an unknown name.
    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        let handler = engine.get_handler("nonexistent");
        assert!(handler.is_none());
    }

    /// `handler_names` lists every registered handler.
    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();
        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"echo-workflow"));
        assert!(names.contains(&"failing-workflow"));
    }

    /// `handler_info` surfaces the handler's self-description.
    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.description, "A simple workflow that echoes hello");
    }

    /// Running an unregistered workflow fails with `InvalidWorkflow`.
    #[tokio::test]
    async fn engine_unknown_workflow_returns_error() {
        let engine = create_test_engine();
        let result = engine
            .run_handler("unknown", TriggerKind::Manual, json!({}))
            .await;
        assert!(result.is_err());
        match result {
            Err(EngineError::InvalidWorkflow(msg)) => {
                assert!(msg.contains("no handler registered"));
            }
            _ => panic!("expected InvalidWorkflow error"),
        }
    }

    /// Enqueueing creates a Pending run without executing it.
    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let run = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.workflow_name, "echo-workflow");
    }

    /// `register_boxed` accepts a pre-boxed trait object.
    #[tokio::test]
    async fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
        let result = engine.register_boxed(handler);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
    }

    /// Smoke test for the `store()` / `provider()` accessors.
    #[tokio::test]
    async fn engine_store_and_provider_accessors() {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        let engine = Engine::new(store.clone(), provider.clone());
        let _ = engine.store();
        let _ = engine.provider();
    }

    // --- Operation-step tests -------------------------------------------
    use crate::operation::Operation;
    use ironflow_store::models::StepKind;
    use std::future::Future;
    use std::pin::Pin;

    /// Fake external operation that pretends to create a GitLab issue.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }
    impl Operation for FakeGitlabOp {
        fn kind(&self) -> &str {
            "gitlab"
        }
        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move {
                // Fixed issue_id so assertions can pin the exact output.
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }
        fn input(&self) -> Option<Value> {
            Some(json!({
                "project_id": self.project_id,
                "title": self.title,
            }))
        }
    }

    /// Operation that always fails, to exercise the failure path.
    struct FailingOp;
    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }
        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
        }
    }

    /// Handler running a single custom operation step.
    struct OperationWorkflow;
    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let op = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &op).await?;
                Ok(())
            })
        }
    }

    /// Handler whose operation step fails.
    struct FailingOperationWorkflow;
    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }

    /// Handler mixing a shell step with an operation step.
    struct MixedWorkflow;
    impl WorkflowHandler for MixedWorkflow {
        fn name(&self) -> &str {
            "mixed-workflow"
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                let op = FakeGitlabOp {
                    project_id: 456,
                    title: "Deploy done".to_string(),
                };
                let result = ctx.operation("notify-gitlab", &op).await?;
                // The operation's output is visible to the handler.
                assert_eq!(result.output["issue_id"], 42);
                Ok(())
            })
        }
    }

    /// A successful operation step persists its kind, input, and output.
    #[tokio::test]
    async fn operation_step_happy_path() {
        let mut engine = create_test_engine();
        engine.register(OperationWorkflow).unwrap();
        let run = engine
            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Completed);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].name, "create-issue");
        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(
            steps[0].status.state,
            ironflow_store::models::StepStatus::Completed
        );
        let output = steps[0].output.as_ref().unwrap();
        assert_eq!(output["issue_id"], 42);
        assert_eq!(output["project_id"], 123);
        let input = steps[0].input.as_ref().unwrap();
        assert_eq!(input["project_id"], 123);
        assert_eq!(input["title"], "Bug report");
    }

    /// A failing operation step surfaces as a run error.
    #[tokio::test]
    async fn operation_step_failure_marks_run_failed() {
        let mut engine = create_test_engine();
        engine.register(FailingOperationWorkflow).unwrap();
        let result = engine
            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
            .await;
        assert!(result.is_err());
    }

    /// Shell and operation steps are recorded in execution order.
    #[tokio::test]
    async fn operation_mixed_with_shell_steps() {
        let mut engine = create_test_engine();
        engine.register(MixedWorkflow).unwrap();
        let run = engine
            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Completed);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].position, 0);
        assert_eq!(steps[1].position, 1);
    }

    // --- Approval / resume tests ----------------------------------------
    use crate::config::ApprovalConfig;

    /// Handler with one approval gate between build and deploy.
    struct SingleApprovalWorkflow;
    impl WorkflowHandler for SingleApprovalWorkflow {
        fn name(&self) -> &str {
            "single-approval"
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
                ctx.shell("deploy", ShellConfig::new("echo deployed"))
                    .await?;
                Ok(())
            })
        }
    }

    /// Handler with two approval gates (staging, then prod).
    struct DoubleApprovalWorkflow;
    impl WorkflowHandler for DoubleApprovalWorkflow {
        fn name(&self) -> &str {
            "double-approval"
        }
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
                    .await?;
                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
                    .await?;
                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
                    .await?;
                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
                    .await?;
                Ok(())
            })
        }
    }

    /// Hitting an approval gate pauses the run in AwaitingApproval.
    #[tokio::test]
    async fn approval_pauses_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();
        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        // Only the steps up to and including the gate are recorded.
        assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
    }

    /// Resuming after approval replays prior steps and finishes the run.
    #[tokio::test]
    async fn approval_resume_completes_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();
        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
        // Simulate the approval being granted before resuming.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();
        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::Completed);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        // Replay must not duplicate steps: build, gate, deploy — once each.
        assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].name, "gate");
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::Completed);
        assert_eq!(steps[2].name, "deploy");
        assert_eq!(steps[2].status.state, StepStatus::Completed);
    }

    /// Two approval gates require two approve+resume cycles.
    #[tokio::test]
    async fn double_approval_two_resumes() {
        let mut engine = create_test_engine();
        engine.register(DoubleApprovalWorkflow).unwrap();
        let run = engine
            .run_handler("double-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);
        // Approve the staging gate and resume: pauses again at prod gate.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();
        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 4);
        // Approve the prod gate and resume: run completes.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();
        let final_run = engine.resume_run(run.id).await.unwrap();
        assert_eq!(final_run.status.state, RunStatus::Completed);
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 5);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[1].name, "staging-gate");
        assert_eq!(steps[2].name, "deploy-staging");
        assert_eq!(steps[3].name, "prod-gate");
        assert_eq!(steps[4].name, "deploy-prod");
        for step in &steps {
            assert_eq!(step.status.state, StepStatus::Completed);
        }
    }
}