1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::Store;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
/// Orchestrates workflow execution: owns the persistence store, the agent
/// provider, the registry of workflow handlers, and the event fan-out used
/// to notify subscribers about run lifecycle changes.
pub struct Engine {
    /// Persistence backend for runs and steps.
    store: Arc<dyn Store>,
    /// Agent provider handed to every `WorkflowContext` this engine builds.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by `WorkflowHandler::name()`.
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publishes run status/failure events to registered subscribers.
    event_publisher: EventPublisher,
}
79
80fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
90 let reject = |reason: &str| {
91 Err(EngineError::InvalidWorkflow(format!(
92 "handler '{handler_name}' has invalid category '{category}': {reason}"
93 )))
94 };
95
96 if category.is_empty() {
97 return reject("empty category");
98 }
99 if category.starts_with('/') {
100 return reject("leading '/'");
101 }
102 if category.ends_with('/') {
103 return reject("trailing '/'");
104 }
105 for segment in category.split('/') {
106 if segment.is_empty() {
107 return reject("empty segment (double '/')");
108 }
109 if segment.trim().is_empty() {
110 return reject("whitespace-only segment");
111 }
112 }
113 Ok(())
114}
115
116impl Engine {
117 pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
133 Self {
134 store,
135 provider,
136 handlers: HashMap::new(),
137 event_publisher: EventPublisher::new(),
138 }
139 }
140
141 pub fn store(&self) -> &Arc<dyn Store> {
143 &self.store
144 }
145
146 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
148 &self.provider
149 }
150
151 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
153 let handlers = self.handlers.clone();
154 let resolver: crate::context::HandlerResolver =
155 Arc::new(move |name: &str| handlers.get(name).cloned());
156 WorkflowContext::with_handler_resolver(
157 run_id,
158 self.store.clone(),
159 self.provider.clone(),
160 resolver,
161 )
162 }
163
164 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
208 let name = handler.name().to_string();
209 if self.handlers.contains_key(&name) {
210 return Err(EngineError::InvalidWorkflow(format!(
211 "handler '{}' already registered",
212 name
213 )));
214 }
215 if let Some(category) = handler.category() {
216 validate_category(&name, category)?;
217 }
218 self.handlers.insert(name, Arc::new(handler));
219 Ok(())
220 }
221
222 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
229 let name = handler.name().to_string();
230 if self.handlers.contains_key(&name) {
231 return Err(EngineError::InvalidWorkflow(format!(
232 "handler '{}' already registered",
233 name
234 )));
235 }
236 if let Some(category) = handler.category() {
237 validate_category(&name, category)?;
238 }
239 self.handlers.insert(name, Arc::from(handler));
240 Ok(())
241 }
242
243 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
245 self.handlers.get(name)
246 }
247
248 pub fn handler_names(&self) -> Vec<&str> {
250 self.handlers.keys().map(|s| s.as_str()).collect()
251 }
252
253 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
255 self.handlers.get(name).map(|h| h.describe())
256 }
257
258 pub fn subscribe(
283 &mut self,
284 subscriber: impl EventSubscriber + 'static,
285 event_types: &[&'static str],
286 ) {
287 self.event_publisher.subscribe(subscriber, event_types);
288 }
289
290 pub fn event_publisher(&self) -> &EventPublisher {
295 &self.event_publisher
296 }
297
298 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
328 pub async fn run_handler(
329 &self,
330 handler_name: &str,
331 trigger: TriggerKind,
332 payload: Value,
333 ) -> Result<Run, EngineError> {
334 let handler = self
335 .handlers
336 .get(handler_name)
337 .ok_or_else(|| {
338 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
339 })?
340 .clone();
341
342 let handler_version = handler.version().map(str::to_string);
343 let run = self
344 .store
345 .create_run(NewRun {
346 workflow_name: handler_name.to_string(),
347 trigger,
348 payload,
349 max_retries: 0,
350 handler_version,
351 labels: handler.default_labels(),
352 scheduled_at: None,
353 })
354 .await?;
355
356 let run_id = run.id;
357 info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
358
359 self.store
360 .update_run_status(run_id, RunStatus::Running)
361 .await?;
362
363 #[cfg(feature = "prometheus")]
364 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
365
366 let run_start = Instant::now();
367 let mut ctx = self.build_context(run_id);
368
369 let result = handler.execute(&mut ctx).await;
370 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
371 .await
372 }
373
374 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
383 pub async fn enqueue_handler(
384 &self,
385 handler_name: &str,
386 trigger: TriggerKind,
387 payload: Value,
388 max_retries: u32,
389 ) -> Result<Run, EngineError> {
390 self.enqueue_handler_with_options(
391 handler_name,
392 trigger,
393 payload,
394 max_retries,
395 HashMap::new(),
396 None,
397 )
398 .await
399 }
400
401 #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
407 pub async fn enqueue_handler_with_options(
408 &self,
409 handler_name: &str,
410 trigger: TriggerKind,
411 payload: Value,
412 max_retries: u32,
413 labels: HashMap<String, String>,
414 scheduled_at: Option<DateTime<Utc>>,
415 ) -> Result<Run, EngineError> {
416 let handler = self.handlers.get(handler_name).ok_or_else(|| {
417 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
418 })?;
419
420 let handler_version = handler.version().map(str::to_string);
421 let mut merged_labels = handler.default_labels();
422 merged_labels.extend(labels);
423
424 let run = self
425 .store
426 .create_run(NewRun {
427 workflow_name: handler_name.to_string(),
428 trigger,
429 payload,
430 max_retries,
431 handler_version,
432 labels: merged_labels,
433 scheduled_at,
434 })
435 .await?;
436
437 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
438 Ok(run)
439 }
440
441 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
450 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
451 let run = self
452 .store
453 .get_run(run_id)
454 .await?
455 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
456
457 let handler = self
458 .handlers
459 .get(&run.workflow_name)
460 .ok_or_else(|| {
461 EngineError::InvalidWorkflow(format!(
462 "no handler registered: {}",
463 run.workflow_name
464 ))
465 })?
466 .clone();
467
468 #[cfg(feature = "prometheus")]
469 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
470
471 let run_start = Instant::now();
472 let mut ctx = self.build_context(run_id);
473
474 let result = handler.execute(&mut ctx).await;
475 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
476 .await
477 }
478
479 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
487 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
488 self.execute_handler_run(run_id).await
489 }
490
491 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
505 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
506 let run = self
507 .store
508 .get_run(run_id)
509 .await?
510 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
511
512 let handler = self
513 .handlers
514 .get(&run.workflow_name)
515 .ok_or_else(|| {
516 EngineError::InvalidWorkflow(format!(
517 "no handler registered: {}",
518 run.workflow_name
519 ))
520 })?
521 .clone();
522
523 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
524
525 let run_start = Instant::now();
526 let mut ctx = self.build_context(run_id);
527 ctx.load_replay_steps().await?;
528
529 let result = handler.execute(&mut ctx).await;
530 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
531 .await
532 }
533
534 async fn finalize_run(
540 &self,
541 run_id: Uuid,
542 workflow_name: &str,
543 result: Result<(), EngineError>,
544 ctx: &WorkflowContext,
545 run_start: Instant,
546 ) -> Result<Run, EngineError> {
547 let total_duration = run_start.elapsed().as_millis() as u64;
548 let completed_at = Utc::now();
549
550 let final_status;
551 let final_run;
552
553 match result {
554 Ok(()) => {
555 final_status = RunStatus::Completed;
556 final_run = self
557 .store
558 .update_run_returning(
559 run_id,
560 RunUpdate {
561 status: Some(RunStatus::Completed),
562 cost_usd: Some(ctx.total_cost_usd()),
563 duration_ms: Some(total_duration),
564 completed_at: Some(completed_at),
565 ..RunUpdate::default()
566 },
567 )
568 .await?;
569
570 info!(
571 run_id = %run_id,
572 cost_usd = %ctx.total_cost_usd(),
573 duration_ms = total_duration,
574 "run completed"
575 );
576 }
577 Err(EngineError::ApprovalRequired {
578 run_id: approval_run_id,
579 step_id,
580 ref message,
581 }) => {
582 final_status = RunStatus::AwaitingApproval;
583 final_run = self
584 .store
585 .update_run_returning(
586 run_id,
587 RunUpdate {
588 status: Some(RunStatus::AwaitingApproval),
589 cost_usd: Some(ctx.total_cost_usd()),
590 duration_ms: Some(total_duration),
591 ..RunUpdate::default()
592 },
593 )
594 .await?;
595
596 info!(
597 run_id = %approval_run_id,
598 step_id = %step_id,
599 message = %message,
600 "run awaiting approval"
601 );
602 }
603 Err(err) => {
604 final_status = RunStatus::Failed;
605 if let Err(store_err) = self
606 .store
607 .update_run(
608 run_id,
609 RunUpdate {
610 status: Some(RunStatus::Failed),
611 error: Some(err.to_string()),
612 cost_usd: Some(ctx.total_cost_usd()),
613 duration_ms: Some(total_duration),
614 completed_at: Some(completed_at),
615 ..RunUpdate::default()
616 },
617 )
618 .await
619 {
620 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
621 }
622
623 error!(run_id = %run_id, error = %err, "run failed");
624
625 self.publish_run_status_changed(
626 workflow_name,
627 run_id,
628 final_status,
629 Some(err.to_string()),
630 ctx,
631 total_duration,
632 );
633
634 #[cfg(feature = "prometheus")]
635 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
636
637 return Err(err);
638 }
639 }
640
641 self.publish_run_status_changed(
642 workflow_name,
643 run_id,
644 final_status,
645 None,
646 ctx,
647 total_duration,
648 );
649
650 #[cfg(feature = "prometheus")]
651 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
652
653 Ok(final_run)
654 }
655
656 #[cfg(feature = "prometheus")]
658 fn emit_run_metrics(
659 &self,
660 workflow_name: &str,
661 status: RunStatus,
662 duration_ms: u64,
663 ctx: &WorkflowContext,
664 ) {
665 let status_str = status.to_string();
666 let wf = workflow_name.to_string();
667
668 counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
669 histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
670 .record(duration_ms as f64 / 1000.0);
671 histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
672 ctx.total_cost_usd()
673 .to_string()
674 .parse::<f64>()
675 .unwrap_or(0.0),
676 );
677 gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
678 }
679
680 fn publish_run_status_changed(
685 &self,
686 workflow_name: &str,
687 run_id: Uuid,
688 to: RunStatus,
689 error: Option<String>,
690 ctx: &WorkflowContext,
691 duration_ms: u64,
692 ) {
693 let now = Utc::now();
694 let cost_usd = ctx.total_cost_usd();
695 let wf = workflow_name.to_string();
696
697 self.event_publisher.publish(Event::RunStatusChanged {
698 run_id,
699 workflow_name: wf.clone(),
700 from: RunStatus::Running,
701 to,
702 error: error.clone(),
703 cost_usd,
704 duration_ms,
705 at: now,
706 });
707
708 if to == RunStatus::Failed {
709 self.event_publisher.publish(Event::RunFailed {
710 run_id,
711 workflow_name: wf,
712 error,
713 cost_usd,
714 duration_ms,
715 at: now,
716 });
717 }
718 }
719}
720
721impl fmt::Debug for Engine {
722 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
723 f.debug_struct("Engine")
724 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
725 .finish_non_exhaustive()
726 }
727}
728
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ShellConfig;
    use crate::handler::{HandlerFuture, WorkflowHandler};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::StepStatus;
    use serde_json::json;

    /// Minimal handler: runs a single shell step that echoes "hello".
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }

        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
                category: None,
                version: self.version().map(str::to_string),
                input_schema: None,
                default_labels: HashMap::new(),
            }
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    /// Handler whose only step exits non-zero, so the run fails.
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }

    /// Builds an engine on an in-memory store with a replay-only provider.
    fn create_test_engine() -> Engine {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        Engine::new(store, provider)
    }

    #[test]
    fn engine_new_creates_instance() {
        let engine = create_test_engine();
        assert_eq!(engine.handler_names().len(), 0);
    }

    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
        assert!(engine.handler_names().contains(&"echo-workflow"));
    }

    #[test]
    fn engine_register_duplicate_returns_error() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_err());
    }

    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let handler = engine.get_handler("echo-workflow");
        assert!(handler.is_some());
    }

    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        let handler = engine.get_handler("nonexistent");
        assert!(handler.is_none());
    }

    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();
        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"echo-workflow"));
        assert!(names.contains(&"failing-workflow"));
    }

    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.description, "A simple workflow that echoes hello");
    }

    /// Handler with a valid nested category, relying on the default
    /// `describe` implementation to propagate it.
    struct CategorizedWorkflow;

    impl WorkflowHandler for CategorizedWorkflow {
        fn name(&self) -> &str {
            "categorized"
        }
        fn category(&self) -> Option<&str> {
            Some("data/etl")
        }
        fn execute<'a>(
            &'a self,
            _ctx: &'a mut WorkflowContext,
        ) -> crate::handler::HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_default_describe_propagates_category() {
        let mut engine = create_test_engine();
        engine.register(CategorizedWorkflow).unwrap();
        let info = engine.handler_info("categorized").unwrap();
        assert_eq!(info.category.as_deref(), Some("data/etl"));
    }

    #[test]
    fn engine_default_describe_without_category() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow").unwrap();
        assert!(info.category.is_none());
    }

    /// Handler whose category string is injected per test case, used to
    /// exercise each `validate_category` rejection rule.
    struct BadCategoryWorkflow(&'static str);

    impl WorkflowHandler for BadCategoryWorkflow {
        fn name(&self) -> &str {
            "bad-category"
        }
        fn category(&self) -> Option<&str> {
            Some(self.0)
        }
        fn execute<'a>(
            &'a self,
            _ctx: &'a mut WorkflowContext,
        ) -> crate::handler::HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_register_rejects_empty_category() {
        let mut engine = create_test_engine();
        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_leading_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("/data/etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_trailing_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/etl/"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_double_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data//etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_whitespace_only_segment_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/ /etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_accepts_valid_nested_category() {
        let mut engine = create_test_engine();
        assert!(engine.register(CategorizedWorkflow).is_ok());
    }

    #[tokio::test]
    async fn engine_unknown_workflow_returns_error() {
        let engine = create_test_engine();
        let result = engine
            .run_handler("unknown", TriggerKind::Manual, json!({}))
            .await;
        assert!(result.is_err());
        match result {
            Err(EngineError::InvalidWorkflow(msg)) => {
                assert!(msg.contains("no handler registered"));
            }
            _ => panic!("expected InvalidWorkflow error"),
        }
    }

    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let run = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.workflow_name, "echo-workflow");
    }

    #[tokio::test]
    async fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
        let result = engine.register_boxed(handler);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
    }

    #[tokio::test]
    async fn engine_store_and_provider_accessors() {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        let engine = Engine::new(store.clone(), provider.clone());

        // Accessors should not panic; returned references are unused here.
        let _ = engine.store();
        let _ = engine.provider();
    }

    use crate::operation::Operation;
    use ironflow_store::models::StepKind;
    use std::future::Future;
    use std::pin::Pin;

    /// Fake external-service operation that echoes its input back as output.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }

    impl Operation for FakeGitlabOp {
        fn kind(&self) -> &str {
            "gitlab"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move {
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }

        fn input(&self) -> Option<Value> {
            Some(json!({
                "project_id": self.project_id,
                "title": self.title,
            }))
        }
    }

    /// Operation that always fails, to exercise the error path.
    struct FailingOp;

    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
        }
    }

    struct OperationWorkflow;

    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let op = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &op).await?;
                Ok(())
            })
        }
    }

    struct FailingOperationWorkflow;

    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }

    /// Workflow mixing a shell step and an operation step, asserting the
    /// operation result is visible to the handler.
    struct MixedWorkflow;

    impl WorkflowHandler for MixedWorkflow {
        fn name(&self) -> &str {
            "mixed-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                let op = FakeGitlabOp {
                    project_id: 456,
                    title: "Deploy done".to_string(),
                };
                let result = ctx.operation("notify-gitlab", &op).await?;
                assert_eq!(result.output["issue_id"], 42);
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn operation_step_happy_path() {
        let mut engine = create_test_engine();
        engine.register(OperationWorkflow).unwrap();

        let run = engine
            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].name, "create-issue");
        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].status.state, StepStatus::Completed);

        let output = steps[0].output.as_ref().unwrap();
        assert_eq!(output["issue_id"], 42);
        assert_eq!(output["project_id"], 123);

        let input = steps[0].input.as_ref().unwrap();
        assert_eq!(input["project_id"], 123);
        assert_eq!(input["title"], "Bug report");
    }

    #[tokio::test]
    async fn operation_step_failure_marks_run_failed() {
        let mut engine = create_test_engine();
        engine.register(FailingOperationWorkflow).unwrap();

        let result = engine
            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn operation_mixed_with_shell_steps() {
        let mut engine = create_test_engine();
        engine.register(MixedWorkflow).unwrap();

        let run = engine
            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].position, 0);
        assert_eq!(steps[1].position, 1);
    }

    use crate::config::ApprovalConfig;

    /// Workflow with one approval gate between build and deploy.
    struct SingleApprovalWorkflow;

    impl WorkflowHandler for SingleApprovalWorkflow {
        fn name(&self) -> &str {
            "single-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
                ctx.shell("deploy", ShellConfig::new("echo deployed"))
                    .await?;
                Ok(())
            })
        }
    }

    /// Workflow with two approval gates (staging then prod).
    struct DoubleApprovalWorkflow;

    impl WorkflowHandler for DoubleApprovalWorkflow {
        fn name(&self) -> &str {
            "double-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
                    .await?;
                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
                    .await?;
                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
                    .await?;
                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
                    .await?;
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn approval_pauses_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        // Only build + the pending approval gate exist; deploy never ran.
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
    }

    #[tokio::test]
    async fn approval_resume_completes_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        // Simulate the approval being granted before resuming.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 3);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].name, "gate");
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::Completed);
        assert_eq!(steps[2].name, "deploy");
        assert_eq!(steps[2].status.state, StepStatus::Completed);
    }

    #[tokio::test]
    async fn double_approval_two_resumes() {
        let mut engine = create_test_engine();
        engine.register(DoubleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("double-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        // Pauses at the first gate: build + staging-gate.
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);

        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);

        // Pauses at the second gate: + deploy-staging + prod-gate.
        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 4);

        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let final_run = engine.resume_run(run.id).await.unwrap();
        assert_eq!(final_run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 5);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[1].name, "staging-gate");
        assert_eq!(steps[2].name, "deploy-staging");
        assert_eq!(steps[3].name, "prod-gate");
        assert_eq!(steps[4].name, "deploy-prod");

        for step in &steps {
            assert_eq!(step.status.state, StepStatus::Completed);
        }
    }
}