dataflow_rs/engine/mod.rs
1/*!
2# Engine Module
3
4This module implements the core async workflow engine for dataflow-rs. The engine provides
5high-performance, asynchronous message processing through workflows composed of tasks.
6
7## Architecture
8
9The engine features a clean async-first architecture built on datalogic v5:
10- **Compiler**: Pre-compiles JSONLogic expressions into `Arc<Logic>` via `Engine::compile_arc`
11- **Executor**: Handles internal function execution (map, validation) with async support
12- **Engine**: Orchestrates workflow processing with shared compiled logic
13- **Thread-Safe**: Single `datalogic_rs::Engine` shared via `Arc`, with `Arc<Logic>` entries for zero-copy sharing
14
15## Key Components
16
17- **Engine**: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
18- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
19- **InternalExecutor**: Executes built-in map and validation functions with compiled logic
20- **Workflow**: Collection of tasks with JSONLogic conditions (can access data, metadata, temp_data)
21- **Task**: Individual processing unit that performs a specific function on a message
22- **AsyncFunctionHandler**: Trait for custom async processing logic
23- **Message**: Data structure flowing through the engine with audit trail
24
25## Performance Optimizations
26
27- **Pre-compilation**: All JSONLogic expressions compiled at startup
28- **Arc-wrapped Logic**: Zero-copy sharing of compiled logic across async tasks
29- **Bump-arena evaluation**: Per-worker thread-local `Bump` is rewound (not freed) between evals
30- **True Async**: I/O operations remain fully async
31
32## Usage
33
34```rust,no_run
35use dataflow_rs::{Engine, Workflow, engine::message::Message};
36use serde_json::json;
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40 // Define workflows
41 let workflows = vec![
42 Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
43 ];
44
45 // Create engine with defaults
46 let engine = Engine::builder().with_workflows(workflows).build()?;
47
48 // Process messages asynchronously
49 let mut message = Message::from_value(&json!({}));
50 engine.process_message(&mut message).await?;
51
52 Ok(())
53}
54```
55*/
56
57pub mod compiler;
58pub mod error;
59pub mod executor;
60pub mod functions;
61pub mod message;
62pub mod task;
63pub mod task_context;
64pub mod task_executor;
65pub mod task_outcome;
66pub mod trace;
67pub mod utils;
68pub mod workflow;
69pub mod workflow_executor;
70
71// Re-export key types for easier access
72pub use error::{DataflowError, ErrorInfo, Result};
73pub use functions::{
74 AsyncFunctionHandler, BoxedFunctionHandler, CompiledCustomInput, DynAsyncFunctionHandler,
75 FunctionConfig,
76};
77pub use message::Message;
78pub use task::Task;
79pub use task_context::TaskContext;
80pub use task_outcome::TaskOutcome;
81pub use trace::{ExecutionStep, ExecutionTrace, StepResult};
82pub use workflow::{Workflow, WorkflowStatus};
83
// `EngineBuilder` is defined further down in this file. Because it is declared
// `pub` in this module, downstream code can import it as
// `dataflow_rs::engine::EngineBuilder` without a separate re-export here.
86
87use chrono::Utc;
88use datalogic_rs::Engine as DatalogicEngine;
89use datavalue::OwnedDataValue;
90use std::collections::HashMap;
91use std::sync::Arc;
92
93use compiler::LogicCompiler;
94use task_executor::TaskExecutor;
95use utils::set_nested_value;
96use workflow_executor::WorkflowExecutor;
97
98/// High-performance async workflow engine for message processing.
99///
100/// ## Architecture
101///
102/// The engine is designed for async-first operation with Tokio:
103/// - **Separation of Concerns**: Distinct executors for workflows and tasks
104/// - **Shared datalogic engine**: Single `datalogic_rs::Engine` wrapped in `Arc` for thread-safe sharing
105/// - **Arc<Logic>**: Pre-compiled logic shared across all async tasks
106/// - **Async Functions**: Native async support for I/O-bound operations
107///
108/// ## Performance Characteristics
109///
110/// - **Zero Runtime Compilation**: All logic compiled during initialization
111/// - **Zero-Copy Sharing**: Arc-wrapped compiled logic shared without cloning
112/// - **Optimal for Mixed Workloads**: Async I/O with blocking CPU evaluation
113/// - **Thread-Safe by Design**: All components safe to share across Tokio tasks
114pub struct Engine {
115 /// Registry of available workflows, pre-sorted by priority (immutable after initialization).
116 /// Each workflow / task / function-config holds its own `Arc<Logic>` slots
117 /// — there is no central logic cache anymore.
118 workflows: Arc<Vec<Workflow>>,
119 /// Channel index: maps channel name -> indices into workflows vec (only Active workflows)
120 channel_index: Arc<HashMap<String, Vec<usize>>>,
121 /// Workflow executor for orchestrating workflow execution
122 workflow_executor: Arc<WorkflowExecutor>,
123 /// Shared datalogic v5 engine for JSONLogic evaluation (Send + Sync)
124 datalogic: Arc<DatalogicEngine>,
125 /// Pre-built `Arc<OwnedDataValue::String>` of the engine version. Built
126 /// once at construction; stamped into `metadata.engine_version` per
127 /// message via an `Arc` refcount bump (the underlying `String` is never
128 /// re-allocated for this stamp).
129 engine_version: Arc<OwnedDataValue>,
130}
131
132/// Build a channel index from pre-sorted workflows.
133/// Maps channel name -> indices into workflows vec, only for Active workflows.
134fn build_channel_index(workflows: &[Workflow]) -> HashMap<String, Vec<usize>> {
135 let mut index: HashMap<String, Vec<usize>> = HashMap::new();
136 for (i, workflow) in workflows.iter().enumerate() {
137 if workflow.status == WorkflowStatus::Active {
138 index.entry(workflow.channel.clone()).or_default().push(i);
139 }
140 }
141 index
142}
143
144impl Engine {
145 /// Creates a new Engine instance.
146 ///
147 /// Compiles every workflow / task / function-config JSONLogic expression
148 /// up-front. Returns `Err(DataflowError)` if any required expression
149 /// fails to compile — fail-loud at construction time instead of silently
150 /// dropping broken workflows at runtime.
151 ///
152 /// # Arguments
153 /// * `workflows` - The workflows to use for processing messages
154 /// * `custom_functions` - Custom async function handlers (use
155 /// `HashMap::new()` for none, or prefer [`Engine::builder`])
156 ///
157 /// # Example
158 ///
159 /// ```
160 /// use dataflow_rs::{Engine, Workflow};
161 ///
162 /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];
163 ///
164 /// let engine = Engine::builder().with_workflows(workflows).build().unwrap();
165 /// ```
166 /// The recommended construction path is [`Engine::builder`]. `Engine::new`
167 /// is the lower-level escape hatch — accepts handlers as a plain
168 /// `HashMap` (use `HashMap::new()` for the no-handler case).
169 pub fn new(
170 workflows: Vec<Workflow>,
171 custom_functions: HashMap<String, BoxedFunctionHandler>,
172 ) -> Result<Self> {
173 // Compile workflows (sorted by priority at compile time). Each
174 // workflow/task/config owns its own `Arc<Logic>` slots — no central
175 // cache to return. Any compile failure bubbles up immediately.
176 let compiler = LogicCompiler::new();
177 let mut sorted_workflows = compiler.compile_workflows(workflows)?;
178 let datalogic = compiler.into_engine();
179
180 let task_functions = custom_functions;
181
182 // Pre-parse `FunctionConfig::Custom { input }` JSON into the
183 // registered handler's typed `Self::Input`, caching the boxed value
184 // on the task. Misshapen Custom configs fail here, not on first
185 // message — matches the "fail loud at startup" stance for compiled
186 // logic. Built-in async configs (HttpCall/Enrich/PublishKafka) are
187 // already typed by serde and need no second pass.
188 precompile_custom_inputs(&mut sorted_workflows, &task_functions)?;
189
190 let task_executor = Arc::new(TaskExecutor::new(
191 Arc::new(task_functions),
192 Arc::clone(&datalogic),
193 ));
194
195 let workflow_executor =
196 Arc::new(WorkflowExecutor::new(task_executor, Arc::clone(&datalogic)));
197
198 // Build channel index for O(1) channel-based routing
199 let channel_index = build_channel_index(&sorted_workflows);
200
201 Ok(Self {
202 workflows: Arc::new(sorted_workflows),
203 channel_index: Arc::new(channel_index),
204 workflow_executor,
205 datalogic,
206 engine_version: Arc::new(OwnedDataValue::String(
207 env!("CARGO_PKG_VERSION").to_string(),
208 )),
209 })
210 }
211
212 /// Start building an engine. The recommended construction path —
213 /// chains `register("name", handler)` and `with_workflow(w)` calls,
214 /// then `build()` to produce a `Result<Engine>`.
215 ///
216 /// ```no_run
217 /// use dataflow_rs::{Engine, Workflow};
218 /// # let workflow: Workflow = unimplemented!();
219 /// let engine = Engine::builder()
220 /// .with_workflow(workflow)
221 /// // .register("my_handler", MyHandler) // any AsyncFunctionHandler
222 /// .build()
223 /// .unwrap();
224 /// ```
225 pub fn builder() -> EngineBuilder {
226 EngineBuilder::new()
227 }
228
229 /// Cached `OwnedDataValue::String` of the engine version.
230 pub fn engine_version_value(&self) -> &OwnedDataValue {
231 &self.engine_version
232 }
233
234 /// Creates a new Engine with different workflows but the same custom function handlers.
235 ///
236 /// This is the hot-reload path. The existing engine remains valid for any
237 /// in-flight `process_message` calls. The returned engine shares the same
238 /// function registry (zero-copy Arc bump) but has freshly compiled logic
239 /// for the new workflow set.
240 ///
241 /// # Arguments
242 /// * `workflows` - The new set of workflows to compile and use
243 pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Result<Self> {
244 // Extract the shared function registry from the existing executor
245 let task_functions = self.workflow_executor.task_functions();
246
247 // Compile new workflows with a fresh datalogic engine instance.
248 let compiler = LogicCompiler::new();
249 let mut sorted_workflows = compiler.compile_workflows(workflows)?;
250 let datalogic = compiler.into_engine();
251
252 // Pre-parse Custom inputs against the existing handler registry —
253 // hot-reload still validates the new workflow set against the
254 // already-registered handlers.
255 precompile_custom_inputs(&mut sorted_workflows, &task_functions)?;
256
257 // Rebuild the executor stack, reusing the existing function registry
258 let task_executor = Arc::new(TaskExecutor::new(task_functions, Arc::clone(&datalogic)));
259
260 let workflow_executor =
261 Arc::new(WorkflowExecutor::new(task_executor, Arc::clone(&datalogic)));
262
263 // Build channel index for O(1) channel-based routing
264 let channel_index = build_channel_index(&sorted_workflows);
265
266 Ok(Self {
267 workflows: Arc::new(sorted_workflows),
268 channel_index: Arc::new(channel_index),
269 workflow_executor,
270 datalogic,
271 engine_version: Arc::clone(&self.engine_version),
272 })
273 }
274
275 /// Processes a message through workflows that match their conditions.
276 ///
277 /// This async method:
278 /// 1. Iterates through workflows sequentially in priority order (pre-sorted at construction)
279 /// 2. Delegates workflow execution to the WorkflowExecutor
280 /// 3. Updates message metadata
281 ///
282 /// # Error contract
283 ///
284 /// Errors flow through two complementary channels:
285 /// - `message.errors()` — **always** contains every error encountered
286 /// (validation failures, task panics, 5xx-status outcomes, workflow
287 /// wrappers). Callers that want a uniform view inspect this list.
288 /// - `Result::Err` — signals **only** that the engine stopped before
289 /// processing every workflow. Callers that want fail-fast match on
290 /// this. The error pushed to `message.errors` for the same failure
291 /// carries the workflow context (id) that the bare `Err` doesn't.
292 ///
293 /// In particular: a workflow with `continue_on_error: true` records its
294 /// errors to `message.errors` and returns `Ok(())` here. A workflow
295 /// with `continue_on_error: false` records to `message.errors` *and*
296 /// returns `Result::Err` (which short-circuits the rest of this call).
297 ///
298 /// # Arguments
299 /// * `message` - The message to process through workflows
300 ///
301 /// # Returns
302 /// * `Result<()>` — `Ok(())` if every workflow completed (each may have
303 /// pushed errors to `message.errors`); `Err(e)` if the engine
304 /// stopped early on a hard failure.
305 pub async fn process_message(&self, message: &mut Message) -> Result<()> {
306 // Capture a single timestamp for the entire process_message call. The
307 // workflow executor reads it back via Message metadata if it needs to
308 // emit AuditTrail entries; this caps the number of `Utc::now()` syscalls
309 // at 1 per message (down from 3+ — one stamp here, one per AuditTrail).
310 let now = Utc::now();
311 set_processing_metadata(&mut message.context, &self.engine_version, now, None);
312
313 // Process each workflow in priority order (pre-sorted at construction)
314 for workflow in self.workflows.iter() {
315 self.workflow_executor
316 .execute(workflow, message, now)
317 .await?;
318 }
319
320 Ok(())
321 }
322
323 /// Processes a message through workflows with step-by-step tracing.
324 ///
325 /// This method is similar to `process_message` but captures an execution trace
326 /// that can be used for debugging and step-by-step visualization.
327 ///
328 /// # Arguments
329 /// * `message` - The message to process through workflows
330 ///
331 /// # Returns
332 /// * `Result<ExecutionTrace>` - The execution trace with message snapshots
333 pub async fn process_message_with_trace(
334 &self,
335 message: &mut Message,
336 ) -> Result<ExecutionTrace> {
337 use trace::ExecutionTrace;
338
339 let now = Utc::now();
340 set_processing_metadata(&mut message.context, &self.engine_version, now, None);
341
342 let mut trace = ExecutionTrace::new();
343
344 // Process each workflow in priority order (pre-sorted at construction)
345 for workflow in self.workflows.iter() {
346 self.workflow_executor
347 .execute_with_trace(workflow, message, &mut trace, now)
348 .await?;
349 }
350
351 Ok(trace)
352 }
353
354 /// Processes a message through only the Active workflows registered for a given channel.
355 ///
356 /// Workflows are processed in priority order (lowest first), same as process_message().
357 /// If the channel does not exist or has no Active workflows, this is a no-op.
358 ///
359 /// # Arguments
360 /// * `channel` - The channel name to route the message through
361 /// * `message` - The message to process
362 pub async fn process_message_for_channel(
363 &self,
364 channel: &str,
365 message: &mut Message,
366 ) -> Result<()> {
367 let now = Utc::now();
368 set_processing_metadata(
369 &mut message.context,
370 &self.engine_version,
371 now,
372 Some(channel),
373 );
374
375 if let Some(indices) = self.channel_index.get(channel) {
376 for &idx in indices {
377 self.workflow_executor
378 .execute(&self.workflows[idx], message, now)
379 .await?;
380 }
381 }
382
383 Ok(())
384 }
385
386 /// Processes a message through a channel with step-by-step tracing.
387 ///
388 /// # Arguments
389 /// * `channel` - The channel name to route the message through
390 /// * `message` - The message to process
391 pub async fn process_message_for_channel_with_trace(
392 &self,
393 channel: &str,
394 message: &mut Message,
395 ) -> Result<ExecutionTrace> {
396 use trace::ExecutionTrace;
397
398 let now = Utc::now();
399 set_processing_metadata(
400 &mut message.context,
401 &self.engine_version,
402 now,
403 Some(channel),
404 );
405
406 let mut trace = ExecutionTrace::new();
407
408 if let Some(indices) = self.channel_index.get(channel) {
409 for &idx in indices {
410 self.workflow_executor
411 .execute_with_trace(&self.workflows[idx], message, &mut trace, now)
412 .await?;
413 }
414 }
415
416 Ok(trace)
417 }
418
419 /// Get a reference to the workflows (pre-sorted by priority)
420 pub fn workflows(&self) -> &Arc<Vec<Workflow>> {
421 &self.workflows
422 }
423
424 /// Look up a workflow by its ID
425 pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow> {
426 self.workflows.iter().find(|w| w.id == id)
427 }
428
429 /// Get a reference to the underlying datalogic v5 engine.
430 pub fn datalogic(&self) -> &Arc<DatalogicEngine> {
431 &self.datalogic
432 }
433}
434
435/// Builder for [`Engine`]. The recommended construction path — chain
436/// `register("name", handler)` and `with_workflow(workflow)` calls, then
437/// `build()` to produce a `Result<Engine>`. Empty registration is fine; an
438/// engine with no custom handlers still resolves the built-in functions.
439///
440/// `register` takes any [`AsyncFunctionHandler`] and boxes it internally; the
441/// `Box<dyn DynAsyncFunctionHandler + Send + Sync>` plumbing stays out of
442/// user code.
443///
444/// ```no_run
445/// use dataflow_rs::{Engine, Workflow};
446/// # let workflow: Workflow = unimplemented!();
447/// let engine = Engine::builder()
448/// .with_workflow(workflow)
449/// // .register("my_handler", MyHandler)
450/// .build()
451/// .unwrap();
452/// ```
453#[must_use = "EngineBuilder must be `.build()` to produce an Engine"]
454#[derive(Default)]
455pub struct EngineBuilder {
456 workflows: Vec<Workflow>,
457 handlers: HashMap<String, BoxedFunctionHandler>,
458}
459
460impl EngineBuilder {
461 /// Create an empty builder. Equivalent to [`EngineBuilder::default`].
462 pub fn new() -> Self {
463 Self::default()
464 }
465
466 /// Register a custom async handler under `name`. Accepts any
467 /// `AsyncFunctionHandler`; boxing happens internally via the engine's
468 /// blanket impl.
469 pub fn register<F>(mut self, name: impl Into<String>, handler: F) -> Self
470 where
471 F: AsyncFunctionHandler,
472 {
473 self.handlers.insert(name.into(), Box::new(handler));
474 self
475 }
476
477 /// Register a pre-boxed handler. Useful when handlers are constructed
478 /// dynamically (e.g. plugin registries) and the concrete type isn't
479 /// known at the call site.
480 pub fn register_boxed(
481 mut self,
482 name: impl Into<String>,
483 handler: BoxedFunctionHandler,
484 ) -> Self {
485 self.handlers.insert(name.into(), handler);
486 self
487 }
488
489 /// Add a single workflow. Subsequent calls append.
490 pub fn with_workflow(mut self, workflow: Workflow) -> Self {
491 self.workflows.push(workflow);
492 self
493 }
494
495 /// Append every workflow in `workflows`. Accepts anything iterable —
496 /// `Vec<Workflow>`, an array, an iterator. Existing workflows on the
497 /// builder are kept; subsequent registers/workflows still chain.
498 pub fn with_workflows<I>(mut self, workflows: I) -> Self
499 where
500 I: IntoIterator<Item = Workflow>,
501 {
502 self.workflows.extend(workflows);
503 self
504 }
505
506 /// Compile the workflows, pre-parse Custom inputs, and produce the
507 /// engine. Compile errors and missing handler references surface here —
508 /// the engine never deserializes Custom config on the hot path.
509 pub fn build(self) -> Result<Engine> {
510 Engine::new(self.workflows, self.handlers)
511 }
512}
513
514/// Walk every task in every workflow; for each `FunctionConfig::Custom`,
515/// look up the registered handler and ask it to parse the raw `input` JSON
516/// into its typed `Self::Input` (boxed as `dyn Any`). The cached result is
517/// stored on the task — dispatch then hands the handler a `&dyn Any` it
518/// downcasts in O(1).
519///
520/// Built-in async configs (`HttpCall`, `Enrich`, `PublishKafka`) are already
521/// parsed by serde's `untagged` representation on `FunctionConfig`; they
522/// need no second pass.
523///
524/// Returns `FunctionNotFound` when a Custom task references an unregistered
525/// handler — moves the failure from "first message" to engine construction.
526fn precompile_custom_inputs(
527 workflows: &mut [Workflow],
528 handlers: &HashMap<String, BoxedFunctionHandler>,
529) -> Result<()> {
530 for workflow in workflows {
531 for task in &mut workflow.tasks {
532 if let FunctionConfig::Custom {
533 name,
534 input,
535 compiled_input,
536 } = &mut task.function
537 {
538 let handler = handlers
539 .get(name)
540 .ok_or_else(|| function_not_found_error(name, handlers))?;
541 let parsed = handler.parse_input_box(input)?;
542 *compiled_input = Some(CompiledCustomInput(Arc::from(parsed)));
543 }
544 }
545 }
546 Ok(())
547}
548
549/// Build a `FunctionNotFound` error that lists both the registered custom
550/// handlers and the names of built-in functions, so a user with a typo
551/// (e.g. `htttp_call`) can immediately spot the intended name.
552fn function_not_found_error(
553 name: &str,
554 handlers: &HashMap<String, BoxedFunctionHandler>,
555) -> DataflowError {
556 use crate::engine::functions::config::BUILTIN_FUNCTION_NAMES;
557 let mut registered: Vec<&str> = handlers.keys().map(String::as_str).collect();
558 registered.sort_unstable();
559 let registered_part = if registered.is_empty() {
560 String::from("none")
561 } else {
562 registered.join(", ")
563 };
564 DataflowError::FunctionNotFound(format!(
565 "{name} (registered handlers: {registered_part}; built-ins: {})",
566 BUILTIN_FUNCTION_NAMES.join(", ")
567 ))
568}
569
570/// Stamp the standard processing metadata (`processed_at`, `engine_version`,
571/// and optionally `channel`) into the message context.
572///
573/// `now` is captured once at the top of `process_message` and reused so the
574/// timestamp on `metadata.processed_at` matches the one used for every
575/// `AuditTrail` entry within the same call.
576/// `engine_version` is the cached `Arc<OwnedDataValue::String>` owned by
577/// `Engine`; the deref-and-clone here is one Arc-bump's worth of work, not
578/// a `String` allocation.
579fn set_processing_metadata(
580 context: &mut OwnedDataValue,
581 engine_version: &Arc<OwnedDataValue>,
582 now: chrono::DateTime<Utc>,
583 channel: Option<&str>,
584) {
585 set_nested_value(
586 context,
587 "metadata.processed_at",
588 OwnedDataValue::String(now.to_rfc3339()),
589 );
590 set_nested_value(
591 context,
592 "metadata.engine_version",
593 (**engine_version).clone(),
594 );
595 if let Some(channel) = channel {
596 set_nested_value(
597 context,
598 "metadata.channel",
599 OwnedDataValue::String(channel.to_string()),
600 );
601 }
602}