1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33pub struct Engine {
74 store: Arc<dyn RunStore>,
75 provider: Arc<dyn AgentProvider>,
76 handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77 event_publisher: EventPublisher,
78}
79
80impl Engine {
81 pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97 Self {
98 store,
99 provider,
100 handlers: HashMap::new(),
101 event_publisher: EventPublisher::new(),
102 }
103 }
104
105 pub fn store(&self) -> &Arc<dyn RunStore> {
107 &self.store
108 }
109
110 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
112 &self.provider
113 }
114
115 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
117 let handlers = self.handlers.clone();
118 let resolver: crate::context::HandlerResolver =
119 Arc::new(move |name: &str| handlers.get(name).cloned());
120 WorkflowContext::with_handler_resolver(
121 run_id,
122 self.store.clone(),
123 self.provider.clone(),
124 resolver,
125 )
126 }
127
128 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
172 let name = handler.name().to_string();
173 if self.handlers.contains_key(&name) {
174 return Err(EngineError::InvalidWorkflow(format!(
175 "handler '{}' already registered",
176 name
177 )));
178 }
179 self.handlers.insert(name, Arc::new(handler));
180 Ok(())
181 }
182
183 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
190 let name = handler.name().to_string();
191 if self.handlers.contains_key(&name) {
192 return Err(EngineError::InvalidWorkflow(format!(
193 "handler '{}' already registered",
194 name
195 )));
196 }
197 self.handlers.insert(name, Arc::from(handler));
198 Ok(())
199 }
200
201 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
203 self.handlers.get(name)
204 }
205
206 pub fn handler_names(&self) -> Vec<&str> {
208 self.handlers.keys().map(|s| s.as_str()).collect()
209 }
210
211 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
213 self.handlers.get(name).map(|h| h.describe())
214 }
215
216 pub fn subscribe(
241 &mut self,
242 subscriber: impl EventSubscriber + 'static,
243 event_types: &[&'static str],
244 ) {
245 self.event_publisher.subscribe(subscriber, event_types);
246 }
247
248 pub fn event_publisher(&self) -> &EventPublisher {
253 &self.event_publisher
254 }
255
256 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
286 pub async fn run_handler(
287 &self,
288 handler_name: &str,
289 trigger: TriggerKind,
290 payload: Value,
291 ) -> Result<Run, EngineError> {
292 let handler = self
293 .handlers
294 .get(handler_name)
295 .ok_or_else(|| {
296 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
297 })?
298 .clone();
299
300 let run = self
301 .store
302 .create_run(NewRun {
303 workflow_name: handler_name.to_string(),
304 trigger,
305 payload,
306 max_retries: 0,
307 })
308 .await?;
309
310 let run_id = run.id;
311 info!(run_id = %run_id, "run created");
312
313 self.store
314 .update_run_status(run_id, RunStatus::Running)
315 .await?;
316
317 #[cfg(feature = "prometheus")]
318 gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
319
320 let run_start = Instant::now();
321 let mut ctx = self.build_context(run_id);
322
323 let result = handler.execute(&mut ctx).await;
324 self.finalize_run(run_id, handler_name, result, &ctx, run_start)
325 .await
326 }
327
328 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
337 pub async fn enqueue_handler(
338 &self,
339 handler_name: &str,
340 trigger: TriggerKind,
341 payload: Value,
342 max_retries: u32,
343 ) -> Result<Run, EngineError> {
344 if !self.handlers.contains_key(handler_name) {
345 return Err(EngineError::InvalidWorkflow(format!(
346 "no handler registered: {handler_name}"
347 )));
348 }
349
350 let run = self
351 .store
352 .create_run(NewRun {
353 workflow_name: handler_name.to_string(),
354 trigger,
355 payload,
356 max_retries,
357 })
358 .await?;
359
360 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
361 Ok(run)
362 }
363
364 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
373 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
374 let run = self
375 .store
376 .get_run(run_id)
377 .await?
378 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
379
380 let handler = self
381 .handlers
382 .get(&run.workflow_name)
383 .ok_or_else(|| {
384 EngineError::InvalidWorkflow(format!(
385 "no handler registered: {}",
386 run.workflow_name
387 ))
388 })?
389 .clone();
390
391 #[cfg(feature = "prometheus")]
392 gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
393
394 let run_start = Instant::now();
395 let mut ctx = self.build_context(run_id);
396
397 let result = handler.execute(&mut ctx).await;
398 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
399 .await
400 }
401
402 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
410 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
411 self.execute_handler_run(run_id).await
412 }
413
414 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
428 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
429 let run = self
430 .store
431 .get_run(run_id)
432 .await?
433 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
434
435 let handler = self
436 .handlers
437 .get(&run.workflow_name)
438 .ok_or_else(|| {
439 EngineError::InvalidWorkflow(format!(
440 "no handler registered: {}",
441 run.workflow_name
442 ))
443 })?
444 .clone();
445
446 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
447
448 let run_start = Instant::now();
449 let mut ctx = self.build_context(run_id);
450 ctx.load_replay_steps().await?;
451
452 let result = handler.execute(&mut ctx).await;
453 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
454 .await
455 }
456
457 async fn finalize_run(
466 &self,
467 run_id: Uuid,
468 workflow_name: &str,
469 result: Result<(), EngineError>,
470 ctx: &WorkflowContext,
471 run_start: Instant,
472 ) -> Result<Run, EngineError> {
473 let total_duration = run_start.elapsed().as_millis() as u64;
474 let completed_at = Utc::now();
475
476 let final_status;
477
478 match result {
479 Ok(()) => {
480 final_status = RunStatus::Completed;
481 self.store
482 .update_run(
483 run_id,
484 RunUpdate {
485 status: Some(RunStatus::Completed),
486 cost_usd: Some(ctx.total_cost_usd()),
487 duration_ms: Some(total_duration),
488 completed_at: Some(completed_at),
489 ..RunUpdate::default()
490 },
491 )
492 .await?;
493
494 info!(
495 run_id = %run_id,
496 cost_usd = %ctx.total_cost_usd(),
497 duration_ms = total_duration,
498 "run completed"
499 );
500 }
501 Err(EngineError::ApprovalRequired {
502 run_id: approval_run_id,
503 step_id,
504 ref message,
505 }) => {
506 final_status = RunStatus::AwaitingApproval;
507 self.store
508 .update_run(
509 run_id,
510 RunUpdate {
511 status: Some(RunStatus::AwaitingApproval),
512 cost_usd: Some(ctx.total_cost_usd()),
513 duration_ms: Some(total_duration),
514 ..RunUpdate::default()
515 },
516 )
517 .await?;
518
519 info!(
520 run_id = %approval_run_id,
521 step_id = %step_id,
522 message = %message,
523 "run awaiting approval"
524 );
525 }
526 Err(err) => {
527 final_status = RunStatus::Failed;
528 if let Err(store_err) = self
529 .store
530 .update_run(
531 run_id,
532 RunUpdate {
533 status: Some(RunStatus::Failed),
534 error: Some(err.to_string()),
535 cost_usd: Some(ctx.total_cost_usd()),
536 duration_ms: Some(total_duration),
537 completed_at: Some(completed_at),
538 ..RunUpdate::default()
539 },
540 )
541 .await
542 {
543 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
544 }
545
546 error!(run_id = %run_id, error = %err, "run failed");
547
548 self.publish_run_status_changed(
549 workflow_name,
550 run_id,
551 final_status,
552 Some(err.to_string()),
553 ctx,
554 total_duration,
555 );
556
557 #[cfg(feature = "prometheus")]
558 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
559
560 return Err(err);
561 }
562 }
563
564 self.publish_run_status_changed(
565 workflow_name,
566 run_id,
567 final_status,
568 None,
569 ctx,
570 total_duration,
571 );
572
573 #[cfg(feature = "prometheus")]
574 self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
575
576 self.store
577 .get_run(run_id)
578 .await?
579 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
580 }
581
/// Records the metrics for a finished (or paused) run.
///
/// Increments the run counter, records the duration in seconds and the
/// cost, and decrements the active-run gauge, all labelled with the
/// workflow name (counter and duration additionally with the status).
///
/// The cost is converted to `f64` by formatting and re-parsing it; if that
/// parse fails, `0.0` is recorded.
#[cfg(feature = "prometheus")]
fn emit_run_metrics(
    &self,
    workflow_name: &str,
    status: RunStatus,
    duration_ms: u64,
    ctx: &WorkflowContext,
) {
    let workflow = workflow_name.to_string();
    let outcome = status.to_string();
    let duration_secs = duration_ms as f64 / 1000.0;

    counter!(RUNS_TOTAL, "workflow" => workflow.clone(), "status" => outcome.clone()).increment(1);
    histogram!(RUN_DURATION_SECONDS, "workflow" => workflow.clone(), "status" => outcome)
        .record(duration_secs);

    let cost_usd = ctx
        .total_cost_usd()
        .to_string()
        .parse::<f64>()
        .unwrap_or(0.0);
    histogram!(RUN_COST_USD, "workflow" => workflow.clone()).record(cost_usd);

    gauge!(RUNS_ACTIVE, "workflow" => workflow).decrement(1.0);
}
605
606 fn publish_run_status_changed(
611 &self,
612 workflow_name: &str,
613 run_id: Uuid,
614 to: RunStatus,
615 error: Option<String>,
616 ctx: &WorkflowContext,
617 duration_ms: u64,
618 ) {
619 self.event_publisher.publish(Event::RunStatusChanged {
620 run_id,
621 workflow_name: workflow_name.to_string(),
622 from: RunStatus::Running,
623 to,
624 error,
625 cost_usd: ctx.total_cost_usd(),
626 duration_ms,
627 at: Utc::now(),
628 });
629 }
630}
631
632impl fmt::Debug for Engine {
633 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634 f.debug_struct("Engine")
635 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
636 .finish_non_exhaustive()
637 }
638}
639
640#[cfg(test)]
641mod tests {
642 use super::*;
643 use crate::config::ShellConfig;
644 use crate::handler::{HandlerFuture, WorkflowHandler};
645 use ironflow_core::providers::claude::ClaudeCodeProvider;
646 use ironflow_core::providers::record_replay::RecordReplayProvider;
647 use ironflow_store::memory::InMemoryStore;
648 use ironflow_store::models::StepStatus;
649 use serde_json::json;
650
651 struct EchoWorkflow;
653
654 impl WorkflowHandler for EchoWorkflow {
655 fn name(&self) -> &str {
656 "echo-workflow"
657 }
658
659 fn describe(&self) -> WorkflowInfo {
660 WorkflowInfo {
661 description: "A simple workflow that echoes hello".to_string(),
662 source_code: None,
663 sub_workflows: Vec::new(),
664 }
665 }
666
667 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
668 Box::pin(async move {
669 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
670 Ok(())
671 })
672 }
673 }
674
675 struct FailingWorkflow;
677
678 impl WorkflowHandler for FailingWorkflow {
679 fn name(&self) -> &str {
680 "failing-workflow"
681 }
682
683 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
684 Box::pin(async move {
685 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
686 Ok(())
687 })
688 }
689 }
690
691 fn create_test_engine() -> Engine {
692 let store = Arc::new(InMemoryStore::new());
693 let inner = ClaudeCodeProvider::new();
694 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
695 inner,
696 "/tmp/ironflow-fixtures",
697 ));
698 Engine::new(store, provider)
699 }
700
/// A freshly created engine has no registered handlers.
#[test]
fn engine_new_creates_instance() {
    let engine = create_test_engine();
    let names = engine.handler_names();
    assert!(names.is_empty());
}
706
/// Registering a handler succeeds and makes its name visible.
#[test]
fn engine_register_handler() {
    let mut engine = create_test_engine();
    assert!(engine.register(EchoWorkflow).is_ok());

    let names = engine.handler_names();
    assert_eq!(names.len(), 1);
    assert!(names.contains(&"echo-workflow"));
}
715
/// Registering the same handler name twice is rejected.
#[test]
fn engine_register_duplicate_returns_error() {
    let mut engine = create_test_engine();
    engine.register(EchoWorkflow).unwrap();

    let second_attempt = engine.register(EchoWorkflow);
    assert!(second_attempt.is_err());
}
723
/// A registered handler can be looked up by name.
#[test]
fn engine_get_handler_found() {
    let mut engine = create_test_engine();
    engine.register(EchoWorkflow).unwrap();

    assert!(engine.get_handler("echo-workflow").is_some());
}
731
/// Looking up an unregistered name yields `None`.
#[test]
fn engine_get_handler_not_found() {
    let engine = create_test_engine();

    assert!(engine.get_handler("nonexistent").is_none());
}
738
/// `handler_names` reports every registered handler.
#[test]
fn engine_handler_names_lists_all() {
    let mut engine = create_test_engine();
    engine.register(EchoWorkflow).unwrap();
    engine.register(FailingWorkflow).unwrap();

    let names = engine.handler_names();
    assert_eq!(names.len(), 2);
    for expected in ["echo-workflow", "failing-workflow"].iter() {
        assert!(names.contains(expected));
    }
}
749
/// `handler_info` exposes the description returned by the handler.
#[test]
fn engine_handler_info_returns_description() {
    let mut engine = create_test_engine();
    engine.register(EchoWorkflow).unwrap();

    let lookup = engine.handler_info("echo-workflow");
    assert!(lookup.is_some());

    let described = lookup.unwrap();
    assert_eq!(
        described.description,
        "A simple workflow that echoes hello"
    );
}
759
760 #[tokio::test]
761 async fn engine_unknown_workflow_returns_error() {
762 let engine = create_test_engine();
763 let result = engine
764 .run_handler("unknown", TriggerKind::Manual, json!({}))
765 .await;
766 assert!(result.is_err());
767 match result {
768 Err(EngineError::InvalidWorkflow(msg)) => {
769 assert!(msg.contains("no handler registered"));
770 }
771 _ => panic!("expected InvalidWorkflow error"),
772 }
773 }
774
775 #[tokio::test]
776 async fn engine_enqueue_handler_creates_pending_run() {
777 let mut engine = create_test_engine();
778 engine.register(EchoWorkflow).unwrap();
779
780 let run = engine
781 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
782 .await
783 .unwrap();
784 assert_eq!(run.status.state, RunStatus::Pending);
785 assert_eq!(run.workflow_name, "echo-workflow");
786 }
787
788 #[tokio::test]
789 async fn engine_register_boxed() {
790 let mut engine = create_test_engine();
791 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
792 let result = engine.register_boxed(handler);
793 assert!(result.is_ok());
794 assert_eq!(engine.handler_names().len(), 1);
795 }
796
797 #[tokio::test]
798 async fn engine_store_and_provider_accessors() {
799 let store = Arc::new(InMemoryStore::new());
800 let inner = ClaudeCodeProvider::new();
801 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
802 inner,
803 "/tmp/ironflow-fixtures",
804 ));
805 let engine = Engine::new(store.clone(), provider.clone());
806
807 let _ = engine.store();
809 let _ = engine.provider();
810 }
811
812 use crate::operation::Operation;
817 use ironflow_store::models::StepKind;
818 use std::future::Future;
819 use std::pin::Pin;
820
821 struct FakeGitlabOp {
822 project_id: u64,
823 title: String,
824 }
825
826 impl Operation for FakeGitlabOp {
827 fn kind(&self) -> &str {
828 "gitlab"
829 }
830
831 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
832 Box::pin(async move {
833 Ok(json!({
834 "issue_id": 42,
835 "project_id": self.project_id,
836 "title": self.title,
837 }))
838 })
839 }
840
841 fn input(&self) -> Option<Value> {
842 Some(json!({
843 "project_id": self.project_id,
844 "title": self.title,
845 }))
846 }
847 }
848
849 struct FailingOp;
850
851 impl Operation for FailingOp {
852 fn kind(&self) -> &str {
853 "broken-service"
854 }
855
856 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
857 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
858 }
859 }
860
861 struct OperationWorkflow;
862
863 impl WorkflowHandler for OperationWorkflow {
864 fn name(&self) -> &str {
865 "operation-workflow"
866 }
867
868 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
869 Box::pin(async move {
870 let op = FakeGitlabOp {
871 project_id: 123,
872 title: "Bug report".to_string(),
873 };
874 ctx.operation("create-issue", &op).await?;
875 Ok(())
876 })
877 }
878 }
879
880 struct FailingOperationWorkflow;
881
882 impl WorkflowHandler for FailingOperationWorkflow {
883 fn name(&self) -> &str {
884 "failing-operation-workflow"
885 }
886
887 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
888 Box::pin(async move {
889 ctx.operation("broken-call", &FailingOp).await?;
890 Ok(())
891 })
892 }
893 }
894
895 struct MixedWorkflow;
896
897 impl WorkflowHandler for MixedWorkflow {
898 fn name(&self) -> &str {
899 "mixed-workflow"
900 }
901
902 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
903 Box::pin(async move {
904 ctx.shell("build", ShellConfig::new("echo built")).await?;
905 let op = FakeGitlabOp {
906 project_id: 456,
907 title: "Deploy done".to_string(),
908 };
909 let result = ctx.operation("notify-gitlab", &op).await?;
910 assert_eq!(result.output["issue_id"], 42);
911 Ok(())
912 })
913 }
914 }
915
916 #[tokio::test]
917 async fn operation_step_happy_path() {
918 let mut engine = create_test_engine();
919 engine.register(OperationWorkflow).unwrap();
920
921 let run = engine
922 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
923 .await
924 .unwrap();
925
926 assert_eq!(run.status.state, RunStatus::Completed);
927
928 let steps = engine.store().list_steps(run.id).await.unwrap();
929
930 assert_eq!(steps.len(), 1);
931 assert_eq!(steps[0].name, "create-issue");
932 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
933 assert_eq!(
934 steps[0].status.state,
935 ironflow_store::models::StepStatus::Completed
936 );
937
938 let output = steps[0].output.as_ref().unwrap();
939 assert_eq!(output["issue_id"], 42);
940 assert_eq!(output["project_id"], 123);
941
942 let input = steps[0].input.as_ref().unwrap();
943 assert_eq!(input["project_id"], 123);
944 assert_eq!(input["title"], "Bug report");
945 }
946
947 #[tokio::test]
948 async fn operation_step_failure_marks_run_failed() {
949 let mut engine = create_test_engine();
950 engine.register(FailingOperationWorkflow).unwrap();
951
952 let result = engine
953 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
954 .await;
955
956 assert!(result.is_err());
957 }
958
959 #[tokio::test]
960 async fn operation_mixed_with_shell_steps() {
961 let mut engine = create_test_engine();
962 engine.register(MixedWorkflow).unwrap();
963
964 let run = engine
965 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
966 .await
967 .unwrap();
968
969 assert_eq!(run.status.state, RunStatus::Completed);
970
971 let steps = engine.store().list_steps(run.id).await.unwrap();
972
973 assert_eq!(steps.len(), 2);
974 assert_eq!(steps[0].kind, StepKind::Shell);
975 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
976 assert_eq!(steps[0].position, 0);
977 assert_eq!(steps[1].position, 1);
978 }
979
980 use crate::config::ApprovalConfig;
985
986 struct SingleApprovalWorkflow;
987
988 impl WorkflowHandler for SingleApprovalWorkflow {
989 fn name(&self) -> &str {
990 "single-approval"
991 }
992
993 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
994 Box::pin(async move {
995 ctx.shell("build", ShellConfig::new("echo built")).await?;
996 ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
997 ctx.shell("deploy", ShellConfig::new("echo deployed"))
998 .await?;
999 Ok(())
1000 })
1001 }
1002 }
1003
1004 struct DoubleApprovalWorkflow;
1005
1006 impl WorkflowHandler for DoubleApprovalWorkflow {
1007 fn name(&self) -> &str {
1008 "double-approval"
1009 }
1010
1011 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1012 Box::pin(async move {
1013 ctx.shell("build", ShellConfig::new("echo built")).await?;
1014 ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1015 .await?;
1016 ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1017 .await?;
1018 ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1019 .await?;
1020 ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1021 .await?;
1022 Ok(())
1023 })
1024 }
1025 }
1026
1027 #[tokio::test]
1028 async fn approval_pauses_run() {
1029 let mut engine = create_test_engine();
1030 engine.register(SingleApprovalWorkflow).unwrap();
1031
1032 let run = engine
1033 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1034 .await
1035 .unwrap();
1036
1037 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1038
1039 let steps = engine.store().list_steps(run.id).await.unwrap();
1040 assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
1042 assert_eq!(steps[0].status.state, StepStatus::Completed);
1043 assert_eq!(steps[1].kind, StepKind::Approval);
1044 assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1045 }
1046
1047 #[tokio::test]
1048 async fn approval_resume_completes_run() {
1049 let mut engine = create_test_engine();
1050 engine.register(SingleApprovalWorkflow).unwrap();
1051
1052 let run = engine
1054 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1055 .await
1056 .unwrap();
1057 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1058
1059 engine
1061 .store()
1062 .update_run_status(run.id, RunStatus::Running)
1063 .await
1064 .unwrap();
1065
1066 let resumed = engine.resume_run(run.id).await.unwrap();
1068 assert_eq!(resumed.status.state, RunStatus::Completed);
1069
1070 let steps = engine.store().list_steps(run.id).await.unwrap();
1071 assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
1073 assert_eq!(steps[0].status.state, StepStatus::Completed);
1074 assert_eq!(steps[1].name, "gate");
1075 assert_eq!(steps[1].kind, StepKind::Approval);
1076 assert_eq!(steps[1].status.state, StepStatus::Completed);
1077 assert_eq!(steps[2].name, "deploy");
1078 assert_eq!(steps[2].status.state, StepStatus::Completed);
1079 }
1080
1081 #[tokio::test]
1082 async fn double_approval_two_resumes() {
1083 let mut engine = create_test_engine();
1084 engine.register(DoubleApprovalWorkflow).unwrap();
1085
1086 let run = engine
1088 .run_handler("double-approval", TriggerKind::Manual, json!({}))
1089 .await
1090 .unwrap();
1091 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1092
1093 let steps = engine.store().list_steps(run.id).await.unwrap();
1094 assert_eq!(steps.len(), 2); engine
1098 .store()
1099 .update_run_status(run.id, RunStatus::Running)
1100 .await
1101 .unwrap();
1102
1103 let resumed = engine.resume_run(run.id).await.unwrap();
1104 assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1105
1106 let steps = engine.store().list_steps(run.id).await.unwrap();
1107 assert_eq!(steps.len(), 4); engine
1111 .store()
1112 .update_run_status(run.id, RunStatus::Running)
1113 .await
1114 .unwrap();
1115
1116 let final_run = engine.resume_run(run.id).await.unwrap();
1117 assert_eq!(final_run.status.state, RunStatus::Completed);
1118
1119 let steps = engine.store().list_steps(run.id).await.unwrap();
1120 assert_eq!(steps.len(), 5);
1121 assert_eq!(steps[0].name, "build");
1122 assert_eq!(steps[1].name, "staging-gate");
1123 assert_eq!(steps[2].name, "deploy-staging");
1124 assert_eq!(steps[3].name, "prod-gate");
1125 assert_eq!(steps[4].name, "deploy-prod");
1126
1127 for step in &steps {
1128 assert_eq!(step.status.state, StepStatus::Completed);
1129 }
1130 }
1131}