1use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15
16use chrono::Utc;
17use serde_json::Value;
18use tracing::{error, info};
19use uuid::Uuid;
20
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25
26use crate::config::StepConfig;
27use crate::context::WorkflowContext;
28use crate::error::EngineError;
29use crate::handler::{WorkflowHandler, WorkflowInfo};
30use crate::workflow::WorkflowDef;
31
32pub struct Engine {
64 store: Arc<dyn RunStore>,
65 provider: Arc<dyn AgentProvider>,
66 handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
67}
68
69impl Engine {
70 pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
86 Self {
87 store,
88 provider,
89 handlers: HashMap::new(),
90 }
91 }
92
93 pub fn store(&self) -> &Arc<dyn RunStore> {
95 &self.store
96 }
97
98 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
100 &self.provider
101 }
102
103 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
105 let handlers = self.handlers.clone();
106 let resolver: crate::context::HandlerResolver =
107 Arc::new(move |name: &str| handlers.get(name).cloned());
108 WorkflowContext::with_handler_resolver(
109 run_id,
110 self.store.clone(),
111 self.provider.clone(),
112 resolver,
113 )
114 }
115
116 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
160 let name = handler.name().to_string();
161 if self.handlers.contains_key(&name) {
162 return Err(EngineError::InvalidWorkflow(format!(
163 "handler '{}' already registered",
164 name
165 )));
166 }
167 self.handlers.insert(name, Arc::new(handler));
168 Ok(())
169 }
170
171 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
178 let name = handler.name().to_string();
179 if self.handlers.contains_key(&name) {
180 return Err(EngineError::InvalidWorkflow(format!(
181 "handler '{}' already registered",
182 name
183 )));
184 }
185 self.handlers.insert(name, Arc::from(handler));
186 Ok(())
187 }
188
189 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
191 self.handlers.get(name)
192 }
193
194 pub fn handler_names(&self) -> Vec<&str> {
196 self.handlers.keys().map(|s| s.as_str()).collect()
197 }
198
199 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
201 self.handlers.get(name).map(|h| h.describe())
202 }
203
204 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
234 pub async fn run_handler(
235 &self,
236 handler_name: &str,
237 trigger: TriggerKind,
238 payload: Value,
239 ) -> Result<Run, EngineError> {
240 let handler = self
241 .handlers
242 .get(handler_name)
243 .ok_or_else(|| {
244 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
245 })?
246 .clone();
247
248 let run = self
249 .store
250 .create_run(NewRun {
251 workflow_name: handler_name.to_string(),
252 trigger,
253 payload,
254 max_retries: 0,
255 })
256 .await?;
257
258 let run_id = run.id;
259 info!(run_id = %run_id, "run created");
260
261 self.store
262 .update_run_status(run_id, RunStatus::Running)
263 .await?;
264
265 let run_start = Instant::now();
266 let mut ctx = self.build_context(run_id);
267
268 let result = handler.execute(&mut ctx).await;
269 self.finalize_run(run_id, result, &ctx, run_start).await
270 }
271
272 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
281 pub async fn enqueue_handler(
282 &self,
283 handler_name: &str,
284 trigger: TriggerKind,
285 payload: Value,
286 max_retries: u32,
287 ) -> Result<Run, EngineError> {
288 if !self.handlers.contains_key(handler_name) {
289 return Err(EngineError::InvalidWorkflow(format!(
290 "no handler registered: {handler_name}"
291 )));
292 }
293
294 let run = self
295 .store
296 .create_run(NewRun {
297 workflow_name: handler_name.to_string(),
298 trigger,
299 payload,
300 max_retries,
301 })
302 .await?;
303
304 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
305 Ok(run)
306 }
307
308 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
317 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
318 let run = self
319 .store
320 .get_run(run_id)
321 .await?
322 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
323
324 let handler = self
325 .handlers
326 .get(&run.workflow_name)
327 .ok_or_else(|| {
328 EngineError::InvalidWorkflow(format!(
329 "no handler registered: {}",
330 run.workflow_name
331 ))
332 })?
333 .clone();
334
335 let run_start = Instant::now();
336 let mut ctx = self.build_context(run_id);
337
338 let result = handler.execute(&mut ctx).await;
339 self.finalize_run(run_id, result, &ctx, run_start).await
340 }
341
342 #[tracing::instrument(name = "engine.run_inline", skip_all, fields(workflow = %workflow.name))]
355 pub async fn run_inline(
356 &self,
357 workflow: &WorkflowDef,
358 trigger: TriggerKind,
359 payload: Value,
360 ) -> Result<Run, EngineError> {
361 let run = self
362 .store
363 .create_run(NewRun {
364 workflow_name: workflow.name.clone(),
365 trigger,
366 payload,
367 max_retries: 0,
368 })
369 .await?;
370
371 let run_id = run.id;
372 info!(run_id = %run_id, "run created");
373
374 self.store
375 .update_run_status(run_id, RunStatus::Running)
376 .await?;
377
378 let run_start = Instant::now();
379 let mut ctx = self.build_context(run_id);
380
381 let result = async {
383 for step_def in &workflow.steps {
384 match &step_def.config {
385 StepConfig::Shell(cfg) => {
386 ctx.shell(&step_def.name, cfg.clone()).await?;
387 }
388 StepConfig::Http(cfg) => {
389 ctx.http(&step_def.name, cfg.clone()).await?;
390 }
391 StepConfig::Agent(cfg) => {
392 ctx.agent(&step_def.name, cfg.clone()).await?;
393 }
394 StepConfig::Workflow(cfg) => {
395 let handler = self.handlers.get(&cfg.workflow_name).ok_or_else(|| {
396 EngineError::InvalidWorkflow(format!(
397 "no handler registered: {}",
398 cfg.workflow_name
399 ))
400 })?;
401 ctx.workflow(handler.as_ref(), cfg.payload.clone()).await?;
402 }
403 }
404 }
405 Ok::<(), EngineError>(())
406 }
407 .await;
408
409 self.finalize_run(run_id, result, &ctx, run_start).await
410 }
411
412 #[tracing::instrument(name = "engine.enqueue", skip_all, fields(workflow = %workflow.name))]
416 pub async fn enqueue(
417 &self,
418 workflow: &WorkflowDef,
419 trigger: TriggerKind,
420 payload: Value,
421 max_retries: u32,
422 ) -> Result<Run, EngineError> {
423 let enriched_payload = serde_json::json!({
424 "workflow": serde_json::to_value(workflow)?,
425 "original_payload": payload,
426 });
427
428 let run = self
429 .store
430 .create_run(NewRun {
431 workflow_name: workflow.name.clone(),
432 trigger,
433 payload: enriched_payload,
434 max_retries,
435 })
436 .await?;
437
438 info!(run_id = %run.id, workflow = %workflow.name, "run enqueued");
439 Ok(run)
440 }
441
442 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
446 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
447 let run = self
449 .store
450 .get_run(run_id)
451 .await?
452 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
453
454 if self.handlers.contains_key(&run.workflow_name) {
455 return self.execute_handler_run(run_id).await;
456 }
457
458 let workflow: WorkflowDef = serde_json::from_value(
460 run.payload
461 .get("workflow")
462 .cloned()
463 .ok_or_else(|| EngineError::StepConfig("missing 'workflow' in payload".into()))?,
464 )
465 .map_err(|e| EngineError::StepConfig(format!("invalid workflow in payload: {e}")))?;
466
467 let run_start = Instant::now();
468 let mut ctx = self.build_context(run_id);
469
470 let result = async {
472 for step_def in &workflow.steps {
473 match &step_def.config {
474 StepConfig::Shell(cfg) => {
475 ctx.shell(&step_def.name, cfg.clone()).await?;
476 }
477 StepConfig::Http(cfg) => {
478 ctx.http(&step_def.name, cfg.clone()).await?;
479 }
480 StepConfig::Agent(cfg) => {
481 ctx.agent(&step_def.name, cfg.clone()).await?;
482 }
483 StepConfig::Workflow(cfg) => {
484 let handler = self.handlers.get(&cfg.workflow_name).ok_or_else(|| {
485 EngineError::InvalidWorkflow(format!(
486 "no handler registered: {}",
487 cfg.workflow_name
488 ))
489 })?;
490 ctx.workflow(handler.as_ref(), cfg.payload.clone()).await?;
491 }
492 }
493 }
494 Ok::<(), EngineError>(())
495 }
496 .await;
497
498 self.finalize_run(run_id, result, &ctx, run_start).await
499 }
500
501 async fn finalize_run(
510 &self,
511 run_id: Uuid,
512 result: Result<(), EngineError>,
513 ctx: &WorkflowContext,
514 run_start: Instant,
515 ) -> Result<Run, EngineError> {
516 let total_duration = run_start.elapsed().as_millis() as u64;
517 let completed_at = Utc::now();
518
519 match result {
520 Ok(()) => {
521 self.store
522 .update_run(
523 run_id,
524 RunUpdate {
525 status: Some(RunStatus::Completed),
526 cost_usd: Some(ctx.total_cost_usd()),
527 duration_ms: Some(total_duration),
528 completed_at: Some(completed_at),
529 ..RunUpdate::default()
530 },
531 )
532 .await?;
533
534 info!(
535 run_id = %run_id,
536 cost_usd = %ctx.total_cost_usd(),
537 duration_ms = total_duration,
538 "run completed"
539 );
540 }
541 Err(err) => {
542 if let Err(store_err) = self
543 .store
544 .update_run(
545 run_id,
546 RunUpdate {
547 status: Some(RunStatus::Failed),
548 error: Some(err.to_string()),
549 cost_usd: Some(ctx.total_cost_usd()),
550 duration_ms: Some(total_duration),
551 completed_at: Some(completed_at),
552 ..RunUpdate::default()
553 },
554 )
555 .await
556 {
557 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
558 }
559
560 error!(run_id = %run_id, error = %err, "run failed");
561 return Err(err);
562 }
563 }
564
565 self.store
566 .get_run(run_id)
567 .await?
568 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
569 }
570}
571
572impl std::fmt::Debug for Engine {
573 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574 f.debug_struct("Engine")
575 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
576 .finish_non_exhaustive()
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use super::*;
583 use crate::config::ShellConfig;
584 use crate::handler::{HandlerFuture, WorkflowHandler};
585 use crate::workflow::Workflow;
586 use ironflow_core::providers::claude::ClaudeCodeProvider;
587 use ironflow_core::providers::record_replay::RecordReplayProvider;
588 use ironflow_store::memory::InMemoryStore;
589 use serde_json::json;
590
591 struct EchoWorkflow;
593
594 impl WorkflowHandler for EchoWorkflow {
595 fn name(&self) -> &str {
596 "echo-workflow"
597 }
598
599 fn describe(&self) -> WorkflowInfo {
600 WorkflowInfo {
601 description: "A simple workflow that echoes hello".to_string(),
602 source_code: None,
603 sub_workflows: Vec::new(),
604 }
605 }
606
607 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
608 Box::pin(async move {
609 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
610 Ok(())
611 })
612 }
613 }
614
615 struct FailingWorkflow;
617
618 impl WorkflowHandler for FailingWorkflow {
619 fn name(&self) -> &str {
620 "failing-workflow"
621 }
622
623 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
624 Box::pin(async move {
625 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
626 Ok(())
627 })
628 }
629 }
630
631 fn create_test_engine() -> Engine {
632 let store = Arc::new(InMemoryStore::new());
633 let inner = ClaudeCodeProvider::new();
634 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
635 inner,
636 "/tmp/ironflow-fixtures",
637 ));
638 Engine::new(store, provider)
639 }
640
641 #[test]
642 fn engine_new_creates_instance() {
643 let engine = create_test_engine();
644 assert_eq!(engine.handler_names().len(), 0);
645 }
646
647 #[test]
648 fn engine_register_handler() {
649 let mut engine = create_test_engine();
650 let result = engine.register(EchoWorkflow);
651 assert!(result.is_ok());
652 assert_eq!(engine.handler_names().len(), 1);
653 assert!(engine.handler_names().contains(&"echo-workflow"));
654 }
655
656 #[test]
657 fn engine_register_duplicate_returns_error() {
658 let mut engine = create_test_engine();
659 engine.register(EchoWorkflow).unwrap();
660 let result = engine.register(EchoWorkflow);
661 assert!(result.is_err());
662 }
663
664 #[test]
665 fn engine_get_handler_found() {
666 let mut engine = create_test_engine();
667 engine.register(EchoWorkflow).unwrap();
668 let handler = engine.get_handler("echo-workflow");
669 assert!(handler.is_some());
670 }
671
672 #[test]
673 fn engine_get_handler_not_found() {
674 let engine = create_test_engine();
675 let handler = engine.get_handler("nonexistent");
676 assert!(handler.is_none());
677 }
678
679 #[test]
680 fn engine_handler_names_lists_all() {
681 let mut engine = create_test_engine();
682 engine.register(EchoWorkflow).unwrap();
683 engine.register(FailingWorkflow).unwrap();
684 let names = engine.handler_names();
685 assert_eq!(names.len(), 2);
686 assert!(names.contains(&"echo-workflow"));
687 assert!(names.contains(&"failing-workflow"));
688 }
689
690 #[test]
691 fn engine_handler_info_returns_description() {
692 let mut engine = create_test_engine();
693 engine.register(EchoWorkflow).unwrap();
694 let info = engine.handler_info("echo-workflow");
695 assert!(info.is_some());
696 let info = info.unwrap();
697 assert_eq!(info.description, "A simple workflow that echoes hello");
698 }
699
700 #[tokio::test]
701 async fn engine_unknown_workflow_returns_error() {
702 let engine = create_test_engine();
703 let result = engine
704 .run_handler("unknown", TriggerKind::Manual, json!({}))
705 .await;
706 assert!(result.is_err());
707 match result {
708 Err(EngineError::InvalidWorkflow(msg)) => {
709 assert!(msg.contains("no handler registered"));
710 }
711 _ => panic!("expected InvalidWorkflow error"),
712 }
713 }
714
715 #[tokio::test]
716 async fn engine_run_inline_happy_path() {
717 let engine = create_test_engine();
718 let workflow = Workflow::new("simple")
719 .shell("test", ShellConfig::new("echo hello"))
720 .build()
721 .unwrap();
722
723 let result = engine
724 .run_inline(&workflow, TriggerKind::Manual, json!({}))
725 .await;
726 assert!(result.is_ok());
727 let run = result.unwrap();
728 assert_eq!(run.status.state, RunStatus::Completed);
729 }
730
731 #[tokio::test]
732 async fn engine_run_inline_step_failure_marks_failed() {
733 let engine = create_test_engine();
734 let workflow = Workflow::new("failing")
735 .shell("fail", ShellConfig::new("exit 1"))
736 .build()
737 .unwrap();
738
739 let result = engine
740 .run_inline(&workflow, TriggerKind::Manual, json!({}))
741 .await;
742 assert!(result.is_err());
743 }
744
745 #[tokio::test]
746 async fn engine_enqueue_creates_pending_run() {
747 let engine = create_test_engine();
748 let workflow = Workflow::new("queued")
749 .shell("test", ShellConfig::new("echo queued"))
750 .build()
751 .unwrap();
752
753 let run = engine
754 .enqueue(&workflow, TriggerKind::Manual, json!({}), 3)
755 .await
756 .unwrap();
757 assert_eq!(run.status.state, RunStatus::Pending);
758 assert_eq!(run.max_retries, 3);
759 }
760
761 #[tokio::test]
762 async fn engine_enqueue_handler_creates_pending_run() {
763 let mut engine = create_test_engine();
764 engine.register(EchoWorkflow).unwrap();
765
766 let run = engine
767 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
768 .await
769 .unwrap();
770 assert_eq!(run.status.state, RunStatus::Pending);
771 assert_eq!(run.workflow_name, "echo-workflow");
772 }
773
774 #[tokio::test]
775 async fn engine_register_boxed() {
776 let mut engine = create_test_engine();
777 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
778 let result = engine.register_boxed(handler);
779 assert!(result.is_ok());
780 assert_eq!(engine.handler_names().len(), 1);
781 }
782
783 #[tokio::test]
784 async fn engine_store_and_provider_accessors() {
785 let store = Arc::new(InMemoryStore::new());
786 let inner = ClaudeCodeProvider::new();
787 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
788 inner,
789 "/tmp/ironflow-fixtures",
790 ));
791 let engine = Engine::new(store.clone(), provider.clone());
792
793 let _ = engine.store();
795 let _ = engine.provider();
796 }
797
798 use crate::operation::Operation;
803 use ironflow_store::models::StepKind;
804 use std::future::Future;
805 use std::pin::Pin;
806
807 struct FakeGitlabOp {
808 project_id: u64,
809 title: String,
810 }
811
812 impl Operation for FakeGitlabOp {
813 fn kind(&self) -> &str {
814 "gitlab"
815 }
816
817 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
818 Box::pin(async move {
819 Ok(json!({
820 "issue_id": 42,
821 "project_id": self.project_id,
822 "title": self.title,
823 }))
824 })
825 }
826
827 fn input(&self) -> Option<Value> {
828 Some(json!({
829 "project_id": self.project_id,
830 "title": self.title,
831 }))
832 }
833 }
834
835 struct FailingOp;
836
837 impl Operation for FailingOp {
838 fn kind(&self) -> &str {
839 "broken-service"
840 }
841
842 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
843 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
844 }
845 }
846
847 struct OperationWorkflow;
848
849 impl WorkflowHandler for OperationWorkflow {
850 fn name(&self) -> &str {
851 "operation-workflow"
852 }
853
854 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
855 Box::pin(async move {
856 let op = FakeGitlabOp {
857 project_id: 123,
858 title: "Bug report".to_string(),
859 };
860 ctx.operation("create-issue", &op).await?;
861 Ok(())
862 })
863 }
864 }
865
866 struct FailingOperationWorkflow;
867
868 impl WorkflowHandler for FailingOperationWorkflow {
869 fn name(&self) -> &str {
870 "failing-operation-workflow"
871 }
872
873 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
874 Box::pin(async move {
875 ctx.operation("broken-call", &FailingOp).await?;
876 Ok(())
877 })
878 }
879 }
880
881 struct MixedWorkflow;
882
883 impl WorkflowHandler for MixedWorkflow {
884 fn name(&self) -> &str {
885 "mixed-workflow"
886 }
887
888 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
889 Box::pin(async move {
890 ctx.shell("build", ShellConfig::new("echo built")).await?;
891 let op = FakeGitlabOp {
892 project_id: 456,
893 title: "Deploy done".to_string(),
894 };
895 let result = ctx.operation("notify-gitlab", &op).await?;
896 assert_eq!(result.output["issue_id"], 42);
897 Ok(())
898 })
899 }
900 }
901
902 #[tokio::test]
903 async fn operation_step_happy_path() {
904 let mut engine = create_test_engine();
905 engine.register(OperationWorkflow).unwrap();
906
907 let run = engine
908 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
909 .await
910 .unwrap();
911
912 assert_eq!(run.status.state, RunStatus::Completed);
913
914 let steps = engine.store().list_steps(run.id).await.unwrap();
915
916 assert_eq!(steps.len(), 1);
917 assert_eq!(steps[0].name, "create-issue");
918 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
919 assert_eq!(
920 steps[0].status.state,
921 ironflow_store::models::StepStatus::Completed
922 );
923
924 let output = steps[0].output.as_ref().unwrap();
925 assert_eq!(output["issue_id"], 42);
926 assert_eq!(output["project_id"], 123);
927
928 let input = steps[0].input.as_ref().unwrap();
929 assert_eq!(input["project_id"], 123);
930 assert_eq!(input["title"], "Bug report");
931 }
932
933 #[tokio::test]
934 async fn operation_step_failure_marks_run_failed() {
935 let mut engine = create_test_engine();
936 engine.register(FailingOperationWorkflow).unwrap();
937
938 let result = engine
939 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
940 .await;
941
942 assert!(result.is_err());
943 }
944
945 #[tokio::test]
946 async fn operation_mixed_with_shell_steps() {
947 let mut engine = create_test_engine();
948 engine.register(MixedWorkflow).unwrap();
949
950 let run = engine
951 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
952 .await
953 .unwrap();
954
955 assert_eq!(run.status.state, RunStatus::Completed);
956
957 let steps = engine.store().list_steps(run.id).await.unwrap();
958
959 assert_eq!(steps.len(), 2);
960 assert_eq!(steps[0].kind, StepKind::Shell);
961 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
962 assert_eq!(steps[0].position, 0);
963 assert_eq!(steps[1].position, 1);
964 }
965}