Struct Workflow

pub struct Workflow {
    pub id: String,
    pub name: String,
    pub priority: u32,
    pub description: Option<String>,
    pub condition: Option<Value>,
    pub tasks: Vec<Task>,
}

Fields

id: String
name: String
priority: u32
description: Option<String>
condition: Option<Value>
tasks: Vec<Task>
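
A minimal sketch of constructing the struct directly (assuming the type is re-exported as dataflow_rs::Workflow and that Value here is serde_json::Value, as in the repository examples below):

use dataflow_rs::Workflow; // import path assumed
use serde_json::json;

fn main() {
    // Every field is public, so a workflow can be built as a plain struct literal.
    let workflow = Workflow {
        id: "example_workflow".to_string(),
        name: "Example Workflow".to_string(),
        priority: 0,
        description: Some("Built directly as a struct literal".to_string()),
        condition: Some(json!({ "==": [true, true] })),
        tasks: Vec::new(),
    };
    assert!(workflow.tasks.is_empty());
}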

Implementations

impl Workflow

pub fn new() -> Self
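
There is no doc comment on new, so the sketch below assumes it returns an empty workflow equivalent to Workflow::default() (the type implements Default, see below), whose public fields are then filled in by hand:

use dataflow_rs::Workflow; // import path assumed

fn main() {
    // Assumption: `new()` yields an empty/default workflow.
    let mut workflow = Workflow::new();
    workflow.id = "adhoc_workflow".to_string();
    workflow.name = "Ad-hoc Workflow".to_string();
    workflow.priority = 10;
    assert_eq!(workflow.priority, 10);
}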

pub fn from_json(json_str: &str) -> Result<Self>

Loads a workflow from a JSON string.
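
A minimal, self-contained sketch is shown below; fuller examples from the repository follow. The JSON layout mirrors those examples, and whether an empty tasks array is accepted without further validation is an assumption:

use dataflow_rs::Workflow; // import path assumed

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let workflow_json = r#"
    {
        "id": "minimal_workflow",
        "name": "Minimal Workflow",
        "priority": 0,
        "tasks": []
    }
    "#;

    // Parse the JSON definition into a Workflow value.
    let workflow = Workflow::from_json(workflow_json)?;
    assert_eq!(workflow.id, "minimal_workflow");
    Ok(())
}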

Examples found in repository
examples/complete_workflow.rs (line 132)
5 async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165 }
More examples
examples/benchmark.rs (line 78)
14 async fn main() -> Result<(), Box<dyn std::error::Error>> {
15    println!("========================================");
16    println!("DATAFLOW ENGINE BENCHMARK");
17    println!("========================================\n");
18
19    // Define a workflow that:
20    // 1. Uses pre-loaded data instead of HTTP fetch
21    // 2. Enriches the message with transformed data
22    // 3. Demonstrates proper async workflow execution
23    let workflow_json = r#"
24    {
25        "id": "benchmark_workflow",
26        "name": "Benchmark Workflow Example",
27        "description": "Demonstrates async workflow execution with data transformation",
28        "priority": 1,
29        "condition": { "==": [true, true] },
30        "tasks": [
31            {
32                "id": "transform_data",
33                "name": "Transform Data",
34                "description": "Map API response to our data model",
35                "function": {
36                    "name": "map",
37                    "input": {
38                        "mappings": [
39                            {
40                                "path": "data.user.id", 
41                                "logic": { "var": "temp_data.body.id" }
42                            },
43                            {
44                                "path": "data.user.name", 
45                                "logic": { "var": "temp_data.body.name" }
46                            },
47                            {
48                                "path": "data.user.email", 
49                                "logic": { "var": "temp_data.body.email" }
50                            },
51                            {
52                                "path": "data.user.address", 
53                                "logic": {
54                                    "cat": [
55                                        { "var": "temp_data.body.address.street" },
56                                        ", ",
57                                        { "var": "temp_data.body.address.city" }
58                                    ]
59                                }
60                            },
61                            {
62                                "path": "data.user.company", 
63                                "logic": { "var": "temp_data.body.company.name" }
64                            },
65                            {
66                                "path": "data.processed_at", 
67                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
68                            }
69                        ]
70                    }
71                }
72            }
73        ]
74    }
75    "#;
76
77    // Parse the workflow
78    let workflow = Workflow::from_json(workflow_json)?;
79
80    // Create sample user data (similar to what the HTTP endpoint would return)
81    let sample_user_data = json!({
82        "body": {
83            "id": 1,
84            "name": "Leanne Graham",
85            "username": "Bret",
86            "email": "Sincere@april.biz",
87            "address": {
88                "street": "Kulas Light",
89                "suite": "Apt. 556",
90                "city": "Gwenborough",
91                "zipcode": "92998-3874",
92                "geo": {
93                    "lat": "-37.3159",
94                    "lng": "81.1496"
95                }
96            },
97            "phone": "1-770-736-8031 x56442",
98            "website": "hildegard.org",
99            "company": {
100                "name": "Romaguera-Crona",
101                "catchPhrase": "Multi-layered client-server neural-net",
102                "bs": "harness real-time e-markets"
103            }
104        }
105    });
106
107    let iterations = 100000;
108
109    println!("Testing with {} iterations\n", iterations);
110
111    // Test sequential performance with concurrency 1
112    println!("--- Sequential Performance (Baseline) ---");
113    println!("Concurrency | Avg Time per Message | Total Time | Messages/sec");
114    println!("------------|---------------------|------------|-------------");
115
116    // Sequential baseline
117    let engine = Arc::new(Engine::with_concurrency(1));
118    engine.add_workflow(&workflow);
119
120    let seq_results = run_sequential_benchmark(&*engine, &sample_user_data, iterations).await?;
121    let throughput = (iterations as f64) / seq_results.total_time.as_secs_f64();
122
123    println!(
124        "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0}",
125        1,
126        seq_results.avg_time.as_secs_f64() * 1_000_000.0,
127        seq_results.total_time.as_secs_f64() * 1000.0,
128        throughput
129    );
130
131    log_benchmark_results(
132        iterations,
133        seq_results.min_time,
134        seq_results.max_time,
135        seq_results.avg_time,
136        seq_results.p95,
137        seq_results.p99,
138        seq_results.total_time,
139        format!("seq_x1"),
140    )?;
141
142    // Test concurrent performance
143    println!("\n--- Concurrent Performance ---");
144    println!("Concurrency | Avg Time per Message | Total Time | Messages/sec | Speedup");
145    println!("------------|---------------------|------------|--------------|--------");
146
147    // Test configurations with different concurrency levels
148    let test_configs = vec![
149        1, 16,  // 16 concurrent messages with 16 DataLogic instances
150        32,  // 32 concurrent messages with 32 DataLogic instances
151        64,  // 64 concurrent messages with 64 DataLogic instances
152        128, // 128 concurrent messages with 128 DataLogic instances
153    ];
154
155    let baseline_throughput = throughput;
156
157    for concurrency in test_configs {
158        let engine = Arc::new(Engine::with_concurrency(concurrency));
159        engine.add_workflow(&workflow);
160
161        let con_results = run_concurrent_benchmark(
162            engine.clone(),
163            &sample_user_data,
164            iterations,
165            concurrency, // Use same value for concurrent tasks
166        )
167        .await?;
168
169        let throughput = (iterations as f64) / con_results.total_time.as_secs_f64();
170        let speedup = throughput / baseline_throughput;
171
172        println!(
173            "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0} | {:>7.2}x",
174            concurrency,
175            con_results.avg_time.as_secs_f64() * 1_000_000.0,
176            con_results.total_time.as_secs_f64() * 1000.0,
177            throughput,
178            speedup
179        );
180
181        log_benchmark_results(
182            iterations,
183            con_results.min_time,
184            con_results.max_time,
185            con_results.avg_time,
186            con_results.p95,
187            con_results.p99,
188            con_results.total_time,
189            format!("con_x{}", concurrency),
190        )?;
191    }
192
193    println!("\n========================================");
194    println!("Benchmark results saved to '{}'", BENCHMARK_LOG_FILE);
195    println!("========================================");
196
197    Ok(())
198 }
examples/custom_function.rs (line 399)
318 async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319    println!("=== Custom Function Example ===\n");
320
321    // Create engine without built-in functions to demonstrate custom ones
322    let mut engine = Engine::new_empty();
323
324    // Register our custom functions
325    engine.register_task_function(
326        "statistics".to_string(),
327        Box::new(StatisticsFunction::new()),
328    );
329
330    engine.register_task_function(
331        "enrich_data".to_string(),
332        Box::new(DataEnrichmentFunction::new()),
333    );
334
335    // Also register built-in map function for data preparation
336    engine.register_task_function(
337        "map".to_string(),
338        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339    );
340
341    // Define a workflow that uses our custom functions
342    let workflow_json = r#"
343    {
344        "id": "custom_function_demo",
345        "name": "Custom Function Demo",
346        "description": "Demonstrates custom async functions in workflow",
347        "condition": { "==": [true, true] },
348        "tasks": [
349            {
350                "id": "prepare_data",
351                "name": "Prepare Data",
352                "description": "Extract and prepare data for analysis",
353                "function": {
354                    "name": "map",
355                    "input": {
356                        "mappings": [
357                            {
358                                "path": "data.numbers",
359                                "logic": { "var": "temp_data.measurements" }
360                            },
361                            {
362                                "path": "data.user_id",
363                                "logic": { "var": "temp_data.user_id" }
364                            }
365                        ]
366                    }
367                }
368            },
369            {
370                "id": "calculate_stats",
371                "name": "Calculate Statistics",
372                "description": "Calculate statistical measures from numeric data",
373                "function": {
374                    "name": "statistics",
375                    "input": {
376                        "data_path": "data.numbers",
377                        "output_path": "data.stats"
378                    }
379                }
380            },
381            {
382                "id": "enrich_user_data",
383                "name": "Enrich User Data",
384                "description": "Add additional user information",
385                "function": {
386                    "name": "enrich_data",
387                    "input": {
388                        "lookup_field": "user_id",
389                        "lookup_value": "user_123",
390                        "output_path": "data.user_info"
391                    }
392                }
393            }
394        ]
395    }
396    "#;
397
398    // Parse and add the workflow
399    let workflow = Workflow::from_json(workflow_json)?;
400    engine.add_workflow(&workflow);
401
402    // Create sample data
403    let sample_data = json!({
404        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405        "user_id": "user_123",
406        "timestamp": "2024-01-15T10:30:00Z"
407    });
408
409    // Create and process message
410    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411    message.temp_data = sample_data;
412    message.data = json!({});
413
414    println!("Processing message with custom functions...\n");
415
416    // Process the message through our custom workflow
417    match engine.process_message(&mut message).await {
418        Ok(_) => {
419            println!("✅ Message processed successfully!\n");
420
421            println!("📊 Final Results:");
422            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424            println!("📋 Audit Trail:");
425            for (i, audit) in message.audit_trail.iter().enumerate() {
426                println!(
427                    "{}. Task: {} (Status: {})",
428                    i + 1,
429                    audit.task_id,
430                    audit.status_code
431                );
432                println!("   Timestamp: {}", audit.timestamp);
433                println!("   Changes: {} field(s) modified", audit.changes.len());
434            }
435
436            if message.has_errors() {
437                println!("\n⚠️  Errors encountered:");
438                for error in &message.errors {
439                    println!(
440                        "   - {}: {:?}",
441                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442                        error.error_message
443                    );
444                }
445            }
446        }
447        Err(e) => {
448            println!("❌ Error processing message: {e:?}");
449        }
450    }
451
452    // Demonstrate another example with different data
453    let separator = "=".repeat(50);
454    println!("\n{separator}");
455    println!("=== Second Example with Different User ===\n");
456
457    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458    message2.temp_data = json!({
459        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460        "user_id": "user_456",
461        "timestamp": "2024-01-15T11:00:00Z"
462    });
463    message2.data = json!({});
464
465    // Create a workflow for the second user
466    let workflow2_json = r#"
467    {
468        "id": "custom_function_demo_2",
469        "name": "Custom Function Demo 2",
470        "description": "Second demo with different user",
471        "condition": { "==": [true, true] },
472        "tasks": [
473            {
474                "id": "prepare_data",
475                "name": "Prepare Data",
476                "function": {
477                    "name": "map",
478                    "input": {
479                        "mappings": [
480                            {
481                                "path": "data.numbers",
482                                "logic": { "var": "temp_data.measurements" }
483                            },
484                            {
485                                "path": "data.user_id",
486                                "logic": { "var": "temp_data.user_id" }
487                            }
488                        ]
489                    }
490                }
491            },
492            {
493                "id": "calculate_stats",
494                "name": "Calculate Statistics",
495                "function": {
496                    "name": "statistics",
497                    "input": {
498                        "data_path": "data.numbers",
499                        "output_path": "data.analysis"
500                    }
501                }
502            },
503            {
504                "id": "enrich_user_data",
505                "name": "Enrich User Data",
506                "function": {
507                    "name": "enrich_data",
508                    "input": {
509                        "lookup_field": "user_id",
510                        "lookup_value": "user_456",
511                        "output_path": "data.employee_details"
512                    }
513                }
514            }
515        ]
516    }
517    "#;
518
519    let workflow2 = Workflow::from_json(workflow2_json)?;
520    engine.add_workflow(&workflow2);
521
522    match engine.process_message(&mut message2).await {
523        Ok(_) => {
524            println!("✅ Second message processed successfully!\n");
525            println!("📊 Results for user_456:");
526            println!("{}", serde_json::to_string_pretty(&message2.data)?);
527        }
528        Err(e) => {
529            println!("❌ Error processing second message: {e:?}");
530        }
531    }
532
533    println!("\n🎉 Custom function examples completed!");
534
535    Ok(())
536 }

pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self>

Loads a workflow from a JSON file.
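
A short sketch of the call pattern (the path below is a placeholder; the file is expected to contain the same JSON layout that from_json accepts):

use dataflow_rs::Workflow; // import path assumed

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "workflows/complete_workflow.json" is a hypothetical path used for illustration.
    let workflow = Workflow::from_file("workflows/complete_workflow.json")?;
    println!("Loaded workflow '{}' with {} task(s)", workflow.name, workflow.tasks.len());
    Ok(())
}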

pub fn validate(&self) -> Result<()>

Validates the workflow structure.
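
The exact rules enforced are not documented here, so the sketch below only shows the call pattern: validate a parsed workflow before registering it with an engine (import paths assumed):

use dataflow_rs::{Engine, Workflow}; // import paths assumed

fn load_and_register(engine: &Engine, json: &str) -> Result<(), Box<dyn std::error::Error>> {
    let workflow = Workflow::from_json(json)?;
    // Reject structurally invalid definitions early; which checks run
    // (non-empty id, well-formed tasks, ...) is an assumption here.
    workflow.validate()?;
    engine.add_workflow(&workflow);
    Ok(())
}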

Trait Implementations

impl Clone for Workflow

fn clone(&self) -> Workflow

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for Workflow

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for Workflow

fn default() -> Self

Returns the “default value” for a type.

impl<'de> Deserialize<'de> for Workflow

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer.
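
Because the type implements Deserialize, a workflow can also be decoded with serde_json directly, e.g. when a definition is embedded in a larger configuration document. A sketch (import path assumed; field requirements follow the struct definition above):

use dataflow_rs::Workflow; // import path assumed
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A workflow nested inside a larger configuration value.
    let config = json!({
        "workflows": [
            { "id": "embedded", "name": "Embedded Workflow", "priority": 0, "tasks": [] }
        ]
    });

    let workflow: Workflow = serde_json::from_value(config["workflows"][0].clone())?;
    assert_eq!(workflow.name, "Embedded Workflow");
    Ok(())
}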

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API (clone_to_uninit).
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self). That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,

impl<T> ErasedDestructor for T
where T: 'static,