pub struct Message {
pub id: String,
pub data: Value,
pub payload: Value,
pub metadata: Value,
pub temp_data: Value,
pub audit_trail: Vec<AuditTrail>,
pub errors: Vec<ErrorInfo>,
}
Fields§
§id: String
§data: Value
§payload: Value
§metadata: Value
§temp_data: Value
§audit_trail: Vec<AuditTrail>
§errors: Vec<ErrorInfo>
Errors that occurred during message processing
Implementations§
Source§impl Message
impl Message
Sourcepub fn new(payload: &Value) -> Self
pub fn new(payload: &Value) -> Self
Examples found in repository?
examples/complete_workflow.rs (line 118)
4fn main() -> Result<(), Box<dyn std::error::Error>> {
5 // Define a workflow that:
6 // 1. Prepares sample user data
7 // 2. Enriches the message with transformed data
8 // 3. Validates the enriched data
9 let workflow_json = r#"
10 {
11 "id": "complete_workflow",
12 "name": "Complete Workflow Example",
13 "priority": 0,
14 "description": "Demonstrates enrich -> validate flow",
15 "tasks": [
16 {
17 "id": "initialize_user",
18 "name": "Initialize User Structure",
19 "description": "Create empty user object in data",
20 "function": {
21 "name": "map",
22 "input": {
23 "mappings": [
24 {
25 "path": "data.user",
26 "logic": {}
27 }
28 ]
29 }
30 }
31 },
32 {
33 "id": "transform_data",
34 "name": "Transform Data",
35 "description": "Map API response to our data model",
36 "function": {
37 "name": "map",
38 "input": {
39 "mappings": [
40 {
41 "path": "data.user.id",
42 "logic": { "var": "temp_data.body.id" }
43 },
44 {
45 "path": "data.user.name",
46 "logic": { "var": "temp_data.body.name" }
47 },
48 {
49 "path": "data.user.email",
50 "logic": { "var": "temp_data.body.email" }
51 },
52 {
53 "path": "data.user.address",
54 "logic": {
55 "cat": [
56 { "var": "temp_data.body.address.street" },
57 ", ",
58 { "var": "temp_data.body.address.city" }
59 ]
60 }
61 },
62 {
63 "path": "data.user.company",
64 "logic": { "var": "temp_data.body.company.name" }
65 }
66 ]
67 }
68 }
69 },
70 {
71 "id": "validate_user_data",
72 "name": "Validate User Data",
73 "description": "Ensure the user data meets our requirements",
74 "function": {
75 "name": "validate",
76 "input": {
77 "rules": [
78 {
79 "path": "data",
80 "logic": { "!!": { "var": "data.user.id" } },
81 "message": "User ID is required"
82 },
83 {
84 "path": "data",
85 "logic": { "!!": { "var": "data.user.name" } },
86 "message": "User name is required"
87 },
88 {
89 "path": "data",
90 "logic": { "!!": { "var": "data.user.email" } },
91 "message": "User email is required"
92 },
93 {
94 "path": "data",
95 "logic": {
96 "in": [
97 "@",
98 { "var": "data.user.email" }
99 ]
100 },
101 "message": "Email must be valid format"
102 }
103 ]
104 }
105 }
106 }
107 ]
108 }
109 "#;
110
111 // Parse the workflow
112 let workflow = Workflow::from_json(workflow_json)?;
113
114 // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
115 let engine = Engine::new(vec![workflow], None, None, None, None);
116
117 // Create a message to process with sample user data
118 let mut message = Message::new(&json!({}));
119
120 // Add sample user data to temp_data (simulating what would come from an API)
121 message.temp_data = json!({
122 "body": {
123 "id": 1,
124 "name": "John Doe",
125 "email": "john.doe@example.com",
126 "address": {
127 "street": "123 Main St",
128 "city": "New York"
129 },
130 "company": {
131 "name": "Acme Corp"
132 }
133 }
134 });
135
136 // Process the message through the workflow
137 println!("Processing message through workflow...");
138
139 match engine.process_message(&mut message) {
140 Ok(_) => {
141 println!("Workflow completed successfully!");
142 }
143 Err(e) => {
144 eprintln!("Error executing workflow: {e:?}");
145 if !message.errors.is_empty() {
146 println!("\nErrors recorded in message:");
147 for err in &message.errors {
148 println!(
149 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
150 err.workflow_id, err.task_id, err.error_message
151 );
152 }
153 }
154 }
155 }
156
157 println!(
158 "\nFull message structure:\n{}",
159 serde_json::to_string_pretty(&message)?
160 );
161
162 Ok(())
163}
More examples
examples/benchmark.rs (line 106)
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8 println!("========================================");
9 println!("DATAFLOW ENGINE BENCHMARK");
10 println!("========================================\n");
11 println!(
12 "Running {} iterations on single-threaded engine\n",
13 ITERATIONS
14 );
15
16 // Define a simple workflow with data transformation
17 let workflow_json = r#"
18 {
19 "id": "benchmark_workflow",
20 "name": "Benchmark Workflow",
21 "description": "Simple workflow for performance testing",
22 "priority": 1,
23 "tasks": [
24 {
25 "id": "transform_data",
26 "name": "Transform Data",
27 "description": "Map data fields",
28 "function": {
29 "name": "map",
30 "input": {
31 "mappings": [
32 {
33 "path": "data.user.id",
34 "logic": { "var": "temp_data.id" }
35 },
36 {
37 "path": "data.user.name",
38 "logic": { "var": "temp_data.name" }
39 },
40 {
41 "path": "data.user.email",
42 "logic": { "var": "temp_data.email" }
43 },
44 {
45 "path": "data.user.age",
46 "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47 },
48 {
49 "path": "data.user.status",
50 "logic": {
51 "if": [
52 { ">": [{ "var": "temp_data.age" }, 18] },
53 "adult",
54 "minor"
55 ]
56 }
57 }
58 ]
59 }
60 }
61 },
62 {
63 "id": "validate_data",
64 "name": "Validate Data",
65 "description": "Validate transformed data",
66 "function": {
67 "name": "validate",
68 "input": {
69 "rules": [
70 {
71 "path": "data",
72 "logic": { "!!": { "var": "data.user.id" } },
73 "message": "User ID is required"
74 },
75 {
76 "path": "data",
77 "logic": { "!!": { "var": "data.user.email" } },
78 "message": "User email is required"
79 }
80 ]
81 }
82 }
83 }
84 ]
85 }
86 "#;
87
88 // Parse the workflow
89 let workflow = Workflow::from_json(workflow_json)?;
90
91 // Create the engine with built-in functions
92 let engine = Engine::new(vec![workflow], None, None, None, None);
93
94 // Sample data for benchmarking
95 let sample_data = json!({
96 "id": 12345,
97 "name": "John Doe",
98 "email": "john.doe@example.com",
99 "age": 25,
100 "department": "Engineering"
101 });
102
103 // Warm-up run
104 println!("Warming up...");
105 for _ in 0..1000 {
106 let mut message = Message::new(&json!({}));
107 message.temp_data = sample_data.clone();
108 let _ = engine.process_message(&mut message);
109 }
110
111 // Benchmark run
112 println!("Starting benchmark...\n");
113
114 let mut all_durations = Vec::with_capacity(ITERATIONS);
115 let mut success_count = 0;
116 let mut error_count = 0;
117
118 let benchmark_start = Instant::now();
119
120 for i in 0..ITERATIONS {
121 let mut message = Message::new(&json!({}));
122 message.temp_data = sample_data.clone();
123 message.metadata = json!({
124 "iteration": i,
125 "timestamp": chrono::Utc::now().to_rfc3339()
126 });
127
128 let iteration_start = Instant::now();
129 match engine.process_message(&mut message) {
130 Ok(_) => {
131 success_count += 1;
132 if message.has_errors() {
133 error_count += 1;
134 }
135 }
136 Err(_) => {
137 error_count += 1;
138 }
139 }
140 let iteration_duration = iteration_start.elapsed();
141 all_durations.push(iteration_duration);
142
143 // Progress indicator every 10k iterations
144 if (i + 1) % 10000 == 0 {
145 print!(".");
146 use std::io::Write;
147 std::io::stdout().flush()?;
148 }
149 }
150
151 let total_time = benchmark_start.elapsed();
152 println!("\n\nBenchmark Complete!");
153 println!("==========================================\n");
154
155 // Calculate statistics
156 all_durations.sort_unstable();
157 let p50 = all_durations[ITERATIONS * 50 / 100];
158 let p90 = all_durations[ITERATIONS * 90 / 100];
159 let p95 = all_durations[ITERATIONS * 95 / 100];
160 let p99 = all_durations[ITERATIONS * 99 / 100];
161 let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163 // Display results
164 println!("📊 PERFORMANCE METRICS");
165 println!("──────────────────────────────────────────");
166 println!("Total iterations: {:>10}", ITERATIONS);
167 println!("Successful: {:>10}", success_count);
168 println!("Errors: {:>10}", error_count);
169 println!(
170 "Total time: {:>10.3} seconds",
171 total_time.as_secs_f64()
172 );
173 println!();
174
175 println!("Messages/second: {:>10.0}", throughput);
176 println!();
177
178 println!("📉 LATENCY PERCENTILES");
179 println!("──────────────────────────────────────────");
180 println!("P50: {:>10.3} μs", p50.as_micros() as f64);
181 println!("P90: {:>10.3} μs", p90.as_micros() as f64);
182 println!("P95: {:>10.3} μs", p95.as_micros() as f64);
183 println!("P99: {:>10.3} μs", p99.as_micros() as f64);
184 println!();
185
186 Ok(())
187}
examples/custom_function.rs (line 401)
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335 println!("=== Custom Function Example ===\n");
336
337 // Define a workflow that uses our custom functions
338 let workflow_json = r#"
339 {
340 "id": "custom_function_demo",
341 "name": "Custom Function Demo",
342 "description": "Demonstrates custom functions in workflow",
343 "tasks": [
344 {
345 "id": "prepare_data",
346 "name": "Prepare Data",
347 "description": "Extract and prepare data for analysis",
348 "function": {
349 "name": "map",
350 "input": {
351 "mappings": [
352 {
353 "path": "data.numbers",
354 "logic": { "var": "temp_data.measurements" }
355 },
356 {
357 "path": "data.user_id",
358 "logic": { "var": "temp_data.user_id" }
359 }
360 ]
361 }
362 }
363 },
364 {
365 "id": "calculate_stats",
366 "name": "Calculate Statistics",
367 "description": "Calculate statistical measures from numeric data",
368 "function": {
369 "name": "statistics",
370 "input": {
371 "data_path": "data.numbers",
372 "output_path": "data.stats"
373 }
374 }
375 },
376 {
377 "id": "enrich_user_data",
378 "name": "Enrich User Data",
379 "description": "Add additional user information",
380 "function": {
381 "name": "enrich_data",
382 "input": {
383 "lookup_field": "user_id",
384 "lookup_value": "user_123",
385 "output_path": "data.user_info"
386 }
387 }
388 }
389 ]
390 }
391 "#;
392
393 // Parse the first workflow
394 let workflow = Workflow::from_json(workflow_json)?;
395
396 // Demonstrate another example with different data
397 let separator = "=".repeat(50);
398 println!("\n{separator}");
399 println!("=== Second Example with Different User ===\n");
400
401 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402 message2.temp_data = json!({
403 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404 "user_id": "user_456",
405 "timestamp": "2024-01-15T11:00:00Z"
406 });
407 message2.data = json!({});
408
409 // Create a workflow for the second user
410 let workflow2_json = r#"
411 {
412 "id": "custom_function_demo_2",
413 "name": "Custom Function Demo 2",
414 "description": "Second demo with different user",
415 "tasks": [
416 {
417 "id": "prepare_data",
418 "name": "Prepare Data",
419 "function": {
420 "name": "map",
421 "input": {
422 "mappings": [
423 {
424 "path": "data.numbers",
425 "logic": { "var": "temp_data.measurements" }
426 },
427 {
428 "path": "data.user_id",
429 "logic": { "var": "temp_data.user_id" }
430 }
431 ]
432 }
433 }
434 },
435 {
436 "id": "calculate_stats",
437 "name": "Calculate Statistics",
438 "function": {
439 "name": "statistics",
440 "input": {
441 "data_path": "data.numbers",
442 "output_path": "data.analysis"
443 }
444 }
445 },
446 {
447 "id": "enrich_user_data",
448 "name": "Enrich User Data",
449 "function": {
450 "name": "enrich_data",
451 "input": {
452 "lookup_field": "user_id",
453 "lookup_value": "user_456",
454 "output_path": "data.employee_details"
455 }
456 }
457 }
458 ]
459 }
460 "#;
461
462 let workflow2 = Workflow::from_json(workflow2_json)?;
463
464 // Prepare custom functions
465 let mut custom_functions = HashMap::new();
466 custom_functions.insert(
467 "statistics".to_string(),
468 Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469 );
470 custom_functions.insert(
471 "enrich_data".to_string(),
472 Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473 );
474 // Note: map and validate are now built-in to the Engine and will be used automatically
475
476 // Create engine with custom functions and built-ins (map/validate are always included internally)
477 let engine = Engine::new(
478 vec![workflow, workflow2],
479 Some(custom_functions),
480 None, // Use default (includes built-ins)
481 None, // Default concurrency
482 None, // Default retry config
483 );
484
485 // Create sample data for first message
486 let sample_data = json!({
487 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
488 "user_id": "user_123",
489 "timestamp": "2024-01-15T10:30:00Z"
490 });
491
492 // Create and process first message
493 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
494 message.temp_data = sample_data;
495 message.data = json!({});
496
497 println!("Processing message with custom functions...\n");
498
499 // Process the message through our custom workflow
500 match engine.process_message(&mut message) {
501 Ok(_) => {
502 println!("✅ Message processed successfully!\n");
503
504 println!("📊 Final Results:");
505 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
506
507 println!("📋 Audit Trail:");
508 for (i, audit) in message.audit_trail.iter().enumerate() {
509 println!(
510 "{}. Task: {} (Status: {})",
511 i + 1,
512 audit.task_id,
513 audit.status_code
514 );
515 println!(" Timestamp: {}", audit.timestamp);
516 println!(" Changes: {} field(s) modified", audit.changes.len());
517 }
518
519 if message.has_errors() {
520 println!("\n⚠️ Errors encountered:");
521 for error in &message.errors {
522 println!(
523 " - {}: {:?}",
524 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
525 error.error_message
526 );
527 }
528 }
529 }
530 Err(e) => {
531 println!("❌ Error processing message: {e:?}");
532 }
533 }
534
535 // Process second message
536 match engine.process_message(&mut message2) {
537 Ok(_) => {
538 println!("✅ Second message processed successfully!\n");
539 println!("📊 Results for user_456:");
540 println!("{}", serde_json::to_string_pretty(&message2.data)?);
541 }
542 Err(e) => {
543 println!("❌ Error processing second message: {e:?}");
544 }
545 }
546
547 println!("\n🎉 Custom function examples completed!");
548
549 Ok(())
550}
Sourcepub fn has_errors(&self) -> bool
pub fn has_errors(&self) -> bool
Check if message has errors
Examples found in repository?
examples/benchmark.rs (line 132)
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8 println!("========================================");
9 println!("DATAFLOW ENGINE BENCHMARK");
10 println!("========================================\n");
11 println!(
12 "Running {} iterations on single-threaded engine\n",
13 ITERATIONS
14 );
15
16 // Define a simple workflow with data transformation
17 let workflow_json = r#"
18 {
19 "id": "benchmark_workflow",
20 "name": "Benchmark Workflow",
21 "description": "Simple workflow for performance testing",
22 "priority": 1,
23 "tasks": [
24 {
25 "id": "transform_data",
26 "name": "Transform Data",
27 "description": "Map data fields",
28 "function": {
29 "name": "map",
30 "input": {
31 "mappings": [
32 {
33 "path": "data.user.id",
34 "logic": { "var": "temp_data.id" }
35 },
36 {
37 "path": "data.user.name",
38 "logic": { "var": "temp_data.name" }
39 },
40 {
41 "path": "data.user.email",
42 "logic": { "var": "temp_data.email" }
43 },
44 {
45 "path": "data.user.age",
46 "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47 },
48 {
49 "path": "data.user.status",
50 "logic": {
51 "if": [
52 { ">": [{ "var": "temp_data.age" }, 18] },
53 "adult",
54 "minor"
55 ]
56 }
57 }
58 ]
59 }
60 }
61 },
62 {
63 "id": "validate_data",
64 "name": "Validate Data",
65 "description": "Validate transformed data",
66 "function": {
67 "name": "validate",
68 "input": {
69 "rules": [
70 {
71 "path": "data",
72 "logic": { "!!": { "var": "data.user.id" } },
73 "message": "User ID is required"
74 },
75 {
76 "path": "data",
77 "logic": { "!!": { "var": "data.user.email" } },
78 "message": "User email is required"
79 }
80 ]
81 }
82 }
83 }
84 ]
85 }
86 "#;
87
88 // Parse the workflow
89 let workflow = Workflow::from_json(workflow_json)?;
90
91 // Create the engine with built-in functions
92 let engine = Engine::new(vec![workflow], None, None, None, None);
93
94 // Sample data for benchmarking
95 let sample_data = json!({
96 "id": 12345,
97 "name": "John Doe",
98 "email": "john.doe@example.com",
99 "age": 25,
100 "department": "Engineering"
101 });
102
103 // Warm-up run
104 println!("Warming up...");
105 for _ in 0..1000 {
106 let mut message = Message::new(&json!({}));
107 message.temp_data = sample_data.clone();
108 let _ = engine.process_message(&mut message);
109 }
110
111 // Benchmark run
112 println!("Starting benchmark...\n");
113
114 let mut all_durations = Vec::with_capacity(ITERATIONS);
115 let mut success_count = 0;
116 let mut error_count = 0;
117
118 let benchmark_start = Instant::now();
119
120 for i in 0..ITERATIONS {
121 let mut message = Message::new(&json!({}));
122 message.temp_data = sample_data.clone();
123 message.metadata = json!({
124 "iteration": i,
125 "timestamp": chrono::Utc::now().to_rfc3339()
126 });
127
128 let iteration_start = Instant::now();
129 match engine.process_message(&mut message) {
130 Ok(_) => {
131 success_count += 1;
132 if message.has_errors() {
133 error_count += 1;
134 }
135 }
136 Err(_) => {
137 error_count += 1;
138 }
139 }
140 let iteration_duration = iteration_start.elapsed();
141 all_durations.push(iteration_duration);
142
143 // Progress indicator every 10k iterations
144 if (i + 1) % 10000 == 0 {
145 print!(".");
146 use std::io::Write;
147 std::io::stdout().flush()?;
148 }
149 }
150
151 let total_time = benchmark_start.elapsed();
152 println!("\n\nBenchmark Complete!");
153 println!("==========================================\n");
154
155 // Calculate statistics
156 all_durations.sort_unstable();
157 let p50 = all_durations[ITERATIONS * 50 / 100];
158 let p90 = all_durations[ITERATIONS * 90 / 100];
159 let p95 = all_durations[ITERATIONS * 95 / 100];
160 let p99 = all_durations[ITERATIONS * 99 / 100];
161 let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163 // Display results
164 println!("📊 PERFORMANCE METRICS");
165 println!("──────────────────────────────────────────");
166 println!("Total iterations: {:>10}", ITERATIONS);
167 println!("Successful: {:>10}", success_count);
168 println!("Errors: {:>10}", error_count);
169 println!(
170 "Total time: {:>10.3} seconds",
171 total_time.as_secs_f64()
172 );
173 println!();
174
175 println!("Messages/second: {:>10.0}", throughput);
176 println!();
177
178 println!("📉 LATENCY PERCENTILES");
179 println!("──────────────────────────────────────────");
180 println!("P50: {:>10.3} μs", p50.as_micros() as f64);
181 println!("P90: {:>10.3} μs", p90.as_micros() as f64);
182 println!("P95: {:>10.3} μs", p95.as_micros() as f64);
183 println!("P99: {:>10.3} μs", p99.as_micros() as f64);
184 println!();
185
186 Ok(())
187}
More examples
examples/custom_function.rs (line 519)
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335 println!("=== Custom Function Example ===\n");
336
337 // Define a workflow that uses our custom functions
338 let workflow_json = r#"
339 {
340 "id": "custom_function_demo",
341 "name": "Custom Function Demo",
342 "description": "Demonstrates custom functions in workflow",
343 "tasks": [
344 {
345 "id": "prepare_data",
346 "name": "Prepare Data",
347 "description": "Extract and prepare data for analysis",
348 "function": {
349 "name": "map",
350 "input": {
351 "mappings": [
352 {
353 "path": "data.numbers",
354 "logic": { "var": "temp_data.measurements" }
355 },
356 {
357 "path": "data.user_id",
358 "logic": { "var": "temp_data.user_id" }
359 }
360 ]
361 }
362 }
363 },
364 {
365 "id": "calculate_stats",
366 "name": "Calculate Statistics",
367 "description": "Calculate statistical measures from numeric data",
368 "function": {
369 "name": "statistics",
370 "input": {
371 "data_path": "data.numbers",
372 "output_path": "data.stats"
373 }
374 }
375 },
376 {
377 "id": "enrich_user_data",
378 "name": "Enrich User Data",
379 "description": "Add additional user information",
380 "function": {
381 "name": "enrich_data",
382 "input": {
383 "lookup_field": "user_id",
384 "lookup_value": "user_123",
385 "output_path": "data.user_info"
386 }
387 }
388 }
389 ]
390 }
391 "#;
392
393 // Parse the first workflow
394 let workflow = Workflow::from_json(workflow_json)?;
395
396 // Demonstrate another example with different data
397 let separator = "=".repeat(50);
398 println!("\n{separator}");
399 println!("=== Second Example with Different User ===\n");
400
401 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402 message2.temp_data = json!({
403 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404 "user_id": "user_456",
405 "timestamp": "2024-01-15T11:00:00Z"
406 });
407 message2.data = json!({});
408
409 // Create a workflow for the second user
410 let workflow2_json = r#"
411 {
412 "id": "custom_function_demo_2",
413 "name": "Custom Function Demo 2",
414 "description": "Second demo with different user",
415 "tasks": [
416 {
417 "id": "prepare_data",
418 "name": "Prepare Data",
419 "function": {
420 "name": "map",
421 "input": {
422 "mappings": [
423 {
424 "path": "data.numbers",
425 "logic": { "var": "temp_data.measurements" }
426 },
427 {
428 "path": "data.user_id",
429 "logic": { "var": "temp_data.user_id" }
430 }
431 ]
432 }
433 }
434 },
435 {
436 "id": "calculate_stats",
437 "name": "Calculate Statistics",
438 "function": {
439 "name": "statistics",
440 "input": {
441 "data_path": "data.numbers",
442 "output_path": "data.analysis"
443 }
444 }
445 },
446 {
447 "id": "enrich_user_data",
448 "name": "Enrich User Data",
449 "function": {
450 "name": "enrich_data",
451 "input": {
452 "lookup_field": "user_id",
453 "lookup_value": "user_456",
454 "output_path": "data.employee_details"
455 }
456 }
457 }
458 ]
459 }
460 "#;
461
462 let workflow2 = Workflow::from_json(workflow2_json)?;
463
464 // Prepare custom functions
465 let mut custom_functions = HashMap::new();
466 custom_functions.insert(
467 "statistics".to_string(),
468 Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469 );
470 custom_functions.insert(
471 "enrich_data".to_string(),
472 Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473 );
474 // Note: map and validate are now built-in to the Engine and will be used automatically
475
476 // Create engine with custom functions and built-ins (map/validate are always included internally)
477 let engine = Engine::new(
478 vec![workflow, workflow2],
479 Some(custom_functions),
480 None, // Use default (includes built-ins)
481 None, // Default concurrency
482 None, // Default retry config
483 );
484
485 // Create sample data for first message
486 let sample_data = json!({
487 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
488 "user_id": "user_123",
489 "timestamp": "2024-01-15T10:30:00Z"
490 });
491
492 // Create and process first message
493 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
494 message.temp_data = sample_data;
495 message.data = json!({});
496
497 println!("Processing message with custom functions...\n");
498
499 // Process the message through our custom workflow
500 match engine.process_message(&mut message) {
501 Ok(_) => {
502 println!("✅ Message processed successfully!\n");
503
504 println!("📊 Final Results:");
505 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
506
507 println!("📋 Audit Trail:");
508 for (i, audit) in message.audit_trail.iter().enumerate() {
509 println!(
510 "{}. Task: {} (Status: {})",
511 i + 1,
512 audit.task_id,
513 audit.status_code
514 );
515 println!(" Timestamp: {}", audit.timestamp);
516 println!(" Changes: {} field(s) modified", audit.changes.len());
517 }
518
519 if message.has_errors() {
520 println!("\n⚠️ Errors encountered:");
521 for error in &message.errors {
522 println!(
523 " - {}: {:?}",
524 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
525 error.error_message
526 );
527 }
528 }
529 }
530 Err(e) => {
531 println!("❌ Error processing message: {e:?}");
532 }
533 }
534
535 // Process second message
536 match engine.process_message(&mut message2) {
537 Ok(_) => {
538 println!("✅ Second message processed successfully!\n");
539 println!("📊 Results for user_456:");
540 println!("{}", serde_json::to_string_pretty(&message2.data)?);
541 }
542 Err(e) => {
543 println!("❌ Error processing second message: {e:?}");
544 }
545 }
546
547 println!("\n🎉 Custom function examples completed!");
548
549 Ok(())
550}
Trait Implementations§
Source§impl<'de> Deserialize<'de> for Message
impl<'de> Deserialize<'de> for Message
Source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
Auto Trait Implementations§
impl Freeze for Message
impl RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl UnwindSafe for Message
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more