dataflow_rs/lib.rs
1/*!
2# Dataflow-rs
3
4A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
5
6## Overview
7
8Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows.
9The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can
10contain multiple tasks that transform, validate, or enrich the data.
11
12## Key Components
13
14* **Engine**: The central component that processes messages through workflows
15* **Workflow**: A collection of tasks with conditions that determine when they should be applied
16* **Task**: An individual processing unit that performs a specific function on a message
17* **FunctionHandler**: A trait implemented by task handlers to define custom processing logic
18* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
19
20## Built-in Functions
21
22The engine comes with several pre-registered functions:
23
24* **http**: Fetches data from external HTTP APIs
25* **map**: Maps and transforms data between different parts of a message
26* **validate**: Validates message data against rules using JSONLogic expressions
27
28## Usage Example
29
30```rust,no_run
31use dataflow_rs::{Engine, Workflow};
32use dataflow_rs::engine::message::Message;
33use serde_json::json;
34
35fn main() -> Result<(), Box<dyn std::error::Error>> {
36 // Define a workflow in JSON
37 let workflow_json = r#"
38 {
39 "id": "data_processor",
40 "name": "Data Processor",
41 "priority": 0,
42 "tasks": [
43 {
44 "id": "fetch_data",
45 "name": "Fetch Data",
46 "function": {
47 "name": "http",
48 "input": { "url": "https://api.example.com/data" }
49 }
50 },
51 {
52 "id": "transform_data",
53 "name": "Transform Data",
54 "function": {
55 "name": "map",
56 "input": {
57 "mappings": [
58 {
59 "path": "data.result",
60 "logic": { "var": "temp_data.body.value" }
61 }
62 ]
63 }
64 }
65 }
66 ]
67 }
68 "#;
69
70 // Parse the workflow
71 let workflow = Workflow::from_json(workflow_json)?;
72
73 // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
74 let mut engine = Engine::new(vec![workflow], None, None);
75
76 // Create a message to process
77 let mut message = Message::new(&json!({}));
78
79 // Process the message through the workflow
80 match engine.process_message(&mut message) {
81 Ok(_) => {
82 println!("Processed result: {}", message.data["result"]);
83 }
84 Err(e) => {
85 println!("Error in workflow: {:?}", e);
86 }
87 }
88
89 Ok(())
90}
91```
92
93## Error Handling
94
95The library provides a comprehensive error-handling system:
96
97```rust,no_run
98use dataflow_rs::{Engine, Result, DataflowError};
99use dataflow_rs::engine::message::Message;
100use serde_json::json;
101
102fn main() -> Result<()> {
103 // ... setup workflows ...
104 let mut engine = Engine::new(vec![/* workflows */], None, None);
105
106 let mut message = Message::new(&json!({}));
107
108 // Process the message; errors are collected on the message rather than halting execution
109 engine.process_message(&mut message)?;
110
111 // Check if there were any errors during processing
112 if message.has_errors() {
113 for error in &message.errors {
114 println!("Error in workflow: {:?}, task: {:?}: {:?}",
115 error.workflow_id, error.task_id, error.error_message);
116 }
117 }
118
119 Ok(())
120}
121```
122
123## Extending with Custom Functions
124
125You can extend the engine with your own custom function handlers:
126
127```rust,no_run
128use dataflow_rs::{Engine, FunctionHandler, Result, Workflow};
129use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
130use datalogic_rs::DataLogic;
131use serde_json::{json, Value};
132use std::collections::HashMap;
133
134struct CustomFunction;
135
136impl FunctionHandler for CustomFunction {
137 fn execute(
138 &self,
139 message: &mut Message,
140 config: &FunctionConfig,
141 datalogic: &DataLogic,
142 ) -> Result<(usize, Vec<Change>)> {
143 // Implement your custom logic here
144
145 // Extract the custom configuration from config
146 let input = match config {
147 FunctionConfig::Custom { input, .. } => input,
148 _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
149 };
150
151 // Validate input
152 let required_field = input.get("field")
153 .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
154 .as_str()
155 .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
156
157 // Record changes for audit trail
158 let changes = vec![
159 Change {
160 path: "data.custom_field".to_string(),
161 old_value: Value::Null,
162 new_value: json!("custom value"),
163 }
164 ];
165
166 // Return success code (200) and changes
167 Ok((200, changes))
168 }
169}
170
171fn main() -> Result<()> {
172 // Create custom functions
173 let mut custom_functions = HashMap::new();
174 custom_functions.insert(
175 "custom".to_string(),
176 Box::new(CustomFunction) as Box<dyn FunctionHandler + Send + Sync>
177 );
178
179 // Create engine with workflows and custom functions
180 let engine = Engine::new(vec![/* workflows */], Some(custom_functions), None);
181
182 // The engine can now be used to run workflows that call the custom function...
183 Ok(())
184}
185```
186*/
187
188pub mod engine;
189
190// Re-export all public APIs for easier access
191pub use engine::RetryConfig;
192pub use engine::error::{DataflowError, ErrorInfo, Result};
193pub use engine::functions::{MapConfig, MapMapping, ValidationConfig, ValidationRule};
194pub use engine::message::{AuditTrail, Change, Message};
195pub use engine::{Engine, FunctionConfig, FunctionHandler, Task, Workflow};