1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{
24 NewRun, Run, RunStatus, RunUpdate, StepStatus, StepUpdate, TriggerKind,
25};
26use ironflow_store::store::Store;
27#[cfg(feature = "prometheus")]
28use metrics::{counter, gauge, histogram};
29
30use crate::context::WorkflowContext;
31use crate::error::EngineError;
32use crate::handler::{WorkflowHandler, WorkflowInfo};
33use crate::log_sender::LogSender;
34use crate::notify::{Event, EventPublisher, EventSubscriber};
35
/// Orchestrates workflow execution: holds the registered handlers plus the
/// shared store/provider, forwards step logs, and fans out lifecycle events.
pub struct Engine {
    store: Arc<dyn Store>,             // persistence backend for runs/steps
    provider: Arc<dyn AgentProvider>,  // agent provider handed to each context
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>, // keyed by handler name
    event_publisher: EventPublisher,   // run lifecycle event fan-out
    log_sender: Option<LogSender>,     // optional step-log forwarding channel
}
83
84fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
94 let reject = |reason: &str| {
95 Err(EngineError::InvalidWorkflow(format!(
96 "handler '{handler_name}' has invalid category '{category}': {reason}"
97 )))
98 };
99
100 if category.is_empty() {
101 return reject("empty category");
102 }
103 if category.starts_with('/') {
104 return reject("leading '/'");
105 }
106 if category.ends_with('/') {
107 return reject("trailing '/'");
108 }
109 for segment in category.split('/') {
110 if segment.is_empty() {
111 return reject("empty segment (double '/')");
112 }
113 if segment.trim().is_empty() {
114 return reject("whitespace-only segment");
115 }
116 }
117 Ok(())
118}
119
120impl Engine {
121 pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
137 Self {
138 store,
139 provider,
140 handlers: HashMap::new(),
141 event_publisher: EventPublisher::new(),
142 log_sender: None,
143 }
144 }
145
    /// Install the log sender that freshly built contexts forward step logs
    /// to; replaces any previously configured sender.
    pub fn set_log_sender(&mut self, sender: LogSender) {
        self.log_sender = Some(sender);
    }

    /// Borrow the shared store backing this engine.
    pub fn store(&self) -> &Arc<dyn Store> {
        &self.store
    }

    /// Borrow the shared agent provider backing this engine.
    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
        &self.provider
    }
164
165 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
167 let handlers = self.handlers.clone();
168 let resolver: crate::context::HandlerResolver =
169 Arc::new(move |name: &str| handlers.get(name).cloned());
170 let mut ctx = WorkflowContext::with_handler_resolver(
171 run_id,
172 self.store.clone(),
173 self.provider.clone(),
174 resolver,
175 );
176 if let Some(ref sender) = self.log_sender {
177 ctx.set_log_sender(sender.clone());
178 }
179 ctx
180 }
181
182 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
226 let name = handler.name().to_string();
227 if self.handlers.contains_key(&name) {
228 return Err(EngineError::InvalidWorkflow(format!(
229 "handler '{}' already registered",
230 name
231 )));
232 }
233 if let Some(category) = handler.category() {
234 validate_category(&name, category)?;
235 }
236 self.handlers.insert(name, Arc::new(handler));
237 Ok(())
238 }
239
240 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
247 let name = handler.name().to_string();
248 if self.handlers.contains_key(&name) {
249 return Err(EngineError::InvalidWorkflow(format!(
250 "handler '{}' already registered",
251 name
252 )));
253 }
254 if let Some(category) = handler.category() {
255 validate_category(&name, category)?;
256 }
257 self.handlers.insert(name, Arc::from(handler));
258 Ok(())
259 }
260
    /// Look up a registered handler by name.
    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
        self.handlers.get(name)
    }

    /// Names of all registered handlers (unordered: HashMap iteration order).
    pub fn handler_names(&self) -> Vec<&str> {
        self.handlers.keys().map(|s| s.as_str()).collect()
    }

    /// The handler's self-description, if a handler with that name exists.
    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
        self.handlers.get(name).map(|h| h.describe())
    }
275
    /// Subscribe `subscriber` to the given event types on this engine's
    /// publisher; events are published from `finalize_run`.
    pub fn subscribe(
        &mut self,
        subscriber: impl EventSubscriber + 'static,
        event_types: &[&'static str],
    ) {
        self.event_publisher.subscribe(subscriber, event_types);
    }

    /// Borrow the event publisher (e.g. to publish custom events directly).
    pub fn event_publisher(&self) -> &EventPublisher {
        &self.event_publisher
    }
315
    /// Create a run for `handler_name` and execute it to completion on the
    /// current task.
    ///
    /// Flow: resolve handler → create run (with the handler's default labels,
    /// no retries, no schedule) → mark Running → execute → `finalize_run`
    /// persists the terminal status and emits events/metrics.
    ///
    /// # Errors
    /// `InvalidWorkflow` if no handler is registered under `handler_name`;
    /// otherwise store errors or the handler's own failure (via finalize).
    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
    pub async fn run_handler(
        &self,
        handler_name: &str,
        trigger: TriggerKind,
        payload: Value,
    ) -> Result<Run, EngineError> {
        let handler = self
            .handlers
            .get(handler_name)
            .ok_or_else(|| {
                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
            })?
            .clone();

        let handler_version = handler.version().map(str::to_string);
        // Direct runs never retry (max_retries: 0) and start immediately.
        let run = self
            .store
            .create_run(NewRun {
                workflow_name: handler_name.to_string(),
                trigger,
                payload,
                max_retries: 0,
                handler_version,
                labels: handler.default_labels(),
                scheduled_at: None,
            })
            .await?;

        let run_id = run.id;
        info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");

        self.store
            .update_run_status(run_id, RunStatus::Running)
            .await?;

        // Matching decrement happens in emit_run_metrics via finalize_run.
        #[cfg(feature = "prometheus")]
        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);

        let run_start = Instant::now();
        let mut ctx = self.build_context(run_id);

        let result = handler.execute(&mut ctx).await;
        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
            .await
    }
391
392 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
401 pub async fn enqueue_handler(
402 &self,
403 handler_name: &str,
404 trigger: TriggerKind,
405 payload: Value,
406 max_retries: u32,
407 ) -> Result<Run, EngineError> {
408 self.enqueue_handler_with_options(
409 handler_name,
410 trigger,
411 payload,
412 max_retries,
413 HashMap::new(),
414 None,
415 )
416 .await
417 }
418
419 #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
425 pub async fn enqueue_handler_with_options(
426 &self,
427 handler_name: &str,
428 trigger: TriggerKind,
429 payload: Value,
430 max_retries: u32,
431 labels: HashMap<String, String>,
432 scheduled_at: Option<DateTime<Utc>>,
433 ) -> Result<Run, EngineError> {
434 let handler = self.handlers.get(handler_name).ok_or_else(|| {
435 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
436 })?;
437
438 let handler_version = handler.version().map(str::to_string);
439 let mut merged_labels = handler.default_labels();
440 merged_labels.extend(labels);
441
442 let run = self
443 .store
444 .create_run(NewRun {
445 workflow_name: handler_name.to_string(),
446 trigger,
447 payload,
448 max_retries,
449 handler_version,
450 labels: merged_labels,
451 scheduled_at,
452 })
453 .await?;
454
455 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
456 Ok(run)
457 }
458
    /// Execute a run previously created via the `enqueue_handler*` family.
    ///
    /// Loads the run by id, resolves its handler from the stored workflow
    /// name, executes it, and finalizes the terminal state.
    ///
    /// NOTE(review): unlike `run_handler`, this does not move the run to
    /// Running first — presumably the dequeuing worker does that before
    /// calling in; confirm against the worker code.
    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        let run = self
            .store
            .get_run(run_id)
            .await?
            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;

        let handler = self
            .handlers
            .get(&run.workflow_name)
            .ok_or_else(|| {
                EngineError::InvalidWorkflow(format!(
                    "no handler registered: {}",
                    run.workflow_name
                ))
            })?
            .clone();

        // Matching decrement happens in emit_run_metrics via finalize_run.
        #[cfg(feature = "prometheus")]
        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);

        let run_start = Instant::now();
        let mut ctx = self.build_context(run_id);

        let result = handler.execute(&mut ctx).await;
        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
            .await
    }
496
    /// Thin alias for [`Self::execute_handler_run`] under the older name.
    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        self.execute_handler_run(run_id).await
    }
508
509 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
523 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
524 let run = self
525 .store
526 .get_run(run_id)
527 .await?
528 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
529
530 let handler = self
531 .handlers
532 .get(&run.workflow_name)
533 .ok_or_else(|| {
534 EngineError::InvalidWorkflow(format!(
535 "no handler registered: {}",
536 run.workflow_name
537 ))
538 })?
539 .clone();
540
541 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
542
543 let run_start = Instant::now();
544 let mut ctx = self.build_context(run_id);
545 ctx.load_replay_steps().await?;
546
547 let result = handler.execute(&mut ctx).await;
548 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
549 .await
550 }
551
552 pub async fn fail_orphaned_steps(
566 &self,
567 run_id: Uuid,
568 error_message: &str,
569 ) -> Result<(), EngineError> {
570 let steps = self.store.list_steps(run_id).await?;
571 let now = Utc::now();
572
573 for step in steps {
574 if step.status.state.is_terminal() {
575 continue;
576 }
577
578 let (target_status, error) = match step.status.state {
579 StepStatus::Running | StepStatus::AwaitingApproval => {
580 (StepStatus::Failed, Some(error_message.to_string()))
581 }
582 StepStatus::Pending => (StepStatus::Skipped, None),
583 _ => continue,
584 };
585
586 if let Err(e) = self
587 .store
588 .update_step(
589 step.id,
590 StepUpdate {
591 status: Some(target_status),
592 error,
593 completed_at: Some(now),
594 ..StepUpdate::default()
595 },
596 )
597 .await
598 {
599 warn!(
600 run_id = %run_id,
601 step_id = %step.id,
602 step_name = %step.name,
603 error = %e,
604 "failed to cleanup orphaned step"
605 );
606 } else {
607 info!(
608 run_id = %run_id,
609 step_id = %step.id,
610 step_name = %step.name,
611 from = %step.status.state,
612 to = %target_status,
613 "cleaned up orphaned step"
614 );
615 }
616 }
617
618 Ok(())
619 }
620
    /// Persist a run's terminal state, then publish events and (with the
    /// `prometheus` feature) emit metrics.
    ///
    /// Three outcomes of `result`:
    /// - `Ok(())`                → run Completed
    /// - `ApprovalRequired {..}` → run AwaitingApproval (no `completed_at`;
    ///                             the run will be resumed later)
    /// - any other error         → run Failed; that error is propagated
    async fn finalize_run(
        &self,
        run_id: Uuid,
        workflow_name: &str,
        result: Result<(), EngineError>,
        ctx: &WorkflowContext,
        run_start: Instant,
    ) -> Result<Run, EngineError> {
        let total_duration = run_start.elapsed().as_millis() as u64;
        let completed_at = Utc::now();

        let final_status;
        let final_run;

        match result {
            Ok(()) => {
                final_status = RunStatus::Completed;
                final_run = self
                    .store
                    .update_run_returning(
                        run_id,
                        RunUpdate {
                            status: Some(RunStatus::Completed),
                            cost_usd: Some(ctx.total_cost_usd()),
                            duration_ms: Some(total_duration),
                            completed_at: Some(completed_at),
                            ..RunUpdate::default()
                        },
                    )
                    .await?;

                info!(
                    run_id = %run_id,
                    cost_usd = %ctx.total_cost_usd(),
                    duration_ms = total_duration,
                    "run completed"
                );
            }
            Err(EngineError::ApprovalRequired {
                run_id: approval_run_id,
                step_id,
                ref message,
            }) => {
                final_status = RunStatus::AwaitingApproval;
                // Deliberately no `completed_at`: the run is paused, not done.
                final_run = self
                    .store
                    .update_run_returning(
                        run_id,
                        RunUpdate {
                            status: Some(RunStatus::AwaitingApproval),
                            cost_usd: Some(ctx.total_cost_usd()),
                            duration_ms: Some(total_duration),
                            ..RunUpdate::default()
                        },
                    )
                    .await?;

                info!(
                    run_id = %approval_run_id,
                    step_id = %step_id,
                    message = %message,
                    "run awaiting approval"
                );
            }
            Err(err) => {
                final_status = RunStatus::Failed;
                // Best-effort persistence: a store failure here must not mask
                // the original run error, so it is only logged.
                if let Err(store_err) = self
                    .store
                    .update_run(
                        run_id,
                        RunUpdate {
                            status: Some(RunStatus::Failed),
                            error: Some(err.to_string()),
                            cost_usd: Some(ctx.total_cost_usd()),
                            duration_ms: Some(total_duration),
                            completed_at: Some(completed_at),
                            ..RunUpdate::default()
                        },
                    )
                    .await
                {
                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
                }

                error!(run_id = %run_id, error = %err, "run failed");

                // Failure path publishes/emits here and returns early because
                // the original error (not a Run) is what the caller gets.
                self.publish_run_status_changed(
                    workflow_name,
                    run_id,
                    final_status,
                    Some(err.to_string()),
                    ctx,
                    total_duration,
                );

                #[cfg(feature = "prometheus")]
                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);

                return Err(err);
            }
        }

        // Completed and AwaitingApproval share the event + metrics emission.
        self.publish_run_status_changed(
            workflow_name,
            run_id,
            final_status,
            None,
            ctx,
            total_duration,
        );

        #[cfg(feature = "prometheus")]
        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);

        Ok(final_run)
    }
742
    /// Emit the per-run Prometheus metrics for a finished (or paused) run and
    /// release the RUNS_ACTIVE slot taken when execution began.
    #[cfg(feature = "prometheus")]
    fn emit_run_metrics(
        &self,
        workflow_name: &str,
        status: RunStatus,
        duration_ms: u64,
        ctx: &WorkflowContext,
    ) {
        let status_str = status.to_string();
        let wf = workflow_name.to_string();

        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
            .record(duration_ms as f64 / 1000.0);
        // NOTE(review): total_cost_usd() appears to be a non-f64 numeric type
        // (the Display → parse round trip below converts it); confirm whether
        // a direct f64 conversion exists — parse failure silently records 0.0.
        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
            ctx.total_cost_usd()
                .to_string()
                .parse::<f64>()
                .unwrap_or(0.0),
        );
        // Balances the increment performed when the run started executing.
        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
    }
766
    /// Publish a `RunStatusChanged` event to subscribers, plus a dedicated
    /// `RunFailed` event when the terminal status is Failed.
    fn publish_run_status_changed(
        &self,
        workflow_name: &str,
        run_id: Uuid,
        to: RunStatus,
        error: Option<String>,
        ctx: &WorkflowContext,
        duration_ms: u64,
    ) {
        let now = Utc::now();
        let cost_usd = ctx.total_cost_usd();
        let wf = workflow_name.to_string();

        // NOTE(review): `from` is hard-coded to Running; for runs that were
        // resumed out of AwaitingApproval the true prior state differs —
        // confirm whether any subscriber relies on `from`.
        self.event_publisher.publish(Event::RunStatusChanged {
            run_id,
            workflow_name: wf.clone(),
            from: RunStatus::Running,
            to,
            error: error.clone(),
            cost_usd,
            duration_ms,
            at: now,
        });

        if to == RunStatus::Failed {
            self.event_publisher.publish(Event::RunFailed {
                run_id,
                workflow_name: wf,
                error,
                cost_usd,
                duration_ms,
                at: now,
            });
        }
    }
806}
807
808impl fmt::Debug for Engine {
809 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
810 f.debug_struct("Engine")
811 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
812 .finish_non_exhaustive()
813 }
814}
815
816#[cfg(test)]
817mod tests {
818 use super::*;
819 use crate::config::ShellConfig;
820 use crate::handler::{HandlerFuture, WorkflowHandler};
821 use ironflow_core::providers::claude::ClaudeCodeProvider;
822 use ironflow_core::providers::record_replay::RecordReplayProvider;
823 use ironflow_store::memory::InMemoryStore;
824 use ironflow_store::models::StepStatus;
825 use serde_json::json;
826
    /// Minimal passing workflow: one `echo hello` shell step.
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }

        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
                category: None,
                version: self.version().map(str::to_string),
                input_schema: None,
                default_labels: HashMap::new(),
            }
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    /// Workflow whose only step exits non-zero, forcing a failed run.
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }
870
871 fn create_test_engine() -> Engine {
872 let store = Arc::new(InMemoryStore::new());
873 let inner = ClaudeCodeProvider::new();
874 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
875 inner,
876 "/tmp/ironflow-fixtures",
877 ));
878 Engine::new(store, provider)
879 }
880
    #[test]
    fn engine_new_creates_instance() {
        // A fresh engine starts with no registered handlers.
        let engine = create_test_engine();
        assert_eq!(engine.handler_names().len(), 0);
    }

    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
        assert!(engine.handler_names().contains(&"echo-workflow"));
    }

    #[test]
    fn engine_register_duplicate_returns_error() {
        // Registering the same handler name twice must fail.
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_err());
    }

    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let handler = engine.get_handler("echo-workflow");
        assert!(handler.is_some());
    }

    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        let handler = engine.get_handler("nonexistent");
        assert!(handler.is_none());
    }

    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();
        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"echo-workflow"));
        assert!(names.contains(&"failing-workflow"));
    }

    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.description, "A simple workflow that echoes hello");
    }
939
    /// Handler declaring a valid nested category, for describe() tests.
    struct CategorizedWorkflow;

    impl WorkflowHandler for CategorizedWorkflow {
        fn name(&self) -> &str {
            "categorized"
        }
        fn category(&self) -> Option<&str> {
            Some("data/etl")
        }
        fn execute<'a>(
            &'a self,
            _ctx: &'a mut WorkflowContext,
        ) -> crate::handler::HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_default_describe_propagates_category() {
        // The default describe() implementation must surface category().
        let mut engine = create_test_engine();
        engine.register(CategorizedWorkflow).unwrap();
        let info = engine.handler_info("categorized").unwrap();
        assert_eq!(info.category.as_deref(), Some("data/etl"));
    }

    #[test]
    fn engine_default_describe_without_category() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow").unwrap();
        assert!(info.category.is_none());
    }

    /// Handler whose category string is supplied by each test, used to probe
    /// validate_category() rejection paths at registration time.
    struct BadCategoryWorkflow(&'static str);

    impl WorkflowHandler for BadCategoryWorkflow {
        fn name(&self) -> &str {
            "bad-category"
        }
        fn category(&self) -> Option<&str> {
            Some(self.0)
        }
        fn execute<'a>(
            &'a self,
            _ctx: &'a mut WorkflowContext,
        ) -> crate::handler::HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }
989
    #[test]
    fn engine_register_rejects_empty_category() {
        let mut engine = create_test_engine();
        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_leading_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("/data/etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_trailing_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/etl/"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_double_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data//etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_whitespace_only_segment_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/ /etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_accepts_valid_nested_category() {
        // "data/etl" satisfies every validate_category rule.
        let mut engine = create_test_engine();
        assert!(engine.register(CategorizedWorkflow).is_ok());
    }
1053
1054 #[tokio::test]
1055 async fn engine_unknown_workflow_returns_error() {
1056 let engine = create_test_engine();
1057 let result = engine
1058 .run_handler("unknown", TriggerKind::Manual, json!({}))
1059 .await;
1060 assert!(result.is_err());
1061 match result {
1062 Err(EngineError::InvalidWorkflow(msg)) => {
1063 assert!(msg.contains("no handler registered"));
1064 }
1065 _ => panic!("expected InvalidWorkflow error"),
1066 }
1067 }
1068
    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        // Enqueueing persists a Pending run without executing it.
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let run = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.workflow_name, "echo-workflow");
    }

    #[tokio::test]
    async fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
        let result = engine.register_boxed(handler);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
    }

    #[tokio::test]
    async fn engine_store_and_provider_accessors() {
        // Smoke test: the accessors are callable on a built engine.
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        let engine = Engine::new(store.clone(), provider.clone());

        let _ = engine.store();
        let _ = engine.provider();
    }
1105
1106 use crate::operation::Operation;
1111 use ironflow_store::models::StepKind;
1112 use std::future::Future;
1113 use std::pin::Pin;
1114
    /// Fake custom operation that pretends to create a GitLab issue and
    /// echoes its inputs back in the result payload.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }

    impl Operation for FakeGitlabOp {
        fn kind(&self) -> &str {
            "gitlab"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move {
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }

        fn input(&self) -> Option<Value> {
            Some(json!({
                "project_id": self.project_id,
                "title": self.title,
            }))
        }
    }

    /// Operation that always errors, to exercise failure propagation.
    struct FailingOp;

    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
        }
    }

    /// Workflow running a single successful custom operation step.
    struct OperationWorkflow;

    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let op = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &op).await?;
                Ok(())
            })
        }
    }

    /// Workflow whose single operation step fails.
    struct FailingOperationWorkflow;

    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }

    /// Workflow mixing a shell step with an operation step, and asserting on
    /// the operation result inline.
    struct MixedWorkflow;

    impl WorkflowHandler for MixedWorkflow {
        fn name(&self) -> &str {
            "mixed-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                let op = FakeGitlabOp {
                    project_id: 456,
                    title: "Deploy done".to_string(),
                };
                let result = ctx.operation("notify-gitlab", &op).await?;
                assert_eq!(result.output["issue_id"], 42);
                Ok(())
            })
        }
    }
1209
1210 #[tokio::test]
1211 async fn operation_step_happy_path() {
1212 let mut engine = create_test_engine();
1213 engine.register(OperationWorkflow).unwrap();
1214
1215 let run = engine
1216 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1217 .await
1218 .unwrap();
1219
1220 assert_eq!(run.status.state, RunStatus::Completed);
1221
1222 let steps = engine.store().list_steps(run.id).await.unwrap();
1223
1224 assert_eq!(steps.len(), 1);
1225 assert_eq!(steps[0].name, "create-issue");
1226 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1227 assert_eq!(
1228 steps[0].status.state,
1229 ironflow_store::models::StepStatus::Completed
1230 );
1231
1232 let output = steps[0].output.as_ref().unwrap();
1233 assert_eq!(output["issue_id"], 42);
1234 assert_eq!(output["project_id"], 123);
1235
1236 let input = steps[0].input.as_ref().unwrap();
1237 assert_eq!(input["project_id"], 123);
1238 assert_eq!(input["title"], "Bug report");
1239 }
1240
    #[tokio::test]
    async fn operation_step_failure_marks_run_failed() {
        // The operation's error propagates out of run_handler.
        let mut engine = create_test_engine();
        engine.register(FailingOperationWorkflow).unwrap();

        let result = engine
            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn operation_mixed_with_shell_steps() {
        let mut engine = create_test_engine();
        engine.register(MixedWorkflow).unwrap();

        let run = engine
            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        // Shell then operation, recorded in execution order with positions.
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].position, 0);
        assert_eq!(steps[1].position, 1);
    }
1273
1274 use crate::config::ApprovalConfig;
1279
    /// Workflow with one approval gate between a build and a deploy step.
    struct SingleApprovalWorkflow;

    impl WorkflowHandler for SingleApprovalWorkflow {
        fn name(&self) -> &str {
            "single-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
                ctx.shell("deploy", ShellConfig::new("echo deployed"))
                    .await?;
                Ok(())
            })
        }
    }

    /// Workflow with two approval gates (staging then prod), requiring two
    /// separate resumes to reach completion.
    struct DoubleApprovalWorkflow;

    impl WorkflowHandler for DoubleApprovalWorkflow {
        fn name(&self) -> &str {
            "double-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
                    .await?;
                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
                    .await?;
                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
                    .await?;
                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
                    .await?;
                Ok(())
            })
        }
    }
1320
1321 #[tokio::test]
1322 async fn approval_pauses_run() {
1323 let mut engine = create_test_engine();
1324 engine.register(SingleApprovalWorkflow).unwrap();
1325
1326 let run = engine
1327 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1328 .await
1329 .unwrap();
1330
1331 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1332
1333 let steps = engine.store().list_steps(run.id).await.unwrap();
1334 assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
1336 assert_eq!(steps[0].status.state, StepStatus::Completed);
1337 assert_eq!(steps[1].kind, StepKind::Approval);
1338 assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1339 }
1340
1341 #[tokio::test]
1342 async fn approval_resume_completes_run() {
1343 let mut engine = create_test_engine();
1344 engine.register(SingleApprovalWorkflow).unwrap();
1345
1346 let run = engine
1348 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1349 .await
1350 .unwrap();
1351 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1352
1353 engine
1355 .store()
1356 .update_run_status(run.id, RunStatus::Running)
1357 .await
1358 .unwrap();
1359
1360 let resumed = engine.resume_run(run.id).await.unwrap();
1362 assert_eq!(resumed.status.state, RunStatus::Completed);
1363
1364 let steps = engine.store().list_steps(run.id).await.unwrap();
1365 assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
1367 assert_eq!(steps[0].status.state, StepStatus::Completed);
1368 assert_eq!(steps[1].name, "gate");
1369 assert_eq!(steps[1].kind, StepKind::Approval);
1370 assert_eq!(steps[1].status.state, StepStatus::Completed);
1371 assert_eq!(steps[2].name, "deploy");
1372 assert_eq!(steps[2].status.state, StepStatus::Completed);
1373 }
1374
1375 #[tokio::test]
1376 async fn double_approval_two_resumes() {
1377 let mut engine = create_test_engine();
1378 engine.register(DoubleApprovalWorkflow).unwrap();
1379
1380 let run = engine
1382 .run_handler("double-approval", TriggerKind::Manual, json!({}))
1383 .await
1384 .unwrap();
1385 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1386
1387 let steps = engine.store().list_steps(run.id).await.unwrap();
1388 assert_eq!(steps.len(), 2); engine
1392 .store()
1393 .update_run_status(run.id, RunStatus::Running)
1394 .await
1395 .unwrap();
1396
1397 let resumed = engine.resume_run(run.id).await.unwrap();
1398 assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1399
1400 let steps = engine.store().list_steps(run.id).await.unwrap();
1401 assert_eq!(steps.len(), 4); engine
1405 .store()
1406 .update_run_status(run.id, RunStatus::Running)
1407 .await
1408 .unwrap();
1409
1410 let final_run = engine.resume_run(run.id).await.unwrap();
1411 assert_eq!(final_run.status.state, RunStatus::Completed);
1412
1413 let steps = engine.store().list_steps(run.id).await.unwrap();
1414 assert_eq!(steps.len(), 5);
1415 assert_eq!(steps[0].name, "build");
1416 assert_eq!(steps[1].name, "staging-gate");
1417 assert_eq!(steps[2].name, "deploy-staging");
1418 assert_eq!(steps[3].name, "prod-gate");
1419 assert_eq!(steps[4].name, "deploy-prod");
1420
1421 for step in &steps {
1422 assert_eq!(step.status.state, StepStatus::Completed);
1423 }
1424 }
1425
1426 use ironflow_store::models::{NewStep, StepUpdate};
1431
1432 async fn create_step_with_status(
1433 store: &Arc<dyn Store>,
1434 run_id: Uuid,
1435 name: &str,
1436 position: u32,
1437 status: StepStatus,
1438 ) -> ironflow_store::models::Step {
1439 let step = store
1440 .create_step(NewStep {
1441 run_id,
1442 name: name.to_string(),
1443 kind: StepKind::Shell,
1444 position,
1445 input: None,
1446 })
1447 .await
1448 .unwrap();
1449
1450 match status {
1451 StepStatus::Pending => {}
1452 StepStatus::Running => {
1453 store
1454 .update_step(
1455 step.id,
1456 StepUpdate {
1457 status: Some(StepStatus::Running),
1458 ..StepUpdate::default()
1459 },
1460 )
1461 .await
1462 .unwrap();
1463 }
1464 StepStatus::Completed => {
1465 store
1466 .update_step(
1467 step.id,
1468 StepUpdate {
1469 status: Some(StepStatus::Running),
1470 ..StepUpdate::default()
1471 },
1472 )
1473 .await
1474 .unwrap();
1475 store
1476 .update_step(
1477 step.id,
1478 StepUpdate {
1479 status: Some(StepStatus::Completed),
1480 ..StepUpdate::default()
1481 },
1482 )
1483 .await
1484 .unwrap();
1485 }
1486 StepStatus::AwaitingApproval => {
1487 store
1488 .update_step(
1489 step.id,
1490 StepUpdate {
1491 status: Some(StepStatus::Running),
1492 ..StepUpdate::default()
1493 },
1494 )
1495 .await
1496 .unwrap();
1497 store
1498 .update_step(
1499 step.id,
1500 StepUpdate {
1501 status: Some(StepStatus::AwaitingApproval),
1502 ..StepUpdate::default()
1503 },
1504 )
1505 .await
1506 .unwrap();
1507 }
1508 _ => panic!("unsupported status for test helper: {status}"),
1509 }
1510
1511 store.get_step(step.id).await.unwrap().unwrap()
1512 }
1513
1514 #[tokio::test]
1515 async fn fail_orphaned_steps_marks_running_as_failed() {
1516 let engine = create_test_engine();
1517 let run = engine
1518 .store()
1519 .create_run(NewRun {
1520 workflow_name: "test".to_string(),
1521 trigger: TriggerKind::Manual,
1522 payload: json!({}),
1523 max_retries: 0,
1524 handler_version: None,
1525 labels: HashMap::new(),
1526 scheduled_at: None,
1527 })
1528 .await
1529 .unwrap();
1530
1531 let step = create_step_with_status(
1532 engine.store(),
1533 run.id,
1534 "running-step",
1535 0,
1536 StepStatus::Running,
1537 )
1538 .await;
1539
1540 engine
1541 .fail_orphaned_steps(run.id, "parent run timed out")
1542 .await
1543 .unwrap();
1544
1545 let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1546 assert_eq!(updated.status.state, StepStatus::Failed);
1547 assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1548 assert!(updated.completed_at.is_some());
1549 }
1550
1551 #[tokio::test]
1552 async fn fail_orphaned_steps_marks_pending_as_skipped() {
1553 let engine = create_test_engine();
1554 let run = engine
1555 .store()
1556 .create_run(NewRun {
1557 workflow_name: "test".to_string(),
1558 trigger: TriggerKind::Manual,
1559 payload: json!({}),
1560 max_retries: 0,
1561 handler_version: None,
1562 labels: HashMap::new(),
1563 scheduled_at: None,
1564 })
1565 .await
1566 .unwrap();
1567
1568 let step = create_step_with_status(
1569 engine.store(),
1570 run.id,
1571 "pending-step",
1572 0,
1573 StepStatus::Pending,
1574 )
1575 .await;
1576
1577 engine
1578 .fail_orphaned_steps(run.id, "parent run timed out")
1579 .await
1580 .unwrap();
1581
1582 let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1583 assert_eq!(updated.status.state, StepStatus::Skipped);
1584 assert!(updated.error.is_none());
1585 assert!(updated.completed_at.is_some());
1586 }
1587
1588 #[tokio::test]
1589 async fn fail_orphaned_steps_marks_awaiting_approval_as_failed() {
1590 let engine = create_test_engine();
1591 let run = engine
1592 .store()
1593 .create_run(NewRun {
1594 workflow_name: "test".to_string(),
1595 trigger: TriggerKind::Manual,
1596 payload: json!({}),
1597 max_retries: 0,
1598 handler_version: None,
1599 labels: HashMap::new(),
1600 scheduled_at: None,
1601 })
1602 .await
1603 .unwrap();
1604
1605 let step = create_step_with_status(
1606 engine.store(),
1607 run.id,
1608 "approval-step",
1609 0,
1610 StepStatus::AwaitingApproval,
1611 )
1612 .await;
1613
1614 engine
1615 .fail_orphaned_steps(run.id, "parent run timed out")
1616 .await
1617 .unwrap();
1618
1619 let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1620 assert_eq!(updated.status.state, StepStatus::Failed);
1621 assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1622 assert!(updated.completed_at.is_some());
1623 }
1624
1625 #[tokio::test]
1626 async fn fail_orphaned_steps_skips_terminal_steps() {
1627 let engine = create_test_engine();
1628 let run = engine
1629 .store()
1630 .create_run(NewRun {
1631 workflow_name: "test".to_string(),
1632 trigger: TriggerKind::Manual,
1633 payload: json!({}),
1634 max_retries: 0,
1635 handler_version: None,
1636 labels: HashMap::new(),
1637 scheduled_at: None,
1638 })
1639 .await
1640 .unwrap();
1641
1642 let completed_step =
1643 create_step_with_status(engine.store(), run.id, "done", 0, StepStatus::Completed).await;
1644 let running_step =
1645 create_step_with_status(engine.store(), run.id, "in-flight", 1, StepStatus::Running)
1646 .await;
1647
1648 engine
1649 .fail_orphaned_steps(run.id, "parent run timed out")
1650 .await
1651 .unwrap();
1652
1653 let completed = engine
1654 .store()
1655 .get_step(completed_step.id)
1656 .await
1657 .unwrap()
1658 .unwrap();
1659 assert_eq!(completed.status.state, StepStatus::Completed);
1660
1661 let failed = engine
1662 .store()
1663 .get_step(running_step.id)
1664 .await
1665 .unwrap()
1666 .unwrap();
1667 assert_eq!(failed.status.state, StepStatus::Failed);
1668 }
1669
1670 #[tokio::test]
1671 async fn fail_orphaned_steps_mixed_states() {
1672 let engine = create_test_engine();
1673 let run = engine
1674 .store()
1675 .create_run(NewRun {
1676 workflow_name: "test".to_string(),
1677 trigger: TriggerKind::Manual,
1678 payload: json!({}),
1679 max_retries: 0,
1680 handler_version: None,
1681 labels: HashMap::new(),
1682 scheduled_at: None,
1683 })
1684 .await
1685 .unwrap();
1686
1687 let s_completed =
1688 create_step_with_status(engine.store(), run.id, "step-1", 0, StepStatus::Completed)
1689 .await;
1690 let s_running =
1691 create_step_with_status(engine.store(), run.id, "step-2", 1, StepStatus::Running).await;
1692 let s_pending =
1693 create_step_with_status(engine.store(), run.id, "step-3", 2, StepStatus::Pending).await;
1694
1695 engine.fail_orphaned_steps(run.id, "timeout").await.unwrap();
1696
1697 let r_completed = engine
1698 .store()
1699 .get_step(s_completed.id)
1700 .await
1701 .unwrap()
1702 .unwrap();
1703 assert_eq!(r_completed.status.state, StepStatus::Completed);
1704
1705 let r_running = engine
1706 .store()
1707 .get_step(s_running.id)
1708 .await
1709 .unwrap()
1710 .unwrap();
1711 assert_eq!(r_running.status.state, StepStatus::Failed);
1712 assert_eq!(r_running.error.as_deref(), Some("timeout"));
1713
1714 let r_pending = engine
1715 .store()
1716 .get_step(s_pending.id)
1717 .await
1718 .unwrap()
1719 .unwrap();
1720 assert_eq!(r_pending.status.state, StepStatus::Skipped);
1721 assert!(r_pending.error.is_none());
1722 }
1723
1724 #[tokio::test]
1725 async fn fail_orphaned_steps_no_steps_is_noop() {
1726 let engine = create_test_engine();
1727 let run = engine
1728 .store()
1729 .create_run(NewRun {
1730 workflow_name: "test".to_string(),
1731 trigger: TriggerKind::Manual,
1732 payload: json!({}),
1733 max_retries: 0,
1734 handler_version: None,
1735 labels: HashMap::new(),
1736 scheduled_at: None,
1737 })
1738 .await
1739 .unwrap();
1740
1741 let result = engine.fail_orphaned_steps(run.id, "timeout").await;
1742 assert!(result.is_ok());
1743 }
1744}