1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::Store;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
/// Workflow execution engine.
///
/// Bundles the persistence store, the agent provider, the registered
/// workflow handlers and the publisher used to broadcast run events.
pub struct Engine {
    /// Store used to create, load and update runs.
    store: Arc<dyn Store>,
    /// Agent provider handed to every workflow context built by the engine.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by the name each handler reports.
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher that receives run status events emitted by the engine.
    event_publisher: EventPublisher,
}
79
80fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
90 let reject = |reason: &str| {
91 Err(EngineError::InvalidWorkflow(format!(
92 "handler '{handler_name}' has invalid category '{category}': {reason}"
93 )))
94 };
95
96 if category.is_empty() {
97 return reject("empty category");
98 }
99 if category.starts_with('/') {
100 return reject("leading '/'");
101 }
102 if category.ends_with('/') {
103 return reject("trailing '/'");
104 }
105 for segment in category.split('/') {
106 if segment.is_empty() {
107 return reject("empty segment (double '/')");
108 }
109 if segment.trim().is_empty() {
110 return reject("whitespace-only segment");
111 }
112 }
113 Ok(())
114}
115
116impl Engine {
117 pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
133 Self {
134 store,
135 provider,
136 handlers: HashMap::new(),
137 event_publisher: EventPublisher::new(),
138 }
139 }
140
    /// Returns a reference to the store handle held by this engine.
    pub fn store(&self) -> &Arc<dyn Store> {
        &self.store
    }
145
    /// Returns a reference to the agent provider handle held by this engine.
    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
        &self.provider
    }
150
151 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
153 let handlers = self.handlers.clone();
154 let resolver: crate::context::HandlerResolver =
155 Arc::new(move |name: &str| handlers.get(name).cloned());
156 WorkflowContext::with_handler_resolver(
157 run_id,
158 self.store.clone(),
159 self.provider.clone(),
160 resolver,
161 )
162 }
163
164 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
208 let name = handler.name().to_string();
209 if self.handlers.contains_key(&name) {
210 return Err(EngineError::InvalidWorkflow(format!(
211 "handler '{}' already registered",
212 name
213 )));
214 }
215 if let Some(category) = handler.category() {
216 validate_category(&name, category)?;
217 }
218 self.handlers.insert(name, Arc::new(handler));
219 Ok(())
220 }
221
222 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
229 let name = handler.name().to_string();
230 if self.handlers.contains_key(&name) {
231 return Err(EngineError::InvalidWorkflow(format!(
232 "handler '{}' already registered",
233 name
234 )));
235 }
236 if let Some(category) = handler.category() {
237 validate_category(&name, category)?;
238 }
239 self.handlers.insert(name, Arc::from(handler));
240 Ok(())
241 }
242
    /// Looks up the handler registered under `name`.
    ///
    /// Returns `None` when no handler with that name is registered.
    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
        self.handlers.get(name)
    }
247
248 pub fn handler_names(&self) -> Vec<&str> {
250 self.handlers.keys().map(|s| s.as_str()).collect()
251 }
252
253 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
255 self.handlers.get(name).map(|h| h.describe())
256 }
257
    /// Registers `subscriber` with the engine's event publisher for the given
    /// `event_types`.
    ///
    /// Both arguments are forwarded unchanged to the publisher's `subscribe`.
    pub fn subscribe(
        &mut self,
        subscriber: impl EventSubscriber + 'static,
        event_types: &[&'static str],
    ) {
        self.event_publisher.subscribe(subscriber, event_types);
    }
289
    /// Returns a reference to the publisher the engine emits run events on.
    pub fn event_publisher(&self) -> &EventPublisher {
        &self.event_publisher
    }
297
298 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
328 pub async fn run_handler(
329 &self,
330 handler_name: &str,
331 trigger: TriggerKind,
332 payload: Value,
333 ) -> Result<Run, EngineError> {
334 let handler = self
335 .handlers
336 .get(handler_name)
337 .ok_or_else(|| {
338 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
339 })?
340 .clone();
341
342 let handler_version = handler.version().map(str::to_string);
343 let run = self
344 .store
345 .create_run(NewRun {
346 workflow_name: handler_name.to_string(),
347 trigger,
348 payload,
349 max_retries: 0,
350 handler_version,
351 })
352 .await?;
353
354 let run_id = run.id;
355 info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
356
357 self.store
358 .update_run_status(run_id, RunStatus::Running)
359 .await?;
360
361 #[cfg(feature = "prometheus")]
362 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
363
364 let run_start = Instant::now();
365 let mut ctx = self.build_context(run_id);
366
367 let result = handler.execute(&mut ctx).await;
368 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
369 .await
370 }
371
372 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
381 pub async fn enqueue_handler(
382 &self,
383 handler_name: &str,
384 trigger: TriggerKind,
385 payload: Value,
386 max_retries: u32,
387 ) -> Result<Run, EngineError> {
388 let handler = self.handlers.get(handler_name).ok_or_else(|| {
389 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
390 })?;
391
392 let handler_version = handler.version().map(str::to_string);
393 let run = self
394 .store
395 .create_run(NewRun {
396 workflow_name: handler_name.to_string(),
397 trigger,
398 payload,
399 max_retries,
400 handler_version,
401 })
402 .await?;
403
404 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
405 Ok(run)
406 }
407
408 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
417 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
418 let run = self
419 .store
420 .get_run(run_id)
421 .await?
422 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
423
424 let handler = self
425 .handlers
426 .get(&run.workflow_name)
427 .ok_or_else(|| {
428 EngineError::InvalidWorkflow(format!(
429 "no handler registered: {}",
430 run.workflow_name
431 ))
432 })?
433 .clone();
434
435 #[cfg(feature = "prometheus")]
436 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
437
438 let run_start = Instant::now();
439 let mut ctx = self.build_context(run_id);
440
441 let result = handler.execute(&mut ctx).await;
442 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
443 .await
444 }
445
    /// Executes the run identified by `run_id`.
    ///
    /// Thin alias that delegates directly to [`Engine::execute_handler_run`]
    /// and returns its result unchanged.
    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        self.execute_handler_run(run_id).await
    }
457
458 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
472 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
473 let run = self
474 .store
475 .get_run(run_id)
476 .await?
477 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
478
479 let handler = self
480 .handlers
481 .get(&run.workflow_name)
482 .ok_or_else(|| {
483 EngineError::InvalidWorkflow(format!(
484 "no handler registered: {}",
485 run.workflow_name
486 ))
487 })?
488 .clone();
489
490 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
491
492 let run_start = Instant::now();
493 let mut ctx = self.build_context(run_id);
494 ctx.load_replay_steps().await?;
495
496 let result = handler.execute(&mut ctx).await;
497 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
498 .await
499 }
500
501 async fn finalize_run(
507 &self,
508 run_id: Uuid,
509 workflow_name: &str,
510 result: Result<(), EngineError>,
511 ctx: &WorkflowContext,
512 run_start: Instant,
513 ) -> Result<Run, EngineError> {
514 let total_duration = run_start.elapsed().as_millis() as u64;
515 let completed_at = Utc::now();
516
517 let final_status;
518 let final_run;
519
520 match result {
521 Ok(()) => {
522 final_status = RunStatus::Completed;
523 final_run = self
524 .store
525 .update_run_returning(
526 run_id,
527 RunUpdate {
528 status: Some(RunStatus::Completed),
529 cost_usd: Some(ctx.total_cost_usd()),
530 duration_ms: Some(total_duration),
531 completed_at: Some(completed_at),
532 ..RunUpdate::default()
533 },
534 )
535 .await?;
536
537 info!(
538 run_id = %run_id,
539 cost_usd = %ctx.total_cost_usd(),
540 duration_ms = total_duration,
541 "run completed"
542 );
543 }
544 Err(EngineError::ApprovalRequired {
545 run_id: approval_run_id,
546 step_id,
547 ref message,
548 }) => {
549 final_status = RunStatus::AwaitingApproval;
550 final_run = self
551 .store
552 .update_run_returning(
553 run_id,
554 RunUpdate {
555 status: Some(RunStatus::AwaitingApproval),
556 cost_usd: Some(ctx.total_cost_usd()),
557 duration_ms: Some(total_duration),
558 ..RunUpdate::default()
559 },
560 )
561 .await?;
562
563 info!(
564 run_id = %approval_run_id,
565 step_id = %step_id,
566 message = %message,
567 "run awaiting approval"
568 );
569 }
570 Err(err) => {
571 final_status = RunStatus::Failed;
572 if let Err(store_err) = self
573 .store
574 .update_run(
575 run_id,
576 RunUpdate {
577 status: Some(RunStatus::Failed),
578 error: Some(err.to_string()),
579 cost_usd: Some(ctx.total_cost_usd()),
580 duration_ms: Some(total_duration),
581 completed_at: Some(completed_at),
582 ..RunUpdate::default()
583 },
584 )
585 .await
586 {
587 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
588 }
589
590 error!(run_id = %run_id, error = %err, "run failed");
591
592 self.publish_run_status_changed(
593 workflow_name,
594 run_id,
595 final_status,
596 Some(err.to_string()),
597 ctx,
598 total_duration,
599 );
600
601 #[cfg(feature = "prometheus")]
602 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
603
604 return Err(err);
605 }
606 }
607
608 self.publish_run_status_changed(
609 workflow_name,
610 run_id,
611 final_status,
612 None,
613 ctx,
614 total_duration,
615 );
616
617 #[cfg(feature = "prometheus")]
618 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
619
620 Ok(final_run)
621 }
622
623 #[cfg(feature = "prometheus")]
625 fn emit_run_metrics(
626 &self,
627 workflow_name: &str,
628 status: RunStatus,
629 duration_ms: u64,
630 ctx: &WorkflowContext,
631 ) {
632 let status_str = status.to_string();
633 let wf = workflow_name.to_string();
634
635 counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
636 histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
637 .record(duration_ms as f64 / 1000.0);
638 histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
639 ctx.total_cost_usd()
640 .to_string()
641 .parse::<f64>()
642 .unwrap_or(0.0),
643 );
644 gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
645 }
646
647 fn publish_run_status_changed(
652 &self,
653 workflow_name: &str,
654 run_id: Uuid,
655 to: RunStatus,
656 error: Option<String>,
657 ctx: &WorkflowContext,
658 duration_ms: u64,
659 ) {
660 let now = Utc::now();
661 let cost_usd = ctx.total_cost_usd();
662 let wf = workflow_name.to_string();
663
664 self.event_publisher.publish(Event::RunStatusChanged {
665 run_id,
666 workflow_name: wf.clone(),
667 from: RunStatus::Running,
668 to,
669 error: error.clone(),
670 cost_usd,
671 duration_ms,
672 at: now,
673 });
674
675 if to == RunStatus::Failed {
676 self.event_publisher.publish(Event::RunFailed {
677 run_id,
678 workflow_name: wf,
679 error,
680 cost_usd,
681 duration_ms,
682 at: now,
683 });
684 }
685 }
686}
687
688impl fmt::Debug for Engine {
689 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
690 f.debug_struct("Engine")
691 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
692 .finish_non_exhaustive()
693 }
694}
695
696#[cfg(test)]
697mod tests {
698 use super::*;
699 use crate::config::ShellConfig;
700 use crate::handler::{HandlerFuture, WorkflowHandler};
701 use ironflow_core::providers::claude::ClaudeCodeProvider;
702 use ironflow_core::providers::record_replay::RecordReplayProvider;
703 use ironflow_store::memory::InMemoryStore;
704 use ironflow_store::models::StepStatus;
705 use serde_json::json;
706
707 struct EchoWorkflow;
709
710 impl WorkflowHandler for EchoWorkflow {
711 fn name(&self) -> &str {
712 "echo-workflow"
713 }
714
715 fn describe(&self) -> WorkflowInfo {
716 WorkflowInfo {
717 description: "A simple workflow that echoes hello".to_string(),
718 source_code: None,
719 sub_workflows: Vec::new(),
720 category: None,
721 version: self.version().map(str::to_string),
722 }
723 }
724
725 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
726 Box::pin(async move {
727 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
728 Ok(())
729 })
730 }
731 }
732
733 struct FailingWorkflow;
735
736 impl WorkflowHandler for FailingWorkflow {
737 fn name(&self) -> &str {
738 "failing-workflow"
739 }
740
741 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
742 Box::pin(async move {
743 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
744 Ok(())
745 })
746 }
747 }
748
749 fn create_test_engine() -> Engine {
750 let store = Arc::new(InMemoryStore::new());
751 let inner = ClaudeCodeProvider::new();
752 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
753 inner,
754 "/tmp/ironflow-fixtures",
755 ));
756 Engine::new(store, provider)
757 }
758
759 #[test]
760 fn engine_new_creates_instance() {
761 let engine = create_test_engine();
762 assert_eq!(engine.handler_names().len(), 0);
763 }
764
765 #[test]
766 fn engine_register_handler() {
767 let mut engine = create_test_engine();
768 let result = engine.register(EchoWorkflow);
769 assert!(result.is_ok());
770 assert_eq!(engine.handler_names().len(), 1);
771 assert!(engine.handler_names().contains(&"echo-workflow"));
772 }
773
774 #[test]
775 fn engine_register_duplicate_returns_error() {
776 let mut engine = create_test_engine();
777 engine.register(EchoWorkflow).unwrap();
778 let result = engine.register(EchoWorkflow);
779 assert!(result.is_err());
780 }
781
782 #[test]
783 fn engine_get_handler_found() {
784 let mut engine = create_test_engine();
785 engine.register(EchoWorkflow).unwrap();
786 let handler = engine.get_handler("echo-workflow");
787 assert!(handler.is_some());
788 }
789
790 #[test]
791 fn engine_get_handler_not_found() {
792 let engine = create_test_engine();
793 let handler = engine.get_handler("nonexistent");
794 assert!(handler.is_none());
795 }
796
797 #[test]
798 fn engine_handler_names_lists_all() {
799 let mut engine = create_test_engine();
800 engine.register(EchoWorkflow).unwrap();
801 engine.register(FailingWorkflow).unwrap();
802 let names = engine.handler_names();
803 assert_eq!(names.len(), 2);
804 assert!(names.contains(&"echo-workflow"));
805 assert!(names.contains(&"failing-workflow"));
806 }
807
808 #[test]
809 fn engine_handler_info_returns_description() {
810 let mut engine = create_test_engine();
811 engine.register(EchoWorkflow).unwrap();
812 let info = engine.handler_info("echo-workflow");
813 assert!(info.is_some());
814 let info = info.unwrap();
815 assert_eq!(info.description, "A simple workflow that echoes hello");
816 }
817
818 struct CategorizedWorkflow;
819
820 impl WorkflowHandler for CategorizedWorkflow {
821 fn name(&self) -> &str {
822 "categorized"
823 }
824 fn category(&self) -> Option<&str> {
825 Some("data/etl")
826 }
827 fn execute<'a>(
828 &'a self,
829 _ctx: &'a mut WorkflowContext,
830 ) -> crate::handler::HandlerFuture<'a> {
831 Box::pin(async move { Ok(()) })
832 }
833 }
834
835 #[test]
836 fn engine_default_describe_propagates_category() {
837 let mut engine = create_test_engine();
838 engine.register(CategorizedWorkflow).unwrap();
839 let info = engine.handler_info("categorized").unwrap();
840 assert_eq!(info.category.as_deref(), Some("data/etl"));
841 }
842
843 #[test]
844 fn engine_default_describe_without_category() {
845 let mut engine = create_test_engine();
846 engine.register(EchoWorkflow).unwrap();
847 let info = engine.handler_info("echo-workflow").unwrap();
848 assert!(info.category.is_none());
849 }
850
851 struct BadCategoryWorkflow(&'static str);
852
853 impl WorkflowHandler for BadCategoryWorkflow {
854 fn name(&self) -> &str {
855 "bad-category"
856 }
857 fn category(&self) -> Option<&str> {
858 Some(self.0)
859 }
860 fn execute<'a>(
861 &'a self,
862 _ctx: &'a mut WorkflowContext,
863 ) -> crate::handler::HandlerFuture<'a> {
864 Box::pin(async move { Ok(()) })
865 }
866 }
867
868 #[test]
869 fn engine_register_rejects_empty_category() {
870 let mut engine = create_test_engine();
871 let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
872 match err {
873 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
874 other => panic!("expected InvalidWorkflow, got {other:?}"),
875 }
876 }
877
878 #[test]
879 fn engine_register_rejects_leading_slash_category() {
880 let mut engine = create_test_engine();
881 let err = engine
882 .register(BadCategoryWorkflow("/data/etl"))
883 .unwrap_err();
884 match err {
885 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
886 other => panic!("expected InvalidWorkflow, got {other:?}"),
887 }
888 }
889
890 #[test]
891 fn engine_register_rejects_trailing_slash_category() {
892 let mut engine = create_test_engine();
893 let err = engine
894 .register(BadCategoryWorkflow("data/etl/"))
895 .unwrap_err();
896 match err {
897 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
898 other => panic!("expected InvalidWorkflow, got {other:?}"),
899 }
900 }
901
902 #[test]
903 fn engine_register_rejects_double_slash_category() {
904 let mut engine = create_test_engine();
905 let err = engine
906 .register(BadCategoryWorkflow("data//etl"))
907 .unwrap_err();
908 match err {
909 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
910 other => panic!("expected InvalidWorkflow, got {other:?}"),
911 }
912 }
913
914 #[test]
915 fn engine_register_rejects_whitespace_only_segment_category() {
916 let mut engine = create_test_engine();
917 let err = engine
918 .register(BadCategoryWorkflow("data/ /etl"))
919 .unwrap_err();
920 match err {
921 EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
922 other => panic!("expected InvalidWorkflow, got {other:?}"),
923 }
924 }
925
926 #[test]
927 fn engine_register_accepts_valid_nested_category() {
928 let mut engine = create_test_engine();
929 assert!(engine.register(CategorizedWorkflow).is_ok());
930 }
931
932 #[tokio::test]
933 async fn engine_unknown_workflow_returns_error() {
934 let engine = create_test_engine();
935 let result = engine
936 .run_handler("unknown", TriggerKind::Manual, json!({}))
937 .await;
938 assert!(result.is_err());
939 match result {
940 Err(EngineError::InvalidWorkflow(msg)) => {
941 assert!(msg.contains("no handler registered"));
942 }
943 _ => panic!("expected InvalidWorkflow error"),
944 }
945 }
946
947 #[tokio::test]
948 async fn engine_enqueue_handler_creates_pending_run() {
949 let mut engine = create_test_engine();
950 engine.register(EchoWorkflow).unwrap();
951
952 let run = engine
953 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
954 .await
955 .unwrap();
956 assert_eq!(run.status.state, RunStatus::Pending);
957 assert_eq!(run.workflow_name, "echo-workflow");
958 }
959
960 #[tokio::test]
961 async fn engine_register_boxed() {
962 let mut engine = create_test_engine();
963 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
964 let result = engine.register_boxed(handler);
965 assert!(result.is_ok());
966 assert_eq!(engine.handler_names().len(), 1);
967 }
968
969 #[tokio::test]
970 async fn engine_store_and_provider_accessors() {
971 let store = Arc::new(InMemoryStore::new());
972 let inner = ClaudeCodeProvider::new();
973 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
974 inner,
975 "/tmp/ironflow-fixtures",
976 ));
977 let engine = Engine::new(store.clone(), provider.clone());
978
979 let _ = engine.store();
981 let _ = engine.provider();
982 }
983
984 use crate::operation::Operation;
989 use ironflow_store::models::StepKind;
990 use std::future::Future;
991 use std::pin::Pin;
992
993 struct FakeGitlabOp {
994 project_id: u64,
995 title: String,
996 }
997
998 impl Operation for FakeGitlabOp {
999 fn kind(&self) -> &str {
1000 "gitlab"
1001 }
1002
1003 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1004 Box::pin(async move {
1005 Ok(json!({
1006 "issue_id": 42,
1007 "project_id": self.project_id,
1008 "title": self.title,
1009 }))
1010 })
1011 }
1012
1013 fn input(&self) -> Option<Value> {
1014 Some(json!({
1015 "project_id": self.project_id,
1016 "title": self.title,
1017 }))
1018 }
1019 }
1020
1021 struct FailingOp;
1022
1023 impl Operation for FailingOp {
1024 fn kind(&self) -> &str {
1025 "broken-service"
1026 }
1027
1028 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1029 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1030 }
1031 }
1032
1033 struct OperationWorkflow;
1034
1035 impl WorkflowHandler for OperationWorkflow {
1036 fn name(&self) -> &str {
1037 "operation-workflow"
1038 }
1039
1040 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1041 Box::pin(async move {
1042 let op = FakeGitlabOp {
1043 project_id: 123,
1044 title: "Bug report".to_string(),
1045 };
1046 ctx.operation("create-issue", &op).await?;
1047 Ok(())
1048 })
1049 }
1050 }
1051
1052 struct FailingOperationWorkflow;
1053
1054 impl WorkflowHandler for FailingOperationWorkflow {
1055 fn name(&self) -> &str {
1056 "failing-operation-workflow"
1057 }
1058
1059 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1060 Box::pin(async move {
1061 ctx.operation("broken-call", &FailingOp).await?;
1062 Ok(())
1063 })
1064 }
1065 }
1066
1067 struct MixedWorkflow;
1068
1069 impl WorkflowHandler for MixedWorkflow {
1070 fn name(&self) -> &str {
1071 "mixed-workflow"
1072 }
1073
1074 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1075 Box::pin(async move {
1076 ctx.shell("build", ShellConfig::new("echo built")).await?;
1077 let op = FakeGitlabOp {
1078 project_id: 456,
1079 title: "Deploy done".to_string(),
1080 };
1081 let result = ctx.operation("notify-gitlab", &op).await?;
1082 assert_eq!(result.output["issue_id"], 42);
1083 Ok(())
1084 })
1085 }
1086 }
1087
1088 #[tokio::test]
1089 async fn operation_step_happy_path() {
1090 let mut engine = create_test_engine();
1091 engine.register(OperationWorkflow).unwrap();
1092
1093 let run = engine
1094 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1095 .await
1096 .unwrap();
1097
1098 assert_eq!(run.status.state, RunStatus::Completed);
1099
1100 let steps = engine.store().list_steps(run.id).await.unwrap();
1101
1102 assert_eq!(steps.len(), 1);
1103 assert_eq!(steps[0].name, "create-issue");
1104 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1105 assert_eq!(
1106 steps[0].status.state,
1107 ironflow_store::models::StepStatus::Completed
1108 );
1109
1110 let output = steps[0].output.as_ref().unwrap();
1111 assert_eq!(output["issue_id"], 42);
1112 assert_eq!(output["project_id"], 123);
1113
1114 let input = steps[0].input.as_ref().unwrap();
1115 assert_eq!(input["project_id"], 123);
1116 assert_eq!(input["title"], "Bug report");
1117 }
1118
1119 #[tokio::test]
1120 async fn operation_step_failure_marks_run_failed() {
1121 let mut engine = create_test_engine();
1122 engine.register(FailingOperationWorkflow).unwrap();
1123
1124 let result = engine
1125 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1126 .await;
1127
1128 assert!(result.is_err());
1129 }
1130
1131 #[tokio::test]
1132 async fn operation_mixed_with_shell_steps() {
1133 let mut engine = create_test_engine();
1134 engine.register(MixedWorkflow).unwrap();
1135
1136 let run = engine
1137 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1138 .await
1139 .unwrap();
1140
1141 assert_eq!(run.status.state, RunStatus::Completed);
1142
1143 let steps = engine.store().list_steps(run.id).await.unwrap();
1144
1145 assert_eq!(steps.len(), 2);
1146 assert_eq!(steps[0].kind, StepKind::Shell);
1147 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1148 assert_eq!(steps[0].position, 0);
1149 assert_eq!(steps[1].position, 1);
1150 }
1151
1152 use crate::config::ApprovalConfig;
1157
1158 struct SingleApprovalWorkflow;
1159
1160 impl WorkflowHandler for SingleApprovalWorkflow {
1161 fn name(&self) -> &str {
1162 "single-approval"
1163 }
1164
1165 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1166 Box::pin(async move {
1167 ctx.shell("build", ShellConfig::new("echo built")).await?;
1168 ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1169 ctx.shell("deploy", ShellConfig::new("echo deployed"))
1170 .await?;
1171 Ok(())
1172 })
1173 }
1174 }
1175
1176 struct DoubleApprovalWorkflow;
1177
1178 impl WorkflowHandler for DoubleApprovalWorkflow {
1179 fn name(&self) -> &str {
1180 "double-approval"
1181 }
1182
1183 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1184 Box::pin(async move {
1185 ctx.shell("build", ShellConfig::new("echo built")).await?;
1186 ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1187 .await?;
1188 ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1189 .await?;
1190 ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1191 .await?;
1192 ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1193 .await?;
1194 Ok(())
1195 })
1196 }
1197 }
1198
1199 #[tokio::test]
1200 async fn approval_pauses_run() {
1201 let mut engine = create_test_engine();
1202 engine.register(SingleApprovalWorkflow).unwrap();
1203
1204 let run = engine
1205 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1206 .await
1207 .unwrap();
1208
1209 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1210
1211 let steps = engine.store().list_steps(run.id).await.unwrap();
1212 assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
1214 assert_eq!(steps[0].status.state, StepStatus::Completed);
1215 assert_eq!(steps[1].kind, StepKind::Approval);
1216 assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1217 }
1218
1219 #[tokio::test]
1220 async fn approval_resume_completes_run() {
1221 let mut engine = create_test_engine();
1222 engine.register(SingleApprovalWorkflow).unwrap();
1223
1224 let run = engine
1226 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1227 .await
1228 .unwrap();
1229 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1230
1231 engine
1233 .store()
1234 .update_run_status(run.id, RunStatus::Running)
1235 .await
1236 .unwrap();
1237
1238 let resumed = engine.resume_run(run.id).await.unwrap();
1240 assert_eq!(resumed.status.state, RunStatus::Completed);
1241
1242 let steps = engine.store().list_steps(run.id).await.unwrap();
1243 assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
1245 assert_eq!(steps[0].status.state, StepStatus::Completed);
1246 assert_eq!(steps[1].name, "gate");
1247 assert_eq!(steps[1].kind, StepKind::Approval);
1248 assert_eq!(steps[1].status.state, StepStatus::Completed);
1249 assert_eq!(steps[2].name, "deploy");
1250 assert_eq!(steps[2].status.state, StepStatus::Completed);
1251 }
1252
1253 #[tokio::test]
1254 async fn double_approval_two_resumes() {
1255 let mut engine = create_test_engine();
1256 engine.register(DoubleApprovalWorkflow).unwrap();
1257
1258 let run = engine
1260 .run_handler("double-approval", TriggerKind::Manual, json!({}))
1261 .await
1262 .unwrap();
1263 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1264
1265 let steps = engine.store().list_steps(run.id).await.unwrap();
1266 assert_eq!(steps.len(), 2); engine
1270 .store()
1271 .update_run_status(run.id, RunStatus::Running)
1272 .await
1273 .unwrap();
1274
1275 let resumed = engine.resume_run(run.id).await.unwrap();
1276 assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1277
1278 let steps = engine.store().list_steps(run.id).await.unwrap();
1279 assert_eq!(steps.len(), 4); engine
1283 .store()
1284 .update_run_status(run.id, RunStatus::Running)
1285 .await
1286 .unwrap();
1287
1288 let final_run = engine.resume_run(run.id).await.unwrap();
1289 assert_eq!(final_run.status.state, RunStatus::Completed);
1290
1291 let steps = engine.store().list_steps(run.id).await.unwrap();
1292 assert_eq!(steps.len(), 5);
1293 assert_eq!(steps[0].name, "build");
1294 assert_eq!(steps[1].name, "staging-gate");
1295 assert_eq!(steps[2].name, "deploy-staging");
1296 assert_eq!(steps[3].name, "prod-gate");
1297 assert_eq!(steps[4].name, "deploy-prod");
1298
1299 for step in &steps {
1300 assert_eq!(step.status.state, StepStatus::Completed);
1301 }
1302 }
1303}