dataflow_rs/engine/workflow_executor.rs
1//! # Workflow Execution Module
2//!
3//! This module handles the execution of workflows and their associated tasks.
4//! It provides a clean separation between workflow orchestration and task execution.
5
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::{
8 ArenaContext, evaluate_condition, evaluate_condition_in_arena, with_arena,
9};
10use crate::engine::functions::BoxedFunctionHandler;
11use crate::engine::message::{AuditTrail, Change, Message};
12use crate::engine::task::Task;
13use crate::engine::task_executor::TaskExecutor;
14use crate::engine::task_outcome::TaskOutcome;
15use crate::engine::trace::{ExecutionStep, ExecutionTrace};
16use crate::engine::utils::set_nested_value;
17use crate::engine::workflow::Workflow;
18use chrono::{DateTime, Utc};
19use datalogic_rs::Engine;
20use datavalue::OwnedDataValue;
21use log::{debug, error, info, warn};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// Control-flow signal produced once a task's result has been handled.
///
/// The common value-type traits are derived so the signal can be logged with
/// `{:?}` and compared with `==` in addition to being pattern-matched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskControlFlow {
    /// Continue executing the next task
    Continue,
    /// Stop executing further tasks in this workflow (filter halt)
    HaltWorkflow,
}
33
34/// Return the index of the first task at or after `start` that is *not* a
35/// synchronous built-in. Used to chunk `workflow.tasks` into sync-only
36/// stretches that can share a single `ArenaContext`.
37fn next_async_boundary(tasks: &[Task], start: usize) -> usize {
38 let mut i = start;
39 while i < tasks.len() && tasks[i].function.is_sync_builtin() {
40 i += 1;
41 }
42 i
43}
44
/// Handles the execution of workflows and their tasks
///
/// The `WorkflowExecutor` is responsible for:
/// - Evaluating workflow conditions
/// - Orchestrating task execution within workflows
/// - Managing workflow-level error handling
/// - Recording audit trails
///
/// Both fields are `Arc` handles, so constructing an executor does not copy
/// the task executor or the datalogic engine themselves.
pub struct WorkflowExecutor {
    /// Task executor for executing individual tasks. In this module it is
    /// awaited for tasks that are not sync built-ins, and it is the source of
    /// the map returned by `task_functions()`.
    task_executor: Arc<TaskExecutor>,
    /// Shared datalogic engine for condition evaluation (workflow-level and
    /// task-level); it is also handed to sync built-in tasks when they run
    /// inside an arena.
    engine: Arc<Engine>,
}
58
59impl WorkflowExecutor {
60 /// Create a new WorkflowExecutor
61 pub fn new(task_executor: Arc<TaskExecutor>, engine: Arc<Engine>) -> Self {
62 Self {
63 task_executor,
64 engine,
65 }
66 }
67
68 /// Get a clone of the task_functions Arc for reuse in new engines
69 pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
70 self.task_executor.task_functions()
71 }
72
73 /// Execute a workflow if its condition is met
74 ///
75 /// This method:
76 /// 1. Evaluates the workflow condition
77 /// 2. Executes tasks sequentially if condition is met
78 /// 3. Handles error recovery based on workflow configuration
79 /// 4. Updates message metadata and audit trail
80 ///
81 /// # Arguments
82 /// * `workflow` - The workflow to execute
83 /// * `message` - The message being processed
84 ///
85 /// # Returns
86 /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
87 pub async fn execute(
88 &self,
89 workflow: &Workflow,
90 message: &mut Message,
91 now: DateTime<Utc>,
92 ) -> Result<bool> {
93 self.execute_inner(workflow, message, None, now).await
94 }
95
96 /// Execute a workflow with step-by-step tracing
97 ///
98 /// Similar to `execute` but records execution steps for debugging.
99 pub async fn execute_with_trace(
100 &self,
101 workflow: &Workflow,
102 message: &mut Message,
103 trace: &mut ExecutionTrace,
104 now: DateTime<Utc>,
105 ) -> Result<bool> {
106 self.execute_inner(workflow, message, Some(trace), now)
107 .await
108 }
109
110 /// Unified workflow-condition + task-loop driver. `trace` is `None` for
111 /// the production path and `Some(&mut trace)` for the debug path —
112 /// stepping is the only behavioural difference between them.
113 async fn execute_inner(
114 &self,
115 workflow: &Workflow,
116 message: &mut Message,
117 mut trace: Option<&mut ExecutionTrace>,
118 now: DateTime<Utc>,
119 ) -> Result<bool> {
120 // Evaluate workflow condition directly against the OwnedDataValue context
121 let should_execute = evaluate_condition(
122 &self.engine,
123 workflow.compiled_condition.as_ref(),
124 &message.context,
125 )?;
126
127 if !should_execute {
128 debug!("Skipping workflow {} - condition not met", workflow.id);
129 if let Some(t) = trace.as_deref_mut() {
130 t.add_step(ExecutionStep::workflow_skipped(&workflow.id));
131 }
132 return Ok(false);
133 }
134
135 // Execute workflow tasks (trace recording happens inside the loop)
136 match self.execute_tasks(workflow, message, trace, now).await {
137 Ok(_) => {
138 info!("Successfully completed workflow: {}", workflow.id);
139 Ok(true)
140 }
141 Err(e) => {
142 // Single-channel contract: every error appears in
143 // `message.errors`. The `Result::Err` return only signals to
144 // the caller that we stopped before processing further
145 // workflows. The workflow-level wrapper records workflow
146 // context that the underlying task error doesn't carry.
147 message.errors.push(
148 ErrorInfo::builder(
149 "WORKFLOW_ERROR",
150 format!("Workflow {} error: {}", workflow.id, e),
151 )
152 .workflow_id(&workflow.id)
153 .build(),
154 );
155
156 if workflow.continue_on_error {
157 warn!(
158 "Workflow {} encountered error but continuing: {:?}",
159 workflow.id, e
160 );
161 Ok(true)
162 } else {
163 error!("Workflow {} failed: {:?}", workflow.id, e);
164 Err(e)
165 }
166 }
167 }
168 }
169
170 /// Execute all tasks in a workflow.
171 ///
172 /// Groups consecutive synchronous built-in tasks into a single
173 /// `with_arena` scope so the arena form of `message.context` is built
174 /// once at the start of the stretch and reused across `parse_json`,
175 /// `map`, `validation`, `log`, and `filter`. Async tasks (HTTP, Kafka,
176 /// custom handlers) break the stretch — the arena flushes any pending
177 /// state back to `OwnedDataValue` automatically (since each sync task
178 /// already mutates `message.context` in place) and the next stretch
179 /// rebuilds the arena form.
180 ///
181 /// When `trace` is `Some`, the loop also records `ExecutionStep` entries
182 /// after each task (skipped/executed) including per-mapping snapshots
183 /// for `Map` tasks.
184 async fn execute_tasks(
185 &self,
186 workflow: &Workflow,
187 message: &mut Message,
188 mut trace: Option<&mut ExecutionTrace>,
189 now: DateTime<Utc>,
190 ) -> Result<()> {
191 let tasks = &workflow.tasks;
192 let mut idx = 0;
193 while idx < tasks.len() {
194 let stretch_end = next_async_boundary(tasks, idx);
195
196 if stretch_end > idx {
197 // Run [idx, stretch_end) as a sync stretch inside one arena.
198 let halt = self.run_sync_stretch(
199 &tasks[idx..stretch_end],
200 workflow,
201 message,
202 trace.as_deref_mut(),
203 now,
204 )?;
205 if halt {
206 return Ok(());
207 }
208 idx = stretch_end;
209 }
210
211 if idx < tasks.len() {
212 // Single async task (or non-sync-builtin) at `idx`.
213 let task = &tasks[idx];
214 let should_execute = evaluate_condition(
215 &self.engine,
216 task.compiled_condition.as_ref(),
217 &message.context,
218 )?;
219
220 if !should_execute {
221 debug!("Skipping task {} - condition not met", task.id);
222 if let Some(t) = trace.as_deref_mut() {
223 t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
224 }
225 idx += 1;
226 continue;
227 }
228
229 let result = self.task_executor.execute(task, message).await;
230 let control_flow = self.handle_task_result(
231 result,
232 &workflow.id_arc,
233 &task.id_arc,
234 task.continue_on_error,
235 message,
236 now,
237 )?;
238
239 // Async tasks at the boundary have no per-mapping snapshots —
240 // they're either HTTP/Kafka/Enrich or a custom handler.
241 if let Some(t) = trace.as_deref_mut() {
242 t.add_step(ExecutionStep::executed(&workflow.id, &task.id, message));
243 }
244
245 if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
246 return Ok(());
247 }
248 idx += 1;
249 }
250 }
251
252 Ok(())
253 }
254
255 /// Execute a contiguous run of sync-builtin tasks inside one
256 /// `with_arena` scope. The arena context is built once at the start and
257 /// refreshed in place after each mutating task. Returns `Ok(true)` if a
258 /// filter task halted the workflow.
259 ///
260 /// This is the single-workflow entry; the cross-workflow path
261 /// (`execute_sync_workflow_run`) shares the same task loop via
262 /// `run_tasks_slice_in_arena` but carries one `ArenaContext` across several
263 /// workflows.
264 fn run_sync_stretch(
265 &self,
266 tasks: &[Task],
267 workflow: &Workflow,
268 message: &mut Message,
269 trace: Option<&mut ExecutionTrace>,
270 now: DateTime<Utc>,
271 ) -> Result<bool> {
272 with_arena(|arena| -> Result<bool> {
273 let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
274 self.run_tasks_slice_in_arena(tasks, workflow, message, &mut arena_ctx, trace, now)
275 })
276 }
277
278 /// Run `tasks` against an already-built `ArenaContext`, evaluating each
279 /// task's condition in-arena and refreshing the cache after each mutating
280 /// task. Returns `Ok(true)` if a filter task halted the workflow.
281 ///
282 /// Factored out of `run_sync_stretch` so both the single-workflow stretch
283 /// and the cross-workflow shared-arena run (`execute_sync_workflow_run`)
284 /// share one implementation. The caller owns the `ArenaContext` lifetime,
285 /// so the cross-workflow path can reuse the same arena form of
286 /// `message.context` across consecutive workflows instead of rebuilding it.
287 fn run_tasks_slice_in_arena(
288 &self,
289 tasks: &[Task],
290 workflow: &Workflow,
291 message: &mut Message,
292 arena_ctx: &mut ArenaContext<'_>,
293 mut trace: Option<&mut ExecutionTrace>,
294 now: DateTime<Utc>,
295 ) -> Result<bool> {
296 let arena = arena_ctx.arena();
297
298 for task in tasks {
299 // Task condition — evaluate against the arena form so we don't
300 // re-borrow the thread-local `RefCell`. A `None` compiled
301 // condition (compiler folds the default literal `true` to
302 // `None`) skips both the eval and the per-task arena context
303 // slice build.
304 let should_execute = match task.compiled_condition.as_ref() {
305 None => true,
306 Some(compiled) => evaluate_condition_in_arena(
307 &self.engine,
308 Some(compiled),
309 arena_ctx.as_data_value(),
310 arena,
311 )?,
312 };
313
314 if !should_execute {
315 debug!("Skipping task {} - condition not met", task.id);
316 if let Some(t) = trace.as_deref_mut() {
317 t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
318 }
319 continue;
320 }
321
322 // Per-task snapshot buffer — only used for Map tasks in trace
323 // mode. Allocating an empty Vec is cheap and the buffer stays
324 // empty for non-Map tasks.
325 let mut mapping_snapshots: Vec<Value> = Vec::new();
326 let snapshot_buf = if trace.is_some() {
327 Some(&mut mapping_snapshots)
328 } else {
329 None
330 };
331 let result = self.execute_sync_task_in_arena(task, message, arena_ctx, snapshot_buf);
332
333 let control_flow = self.handle_task_result(
334 result,
335 &workflow.id_arc,
336 &task.id_arc,
337 task.continue_on_error,
338 message,
339 now,
340 )?;
341
342 // The audit-trail / progress-metadata writes performed by
343 // `handle_task_result` mutate `message.context`. Refresh the
344 // arena cache so the next task — and, in the cross-workflow path,
345 // the next workflow's condition — sees them.
346 arena_ctx.refresh_for_path(&message.context, "metadata");
347
348 if let Some(t) = trace.as_deref_mut() {
349 let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
350 if !mapping_snapshots.is_empty() {
351 step = step.with_mapping_contexts(mapping_snapshots);
352 }
353 t.add_step(step);
354 }
355
356 if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
357 return Ok(true);
358 }
359 }
360 Ok(false)
361 }
362
363 /// Drive a message through `workflows` in order, grouping maximal runs of
364 /// consecutive `fully_sync` workflows into a single shared-arena scope
365 /// (`execute_sync_workflow_run`) and falling back to the per-workflow
366 /// `.await` path (`execute_inner`) for any workflow containing an async
367 /// task. This is the single orchestration entry for all four
368 /// `Engine::process_message*` variants.
369 pub async fn run_all(
370 &self,
371 workflows: &[&Workflow],
372 message: &mut Message,
373 mut trace: Option<&mut ExecutionTrace>,
374 now: DateTime<Utc>,
375 ) -> Result<()> {
376 let mut i = 0;
377 while i < workflows.len() {
378 if workflows[i].fully_sync {
379 // Extend over the maximal run of consecutive fully-sync
380 // workflows and execute them in one shared arena scope.
381 let mut j = i + 1;
382 while j < workflows.len() && workflows[j].fully_sync {
383 j += 1;
384 }
385 self.execute_sync_workflow_run(
386 &workflows[i..j],
387 message,
388 trace.as_deref_mut(),
389 now,
390 )?;
391 i = j;
392 } else {
393 // Mixed sync+async (or fully-async) workflow: the existing
394 // driver interleaves per-stretch arenas with `.await`.
395 self.execute_inner(workflows[i], message, trace.as_deref_mut(), now)
396 .await?;
397 i += 1;
398 }
399 }
400 Ok(())
401 }
402
403 /// Execute a maximal run of consecutive fully-sync workflows inside ONE
404 /// shared `with_arena` scope. The message context is deep-walked into the
405 /// arena once for the whole run, then carried — with the existing
406 /// incremental `refresh_for_path` after each mutating task — across
407 /// workflow boundaries, instead of being rebuilt per workflow.
408 ///
409 /// Per-workflow semantics are preserved exactly: each workflow's condition
410 /// is evaluated (in-arena), a false condition skips only that workflow, a
411 /// filter-halt stops only that workflow, and task errors are wrapped with
412 /// the workflow id and honor `continue_on_error` (continue, or propagate
413 /// `Err` out of the run to stop the whole message) — mirroring
414 /// `execute_inner`.
415 ///
416 /// **Tokio safety:** this method is synchronous and the `fully_sync`
417 /// precondition guarantees every task is a sync built-in, so no `.await`
418 /// occurs while the `!Send` arena borrow is live. The borrow checker
419 /// enforces this — the shared `ArenaContext` cannot escape the closure.
420 fn execute_sync_workflow_run(
421 &self,
422 workflows: &[&Workflow],
423 message: &mut Message,
424 mut trace: Option<&mut ExecutionTrace>,
425 now: DateTime<Utc>,
426 ) -> Result<()> {
427 with_arena(|arena| -> Result<()> {
428 let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
429
430 for &workflow in workflows {
431 // Workflow condition in-arena: a folded `None` skips the eval;
432 // a real condition reuses the carried context instead of the
433 // owned-path `eval_to_owned` deep-walk.
434 let should_execute = match workflow.compiled_condition.as_ref() {
435 None => true,
436 Some(compiled) => evaluate_condition_in_arena(
437 &self.engine,
438 Some(compiled),
439 arena_ctx.as_data_value(),
440 arena,
441 )?,
442 };
443
444 if !should_execute {
445 debug!("Skipping workflow {} - condition not met", workflow.id);
446 if let Some(t) = trace.as_deref_mut() {
447 t.add_step(ExecutionStep::workflow_skipped(&workflow.id));
448 }
449 continue;
450 }
451
452 match self.run_tasks_slice_in_arena(
453 &workflow.tasks,
454 workflow,
455 message,
456 &mut arena_ctx,
457 trace.as_deref_mut(),
458 now,
459 ) {
460 // Filter-halt stops only this workflow; carry on with the
461 // next one (and keep the shared arena context).
462 Ok(_halted) => {
463 info!("Successfully completed workflow: {}", workflow.id);
464 }
465 Err(e) => {
466 // Single-channel contract — mirror `execute_inner`:
467 // record the workflow-context error to `message.errors`
468 // and honor `continue_on_error`.
469 message.errors.push(
470 ErrorInfo::builder(
471 "WORKFLOW_ERROR",
472 format!("Workflow {} error: {}", workflow.id, e),
473 )
474 .workflow_id(&workflow.id)
475 .build(),
476 );
477
478 if workflow.continue_on_error {
479 warn!(
480 "Workflow {} encountered error but continuing: {:?}",
481 workflow.id, e
482 );
483 } else {
484 error!("Workflow {} failed: {:?}", workflow.id, e);
485 return Err(e);
486 }
487 }
488 }
489 }
490 Ok(())
491 })
492 }
493
494 /// Dispatch a single sync-builtin task via the consolidated
495 /// `FunctionConfig::try_execute_in_arena`. `next_async_boundary` guarantees
496 /// the stretch contents are sync built-ins, so the `None` arm is
497 /// unreachable in practice.
498 ///
499 /// `map_snapshot_buf` is only consulted by the `Map` variant; non-Map
500 /// sync builtins ignore it. Pass `None` from the production path.
501 fn execute_sync_task_in_arena(
502 &self,
503 task: &Task,
504 message: &mut Message,
505 arena_ctx: &mut ArenaContext<'_>,
506 map_snapshot_buf: Option<&mut Vec<Value>>,
507 ) -> Result<(TaskOutcome, Vec<Change>)> {
508 debug!(
509 "Executing sync task in arena: {} ({})",
510 task.id,
511 task.function.function_name()
512 );
513 debug_assert!(
514 task.function.is_sync_builtin(),
515 "execute_sync_task_in_arena called with non-sync-builtin task: {}",
516 task.function.function_name()
517 );
518 // In debug builds the assert above catches mis-dispatch; in release
519 // we still surface the invariant violation as a recoverable engine
520 // error rather than panicking via `unreachable!`.
521 task.function
522 .try_execute_in_arena(message, arena_ctx, &self.engine, map_snapshot_buf)
523 .ok_or_else(|| {
524 DataflowError::Task(format!(
525 "execute_sync_task_in_arena dispatched to non-sync-builtin task '{}' \
526 (engine bug — sync-stretch should only contain sync-builtin tasks)",
527 task.function.function_name()
528 ))
529 })?
530 }
531
532 /// Handle the result of a task execution.
533 ///
534 /// `workflow_id_arc` and `task_id_arc` are the compile-time cached
535 /// `Arc<str>` mirrors of `workflow.id` / `task.id`; we Arc-clone them into
536 /// each `AuditTrail` rather than reallocating from the `&str` form.
537 fn handle_task_result(
538 &self,
539 result: Result<(TaskOutcome, Vec<Change>)>,
540 workflow_id_arc: &Arc<str>,
541 task_id_arc: &Arc<str>,
542 continue_on_error: bool,
543 message: &mut Message,
544 now: DateTime<Utc>,
545 ) -> Result<TaskControlFlow> {
546 let workflow_id: &str = workflow_id_arc;
547 let task_id: &str = task_id_arc;
548 match result {
549 Ok((TaskOutcome::Skip, _)) => {
550 // No audit trail, no progress write — task has explicitly opted
551 // out (filter gate set to `Skip`).
552 debug!("Task {} signaled skip", task_id);
553 Ok(TaskControlFlow::Continue)
554 }
555 Ok((outcome, changes)) => {
556 // `Skip` already returned above; the remaining variants all
557 // record an audit entry. `audit_status()` is `Some` for
558 // Success/Status/Halt — expect is for documentation only.
559 let status = outcome
560 .audit_status()
561 .expect("Skip handled above; remaining variants emit audit status");
562 let halt = outcome.halts_workflow();
563
564 // Record audit trail. workflow_id_arc/task_id_arc are populated
565 // by LogicCompiler at engine construction; cloning them is a
566 // refcount bump, not a string copy. `now` is shared with all
567 // other AuditTrails in this process_message call.
568 message.audit_trail.push(AuditTrail {
569 timestamp: now,
570 workflow_id: Arc::clone(workflow_id_arc),
571 task_id: Arc::clone(task_id_arc),
572 status: status as usize,
573 changes,
574 });
575
576 // Update progress metadata for workflow chaining. Always
577 // emitted: when multiple workflows are registered in the same
578 // engine, downstream workflows route on
579 // `metadata.progress.{workflow_id,task_id,status_code}` to
580 // advance through linear sequences. One batched write
581 // (single tree walk + single Object alloc) benchmarked ~3%
582 // faster than three separate writes on the realistic
583 // workload — replacing a slot beats find-and-update walks.
584 set_nested_value(
585 &mut message.context,
586 "metadata.progress",
587 OwnedDataValue::Object(vec![
588 (
589 "workflow_id".to_string(),
590 OwnedDataValue::String(workflow_id.to_string()),
591 ),
592 (
593 "task_id".to_string(),
594 OwnedDataValue::String(task_id.to_string()),
595 ),
596 (
597 "status_code".to_string(),
598 OwnedDataValue::from(status as u64),
599 ),
600 ]),
601 );
602
603 if halt {
604 info!("Task {} halted workflow {}", task_id, workflow_id);
605 return Ok(TaskControlFlow::HaltWorkflow);
606 }
607
608 // Check status code
609 if (400..500).contains(&status) {
610 warn!("Task {} returned client error status: {}", task_id, status);
611 } else if status >= 500 {
612 error!("Task {} returned server error status: {}", task_id, status);
613 // Single-channel contract: surface 5xx outcomes through
614 // `message.errors` as well as the audit trail, so callers
615 // that scan `errors()` see a 5xx-status task even when
616 // the workflow continues past it.
617 message.errors.push(
618 ErrorInfo::builder(
619 "TASK_STATUS_ERROR",
620 format!("Task {} returned status {}", task_id, status),
621 )
622 .workflow_id(workflow_id)
623 .task_id(task_id)
624 .build(),
625 );
626 if !continue_on_error {
627 return Err(DataflowError::Task(format!(
628 "Task {} failed with status {}",
629 task_id, status
630 )));
631 }
632 }
633 Ok(TaskControlFlow::Continue)
634 }
635 Err(e) => {
636 error!("Task {} failed: {:?}", task_id, e);
637
638 // Record error in audit trail (Arc clones are refcount bumps).
639 message.audit_trail.push(AuditTrail {
640 timestamp: now,
641 workflow_id: Arc::clone(workflow_id_arc),
642 task_id: Arc::clone(task_id_arc),
643 status: 500,
644 changes: vec![],
645 });
646
647 // Add error to message
648 message.errors.push(
649 ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
650 .workflow_id(workflow_id)
651 .task_id(task_id)
652 .build(),
653 );
654
655 if !continue_on_error {
656 Err(e)
657 } else {
658 Ok(TaskControlFlow::Continue)
659 }
660 }
661 }
662 }
663}
664
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Id shared by every workflow definition used in these tests.
    const WORKFLOW_ID: &str = "test_workflow";

    /// Parse and compile `workflow_json`, then build a `WorkflowExecutor`
    /// (with no custom task functions) around the compiler's engine.
    ///
    /// Panics if the compiled output does not contain the submitted workflow,
    /// so a test can never silently run against an uncompiled definition.
    fn compiled_fixture(workflow_json: &str) -> (WorkflowExecutor, Workflow) {
        let compiler = LogicCompiler::new();
        let parsed = Workflow::from_json(workflow_json).unwrap();

        let compiled = compiler
            .compile_workflows(vec![parsed])
            .unwrap()
            .iter()
            .find(|w| w.id == WORKFLOW_ID)
            .cloned()
            .expect("compiled output should contain the submitted workflow");

        let engine = compiler.into_engine();
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            Arc::clone(&engine),
        ));

        (WorkflowExecutor::new(task_executor, engine), compiled)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // Workflow whose condition is the literal `false`.
        let (workflow_executor, workflow) = compiled_fixture(
            r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#,
        );

        let mut message = Message::from_value(&json!({}));

        // The false condition must skip the workflow: nothing executes and
        // no audit entry is recorded.
        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Workflow whose condition is the literal `true`.
        let (workflow_executor, workflow) = compiled_fixture(
            r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#,
        );

        let mut message = Message::from_value(&json!({}));

        // The workflow has a single `map` task with an empty mapping list,
        // so execution should succeed and report the workflow as executed.
        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();
        assert!(executed);
    }
}