1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::Store;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::log_sender::LogSender;
32use crate::notify::{Event, EventPublisher, EventSubscriber};
33
/// Orchestrates workflow execution: owns the run store, the agent provider,
/// the registry of workflow handlers, and the event/log plumbing that each
/// run's `WorkflowContext` is wired with.
pub struct Engine {
    // Persistence backend for runs and steps.
    store: Arc<dyn Store>,
    // Agent provider handed to every workflow context.
    provider: Arc<dyn AgentProvider>,
    // Registered handlers, keyed by their unique handler name.
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    // Fan-out publisher for run lifecycle events.
    event_publisher: EventPublisher,
    // Optional log sink; cloned into each context when present.
    log_sender: Option<LogSender>,
}
81
82fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
92 let reject = |reason: &str| {
93 Err(EngineError::InvalidWorkflow(format!(
94 "handler '{handler_name}' has invalid category '{category}': {reason}"
95 )))
96 };
97
98 if category.is_empty() {
99 return reject("empty category");
100 }
101 if category.starts_with('/') {
102 return reject("leading '/'");
103 }
104 if category.ends_with('/') {
105 return reject("trailing '/'");
106 }
107 for segment in category.split('/') {
108 if segment.is_empty() {
109 return reject("empty segment (double '/')");
110 }
111 if segment.trim().is_empty() {
112 return reject("whitespace-only segment");
113 }
114 }
115 Ok(())
116}
117
118impl Engine {
119 pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
135 Self {
136 store,
137 provider,
138 handlers: HashMap::new(),
139 event_publisher: EventPublisher::new(),
140 log_sender: None,
141 }
142 }
143
144 pub fn set_log_sender(&mut self, sender: LogSender) {
150 self.log_sender = Some(sender);
151 }
152
153 pub fn store(&self) -> &Arc<dyn Store> {
155 &self.store
156 }
157
158 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
160 &self.provider
161 }
162
163 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
165 let handlers = self.handlers.clone();
166 let resolver: crate::context::HandlerResolver =
167 Arc::new(move |name: &str| handlers.get(name).cloned());
168 let mut ctx = WorkflowContext::with_handler_resolver(
169 run_id,
170 self.store.clone(),
171 self.provider.clone(),
172 resolver,
173 );
174 if let Some(ref sender) = self.log_sender {
175 ctx.set_log_sender(sender.clone());
176 }
177 ctx
178 }
179
180 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
224 let name = handler.name().to_string();
225 if self.handlers.contains_key(&name) {
226 return Err(EngineError::InvalidWorkflow(format!(
227 "handler '{}' already registered",
228 name
229 )));
230 }
231 if let Some(category) = handler.category() {
232 validate_category(&name, category)?;
233 }
234 self.handlers.insert(name, Arc::new(handler));
235 Ok(())
236 }
237
238 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
245 let name = handler.name().to_string();
246 if self.handlers.contains_key(&name) {
247 return Err(EngineError::InvalidWorkflow(format!(
248 "handler '{}' already registered",
249 name
250 )));
251 }
252 if let Some(category) = handler.category() {
253 validate_category(&name, category)?;
254 }
255 self.handlers.insert(name, Arc::from(handler));
256 Ok(())
257 }
258
259 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
261 self.handlers.get(name)
262 }
263
264 pub fn handler_names(&self) -> Vec<&str> {
266 self.handlers.keys().map(|s| s.as_str()).collect()
267 }
268
269 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
271 self.handlers.get(name).map(|h| h.describe())
272 }
273
274 pub fn subscribe(
299 &mut self,
300 subscriber: impl EventSubscriber + 'static,
301 event_types: &[&'static str],
302 ) {
303 self.event_publisher.subscribe(subscriber, event_types);
304 }
305
306 pub fn event_publisher(&self) -> &EventPublisher {
311 &self.event_publisher
312 }
313
314 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
344 pub async fn run_handler(
345 &self,
346 handler_name: &str,
347 trigger: TriggerKind,
348 payload: Value,
349 ) -> Result<Run, EngineError> {
350 let handler = self
351 .handlers
352 .get(handler_name)
353 .ok_or_else(|| {
354 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
355 })?
356 .clone();
357
358 let handler_version = handler.version().map(str::to_string);
359 let run = self
360 .store
361 .create_run(NewRun {
362 workflow_name: handler_name.to_string(),
363 trigger,
364 payload,
365 max_retries: 0,
366 handler_version,
367 labels: handler.default_labels(),
368 scheduled_at: None,
369 })
370 .await?;
371
372 let run_id = run.id;
373 info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
374
375 self.store
376 .update_run_status(run_id, RunStatus::Running)
377 .await?;
378
379 #[cfg(feature = "prometheus")]
380 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
381
382 let run_start = Instant::now();
383 let mut ctx = self.build_context(run_id);
384
385 let result = handler.execute(&mut ctx).await;
386 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
387 .await
388 }
389
390 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
399 pub async fn enqueue_handler(
400 &self,
401 handler_name: &str,
402 trigger: TriggerKind,
403 payload: Value,
404 max_retries: u32,
405 ) -> Result<Run, EngineError> {
406 self.enqueue_handler_with_options(
407 handler_name,
408 trigger,
409 payload,
410 max_retries,
411 HashMap::new(),
412 None,
413 )
414 .await
415 }
416
417 #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
423 pub async fn enqueue_handler_with_options(
424 &self,
425 handler_name: &str,
426 trigger: TriggerKind,
427 payload: Value,
428 max_retries: u32,
429 labels: HashMap<String, String>,
430 scheduled_at: Option<DateTime<Utc>>,
431 ) -> Result<Run, EngineError> {
432 let handler = self.handlers.get(handler_name).ok_or_else(|| {
433 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
434 })?;
435
436 let handler_version = handler.version().map(str::to_string);
437 let mut merged_labels = handler.default_labels();
438 merged_labels.extend(labels);
439
440 let run = self
441 .store
442 .create_run(NewRun {
443 workflow_name: handler_name.to_string(),
444 trigger,
445 payload,
446 max_retries,
447 handler_version,
448 labels: merged_labels,
449 scheduled_at,
450 })
451 .await?;
452
453 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
454 Ok(run)
455 }
456
457 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
466 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
467 let run = self
468 .store
469 .get_run(run_id)
470 .await?
471 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
472
473 let handler = self
474 .handlers
475 .get(&run.workflow_name)
476 .ok_or_else(|| {
477 EngineError::InvalidWorkflow(format!(
478 "no handler registered: {}",
479 run.workflow_name
480 ))
481 })?
482 .clone();
483
484 #[cfg(feature = "prometheus")]
485 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
486
487 let run_start = Instant::now();
488 let mut ctx = self.build_context(run_id);
489
490 let result = handler.execute(&mut ctx).await;
491 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
492 .await
493 }
494
495 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
503 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
504 self.execute_handler_run(run_id).await
505 }
506
507 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
521 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
522 let run = self
523 .store
524 .get_run(run_id)
525 .await?
526 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
527
528 let handler = self
529 .handlers
530 .get(&run.workflow_name)
531 .ok_or_else(|| {
532 EngineError::InvalidWorkflow(format!(
533 "no handler registered: {}",
534 run.workflow_name
535 ))
536 })?
537 .clone();
538
539 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
540
541 let run_start = Instant::now();
542 let mut ctx = self.build_context(run_id);
543 ctx.load_replay_steps().await?;
544
545 let result = handler.execute(&mut ctx).await;
546 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
547 .await
548 }
549
550 async fn finalize_run(
556 &self,
557 run_id: Uuid,
558 workflow_name: &str,
559 result: Result<(), EngineError>,
560 ctx: &WorkflowContext,
561 run_start: Instant,
562 ) -> Result<Run, EngineError> {
563 let total_duration = run_start.elapsed().as_millis() as u64;
564 let completed_at = Utc::now();
565
566 let final_status;
567 let final_run;
568
569 match result {
570 Ok(()) => {
571 final_status = RunStatus::Completed;
572 final_run = self
573 .store
574 .update_run_returning(
575 run_id,
576 RunUpdate {
577 status: Some(RunStatus::Completed),
578 cost_usd: Some(ctx.total_cost_usd()),
579 duration_ms: Some(total_duration),
580 completed_at: Some(completed_at),
581 ..RunUpdate::default()
582 },
583 )
584 .await?;
585
586 info!(
587 run_id = %run_id,
588 cost_usd = %ctx.total_cost_usd(),
589 duration_ms = total_duration,
590 "run completed"
591 );
592 }
593 Err(EngineError::ApprovalRequired {
594 run_id: approval_run_id,
595 step_id,
596 ref message,
597 }) => {
598 final_status = RunStatus::AwaitingApproval;
599 final_run = self
600 .store
601 .update_run_returning(
602 run_id,
603 RunUpdate {
604 status: Some(RunStatus::AwaitingApproval),
605 cost_usd: Some(ctx.total_cost_usd()),
606 duration_ms: Some(total_duration),
607 ..RunUpdate::default()
608 },
609 )
610 .await?;
611
612 info!(
613 run_id = %approval_run_id,
614 step_id = %step_id,
615 message = %message,
616 "run awaiting approval"
617 );
618 }
619 Err(err) => {
620 final_status = RunStatus::Failed;
621 if let Err(store_err) = self
622 .store
623 .update_run(
624 run_id,
625 RunUpdate {
626 status: Some(RunStatus::Failed),
627 error: Some(err.to_string()),
628 cost_usd: Some(ctx.total_cost_usd()),
629 duration_ms: Some(total_duration),
630 completed_at: Some(completed_at),
631 ..RunUpdate::default()
632 },
633 )
634 .await
635 {
636 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
637 }
638
639 error!(run_id = %run_id, error = %err, "run failed");
640
641 self.publish_run_status_changed(
642 workflow_name,
643 run_id,
644 final_status,
645 Some(err.to_string()),
646 ctx,
647 total_duration,
648 );
649
650 #[cfg(feature = "prometheus")]
651 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
652
653 return Err(err);
654 }
655 }
656
657 self.publish_run_status_changed(
658 workflow_name,
659 run_id,
660 final_status,
661 None,
662 ctx,
663 total_duration,
664 );
665
666 #[cfg(feature = "prometheus")]
667 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
668
669 Ok(final_run)
670 }
671
672 #[cfg(feature = "prometheus")]
674 fn emit_run_metrics(
675 &self,
676 workflow_name: &str,
677 status: RunStatus,
678 duration_ms: u64,
679 ctx: &WorkflowContext,
680 ) {
681 let status_str = status.to_string();
682 let wf = workflow_name.to_string();
683
684 counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
685 histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
686 .record(duration_ms as f64 / 1000.0);
687 histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
688 ctx.total_cost_usd()
689 .to_string()
690 .parse::<f64>()
691 .unwrap_or(0.0),
692 );
693 gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
694 }
695
696 fn publish_run_status_changed(
701 &self,
702 workflow_name: &str,
703 run_id: Uuid,
704 to: RunStatus,
705 error: Option<String>,
706 ctx: &WorkflowContext,
707 duration_ms: u64,
708 ) {
709 let now = Utc::now();
710 let cost_usd = ctx.total_cost_usd();
711 let wf = workflow_name.to_string();
712
713 self.event_publisher.publish(Event::RunStatusChanged {
714 run_id,
715 workflow_name: wf.clone(),
716 from: RunStatus::Running,
717 to,
718 error: error.clone(),
719 cost_usd,
720 duration_ms,
721 at: now,
722 });
723
724 if to == RunStatus::Failed {
725 self.event_publisher.publish(Event::RunFailed {
726 run_id,
727 workflow_name: wf,
728 error,
729 cost_usd,
730 duration_ms,
731 at: now,
732 });
733 }
734 }
735}
736
737impl fmt::Debug for Engine {
738 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739 f.debug_struct("Engine")
740 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
741 .finish_non_exhaustive()
742 }
743}
744
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ShellConfig;
    use crate::handler::{HandlerFuture, WorkflowHandler};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::StepStatus;
    use serde_json::json;

    // Fixture handler: a single shell step that echoes "hello".
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }

        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
                category: None,
                version: self.version().map(str::to_string),
                input_schema: None,
                default_labels: HashMap::new(),
            }
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    // Fixture handler whose shell step exits non-zero, failing the run.
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }

    // Builds an engine over an in-memory store and a replay provider so
    // tests never hit a real agent.
    fn create_test_engine() -> Engine {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        Engine::new(store, provider)
    }

    #[test]
    fn engine_new_creates_instance() {
        let engine = create_test_engine();
        assert_eq!(engine.handler_names().len(), 0);
    }

    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
        assert!(engine.handler_names().contains(&"echo-workflow"));
    }

    #[test]
    fn engine_register_duplicate_returns_error() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        // Second registration under the same name must be rejected.
        let result = engine.register(EchoWorkflow);
        assert!(result.is_err());
    }

    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let handler = engine.get_handler("echo-workflow");
        assert!(handler.is_some());
    }

    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        let handler = engine.get_handler("nonexistent");
        assert!(handler.is_none());
    }

    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();
        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"echo-workflow"));
        assert!(names.contains(&"failing-workflow"));
    }

    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.description, "A simple workflow that echoes hello");
    }

    // Fixture handler with a valid nested category.
    struct CategorizedWorkflow;

    impl WorkflowHandler for CategorizedWorkflow {
        fn name(&self) -> &str {
            "categorized"
        }
        fn category(&self) -> Option<&str> {
            Some("data/etl")
        }
        fn execute<'a>(
            &'a self,
            _ctx: &'a mut WorkflowContext,
        ) -> crate::handler::HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_default_describe_propagates_category() {
        let mut engine = create_test_engine();
        engine.register(CategorizedWorkflow).unwrap();
        let info = engine.handler_info("categorized").unwrap();
        assert_eq!(info.category.as_deref(), Some("data/etl"));
    }

    #[test]
    fn engine_default_describe_without_category() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow").unwrap();
        assert!(info.category.is_none());
    }

    // Fixture handler whose category is the wrapped string — used to probe
    // each validate_category rejection case.
    struct BadCategoryWorkflow(&'static str);

    impl WorkflowHandler for BadCategoryWorkflow {
        fn name(&self) -> &str {
            "bad-category"
        }
        fn category(&self) -> Option<&str> {
            Some(self.0)
        }
        fn execute<'a>(
            &'a self,
            _ctx: &'a mut WorkflowContext,
        ) -> crate::handler::HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_register_rejects_empty_category() {
        let mut engine = create_test_engine();
        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_leading_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("/data/etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_trailing_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/etl/"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_double_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data//etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_whitespace_only_segment_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/ /etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_accepts_valid_nested_category() {
        let mut engine = create_test_engine();
        assert!(engine.register(CategorizedWorkflow).is_ok());
    }

    #[tokio::test]
    async fn engine_unknown_workflow_returns_error() {
        let engine = create_test_engine();
        let result = engine
            .run_handler("unknown", TriggerKind::Manual, json!({}))
            .await;
        assert!(result.is_err());
        match result {
            Err(EngineError::InvalidWorkflow(msg)) => {
                assert!(msg.contains("no handler registered"));
            }
            _ => panic!("expected InvalidWorkflow error"),
        }
    }

    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        // Enqueue only creates the run; it must not execute it.
        let run = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.workflow_name, "echo-workflow");
    }

    #[tokio::test]
    async fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
        let result = engine.register_boxed(handler);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
    }

    #[tokio::test]
    async fn engine_store_and_provider_accessors() {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        let engine = Engine::new(store.clone(), provider.clone());

        // Smoke test: accessors are callable and return the shared handles.
        let _ = engine.store();
        let _ = engine.provider();
    }

    use crate::operation::Operation;
    use ironflow_store::models::StepKind;
    use std::future::Future;
    use std::pin::Pin;

    // Fake custom operation standing in for a GitLab API call; always
    // "creates" issue 42 and echoes its input.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }

    impl Operation for FakeGitlabOp {
        fn kind(&self) -> &str {
            "gitlab"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move {
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }

        fn input(&self) -> Option<Value> {
            Some(json!({
                "project_id": self.project_id,
                "title": self.title,
            }))
        }
    }

    // Operation that always fails, to exercise the failure path.
    struct FailingOp;

    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
        }
    }

    // Workflow that performs a single custom operation step.
    struct OperationWorkflow;

    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let op = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &op).await?;
                Ok(())
            })
        }
    }

    // Workflow whose only operation step fails.
    struct FailingOperationWorkflow;

    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }

    // Workflow mixing a shell step with a custom operation step.
    struct MixedWorkflow;

    impl WorkflowHandler for MixedWorkflow {
        fn name(&self) -> &str {
            "mixed-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                let op = FakeGitlabOp {
                    project_id: 456,
                    title: "Deploy done".to_string(),
                };
                let result = ctx.operation("notify-gitlab", &op).await?;
                assert_eq!(result.output["issue_id"], 42);
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn operation_step_happy_path() {
        let mut engine = create_test_engine();
        engine.register(OperationWorkflow).unwrap();

        let run = engine
            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        // One custom step, recorded with the operation's kind plus its
        // input and output payloads.
        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].name, "create-issue");
        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(
            steps[0].status.state,
            ironflow_store::models::StepStatus::Completed
        );

        let output = steps[0].output.as_ref().unwrap();
        assert_eq!(output["issue_id"], 42);
        assert_eq!(output["project_id"], 123);

        let input = steps[0].input.as_ref().unwrap();
        assert_eq!(input["project_id"], 123);
        assert_eq!(input["title"], "Bug report");
    }

    #[tokio::test]
    async fn operation_step_failure_marks_run_failed() {
        let mut engine = create_test_engine();
        engine.register(FailingOperationWorkflow).unwrap();

        let result = engine
            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn operation_mixed_with_shell_steps() {
        let mut engine = create_test_engine();
        engine.register(MixedWorkflow).unwrap();

        let run = engine
            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        // Steps are recorded in execution order with increasing positions.
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].position, 0);
        assert_eq!(steps[1].position, 1);
    }

    use crate::config::ApprovalConfig;

    // Workflow with one approval gate between build and deploy.
    struct SingleApprovalWorkflow;

    impl WorkflowHandler for SingleApprovalWorkflow {
        fn name(&self) -> &str {
            "single-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
                ctx.shell("deploy", ShellConfig::new("echo deployed"))
                    .await?;
                Ok(())
            })
        }
    }

    // Workflow with two approval gates (staging and prod).
    struct DoubleApprovalWorkflow;

    impl WorkflowHandler for DoubleApprovalWorkflow {
        fn name(&self) -> &str {
            "double-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
                    .await?;
                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
                    .await?;
                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
                    .await?;
                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
                    .await?;
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn approval_pauses_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        // The run halts at the gate instead of completing.
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        // build completed, gate pending; deploy was never reached.
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
    }

    #[tokio::test]
    async fn approval_resume_completes_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        // Simulate the approval being granted out-of-band.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        // All three steps present and completed after the resume replay.
        assert_eq!(steps.len(), 3);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].name, "gate");
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::Completed);
        assert_eq!(steps[2].name, "deploy");
        assert_eq!(steps[2].status.state, StepStatus::Completed);
    }

    #[tokio::test]
    async fn double_approval_two_resumes() {
        let mut engine = create_test_engine();
        engine.register(DoubleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("double-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        // Paused at the first gate: build + staging-gate.
        assert_eq!(steps.len(), 2);

        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        // First resume runs up to (and pauses at) the second gate.
        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 4);

        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        // Second resume carries the run through to completion.
        let final_run = engine.resume_run(run.id).await.unwrap();
        assert_eq!(final_run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 5);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[1].name, "staging-gate");
        assert_eq!(steps[2].name, "deploy-staging");
        assert_eq!(steps[3].name, "prod-gate");
        assert_eq!(steps[4].name, "deploy-prod");

        for step in &steps {
            assert_eq!(step.status.state, StepStatus::Completed);
        }
    }
}