Skip to main content

dataflow_rs/
lib.rs

1/*!
2# Dataflow-rs
3
4A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.
5
6## Overview
7
8Dataflow-rs provides a high-performance rules engine that follows the **IF → THEN → THAT** model:
9
10- **IF** — Define conditions using JSONLogic expressions (evaluated against `data`, `metadata`, `temp_data`)
11- **THEN** — Execute actions: data transformation, validation, or custom async logic
12- **THAT** — Chain multiple actions and rules with priority ordering
13
14Rules are defined declaratively in JSON and compiled once at startup for zero-overhead evaluation at runtime.
15
16## Key Components
17
18| Rules Engine | Workflow Engine | Description |
19|---|---|---|
20| **RulesEngine** | **Engine** | Central async component that evaluates rules and executes actions |
21| **Rule** | **Workflow** | A condition + actions bundle — IF condition THEN execute actions |
22| **Action** | **Task** | An individual processing step that performs a function on a message |
23
24* **AsyncFunctionHandler**: A trait implemented by action handlers to define custom async processing logic
25* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results
26
27## Built-in Functions
28
29The engine comes with several pre-registered functions, including:
30
31* **map**: Maps and transforms data between different parts of a message
32* **validate**: Validates message data against rules using JSONLogic expressions
33
34## Usage Example
35
36```rust,no_run
37use dataflow_rs::{Engine, Workflow};
38use dataflow_rs::engine::message::Message;
39use serde_json::json;
40
41#[tokio::main]
42async fn main() -> Result<(), Box<dyn std::error::Error>> {
43    // Define a workflow in JSON
44    let workflow_json = r#"
45    {
46        "id": "data_processor",
47        "name": "Data Processor",
48        "priority": 0,
49        "tasks": [
50            {
51                "id": "transform_data",
52                "name": "Transform Data",
53                "function": {
54                    "name": "map",
55                    "input": {
56                        "mappings": [
57                            {
58                                "path": "data.result",
59                                "logic": { "var": "temp_data.value" }
60                            }
61                        ]
62                    }
63                }
64            }
65        ]
66    }
67    "#;
68
69    // Parse the workflow
70    let workflow = Workflow::from_json(workflow_json)?;
71
72    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
73    let engine = Engine::new(vec![workflow], None);
74
75    // Create a message to process
76    let mut message = Message::from_value(&json!({}));
77
78    // Process the message through the workflow
79    match engine.process_message(&mut message).await {
80        Ok(_) => {
81            println!("Processed result: {}", message.context["data"]["result"]);
82        }
83        Err(e) => {
84            println!("Error in workflow: {:?}", e);
85        }
86    }
87
88    Ok(())
89}
90```
91
92## Error Handling
93
94The library provides a comprehensive error handling system:
95
96```rust,no_run
97use dataflow_rs::{Engine, Result, DataflowError};
98use dataflow_rs::engine::message::Message;
99use serde_json::json;
100
101#[tokio::main]
102async fn main() -> Result<()> {
103    // ... setup workflows ...
104    let engine = Engine::new(vec![/* workflows */], None);
105
106    let mut message = Message::from_value(&json!({}));
107
108    // Process the message; errors are collected on the message and do not halt execution
109    engine.process_message(&mut message).await?;
110
111    // Check if there were any errors during processing
112    if message.has_errors() {
113        for error in &message.errors {
114            println!("Error in workflow: {:?}, task: {:?}: {:?}",
115                     error.workflow_id, error.task_id, error.message);
116        }
117    }
118
119    Ok(())
120}
121```
122
123## Extending with Custom Functions
124
125You can extend the engine with your own custom function handlers:
126
127```rust,no_run
128use dataflow_rs::{Engine, AsyncFunctionHandler, Result, Workflow};
129use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
130use datalogic_rs::DataLogic;
131use serde_json::{json, Value};
132use std::collections::HashMap;
133use std::sync::Arc;
134use async_trait::async_trait;
135
136struct CustomFunction;
137
138#[async_trait]
139impl AsyncFunctionHandler for CustomFunction {
140    async fn execute(
141        &self,
142        message: &mut Message,
143        config: &FunctionConfig,
144        datalogic: Arc<DataLogic>,
145    ) -> Result<(usize, Vec<Change>)> {
146        // Implement your custom logic here
147
148        // Extract the custom configuration from config
149        let input = match config {
150            FunctionConfig::Custom { input, .. } => input,
151            _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
152        };
153
154        // Validate input
155        let required_field = input.get("field")
156            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
157            .as_str()
158            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;
159
160        // Record changes for audit trail
161        let changes = vec![
162            Change {
163                path: Arc::from("data.custom_field"),
164                old_value: Arc::new(Value::Null),
165                new_value: Arc::new(json!("custom value")),
166            }
167        ];
168
169        // Return success code (200) and changes
170        Ok((200, changes))
171    }
172}
173
174#[tokio::main]
175async fn main() -> Result<()> {
176    // Create custom functions
177    let mut custom_functions = HashMap::new();
178    custom_functions.insert(
179        "custom".to_string(),
180        Box::new(CustomFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>
181    );
182
183    // Create engine with workflows and custom functions
184    let engine = Engine::new(vec![/* workflows */], Some(custom_functions));
185
186    // The handler can now be referenced from workflows by its registered name ("custom")
187    Ok(())
188}
189```
190*/
191
192pub mod engine;
193
194// Re-export all public APIs for easier access
195pub use engine::error::{DataflowError, ErrorInfo, Result};
196pub use engine::functions::{
197    AsyncFunctionHandler, EnrichConfig, FilterConfig, FunctionConfig, HttpCallConfig, LogConfig,
198    MapConfig, MapMapping, PublishKafkaConfig, ValidationConfig, ValidationRule,
199};
200pub use engine::message::{AuditTrail, Change, Message};
201pub use engine::trace::{ExecutionStep, ExecutionTrace, StepResult};
202pub use engine::{Engine, Task, Workflow, WorkflowStatus};
203
204/// Rules-engine name for [`Workflow`] (the same type): a rule is an IF-THEN unit — IF condition THEN execute actions.
205pub type Rule = Workflow;
206
207/// Rules-engine name for [`Task`] (the same type): an action is an individual processing step within a rule.
208pub type Action = Task;
209
210/// Rules-engine name for [`Engine`] (the same type): the rules engine evaluates rules and executes their actions.
211pub type RulesEngine = Engine;