// dataflow_rs/engine/mod.rs

/*!
# Engine Module

This module implements the core workflow engine for dataflow-rs. The engine processes
messages through workflows composed of tasks, providing a flexible and extensible
data processing pipeline.

## Key Components

- **Engine**: The main engine that processes messages through workflows
- **Workflow**: A collection of tasks with conditions that determine when they should be applied
- **Task**: An individual processing unit that performs a specific function on a message
- **AsyncFunctionHandler**: A trait implemented by task handlers to define custom async processing logic
- **Message**: The data structure that flows through the engine, with data, metadata, and processing results
*/
16
17pub mod error;
18pub mod functions;
19pub mod message;
20pub mod task;
21pub mod workflow;
22
23// Re-export key types for easier access
24pub use error::{DataflowError, ErrorInfo, Result};
25pub use functions::AsyncFunctionHandler;
26pub use message::Message;
27pub use task::Task;
28pub use workflow::Workflow;
29
30// Re-export the jsonlogic library under our namespace
31pub use datalogic_rs as jsonlogic;
32
33use chrono::Utc;
34use datalogic_rs::DataLogic;
35use log::{debug, error, info, warn};
36use message::AuditTrail;
37use serde_json::{json, Map, Number, Value};
38use std::{cell::RefCell, collections::HashMap};
39use tokio::time::sleep;
40
// Thread-local DataLogic instance to avoid mutex contention.
// Each OS thread lazily constructs its own evaluator on first use;
// callers reset its arena between evaluations (see `evaluate_condition`
// and `process_workflow`).
thread_local! {
    static THREAD_LOCAL_DATA_LOGIC: RefCell<DataLogic> = RefCell::new(DataLogic::new());
}
45
/// Configuration for retry behavior
///
/// NOTE(review): this config is stored on the `Engine` via
/// `Engine::with_retry_config`, but `execute_task_static` currently uses
/// hardcoded defaults (3 retries, 1000 ms, backoff on) instead of reading
/// it — confirm whether the config should be threaded through to task
/// execution.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries
    pub max_retries: u32,
    /// Delay between retries in milliseconds
    pub retry_delay_ms: u64,
    /// Whether to use exponential backoff
    pub use_backoff: bool,
}
56
57impl Default for RetryConfig {
58    fn default() -> Self {
59        Self {
60            max_retries: 3,
61            retry_delay_ms: 1000,
62            use_backoff: true,
63        }
64    }
65}
66
/// Engine that processes messages through workflows using non-blocking async IO.
///
/// This engine is optimized for IO-bound workloads like HTTP requests, database access,
/// and file operations. It uses Tokio for efficient async task execution.
///
/// Workflows are processed sequentially to ensure that later workflows can depend
/// on the results of earlier workflows.
pub struct Engine {
    /// Registry of available workflows, keyed by workflow ID
    workflows: HashMap<String, Workflow>,
    /// Registry of function handlers that can be executed by tasks,
    /// keyed by function name
    task_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
    /// Configuration for retry behavior (see `RetryConfig`)
    retry_config: RetryConfig,
}
82
83impl Default for Engine {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl Engine {
90    /// Creates a new Engine instance with built-in function handlers pre-registered.
91    ///
92    /// # Example
93    ///
94    /// ```
95    /// use dataflow_rs::Engine;
96    ///
97    /// let engine = Engine::new();
98    /// ```
99    pub fn new() -> Self {
100        let mut engine = Self {
101            workflows: HashMap::new(),
102            task_functions: HashMap::new(),
103            retry_config: RetryConfig::default(),
104        };
105
106        // Register built-in function handlers
107        for (name, handler) in functions::builtins::get_all_functions() {
108            engine.register_task_function(name, handler);
109        }
110
111        engine
112    }
113
114    /// Create a new engine instance without any pre-registered functions
115    pub fn new_empty() -> Self {
116        Self {
117            task_functions: HashMap::new(),
118            workflows: HashMap::new(),
119            retry_config: RetryConfig::default(),
120        }
121    }
122
    /// Configure retry behavior
    ///
    /// Builder-style setter: consumes the engine and returns it with the
    /// given retry configuration stored.
    ///
    /// NOTE(review): `execute_task_static` currently uses hardcoded retry
    /// defaults (3 retries, 1000 ms, backoff on) rather than reading this
    /// config, so this setter has no effect on task execution today —
    /// confirm whether the config should be threaded through.
    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
        self.retry_config = config;
        self
    }
128
129    /// Adds a workflow to the engine.
130    ///
131    /// # Arguments
132    ///
133    /// * `workflow` - The workflow to add
134    pub fn add_workflow(&mut self, workflow: &Workflow) {
135        if workflow.validate().is_ok() {
136            self.workflows.insert(workflow.id.clone(), workflow.clone());
137        } else {
138            error!("Invalid workflow: {}", workflow.id);
139        }
140    }
141
    /// Registers a custom function handler with the engine.
    ///
    /// If a handler was already registered under `name`, it is replaced
    /// (standard `HashMap::insert` semantics).
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the function handler
    /// * `handler` - The function handler implementation
    pub fn register_task_function(
        &mut self,
        name: String,
        handler: Box<dyn AsyncFunctionHandler + Send + Sync>,
    ) {
        self.task_functions.insert(name, handler);
    }
155
    /// Check if a function with the given name is registered
    ///
    /// Returns `true` when a handler has been stored under `name`, either by
    /// `register_task_function` or by the built-in registration in `new`.
    pub fn has_function(&self, name: &str) -> bool {
        self.task_functions.contains_key(name)
    }
160
161    /// Processes a message through workflows that match their conditions.
162    ///
163    /// This async method:
164    /// 1. Iterates through workflows sequentially in deterministic order (sorted by ID)
165    /// 2. Evaluates conditions for each workflow right before execution
166    /// 3. Executes matching workflows one after another (not concurrently)
167    /// 4. Updates the message with processing results and audit trail
168    ///
169    /// Workflows are executed sequentially because later workflows may depend
170    /// on the results of earlier workflows, and their conditions may change
171    /// based on modifications made by previous workflows.
172    ///
173    /// # Arguments
174    ///
175    /// * `message` - The message to process
176    ///
177    /// # Returns
178    ///
179    /// * `Result<()>` - Success or an error if processing failed
180    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
181        debug!(
182            "Processing message {} sequentially through workflows",
183            message.id
184        );
185
186        // Collect and sort workflows by ID to ensure deterministic execution order
187        // This prevents non-deterministic behavior caused by HashMap iteration order
188        let mut sorted_workflows: Vec<_> = self.workflows.iter().collect();
189        sorted_workflows.sort_by_key(|(id, _)| id.as_str());
190
191        // Process workflows sequentially in sorted order, evaluating conditions just before execution
192        for (_, workflow) in sorted_workflows {
193            // Evaluate workflow condition using current message state
194            let condition = workflow.condition.clone().unwrap_or(Value::Bool(true));
195
196            if !self
197                .evaluate_condition(&condition, &message.metadata)
198                .await?
199            {
200                debug!("Workflow {} skipped - condition not met", workflow.id);
201                continue;
202            }
203
204            info!("Processing workflow {}", workflow.id);
205
206            // Execute this workflow and merge results back into the message
207            let (workflow_id, workflow_message) =
208                Self::process_workflow(workflow.clone(), message.clone(), &self.task_functions)
209                    .await;
210
211            // Merge workflow results back into the original message
212            message.data = workflow_message.data;
213            message.metadata = workflow_message.metadata;
214            message.temp_data = workflow_message.temp_data;
215            message.audit_trail.extend(workflow_message.audit_trail);
216            message.errors.extend(workflow_message.errors);
217
218            info!("Completed processing workflow {}", workflow_id);
219
220            // If there were errors in this workflow, we may want to decide whether to continue
221            // For now, we continue processing remaining workflows even if one fails
222        }
223
224        debug!(
225            "Completed processing all workflows for message {}",
226            message.id
227        );
228        Ok(())
229    }
230
231    /// Process a single workflow with sequential task execution
232    async fn process_workflow(
233        workflow: Workflow,
234        mut message: Message,
235        task_functions: &HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
236    ) -> (String, Message) {
237        let workflow_id = workflow.id.clone();
238        let mut workflow_errors = Vec::new();
239
240        // Process tasks SEQUENTIALLY within this workflow
241        // IMPORTANT: Task order matters! Results from previous tasks are used by subsequent tasks.
242        // We intentionally process tasks one after another rather than concurrently.
243        for task in &workflow.tasks {
244            let task_condition = task.condition.clone().unwrap_or(Value::Bool(true));
245
246            // Evaluate task condition using thread-local DataLogic
247            let should_execute = THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
248                let mut data_logic = data_logic_cell.borrow_mut();
249                data_logic.reset_arena();
250                data_logic
251                    .evaluate_json(&task_condition, &message.metadata, None)
252                    .map_err(|e| {
253                        DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
254                    })
255                    .map(|result| result.as_bool().unwrap_or(false))
256            });
257
258            // Handle condition evaluation result
259            let should_execute = match should_execute {
260                Ok(result) => result,
261                Err(e) => {
262                    workflow_errors.push(ErrorInfo::new(
263                        Some(workflow_id.clone()),
264                        Some(task.id.clone()),
265                        e.clone(),
266                    ));
267                    false
268                }
269            };
270
271            if !should_execute {
272                debug!("Task {} skipped - condition not met", task.id);
273                continue;
274            }
275
276            // Execute task if we have a handler
277            if let Some(function) = task_functions.get(&task.function.name) {
278                let task_id = task.id.clone();
279                let function_input = task.function.input.clone();
280
281                // Execute this task (with retries)
282                match Self::execute_task_static(
283                    &task_id,
284                    &workflow_id,
285                    &mut message,
286                    &function_input,
287                    function.as_ref(),
288                )
289                .await
290                {
291                    Ok(_) => {
292                        debug!("Task {} completed successfully", task_id);
293                    }
294                    Err(error) => {
295                        workflow_errors.push(ErrorInfo::new(
296                            Some(workflow_id.clone()),
297                            Some(task_id.clone()),
298                            error.clone(),
299                        ));
300
301                        // Break the task sequence if a task fails
302                        break;
303                    }
304                }
305            } else {
306                let error =
307                    DataflowError::Workflow(format!("Function '{}' not found", task.function.name));
308
309                workflow_errors.push(ErrorInfo::new(
310                    Some(workflow_id.clone()),
311                    Some(task.id.clone()),
312                    error,
313                ));
314
315                // Break the task sequence if a function is not found
316                break;
317            }
318        }
319
320        // Add any errors encountered to the message
321        message.errors.extend(workflow_errors);
322
323        // Return the processed message for this workflow
324        (workflow_id, message)
325    }
326
    /// Static helper method to execute a task with retries
    ///
    /// On success this appends an `AuditTrail` entry, writes a `progress`
    /// object into `message.metadata`, and returns `Ok(())`. On failure it
    /// retries (sleeping between attempts) and returns the last error once
    /// retries are exhausted.
    ///
    /// NOTE(review): the retry parameters below are hardcoded defaults and do
    /// not read `Engine::retry_config`, so `with_retry_config` currently has
    /// no effect on this path — confirm whether the config should be passed in.
    async fn execute_task_static(
        task_id: &str,
        workflow_id: &str,
        message: &mut Message,
        input_json: &Value,
        function: &dyn AsyncFunctionHandler,
    ) -> Result<()> {
        info!("Executing task {} in workflow {}", task_id, workflow_id);

        let mut last_error = None;
        let mut retry_count = 0;
        let max_retries = 3; // Default max retries
        let retry_delay_ms = 1000; // Default retry delay in ms
        let use_backoff = true; // Default backoff behavior

        // Try executing the task with retries.
        // The loop performs at most `max_retries + 1` attempts: the initial
        // attempt plus `max_retries` retries.
        while retry_count <= max_retries {
            match function.execute(message, input_json).await {
                Ok((status_code, changes)) => {
                    // Success! Record audit trail and return
                    message.audit_trail.push(AuditTrail {
                        workflow_id: workflow_id.to_string(),
                        task_id: task_id.to_string(),
                        timestamp: Utc::now().to_rfc3339(),
                        changes,
                        status_code,
                    });

                    info!("Task {} completed with status {}", task_id, status_code);

                    // Add progress metadata
                    let mut progress = Map::new();
                    progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
                    progress.insert(
                        "workflow_id".to_string(),
                        Value::String(workflow_id.to_string()),
                    );
                    progress.insert(
                        "status_code".to_string(),
                        Value::Number(Number::from(status_code)),
                    );
                    progress.insert(
                        "timestamp".to_string(),
                        Value::String(Utc::now().to_rfc3339()),
                    );

                    // Only record the retry count when at least one retry happened
                    if retry_count > 0 {
                        progress.insert(
                            "retries".to_string(),
                            Value::Number(Number::from(retry_count)),
                        );
                    }

                    // NOTE(review): this overwrites any existing "progress" entry
                    // in the metadata with the latest task's progress.
                    message.metadata["progress"] = json!(progress);

                    return Ok(());
                }
                Err(e) => {
                    last_error = Some(e.clone());

                    if retry_count < max_retries {
                        warn!(
                            "Task {} execution failed, retry {}/{}: {:?}",
                            task_id,
                            retry_count + 1,
                            max_retries,
                            e
                        );

                        // Calculate delay with optional exponential backoff
                        // (delay doubles per retry: 1000 ms, 2000 ms, 4000 ms, ...)
                        let delay = if use_backoff {
                            retry_delay_ms * (2_u64.pow(retry_count))
                        } else {
                            retry_delay_ms
                        };

                        // Use tokio's non-blocking sleep
                        sleep(std::time::Duration::from_millis(delay)).await;

                        retry_count += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        // If we're here, all retries failed
        let error = last_error.unwrap_or_else(|| {
            DataflowError::Unknown("Unknown error during task execution".to_string())
        });

        error!(
            "Task {} in workflow {} failed after {} retries: {:?}",
            task_id, workflow_id, retry_count, error
        );

        Err(error)
    }
427
428    /// Evaluates a condition using DataLogic
429    async fn evaluate_condition(&self, condition: &Value, data: &Value) -> Result<bool> {
430        // For simple boolean conditions, short-circuit
431        if let Value::Bool(b) = condition {
432            return Ok(*b);
433        }
434
435        // Use thread-local DataLogic instance instead of mutex-protected one
436        THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
437            let mut data_logic = data_logic_cell.borrow_mut();
438            data_logic.reset_arena();
439            data_logic
440                .evaluate_json(condition, data, None)
441                .map_err(|e| {
442                    DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
443                })
444                .map(|result| result.as_bool().unwrap_or(false))
445        })
446    }
447}