// complete_workflow/complete_workflow.rs
use dataflow_rs::{engine::message::Message, Engine, Workflow};
use serde_json::json;
3
4#[tokio::main]
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 let mut engine = Engine::new();
8
9 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "description": "Demonstrates fetch -> enrich -> validate flow",
18 "condition": { "==": [true, true] },
19 "tasks": [
20 {
21 "id": "fetch_user_data",
22 "name": "Fetch User Data",
23 "description": "Get user data from a public API",
24 "function": {
25 "name": "http",
26 "input": {
27 "url": "https://jsonplaceholder.typicode.com/users/1",
28 "method": "GET",
29 "headers": {
30 "Accept": "application/json"
31 }
32 }
33 }
34 },
35 {
36 "id": "initialize_user",
37 "name": "Initialize User Structure",
38 "description": "Create empty user object in data",
39 "function": {
40 "name": "map",
41 "input": {
42 "mappings": [
43 {
44 "path": "data",
45 "logic": { "preserve": {"user": {}} }
46 }
47 ]
48 }
49 }
50 },
51 {
52 "id": "transform_data",
53 "name": "Transform Data",
54 "description": "Map API response to our data model",
55 "function": {
56 "name": "map",
57 "input": {
58 "mappings": [
59 {
60 "path": "data.user.id",
61 "logic": { "var": "temp_data.body.id" }
62 },
63 {
64 "path": "data.user.name",
65 "logic": { "var": "temp_data.body.name" }
66 },
67 {
68 "path": "data.user.email",
69 "logic": { "var": "temp_data.body.email" }
70 },
71 {
72 "path": "data.user.address",
73 "logic": {
74 "cat": [
75 { "var": "temp_data.body.address.street" },
76 ", ",
77 { "var": "temp_data.body.address.city" }
78 ]
79 }
80 },
81 {
82 "path": "data.user.company",
83 "logic": { "var": "temp_data.body.company.name" }
84 }
85 ]
86 }
87 }
88 },
89 {
90 "id": "validate_user_data",
91 "name": "Validate User Data",
92 "description": "Ensure the user data meets our requirements",
93 "function": {
94 "name": "validate",
95 "input": {
96 "rules": [
97 {
98 "path": "data",
99 "logic": { "!!": { "var": "data.user.id" } },
100 "message": "User ID is required"
101 },
102 {
103 "path": "data",
104 "logic": { "!!": { "var": "data.user.name" } },
105 "message": "User name is required"
106 },
107 {
108 "path": "data",
109 "logic": { "!!": { "var": "data.user.email" } },
110 "message": "User email is required"
111 },
112 {
113 "path": "data",
114 "logic": {
115 "in": [
116 "@",
117 { "var": "data.user.email" }
118 ]
119 },
120 "message": "Email must be valid format"
121 }
122 ]
123 }
124 }
125 }
126 ]
127 }
128 "#;
129
130 let workflow = Workflow::from_json(workflow_json)?;
132 engine.add_workflow(&workflow);
133
134 let mut message = Message::new(&json!({}));
136
137 println!("Processing message through workflow...");
139
140 match engine.process_message(&mut message).await {
141 Ok(_) => {
142 println!("Workflow completed successfully!");
143 }
144 Err(e) => {
145 eprintln!("Error executing workflow: {:?}", e);
146 if !message.errors.is_empty() {
147 println!("\nErrors recorded in message:");
148 for err in &message.errors {
149 println!(
150 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
151 err.workflow_id, err.task_id, err.error
152 );
153 }
154 }
155 }
156 }
157
158 println!(
159 "\nFull message structure:\n{}",
160 serde_json::to_string_pretty(&message)?
161 );
162
163 Ok(())
164}