Struct Message

Source
pub struct Message {
    pub id: String,
    pub data: Value,
    pub payload: Value,
    pub metadata: Value,
    pub temp_data: Value,
    pub audit_trail: Vec<AuditTrail>,
    pub errors: Vec<ErrorInfo>,
}

Fields§

§id: String§data: Value§payload: Value§metadata: Value§temp_data: Value§audit_trail: Vec<AuditTrail>§errors: Vec<ErrorInfo>

Errors that occurred during message processing

Implementations§

Source§

impl Message

Source

pub fn new(payload: &Value) -> Self

Examples found in repository?
examples/benchmark.rs (line 183)
169async fn run_async_benchmark(
170    engine: &Engine,
171    sample_user_data: &Value,
172    num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174    let mut total_duration = Duration::new(0, 0);
175    let mut min_duration = Duration::new(u64::MAX, 0);
176    let mut max_duration = Duration::new(0, 0);
177    let mut all_durations = Vec::with_capacity(num_iterations);
178    let mut error_count = 0;
179
180    println!("Starting async benchmark with {num_iterations} iterations...");
181
182    for i in 0..num_iterations {
183        let mut message = Message::new(&json!({}));
184        message.temp_data = sample_user_data.clone();
185        message.data = json!({});
186        message.metadata = json!({
187            "timestamp": chrono::Utc::now().to_rfc3339(),
188            "iteration": i
189        });
190
191        let start = Instant::now();
192        match engine.process_message(&mut message).await {
193            Ok(_) => {
194                let duration = start.elapsed();
195                all_durations.push(duration);
196                total_duration += duration;
197                min_duration = min_duration.min(duration);
198                max_duration = max_duration.max(duration);
199
200                // Check for processing errors
201                if message.has_errors() {
202                    error_count += 1;
203                    if error_count <= 5 {
204                        // Only print first 5 errors
205                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
206                    }
207                }
208            }
209            Err(e) => {
210                error_count += 1;
211                if error_count <= 5 {
212                    println!("Error in iteration {i}: {e:?}");
213                }
214                // Still record the time even for errors
215                let duration = start.elapsed();
216                all_durations.push(duration);
217                total_duration += duration;
218                min_duration = min_duration.min(duration);
219                max_duration = max_duration.max(duration);
220            }
221        }
222
223        if (i + 1) % 1000 == 0 {
224            println!("Completed {} async iterations", i + 1);
225        }
226    }
227
228    if error_count > 0 {
229        println!("Total errors encountered: {error_count}");
230    }
231
232    // Sort durations for percentile calculations
233    all_durations.sort();
234
235    let p95_idx = (num_iterations as f64 * 0.95) as usize;
236    let p99_idx = (num_iterations as f64 * 0.99) as usize;
237    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
238    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
239    let avg_duration = total_duration / num_iterations as u32;
240
241    println!("\nAsync Benchmark Results (v{VERSION}):");
242    println!("  Iterations: {num_iterations}");
243    println!("  Errors: {error_count}");
244    println!("  Min time: {min_duration:?}");
245    println!("  Max time: {max_duration:?}");
246    println!("  Avg time: {avg_duration:?}");
247    println!("  95th percentile: {p95:?}");
248    println!("  99th percentile: {p99:?}");
249    println!("  Total time: {total_duration:?}");
250
251    Ok(BenchmarkResults {
252        iterations: num_iterations,
253        min_time: min_duration,
254        max_time: max_duration,
255        avg_time: avg_duration,
256        p95: *p95,
257        p99: *p99,
258        total_time: total_duration,
259    })
260}
261
262async fn run_sync_benchmark(
263    engine: &Engine,
264    sample_user_data: &Value,
265    num_iterations: usize,
266) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
267    let mut total_duration = Duration::new(0, 0);
268    let mut min_duration = Duration::new(u64::MAX, 0);
269    let mut max_duration = Duration::new(0, 0);
270    let mut all_durations = Vec::with_capacity(num_iterations);
271    let mut error_count = 0;
272
273    println!("Starting sync-style benchmark with {num_iterations} iterations...");
274
275    for i in 0..num_iterations {
276        let mut message = Message::new(&json!({}));
277        message.temp_data = sample_user_data.clone();
278        message.data = json!({});
279        message.metadata = json!({
280            "timestamp": chrono::Utc::now().to_rfc3339(),
281            "iteration": i
282        });
283
284        let start = Instant::now();
285        // Use tokio::task::block_in_place to simulate sync behavior
286        let result = tokio::task::block_in_place(|| {
287            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
288        });
289
290        match result {
291            Ok(_) => {
292                let duration = start.elapsed();
293                all_durations.push(duration);
294                total_duration += duration;
295                min_duration = min_duration.min(duration);
296                max_duration = max_duration.max(duration);
297
298                if message.has_errors() {
299                    error_count += 1;
300                }
301            }
302            Err(e) => {
303                error_count += 1;
304                if error_count <= 5 {
305                    println!("Sync error in iteration {i}: {e:?}");
306                }
307                let duration = start.elapsed();
308                all_durations.push(duration);
309                total_duration += duration;
310                min_duration = min_duration.min(duration);
311                max_duration = max_duration.max(duration);
312            }
313        }
314
315        if (i + 1) % 1000 == 0 {
316            println!("Completed {} sync iterations", i + 1);
317        }
318    }
319
320    if error_count > 0 {
321        println!("Total sync errors encountered: {error_count}");
322    }
323
324    all_durations.sort();
325
326    let p95_idx = (num_iterations as f64 * 0.95) as usize;
327    let p99_idx = (num_iterations as f64 * 0.99) as usize;
328    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
329    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
330    let avg_duration = total_duration / num_iterations as u32;
331
332    println!("\nSync Benchmark Results (v{VERSION}):");
333    println!("  Iterations: {num_iterations}");
334    println!("  Errors: {error_count}");
335    println!("  Min time: {min_duration:?}");
336    println!("  Max time: {max_duration:?}");
337    println!("  Avg time: {avg_duration:?}");
338    println!("  95th percentile: {p95:?}");
339    println!("  99th percentile: {p99:?}");
340    println!("  Total time: {total_duration:?}");
341
342    Ok(BenchmarkResults {
343        iterations: num_iterations,
344        min_time: min_duration,
345        max_time: max_duration,
346        avg_time: avg_duration,
347        p95: *p95,
348        p99: *p99,
349        total_time: total_duration,
350    })
351}
More examples
Hide additional examples
examples/complete_workflow.rs (line 136)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
examples/custom_function.rs (line 399)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {e:?}");
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{separator}");
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {e:?}");
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}
Source

pub fn add_error(&mut self, error: ErrorInfo)

Add an error to the message

Source

pub fn has_errors(&self) -> bool

Check if message has errors

Examples found in repository?
examples/benchmark.rs (line 201)
169async fn run_async_benchmark(
170    engine: &Engine,
171    sample_user_data: &Value,
172    num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174    let mut total_duration = Duration::new(0, 0);
175    let mut min_duration = Duration::new(u64::MAX, 0);
176    let mut max_duration = Duration::new(0, 0);
177    let mut all_durations = Vec::with_capacity(num_iterations);
178    let mut error_count = 0;
179
180    println!("Starting async benchmark with {num_iterations} iterations...");
181
182    for i in 0..num_iterations {
183        let mut message = Message::new(&json!({}));
184        message.temp_data = sample_user_data.clone();
185        message.data = json!({});
186        message.metadata = json!({
187            "timestamp": chrono::Utc::now().to_rfc3339(),
188            "iteration": i
189        });
190
191        let start = Instant::now();
192        match engine.process_message(&mut message).await {
193            Ok(_) => {
194                let duration = start.elapsed();
195                all_durations.push(duration);
196                total_duration += duration;
197                min_duration = min_duration.min(duration);
198                max_duration = max_duration.max(duration);
199
200                // Check for processing errors
201                if message.has_errors() {
202                    error_count += 1;
203                    if error_count <= 5 {
204                        // Only print first 5 errors
205                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
206                    }
207                }
208            }
209            Err(e) => {
210                error_count += 1;
211                if error_count <= 5 {
212                    println!("Error in iteration {i}: {e:?}");
213                }
214                // Still record the time even for errors
215                let duration = start.elapsed();
216                all_durations.push(duration);
217                total_duration += duration;
218                min_duration = min_duration.min(duration);
219                max_duration = max_duration.max(duration);
220            }
221        }
222
223        if (i + 1) % 1000 == 0 {
224            println!("Completed {} async iterations", i + 1);
225        }
226    }
227
228    if error_count > 0 {
229        println!("Total errors encountered: {error_count}");
230    }
231
232    // Sort durations for percentile calculations
233    all_durations.sort();
234
235    let p95_idx = (num_iterations as f64 * 0.95) as usize;
236    let p99_idx = (num_iterations as f64 * 0.99) as usize;
237    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
238    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
239    let avg_duration = total_duration / num_iterations as u32;
240
241    println!("\nAsync Benchmark Results (v{VERSION}):");
242    println!("  Iterations: {num_iterations}");
243    println!("  Errors: {error_count}");
244    println!("  Min time: {min_duration:?}");
245    println!("  Max time: {max_duration:?}");
246    println!("  Avg time: {avg_duration:?}");
247    println!("  95th percentile: {p95:?}");
248    println!("  99th percentile: {p99:?}");
249    println!("  Total time: {total_duration:?}");
250
251    Ok(BenchmarkResults {
252        iterations: num_iterations,
253        min_time: min_duration,
254        max_time: max_duration,
255        avg_time: avg_duration,
256        p95: *p95,
257        p99: *p99,
258        total_time: total_duration,
259    })
260}
261
262async fn run_sync_benchmark(
263    engine: &Engine,
264    sample_user_data: &Value,
265    num_iterations: usize,
266) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
267    let mut total_duration = Duration::new(0, 0);
268    let mut min_duration = Duration::new(u64::MAX, 0);
269    let mut max_duration = Duration::new(0, 0);
270    let mut all_durations = Vec::with_capacity(num_iterations);
271    let mut error_count = 0;
272
273    println!("Starting sync-style benchmark with {num_iterations} iterations...");
274
275    for i in 0..num_iterations {
276        let mut message = Message::new(&json!({}));
277        message.temp_data = sample_user_data.clone();
278        message.data = json!({});
279        message.metadata = json!({
280            "timestamp": chrono::Utc::now().to_rfc3339(),
281            "iteration": i
282        });
283
284        let start = Instant::now();
285        // Use tokio::task::block_in_place to simulate sync behavior
286        let result = tokio::task::block_in_place(|| {
287            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
288        });
289
290        match result {
291            Ok(_) => {
292                let duration = start.elapsed();
293                all_durations.push(duration);
294                total_duration += duration;
295                min_duration = min_duration.min(duration);
296                max_duration = max_duration.max(duration);
297
298                if message.has_errors() {
299                    error_count += 1;
300                }
301            }
302            Err(e) => {
303                error_count += 1;
304                if error_count <= 5 {
305                    println!("Sync error in iteration {i}: {e:?}");
306                }
307                let duration = start.elapsed();
308                all_durations.push(duration);
309                total_duration += duration;
310                min_duration = min_duration.min(duration);
311                max_duration = max_duration.max(duration);
312            }
313        }
314
315        if (i + 1) % 1000 == 0 {
316            println!("Completed {} sync iterations", i + 1);
317        }
318    }
319
320    if error_count > 0 {
321        println!("Total sync errors encountered: {error_count}");
322    }
323
324    all_durations.sort();
325
326    let p95_idx = (num_iterations as f64 * 0.95) as usize;
327    let p99_idx = (num_iterations as f64 * 0.99) as usize;
328    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
329    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
330    let avg_duration = total_duration / num_iterations as u32;
331
332    println!("\nSync Benchmark Results (v{VERSION}):");
333    println!("  Iterations: {num_iterations}");
334    println!("  Errors: {error_count}");
335    println!("  Min time: {min_duration:?}");
336    println!("  Max time: {max_duration:?}");
337    println!("  Avg time: {avg_duration:?}");
338    println!("  95th percentile: {p95:?}");
339    println!("  99th percentile: {p99:?}");
340    println!("  Total time: {total_duration:?}");
341
342    Ok(BenchmarkResults {
343        iterations: num_iterations,
344        min_time: min_duration,
345        max_time: max_duration,
346        avg_time: avg_duration,
347        p95: *p95,
348        p99: *p99,
349        total_time: total_duration,
350    })
351}
More examples
Hide additional examples
examples/custom_function.rs (line 425)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {e:?}");
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{separator}");
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {e:?}");
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}

Trait Implementations§

Source§

impl Clone for Message

Source§

fn clone(&self) -> Message

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Message

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<'de> Deserialize<'de> for Message

Source§

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more
Source§

impl Serialize for Message

Source§

fn serialize<__S>(&self, __serializer: __S) -> Result<__S::Ok, __S::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,

Source§

impl<T> ErasedDestructor for T
where T: 'static,