dataflow_rs/lib.rs
1/*!
2# Dataflow-rs
3
4A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
5
6## Overview
7
8Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows.
9The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can
10contain multiple tasks that transform, validate, or enrich the data.
11
12## Key Components
13
14* **Engine**: The central component that processes messages through workflows
15* **Workflow**: A collection of tasks with conditions that determine when they should be applied
16* **Task**: An individual processing unit that performs a specific function on a message
17* **AsyncFunctionHandler**: A trait implemented by task handlers to define custom async processing logic
18* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
19
20## Built-in Functions
21
22The engine comes with several pre-registered functions:
23
24* **http**: Fetches data from external HTTP APIs
25* **map**: Maps and transforms data between different parts of a message
26* **validate**: Validates message data against rules using JSONLogic expressions
27
28## Async Support
29
30Message processing is asynchronous (`process_message` is awaited on a Tokio runtime), which suits
31IO-bound tasks such as HTTP requests:
32
33```rust,no_run
34use dataflow_rs::{Engine, Workflow};
35use dataflow_rs::engine::message::Message;
36use serde_json::json;
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40 // Create the async workflow engine
41 let mut engine = Engine::new();
42
43 // Define and add a workflow
44 let workflow_json = r#"{
45 "id": "data_processor",
46 "name": "Data Processor",
47 "tasks": [
48 {
49 "id": "fetch_data",
50 "name": "Fetch Data",
51 "function": {
52 "name": "http",
53 "input": { "url": "https://api.example.com/data" }
54 }
55 }
56 ]
57 }"#;
58
59 let workflow = Workflow::from_json(workflow_json)?;
60 engine.add_workflow(&workflow);
61
62 // Create and process a message
63 let mut message = Message::new(&json!({}));
64
65 // Process the message asynchronously
66 engine.process_message(&mut message).await?;
67
68 println!("Processed result: {}", message.data["result"]);
69 Ok(())
70}
71```
72
73## Usage Example
74
75```rust,no_run
76use dataflow_rs::{Engine, Workflow};
77use dataflow_rs::engine::message::Message;
78use serde_json::json;
79
80#[tokio::main]
81async fn main() -> Result<(), Box<dyn std::error::Error>> {
82 // Create the workflow engine (built-in functions are auto-registered)
83 let mut engine = Engine::new();
84
85 // Define a workflow in JSON
86 let workflow_json = r#"
87 {
88 "id": "data_processor",
89 "name": "Data Processor",
90 "tasks": [
91 {
92 "id": "fetch_data",
93 "name": "Fetch Data",
94 "function": {
95 "name": "http",
96 "input": { "url": "https://api.example.com/data" }
97 }
98 },
99 {
100 "id": "transform_data",
101 "name": "Transform Data",
102 "function": {
103 "name": "map",
104 "input": {
105 "mappings": [
106 {
107 "path": "data.result",
108 "logic": { "var": "temp_data.body.value" }
109 }
110 ]
111 }
112 }
113 }
114 ]
115 }
116 "#;
117
118 // Parse and add the workflow to the engine
119 let workflow = Workflow::from_json(workflow_json)?;
120 engine.add_workflow(&workflow);
121
122 // Create a message to process
123 let mut message = Message::new(&json!({}));
124
125 // Process the message through the workflow
126 match engine.process_message(&mut message).await {
127 Ok(_) => {
128 println!("Processed result: {}", message.data["result"]);
129 }
130 Err(e) => {
131 println!("Error in workflow: {:?}", e);
132 }
133 }
134
135 Ok(())
136}
137```
138
139## Error Handling
140
141Errors raised while a message is processed are recorded on the message itself, so they can be inspected after `process_message` returns:
142
143```rust
144use dataflow_rs::{Engine, Result, DataflowError};
145use dataflow_rs::engine::message::Message;
146use serde_json::json;
147
148#[tokio::main]
149async fn main() -> Result<()> {
150 let mut engine = Engine::new();
151 // ... setup workflows ...
152
153 let mut message = Message::new(&json!({}));
154
155 // Task errors are collected in `message.errors` instead of halting processing
156 engine.process_message(&mut message).await?;
157
158 // Check if there were any errors during processing
159 if message.has_errors() {
160 for error in &message.errors {
161 println!("Error in workflow: {:?}, task: {:?}: {:?}",
162 error.workflow_id, error.task_id, error.error);
163 }
164 }
165
166 Ok(())
167}
168```
169
170## Extending with Custom Functions
171
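172You can extend the engine with custom functions: implement `AsyncFunctionHandler` and register the handler under a name that tasks can reference in their `function.name` field: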
173
174```rust
175use dataflow_rs::{Engine, AsyncFunctionHandler, Result, Workflow};
176use dataflow_rs::engine::message::{Change, Message};
177use dataflow_rs::engine::error::DataflowError;
178use serde_json::{json, Value};
179use async_trait::async_trait;
180
181struct CustomFunction;
182
183#[async_trait]
184impl AsyncFunctionHandler for CustomFunction {
185 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
186 // Implement your custom logic here
187
188 // Validate input
189 let required_field = input.get("field")
190 .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
191 .as_str()
192 .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
193
194 // Describe the changes for the audit trail (this example does not actually modify `message`)
195 let changes = vec![
196 Change {
197 path: "data.custom_field".to_string(),
198 old_value: Value::Null,
199 new_value: json!("custom value"),
200 }
201 ];
202
203 // Return success code (200) and changes
204 Ok((200, changes))
205 }
206}
207
208#[tokio::main]
209async fn main() -> Result<()> {
210 let mut engine = Engine::new();
211
212 // Register your custom function
213 engine.register_task_function("custom".to_string(), Box::new(CustomFunction));
214
215 // Tasks can now use it via "function": { "name": "custom", "input": { "field": "..." } }
216 Ok(())
217}
218```
219*/
220
221pub mod engine;
222
223// Re-export all public APIs for easier access
224pub use async_trait::async_trait;
225pub use engine::error::{DataflowError, ErrorInfo, Result};
226pub use engine::RetryConfig;
227pub use engine::{AsyncFunctionHandler, Engine, Task, Workflow};