dataflow_rs/lib.rs
1/*!
2# Dataflow-rs
3
4A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
5
6## Overview
7
8Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows.
9The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can
10contain multiple tasks that transform, validate, or enrich the data.
11
12## Key Components
13
14* **Engine**: The central component that processes messages through workflows
15* **Workflow**: A collection of tasks with conditions that determine when they should be applied
16* **Task**: An individual processing unit that performs a specific function on a message
17* **AsyncFunctionHandler**: A trait implemented by task handlers to define custom async processing logic
18* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
19
20## Built-in Functions
21
22The engine comes with several pre-registered functions:
23
24* **http**: Fetches data from external HTTP APIs
25* **map**: Maps and transforms data between different parts of a message
26* **validate**: Validates message data against rules using JSONLogic expressions
27
28## Async Support
29
30The engine fully supports asynchronous operation with Tokio, providing improved scalability and
31performance for I/O-bound operations such as HTTP requests:
32
33```rust,no_run
34use dataflow_rs::{Engine, Workflow};
35use dataflow_rs::engine::message::Message;
36use serde_json::json;
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40 // Create the async workflow engine
41 let mut engine = Engine::new();
42
43 // Define and add a workflow
44 let workflow_json = r#"{
45 "id": "data_processor",
46 "name": "Data Processor",
47 "priority": 0,
48 "tasks": [
49 {
50 "id": "fetch_data",
51 "name": "Fetch Data",
52 "function": {
53 "name": "http",
54 "input": { "url": "https://api.example.com/data" }
55 }
56 }
57 ]
58 }"#;
59
60 let workflow = Workflow::from_json(workflow_json)?;
61 engine.add_workflow(&workflow);
62
63 // Create and process a message
64 let mut message = Message::new(&json!({}));
65
66 // Process the message asynchronously
67 engine.process_message(&mut message).await?;
68
69 println!("Processed result: {}", message.data["result"]);
70 Ok(())
71}
72```
73
74## Usage Example
75
76```rust,no_run
77use dataflow_rs::{Engine, Workflow};
78use dataflow_rs::engine::message::Message;
79use serde_json::json;
80
81#[tokio::main]
82async fn main() -> Result<(), Box<dyn std::error::Error>> {
83 // Create the workflow engine (built-in functions are auto-registered)
84 let mut engine = Engine::new();
85
86 // Define a workflow in JSON
87 let workflow_json = r#"
88 {
89 "id": "data_processor",
90 "name": "Data Processor",
91 "priority": 0,
92 "tasks": [
93 {
94 "id": "fetch_data",
95 "name": "Fetch Data",
96 "function": {
97 "name": "http",
98 "input": { "url": "https://api.example.com/data" }
99 }
100 },
101 {
102 "id": "transform_data",
103 "name": "Transform Data",
104 "function": {
105 "name": "map",
106 "input": {
107 "mappings": [
108 {
109 "path": "data.result",
110 "logic": { "var": "temp_data.body.value" }
111 }
112 ]
113 }
114 }
115 }
116 ]
117 }
118 "#;
119
120 // Parse and add the workflow to the engine
121 let workflow = Workflow::from_json(workflow_json)?;
122 engine.add_workflow(&workflow);
123
124 // Create a message to process
125 let mut message = Message::new(&json!({}));
126
127 // Process the message through the workflow
128 match engine.process_message(&mut message).await {
129 Ok(_) => {
130 println!("Processed result: {}", message.data["result"]);
131 }
132 Err(e) => {
133 println!("Error in workflow: {:?}", e);
134 }
135 }
136
137 Ok(())
138}
139```
140
141## Error Handling
142
143The library provides a comprehensive error handling system:
144
145```rust
146use dataflow_rs::{Engine, Result};
147use dataflow_rs::engine::message::Message;
148use serde_json::json;
149
150#[tokio::main]
151async fn main() -> Result<()> {
152 let mut engine = Engine::new();
153 // ... setup workflows ...
154
155 let mut message = Message::new(&json!({}));
156
157 // Process the message; errors are collected rather than halting execution
158 engine.process_message(&mut message).await?;
159
160 // Check if there were any errors during processing
161 if message.has_errors() {
162 for error in &message.errors {
163 println!("Error in workflow: {:?}, task: {:?}: {:?}",
164 error.workflow_id, error.task_id, error.error_message);
165 }
166 }
167
168 Ok(())
169}
170```
171
172## Extending with Custom Functions
173
174You can extend the engine with your own custom function handlers:
175
176```rust
177use dataflow_rs::{Engine, AsyncFunctionHandler, Result};
178use dataflow_rs::engine::message::{Change, Message};
179use dataflow_rs::engine::error::DataflowError;
180use serde_json::{json, Value};
181use async_trait::async_trait;
182
183struct CustomFunction;
184
185#[async_trait]
186impl AsyncFunctionHandler for CustomFunction {
187 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
188 // Implement your custom logic here
189
190 // Validate input
191 let required_field = input.get("field")
192 .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
193 .as_str()
194 .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
195
196 // Record changes for audit trail
197 let changes = vec![
198 Change {
199 path: "data.custom_field".to_string(),
200 old_value: Value::Null,
201 new_value: json!("custom value"),
202 }
203 ];
204
205 // Return success code (200) and changes
206 Ok((200, changes))
207 }
208}
209
210#[tokio::main]
211async fn main() -> Result<()> {
212 let mut engine = Engine::new();
213
214 // Register your custom function
215 engine.register_task_function("custom".to_string(), Box::new(CustomFunction));
216
217 // Now it can be used in workflows...
218 Ok(())
219}
220```
221*/
222
223pub mod engine;
224
225// Re-export the core public APIs at the crate root for easier access
226pub use async_trait::async_trait;
227pub use engine::error::{DataflowError, ErrorInfo, Result};
228pub use engine::RetryConfig;
229pub use engine::{AsyncFunctionHandler, Engine, Task, Workflow};