dataflow_rs/engine/
mod.rs

/*!
# Engine Module

This module implements the core workflow engine for dataflow-rs. The engine processes
messages through workflows composed of tasks, providing a flexible and extensible
data processing pipeline.

## Key Components

- **Engine**: The main engine that processes messages through workflows
- **Workflow**: A collection of tasks with conditions that determine when they should be applied
- **Task**: An individual processing unit that performs a specific function on a message
- **AsyncFunctionHandler**: A trait implemented by task handlers to define custom async processing logic
- **Message**: The data structure that flows through the engine, with data, metadata, and processing results
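
## Example

A hedged end-to-end sketch; `Workflow::from_json` and the `engine::Message`
path are assumptions about this crate's public API, so adjust as needed:

```ignore
use dataflow_rs::{Engine, Workflow, engine::Message};
use serde_json::json;

#[tokio::main]
async fn main() {
    let mut engine = Engine::new();

    // A workflow with no tasks, just to show the wiring (assumed JSON shape).
    let workflow = Workflow::from_json(r#"{"id": "wf1", "name": "demo", "tasks": []}"#)
        .expect("invalid workflow JSON");
    engine.add_workflow(&workflow);

    let mut message = Message::new(&json!({"value": 42}));
    engine.process_message(&mut message).await.expect("processing failed");
}
```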
*/

pub mod error;
pub mod functions;
pub mod message;
pub mod task;
pub mod workflow;

// Re-export key types for easier access
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::AsyncFunctionHandler;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;

// Re-export the jsonlogic library under our namespace
pub use datalogic_rs as jsonlogic;

use chrono::Utc;
use datalogic_rs::DataLogic;
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, error, info, warn};
use message::AuditTrail;
use serde_json::{json, Map, Number, Value};
use std::{cell::RefCell, collections::HashMap};
use tokio::time::sleep;

// Thread-local DataLogic instance to avoid mutex contention
thread_local! {
    static THREAD_LOCAL_DATA_LOGIC: RefCell<DataLogic> = RefCell::new(DataLogic::new());
}

/// Configuration for retry behavior.
///
/// When `use_backoff` is enabled, the delay doubles on each retry
/// (`retry_delay_ms * 2^retry_count`); with the defaults this yields
/// 1000 ms, 2000 ms, then 4000 ms between attempts.
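///
/// # Example
///
/// A sketch; the `dataflow_rs::engine::RetryConfig` path is an assumption
/// about how this module is exposed:
///
/// ```ignore
/// use dataflow_rs::engine::RetryConfig;
///
/// // With backoff enabled, retries wait 500 ms, 1 s, then 2 s.
/// let config = RetryConfig {
///     max_retries: 3,
///     retry_delay_ms: 500,
///     use_backoff: true,
/// };
/// ```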
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries
    pub max_retries: u32,
    /// Delay between retries in milliseconds
    pub retry_delay_ms: u64,
    /// Whether to use exponential backoff
    pub use_backoff: bool,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_delay_ms: 1000,
            use_backoff: true,
        }
    }
}

/// Engine that processes messages through workflows using non-blocking async IO.
///
/// This engine is optimized for IO-bound workloads like HTTP requests, database access,
/// and file operations. It uses Tokio for efficient async task execution.
pub struct Engine {
    /// Registry of available workflows
    workflows: HashMap<String, Workflow>,
    /// Registry of function handlers that can be executed by tasks
    task_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
    /// Configuration for retry behavior
    retry_config: RetryConfig,
    /// Maximum number of workflows to process concurrently
    max_concurrency: usize,
}

impl Default for Engine {
    fn default() -> Self {
        Self::new()
    }
}

impl Engine {
    /// Creates a new Engine instance with built-in function handlers pre-registered.
    ///
    /// # Example
    ///
    /// ```
    /// use dataflow_rs::Engine;
    ///
    /// let engine = Engine::new();
    /// ```
    pub fn new() -> Self {
        let mut engine = Self {
            workflows: HashMap::new(),
            task_functions: HashMap::new(),
            retry_config: RetryConfig::default(),
            max_concurrency: 10, // Default max concurrency
        };

        // Register built-in function handlers
        for (name, handler) in functions::builtins::get_all_functions() {
            engine.register_task_function(name, handler);
        }

        engine
    }

    /// Create a new engine instance without any pre-registered functions
    pub fn new_empty() -> Self {
        Self {
            workflows: HashMap::new(),
            task_functions: HashMap::new(),
            retry_config: RetryConfig::default(),
            max_concurrency: 10, // Default max concurrency
        }
    }

    /// Sets the maximum number of workflows processed concurrently.
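    ///
    /// # Example
    ///
    /// ```
    /// use dataflow_rs::Engine;
    ///
    /// // Allow up to 32 workflows to run concurrently.
    /// let engine = Engine::new().with_max_concurrency(32);
    /// ```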
    pub fn with_max_concurrency(mut self, max_concurrency: usize) -> Self {
        self.max_concurrency = max_concurrency;
        self
    }

    /// Configure retry behavior.
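    ///
    /// # Example
    ///
    /// A sketch; the `engine::RetryConfig` path is an assumption:
    ///
    /// ```ignore
    /// use dataflow_rs::{Engine, engine::RetryConfig};
    ///
    /// let engine = Engine::new().with_retry_config(RetryConfig {
    ///     max_retries: 5,
    ///     retry_delay_ms: 250,
    ///     use_backoff: false,
    /// });
    /// ```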
    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
        self.retry_config = config;
        self
    }

    /// Adds a workflow to the engine.
    ///
    /// The workflow is validated first; invalid workflows are logged and skipped.
    ///
    /// # Arguments
    ///
    /// * `workflow` - The workflow to add
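    ///
    /// # Example
    ///
    /// A sketch; `Workflow::from_json` and the exact workflow JSON shape are
    /// assumptions here:
    ///
    /// ```ignore
    /// use dataflow_rs::{Engine, Workflow};
    ///
    /// let mut engine = Engine::new();
    /// let workflow = Workflow::from_json(r#"{"id": "enrich", "name": "Enrich", "tasks": []}"#)
    ///     .expect("invalid workflow JSON");
    /// engine.add_workflow(&workflow);
    /// ```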
    pub fn add_workflow(&mut self, workflow: &Workflow) {
        match workflow.validate() {
            Ok(_) => {
                self.workflows
                    .insert(workflow.id.clone(), workflow.clone());
            }
            Err(e) => error!("Invalid workflow {}: {:?}", workflow.id, e),
        }
    }

    /// Registers a custom function handler with the engine.
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the function handler
    /// * `handler` - The function handler implementation
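    ///
    /// # Example
    ///
    /// A sketch; `MyHandler` is a hypothetical type implementing
    /// `AsyncFunctionHandler`:
    ///
    /// ```ignore
    /// use dataflow_rs::Engine;
    ///
    /// let mut engine = Engine::new_empty();
    /// engine.register_task_function("my_function".to_string(), Box::new(MyHandler));
    /// assert!(engine.has_function("my_function"));
    /// ```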
    pub fn register_task_function(
        &mut self,
        name: String,
        handler: Box<dyn AsyncFunctionHandler + Send + Sync>,
    ) {
        self.task_functions.insert(name, handler);
    }

    /// Check if a function with the given name is registered
    pub fn has_function(&self, name: &str) -> bool {
        self.task_functions.contains_key(name)
    }

    /// Processes a message through workflows that match their conditions.
    ///
    /// This async method:
    /// 1. Evaluates the condition of each registered workflow against the message metadata
    /// 2. Executes matching workflows concurrently, up to `max_concurrency` at a time
    ///    (tasks within each workflow still run sequentially)
    /// 3. Updates the message with processing results and audit trail
    ///
    /// # Arguments
    ///
    /// * `message` - The message to process
    ///
    /// # Returns
    ///
    /// * `Result<()>` - Success or an error if processing failed
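    ///
    /// # Example
    ///
    /// A sketch; the `engine::Message` path and the `Message::new` signature
    /// are assumptions about this crate's API:
    ///
    /// ```ignore
    /// use dataflow_rs::{Engine, engine::Message};
    /// use serde_json::json;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let engine = Engine::new();
    ///     let mut message = Message::new(&json!({"temperature": 21.5}));
    ///     engine.process_message(&mut message).await.expect("processing failed");
    /// }
    /// ```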
    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
        debug!("Processing message {} asynchronously", message.id);

        // Create a FuturesUnordered to track concurrent workflow execution
        let mut workflow_futures = FuturesUnordered::new();

        // First filter workflows that should be executed and prepare them for concurrent processing
        let mut workflows_to_process = Vec::new();

        for workflow in self.workflows.values() {
            // Check workflow condition
            let condition = workflow.condition.clone().unwrap_or(Value::Bool(true));
            let metadata_ref = &message.metadata;

            if !self.evaluate_condition(&condition, metadata_ref).await? {
                debug!("Workflow {} skipped - condition not met", workflow.id);
                continue;
            }

            info!("Preparing to process workflow {}", workflow.id);
            workflows_to_process.push(workflow.clone());
        }

        // Start processing workflows up to max_concurrency at a time
        let engine_task_functions = &self.task_functions;

        // Start initial batch of workflows
        let initial_count = self.max_concurrency.min(workflows_to_process.len());
        for workflow in workflows_to_process.iter().take(initial_count) {
            let message_clone = message.clone();

            workflow_futures.push(Self::process_workflow(
                workflow.clone(),
                message_clone,
                engine_task_functions,
                &self.retry_config,
            ));
        }

        // Process remaining workflows as current ones complete
        let mut next_workflow_index = initial_count;

        // As workflows complete, process the results and start more workflows if needed
        while let Some((workflow_id, workflow_message)) = workflow_futures.next().await {
            // Merge this workflow's results back into the original message
            message.data = workflow_message.data;
            message.metadata = workflow_message.metadata;
            message.temp_data = workflow_message.temp_data;
            message.audit_trail.extend(workflow_message.audit_trail);
            message.errors.extend(workflow_message.errors);

            info!("Completed processing workflow {}", workflow_id);

            // Start a new workflow if there are more
            if next_workflow_index < workflows_to_process.len() {
                let workflow = workflows_to_process[next_workflow_index].clone();
                next_workflow_index += 1;

                let message_clone = message.clone();

                workflow_futures.push(Self::process_workflow(
                    workflow,
                    message_clone,
                    &self.task_functions,
                    &self.retry_config,
                ));
            }
        }

        Ok(())
    }

    /// Process a single workflow with sequential task execution
    async fn process_workflow(
        workflow: Workflow,
        mut message: Message,
        task_functions: &HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
        retry_config: &RetryConfig,
    ) -> (String, Message) {
        let workflow_id = workflow.id.clone();
        let mut workflow_errors = Vec::new();

        // Process tasks SEQUENTIALLY within this workflow
        // IMPORTANT: Task order matters! Results from previous tasks are used by subsequent tasks.
        // We intentionally process tasks one after another rather than concurrently.
        for task in &workflow.tasks {
            let task_condition = task.condition.clone().unwrap_or(Value::Bool(true));

            // Evaluate task condition using thread-local DataLogic
            let should_execute = THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
                let data_logic = data_logic_cell.borrow_mut();
                data_logic
                    .evaluate_json(&task_condition, &message.metadata, None)
                    .map_err(|e| {
                        DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
                    })
                    .map(|result| result.as_bool().unwrap_or(false))
            });

            // Handle condition evaluation result
            let should_execute = match should_execute {
                Ok(result) => result,
                Err(e) => {
                    workflow_errors.push(ErrorInfo::new(
                        Some(workflow_id.clone()),
                        Some(task.id.clone()),
                        e.clone(),
                    ));
                    false
                }
            };

            if !should_execute {
                debug!("Task {} skipped - condition not met", task.id);
                continue;
            }

            // Execute task if we have a handler
            if let Some(function) = task_functions.get(&task.function.name) {
                let task_id = task.id.clone();
                let function_input = task.function.input.clone();

                // Execute this task (with retries per the engine's RetryConfig)
                match Self::execute_task_static(
                    &task_id,
                    &workflow_id,
                    &mut message,
                    &function_input,
                    function.as_ref(),
                    retry_config,
                )
                .await
                {
                    Ok(_) => {
                        debug!("Task {} completed successfully", task_id);
                    }
                    Err(error) => {
                        workflow_errors.push(ErrorInfo::new(
                            Some(workflow_id.clone()),
                            Some(task_id.clone()),
                            error.clone(),
                        ));

                        // Break the task sequence if a task fails
                        break;
                    }
                }
            } else {
                let error =
                    DataflowError::Workflow(format!("Function '{}' not found", task.function.name));

                workflow_errors.push(ErrorInfo::new(
                    Some(workflow_id.clone()),
                    Some(task.id.clone()),
                    error,
                ));

                // Break the task sequence if a function is not found
                break;
            }
        }

        // Add any errors encountered to the message
        message.errors.extend(workflow_errors);

        // Return the processed message for this workflow
        (workflow_id, message)
    }

    /// Static helper that executes a task, retrying according to the given `RetryConfig`.
    async fn execute_task_static(
        task_id: &str,
        workflow_id: &str,
        message: &mut Message,
        input_json: &Value,
        function: &dyn AsyncFunctionHandler,
        retry_config: &RetryConfig,
    ) -> Result<()> {
        info!("Executing task {} in workflow {}", task_id, workflow_id);

        let mut last_error = None;
        let mut retry_count = 0;
        let max_retries = retry_config.max_retries;
        let retry_delay_ms = retry_config.retry_delay_ms;
        let use_backoff = retry_config.use_backoff;

        // Try executing the task with retries
        while retry_count <= max_retries {
            match function.execute(message, input_json).await {
                Ok((status_code, changes)) => {
                    // Success! Record audit trail and return
                    message.audit_trail.push(AuditTrail {
                        workflow_id: workflow_id.to_string(),
                        task_id: task_id.to_string(),
                        timestamp: Utc::now().to_rfc3339(),
                        changes,
                        status_code,
                    });

                    info!("Task {} completed with status {}", task_id, status_code);

                    // Add progress metadata
                    let mut progress = Map::new();
                    progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
                    progress.insert(
                        "workflow_id".to_string(),
                        Value::String(workflow_id.to_string()),
                    );
                    progress.insert(
                        "status_code".to_string(),
                        Value::Number(Number::from(status_code)),
                    );
                    progress.insert(
                        "timestamp".to_string(),
                        Value::String(Utc::now().to_rfc3339()),
                    );

                    if retry_count > 0 {
                        progress.insert(
                            "retries".to_string(),
                            Value::Number(Number::from(retry_count)),
                        );
                    }

                    message.metadata["progress"] = json!(progress);

                    return Ok(());
                }
                Err(e) => {
                    last_error = Some(e.clone());

                    if retry_count < max_retries {
                        warn!(
                            "Task {} execution failed, retry {}/{}: {:?}",
                            task_id,
                            retry_count + 1,
                            max_retries,
                            e
                        );

                        // Calculate delay with optional exponential backoff
                        let delay = if use_backoff {
                            retry_delay_ms * (2_u64.pow(retry_count))
                        } else {
                            retry_delay_ms
                        };

                        // Use tokio's non-blocking sleep
                        sleep(std::time::Duration::from_millis(delay)).await;

                        retry_count += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        // If we're here, all retries failed
        let error = last_error.unwrap_or_else(|| {
            DataflowError::Unknown("Unknown error during task execution".to_string())
        });

        error!(
            "Task {} in workflow {} failed after {} retries: {:?}",
            task_id, workflow_id, retry_count, error
        );

        Err(error)
    }

    /// Evaluates a condition using DataLogic
    async fn evaluate_condition(&self, condition: &Value, data: &Value) -> Result<bool> {
        // For simple boolean conditions, short-circuit
        if let Value::Bool(b) = condition {
            return Ok(*b);
        }

        // Use thread-local DataLogic instance instead of a mutex-protected one
        THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
            let data_logic = data_logic_cell.borrow_mut();
            data_logic
                .evaluate_json(condition, data, None)
                .map_err(|e| {
                    DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
                })
                .map(|result| result.as_bool().unwrap_or(false))
        })
    }
}
467}