Skip to main content

dataflow_rs/engine/
workflow_executor.rs

1//! # Workflow Execution Module
2//!
3//! This module handles the execution of workflows and their associated tasks.
4//! It provides a clean separation between workflow orchestration and task execution.
5
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::{
8    ArenaContext, evaluate_condition, evaluate_condition_in_arena, with_arena,
9};
10use crate::engine::functions::BoxedFunctionHandler;
11use crate::engine::message::{AuditTrail, Change, Message};
12use crate::engine::task::Task;
13use crate::engine::task_executor::TaskExecutor;
14use crate::engine::task_outcome::TaskOutcome;
15use crate::engine::trace::{ExecutionStep, ExecutionTrace};
16use crate::engine::utils::set_nested_value;
17use crate::engine::workflow::Workflow;
18use chrono::{DateTime, Utc};
19use datalogic_rs::Engine;
20use datavalue::OwnedDataValue;
21use log::{debug, error, info, warn};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// Control-flow signal produced after a task result has been handled.
///
/// Tells the task loop whether to move on to the next task or to stop
/// running the remaining tasks of the current workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskControlFlow {
    /// Continue executing the next task.
    Continue,
    /// Stop executing further tasks in this workflow (filter halt).
    HaltWorkflow,
}
33
34/// Return the index of the first task at or after `start` that is *not* a
35/// synchronous built-in. Used to chunk `workflow.tasks` into sync-only
36/// stretches that can share a single `ArenaContext`.
37fn next_async_boundary(tasks: &[Task], start: usize) -> usize {
38    let mut i = start;
39    while i < tasks.len() && tasks[i].function.is_sync_builtin() {
40        i += 1;
41    }
42    i
43}
44
45/// Handles the execution of workflows and their tasks
46///
47/// The `WorkflowExecutor` is responsible for:
48/// - Evaluating workflow conditions
49/// - Orchestrating task execution within workflows
50/// - Managing workflow-level error handling
51/// - Recording audit trails
52pub struct WorkflowExecutor {
53    /// Task executor for executing individual tasks
54    task_executor: Arc<TaskExecutor>,
55    /// Shared datalogic engine for condition evaluation
56    engine: Arc<Engine>,
57}
58
59impl WorkflowExecutor {
60    /// Create a new WorkflowExecutor
61    pub fn new(task_executor: Arc<TaskExecutor>, engine: Arc<Engine>) -> Self {
62        Self {
63            task_executor,
64            engine,
65        }
66    }
67
68    /// Get a clone of the task_functions Arc for reuse in new engines
69    pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
70        self.task_executor.task_functions()
71    }
72
73    /// Execute a workflow if its condition is met
74    ///
75    /// This method:
76    /// 1. Evaluates the workflow condition
77    /// 2. Executes tasks sequentially if condition is met
78    /// 3. Handles error recovery based on workflow configuration
79    /// 4. Updates message metadata and audit trail
80    ///
81    /// # Arguments
82    /// * `workflow` - The workflow to execute
83    /// * `message` - The message being processed
84    ///
85    /// # Returns
86    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure
87    pub async fn execute(
88        &self,
89        workflow: &Workflow,
90        message: &mut Message,
91        now: DateTime<Utc>,
92    ) -> Result<bool> {
93        self.execute_inner(workflow, message, None, now).await
94    }
95
96    /// Execute a workflow with step-by-step tracing
97    ///
98    /// Similar to `execute` but records execution steps for debugging.
99    pub async fn execute_with_trace(
100        &self,
101        workflow: &Workflow,
102        message: &mut Message,
103        trace: &mut ExecutionTrace,
104        now: DateTime<Utc>,
105    ) -> Result<bool> {
106        self.execute_inner(workflow, message, Some(trace), now)
107            .await
108    }
109
110    /// Unified workflow-condition + task-loop driver. `trace` is `None` for
111    /// the production path and `Some(&mut trace)` for the debug path —
112    /// stepping is the only behavioural difference between them.
113    async fn execute_inner(
114        &self,
115        workflow: &Workflow,
116        message: &mut Message,
117        mut trace: Option<&mut ExecutionTrace>,
118        now: DateTime<Utc>,
119    ) -> Result<bool> {
120        // Evaluate workflow condition directly against the OwnedDataValue context
121        let should_execute = evaluate_condition(
122            &self.engine,
123            workflow.compiled_condition.as_ref(),
124            &message.context,
125        )?;
126
127        if !should_execute {
128            debug!("Skipping workflow {} - condition not met", workflow.id);
129            if let Some(t) = trace.as_deref_mut() {
130                t.add_step(ExecutionStep::workflow_skipped(&workflow.id));
131            }
132            return Ok(false);
133        }
134
135        // Execute workflow tasks (trace recording happens inside the loop)
136        match self.execute_tasks(workflow, message, trace, now).await {
137            Ok(_) => {
138                info!("Successfully completed workflow: {}", workflow.id);
139                Ok(true)
140            }
141            Err(e) => {
142                // Single-channel contract: every error appears in
143                // `message.errors`. The `Result::Err` return only signals to
144                // the caller that we stopped before processing further
145                // workflows. The workflow-level wrapper records workflow
146                // context that the underlying task error doesn't carry.
147                message.errors.push(
148                    ErrorInfo::builder(
149                        "WORKFLOW_ERROR",
150                        format!("Workflow {} error: {}", workflow.id, e),
151                    )
152                    .workflow_id(&workflow.id)
153                    .build(),
154                );
155
156                if workflow.continue_on_error {
157                    warn!(
158                        "Workflow {} encountered error but continuing: {:?}",
159                        workflow.id, e
160                    );
161                    Ok(true)
162                } else {
163                    error!("Workflow {} failed: {:?}", workflow.id, e);
164                    Err(e)
165                }
166            }
167        }
168    }
169
170    /// Execute all tasks in a workflow.
171    ///
172    /// Groups consecutive synchronous built-in tasks into a single
173    /// `with_arena` scope so the arena form of `message.context` is built
174    /// once at the start of the stretch and reused across `parse_json`,
175    /// `map`, `validation`, `log`, and `filter`. Async tasks (HTTP, Kafka,
176    /// custom handlers) break the stretch — the arena flushes any pending
177    /// state back to `OwnedDataValue` automatically (since each sync task
178    /// already mutates `message.context` in place) and the next stretch
179    /// rebuilds the arena form.
180    ///
181    /// When `trace` is `Some`, the loop also records `ExecutionStep` entries
182    /// after each task (skipped/executed) including per-mapping snapshots
183    /// for `Map` tasks.
184    async fn execute_tasks(
185        &self,
186        workflow: &Workflow,
187        message: &mut Message,
188        mut trace: Option<&mut ExecutionTrace>,
189        now: DateTime<Utc>,
190    ) -> Result<()> {
191        let tasks = &workflow.tasks;
192        let mut idx = 0;
193        while idx < tasks.len() {
194            let stretch_end = next_async_boundary(tasks, idx);
195
196            if stretch_end > idx {
197                // Run [idx, stretch_end) as a sync stretch inside one arena.
198                let halt = self.run_sync_stretch(
199                    &tasks[idx..stretch_end],
200                    workflow,
201                    message,
202                    trace.as_deref_mut(),
203                    now,
204                )?;
205                if halt {
206                    return Ok(());
207                }
208                idx = stretch_end;
209            }
210
211            if idx < tasks.len() {
212                // Single async task (or non-sync-builtin) at `idx`.
213                let task = &tasks[idx];
214                let should_execute = evaluate_condition(
215                    &self.engine,
216                    task.compiled_condition.as_ref(),
217                    &message.context,
218                )?;
219
220                if !should_execute {
221                    debug!("Skipping task {} - condition not met", task.id);
222                    if let Some(t) = trace.as_deref_mut() {
223                        t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
224                    }
225                    idx += 1;
226                    continue;
227                }
228
229                let result = self.task_executor.execute(task, message).await;
230                let control_flow = self.handle_task_result(
231                    result,
232                    &workflow.id_arc,
233                    &task.id_arc,
234                    task.continue_on_error,
235                    message,
236                    now,
237                )?;
238
239                // Async tasks at the boundary have no per-mapping snapshots —
240                // they're either HTTP/Kafka/Enrich or a custom handler.
241                if let Some(t) = trace.as_deref_mut() {
242                    t.add_step(ExecutionStep::executed(&workflow.id, &task.id, message));
243                }
244
245                if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
246                    return Ok(());
247                }
248                idx += 1;
249            }
250        }
251
252        Ok(())
253    }
254
255    /// Execute a contiguous run of sync-builtin tasks inside one
256    /// `with_arena` scope. The arena context is built once at the start and
257    /// refreshed in place after each mutating task. Returns `Ok(true)` if a
258    /// filter task halted the workflow.
259    fn run_sync_stretch(
260        &self,
261        tasks: &[Task],
262        workflow: &Workflow,
263        message: &mut Message,
264        mut trace: Option<&mut ExecutionTrace>,
265        now: DateTime<Utc>,
266    ) -> Result<bool> {
267        let outcome = with_arena(|arena| -> Result<bool> {
268            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
269
270            for task in tasks {
271                // Task condition — evaluate against the arena form so we don't
272                // re-borrow the thread-local `RefCell`.
273                let ctx_av = arena_ctx.as_data_value();
274                let should_execute = evaluate_condition_in_arena(
275                    &self.engine,
276                    task.compiled_condition.as_ref(),
277                    ctx_av,
278                    arena,
279                )?;
280
281                if !should_execute {
282                    debug!("Skipping task {} - condition not met", task.id);
283                    if let Some(t) = trace.as_deref_mut() {
284                        t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
285                    }
286                    continue;
287                }
288
289                // Per-task snapshot buffer — only used for Map tasks in trace
290                // mode. Allocating an empty Vec is cheap and the buffer stays
291                // empty for non-Map tasks.
292                let mut mapping_snapshots: Vec<Value> = Vec::new();
293                let snapshot_buf = if trace.is_some() {
294                    Some(&mut mapping_snapshots)
295                } else {
296                    None
297                };
298                let result =
299                    self.execute_sync_task_in_arena(task, message, &mut arena_ctx, snapshot_buf);
300
301                let control_flow = self.handle_task_result(
302                    result,
303                    &workflow.id_arc,
304                    &task.id_arc,
305                    task.continue_on_error,
306                    message,
307                    now,
308                )?;
309
310                // The audit-trail / progress-metadata writes performed by
311                // `handle_task_result` mutate `message.context`. Refresh the
312                // arena cache so the next task in the stretch sees them.
313                arena_ctx.refresh_for_path(&message.context, "metadata");
314
315                if let Some(t) = trace.as_deref_mut() {
316                    let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
317                    if !mapping_snapshots.is_empty() {
318                        step = step.with_mapping_contexts(mapping_snapshots);
319                    }
320                    t.add_step(step);
321                }
322
323                if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
324                    return Ok(true);
325                }
326            }
327            Ok(false)
328        })?;
329        Ok(outcome)
330    }
331
332    /// Dispatch a single sync-builtin task via the consolidated
333    /// `FunctionConfig::try_execute_in_arena`. `next_async_boundary` guarantees
334    /// the stretch contents are sync built-ins, so the `None` arm is
335    /// unreachable in practice.
336    ///
337    /// `map_snapshot_buf` is only consulted by the `Map` variant; non-Map
338    /// sync builtins ignore it. Pass `None` from the production path.
339    fn execute_sync_task_in_arena(
340        &self,
341        task: &Task,
342        message: &mut Message,
343        arena_ctx: &mut ArenaContext<'_>,
344        map_snapshot_buf: Option<&mut Vec<Value>>,
345    ) -> Result<(TaskOutcome, Vec<Change>)> {
346        debug!(
347            "Executing sync task in arena: {} ({})",
348            task.id,
349            task.function.function_name()
350        );
351        debug_assert!(
352            task.function.is_sync_builtin(),
353            "execute_sync_task_in_arena called with non-sync-builtin task: {}",
354            task.function.function_name()
355        );
356        // In debug builds the assert above catches mis-dispatch; in release
357        // we still surface the invariant violation as a recoverable engine
358        // error rather than panicking via `unreachable!`.
359        task.function
360            .try_execute_in_arena(message, arena_ctx, &self.engine, map_snapshot_buf)
361            .ok_or_else(|| {
362                DataflowError::Task(format!(
363                    "execute_sync_task_in_arena dispatched to non-sync-builtin task '{}' \
364                     (engine bug — sync-stretch should only contain sync-builtin tasks)",
365                    task.function.function_name()
366                ))
367            })?
368    }
369
370    /// Handle the result of a task execution.
371    ///
372    /// `workflow_id_arc` and `task_id_arc` are the compile-time cached
373    /// `Arc<str>` mirrors of `workflow.id` / `task.id`; we Arc-clone them into
374    /// each `AuditTrail` rather than reallocating from the `&str` form.
375    fn handle_task_result(
376        &self,
377        result: Result<(TaskOutcome, Vec<Change>)>,
378        workflow_id_arc: &Arc<str>,
379        task_id_arc: &Arc<str>,
380        continue_on_error: bool,
381        message: &mut Message,
382        now: DateTime<Utc>,
383    ) -> Result<TaskControlFlow> {
384        let workflow_id: &str = workflow_id_arc;
385        let task_id: &str = task_id_arc;
386        match result {
387            Ok((TaskOutcome::Skip, _)) => {
388                // No audit trail, no progress write — task has explicitly opted
389                // out (filter gate set to `Skip`).
390                debug!("Task {} signaled skip", task_id);
391                Ok(TaskControlFlow::Continue)
392            }
393            Ok((outcome, changes)) => {
394                // `Skip` already returned above; the remaining variants all
395                // record an audit entry. `audit_status()` is `Some` for
396                // Success/Status/Halt — expect is for documentation only.
397                let status = outcome
398                    .audit_status()
399                    .expect("Skip handled above; remaining variants emit audit status");
400                let halt = outcome.halts_workflow();
401
402                // Record audit trail. workflow_id_arc/task_id_arc are populated
403                // by LogicCompiler at engine construction; cloning them is a
404                // refcount bump, not a string copy. `now` is shared with all
405                // other AuditTrails in this process_message call.
406                message.audit_trail.push(AuditTrail {
407                    timestamp: now,
408                    workflow_id: Arc::clone(workflow_id_arc),
409                    task_id: Arc::clone(task_id_arc),
410                    status: status as usize,
411                    changes,
412                });
413
414                // Update progress metadata for workflow chaining. Always
415                // emitted: when multiple workflows are registered in the same
416                // engine, downstream workflows route on
417                // `metadata.progress.{workflow_id,task_id,status_code}` to
418                // advance through linear sequences. One batched write
419                // (single tree walk + single Object alloc) benchmarked ~3%
420                // faster than three separate writes on the realistic
421                // workload — replacing a slot beats find-and-update walks.
422                set_nested_value(
423                    &mut message.context,
424                    "metadata.progress",
425                    OwnedDataValue::Object(vec![
426                        (
427                            "workflow_id".to_string(),
428                            OwnedDataValue::String(workflow_id.to_string()),
429                        ),
430                        (
431                            "task_id".to_string(),
432                            OwnedDataValue::String(task_id.to_string()),
433                        ),
434                        (
435                            "status_code".to_string(),
436                            OwnedDataValue::from(status as u64),
437                        ),
438                    ]),
439                );
440
441                if halt {
442                    info!("Task {} halted workflow {}", task_id, workflow_id);
443                    return Ok(TaskControlFlow::HaltWorkflow);
444                }
445
446                // Check status code
447                if (400..500).contains(&status) {
448                    warn!("Task {} returned client error status: {}", task_id, status);
449                } else if status >= 500 {
450                    error!("Task {} returned server error status: {}", task_id, status);
451                    // Single-channel contract: surface 5xx outcomes through
452                    // `message.errors` as well as the audit trail, so callers
453                    // that scan `errors()` see a 5xx-status task even when
454                    // the workflow continues past it.
455                    message.errors.push(
456                        ErrorInfo::builder(
457                            "TASK_STATUS_ERROR",
458                            format!("Task {} returned status {}", task_id, status),
459                        )
460                        .workflow_id(workflow_id)
461                        .task_id(task_id)
462                        .build(),
463                    );
464                    if !continue_on_error {
465                        return Err(DataflowError::Task(format!(
466                            "Task {} failed with status {}",
467                            task_id, status
468                        )));
469                    }
470                }
471                Ok(TaskControlFlow::Continue)
472            }
473            Err(e) => {
474                error!("Task {} failed: {:?}", task_id, e);
475
476                // Record error in audit trail (Arc clones are refcount bumps).
477                message.audit_trail.push(AuditTrail {
478                    timestamp: now,
479                    workflow_id: Arc::clone(workflow_id_arc),
480                    task_id: Arc::clone(task_id_arc),
481                    status: 500,
482                    changes: vec![],
483                });
484
485                // Add error to message
486                message.errors.push(
487                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
488                        .workflow_id(workflow_id)
489                        .task_id(task_id)
490                        .build(),
491                );
492
493                if !continue_on_error {
494                    Err(e)
495                } else {
496                    Ok(TaskControlFlow::Continue)
497                }
498            }
499        }
500    }
501}
502
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Compile `definition`, build an executor with no custom task
    /// handlers, and run the workflow once against an empty message.
    ///
    /// Returns the `executed` flag together with the message so callers can
    /// inspect what was recorded on it.
    async fn run_once(definition: &str) -> (bool, Message) {
        let compiler = LogicCompiler::new();
        let parsed = Workflow::from_json(definition).unwrap();

        // Prefer the compiled form of the workflow; fall back to the parsed
        // one if the compiler did not hand it back.
        let compiled = compiler.compile_workflows(vec![parsed.clone()]).unwrap();
        let workflow = compiled
            .iter()
            .find(|candidate| candidate.id == "test_workflow")
            .cloned()
            .unwrap_or(parsed);

        let engine = compiler.into_engine();
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            Arc::clone(&engine),
        ));
        let workflow_executor = WorkflowExecutor::new(task_executor, engine);

        let mut message = Message::from_value(&json!({}));
        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();

        (executed, message)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // Workflow whose condition is the literal `false`.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (executed, message) = run_once(workflow_json).await;

        // A false condition means the workflow is skipped and nothing is
        // written to the audit trail.
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Workflow whose condition is the literal `true`, holding a single
        // `map` task that has no mappings.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;

        let (executed, _message) = run_once(workflow_json).await;

        // A true condition means the workflow is reported as executed.
        assert!(executed);
    }
}