dataflow_rs/engine/workflow_executor.rs
1//! # Workflow Execution Module
2//!
3//! This module handles the execution of workflows and their associated tasks.
4//! It provides a clean separation between workflow orchestration and task execution.
5
6use crate::engine::error::{DataflowError, ErrorInfo, Result};
7use crate::engine::executor::{
8 ArenaContext, evaluate_condition, evaluate_condition_in_arena, with_arena,
9};
10use crate::engine::functions::BoxedFunctionHandler;
11use crate::engine::message::{AuditTrail, Change, Message};
12use crate::engine::task::Task;
13use crate::engine::task_executor::TaskExecutor;
14use crate::engine::task_outcome::TaskOutcome;
15use crate::engine::trace::{ExecutionStep, ExecutionTrace};
16use crate::engine::utils::set_nested_value;
17use crate::engine::workflow::Workflow;
18use chrono::{DateTime, Utc};
19use datalogic_rs::Engine;
20use datavalue::OwnedDataValue;
21use log::{debug, error, info, warn};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// Result of handling a task, including possible control flow signals.
///
/// Returned by `WorkflowExecutor::handle_task_result`; the task loops check
/// it after every executed task to decide whether to keep going.
///
/// Derives `Copy`/`PartialEq`/`Eq` so the signal can be passed by value and
/// compared directly, and `Debug` so it can appear in log/trace output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskControlFlow {
    /// Continue executing the next task
    Continue,
    /// Stop executing further tasks in this workflow (filter halt)
    HaltWorkflow,
}
33
34/// Return the index of the first task at or after `start` that is *not* a
35/// synchronous built-in. Used to chunk `workflow.tasks` into sync-only
36/// stretches that can share a single `ArenaContext`.
37fn next_async_boundary(tasks: &[Task], start: usize) -> usize {
38 let mut i = start;
39 while i < tasks.len() && tasks[i].function.is_sync_builtin() {
40 i += 1;
41 }
42 i
43}
44
45/// Handles the execution of workflows and their tasks
46///
47/// The `WorkflowExecutor` is responsible for:
48/// - Evaluating workflow conditions
49/// - Orchestrating task execution within workflows
50/// - Managing workflow-level error handling
51/// - Recording audit trails
52pub struct WorkflowExecutor {
53 /// Task executor for executing individual tasks
54 task_executor: Arc<TaskExecutor>,
55 /// Shared datalogic engine for condition evaluation
56 engine: Arc<Engine>,
57}
58
impl WorkflowExecutor {
    /// Create a new WorkflowExecutor
    ///
    /// * `task_executor` - executor used for tasks that are not run inside a
    ///   sync arena stretch.
    /// * `engine` - datalogic engine used to evaluate workflow and task
    ///   conditions.
    pub fn new(task_executor: Arc<TaskExecutor>, engine: Arc<Engine>) -> Self {
        Self {
            task_executor,
            engine,
        }
    }

    /// Get a clone of the task_functions Arc for reuse in new engines
    ///
    /// Delegates to `TaskExecutor::task_functions` on the wrapped executor.
    pub fn task_functions(&self) -> Arc<HashMap<String, BoxedFunctionHandler>> {
        self.task_executor.task_functions()
    }

    /// Execute a workflow if its condition is met
    ///
    /// This method:
    /// 1. Evaluates the workflow condition
    /// 2. Executes tasks sequentially if condition is met
    /// 3. Handles error recovery based on workflow configuration
    /// 4. Updates message metadata and audit trail
    ///
    /// # Arguments
    /// * `workflow` - The workflow to execute
    /// * `message` - The message being processed
    /// * `now` - Timestamp written into every `AuditTrail` entry produced
    ///   while executing this workflow
    ///
    /// # Returns
    /// * `Result<bool>` - Ok(true) if workflow was executed, Ok(false) if skipped, Err on failure.
    ///   Note that a workflow with `continue_on_error` set also returns
    ///   `Ok(true)` after a task failure (the error is recorded in
    ///   `message.errors`).
    pub async fn execute(
        &self,
        workflow: &Workflow,
        message: &mut Message,
        now: DateTime<Utc>,
    ) -> Result<bool> {
        self.execute_inner(workflow, message, None, now).await
    }

    /// Execute a workflow with step-by-step tracing
    ///
    /// Similar to `execute` but records execution steps for debugging.
    /// Steps (workflow skipped, task skipped, task executed) are appended to
    /// `trace` as they happen; the return value has the same meaning as for
    /// `execute`.
    pub async fn execute_with_trace(
        &self,
        workflow: &Workflow,
        message: &mut Message,
        trace: &mut ExecutionTrace,
        now: DateTime<Utc>,
    ) -> Result<bool> {
        self.execute_inner(workflow, message, Some(trace), now)
            .await
    }

    /// Unified workflow-condition + task-loop driver. `trace` is `None` for
    /// the production path and `Some(&mut trace)` for the debug path —
    /// stepping is the only behavioural difference between them.
    ///
    /// NOTE(review): an error from evaluating the *workflow* condition is
    /// propagated by `?` before the error-recording `match` below, so it is
    /// neither pushed into `message.errors` nor subject to
    /// `workflow.continue_on_error`. Confirm the caller records it if the
    /// "single-channel" contract is meant to cover this case too.
    async fn execute_inner(
        &self,
        workflow: &Workflow,
        message: &mut Message,
        mut trace: Option<&mut ExecutionTrace>,
        now: DateTime<Utc>,
    ) -> Result<bool> {
        // Evaluate workflow condition directly against the OwnedDataValue context
        let should_execute = evaluate_condition(
            &self.engine,
            workflow.compiled_condition.as_ref(),
            &message.context,
        )?;

        if !should_execute {
            debug!("Skipping workflow {} - condition not met", workflow.id);
            if let Some(t) = trace.as_deref_mut() {
                t.add_step(ExecutionStep::workflow_skipped(&workflow.id));
            }
            return Ok(false);
        }

        // Execute workflow tasks (trace recording happens inside the loop)
        match self.execute_tasks(workflow, message, trace, now).await {
            // Also reached when a task halted the workflow early:
            // `execute_tasks` returns `Ok(())` on a halt signal.
            Ok(_) => {
                info!("Successfully completed workflow: {}", workflow.id);
                Ok(true)
            }
            Err(e) => {
                // Single-channel contract: every error appears in
                // `message.errors`. The `Result::Err` return only signals to
                // the caller that we stopped before processing further
                // workflows. The workflow-level wrapper records workflow
                // context that the underlying task error doesn't carry.
                message.errors.push(
                    ErrorInfo::builder(
                        "WORKFLOW_ERROR",
                        format!("Workflow {} error: {}", workflow.id, e),
                    )
                    .workflow_id(&workflow.id)
                    .build(),
                );

                if workflow.continue_on_error {
                    warn!(
                        "Workflow {} encountered error but continuing: {:?}",
                        workflow.id, e
                    );
                    Ok(true)
                } else {
                    error!("Workflow {} failed: {:?}", workflow.id, e);
                    Err(e)
                }
            }
        }
    }

    /// Execute all tasks in a workflow.
    ///
    /// Groups consecutive synchronous built-in tasks into a single
    /// `with_arena` scope so the arena form of `message.context` is built
    /// once at the start of the stretch and reused across `parse_json`,
    /// `map`, `validation`, `log`, and `filter`. Async tasks (HTTP, Kafka,
    /// custom handlers) break the stretch — the arena flushes any pending
    /// state back to `OwnedDataValue` automatically (since each sync task
    /// already mutates `message.context` in place) and the next stretch
    /// rebuilds the arena form.
    ///
    /// When `trace` is `Some`, the loop also records `ExecutionStep` entries
    /// after each task (skipped/executed) including per-mapping snapshots
    /// for `Map` tasks.
    ///
    /// Returns `Ok(())` both when all tasks ran and when a task signalled
    /// `TaskControlFlow::HaltWorkflow`. Errors from condition evaluation or
    /// from `handle_task_result` (task failure without `continue_on_error`)
    /// are propagated immediately, skipping the remaining tasks.
    async fn execute_tasks(
        &self,
        workflow: &Workflow,
        message: &mut Message,
        mut trace: Option<&mut ExecutionTrace>,
        now: DateTime<Utc>,
    ) -> Result<()> {
        let tasks = &workflow.tasks;
        let mut idx = 0;
        while idx < tasks.len() {
            // Exclusive end of the sync-builtin run starting at `idx`
            // (equals `idx` when the task at `idx` is not a sync builtin).
            let stretch_end = next_async_boundary(tasks, idx);

            if stretch_end > idx {
                // Run [idx, stretch_end) as a sync stretch inside one arena.
                let halt = self.run_sync_stretch(
                    &tasks[idx..stretch_end],
                    workflow,
                    message,
                    trace.as_deref_mut(),
                    now,
                )?;
                if halt {
                    return Ok(());
                }
                idx = stretch_end;
            }

            if idx < tasks.len() {
                // Single async task (or non-sync-builtin) at `idx`.
                let task = &tasks[idx];
                let should_execute = evaluate_condition(
                    &self.engine,
                    task.compiled_condition.as_ref(),
                    &message.context,
                )?;

                if !should_execute {
                    debug!("Skipping task {} - condition not met", task.id);
                    if let Some(t) = trace.as_deref_mut() {
                        t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
                    }
                    idx += 1;
                    continue;
                }

                let result = self.task_executor.execute(task, message).await;
                let control_flow = self.handle_task_result(
                    result,
                    &workflow.id_arc,
                    &task.id_arc,
                    task.continue_on_error,
                    message,
                    now,
                )?;

                // Async tasks at the boundary have no per-mapping snapshots —
                // they're either HTTP/Kafka/Enrich or a custom handler.
                if let Some(t) = trace.as_deref_mut() {
                    t.add_step(ExecutionStep::executed(&workflow.id, &task.id, message));
                }

                if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
                    return Ok(());
                }
                idx += 1;
            }
        }

        Ok(())
    }

    /// Execute a contiguous run of sync-builtin tasks inside one
    /// `with_arena` scope. The arena context is built once at the start and
    /// refreshed in place after each mutating task. Returns `Ok(true)` if a
    /// filter task halted the workflow.
    ///
    /// Only the `"metadata"` subtree of the arena context is refreshed after
    /// each executed (non-skipped) task, because that is where
    /// `handle_task_result` writes progress information. Any error (condition
    /// evaluation, or a task failure without `continue_on_error`) aborts the
    /// stretch and is returned to the caller.
    fn run_sync_stretch(
        &self,
        tasks: &[Task],
        workflow: &Workflow,
        message: &mut Message,
        mut trace: Option<&mut ExecutionTrace>,
        now: DateTime<Utc>,
    ) -> Result<bool> {
        let outcome = with_arena(|arena| -> Result<bool> {
            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);

            for task in tasks {
                // Task condition — evaluate against the arena form so we don't
                // re-borrow the thread-local `RefCell`.
                let ctx_av = arena_ctx.as_data_value();
                let should_execute = evaluate_condition_in_arena(
                    &self.engine,
                    task.compiled_condition.as_ref(),
                    ctx_av,
                    arena,
                )?;

                if !should_execute {
                    debug!("Skipping task {} - condition not met", task.id);
                    if let Some(t) = trace.as_deref_mut() {
                        t.add_step(ExecutionStep::task_skipped(&workflow.id, &task.id));
                    }
                    continue;
                }

                // Per-task snapshot buffer — only used for Map tasks in trace
                // mode. Allocating an empty Vec is cheap and the buffer stays
                // empty for non-Map tasks.
                let mut mapping_snapshots: Vec<Value> = Vec::new();
                let snapshot_buf = if trace.is_some() {
                    Some(&mut mapping_snapshots)
                } else {
                    None
                };
                let result =
                    self.execute_sync_task_in_arena(task, message, &mut arena_ctx, snapshot_buf);

                let control_flow = self.handle_task_result(
                    result,
                    &workflow.id_arc,
                    &task.id_arc,
                    task.continue_on_error,
                    message,
                    now,
                )?;

                // The audit-trail / progress-metadata writes performed by
                // `handle_task_result` mutate `message.context`. Refresh the
                // arena cache so the next task in the stretch sees them.
                arena_ctx.refresh_for_path(&message.context, "metadata");

                if let Some(t) = trace.as_deref_mut() {
                    let mut step = ExecutionStep::executed(&workflow.id, &task.id, message);
                    if !mapping_snapshots.is_empty() {
                        step = step.with_mapping_contexts(mapping_snapshots);
                    }
                    t.add_step(step);
                }

                if matches!(control_flow, TaskControlFlow::HaltWorkflow) {
                    return Ok(true);
                }
            }
            Ok(false)
        })?;
        Ok(outcome)
    }

    /// Dispatch a single sync-builtin task via the consolidated
    /// `FunctionConfig::try_execute_in_arena`. `next_async_boundary` guarantees
    /// the stretch contents are sync built-ins, so the `None` arm is
    /// unreachable in practice.
    ///
    /// `map_snapshot_buf` is only consulted by the `Map` variant; non-Map
    /// sync builtins ignore it. Pass `None` from the production path.
    ///
    /// # Errors
    /// Returns the task's own error unchanged, or `DataflowError::Task` if
    /// the task turns out not to be a sync built-in (`try_execute_in_arena`
    /// returned `None`).
    fn execute_sync_task_in_arena(
        &self,
        task: &Task,
        message: &mut Message,
        arena_ctx: &mut ArenaContext<'_>,
        map_snapshot_buf: Option<&mut Vec<Value>>,
    ) -> Result<(TaskOutcome, Vec<Change>)> {
        debug!(
            "Executing sync task in arena: {} ({})",
            task.id,
            task.function.function_name()
        );
        debug_assert!(
            task.function.is_sync_builtin(),
            "execute_sync_task_in_arena called with non-sync-builtin task: {}",
            task.function.function_name()
        );
        // In debug builds the assert above catches mis-dispatch; in release
        // we still surface the invariant violation as a recoverable engine
        // error rather than panicking via `unreachable!`.
        // The outer `?` strips the "was it dispatched?" layer; the inner
        // task `Result` is returned as-is.
        task.function
            .try_execute_in_arena(message, arena_ctx, &self.engine, map_snapshot_buf)
            .ok_or_else(|| {
                DataflowError::Task(format!(
                    "execute_sync_task_in_arena dispatched to non-sync-builtin task '{}' \
                     (engine bug — sync-stretch should only contain sync-builtin tasks)",
                    task.function.function_name()
                ))
            })?
    }

    /// Handle the result of a task execution.
    ///
    /// `workflow_id_arc` and `task_id_arc` are the compile-time cached
    /// `Arc<str>` mirrors of `workflow.id` / `task.id`; we Arc-clone them into
    /// each `AuditTrail` rather than reallocating from the `&str` form.
    ///
    /// Behaviour per result:
    /// * `Ok(Skip)` - nothing recorded; returns `Continue`.
    /// * `Ok(other outcome)` - pushes an `AuditTrail` entry, overwrites
    ///   `metadata.progress` in `message.context`, then either returns
    ///   `HaltWorkflow` (halting outcome; the 4xx/5xx checks are not reached)
    ///   or checks the status: 4xx is only logged; 5xx pushes a
    ///   `TASK_STATUS_ERROR` and, unless `continue_on_error`, returns `Err`.
    /// * `Err(e)` - pushes an `AuditTrail` entry with status 500 and a
    ///   `TASK_ERROR`; returns `Err(e)` unless `continue_on_error`. This arm
    ///   does not update `metadata.progress`.
    fn handle_task_result(
        &self,
        result: Result<(TaskOutcome, Vec<Change>)>,
        workflow_id_arc: &Arc<str>,
        task_id_arc: &Arc<str>,
        continue_on_error: bool,
        message: &mut Message,
        now: DateTime<Utc>,
    ) -> Result<TaskControlFlow> {
        let workflow_id: &str = workflow_id_arc;
        let task_id: &str = task_id_arc;
        match result {
            Ok((TaskOutcome::Skip, _)) => {
                // No audit trail, no progress write — task has explicitly opted
                // out (filter gate set to `Skip`).
                debug!("Task {} signaled skip", task_id);
                Ok(TaskControlFlow::Continue)
            }
            Ok((outcome, changes)) => {
                // `Skip` already returned above; the remaining variants all
                // record an audit entry. `audit_status()` is `Some` for
                // Success/Status/Halt — expect is for documentation only.
                let status = outcome
                    .audit_status()
                    .expect("Skip handled above; remaining variants emit audit status");
                let halt = outcome.halts_workflow();

                // Record audit trail. workflow_id_arc/task_id_arc are populated
                // by LogicCompiler at engine construction; cloning them is a
                // refcount bump, not a string copy. `now` is shared with all
                // other AuditTrails in this process_message call.
                message.audit_trail.push(AuditTrail {
                    timestamp: now,
                    workflow_id: Arc::clone(workflow_id_arc),
                    task_id: Arc::clone(task_id_arc),
                    status: status as usize,
                    changes,
                });

                // Update progress metadata for workflow chaining. Always
                // emitted: when multiple workflows are registered in the same
                // engine, downstream workflows route on
                // `metadata.progress.{workflow_id,task_id,status_code}` to
                // advance through linear sequences. One batched write
                // (single tree walk + single Object alloc) benchmarked ~3%
                // faster than three separate writes on the realistic
                // workload — replacing a slot beats find-and-update walks.
                set_nested_value(
                    &mut message.context,
                    "metadata.progress",
                    OwnedDataValue::Object(vec![
                        (
                            "workflow_id".to_string(),
                            OwnedDataValue::String(workflow_id.to_string()),
                        ),
                        (
                            "task_id".to_string(),
                            OwnedDataValue::String(task_id.to_string()),
                        ),
                        (
                            "status_code".to_string(),
                            OwnedDataValue::from(status as u64),
                        ),
                    ]),
                );

                // A halting outcome stops the workflow before the status-code
                // checks below run.
                if halt {
                    info!("Task {} halted workflow {}", task_id, workflow_id);
                    return Ok(TaskControlFlow::HaltWorkflow);
                }

                // Check status code
                if (400..500).contains(&status) {
                    warn!("Task {} returned client error status: {}", task_id, status);
                } else if status >= 500 {
                    error!("Task {} returned server error status: {}", task_id, status);
                    // Single-channel contract: surface 5xx outcomes through
                    // `message.errors` as well as the audit trail, so callers
                    // that scan `errors()` see a 5xx-status task even when
                    // the workflow continues past it.
                    message.errors.push(
                        ErrorInfo::builder(
                            "TASK_STATUS_ERROR",
                            format!("Task {} returned status {}", task_id, status),
                        )
                        .workflow_id(workflow_id)
                        .task_id(task_id)
                        .build(),
                    );
                    if !continue_on_error {
                        return Err(DataflowError::Task(format!(
                            "Task {} failed with status {}",
                            task_id, status
                        )));
                    }
                }
                Ok(TaskControlFlow::Continue)
            }
            Err(e) => {
                error!("Task {} failed: {:?}", task_id, e);

                // Record error in audit trail (Arc clones are refcount bumps).
                message.audit_trail.push(AuditTrail {
                    timestamp: now,
                    workflow_id: Arc::clone(workflow_id_arc),
                    task_id: Arc::clone(task_id_arc),
                    status: 500,
                    changes: vec![],
                });

                // Add error to message
                message.errors.push(
                    ErrorInfo::builder("TASK_ERROR", format!("Task {} error: {}", task_id, e))
                        .workflow_id(workflow_id)
                        .task_id(task_id)
                        .build(),
                );

                if !continue_on_error {
                    Err(e)
                } else {
                    Ok(TaskControlFlow::Continue)
                }
            }
        }
    }
}
502
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::compiler::LogicCompiler;
    use serde_json::json;
    use std::collections::HashMap;

    /// Parse `workflow_json`, compile it with a fresh `LogicCompiler`, and
    /// return a `WorkflowExecutor` wired to that compiler's engine (with an
    /// empty custom-function registry) together with the compiled workflow.
    ///
    /// Panics if the compiled output does not contain the parsed workflow,
    /// so a test can never silently run against an uncompiled condition.
    fn compile_executor(workflow_json: &str) -> (WorkflowExecutor, Workflow) {
        let compiler = LogicCompiler::new();
        let parsed = Workflow::from_json(workflow_json).unwrap();

        let compiled = compiler
            .compile_workflows(vec![parsed.clone()])
            .unwrap()
            .iter()
            .find(|w| w.id == parsed.id)
            .cloned()
            .expect("compiled workflows should contain the parsed workflow");

        let engine = compiler.into_engine();
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(HashMap::new()),
            Arc::clone(&engine),
        ));
        (WorkflowExecutor::new(task_executor, engine), compiled)
    }

    #[tokio::test]
    async fn test_workflow_executor_skip_condition() {
        // A workflow whose condition is literally `false` must be skipped
        // without running (or auditing) any of its tasks.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": false,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;
        let (workflow_executor, workflow) = compile_executor(workflow_json);

        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();
        assert!(!executed);
        assert_eq!(message.audit_trail.len(), 0);
    }

    #[tokio::test]
    async fn test_workflow_executor_execute_success() {
        // Condition is `true` and the single `map` task has no mappings, so
        // the workflow should run to completion.
        let workflow_json = r#"{
            "id": "test_workflow",
            "name": "Test Workflow",
            "condition": true,
            "tasks": [{
                "id": "dummy_task",
                "name": "Dummy Task",
                "function": {
                    "name": "map",
                    "input": {"mappings": []}
                }
            }]
        }"#;
        let (workflow_executor, workflow) = compile_executor(workflow_json);

        let mut message = Message::from_value(&json!({}));

        let executed = workflow_executor
            .execute(&workflow, &mut message, Utc::now())
            .await
            .unwrap();
        assert!(executed);
    }
}