complete_workflow/complete_workflow.rs
1use dataflow_rs::{Engine, Workflow, engine::message::Message};
2use serde_json::json;
3
4fn main() -> Result<(), Box<dyn std::error::Error>> {
5 // Define a workflow that:
6 // 1. Prepares sample user data
7 // 2. Enriches the message with transformed data
8 // 3. Validates the enriched data
9 let workflow_json = r#"
10 {
11 "id": "complete_workflow",
12 "name": "Complete Workflow Example",
13 "priority": 0,
14 "description": "Demonstrates enrich -> validate flow",
15 "tasks": [
16 {
17 "id": "initialize_user",
18 "name": "Initialize User Structure",
19 "description": "Create empty user object in data",
20 "function": {
21 "name": "map",
22 "input": {
23 "mappings": [
24 {
25 "path": "data.user",
26 "logic": {}
27 }
28 ]
29 }
30 }
31 },
32 {
33 "id": "transform_data",
34 "name": "Transform Data",
35 "description": "Map API response to our data model",
36 "function": {
37 "name": "map",
38 "input": {
39 "mappings": [
40 {
41 "path": "data.user.id",
42 "logic": { "var": "temp_data.body.id" }
43 },
44 {
45 "path": "data.user.name",
46 "logic": { "var": "temp_data.body.name" }
47 },
48 {
49 "path": "data.user.email",
50 "logic": { "var": "temp_data.body.email" }
51 },
52 {
53 "path": "data.user.address",
54 "logic": {
55 "cat": [
56 { "var": "temp_data.body.address.street" },
57 ", ",
58 { "var": "temp_data.body.address.city" }
59 ]
60 }
61 },
62 {
63 "path": "data.user.company",
64 "logic": { "var": "temp_data.body.company.name" }
65 }
66 ]
67 }
68 }
69 },
70 {
71 "id": "validate_user_data",
72 "name": "Validate User Data",
73 "description": "Ensure the user data meets our requirements",
74 "function": {
75 "name": "validate",
76 "input": {
77 "rules": [
78 {
79 "path": "data",
80 "logic": { "!!": { "var": "data.user.id" } },
81 "message": "User ID is required"
82 },
83 {
84 "path": "data",
85 "logic": { "!!": { "var": "data.user.name" } },
86 "message": "User name is required"
87 },
88 {
89 "path": "data",
90 "logic": { "!!": { "var": "data.user.email" } },
91 "message": "User email is required"
92 },
93 {
94 "path": "data",
95 "logic": {
96 "in": [
97 "@",
98 { "var": "data.user.email" }
99 ]
100 },
101 "message": "Email must be valid format"
102 }
103 ]
104 }
105 }
106 }
107 ]
108 }
109 "#;
110
111 // Parse the workflow
112 let workflow = Workflow::from_json(workflow_json)?;
113
114 // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
115 let engine = Engine::new(vec![workflow], None, None);
116
117 // Create a message to process with sample user data
118 let mut message = Message::new(&json!({}));
119
120 // Add sample user data to temp_data (simulating what would come from an API)
121 message.temp_data = json!({
122 "body": {
123 "id": 1,
124 "name": "John Doe",
125 "email": "john.doe@example.com",
126 "address": {
127 "street": "123 Main St",
128 "city": "New York"
129 },
130 "company": {
131 "name": "Acme Corp"
132 }
133 }
134 });
135
136 // Process the message through the workflow
137 println!("Processing message through workflow...");
138
139 match engine.process_message(&mut message) {
140 Ok(_) => {
141 println!("Workflow completed successfully!");
142 }
143 Err(e) => {
144 eprintln!("Error executing workflow: {e:?}");
145 if !message.errors.is_empty() {
146 println!("\nErrors recorded in message:");
147 for err in &message.errors {
148 println!(
149 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
150 err.workflow_id, err.task_id, err.error_message
151 );
152 }
153 }
154 }
155 }
156
157 println!(
158 "\nFull message structure:\n{}",
159 serde_json::to_string_pretty(&message)?
160 );
161
162 Ok(())
163}