dataflow_rs/engine/
mod.rs

/*!
# Engine Module

This module implements the core workflow engine for dataflow-rs. The engine provides
high-performance message processing through workflows composed of tasks.

## Architecture

The engine features a modular architecture with clear separation of concerns:
- **Compiler**: Pre-compiles JSONLogic expressions for optimal runtime performance
- **Executor**: Handles internal function execution (map, validation) efficiently
- **Engine**: Orchestrates workflow processing with immutable, pre-configured workflows
- **Direct DataLogic**: Each engine instance has its own DataLogic for zero contention

## Key Components

- **Engine**: Single-threaded engine optimized for both IO-bound and CPU-bound workloads
- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
- **InternalExecutor**: Executes built-in map and validation functions with compiled logic
- **Workflow**: Collection of tasks with JSONLogic conditions (metadata-only access)
- **Task**: Individual processing unit that performs a specific function on a message
- **FunctionHandler**: Trait for implementing custom processing logic
- **Message**: Data structure flowing through the engine with an audit trail

## Performance Optimizations

- **Pre-compilation**: All JSONLogic expressions compiled at startup
- **Direct Instantiation**: DataLogic instances created directly, avoiding any locking
- **Immutable Workflows**: Workflows defined at initialization for predictable performance
- **Efficient Caching**: Compiled logic cached for fast repeated evaluations

## Usage

```rust,no_run
use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": []}"#)?
    ];

    // Create engine with defaults (built-ins enabled)
    let engine = Engine::new(workflows, None, None);

    // Process messages
    let mut message = Message::new(&json!({}));
    engine.process_message(&mut message)?;

    Ok(())
}
```
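
## Custom Functions

Custom handlers can be registered alongside the built-ins. The sketch below is
illustrative only: the `execute` signature is inferred from the engine's internal
call site (`function.execute(message, config, &datalogic)`), and the handler
merely stamps `message.metadata`; consult the `functions` module for the actual
trait definition.

```rust,ignore
use dataflow_rs::Engine;
use dataflow_rs::engine::{FunctionConfig, FunctionHandler, Result};
use dataflow_rs::engine::message::{Change, Message};
use datalogic_rs::DataLogic;
use serde_json::json;
use std::collections::HashMap;

struct Tag;

impl FunctionHandler for Tag {
    fn execute(
        &self,
        message: &mut Message,
        _config: &FunctionConfig,
        _datalogic: &DataLogic,
    ) -> Result<(usize, Vec<Change>)> {
        // Stamp the metadata and report a 200 status with no recorded changes.
        message.metadata["tagged"] = json!(true);
        Ok((200, Vec::new()))
    }
}

let mut custom: HashMap<String, Box<dyn FunctionHandler + Send + Sync>> = HashMap::new();
custom.insert("tag".to_string(), Box::new(Tag));
let engine = Engine::new(Vec::new(), Some(custom), None);
```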
*/

pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod retry;
pub mod task;
pub mod workflow;

// Re-export key types for easier access
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{FunctionConfig, FunctionHandler};
pub use message::Message;
pub use retry::RetryConfig;
pub use task::Task;
pub use workflow::Workflow;

use chrono::Utc;
use datalogic_rs::{DataLogic, Logic};
use log::{debug, error, info, warn};
use message::{AuditTrail, Change};
use serde_json::{Map, Number, Value, json};
use std::collections::HashMap;
use std::sync::Arc;

use compiler::LogicCompiler;
use executor::InternalExecutor;

/// High-performance workflow engine for message processing.
///
/// ## Architecture
///
/// The engine features a modular design optimized for both IO-bound and CPU-bound workloads:
/// - **Separation of Concerns**: Compiler handles pre-compilation, Executor handles runtime
/// - **Direct DataLogic**: Single DataLogic instance per engine for zero contention
/// - **Immutable Workflows**: All workflows compiled and cached at initialization
/// - **Pre-compiled Logic**: JSONLogic expressions compiled once for optimal performance
///
/// ## Performance Characteristics
///
/// - **Zero Runtime Compilation**: All logic compiled during initialization
/// - **Cache-Friendly**: Compiled logic stored in contiguous memory
/// - **Predictable Latency**: No runtime allocations for logic evaluation
/// - **Thread-Safe Design**: Applications can safely use multiple engine instances across threads
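///
/// ## Example: one engine per thread
///
/// A minimal sketch of the per-thread pattern, assuming `Workflow` is `Clone`
/// and `Send` (the crate's own examples clone workflow vectors):
///
/// ```rust,ignore
/// use dataflow_rs::{Engine, Workflow, engine::message::Message};
/// use serde_json::json;
///
/// let workflows = vec![
///     Workflow::from_json(r#"{"id": "w", "name": "W", "tasks": []}"#).unwrap(),
/// ];
///
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let workflows = workflows.clone();
///         std::thread::spawn(move || {
///             // Each thread owns its own engine, so no state is shared.
///             let engine = Engine::new(workflows, None, None);
///             let mut message = Message::new(&json!({}));
///             engine.process_message(&mut message).unwrap();
///         })
///     })
///     .collect();
///
/// for handle in handles {
///     handle.join().unwrap();
/// }
/// ```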
pub struct Engine {
    /// Registry of available workflows (immutable after initialization)
    workflows: Arc<HashMap<String, Workflow>>,
    /// Registry of function handlers that can be executed by tasks (immutable after initialization)
    task_functions: Arc<HashMap<String, Box<dyn FunctionHandler + Send + Sync>>>,
    /// DataLogic instance for JSONLogic evaluation
    datalogic: DataLogic<'static>,
    /// Compiled logic cache
    logic_cache: Vec<Logic<'static>>,
    /// Configuration for retry behavior
    retry_config: RetryConfig,
}

impl Default for Engine {
    fn default() -> Self {
        Self::new(Vec::new(), None, None)
    }
}

impl Engine {
    /// Creates a new Engine instance with configurable parameters.
    ///
    /// # Arguments
    /// * `workflows` - The workflows to use for processing messages
    /// * `custom_functions` - Optional custom function handlers (None uses an empty map)
    /// * `retry_config` - Optional retry configuration (uses the default if None)
    ///
    /// Built-in function handlers are always registered in addition to any
    /// custom handlers.
    ///
    /// # Example
    ///
    /// ```
    /// use dataflow_rs::{Engine, Workflow};
    ///
    /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": []}"#).unwrap()];
    ///
    /// // Simple usage with defaults
    /// let engine = Engine::new(workflows, None, None);
    /// ```
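    ///
    /// An explicit retry configuration can be supplied as the third argument
    /// (a sketch using the `Default` implementation; constructing a non-default
    /// `RetryConfig` depends on fields defined in the `retry` module):
    ///
    /// ```
    /// use dataflow_rs::Engine;
    /// use dataflow_rs::engine::RetryConfig;
    ///
    /// let engine = Engine::new(Vec::new(), None, Some(RetryConfig::default()));
    /// ```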
    pub fn new(
        workflows: Vec<Workflow>,
        custom_functions: Option<HashMap<String, Box<dyn FunctionHandler + Send + Sync>>>,
        retry_config: Option<RetryConfig>,
    ) -> Self {
        // Compile workflows
        let mut compiler = LogicCompiler::new();
        let workflow_map = compiler.compile_workflows(workflows);
        let (datalogic, logic_cache) = compiler.into_parts();

        let mut task_functions = custom_functions.unwrap_or_default();

        // Always add the built-in function handlers; they overwrite any custom
        // handler registered under the same name.
        for (name, handler) in functions::builtins::get_all_functions() {
            task_functions.insert(name, handler);
        }

        Self {
            workflows: Arc::new(workflow_map),
            task_functions: Arc::new(task_functions),
            datalogic,
            logic_cache,
            retry_config: retry_config.unwrap_or_default(),
        }
    }

    /// Get the configured retry configuration
    pub fn retry_config(&self) -> &RetryConfig {
        &self.retry_config
    }

    /// Get the configured workflows
    pub fn workflows(&self) -> &HashMap<String, Workflow> {
        &self.workflows
    }

    /// Get the registered task functions
    pub fn task_functions(&self) -> &HashMap<String, Box<dyn FunctionHandler + Send + Sync>> {
        &self.task_functions
    }

    /// Check if a function with the given name is registered
    pub fn has_function(&self, name: &str) -> bool {
        self.task_functions.contains_key(name)
    }

    /// Processes a message through workflows whose conditions match.
    ///
    /// This method:
    /// 1. Iterates through workflows sequentially in a deterministic order (sorted by priority, then ID)
    /// 2. Evaluates each workflow's condition immediately before executing it
    /// 3. Executes matching workflows one after another (not concurrently)
    /// 4. Updates the message with processing results and an audit trail
    ///
    /// Workflows are executed sequentially because later workflows may depend
    /// on the results of earlier workflows, and their conditions may change
    /// based on modifications made by previous workflows.
    ///
    /// # Arguments
    ///
    /// * `message` - The message to process
    ///
    /// # Returns
    ///
    /// * `Result<()>` - Success, or an error if processing failed
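    ///
    /// # Example
    ///
    /// A sketch of the ordering rule: with two matching workflows, the one
    /// with the lower `priority` runs first, and ties are broken by ID.
    ///
    /// ```rust,no_run
    /// use dataflow_rs::{Engine, Workflow, engine::message::Message};
    /// use serde_json::json;
    ///
    /// let workflows = vec![
    ///     Workflow::from_json(r#"{"id": "b", "name": "B", "priority": 1, "tasks": []}"#).unwrap(),
    ///     Workflow::from_json(r#"{"id": "a", "name": "A", "priority": 0, "tasks": []}"#).unwrap(),
    /// ];
    /// let engine = Engine::new(workflows, None, None);
    ///
    /// let mut message = Message::new(&json!({}));
    /// // "a" (priority 0) runs before "b" (priority 1).
    /// engine.process_message(&mut message).unwrap();
    /// ```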
    pub fn process_message(&self, message: &mut Message) -> Result<()> {
        debug!(
            "Processing message {} sequentially through workflows",
            message.id
        );

        // Sort workflows by priority and ID to ensure deterministic execution order
        let mut sorted_workflows: Vec<_> = self.workflows.iter().collect();
        sorted_workflows.sort_by_key(|(id, workflow)| (workflow.priority, id.as_str()));

        let executor = InternalExecutor::new(&self.datalogic, &self.logic_cache);

        // Process workflows sequentially in sorted order
        for (_, workflow) in sorted_workflows {
            // Evaluate workflow condition using current message state
            if !executor.evaluate_condition(
                workflow.condition_index,
                &workflow.condition,
                &message.metadata,
            )? {
                debug!("Workflow {} skipped - condition not met", workflow.id);
                continue;
            }

            info!("Processing workflow {}", workflow.id);
            self.process_workflow(workflow, message, &executor);
            info!("Completed processing workflow {}", workflow.id);
        }

        debug!(
            "Completed processing all workflows for message {}",
            message.id
        );
        Ok(())
    }

    fn process_workflow(
        &self,
        workflow: &Workflow,
        message: &mut Message,
        executor: &InternalExecutor,
    ) {
        let workflow_id = workflow.id.clone();
        let mut workflow_errors = Vec::new();

        // Cache timestamp for this workflow execution to reduce clock_gettime calls
        let workflow_timestamp = Utc::now().to_rfc3339();

        // Process tasks SEQUENTIALLY within this workflow
        for task in &workflow.tasks {
            // Evaluate task condition
            let should_execute = executor.evaluate_condition(
                task.condition_index,
                &task.condition,
                &message.metadata,
            );

            let should_execute = match should_execute {
                Ok(result) => result,
                Err(e) => {
                    workflow_errors.push(ErrorInfo::new(
                        Some(workflow_id.clone()),
                        Some(task.id.clone()),
                        e.clone(),
                    ));
                    false
                }
            };

            if !should_execute {
                debug!("Task {} skipped - condition not met", task.id);
                continue;
            }

            // Execute task based on its type
            let task_id = task.id.clone();
            let function_config = &task.function;

            let execution_result = match function_config {
                FunctionConfig::Map { input, .. } => executor.execute_map(message, input),
                FunctionConfig::Validation { input, .. } => {
                    executor.execute_validate(message, input)
                }
                FunctionConfig::Custom { name, .. } => {
                    if let Some(function) = self.task_functions.get(name) {
                        self.execute_task(
                            &task_id,
                            &workflow_id,
                            message,
                            function_config,
                            function.as_ref(),
                        )
                    } else {
                        Err(DataflowError::Workflow(format!(
                            "Function '{}' not found",
                            name
                        )))
                    }
                }
            };

            // Handle execution result
            match execution_result {
                Ok((status_code, changes)) => {
                    debug!(
                        "Task {} completed successfully with status {}",
                        task_id, status_code
                    );

                    // Record audit trail using cached timestamp
                    message.audit_trail.push(AuditTrail {
                        workflow_id: workflow_id.to_string(),
                        task_id: task_id.to_string(),
                        timestamp: workflow_timestamp.clone(),
                        changes,
                        status_code,
                    });

                    // Add progress metadata with cached timestamp
                    self.update_progress_metadata_with_timestamp(
                        message,
                        &task_id,
                        &workflow_id,
                        status_code,
                        &workflow_timestamp,
                    );
                }
                Err(error) => {
                    workflow_errors.push(ErrorInfo::new(
                        Some(workflow_id.clone()),
                        Some(task_id.clone()),
                        error.clone(),
                    ));
                    break; // Break the task sequence if a task fails
                }
            }
        }

        // Add any errors encountered to the message
        message.errors.extend(workflow_errors);
    }

    /// Execute a custom task with retries
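    ///
    /// Errors reported as retryable by `DataflowError::retryable` are retried
    /// up to `retry_config.max_retries` times, sleeping between attempts via
    /// `RetryConfig::sleep`; non-retryable errors fail immediately.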
    fn execute_task(
        &self,
        task_id: &str,
        workflow_id: &str,
        message: &mut Message,
        config: &FunctionConfig,
        function: &dyn FunctionHandler,
    ) -> Result<(usize, Vec<Change>)> {
        info!("Executing task {} in workflow {}", task_id, workflow_id);

        let mut last_error = None;
        let mut retry_count = 0;

        // Try executing the task with retries
        while retry_count <= self.retry_config.max_retries {
            match function.execute(message, config, &self.datalogic) {
                Ok((status_code, changes)) => {
                    info!("Task {} completed with status {}", task_id, status_code);

                    if retry_count > 0 {
                        self.update_progress_metadata_with_retries(
                            message,
                            task_id,
                            workflow_id,
                            status_code,
                            retry_count,
                        );
                    } else {
                        self.update_progress_metadata(message, task_id, workflow_id, status_code);
                    }

                    return Ok((status_code, changes));
                }
                Err(e) => {
                    last_error = Some(e.clone());

                    // Check if this error is retryable
                    if retry_count < self.retry_config.max_retries && e.retryable() {
                        warn!(
                            "Task {} execution failed with retryable error, retry {}/{}: {:?}",
                            task_id,
                            retry_count + 1,
                            self.retry_config.max_retries,
                            e
                        );

                        self.retry_config.sleep(retry_count);
                        retry_count += 1;
                    } else {
                        if !e.retryable() {
                            info!(
                                "Task {} failed with non-retryable error, skipping retries: {:?}",
                                task_id, e
                            );
                        }
                        break;
                    }
                }
            }
        }

        // If we reach this point, execution failed: either retries were
        // exhausted or the error was not retryable.
        let error = last_error.unwrap_or_else(|| {
            DataflowError::Unknown("Unknown error during task execution".to_string())
        });

        error!(
            "Task {} in workflow {} failed after {} retries: {:?}",
            task_id, workflow_id, retry_count, error
        );

        Err(error)
    }

    /// Update progress metadata
    fn update_progress_metadata(
        &self,
        message: &mut Message,
        task_id: &str,
        workflow_id: &str,
        status_code: usize,
    ) {
        let timestamp = Utc::now().to_rfc3339();
        self.update_progress_metadata_with_timestamp(
            message,
            task_id,
            workflow_id,
            status_code,
            &timestamp,
        );
    }

    /// Update progress metadata with provided timestamp
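    ///
    /// Illustrative shape of `message.metadata["progress"]` afterwards (the
    /// field values here are examples only):
    ///
    /// ```rust,ignore
    /// serde_json::json!({
    ///     "task_id": "enrich",
    ///     "workflow_id": "example",
    ///     "status_code": 200,
    ///     "timestamp": "2024-01-01T00:00:00+00:00"
    /// });
    /// ```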
    fn update_progress_metadata_with_timestamp(
        &self,
        message: &mut Message,
        task_id: &str,
        workflow_id: &str,
        status_code: usize,
        timestamp: &str,
    ) {
        let mut progress = Map::new();
        progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
        progress.insert(
            "workflow_id".to_string(),
            Value::String(workflow_id.to_string()),
        );
        progress.insert(
            "status_code".to_string(),
            Value::Number(Number::from(status_code)),
        );
        progress.insert(
            "timestamp".to_string(),
            Value::String(timestamp.to_string()),
        );
        message.metadata["progress"] = json!(progress);
    }

    /// Update progress metadata with retry count
    fn update_progress_metadata_with_retries(
        &self,
        message: &mut Message,
        task_id: &str,
        workflow_id: &str,
        status_code: usize,
        retry_count: u32,
    ) {
        let mut progress = Map::new();
        progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
        progress.insert(
            "workflow_id".to_string(),
            Value::String(workflow_id.to_string()),
        );
        progress.insert(
            "status_code".to_string(),
            Value::Number(Number::from(status_code)),
        );
        progress.insert(
            "timestamp".to_string(),
            Value::String(Utc::now().to_rfc3339()),
        );
        progress.insert(
            "retries".to_string(),
            Value::Number(Number::from(retry_count)),
        );
        message.metadata["progress"] = json!(progress);
    }

    /// Evaluate logic using a compiled index or direct evaluation (public for testing)
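    ///
    /// A sketch of the direct-evaluation path (no compiled index, so the
    /// expression is evaluated from its JSON form):
    ///
    /// ```rust,no_run
    /// use dataflow_rs::Engine;
    /// use serde_json::json;
    ///
    /// let engine = Engine::default();
    /// let result = engine
    ///     .evaluate_logic(None, &json!({"==": [{"var": "x"}, 1]}), &json!({"x": 1}))
    ///     .unwrap();
    /// // result == json!(true)
    /// ```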
    pub fn evaluate_logic(
        &self,
        logic_index: Option<usize>,
        logic: &Value,
        data: &Value,
    ) -> Result<Value> {
        let executor = InternalExecutor::new(&self.datalogic, &self.logic_cache);
        executor.evaluate_logic(logic_index, logic, data)
    }
}