Struct Workflow

Source
pub struct Workflow {
    pub id: String,
    pub name: String,
    pub priority: u32,
    pub description: Option<String>,
    pub condition: Option<Value>,
    pub tasks: Vec<Task>,
}

Fields§

§id: String§name: String§priority: u32§description: Option<String>§condition: Option<Value>§tasks: Vec<Task>

Implementations§

Source§

impl Workflow

Source

pub fn new() -> Self

Source

pub fn from_json(json_str: &str) -> Result<Self>

Load workflow from JSON string

Examples found in repository?
examples/benchmark.rs (line 74)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // Create the workflow engine (built-in functions are auto-registered)
14    let mut engine = Engine::new();
15
16    // Define a workflow that:
17    // 1. Uses pre-loaded data instead of HTTP fetch
18    // 2. Enriches the message with transformed data
19    // 3. Demonstrates proper async workflow execution
20    let workflow_json = r#"
21    {
22        "id": "benchmark_workflow",
23        "name": "Benchmark Workflow Example",
24        "description": "Demonstrates async workflow execution with data transformation",
25        "condition": { "==": [true, true] },
26        "tasks": [
27            {
28                "id": "transform_data",
29                "name": "Transform Data",
30                "description": "Map API response to our data model",
31                "function": {
32                    "name": "map",
33                    "input": {
34                        "mappings": [
35                            {
36                                "path": "data.user.id", 
37                                "logic": { "var": "temp_data.body.id" }
38                            },
39                            {
40                                "path": "data.user.name", 
41                                "logic": { "var": "temp_data.body.name" }
42                            },
43                            {
44                                "path": "data.user.email", 
45                                "logic": { "var": "temp_data.body.email" }
46                            },
47                            {
48                                "path": "data.user.address", 
49                                "logic": {
50                                    "cat": [
51                                        { "var": "temp_data.body.address.street" },
52                                        ", ",
53                                        { "var": "temp_data.body.address.city" }
54                                    ]
55                                }
56                            },
57                            {
58                                "path": "data.user.company", 
59                                "logic": { "var": "temp_data.body.company.name" }
60                            },
61                            {
62                                "path": "data.processed_at", 
63                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64                            }
65                        ]
66                    }
67                }
68            }
69        ]
70    }
71    "#;
72
73    // Parse and add the workflow to the engine
74    let workflow = Workflow::from_json(workflow_json)?;
75    engine.add_workflow(&workflow);
76
77    // Create sample user data (similar to what the HTTP endpoint would return)
78    let sample_user_data = json!({
79        "body": {
80            "id": 1,
81            "name": "Leanne Graham",
82            "username": "Bret",
83            "email": "Sincere@april.biz",
84            "address": {
85                "street": "Kulas Light",
86                "suite": "Apt. 556",
87                "city": "Gwenborough",
88                "zipcode": "92998-3874",
89                "geo": {
90                    "lat": "-37.3159",
91                    "lng": "81.1496"
92                }
93            },
94            "phone": "1-770-736-8031 x56442",
95            "website": "hildegard.org",
96            "company": {
97                "name": "Romaguera-Crona",
98                "catchPhrase": "Multi-layered client-server neural-net",
99                "bs": "harness real-time e-markets"
100            }
101        }
102    });
103
104    // Run async benchmark
105    println!("=== ASYNC BENCHMARK ===");
106    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108    // Run sync benchmark for comparison (using blocking approach)
109    println!("\n=== SYNC BENCHMARK (for comparison) ===");
110    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112    // Compare results
113    println!("\n=== COMPARISON ===");
114    println!("Async avg: {:?}", async_results.avg_time);
115    println!("Sync avg:  {:?}", sync_results.avg_time);
116    println!(
117        "Async is {:.2}x {} than sync",
118        if async_results.avg_time < sync_results.avg_time {
119            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120        } else {
121            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122        },
123        if async_results.avg_time < sync_results.avg_time {
124            "faster"
125        } else {
126            "slower"
127        }
128    );
129
130    // Log results to file
131    log_benchmark_results(
132        async_results.iterations,
133        async_results.min_time,
134        async_results.max_time,
135        async_results.avg_time,
136        async_results.p95,
137        async_results.p99,
138        async_results.total_time,
139        "async".to_string(),
140    )?;
141
142    log_benchmark_results(
143        sync_results.iterations,
144        sync_results.min_time,
145        sync_results.max_time,
146        sync_results.avg_time,
147        sync_results.p95,
148        sync_results.p99,
149        sync_results.total_time,
150        "sync".to_string(),
151    )?;
152
153    println!("\nBenchmark results saved to '{BENCHMARK_LOG_FILE}'");
154
155    Ok(())
156}
More examples
Hide additional examples
examples/complete_workflow.rs (line 132)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
examples/custom_function.rs (line 388)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {e:?}");
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{separator}");
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {e:?}");
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}
Source

pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self>

Load workflow from JSON file

Source

pub fn validate(&self) -> Result<()>

Validate the workflow structure

Trait Implementations§

Source§

impl Clone for Workflow

Source§

fn clone(&self) -> Workflow

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Workflow

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Default for Workflow

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl<'de> Deserialize<'de> for Workflow

Source§

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,

Source§

impl<T> ErasedDestructor for T
where T: 'static,