dataflow_rs/engine/mod.rs
/*!
# Engine Module

This module implements the core workflow engine for dataflow-rs. The engine provides
high-performance message processing through workflows composed of tasks.

## Architecture

The engine features a modular architecture with clear separation of concerns:
- **Compiler**: Pre-compiles JSONLogic expressions for optimal runtime performance
- **Executor**: Handles internal function execution (map, validation) efficiently
- **Engine**: Orchestrates workflow processing with immutable, pre-configured workflows
- **Direct DataLogic**: Each engine instance has its own DataLogic, so there is no contention

## Key Components

- **Engine**: Single-threaded engine optimized for both IO-bound and CPU-bound workloads
- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
- **InternalExecutor**: Executes built-in map and validation functions with compiled logic
- **Workflow**: Collection of tasks with JSONLogic conditions (evaluated against message metadata only)
- **Task**: Individual processing unit that performs a specific function on a message
- **FunctionHandler**: Trait for implementing custom processing logic (see the sketch below)
- **Message**: Data structure flowing through the engine, with an audit trail
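
For custom processing logic, implement the `FunctionHandler` trait. The sketch below
is illustrative only: it assumes the trait's `execute` method matches how this module
invokes it (a mutable message, the function config, and a DataLogic reference,
returning a status code plus a list of changes).

```rust,ignore
use dataflow_rs::engine::message::{Change, Message};
use dataflow_rs::engine::{FunctionConfig, FunctionHandler, Result};
use datalogic_rs::DataLogic;

struct AnnotateHandler;

impl FunctionHandler for AnnotateHandler {
    fn execute(
        &self,
        message: &mut Message,
        _config: &FunctionConfig,
        _datalogic: &DataLogic,
    ) -> Result<(usize, Vec<Change>)> {
        // Hypothetical behavior: tag the message metadata and report success.
        message.metadata["annotated"] = serde_json::json!(true);
        Ok((200, vec![]))
    }
}
```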

## Performance Optimizations

- **Pre-compilation**: All JSONLogic expressions compiled at startup
- **Direct Instantiation**: DataLogic instances created directly, avoiding any locking
- **Immutable Workflows**: Workflows defined at initialization for predictable performance
- **Efficient Caching**: Compiled logic cached for fast repeated evaluations

## Usage

```rust,no_run
use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": []}"#)?,
    ];

    // Create engine with defaults (built-ins enabled)
    let engine = Engine::new(workflows, None, None);

    // Process messages
    let mut message = Message::new(&json!({}));
    engine.process_message(&mut message)?;

    Ok(())
}
```
*/

pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod retry;
pub mod task;
pub mod workflow;

// Re-export key types for easier access
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{FunctionConfig, FunctionHandler};
pub use message::Message;
pub use retry::RetryConfig;
pub use task::Task;
pub use workflow::Workflow;

use chrono::Utc;
use datalogic_rs::{DataLogic, Logic};
use log::{debug, error, info, warn};
use message::{AuditTrail, Change};
use serde_json::{Map, Number, Value, json};
use std::collections::HashMap;
use std::sync::Arc;

use compiler::LogicCompiler;
use executor::InternalExecutor;

/// High-performance workflow engine for message processing.
///
/// ## Architecture
///
/// The engine features a modular design optimized for both IO-bound and CPU-bound workloads:
/// - **Separation of Concerns**: Compiler handles pre-compilation, Executor handles runtime
/// - **Direct DataLogic**: Single DataLogic instance per engine for zero contention
/// - **Immutable Workflows**: All workflows compiled and cached at initialization
/// - **Pre-compiled Logic**: JSONLogic expressions compiled once for optimal performance
///
/// ## Performance Characteristics
///
/// - **Zero Runtime Compilation**: All logic compiled during initialization
/// - **Cache-Friendly**: Compiled logic stored in contiguous memory
/// - **Predictable Latency**: No runtime allocations for logic evaluation
/// - **Thread-Safe Design**: Applications can safely use multiple engine instances across threads (see the sketch below)
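///
/// For multi-threaded processing, a common pattern is one engine per worker
/// thread. A minimal sketch (assuming `Workflow: Clone`, as the crate-level
/// usage example implies):
///
/// ```rust,ignore
/// use dataflow_rs::{Engine, Workflow};
///
/// let workflows: Vec<Workflow> = vec![/* ... */];
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let wfs = workflows.clone();
///         std::thread::spawn(move || {
///             // Each thread owns its engine, so no locks are shared.
///             let _engine = Engine::new(wfs, None, None);
///         })
///     })
///     .collect();
/// for handle in handles {
///     handle.join().unwrap();
/// }
/// ```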
pub struct Engine {
    /// Registry of available workflows (immutable after initialization)
    workflows: Arc<HashMap<String, Workflow>>,
    /// Registry of function handlers that can be executed by tasks (immutable after initialization)
    task_functions: Arc<HashMap<String, Box<dyn FunctionHandler + Send + Sync>>>,
    /// DataLogic instance for JSONLogic evaluation
    datalogic: DataLogic<'static>,
    /// Compiled logic cache
    logic_cache: Vec<Logic<'static>>,
    /// Configuration for retry behavior
    retry_config: RetryConfig,
}

impl Default for Engine {
    fn default() -> Self {
        Self::new(Vec::new(), None, None)
    }
}

impl Engine {
    /// Creates a new Engine instance with configurable parameters.
    ///
    /// Built-in function handlers are always registered; any custom handlers
    /// are merged in alongside them.
    ///
    /// # Arguments
    /// * `workflows` - The workflows to use for processing messages
    /// * `custom_functions` - Optional custom function handlers (None registers only the built-ins)
    /// * `retry_config` - Optional retry configuration (uses the default if None)
    ///
    /// # Example
    ///
    /// ```
    /// use dataflow_rs::{Engine, Workflow};
    ///
    /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": []}"#).unwrap()];
    ///
    /// // Simple usage with defaults
    /// let engine = Engine::new(workflows, None, None);
    /// ```
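    ///
    /// With an explicit retry configuration (a sketch; `RetryConfig` is
    /// re-exported from this module and implements `Default`):
    ///
    /// ```rust,ignore
    /// use dataflow_rs::{Engine, Workflow};
    /// use dataflow_rs::engine::RetryConfig;
    ///
    /// let workflows: Vec<Workflow> = vec![];
    /// let engine = Engine::new(workflows, None, Some(RetryConfig::default()));
    /// ```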
    pub fn new(
        workflows: Vec<Workflow>,
        custom_functions: Option<HashMap<String, Box<dyn FunctionHandler + Send + Sync>>>,
        retry_config: Option<RetryConfig>,
    ) -> Self {
        // Compile workflows, then extract the DataLogic instance and compiled logic cache
        let mut compiler = LogicCompiler::new();
        let workflow_map = compiler.compile_workflows(workflows);
        let (datalogic, logic_cache) = compiler.into_parts();

        let mut task_functions = custom_functions.unwrap_or_default();

        // Always register built-in function handlers; a built-in overwrites any
        // custom handler registered under the same name.
        for (name, handler) in functions::builtins::get_all_functions() {
            task_functions.insert(name, handler);
        }

        Self {
            workflows: Arc::new(workflow_map),
            task_functions: Arc::new(task_functions),
            datalogic,
            logic_cache,
            retry_config: retry_config.unwrap_or_default(),
        }
    }

    /// Get the configured retry configuration
    pub fn retry_config(&self) -> &RetryConfig {
        &self.retry_config
    }

    /// Get the configured workflows
    pub fn workflows(&self) -> &HashMap<String, Workflow> {
        &self.workflows
    }

    /// Get the registered task functions
    pub fn task_functions(&self) -> &HashMap<String, Box<dyn FunctionHandler + Send + Sync>> {
        &self.task_functions
    }

    /// Check if a function with the given name is registered
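    ///
    /// ```rust,ignore
    /// use dataflow_rs::Engine;
    ///
    /// let engine = Engine::default();
    /// // "no_such_fn" is a hypothetical, unregistered name.
    /// assert!(!engine.has_function("no_such_fn"));
    /// ```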
    pub fn has_function(&self, name: &str) -> bool {
        self.task_functions.contains_key(name)
    }

    /// Processes a message through workflows whose conditions match.
    ///
    /// This method:
    /// 1. Iterates through workflows sequentially in deterministic order (sorted by priority, then ID)
    /// 2. Evaluates each workflow's condition right before executing it
    /// 3. Executes matching workflows one after another (not concurrently)
    /// 4. Updates the message with processing results and an audit trail
    ///
    /// Workflows are executed sequentially because later workflows may depend
    /// on the results of earlier workflows, and their conditions may change
    /// based on modifications made by previous workflows.
    ///
    /// # Arguments
    ///
    /// * `message` - The message to process
    ///
    /// # Returns
    ///
    /// * `Result<()>` - Success, or an error if processing failed
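    ///
    /// # Example
    ///
    /// A minimal sketch (a default engine has no workflows registered, so the
    /// message passes through unchanged):
    ///
    /// ```rust,ignore
    /// use dataflow_rs::{Engine, engine::message::Message};
    /// use serde_json::json;
    ///
    /// let engine = Engine::default();
    /// let mut message = Message::new(&json!({"hello": "world"}));
    /// engine.process_message(&mut message).unwrap();
    /// // Executed tasks would append entries to message.audit_trail.
    /// assert!(message.audit_trail.is_empty());
    /// ```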
    pub fn process_message(&self, message: &mut Message) -> Result<()> {
        debug!(
            "Processing message {} sequentially through workflows",
            message.id
        );

        // Sort workflows by priority and ID to ensure deterministic execution order
        let mut sorted_workflows: Vec<_> = self.workflows.iter().collect();
        sorted_workflows.sort_by_key(|(id, workflow)| (workflow.priority, id.as_str()));

        let executor = InternalExecutor::new(&self.datalogic, &self.logic_cache);

        // Process workflows sequentially in sorted order
        for (_, workflow) in sorted_workflows {
            // Evaluate the workflow condition against the current message state
            if !executor.evaluate_condition(
                workflow.condition_index,
                &workflow.condition,
                &message.metadata,
            )? {
                debug!("Workflow {} skipped - condition not met", workflow.id);
                continue;
            }

            info!("Processing workflow {}", workflow.id);
            self.process_workflow(workflow, message, &executor);
            info!("Completed processing workflow {}", workflow.id);
        }

        debug!(
            "Completed processing all workflows for message {}",
            message.id
        );
        Ok(())
    }

    fn process_workflow(
        &self,
        workflow: &Workflow,
        message: &mut Message,
        executor: &InternalExecutor,
    ) {
        let workflow_id = workflow.id.clone();
        let mut workflow_errors = Vec::new();

        // Cache the timestamp for this workflow execution to reduce clock_gettime calls
        let workflow_timestamp = Utc::now().to_rfc3339();

        // Process tasks sequentially within this workflow
        for task in &workflow.tasks {
            // Evaluate the task condition
            let should_execute = executor.evaluate_condition(
                task.condition_index,
                &task.condition,
                &message.metadata,
            );

            let should_execute = match should_execute {
                Ok(result) => result,
                Err(e) => {
                    workflow_errors.push(ErrorInfo::new(
                        Some(workflow_id.clone()),
                        Some(task.id.clone()),
                        e.clone(),
                    ));
                    false
                }
            };

            if !should_execute {
                debug!("Task {} skipped - condition not met", task.id);
                continue;
            }

            // Execute the task based on its function type
            let task_id = task.id.clone();
            let function_config = &task.function;

            let execution_result = match function_config {
                FunctionConfig::Map { input, .. } => executor.execute_map(message, input),
                FunctionConfig::Validation { input, .. } => {
                    executor.execute_validate(message, input)
                }
                FunctionConfig::Custom { name, .. } => {
                    if let Some(function) = self.task_functions.get(name) {
                        self.execute_task(
                            &task_id,
                            &workflow_id,
                            message,
                            function_config,
                            function.as_ref(),
                        )
                    } else {
                        Err(DataflowError::Workflow(format!(
                            "Function '{}' not found",
                            name
                        )))
                    }
                }
            };

            // Handle the execution result
            match execution_result {
                Ok((status_code, changes)) => {
                    debug!(
                        "Task {} completed successfully with status {}",
                        task_id, status_code
                    );

                    // Record the audit trail entry using the cached timestamp
                    message.audit_trail.push(AuditTrail {
                        workflow_id: workflow_id.to_string(),
                        task_id: task_id.to_string(),
                        timestamp: workflow_timestamp.clone(),
                        changes,
                        status_code,
                    });

                    // Add progress metadata with the cached timestamp
                    self.update_progress_metadata_with_timestamp(
                        message,
                        &task_id,
                        &workflow_id,
                        status_code,
                        &workflow_timestamp,
                    );
                }
                Err(error) => {
                    workflow_errors.push(ErrorInfo::new(
                        Some(workflow_id.clone()),
                        Some(task_id.clone()),
                        error.clone(),
                    ));
                    break; // Stop the task sequence if a task fails
                }
            }
        }

        // Add any errors encountered to the message
        message.errors.extend(workflow_errors);
    }

    /// Execute a custom task with retries
    fn execute_task(
        &self,
        task_id: &str,
        workflow_id: &str,
        message: &mut Message,
        config: &FunctionConfig,
        function: &dyn FunctionHandler,
    ) -> Result<(usize, Vec<Change>)> {
        info!("Executing task {} in workflow {}", task_id, workflow_id);

        let mut last_error = None;
        let mut retry_count = 0;

        // Try executing the task, retrying on retryable errors
        while retry_count <= self.retry_config.max_retries {
            match function.execute(message, config, &self.datalogic) {
                Ok((status_code, changes)) => {
                    info!("Task {} completed with status {}", task_id, status_code);

                    if retry_count > 0 {
                        self.update_progress_metadata_with_retries(
                            message,
                            task_id,
                            workflow_id,
                            status_code,
                            retry_count,
                        );
                    } else {
                        self.update_progress_metadata(message, task_id, workflow_id, status_code);
                    }

                    return Ok((status_code, changes));
                }
                Err(e) => {
                    last_error = Some(e.clone());

                    // Check whether this error is retryable
                    if retry_count < self.retry_config.max_retries && e.retryable() {
                        warn!(
                            "Task {} execution failed with retryable error, retry {}/{}: {:?}",
                            task_id,
                            retry_count + 1,
                            self.retry_config.max_retries,
                            e
                        );

                        self.retry_config.sleep(retry_count);
                        retry_count += 1;
                    } else {
                        if !e.retryable() {
                            info!(
                                "Task {} failed with non-retryable error, skipping retries: {:?}",
                                task_id, e
                            );
                        }
                        break;
                    }
                }
            }
        }

        // If we reach this point, all attempts failed
        let error = last_error.unwrap_or_else(|| {
            DataflowError::Unknown("Unknown error during task execution".to_string())
        });

        error!(
            "Task {} in workflow {} failed after {} retries: {:?}",
            task_id, workflow_id, retry_count, error
        );

        Err(error)
    }

    /// Update progress metadata
    fn update_progress_metadata(
        &self,
        message: &mut Message,
        task_id: &str,
        workflow_id: &str,
        status_code: usize,
    ) {
        let timestamp = Utc::now().to_rfc3339();
        self.update_progress_metadata_with_timestamp(
            message,
            task_id,
            workflow_id,
            status_code,
            &timestamp,
        );
    }

    /// Update progress metadata with provided timestamp
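    ///
    /// The resulting `message.metadata["progress"]` object has this shape
    /// (illustrative values):
    ///
    /// ```json
    /// {
    ///     "task_id": "enrich",
    ///     "workflow_id": "example",
    ///     "status_code": 200,
    ///     "timestamp": "2024-01-01T00:00:00+00:00"
    /// }
    /// ```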
    fn update_progress_metadata_with_timestamp(
        &self,
        message: &mut Message,
        task_id: &str,
        workflow_id: &str,
        status_code: usize,
        timestamp: &str,
    ) {
        let mut progress = Map::new();
        progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
        progress.insert(
            "workflow_id".to_string(),
            Value::String(workflow_id.to_string()),
        );
        progress.insert(
            "status_code".to_string(),
            Value::Number(Number::from(status_code)),
        );
        progress.insert(
            "timestamp".to_string(),
            Value::String(timestamp.to_string()),
        );
        message.metadata["progress"] = json!(progress);
    }

    /// Update progress metadata with retry count
    fn update_progress_metadata_with_retries(
        &self,
        message: &mut Message,
        task_id: &str,
        workflow_id: &str,
        status_code: usize,
        retry_count: u32,
    ) {
        let mut progress = Map::new();
        progress.insert("task_id".to_string(), Value::String(task_id.to_string()));
        progress.insert(
            "workflow_id".to_string(),
            Value::String(workflow_id.to_string()),
        );
        progress.insert(
            "status_code".to_string(),
            Value::Number(Number::from(status_code)),
        );
        progress.insert(
            "timestamp".to_string(),
            Value::String(Utc::now().to_rfc3339()),
        );
        progress.insert(
            "retries".to_string(),
            Value::Number(Number::from(retry_count)),
        );
        message.metadata["progress"] = json!(progress);
    }

    /// Evaluate logic using compiled index or direct evaluation (public for testing)
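    ///
    /// A sketch of the direct-evaluation path (pass `None` to skip the compiled index):
    ///
    /// ```rust,ignore
    /// use dataflow_rs::Engine;
    /// use serde_json::json;
    ///
    /// let engine = Engine::default();
    /// let result = engine
    ///     .evaluate_logic(None, &json!({"==": [1, 1]}), &json!({}))
    ///     .unwrap();
    /// assert_eq!(result, json!(true));
    /// ```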
    pub fn evaluate_logic(
        &self,
        logic_index: Option<usize>,
        logic: &Value,
        data: &Value,
    ) -> Result<Value> {
        let executor = InternalExecutor::new(&self.datalogic, &self.logic_cache);
        executor.evaluate_logic(logic_index, logic, data)
    }
}
505}