1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use crate::cancellation::CancellationToken;
7use crate::cancellation_reason::CancellationReason;
8use crate::dsl::{
9 detect_workflow_cycles, ValidationError, WorkflowDef, WorkflowNode, QUALITY_GATE_TYPE,
10};
11use crate::engine::{
12 run_workflow_engine, ChildWorkflowContext, ChildWorkflowRunner, ExecutionState,
13};
14use crate::engine_error::EngineError;
15use crate::events::EventSink;
16use crate::output_schema::OutputSchema;
17use crate::status::WorkflowRunStatus;
18use crate::traits::action_executor::{ActionExecutor, ActionRegistry};
19use crate::traits::gate_resolver::{GateResolver, GateResolverRegistry};
20use crate::traits::item_provider::{ItemProvider, ItemProviderRegistry};
21use crate::traits::persistence::WorkflowPersistence;
22use crate::traits::run_context::RunContext;
23use crate::traits::script_env_provider::{NoOpScriptEnvProvider, ScriptEnvProvider};
24use crate::traits::workflow_resolver::WorkflowResolver;
25use crate::types::{WorkflowExecConfig, WorkflowResult, WorkflowRunStep};
26use crate::workflow_resolver_directory::DirectoryWorkflowResolver;
27
/// Book-keeping for a workflow run currently executing in this engine
/// instance. Held in `FlowEngine::active_runs` so `cancel_run` and `Drop`
/// can signal the run and tear down its lease-refresh thread.
struct ActiveRunEntry {
    /// Cancellation token shared with the running workflow.
    token: CancellationToken,
    /// Cooperative shutdown flag shared with the engine loop.
    shutdown: Arc<AtomicBool>,
    persistence: Arc<dyn WorkflowPersistence>,
    registry: Arc<ActionRegistry>,
    /// Currently in-flight (executor label, step id), if any — used to ask
    /// the executor to cancel that step when the run is aborted.
    exec_info: Arc<Mutex<Option<(String, String)>>>,
    /// Tells the lease-refresh thread to exit on its next wakeup.
    refresh_stop: Arc<AtomicBool>,
    /// Handle used to unpark the refresh thread so it notices `refresh_stop`.
    refresh_thread: Option<std::thread::Thread>,
    /// Join handle for the refresh thread, joined after the run finishes.
    refresh_handle: Option<std::thread::JoinHandle<()>>,
}
48
/// Everything the background lease-refresh loop needs, bundled so it can be
/// moved into the spawned thread in one piece.
struct RefreshContext {
    persistence: Arc<dyn WorkflowPersistence>,
    /// Id of the run whose lease is being kept alive.
    run_id: String,
    /// Owner token identifying this engine instance as the lease holder.
    token: String,
    /// Lease TTL passed to each `acquire_lease` renewal.
    ttl_seconds: i64,
    /// How long to park between renewal attempts.
    refresh_interval: Duration,
    /// Set by the engine to stop the loop; checked after each wakeup.
    stop: Arc<AtomicBool>,
    cancellation: CancellationToken,
    shutdown: Arc<AtomicBool>,
    registry: Arc<ActionRegistry>,
    /// Snapshot of the currently executing step, for abort-time cancellation.
    exec_info: Arc<Mutex<Option<(String, String)>>>,
}
65
66fn refresh_lease_loop(ctx: RefreshContext) {
67 loop {
68 std::thread::park_timeout(ctx.refresh_interval);
69 if ctx.stop.load(Ordering::Relaxed) {
70 return;
71 }
72 match ctx
73 .persistence
74 .acquire_lease(&ctx.run_id, &ctx.token, ctx.ttl_seconds)
75 {
76 Ok(Some(_)) => {} Ok(None) => {
78 tracing::warn!(
79 "run {}: lease reclaimed by another engine, aborting",
80 ctx.run_id
81 );
82 signal_lease_abort(ctx.shutdown, ctx.cancellation, ctx.registry, ctx.exec_info);
83 return;
84 }
85 Err(e) => {
86 tracing::warn!("run {}: lease refresh DB error: {e}, aborting", ctx.run_id);
87 signal_lease_abort(ctx.shutdown, ctx.cancellation, ctx.registry, ctx.exec_info);
88 return;
89 }
90 }
91 }
92}
93
94fn signal_lease_abort(
95 shutdown: Arc<AtomicBool>,
96 cancellation: CancellationToken,
97 registry: Arc<ActionRegistry>,
98 exec_info: Arc<Mutex<Option<(String, String)>>>,
99) {
100 shutdown.store(true, Ordering::SeqCst);
101 cancellation.cancel(CancellationReason::LeaseLost);
102 let snap = exec_info.lock().unwrap_or_else(|e| e.into_inner()).clone();
103 if let Some((exec_label, step_id)) = snap {
104 std::thread::spawn(move || {
105 if let Err(e) = registry.cancel(&exec_label, &step_id) {
106 tracing::warn!("lease abort: cancel step '{step_id}' failed: {e}");
107 }
108 });
109 }
110}
111
/// Asks a lease-refresh thread to exit: raises its stop flag, then unparks
/// it (when a handle is available) so it observes the flag immediately
/// instead of waiting out the full park timeout.
fn stop_refresh_thread(stop: &AtomicBool, thread: Option<&std::thread::Thread>) {
    stop.store(true, Ordering::SeqCst);
    // `Option` iterates over zero or one items — unpark only if present.
    for handle in thread {
        handle.unpark();
    }
}
119
/// Orchestrates workflow execution: validates definitions against the
/// registered executors/providers/resolvers, acquires a per-run lease,
/// spawns a lease-refresh thread, and tracks active runs so they can be
/// cancelled (`cancel_run`) or torn down when the engine is dropped.
pub struct FlowEngine {
    pub(crate) action_registry: ActionRegistry,
    pub(crate) item_provider_registry: ItemProviderRegistry,
    pub(crate) gate_resolver_registry: GateResolverRegistry,
    #[allow(dead_code)]
    pub(crate) script_env_provider: Arc<dyn ScriptEnvProvider>,
    /// Optional resolver used to load sub-workflows referenced by name.
    pub(crate) workflow_resolver: Option<Arc<dyn WorkflowResolver>>,
    /// Sinks prepended to the per-run sinks of every run this engine starts.
    pub(crate) event_sinks: Vec<Arc<dyn EventSink>>,
    /// Runs currently executing in this engine instance, keyed by run id.
    active_runs: Mutex<HashMap<String, ActiveRunEntry>>,
}
136
/// Inputs for starting a top-level workflow run via
/// [`FlowEngine::run_workflow`]. Construct with [`RunInput::new`] and set
/// optional fields afterwards; `#[non_exhaustive]` keeps literal
/// construction inside this crate.
#[non_exhaustive]
pub struct RunInput {
    pub persistence: Arc<dyn WorkflowPersistence>,
    pub workflow_run_id: String,
    pub workflow_name: String,
    pub action_registry: Arc<ActionRegistry>,
    pub item_provider_registry: Arc<ItemProviderRegistry>,
    pub script_env_provider: Arc<dyn ScriptEnvProvider>,
    pub run_ctx: Arc<dyn RunContext>,
    pub extra_plugin_dirs: Vec<String>,
    pub model: Option<String>,
    pub exec_config: WorkflowExecConfig,
    /// Key/value inputs passed to the workflow.
    pub inputs: HashMap<String, String>,
    /// Empty string for a root run (see `RunInput::new`).
    pub parent_run_id: String,
    /// Nesting depth; 0 for a root run.
    pub depth: u32,
    pub target_label: Option<String>,
    pub default_as_identity: Option<String>,
    pub triggered_by_hook: bool,
    /// Resolves an output-schema name to a schema, if schemas are used.
    #[allow(clippy::type_complexity)]
    pub schema_resolver:
        Option<Arc<dyn Fn(&str) -> crate::engine_error::Result<OutputSchema> + Send + Sync>>,
    /// Runner used to execute `call workflow` children, if any.
    pub child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    pub cancellation: CancellationToken,
    /// Extra per-run sinks, appended after the engine's own sinks.
    pub event_sinks: Vec<Arc<dyn EventSink>>,
}
168
/// Inputs for running a child (sub-)workflow via [`FlowEngine::run_child`];
/// most execution settings are inherited from the parent's
/// `ChildWorkflowContext` rather than specified here.
#[non_exhaustive]
pub struct ChildRunInput {
    pub workflow_run_id: String,
    pub persistence: Arc<dyn WorkflowPersistence>,
    pub action_registry: Arc<ActionRegistry>,
    pub item_provider_registry: Arc<ItemProviderRegistry>,
    pub script_env_provider: Arc<dyn ScriptEnvProvider>,
    pub child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    #[allow(clippy::type_complexity)]
    pub schema_resolver:
        Option<Arc<dyn Fn(&str) -> crate::engine_error::Result<OutputSchema> + Send + Sync>>,
    /// Identity the child should act as, overriding the parent's default.
    pub as_identity: Option<String>,
    /// Nesting depth of this child (parent depth + 1, set by the caller).
    pub depth: u32,
    pub cancellation: CancellationToken,
    pub target_label: Option<String>,
    pub triggered_by_hook: bool,
    /// If `Some`, replaces the inputs inherited from the parent context.
    pub inputs_override: Option<HashMap<String, String>>,
}
199
impl RunInput {
    /// Creates a `RunInput` with the required fields; every optional field
    /// starts at its neutral default (empty inputs, depth 0, no parent, no
    /// schema resolver, no extra sinks) and can be set on the returned value.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        persistence: Arc<dyn WorkflowPersistence>,
        workflow_run_id: String,
        workflow_name: String,
        action_registry: Arc<ActionRegistry>,
        item_provider_registry: Arc<ItemProviderRegistry>,
        script_env_provider: Arc<dyn ScriptEnvProvider>,
        run_ctx: Arc<dyn RunContext>,
        cancellation: CancellationToken,
    ) -> Self {
        Self {
            persistence,
            workflow_run_id,
            workflow_name,
            action_registry,
            item_provider_registry,
            script_env_provider,
            run_ctx,
            extra_plugin_dirs: vec![],
            model: None,
            exec_config: WorkflowExecConfig::default(),
            inputs: HashMap::new(),
            // Empty parent id marks this as a root run.
            parent_run_id: String::new(),
            depth: 0,
            target_label: None,
            default_as_identity: None,
            triggered_by_hook: false,
            schema_resolver: None,
            child_runner: None,
            cancellation,
            event_sinks: vec![],
        }
    }
}
239
impl ChildRunInput {
    /// Creates a `ChildRunInput` with the required fields; optional fields
    /// (identity override, inputs override, target label, …) default to
    /// `None`/`false` and can be set on the returned value.
    pub fn new(
        workflow_run_id: String,
        persistence: Arc<dyn WorkflowPersistence>,
        action_registry: Arc<ActionRegistry>,
        item_provider_registry: Arc<ItemProviderRegistry>,
        script_env_provider: Arc<dyn ScriptEnvProvider>,
        depth: u32,
        cancellation: CancellationToken,
    ) -> Self {
        Self {
            workflow_run_id,
            persistence,
            action_registry,
            item_provider_registry,
            script_env_provider,
            child_runner: None,
            schema_resolver: None,
            as_identity: None,
            depth,
            cancellation,
            target_label: None,
            triggered_by_hook: false,
            inputs_override: None,
        }
    }
}
270
271#[allow(clippy::too_many_arguments, clippy::type_complexity)]
274fn make_fresh_execution_state(
275 persistence: Arc<dyn WorkflowPersistence>,
276 action_registry: Arc<ActionRegistry>,
277 item_provider_registry: Arc<ItemProviderRegistry>,
278 script_env_provider: Arc<dyn ScriptEnvProvider>,
279 workflow_run_id: String,
280 workflow_name: String,
281 run_ctx: Arc<dyn RunContext>,
282 extra_plugin_dirs: Vec<String>,
283 model: Option<String>,
284 exec_config: WorkflowExecConfig,
285 inputs: HashMap<String, String>,
286 parent_run_id: String,
287 depth: u32,
288 target_label: Option<String>,
289 default_as_identity: Option<String>,
290 triggered_by_hook: bool,
291 #[allow(clippy::type_complexity)] schema_resolver: Option<
292 Arc<dyn Fn(&str) -> crate::engine_error::Result<OutputSchema> + Send + Sync>,
293 >,
294 child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
295 cancellation: CancellationToken,
296 event_sinks: Arc<[Arc<dyn EventSink>]>,
297) -> ExecutionState {
298 ExecutionState {
299 persistence,
300 action_registry,
301 script_env_provider,
302 workflow_run_id,
303 workflow_name,
304 run_ctx,
305 extra_plugin_dirs,
306 model,
307 exec_config,
308 inputs,
309 parent_run_id,
310 depth,
311 target_label,
312 default_as_identity,
313 triggered_by_hook,
314 schema_resolver,
315 child_runner,
316 cancellation,
317 event_sinks,
318 registry: item_provider_registry,
319 step_results: HashMap::new(),
321 contexts: vec![],
322 position: 0,
323 all_succeeded: true,
324 total_cost: 0.0,
325 total_turns: 0,
326 total_duration_ms: 0,
327 total_input_tokens: 0,
328 total_output_tokens: 0,
329 total_cache_read_input_tokens: 0,
330 total_cache_creation_input_tokens: 0,
331 has_llm_metrics: false,
332 last_gate_feedback: None,
333 block_output: None,
334 block_with: vec![],
335 resume_ctx: None,
336 last_heartbeat_at: ExecutionState::new_heartbeat(),
337 current_execution_id: Arc::new(Mutex::new(None)),
338 owner_token: None,
339 lease_generation: None,
340 }
341}
342
impl FlowEngine {
    /// Validates `def` against this engine's own registries without running
    /// anything. Returns all validation errors found, or `Ok(())`.
    pub fn validate(&self, def: &WorkflowDef) -> Result<(), Vec<ValidationError>> {
        self.validate_with_registries(
            &self.action_registry,
            &self.item_provider_registry,
            &self.gate_resolver_registry,
            def,
        )
    }

    /// Combines the engine-level sinks with per-run `extra` sinks into the
    /// shared slice handed to the execution state (engine sinks first).
    fn build_event_sinks(&self, extra: &[Arc<dyn EventSink>]) -> Arc<[Arc<dyn EventSink>]> {
        let mut sinks = self.event_sinks.clone();
        sinks.extend_from_slice(extra);
        Arc::from(sinks)
    }

    /// Core run path shared by `run`, `run_workflow`, `run_child`, and
    /// `resume`: validates, acquires the run lease, starts the lease-refresh
    /// thread, registers the run as active, executes the engine, then tears
    /// everything down again.
    fn run_inner(
        &self,
        def: &WorkflowDef,
        state: &mut ExecutionState,
    ) -> crate::engine_error::Result<WorkflowResult> {
        // Validate with the registries the *state* will actually execute
        // against (they may differ from the engine's own).
        if let Err(validation_errors) = self.validate_with_registries(
            &state.action_registry,
            &state.registry,
            &self.gate_resolver_registry,
            def,
        ) {
            let joined = validation_errors
                .iter()
                .map(|e| e.to_string())
                .collect::<Vec<_>>()
                .join("\n");
            return Err(EngineError::Workflow(format!(
                "workflow '{}' failed validation:\n{}",
                def.name, joined
            )));
        }

        let lease_ttl_secs = state.exec_config.lease_ttl_secs;
        let refresh_interval = state.exec_config.lease_refresh_interval;

        // Reuse an owner token if one is already set (e.g. by `resume`),
        // otherwise mint a fresh one for this run attempt.
        let token = state
            .owner_token
            .get_or_insert_with(|| ulid::Ulid::new().to_string())
            .as_str();
        match state
            .persistence
            .acquire_lease(&state.workflow_run_id, token, lease_ttl_secs)
        {
            Ok(Some(gen)) => {
                state.lease_generation = Some(gen);
            }
            // `None` means another engine currently holds the lease.
            Ok(None) => return Err(EngineError::AlreadyOwned(state.workflow_run_id.clone())),
            Err(e) => return Err(e),
        }

        // Shared shutdown flag: create one if the caller didn't provide it.
        let shutdown_arc = state
            .exec_config
            .shutdown
            .get_or_insert_with(|| Arc::new(AtomicBool::new(false)))
            .clone();

        let run_id = state.workflow_run_id.clone();

        // Spawn the background thread that keeps the lease alive for the
        // duration of the run.
        let refresh_stop = Arc::new(AtomicBool::new(false));
        let refresh_handle = {
            let ctx = RefreshContext {
                persistence: Arc::clone(&state.persistence),
                run_id: run_id.clone(),
                token: state
                    .owner_token
                    .clone()
                    .expect("owner_token was just set by get_or_insert_with"),
                ttl_seconds: lease_ttl_secs,
                refresh_interval,
                stop: Arc::clone(&refresh_stop),
                cancellation: state.cancellation.clone(),
                shutdown: Arc::clone(&shutdown_arc),
                registry: Arc::clone(&state.action_registry),
                exec_info: Arc::clone(&state.current_execution_id),
            };
            std::thread::spawn(move || refresh_lease_loop(ctx))
        };
        let refresh_thread = refresh_handle.thread().clone();

        // Register the run so cancel_run()/Drop can reach it while it runs.
        {
            let mut runs = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            runs.insert(
                run_id.clone(),
                ActiveRunEntry {
                    token: state.cancellation.clone(),
                    shutdown: shutdown_arc,
                    persistence: Arc::clone(&state.persistence),
                    registry: Arc::clone(&state.action_registry),
                    exec_info: Arc::clone(&state.current_execution_id),
                    refresh_stop,
                    refresh_thread: Some(refresh_thread),
                    refresh_handle: Some(refresh_handle),
                },
            );
        }

        let result = run_workflow_engine(state, def);

        // Capture whether the refresh thread aborted us before we tear the
        // run entry down (the reason outlives the entry).
        let lease_lost_during_run = matches!(
            state.cancellation.reason(),
            Some(CancellationReason::LeaseLost)
        );

        // Deregister, stop the refresh thread, and join it outside the lock.
        let join_handle = {
            let mut runs = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            runs.remove(&run_id).and_then(|entry| {
                stop_refresh_thread(&entry.refresh_stop, entry.refresh_thread.as_ref());
                entry.refresh_handle
            })
        };
        if let Some(h) = join_handle {
            let _ = h.join();
        }

        // A lost lease overrides whatever the engine returned.
        if lease_lost_during_run {
            return Err(EngineError::Cancelled(CancellationReason::LeaseLost));
        }

        result
    }

    /// Runs `def` using a caller-prepared `ExecutionState`; the state's
    /// event sinks are replaced with the engine-level sinks.
    pub fn run(
        &self,
        def: &WorkflowDef,
        state: &mut ExecutionState,
    ) -> crate::engine_error::Result<WorkflowResult> {
        state.event_sinks = self.build_event_sinks(&[]);
        self.run_inner(def, state)
    }

    /// Runs `def` as a top-level workflow, building a fresh execution state
    /// from `input`.
    pub fn run_workflow(
        &self,
        def: &WorkflowDef,
        input: RunInput,
    ) -> crate::engine_error::Result<WorkflowResult> {
        let event_sinks = self.build_event_sinks(&input.event_sinks);
        let mut state = make_fresh_execution_state(
            input.persistence,
            input.action_registry,
            input.item_provider_registry,
            input.script_env_provider,
            input.workflow_run_id,
            input.workflow_name,
            input.run_ctx,
            input.extra_plugin_dirs,
            input.model,
            input.exec_config,
            input.inputs,
            input.parent_run_id,
            input.depth,
            input.target_label,
            input.default_as_identity,
            input.triggered_by_hook,
            input.schema_resolver,
            input.child_runner,
            input.cancellation,
            event_sinks,
        );
        self.run_inner(def, &mut state)
    }

    /// Runs `def` as a child workflow: run context, plugin dirs, model, exec
    /// config, sinks, and (unless overridden) inputs are inherited from
    /// `parent_ctx`, and the parent's run id is recorded on the child.
    pub fn run_child(
        &self,
        def: &WorkflowDef,
        input: ChildRunInput,
        parent_ctx: &ChildWorkflowContext,
    ) -> crate::engine_error::Result<WorkflowResult> {
        let event_sinks = self.build_event_sinks(&parent_ctx.event_sinks);
        let inputs = input
            .inputs_override
            .unwrap_or_else(|| parent_ctx.inputs.clone());
        let mut state = make_fresh_execution_state(
            input.persistence,
            input.action_registry,
            input.item_provider_registry,
            input.script_env_provider,
            input.workflow_run_id,
            def.name.clone(),
            Arc::clone(&parent_ctx.run_ctx),
            parent_ctx.extra_plugin_dirs.clone(),
            parent_ctx.model.clone(),
            parent_ctx.exec_config.clone(),
            inputs,
            parent_ctx.workflow_run_id.clone(),
            input.depth,
            input.target_label,
            input.as_identity,
            input.triggered_by_hook,
            input.schema_resolver,
            input.child_runner,
            input.cancellation,
            event_sinks,
        );
        self.run_inner(def, &mut state)
    }

    /// Resumes a previously started run: acquires the lease, loads completed
    /// steps from persistence into a `ResumeContext` so they are skipped,
    /// then delegates to `run`.
    pub fn resume(
        &self,
        def: &WorkflowDef,
        state: &mut ExecutionState,
    ) -> crate::engine_error::Result<WorkflowResult> {
        if state.resume_ctx.is_some() {
            return Err(EngineError::Workflow(
                "resume() requires resume_ctx to be None on entry".to_string(),
            ));
        }

        // Acquire the lease up front with a fresh token so a resume attempt
        // against a still-owned run fails fast. run_inner will re-acquire
        // with the same token, which acts as a renewal.
        let token = ulid::Ulid::new().to_string();
        let lease_ttl_secs = state.exec_config.lease_ttl_secs;
        match state
            .persistence
            .acquire_lease(&state.workflow_run_id, &token, lease_ttl_secs)
        {
            Ok(Some(gen)) => {
                state.owner_token = Some(token);
                state.lease_generation = Some(gen);
            }
            Ok(None) => return Err(EngineError::AlreadyOwned(state.workflow_run_id.clone())),
            Err(e) => return Err(e),
        }

        let steps = state
            .persistence
            .get_steps(&state.workflow_run_id)
            .map_err(|e| {
                EngineError::Workflow(format!(
                    "resume: failed to load steps for run '{}': {e}",
                    state.workflow_run_id
                ))
            })?;
        // Index completed steps by name and iteration so the engine can
        // replay their results instead of re-executing them.
        let mut step_map: HashMap<String, HashMap<u32, WorkflowRunStep>> = HashMap::new();
        for s in steps
            .into_iter()
            .filter(|s| s.status == crate::status::WorkflowStepStatus::Completed)
        {
            step_map
                .entry(s.step_name.clone())
                .or_default()
                .insert(s.iteration as u32, s);
        }
        if !step_map.is_empty() {
            state.resume_ctx = Some(crate::engine::ResumeContext { step_map });
        }
        self.run(def, state)
    }

    /// Cancels an active run owned by this engine instance: marks it
    /// `Cancelling` in persistence (best effort), raises the shutdown flag,
    /// cancels its token, asks the executor to cancel the in-flight step,
    /// and stops the lease-refresh thread. Errors if the run is not active
    /// here.
    pub fn cancel_run(
        &self,
        run_id: &str,
        reason: CancellationReason,
    ) -> crate::engine_error::Result<()> {
        // Clone everything we need out of the entry so the lock is released
        // before we touch persistence or the executor.
        let entry = {
            let runs = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            runs.get(run_id).map(|e| {
                (
                    e.token.clone(),
                    Arc::clone(&e.shutdown),
                    Arc::clone(&e.persistence),
                    Arc::clone(&e.registry),
                    Arc::clone(&e.exec_info),
                    Arc::clone(&e.refresh_stop),
                    e.refresh_thread.clone(),
                )
            })
        };

        let (token, shutdown, persistence, registry, exec_info, refresh_stop, refresh_thread) =
            match entry {
                Some(e) => e,
                None => {
                    return Err(EngineError::Workflow(format!(
                        "cancel_run: run '{run_id}' is not active in this engine instance"
                    )))
                }
            };

        // Best-effort DB status update; cancellation proceeds regardless.
        if let Err(e) =
            persistence.update_run_status(run_id, WorkflowRunStatus::Cancelling, None, None)
        {
            tracing::warn!("cancel_run: failed to mark run {run_id} as Cancelling in DB: {e}");
        }

        shutdown.store(true, Ordering::SeqCst);

        token.cancel(reason);

        // Cancel the currently executing step (if any) on a detached thread
        // so a slow executor can't block this call.
        let exec_snap = exec_info.lock().unwrap_or_else(|e| e.into_inner()).clone();
        if let Some((exec_label, step_id)) = exec_snap {
            std::thread::spawn(move || {
                if let Err(e) = registry.cancel(&exec_label, &step_id) {
                    tracing::warn!(
                        "cancel_run: executor.cancel() for '{exec_label}' step '{step_id}' failed: {e}"
                    );
                }
            });
        }

        stop_refresh_thread(&refresh_stop, refresh_thread.as_ref());

        Ok(())
    }

    /// Validates `def` against explicit registries: checks for sub-workflow
    /// cycles (when a resolver is configured), then walks all nodes of the
    /// workflow and its reachable sub-workflows.
    fn validate_with_registries(
        &self,
        action_registry: &ActionRegistry,
        item_provider_registry: &ItemProviderRegistry,
        gate_resolver_registry: &GateResolverRegistry,
        def: &WorkflowDef,
    ) -> Result<(), Vec<ValidationError>> {
        let mut errors = Vec::new();

        if let Some(resolver) = &self.workflow_resolver {
            let r = Arc::clone(resolver);
            let root_name = def.name.clone();
            // The root def is served from memory so cycle detection sees the
            // exact definition being validated, not a stored copy.
            let cycle_loader = |name: &str| -> std::result::Result<WorkflowDef, String> {
                if name == root_name.as_str() {
                    Ok(def.clone())
                } else {
                    r.resolve(name)
                        .map(|arc_def| (*arc_def).clone())
                        .map_err(|e| e.to_string())
                }
            };
            if let Err(cycle_msg) = detect_workflow_cycles(&def.name, &cycle_loader) {
                errors.push(ValidationError {
                    message: cycle_msg,
                    hint: None,
                });
            }
        }

        let ctx = ValidateCtx {
            action_registry,
            item_provider_registry,
            gate_resolver_registry,
            workflow_resolver: &self.workflow_resolver,
        };
        // `visited` prevents re-validating (and infinitely recursing into)
        // sub-workflows referenced more than once.
        let mut visited: HashSet<String> = HashSet::new();
        validate_workflow_sections(&ctx, &def.body, &def.always, &mut errors, &mut visited);

        if errors.is_empty() {
            Ok(())
        } else {
            Err(errors)
        }
    }
}
776
/// Borrowed bundle of registries threaded through the recursive validation
/// walk, so the helper functions take one argument instead of four.
struct ValidateCtx<'a> {
    action_registry: &'a ActionRegistry,
    item_provider_registry: &'a ItemProviderRegistry,
    gate_resolver_registry: &'a GateResolverRegistry,
    /// Used to load and recursively validate referenced sub-workflows.
    workflow_resolver: &'a Option<Arc<dyn WorkflowResolver>>,
}
783
784fn validate_workflow_sections(
785 ctx: &ValidateCtx<'_>,
786 body: &[WorkflowNode],
787 always: &[WorkflowNode],
788 errors: &mut Vec<ValidationError>,
789 visited: &mut HashSet<String>,
790) {
791 validate_nodes_impl(ctx, body, errors, visited);
792 validate_nodes_impl(ctx, always, errors, visited);
793}
794
795fn validate_nodes_impl(
796 ctx: &ValidateCtx<'_>,
797 nodes: &[WorkflowNode],
798 errors: &mut Vec<ValidationError>,
799 visited: &mut HashSet<String>,
800) {
801 for node in nodes {
802 match node {
803 WorkflowNode::Call(n) => {
804 let name = n.agent.label();
805 if !ctx.action_registry.has_action(name) {
806 errors.push(ValidationError {
807 message: format!(
808 "call '{}': no registered ActionExecutor for '{}'",
809 n.agent.step_key(),
810 name
811 ),
812 hint: Some(format!(
813 "register an executor named '{}' or add a fallback executor",
814 name
815 )),
816 });
817 }
818 }
819 WorkflowNode::Parallel(n) => {
820 for agent_ref in &n.calls {
821 let name = agent_ref.label();
822 if !ctx.action_registry.has_action(name) {
823 errors.push(ValidationError {
824 message: format!(
825 "parallel call '{}': no registered ActionExecutor for '{}'",
826 agent_ref.step_key(),
827 name
828 ),
829 hint: Some(format!(
830 "register an executor named '{}' or add a fallback executor",
831 name
832 )),
833 });
834 }
835 }
836 }
837 WorkflowNode::ForEach(n) => {
838 if ctx.item_provider_registry.get(&n.over).is_none() {
839 errors.push(ValidationError {
840 message: format!(
841 "foreach '{}': no registered ItemProvider for '{}'",
842 n.name, n.over
843 ),
844 hint: Some(format!(
845 "register a provider with name '{}' via FlowEngineBuilder::item_provider()",
846 n.over
847 )),
848 });
849 }
850 }
851 WorkflowNode::Gate(n) => {
852 if n.gate_type != QUALITY_GATE_TYPE {
854 let type_str = n.gate_type.as_str();
855 if !ctx.gate_resolver_registry.has_type(type_str) {
856 errors.push(ValidationError {
857 message: format!(
858 "gate '{}': no registered GateResolver for type '{}'",
859 n.name, type_str
860 ),
861 hint: Some(format!(
862 "register a resolver with gate_type() == '{}' via FlowEngineBuilder::gate_resolver()",
863 type_str
864 )),
865 });
866 }
867 }
868 }
869 WorkflowNode::CallWorkflow(n) => {
870 if !visited.contains(&n.workflow) {
871 visited.insert(n.workflow.clone());
872 if let Some(resolver) = ctx.workflow_resolver {
873 match resolver.resolve(&n.workflow).map(|d| (*d).clone()) {
874 Ok(sub_def) => {
875 let mut sub_errors = Vec::new();
876 validate_workflow_sections(
877 ctx,
878 &sub_def.body,
879 &sub_def.always,
880 &mut sub_errors,
881 visited,
882 );
883 for sub_err in sub_errors {
884 errors.push(ValidationError {
885 message: format!(
886 "in sub-workflow '{}': {}",
887 n.workflow, sub_err.message
888 ),
889 hint: sub_err.hint,
890 });
891 }
892 }
893 Err(e) => {
894 errors.push(ValidationError {
897 message: format!(
898 "call workflow '{}': sub-workflow could not be loaded: {}",
899 n.workflow, e
900 ),
901 hint: None,
902 });
903 }
904 }
905 }
906 }
907 }
908 _ => {
909 if let Some(body) = node.body() {
910 validate_nodes_impl(ctx, body, errors, visited);
911 }
912 }
913 }
914 }
915}
916
/// Builder for [`FlowEngine`]. Collects executors, providers, resolvers,
/// and sinks before assembling the engine with [`FlowEngineBuilder::build`].
pub struct FlowEngineBuilder {
    /// Executors keyed by their `name()`; dispatched by exact match.
    named: HashMap<String, Box<dyn ActionExecutor>>,
    /// Optional catch-all executor used when no named executor matches.
    fallback: Option<Box<dyn ActionExecutor>>,
    script_env_provider: Box<dyn ScriptEnvProvider>,
    item_providers: ItemProviderRegistry,
    gate_resolvers: GateResolverRegistry,
    workflow_resolver: Option<Box<dyn WorkflowResolver>>,
    event_sinks: Vec<Arc<dyn EventSink>>,
}
935
impl FlowEngineBuilder {
    /// Starts an empty builder: no executors, a no-op script environment
    /// provider, and no resolvers or sinks.
    pub fn new() -> Self {
        Self {
            named: HashMap::new(),
            fallback: None,
            script_env_provider: Box::new(NoOpScriptEnvProvider),
            item_providers: ItemProviderRegistry::new(),
            gate_resolvers: GateResolverRegistry::new(),
            workflow_resolver: None,
            event_sinks: Vec::new(),
        }
    }

    /// Registers a named executor, keyed by its `name()`.
    /// NOTE(review): registering two executors with the same name silently
    /// replaces the first — confirm that is intended.
    #[allow(dead_code)]
    pub fn action(mut self, executor: Box<dyn ActionExecutor>) -> Self {
        self.named.insert(executor.name().to_string(), executor);
        self
    }

    /// Sets the fallback executor used when no named executor matches.
    /// Errors if a fallback was already set — only one is allowed.
    pub fn action_fallback(
        mut self,
        executor: Box<dyn ActionExecutor>,
    ) -> Result<Self, EngineError> {
        if self.fallback.is_some() {
            return Err(EngineError::Workflow(
                "action_fallback already set — only one fallback executor is allowed".to_string(),
            ));
        }
        self.fallback = Some(executor);
        Ok(self)
    }

    /// Registers an item provider for `foreach` fan-out.
    pub fn item_provider<P: ItemProvider + 'static>(mut self, provider: P) -> Self {
        self.item_providers.register(provider);
        self
    }

    /// Registers a gate resolver for its `gate_type()`.
    pub fn gate_resolver<R: GateResolver + 'static>(mut self, resolver: R) -> Self {
        self.gate_resolvers.register(resolver);
        self
    }

    /// Replaces the default no-op script environment provider.
    pub fn script_env_provider(mut self, provider: Box<dyn ScriptEnvProvider>) -> Self {
        self.script_env_provider = provider;
        self
    }

    /// Convenience: resolve sub-workflows from definition files under
    /// `path` (replaces any previously set resolver).
    pub fn workflow_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
        self.workflow_resolver = Some(Box::new(DirectoryWorkflowResolver::new(path)));
        self
    }

    /// Sets a custom sub-workflow resolver (replaces any previous one).
    pub fn workflow_resolver(mut self, resolver: Box<dyn WorkflowResolver>) -> Self {
        self.workflow_resolver = Some(resolver);
        self
    }

    /// Adds an engine-level event sink; sinks receive events for every run.
    pub fn event_sink(mut self, sink: Box<dyn EventSink>) -> Self {
        self.event_sinks.push(Arc::from(sink));
        self
    }

    /// Adds all sinks from an existing shared sink slice.
    pub fn with_event_sinks(mut self, sinks: &Arc<[Arc<dyn EventSink>]>) -> Self {
        self.event_sinks.extend(sinks.iter().cloned());
        self
    }

    /// Assembles the engine. Currently infallible in practice; the `Result`
    /// return leaves room for future build-time validation.
    pub fn build(self) -> Result<FlowEngine, EngineError> {
        Ok(FlowEngine {
            action_registry: ActionRegistry::new(self.named, self.fallback),
            item_provider_registry: self.item_providers,
            gate_resolver_registry: self.gate_resolvers,
            script_env_provider: Arc::from(self.script_env_provider),
            workflow_resolver: self.workflow_resolver.map(Arc::from),
            event_sinks: self.event_sinks,
            active_runs: Mutex::new(HashMap::new()),
        })
    }
}
1034
impl Default for FlowEngineBuilder {
    /// Equivalent to [`FlowEngineBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
1040
impl Drop for FlowEngine {
    /// Signals every still-active run on engine teardown: stops its
    /// lease-refresh thread, raises its shutdown flag, and cancels its token
    /// with `EngineShutdown`. Entries are drained before signalling so the
    /// `active_runs` lock is not held while cancelling.
    /// NOTE(review): refresh threads are signalled but not joined here —
    /// confirm detaching them on drop is acceptable.
    fn drop(&mut self) {
        let entries: Vec<ActiveRunEntry> = {
            let mut guard = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            guard.drain().map(|(_, e)| e).collect()
        };
        for entry in entries {
            stop_refresh_thread(&entry.refresh_stop, entry.refresh_thread.as_ref());
            entry.shutdown.store(true, Ordering::SeqCst);
            entry.token.cancel(CancellationReason::EngineShutdown);
        }
    }
}
1056
1057#[cfg(test)]
1058mod tests {
1059 use super::*;
1060 use crate::dsl::{
1061 ApprovalMode, CallWorkflowNode, ForEachNode, GateNode, OnChildFail, OnCycle, OnTimeout,
1062 QUALITY_GATE_TYPE,
1063 };
1064 use crate::engine_error::EngineError;
1065 use crate::test_helpers::{
1066 call_node, make_def, make_params, make_run_ctx, make_step_info, ForwardSink, VecSink,
1067 };
1068 use crate::traits::action_executor::{ActionOutput, ActionParams, StepInfo};
1069 use crate::traits::gate_resolver::{GateParams, GatePoll};
1070 use crate::traits::item_provider::{FanOutItem, ProviderInfo};
1071 use crate::traits::run_context::RunContext;
1072 use crate::workflow_resolver_memory::InMemoryWorkflowResolver;
1073 use std::collections::HashMap;
1074
    /// Test executor named "alpha" that records an "alpha" marker.
    struct AlphaExecutor;
    impl ActionExecutor for AlphaExecutor {
        fn name(&self) -> &str {
            "alpha"
        }
        fn execute(
            &self,
            _ctx: &dyn RunContext,
            _info: &StepInfo,
            _params: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            Ok(ActionOutput {
                markers: vec!["alpha".to_string()],
                ..Default::default()
            })
        }
    }
1094
    /// Test executor named "beta" that records a "beta" marker; used to
    /// distinguish fallback dispatch from named dispatch.
    struct BetaExecutor;
    impl ActionExecutor for BetaExecutor {
        fn name(&self) -> &str {
            "beta"
        }
        fn execute(
            &self,
            _ctx: &dyn RunContext,
            _info: &StepInfo,
            _params: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            Ok(ActionOutput {
                markers: vec!["beta".to_string()],
                ..Default::default()
            })
        }
    }
1112
    /// Test executor that counts how many times it is invoked, via a shared
    /// atomic counter inspected by the test afterwards.
    struct CountingExecutor {
        name: &'static str,
        count: Arc<std::sync::atomic::AtomicUsize>,
    }
    impl ActionExecutor for CountingExecutor {
        fn name(&self) -> &str {
            self.name
        }
        fn execute(
            &self,
            _: &dyn RunContext,
            _: &StepInfo,
            _: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(ActionOutput::default())
        }
    }
1131
    /// Creates a minimal manual-trigger run named "wf" in the in-memory
    /// persistence layer and returns it.
    fn make_test_run(
        p: &Arc<crate::persistence_memory::InMemoryWorkflowPersistence>,
    ) -> crate::types::WorkflowRun {
        use crate::traits::persistence::{NewRun, WorkflowPersistence};
        p.create_run(NewRun {
            workflow_name: "wf".to_string(),
            parent_run_id: String::new(),
            dry_run: false,
            trigger: "manual".to_string(),
            definition_snapshot: None,
            parent_workflow_run_id: None,
        })
        .unwrap()
    }
1146
    /// Builds an `ExecutionState` whose "alpha" and "beta" executors count
    /// their invocations; returns (alpha_count, beta_count, state) so tests
    /// can assert on how often each executor ran.
    fn make_counting_state(
        persistence: Arc<crate::persistence_memory::InMemoryWorkflowPersistence>,
        run_id: String,
    ) -> (
        Arc<std::sync::atomic::AtomicUsize>,
        Arc<std::sync::atomic::AtomicUsize>,
        crate::engine::ExecutionState,
    ) {
        let alpha_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let beta_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let mut m = HashMap::new();
        m.insert(
            "alpha".to_string(),
            Box::new(CountingExecutor {
                name: "alpha",
                count: Arc::clone(&alpha_count),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        m.insert(
            "beta".to_string(),
            Box::new(CountingExecutor {
                name: "beta",
                count: Arc::clone(&beta_count),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        // `make_bare_state` is a test helper defined elsewhere in this module.
        let mut state = make_bare_state("wf");
        state.persistence = persistence;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run_id;
        (alpha_count, beta_count, state)
    }
1180
    /// Test item provider named "tickets" that always yields no items.
    struct TicketsProvider;
    impl crate::traits::item_provider::ItemProvider for TicketsProvider {
        fn name(&self) -> &str {
            "tickets"
        }
        fn items(
            &self,
            _ctx: &dyn RunContext,
            _info: &ProviderInfo,
            _scope: Option<&dyn std::any::Any>,
            _filter: &HashMap<String, String>,
        ) -> Result<Vec<FanOutItem>, EngineError> {
            Ok(vec![])
        }
    }
1196
    /// Test gate resolver for type "human_approval" that approves instantly.
    struct HumanApprovalResolver;
    impl crate::traits::gate_resolver::GateResolver for HumanApprovalResolver {
        fn gate_type(&self) -> &str {
            "human_approval"
        }
        fn poll(
            &self,
            _run_id: &str,
            _params: &GateParams,
            _ctx: &dyn RunContext,
        ) -> Result<GatePoll, EngineError> {
            Ok(GatePoll::Approved(None))
        }
    }
1211
    /// Builds a minimal `foreach` node (sequential, fail-on-cycle,
    /// halt-on-child-fail) over the given provider name, fanning out into
    /// the fixed child workflow "child_wf".
    fn foreach_node(step: &str, over: &str) -> WorkflowNode {
        WorkflowNode::ForEach(ForEachNode {
            name: step.to_string(),
            over: over.to_string(),
            scope: None,
            filter: HashMap::new(),
            ordered: false,
            on_cycle: OnCycle::Fail,
            max_parallel: 1,
            workflow: "child_wf".to_string(),
            inputs: HashMap::new(),
            on_child_fail: OnChildFail::Halt,
        })
    }
1228
    /// Builds a minimal gate node of the given type: single approval,
    /// no prompt, no timeout, fail-on-timeout.
    fn gate_node(name: &str, gate_type: &str) -> WorkflowNode {
        WorkflowNode::Gate(GateNode {
            name: name.to_string(),
            gate_type: gate_type.to_string(),
            prompt: None,
            min_approvals: 1,
            approval_mode: ApprovalMode::default(),
            timeout_secs: 0,
            on_timeout: OnTimeout::Fail,
            as_identity: None,
            quality_gate: None,
            options: None,
        })
    }
1243
    /// A named executor is dispatchable by its exact name after build.
    #[test]
    fn build_with_named_executor() {
        let engine = FlowEngineBuilder::new()
            .action(Box::new(AlphaExecutor))
            .build()
            .unwrap();
        let ctx = make_run_ctx();
        let info = make_step_info();
        let output = engine
            .action_registry
            .dispatch("alpha", ctx.as_ref(), &info, &make_params("alpha"))
            .unwrap();
        assert_eq!(output.markers, vec!["alpha"]);
    }
1260
    /// With only a fallback registered, any action name dispatches to it.
    #[test]
    fn build_with_fallback() {
        let engine = FlowEngineBuilder::new()
            .action_fallback(Box::new(BetaExecutor))
            .unwrap()
            .build()
            .unwrap();
        let ctx = make_run_ctx();
        let info = make_step_info();
        let output = engine
            .action_registry
            .dispatch("anything", ctx.as_ref(), &info, &make_params("anything"))
            .unwrap();
        assert_eq!(output.markers, vec!["beta"]);
    }
1276
    /// When a name matches a named executor, the fallback is not used.
    #[test]
    fn named_takes_precedence_over_fallback() {
        let engine = FlowEngineBuilder::new()
            .action(Box::new(AlphaExecutor))
            .action_fallback(Box::new(BetaExecutor))
            .unwrap()
            .build()
            .unwrap();
        let ctx = make_run_ctx();
        let info = make_step_info();
        let output = engine
            .action_registry
            .dispatch("alpha", ctx.as_ref(), &info, &make_params("alpha"))
            .unwrap();
        assert_eq!(output.markers, vec!["alpha"]);
    }
1293
#[test]
fn second_action_fallback_returns_err() {
    // Only one fallback executor may be registered per builder.
    let builder = FlowEngineBuilder::new()
        .action_fallback(Box::new(AlphaExecutor))
        .unwrap();
    let result = builder.action_fallback(Box::new(BetaExecutor));
    assert!(result.is_err(), "second action_fallback should return Err");
}
1302
#[test]
fn custom_script_env_provider_is_stored_in_bundle() {
    // Provider that always exposes exactly one fixed variable.
    struct FixedEnvProvider;
    impl ScriptEnvProvider for FixedEnvProvider {
        fn env(
            &self,
            _ctx: &dyn RunContext,
            _as_identity: Option<&str>,
        ) -> HashMap<String, String> {
            HashMap::from([("CUSTOM_VAR".to_string(), "42".to_string())])
        }
    }

    let engine = FlowEngineBuilder::new()
        .script_env_provider(Box::new(FixedEnvProvider))
        .build()
        .unwrap();

    // The built engine should surface the provider's variables verbatim.
    let noop_ctx = crate::traits::run_context::NoopRunContext::default();
    let env = engine.script_env_provider.env(&noop_ctx, None);
    assert_eq!(env.get("CUSTOM_VAR").map(String::as_str), Some("42"));
}
1328
#[test]
fn validate_missing_action_name_produces_error() {
    // No executor named "missing_agent" is registered.
    let engine = FlowEngineBuilder::new().build().unwrap();
    let def = make_def("wf", vec![call_node("missing_agent")]);

    let errors = engine.validate(&def).unwrap_err();
    assert!(
        !errors.is_empty(),
        "expected at least one error for missing action"
    );
    let names_executor = errors.iter().any(|e| e.message.contains("missing_agent"));
    assert!(
        names_executor,
        "error should name the missing executor; got: {:?}",
        errors
    );
}
1348
#[test]
fn validate_missing_item_provider_produces_error() {
    // The "tickets" item provider is never registered.
    let engine = FlowEngineBuilder::new().build().unwrap();
    let def = make_def("wf", vec![foreach_node("items", "tickets")]);

    let errors = engine.validate(&def).unwrap_err();
    let names_provider = errors.iter().any(|e| e.message.contains("tickets"));
    assert!(
        names_provider,
        "error should mention the missing provider name; got: {:?}",
        errors
    );
}
1362
#[test]
fn validate_missing_gate_type_produces_error() {
    // No resolver for "human_approval" is registered.
    let engine = FlowEngineBuilder::new().build().unwrap();
    let def = make_def("wf", vec![gate_node("approval", "human_approval")]);

    let errors = engine.validate(&def).unwrap_err();
    let names_gate_type = errors.iter().any(|e| e.message.contains("human_approval"));
    assert!(
        names_gate_type,
        "error should mention the missing gate type; got: {:?}",
        errors
    );
}
1376
#[test]
fn validate_quality_gate_does_not_require_resolver() {
    use crate::dsl::{GateNode, OnFailAction, OnTimeout, QualityGateConfig};

    // A gate of the built-in quality-gate type carries its own config and
    // must not demand a registered GateResolver.
    let qg_config = QualityGateConfig {
        source: "step1".to_string(),
        threshold: 80,
        on_fail_action: OnFailAction::Fail,
    };
    let gate = WorkflowNode::Gate(GateNode {
        name: "qg".to_string(),
        gate_type: QUALITY_GATE_TYPE.to_string(),
        prompt: None,
        min_approvals: 1,
        approval_mode: ApprovalMode::default(),
        timeout_secs: 0,
        on_timeout: OnTimeout::Fail,
        as_identity: None,
        quality_gate: Some(qg_config),
        options: None,
    });

    let engine = FlowEngineBuilder::new().build().unwrap();
    let def = make_def("wf", vec![gate]);
    // Other validation errors are tolerated; only a resolver complaint fails.
    if let Err(errors) = engine.validate(&def) {
        let resolver_error = errors.iter().any(|e| e.message.contains("quality_gate"));
        assert!(
            !resolver_error,
            "QualityGate should not produce a resolver error; got: {:?}",
            errors
        );
    }
}
1413
#[test]
fn validate_valid_workflow_passes() {
    // One node of each kind, with every required registration present.
    let steps = vec![
        call_node("alpha"),
        foreach_node("items", "tickets"),
        gate_node("approval", "human_approval"),
    ];
    let def = make_def("wf", steps);

    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .item_provider(TicketsProvider)
        .gate_resolver(HumanApprovalResolver)
        .build()
        .unwrap();

    assert!(
        engine.validate(&def).is_ok(),
        "all registrations present — validation should pass"
    );
}
1437
#[test]
fn validate_open_registry_custom_gate_type_accepted() {
    // A resolver with an arbitrary custom gate_type string.
    struct SlackReactionResolver;
    impl crate::traits::gate_resolver::GateResolver for SlackReactionResolver {
        fn gate_type(&self) -> &str {
            "slack_reaction"
        }
        fn poll(
            &self,
            _run_id: &str,
            _params: &GateParams,
            _ctx: &dyn RunContext,
        ) -> Result<GatePoll, EngineError> {
            Ok(GatePoll::Approved(None))
        }
    }

    let engine = FlowEngineBuilder::new()
        .gate_resolver(SlackReactionResolver)
        .build()
        .unwrap();
    let def = make_def("wf", vec![gate_node("notify", "slack_reaction")]);

    assert!(
        engine.validate(&def).is_ok(),
        "registered slack_reaction resolver should satisfy validation"
    );
}
1467
#[test]
fn validate_unregistered_gate_type_produces_error() {
    // "fictional_type" has no registered resolver.
    let engine = FlowEngineBuilder::new().build().unwrap();
    let def = make_def("wf", vec![gate_node("g", "fictional_type")]);

    let errors = engine.validate(&def).unwrap_err();
    let names_type = errors.iter().any(|e| e.message.contains("fictional_type"));
    assert!(
        names_type,
        "error should mention the unregistered gate type; got: {:?}",
        errors
    );
}
1481
#[test]
fn validate_sub_workflow_errors_have_path_prefix() {
    // Sub-workflow references an executor that is never registered.
    let sub_def = make_def("sub_wf", vec![call_node("missing_in_sub")]);
    let engine = FlowEngineBuilder::new()
        .workflow_resolver(Box::new(InMemoryWorkflowResolver::new([(
            "sub_wf", sub_def,
        )])))
        .build()
        .unwrap();

    let call = WorkflowNode::CallWorkflow(CallWorkflowNode {
        workflow: "sub_wf".to_string(),
        inputs: HashMap::new(),
        retries: 0,
        on_fail: None,
        as_identity: None,
    });
    let root_def = make_def("root", vec![call]);

    let errors = engine.validate(&root_def).unwrap_err();
    let prefixed = errors
        .iter()
        .any(|e| e.message.contains("in sub-workflow") && e.message.contains("sub_wf"));
    assert!(
        prefixed,
        "sub-workflow errors should be prefixed with the sub-workflow name; got: {:?}",
        errors
    );
    let mentions_missing = errors.iter().any(|e| e.message.contains("missing_in_sub"));
    assert!(
        mentions_missing,
        "error should mention the missing executor from the sub-workflow; got: {:?}",
        errors
    );
}
1518
#[test]
fn validate_cycle_detection_triggers_error() {
    // A workflow that calls itself forms the smallest possible cycle.
    let self_call = WorkflowNode::CallWorkflow(CallWorkflowNode {
        workflow: "cycle_wf".to_string(),
        inputs: HashMap::new(),
        retries: 0,
        on_fail: None,
        as_identity: None,
    });
    let cycle_def = make_def("cycle_wf", vec![self_call]);

    let engine = FlowEngineBuilder::new()
        .workflow_resolver(Box::new(InMemoryWorkflowResolver::new([(
            "cycle_wf",
            cycle_def.clone(),
        )])))
        .build()
        .unwrap();

    let errors = engine.validate(&cycle_def).unwrap_err();
    let cycle_reported = errors
        .iter()
        .any(|e| e.message.contains("Circular") || e.message.contains("cycle"));
    assert!(
        cycle_reported,
        "cycle detection should produce an error; got: {:?}",
        errors
    );
}
1550
#[test]
fn resolver_returns_not_found_error_for_missing_sub_workflow() {
    // Resolver that knows no workflows at all.
    let empty: [(String, WorkflowDef); 0] = [];
    let engine = FlowEngineBuilder::new()
        .workflow_resolver(Box::new(InMemoryWorkflowResolver::new(empty)))
        .build()
        .unwrap();

    let call = WorkflowNode::CallWorkflow(CallWorkflowNode {
        workflow: "missing_sub".to_string(),
        inputs: HashMap::new(),
        retries: 0,
        on_fail: None,
        as_identity: None,
    });
    let root_def = make_def("root", vec![call]);

    let errors = engine.validate(&root_def).unwrap_err();
    assert!(
        errors.iter().any(|e| e.message.contains("missing_sub")),
        "error should mention the missing sub-workflow name; got: {:?}",
        errors
    );
}
1579
#[test]
fn inmemory_resolver_sufficient_for_full_validation() {
    // Sub-workflow plus its registered executor — nothing else needed.
    let sub_def = make_def("sub_wf", vec![call_node("alpha")]);
    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .workflow_resolver(Box::new(InMemoryWorkflowResolver::new([(
            "sub_wf", sub_def,
        )])))
        .build()
        .unwrap();

    let call = WorkflowNode::CallWorkflow(CallWorkflowNode {
        workflow: "sub_wf".to_string(),
        inputs: HashMap::new(),
        retries: 0,
        on_fail: None,
        as_identity: None,
    });
    let root_def = make_def("root", vec![call]);

    assert!(
        engine.validate(&root_def).is_ok(),
        "InMemoryWorkflowResolver alone should be sufficient for full validation"
    );
}
1608
1609 fn make_bare_state(wf_name: &str) -> crate::engine::ExecutionState {
1611 use crate::cancellation::CancellationToken;
1612 use crate::engine::ExecutionState;
1613 use crate::persistence_memory::InMemoryWorkflowPersistence;
1614 use crate::traits::run_context::NoopRunContext;
1615 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
1616 use crate::types::WorkflowExecConfig;
1617 let persistence = InMemoryWorkflowPersistence::new();
1618 persistence.seed_run("test-run");
1619 ExecutionState {
1620 persistence: Arc::new(persistence),
1621 action_registry: Arc::new(ActionRegistry::new(HashMap::new(), None)),
1622 script_env_provider: Arc::new(NoOpScriptEnvProvider),
1623 workflow_run_id: "test-run".to_string(),
1624 workflow_name: wf_name.to_string(),
1625 run_ctx: Arc::new(NoopRunContext::default())
1626 as Arc<dyn crate::traits::run_context::RunContext>,
1627 extra_plugin_dirs: vec![],
1628 model: None,
1629 exec_config: WorkflowExecConfig::default(),
1630 inputs: HashMap::new(),
1631 parent_run_id: String::new(),
1632 depth: 0,
1633 target_label: None,
1634 step_results: HashMap::new(),
1635 contexts: vec![],
1636 position: 0,
1637 all_succeeded: true,
1638 total_cost: 0.0,
1639 total_turns: 0,
1640 total_duration_ms: 0,
1641 total_input_tokens: 0,
1642 total_output_tokens: 0,
1643 total_cache_read_input_tokens: 0,
1644 total_cache_creation_input_tokens: 0,
1645 has_llm_metrics: false,
1646 last_gate_feedback: None,
1647 block_output: None,
1648 block_with: vec![],
1649 resume_ctx: None,
1650 default_as_identity: None,
1651 triggered_by_hook: false,
1652 schema_resolver: None,
1653 child_runner: None,
1654 last_heartbeat_at: ExecutionState::new_heartbeat(),
1655 registry: Arc::new(ItemProviderRegistry::new()),
1656 event_sinks: Arc::from(vec![]),
1657 cancellation: CancellationToken::new(),
1658 current_execution_id: Arc::new(std::sync::Mutex::new(None)),
1659 owner_token: None,
1660 lease_generation: None,
1661 }
1662 }
1663
#[test]
fn run_validates_against_state_registries_not_engine() {
    // The engine itself knows "alpha", so engine-level validation passes.
    let def = make_def("wf", vec![call_node("alpha")]);
    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .build()
        .unwrap();
    assert!(
        engine.validate(&def).is_ok(),
        "engine validate() should pass"
    );

    // But the state's own registry is empty, so run() must re-validate and fail.
    let mut state = make_bare_state("wf");
    let result = engine.run(&def, &mut state);
    assert!(
        result.is_err(),
        "run() must reject when state action registry lacks 'alpha'"
    );
    assert_eq!(state.position, 0, "no side effects on rejection");
}
1689
#[test]
fn run_validates_item_provider_against_state_registry_not_engine() {
    // Engine-level registry has the "tickets" provider, so validate() passes.
    let def = make_def("wf", vec![foreach_node("items", "tickets")]);
    let engine = FlowEngineBuilder::new()
        .item_provider(TicketsProvider)
        .build()
        .unwrap();
    assert!(
        engine.validate(&def).is_ok(),
        "engine validate() should pass for tickets provider"
    );

    // State carries an empty provider registry, so run() must fail.
    let mut state = make_bare_state("wf");
    let result = engine.run(&def, &mut state);
    assert!(
        result.is_err(),
        "run() must reject when state item-provider registry lacks 'tickets'"
    );
    assert_eq!(state.position, 0, "no side effects on rejection");
}
1715
#[test]
fn run_rejects_invalid_workflow_before_side_effects() {
    let engine = FlowEngineBuilder::new().build().unwrap();
    let def = make_def("wf", vec![call_node("unregistered_agent")]);
    let mut state = make_bare_state("wf");

    let result = engine.run(&def, &mut state);
    assert!(result.is_err(), "run() must reject an invalid workflow");
    let err = result.unwrap_err().to_string();
    assert!(
        err.contains("validation"),
        "error should mention validation; got: {err}"
    );

    // Rejection must happen before any execution state is mutated.
    assert_eq!(
        state.position, 0,
        "no side effects: position must be unchanged when validation fails"
    );
    assert!(
        state.step_results.is_empty(),
        "no side effects: step_results must be empty when validation fails"
    );
}
1740
1741 use crate::events::{EngineEvent, EngineEventData, EventSink};
1746 use crate::persistence_memory::InMemoryWorkflowPersistence;
1747
1748 struct PanicSink;
1750
1751 impl EventSink for PanicSink {
1752 fn emit(&self, _event: &EngineEventData) {
1753 panic!("intentional sink panic");
1754 }
1755 }
1756
1757 fn make_single_step_def() -> WorkflowDef {
1759 make_def("wf", vec![call_node("alpha")])
1760 }
1761
1762 fn make_state_with_persistence(wf_name: &str) -> crate::engine::ExecutionState {
1764 use crate::traits::persistence::{NewRun, WorkflowPersistence};
1765
1766 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
1767 let run = persistence
1769 .create_run(NewRun {
1770 workflow_name: wf_name.to_string(),
1771 parent_run_id: String::new(),
1772 dry_run: false,
1773 trigger: "manual".to_string(),
1774 definition_snapshot: None,
1775 parent_workflow_run_id: None,
1776 })
1777 .unwrap();
1778
1779 let mut state = make_bare_state(wf_name);
1780 state.persistence = persistence;
1781 state.action_registry = Arc::new(ActionRegistry::new(
1782 {
1783 let mut m = HashMap::new();
1784 m.insert(
1785 "alpha".to_string(),
1786 Box::new(AlphaExecutor)
1787 as Box<dyn crate::traits::action_executor::ActionExecutor>,
1788 );
1789 m
1790 },
1791 None,
1792 ));
1793 state.workflow_run_id = run.id;
1794 state
1795 }
1796
#[test]
fn event_sinks_multi_sink_ordering() {
    let sink_a = VecSink::new();
    let sink_b = VecSink::new();

    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .event_sink(Box::new(ForwardSink(Arc::clone(&sink_a))))
        .event_sink(Box::new(ForwardSink(Arc::clone(&sink_b))))
        .build()
        .unwrap();

    let def = make_single_step_def();
    let mut state = make_state_with_persistence("wf");
    let result = engine.run(&def, &mut state);
    assert!(result.is_ok(), "run should succeed: {:?}", result);

    // Both sinks must see an identical stream of events.
    let events_a = sink_a.collected();
    let events_b = sink_b.collected();
    assert!(!events_a.is_empty(), "sink_a should have received events");
    assert_eq!(
        events_a.len(),
        events_b.len(),
        "both sinks should receive the same number of events"
    );

    // Lifecycle events must bracket the run.
    let has_run_started = events_a
        .iter()
        .any(|e| matches!(e.event, EngineEvent::RunStarted { .. }));
    let has_run_completed = events_a
        .iter()
        .any(|e| matches!(e.event, EngineEvent::RunCompleted { .. }));
    assert!(has_run_started, "should have RunStarted event");
    assert!(has_run_completed, "should have RunCompleted event");
}
1836
#[test]
fn with_event_sinks_accumulates_sinks() {
    let sink_a = VecSink::new();
    let sink_b = VecSink::new();

    // Pre-built slice of sinks, as an embedding host would supply.
    let sinks: Vec<Arc<dyn EventSink>> = vec![
        Arc::clone(&sink_a) as Arc<dyn EventSink>,
        Arc::clone(&sink_b) as Arc<dyn EventSink>,
    ];
    let pre_built: Arc<[Arc<dyn EventSink>]> = Arc::from(sinks);

    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .with_event_sinks(&pre_built)
        .build()
        .unwrap();

    let def = make_single_step_def();
    let mut state = make_state_with_persistence("wf");
    let result = engine.run(&def, &mut state);
    assert!(result.is_ok(), "run should succeed: {:?}", result);

    let events_a = sink_a.collected();
    let events_b = sink_b.collected();
    assert!(
        !events_a.is_empty(),
        "sink_a registered via with_event_sinks should receive events"
    );
    assert_eq!(
        events_a.len(),
        events_b.len(),
        "both sinks should receive the same number of events"
    );
    let saw_run_started = events_a
        .iter()
        .any(|e| matches!(e.event, EngineEvent::RunStarted { .. }));
    assert!(saw_run_started, "should have RunStarted event");
}
1877
#[test]
fn event_sink_and_with_event_sinks_both_accumulate() {
    let sink_a = VecSink::new();
    let sink_b = VecSink::new();
    let sink_c = VecSink::new();

    let pre_built: Arc<[Arc<dyn EventSink>]> = Arc::from(vec![
        Arc::clone(&sink_b) as Arc<dyn EventSink>,
        Arc::clone(&sink_c) as Arc<dyn EventSink>,
    ]);

    // with_event_sinks is deliberately called twice: each call appends,
    // so sink_b and sink_c end up registered twice each.
    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .event_sink(Box::new(ForwardSink(Arc::clone(&sink_a))))
        .with_event_sinks(&pre_built)
        .with_event_sinks(&pre_built)
        .build()
        .unwrap();

    let def = make_single_step_def();
    let mut state = make_state_with_persistence("wf");
    engine.run(&def, &mut state).unwrap();

    assert!(
        !sink_a.collected().is_empty(),
        "event_sink sink should receive events"
    );
    assert_eq!(
        sink_b.collected().len(),
        sink_a.collected().len() * 2,
        "sink_b registered twice via with_event_sinks should receive 2x events"
    );
    assert_eq!(
        sink_b.collected().len(),
        sink_c.collected().len(),
        "both with_event_sinks sinks should receive the same count"
    );
}
1918
#[test]
fn event_sinks_panic_safety() {
    let good_sink = VecSink::new();

    // A panicking sink must not take down the run or starve later sinks.
    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .event_sink(Box::new(PanicSink))
        .event_sink(Box::new(ForwardSink(Arc::clone(&good_sink))))
        .build()
        .unwrap();

    let def = make_single_step_def();
    let mut state = make_state_with_persistence("wf");
    let result = engine.run(&def, &mut state);
    assert!(result.is_ok(), "run should succeed despite panicking sink");

    let events = good_sink.collected();
    assert!(
        !events.is_empty(),
        "good sink should still receive events after panicking sink"
    );
}
1943
#[test]
fn event_sink_integration_sequence() {
    let sink = VecSink::new();

    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .event_sink(Box::new(ForwardSink(Arc::clone(&sink))))
        .build()
        .unwrap();

    let def = make_single_step_def();
    let mut state = make_state_with_persistence("wf");
    let result = engine.run(&def, &mut state);
    assert!(result.is_ok(), "run should succeed: {:?}", result);

    // Flatten the emitted events into their variant names; the match is
    // exhaustive so a new EngineEvent variant forces an update here.
    let events = sink.collected();
    let mut kinds: Vec<&str> = Vec::with_capacity(events.len());
    for e in &events {
        kinds.push(match &e.event {
            EngineEvent::RunStarted { .. } => "RunStarted",
            EngineEvent::RunCompleted { .. } => "RunCompleted",
            EngineEvent::RunResumed { .. } => "RunResumed",
            EngineEvent::RunCancelled { .. } => "RunCancelled",
            EngineEvent::StepStarted { .. } => "StepStarted",
            EngineEvent::StepCompleted { .. } => "StepCompleted",
            EngineEvent::StepRetrying { .. } => "StepRetrying",
            EngineEvent::GateWaiting { .. } => "GateWaiting",
            EngineEvent::GateResolved { .. } => "GateResolved",
            EngineEvent::FanOutItemsCollected { .. } => "FanOutItemsCollected",
            EngineEvent::FanOutItemStarted { .. } => "FanOutItemStarted",
            EngineEvent::FanOutItemCompleted { .. } => "FanOutItemCompleted",
            EngineEvent::MetricsUpdated { .. } => "MetricsUpdated",
            EngineEvent::Panicked { .. } => "Panicked",
        });
    }

    assert_eq!(kinds[0], "RunStarted", "first event should be RunStarted");
    assert!(
        kinds.contains(&"StepStarted"),
        "should have StepStarted; got: {:?}",
        kinds
    );
    assert!(
        kinds.contains(&"StepCompleted"),
        "should have StepCompleted; got: {:?}",
        kinds
    );
    assert!(
        kinds.contains(&"MetricsUpdated"),
        "should have MetricsUpdated; got: {:?}",
        kinds
    );
    let last = kinds.last().unwrap();
    assert_eq!(*last, "RunCompleted", "last event should be RunCompleted");
}
2001
2002 struct FailingExecutor;
2008 impl ActionExecutor for FailingExecutor {
2009 fn name(&self) -> &str {
2010 "failing"
2011 }
2012 fn execute(
2013 &self,
2014 _ctx: &dyn RunContext,
2015 _info: &StepInfo,
2016 _params: &ActionParams,
2017 ) -> Result<ActionOutput, EngineError> {
2018 Err(EngineError::Workflow("intentional failure".to_string()))
2019 }
2020 }
2021
#[test]
fn cancel_run_marks_cancelling_in_db() {
    use crate::persistence_memory::InMemoryWorkflowPersistence;
    use crate::status::WorkflowRunStatus;
    use crate::traits::persistence::WorkflowPersistence;

    let persistence = Arc::new(InMemoryWorkflowPersistence::new());
    let run = make_test_run(&persistence);
    persistence
        .update_run_status(&run.id, WorkflowRunStatus::Running, None, None)
        .unwrap();

    let engine = FlowEngineBuilder::new().build().unwrap();

    // Register the run as active so cancel_run() can find it.
    let entry = ActiveRunEntry {
        token: crate::cancellation::CancellationToken::new(),
        shutdown: Arc::new(AtomicBool::new(false)),
        persistence: Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
        registry: Arc::new(ActionRegistry::new(HashMap::new(), None)),
        exec_info: Arc::new(Mutex::new(None)),
        refresh_stop: Arc::new(AtomicBool::new(false)),
        refresh_thread: None,
        refresh_handle: None,
    };
    {
        // Recover from a poisoned lock rather than propagating the panic.
        let mut runs = engine.active_runs.lock().unwrap_or_else(|e| e.into_inner());
        runs.insert(run.id.clone(), entry);
    }

    engine
        .cancel_run(&run.id, CancellationReason::UserRequested(None))
        .unwrap();

    let updated = persistence.get_run(&run.id).unwrap().unwrap();
    assert_eq!(
        updated.status,
        WorkflowRunStatus::Cancelling,
        "DB status should be Cancelling after cancel_run"
    );
}
2066
#[test]
fn cancel_run_returns_err_for_unknown_run() {
    // Cancelling a run id that was never registered must fail cleanly.
    let engine = FlowEngineBuilder::new().build().unwrap();
    let reason = CancellationReason::UserRequested(None);
    let result = engine.cancel_run("nonexistent-run", reason);
    assert!(result.is_err(), "cancel_run on unknown run must return Err");
}
2074
#[test]
fn pre_cancelled_token_causes_immediate_failure() {
    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .build()
        .unwrap();
    let def = make_def("wf", vec![call_node("alpha")]);
    let mut state = make_state_with_persistence("wf");

    // Cancel before the run even starts.
    state
        .cancellation
        .cancel(CancellationReason::UserRequested(None));

    // Either an Err or an unsuccessful WorkflowResult counts as "not succeeded".
    let result = engine.run(&def, &mut state);
    let did_not_succeed = result.map_or(true, |wr| !wr.all_succeeded);
    assert!(
        did_not_succeed,
        "run with pre-cancelled token should not succeed"
    );
}
2101
#[test]
fn parallel_fail_fast_skips_remaining_branches() {
    use crate::dsl::{ParallelNode, WorkflowNode};
    use crate::persistence_memory::InMemoryWorkflowPersistence;

    let engine = FlowEngineBuilder::new()
        .action(Box::new(AlphaExecutor))
        .action(Box::new(FailingExecutor))
        .build()
        .unwrap();

    // First branch fails; fail_fast should short-circuit the others.
    let branches = vec![
        crate::dsl::AgentRef::Name("failing".to_string()),
        crate::dsl::AgentRef::Name("alpha".to_string()),
        crate::dsl::AgentRef::Name("alpha".to_string()),
    ];
    let parallel = WorkflowNode::Parallel(ParallelNode {
        fail_fast: true,
        min_success: None,
        calls: branches,
        output: None,
        call_outputs: HashMap::new(),
        with: vec![],
        call_with: HashMap::new(),
        call_if: HashMap::new(),
        call_retries: HashMap::new(),
    });
    let def = make_def("wf", vec![parallel]);

    let persistence = Arc::new(InMemoryWorkflowPersistence::new());
    let run = make_test_run(&persistence);
    let persistence: Arc<dyn crate::traits::persistence::WorkflowPersistence> = persistence;

    let executors = HashMap::from([
        (
            "alpha".to_string(),
            Box::new(AlphaExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        ),
        (
            "failing".to_string(),
            Box::new(FailingExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        ),
    ]);
    let mut state = make_bare_state("wf");
    state.persistence = Arc::clone(&persistence);
    state.action_registry = Arc::new(ActionRegistry::new(executors, None));
    state.workflow_run_id = run.id.clone();

    // The run outcome itself is not asserted; only persisted step statuses.
    engine.run(&def, &mut state).ok();

    let steps = persistence.get_steps(&run.id).unwrap();
    let failed = steps
        .iter()
        .filter(|s| s.status == crate::status::WorkflowStepStatus::Failed)
        .count();
    assert!(
        failed >= 1,
        "at least the first (failing) branch should be Failed; got steps: {:?}",
        steps
    );
}
2169
#[test]
fn step_timeout_marks_timed_out() {
    use crate::dsl::{CallNode, WorkflowNode};
    use crate::persistence_memory::InMemoryWorkflowPersistence;

    // Sleeps well past the 10ms step timeout configured below.
    struct SlowExecutor;
    impl ActionExecutor for SlowExecutor {
        fn name(&self) -> &str {
            "slow"
        }
        fn execute(
            &self,
            _ctx: &dyn RunContext,
            _info: &StepInfo,
            _params: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            std::thread::sleep(std::time::Duration::from_millis(100));
            Ok(ActionOutput::default())
        }
    }

    let engine = FlowEngineBuilder::new()
        .action(Box::new(SlowExecutor))
        .build()
        .unwrap();

    let timed_out_call = WorkflowNode::Call(CallNode {
        agent: crate::dsl::AgentRef::Name("slow".to_string()),
        retries: 0,
        on_fail: None,
        output: None,
        with: vec![],
        as_identity: None,
        plugin_dirs: vec![],
        timeout: Some("10ms".to_string()),
        max_turns: None,
    });
    let def = make_def("wf", vec![timed_out_call]);

    let persistence = Arc::new(InMemoryWorkflowPersistence::new());
    let run = make_test_run(&persistence);
    let persistence: Arc<dyn crate::traits::persistence::WorkflowPersistence> = persistence;

    let executors = HashMap::from([(
        "slow".to_string(),
        Box::new(SlowExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
    )]);
    let mut state = make_bare_state("wf");
    state.persistence = Arc::clone(&persistence);
    state.action_registry = Arc::new(ActionRegistry::new(executors, None));
    state.workflow_run_id = run.id.clone();

    engine.run(&def, &mut state).ok();

    let steps = persistence.get_steps(&run.id).unwrap();
    let timed_out = steps
        .iter()
        .any(|s| s.status == crate::status::WorkflowStepStatus::TimedOut);
    assert!(
        timed_out,
        "step should be marked TimedOut; got: {:?}",
        steps
    );
}
2238
#[test]
fn resume_skips_completed_steps() {
    use crate::persistence_memory::InMemoryWorkflowPersistence;
    use crate::status::WorkflowStepStatus;
    use crate::traits::persistence::{NewStep, StepUpdate, WorkflowPersistence};
    use std::sync::atomic::Ordering;

    let persistence = Arc::new(InMemoryWorkflowPersistence::new());
    let run = make_test_run(&persistence);

    // Pre-record "alpha" as Completed so resume() must skip it.
    let step_id = persistence
        .insert_step(NewStep {
            workflow_run_id: run.id.clone(),
            step_name: "alpha".to_string(),
            role: "actor".to_string(),
            can_commit: false,
            position: 0,
            iteration: 0,
            retry_count: Some(0),
        })
        .unwrap();
    let mark_completed = StepUpdate {
        generation: 0,
        status: WorkflowStepStatus::Completed,
        child_run_id: None,
        result_text: None,
        context_out: None,
        markers_out: None,
        retry_count: None,
        structured_output: None,
        step_error: None,
    };
    persistence.update_step(&step_id, mark_completed).unwrap();

    let (alpha_count, beta_count, mut state) =
        make_counting_state(Arc::clone(&persistence), run.id);

    let def = make_def("wf", vec![call_node("alpha"), call_node("beta")]);
    let engine = FlowEngineBuilder::new().build().unwrap();
    engine.resume(&def, &mut state).unwrap();

    assert_eq!(
        alpha_count.load(Ordering::SeqCst),
        0,
        "alpha was pre-completed and should be skipped"
    );
    assert_eq!(
        beta_count.load(Ordering::SeqCst),
        1,
        "beta should execute once"
    );
}
2301
#[test]
fn resume_skips_pre_completed_steps() {
    use crate::persistence_memory::InMemoryWorkflowPersistence;
    use crate::status::WorkflowStepStatus;
    use crate::traits::persistence::{NewStep, StepUpdate, WorkflowPersistence};
    use std::sync::atomic::Ordering;

    let persistence = Arc::new(InMemoryWorkflowPersistence::new());
    let run = make_test_run(&persistence);

    // Seed a Completed "alpha" row before resuming.
    let step_id = persistence
        .insert_step(NewStep {
            workflow_run_id: run.id.clone(),
            step_name: "alpha".to_string(),
            role: "actor".to_string(),
            can_commit: false,
            position: 0,
            iteration: 0,
            retry_count: Some(0),
        })
        .unwrap();
    let mark_completed = StepUpdate {
        generation: 0,
        status: WorkflowStepStatus::Completed,
        child_run_id: None,
        result_text: None,
        context_out: None,
        markers_out: None,
        retry_count: None,
        structured_output: None,
        step_error: None,
    };
    persistence.update_step(&step_id, mark_completed).unwrap();

    let (alpha_count, beta_count, mut state) =
        make_counting_state(Arc::clone(&persistence), run.id);

    let def = make_def("wf", vec![call_node("alpha"), call_node("beta")]);
    let engine = FlowEngineBuilder::new().build().unwrap();
    engine.resume(&def, &mut state).unwrap();

    assert_eq!(
        alpha_count.load(Ordering::SeqCst),
        0,
        "alpha was pre-completed and should be skipped"
    );
    assert_eq!(
        beta_count.load(Ordering::SeqCst),
        1,
        "beta should execute once"
    );
}
2361
#[test]
fn resume_empty_skip_set_runs_all() {
    use crate::persistence_memory::InMemoryWorkflowPersistence;
    use std::sync::atomic::Ordering;

    // No steps recorded as completed — resume must run everything.
    let persistence = Arc::new(InMemoryWorkflowPersistence::new());
    let run = make_test_run(&persistence);
    let (alpha_count, beta_count, mut state) = make_counting_state(persistence, run.id);

    let def = make_def("wf", vec![call_node("alpha"), call_node("beta")]);
    let engine = FlowEngineBuilder::new().build().unwrap();
    engine.resume(&def, &mut state).unwrap();

    assert_eq!(
        alpha_count.load(Ordering::SeqCst),
        1,
        "alpha should execute once when no completed steps exist"
    );
    assert_eq!(
        beta_count.load(Ordering::SeqCst),
        1,
        "beta should execute once when no completed steps exist"
    );
}
2388
    #[test]
    fn resume_while_loop_starts_at_first_incomplete_iteration() {
        use crate::dsl::{OnMaxIter, WhileNode, WorkflowNode};
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::status::WorkflowStepStatus;
        use crate::traits::persistence::{NewStep, StepUpdate, WorkflowPersistence};
        use std::sync::atomic::{AtomicUsize, Ordering};

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);

        // Seed a completed "cond" step at position 0 whose markers_out contains
        // "continue" — the marker the while-node below loops on.
        let cond_id = persistence
            .insert_step(NewStep {
                workflow_run_id: run.id.clone(),
                step_name: "cond".to_string(),
                role: "actor".to_string(),
                can_commit: false,
                position: 0,
                iteration: 0,
                retry_count: Some(0),
            })
            .unwrap();
        persistence
            .update_step(
                &cond_id,
                StepUpdate {
                    generation: 0,
                    status: WorkflowStepStatus::Completed,
                    child_run_id: None,
                    result_text: None,
                    context_out: None,
                    markers_out: Some(r#"["continue"]"#.to_string()),
                    retry_count: None,
                    structured_output: None,
                    step_error: None,
                },
            )
            .unwrap();

        // Pre-complete both body steps for iterations 0 and 1. Positions are
        // laid out sequentially after the cond step (hence the `+ 1` offset),
        // two body steps per iteration.
        for iter in 0i64..2 {
            for (pos_offset, name) in [(0i64, "body_a"), (1, "body_b")] {
                let sid = persistence
                    .insert_step(NewStep {
                        workflow_run_id: run.id.clone(),
                        step_name: name.to_string(),
                        role: "actor".to_string(),
                        can_commit: false,
                        position: iter * 2 + pos_offset + 1,
                        iteration: iter,
                        retry_count: Some(0),
                    })
                    .unwrap();
                persistence
                    .update_step(
                        &sid,
                        StepUpdate {
                            generation: 0,
                            status: WorkflowStepStatus::Completed,
                            child_run_id: None,
                            result_text: None,
                            context_out: None,
                            markers_out: None,
                            retry_count: None,
                            structured_output: None,
                            step_error: None,
                        },
                    )
                    .unwrap();
            }
        }

        // Counters observe how many times each body step actually runs after
        // resume — the pre-completed iterations must be skipped.
        let a_count = Arc::new(AtomicUsize::new(0));
        let b_count = Arc::new(AtomicUsize::new(0));
        let mut m = HashMap::new();
        m.insert(
            "body_a".to_string(),
            Box::new(CountingExecutor {
                name: "body_a",
                count: Arc::clone(&a_count),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        m.insert(
            "body_b".to_string(),
            Box::new(CountingExecutor {
                name: "body_b",
                count: Arc::clone(&b_count),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        // The cond executor's count is intentionally unobserved here; only the
        // body executions are asserted on.
        m.insert(
            "cond".to_string(),
            Box::new(CountingExecutor {
                name: "cond",
                count: Arc::new(AtomicUsize::new(0)),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        let mut state = make_bare_state("wf");
        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run.id.clone();

        // While loop with max_iterations = 3: iterations 0 and 1 are seeded
        // complete, so resume should start the body at iteration 2.
        let while_node = WorkflowNode::While(WhileNode {
            step: "cond".to_string(),
            marker: "continue".to_string(),
            max_iterations: 3,
            stuck_after: None,
            on_max_iter: OnMaxIter::Continue,
            body: vec![call_node("body_a"), call_node("body_b")],
        });
        let def = make_def("wf", vec![call_node("cond"), while_node]);

        let engine = FlowEngineBuilder::new().build().unwrap();
        engine.resume(&def, &mut state).unwrap();

        assert_eq!(
            a_count.load(Ordering::SeqCst),
            1,
            "body_a should execute only for the third iteration (first incomplete)"
        );
        assert_eq!(
            b_count.load(Ordering::SeqCst),
            1,
            "body_b should execute only for the third iteration (first incomplete)"
        );
    }
2521
2522 #[test]
2524 fn resume_propagates_get_steps_error() {
2525 use crate::persistence_memory::InMemoryWorkflowPersistence;
2526
2527 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2528 persistence.seed_run("run-123");
2529 persistence.set_fail_get_steps(true);
2530
2531 let engine = FlowEngineBuilder::new().build().unwrap();
2532 let def = make_def("wf", vec![call_node("alpha")]);
2533 let mut state = make_bare_state("wf");
2534 state.persistence = persistence;
2535 state.workflow_run_id = "run-123".to_string();
2536
2537 let err = engine.resume(&def, &mut state).unwrap_err();
2538 let msg = err.to_string();
2539 assert!(
2540 msg.contains("resume: failed to load steps for run"),
2541 "error should contain the prefix; got: {msg}"
2542 );
2543 assert!(
2544 msg.contains("run-123"),
2545 "error should contain the run ID; got: {msg}"
2546 );
2547 }
2548
2549 #[test]
2551 fn resume_rejects_pre_seeded_resume_ctx() {
2552 use crate::engine::ResumeContext;
2553 use std::collections::HashMap;
2554
2555 let engine = FlowEngineBuilder::new().build().unwrap();
2556 let def = make_def("wf", vec![call_node("alpha")]);
2557 let mut state = make_bare_state("wf");
2558 state.resume_ctx = Some(ResumeContext {
2559 step_map: HashMap::new(),
2560 });
2561 state.workflow_run_id = "run-precond".to_string();
2562
2563 let err = engine.resume(&def, &mut state).unwrap_err();
2564 assert!(
2565 err.to_string().contains("resume_ctx"),
2566 "error should mention resume_ctx; got: {err}"
2567 );
2568 }
2569
2570 #[test]
2576 fn run_sets_lease_fields_on_success() {
2577 use crate::persistence_memory::InMemoryWorkflowPersistence;
2578
2579 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2580 let run = make_test_run(&persistence);
2581
2582 let engine = FlowEngineBuilder::new()
2583 .action(Box::new(AlphaExecutor))
2584 .build()
2585 .unwrap();
2586 let def = make_def("wf", vec![call_node("alpha")]);
2587
2588 let mut state = make_bare_state("wf");
2589 state.persistence =
2590 Arc::clone(&persistence) as Arc<dyn crate::traits::persistence::WorkflowPersistence>;
2591 state.action_registry = Arc::new(ActionRegistry::new(
2592 {
2593 let mut m = HashMap::new();
2594 m.insert(
2595 "alpha".to_string(),
2596 Box::new(AlphaExecutor)
2597 as Box<dyn crate::traits::action_executor::ActionExecutor>,
2598 );
2599 m
2600 },
2601 None,
2602 ));
2603 state.workflow_run_id = run.id.clone();
2604
2605 engine.run(&def, &mut state).unwrap();
2606
2607 assert!(
2608 state.owner_token.is_some(),
2609 "owner_token should be set after run()"
2610 );
2611 assert_eq!(
2612 state.lease_generation,
2613 Some(1),
2614 "lease_generation should be 1 after first acquire"
2615 );
2616 }
2617
    #[test]
    fn two_concurrent_runs_exactly_one_succeeds() {
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::persistence::WorkflowPersistence;
        use std::thread;

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);
        let run_id = run.id.clone();

        // Erase the concrete type so both threads share one trait object.
        let persistence: Arc<dyn WorkflowPersistence> = persistence;

        // Each thread builds its own engine and state but targets the SAME
        // run ID, so the two run() calls race for the same lease.
        let make_state_for_run = |run_id: String, p: Arc<dyn WorkflowPersistence>| {
            let mut s = make_bare_state("wf");
            s.persistence = p;
            s.action_registry = Arc::new(ActionRegistry::new(
                {
                    let mut m = HashMap::new();
                    m.insert(
                        "alpha".to_string(),
                        Box::new(AlphaExecutor)
                            as Box<dyn crate::traits::action_executor::ActionExecutor>,
                    );
                    m
                },
                None,
            ));
            s.workflow_run_id = run_id;
            s
        };

        let def = make_def("wf", vec![call_node("alpha")]);

        // Barrier maximizes overlap: both threads enter run() at once.
        let barrier = Arc::new(std::sync::Barrier::new(2));

        let p1 = Arc::clone(&persistence);
        let run_id1 = run_id.clone();
        let barrier1 = Arc::clone(&barrier);
        let def1 = def.clone();
        let t1 = thread::spawn(move || {
            let engine = FlowEngineBuilder::new()
                .action(Box::new(AlphaExecutor))
                .build()
                .unwrap();
            let mut state = make_state_for_run(run_id1, p1);
            barrier1.wait();
            engine.run(&def1, &mut state)
        });

        let p2 = Arc::clone(&persistence);
        let run_id2 = run_id.clone();
        let barrier2 = Arc::clone(&barrier);
        let def2 = def.clone();
        let t2 = thread::spawn(move || {
            let engine = FlowEngineBuilder::new()
                .action(Box::new(AlphaExecutor))
                .build()
                .unwrap();
            let mut state = make_state_for_run(run_id2, p2);
            barrier2.wait();
            engine.run(&def2, &mut state)
        });

        let r1 = t1.join().unwrap();
        let r2 = t2.join().unwrap();

        // Lease acquisition must act as a mutual-exclusion gate: exactly one
        // winner, and the loser fails specifically with AlreadyOwned.
        let successes = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
        let already_owned = [&r1, &r2]
            .iter()
            .filter(|r| matches!(r, Err(EngineError::AlreadyOwned(_))))
            .count();

        assert_eq!(
            successes, 1,
            "exactly one run should succeed; got r1={r1:?}, r2={r2:?}"
        );
        assert_eq!(
            already_owned, 1,
            "exactly one run should fail with AlreadyOwned; got r1={r1:?}, r2={r2:?}"
        );
    }
2702
2703 #[test]
2705 fn resume_acquires_before_get_steps() {
2706 use crate::persistence_memory::InMemoryWorkflowPersistence;
2707 use crate::traits::persistence::WorkflowPersistence;
2708
2709 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2710 let run = make_test_run(&persistence);
2711
2712 persistence
2714 .acquire_lease(&run.id, "other-engine-token", 3600)
2715 .unwrap();
2716
2717 let engine = FlowEngineBuilder::new()
2718 .action(Box::new(AlphaExecutor))
2719 .build()
2720 .unwrap();
2721 let def = make_def("wf", vec![call_node("alpha")]);
2722
2723 let mut state = make_bare_state("wf");
2724 state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
2725 state.workflow_run_id = run.id.clone();
2726
2727 let err = engine.resume(&def, &mut state).unwrap_err();
2728 assert!(
2729 matches!(err, EngineError::AlreadyOwned(_)),
2730 "resume() with a pre-held lease should fail with AlreadyOwned; got {err:?}"
2731 );
2732 }
2733
2734 #[test]
2736 fn single_engine_workflow_still_completes() {
2737 let engine = FlowEngineBuilder::new()
2738 .action(Box::new(AlphaExecutor))
2739 .build()
2740 .unwrap();
2741 let def = make_single_step_def();
2742 let mut state = make_state_with_persistence("wf");
2743 let result = engine.run(&def, &mut state).unwrap();
2744 assert!(
2745 result.all_succeeded,
2746 "single-engine workflow should complete successfully"
2747 );
2748 }
2749
    #[test]
    fn refresh_db_error_causes_lease_lost_abort() {
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::action_executor::{ActionOutput, ActionParams};
        use crate::traits::persistence::WorkflowPersistence;
        use std::sync::atomic::Ordering;
        use std::thread;
        use std::time::Duration;

        // Executor that announces when it has started and then spins until the
        // shared shutdown flag flips — keeping the step alive long enough for
        // the background lease refresh to hit the injected DB failure.
        struct BlockingExecutor {
            started: Arc<AtomicBool>,
            shutdown: Arc<AtomicBool>,
        }
        impl ActionExecutor for BlockingExecutor {
            fn name(&self) -> &str {
                "alpha"
            }
            fn execute(
                &self,
                _ctx: &dyn crate::traits::run_context::RunContext,
                _info: &crate::traits::action_executor::StepInfo,
                _: &ActionParams,
            ) -> Result<ActionOutput, EngineError> {
                self.started.store(true, Ordering::SeqCst);
                loop {
                    // Returns cleanly once shutdown is requested.
                    if self.shutdown.load(Ordering::Relaxed) {
                        return Ok(ActionOutput::default());
                    }
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);

        let started = Arc::new(AtomicBool::new(false));
        let started_clone = Arc::clone(&started);
        let persistence_clone = Arc::clone(&persistence);

        // Watcher thread: once the step is running, make every subsequent
        // acquire_lease (i.e. the periodic refresh) fail at the DB layer.
        let watcher = thread::spawn(move || {
            while !started_clone.load(Ordering::Relaxed) {
                std::thread::sleep(Duration::from_millis(1));
            }
            persistence_clone.set_fail_acquire_lease(true);
        });

        // Shared between the executor and exec_config.shutdown so the engine
        // can unblock the spinning step when the lease is lost.
        let shared_shutdown = Arc::new(AtomicBool::new(false));

        let mut m = HashMap::new();
        m.insert(
            "alpha".to_string(),
            Box::new(BlockingExecutor {
                started: Arc::clone(&started),
                shutdown: Arc::clone(&shared_shutdown),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        let mut state = make_bare_state("wf");
        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run.id.clone();
        state.exec_config.shutdown = Some(Arc::clone(&shared_shutdown));
        // Short interval so the refresh loop observes the failure quickly.
        state.exec_config.lease_refresh_interval = Duration::from_millis(15);

        let engine = FlowEngineBuilder::new().build().unwrap();
        let def = make_def("wf", vec![call_node("alpha")]);

        let result = engine.run(&def, &mut state);
        watcher.join().unwrap();

        assert!(
            matches!(
                result,
                Err(EngineError::Cancelled(CancellationReason::LeaseLost))
            ),
            "DB error in refresh should abort with LeaseLost; got {result:?}"
        );
    }
2841
    #[test]
    fn stale_generation_on_step_write_aborts_with_lease_lost() {
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::action_executor::{ActionOutput, ActionParams};
        use crate::traits::persistence::WorkflowPersistence;
        use std::sync::atomic::Ordering;
        use std::thread;
        use std::time::Duration;

        // Executor that announces when it has started, then waits for the
        // `proceed` latch. This holds the step open while another "process"
        // steals the lease, so the step-completion write happens with a stale
        // lease generation.
        struct LatchedExecutor {
            started: Arc<AtomicBool>,
            proceed: Arc<AtomicBool>,
        }
        impl ActionExecutor for LatchedExecutor {
            fn name(&self) -> &str {
                "alpha"
            }
            fn execute(
                &self,
                _ctx: &dyn crate::traits::run_context::RunContext,
                _info: &crate::traits::action_executor::StepInfo,
                _: &ActionParams,
            ) -> Result<ActionOutput, EngineError> {
                self.started.store(true, Ordering::SeqCst);
                while !self.proceed.load(Ordering::SeqCst) {
                    std::thread::sleep(Duration::from_millis(1));
                }
                Ok(ActionOutput::default())
            }
        }

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);
        let run_id = run.id.clone();

        let started = Arc::new(AtomicBool::new(false));
        let proceed = Arc::new(AtomicBool::new(false));
        let started_clone = Arc::clone(&started);
        let proceed_clone = Arc::clone(&proceed);
        let persistence_clone = Arc::clone(&persistence);

        // Stealer thread: once the step is running, expire the current lease,
        // hand it to a different owner token, then release the latch so the
        // original engine attempts its (now stale) step write.
        let stealer = thread::spawn(move || {
            while !started_clone.load(Ordering::SeqCst) {
                std::thread::sleep(Duration::from_millis(1));
            }
            persistence_clone.expire_and_steal_lease(&run_id, "thief-token");
            proceed_clone.store(true, Ordering::SeqCst);
        });

        let mut m = HashMap::new();
        m.insert(
            "alpha".to_string(),
            Box::new(LatchedExecutor {
                started: Arc::clone(&started),
                proceed: Arc::clone(&proceed),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );

        let mut state = make_bare_state("wf");
        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run.id.clone();
        // Short refresh interval so lease activity happens within the test.
        state.exec_config.lease_refresh_interval = Duration::from_millis(15);

        let engine = FlowEngineBuilder::new().build().unwrap();
        let def = make_def("wf", vec![call_node("alpha")]);

        let result = engine.run(&def, &mut state);
        stealer.join().unwrap();

        assert!(
            matches!(
                result,
                Err(EngineError::Cancelled(CancellationReason::LeaseLost))
            ),
            "stale generation on step write should abort with LeaseLost; got {result:?}"
        );
    }
2929
2930 #[test]
2932 fn cross_process_cancel_via_db_poll() {
2933 use crate::persistence_memory::InMemoryWorkflowPersistence;
2934 use crate::status::WorkflowRunStatus;
2935 use crate::traits::persistence::WorkflowPersistence;
2936
2937 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2938 let run = make_test_run(&persistence);
2939
2940 persistence
2942 .update_run_status(&run.id, WorkflowRunStatus::Cancelling, None, None)
2943 .unwrap();
2944
2945 assert!(
2947 persistence.is_run_cancelled(&run.id).unwrap(),
2948 "is_run_cancelled should return true when status is Cancelling"
2949 );
2950 }
2951
2952 fn make_alpha_registry() -> Arc<ActionRegistry> {
2957 let mut m = HashMap::new();
2958 m.insert(
2959 "alpha".to_string(),
2960 Box::new(AlphaExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
2961 );
2962 Arc::new(ActionRegistry::new(m, None))
2963 }
2964
2965 fn make_run_workflow_input(
2966 persistence: Arc<dyn crate::traits::persistence::WorkflowPersistence>,
2967 run_id: String,
2968 ) -> RunInput {
2969 use crate::traits::run_context::NoopRunContext;
2970 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
2971 RunInput::new(
2972 persistence,
2973 run_id,
2974 "wf".to_string(),
2975 make_alpha_registry(),
2976 Arc::new(ItemProviderRegistry::new()),
2977 Arc::new(NoOpScriptEnvProvider),
2978 Arc::new(NoopRunContext::default()),
2979 CancellationToken::new(),
2980 )
2981 }
2982
2983 #[test]
2984 fn run_workflow_acquires_lease_and_runs_to_completion() {
2985 use crate::persistence_memory::InMemoryWorkflowPersistence;
2986 use crate::status::WorkflowRunStatus;
2987 use crate::traits::persistence::WorkflowPersistence;
2988
2989 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2990 let run = make_test_run(&persistence);
2991
2992 let engine = FlowEngineBuilder::new()
2993 .action(Box::new(AlphaExecutor))
2994 .build()
2995 .unwrap();
2996 let def = make_def("wf", vec![call_node("alpha")]);
2997
2998 let result = engine
2999 .run_workflow(
3000 &def,
3001 make_run_workflow_input(
3002 Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3003 run.id.clone(),
3004 ),
3005 )
3006 .unwrap();
3007
3008 assert!(result.all_succeeded, "run_workflow should succeed");
3009 let row = persistence.get_run(&run.id).unwrap().unwrap();
3010 assert_eq!(
3011 row.status,
3012 WorkflowRunStatus::Completed,
3013 "run should be Completed in persistence"
3014 );
3015 }
3016
3017 #[test]
3018 fn run_workflow_does_not_inherit_lease_generation_some_zero() {
3019 use crate::persistence_memory::InMemoryWorkflowPersistence;
3020 use crate::traits::persistence::WorkflowPersistence;
3021
3022 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3023 let run1 = make_test_run(&persistence);
3024 let run2 = make_test_run(&persistence);
3025
3026 let engine = FlowEngineBuilder::new()
3027 .action(Box::new(AlphaExecutor))
3028 .build()
3029 .unwrap();
3030 let def = make_def("wf", vec![call_node("alpha")]);
3031
3032 engine
3035 .run_workflow(
3036 &def,
3037 make_run_workflow_input(
3038 Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3039 run1.id,
3040 ),
3041 )
3042 .unwrap();
3043 engine
3044 .run_workflow(
3045 &def,
3046 make_run_workflow_input(
3047 Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3048 run2.id,
3049 ),
3050 )
3051 .unwrap();
3052 }
3053
3054 #[test]
3055 fn run_child_passes_parent_ctx_inputs_through_when_no_override() {
3056 use crate::engine::ChildWorkflowContext;
3057 use crate::persistence_memory::InMemoryWorkflowPersistence;
3058 use crate::traits::persistence::WorkflowPersistence;
3059 use crate::traits::run_context::NoopRunContext;
3060 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3061
3062 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3063 let run = make_test_run(&persistence);
3064
3065 let engine = FlowEngineBuilder::new()
3066 .action(Box::new(AlphaExecutor))
3067 .build()
3068 .unwrap();
3069 let def = make_def("wf", vec![call_node("alpha")]);
3070
3071 let mut parent_inputs = HashMap::new();
3072 parent_inputs.insert("key".to_string(), "parent_value".to_string());
3073
3074 let parent_ctx = ChildWorkflowContext {
3075 run_ctx: Arc::new(NoopRunContext::default()),
3076 extra_plugin_dirs: vec![],
3077 workflow_run_id: "parent-run-id".to_string(),
3078 model: None,
3079 exec_config: crate::types::WorkflowExecConfig::default(),
3080 inputs: parent_inputs,
3081 event_sinks: Arc::from(vec![]),
3082 };
3083
3084 let result = engine
3085 .run_child(
3086 &def,
3087 ChildRunInput {
3088 workflow_run_id: run.id.clone(),
3089 persistence: Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3090 action_registry: make_alpha_registry(),
3091 item_provider_registry: Arc::new(ItemProviderRegistry::new()),
3092 script_env_provider: Arc::new(NoOpScriptEnvProvider),
3093 child_runner: None,
3094 schema_resolver: None,
3095 as_identity: None,
3096 depth: 1,
3097 cancellation: CancellationToken::new(),
3098 target_label: None,
3099 triggered_by_hook: false,
3100 inputs_override: None, },
3102 &parent_ctx,
3103 )
3104 .unwrap();
3105
3106 assert!(
3107 result.all_succeeded,
3108 "run_child with no inputs_override should succeed"
3109 );
3110 }
3111
3112 #[test]
3113 fn run_child_inputs_override_replaces_parent_ctx_inputs() {
3114 use crate::engine::ChildWorkflowContext;
3115 use crate::persistence_memory::InMemoryWorkflowPersistence;
3116 use crate::traits::persistence::WorkflowPersistence;
3117 use crate::traits::run_context::NoopRunContext;
3118 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3119
3120 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3121 let run = make_test_run(&persistence);
3122
3123 let engine = FlowEngineBuilder::new()
3124 .action(Box::new(AlphaExecutor))
3125 .build()
3126 .unwrap();
3127 let def = make_def("wf", vec![call_node("alpha")]);
3128
3129 let mut parent_inputs = HashMap::new();
3130 parent_inputs.insert("key".to_string(), "parent_value".to_string());
3131
3132 let parent_ctx = ChildWorkflowContext {
3133 run_ctx: Arc::new(NoopRunContext::default()),
3134 extra_plugin_dirs: vec![],
3135 workflow_run_id: "parent-run-id".to_string(),
3136 model: None,
3137 exec_config: crate::types::WorkflowExecConfig::default(),
3138 inputs: parent_inputs,
3139 event_sinks: Arc::from(vec![]),
3140 };
3141
3142 let mut override_inputs = HashMap::new();
3143 override_inputs.insert("key".to_string(), "override_value".to_string());
3144
3145 let result = engine
3146 .run_child(
3147 &def,
3148 ChildRunInput {
3149 workflow_run_id: run.id.clone(),
3150 persistence: Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3151 action_registry: make_alpha_registry(),
3152 item_provider_registry: Arc::new(ItemProviderRegistry::new()),
3153 script_env_provider: Arc::new(NoOpScriptEnvProvider),
3154 child_runner: None,
3155 schema_resolver: None,
3156 as_identity: None,
3157 depth: 1,
3158 cancellation: CancellationToken::new(),
3159 target_label: None,
3160 triggered_by_hook: false,
3161 inputs_override: Some(override_inputs), },
3163 &parent_ctx,
3164 )
3165 .unwrap();
3166
3167 assert!(
3168 result.all_succeeded,
3169 "run_child with inputs_override should succeed"
3170 );
3171 }
3172
3173 #[test]
3174 fn child_run_input_new_sets_required_fields_and_zeros_optional() {
3175 use crate::cancellation::CancellationToken;
3176 use crate::persistence_memory::InMemoryWorkflowPersistence;
3177 use crate::traits::action_executor::ActionRegistry;
3178 use crate::traits::item_provider::ItemProviderRegistry;
3179 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3180
3181 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3182 let action_registry = Arc::new(ActionRegistry::new(HashMap::new(), None));
3183 let item_provider_registry = Arc::new(ItemProviderRegistry::new());
3184 let script_env_provider = Arc::new(NoOpScriptEnvProvider);
3185 let cancellation = CancellationToken::new();
3186
3187 let input = ChildRunInput::new(
3188 "run-child-1".to_string(),
3189 Arc::clone(&persistence) as Arc<dyn crate::traits::persistence::WorkflowPersistence>,
3190 Arc::clone(&action_registry),
3191 Arc::clone(&item_provider_registry),
3192 Arc::clone(&script_env_provider)
3193 as Arc<dyn crate::traits::script_env_provider::ScriptEnvProvider>,
3194 2,
3195 cancellation,
3196 );
3197
3198 assert_eq!(input.workflow_run_id, "run-child-1");
3199 assert_eq!(input.depth, 2);
3200 assert!(input.child_runner.is_none());
3201 assert!(input.schema_resolver.is_none());
3202 assert!(input.as_identity.is_none());
3203 assert!(input.target_label.is_none());
3204 assert!(!input.triggered_by_hook);
3205 assert!(input.inputs_override.is_none());
3206 }
3207
3208 #[test]
3209 fn run_input_new_sets_required_fields_and_zeros_optional() {
3210 use crate::cancellation::CancellationToken;
3211 use crate::persistence_memory::InMemoryWorkflowPersistence;
3212 use crate::traits::action_executor::ActionRegistry;
3213 use crate::traits::item_provider::ItemProviderRegistry;
3214 use crate::traits::run_context::NoopRunContext;
3215 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3216
3217 let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3218 let action_registry = Arc::new(ActionRegistry::new(HashMap::new(), None));
3219 let item_provider_registry = Arc::new(ItemProviderRegistry::new());
3220 let script_env_provider = Arc::new(NoOpScriptEnvProvider);
3221 let run_ctx = Arc::new(NoopRunContext::default());
3222 let cancellation = CancellationToken::new();
3223
3224 let input = RunInput::new(
3225 Arc::clone(&persistence) as Arc<dyn crate::traits::persistence::WorkflowPersistence>,
3226 "run-top-1".to_string(),
3227 "my-workflow".to_string(),
3228 Arc::clone(&action_registry),
3229 Arc::clone(&item_provider_registry),
3230 Arc::clone(&script_env_provider)
3231 as Arc<dyn crate::traits::script_env_provider::ScriptEnvProvider>,
3232 run_ctx as Arc<dyn crate::traits::run_context::RunContext>,
3233 cancellation,
3234 );
3235
3236 assert_eq!(input.workflow_run_id, "run-top-1");
3237 assert_eq!(input.workflow_name, "my-workflow");
3238 assert!(input.extra_plugin_dirs.is_empty());
3239 assert!(input.model.is_none());
3240 assert!(input.inputs.is_empty());
3241 assert_eq!(input.parent_run_id, "");
3242 assert_eq!(input.depth, 0);
3243 assert!(input.target_label.is_none());
3244 assert!(input.default_as_identity.is_none());
3245 assert!(!input.triggered_by_hook);
3246 assert!(input.schema_resolver.is_none());
3247 assert!(input.child_runner.is_none());
3248 assert!(input.event_sinks.is_empty());
3249 }
3250}