Workflow

Struct Workflow 

Source
pub struct Workflow {
    pub id: String,
    pub name: String,
    pub priority: u32,
    pub description: Option<String>,
    pub condition: Value,
    pub condition_index: Option<usize>,
    pub tasks: Vec<Task>,
}
Expand description

A `Workflow` represents a collection of tasks that execute sequentially.

Fields§

§id: String
§name: String
§priority: u32
§description: Option<String>
§condition: Value
§condition_index: Option<usize>
§tasks: Vec<Task>

Implementations§

Source§

impl Workflow

Source

pub fn new() -> Self

Source

pub fn from_json(json_str: &str) -> Result<Self>

Load a workflow from a JSON string.

Examples found in repository?
examples/complete_workflow.rs (line 112)
4fn main() -> Result<(), Box<dyn std::error::Error>> {
5    // Define a workflow that:
6    // 1. Prepares sample user data
7    // 2. Enriches the message with transformed data
8    // 3. Validates the enriched data
9    let workflow_json = r#"
10    {
11        "id": "complete_workflow",
12        "name": "Complete Workflow Example",
13        "priority": 0,
14        "description": "Demonstrates enrich -> validate flow",
15        "tasks": [
16            {
17                "id": "initialize_user",
18                "name": "Initialize User Structure",
19                "description": "Create empty user object in data",
20                "function": {
21                    "name": "map",
22                    "input": {
23                        "mappings": [
24                            {
25                                "path": "data.user",
26                                "logic": {}
27                            }
28                        ]
29                    }
30                }
31            },
32            {
33                "id": "transform_data",
34                "name": "Transform Data",
35                "description": "Map API response to our data model",
36                "function": {
37                    "name": "map",
38                    "input": {
39                        "mappings": [
40                            {
41                                "path": "data.user.id", 
42                                "logic": { "var": "temp_data.body.id" }
43                            },
44                            {
45                                "path": "data.user.name", 
46                                "logic": { "var": "temp_data.body.name" }
47                            },
48                            {
49                                "path": "data.user.email", 
50                                "logic": { "var": "temp_data.body.email" }
51                            },
52                            {
53                                "path": "data.user.address", 
54                                "logic": {
55                                    "cat": [
56                                        { "var": "temp_data.body.address.street" },
57                                        ", ",
58                                        { "var": "temp_data.body.address.city" }
59                                    ]
60                                }
61                            },
62                            {
63                                "path": "data.user.company", 
64                                "logic": { "var": "temp_data.body.company.name" }
65                            }
66                        ]
67                    }
68                }
69            },
70            {
71                "id": "validate_user_data",
72                "name": "Validate User Data",
73                "description": "Ensure the user data meets our requirements",
74                "function": {
75                    "name": "validate",
76                    "input": {
77                        "rules": [
78                            {
79                                "path": "data",
80                                "logic": { "!!": { "var": "data.user.id" } },
81                                "message": "User ID is required"
82                            },
83                            {
84                                "path": "data",
85                                "logic": { "!!": { "var": "data.user.name" } },
86                                "message": "User name is required"
87                            },
88                            {
89                                "path": "data",
90                                "logic": { "!!": { "var": "data.user.email" } },
91                                "message": "User email is required"
92                            },
93                            {
94                                "path": "data",
95                                "logic": {
96                                    "in": [
97                                        "@",
98                                        { "var": "data.user.email" }
99                                    ]
100                                },
101                                "message": "Email must be valid format"
102                            }
103                        ]
104                    }
105                }
106            }
107        ]
108    }
109    "#;
110
111    // Parse the workflow
112    let workflow = Workflow::from_json(workflow_json)?;
113
114    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
115    let engine = Engine::new(vec![workflow], None, None);
116
117    // Create a message to process with sample user data
118    let mut message = Message::new(&json!({}));
119
120    // Add sample user data to temp_data (simulating what would come from an API)
121    message.temp_data = json!({
122        "body": {
123            "id": 1,
124            "name": "John Doe",
125            "email": "john.doe@example.com",
126            "address": {
127                "street": "123 Main St",
128                "city": "New York"
129            },
130            "company": {
131                "name": "Acme Corp"
132            }
133        }
134    });
135
136    // Process the message through the workflow
137    println!("Processing message through workflow...");
138
139    match engine.process_message(&mut message) {
140        Ok(_) => {
141            println!("Workflow completed successfully!");
142        }
143        Err(e) => {
144            eprintln!("Error executing workflow: {e:?}");
145            if !message.errors.is_empty() {
146                println!("\nErrors recorded in message:");
147                for err in &message.errors {
148                    println!(
149                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
150                        err.workflow_id, err.task_id, err.error_message
151                    );
152                }
153            }
154        }
155    }
156
157    println!(
158        "\nFull message structure:\n{}",
159        serde_json::to_string_pretty(&message)?
160    );
161
162    Ok(())
163}
More examples
Hide additional examples
examples/benchmark.rs (line 89)
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8    println!("========================================");
9    println!("DATAFLOW ENGINE BENCHMARK");
10    println!("========================================\n");
11    println!(
12        "Running {} iterations on single-threaded engine\n",
13        ITERATIONS
14    );
15
16    // Define a simple workflow with data transformation
17    let workflow_json = r#"
18    {
19        "id": "benchmark_workflow",
20        "name": "Benchmark Workflow",
21        "description": "Simple workflow for performance testing",
22        "priority": 1,
23        "tasks": [
24            {
25                "id": "transform_data",
26                "name": "Transform Data",
27                "description": "Map data fields",
28                "function": {
29                    "name": "map",
30                    "input": {
31                        "mappings": [
32                            {
33                                "path": "data.user.id", 
34                                "logic": { "var": "temp_data.id" }
35                            },
36                            {
37                                "path": "data.user.name", 
38                                "logic": { "var": "temp_data.name" }
39                            },
40                            {
41                                "path": "data.user.email", 
42                                "logic": { "var": "temp_data.email" }
43                            },
44                            {
45                                "path": "data.user.age",
46                                "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47                            },
48                            {
49                                "path": "data.user.status",
50                                "logic": { 
51                                    "if": [
52                                        { ">": [{ "var": "temp_data.age" }, 18] },
53                                        "adult",
54                                        "minor"
55                                    ]
56                                }
57                            }
58                        ]
59                    }
60                }
61            },
62            {
63                "id": "validate_data",
64                "name": "Validate Data",
65                "description": "Validate transformed data",
66                "function": {
67                    "name": "validate",
68                    "input": {
69                        "rules": [
70                            {
71                                "path": "data",
72                                "logic": { "!!": { "var": "data.user.id" } },
73                                "message": "User ID is required"
74                            },
75                            {
76                                "path": "data",
77                                "logic": { "!!": { "var": "data.user.email" } },
78                                "message": "User email is required"
79                            }
80                        ]
81                    }
82                }
83            }
84        ]
85    }
86    "#;
87
88    // Parse the workflow
89    let workflow = Workflow::from_json(workflow_json)?;
90
91    // Create the engine with built-in functions
92    let engine = Engine::new(vec![workflow], None, None);
93
94    // Sample data for benchmarking
95    let sample_data = json!({
96        "id": 12345,
97        "name": "John Doe",
98        "email": "john.doe@example.com",
99        "age": 25,
100        "department": "Engineering"
101    });
102
103    // Warm-up run
104    println!("Warming up...");
105    for _ in 0..1000 {
106        let mut message = Message::new(&json!({}));
107        message.temp_data = sample_data.clone();
108        let _ = engine.process_message(&mut message);
109    }
110
111    // Benchmark run
112    println!("Starting benchmark...\n");
113
114    let mut all_durations = Vec::with_capacity(ITERATIONS);
115    let mut success_count = 0;
116    let mut error_count = 0;
117
118    let benchmark_start = Instant::now();
119
120    for i in 0..ITERATIONS {
121        let mut message = Message::new(&json!({}));
122        message.temp_data = sample_data.clone();
123        message.metadata = json!({
124            "iteration": i,
125            "timestamp": chrono::Utc::now().to_rfc3339()
126        });
127
128        let iteration_start = Instant::now();
129        match engine.process_message(&mut message) {
130            Ok(_) => {
131                success_count += 1;
132                if message.has_errors() {
133                    error_count += 1;
134                }
135            }
136            Err(_) => {
137                error_count += 1;
138            }
139        }
140        let iteration_duration = iteration_start.elapsed();
141        all_durations.push(iteration_duration);
142
143        // Progress indicator every 10k iterations
144        if (i + 1) % 10000 == 0 {
145            print!(".");
146            use std::io::Write;
147            std::io::stdout().flush()?;
148        }
149    }
150
151    let total_time = benchmark_start.elapsed();
152    println!("\n\nBenchmark Complete!");
153    println!("==========================================\n");
154
155    // Calculate statistics
156    all_durations.sort_unstable();
157    let p50 = all_durations[ITERATIONS * 50 / 100];
158    let p90 = all_durations[ITERATIONS * 90 / 100];
159    let p95 = all_durations[ITERATIONS * 95 / 100];
160    let p99 = all_durations[ITERATIONS * 99 / 100];
161    let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163    // Display results
164    println!("📊 PERFORMANCE METRICS");
165    println!("──────────────────────────────────────────");
166    println!("Total iterations:    {:>10}", ITERATIONS);
167    println!("Successful:          {:>10}", success_count);
168    println!("Errors:              {:>10}", error_count);
169    println!(
170        "Total time:          {:>10.3} seconds",
171        total_time.as_secs_f64()
172    );
173    println!();
174
175    println!("Messages/second:     {:>10.0}", throughput);
176    println!();
177
178    println!("📉 LATENCY PERCENTILES");
179    println!("──────────────────────────────────────────");
180    println!("P50:                 {:>10.3} μs", p50.as_micros() as f64);
181    println!("P90:                 {:>10.3} μs", p90.as_micros() as f64);
182    println!("P95:                 {:>10.3} μs", p95.as_micros() as f64);
183    println!("P99:                 {:>10.3} μs", p99.as_micros() as f64);
184    println!();
185
186    Ok(())
187}
examples/custom_function.rs (line 394)
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335    println!("=== Custom Function Example ===\n");
336
337    // Define a workflow that uses our custom functions
338    let workflow_json = r#"
339    {
340        "id": "custom_function_demo",
341        "name": "Custom Function Demo",
342        "description": "Demonstrates custom functions in workflow",
343        "tasks": [
344            {
345                "id": "prepare_data",
346                "name": "Prepare Data",
347                "description": "Extract and prepare data for analysis",
348                "function": {
349                    "name": "map",
350                    "input": {
351                        "mappings": [
352                            {
353                                "path": "data.numbers",
354                                "logic": { "var": "temp_data.measurements" }
355                            },
356                            {
357                                "path": "data.user_id",
358                                "logic": { "var": "temp_data.user_id" }
359                            }
360                        ]
361                    }
362                }
363            },
364            {
365                "id": "calculate_stats",
366                "name": "Calculate Statistics",
367                "description": "Calculate statistical measures from numeric data",
368                "function": {
369                    "name": "statistics",
370                    "input": {
371                        "data_path": "data.numbers",
372                        "output_path": "data.stats"
373                    }
374                }
375            },
376            {
377                "id": "enrich_user_data",
378                "name": "Enrich User Data",
379                "description": "Add additional user information",
380                "function": {
381                    "name": "enrich_data",
382                    "input": {
383                        "lookup_field": "user_id",
384                        "lookup_value": "user_123",
385                        "output_path": "data.user_info"
386                    }
387                }
388            }
389        ]
390    }
391    "#;
392
393    // Parse the first workflow
394    let workflow = Workflow::from_json(workflow_json)?;
395
396    // Demonstrate another example with different data
397    let separator = "=".repeat(50);
398    println!("\n{separator}");
399    println!("=== Second Example with Different User ===\n");
400
401    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402    message2.temp_data = json!({
403        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404        "user_id": "user_456",
405        "timestamp": "2024-01-15T11:00:00Z"
406    });
407    message2.data = json!({});
408
409    // Create a workflow for the second user
410    let workflow2_json = r#"
411    {
412        "id": "custom_function_demo_2",
413        "name": "Custom Function Demo 2",
414        "description": "Second demo with different user",
415        "tasks": [
416            {
417                "id": "prepare_data",
418                "name": "Prepare Data",
419                "function": {
420                    "name": "map",
421                    "input": {
422                        "mappings": [
423                            {
424                                "path": "data.numbers",
425                                "logic": { "var": "temp_data.measurements" }
426                            },
427                            {
428                                "path": "data.user_id",
429                                "logic": { "var": "temp_data.user_id" }
430                            }
431                        ]
432                    }
433                }
434            },
435            {
436                "id": "calculate_stats",
437                "name": "Calculate Statistics",
438                "function": {
439                    "name": "statistics",
440                    "input": {
441                        "data_path": "data.numbers",
442                        "output_path": "data.analysis"
443                    }
444                }
445            },
446            {
447                "id": "enrich_user_data",
448                "name": "Enrich User Data",
449                "function": {
450                    "name": "enrich_data",
451                    "input": {
452                        "lookup_field": "user_id",
453                        "lookup_value": "user_456",
454                        "output_path": "data.employee_details"
455                    }
456                }
457            }
458        ]
459    }
460    "#;
461
462    let workflow2 = Workflow::from_json(workflow2_json)?;
463
464    // Prepare custom functions
465    let mut custom_functions = HashMap::new();
466    custom_functions.insert(
467        "statistics".to_string(),
468        Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469    );
470    custom_functions.insert(
471        "enrich_data".to_string(),
472        Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473    );
474    // Note: map and validate are now built-in to the Engine and will be used automatically
475
476    // Create engine with custom functions and built-ins (map/validate are always included internally)
477    let engine = Engine::new(
478        vec![workflow, workflow2],
479        Some(custom_functions),
480        None, // Use default (includes built-ins)
481    );
482
483    // Create sample data for first message
484    let sample_data = json!({
485        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
486        "user_id": "user_123",
487        "timestamp": "2024-01-15T10:30:00Z"
488    });
489
490    // Create and process first message
491    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
492    message.temp_data = sample_data;
493    message.data = json!({});
494
495    println!("Processing message with custom functions...\n");
496
497    // Process the message through our custom workflow
498    match engine.process_message(&mut message) {
499        Ok(_) => {
500            println!("✅ Message processed successfully!\n");
501
502            println!("📊 Final Results:");
503            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
504
505            println!("📋 Audit Trail:");
506            for (i, audit) in message.audit_trail.iter().enumerate() {
507                println!(
508                    "{}. Task: {} (Status: {})",
509                    i + 1,
510                    audit.task_id,
511                    audit.status_code
512                );
513                println!("   Timestamp: {}", audit.timestamp);
514                println!("   Changes: {} field(s) modified", audit.changes.len());
515            }
516
517            if message.has_errors() {
518                println!("\n⚠️  Errors encountered:");
519                for error in &message.errors {
520                    println!(
521                        "   - {}: {:?}",
522                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
523                        error.error_message
524                    );
525                }
526            }
527        }
528        Err(e) => {
529            println!("❌ Error processing message: {e:?}");
530        }
531    }
532
533    // Process second message
534    match engine.process_message(&mut message2) {
535        Ok(_) => {
536            println!("✅ Second message processed successfully!\n");
537            println!("📊 Results for user_456:");
538            println!("{}", serde_json::to_string_pretty(&message2.data)?);
539        }
540        Err(e) => {
541            println!("❌ Error processing second message: {e:?}");
542        }
543    }
544
545    println!("\n🎉 Custom function examples completed!");
546
547    Ok(())
548}
Source

pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self>

Load a workflow from a JSON file.

Source

pub fn validate(&self) -> Result<()>

Validate the workflow structure

Trait Implementations§

Source§

impl Clone for Workflow

Source§

fn clone(&self) -> Workflow

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Workflow

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Default for Workflow

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl<'de> Deserialize<'de> for Workflow

Source§

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API (clone_to_uninit).
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,