pub struct Engine { /* private fields */ }
Expand description
Thread-safe engine that processes messages through workflows using non-blocking async IO.
§Architecture
The engine is optimized for both IO-bound and CPU-bound workloads, featuring:
- Vertical Scalability: Automatically utilizes all available CPU cores
- Thread-Safe Design: All components are Send + Sync for concurrent access
- Unified Concurrency: Single parameter controls both DataLogic pool size and max concurrent messages
§Concurrency Model
Each message receives exclusive access to a DataLogic instance for its entire workflow execution, eliminating lock contention between tasks while maintaining thread-safety across messages.
§Performance
The engine achieves linear scalability with CPU cores, capable of processing millions of messages per second with appropriate concurrency settings.
Implementations§
Source§impl Engine
impl Engine
Source § pub fn new() -> Self
pub fn new() -> Self
Creates a new Engine instance with built-in function handlers pre-registered.
§Example
use dataflow_rs::Engine;
let engine = Engine::new();
Examples found in repository
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {e:?}");
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
Source § pub fn with_concurrency(concurrency: usize) -> Self
pub fn with_concurrency(concurrency: usize) -> Self
Create a new engine with a specific concurrency level. This sets both the DataLogic pool size and the maximum number of concurrent messages.
§Arguments
concurrency
- Maximum number of messages that can be processed concurrently
Examples found in repository
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15 println!("========================================");
16 println!("DATAFLOW ENGINE BENCHMARK");
17 println!("========================================\n");
18
19 // Define a workflow that:
20 // 1. Uses pre-loaded data instead of HTTP fetch
21 // 2. Enriches the message with transformed data
22 // 3. Demonstrates proper async workflow execution
23 let workflow_json = r#"
24 {
25 "id": "benchmark_workflow",
26 "name": "Benchmark Workflow Example",
27 "description": "Demonstrates async workflow execution with data transformation",
28 "priority": 1,
29 "condition": { "==": [true, true] },
30 "tasks": [
31 {
32 "id": "transform_data",
33 "name": "Transform Data",
34 "description": "Map API response to our data model",
35 "function": {
36 "name": "map",
37 "input": {
38 "mappings": [
39 {
40 "path": "data.user.id",
41 "logic": { "var": "temp_data.body.id" }
42 },
43 {
44 "path": "data.user.name",
45 "logic": { "var": "temp_data.body.name" }
46 },
47 {
48 "path": "data.user.email",
49 "logic": { "var": "temp_data.body.email" }
50 },
51 {
52 "path": "data.user.address",
53 "logic": {
54 "cat": [
55 { "var": "temp_data.body.address.street" },
56 ", ",
57 { "var": "temp_data.body.address.city" }
58 ]
59 }
60 },
61 {
62 "path": "data.user.company",
63 "logic": { "var": "temp_data.body.company.name" }
64 },
65 {
66 "path": "data.processed_at",
67 "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
68 }
69 ]
70 }
71 }
72 }
73 ]
74 }
75 "#;
76
77 // Parse the workflow
78 let workflow = Workflow::from_json(workflow_json)?;
79
80 // Create sample user data (similar to what the HTTP endpoint would return)
81 let sample_user_data = json!({
82 "body": {
83 "id": 1,
84 "name": "Leanne Graham",
85 "username": "Bret",
86 "email": "Sincere@april.biz",
87 "address": {
88 "street": "Kulas Light",
89 "suite": "Apt. 556",
90 "city": "Gwenborough",
91 "zipcode": "92998-3874",
92 "geo": {
93 "lat": "-37.3159",
94 "lng": "81.1496"
95 }
96 },
97 "phone": "1-770-736-8031 x56442",
98 "website": "hildegard.org",
99 "company": {
100 "name": "Romaguera-Crona",
101 "catchPhrase": "Multi-layered client-server neural-net",
102 "bs": "harness real-time e-markets"
103 }
104 }
105 });
106
107 let iterations = 100000;
108
109 println!("Testing with {} iterations\n", iterations);
110
111 // Test sequential performance with concurrency 1
112 println!("--- Sequential Performance (Baseline) ---");
113 println!("Concurrency | Avg Time per Message | Total Time | Messages/sec");
114 println!("------------|---------------------|------------|-------------");
115
116 // Sequential baseline
117 let engine = Arc::new(Engine::with_concurrency(1));
118 engine.add_workflow(&workflow);
119
120 let seq_results = run_sequential_benchmark(&*engine, &sample_user_data, iterations).await?;
121 let throughput = (iterations as f64) / seq_results.total_time.as_secs_f64();
122
123 println!(
124 "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0}",
125 1,
126 seq_results.avg_time.as_secs_f64() * 1_000_000.0,
127 seq_results.total_time.as_secs_f64() * 1000.0,
128 throughput
129 );
130
131 log_benchmark_results(
132 iterations,
133 seq_results.min_time,
134 seq_results.max_time,
135 seq_results.avg_time,
136 seq_results.p95,
137 seq_results.p99,
138 seq_results.total_time,
139 format!("seq_x1"),
140 )?;
141
142 // Test concurrent performance
143 println!("\n--- Concurrent Performance ---");
144 println!("Concurrency | Avg Time per Message | Total Time | Messages/sec | Speedup");
145 println!("------------|---------------------|------------|--------------|--------");
146
147 // Test configurations with different concurrency levels
148 let test_configs = vec![
149 1, 16, // 16 concurrent messages with 16 DataLogic instances
150 32, // 32 concurrent messages with 32 DataLogic instances
151 64, // 64 concurrent messages with 64 DataLogic instances
152 128, // 128 concurrent messages with 128 DataLogic instances
153 ];
154
155 let baseline_throughput = throughput;
156
157 for concurrency in test_configs {
158 let engine = Arc::new(Engine::with_concurrency(concurrency));
159 engine.add_workflow(&workflow);
160
161 let con_results = run_concurrent_benchmark(
162 engine.clone(),
163 &sample_user_data,
164 iterations,
165 concurrency, // Use same value for concurrent tasks
166 )
167 .await?;
168
169 let throughput = (iterations as f64) / con_results.total_time.as_secs_f64();
170 let speedup = throughput / baseline_throughput;
171
172 println!(
173 "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0} | {:>7.2}x",
174 concurrency,
175 con_results.avg_time.as_secs_f64() * 1_000_000.0,
176 con_results.total_time.as_secs_f64() * 1000.0,
177 throughput,
178 speedup
179 );
180
181 log_benchmark_results(
182 iterations,
183 con_results.min_time,
184 con_results.max_time,
185 con_results.avg_time,
186 con_results.p95,
187 con_results.p99,
188 con_results.total_time,
189 format!("con_x{}", concurrency),
190 )?;
191 }
192
193 println!("\n========================================");
194 println!("Benchmark results saved to '{}'", BENCHMARK_LOG_FILE);
195 println!("========================================");
196
197 Ok(())
198}
Source § pub fn with_pool_size(pool_size: usize) -> Self
👎Deprecated since 1.0.0: Use with_concurrency instead
pub fn with_pool_size(pool_size: usize) -> Self
Create a new engine with a specific pool size (deprecated, use with_concurrency)
Source § pub fn new_empty() -> Self
pub fn new_empty() -> Self
Create a new engine instance without any pre-registered functions
Examples found in repository
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319 println!("=== Custom Function Example ===\n");
320
321 // Create engine without built-in functions to demonstrate custom ones
322 let mut engine = Engine::new_empty();
323
324 // Register our custom functions
325 engine.register_task_function(
326 "statistics".to_string(),
327 Box::new(StatisticsFunction::new()),
328 );
329
330 engine.register_task_function(
331 "enrich_data".to_string(),
332 Box::new(DataEnrichmentFunction::new()),
333 );
334
335 // Also register built-in map function for data preparation
336 engine.register_task_function(
337 "map".to_string(),
338 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339 );
340
341 // Define a workflow that uses our custom functions
342 let workflow_json = r#"
343 {
344 "id": "custom_function_demo",
345 "name": "Custom Function Demo",
346 "description": "Demonstrates custom async functions in workflow",
347 "condition": { "==": [true, true] },
348 "tasks": [
349 {
350 "id": "prepare_data",
351 "name": "Prepare Data",
352 "description": "Extract and prepare data for analysis",
353 "function": {
354 "name": "map",
355 "input": {
356 "mappings": [
357 {
358 "path": "data.numbers",
359 "logic": { "var": "temp_data.measurements" }
360 },
361 {
362 "path": "data.user_id",
363 "logic": { "var": "temp_data.user_id" }
364 }
365 ]
366 }
367 }
368 },
369 {
370 "id": "calculate_stats",
371 "name": "Calculate Statistics",
372 "description": "Calculate statistical measures from numeric data",
373 "function": {
374 "name": "statistics",
375 "input": {
376 "data_path": "data.numbers",
377 "output_path": "data.stats"
378 }
379 }
380 },
381 {
382 "id": "enrich_user_data",
383 "name": "Enrich User Data",
384 "description": "Add additional user information",
385 "function": {
386 "name": "enrich_data",
387 "input": {
388 "lookup_field": "user_id",
389 "lookup_value": "user_123",
390 "output_path": "data.user_info"
391 }
392 }
393 }
394 ]
395 }
396 "#;
397
398 // Parse and add the workflow
399 let workflow = Workflow::from_json(workflow_json)?;
400 engine.add_workflow(&workflow);
401
402 // Create sample data
403 let sample_data = json!({
404 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405 "user_id": "user_123",
406 "timestamp": "2024-01-15T10:30:00Z"
407 });
408
409 // Create and process message
410 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411 message.temp_data = sample_data;
412 message.data = json!({});
413
414 println!("Processing message with custom functions...\n");
415
416 // Process the message through our custom workflow
417 match engine.process_message(&mut message).await {
418 Ok(_) => {
419 println!("✅ Message processed successfully!\n");
420
421 println!("📊 Final Results:");
422 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424 println!("📋 Audit Trail:");
425 for (i, audit) in message.audit_trail.iter().enumerate() {
426 println!(
427 "{}. Task: {} (Status: {})",
428 i + 1,
429 audit.task_id,
430 audit.status_code
431 );
432 println!(" Timestamp: {}", audit.timestamp);
433 println!(" Changes: {} field(s) modified", audit.changes.len());
434 }
435
436 if message.has_errors() {
437 println!("\n⚠️ Errors encountered:");
438 for error in &message.errors {
439 println!(
440 " - {}: {:?}",
441 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442 error.error_message
443 );
444 }
445 }
446 }
447 Err(e) => {
448 println!("❌ Error processing message: {e:?}");
449 }
450 }
451
452 // Demonstrate another example with different data
453 let separator = "=".repeat(50);
454 println!("\n{separator}");
455 println!("=== Second Example with Different User ===\n");
456
457 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458 message2.temp_data = json!({
459 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460 "user_id": "user_456",
461 "timestamp": "2024-01-15T11:00:00Z"
462 });
463 message2.data = json!({});
464
465 // Create a workflow for the second user
466 let workflow2_json = r#"
467 {
468 "id": "custom_function_demo_2",
469 "name": "Custom Function Demo 2",
470 "description": "Second demo with different user",
471 "condition": { "==": [true, true] },
472 "tasks": [
473 {
474 "id": "prepare_data",
475 "name": "Prepare Data",
476 "function": {
477 "name": "map",
478 "input": {
479 "mappings": [
480 {
481 "path": "data.numbers",
482 "logic": { "var": "temp_data.measurements" }
483 },
484 {
485 "path": "data.user_id",
486 "logic": { "var": "temp_data.user_id" }
487 }
488 ]
489 }
490 }
491 },
492 {
493 "id": "calculate_stats",
494 "name": "Calculate Statistics",
495 "function": {
496 "name": "statistics",
497 "input": {
498 "data_path": "data.numbers",
499 "output_path": "data.analysis"
500 }
501 }
502 },
503 {
504 "id": "enrich_user_data",
505 "name": "Enrich User Data",
506 "function": {
507 "name": "enrich_data",
508 "input": {
509 "lookup_field": "user_id",
510 "lookup_value": "user_456",
511 "output_path": "data.employee_details"
512 }
513 }
514 }
515 ]
516 }
517 "#;
518
519 let workflow2 = Workflow::from_json(workflow2_json)?;
520 engine.add_workflow(&workflow2);
521
522 match engine.process_message(&mut message2).await {
523 Ok(_) => {
524 println!("✅ Second message processed successfully!\n");
525 println!("📊 Results for user_456:");
526 println!("{}", serde_json::to_string_pretty(&message2.data)?);
527 }
528 Err(e) => {
529 println!("❌ Error processing second message: {e:?}");
530 }
531 }
532
533 println!("\n🎉 Custom function examples completed!");
534
535 Ok(())
536}
Source § pub fn concurrency(&self) -> usize
pub fn concurrency(&self) -> usize
Get the configured concurrency level
Source § pub fn with_retry_config(self, config: RetryConfig) -> Self
pub fn with_retry_config(self, config: RetryConfig) -> Self
Configure retry behavior
Source § pub fn add_workflow(&self, workflow: &Workflow)
pub fn add_workflow(&self, workflow: &Workflow)
Examples found in repository
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {e:?}");
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
More examples
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15 println!("========================================");
16 println!("DATAFLOW ENGINE BENCHMARK");
17 println!("========================================\n");
18
19 // Define a workflow that:
20 // 1. Uses pre-loaded data instead of HTTP fetch
21 // 2. Enriches the message with transformed data
22 // 3. Demonstrates proper async workflow execution
23 let workflow_json = r#"
24 {
25 "id": "benchmark_workflow",
26 "name": "Benchmark Workflow Example",
27 "description": "Demonstrates async workflow execution with data transformation",
28 "priority": 1,
29 "condition": { "==": [true, true] },
30 "tasks": [
31 {
32 "id": "transform_data",
33 "name": "Transform Data",
34 "description": "Map API response to our data model",
35 "function": {
36 "name": "map",
37 "input": {
38 "mappings": [
39 {
40 "path": "data.user.id",
41 "logic": { "var": "temp_data.body.id" }
42 },
43 {
44 "path": "data.user.name",
45 "logic": { "var": "temp_data.body.name" }
46 },
47 {
48 "path": "data.user.email",
49 "logic": { "var": "temp_data.body.email" }
50 },
51 {
52 "path": "data.user.address",
53 "logic": {
54 "cat": [
55 { "var": "temp_data.body.address.street" },
56 ", ",
57 { "var": "temp_data.body.address.city" }
58 ]
59 }
60 },
61 {
62 "path": "data.user.company",
63 "logic": { "var": "temp_data.body.company.name" }
64 },
65 {
66 "path": "data.processed_at",
67 "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
68 }
69 ]
70 }
71 }
72 }
73 ]
74 }
75 "#;
76
77 // Parse the workflow
78 let workflow = Workflow::from_json(workflow_json)?;
79
80 // Create sample user data (similar to what the HTTP endpoint would return)
81 let sample_user_data = json!({
82 "body": {
83 "id": 1,
84 "name": "Leanne Graham",
85 "username": "Bret",
86 "email": "Sincere@april.biz",
87 "address": {
88 "street": "Kulas Light",
89 "suite": "Apt. 556",
90 "city": "Gwenborough",
91 "zipcode": "92998-3874",
92 "geo": {
93 "lat": "-37.3159",
94 "lng": "81.1496"
95 }
96 },
97 "phone": "1-770-736-8031 x56442",
98 "website": "hildegard.org",
99 "company": {
100 "name": "Romaguera-Crona",
101 "catchPhrase": "Multi-layered client-server neural-net",
102 "bs": "harness real-time e-markets"
103 }
104 }
105 });
106
107 let iterations = 100000;
108
109 println!("Testing with {} iterations\n", iterations);
110
111 // Test sequential performance with concurrency 1
112 println!("--- Sequential Performance (Baseline) ---");
113 println!("Concurrency | Avg Time per Message | Total Time | Messages/sec");
114 println!("------------|---------------------|------------|-------------");
115
116 // Sequential baseline
117 let engine = Arc::new(Engine::with_concurrency(1));
118 engine.add_workflow(&workflow);
119
120 let seq_results = run_sequential_benchmark(&*engine, &sample_user_data, iterations).await?;
121 let throughput = (iterations as f64) / seq_results.total_time.as_secs_f64();
122
123 println!(
124 "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0}",
125 1,
126 seq_results.avg_time.as_secs_f64() * 1_000_000.0,
127 seq_results.total_time.as_secs_f64() * 1000.0,
128 throughput
129 );
130
131 log_benchmark_results(
132 iterations,
133 seq_results.min_time,
134 seq_results.max_time,
135 seq_results.avg_time,
136 seq_results.p95,
137 seq_results.p99,
138 seq_results.total_time,
139 format!("seq_x1"),
140 )?;
141
142 // Test concurrent performance
143 println!("\n--- Concurrent Performance ---");
144 println!("Concurrency | Avg Time per Message | Total Time | Messages/sec | Speedup");
145 println!("------------|---------------------|------------|--------------|--------");
146
147 // Test configurations with different concurrency levels
148 let test_configs = vec![
149 1, 16, // 16 concurrent messages with 16 DataLogic instances
150 32, // 32 concurrent messages with 32 DataLogic instances
151 64, // 64 concurrent messages with 64 DataLogic instances
152 128, // 128 concurrent messages with 128 DataLogic instances
153 ];
154
155 let baseline_throughput = throughput;
156
157 for concurrency in test_configs {
158 let engine = Arc::new(Engine::with_concurrency(concurrency));
159 engine.add_workflow(&workflow);
160
161 let con_results = run_concurrent_benchmark(
162 engine.clone(),
163 &sample_user_data,
164 iterations,
165 concurrency, // Use same value for concurrent tasks
166 )
167 .await?;
168
169 let throughput = (iterations as f64) / con_results.total_time.as_secs_f64();
170 let speedup = throughput / baseline_throughput;
171
172 println!(
173 "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0} | {:>7.2}x",
174 concurrency,
175 con_results.avg_time.as_secs_f64() * 1_000_000.0,
176 con_results.total_time.as_secs_f64() * 1000.0,
177 throughput,
178 speedup
179 );
180
181 log_benchmark_results(
182 iterations,
183 con_results.min_time,
184 con_results.max_time,
185 con_results.avg_time,
186 con_results.p95,
187 con_results.p99,
188 con_results.total_time,
189 format!("con_x{}", concurrency),
190 )?;
191 }
192
193 println!("\n========================================");
194 println!("Benchmark results saved to '{}'", BENCHMARK_LOG_FILE);
195 println!("========================================");
196
197 Ok(())
198}
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319 println!("=== Custom Function Example ===\n");
320
321 // Create engine without built-in functions to demonstrate custom ones
322 let mut engine = Engine::new_empty();
323
324 // Register our custom functions
325 engine.register_task_function(
326 "statistics".to_string(),
327 Box::new(StatisticsFunction::new()),
328 );
329
330 engine.register_task_function(
331 "enrich_data".to_string(),
332 Box::new(DataEnrichmentFunction::new()),
333 );
334
335 // Also register built-in map function for data preparation
336 engine.register_task_function(
337 "map".to_string(),
338 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339 );
340
341 // Define a workflow that uses our custom functions
342 let workflow_json = r#"
343 {
344 "id": "custom_function_demo",
345 "name": "Custom Function Demo",
346 "description": "Demonstrates custom async functions in workflow",
347 "condition": { "==": [true, true] },
348 "tasks": [
349 {
350 "id": "prepare_data",
351 "name": "Prepare Data",
352 "description": "Extract and prepare data for analysis",
353 "function": {
354 "name": "map",
355 "input": {
356 "mappings": [
357 {
358 "path": "data.numbers",
359 "logic": { "var": "temp_data.measurements" }
360 },
361 {
362 "path": "data.user_id",
363 "logic": { "var": "temp_data.user_id" }
364 }
365 ]
366 }
367 }
368 },
369 {
370 "id": "calculate_stats",
371 "name": "Calculate Statistics",
372 "description": "Calculate statistical measures from numeric data",
373 "function": {
374 "name": "statistics",
375 "input": {
376 "data_path": "data.numbers",
377 "output_path": "data.stats"
378 }
379 }
380 },
381 {
382 "id": "enrich_user_data",
383 "name": "Enrich User Data",
384 "description": "Add additional user information",
385 "function": {
386 "name": "enrich_data",
387 "input": {
388 "lookup_field": "user_id",
389 "lookup_value": "user_123",
390 "output_path": "data.user_info"
391 }
392 }
393 }
394 ]
395 }
396 "#;
397
398 // Parse and add the workflow
399 let workflow = Workflow::from_json(workflow_json)?;
400 engine.add_workflow(&workflow);
401
402 // Create sample data
403 let sample_data = json!({
404 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405 "user_id": "user_123",
406 "timestamp": "2024-01-15T10:30:00Z"
407 });
408
409 // Create and process message
410 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411 message.temp_data = sample_data;
412 message.data = json!({});
413
414 println!("Processing message with custom functions...\n");
415
416 // Process the message through our custom workflow
417 match engine.process_message(&mut message).await {
418 Ok(_) => {
419 println!("✅ Message processed successfully!\n");
420
421 println!("📊 Final Results:");
422 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424 println!("📋 Audit Trail:");
425 for (i, audit) in message.audit_trail.iter().enumerate() {
426 println!(
427 "{}. Task: {} (Status: {})",
428 i + 1,
429 audit.task_id,
430 audit.status_code
431 );
432 println!(" Timestamp: {}", audit.timestamp);
433 println!(" Changes: {} field(s) modified", audit.changes.len());
434 }
435
436 if message.has_errors() {
437 println!("\n⚠️ Errors encountered:");
438 for error in &message.errors {
439 println!(
440 " - {}: {:?}",
441 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442 error.error_message
443 );
444 }
445 }
446 }
447 Err(e) => {
448 println!("❌ Error processing message: {e:?}");
449 }
450 }
451
452 // Demonstrate another example with different data
453 let separator = "=".repeat(50);
454 println!("\n{separator}");
455 println!("=== Second Example with Different User ===\n");
456
457 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458 message2.temp_data = json!({
459 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460 "user_id": "user_456",
461 "timestamp": "2024-01-15T11:00:00Z"
462 });
463 message2.data = json!({});
464
465 // Create a workflow for the second user
466 let workflow2_json = r#"
467 {
468 "id": "custom_function_demo_2",
469 "name": "Custom Function Demo 2",
470 "description": "Second demo with different user",
471 "condition": { "==": [true, true] },
472 "tasks": [
473 {
474 "id": "prepare_data",
475 "name": "Prepare Data",
476 "function": {
477 "name": "map",
478 "input": {
479 "mappings": [
480 {
481 "path": "data.numbers",
482 "logic": { "var": "temp_data.measurements" }
483 },
484 {
485 "path": "data.user_id",
486 "logic": { "var": "temp_data.user_id" }
487 }
488 ]
489 }
490 }
491 },
492 {
493 "id": "calculate_stats",
494 "name": "Calculate Statistics",
495 "function": {
496 "name": "statistics",
497 "input": {
498 "data_path": "data.numbers",
499 "output_path": "data.analysis"
500 }
501 }
502 },
503 {
504 "id": "enrich_user_data",
505 "name": "Enrich User Data",
506 "function": {
507 "name": "enrich_data",
508 "input": {
509 "lookup_field": "user_id",
510 "lookup_value": "user_456",
511 "output_path": "data.employee_details"
512 }
513 }
514 }
515 ]
516 }
517 "#;
518
519 let workflow2 = Workflow::from_json(workflow2_json)?;
520 engine.add_workflow(&workflow2);
521
522 match engine.process_message(&mut message2).await {
523 Ok(_) => {
524 println!("✅ Second message processed successfully!\n");
525 println!("📊 Results for user_456:");
526 println!("{}", serde_json::to_string_pretty(&message2.data)?);
527 }
528 Err(e) => {
529 println!("❌ Error processing second message: {e:?}");
530 }
531 }
532
533 println!("\n🎉 Custom function examples completed!");
534
535 Ok(())
536}
Source § pub fn reload_workflows(&self, new_workflows: Vec<Workflow>) -> Result<()>
pub fn reload_workflows(&self, new_workflows: Vec<Workflow>) -> Result<()>
Reload all workflows atomically
Source § pub fn register_task_function(
&mut self,
name: String,
handler: Box<dyn AsyncFunctionHandler + Send + Sync>,
)
pub fn register_task_function( &mut self, name: String, handler: Box<dyn AsyncFunctionHandler + Send + Sync>, )
Registers a custom function handler with the engine.
§Arguments
name
- The name of the function handler
handler
- The function handler implementation
Examples found in repository
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319 println!("=== Custom Function Example ===\n");
320
321 // Create engine without built-in functions to demonstrate custom ones
322 let mut engine = Engine::new_empty();
323
324 // Register our custom functions
325 engine.register_task_function(
326 "statistics".to_string(),
327 Box::new(StatisticsFunction::new()),
328 );
329
330 engine.register_task_function(
331 "enrich_data".to_string(),
332 Box::new(DataEnrichmentFunction::new()),
333 );
334
335 // Also register built-in map function for data preparation
336 engine.register_task_function(
337 "map".to_string(),
338 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339 );
340
341 // Define a workflow that uses our custom functions
342 let workflow_json = r#"
343 {
344 "id": "custom_function_demo",
345 "name": "Custom Function Demo",
346 "description": "Demonstrates custom async functions in workflow",
347 "condition": { "==": [true, true] },
348 "tasks": [
349 {
350 "id": "prepare_data",
351 "name": "Prepare Data",
352 "description": "Extract and prepare data for analysis",
353 "function": {
354 "name": "map",
355 "input": {
356 "mappings": [
357 {
358 "path": "data.numbers",
359 "logic": { "var": "temp_data.measurements" }
360 },
361 {
362 "path": "data.user_id",
363 "logic": { "var": "temp_data.user_id" }
364 }
365 ]
366 }
367 }
368 },
369 {
370 "id": "calculate_stats",
371 "name": "Calculate Statistics",
372 "description": "Calculate statistical measures from numeric data",
373 "function": {
374 "name": "statistics",
375 "input": {
376 "data_path": "data.numbers",
377 "output_path": "data.stats"
378 }
379 }
380 },
381 {
382 "id": "enrich_user_data",
383 "name": "Enrich User Data",
384 "description": "Add additional user information",
385 "function": {
386 "name": "enrich_data",
387 "input": {
388 "lookup_field": "user_id",
389 "lookup_value": "user_123",
390 "output_path": "data.user_info"
391 }
392 }
393 }
394 ]
395 }
396 "#;
397
398 // Parse and add the workflow
399 let workflow = Workflow::from_json(workflow_json)?;
400 engine.add_workflow(&workflow);
401
402 // Create sample data
403 let sample_data = json!({
404 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405 "user_id": "user_123",
406 "timestamp": "2024-01-15T10:30:00Z"
407 });
408
409 // Create and process message
410 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411 message.temp_data = sample_data;
412 message.data = json!({});
413
414 println!("Processing message with custom functions...\n");
415
416 // Process the message through our custom workflow
417 match engine.process_message(&mut message).await {
418 Ok(_) => {
419 println!("✅ Message processed successfully!\n");
420
421 println!("📊 Final Results:");
422 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424 println!("📋 Audit Trail:");
425 for (i, audit) in message.audit_trail.iter().enumerate() {
426 println!(
427 "{}. Task: {} (Status: {})",
428 i + 1,
429 audit.task_id,
430 audit.status_code
431 );
432 println!(" Timestamp: {}", audit.timestamp);
433 println!(" Changes: {} field(s) modified", audit.changes.len());
434 }
435
436 if message.has_errors() {
437 println!("\n⚠️ Errors encountered:");
438 for error in &message.errors {
439 println!(
440 " - {}: {:?}",
441 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442 error.error_message
443 );
444 }
445 }
446 }
447 Err(e) => {
448 println!("❌ Error processing message: {e:?}");
449 }
450 }
451
452 // Demonstrate another example with different data
453 let separator = "=".repeat(50);
454 println!("\n{separator}");
455 println!("=== Second Example with Different User ===\n");
456
457 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458 message2.temp_data = json!({
459 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460 "user_id": "user_456",
461 "timestamp": "2024-01-15T11:00:00Z"
462 });
463 message2.data = json!({});
464
465 // Create a workflow for the second user
466 let workflow2_json = r#"
467 {
468 "id": "custom_function_demo_2",
469 "name": "Custom Function Demo 2",
470 "description": "Second demo with different user",
471 "condition": { "==": [true, true] },
472 "tasks": [
473 {
474 "id": "prepare_data",
475 "name": "Prepare Data",
476 "function": {
477 "name": "map",
478 "input": {
479 "mappings": [
480 {
481 "path": "data.numbers",
482 "logic": { "var": "temp_data.measurements" }
483 },
484 {
485 "path": "data.user_id",
486 "logic": { "var": "temp_data.user_id" }
487 }
488 ]
489 }
490 }
491 },
492 {
493 "id": "calculate_stats",
494 "name": "Calculate Statistics",
495 "function": {
496 "name": "statistics",
497 "input": {
498 "data_path": "data.numbers",
499 "output_path": "data.analysis"
500 }
501 }
502 },
503 {
504 "id": "enrich_user_data",
505 "name": "Enrich User Data",
506 "function": {
507 "name": "enrich_data",
508 "input": {
509 "lookup_field": "user_id",
510 "lookup_value": "user_456",
511 "output_path": "data.employee_details"
512 }
513 }
514 }
515 ]
516 }
517 "#;
518
519 let workflow2 = Workflow::from_json(workflow2_json)?;
520 engine.add_workflow(&workflow2);
521
522 match engine.process_message(&mut message2).await {
523 Ok(_) => {
524 println!("✅ Second message processed successfully!\n");
525 println!("📊 Results for user_456:");
526 println!("{}", serde_json::to_string_pretty(&message2.data)?);
527 }
528 Err(e) => {
529 println!("❌ Error processing second message: {e:?}");
530 }
531 }
532
533 println!("\n🎉 Custom function examples completed!");
534
535 Ok(())
536}
Source
pub fn has_function(&self, name: &str) -> bool
pub fn has_function(&self, name: &str) -> bool
Check if a function with the given name is registered
Source
pub async fn process_message(&self, message: &mut Message) -> Result<()>
pub async fn process_message(&self, message: &mut Message) -> Result<()>
Processes a message through workflows that match their conditions.
This async method:
- Iterates through workflows sequentially in deterministic order (sorted by ID)
- Evaluates conditions for each workflow right before execution
- Executes matching workflows one after another (not concurrently)
- Updates the message with processing results and audit trail
Workflows are executed sequentially because later workflows may depend on the results of earlier workflows, and their conditions may change based on modifications made by previous workflows.
§Arguments
message
- The message to process
§Returns
Result<()>
- Success or an error if processing failed
Examples found in repository?
210async fn run_sequential_benchmark(
211 engine: &Engine,
212 sample_user_data: &Value,
213 num_iterations: usize,
214) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
215 let mut total_duration = Duration::new(0, 0);
216 let mut min_duration = Duration::new(u64::MAX, 0);
217 let mut max_duration = Duration::new(0, 0);
218 let mut all_durations = Vec::with_capacity(num_iterations);
219 let mut error_count = 0;
220
221 // Sequential processing - one message at a time
222 for i in 0..num_iterations {
223 let mut message = Message::new(&json!({}));
224 message.temp_data = sample_user_data.clone();
225 message.data = json!({});
226 message.metadata = json!({
227 "timestamp": chrono::Utc::now().to_rfc3339(),
228 "iteration": i
229 });
230
231 let start = Instant::now();
232 match engine.process_message(&mut message).await {
233 Ok(_) => {
234 let duration = start.elapsed();
235 all_durations.push(duration);
236 total_duration += duration;
237 min_duration = min_duration.min(duration);
238 max_duration = max_duration.max(duration);
239
240 // Check for processing errors
241 if message.has_errors() {
242 error_count += 1;
243 if error_count <= 5 {
244 // Only print first 5 errors
245 println!("Processing errors in iteration {}: {:?}", i, message.errors);
246 }
247 }
248 }
249 Err(e) => {
250 error_count += 1;
251 if error_count <= 5 {
252 println!("Error in iteration {i}: {e:?}");
253 }
254 // Still record the time even for errors
255 let duration = start.elapsed();
256 all_durations.push(duration);
257 total_duration += duration;
258 min_duration = min_duration.min(duration);
259 max_duration = max_duration.max(duration);
260 }
261 }
262 }
263
264 if error_count > 0 {
265 println!("Total errors encountered: {error_count}");
266 }
267
268 // Sort durations for percentile calculations
269 all_durations.sort();
270
271 let p95_idx = (num_iterations as f64 * 0.95) as usize;
272 let p99_idx = (num_iterations as f64 * 0.99) as usize;
273 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
274 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
275 let avg_duration = total_duration / num_iterations as u32;
276
277 Ok(BenchmarkResults {
278 min_time: min_duration,
279 max_time: max_duration,
280 avg_time: avg_duration,
281 p95: *p95,
282 p99: *p99,
283 total_time: total_duration,
284 })
285}
286
287async fn run_concurrent_benchmark(
288 engine: Arc<Engine>,
289 sample_user_data: &Value,
290 num_iterations: usize,
291 concurrent_tasks: usize,
292) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
293 let start_time = Instant::now();
294 let mut all_durations = Vec::with_capacity(num_iterations);
295 let mut error_count = 0;
296
297 // Concurrent processing using JoinSet
298 let mut tasks = JoinSet::new();
299
300 for i in 0..num_iterations {
301 let engine_clone = engine.clone();
302 let data_clone = sample_user_data.clone();
303
304 // Spawn concurrent tasks
305 tasks.spawn(async move {
306 let mut message = Message::new(&json!({}));
307 message.temp_data = data_clone;
308 message.data = json!({});
309 message.metadata = json!({
310 "timestamp": chrono::Utc::now().to_rfc3339(),
311 "iteration": i
312 });
313
314 let msg_start = Instant::now();
315 let result = engine_clone.process_message(&mut message).await;
316 let duration = msg_start.elapsed();
317
318 (duration, result.is_ok(), message.has_errors())
319 });
320
321 // Limit concurrent tasks
322 while tasks.len() >= concurrent_tasks {
323 // Wait for at least one task to complete
324 if let Some(Ok((duration, ok, has_errors))) = tasks.join_next().await {
325 all_durations.push(duration);
326 if !ok || has_errors {
327 error_count += 1;
328 }
329 }
330 }
331 }
332
333 // Wait for remaining tasks
334 while let Some(Ok((duration, ok, has_errors))) = tasks.join_next().await {
335 all_durations.push(duration);
336 if !ok || has_errors {
337 error_count += 1;
338 }
339 }
340
341 let total_time = start_time.elapsed();
342
343 if error_count > 0 {
344 println!("Total errors encountered: {error_count}");
345 }
346
347 // Calculate statistics
348 all_durations.sort();
349 let min_duration = *all_durations.first().unwrap_or(&Duration::ZERO);
350 let max_duration = *all_durations.last().unwrap_or(&Duration::ZERO);
351 let sum: Duration = all_durations.iter().sum();
352 let avg_duration = sum / all_durations.len() as u32;
353
354 let p95_idx = (all_durations.len() as f64 * 0.95) as usize;
355 let p99_idx = (all_durations.len() as f64 * 0.99) as usize;
356 let p95 = *all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
357 let p99 = *all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
358
359 Ok(BenchmarkResults {
360 min_time: min_duration,
361 max_time: max_duration,
362 avg_time: avg_duration,
363 p95,
364 p99,
365 total_time,
366 })
367}
More examples
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {e:?}");
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319 println!("=== Custom Function Example ===\n");
320
321 // Create engine without built-in functions to demonstrate custom ones
322 let mut engine = Engine::new_empty();
323
324 // Register our custom functions
325 engine.register_task_function(
326 "statistics".to_string(),
327 Box::new(StatisticsFunction::new()),
328 );
329
330 engine.register_task_function(
331 "enrich_data".to_string(),
332 Box::new(DataEnrichmentFunction::new()),
333 );
334
335 // Also register built-in map function for data preparation
336 engine.register_task_function(
337 "map".to_string(),
338 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339 );
340
341 // Define a workflow that uses our custom functions
342 let workflow_json = r#"
343 {
344 "id": "custom_function_demo",
345 "name": "Custom Function Demo",
346 "description": "Demonstrates custom async functions in workflow",
347 "condition": { "==": [true, true] },
348 "tasks": [
349 {
350 "id": "prepare_data",
351 "name": "Prepare Data",
352 "description": "Extract and prepare data for analysis",
353 "function": {
354 "name": "map",
355 "input": {
356 "mappings": [
357 {
358 "path": "data.numbers",
359 "logic": { "var": "temp_data.measurements" }
360 },
361 {
362 "path": "data.user_id",
363 "logic": { "var": "temp_data.user_id" }
364 }
365 ]
366 }
367 }
368 },
369 {
370 "id": "calculate_stats",
371 "name": "Calculate Statistics",
372 "description": "Calculate statistical measures from numeric data",
373 "function": {
374 "name": "statistics",
375 "input": {
376 "data_path": "data.numbers",
377 "output_path": "data.stats"
378 }
379 }
380 },
381 {
382 "id": "enrich_user_data",
383 "name": "Enrich User Data",
384 "description": "Add additional user information",
385 "function": {
386 "name": "enrich_data",
387 "input": {
388 "lookup_field": "user_id",
389 "lookup_value": "user_123",
390 "output_path": "data.user_info"
391 }
392 }
393 }
394 ]
395 }
396 "#;
397
398 // Parse and add the workflow
399 let workflow = Workflow::from_json(workflow_json)?;
400 engine.add_workflow(&workflow);
401
402 // Create sample data
403 let sample_data = json!({
404 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405 "user_id": "user_123",
406 "timestamp": "2024-01-15T10:30:00Z"
407 });
408
409 // Create and process message
410 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411 message.temp_data = sample_data;
412 message.data = json!({});
413
414 println!("Processing message with custom functions...\n");
415
416 // Process the message through our custom workflow
417 match engine.process_message(&mut message).await {
418 Ok(_) => {
419 println!("✅ Message processed successfully!\n");
420
421 println!("📊 Final Results:");
422 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424 println!("📋 Audit Trail:");
425 for (i, audit) in message.audit_trail.iter().enumerate() {
426 println!(
427 "{}. Task: {} (Status: {})",
428 i + 1,
429 audit.task_id,
430 audit.status_code
431 );
432 println!(" Timestamp: {}", audit.timestamp);
433 println!(" Changes: {} field(s) modified", audit.changes.len());
434 }
435
436 if message.has_errors() {
437 println!("\n⚠️ Errors encountered:");
438 for error in &message.errors {
439 println!(
440 " - {}: {:?}",
441 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442 error.error_message
443 );
444 }
445 }
446 }
447 Err(e) => {
448 println!("❌ Error processing message: {e:?}");
449 }
450 }
451
452 // Demonstrate another example with different data
453 let separator = "=".repeat(50);
454 println!("\n{separator}");
455 println!("=== Second Example with Different User ===\n");
456
457 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458 message2.temp_data = json!({
459 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460 "user_id": "user_456",
461 "timestamp": "2024-01-15T11:00:00Z"
462 });
463 message2.data = json!({});
464
465 // Create a workflow for the second user
466 let workflow2_json = r#"
467 {
468 "id": "custom_function_demo_2",
469 "name": "Custom Function Demo 2",
470 "description": "Second demo with different user",
471 "condition": { "==": [true, true] },
472 "tasks": [
473 {
474 "id": "prepare_data",
475 "name": "Prepare Data",
476 "function": {
477 "name": "map",
478 "input": {
479 "mappings": [
480 {
481 "path": "data.numbers",
482 "logic": { "var": "temp_data.measurements" }
483 },
484 {
485 "path": "data.user_id",
486 "logic": { "var": "temp_data.user_id" }
487 }
488 ]
489 }
490 }
491 },
492 {
493 "id": "calculate_stats",
494 "name": "Calculate Statistics",
495 "function": {
496 "name": "statistics",
497 "input": {
498 "data_path": "data.numbers",
499 "output_path": "data.analysis"
500 }
501 }
502 },
503 {
504 "id": "enrich_user_data",
505 "name": "Enrich User Data",
506 "function": {
507 "name": "enrich_data",
508 "input": {
509 "lookup_field": "user_id",
510 "lookup_value": "user_456",
511 "output_path": "data.employee_details"
512 }
513 }
514 }
515 ]
516 }
517 "#;
518
519 let workflow2 = Workflow::from_json(workflow2_json)?;
520 engine.add_workflow(&workflow2);
521
522 match engine.process_message(&mut message2).await {
523 Ok(_) => {
524 println!("✅ Second message processed successfully!\n");
525 println!("📊 Results for user_456:");
526 println!("{}", serde_json::to_string_pretty(&message2.data)?);
527 }
528 Err(e) => {
529 println!("❌ Error processing second message: {e:?}");
530 }
531 }
532
533 println!("\n🎉 Custom function examples completed!");
534
535 Ok(())
536}
Source
pub async fn process_message_concurrent(
&self,
message: &mut Message,
) -> Result<()>
pub async fn process_message_concurrent( &self, message: &mut Message, ) -> Result<()>
Process a message with automatic concurrency control. This method will wait if the maximum concurrency level is reached.
Use this when spawning concurrent tasks to ensure the engine’s concurrency limit is respected.
§Example
use tokio::task::JoinSet;
let mut tasks = JoinSet::new();
for msg in messages {
let engine = engine.clone();
tasks.spawn(async move {
engine.process_message_concurrent(msg).await
});
}