1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33pub struct Engine {
74 store: Arc<dyn RunStore>,
75 provider: Arc<dyn AgentProvider>,
76 handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77 event_publisher: EventPublisher,
78}
79
80impl Engine {
81 pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97 Self {
98 store,
99 provider,
100 handlers: HashMap::new(),
101 event_publisher: EventPublisher::new(),
102 }
103 }
104
105 pub fn store(&self) -> &Arc<dyn RunStore> {
107 &self.store
108 }
109
110 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
112 &self.provider
113 }
114
115 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
117 let handlers = self.handlers.clone();
118 let resolver: crate::context::HandlerResolver =
119 Arc::new(move |name: &str| handlers.get(name).cloned());
120 WorkflowContext::with_handler_resolver(
121 run_id,
122 self.store.clone(),
123 self.provider.clone(),
124 resolver,
125 )
126 }
127
128 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
172 let name = handler.name().to_string();
173 if self.handlers.contains_key(&name) {
174 return Err(EngineError::InvalidWorkflow(format!(
175 "handler '{}' already registered",
176 name
177 )));
178 }
179 self.handlers.insert(name, Arc::new(handler));
180 Ok(())
181 }
182
183 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
190 let name = handler.name().to_string();
191 if self.handlers.contains_key(&name) {
192 return Err(EngineError::InvalidWorkflow(format!(
193 "handler '{}' already registered",
194 name
195 )));
196 }
197 self.handlers.insert(name, Arc::from(handler));
198 Ok(())
199 }
200
201 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
203 self.handlers.get(name)
204 }
205
206 pub fn handler_names(&self) -> Vec<&str> {
208 self.handlers.keys().map(|s| s.as_str()).collect()
209 }
210
211 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
213 self.handlers.get(name).map(|h| h.describe())
214 }
215
216 pub fn subscribe(
241 &mut self,
242 subscriber: impl EventSubscriber + 'static,
243 event_types: &[&'static str],
244 ) {
245 self.event_publisher.subscribe(subscriber, event_types);
246 }
247
248 pub fn event_publisher(&self) -> &EventPublisher {
253 &self.event_publisher
254 }
255
256 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
286 pub async fn run_handler(
287 &self,
288 handler_name: &str,
289 trigger: TriggerKind,
290 payload: Value,
291 ) -> Result<Run, EngineError> {
292 let handler = self
293 .handlers
294 .get(handler_name)
295 .ok_or_else(|| {
296 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
297 })?
298 .clone();
299
300 let run = self
301 .store
302 .create_run(NewRun {
303 workflow_name: handler_name.to_string(),
304 trigger,
305 payload,
306 max_retries: 0,
307 })
308 .await?;
309
310 let run_id = run.id;
311 info!(run_id = %run_id, "run created");
312
313 self.store
314 .update_run_status(run_id, RunStatus::Running)
315 .await?;
316
317 #[cfg(feature = "prometheus")]
318 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
319
320 let run_start = Instant::now();
321 let mut ctx = self.build_context(run_id);
322
323 let result = handler.execute(&mut ctx).await;
324 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
325 .await
326 }
327
328 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
337 pub async fn enqueue_handler(
338 &self,
339 handler_name: &str,
340 trigger: TriggerKind,
341 payload: Value,
342 max_retries: u32,
343 ) -> Result<Run, EngineError> {
344 if !self.handlers.contains_key(handler_name) {
345 return Err(EngineError::InvalidWorkflow(format!(
346 "no handler registered: {handler_name}"
347 )));
348 }
349
350 let run = self
351 .store
352 .create_run(NewRun {
353 workflow_name: handler_name.to_string(),
354 trigger,
355 payload,
356 max_retries,
357 })
358 .await?;
359
360 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
361 Ok(run)
362 }
363
364 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
373 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
374 let run = self
375 .store
376 .get_run(run_id)
377 .await?
378 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
379
380 let handler = self
381 .handlers
382 .get(&run.workflow_name)
383 .ok_or_else(|| {
384 EngineError::InvalidWorkflow(format!(
385 "no handler registered: {}",
386 run.workflow_name
387 ))
388 })?
389 .clone();
390
391 #[cfg(feature = "prometheus")]
392 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
393
394 let run_start = Instant::now();
395 let mut ctx = self.build_context(run_id);
396
397 let result = handler.execute(&mut ctx).await;
398 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
399 .await
400 }
401
402 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
410 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
411 self.execute_handler_run(run_id).await
412 }
413
414 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
428 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
429 let run = self
430 .store
431 .get_run(run_id)
432 .await?
433 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
434
435 let handler = self
436 .handlers
437 .get(&run.workflow_name)
438 .ok_or_else(|| {
439 EngineError::InvalidWorkflow(format!(
440 "no handler registered: {}",
441 run.workflow_name
442 ))
443 })?
444 .clone();
445
446 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
447
448 let run_start = Instant::now();
449 let mut ctx = self.build_context(run_id);
450 ctx.load_replay_steps().await?;
451
452 let result = handler.execute(&mut ctx).await;
453 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
454 .await
455 }
456
457 async fn finalize_run(
463 &self,
464 run_id: Uuid,
465 workflow_name: &str,
466 result: Result<(), EngineError>,
467 ctx: &WorkflowContext,
468 run_start: Instant,
469 ) -> Result<Run, EngineError> {
470 let total_duration = run_start.elapsed().as_millis() as u64;
471 let completed_at = Utc::now();
472
473 let final_status;
474 let final_run;
475
476 match result {
477 Ok(()) => {
478 final_status = RunStatus::Completed;
479 final_run = self
480 .store
481 .update_run_returning(
482 run_id,
483 RunUpdate {
484 status: Some(RunStatus::Completed),
485 cost_usd: Some(ctx.total_cost_usd()),
486 duration_ms: Some(total_duration),
487 completed_at: Some(completed_at),
488 ..RunUpdate::default()
489 },
490 )
491 .await?;
492
493 info!(
494 run_id = %run_id,
495 cost_usd = %ctx.total_cost_usd(),
496 duration_ms = total_duration,
497 "run completed"
498 );
499 }
500 Err(EngineError::ApprovalRequired {
501 run_id: approval_run_id,
502 step_id,
503 ref message,
504 }) => {
505 final_status = RunStatus::AwaitingApproval;
506 final_run = self
507 .store
508 .update_run_returning(
509 run_id,
510 RunUpdate {
511 status: Some(RunStatus::AwaitingApproval),
512 cost_usd: Some(ctx.total_cost_usd()),
513 duration_ms: Some(total_duration),
514 ..RunUpdate::default()
515 },
516 )
517 .await?;
518
519 info!(
520 run_id = %approval_run_id,
521 step_id = %step_id,
522 message = %message,
523 "run awaiting approval"
524 );
525 }
526 Err(err) => {
527 final_status = RunStatus::Failed;
528 if let Err(store_err) = self
529 .store
530 .update_run(
531 run_id,
532 RunUpdate {
533 status: Some(RunStatus::Failed),
534 error: Some(err.to_string()),
535 cost_usd: Some(ctx.total_cost_usd()),
536 duration_ms: Some(total_duration),
537 completed_at: Some(completed_at),
538 ..RunUpdate::default()
539 },
540 )
541 .await
542 {
543 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
544 }
545
546 error!(run_id = %run_id, error = %err, "run failed");
547
548 self.publish_run_status_changed(
549 workflow_name,
550 run_id,
551 final_status,
552 Some(err.to_string()),
553 ctx,
554 total_duration,
555 );
556
557 #[cfg(feature = "prometheus")]
558 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
559
560 return Err(err);
561 }
562 }
563
564 self.publish_run_status_changed(
565 workflow_name,
566 run_id,
567 final_status,
568 None,
569 ctx,
570 total_duration,
571 );
572
573 #[cfg(feature = "prometheus")]
574 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
575
576 Ok(final_run)
577 }
578
579 #[cfg(feature = "prometheus")]
581 fn emit_run_metrics(
582 &self,
583 workflow_name: &str,
584 status: RunStatus,
585 duration_ms: u64,
586 ctx: &WorkflowContext,
587 ) {
588 let status_str = status.to_string();
589 let wf = workflow_name.to_string();
590
591 counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
592 histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
593 .record(duration_ms as f64 / 1000.0);
594 histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
595 ctx.total_cost_usd()
596 .to_string()
597 .parse::<f64>()
598 .unwrap_or(0.0),
599 );
600 gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
601 }
602
603 fn publish_run_status_changed(
608 &self,
609 workflow_name: &str,
610 run_id: Uuid,
611 to: RunStatus,
612 error: Option<String>,
613 ctx: &WorkflowContext,
614 duration_ms: u64,
615 ) {
616 self.event_publisher.publish(Event::RunStatusChanged {
617 run_id,
618 workflow_name: workflow_name.to_string(),
619 from: RunStatus::Running,
620 to,
621 error,
622 cost_usd: ctx.total_cost_usd(),
623 duration_ms,
624 at: Utc::now(),
625 });
626 }
627}
628
629impl fmt::Debug for Engine {
630 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
631 f.debug_struct("Engine")
632 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
633 .finish_non_exhaustive()
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use super::*;
640 use crate::config::ShellConfig;
641 use crate::handler::{HandlerFuture, WorkflowHandler};
642 use ironflow_core::providers::claude::ClaudeCodeProvider;
643 use ironflow_core::providers::record_replay::RecordReplayProvider;
644 use ironflow_store::memory::InMemoryStore;
645 use ironflow_store::models::StepStatus;
646 use serde_json::json;
647
648 struct EchoWorkflow;
650
651 impl WorkflowHandler for EchoWorkflow {
652 fn name(&self) -> &str {
653 "echo-workflow"
654 }
655
656 fn describe(&self) -> WorkflowInfo {
657 WorkflowInfo {
658 description: "A simple workflow that echoes hello".to_string(),
659 source_code: None,
660 sub_workflows: Vec::new(),
661 }
662 }
663
664 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
665 Box::pin(async move {
666 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
667 Ok(())
668 })
669 }
670 }
671
672 struct FailingWorkflow;
674
675 impl WorkflowHandler for FailingWorkflow {
676 fn name(&self) -> &str {
677 "failing-workflow"
678 }
679
680 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
681 Box::pin(async move {
682 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
683 Ok(())
684 })
685 }
686 }
687
688 fn create_test_engine() -> Engine {
689 let store = Arc::new(InMemoryStore::new());
690 let inner = ClaudeCodeProvider::new();
691 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
692 inner,
693 "/tmp/ironflow-fixtures",
694 ));
695 Engine::new(store, provider)
696 }
697
698 #[test]
699 fn engine_new_creates_instance() {
700 let engine = create_test_engine();
701 assert_eq!(engine.handler_names().len(), 0);
702 }
703
704 #[test]
705 fn engine_register_handler() {
706 let mut engine = create_test_engine();
707 let result = engine.register(EchoWorkflow);
708 assert!(result.is_ok());
709 assert_eq!(engine.handler_names().len(), 1);
710 assert!(engine.handler_names().contains(&"echo-workflow"));
711 }
712
713 #[test]
714 fn engine_register_duplicate_returns_error() {
715 let mut engine = create_test_engine();
716 engine.register(EchoWorkflow).unwrap();
717 let result = engine.register(EchoWorkflow);
718 assert!(result.is_err());
719 }
720
721 #[test]
722 fn engine_get_handler_found() {
723 let mut engine = create_test_engine();
724 engine.register(EchoWorkflow).unwrap();
725 let handler = engine.get_handler("echo-workflow");
726 assert!(handler.is_some());
727 }
728
729 #[test]
730 fn engine_get_handler_not_found() {
731 let engine = create_test_engine();
732 let handler = engine.get_handler("nonexistent");
733 assert!(handler.is_none());
734 }
735
736 #[test]
737 fn engine_handler_names_lists_all() {
738 let mut engine = create_test_engine();
739 engine.register(EchoWorkflow).unwrap();
740 engine.register(FailingWorkflow).unwrap();
741 let names = engine.handler_names();
742 assert_eq!(names.len(), 2);
743 assert!(names.contains(&"echo-workflow"));
744 assert!(names.contains(&"failing-workflow"));
745 }
746
747 #[test]
748 fn engine_handler_info_returns_description() {
749 let mut engine = create_test_engine();
750 engine.register(EchoWorkflow).unwrap();
751 let info = engine.handler_info("echo-workflow");
752 assert!(info.is_some());
753 let info = info.unwrap();
754 assert_eq!(info.description, "A simple workflow that echoes hello");
755 }
756
757 #[tokio::test]
758 async fn engine_unknown_workflow_returns_error() {
759 let engine = create_test_engine();
760 let result = engine
761 .run_handler("unknown", TriggerKind::Manual, json!({}))
762 .await;
763 assert!(result.is_err());
764 match result {
765 Err(EngineError::InvalidWorkflow(msg)) => {
766 assert!(msg.contains("no handler registered"));
767 }
768 _ => panic!("expected InvalidWorkflow error"),
769 }
770 }
771
772 #[tokio::test]
773 async fn engine_enqueue_handler_creates_pending_run() {
774 let mut engine = create_test_engine();
775 engine.register(EchoWorkflow).unwrap();
776
777 let run = engine
778 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
779 .await
780 .unwrap();
781 assert_eq!(run.status.state, RunStatus::Pending);
782 assert_eq!(run.workflow_name, "echo-workflow");
783 }
784
785 #[tokio::test]
786 async fn engine_register_boxed() {
787 let mut engine = create_test_engine();
788 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
789 let result = engine.register_boxed(handler);
790 assert!(result.is_ok());
791 assert_eq!(engine.handler_names().len(), 1);
792 }
793
794 #[tokio::test]
795 async fn engine_store_and_provider_accessors() {
796 let store = Arc::new(InMemoryStore::new());
797 let inner = ClaudeCodeProvider::new();
798 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
799 inner,
800 "/tmp/ironflow-fixtures",
801 ));
802 let engine = Engine::new(store.clone(), provider.clone());
803
804 let _ = engine.store();
806 let _ = engine.provider();
807 }
808
809 use crate::operation::Operation;
814 use ironflow_store::models::StepKind;
815 use std::future::Future;
816 use std::pin::Pin;
817
818 struct FakeGitlabOp {
819 project_id: u64,
820 title: String,
821 }
822
823 impl Operation for FakeGitlabOp {
824 fn kind(&self) -> &str {
825 "gitlab"
826 }
827
828 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
829 Box::pin(async move {
830 Ok(json!({
831 "issue_id": 42,
832 "project_id": self.project_id,
833 "title": self.title,
834 }))
835 })
836 }
837
838 fn input(&self) -> Option<Value> {
839 Some(json!({
840 "project_id": self.project_id,
841 "title": self.title,
842 }))
843 }
844 }
845
846 struct FailingOp;
847
848 impl Operation for FailingOp {
849 fn kind(&self) -> &str {
850 "broken-service"
851 }
852
853 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
854 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
855 }
856 }
857
858 struct OperationWorkflow;
859
860 impl WorkflowHandler for OperationWorkflow {
861 fn name(&self) -> &str {
862 "operation-workflow"
863 }
864
865 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
866 Box::pin(async move {
867 let op = FakeGitlabOp {
868 project_id: 123,
869 title: "Bug report".to_string(),
870 };
871 ctx.operation("create-issue", &op).await?;
872 Ok(())
873 })
874 }
875 }
876
877 struct FailingOperationWorkflow;
878
879 impl WorkflowHandler for FailingOperationWorkflow {
880 fn name(&self) -> &str {
881 "failing-operation-workflow"
882 }
883
884 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
885 Box::pin(async move {
886 ctx.operation("broken-call", &FailingOp).await?;
887 Ok(())
888 })
889 }
890 }
891
892 struct MixedWorkflow;
893
894 impl WorkflowHandler for MixedWorkflow {
895 fn name(&self) -> &str {
896 "mixed-workflow"
897 }
898
899 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
900 Box::pin(async move {
901 ctx.shell("build", ShellConfig::new("echo built")).await?;
902 let op = FakeGitlabOp {
903 project_id: 456,
904 title: "Deploy done".to_string(),
905 };
906 let result = ctx.operation("notify-gitlab", &op).await?;
907 assert_eq!(result.output["issue_id"], 42);
908 Ok(())
909 })
910 }
911 }
912
913 #[tokio::test]
914 async fn operation_step_happy_path() {
915 let mut engine = create_test_engine();
916 engine.register(OperationWorkflow).unwrap();
917
918 let run = engine
919 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
920 .await
921 .unwrap();
922
923 assert_eq!(run.status.state, RunStatus::Completed);
924
925 let steps = engine.store().list_steps(run.id).await.unwrap();
926
927 assert_eq!(steps.len(), 1);
928 assert_eq!(steps[0].name, "create-issue");
929 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
930 assert_eq!(
931 steps[0].status.state,
932 ironflow_store::models::StepStatus::Completed
933 );
934
935 let output = steps[0].output.as_ref().unwrap();
936 assert_eq!(output["issue_id"], 42);
937 assert_eq!(output["project_id"], 123);
938
939 let input = steps[0].input.as_ref().unwrap();
940 assert_eq!(input["project_id"], 123);
941 assert_eq!(input["title"], "Bug report");
942 }
943
944 #[tokio::test]
945 async fn operation_step_failure_marks_run_failed() {
946 let mut engine = create_test_engine();
947 engine.register(FailingOperationWorkflow).unwrap();
948
949 let result = engine
950 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
951 .await;
952
953 assert!(result.is_err());
954 }
955
956 #[tokio::test]
957 async fn operation_mixed_with_shell_steps() {
958 let mut engine = create_test_engine();
959 engine.register(MixedWorkflow).unwrap();
960
961 let run = engine
962 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
963 .await
964 .unwrap();
965
966 assert_eq!(run.status.state, RunStatus::Completed);
967
968 let steps = engine.store().list_steps(run.id).await.unwrap();
969
970 assert_eq!(steps.len(), 2);
971 assert_eq!(steps[0].kind, StepKind::Shell);
972 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
973 assert_eq!(steps[0].position, 0);
974 assert_eq!(steps[1].position, 1);
975 }
976
977 use crate::config::ApprovalConfig;
982
983 struct SingleApprovalWorkflow;
984
985 impl WorkflowHandler for SingleApprovalWorkflow {
986 fn name(&self) -> &str {
987 "single-approval"
988 }
989
990 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
991 Box::pin(async move {
992 ctx.shell("build", ShellConfig::new("echo built")).await?;
993 ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
994 ctx.shell("deploy", ShellConfig::new("echo deployed"))
995 .await?;
996 Ok(())
997 })
998 }
999 }
1000
1001 struct DoubleApprovalWorkflow;
1002
1003 impl WorkflowHandler for DoubleApprovalWorkflow {
1004 fn name(&self) -> &str {
1005 "double-approval"
1006 }
1007
1008 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1009 Box::pin(async move {
1010 ctx.shell("build", ShellConfig::new("echo built")).await?;
1011 ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1012 .await?;
1013 ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1014 .await?;
1015 ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1016 .await?;
1017 ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1018 .await?;
1019 Ok(())
1020 })
1021 }
1022 }
1023
1024 #[tokio::test]
1025 async fn approval_pauses_run() {
1026 let mut engine = create_test_engine();
1027 engine.register(SingleApprovalWorkflow).unwrap();
1028
1029 let run = engine
1030 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1031 .await
1032 .unwrap();
1033
1034 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1035
1036 let steps = engine.store().list_steps(run.id).await.unwrap();
1037 assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
1039 assert_eq!(steps[0].status.state, StepStatus::Completed);
1040 assert_eq!(steps[1].kind, StepKind::Approval);
1041 assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1042 }
1043
1044 #[tokio::test]
1045 async fn approval_resume_completes_run() {
1046 let mut engine = create_test_engine();
1047 engine.register(SingleApprovalWorkflow).unwrap();
1048
1049 let run = engine
1051 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1052 .await
1053 .unwrap();
1054 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1055
1056 engine
1058 .store()
1059 .update_run_status(run.id, RunStatus::Running)
1060 .await
1061 .unwrap();
1062
1063 let resumed = engine.resume_run(run.id).await.unwrap();
1065 assert_eq!(resumed.status.state, RunStatus::Completed);
1066
1067 let steps = engine.store().list_steps(run.id).await.unwrap();
1068 assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
1070 assert_eq!(steps[0].status.state, StepStatus::Completed);
1071 assert_eq!(steps[1].name, "gate");
1072 assert_eq!(steps[1].kind, StepKind::Approval);
1073 assert_eq!(steps[1].status.state, StepStatus::Completed);
1074 assert_eq!(steps[2].name, "deploy");
1075 assert_eq!(steps[2].status.state, StepStatus::Completed);
1076 }
1077
1078 #[tokio::test]
1079 async fn double_approval_two_resumes() {
1080 let mut engine = create_test_engine();
1081 engine.register(DoubleApprovalWorkflow).unwrap();
1082
1083 let run = engine
1085 .run_handler("double-approval", TriggerKind::Manual, json!({}))
1086 .await
1087 .unwrap();
1088 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1089
1090 let steps = engine.store().list_steps(run.id).await.unwrap();
1091 assert_eq!(steps.len(), 2); engine
1095 .store()
1096 .update_run_status(run.id, RunStatus::Running)
1097 .await
1098 .unwrap();
1099
1100 let resumed = engine.resume_run(run.id).await.unwrap();
1101 assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1102
1103 let steps = engine.store().list_steps(run.id).await.unwrap();
1104 assert_eq!(steps.len(), 4); engine
1108 .store()
1109 .update_run_status(run.id, RunStatus::Running)
1110 .await
1111 .unwrap();
1112
1113 let final_run = engine.resume_run(run.id).await.unwrap();
1114 assert_eq!(final_run.status.state, RunStatus::Completed);
1115
1116 let steps = engine.store().list_steps(run.id).await.unwrap();
1117 assert_eq!(steps.len(), 5);
1118 assert_eq!(steps[0].name, "build");
1119 assert_eq!(steps[1].name, "staging-gate");
1120 assert_eq!(steps[2].name, "deploy-staging");
1121 assert_eq!(steps[3].name, "prod-gate");
1122 assert_eq!(steps[4].name, "deploy-prod");
1123
1124 for step in &steps {
1125 assert_eq!(step.status.state, StepStatus::Completed);
1126 }
1127 }
1128}