1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{
24 NewRun, Run, RunStatus, RunUpdate, StepStatus, StepUpdate, TriggerKind,
25};
26use ironflow_store::store::Store;
27#[cfg(feature = "prometheus")]
28use metrics::{counter, gauge, histogram};
29
30use crate::context::WorkflowContext;
31use crate::error::EngineError;
32use crate::handler::{WorkflowHandler, WorkflowInfo};
33use crate::log_sender::LogSender;
34use crate::notify::{Event, EventPublisher, EventSubscriber};
35
/// Registry of workflow handlers together with the shared services used to run them.
pub struct Engine {
    /// Shared store used to create, read and update runs and steps.
    store: Arc<dyn Store>,
    /// Agent provider handed to every workflow context built by this engine.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by the name each handler reports.
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher used to emit run status events.
    event_publisher: EventPublisher,
    /// Optional log sender; when set, a clone is installed on each workflow context.
    log_sender: Option<LogSender>,
}
83
84fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
94 let reject = |reason: &str| {
95 Err(EngineError::InvalidWorkflow(format!(
96 "handler '{handler_name}' has invalid category '{category}': {reason}"
97 )))
98 };
99
100 if category.is_empty() {
101 return reject("empty category");
102 }
103 if category.starts_with('/') {
104 return reject("leading '/'");
105 }
106 if category.ends_with('/') {
107 return reject("trailing '/'");
108 }
109 for segment in category.split('/') {
110 if segment.is_empty() {
111 return reject("empty segment (double '/')");
112 }
113 if segment.trim().is_empty() {
114 return reject("whitespace-only segment");
115 }
116 }
117 Ok(())
118}
119
120impl Engine {
121 pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
137 Self {
138 store,
139 provider,
140 handlers: HashMap::new(),
141 event_publisher: EventPublisher::new(),
142 log_sender: None,
143 }
144 }
145
    /// Installs the log sender that is cloned into workflow contexts built afterwards.
    ///
    /// Replaces any sender set previously.
    pub fn set_log_sender(&mut self, sender: LogSender) {
        self.log_sender = Some(sender);
    }
154
    /// Returns the store this engine was constructed with.
    pub fn store(&self) -> &Arc<dyn Store> {
        &self.store
    }
159
    /// Returns the agent provider this engine was constructed with.
    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
        &self.provider
    }
164
165 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
167 let handlers = self.handlers.clone();
168 let resolver: crate::context::HandlerResolver =
169 Arc::new(move |name: &str| handlers.get(name).cloned());
170 let mut ctx = WorkflowContext::with_handler_resolver(
171 run_id,
172 self.store.clone(),
173 self.provider.clone(),
174 resolver,
175 );
176 if let Some(ref sender) = self.log_sender {
177 ctx.set_log_sender(sender.clone());
178 }
179 ctx
180 }
181
182 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
226 let name = handler.name().to_string();
227 if self.handlers.contains_key(&name) {
228 return Err(EngineError::InvalidWorkflow(format!(
229 "handler '{}' already registered",
230 name
231 )));
232 }
233 if let Some(category) = handler.category() {
234 validate_category(&name, category)?;
235 }
236 self.handlers.insert(name, Arc::new(handler));
237 Ok(())
238 }
239
240 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
247 let name = handler.name().to_string();
248 if self.handlers.contains_key(&name) {
249 return Err(EngineError::InvalidWorkflow(format!(
250 "handler '{}' already registered",
251 name
252 )));
253 }
254 if let Some(category) = handler.category() {
255 validate_category(&name, category)?;
256 }
257 self.handlers.insert(name, Arc::from(handler));
258 Ok(())
259 }
260
    /// Looks up a registered handler by name, returning `None` when no handler has that name.
    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
        self.handlers.get(name)
    }
265
266 pub fn handler_names(&self) -> Vec<&str> {
268 self.handlers.keys().map(|s| s.as_str()).collect()
269 }
270
271 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
273 self.handlers.get(name).map(|h| h.describe())
274 }
275
    /// Registers `subscriber` with the engine's event publisher for the given event types.
    ///
    /// This forwards directly to the publisher's own `subscribe`.
    pub fn subscribe(
        &mut self,
        subscriber: impl EventSubscriber + 'static,
        event_types: &[&'static str],
    ) {
        self.event_publisher.subscribe(subscriber, event_types);
    }
307
    /// Returns the event publisher owned by this engine.
    pub fn event_publisher(&self) -> &EventPublisher {
        &self.event_publisher
    }
315
316 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
346 pub async fn run_handler(
347 &self,
348 handler_name: &str,
349 trigger: TriggerKind,
350 payload: Value,
351 ) -> Result<Run, EngineError> {
352 let handler = self
353 .handlers
354 .get(handler_name)
355 .ok_or_else(|| {
356 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
357 })?
358 .clone();
359
360 let handler_version = handler.version().map(str::to_string);
361 let run = self
362 .store
363 .create_run(NewRun {
364 workflow_name: handler_name.to_string(),
365 trigger,
366 payload,
367 max_retries: 0,
368 handler_version,
369 labels: handler.default_labels(),
370 scheduled_at: None,
371 })
372 .await?;
373
374 let run_id = run.id;
375 info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
376
377 self.store
378 .update_run_status(run_id, RunStatus::Running)
379 .await?;
380
381 #[cfg(feature = "prometheus")]
382 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
383
384 let run_start = Instant::now();
385 let mut ctx = self.build_context(run_id);
386
387 let result = handler.execute(&mut ctx).await;
388 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
389 .await
390 }
391
392 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
401 pub async fn enqueue_handler(
402 &self,
403 handler_name: &str,
404 trigger: TriggerKind,
405 payload: Value,
406 max_retries: u32,
407 ) -> Result<Run, EngineError> {
408 self.enqueue_handler_with_options(
409 handler_name,
410 trigger,
411 payload,
412 max_retries,
413 HashMap::new(),
414 None,
415 )
416 .await
417 }
418
419 #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
425 pub async fn enqueue_handler_with_options(
426 &self,
427 handler_name: &str,
428 trigger: TriggerKind,
429 payload: Value,
430 max_retries: u32,
431 labels: HashMap<String, String>,
432 scheduled_at: Option<DateTime<Utc>>,
433 ) -> Result<Run, EngineError> {
434 let handler = self.handlers.get(handler_name).ok_or_else(|| {
435 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
436 })?;
437
438 let handler_version = handler.version().map(str::to_string);
439 let mut merged_labels = handler.default_labels();
440 merged_labels.extend(labels);
441
442 let run = self
443 .store
444 .create_run(NewRun {
445 workflow_name: handler_name.to_string(),
446 trigger,
447 payload,
448 max_retries,
449 handler_version,
450 labels: merged_labels,
451 scheduled_at,
452 })
453 .await?;
454
455 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
456 Ok(run)
457 }
458
459 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
468 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
469 let run = self
470 .store
471 .get_run(run_id)
472 .await?
473 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
474
475 let handler = self
476 .handlers
477 .get(&run.workflow_name)
478 .ok_or_else(|| {
479 EngineError::InvalidWorkflow(format!(
480 "no handler registered: {}",
481 run.workflow_name
482 ))
483 })?
484 .clone();
485
486 #[cfg(feature = "prometheus")]
487 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
488
489 let run_start = Instant::now();
490 let mut ctx = self.build_context(run_id);
491
492 let result = handler.execute(&mut ctx).await;
493 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
494 .await
495 }
496
    /// Executes an existing run by delegating to [`Engine::execute_handler_run`].
    ///
    /// # Errors
    ///
    /// Same as [`Engine::execute_handler_run`].
    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        self.execute_handler_run(run_id).await
    }
508
509 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
523 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
524 let run = self
525 .store
526 .get_run(run_id)
527 .await?
528 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
529
530 let handler = self
531 .handlers
532 .get(&run.workflow_name)
533 .ok_or_else(|| {
534 EngineError::InvalidWorkflow(format!(
535 "no handler registered: {}",
536 run.workflow_name
537 ))
538 })?
539 .clone();
540
541 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
542
543 let run_start = Instant::now();
544 let mut ctx = self.build_context(run_id);
545 ctx.load_replay_steps().await?;
546
547 let result = handler.execute(&mut ctx).await;
548 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
549 .await
550 }
551
552 pub async fn fail_orphaned_steps(
566 &self,
567 run_id: Uuid,
568 error_message: &str,
569 ) -> Result<(), EngineError> {
570 let steps = self.store.list_steps(run_id).await?;
571 let now = Utc::now();
572
573 for step in steps {
574 if step.status.state.is_terminal() {
575 continue;
576 }
577
578 let (target_status, error) = match step.status.state {
579 StepStatus::Running | StepStatus::AwaitingApproval => {
580 let err = if step.error.is_some() {
581 None
582 } else {
583 Some(error_message.to_string())
584 };
585 (StepStatus::Failed, err)
586 }
587 StepStatus::Pending => (StepStatus::Skipped, None),
588 _ => continue,
589 };
590
591 if let Err(e) = self
592 .store
593 .update_step(
594 step.id,
595 StepUpdate {
596 status: Some(target_status),
597 error,
598 completed_at: Some(now),
599 ..StepUpdate::default()
600 },
601 )
602 .await
603 {
604 warn!(
605 run_id = %run_id,
606 step_id = %step.id,
607 step_name = %step.name,
608 error = %e,
609 "failed to cleanup orphaned step"
610 );
611 } else {
612 info!(
613 run_id = %run_id,
614 step_id = %step.id,
615 step_name = %step.name,
616 from = %step.status.state,
617 to = %target_status,
618 "cleaned up orphaned step"
619 );
620 }
621 }
622
623 Ok(())
624 }
625
626 async fn finalize_run(
632 &self,
633 run_id: Uuid,
634 workflow_name: &str,
635 result: Result<(), EngineError>,
636 ctx: &WorkflowContext,
637 run_start: Instant,
638 ) -> Result<Run, EngineError> {
639 let total_duration = run_start.elapsed().as_millis() as u64;
640 let completed_at = Utc::now();
641
642 let final_status;
643 let final_run;
644
645 match result {
646 Ok(()) => {
647 final_status = RunStatus::Completed;
648 final_run = self
649 .store
650 .update_run_returning(
651 run_id,
652 RunUpdate {
653 status: Some(RunStatus::Completed),
654 cost_usd: Some(ctx.total_cost_usd()),
655 duration_ms: Some(total_duration),
656 completed_at: Some(completed_at),
657 ..RunUpdate::default()
658 },
659 )
660 .await?;
661
662 info!(
663 run_id = %run_id,
664 cost_usd = %ctx.total_cost_usd(),
665 duration_ms = total_duration,
666 "run completed"
667 );
668 }
669 Err(EngineError::ApprovalRequired {
670 run_id: approval_run_id,
671 step_id,
672 ref message,
673 }) => {
674 final_status = RunStatus::AwaitingApproval;
675 final_run = self
676 .store
677 .update_run_returning(
678 run_id,
679 RunUpdate {
680 status: Some(RunStatus::AwaitingApproval),
681 cost_usd: Some(ctx.total_cost_usd()),
682 duration_ms: Some(total_duration),
683 ..RunUpdate::default()
684 },
685 )
686 .await?;
687
688 info!(
689 run_id = %approval_run_id,
690 step_id = %step_id,
691 message = %message,
692 "run awaiting approval"
693 );
694 }
695 Err(err) => {
696 final_status = RunStatus::Failed;
697 if let Err(store_err) = self
698 .store
699 .update_run(
700 run_id,
701 RunUpdate {
702 status: Some(RunStatus::Failed),
703 error: Some(err.to_string()),
704 cost_usd: Some(ctx.total_cost_usd()),
705 duration_ms: Some(total_duration),
706 completed_at: Some(completed_at),
707 ..RunUpdate::default()
708 },
709 )
710 .await
711 {
712 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
713 }
714
715 error!(run_id = %run_id, error = %err, "run failed");
716
717 self.publish_run_status_changed(
718 workflow_name,
719 run_id,
720 final_status,
721 Some(err.to_string()),
722 ctx,
723 total_duration,
724 );
725
726 #[cfg(feature = "prometheus")]
727 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
728
729 return Err(err);
730 }
731 }
732
733 self.publish_run_status_changed(
734 workflow_name,
735 run_id,
736 final_status,
737 None,
738 ctx,
739 total_duration,
740 );
741
742 #[cfg(feature = "prometheus")]
743 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
744
745 Ok(final_run)
746 }
747
748 #[cfg(feature = "prometheus")]
750 fn emit_run_metrics(
751 &self,
752 workflow_name: &str,
753 status: RunStatus,
754 duration_ms: u64,
755 ctx: &WorkflowContext,
756 ) {
757 let status_str = status.to_string();
758 let wf = workflow_name.to_string();
759
760 counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
761 histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
762 .record(duration_ms as f64 / 1000.0);
763 histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
764 ctx.total_cost_usd()
765 .to_string()
766 .parse::<f64>()
767 .unwrap_or(0.0),
768 );
769 gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
770 }
771
772 fn publish_run_status_changed(
777 &self,
778 workflow_name: &str,
779 run_id: Uuid,
780 to: RunStatus,
781 error: Option<String>,
782 ctx: &WorkflowContext,
783 duration_ms: u64,
784 ) {
785 let now = Utc::now();
786 let cost_usd = ctx.total_cost_usd();
787 let wf = workflow_name.to_string();
788
789 self.event_publisher.publish(Event::RunStatusChanged {
790 run_id,
791 workflow_name: wf.clone(),
792 from: RunStatus::Running,
793 to,
794 error: error.clone(),
795 cost_usd,
796 duration_ms,
797 at: now,
798 });
799
800 if to == RunStatus::Failed {
801 self.event_publisher.publish(Event::RunFailed {
802 run_id,
803 workflow_name: wf,
804 error,
805 cost_usd,
806 duration_ms,
807 at: now,
808 });
809 }
810 }
811}
812
813impl fmt::Debug for Engine {
814 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815 f.debug_struct("Engine")
816 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
817 .finish_non_exhaustive()
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use super::*;
824 use crate::config::ShellConfig;
825 use crate::handler::{HandlerFuture, WorkflowHandler};
826 use ironflow_core::providers::claude::ClaudeCodeProvider;
827 use ironflow_core::providers::record_replay::RecordReplayProvider;
828 use ironflow_store::memory::InMemoryStore;
829 use ironflow_store::models::StepStatus;
830 use serde_json::json;
831
832 struct EchoWorkflow;
834
835 impl WorkflowHandler for EchoWorkflow {
836 fn name(&self) -> &str {
837 "echo-workflow"
838 }
839
840 fn describe(&self) -> WorkflowInfo {
841 WorkflowInfo {
842 description: "A simple workflow that echoes hello".to_string(),
843 source_code: None,
844 sub_workflows: Vec::new(),
845 category: None,
846 version: self.version().map(str::to_string),
847 input_schema: None,
848 default_labels: HashMap::new(),
849 }
850 }
851
852 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
853 Box::pin(async move {
854 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
855 Ok(())
856 })
857 }
858 }
859
860 struct FailingWorkflow;
862
863 impl WorkflowHandler for FailingWorkflow {
864 fn name(&self) -> &str {
865 "failing-workflow"
866 }
867
868 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
869 Box::pin(async move {
870 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
871 Ok(())
872 })
873 }
874 }
875
876 fn create_test_engine() -> Engine {
877 let store = Arc::new(InMemoryStore::new());
878 let inner = ClaudeCodeProvider::new();
879 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
880 inner,
881 "/tmp/ironflow-fixtures",
882 ));
883 Engine::new(store, provider)
884 }
885
886 #[test]
887 fn engine_new_creates_instance() {
888 let engine = create_test_engine();
889 assert_eq!(engine.handler_names().len(), 0);
890 }
891
892 #[test]
893 fn engine_register_handler() {
894 let mut engine = create_test_engine();
895 let result = engine.register(EchoWorkflow);
896 assert!(result.is_ok());
897 assert_eq!(engine.handler_names().len(), 1);
898 assert!(engine.handler_names().contains(&"echo-workflow"));
899 }
900
901 #[test]
902 fn engine_register_duplicate_returns_error() {
903 let mut engine = create_test_engine();
904 engine.register(EchoWorkflow).unwrap();
905 let result = engine.register(EchoWorkflow);
906 assert!(result.is_err());
907 }
908
909 #[test]
910 fn engine_get_handler_found() {
911 let mut engine = create_test_engine();
912 engine.register(EchoWorkflow).unwrap();
913 let handler = engine.get_handler("echo-workflow");
914 assert!(handler.is_some());
915 }
916
917 #[test]
918 fn engine_get_handler_not_found() {
919 let engine = create_test_engine();
920 let handler = engine.get_handler("nonexistent");
921 assert!(handler.is_none());
922 }
923
924 #[test]
925 fn engine_handler_names_lists_all() {
926 let mut engine = create_test_engine();
927 engine.register(EchoWorkflow).unwrap();
928 engine.register(FailingWorkflow).unwrap();
929 let names = engine.handler_names();
930 assert_eq!(names.len(), 2);
931 assert!(names.contains(&"echo-workflow"));
932 assert!(names.contains(&"failing-workflow"));
933 }
934
935 #[test]
936 fn engine_handler_info_returns_description() {
937 let mut engine = create_test_engine();
938 engine.register(EchoWorkflow).unwrap();
939 let info = engine.handler_info("echo-workflow");
940 assert!(info.is_some());
941 let info = info.unwrap();
942 assert_eq!(info.description, "A simple workflow that echoes hello");
943 }
944
945 struct CategorizedWorkflow;
946
947 impl WorkflowHandler for CategorizedWorkflow {
948 fn name(&self) -> &str {
949 "categorized"
950 }
951 fn category(&self) -> Option<&str> {
952 Some("data/etl")
953 }
954 fn execute<'a>(
955 &'a self,
956 _ctx: &'a mut WorkflowContext,
957 ) -> crate::handler::HandlerFuture<'a> {
958 Box::pin(async move { Ok(()) })
959 }
960 }
961
962 #[test]
963 fn engine_default_describe_propagates_category() {
964 let mut engine = create_test_engine();
965 engine.register(CategorizedWorkflow).unwrap();
966 let info = engine.handler_info("categorized").unwrap();
967 assert_eq!(info.category.as_deref(), Some("data/etl"));
968 }
969
970 #[test]
971 fn engine_default_describe_without_category() {
972 let mut engine = create_test_engine();
973 engine.register(EchoWorkflow).unwrap();
974 let info = engine.handler_info("echo-workflow").unwrap();
975 assert!(info.category.is_none());
976 }
977
978 struct BadCategoryWorkflow(&'static str);
979
980 impl WorkflowHandler for BadCategoryWorkflow {
981 fn name(&self) -> &str {
982 "bad-category"
983 }
984 fn category(&self) -> Option<&str> {
985 Some(self.0)
986 }
987 fn execute<'a>(
988 &'a self,
989 _ctx: &'a mut WorkflowContext,
990 ) -> crate::handler::HandlerFuture<'a> {
991 Box::pin(async move { Ok(()) })
992 }
993 }
994
995 #[test]
996 fn engine_register_rejects_empty_category() {
997 let mut engine = create_test_engine();
998 let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
999 match err {
1000 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
1001 other => panic!("expected InvalidWorkflow, got {other:?}"),
1002 }
1003 }
1004
1005 #[test]
1006 fn engine_register_rejects_leading_slash_category() {
1007 let mut engine = create_test_engine();
1008 let err = engine
1009 .register(BadCategoryWorkflow("/data/etl"))
1010 .unwrap_err();
1011 match err {
1012 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
1013 other => panic!("expected InvalidWorkflow, got {other:?}"),
1014 }
1015 }
1016
1017 #[test]
1018 fn engine_register_rejects_trailing_slash_category() {
1019 let mut engine = create_test_engine();
1020 let err = engine
1021 .register(BadCategoryWorkflow("data/etl/"))
1022 .unwrap_err();
1023 match err {
1024 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
1025 other => panic!("expected InvalidWorkflow, got {other:?}"),
1026 }
1027 }
1028
1029 #[test]
1030 fn engine_register_rejects_double_slash_category() {
1031 let mut engine = create_test_engine();
1032 let err = engine
1033 .register(BadCategoryWorkflow("data//etl"))
1034 .unwrap_err();
1035 match err {
1036 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
1037 other => panic!("expected InvalidWorkflow, got {other:?}"),
1038 }
1039 }
1040
1041 #[test]
1042 fn engine_register_rejects_whitespace_only_segment_category() {
1043 let mut engine = create_test_engine();
1044 let err = engine
1045 .register(BadCategoryWorkflow("data/ /etl"))
1046 .unwrap_err();
1047 match err {
1048 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
1049 other => panic!("expected InvalidWorkflow, got {other:?}"),
1050 }
1051 }
1052
1053 #[test]
1054 fn engine_register_accepts_valid_nested_category() {
1055 let mut engine = create_test_engine();
1056 assert!(engine.register(CategorizedWorkflow).is_ok());
1057 }
1058
1059 #[tokio::test]
1060 async fn engine_unknown_workflow_returns_error() {
1061 let engine = create_test_engine();
1062 let result = engine
1063 .run_handler("unknown", TriggerKind::Manual, json!({}))
1064 .await;
1065 assert!(result.is_err());
1066 match result {
1067 Err(EngineError::InvalidWorkflow(msg)) => {
1068 assert!(msg.contains("no handler registered"));
1069 }
1070 _ => panic!("expected InvalidWorkflow error"),
1071 }
1072 }
1073
1074 #[tokio::test]
1075 async fn engine_enqueue_handler_creates_pending_run() {
1076 let mut engine = create_test_engine();
1077 engine.register(EchoWorkflow).unwrap();
1078
1079 let run = engine
1080 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
1081 .await
1082 .unwrap();
1083 assert_eq!(run.status.state, RunStatus::Pending);
1084 assert_eq!(run.workflow_name, "echo-workflow");
1085 }
1086
1087 #[tokio::test]
1088 async fn engine_register_boxed() {
1089 let mut engine = create_test_engine();
1090 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
1091 let result = engine.register_boxed(handler);
1092 assert!(result.is_ok());
1093 assert_eq!(engine.handler_names().len(), 1);
1094 }
1095
1096 #[tokio::test]
1097 async fn engine_store_and_provider_accessors() {
1098 let store = Arc::new(InMemoryStore::new());
1099 let inner = ClaudeCodeProvider::new();
1100 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
1101 inner,
1102 "/tmp/ironflow-fixtures",
1103 ));
1104 let engine = Engine::new(store.clone(), provider.clone());
1105
1106 let _ = engine.store();
1108 let _ = engine.provider();
1109 }
1110
1111 use crate::operation::Operation;
1116 use ironflow_store::models::StepKind;
1117 use std::future::Future;
1118 use std::pin::Pin;
1119
1120 struct FakeGitlabOp {
1121 project_id: u64,
1122 title: String,
1123 }
1124
1125 impl Operation for FakeGitlabOp {
1126 fn kind(&self) -> &str {
1127 "gitlab"
1128 }
1129
1130 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1131 Box::pin(async move {
1132 Ok(json!({
1133 "issue_id": 42,
1134 "project_id": self.project_id,
1135 "title": self.title,
1136 }))
1137 })
1138 }
1139
1140 fn input(&self) -> Option<Value> {
1141 Some(json!({
1142 "project_id": self.project_id,
1143 "title": self.title,
1144 }))
1145 }
1146 }
1147
1148 struct FailingOp;
1149
1150 impl Operation for FailingOp {
1151 fn kind(&self) -> &str {
1152 "broken-service"
1153 }
1154
1155 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1156 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1157 }
1158 }
1159
1160 struct OperationWorkflow;
1161
1162 impl WorkflowHandler for OperationWorkflow {
1163 fn name(&self) -> &str {
1164 "operation-workflow"
1165 }
1166
1167 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1168 Box::pin(async move {
1169 let op = FakeGitlabOp {
1170 project_id: 123,
1171 title: "Bug report".to_string(),
1172 };
1173 ctx.operation("create-issue", &op).await?;
1174 Ok(())
1175 })
1176 }
1177 }
1178
1179 struct FailingOperationWorkflow;
1180
1181 impl WorkflowHandler for FailingOperationWorkflow {
1182 fn name(&self) -> &str {
1183 "failing-operation-workflow"
1184 }
1185
1186 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1187 Box::pin(async move {
1188 ctx.operation("broken-call", &FailingOp).await?;
1189 Ok(())
1190 })
1191 }
1192 }
1193
1194 struct MixedWorkflow;
1195
1196 impl WorkflowHandler for MixedWorkflow {
1197 fn name(&self) -> &str {
1198 "mixed-workflow"
1199 }
1200
1201 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1202 Box::pin(async move {
1203 ctx.shell("build", ShellConfig::new("echo built")).await?;
1204 let op = FakeGitlabOp {
1205 project_id: 456,
1206 title: "Deploy done".to_string(),
1207 };
1208 let result = ctx.operation("notify-gitlab", &op).await?;
1209 assert_eq!(result.output["issue_id"], 42);
1210 Ok(())
1211 })
1212 }
1213 }
1214
1215 #[tokio::test]
1216 async fn operation_step_happy_path() {
1217 let mut engine = create_test_engine();
1218 engine.register(OperationWorkflow).unwrap();
1219
1220 let run = engine
1221 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1222 .await
1223 .unwrap();
1224
1225 assert_eq!(run.status.state, RunStatus::Completed);
1226
1227 let steps = engine.store().list_steps(run.id).await.unwrap();
1228
1229 assert_eq!(steps.len(), 1);
1230 assert_eq!(steps[0].name, "create-issue");
1231 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1232 assert_eq!(
1233 steps[0].status.state,
1234 ironflow_store::models::StepStatus::Completed
1235 );
1236
1237 let output = steps[0].output.as_ref().unwrap();
1238 assert_eq!(output["issue_id"], 42);
1239 assert_eq!(output["project_id"], 123);
1240
1241 let input = steps[0].input.as_ref().unwrap();
1242 assert_eq!(input["project_id"], 123);
1243 assert_eq!(input["title"], "Bug report");
1244 }
1245
1246 #[tokio::test]
1247 async fn operation_step_failure_marks_run_failed() {
1248 let mut engine = create_test_engine();
1249 engine.register(FailingOperationWorkflow).unwrap();
1250
1251 let result = engine
1252 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1253 .await;
1254
1255 assert!(result.is_err());
1256 }
1257
1258 #[tokio::test]
1259 async fn operation_mixed_with_shell_steps() {
1260 let mut engine = create_test_engine();
1261 engine.register(MixedWorkflow).unwrap();
1262
1263 let run = engine
1264 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1265 .await
1266 .unwrap();
1267
1268 assert_eq!(run.status.state, RunStatus::Completed);
1269
1270 let steps = engine.store().list_steps(run.id).await.unwrap();
1271
1272 assert_eq!(steps.len(), 2);
1273 assert_eq!(steps[0].kind, StepKind::Shell);
1274 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1275 assert_eq!(steps[0].position, 0);
1276 assert_eq!(steps[1].position, 1);
1277 }
1278
1279 use crate::config::ApprovalConfig;
1284
1285 struct SingleApprovalWorkflow;
1286
1287 impl WorkflowHandler for SingleApprovalWorkflow {
1288 fn name(&self) -> &str {
1289 "single-approval"
1290 }
1291
1292 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1293 Box::pin(async move {
1294 ctx.shell("build", ShellConfig::new("echo built")).await?;
1295 ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1296 ctx.shell("deploy", ShellConfig::new("echo deployed"))
1297 .await?;
1298 Ok(())
1299 })
1300 }
1301 }
1302
1303 struct DoubleApprovalWorkflow;
1304
1305 impl WorkflowHandler for DoubleApprovalWorkflow {
1306 fn name(&self) -> &str {
1307 "double-approval"
1308 }
1309
1310 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1311 Box::pin(async move {
1312 ctx.shell("build", ShellConfig::new("echo built")).await?;
1313 ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1314 .await?;
1315 ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1316 .await?;
1317 ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1318 .await?;
1319 ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1320 .await?;
1321 Ok(())
1322 })
1323 }
1324 }
1325
1326 #[tokio::test]
1327 async fn approval_pauses_run() {
1328 let mut engine = create_test_engine();
1329 engine.register(SingleApprovalWorkflow).unwrap();
1330
1331 let run = engine
1332 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1333 .await
1334 .unwrap();
1335
1336 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1337
1338 let steps = engine.store().list_steps(run.id).await.unwrap();
1339 assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
1341 assert_eq!(steps[0].status.state, StepStatus::Completed);
1342 assert_eq!(steps[1].kind, StepKind::Approval);
1343 assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1344 }
1345
1346 #[tokio::test]
1347 async fn approval_resume_completes_run() {
1348 let mut engine = create_test_engine();
1349 engine.register(SingleApprovalWorkflow).unwrap();
1350
1351 let run = engine
1353 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1354 .await
1355 .unwrap();
1356 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1357
1358 engine
1360 .store()
1361 .update_run_status(run.id, RunStatus::Running)
1362 .await
1363 .unwrap();
1364
1365 let resumed = engine.resume_run(run.id).await.unwrap();
1367 assert_eq!(resumed.status.state, RunStatus::Completed);
1368
1369 let steps = engine.store().list_steps(run.id).await.unwrap();
1370 assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
1372 assert_eq!(steps[0].status.state, StepStatus::Completed);
1373 assert_eq!(steps[1].name, "gate");
1374 assert_eq!(steps[1].kind, StepKind::Approval);
1375 assert_eq!(steps[1].status.state, StepStatus::Completed);
1376 assert_eq!(steps[2].name, "deploy");
1377 assert_eq!(steps[2].status.state, StepStatus::Completed);
1378 }
1379
1380 #[tokio::test]
1381 async fn double_approval_two_resumes() {
1382 let mut engine = create_test_engine();
1383 engine.register(DoubleApprovalWorkflow).unwrap();
1384
1385 let run = engine
1387 .run_handler("double-approval", TriggerKind::Manual, json!({}))
1388 .await
1389 .unwrap();
1390 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1391
1392 let steps = engine.store().list_steps(run.id).await.unwrap();
1393 assert_eq!(steps.len(), 2); engine
1397 .store()
1398 .update_run_status(run.id, RunStatus::Running)
1399 .await
1400 .unwrap();
1401
1402 let resumed = engine.resume_run(run.id).await.unwrap();
1403 assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1404
1405 let steps = engine.store().list_steps(run.id).await.unwrap();
1406 assert_eq!(steps.len(), 4); engine
1410 .store()
1411 .update_run_status(run.id, RunStatus::Running)
1412 .await
1413 .unwrap();
1414
1415 let final_run = engine.resume_run(run.id).await.unwrap();
1416 assert_eq!(final_run.status.state, RunStatus::Completed);
1417
1418 let steps = engine.store().list_steps(run.id).await.unwrap();
1419 assert_eq!(steps.len(), 5);
1420 assert_eq!(steps[0].name, "build");
1421 assert_eq!(steps[1].name, "staging-gate");
1422 assert_eq!(steps[2].name, "deploy-staging");
1423 assert_eq!(steps[3].name, "prod-gate");
1424 assert_eq!(steps[4].name, "deploy-prod");
1425
1426 for step in &steps {
1427 assert_eq!(step.status.state, StepStatus::Completed);
1428 }
1429 }
1430
1431 use ironflow_store::models::{NewStep, StepUpdate};
1436
1437 async fn create_step_with_status(
1438 store: &Arc<dyn Store>,
1439 run_id: Uuid,
1440 name: &str,
1441 position: u32,
1442 status: StepStatus,
1443 ) -> ironflow_store::models::Step {
1444 let step = store
1445 .create_step(NewStep {
1446 run_id,
1447 name: name.to_string(),
1448 kind: StepKind::Shell,
1449 position,
1450 input: None,
1451 })
1452 .await
1453 .unwrap();
1454
1455 match status {
1456 StepStatus::Pending => {}
1457 StepStatus::Running => {
1458 store
1459 .update_step(
1460 step.id,
1461 StepUpdate {
1462 status: Some(StepStatus::Running),
1463 ..StepUpdate::default()
1464 },
1465 )
1466 .await
1467 .unwrap();
1468 }
1469 StepStatus::Completed => {
1470 store
1471 .update_step(
1472 step.id,
1473 StepUpdate {
1474 status: Some(StepStatus::Running),
1475 ..StepUpdate::default()
1476 },
1477 )
1478 .await
1479 .unwrap();
1480 store
1481 .update_step(
1482 step.id,
1483 StepUpdate {
1484 status: Some(StepStatus::Completed),
1485 ..StepUpdate::default()
1486 },
1487 )
1488 .await
1489 .unwrap();
1490 }
1491 StepStatus::AwaitingApproval => {
1492 store
1493 .update_step(
1494 step.id,
1495 StepUpdate {
1496 status: Some(StepStatus::Running),
1497 ..StepUpdate::default()
1498 },
1499 )
1500 .await
1501 .unwrap();
1502 store
1503 .update_step(
1504 step.id,
1505 StepUpdate {
1506 status: Some(StepStatus::AwaitingApproval),
1507 ..StepUpdate::default()
1508 },
1509 )
1510 .await
1511 .unwrap();
1512 }
1513 _ => panic!("unsupported status for test helper: {status}"),
1514 }
1515
1516 store.get_step(step.id).await.unwrap().unwrap()
1517 }
1518
1519 #[tokio::test]
1520 async fn fail_orphaned_steps_marks_running_as_failed() {
1521 let engine = create_test_engine();
1522 let run = engine
1523 .store()
1524 .create_run(NewRun {
1525 workflow_name: "test".to_string(),
1526 trigger: TriggerKind::Manual,
1527 payload: json!({}),
1528 max_retries: 0,
1529 handler_version: None,
1530 labels: HashMap::new(),
1531 scheduled_at: None,
1532 })
1533 .await
1534 .unwrap();
1535
1536 let step = create_step_with_status(
1537 engine.store(),
1538 run.id,
1539 "running-step",
1540 0,
1541 StepStatus::Running,
1542 )
1543 .await;
1544
1545 engine
1546 .fail_orphaned_steps(run.id, "parent run timed out")
1547 .await
1548 .unwrap();
1549
1550 let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1551 assert_eq!(updated.status.state, StepStatus::Failed);
1552 assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1553 assert!(updated.completed_at.is_some());
1554 }
1555
1556 #[tokio::test]
1557 async fn fail_orphaned_steps_marks_pending_as_skipped() {
1558 let engine = create_test_engine();
1559 let run = engine
1560 .store()
1561 .create_run(NewRun {
1562 workflow_name: "test".to_string(),
1563 trigger: TriggerKind::Manual,
1564 payload: json!({}),
1565 max_retries: 0,
1566 handler_version: None,
1567 labels: HashMap::new(),
1568 scheduled_at: None,
1569 })
1570 .await
1571 .unwrap();
1572
1573 let step = create_step_with_status(
1574 engine.store(),
1575 run.id,
1576 "pending-step",
1577 0,
1578 StepStatus::Pending,
1579 )
1580 .await;
1581
1582 engine
1583 .fail_orphaned_steps(run.id, "parent run timed out")
1584 .await
1585 .unwrap();
1586
1587 let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1588 assert_eq!(updated.status.state, StepStatus::Skipped);
1589 assert!(updated.error.is_none());
1590 assert!(updated.completed_at.is_some());
1591 }
1592
1593 #[tokio::test]
1594 async fn fail_orphaned_steps_marks_awaiting_approval_as_failed() {
1595 let engine = create_test_engine();
1596 let run = engine
1597 .store()
1598 .create_run(NewRun {
1599 workflow_name: "test".to_string(),
1600 trigger: TriggerKind::Manual,
1601 payload: json!({}),
1602 max_retries: 0,
1603 handler_version: None,
1604 labels: HashMap::new(),
1605 scheduled_at: None,
1606 })
1607 .await
1608 .unwrap();
1609
1610 let step = create_step_with_status(
1611 engine.store(),
1612 run.id,
1613 "approval-step",
1614 0,
1615 StepStatus::AwaitingApproval,
1616 )
1617 .await;
1618
1619 engine
1620 .fail_orphaned_steps(run.id, "parent run timed out")
1621 .await
1622 .unwrap();
1623
1624 let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1625 assert_eq!(updated.status.state, StepStatus::Failed);
1626 assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1627 assert!(updated.completed_at.is_some());
1628 }
1629
1630 #[tokio::test]
1631 async fn fail_orphaned_steps_skips_terminal_steps() {
1632 let engine = create_test_engine();
1633 let run = engine
1634 .store()
1635 .create_run(NewRun {
1636 workflow_name: "test".to_string(),
1637 trigger: TriggerKind::Manual,
1638 payload: json!({}),
1639 max_retries: 0,
1640 handler_version: None,
1641 labels: HashMap::new(),
1642 scheduled_at: None,
1643 })
1644 .await
1645 .unwrap();
1646
1647 let completed_step =
1648 create_step_with_status(engine.store(), run.id, "done", 0, StepStatus::Completed).await;
1649 let running_step =
1650 create_step_with_status(engine.store(), run.id, "in-flight", 1, StepStatus::Running)
1651 .await;
1652
1653 engine
1654 .fail_orphaned_steps(run.id, "parent run timed out")
1655 .await
1656 .unwrap();
1657
1658 let completed = engine
1659 .store()
1660 .get_step(completed_step.id)
1661 .await
1662 .unwrap()
1663 .unwrap();
1664 assert_eq!(completed.status.state, StepStatus::Completed);
1665
1666 let failed = engine
1667 .store()
1668 .get_step(running_step.id)
1669 .await
1670 .unwrap()
1671 .unwrap();
1672 assert_eq!(failed.status.state, StepStatus::Failed);
1673 }
1674
1675 #[tokio::test]
1676 async fn fail_orphaned_steps_mixed_states() {
1677 let engine = create_test_engine();
1678 let run = engine
1679 .store()
1680 .create_run(NewRun {
1681 workflow_name: "test".to_string(),
1682 trigger: TriggerKind::Manual,
1683 payload: json!({}),
1684 max_retries: 0,
1685 handler_version: None,
1686 labels: HashMap::new(),
1687 scheduled_at: None,
1688 })
1689 .await
1690 .unwrap();
1691
1692 let s_completed =
1693 create_step_with_status(engine.store(), run.id, "step-1", 0, StepStatus::Completed)
1694 .await;
1695 let s_running =
1696 create_step_with_status(engine.store(), run.id, "step-2", 1, StepStatus::Running).await;
1697 let s_pending =
1698 create_step_with_status(engine.store(), run.id, "step-3", 2, StepStatus::Pending).await;
1699
1700 engine.fail_orphaned_steps(run.id, "timeout").await.unwrap();
1701
1702 let r_completed = engine
1703 .store()
1704 .get_step(s_completed.id)
1705 .await
1706 .unwrap()
1707 .unwrap();
1708 assert_eq!(r_completed.status.state, StepStatus::Completed);
1709
1710 let r_running = engine
1711 .store()
1712 .get_step(s_running.id)
1713 .await
1714 .unwrap()
1715 .unwrap();
1716 assert_eq!(r_running.status.state, StepStatus::Failed);
1717 assert_eq!(r_running.error.as_deref(), Some("timeout"));
1718
1719 let r_pending = engine
1720 .store()
1721 .get_step(s_pending.id)
1722 .await
1723 .unwrap()
1724 .unwrap();
1725 assert_eq!(r_pending.status.state, StepStatus::Skipped);
1726 assert!(r_pending.error.is_none());
1727 }
1728
1729 #[tokio::test]
1730 async fn fail_orphaned_steps_no_steps_is_noop() {
1731 let engine = create_test_engine();
1732 let run = engine
1733 .store()
1734 .create_run(NewRun {
1735 workflow_name: "test".to_string(),
1736 trigger: TriggerKind::Manual,
1737 payload: json!({}),
1738 max_retries: 0,
1739 handler_version: None,
1740 labels: HashMap::new(),
1741 scheduled_at: None,
1742 })
1743 .await
1744 .unwrap();
1745
1746 let result = engine.fail_orphaned_steps(run.id, "timeout").await;
1747 assert!(result.is_ok());
1748 }
1749
1750 #[tokio::test]
1751 async fn fail_orphaned_steps_preserves_existing_error() {
1752 let engine = create_test_engine();
1753 let run = engine
1754 .store()
1755 .create_run(NewRun {
1756 workflow_name: "test".to_string(),
1757 trigger: TriggerKind::Manual,
1758 payload: json!({}),
1759 max_retries: 0,
1760 handler_version: None,
1761 labels: HashMap::new(),
1762 scheduled_at: None,
1763 })
1764 .await
1765 .unwrap();
1766
1767 let step_with_error = create_step_with_status(
1768 engine.store(),
1769 run.id,
1770 "already-errored",
1771 0,
1772 StepStatus::Running,
1773 )
1774 .await;
1775
1776 engine
1777 .store()
1778 .update_step(
1779 step_with_error.id,
1780 StepUpdate {
1781 error: Some("real error from provider".to_string()),
1782 ..StepUpdate::default()
1783 },
1784 )
1785 .await
1786 .unwrap();
1787
1788 let step_no_error = create_step_with_status(
1789 engine.store(),
1790 run.id,
1791 "no-error-yet",
1792 1,
1793 StepStatus::Running,
1794 )
1795 .await;
1796
1797 engine
1798 .fail_orphaned_steps(run.id, "parent run failed")
1799 .await
1800 .unwrap();
1801
1802 let updated_with = engine
1803 .store()
1804 .get_step(step_with_error.id)
1805 .await
1806 .unwrap()
1807 .unwrap();
1808 assert_eq!(updated_with.status.state, StepStatus::Failed);
1809 assert_eq!(
1810 updated_with.error.as_deref(),
1811 Some("real error from provider"),
1812 );
1813
1814 let updated_without = engine
1815 .store()
1816 .get_step(step_no_error.id)
1817 .await
1818 .unwrap()
1819 .unwrap();
1820 assert_eq!(updated_without.status.state, StepStatus::Failed);
1821 assert_eq!(updated_without.error.as_deref(), Some("parent run failed"),);
1822 }
1823}