Skip to main content

dataflow_rs/engine/
mod.rs

1/*!
2# Engine Module
3
4This module implements the core async workflow engine for dataflow-rs. The engine provides
5high-performance, asynchronous message processing through workflows composed of tasks.
6
7## Architecture
8
9The engine features a clean async-first architecture built on datalogic v5:
10- **Compiler**: Pre-compiles JSONLogic expressions into `Arc<Logic>` via `Engine::compile_arc`
11- **Executor**: Handles internal function execution (map, validation) with async support
12- **Engine**: Orchestrates workflow processing with shared compiled logic
13- **Thread-Safe**: Single `datalogic_rs::Engine` shared via `Arc`, with `Arc<Logic>` entries for zero-copy sharing
14
15## Key Components
16
17- **Engine**: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
18- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
19- **InternalExecutor**: Executes built-in map and validation functions with compiled logic
20- **Workflow**: Collection of tasks with JSONLogic conditions (can access data, metadata, temp_data)
21- **Task**: Individual processing unit that performs a specific function on a message
22- **AsyncFunctionHandler**: Trait for custom async processing logic
23- **Message**: Data structure flowing through the engine with audit trail
24
25## Performance Optimizations
26
27- **Pre-compilation**: All JSONLogic expressions compiled at startup
28- **Arc-wrapped Logic**: Zero-copy sharing of compiled logic across async tasks
29- **Bump-arena evaluation**: Per-worker thread-local `Bump` is rewound (not freed) between evals
30- **True Async**: I/O operations remain fully async
31
32## Usage
33
34```rust,no_run
35use dataflow_rs::{Engine, Workflow, engine::message::Message};
36use serde_json::json;
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40    // Define workflows
41    let workflows = vec![
42        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
43    ];
44
45    // Create engine with defaults
46    let engine = Engine::builder().with_workflows(workflows).build()?;
47
48    // Process messages asynchronously
49    let mut message = Message::from_value(&json!({}));
50    engine.process_message(&mut message).await?;
51
52    Ok(())
53}
54```
55*/
56
57pub mod compiler;
58pub mod error;
59pub mod executor;
60pub mod functions;
61pub mod message;
62pub mod task;
63pub mod task_context;
64pub mod task_executor;
65pub mod task_outcome;
66pub mod trace;
67pub mod utils;
68pub mod workflow;
69pub mod workflow_executor;
70
71// Re-export key types for easier access
72pub use error::{DataflowError, ErrorInfo, Result};
73pub use functions::{
74    AsyncFunctionHandler, BoxedFunctionHandler, CompiledCustomInput, DynAsyncFunctionHandler,
75    FunctionConfig,
76};
77pub use message::Message;
78pub use task::Task;
79pub use task_context::TaskContext;
80pub use task_outcome::TaskOutcome;
81pub use trace::{ExecutionStep, ExecutionTrace, StepResult};
82pub use workflow::{Workflow, WorkflowStatus};
83
// `EngineBuilder` is defined further down in this file. Because it is declared
// `pub` in this module, downstream code can already import it as
// `dataflow_rs::engine::EngineBuilder`; no explicit re-export is needed here.
86
87use chrono::Utc;
88use datalogic_rs::Engine as DatalogicEngine;
89use datavalue::OwnedDataValue;
90use std::collections::HashMap;
91use std::sync::Arc;
92
93use compiler::LogicCompiler;
94use task_executor::TaskExecutor;
95use utils::set_nested_value;
96use workflow_executor::WorkflowExecutor;
97
/// High-performance async workflow engine for message processing.
///
/// ## Architecture
///
/// The engine is designed for async-first operation with Tokio:
/// - **Separation of Concerns**: Distinct executors for workflows and tasks
/// - **Shared datalogic engine**: Single `datalogic_rs::Engine` wrapped in `Arc` for thread-safe sharing
/// - **Arc<Logic>**: Pre-compiled logic shared across all async tasks
/// - **Async Functions**: Native async support for I/O-bound operations
///
/// ## Performance Characteristics
///
/// - **Zero Runtime Compilation**: All logic compiled during initialization
/// - **Zero-Copy Sharing**: Arc-wrapped compiled logic shared without cloning
/// - **Optimal for Mixed Workloads**: Async I/O with blocking CPU evaluation
/// - **Thread-Safe by Design**: All components safe to share across Tokio tasks
pub struct Engine {
    /// Registry of available workflows, pre-sorted by priority (immutable after initialization).
    /// Each workflow / task / function-config holds its own `Arc<Logic>` slots
    /// — there is no central logic cache anymore.
    workflows: Arc<Vec<Workflow>>,
    /// Channel index: maps channel name -> indices into `workflows` (only
    /// Active workflows are indexed; see `build_channel_index`).
    channel_index: Arc<HashMap<String, Vec<usize>>>,
    /// Workflow executor for orchestrating workflow execution
    workflow_executor: Arc<WorkflowExecutor>,
    /// Shared datalogic v5 engine for JSONLogic evaluation (Send + Sync)
    datalogic: Arc<DatalogicEngine>,
    /// Pre-built `OwnedDataValue::String` holding the crate version
    /// (`CARGO_PKG_VERSION`). Built once at construction and stamped into
    /// `metadata.engine_version` for every processed message. The `Arc`
    /// lets `with_new_workflows` hand the same value to a reloaded engine
    /// with a refcount bump.
    ///
    /// NOTE(review): the per-message stamp in `set_processing_metadata`
    /// clones the *inner* `OwnedDataValue` (`(**engine_version).clone()`),
    /// not the `Arc` handle, so the version string is copied for each
    /// message.
    engine_version: Arc<OwnedDataValue>,
}
131
132/// Build a channel index from pre-sorted workflows.
133/// Maps channel name -> indices into workflows vec, only for Active workflows.
134fn build_channel_index(workflows: &[Workflow]) -> HashMap<String, Vec<usize>> {
135    let mut index: HashMap<String, Vec<usize>> = HashMap::new();
136    for (i, workflow) in workflows.iter().enumerate() {
137        if workflow.status == WorkflowStatus::Active {
138            index.entry(workflow.channel.clone()).or_default().push(i);
139        }
140    }
141    index
142}
143
144impl Engine {
145    /// Creates a new Engine instance.
146    ///
147    /// Compiles every workflow / task / function-config JSONLogic expression
148    /// up-front. Returns `Err(DataflowError)` if any required expression
149    /// fails to compile — fail-loud at construction time instead of silently
150    /// dropping broken workflows at runtime.
151    ///
152    /// # Arguments
153    /// * `workflows` - The workflows to use for processing messages
154    /// * `custom_functions` - Custom async function handlers (use
155    ///   `HashMap::new()` for none, or prefer [`Engine::builder`])
156    ///
157    /// # Example
158    ///
159    /// ```
160    /// use dataflow_rs::{Engine, Workflow};
161    ///
162    /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];
163    ///
164    /// let engine = Engine::builder().with_workflows(workflows).build().unwrap();
165    /// ```
166    /// The recommended construction path is [`Engine::builder`]. `Engine::new`
167    /// is the lower-level escape hatch — accepts handlers as a plain
168    /// `HashMap` (use `HashMap::new()` for the no-handler case).
169    pub fn new(
170        workflows: Vec<Workflow>,
171        custom_functions: HashMap<String, BoxedFunctionHandler>,
172    ) -> Result<Self> {
173        // Compile workflows (sorted by priority at compile time). Each
174        // workflow/task/config owns its own `Arc<Logic>` slots — no central
175        // cache to return. Any compile failure bubbles up immediately.
176        let compiler = LogicCompiler::new();
177        let mut sorted_workflows = compiler.compile_workflows(workflows)?;
178        let datalogic = compiler.into_engine();
179
180        let task_functions = custom_functions;
181
182        // Pre-parse `FunctionConfig::Custom { input }` JSON into the
183        // registered handler's typed `Self::Input`, caching the boxed value
184        // on the task. Misshapen Custom configs fail here, not on first
185        // message — matches the "fail loud at startup" stance for compiled
186        // logic. Built-in async configs (HttpCall/Enrich/PublishKafka) are
187        // already typed by serde and need no second pass.
188        precompile_custom_inputs(&mut sorted_workflows, &task_functions)?;
189
190        let task_executor = Arc::new(TaskExecutor::new(
191            Arc::new(task_functions),
192            Arc::clone(&datalogic),
193        ));
194
195        let workflow_executor =
196            Arc::new(WorkflowExecutor::new(task_executor, Arc::clone(&datalogic)));
197
198        // Build channel index for O(1) channel-based routing
199        let channel_index = build_channel_index(&sorted_workflows);
200
201        Ok(Self {
202            workflows: Arc::new(sorted_workflows),
203            channel_index: Arc::new(channel_index),
204            workflow_executor,
205            datalogic,
206            engine_version: Arc::new(OwnedDataValue::String(
207                env!("CARGO_PKG_VERSION").to_string(),
208            )),
209        })
210    }
211
212    /// Start building an engine. The recommended construction path —
213    /// chains `register("name", handler)` and `with_workflow(w)` calls,
214    /// then `build()` to produce a `Result<Engine>`.
215    ///
216    /// ```no_run
217    /// use dataflow_rs::{Engine, Workflow};
218    /// # let workflow: Workflow = unimplemented!();
219    /// let engine = Engine::builder()
220    ///     .with_workflow(workflow)
221    ///     // .register("my_handler", MyHandler)  // any AsyncFunctionHandler
222    ///     .build()
223    ///     .unwrap();
224    /// ```
225    pub fn builder() -> EngineBuilder {
226        EngineBuilder::new()
227    }
228
229    /// Cached `OwnedDataValue::String` of the engine version.
230    pub fn engine_version_value(&self) -> &OwnedDataValue {
231        &self.engine_version
232    }
233
234    /// Creates a new Engine with different workflows but the same custom function handlers.
235    ///
236    /// This is the hot-reload path. The existing engine remains valid for any
237    /// in-flight `process_message` calls. The returned engine shares the same
238    /// function registry (zero-copy Arc bump) but has freshly compiled logic
239    /// for the new workflow set.
240    ///
241    /// # Arguments
242    /// * `workflows` - The new set of workflows to compile and use
243    pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Result<Self> {
244        // Extract the shared function registry from the existing executor
245        let task_functions = self.workflow_executor.task_functions();
246
247        // Compile new workflows with a fresh datalogic engine instance.
248        let compiler = LogicCompiler::new();
249        let mut sorted_workflows = compiler.compile_workflows(workflows)?;
250        let datalogic = compiler.into_engine();
251
252        // Pre-parse Custom inputs against the existing handler registry —
253        // hot-reload still validates the new workflow set against the
254        // already-registered handlers.
255        precompile_custom_inputs(&mut sorted_workflows, &task_functions)?;
256
257        // Rebuild the executor stack, reusing the existing function registry
258        let task_executor = Arc::new(TaskExecutor::new(task_functions, Arc::clone(&datalogic)));
259
260        let workflow_executor =
261            Arc::new(WorkflowExecutor::new(task_executor, Arc::clone(&datalogic)));
262
263        // Build channel index for O(1) channel-based routing
264        let channel_index = build_channel_index(&sorted_workflows);
265
266        Ok(Self {
267            workflows: Arc::new(sorted_workflows),
268            channel_index: Arc::new(channel_index),
269            workflow_executor,
270            datalogic,
271            engine_version: Arc::clone(&self.engine_version),
272        })
273    }
274
275    /// Processes a message through workflows that match their conditions.
276    ///
277    /// This async method:
278    /// 1. Iterates through workflows sequentially in priority order (pre-sorted at construction)
279    /// 2. Delegates workflow execution to the WorkflowExecutor
280    /// 3. Updates message metadata
281    ///
282    /// # Error contract
283    ///
284    /// Errors flow through two complementary channels:
285    /// - `message.errors()` — **always** contains every error encountered
286    ///   (validation failures, task panics, 5xx-status outcomes, workflow
287    ///   wrappers). Callers that want a uniform view inspect this list.
288    /// - `Result::Err` — signals **only** that the engine stopped before
289    ///   processing every workflow. Callers that want fail-fast match on
290    ///   this. The error pushed to `message.errors` for the same failure
291    ///   carries the workflow context (id) that the bare `Err` doesn't.
292    ///
293    /// In particular: a workflow with `continue_on_error: true` records its
294    /// errors to `message.errors` and returns `Ok(())` here. A workflow
295    /// with `continue_on_error: false` records to `message.errors` *and*
296    /// returns `Result::Err` (which short-circuits the rest of this call).
297    ///
298    /// # Arguments
299    /// * `message` - The message to process through workflows
300    ///
301    /// # Returns
302    /// * `Result<()>` — `Ok(())` if every workflow completed (each may have
303    ///   pushed errors to `message.errors`); `Err(e)` if the engine
304    ///   stopped early on a hard failure.
305    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
306        // Capture a single timestamp for the entire process_message call. The
307        // workflow executor reads it back via Message metadata if it needs to
308        // emit AuditTrail entries; this caps the number of `Utc::now()` syscalls
309        // at 1 per message (down from 3+ — one stamp here, one per AuditTrail).
310        let now = Utc::now();
311        set_processing_metadata(&mut message.context, &self.engine_version, now, None);
312
313        // Process each workflow in priority order (pre-sorted at construction)
314        for workflow in self.workflows.iter() {
315            self.workflow_executor
316                .execute(workflow, message, now)
317                .await?;
318        }
319
320        Ok(())
321    }
322
323    /// Processes a message through workflows with step-by-step tracing.
324    ///
325    /// This method is similar to `process_message` but captures an execution trace
326    /// that can be used for debugging and step-by-step visualization.
327    ///
328    /// # Arguments
329    /// * `message` - The message to process through workflows
330    ///
331    /// # Returns
332    /// * `Result<ExecutionTrace>` - The execution trace with message snapshots
333    pub async fn process_message_with_trace(
334        &self,
335        message: &mut Message,
336    ) -> Result<ExecutionTrace> {
337        use trace::ExecutionTrace;
338
339        let now = Utc::now();
340        set_processing_metadata(&mut message.context, &self.engine_version, now, None);
341
342        let mut trace = ExecutionTrace::new();
343
344        // Process each workflow in priority order (pre-sorted at construction)
345        for workflow in self.workflows.iter() {
346            self.workflow_executor
347                .execute_with_trace(workflow, message, &mut trace, now)
348                .await?;
349        }
350
351        Ok(trace)
352    }
353
354    /// Processes a message through only the Active workflows registered for a given channel.
355    ///
356    /// Workflows are processed in priority order (lowest first), same as process_message().
357    /// If the channel does not exist or has no Active workflows, this is a no-op.
358    ///
359    /// # Arguments
360    /// * `channel` - The channel name to route the message through
361    /// * `message` - The message to process
362    pub async fn process_message_for_channel(
363        &self,
364        channel: &str,
365        message: &mut Message,
366    ) -> Result<()> {
367        let now = Utc::now();
368        set_processing_metadata(
369            &mut message.context,
370            &self.engine_version,
371            now,
372            Some(channel),
373        );
374
375        if let Some(indices) = self.channel_index.get(channel) {
376            for &idx in indices {
377                self.workflow_executor
378                    .execute(&self.workflows[idx], message, now)
379                    .await?;
380            }
381        }
382
383        Ok(())
384    }
385
386    /// Processes a message through a channel with step-by-step tracing.
387    ///
388    /// # Arguments
389    /// * `channel` - The channel name to route the message through
390    /// * `message` - The message to process
391    pub async fn process_message_for_channel_with_trace(
392        &self,
393        channel: &str,
394        message: &mut Message,
395    ) -> Result<ExecutionTrace> {
396        use trace::ExecutionTrace;
397
398        let now = Utc::now();
399        set_processing_metadata(
400            &mut message.context,
401            &self.engine_version,
402            now,
403            Some(channel),
404        );
405
406        let mut trace = ExecutionTrace::new();
407
408        if let Some(indices) = self.channel_index.get(channel) {
409            for &idx in indices {
410                self.workflow_executor
411                    .execute_with_trace(&self.workflows[idx], message, &mut trace, now)
412                    .await?;
413            }
414        }
415
416        Ok(trace)
417    }
418
    /// Returns the engine's workflow list (pre-sorted by priority at
    /// construction).
    ///
    /// The shared `Arc` is handed out by reference; callers that need to
    /// keep the list beyond the engine's borrow can `Arc::clone` it.
    pub fn workflows(&self) -> &Arc<Vec<Workflow>> {
        &self.workflows
    }
423
424    /// Look up a workflow by its ID
425    pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow> {
426        self.workflows.iter().find(|w| w.id == id)
427    }
428
    /// Returns the shared datalogic v5 engine this `Engine` was built with.
    ///
    /// The same `Arc` is cloned into the task and workflow executors at
    /// construction, so this is the instance used for JSONLogic evaluation.
    pub fn datalogic(&self) -> &Arc<DatalogicEngine> {
        &self.datalogic
    }
433}
434
/// Builder for [`Engine`]. The recommended construction path — chain
/// `register("name", handler)` and `with_workflow(workflow)` calls, then
/// `build()` to produce a `Result<Engine>`. Empty registration is fine; an
/// engine with no custom handlers still resolves the built-in functions.
///
/// `register` takes any [`AsyncFunctionHandler`] and boxes it internally; the
/// `Box<dyn DynAsyncFunctionHandler + Send + Sync>` plumbing stays out of
/// user code.
///
/// ```no_run
/// use dataflow_rs::{Engine, Workflow};
/// # let workflow: Workflow = unimplemented!();
/// let engine = Engine::builder()
///     .with_workflow(workflow)
///     // .register("my_handler", MyHandler)
///     .build()
///     .unwrap();
/// ```
#[must_use = "EngineBuilder must be `.build()` to produce an Engine"]
#[derive(Default)]
pub struct EngineBuilder {
    /// Workflows collected so far, in the order they were added. They are
    /// handed to `Engine::new` by `build()`.
    workflows: Vec<Workflow>,
    /// Custom handlers keyed by registration name. Registering the same
    /// name twice keeps only the later handler (`HashMap::insert`).
    handlers: HashMap<String, BoxedFunctionHandler>,
}
459
460impl EngineBuilder {
461    /// Create an empty builder. Equivalent to [`EngineBuilder::default`].
462    pub fn new() -> Self {
463        Self::default()
464    }
465
466    /// Register a custom async handler under `name`. Accepts any
467    /// `AsyncFunctionHandler`; boxing happens internally via the engine's
468    /// blanket impl.
469    pub fn register<F>(mut self, name: impl Into<String>, handler: F) -> Self
470    where
471        F: AsyncFunctionHandler,
472    {
473        self.handlers.insert(name.into(), Box::new(handler));
474        self
475    }
476
477    /// Register a pre-boxed handler. Useful when handlers are constructed
478    /// dynamically (e.g. plugin registries) and the concrete type isn't
479    /// known at the call site.
480    pub fn register_boxed(
481        mut self,
482        name: impl Into<String>,
483        handler: BoxedFunctionHandler,
484    ) -> Self {
485        self.handlers.insert(name.into(), handler);
486        self
487    }
488
489    /// Add a single workflow. Subsequent calls append.
490    pub fn with_workflow(mut self, workflow: Workflow) -> Self {
491        self.workflows.push(workflow);
492        self
493    }
494
    /// Appends every workflow yielded by `workflows` and returns the builder
    /// for chaining. Accepts anything iterable over `Workflow` —
    /// `Vec<Workflow>`, an array, an iterator. Workflows already on the
    /// builder are kept, and the new ones are added after them.
    pub fn with_workflows<I>(mut self, workflows: I) -> Self
    where
        I: IntoIterator<Item = Workflow>,
    {
        self.workflows.extend(workflows);
        self
    }
505
506    /// Compile the workflows, pre-parse Custom inputs, and produce the
507    /// engine. Compile errors and missing handler references surface here —
508    /// the engine never deserializes Custom config on the hot path.
509    pub fn build(self) -> Result<Engine> {
510        Engine::new(self.workflows, self.handlers)
511    }
512}
513
514/// Walk every task in every workflow; for each `FunctionConfig::Custom`,
515/// look up the registered handler and ask it to parse the raw `input` JSON
516/// into its typed `Self::Input` (boxed as `dyn Any`). The cached result is
517/// stored on the task — dispatch then hands the handler a `&dyn Any` it
518/// downcasts in O(1).
519///
520/// Built-in async configs (`HttpCall`, `Enrich`, `PublishKafka`) are already
521/// parsed by serde's `untagged` representation on `FunctionConfig`; they
522/// need no second pass.
523///
524/// Returns `FunctionNotFound` when a Custom task references an unregistered
525/// handler — moves the failure from "first message" to engine construction.
526fn precompile_custom_inputs(
527    workflows: &mut [Workflow],
528    handlers: &HashMap<String, BoxedFunctionHandler>,
529) -> Result<()> {
530    for workflow in workflows {
531        for task in &mut workflow.tasks {
532            if let FunctionConfig::Custom {
533                name,
534                input,
535                compiled_input,
536            } = &mut task.function
537            {
538                let handler = handlers
539                    .get(name)
540                    .ok_or_else(|| function_not_found_error(name, handlers))?;
541                let parsed = handler.parse_input_box(input)?;
542                *compiled_input = Some(CompiledCustomInput(Arc::from(parsed)));
543            }
544        }
545    }
546    Ok(())
547}
548
549/// Build a `FunctionNotFound` error that lists both the registered custom
550/// handlers and the names of built-in functions, so a user with a typo
551/// (e.g. `htttp_call`) can immediately spot the intended name.
552fn function_not_found_error(
553    name: &str,
554    handlers: &HashMap<String, BoxedFunctionHandler>,
555) -> DataflowError {
556    use crate::engine::functions::config::BUILTIN_FUNCTION_NAMES;
557    let mut registered: Vec<&str> = handlers.keys().map(String::as_str).collect();
558    registered.sort_unstable();
559    let registered_part = if registered.is_empty() {
560        String::from("none")
561    } else {
562        registered.join(", ")
563    };
564    DataflowError::FunctionNotFound(format!(
565        "{name} (registered handlers: {registered_part}; built-ins: {})",
566        BUILTIN_FUNCTION_NAMES.join(", ")
567    ))
568}
569
570/// Stamp the standard processing metadata (`processed_at`, `engine_version`,
571/// and optionally `channel`) into the message context.
572///
573/// `now` is captured once at the top of `process_message` and reused so the
574/// timestamp on `metadata.processed_at` matches the one used for every
575/// `AuditTrail` entry within the same call.
576/// `engine_version` is the cached `Arc<OwnedDataValue::String>` owned by
577/// `Engine`; the deref-and-clone here is one Arc-bump's worth of work, not
578/// a `String` allocation.
579fn set_processing_metadata(
580    context: &mut OwnedDataValue,
581    engine_version: &Arc<OwnedDataValue>,
582    now: chrono::DateTime<Utc>,
583    channel: Option<&str>,
584) {
585    set_nested_value(
586        context,
587        "metadata.processed_at",
588        OwnedDataValue::String(now.to_rfc3339()),
589    );
590    set_nested_value(
591        context,
592        "metadata.engine_version",
593        (**engine_version).clone(),
594    );
595    if let Some(channel) = channel {
596        set_nested_value(
597            context,
598            "metadata.channel",
599            OwnedDataValue::String(channel.to_string()),
600        );
601    }
602}