Skip to main content

dataflow_rs/engine/
workflow_executor.rs

1//! # Workflow Execution Module
2//!
3//! This module handles the execution of workflows and their associated tasks.
4//! It provides a clean separation between workflow orchestration and task execution.
5
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::{
8    ArenaContext, evaluate_condition, evaluate_condition_in_arena, with_arena,
9};
10use crate::engine::functions::BoxedFunctionHandler;
11use crate::engine::message::{AuditTrail, Change, Message};
12use crate::engine::task::Task;
13use crate::engine::task_executor::TaskExecutor;
14use crate::engine::task_outcome::TaskOutcome;
15use crate::engine::trace::{ExecutionStep, ExecutionTrace};
16use crate::engine::utils::set_nested_value;
17use crate::engine::workflow::Workflow;
18use chrono::{DateTime, Utc};
19use datalogic_rs::Engine;
20use datavalue::OwnedDataValue;
21use log::{debug, error, info, warn};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// Control-flow signal produced once a task's result has been handled.
///
/// The task loops inspect this value to decide whether to move on to the
/// next task or to stop the current workflow early. The derives make the
/// signal cheap to copy, directly comparable (`==`) and printable in logs
/// and assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskControlFlow {
    /// Proceed with the next task in the workflow.
    Continue,
    /// Stop executing further tasks in this workflow (filter halt).
    HaltWorkflow,
}
33
34/// Return the index of the first task at or after `start` that is *not* a
35/// synchronous built-in. Used to chunk `workflow.tasks` into sync-only
36/// stretches that can share a single `ArenaContext`.
37fn next_async_boundary(tasks: &[Task], start: usize) -> usize {
38    let mut i = start;
39    while i < tasks.len() && tasks[i].function.is_sync_builtin() {
40        i += 1;
41    }
42    i
43}
44
45/// Handles the execution of workflows and their tasks
46///
47/// The `WorkflowExecutor` is responsible for:
48/// - Evaluating workflow conditions
49/// - Orchestrating task execution within workflows
50/// - Managing workflow-level error handling
51/// - Recording audit trails
52pub struct WorkflowExecutor {
53    /// Task executor for executing individual tasks
54    task_executor: Arc<TaskExecutor>,
55    /// Shared datalogic engine for condition evaluation
56    engine: Arc<Engine>,
57}
58
59impl WorkflowExecutor {
60    /// Create a new WorkflowExecutor
61    pub fn new(task_executor: Arc<TaskExecutor>, engine: Arc<Engine>) -> Self {
62        Self {
63            task_executor,
64            engine,
65        }
66    }
67
68    /// Get a clone of the task_functions Arc for reuse in new engines
69    pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
70        self.task_executor.task_functions()
71    }
72
73    /// Execute a workflow if its condition is met
74    ///
75    /// This method:
76    /// 1. Evaluates the workflow condition
77    /// 2. Executes tasks sequentially if condition is met
78    /// 3. Handles error recovery based on workflow configuration
79    /// 4. Updates message metadata and audit trail
80    ///
81    /// # Arguments
82    /// * `workflow` - The workflow to execute
83    /// * `message` - The message being processed
84    ///
85    /// # Returns
86    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
87    pub async fn execute(
88        &self,
89        workflow: &Workflow,
90        message: &mut Message,
91        now: DateTime<Utc>,
92    ) -> Result<bool> {
93        self.execute_inner(workflow, message, None, now).await
94    }
95
96    /// Execute a workflow with step-by-step tracing
97    ///
98    /// Similar to `execute` but records execution steps for debugging.
99    pub async fn execute_with_trace(
100        &self,
101        workflow: &Workflow,
102        message: &mut Message,
103        trace: &mut ExecutionTrace,
104        now: DateTime<Utc>,
105    ) -> Result<bool> {
106        self.execute_inner(workflow, message, Some(trace), now)
107            .await
108    }
109
110    /// Unified workflow-condition + task-loop driver. `trace` is `None` for
111    /// the production path and `Some(&mut trace)` for the debug path —
112    /// stepping is the only behavioural difference between them.
113    async fn execute_inner(
114        &self,
115        workflow: &Workflow,
116        message: &mut Message,
117        mut trace: Option<&mut ExecutionTrace>,
118        now: DateTime<Utc>,
119    ) -> Result<bool> {
120        // Evaluate workflow condition directly against the OwnedDataValue context
121        let should_execute = evaluate_condition(
122            &self.engine,
123            workflow.compiled_condition.as_ref(),
124            &message.context,
125        )?;
126
127        if !should_execute {
128            debug!("Skipping workflow {} - condition not met", workflow.id);
129            if let Some(t) = trace.as_deref_mut() {
130                t.add_step(ExecutionStep::workflow_skipped(&workflow.id));
131            }
132            return Ok(false);
133        }
134
135        // Execute workflow tasks (trace recording happens inside the loop)
136        match self.execute_tasks(workflow, message, trace, now).await {
137            Ok(_) => {
138                info!("Successfully completed workflow: {}", workflow.id);
139                Ok(true)
140            }
141            Err(e) => {
142                // Single-channel contract: every error appears in
143                // `message.errors`. The `Result::Err` return only signals to
144                // the caller that we stopped before processing further
145                // workflows. The workflow-level wrapper records workflow
146                // context that the underlying task error doesn't carry.
147                message.errors.push(
148                    ErrorInfo::builder(
149                        "WORKFLOW_ERROR",
150                        format!("Workflow {} error: {}", workflow.id, e),
151                    )
152                    .workflow_id(&workflow.id)
153                    .build(),
154                );
155
156                if workflow.continue_on_error {
157                    warn!(
158                        "Workflow {} encountered error but continuing: {:?}",
159                        workflow.id, e
160                    );
161                    Ok(true)
162                } else {
163                    error!("Workflow {} failed: {:?}", workflow.id, e);
164                    Err(e)
165                }
166            }
167        }
168    }
169
170    /// Execute all tasks in a workflow.
171    ///
172    /// Groups consecutive synchronous built-in tasks into a single
173    /// `with_arena` scope so the arena form of `message.context` is built
174    /// once at the start of the stretch and reused across `parse_json`,
175    /// `map`, `validation`, `log`, and `filter`. Async tasks (HTTP, Kafka,
176    /// custom handlers) break the stretch — the arena flushes any pending
177    /// state back to `OwnedDataValue` automatically (since each sync task
178    /// already mutates `message.context` in place) and the next stretch
179    /// rebuilds the arena form.
180    ///
181    /// When `trace` is `Some`, the loop also records `ExecutionStep` entries
182    /// after each task (skipped/executed) including per-mapping snapshots
183    /// for `Map` tasks.
184    async fn execute_tasks(
185        &self,
186        workflow: &Workflow,
187        message: &mut Message,
188        mut trace: Option<&mut ExecutionTrace>,
189        now: DateTime<Utc>,
190    ) -> Result<()> {
191        let tasks = &workflow.tasks;
192        let mut idx = 0;
193        while idx < tasks.len() {
194            let stretch_end = next_async_boundary(tasks, idx);
195
196            if stretch_end > idx {
197                // Run [idx, stretch_end) as a sync stretch inside one arena.
198                let halt = self.run_sync_stretch(
199                    &tasks[idx..stretch_end],
200                    workflow,
201                    message,
202                    trace.as_deref_mut(),
203                    now,
204                )?;
205                if halt {
206                    return Ok(());
207                }
208                idx = stretch_end;
209            }
210
211            if idx < tasks.len() {
212                // Single async task (or non-sync-builtin) at `idx`.
213                let task = &tasks[idx];
214                let should_execute = evaluate_condition(
215                    &self.engine,
216                    task.compiled_condition.as_ref(),
217                    &message.context,
218                )?;
219
220                if !should_execute {
221                    debug!("Skipping task {} - condition not met", task.id);
222                    if let Some(t) = trace.as_deref_mut() {
223                        t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
224                    }
225                    idx += 1;
226                    continue;
227                }
228
229                let result = self.task_executor.execute(task, message).await;
230                let control_flow = self.handle_task_result(
231                    result,
232                    &workflow.id_arc,
233                    &task.id_arc,
234                    task.continue_on_error,
235                    message,
236                    now,
237                )?;
238
239                // Async tasks at the boundary have no per-mapping snapshots —
240                // they're either HTTP/Kafka/Enrich or a custom handler.
241                if let Some(t) = trace.as_deref_mut() {
242                    t.add_step(ExecutionStep::executed(&workflow.id, &task.id, message));
243                }
244
245                if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
246                    return Ok(());
247                }
248                idx += 1;
249            }
250        }
251
252        Ok(())
253    }
254
255    /// Execute a contiguous run of sync-builtin tasks inside one
256    /// `with_arena` scope. The arena context is built once at the start and
257    /// refreshed in place after each mutating task. Returns `Ok(true)` if a
258    /// filter task halted the workflow.
259    ///
260    /// This is the single-workflow entry; the cross-workflow path
261    /// (`execute_sync_workflow_run`) shares the same task loop via
262    /// `run_tasks_slice_in_arena` but carries one `ArenaContext` across several
263    /// workflows.
264    fn run_sync_stretch(
265        &self,
266        tasks: &[Task],
267        workflow: &Workflow,
268        message: &mut Message,
269        trace: Option<&mut ExecutionTrace>,
270        now: DateTime<Utc>,
271    ) -> Result<bool> {
272        with_arena(|arena| -> Result<bool> {
273            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
274            self.run_tasks_slice_in_arena(tasks, workflow, message, &mut arena_ctx, trace, now)
275        })
276    }
277
278    /// Run `tasks` against an already-built `ArenaContext`, evaluating each
279    /// task's condition in-arena and refreshing the cache after each mutating
280    /// task. Returns `Ok(true)` if a filter task halted the workflow.
281    ///
282    /// Factored out of `run_sync_stretch` so both the single-workflow stretch
283    /// and the cross-workflow shared-arena run (`execute_sync_workflow_run`)
284    /// share one implementation. The caller owns the `ArenaContext` lifetime,
285    /// so the cross-workflow path can reuse the same arena form of
286    /// `message.context` across consecutive workflows instead of rebuilding it.
287    fn run_tasks_slice_in_arena(
288        &self,
289        tasks: &[Task],
290        workflow: &Workflow,
291        message: &mut Message,
292        arena_ctx: &mut ArenaContext<'_>,
293        mut trace: Option<&mut ExecutionTrace>,
294        now: DateTime<Utc>,
295    ) -> Result<bool> {
296        let arena = arena_ctx.arena();
297
298        for task in tasks {
299            // Task condition — evaluate against the arena form so we don't
300            // re-borrow the thread-local `RefCell`. A `None` compiled
301            // condition (compiler folds the default literal `true` to
302            // `None`) skips both the eval and the per-task arena context
303            // slice build.
304            let should_execute = match task.compiled_condition.as_ref() {
305                None => true,
306                Some(compiled) => evaluate_condition_in_arena(
307                    &self.engine,
308                    Some(compiled),
309                    arena_ctx.as_data_value(),
310                    arena,
311                )?,
312            };
313
314            if !should_execute {
315                debug!("Skipping task {} - condition not met", task.id);
316                if let Some(t) = trace.as_deref_mut() {
317                    t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
318                }
319                continue;
320            }
321
322            // Per-task snapshot buffer — only used for Map tasks in trace
323            // mode. Allocating an empty Vec is cheap and the buffer stays
324            // empty for non-Map tasks.
325            let mut mapping_snapshots: Vec<Value> = Vec::new();
326            let snapshot_buf = if trace.is_some() {
327                Some(&mut mapping_snapshots)
328            } else {
329                None
330            };
331            let result = self.execute_sync_task_in_arena(task, message, arena_ctx, snapshot_buf);
332
333            let control_flow = self.handle_task_result(
334                result,
335                &workflow.id_arc,
336                &task.id_arc,
337                task.continue_on_error,
338                message,
339                now,
340            )?;
341
342            // The audit-trail / progress-metadata writes performed by
343            // `handle_task_result` mutate `message.context`. Refresh the
344            // arena cache so the next task — and, in the cross-workflow path,
345            // the next workflow's condition — sees them.
346            arena_ctx.refresh_for_path(&message.context, "metadata");
347
348            if let Some(t) = trace.as_deref_mut() {
349                let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
350                if !mapping_snapshots.is_empty() {
351                    step = step.with_mapping_contexts(mapping_snapshots);
352                }
353                t.add_step(step);
354            }
355
356            if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
357                return Ok(true);
358            }
359        }
360        Ok(false)
361    }
362
363    /// Drive a message through `workflows` in order, grouping maximal runs of
364    /// consecutive `fully_sync` workflows into a single shared-arena scope
365    /// (`execute_sync_workflow_run`) and falling back to the per-workflow
366    /// `.await` path (`execute_inner`) for any workflow containing an async
367    /// task. This is the single orchestration entry for all four
368    /// `Engine::process_message*` variants.
369    pub async fn run_all(
370        &self,
371        workflows: &[&Workflow],
372        message: &mut Message,
373        mut trace: Option<&mut ExecutionTrace>,
374        now: DateTime<Utc>,
375    ) -> Result<()> {
376        let mut i = 0;
377        while i < workflows.len() {
378            if workflows[i].fully_sync {
379                // Extend over the maximal run of consecutive fully-sync
380                // workflows and execute them in one shared arena scope.
381                let mut j = i + 1;
382                while j < workflows.len() && workflows[j].fully_sync {
383                    j += 1;
384                }
385                self.execute_sync_workflow_run(
386                    &workflows[i..j],
387                    message,
388                    trace.as_deref_mut(),
389                    now,
390                )?;
391                i = j;
392            } else {
393                // Mixed sync+async (or fully-async) workflow: the existing
394                // driver interleaves per-stretch arenas with `.await`.
395                self.execute_inner(workflows[i], message, trace.as_deref_mut(), now)
396                    .await?;
397                i += 1;
398            }
399        }
400        Ok(())
401    }
402
403    /// Execute a maximal run of consecutive fully-sync workflows inside ONE
404    /// shared `with_arena` scope. The message context is deep-walked into the
405    /// arena once for the whole run, then carried — with the existing
406    /// incremental `refresh_for_path` after each mutating task — across
407    /// workflow boundaries, instead of being rebuilt per workflow.
408    ///
409    /// Per-workflow semantics are preserved exactly: each workflow's condition
410    /// is evaluated (in-arena), a false condition skips only that workflow, a
411    /// filter-halt stops only that workflow, and task errors are wrapped with
412    /// the workflow id and honor `continue_on_error` (continue, or propagate
413    /// `Err` out of the run to stop the whole message) — mirroring
414    /// `execute_inner`.
415    ///
416    /// **Tokio safety:** this method is synchronous and the `fully_sync`
417    /// precondition guarantees every task is a sync built-in, so no `.await`
418    /// occurs while the `!Send` arena borrow is live. The borrow checker
419    /// enforces this — the shared `ArenaContext` cannot escape the closure.
420    fn execute_sync_workflow_run(
421        &self,
422        workflows: &[&Workflow],
423        message: &mut Message,
424        mut trace: Option<&mut ExecutionTrace>,
425        now: DateTime<Utc>,
426    ) -> Result<()> {
427        with_arena(|arena| -> Result<()> {
428            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
429
430            for &workflow in workflows {
431                // Workflow condition in-arena: a folded `None` skips the eval;
432                // a real condition reuses the carried context instead of the
433                // owned-path `eval_to_owned` deep-walk.
434                let should_execute = match workflow.compiled_condition.as_ref() {
435                    None => true,
436                    Some(compiled) => evaluate_condition_in_arena(
437                        &self.engine,
438                        Some(compiled),
439                        arena_ctx.as_data_value(),
440                        arena,
441                    )?,
442                };
443
444                if !should_execute {
445                    debug!("Skipping workflow {} - condition not met", workflow.id);
446                    if let Some(t) = trace.as_deref_mut() {
447                        t.add_step(ExecutionStep::workflow_skipped(&workflow.id));
448                    }
449                    continue;
450                }
451
452                match self.run_tasks_slice_in_arena(
453                    &workflow.tasks,
454                    workflow,
455                    message,
456                    &mut arena_ctx,
457                    trace.as_deref_mut(),
458                    now,
459                ) {
460                    // Filter-halt stops only this workflow; carry on with the
461                    // next one (and keep the shared arena context).
462                    Ok(_halted) => {
463                        info!("Successfully completed workflow: {}", workflow.id);
464                    }
465                    Err(e) => {
466                        // Single-channel contract — mirror `execute_inner`:
467                        // record the workflow-context error to `message.errors`
468                        // and honor `continue_on_error`.
469                        message.errors.push(
470                            ErrorInfo::builder(
471                                "WORKFLOW_ERROR",
472                                format!("Workflow {} error: {}", workflow.id, e),
473                            )
474                            .workflow_id(&workflow.id)
475                            .build(),
476                        );
477
478                        if workflow.continue_on_error {
479                            warn!(
480                                "Workflow {} encountered error but continuing: {:?}",
481                                workflow.id, e
482                            );
483                        } else {
484                            error!("Workflow {} failed: {:?}", workflow.id, e);
485                            return Err(e);
486                        }
487                    }
488                }
489            }
490            Ok(())
491        })
492    }
493
494    /// Dispatch a single sync-builtin task via the consolidated
495    /// `FunctionConfig::try_execute_in_arena`. `next_async_boundary` guarantees
496    /// the stretch contents are sync built-ins, so the `None` arm is
497    /// unreachable in practice.
498    ///
499    /// `map_snapshot_buf` is only consulted by the `Map` variant; non-Map
500    /// sync builtins ignore it. Pass `None` from the production path.
501    fn execute_sync_task_in_arena(
502        &self,
503        task: &Task,
504        message: &mut Message,
505        arena_ctx: &mut ArenaContext<'_>,
506        map_snapshot_buf: Option<&mut Vec<Value>>,
507    ) -> Result<(TaskOutcome, Vec<Change>)> {
508        debug!(
509            "Executing sync task in arena: {} ({})",
510            task.id,
511            task.function.function_name()
512        );
513        debug_assert!(
514            task.function.is_sync_builtin(),
515            "execute_sync_task_in_arena called with non-sync-builtin task: {}",
516            task.function.function_name()
517        );
518        // In debug builds the assert above catches mis-dispatch; in release
519        // we still surface the invariant violation as a recoverable engine
520        // error rather than panicking via `unreachable!`.
521        task.function
522            .try_execute_in_arena(message, arena_ctx, &self.engine, map_snapshot_buf)
523            .ok_or_else(|| {
524                DataflowError::Task(format!(
525                    "execute_sync_task_in_arena dispatched to non-sync-builtin task '{}' \
526                     (engine bug — sync-stretch should only contain sync-builtin tasks)",
527                    task.function.function_name()
528                ))
529            })?
530    }
531
532    /// Handle the result of a task execution.
533    ///
534    /// `workflow_id_arc` and `task_id_arc` are the compile-time cached
535    /// `Arc<str>` mirrors of `workflow.id` / `task.id`; we Arc-clone them into
536    /// each `AuditTrail` rather than reallocating from the `&str` form.
537    fn handle_task_result(
538        &self,
539        result: Result<(TaskOutcome, Vec<Change>)>,
540        workflow_id_arc: &Arc<str>,
541        task_id_arc: &Arc<str>,
542        continue_on_error: bool,
543        message: &mut Message,
544        now: DateTime<Utc>,
545    ) -> Result<TaskControlFlow> {
546        let workflow_id: &str = workflow_id_arc;
547        let task_id: &str = task_id_arc;
548        match result {
549            Ok((TaskOutcome::Skip, _)) => {
550                // No audit trail, no progress write — task has explicitly opted
551                // out (filter gate set to `Skip`).
552                debug!("Task {} signaled skip", task_id);
553                Ok(TaskControlFlow::Continue)
554            }
555            Ok((outcome, changes)) => {
556                // `Skip` already returned above; the remaining variants all
557                // record an audit entry. `audit_status()` is `Some` for
558                // Success/Status/Halt — expect is for documentation only.
559                let status = outcome
560                    .audit_status()
561                    .expect("Skip handled above; remaining variants emit audit status");
562                let halt = outcome.halts_workflow();
563
564                // Record audit trail. workflow_id_arc/task_id_arc are populated
565                // by LogicCompiler at engine construction; cloning them is a
566                // refcount bump, not a string copy. `now` is shared with all
567                // other AuditTrails in this process_message call.
568                message.audit_trail.push(AuditTrail {
569                    timestamp: now,
570                    workflow_id: Arc::clone(workflow_id_arc),
571                    task_id: Arc::clone(task_id_arc),
572                    status: status as usize,
573                    changes,
574                });
575
576                // Update progress metadata for workflow chaining. Always
577                // emitted: when multiple workflows are registered in the same
578                // engine, downstream workflows route on
579                // `metadata.progress.{workflow_id,task_id,status_code}` to
580                // advance through linear sequences. One batched write
581                // (single tree walk + single Object alloc) benchmarked ~3%
582                // faster than three separate writes on the realistic
583                // workload — replacing a slot beats find-and-update walks.
584                set_nested_value(
585                    &mut message.context,
586                    "metadata.progress",
587                    OwnedDataValue::Object(vec![
588                        (
589                            "workflow_id".to_string(),
590                            OwnedDataValue::String(workflow_id.to_string()),
591                        ),
592                        (
593                            "task_id".to_string(),
594                            OwnedDataValue::String(task_id.to_string()),
595                        ),
596                        (
597                            "status_code".to_string(),
598                            OwnedDataValue::from(status as u64),
599                        ),
600                    ]),
601                );
602
603                if halt {
604                    info!("Task {} halted workflow {}", task_id, workflow_id);
605                    return Ok(TaskControlFlow::HaltWorkflow);
606                }
607
608                // Check status code
609                if (400..500).contains(&status) {
610                    warn!("Task {} returned client error status: {}", task_id, status);
611                } else if status >= 500 {
612                    error!("Task {} returned server error status: {}", task_id, status);
613                    // Single-channel contract: surface 5xx outcomes through
614                    // `message.errors` as well as the audit trail, so callers
615                    // that scan `errors()` see a 5xx-status task even when
616                    // the workflow continues past it.
617                    message.errors.push(
618                        ErrorInfo::builder(
619                            "TASK_STATUS_ERROR",
620                            format!("Task {} returned status {}", task_id, status),
621                        )
622                        .workflow_id(workflow_id)
623                        .task_id(task_id)
624                        .build(),
625                    );
626                    if !continue_on_error {
627                        return Err(DataflowError::Task(format!(
628                            "Task {} failed with status {}",
629                            task_id, status
630                        )));
631                    }
632                }
633                Ok(TaskControlFlow::Continue)
634            }
635            Err(e) => {
636                error!("Task {} failed: {:?}", task_id, e);
637
638                // Record error in audit trail (Arc clones are refcount bumps).
639                message.audit_trail.push(AuditTrail {
640                    timestamp: now,
641                    workflow_id: Arc::clone(workflow_id_arc),
642                    task_id: Arc::clone(task_id_arc),
643                    status: 500,
644                    changes: vec![],
645                });
646
647                // Add error to message
648                message.errors.push(
649                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
650                        .workflow_id(workflow_id)
651                        .task_id(task_id)
652                        .build(),
653                );
654
655                if !continue_on_error {
656                    Err(e)
657                } else {
658                    Ok(TaskControlFlow::Continue)
659                }
660            }
661        }
662    }
663}
664
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Parse and compile the workflow described by `workflow_json` (expected
    /// to use the id `test_workflow`) and build a `WorkflowExecutor` with no
    /// custom task functions registered.
    ///
    /// Falls back to the uncompiled workflow if the compiled set does not
    /// contain `test_workflow`.
    fn build_fixture(workflow_json: &str) -> (WorkflowExecutor, Workflow) {
        let compiler = LogicCompiler::new();
        let parsed = Workflow::from_json(workflow_json).unwrap();

        let compiled = compiler.compile_workflows(vec![parsed.clone()]).unwrap();
        let workflow = compiled
            .iter()
            .find(|w| w.id == "test_workflow")
            .cloned()
            .unwrap_or(parsed);

        let engine = compiler.into_engine();
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            Arc::clone(&engine),
        ));

        (WorkflowExecutor::new(task_executor, engine), workflow)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // Workflow whose condition is the literal `false`.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = build_fixture(workflow_json);
        let mut message = Message::from_value(&json!({}));

        // The false condition must skip the workflow and leave no audit
        // trail behind.
        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Workflow whose condition is the literal `true`.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (workflow_executor, workflow) = build_fixture(workflow_json);
        let mut message = Message::from_value(&json!({}));

        // The workflow has a single `map` task with an empty mappings list;
        // executing it should succeed.
        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();
        assert!(executed);
    }
}