dataflow_rs/lib.rs
1/*!
2# Dataflow-rs
3
4A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
5
6## Overview
7
8Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows.
9The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can
10contain multiple tasks that transform, validate, or enrich the data.
11
12## Key Components
13
14* **Engine**: The central component that processes messages through workflows
15* **Workflow**: A collection of tasks with conditions that determine when they should be applied
16* **Task**: An individual processing unit that performs a specific function on a message
17* **FunctionHandler**: A trait implemented by task handlers to define custom processing logic
18* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
19
20## Built-in Functions
21
22The engine comes with pre-registered functions, including:
23
24* **map**: Maps and transforms data between different parts of a message
25* **validate**: Validates message data against rules using JSONLogic expressions
26
27## Usage Example
28
29```rust,no_run
30use dataflow_rs::{Engine, Workflow};
31use dataflow_rs::engine::message::Message;
32use serde_json::json;
33
34fn main() -> Result<(), Box<dyn std::error::Error>> {
35 // Define a workflow in JSON
36 let workflow_json = r#"
37 {
38 "id": "data_processor",
39 "name": "Data Processor",
40 "priority": 0,
41 "tasks": [
42 {
43 "id": "transform_data",
44 "name": "Transform Data",
45 "function": {
46 "name": "map",
47 "input": {
48 "mappings": [
49 {
50 "path": "data.result",
51 "logic": { "var": "temp_data.value" }
52 }
53 ]
54 }
55 }
56 }
57 ]
58 }
59 "#;
60
61 // Parse the workflow
62 let workflow = Workflow::from_json(workflow_json)?;
63
64 // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
65 let mut engine = Engine::new(vec![workflow], None, None);
66
67 // Create a message to process
68 let mut message = Message::new(&json!({}));
69
70 // Process the message through the workflow
71 match engine.process_message(&mut message) {
72 Ok(_) => {
73 println!("Processed result: {}", message.data["result"]);
74 }
75 Err(e) => {
76 println!("Error in workflow: {:?}", e);
77 }
78 }
79
80 Ok(())
81}
82```
83
84## Error Handling
85
86The library provides a comprehensive error handling system:
87
88```rust,no_run
89use dataflow_rs::{Engine, Result, DataflowError};
90use dataflow_rs::engine::message::Message;
91use serde_json::json;
92
93fn main() -> Result<()> {
94 // ... setup workflows ...
95 let mut engine = Engine::new(vec![/* workflows */], None, None);
96
97 let mut message = Message::new(&json!({}));
98
99 // Process the message; errors are collected on the message rather than halting execution
100 engine.process_message(&mut message)?;
101
102 // Check if there were any errors during processing
103 if message.has_errors() {
104 for error in &message.errors {
105 println!("Error in workflow: {:?}, task: {:?}: {:?}",
106 error.workflow_id, error.task_id, error.error_message);
107 }
108 }
109
110 Ok(())
111}
112```
113
114## Extending with Custom Functions
115
116You can extend the engine with your own custom function handlers:
117
118```rust,no_run
119use dataflow_rs::{Engine, FunctionHandler, Result, Workflow};
120use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
121use datalogic_rs::DataLogic;
122use serde_json::{json, Value};
123use std::collections::HashMap;
124
125struct CustomFunction;
126
127impl FunctionHandler for CustomFunction {
128 fn execute(
129 &self,
130 message: &mut Message,
131 config: &FunctionConfig,
132 datalogic: &DataLogic,
133 ) -> Result<(usize, Vec<Change>)> {
134 // Implement your custom logic here
135
136 // Extract the custom configuration from config
137 let input = match config {
138 FunctionConfig::Custom { input, .. } => input,
139 _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
140 };
141
142 // Validate input
143 let required_field = input.get("field")
144 .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
145 .as_str()
146 .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
147
148 // Record changes for audit trail
149 let changes = vec![
150 Change {
151 path: "data.custom_field".to_string(),
152 old_value: Value::Null,
153 new_value: json!("custom value"),
154 }
155 ];
156
157 // Return success code (200) and changes
158 Ok((200, changes))
159 }
160}
161
162fn main() -> Result<()> {
163 // Create custom functions
164 let mut custom_functions = HashMap::new();
165 custom_functions.insert(
166 "custom".to_string(),
167 Box::new(CustomFunction) as Box<dyn FunctionHandler + Send + Sync>
168 );
169
170 // Create engine with workflows and custom functions
171 let engine = Engine::new(vec![/* workflows */], Some(custom_functions), None);
172
173 // The handler registered as "custom" can now be used in workflows...
174 Ok(())
175}
176```
177*/
178
179pub mod engine;
180
181// Re-export all public APIs for easier access
182pub use engine::RetryConfig;
183pub use engine::error::{DataflowError, ErrorInfo, Result};
184pub use engine::functions::{MapConfig, MapMapping, ValidationConfig, ValidationRule};
185pub use engine::message::{AuditTrail, Change, Message};
186pub use engine::{Engine, FunctionConfig, FunctionHandler, Task, ThreadedEngine, Workflow};