complete_workflow/
complete_workflow.rs

1use dataflow_rs::{Engine, Workflow, engine::message::Message};
2use serde_json::json;
3
4fn main() -> Result<(), Box<dyn std::error::Error>> {
5    // Define a workflow that:
6    // 1. Prepares sample user data
7    // 2. Enriches the message with transformed data
8    // 3. Validates the enriched data
9    let workflow_json = r#"
10    {
11        "id": "complete_workflow",
12        "name": "Complete Workflow Example",
13        "priority": 0,
14        "description": "Demonstrates enrich -> validate flow",
15        "tasks": [
16            {
17                "id": "initialize_user",
18                "name": "Initialize User Structure",
19                "description": "Create empty user object in data",
20                "function": {
21                    "name": "map",
22                    "input": {
23                        "mappings": [
24                            {
25                                "path": "data.user",
26                                "logic": {}
27                            }
28                        ]
29                    }
30                }
31            },
32            {
33                "id": "transform_data",
34                "name": "Transform Data",
35                "description": "Map API response to our data model",
36                "function": {
37                    "name": "map",
38                    "input": {
39                        "mappings": [
40                            {
41                                "path": "data.user.id", 
42                                "logic": { "var": "temp_data.body.id" }
43                            },
44                            {
45                                "path": "data.user.name", 
46                                "logic": { "var": "temp_data.body.name" }
47                            },
48                            {
49                                "path": "data.user.email", 
50                                "logic": { "var": "temp_data.body.email" }
51                            },
52                            {
53                                "path": "data.user.address", 
54                                "logic": {
55                                    "cat": [
56                                        { "var": "temp_data.body.address.street" },
57                                        ", ",
58                                        { "var": "temp_data.body.address.city" }
59                                    ]
60                                }
61                            },
62                            {
63                                "path": "data.user.company", 
64                                "logic": { "var": "temp_data.body.company.name" }
65                            }
66                        ]
67                    }
68                }
69            },
70            {
71                "id": "validate_user_data",
72                "name": "Validate User Data",
73                "description": "Ensure the user data meets our requirements",
74                "function": {
75                    "name": "validate",
76                    "input": {
77                        "rules": [
78                            {
79                                "path": "data",
80                                "logic": { "!!": { "var": "data.user.id" } },
81                                "message": "User ID is required"
82                            },
83                            {
84                                "path": "data",
85                                "logic": { "!!": { "var": "data.user.name" } },
86                                "message": "User name is required"
87                            },
88                            {
89                                "path": "data",
90                                "logic": { "!!": { "var": "data.user.email" } },
91                                "message": "User email is required"
92                            },
93                            {
94                                "path": "data",
95                                "logic": {
96                                    "in": [
97                                        "@",
98                                        { "var": "data.user.email" }
99                                    ]
100                                },
101                                "message": "Email must be valid format"
102                            }
103                        ]
104                    }
105                }
106            }
107        ]
108    }
109    "#;
110
111    // Parse the workflow
112    let workflow = Workflow::from_json(workflow_json)?;
113
114    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
115    let engine = Engine::new(vec![workflow], None, None);
116
117    // Create a message to process with sample user data
118    let mut message = Message::new(&json!({}));
119
120    // Add sample user data to temp_data (simulating what would come from an API)
121    message.temp_data = json!({
122        "body": {
123            "id": 1,
124            "name": "John Doe",
125            "email": "john.doe@example.com",
126            "address": {
127                "street": "123 Main St",
128                "city": "New York"
129            },
130            "company": {
131                "name": "Acme Corp"
132            }
133        }
134    });
135
136    // Process the message through the workflow
137    println!("Processing message through workflow...");
138
139    match engine.process_message(&mut message) {
140        Ok(_) => {
141            println!("Workflow completed successfully!");
142        }
143        Err(e) => {
144            eprintln!("Error executing workflow: {e:?}");
145            if !message.errors.is_empty() {
146                println!("\nErrors recorded in message:");
147                for err in &message.errors {
148                    println!(
149                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
150                        err.workflow_id, err.task_id, err.error_message
151                    );
152                }
153            }
154        }
155    }
156
157    println!(
158        "\nFull message structure:\n{}",
159        serde_json::to_string_pretty(&message)?
160    );
161
162    Ok(())
163}