complete_workflow/
complete_workflow.rs

1use dataflow_rs::{engine::message::Message, Engine, Workflow};
2use serde_json::json;
3
4#[tokio::main]
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "description": "Demonstrates fetch -> enrich -> validate flow",
18        "condition": { "==": [true, true] },
19        "tasks": [
20            {
21                "id": "fetch_user_data",
22                "name": "Fetch User Data",
23                "description": "Get user data from a public API",
24                "function": {
25                    "name": "http",
26                    "input": {
27                        "url": "https://jsonplaceholder.typicode.com/users/1",
28                        "method": "GET",
29                        "headers": {
30                            "Accept": "application/json"
31                        }
32                    }
33                }
34            },
35            {
36                "id": "initialize_user",
37                "name": "Initialize User Structure",
38                "description": "Create empty user object in data",
39                "function": {
40                    "name": "map",
41                    "input": {
42                        "mappings": [
43                            {
44                                "path": "data",
45                                "logic": { "preserve": {"user": {}} }
46                            }
47                        ]
48                    }
49                }
50            },
51            {
52                "id": "transform_data",
53                "name": "Transform Data",
54                "description": "Map API response to our data model",
55                "function": {
56                    "name": "map",
57                    "input": {
58                        "mappings": [
59                            {
60                                "path": "data.user.id", 
61                                "logic": { "var": "temp_data.body.id" }
62                            },
63                            {
64                                "path": "data.user.name", 
65                                "logic": { "var": "temp_data.body.name" }
66                            },
67                            {
68                                "path": "data.user.email", 
69                                "logic": { "var": "temp_data.body.email" }
70                            },
71                            {
72                                "path": "data.user.address", 
73                                "logic": {
74                                    "cat": [
75                                        { "var": "temp_data.body.address.street" },
76                                        ", ",
77                                        { "var": "temp_data.body.address.city" }
78                                    ]
79                                }
80                            },
81                            {
82                                "path": "data.user.company", 
83                                "logic": { "var": "temp_data.body.company.name" }
84                            }
85                        ]
86                    }
87                }
88            },
89            {
90                "id": "validate_user_data",
91                "name": "Validate User Data",
92                "description": "Ensure the user data meets our requirements",
93                "function": {
94                    "name": "validate",
95                    "input": {
96                        "rules": [
97                            {
98                                "path": "data",
99                                "logic": { "!!": { "var": "data.user.id" } },
100                                "message": "User ID is required"
101                            },
102                            {
103                                "path": "data",
104                                "logic": { "!!": { "var": "data.user.name" } },
105                                "message": "User name is required"
106                            },
107                            {
108                                "path": "data",
109                                "logic": { "!!": { "var": "data.user.email" } },
110                                "message": "User email is required"
111                            },
112                            {
113                                "path": "data",
114                                "logic": {
115                                    "in": [
116                                        "@",
117                                        { "var": "data.user.email" }
118                                    ]
119                                },
120                                "message": "Email must be valid format"
121                            }
122                        ]
123                    }
124                }
125            }
126        ]
127    }
128    "#;
129
130    // Parse and add the workflow to the engine
131    let workflow = Workflow::from_json(workflow_json)?;
132    engine.add_workflow(&workflow);
133
134    // Create a message to process with properly initialized data structure
135    let mut message = Message::new(&json!({}));
136
137    // Process the message through the workflow asynchronously
138    println!("Processing message through workflow...");
139
140    match engine.process_message(&mut message).await {
141        Ok(_) => {
142            println!("Workflow completed successfully!");
143        }
144        Err(e) => {
145            eprintln!("Error executing workflow: {:?}", e);
146            if !message.errors.is_empty() {
147                println!("\nErrors recorded in message:");
148                for err in &message.errors {
149                    println!(
150                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
151                        err.workflow_id, err.task_id, err.error
152                    );
153                }
154            }
155        }
156    }
157
158    println!(
159        "\nFull message structure:\n{}",
160        serde_json::to_string_pretty(&message)?
161    );
162
163    Ok(())
164}