// dataflow_rs/lib.rs
/*!
# Dataflow-rs

A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.

## Overview

Dataflow-rs provides a high-performance rules engine that follows the **IF → THEN → THAT** model:

- **IF** — Define conditions using JSONLogic expressions (evaluated against `data`, `metadata`, `temp_data`)
- **THEN** — Execute actions: data transformation, validation, or custom async logic
- **THAT** — Chain multiple actions and rules with priority ordering

Rules are defined declaratively in JSON and compiled once at startup for zero-overhead evaluation at runtime.

## Key Components

| Rules Engine | Workflow Engine | Description |
|---|---|---|
| **RulesEngine** | **Engine** | Central async component that evaluates rules and executes actions |
| **Rule** | **Workflow** | A condition + actions bundle — IF condition THEN execute actions |
| **Action** | **Task** | An individual processing step that performs a function on a message |

* **AsyncFunctionHandler**: A trait implemented by action handlers to define custom async processing logic
* **Message**: The data structure that flows through the engine, containing payload, metadata, and processing results

## Built-in Functions

The engine comes with several pre-registered functions:

* **map**: Maps and transforms data between different parts of a message
* **validate**: Validates message data against rules using JSONLogic expressions

## Usage Example

```rust,no_run
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
    let engine = Engine::new(vec![workflow], None);

    // Create a message to process
    let mut message = Message::from_value(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.context["data"]["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
```

## Error Handling

The library provides a comprehensive error handling system:

```rust,no_run
use dataflow_rs::{Engine, Result, DataflowError};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    // ... setup workflows ...
    let engine = Engine::new(vec![/* workflows */], None);

    let mut message = Message::from_value(&json!({}));

    // Process the message, errors will be collected but not halt execution
    engine.process_message(&mut message).await?;

    // Check if there were any errors during processing
    if message.has_errors() {
        for error in &message.errors {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                     error.workflow_id, error.task_id, error.message);
        }
    }

    Ok(())
}
```

## Extending with Custom Functions

You can extend the engine with your own custom function handlers:

```rust,no_run
use dataflow_rs::{Engine, AsyncFunctionHandler, Result, Workflow};
use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;

struct CustomFunction;

#[async_trait]
impl AsyncFunctionHandler for CustomFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Implement your custom logic here

        // Extract the custom configuration from config
        let input = match config {
            FunctionConfig::Custom { input, .. } => input,
            _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
        };

        // Validate input
        let required_field = input.get("field")
            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
            .as_str()
            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;

        // Record changes for audit trail
        let changes = vec![
            Change {
                path: Arc::from("data.custom_field"),
                old_value: Arc::new(Value::Null),
                new_value: Arc::new(json!("custom value")),
            }
        ];

        // Return success code (200) and changes
        Ok((200, changes))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Create custom functions
    let mut custom_functions = HashMap::new();
    custom_functions.insert(
        "custom".to_string(),
        Box::new(CustomFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>
    );

    // Create engine with workflows and custom functions
    let engine = Engine::new(vec![/* workflows */], Some(custom_functions));

    // Now it can be used in workflows...
    Ok(())
}
```
*/
192pub mod engine;
193
194// Re-export all public APIs for easier access
195pub use engine::error::{DataflowError, ErrorInfo, Result};
196pub use engine::functions::{
197 AsyncFunctionHandler, EnrichConfig, FilterConfig, FunctionConfig, HttpCallConfig, LogConfig,
198 MapConfig, MapMapping, PublishKafkaConfig, ValidationConfig, ValidationRule,
199};
200pub use engine::message::{AuditTrail, Change, Message};
201pub use engine::trace::{ExecutionStep, ExecutionTrace, StepResult};
202pub use engine::{Engine, Task, Workflow, WorkflowStatus};
203
204/// Type alias for `Workflow` — a Rule represents an IF-THEN unit: IF condition THEN execute actions.
205pub type Rule = Workflow;
206
207/// Type alias for `Task` — an Action is an individual processing step within a rule.
208pub type Action = Task;
209
210/// Type alias for `Engine` — the RulesEngine evaluates rules and executes their actions.
211pub type RulesEngine = Engine;