dataflow_rs/
lib.rs

1/*!
2# Dataflow-rs
3
4A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
5
6## Overview
7
8Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows.
9The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can
10contain multiple tasks that transform, validate, or enrich the data.
11
12## Key Components
13
14* **Engine**: The central component that processes messages through workflows
15* **Workflow**: A collection of tasks with conditions that determine when they should be applied
16* **Task**: An individual processing unit that performs a specific function on a message
17* **AsyncFunctionHandler**: A trait implemented by task handlers to define custom async processing logic
18* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
19
20## Built-in Functions
21
22The engine comes with several pre-registered functions:
23
24* **http**: Fetches data from external HTTP APIs
25* **map**: Maps and transforms data between different parts of a message
26* **validate**: Validates message data against rules using JSONLogic expressions
27
28## Async Support
29
30The engine fully supports asynchronous operation with Tokio, providing improved scalability and
31performance for I/O-bound operations such as HTTP requests:
32
33```rust
34use dataflow_rs::{Engine, Workflow};
35use dataflow_rs::engine::message::Message;
36use serde_json::json;
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40    // Create the async workflow engine
41    let mut engine = Engine::new();
42
43    // Define and add a workflow
44    let workflow_json = r#"{
45        "id": "data_processor",
46        "name": "Data Processor",
47        "priority": 0,
48        "tasks": [
49            {
50                "id": "fetch_data",
51                "name": "Fetch Data",
52                "function": {
53                    "name": "http",
54                    "input": { "url": "https://api.example.com/data" }
55                }
56            }
57        ]
58    }"#;
59
60    let workflow = Workflow::from_json(workflow_json)?;
61    engine.add_workflow(&workflow);
62
63    // Create and process a message
64    let mut message = Message::new(&json!({}));
65
66    // Process the message asynchronously
67    engine.process_message(&mut message).await?;
68
69    println!("Processed result: {}", message.data["result"]);
70    Ok(())
71}
72```
73
74## Usage Example
75
76```rust
77use dataflow_rs::{Engine, Workflow};
78use dataflow_rs::engine::message::Message;
79use serde_json::json;
80
81#[tokio::main]
82async fn main() -> Result<(), Box<dyn std::error::Error>> {
83    // Create the workflow engine (built-in functions are auto-registered)
84    let mut engine = Engine::new();
85
86    // Define a workflow in JSON
87    let workflow_json = r#"
88    {
89        "id": "data_processor",
90        "name": "Data Processor",
91        "priority": 0,
92        "tasks": [
93            {
94                "id": "fetch_data",
95                "name": "Fetch Data",
96                "function": {
97                    "name": "http",
98                    "input": { "url": "https://api.example.com/data" }
99                }
100            },
101            {
102                "id": "transform_data",
103                "name": "Transform Data",
104                "function": {
105                    "name": "map",
106                    "input": {
107                        "mappings": [
108                            {
109                                "path": "data.result",
110                                "logic": { "var": "temp_data.body.value" }
111                            }
112                        ]
113                    }
114                }
115            }
116        ]
117    }
118    "#;
119
120    // Parse and add the workflow to the engine
121    let workflow = Workflow::from_json(workflow_json)?;
122    engine.add_workflow(&workflow);
123
124    // Create a message to process
125    let mut message = Message::new(&json!({}));
126
127    // Process the message through the workflow
128    match engine.process_message(&mut message).await {
129        Ok(_) => {
130            println!("Processed result: {}", message.data["result"]);
131        }
132        Err(e) => {
133            println!("Error in workflow: {:?}", e);
134        }
135    }
136
137    Ok(())
138}
139```
140
141## Error Handling
142
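143The library provides a comprehensive error-handling system. Errors raised during processing are collected on the message, where they can be inspected afterwards: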
144
145```rust
146use dataflow_rs::{Engine, Result, DataflowError};
147use dataflow_rs::engine::message::Message;
148use serde_json::json;
149
150#[tokio::main]
151async fn main() -> Result<()> {
152    let mut engine = Engine::new();
153    // ... setup workflows ...
154
155    let mut message = Message::new(&json!({}));
156
157    // Process the message; errors are collected on the message and do not halt execution
158    engine.process_message(&mut message).await?;
159
160    // Check if there were any errors during processing
161    if message.has_errors() {
162        for error in &message.errors {
163            println!("Error in workflow: {:?}, task: {:?}: {:?}",
164                     error.workflow_id, error.task_id, error.error_message);
165        }
166    }
167
168    Ok(())
169}
170```
171
172## Extending with Custom Functions
173
174You can extend the engine with your own custom function handlers:
175
176```rust
177use dataflow_rs::{Engine, AsyncFunctionHandler, Result, Workflow};
178use dataflow_rs::engine::message::{Change, Message};
179use dataflow_rs::engine::error::DataflowError;
180use serde_json::{json, Value};
181use async_trait::async_trait;
182
183struct CustomFunction;
184
185#[async_trait]
186impl AsyncFunctionHandler for CustomFunction {
187    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
188        // Implement your custom logic here
189
190        // Validate input
191        let required_field = input.get("field")
192            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
193            .as_str()
194            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
195
196        // Record changes for audit trail
197        let changes = vec![
198            Change {
199                path: "data.custom_field".to_string(),
200                old_value: Value::Null,
201                new_value: json!("custom value"),
202            }
203        ];
204
205        // Return success code (200) and changes
206        Ok((200, changes))
207    }
208}
209
210#[tokio::main]
211async fn main() -> Result<()> {
212    let mut engine = Engine::new();
213
214    // Register your custom function
215    engine.register_task_function("custom".to_string(), Box::new(CustomFunction));
216
217    // Now it can be used in workflows...
218    Ok(())
219}
220```
221*/
222
223pub mod engine;
224
225// Re-export the main public APIs at the crate root for easier access
226pub use async_trait::async_trait;
227pub use engine::error::{DataflowError, ErrorInfo, Result};
228pub use engine::RetryConfig;
229pub use engine::{AsyncFunctionHandler, Engine, Task, Workflow};