pub struct Message {
pub id: String,
pub data: Value,
pub payload: Value,
pub metadata: Value,
pub temp_data: Value,
pub audit_trail: Vec<AuditTrail>,
pub errors: Vec<ErrorInfo>,
}
Fields§
§id: String
§data: Value
§payload: Value
§metadata: Value
§temp_data: Value
§audit_trail: Vec<AuditTrail>
§errors: Vec<ErrorInfo>
Errors that occurred during message processing
Implementations§
impl Message
pub fn new(payload: &Value) -> Self
Examples found in repository?
examples/benchmark.rs (line 183)
169async fn run_async_benchmark(
170 engine: &Engine,
171 sample_user_data: &Value,
172 num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174 let mut total_duration = Duration::new(0, 0);
175 let mut min_duration = Duration::new(u64::MAX, 0);
176 let mut max_duration = Duration::new(0, 0);
177 let mut all_durations = Vec::with_capacity(num_iterations);
178 let mut error_count = 0;
179
180 println!("Starting async benchmark with {num_iterations} iterations...");
181
182 for i in 0..num_iterations {
183 let mut message = Message::new(&json!({}));
184 message.temp_data = sample_user_data.clone();
185 message.data = json!({});
186 message.metadata = json!({
187 "timestamp": chrono::Utc::now().to_rfc3339(),
188 "iteration": i
189 });
190
191 let start = Instant::now();
192 match engine.process_message(&mut message).await {
193 Ok(_) => {
194 let duration = start.elapsed();
195 all_durations.push(duration);
196 total_duration += duration;
197 min_duration = min_duration.min(duration);
198 max_duration = max_duration.max(duration);
199
200 // Check for processing errors
201 if message.has_errors() {
202 error_count += 1;
203 if error_count <= 5 {
204 // Only print first 5 errors
205 println!("Processing errors in iteration {}: {:?}", i, message.errors);
206 }
207 }
208 }
209 Err(e) => {
210 error_count += 1;
211 if error_count <= 5 {
212 println!("Error in iteration {i}: {e:?}");
213 }
214 // Still record the time even for errors
215 let duration = start.elapsed();
216 all_durations.push(duration);
217 total_duration += duration;
218 min_duration = min_duration.min(duration);
219 max_duration = max_duration.max(duration);
220 }
221 }
222
223 if (i + 1) % 1000 == 0 {
224 println!("Completed {} async iterations", i + 1);
225 }
226 }
227
228 if error_count > 0 {
229 println!("Total errors encountered: {error_count}");
230 }
231
232 // Sort durations for percentile calculations
233 all_durations.sort();
234
235 let p95_idx = (num_iterations as f64 * 0.95) as usize;
236 let p99_idx = (num_iterations as f64 * 0.99) as usize;
237 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
238 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
239 let avg_duration = total_duration / num_iterations as u32;
240
241 println!("\nAsync Benchmark Results (v{VERSION}):");
242 println!(" Iterations: {num_iterations}");
243 println!(" Errors: {error_count}");
244 println!(" Min time: {min_duration:?}");
245 println!(" Max time: {max_duration:?}");
246 println!(" Avg time: {avg_duration:?}");
247 println!(" 95th percentile: {p95:?}");
248 println!(" 99th percentile: {p99:?}");
249 println!(" Total time: {total_duration:?}");
250
251 Ok(BenchmarkResults {
252 iterations: num_iterations,
253 min_time: min_duration,
254 max_time: max_duration,
255 avg_time: avg_duration,
256 p95: *p95,
257 p99: *p99,
258 total_time: total_duration,
259 })
260}
261
262async fn run_sync_benchmark(
263 engine: &Engine,
264 sample_user_data: &Value,
265 num_iterations: usize,
266) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
267 let mut total_duration = Duration::new(0, 0);
268 let mut min_duration = Duration::new(u64::MAX, 0);
269 let mut max_duration = Duration::new(0, 0);
270 let mut all_durations = Vec::with_capacity(num_iterations);
271 let mut error_count = 0;
272
273 println!("Starting sync-style benchmark with {num_iterations} iterations...");
274
275 for i in 0..num_iterations {
276 let mut message = Message::new(&json!({}));
277 message.temp_data = sample_user_data.clone();
278 message.data = json!({});
279 message.metadata = json!({
280 "timestamp": chrono::Utc::now().to_rfc3339(),
281 "iteration": i
282 });
283
284 let start = Instant::now();
285 // Use tokio::task::block_in_place to simulate sync behavior
286 let result = tokio::task::block_in_place(|| {
287 tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
288 });
289
290 match result {
291 Ok(_) => {
292 let duration = start.elapsed();
293 all_durations.push(duration);
294 total_duration += duration;
295 min_duration = min_duration.min(duration);
296 max_duration = max_duration.max(duration);
297
298 if message.has_errors() {
299 error_count += 1;
300 }
301 }
302 Err(e) => {
303 error_count += 1;
304 if error_count <= 5 {
305 println!("Sync error in iteration {i}: {e:?}");
306 }
307 let duration = start.elapsed();
308 all_durations.push(duration);
309 total_duration += duration;
310 min_duration = min_duration.min(duration);
311 max_duration = max_duration.max(duration);
312 }
313 }
314
315 if (i + 1) % 1000 == 0 {
316 println!("Completed {} sync iterations", i + 1);
317 }
318 }
319
320 if error_count > 0 {
321 println!("Total sync errors encountered: {error_count}");
322 }
323
324 all_durations.sort();
325
326 let p95_idx = (num_iterations as f64 * 0.95) as usize;
327 let p99_idx = (num_iterations as f64 * 0.99) as usize;
328 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
329 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
330 let avg_duration = total_duration / num_iterations as u32;
331
332 println!("\nSync Benchmark Results (v{VERSION}):");
333 println!(" Iterations: {num_iterations}");
334 println!(" Errors: {error_count}");
335 println!(" Min time: {min_duration:?}");
336 println!(" Max time: {max_duration:?}");
337 println!(" Avg time: {avg_duration:?}");
338 println!(" 95th percentile: {p95:?}");
339 println!(" 99th percentile: {p99:?}");
340 println!(" Total time: {total_duration:?}");
341
342 Ok(BenchmarkResults {
343 iterations: num_iterations,
344 min_time: min_duration,
345 max_time: max_duration,
346 avg_time: avg_duration,
347 p95: *p95,
348 p99: *p99,
349 total_time: total_duration,
350 })
351}
More examples
examples/complete_workflow.rs (line 136)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let mut engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {e:?}");
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
examples/custom_function.rs (line 399)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308 println!("=== Custom Function Example ===\n");
309
310 // Create engine without built-in functions to demonstrate custom ones
311 let mut engine = Engine::new_empty();
312
313 // Register our custom functions
314 engine.register_task_function(
315 "statistics".to_string(),
316 Box::new(StatisticsFunction::new()),
317 );
318
319 engine.register_task_function(
320 "enrich_data".to_string(),
321 Box::new(DataEnrichmentFunction::new()),
322 );
323
324 // Also register built-in map function for data preparation
325 engine.register_task_function(
326 "map".to_string(),
327 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328 );
329
330 // Define a workflow that uses our custom functions
331 let workflow_json = r#"
332 {
333 "id": "custom_function_demo",
334 "name": "Custom Function Demo",
335 "description": "Demonstrates custom async functions in workflow",
336 "condition": { "==": [true, true] },
337 "tasks": [
338 {
339 "id": "prepare_data",
340 "name": "Prepare Data",
341 "description": "Extract and prepare data for analysis",
342 "function": {
343 "name": "map",
344 "input": {
345 "mappings": [
346 {
347 "path": "data.numbers",
348 "logic": { "var": "temp_data.measurements" }
349 },
350 {
351 "path": "data.user_id",
352 "logic": { "var": "temp_data.user_id" }
353 }
354 ]
355 }
356 }
357 },
358 {
359 "id": "calculate_stats",
360 "name": "Calculate Statistics",
361 "description": "Calculate statistical measures from numeric data",
362 "function": {
363 "name": "statistics",
364 "input": {
365 "data_path": "data.numbers",
366 "output_path": "data.stats"
367 }
368 }
369 },
370 {
371 "id": "enrich_user_data",
372 "name": "Enrich User Data",
373 "description": "Add additional user information",
374 "function": {
375 "name": "enrich_data",
376 "input": {
377 "lookup_field": "user_id",
378 "lookup_value": "user_123",
379 "output_path": "data.user_info"
380 }
381 }
382 }
383 ]
384 }
385 "#;
386
387 // Parse and add the workflow
388 let workflow = Workflow::from_json(workflow_json)?;
389 engine.add_workflow(&workflow);
390
391 // Create sample data
392 let sample_data = json!({
393 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394 "user_id": "user_123",
395 "timestamp": "2024-01-15T10:30:00Z"
396 });
397
398 // Create and process message
399 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400 message.temp_data = sample_data;
401 message.data = json!({});
402
403 println!("Processing message with custom functions...\n");
404
405 // Process the message through our custom workflow
406 match engine.process_message(&mut message).await {
407 Ok(_) => {
408 println!("✅ Message processed successfully!\n");
409
410 println!("📊 Final Results:");
411 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413 println!("📋 Audit Trail:");
414 for (i, audit) in message.audit_trail.iter().enumerate() {
415 println!(
416 "{}. Task: {} (Status: {})",
417 i + 1,
418 audit.task_id,
419 audit.status_code
420 );
421 println!(" Timestamp: {}", audit.timestamp);
422 println!(" Changes: {} field(s) modified", audit.changes.len());
423 }
424
425 if message.has_errors() {
426 println!("\n⚠️ Errors encountered:");
427 for error in &message.errors {
428 println!(
429 " - {}: {:?}",
430 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431 error.error_message
432 );
433 }
434 }
435 }
436 Err(e) => {
437 println!("❌ Error processing message: {e:?}");
438 }
439 }
440
441 // Demonstrate another example with different data
442 let separator = "=".repeat(50);
443 println!("\n{separator}");
444 println!("=== Second Example with Different User ===\n");
445
446 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447 message2.temp_data = json!({
448 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449 "user_id": "user_456",
450 "timestamp": "2024-01-15T11:00:00Z"
451 });
452 message2.data = json!({});
453
454 // Create a workflow for the second user
455 let workflow2_json = r#"
456 {
457 "id": "custom_function_demo_2",
458 "name": "Custom Function Demo 2",
459 "description": "Second demo with different user",
460 "condition": { "==": [true, true] },
461 "tasks": [
462 {
463 "id": "prepare_data",
464 "name": "Prepare Data",
465 "function": {
466 "name": "map",
467 "input": {
468 "mappings": [
469 {
470 "path": "data.numbers",
471 "logic": { "var": "temp_data.measurements" }
472 },
473 {
474 "path": "data.user_id",
475 "logic": { "var": "temp_data.user_id" }
476 }
477 ]
478 }
479 }
480 },
481 {
482 "id": "calculate_stats",
483 "name": "Calculate Statistics",
484 "function": {
485 "name": "statistics",
486 "input": {
487 "data_path": "data.numbers",
488 "output_path": "data.analysis"
489 }
490 }
491 },
492 {
493 "id": "enrich_user_data",
494 "name": "Enrich User Data",
495 "function": {
496 "name": "enrich_data",
497 "input": {
498 "lookup_field": "user_id",
499 "lookup_value": "user_456",
500 "output_path": "data.employee_details"
501 }
502 }
503 }
504 ]
505 }
506 "#;
507
508 let workflow2 = Workflow::from_json(workflow2_json)?;
509 engine.add_workflow(&workflow2);
510
511 match engine.process_message(&mut message2).await {
512 Ok(_) => {
513 println!("✅ Second message processed successfully!\n");
514 println!("📊 Results for user_456:");
515 println!("{}", serde_json::to_string_pretty(&message2.data)?);
516 }
517 Err(e) => {
518 println!("❌ Error processing second message: {e:?}");
519 }
520 }
521
522 println!("\n🎉 Custom function examples completed!");
523
524 Ok(())
525}
pub fn has_errors(&self) -> bool
Checks whether the message has any errors.
Examples found in repository?
examples/benchmark.rs (line 201)
169async fn run_async_benchmark(
170 engine: &Engine,
171 sample_user_data: &Value,
172 num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174 let mut total_duration = Duration::new(0, 0);
175 let mut min_duration = Duration::new(u64::MAX, 0);
176 let mut max_duration = Duration::new(0, 0);
177 let mut all_durations = Vec::with_capacity(num_iterations);
178 let mut error_count = 0;
179
180 println!("Starting async benchmark with {num_iterations} iterations...");
181
182 for i in 0..num_iterations {
183 let mut message = Message::new(&json!({}));
184 message.temp_data = sample_user_data.clone();
185 message.data = json!({});
186 message.metadata = json!({
187 "timestamp": chrono::Utc::now().to_rfc3339(),
188 "iteration": i
189 });
190
191 let start = Instant::now();
192 match engine.process_message(&mut message).await {
193 Ok(_) => {
194 let duration = start.elapsed();
195 all_durations.push(duration);
196 total_duration += duration;
197 min_duration = min_duration.min(duration);
198 max_duration = max_duration.max(duration);
199
200 // Check for processing errors
201 if message.has_errors() {
202 error_count += 1;
203 if error_count <= 5 {
204 // Only print first 5 errors
205 println!("Processing errors in iteration {}: {:?}", i, message.errors);
206 }
207 }
208 }
209 Err(e) => {
210 error_count += 1;
211 if error_count <= 5 {
212 println!("Error in iteration {i}: {e:?}");
213 }
214 // Still record the time even for errors
215 let duration = start.elapsed();
216 all_durations.push(duration);
217 total_duration += duration;
218 min_duration = min_duration.min(duration);
219 max_duration = max_duration.max(duration);
220 }
221 }
222
223 if (i + 1) % 1000 == 0 {
224 println!("Completed {} async iterations", i + 1);
225 }
226 }
227
228 if error_count > 0 {
229 println!("Total errors encountered: {error_count}");
230 }
231
232 // Sort durations for percentile calculations
233 all_durations.sort();
234
235 let p95_idx = (num_iterations as f64 * 0.95) as usize;
236 let p99_idx = (num_iterations as f64 * 0.99) as usize;
237 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
238 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
239 let avg_duration = total_duration / num_iterations as u32;
240
241 println!("\nAsync Benchmark Results (v{VERSION}):");
242 println!(" Iterations: {num_iterations}");
243 println!(" Errors: {error_count}");
244 println!(" Min time: {min_duration:?}");
245 println!(" Max time: {max_duration:?}");
246 println!(" Avg time: {avg_duration:?}");
247 println!(" 95th percentile: {p95:?}");
248 println!(" 99th percentile: {p99:?}");
249 println!(" Total time: {total_duration:?}");
250
251 Ok(BenchmarkResults {
252 iterations: num_iterations,
253 min_time: min_duration,
254 max_time: max_duration,
255 avg_time: avg_duration,
256 p95: *p95,
257 p99: *p99,
258 total_time: total_duration,
259 })
260}
261
262async fn run_sync_benchmark(
263 engine: &Engine,
264 sample_user_data: &Value,
265 num_iterations: usize,
266) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
267 let mut total_duration = Duration::new(0, 0);
268 let mut min_duration = Duration::new(u64::MAX, 0);
269 let mut max_duration = Duration::new(0, 0);
270 let mut all_durations = Vec::with_capacity(num_iterations);
271 let mut error_count = 0;
272
273 println!("Starting sync-style benchmark with {num_iterations} iterations...");
274
275 for i in 0..num_iterations {
276 let mut message = Message::new(&json!({}));
277 message.temp_data = sample_user_data.clone();
278 message.data = json!({});
279 message.metadata = json!({
280 "timestamp": chrono::Utc::now().to_rfc3339(),
281 "iteration": i
282 });
283
284 let start = Instant::now();
285 // Use tokio::task::block_in_place to simulate sync behavior
286 let result = tokio::task::block_in_place(|| {
287 tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
288 });
289
290 match result {
291 Ok(_) => {
292 let duration = start.elapsed();
293 all_durations.push(duration);
294 total_duration += duration;
295 min_duration = min_duration.min(duration);
296 max_duration = max_duration.max(duration);
297
298 if message.has_errors() {
299 error_count += 1;
300 }
301 }
302 Err(e) => {
303 error_count += 1;
304 if error_count <= 5 {
305 println!("Sync error in iteration {i}: {e:?}");
306 }
307 let duration = start.elapsed();
308 all_durations.push(duration);
309 total_duration += duration;
310 min_duration = min_duration.min(duration);
311 max_duration = max_duration.max(duration);
312 }
313 }
314
315 if (i + 1) % 1000 == 0 {
316 println!("Completed {} sync iterations", i + 1);
317 }
318 }
319
320 if error_count > 0 {
321 println!("Total sync errors encountered: {error_count}");
322 }
323
324 all_durations.sort();
325
326 let p95_idx = (num_iterations as f64 * 0.95) as usize;
327 let p99_idx = (num_iterations as f64 * 0.99) as usize;
328 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
329 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
330 let avg_duration = total_duration / num_iterations as u32;
331
332 println!("\nSync Benchmark Results (v{VERSION}):");
333 println!(" Iterations: {num_iterations}");
334 println!(" Errors: {error_count}");
335 println!(" Min time: {min_duration:?}");
336 println!(" Max time: {max_duration:?}");
337 println!(" Avg time: {avg_duration:?}");
338 println!(" 95th percentile: {p95:?}");
339 println!(" 99th percentile: {p99:?}");
340 println!(" Total time: {total_duration:?}");
341
342 Ok(BenchmarkResults {
343 iterations: num_iterations,
344 min_time: min_duration,
345 max_time: max_duration,
346 avg_time: avg_duration,
347 p95: *p95,
348 p99: *p99,
349 total_time: total_duration,
350 })
351}
More examples
examples/custom_function.rs (line 425)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308 println!("=== Custom Function Example ===\n");
309
310 // Create engine without built-in functions to demonstrate custom ones
311 let mut engine = Engine::new_empty();
312
313 // Register our custom functions
314 engine.register_task_function(
315 "statistics".to_string(),
316 Box::new(StatisticsFunction::new()),
317 );
318
319 engine.register_task_function(
320 "enrich_data".to_string(),
321 Box::new(DataEnrichmentFunction::new()),
322 );
323
324 // Also register built-in map function for data preparation
325 engine.register_task_function(
326 "map".to_string(),
327 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328 );
329
330 // Define a workflow that uses our custom functions
331 let workflow_json = r#"
332 {
333 "id": "custom_function_demo",
334 "name": "Custom Function Demo",
335 "description": "Demonstrates custom async functions in workflow",
336 "condition": { "==": [true, true] },
337 "tasks": [
338 {
339 "id": "prepare_data",
340 "name": "Prepare Data",
341 "description": "Extract and prepare data for analysis",
342 "function": {
343 "name": "map",
344 "input": {
345 "mappings": [
346 {
347 "path": "data.numbers",
348 "logic": { "var": "temp_data.measurements" }
349 },
350 {
351 "path": "data.user_id",
352 "logic": { "var": "temp_data.user_id" }
353 }
354 ]
355 }
356 }
357 },
358 {
359 "id": "calculate_stats",
360 "name": "Calculate Statistics",
361 "description": "Calculate statistical measures from numeric data",
362 "function": {
363 "name": "statistics",
364 "input": {
365 "data_path": "data.numbers",
366 "output_path": "data.stats"
367 }
368 }
369 },
370 {
371 "id": "enrich_user_data",
372 "name": "Enrich User Data",
373 "description": "Add additional user information",
374 "function": {
375 "name": "enrich_data",
376 "input": {
377 "lookup_field": "user_id",
378 "lookup_value": "user_123",
379 "output_path": "data.user_info"
380 }
381 }
382 }
383 ]
384 }
385 "#;
386
387 // Parse and add the workflow
388 let workflow = Workflow::from_json(workflow_json)?;
389 engine.add_workflow(&workflow);
390
391 // Create sample data
392 let sample_data = json!({
393 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394 "user_id": "user_123",
395 "timestamp": "2024-01-15T10:30:00Z"
396 });
397
398 // Create and process message
399 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400 message.temp_data = sample_data;
401 message.data = json!({});
402
403 println!("Processing message with custom functions...\n");
404
405 // Process the message through our custom workflow
406 match engine.process_message(&mut message).await {
407 Ok(_) => {
408 println!("✅ Message processed successfully!\n");
409
410 println!("📊 Final Results:");
411 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413 println!("📋 Audit Trail:");
414 for (i, audit) in message.audit_trail.iter().enumerate() {
415 println!(
416 "{}. Task: {} (Status: {})",
417 i + 1,
418 audit.task_id,
419 audit.status_code
420 );
421 println!(" Timestamp: {}", audit.timestamp);
422 println!(" Changes: {} field(s) modified", audit.changes.len());
423 }
424
425 if message.has_errors() {
426 println!("\n⚠️ Errors encountered:");
427 for error in &message.errors {
428 println!(
429 " - {}: {:?}",
430 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431 error.error_message
432 );
433 }
434 }
435 }
436 Err(e) => {
437 println!("❌ Error processing message: {e:?}");
438 }
439 }
440
441 // Demonstrate another example with different data
442 let separator = "=".repeat(50);
443 println!("\n{separator}");
444 println!("=== Second Example with Different User ===\n");
445
446 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447 message2.temp_data = json!({
448 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449 "user_id": "user_456",
450 "timestamp": "2024-01-15T11:00:00Z"
451 });
452 message2.data = json!({});
453
454 // Create a workflow for the second user
455 let workflow2_json = r#"
456 {
457 "id": "custom_function_demo_2",
458 "name": "Custom Function Demo 2",
459 "description": "Second demo with different user",
460 "condition": { "==": [true, true] },
461 "tasks": [
462 {
463 "id": "prepare_data",
464 "name": "Prepare Data",
465 "function": {
466 "name": "map",
467 "input": {
468 "mappings": [
469 {
470 "path": "data.numbers",
471 "logic": { "var": "temp_data.measurements" }
472 },
473 {
474 "path": "data.user_id",
475 "logic": { "var": "temp_data.user_id" }
476 }
477 ]
478 }
479 }
480 },
481 {
482 "id": "calculate_stats",
483 "name": "Calculate Statistics",
484 "function": {
485 "name": "statistics",
486 "input": {
487 "data_path": "data.numbers",
488 "output_path": "data.analysis"
489 }
490 }
491 },
492 {
493 "id": "enrich_user_data",
494 "name": "Enrich User Data",
495 "function": {
496 "name": "enrich_data",
497 "input": {
498 "lookup_field": "user_id",
499 "lookup_value": "user_456",
500 "output_path": "data.employee_details"
501 }
502 }
503 }
504 ]
505 }
506 "#;
507
508 let workflow2 = Workflow::from_json(workflow2_json)?;
509 engine.add_workflow(&workflow2);
510
511 match engine.process_message(&mut message2).await {
512 Ok(_) => {
513 println!("✅ Second message processed successfully!\n");
514 println!("📊 Results for user_456:");
515 println!("{}", serde_json::to_string_pretty(&message2.data)?);
516 }
517 Err(e) => {
518 println!("❌ Error processing second message: {e:?}");
519 }
520 }
521
522 println!("\n🎉 Custom function examples completed!");
523
524 Ok(())
525}
Trait Implementations§
impl<'de> Deserialize<'de> for Message
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
    __D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
Auto Trait Implementations§
impl Freeze for Message
impl RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl UnwindSafe for Message
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more