1use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19use ironflow_core::provider::AgentProvider;
20use ironflow_store::error::StoreError;
21use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
22use ironflow_store::store::RunStore;
23
24use crate::context::WorkflowContext;
25use crate::error::EngineError;
26use crate::handler::{WorkflowHandler, WorkflowInfo};
27use crate::notify::{Event, EventPublisher, EventSubscriber};
28
29pub struct Engine {
70 store: Arc<dyn RunStore>,
71 provider: Arc<dyn AgentProvider>,
72 handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
73 event_publisher: EventPublisher,
74}
75
76impl Engine {
77 pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
93 Self {
94 store,
95 provider,
96 handlers: HashMap::new(),
97 event_publisher: EventPublisher::new(),
98 }
99 }
100
101 pub fn store(&self) -> &Arc<dyn RunStore> {
103 &self.store
104 }
105
106 pub fn provider(&self) -> &Arc<dyn AgentProvider> {
108 &self.provider
109 }
110
111 fn build_context(&self, run_id: Uuid) -> WorkflowContext {
113 let handlers = self.handlers.clone();
114 let resolver: crate::context::HandlerResolver =
115 Arc::new(move |name: &str| handlers.get(name).cloned());
116 WorkflowContext::with_handler_resolver(
117 run_id,
118 self.store.clone(),
119 self.provider.clone(),
120 resolver,
121 )
122 }
123
124 pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
168 let name = handler.name().to_string();
169 if self.handlers.contains_key(&name) {
170 return Err(EngineError::InvalidWorkflow(format!(
171 "handler '{}' already registered",
172 name
173 )));
174 }
175 self.handlers.insert(name, Arc::new(handler));
176 Ok(())
177 }
178
179 pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
186 let name = handler.name().to_string();
187 if self.handlers.contains_key(&name) {
188 return Err(EngineError::InvalidWorkflow(format!(
189 "handler '{}' already registered",
190 name
191 )));
192 }
193 self.handlers.insert(name, Arc::from(handler));
194 Ok(())
195 }
196
197 pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
199 self.handlers.get(name)
200 }
201
202 pub fn handler_names(&self) -> Vec<&str> {
204 self.handlers.keys().map(|s| s.as_str()).collect()
205 }
206
207 pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
209 self.handlers.get(name).map(|h| h.describe())
210 }
211
212 pub fn subscribe(
237 &mut self,
238 subscriber: impl EventSubscriber + 'static,
239 event_types: &[&'static str],
240 ) {
241 self.event_publisher.subscribe(subscriber, event_types);
242 }
243
244 pub fn event_publisher(&self) -> &EventPublisher {
249 &self.event_publisher
250 }
251
252 #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
282 pub async fn run_handler(
283 &self,
284 handler_name: &str,
285 trigger: TriggerKind,
286 payload: Value,
287 ) -> Result<Run, EngineError> {
288 let handler = self
289 .handlers
290 .get(handler_name)
291 .ok_or_else(|| {
292 EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
293 })?
294 .clone();
295
296 let run = self
297 .store
298 .create_run(NewRun {
299 workflow_name: handler_name.to_string(),
300 trigger,
301 payload,
302 max_retries: 0,
303 })
304 .await?;
305
306 let run_id = run.id;
307 info!(run_id = %run_id, "run created");
308
309 self.store
310 .update_run_status(run_id, RunStatus::Running)
311 .await?;
312
313 let run_start = Instant::now();
314 let mut ctx = self.build_context(run_id);
315
316 let result = handler.execute(&mut ctx).await;
317 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
318 .await
319 }
320
321 #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
330 pub async fn enqueue_handler(
331 &self,
332 handler_name: &str,
333 trigger: TriggerKind,
334 payload: Value,
335 max_retries: u32,
336 ) -> Result<Run, EngineError> {
337 if !self.handlers.contains_key(handler_name) {
338 return Err(EngineError::InvalidWorkflow(format!(
339 "no handler registered: {handler_name}"
340 )));
341 }
342
343 let run = self
344 .store
345 .create_run(NewRun {
346 workflow_name: handler_name.to_string(),
347 trigger,
348 payload,
349 max_retries,
350 })
351 .await?;
352
353 info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
354 Ok(run)
355 }
356
357 #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
366 pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
367 let run = self
368 .store
369 .get_run(run_id)
370 .await?
371 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
372
373 let handler = self
374 .handlers
375 .get(&run.workflow_name)
376 .ok_or_else(|| {
377 EngineError::InvalidWorkflow(format!(
378 "no handler registered: {}",
379 run.workflow_name
380 ))
381 })?
382 .clone();
383
384 let run_start = Instant::now();
385 let mut ctx = self.build_context(run_id);
386
387 let result = handler.execute(&mut ctx).await;
388 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
389 .await
390 }
391
392 #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
400 pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
401 self.execute_handler_run(run_id).await
402 }
403
404 #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
418 pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
419 let run = self
420 .store
421 .get_run(run_id)
422 .await?
423 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
424
425 let handler = self
426 .handlers
427 .get(&run.workflow_name)
428 .ok_or_else(|| {
429 EngineError::InvalidWorkflow(format!(
430 "no handler registered: {}",
431 run.workflow_name
432 ))
433 })?
434 .clone();
435
436 info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
437
438 let run_start = Instant::now();
439 let mut ctx = self.build_context(run_id);
440 ctx.load_replay_steps().await?;
441
442 let result = handler.execute(&mut ctx).await;
443 self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
444 .await
445 }
446
447 async fn finalize_run(
456 &self,
457 run_id: Uuid,
458 workflow_name: &str,
459 result: Result<(), EngineError>,
460 ctx: &WorkflowContext,
461 run_start: Instant,
462 ) -> Result<Run, EngineError> {
463 let total_duration = run_start.elapsed().as_millis() as u64;
464 let completed_at = Utc::now();
465
466 let final_status;
467
468 match result {
469 Ok(()) => {
470 final_status = RunStatus::Completed;
471 self.store
472 .update_run(
473 run_id,
474 RunUpdate {
475 status: Some(RunStatus::Completed),
476 cost_usd: Some(ctx.total_cost_usd()),
477 duration_ms: Some(total_duration),
478 completed_at: Some(completed_at),
479 ..RunUpdate::default()
480 },
481 )
482 .await?;
483
484 info!(
485 run_id = %run_id,
486 cost_usd = %ctx.total_cost_usd(),
487 duration_ms = total_duration,
488 "run completed"
489 );
490 }
491 Err(EngineError::ApprovalRequired {
492 run_id: approval_run_id,
493 step_id,
494 ref message,
495 }) => {
496 final_status = RunStatus::AwaitingApproval;
497 self.store
498 .update_run(
499 run_id,
500 RunUpdate {
501 status: Some(RunStatus::AwaitingApproval),
502 cost_usd: Some(ctx.total_cost_usd()),
503 duration_ms: Some(total_duration),
504 ..RunUpdate::default()
505 },
506 )
507 .await?;
508
509 info!(
510 run_id = %approval_run_id,
511 step_id = %step_id,
512 message = %message,
513 "run awaiting approval"
514 );
515 }
516 Err(err) => {
517 final_status = RunStatus::Failed;
518 if let Err(store_err) = self
519 .store
520 .update_run(
521 run_id,
522 RunUpdate {
523 status: Some(RunStatus::Failed),
524 error: Some(err.to_string()),
525 cost_usd: Some(ctx.total_cost_usd()),
526 duration_ms: Some(total_duration),
527 completed_at: Some(completed_at),
528 ..RunUpdate::default()
529 },
530 )
531 .await
532 {
533 error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
534 }
535
536 error!(run_id = %run_id, error = %err, "run failed");
537
538 self.publish_run_status_changed(
539 workflow_name,
540 run_id,
541 final_status,
542 Some(err.to_string()),
543 ctx,
544 total_duration,
545 );
546 return Err(err);
547 }
548 }
549
550 self.publish_run_status_changed(
551 workflow_name,
552 run_id,
553 final_status,
554 None,
555 ctx,
556 total_duration,
557 );
558
559 self.store
560 .get_run(run_id)
561 .await?
562 .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
563 }
564
565 fn publish_run_status_changed(
570 &self,
571 workflow_name: &str,
572 run_id: Uuid,
573 to: RunStatus,
574 error: Option<String>,
575 ctx: &WorkflowContext,
576 duration_ms: u64,
577 ) {
578 self.event_publisher.publish(Event::RunStatusChanged {
579 run_id,
580 workflow_name: workflow_name.to_string(),
581 from: RunStatus::Running,
582 to,
583 error,
584 cost_usd: ctx.total_cost_usd(),
585 duration_ms,
586 at: Utc::now(),
587 });
588 }
589}
590
591impl fmt::Debug for Engine {
592 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593 f.debug_struct("Engine")
594 .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
595 .finish_non_exhaustive()
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use super::*;
602 use crate::config::ShellConfig;
603 use crate::handler::{HandlerFuture, WorkflowHandler};
604 use ironflow_core::providers::claude::ClaudeCodeProvider;
605 use ironflow_core::providers::record_replay::RecordReplayProvider;
606 use ironflow_store::memory::InMemoryStore;
607 use ironflow_store::models::StepStatus;
608 use serde_json::json;
609
610 struct EchoWorkflow;
612
613 impl WorkflowHandler for EchoWorkflow {
614 fn name(&self) -> &str {
615 "echo-workflow"
616 }
617
618 fn describe(&self) -> WorkflowInfo {
619 WorkflowInfo {
620 description: "A simple workflow that echoes hello".to_string(),
621 source_code: None,
622 sub_workflows: Vec::new(),
623 }
624 }
625
626 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
627 Box::pin(async move {
628 ctx.shell("greet", ShellConfig::new("echo hello")).await?;
629 Ok(())
630 })
631 }
632 }
633
634 struct FailingWorkflow;
636
637 impl WorkflowHandler for FailingWorkflow {
638 fn name(&self) -> &str {
639 "failing-workflow"
640 }
641
642 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
643 Box::pin(async move {
644 ctx.shell("fail", ShellConfig::new("exit 1")).await?;
645 Ok(())
646 })
647 }
648 }
649
650 fn create_test_engine() -> Engine {
651 let store = Arc::new(InMemoryStore::new());
652 let inner = ClaudeCodeProvider::new();
653 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
654 inner,
655 "/tmp/ironflow-fixtures",
656 ));
657 Engine::new(store, provider)
658 }
659
660 #[test]
661 fn engine_new_creates_instance() {
662 let engine = create_test_engine();
663 assert_eq!(engine.handler_names().len(), 0);
664 }
665
666 #[test]
667 fn engine_register_handler() {
668 let mut engine = create_test_engine();
669 let result = engine.register(EchoWorkflow);
670 assert!(result.is_ok());
671 assert_eq!(engine.handler_names().len(), 1);
672 assert!(engine.handler_names().contains(&"echo-workflow"));
673 }
674
675 #[test]
676 fn engine_register_duplicate_returns_error() {
677 let mut engine = create_test_engine();
678 engine.register(EchoWorkflow).unwrap();
679 let result = engine.register(EchoWorkflow);
680 assert!(result.is_err());
681 }
682
683 #[test]
684 fn engine_get_handler_found() {
685 let mut engine = create_test_engine();
686 engine.register(EchoWorkflow).unwrap();
687 let handler = engine.get_handler("echo-workflow");
688 assert!(handler.is_some());
689 }
690
691 #[test]
692 fn engine_get_handler_not_found() {
693 let engine = create_test_engine();
694 let handler = engine.get_handler("nonexistent");
695 assert!(handler.is_none());
696 }
697
698 #[test]
699 fn engine_handler_names_lists_all() {
700 let mut engine = create_test_engine();
701 engine.register(EchoWorkflow).unwrap();
702 engine.register(FailingWorkflow).unwrap();
703 let names = engine.handler_names();
704 assert_eq!(names.len(), 2);
705 assert!(names.contains(&"echo-workflow"));
706 assert!(names.contains(&"failing-workflow"));
707 }
708
709 #[test]
710 fn engine_handler_info_returns_description() {
711 let mut engine = create_test_engine();
712 engine.register(EchoWorkflow).unwrap();
713 let info = engine.handler_info("echo-workflow");
714 assert!(info.is_some());
715 let info = info.unwrap();
716 assert_eq!(info.description, "A simple workflow that echoes hello");
717 }
718
719 #[tokio::test]
720 async fn engine_unknown_workflow_returns_error() {
721 let engine = create_test_engine();
722 let result = engine
723 .run_handler("unknown", TriggerKind::Manual, json!({}))
724 .await;
725 assert!(result.is_err());
726 match result {
727 Err(EngineError::InvalidWorkflow(msg)) => {
728 assert!(msg.contains("no handler registered"));
729 }
730 _ => panic!("expected InvalidWorkflow error"),
731 }
732 }
733
734 #[tokio::test]
735 async fn engine_enqueue_handler_creates_pending_run() {
736 let mut engine = create_test_engine();
737 engine.register(EchoWorkflow).unwrap();
738
739 let run = engine
740 .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
741 .await
742 .unwrap();
743 assert_eq!(run.status.state, RunStatus::Pending);
744 assert_eq!(run.workflow_name, "echo-workflow");
745 }
746
747 #[tokio::test]
748 async fn engine_register_boxed() {
749 let mut engine = create_test_engine();
750 let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
751 let result = engine.register_boxed(handler);
752 assert!(result.is_ok());
753 assert_eq!(engine.handler_names().len(), 1);
754 }
755
756 #[tokio::test]
757 async fn engine_store_and_provider_accessors() {
758 let store = Arc::new(InMemoryStore::new());
759 let inner = ClaudeCodeProvider::new();
760 let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
761 inner,
762 "/tmp/ironflow-fixtures",
763 ));
764 let engine = Engine::new(store.clone(), provider.clone());
765
766 let _ = engine.store();
768 let _ = engine.provider();
769 }
770
771 use crate::operation::Operation;
776 use ironflow_store::models::StepKind;
777 use std::future::Future;
778 use std::pin::Pin;
779
780 struct FakeGitlabOp {
781 project_id: u64,
782 title: String,
783 }
784
785 impl Operation for FakeGitlabOp {
786 fn kind(&self) -> &str {
787 "gitlab"
788 }
789
790 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
791 Box::pin(async move {
792 Ok(json!({
793 "issue_id": 42,
794 "project_id": self.project_id,
795 "title": self.title,
796 }))
797 })
798 }
799
800 fn input(&self) -> Option<Value> {
801 Some(json!({
802 "project_id": self.project_id,
803 "title": self.title,
804 }))
805 }
806 }
807
808 struct FailingOp;
809
810 impl Operation for FailingOp {
811 fn kind(&self) -> &str {
812 "broken-service"
813 }
814
815 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
816 Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
817 }
818 }
819
820 struct OperationWorkflow;
821
822 impl WorkflowHandler for OperationWorkflow {
823 fn name(&self) -> &str {
824 "operation-workflow"
825 }
826
827 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
828 Box::pin(async move {
829 let op = FakeGitlabOp {
830 project_id: 123,
831 title: "Bug report".to_string(),
832 };
833 ctx.operation("create-issue", &op).await?;
834 Ok(())
835 })
836 }
837 }
838
839 struct FailingOperationWorkflow;
840
841 impl WorkflowHandler for FailingOperationWorkflow {
842 fn name(&self) -> &str {
843 "failing-operation-workflow"
844 }
845
846 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
847 Box::pin(async move {
848 ctx.operation("broken-call", &FailingOp).await?;
849 Ok(())
850 })
851 }
852 }
853
854 struct MixedWorkflow;
855
856 impl WorkflowHandler for MixedWorkflow {
857 fn name(&self) -> &str {
858 "mixed-workflow"
859 }
860
861 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
862 Box::pin(async move {
863 ctx.shell("build", ShellConfig::new("echo built")).await?;
864 let op = FakeGitlabOp {
865 project_id: 456,
866 title: "Deploy done".to_string(),
867 };
868 let result = ctx.operation("notify-gitlab", &op).await?;
869 assert_eq!(result.output["issue_id"], 42);
870 Ok(())
871 })
872 }
873 }
874
875 #[tokio::test]
876 async fn operation_step_happy_path() {
877 let mut engine = create_test_engine();
878 engine.register(OperationWorkflow).unwrap();
879
880 let run = engine
881 .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
882 .await
883 .unwrap();
884
885 assert_eq!(run.status.state, RunStatus::Completed);
886
887 let steps = engine.store().list_steps(run.id).await.unwrap();
888
889 assert_eq!(steps.len(), 1);
890 assert_eq!(steps[0].name, "create-issue");
891 assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
892 assert_eq!(
893 steps[0].status.state,
894 ironflow_store::models::StepStatus::Completed
895 );
896
897 let output = steps[0].output.as_ref().unwrap();
898 assert_eq!(output["issue_id"], 42);
899 assert_eq!(output["project_id"], 123);
900
901 let input = steps[0].input.as_ref().unwrap();
902 assert_eq!(input["project_id"], 123);
903 assert_eq!(input["title"], "Bug report");
904 }
905
906 #[tokio::test]
907 async fn operation_step_failure_marks_run_failed() {
908 let mut engine = create_test_engine();
909 engine.register(FailingOperationWorkflow).unwrap();
910
911 let result = engine
912 .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
913 .await;
914
915 assert!(result.is_err());
916 }
917
918 #[tokio::test]
919 async fn operation_mixed_with_shell_steps() {
920 let mut engine = create_test_engine();
921 engine.register(MixedWorkflow).unwrap();
922
923 let run = engine
924 .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
925 .await
926 .unwrap();
927
928 assert_eq!(run.status.state, RunStatus::Completed);
929
930 let steps = engine.store().list_steps(run.id).await.unwrap();
931
932 assert_eq!(steps.len(), 2);
933 assert_eq!(steps[0].kind, StepKind::Shell);
934 assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
935 assert_eq!(steps[0].position, 0);
936 assert_eq!(steps[1].position, 1);
937 }
938
939 use crate::config::ApprovalConfig;
944
945 struct SingleApprovalWorkflow;
946
947 impl WorkflowHandler for SingleApprovalWorkflow {
948 fn name(&self) -> &str {
949 "single-approval"
950 }
951
952 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
953 Box::pin(async move {
954 ctx.shell("build", ShellConfig::new("echo built")).await?;
955 ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
956 ctx.shell("deploy", ShellConfig::new("echo deployed"))
957 .await?;
958 Ok(())
959 })
960 }
961 }
962
963 struct DoubleApprovalWorkflow;
964
965 impl WorkflowHandler for DoubleApprovalWorkflow {
966 fn name(&self) -> &str {
967 "double-approval"
968 }
969
970 fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
971 Box::pin(async move {
972 ctx.shell("build", ShellConfig::new("echo built")).await?;
973 ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
974 .await?;
975 ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
976 .await?;
977 ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
978 .await?;
979 ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
980 .await?;
981 Ok(())
982 })
983 }
984 }
985
986 #[tokio::test]
987 async fn approval_pauses_run() {
988 let mut engine = create_test_engine();
989 engine.register(SingleApprovalWorkflow).unwrap();
990
991 let run = engine
992 .run_handler("single-approval", TriggerKind::Manual, json!({}))
993 .await
994 .unwrap();
995
996 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
997
998 let steps = engine.store().list_steps(run.id).await.unwrap();
999 assert_eq!(steps.len(), 2); assert_eq!(steps[0].kind, StepKind::Shell);
1001 assert_eq!(steps[0].status.state, StepStatus::Completed);
1002 assert_eq!(steps[1].kind, StepKind::Approval);
1003 assert_eq!(steps[1].status.state, StepStatus::Running);
1004 }
1005
1006 #[tokio::test]
1007 async fn approval_resume_completes_run() {
1008 let mut engine = create_test_engine();
1009 engine.register(SingleApprovalWorkflow).unwrap();
1010
1011 let run = engine
1013 .run_handler("single-approval", TriggerKind::Manual, json!({}))
1014 .await
1015 .unwrap();
1016 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1017
1018 engine
1020 .store()
1021 .update_run_status(run.id, RunStatus::Running)
1022 .await
1023 .unwrap();
1024
1025 let resumed = engine.resume_run(run.id).await.unwrap();
1027 assert_eq!(resumed.status.state, RunStatus::Completed);
1028
1029 let steps = engine.store().list_steps(run.id).await.unwrap();
1030 assert_eq!(steps.len(), 3); assert_eq!(steps[0].name, "build");
1032 assert_eq!(steps[0].status.state, StepStatus::Completed);
1033 assert_eq!(steps[1].name, "gate");
1034 assert_eq!(steps[1].kind, StepKind::Approval);
1035 assert_eq!(steps[1].status.state, StepStatus::Completed);
1036 assert_eq!(steps[2].name, "deploy");
1037 assert_eq!(steps[2].status.state, StepStatus::Completed);
1038 }
1039
1040 #[tokio::test]
1041 async fn double_approval_two_resumes() {
1042 let mut engine = create_test_engine();
1043 engine.register(DoubleApprovalWorkflow).unwrap();
1044
1045 let run = engine
1047 .run_handler("double-approval", TriggerKind::Manual, json!({}))
1048 .await
1049 .unwrap();
1050 assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1051
1052 let steps = engine.store().list_steps(run.id).await.unwrap();
1053 assert_eq!(steps.len(), 2); engine
1057 .store()
1058 .update_run_status(run.id, RunStatus::Running)
1059 .await
1060 .unwrap();
1061
1062 let resumed = engine.resume_run(run.id).await.unwrap();
1063 assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1064
1065 let steps = engine.store().list_steps(run.id).await.unwrap();
1066 assert_eq!(steps.len(), 4); engine
1070 .store()
1071 .update_run_status(run.id, RunStatus::Running)
1072 .await
1073 .unwrap();
1074
1075 let final_run = engine.resume_run(run.id).await.unwrap();
1076 assert_eq!(final_run.status.state, RunStatus::Completed);
1077
1078 let steps = engine.store().list_steps(run.id).await.unwrap();
1079 assert_eq!(steps.len(), 5);
1080 assert_eq!(steps[0].name, "build");
1081 assert_eq!(steps[1].name, "staging-gate");
1082 assert_eq!(steps[2].name, "deploy-staging");
1083 assert_eq!(steps[3].name, "prod-gate");
1084 assert_eq!(steps[4].name, "deploy-prod");
1085
1086 for step in &steps {
1087 assert_eq!(step.status.state, StepStatus::Completed);
1088 }
1089 }
1090}