dataflow_rs/
lib.rs

1/*!
2# Dataflow-rs
3
4A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
5
6## Overview
7
8Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows.
9The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can
10contain multiple tasks that transform, validate, or enrich the data.
11
12## Key Components
13
14* **Engine**: The central component that processes messages through workflows
15* **Workflow**: A collection of tasks with conditions that determine when they should be applied
16* **Task**: An individual processing unit that performs a specific function on a message
17* **FunctionHandler**: A trait implemented by task handlers to define custom processing logic
18* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
19
20## Built-in Functions
21
22The engine comes with several pre-registered functions:
23
24* **http**: Fetches data from external HTTP APIs
25* **map**: Maps and transforms data between different parts of a message
26* **validate**: Validates message data against rules using JSONLogic expressions
27
28## Usage Example
29
30```rust,no_run
31use dataflow_rs::{Engine, Workflow};
32use dataflow_rs::engine::message::Message;
33use serde_json::json;
34
35fn main() -> Result<(), Box<dyn std::error::Error>> {
36    // Define a workflow in JSON
37    let workflow_json = r#"
38    {
39        "id": "data_processor",
40        "name": "Data Processor",
41        "priority": 0,
42        "tasks": [
43            {
44                "id": "fetch_data",
45                "name": "Fetch Data",
46                "function": {
47                    "name": "http",
48                    "input": { "url": "https://api.example.com/data" }
49                }
50            },
51            {
52                "id": "transform_data",
53                "name": "Transform Data",
54                "function": {
55                    "name": "map",
56                    "input": {
57                        "mappings": [
58                            {
59                                "path": "data.result",
60                                "logic": { "var": "temp_data.body.value" }
61                            }
62                        ]
63                    }
64                }
65            }
66        ]
67    }
68    "#;
69
70    // Parse the workflow
71    let workflow = Workflow::from_json(workflow_json)?;
72
73    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
74    let mut engine = Engine::new(vec![workflow], None, None);
75
76    // Create a message to process
77    let mut message = Message::new(&json!({}));
78
79    // Process the message through the workflow
80    match engine.process_message(&mut message) {
81        Ok(_) => {
82            println!("Processed result: {}", message.data["result"]);
83        }
84        Err(e) => {
85            println!("Error in workflow: {:?}", e);
86        }
87    }
88
89    Ok(())
90}
91```
92
93## Error Handling
94
95The library provides a comprehensive error-handling system. Errors raised during processing are recorded on the message and can be inspected afterwards:
96
97```rust,no_run
98use dataflow_rs::{Engine, Result, DataflowError};
99use dataflow_rs::engine::message::Message;
100use serde_json::json;
101
102fn main() -> Result<()> {
103    // ... set up workflows ...
104    let mut engine = Engine::new(vec![/* workflows */], None, None);
105
106    let mut message = Message::new(&json!({}));
107
108    // Process the message; errors are collected on the message and do not halt execution
109    engine.process_message(&mut message)?;
110
111    // Check if there were any errors during processing
112    if message.has_errors() {
113        for error in &message.errors {
114            println!("Error in workflow: {:?}, task: {:?}: {:?}",
115                     error.workflow_id, error.task_id, error.error_message);
116        }
117    }
118
119    Ok(())
120}
121```
122
123## Extending with Custom Functions
124
125You can extend the engine with your own custom function handlers:
126
127```rust,no_run
128use dataflow_rs::{Engine, FunctionHandler, Result, Workflow};
129use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
130use datalogic_rs::DataLogic;
131use serde_json::{json, Value};
132use std::collections::HashMap;
133
134struct CustomFunction;
135
136impl FunctionHandler for CustomFunction {
137    fn execute(
138        &self,
139        message: &mut Message,
140        config: &FunctionConfig,
141        datalogic: &DataLogic,
142    ) -> Result<(usize, Vec<Change>)> {
143        // Implement your custom logic here
144
145        // Extract the custom configuration from config
146        let input = match config {
147            FunctionConfig::Custom { input, .. } => input,
148            _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
149        };
150
151        // Validate input
152        let required_field = input.get("field")
153            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
154            .as_str()
155            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
156
157        // Record changes for audit trail
158        let changes = vec![
159            Change {
160                path: "data.custom_field".to_string(),
161                old_value: Value::Null,
162                new_value: json!("custom value"),
163            }
164        ];
165
166        // Return success code (200) and changes
167        Ok((200, changes))
168    }
169}
170
171fn main() -> Result<()> {
172    // Create custom functions
173    let mut custom_functions = HashMap::new();
174    custom_functions.insert(
175        "custom".to_string(),
176        Box::new(CustomFunction) as Box<dyn FunctionHandler + Send + Sync>
177    );
178
179    // Create engine with workflows and custom functions
180    let engine = Engine::new(vec![/* workflows */], Some(custom_functions), None);
181
182    // The "custom" function can now be referenced by name in workflow tasks...
183    Ok(())
184}
185```
186*/
187
188pub mod engine;
189
190// Re-export all public APIs for easier access
191pub use engine::RetryConfig;
192pub use engine::error::{DataflowError, ErrorInfo, Result};
193pub use engine::functions::{MapConfig, MapMapping, ValidationConfig, ValidationRule};
194pub use engine::message::{AuditTrail, Change, Message};
195pub use engine::{Engine, FunctionConfig, FunctionHandler, Task, Workflow};