dataflow_rs/engine/mod.rs
1/*!
2# Engine Module
3
4This module implements the core workflow engine for dataflow-rs. The engine processes
5messages through workflows composed of tasks, providing a flexible and extensible
6data processing pipeline.
7
8## Key Components
9
10- **Engine**: The main engine that processes messages through workflows
11- **Workflow**: A collection of tasks with conditions that determine when they should be applied
12- **Task**: An individual processing unit that performs a specific function on a message
13- **AsyncFunctionHandler**: A trait implemented by task handlers to define custom async processing logic
14- **Message**: The data structure that flows through the engine, with data, metadata, and processing results
15*/
16
17pub mod error;
18pub mod functions;
19pub mod message;
20pub mod task;
21pub mod workflow;
22
23// Re-export key types for easier access
24pub use error::{DataflowError, ErrorInfo, Result};
25pub use functions::AsyncFunctionHandler;
26pub use message::Message;
27pub use task::Task;
28pub use workflow::Workflow;
29
30// Re-export the jsonlogic library under our namespace
31pub use datalogic_rs as jsonlogic;
32
33use chrono::Utc;
34use datalogic_rs::DataLogic;
35use log::{debug, error, info, warn};
36use message::AuditTrail;
37use serde_json::{json, Map, Number, Value};
38use std::{cell::RefCell, collections::HashMap};
39use tokio::time::sleep;
40
// Thread-local DataLogic instance to avoid mutex contention.
// Each worker thread gets its own evaluator; `RefCell` provides the interior
// mutability needed to call `reset_arena()` between evaluations.
thread_local! {
    static THREAD_LOCAL_DATA_LOGIC: RefCell<DataLogic> = RefCell::new(DataLogic::new());
}
45
/// Configuration for retry behavior
///
/// Controls how many times a failed task is retried, how long to wait
/// between attempts, and whether that wait grows exponentially.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries
    pub max_retries: u32,
    /// Delay between retries in milliseconds
    pub retry_delay_ms: u64,
    /// Whether to use exponential backoff
    pub use_backoff: bool,
}

impl Default for RetryConfig {
    /// Defaults: 3 retries, 1000 ms base delay, exponential backoff enabled.
    fn default() -> Self {
        RetryConfig {
            max_retries: 3,
            retry_delay_ms: 1_000,
            use_backoff: true,
        }
    }
}
66
/// Engine that processes messages through workflows using non-blocking async IO.
///
/// This engine is optimized for IO-bound workloads like HTTP requests, database access,
/// and file operations. It uses Tokio for efficient async task execution.
///
/// Workflows are processed sequentially to ensure that later workflows can depend
/// on the results of earlier workflows.
pub struct Engine {
    /// Registry of available workflows, keyed by workflow ID
    workflows: HashMap<String, Workflow>,
    /// Registry of function handlers that can be executed by tasks, keyed by
    /// the function name tasks refer to
    task_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
    /// Configuration for retry behavior
    /// NOTE(review): stored via `with_retry_config` but `execute_task_static`
    /// currently uses hardcoded retry defaults — confirm intended wiring.
    retry_config: RetryConfig,
}
82
impl Default for Engine {
    /// Equivalent to [`Engine::new`]: an engine with the built-in function
    /// handlers pre-registered.
    fn default() -> Self {
        Self::new()
    }
}
88
impl Engine {
    /// Creates a new Engine instance with built-in function handlers pre-registered.
    ///
    /// Equivalent to [`Engine::new_empty`] followed by registering every
    /// handler returned by `functions::builtins::get_all_functions()`.
    ///
    /// # Example
    ///
    /// ```
    /// use dataflow_rs::Engine;
    ///
    /// let engine = Engine::new();
    /// ```
    pub fn new() -> Self {
        // Delegate construction to `new_empty` so the field initialization
        // logic lives in exactly one place instead of being duplicated here.
        let mut engine = Self::new_empty();

        // Register built-in function handlers
        for (name, handler) in functions::builtins::get_all_functions() {
            engine.register_task_function(name, handler);
        }

        engine
    }
113
114 /// Create a new engine instance without any pre-registered functions
115 pub fn new_empty() -> Self {
116 Self {
117 task_functions: HashMap::new(),
118 workflows: HashMap::new(),
119 retry_config: RetryConfig::default(),
120 }
121 }
122
    /// Configure retry behavior
    ///
    /// Builder-style setter: consumes the engine and returns it with the
    /// provided `config` installed.
    ///
    /// NOTE(review): the stored config is currently not consulted by task
    /// execution — `execute_task_static` hardcodes its own retry defaults
    /// (3 retries, 1000 ms, backoff). Confirm whether this config is meant
    /// to be threaded through to it.
    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
        self.retry_config = config;
        self
    }
128
129 /// Adds a workflow to the engine.
130 ///
131 /// # Arguments
132 ///
133 /// * `workflow` - The workflow to add
134 pub fn add_workflow(&mut self, workflow: &Workflow) {
135 if workflow.validate().is_ok() {
136 self.workflows.insert(workflow.id.clone(), workflow.clone());
137 } else {
138 error!("Invalid workflow: {}", workflow.id);
139 }
140 }
141
    /// Registers a custom function handler with the engine.
    ///
    /// A handler registered under an already-used name replaces the previous
    /// one (standard `HashMap::insert` semantics).
    ///
    /// # Arguments
    ///
    /// * `name` - The name tasks use to refer to this handler
    /// * `handler` - The function handler implementation
    pub fn register_task_function(
        &mut self,
        name: String,
        handler: Box<dyn AsyncFunctionHandler + Send + Sync>,
    ) {
        self.task_functions.insert(name, handler);
    }
155
    /// Check if a function with the given name is registered
    ///
    /// Returns `true` if `name` was previously passed to
    /// [`Engine::register_task_function`] (directly or as a built-in).
    pub fn has_function(&self, name: &str) -> bool {
        self.task_functions.contains_key(name)
    }
160
161 /// Processes a message through workflows that match their conditions.
162 ///
163 /// This async method:
164 /// 1. Iterates through workflows sequentially in deterministic order (sorted by ID)
165 /// 2. Evaluates conditions for each workflow right before execution
166 /// 3. Executes matching workflows one after another (not concurrently)
167 /// 4. Updates the message with processing results and audit trail
168 ///
169 /// Workflows are executed sequentially because later workflows may depend
170 /// on the results of earlier workflows, and their conditions may change
171 /// based on modifications made by previous workflows.
172 ///
173 /// # Arguments
174 ///
175 /// * `message` - The message to process
176 ///
177 /// # Returns
178 ///
179 /// * `Result<()>` - Success or an error if processing failed
180 pub async fn process_message(&self, message: &mut Message) -> Result<()> {
181 debug!(
182 "Processing message {} sequentially through workflows",
183 message.id
184 );
185
186 // Collect and sort workflows by ID to ensure deterministic execution order
187 // This prevents non-deterministic behavior caused by HashMap iteration order
188 let mut sorted_workflows: Vec<_> = self.workflows.iter().collect();
189 sorted_workflows.sort_by_key(|(id, _)| id.as_str());
190
191 // Process workflows sequentially in sorted order, evaluating conditions just before execution
192 for (_, workflow) in sorted_workflows {
193 // Evaluate workflow condition using current message state
194 let condition = workflow.condition.clone().unwrap_or(Value::Bool(true));
195
196 if !self
197 .evaluate_condition(&condition, &message.metadata)
198 .await?
199 {
200 debug!("Workflow {} skipped - condition not met", workflow.id);
201 continue;
202 }
203
204 info!("Processing workflow {}", workflow.id);
205
206 // Execute this workflow and merge results back into the message
207 let (workflow_id, workflow_message) =
208 Self::process_workflow(workflow.clone(), message.clone(), &self.task_functions)
209 .await;
210
211 // Merge workflow results back into the original message
212 message.data = workflow_message.data;
213 message.metadata = workflow_message.metadata;
214 message.temp_data = workflow_message.temp_data;
215 message.audit_trail.extend(workflow_message.audit_trail);
216 message.errors.extend(workflow_message.errors);
217
218 info!("Completed processing workflow {}", workflow_id);
219
220 // If there were errors in this workflow, we may want to decide whether to continue
221 // For now, we continue processing remaining workflows even if one fails
222 }
223
224 debug!(
225 "Completed processing all workflows for message {}",
226 message.id
227 );
228 Ok(())
229 }
230
231 /// Process a single workflow with sequential task execution
232 async fn process_workflow(
233 workflow: Workflow,
234 mut message: Message,
235 task_functions: &HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>,
236 ) -> (String, Message) {
237 let workflow_id = workflow.id.clone();
238 let mut workflow_errors = Vec::new();
239
240 // Process tasks SEQUENTIALLY within this workflow
241 // IMPORTANT: Task order matters! Results from previous tasks are used by subsequent tasks.
242 // We intentionally process tasks one after another rather than concurrently.
243 for task in &workflow.tasks {
244 let task_condition = task.condition.clone().unwrap_or(Value::Bool(true));
245
246 // Evaluate task condition using thread-local DataLogic
247 let should_execute = THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
248 let mut data_logic = data_logic_cell.borrow_mut();
249 data_logic.reset_arena();
250 data_logic
251 .evaluate_json(&task_condition, &message.metadata, None)
252 .map_err(|e| {
253 DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
254 })
255 .map(|result| result.as_bool().unwrap_or(false))
256 });
257
258 // Handle condition evaluation result
259 let should_execute = match should_execute {
260 Ok(result) => result,
261 Err(e) => {
262 workflow_errors.push(ErrorInfo::new(
263 Some(workflow_id.clone()),
264 Some(task.id.clone()),
265 e.clone(),
266 ));
267 false
268 }
269 };
270
271 if !should_execute {
272 debug!("Task {} skipped - condition not met", task.id);
273 continue;
274 }
275
276 // Execute task if we have a handler
277 if let Some(function) = task_functions.get(&task.function.name) {
278 let task_id = task.id.clone();
279 let function_input = task.function.input.clone();
280
281 // Execute this task (with retries)
282 match Self::execute_task_static(
283 &task_id,
284 &workflow_id,
285 &mut message,
286 &function_input,
287 function.as_ref(),
288 )
289 .await
290 {
291 Ok(_) => {
292 debug!("Task {} completed successfully", task_id);
293 }
294 Err(error) => {
295 workflow_errors.push(ErrorInfo::new(
296 Some(workflow_id.clone()),
297 Some(task_id.clone()),
298 error.clone(),
299 ));
300
301 // Break the task sequence if a task fails
302 break;
303 }
304 }
305 } else {
306 let error =
307 DataflowError::Workflow(format!("Function '{}' not found", task.function.name));
308
309 workflow_errors.push(ErrorInfo::new(
310 Some(workflow_id.clone()),
311 Some(task.id.clone()),
312 error,
313 ));
314
315 // Break the task sequence if a function is not found
316 break;
317 }
318 }
319
320 // Add any errors encountered to the message
321 message.errors.extend(workflow_errors);
322
323 // Return the processed message for this workflow
324 (workflow_id, message)
325 }
326
    /// Static helper method to execute a task with retries
    ///
    /// Runs `function.execute(message, input_json)` until it succeeds or the
    /// retry budget is exhausted. On success an [`AuditTrail`] entry is pushed
    /// and `message.metadata["progress"]` is overwritten with a summary map;
    /// on failure the last observed error is returned.
    ///
    /// NOTE(review): the retry parameters are hardcoded locals below (3
    /// retries, 1000 ms base delay, exponential backoff) — the engine's
    /// `RetryConfig` is not consulted. Confirm whether it should be.
    ///
    /// # Arguments
    ///
    /// * `task_id` - ID of the task being executed (used for logs and audit)
    /// * `workflow_id` - ID of the owning workflow (used for logs and audit)
    /// * `message` - The message the handler mutates in place
    /// * `input_json` - The task function's input payload
    /// * `function` - The handler implementing the task logic
    ///
    /// # Errors
    ///
    /// Returns the last execution error once all retries are used up.
    async fn execute_task_static(
        task_id: &str,
        workflow_id: &str,
        message: &mut Message,
        input_json: &Value,
        function: &dyn AsyncFunctionHandler,
    ) -> Result<()> {
        info!("Executing task {} in workflow {}", task_id, workflow_id);

        let mut last_error = None;
        let mut retry_count = 0;
        let max_retries = 3; // Default max retries
        let retry_delay_ms = 1000; // Default retry delay in ms
        let use_backoff = true; // Default backoff behavior

        // Try executing the task with retries (attempt 0 plus up to
        // `max_retries` re-attempts).
        while retry_count <= max_retries {
            match function.execute(message, input_json).await {
                Ok((status_code, changes)) => {
                    // Success! Record audit trail and return
                    message.audit_trail.push(AuditTrail {
                        workflow_id: workflow_id.to_string(),
                        task_id: task_id.to_string(),
                        timestamp: Utc::now().to_rfc3339(),
                        changes,
                        status_code,
                    });

                    info!("Task {} completed with status {}", task_id, status_code);

                    // Add progress metadata describing the finished task.
                    let mut progress = Map::new();
                    progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
                    progress.insert(
                        "workflow_id".to_string(),
                        Value::String(workflow_id.to_string()),
                    );
                    progress.insert(
                        "status_code".to_string(),
                        Value::Number(Number::from(status_code)),
                    );
                    progress.insert(
                        "timestamp".to_string(),
                        Value::String(Utc::now().to_rfc3339()),
                    );

                    // Only record a retry count when at least one retry happened.
                    if retry_count > 0 {
                        progress.insert(
                            "retries".to_string(),
                            Value::Number(Number::from(retry_count)),
                        );
                    }

                    // NOTE(review): `Value` index-assignment panics if
                    // `metadata` is neither an object nor null — assumes
                    // metadata is always a JSON object; confirm upstream.
                    message.metadata["progress"] = json!(progress);

                    return Ok(());
                }
                Err(e) => {
                    last_error = Some(e.clone());

                    if retry_count < max_retries {
                        warn!(
                            "Task {} execution failed, retry {}/{}: {:?}",
                            task_id,
                            retry_count + 1,
                            max_retries,
                            e
                        );

                        // Calculate delay with optional exponential backoff
                        // (delay doubles on each retry when enabled).
                        let delay = if use_backoff {
                            retry_delay_ms * (2_u64.pow(retry_count))
                        } else {
                            retry_delay_ms
                        };

                        // Use tokio's non-blocking sleep
                        sleep(std::time::Duration::from_millis(delay)).await;

                        retry_count += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        // If we're here, all retries failed
        let error = last_error.unwrap_or_else(|| {
            DataflowError::Unknown("Unknown error during task execution".to_string())
        });

        error!(
            "Task {} in workflow {} failed after {} retries: {:?}",
            task_id, workflow_id, retry_count, error
        );

        Err(error)
    }
427
428 /// Evaluates a condition using DataLogic
429 async fn evaluate_condition(&self, condition: &Value, data: &Value) -> Result<bool> {
430 // For simple boolean conditions, short-circuit
431 if let Value::Bool(b) = condition {
432 return Ok(*b);
433 }
434
435 // Use thread-local DataLogic instance instead of mutex-protected one
436 THREAD_LOCAL_DATA_LOGIC.with(|data_logic_cell| {
437 let mut data_logic = data_logic_cell.borrow_mut();
438 data_logic.reset_arena();
439 data_logic
440 .evaluate_json(condition, data, None)
441 .map_err(|e| {
442 DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
443 })
444 .map(|result| result.as_bool().unwrap_or(false))
445 })
446 }
447}