1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
/// Workflow engine: holds the run store, the agent provider, the registered
/// workflow handlers and the event publisher used to announce run changes.
pub struct Engine {
    /// Persistence backend used to create, load and update runs.
    store: Arc<dyn RunStore>,
    /// Agent provider handed to every [`WorkflowContext`] the engine builds.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by the value of `WorkflowHandler::name`.
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher through which run status events are broadcast.
    event_publisher: EventPublisher,
}
79
80fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
90 let reject = |reason: &str| {
91 Err(EngineError::InvalidWorkflow(format!(
92 "handler '{handler_name}' has invalid category '{category}': {reason}"
93 )))
94 };
95
96 if category.is_empty() {
97 return reject("empty category");
98 }
99 if category.starts_with('/') {
100 return reject("leading '/'");
101 }
102 if category.ends_with('/') {
103 return reject("trailing '/'");
104 }
105 for segment in category.split('/') {
106 if segment.is_empty() {
107 return reject("empty segment (double '/')");
108 }
109 if segment.trim().is_empty() {
110 return reject("whitespace-only segment");
111 }
112 }
113 Ok(())
114}
115
116impl Engine {
117 pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
133 Self {
134 store,
135 provider,
136 handlers: HashMap::new(),
137 event_publisher: EventPublisher::new(),
138 }
139 }
140
    /// Returns a reference to the run store this engine was constructed with.
    pub fn store(&self) -> &Arc<dyn RunStore> {
        &self.store
    }
145
    /// Returns a reference to the agent provider this engine was constructed with.
    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
        &self.provider
    }
150
151 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
153 let handlers = self.handlers.clone();
154 let resolver: crate::context::HandlerResolver =
155 Arc::new(move |name: &str| handlers.get(name).cloned());
156 WorkflowContext::with_handler_resolver(
157 run_id,
158 self.store.clone(),
159 self.provider.clone(),
160 resolver,
161 )
162 }
163
164 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
208 let name = handler.name().to_string();
209 if self.handlers.contains_key(&name) {
210 return Err(EngineError::InvalidWorkflow(format!(
211 "handler '{}' already registered",
212 name
213 )));
214 }
215 if let Some(category) = handler.category() {
216 validate_category(&name, category)?;
217 }
218 self.handlers.insert(name, Arc::new(handler));
219 Ok(())
220 }
221
222 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
229 let name = handler.name().to_string();
230 if self.handlers.contains_key(&name) {
231 return Err(EngineError::InvalidWorkflow(format!(
232 "handler '{}' already registered",
233 name
234 )));
235 }
236 if let Some(category) = handler.category() {
237 validate_category(&name, category)?;
238 }
239 self.handlers.insert(name, Arc::from(handler));
240 Ok(())
241 }
242
    /// Looks up a registered handler by name, returning `None` when no
    /// handler with that name has been registered.
    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
        self.handlers.get(name)
    }
247
248 pub fn handler_names(&self) -> Vec<&str> {
250 self.handlers.keys().map(|s| s.as_str()).collect()
251 }
252
253 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
255 self.handlers.get(name).map(|h| h.describe())
256 }
257
    /// Registers `subscriber` with the engine's event publisher for the
    /// given `event_types`.
    ///
    /// This simply forwards to `EventPublisher::subscribe`.
    pub fn subscribe(
        &mut self,
        subscriber: impl EventSubscriber + 'static,
        event_types: &[&'static str],
    ) {
        self.event_publisher.subscribe(subscriber, event_types);
    }
289
    /// Returns a reference to the engine's event publisher.
    pub fn event_publisher(&self) -> &EventPublisher {
        &self.event_publisher
    }
297
298 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
328 pub async fn run_handler(
329 &self,
330 handler_name: &str,
331 trigger: TriggerKind,
332 payload: Value,
333 ) -> Result<Run, EngineError> {
334 let handler = self
335 .handlers
336 .get(handler_name)
337 .ok_or_else(|| {
338 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
339 })?
340 .clone();
341
342 let run = self
343 .store
344 .create_run(NewRun {
345 workflow_name: handler_name.to_string(),
346 trigger,
347 payload,
348 max_retries: 0,
349 })
350 .await?;
351
352 let run_id = run.id;
353 info!(run_id = %run_id, "run created");
354
355 self.store
356 .update_run_status(run_id, RunStatus::Running)
357 .await?;
358
359 #[cfg(feature = "prometheus")]
360 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
361
362 let run_start = Instant::now();
363 let mut ctx = self.build_context(run_id);
364
365 let result = handler.execute(&mut ctx).await;
366 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
367 .await
368 }
369
370 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
379 pub async fn enqueue_handler(
380 &self,
381 handler_name: &str,
382 trigger: TriggerKind,
383 payload: Value,
384 max_retries: u32,
385 ) -> Result<Run, EngineError> {
386 if !self.handlers.contains_key(handler_name) {
387 return Err(EngineError::InvalidWorkflow(format!(
388 "no handler registered: {handler_name}"
389 )));
390 }
391
392 let run = self
393 .store
394 .create_run(NewRun {
395 workflow_name: handler_name.to_string(),
396 trigger,
397 payload,
398 max_retries,
399 })
400 .await?;
401
402 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
403 Ok(run)
404 }
405
406 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
415 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
416 let run = self
417 .store
418 .get_run(run_id)
419 .await?
420 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
421
422 let handler = self
423 .handlers
424 .get(&run.workflow_name)
425 .ok_or_else(|| {
426 EngineError::InvalidWorkflow(format!(
427 "no handler registered: {}",
428 run.workflow_name
429 ))
430 })?
431 .clone();
432
433 #[cfg(feature = "prometheus")]
434 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
435
436 let run_start = Instant::now();
437 let mut ctx = self.build_context(run_id);
438
439 let result = handler.execute(&mut ctx).await;
440 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
441 .await
442 }
443
    /// Executes the run identified by `run_id`.
    ///
    /// This delegates directly to [`Engine::execute_handler_run`] and returns
    /// its result unchanged.
    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        self.execute_handler_run(run_id).await
    }
455
456 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
470 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
471 let run = self
472 .store
473 .get_run(run_id)
474 .await?
475 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
476
477 let handler = self
478 .handlers
479 .get(&run.workflow_name)
480 .ok_or_else(|| {
481 EngineError::InvalidWorkflow(format!(
482 "no handler registered: {}",
483 run.workflow_name
484 ))
485 })?
486 .clone();
487
488 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
489
490 let run_start = Instant::now();
491 let mut ctx = self.build_context(run_id);
492 ctx.load_replay_steps().await?;
493
494 let result = handler.execute(&mut ctx).await;
495 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
496 .await
497 }
498
499 async fn finalize_run(
505 &self,
506 run_id: Uuid,
507 workflow_name: &str,
508 result: Result<(), EngineError>,
509 ctx: &WorkflowContext,
510 run_start: Instant,
511 ) -> Result<Run, EngineError> {
512 let total_duration = run_start.elapsed().as_millis() as u64;
513 let completed_at = Utc::now();
514
515 let final_status;
516 let final_run;
517
518 match result {
519 Ok(()) => {
520 final_status = RunStatus::Completed;
521 final_run = self
522 .store
523 .update_run_returning(
524 run_id,
525 RunUpdate {
526 status: Some(RunStatus::Completed),
527 cost_usd: Some(ctx.total_cost_usd()),
528 duration_ms: Some(total_duration),
529 completed_at: Some(completed_at),
530 ..RunUpdate::default()
531 },
532 )
533 .await?;
534
535 info!(
536 run_id = %run_id,
537 cost_usd = %ctx.total_cost_usd(),
538 duration_ms = total_duration,
539 "run completed"
540 );
541 }
542 Err(EngineError::ApprovalRequired {
543 run_id: approval_run_id,
544 step_id,
545 ref message,
546 }) => {
547 final_status = RunStatus::AwaitingApproval;
548 final_run = self
549 .store
550 .update_run_returning(
551 run_id,
552 RunUpdate {
553 status: Some(RunStatus::AwaitingApproval),
554 cost_usd: Some(ctx.total_cost_usd()),
555 duration_ms: Some(total_duration),
556 ..RunUpdate::default()
557 },
558 )
559 .await?;
560
561 info!(
562 run_id = %approval_run_id,
563 step_id = %step_id,
564 message = %message,
565 "run awaiting approval"
566 );
567 }
568 Err(err) => {
569 final_status = RunStatus::Failed;
570 if let Err(store_err) = self
571 .store
572 .update_run(
573 run_id,
574 RunUpdate {
575 status: Some(RunStatus::Failed),
576 error: Some(err.to_string()),
577 cost_usd: Some(ctx.total_cost_usd()),
578 duration_ms: Some(total_duration),
579 completed_at: Some(completed_at),
580 ..RunUpdate::default()
581 },
582 )
583 .await
584 {
585 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
586 }
587
588 error!(run_id = %run_id, error = %err, "run failed");
589
590 self.publish_run_status_changed(
591 workflow_name,
592 run_id,
593 final_status,
594 Some(err.to_string()),
595 ctx,
596 total_duration,
597 );
598
599 #[cfg(feature = "prometheus")]
600 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
601
602 return Err(err);
603 }
604 }
605
606 self.publish_run_status_changed(
607 workflow_name,
608 run_id,
609 final_status,
610 None,
611 ctx,
612 total_duration,
613 );
614
615 #[cfg(feature = "prometheus")]
616 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
617
618 Ok(final_run)
619 }
620
621 #[cfg(feature = "prometheus")]
623 fn emit_run_metrics(
624 &self,
625 workflow_name: &str,
626 status: RunStatus,
627 duration_ms: u64,
628 ctx: &WorkflowContext,
629 ) {
630 let status_str = status.to_string();
631 let wf = workflow_name.to_string();
632
633 counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
634 histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
635 .record(duration_ms as f64 / 1000.0);
636 histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
637 ctx.total_cost_usd()
638 .to_string()
639 .parse::<f64>()
640 .unwrap_or(0.0),
641 );
642 gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
643 }
644
645 fn publish_run_status_changed(
650 &self,
651 workflow_name: &str,
652 run_id: Uuid,
653 to: RunStatus,
654 error: Option<String>,
655 ctx: &WorkflowContext,
656 duration_ms: u64,
657 ) {
658 let now = Utc::now();
659 let cost_usd = ctx.total_cost_usd();
660 let wf = workflow_name.to_string();
661
662 self.event_publisher.publish(Event::RunStatusChanged {
663 run_id,
664 workflow_name: wf.clone(),
665 from: RunStatus::Running,
666 to,
667 error: error.clone(),
668 cost_usd,
669 duration_ms,
670 at: now,
671 });
672
673 if to == RunStatus::Failed {
674 self.event_publisher.publish(Event::RunFailed {
675 run_id,
676 workflow_name: wf,
677 error,
678 cost_usd,
679 duration_ms,
680 at: now,
681 });
682 }
683 }
684}
685
686impl fmt::Debug for Engine {
687 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
688 f.debug_struct("Engine")
689 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
690 .finish_non_exhaustive()
691 }
692}
693
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;

    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::{StepKind, StepStatus};
    use serde_json::json;

    use super::*;
    use crate::config::{ApprovalConfig, ShellConfig};
    use crate::handler::{HandlerFuture, WorkflowHandler};
    use crate::operation::Operation;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Workflow with a single shell step that echoes a greeting and an
    /// explicit `describe` implementation.
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }

        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
                category: None,
            }
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    /// Workflow whose only shell step runs `exit 1`.
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }

    /// Builds an engine over an in-memory store and a record/replay provider
    /// constructed with the `/tmp/ironflow-fixtures` path.
    fn create_test_engine() -> Engine {
        let store = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        Engine::new(store, provider)
    }

    // ---------------------------------------------------------------------
    // Registration and lookup
    // ---------------------------------------------------------------------

    #[test]
    fn engine_new_creates_instance() {
        let engine = create_test_engine();
        assert_eq!(engine.handler_names().len(), 0);
    }

    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
        assert!(engine.handler_names().contains(&"echo-workflow"));
    }

    #[test]
    fn engine_register_duplicate_returns_error() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let result = engine.register(EchoWorkflow);
        assert!(result.is_err());
    }

    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let handler = engine.get_handler("echo-workflow");
        assert!(handler.is_some());
    }

    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        let handler = engine.get_handler("nonexistent");
        assert!(handler.is_none());
    }

    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();
        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"echo-workflow"));
        assert!(names.contains(&"failing-workflow"));
    }

    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.description, "A simple workflow that echoes hello");
    }

    // ---------------------------------------------------------------------
    // Categories
    // ---------------------------------------------------------------------

    /// Handler that declares the valid nested category `data/etl` and relies
    /// on the trait's default `describe`.
    struct CategorizedWorkflow;

    impl WorkflowHandler for CategorizedWorkflow {
        fn name(&self) -> &str {
            "categorized"
        }

        fn category(&self) -> Option<&str> {
            Some("data/etl")
        }

        fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_default_describe_propagates_category() {
        let mut engine = create_test_engine();
        engine.register(CategorizedWorkflow).unwrap();
        let info = engine.handler_info("categorized").unwrap();
        assert_eq!(info.category.as_deref(), Some("data/etl"));
    }

    #[test]
    fn engine_default_describe_without_category() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        let info = engine.handler_info("echo-workflow").unwrap();
        assert!(info.category.is_none());
    }

    /// Handler whose category is whatever string the test supplies.
    struct BadCategoryWorkflow(&'static str);

    impl WorkflowHandler for BadCategoryWorkflow {
        fn name(&self) -> &str {
            "bad-category"
        }

        fn category(&self) -> Option<&str> {
            Some(self.0)
        }

        fn execute<'a>(&'a self, _ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move { Ok(()) })
        }
    }

    #[test]
    fn engine_register_rejects_empty_category() {
        let mut engine = create_test_engine();
        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_leading_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("/data/etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_trailing_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/etl/"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_double_slash_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data//etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_rejects_whitespace_only_segment_category() {
        let mut engine = create_test_engine();
        let err = engine
            .register(BadCategoryWorkflow("data/ /etl"))
            .unwrap_err();
        match err {
            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
            other => panic!("expected InvalidWorkflow, got {other:?}"),
        }
    }

    #[test]
    fn engine_register_accepts_valid_nested_category() {
        let mut engine = create_test_engine();
        assert!(engine.register(CategorizedWorkflow).is_ok());
    }

    // ---------------------------------------------------------------------
    // Running and enqueueing
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn engine_unknown_workflow_returns_error() {
        let engine = create_test_engine();
        let result = engine
            .run_handler("unknown", TriggerKind::Manual, json!({}))
            .await;
        assert!(result.is_err());
        match result {
            Err(EngineError::InvalidWorkflow(msg)) => {
                assert!(msg.contains("no handler registered"));
            }
            _ => panic!("expected InvalidWorkflow error"),
        }
    }

    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let run = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.workflow_name, "echo-workflow");
    }

    // Registration is synchronous, so no async runtime is needed here.
    #[test]
    fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
        let result = engine.register_boxed(handler);
        assert!(result.is_ok());
        assert_eq!(engine.handler_names().len(), 1);
    }

    // The accessors must hand back the very handles the engine was built
    // with, which is checked by pointer identity.
    #[test]
    fn engine_store_and_provider_accessors() {
        let store: Arc<dyn RunStore> = Arc::new(InMemoryStore::new());
        let inner = ClaudeCodeProvider::new();
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ));
        let engine = Engine::new(Arc::clone(&store), Arc::clone(&provider));

        assert!(Arc::ptr_eq(engine.store(), &store));
        assert!(Arc::ptr_eq(engine.provider(), &provider));
    }

    // ---------------------------------------------------------------------
    // Custom operations
    // ---------------------------------------------------------------------

    /// Operation of kind `gitlab` that returns a fixed issue id together
    /// with the inputs it was constructed with.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }

    impl Operation for FakeGitlabOp {
        fn kind(&self) -> &str {
            "gitlab"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move {
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }

        fn input(&self) -> Option<Value> {
            Some(json!({
                "project_id": self.project_id,
                "title": self.title,
            }))
        }
    }

    /// Operation that always fails with a `StepConfig` error.
    struct FailingOp;

    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }

        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
        }
    }

    /// Workflow consisting of a single `FakeGitlabOp` step.
    struct OperationWorkflow;

    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let op = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &op).await?;
                Ok(())
            })
        }
    }

    /// Workflow consisting of a single `FailingOp` step.
    struct FailingOperationWorkflow;

    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }

    /// Workflow that runs a shell step followed by a `FakeGitlabOp` step.
    struct MixedWorkflow;

    impl WorkflowHandler for MixedWorkflow {
        fn name(&self) -> &str {
            "mixed-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                let op = FakeGitlabOp {
                    project_id: 456,
                    title: "Deploy done".to_string(),
                };
                let result = ctx.operation("notify-gitlab", &op).await?;
                assert_eq!(result.output["issue_id"], 42);
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn operation_step_happy_path() {
        let mut engine = create_test_engine();
        engine.register(OperationWorkflow).unwrap();

        let run = engine
            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].name, "create-issue");
        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].status.state, StepStatus::Completed);

        let output = steps[0].output.as_ref().unwrap();
        assert_eq!(output["issue_id"], 42);
        assert_eq!(output["project_id"], 123);

        let input = steps[0].input.as_ref().unwrap();
        assert_eq!(input["project_id"], 123);
        assert_eq!(input["title"], "Bug report");
    }

    #[tokio::test]
    async fn operation_step_failure_marks_run_failed() {
        let mut engine = create_test_engine();
        engine.register(FailingOperationWorkflow).unwrap();

        let result = engine
            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn operation_mixed_with_shell_steps() {
        let mut engine = create_test_engine();
        engine.register(MixedWorkflow).unwrap();

        let run = engine
            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();

        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(steps[0].position, 0);
        assert_eq!(steps[1].position, 1);
    }

    // ---------------------------------------------------------------------
    // Approvals and resuming
    // ---------------------------------------------------------------------

    /// Workflow with one approval gate between a build and a deploy step.
    struct SingleApprovalWorkflow;

    impl WorkflowHandler for SingleApprovalWorkflow {
        fn name(&self) -> &str {
            "single-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
                ctx.shell("deploy", ShellConfig::new("echo deployed"))
                    .await?;
                Ok(())
            })
        }
    }

    /// Workflow with two approval gates (staging, then production).
    struct DoubleApprovalWorkflow;

    impl WorkflowHandler for DoubleApprovalWorkflow {
        fn name(&self) -> &str {
            "double-approval"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;
                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
                    .await?;
                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
                    .await?;
                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
                    .await?;
                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
                    .await?;
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn approval_pauses_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();

        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].kind, StepKind::Shell);
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
    }

    #[tokio::test]
    async fn approval_resume_completes_run() {
        let mut engine = create_test_engine();
        engine.register(SingleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("single-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        // Put the run back into the running state before resuming it.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 3);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[0].status.state, StepStatus::Completed);
        assert_eq!(steps[1].name, "gate");
        assert_eq!(steps[1].kind, StepKind::Approval);
        assert_eq!(steps[1].status.state, StepStatus::Completed);
        assert_eq!(steps[2].name, "deploy");
        assert_eq!(steps[2].status.state, StepStatus::Completed);
    }

    #[tokio::test]
    async fn double_approval_two_resumes() {
        let mut engine = create_test_engine();
        engine.register(DoubleApprovalWorkflow).unwrap();

        let run = engine
            .run_handler("double-approval", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::AwaitingApproval);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);

        // First resume: stops again at the second gate.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let resumed = engine.resume_run(run.id).await.unwrap();
        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 4);

        // Second resume: runs to completion.
        engine
            .store()
            .update_run_status(run.id, RunStatus::Running)
            .await
            .unwrap();

        let final_run = engine.resume_run(run.id).await.unwrap();
        assert_eq!(final_run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 5);
        assert_eq!(steps[0].name, "build");
        assert_eq!(steps[1].name, "staging-gate");
        assert_eq!(steps[2].name, "deploy-staging");
        assert_eq!(steps[3].name, "prod-gate");
        assert_eq!(steps[4].name, "deploy-prod");

        for step in &steps {
            assert_eq!(step.status.state, StepStatus::Completed);
        }
    }
}