pub struct Workflow {
pub id: String,
pub name: String,
pub priority: u32,
pub description: Option<String>,
pub condition: Option<Value>,
pub tasks: Vec<Task>,
}
Fields§
§id: String
§name: String
§priority: u32
§description: Option<String>
§condition: Option<Value>
§tasks: Vec<Task>
Implementations§
Source§impl Workflow
impl Workflow
pub fn new() -> Self
Sourcepub fn from_json(json_str: &str) -> Result<Self>
pub fn from_json(json_str: &str) -> Result<Self>
Load workflow from JSON string
Examples found in repository?
examples/complete_workflow.rs (line 132)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {e:?}");
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
More examples
examples/benchmark.rs (line 78)
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15 println!("========================================");
16 println!("DATAFLOW ENGINE BENCHMARK");
17 println!("========================================\n");
18
19 // Define a workflow that:
20 // 1. Uses pre-loaded data instead of HTTP fetch
21 // 2. Enriches the message with transformed data
22 // 3. Demonstrates proper async workflow execution
23 let workflow_json = r#"
24 {
25 "id": "benchmark_workflow",
26 "name": "Benchmark Workflow Example",
27 "description": "Demonstrates async workflow execution with data transformation",
28 "priority": 1,
29 "condition": { "==": [true, true] },
30 "tasks": [
31 {
32 "id": "transform_data",
33 "name": "Transform Data",
34 "description": "Map API response to our data model",
35 "function": {
36 "name": "map",
37 "input": {
38 "mappings": [
39 {
40 "path": "data.user.id",
41 "logic": { "var": "temp_data.body.id" }
42 },
43 {
44 "path": "data.user.name",
45 "logic": { "var": "temp_data.body.name" }
46 },
47 {
48 "path": "data.user.email",
49 "logic": { "var": "temp_data.body.email" }
50 },
51 {
52 "path": "data.user.address",
53 "logic": {
54 "cat": [
55 { "var": "temp_data.body.address.street" },
56 ", ",
57 { "var": "temp_data.body.address.city" }
58 ]
59 }
60 },
61 {
62 "path": "data.user.company",
63 "logic": { "var": "temp_data.body.company.name" }
64 },
65 {
66 "path": "data.processed_at",
67 "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
68 }
69 ]
70 }
71 }
72 }
73 ]
74 }
75 "#;
76
77 // Parse the workflow
78 let workflow = Workflow::from_json(workflow_json)?;
79
80 // Create sample user data (similar to what the HTTP endpoint would return)
81 let sample_user_data = json!({
82 "body": {
83 "id": 1,
84 "name": "Leanne Graham",
85 "username": "Bret",
86 "email": "Sincere@april.biz",
87 "address": {
88 "street": "Kulas Light",
89 "suite": "Apt. 556",
90 "city": "Gwenborough",
91 "zipcode": "92998-3874",
92 "geo": {
93 "lat": "-37.3159",
94 "lng": "81.1496"
95 }
96 },
97 "phone": "1-770-736-8031 x56442",
98 "website": "hildegard.org",
99 "company": {
100 "name": "Romaguera-Crona",
101 "catchPhrase": "Multi-layered client-server neural-net",
102 "bs": "harness real-time e-markets"
103 }
104 }
105 });
106
107 let iterations = 100000;
108
109 println!("Testing with {} iterations\n", iterations);
110
111 // Test sequential performance with concurrency 1
112 println!("--- Sequential Performance (Baseline) ---");
113 println!("Concurrency | Avg Time per Message | Total Time | Messages/sec");
114 println!("------------|---------------------|------------|-------------");
115
116 // Sequential baseline
117 let engine = Arc::new(Engine::with_concurrency(1));
118 engine.add_workflow(&workflow);
119
120 let seq_results = run_sequential_benchmark(&*engine, &sample_user_data, iterations).await?;
121 let throughput = (iterations as f64) / seq_results.total_time.as_secs_f64();
122
123 println!(
124 "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0}",
125 1,
126 seq_results.avg_time.as_secs_f64() * 1_000_000.0,
127 seq_results.total_time.as_secs_f64() * 1000.0,
128 throughput
129 );
130
131 log_benchmark_results(
132 iterations,
133 seq_results.min_time,
134 seq_results.max_time,
135 seq_results.avg_time,
136 seq_results.p95,
137 seq_results.p99,
138 seq_results.total_time,
139 format!("seq_x1"),
140 )?;
141
142 // Test concurrent performance
143 println!("\n--- Concurrent Performance ---");
144 println!("Concurrency | Avg Time per Message | Total Time | Messages/sec | Speedup");
145 println!("------------|---------------------|------------|--------------|--------");
146
147 // Test configurations with different concurrency levels
148 let test_configs = vec![
149 1, 16, // 16 concurrent messages with 16 DataLogic instances
150 32, // 32 concurrent messages with 32 DataLogic instances
151 64, // 64 concurrent messages with 64 DataLogic instances
152 128, // 128 concurrent messages with 128 DataLogic instances
153 ];
154
155 let baseline_throughput = throughput;
156
157 for concurrency in test_configs {
158 let engine = Arc::new(Engine::with_concurrency(concurrency));
159 engine.add_workflow(&workflow);
160
161 let con_results = run_concurrent_benchmark(
162 engine.clone(),
163 &sample_user_data,
164 iterations,
165 concurrency, // Use same value for concurrent tasks
166 )
167 .await?;
168
169 let throughput = (iterations as f64) / con_results.total_time.as_secs_f64();
170 let speedup = throughput / baseline_throughput;
171
172 println!(
173 "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0} | {:>7.2}x",
174 concurrency,
175 con_results.avg_time.as_secs_f64() * 1_000_000.0,
176 con_results.total_time.as_secs_f64() * 1000.0,
177 throughput,
178 speedup
179 );
180
181 log_benchmark_results(
182 iterations,
183 con_results.min_time,
184 con_results.max_time,
185 con_results.avg_time,
186 con_results.p95,
187 con_results.p99,
188 con_results.total_time,
189 format!("con_x{}", concurrency),
190 )?;
191 }
192
193 println!("\n========================================");
194 println!("Benchmark results saved to '{}'", BENCHMARK_LOG_FILE);
195 println!("========================================");
196
197 Ok(())
198}
examples/custom_function.rs (line 399)
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319 println!("=== Custom Function Example ===\n");
320
321 // Create engine without built-in functions to demonstrate custom ones
322 let mut engine = Engine::new_empty();
323
324 // Register our custom functions
325 engine.register_task_function(
326 "statistics".to_string(),
327 Box::new(StatisticsFunction::new()),
328 );
329
330 engine.register_task_function(
331 "enrich_data".to_string(),
332 Box::new(DataEnrichmentFunction::new()),
333 );
334
335 // Also register built-in map function for data preparation
336 engine.register_task_function(
337 "map".to_string(),
338 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339 );
340
341 // Define a workflow that uses our custom functions
342 let workflow_json = r#"
343 {
344 "id": "custom_function_demo",
345 "name": "Custom Function Demo",
346 "description": "Demonstrates custom async functions in workflow",
347 "condition": { "==": [true, true] },
348 "tasks": [
349 {
350 "id": "prepare_data",
351 "name": "Prepare Data",
352 "description": "Extract and prepare data for analysis",
353 "function": {
354 "name": "map",
355 "input": {
356 "mappings": [
357 {
358 "path": "data.numbers",
359 "logic": { "var": "temp_data.measurements" }
360 },
361 {
362 "path": "data.user_id",
363 "logic": { "var": "temp_data.user_id" }
364 }
365 ]
366 }
367 }
368 },
369 {
370 "id": "calculate_stats",
371 "name": "Calculate Statistics",
372 "description": "Calculate statistical measures from numeric data",
373 "function": {
374 "name": "statistics",
375 "input": {
376 "data_path": "data.numbers",
377 "output_path": "data.stats"
378 }
379 }
380 },
381 {
382 "id": "enrich_user_data",
383 "name": "Enrich User Data",
384 "description": "Add additional user information",
385 "function": {
386 "name": "enrich_data",
387 "input": {
388 "lookup_field": "user_id",
389 "lookup_value": "user_123",
390 "output_path": "data.user_info"
391 }
392 }
393 }
394 ]
395 }
396 "#;
397
398 // Parse and add the workflow
399 let workflow = Workflow::from_json(workflow_json)?;
400 engine.add_workflow(&workflow);
401
402 // Create sample data
403 let sample_data = json!({
404 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405 "user_id": "user_123",
406 "timestamp": "2024-01-15T10:30:00Z"
407 });
408
409 // Create and process message
410 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411 message.temp_data = sample_data;
412 message.data = json!({});
413
414 println!("Processing message with custom functions...\n");
415
416 // Process the message through our custom workflow
417 match engine.process_message(&mut message).await {
418 Ok(_) => {
419 println!("✅ Message processed successfully!\n");
420
421 println!("📊 Final Results:");
422 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424 println!("📋 Audit Trail:");
425 for (i, audit) in message.audit_trail.iter().enumerate() {
426 println!(
427 "{}. Task: {} (Status: {})",
428 i + 1,
429 audit.task_id,
430 audit.status_code
431 );
432 println!(" Timestamp: {}", audit.timestamp);
433 println!(" Changes: {} field(s) modified", audit.changes.len());
434 }
435
436 if message.has_errors() {
437 println!("\n⚠️ Errors encountered:");
438 for error in &message.errors {
439 println!(
440 " - {}: {:?}",
441 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442 error.error_message
443 );
444 }
445 }
446 }
447 Err(e) => {
448 println!("❌ Error processing message: {e:?}");
449 }
450 }
451
452 // Demonstrate another example with different data
453 let separator = "=".repeat(50);
454 println!("\n{separator}");
455 println!("=== Second Example with Different User ===\n");
456
457 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458 message2.temp_data = json!({
459 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460 "user_id": "user_456",
461 "timestamp": "2024-01-15T11:00:00Z"
462 });
463 message2.data = json!({});
464
465 // Create a workflow for the second user
466 let workflow2_json = r#"
467 {
468 "id": "custom_function_demo_2",
469 "name": "Custom Function Demo 2",
470 "description": "Second demo with different user",
471 "condition": { "==": [true, true] },
472 "tasks": [
473 {
474 "id": "prepare_data",
475 "name": "Prepare Data",
476 "function": {
477 "name": "map",
478 "input": {
479 "mappings": [
480 {
481 "path": "data.numbers",
482 "logic": { "var": "temp_data.measurements" }
483 },
484 {
485 "path": "data.user_id",
486 "logic": { "var": "temp_data.user_id" }
487 }
488 ]
489 }
490 }
491 },
492 {
493 "id": "calculate_stats",
494 "name": "Calculate Statistics",
495 "function": {
496 "name": "statistics",
497 "input": {
498 "data_path": "data.numbers",
499 "output_path": "data.analysis"
500 }
501 }
502 },
503 {
504 "id": "enrich_user_data",
505 "name": "Enrich User Data",
506 "function": {
507 "name": "enrich_data",
508 "input": {
509 "lookup_field": "user_id",
510 "lookup_value": "user_456",
511 "output_path": "data.employee_details"
512 }
513 }
514 }
515 ]
516 }
517 "#;
518
519 let workflow2 = Workflow::from_json(workflow2_json)?;
520 engine.add_workflow(&workflow2);
521
522 match engine.process_message(&mut message2).await {
523 Ok(_) => {
524 println!("✅ Second message processed successfully!\n");
525 println!("📊 Results for user_456:");
526 println!("{}", serde_json::to_string_pretty(&message2.data)?);
527 }
528 Err(e) => {
529 println!("❌ Error processing second message: {e:?}");
530 }
531 }
532
533 println!("\n🎉 Custom function examples completed!");
534
535 Ok(())
536}
Trait Implementations§
Source§impl<'de> Deserialize<'de> for Workflow
impl<'de> Deserialize<'de> for Workflow
Source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
Auto Trait Implementations§
impl Freeze for Workflow
impl RefUnwindSafe for Workflow
impl Send for Workflow
impl Sync for Workflow
impl Unpin for Workflow
impl UnwindSafe for Workflow
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more