pub struct Message {
pub id: String,
pub data: Value,
pub payload: Value,
pub metadata: Value,
pub temp_data: Value,
pub audit_trail: Vec<AuditTrail>,
pub errors: Vec<ErrorInfo>,
}
Fields
id: String
data: Value
payload: Value
metadata: Value
temp_data: Value
audit_trail: Vec<AuditTrail>
errors: Vec<ErrorInfo>
Errors that occurred during message processing.
Implementations
impl Message
pub fn new(payload: &Value) -> Self
Examples found in repository?
examples/benchmark.rs (line 186)
169 async fn run_async_benchmark(
170 engine: &Engine,
171 sample_user_data: &Value,
172 num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174 let mut total_duration = Duration::new(0, 0);
175 let mut min_duration = Duration::new(u64::MAX, 0);
176 let mut max_duration = Duration::new(0, 0);
177 let mut all_durations = Vec::with_capacity(num_iterations);
178 let mut error_count = 0;
179
180 println!(
181 "Starting async benchmark with {} iterations...",
182 num_iterations
183 );
184
185 for i in 0..num_iterations {
186 let mut message = Message::new(&json!({}));
187 message.temp_data = sample_user_data.clone();
188 message.data = json!({});
189 message.metadata = json!({
190 "timestamp": chrono::Utc::now().to_rfc3339(),
191 "iteration": i
192 });
193
194 let start = Instant::now();
195 match engine.process_message(&mut message).await {
196 Ok(_) => {
197 let duration = start.elapsed();
198 all_durations.push(duration);
199 total_duration += duration;
200 min_duration = min_duration.min(duration);
201 max_duration = max_duration.max(duration);
202
203 // Check for processing errors
204 if message.has_errors() {
205 error_count += 1;
206 if error_count <= 5 {
207 // Only print first 5 errors
208 println!("Processing errors in iteration {}: {:?}", i, message.errors);
209 }
210 }
211 }
212 Err(e) => {
213 error_count += 1;
214 if error_count <= 5 {
215 println!("Error in iteration {}: {:?}", i, e);
216 }
217 // Still record the time even for errors
218 let duration = start.elapsed();
219 all_durations.push(duration);
220 total_duration += duration;
221 min_duration = min_duration.min(duration);
222 max_duration = max_duration.max(duration);
223 }
224 }
225
226 if (i + 1) % 1000 == 0 {
227 println!("Completed {} async iterations", i + 1);
228 }
229 }
230
231 if error_count > 0 {
232 println!("Total errors encountered: {}", error_count);
233 }
234
235 // Sort durations for percentile calculations
236 all_durations.sort();
237
238 let p95_idx = (num_iterations as f64 * 0.95) as usize;
239 let p99_idx = (num_iterations as f64 * 0.99) as usize;
240 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
241 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
242 let avg_duration = total_duration / num_iterations as u32;
243
244 println!("\nAsync Benchmark Results (v{}):", VERSION);
245 println!(" Iterations: {}", num_iterations);
246 println!(" Errors: {}", error_count);
247 println!(" Min time: {:?}", min_duration);
248 println!(" Max time: {:?}", max_duration);
249 println!(" Avg time: {:?}", avg_duration);
250 println!(" 95th percentile: {:?}", p95);
251 println!(" 99th percentile: {:?}", p99);
252 println!(" Total time: {:?}", total_duration);
253
254 Ok(BenchmarkResults {
255 iterations: num_iterations,
256 min_time: min_duration,
257 max_time: max_duration,
258 avg_time: avg_duration,
259 p95: *p95,
260 p99: *p99,
261 total_time: total_duration,
262 })
263}
264
265 async fn run_sync_benchmark(
266 engine: &Engine,
267 sample_user_data: &Value,
268 num_iterations: usize,
269) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
270 let mut total_duration = Duration::new(0, 0);
271 let mut min_duration = Duration::new(u64::MAX, 0);
272 let mut max_duration = Duration::new(0, 0);
273 let mut all_durations = Vec::with_capacity(num_iterations);
274 let mut error_count = 0;
275
276 println!(
277 "Starting sync-style benchmark with {} iterations...",
278 num_iterations
279 );
280
281 for i in 0..num_iterations {
282 let mut message = Message::new(&json!({}));
283 message.temp_data = sample_user_data.clone();
284 message.data = json!({});
285 message.metadata = json!({
286 "timestamp": chrono::Utc::now().to_rfc3339(),
287 "iteration": i
288 });
289
290 let start = Instant::now();
291 // Use tokio::task::block_in_place to simulate sync behavior
292 let result = tokio::task::block_in_place(|| {
293 tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
294 });
295
296 match result {
297 Ok(_) => {
298 let duration = start.elapsed();
299 all_durations.push(duration);
300 total_duration += duration;
301 min_duration = min_duration.min(duration);
302 max_duration = max_duration.max(duration);
303
304 if message.has_errors() {
305 error_count += 1;
306 }
307 }
308 Err(e) => {
309 error_count += 1;
310 if error_count <= 5 {
311 println!("Sync error in iteration {}: {:?}", i, e);
312 }
313 let duration = start.elapsed();
314 all_durations.push(duration);
315 total_duration += duration;
316 min_duration = min_duration.min(duration);
317 max_duration = max_duration.max(duration);
318 }
319 }
320
321 if (i + 1) % 1000 == 0 {
322 println!("Completed {} sync iterations", i + 1);
323 }
324 }
325
326 if error_count > 0 {
327 println!("Total sync errors encountered: {}", error_count);
328 }
329
330 all_durations.sort();
331
332 let p95_idx = (num_iterations as f64 * 0.95) as usize;
333 let p99_idx = (num_iterations as f64 * 0.99) as usize;
334 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
335 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
336 let avg_duration = total_duration / num_iterations as u32;
337
338 println!("\nSync Benchmark Results (v{}):", VERSION);
339 println!(" Iterations: {}", num_iterations);
340 println!(" Errors: {}", error_count);
341 println!(" Min time: {:?}", min_duration);
342 println!(" Max time: {:?}", max_duration);
343 println!(" Avg time: {:?}", avg_duration);
344 println!(" 95th percentile: {:?}", p95);
345 println!(" 99th percentile: {:?}", p99);
346 println!(" Total time: {:?}", total_duration);
347
348 Ok(BenchmarkResults {
349 iterations: num_iterations,
350 min_time: min_duration,
351 max_time: max_duration,
352 avg_time: avg_duration,
353 p95: *p95,
354 p99: *p99,
355 total_time: total_duration,
356 })
357}
More examples
examples/complete_workflow.rs (line 136)
5 async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let mut engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {:?}", e);
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
examples/custom_function.rs (line 399)
307 async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308 println!("=== Custom Function Example ===\n");
309
310 // Create engine without built-in functions to demonstrate custom ones
311 let mut engine = Engine::new_empty();
312
313 // Register our custom functions
314 engine.register_task_function(
315 "statistics".to_string(),
316 Box::new(StatisticsFunction::new()),
317 );
318
319 engine.register_task_function(
320 "enrich_data".to_string(),
321 Box::new(DataEnrichmentFunction::new()),
322 );
323
324 // Also register built-in map function for data preparation
325 engine.register_task_function(
326 "map".to_string(),
327 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328 );
329
330 // Define a workflow that uses our custom functions
331 let workflow_json = r#"
332 {
333 "id": "custom_function_demo",
334 "name": "Custom Function Demo",
335 "description": "Demonstrates custom async functions in workflow",
336 "condition": { "==": [true, true] },
337 "tasks": [
338 {
339 "id": "prepare_data",
340 "name": "Prepare Data",
341 "description": "Extract and prepare data for analysis",
342 "function": {
343 "name": "map",
344 "input": {
345 "mappings": [
346 {
347 "path": "data.numbers",
348 "logic": { "var": "temp_data.measurements" }
349 },
350 {
351 "path": "data.user_id",
352 "logic": { "var": "temp_data.user_id" }
353 }
354 ]
355 }
356 }
357 },
358 {
359 "id": "calculate_stats",
360 "name": "Calculate Statistics",
361 "description": "Calculate statistical measures from numeric data",
362 "function": {
363 "name": "statistics",
364 "input": {
365 "data_path": "data.numbers",
366 "output_path": "data.stats"
367 }
368 }
369 },
370 {
371 "id": "enrich_user_data",
372 "name": "Enrich User Data",
373 "description": "Add additional user information",
374 "function": {
375 "name": "enrich_data",
376 "input": {
377 "lookup_field": "user_id",
378 "lookup_value": "user_123",
379 "output_path": "data.user_info"
380 }
381 }
382 }
383 ]
384 }
385 "#;
386
387 // Parse and add the workflow
388 let workflow = Workflow::from_json(workflow_json)?;
389 engine.add_workflow(&workflow);
390
391 // Create sample data
392 let sample_data = json!({
393 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394 "user_id": "user_123",
395 "timestamp": "2024-01-15T10:30:00Z"
396 });
397
398 // Create and process message
399 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400 message.temp_data = sample_data;
401 message.data = json!({});
402
403 println!("Processing message with custom functions...\n");
404
405 // Process the message through our custom workflow
406 match engine.process_message(&mut message).await {
407 Ok(_) => {
408 println!("✅ Message processed successfully!\n");
409
410 println!("📊 Final Results:");
411 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413 println!("📋 Audit Trail:");
414 for (i, audit) in message.audit_trail.iter().enumerate() {
415 println!(
416 "{}. Task: {} (Status: {})",
417 i + 1,
418 audit.task_id,
419 audit.status_code
420 );
421 println!(" Timestamp: {}", audit.timestamp);
422 println!(" Changes: {} field(s) modified", audit.changes.len());
423 }
424
425 if message.has_errors() {
426 println!("\n⚠️ Errors encountered:");
427 for error in &message.errors {
428 println!(
429 " - {}: {:?}",
430 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431 error.error_message
432 );
433 }
434 }
435 }
436 Err(e) => {
437 println!("❌ Error processing message: {:?}", e);
438 }
439 }
440
441 // Demonstrate another example with different data
442 let separator = "=".repeat(50);
443 println!("\n{}", separator);
444 println!("=== Second Example with Different User ===\n");
445
446 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447 message2.temp_data = json!({
448 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449 "user_id": "user_456",
450 "timestamp": "2024-01-15T11:00:00Z"
451 });
452 message2.data = json!({});
453
454 // Create a workflow for the second user
455 let workflow2_json = r#"
456 {
457 "id": "custom_function_demo_2",
458 "name": "Custom Function Demo 2",
459 "description": "Second demo with different user",
460 "condition": { "==": [true, true] },
461 "tasks": [
462 {
463 "id": "prepare_data",
464 "name": "Prepare Data",
465 "function": {
466 "name": "map",
467 "input": {
468 "mappings": [
469 {
470 "path": "data.numbers",
471 "logic": { "var": "temp_data.measurements" }
472 },
473 {
474 "path": "data.user_id",
475 "logic": { "var": "temp_data.user_id" }
476 }
477 ]
478 }
479 }
480 },
481 {
482 "id": "calculate_stats",
483 "name": "Calculate Statistics",
484 "function": {
485 "name": "statistics",
486 "input": {
487 "data_path": "data.numbers",
488 "output_path": "data.analysis"
489 }
490 }
491 },
492 {
493 "id": "enrich_user_data",
494 "name": "Enrich User Data",
495 "function": {
496 "name": "enrich_data",
497 "input": {
498 "lookup_field": "user_id",
499 "lookup_value": "user_456",
500 "output_path": "data.employee_details"
501 }
502 }
503 }
504 ]
505 }
506 "#;
507
508 let workflow2 = Workflow::from_json(workflow2_json)?;
509 engine.add_workflow(&workflow2);
510
511 match engine.process_message(&mut message2).await {
512 Ok(_) => {
513 println!("✅ Second message processed successfully!\n");
514 println!("📊 Results for user_456:");
515 println!("{}", serde_json::to_string_pretty(&message2.data)?);
516 }
517 Err(e) => {
518 println!("❌ Error processing second message: {:?}", e);
519 }
520 }
521
522 println!("\n🎉 Custom function examples completed!");
523
524 Ok(())
525}
pub fn has_errors(&self) -> bool
Check if message has errors.
Examples found in repository?
examples/benchmark.rs (line 204)
169 async fn run_async_benchmark(
170 engine: &Engine,
171 sample_user_data: &Value,
172 num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174 let mut total_duration = Duration::new(0, 0);
175 let mut min_duration = Duration::new(u64::MAX, 0);
176 let mut max_duration = Duration::new(0, 0);
177 let mut all_durations = Vec::with_capacity(num_iterations);
178 let mut error_count = 0;
179
180 println!(
181 "Starting async benchmark with {} iterations...",
182 num_iterations
183 );
184
185 for i in 0..num_iterations {
186 let mut message = Message::new(&json!({}));
187 message.temp_data = sample_user_data.clone();
188 message.data = json!({});
189 message.metadata = json!({
190 "timestamp": chrono::Utc::now().to_rfc3339(),
191 "iteration": i
192 });
193
194 let start = Instant::now();
195 match engine.process_message(&mut message).await {
196 Ok(_) => {
197 let duration = start.elapsed();
198 all_durations.push(duration);
199 total_duration += duration;
200 min_duration = min_duration.min(duration);
201 max_duration = max_duration.max(duration);
202
203 // Check for processing errors
204 if message.has_errors() {
205 error_count += 1;
206 if error_count <= 5 {
207 // Only print first 5 errors
208 println!("Processing errors in iteration {}: {:?}", i, message.errors);
209 }
210 }
211 }
212 Err(e) => {
213 error_count += 1;
214 if error_count <= 5 {
215 println!("Error in iteration {}: {:?}", i, e);
216 }
217 // Still record the time even for errors
218 let duration = start.elapsed();
219 all_durations.push(duration);
220 total_duration += duration;
221 min_duration = min_duration.min(duration);
222 max_duration = max_duration.max(duration);
223 }
224 }
225
226 if (i + 1) % 1000 == 0 {
227 println!("Completed {} async iterations", i + 1);
228 }
229 }
230
231 if error_count > 0 {
232 println!("Total errors encountered: {}", error_count);
233 }
234
235 // Sort durations for percentile calculations
236 all_durations.sort();
237
238 let p95_idx = (num_iterations as f64 * 0.95) as usize;
239 let p99_idx = (num_iterations as f64 * 0.99) as usize;
240 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
241 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
242 let avg_duration = total_duration / num_iterations as u32;
243
244 println!("\nAsync Benchmark Results (v{}):", VERSION);
245 println!(" Iterations: {}", num_iterations);
246 println!(" Errors: {}", error_count);
247 println!(" Min time: {:?}", min_duration);
248 println!(" Max time: {:?}", max_duration);
249 println!(" Avg time: {:?}", avg_duration);
250 println!(" 95th percentile: {:?}", p95);
251 println!(" 99th percentile: {:?}", p99);
252 println!(" Total time: {:?}", total_duration);
253
254 Ok(BenchmarkResults {
255 iterations: num_iterations,
256 min_time: min_duration,
257 max_time: max_duration,
258 avg_time: avg_duration,
259 p95: *p95,
260 p99: *p99,
261 total_time: total_duration,
262 })
263}
264
265 async fn run_sync_benchmark(
266 engine: &Engine,
267 sample_user_data: &Value,
268 num_iterations: usize,
269) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
270 let mut total_duration = Duration::new(0, 0);
271 let mut min_duration = Duration::new(u64::MAX, 0);
272 let mut max_duration = Duration::new(0, 0);
273 let mut all_durations = Vec::with_capacity(num_iterations);
274 let mut error_count = 0;
275
276 println!(
277 "Starting sync-style benchmark with {} iterations...",
278 num_iterations
279 );
280
281 for i in 0..num_iterations {
282 let mut message = Message::new(&json!({}));
283 message.temp_data = sample_user_data.clone();
284 message.data = json!({});
285 message.metadata = json!({
286 "timestamp": chrono::Utc::now().to_rfc3339(),
287 "iteration": i
288 });
289
290 let start = Instant::now();
291 // Use tokio::task::block_in_place to simulate sync behavior
292 let result = tokio::task::block_in_place(|| {
293 tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
294 });
295
296 match result {
297 Ok(_) => {
298 let duration = start.elapsed();
299 all_durations.push(duration);
300 total_duration += duration;
301 min_duration = min_duration.min(duration);
302 max_duration = max_duration.max(duration);
303
304 if message.has_errors() {
305 error_count += 1;
306 }
307 }
308 Err(e) => {
309 error_count += 1;
310 if error_count <= 5 {
311 println!("Sync error in iteration {}: {:?}", i, e);
312 }
313 let duration = start.elapsed();
314 all_durations.push(duration);
315 total_duration += duration;
316 min_duration = min_duration.min(duration);
317 max_duration = max_duration.max(duration);
318 }
319 }
320
321 if (i + 1) % 1000 == 0 {
322 println!("Completed {} sync iterations", i + 1);
323 }
324 }
325
326 if error_count > 0 {
327 println!("Total sync errors encountered: {}", error_count);
328 }
329
330 all_durations.sort();
331
332 let p95_idx = (num_iterations as f64 * 0.95) as usize;
333 let p99_idx = (num_iterations as f64 * 0.99) as usize;
334 let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
335 let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
336 let avg_duration = total_duration / num_iterations as u32;
337
338 println!("\nSync Benchmark Results (v{}):", VERSION);
339 println!(" Iterations: {}", num_iterations);
340 println!(" Errors: {}", error_count);
341 println!(" Min time: {:?}", min_duration);
342 println!(" Max time: {:?}", max_duration);
343 println!(" Avg time: {:?}", avg_duration);
344 println!(" 95th percentile: {:?}", p95);
345 println!(" 99th percentile: {:?}", p99);
346 println!(" Total time: {:?}", total_duration);
347
348 Ok(BenchmarkResults {
349 iterations: num_iterations,
350 min_time: min_duration,
351 max_time: max_duration,
352 avg_time: avg_duration,
353 p95: *p95,
354 p99: *p99,
355 total_time: total_duration,
356 })
357}
More examples
examples/custom_function.rs (line 425)
307 async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308 println!("=== Custom Function Example ===\n");
309
310 // Create engine without built-in functions to demonstrate custom ones
311 let mut engine = Engine::new_empty();
312
313 // Register our custom functions
314 engine.register_task_function(
315 "statistics".to_string(),
316 Box::new(StatisticsFunction::new()),
317 );
318
319 engine.register_task_function(
320 "enrich_data".to_string(),
321 Box::new(DataEnrichmentFunction::new()),
322 );
323
324 // Also register built-in map function for data preparation
325 engine.register_task_function(
326 "map".to_string(),
327 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328 );
329
330 // Define a workflow that uses our custom functions
331 let workflow_json = r#"
332 {
333 "id": "custom_function_demo",
334 "name": "Custom Function Demo",
335 "description": "Demonstrates custom async functions in workflow",
336 "condition": { "==": [true, true] },
337 "tasks": [
338 {
339 "id": "prepare_data",
340 "name": "Prepare Data",
341 "description": "Extract and prepare data for analysis",
342 "function": {
343 "name": "map",
344 "input": {
345 "mappings": [
346 {
347 "path": "data.numbers",
348 "logic": { "var": "temp_data.measurements" }
349 },
350 {
351 "path": "data.user_id",
352 "logic": { "var": "temp_data.user_id" }
353 }
354 ]
355 }
356 }
357 },
358 {
359 "id": "calculate_stats",
360 "name": "Calculate Statistics",
361 "description": "Calculate statistical measures from numeric data",
362 "function": {
363 "name": "statistics",
364 "input": {
365 "data_path": "data.numbers",
366 "output_path": "data.stats"
367 }
368 }
369 },
370 {
371 "id": "enrich_user_data",
372 "name": "Enrich User Data",
373 "description": "Add additional user information",
374 "function": {
375 "name": "enrich_data",
376 "input": {
377 "lookup_field": "user_id",
378 "lookup_value": "user_123",
379 "output_path": "data.user_info"
380 }
381 }
382 }
383 ]
384 }
385 "#;
386
387 // Parse and add the workflow
388 let workflow = Workflow::from_json(workflow_json)?;
389 engine.add_workflow(&workflow);
390
391 // Create sample data
392 let sample_data = json!({
393 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394 "user_id": "user_123",
395 "timestamp": "2024-01-15T10:30:00Z"
396 });
397
398 // Create and process message
399 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400 message.temp_data = sample_data;
401 message.data = json!({});
402
403 println!("Processing message with custom functions...\n");
404
405 // Process the message through our custom workflow
406 match engine.process_message(&mut message).await {
407 Ok(_) => {
408 println!("✅ Message processed successfully!\n");
409
410 println!("📊 Final Results:");
411 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413 println!("📋 Audit Trail:");
414 for (i, audit) in message.audit_trail.iter().enumerate() {
415 println!(
416 "{}. Task: {} (Status: {})",
417 i + 1,
418 audit.task_id,
419 audit.status_code
420 );
421 println!(" Timestamp: {}", audit.timestamp);
422 println!(" Changes: {} field(s) modified", audit.changes.len());
423 }
424
425 if message.has_errors() {
426 println!("\n⚠️ Errors encountered:");
427 for error in &message.errors {
428 println!(
429 " - {}: {:?}",
430 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431 error.error_message
432 );
433 }
434 }
435 }
436 Err(e) => {
437 println!("❌ Error processing message: {:?}", e);
438 }
439 }
440
441 // Demonstrate another example with different data
442 let separator = "=".repeat(50);
443 println!("\n{}", separator);
444 println!("=== Second Example with Different User ===\n");
445
446 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447 message2.temp_data = json!({
448 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449 "user_id": "user_456",
450 "timestamp": "2024-01-15T11:00:00Z"
451 });
452 message2.data = json!({});
453
454 // Create a workflow for the second user
455 let workflow2_json = r#"
456 {
457 "id": "custom_function_demo_2",
458 "name": "Custom Function Demo 2",
459 "description": "Second demo with different user",
460 "condition": { "==": [true, true] },
461 "tasks": [
462 {
463 "id": "prepare_data",
464 "name": "Prepare Data",
465 "function": {
466 "name": "map",
467 "input": {
468 "mappings": [
469 {
470 "path": "data.numbers",
471 "logic": { "var": "temp_data.measurements" }
472 },
473 {
474 "path": "data.user_id",
475 "logic": { "var": "temp_data.user_id" }
476 }
477 ]
478 }
479 }
480 },
481 {
482 "id": "calculate_stats",
483 "name": "Calculate Statistics",
484 "function": {
485 "name": "statistics",
486 "input": {
487 "data_path": "data.numbers",
488 "output_path": "data.analysis"
489 }
490 }
491 },
492 {
493 "id": "enrich_user_data",
494 "name": "Enrich User Data",
495 "function": {
496 "name": "enrich_data",
497 "input": {
498 "lookup_field": "user_id",
499 "lookup_value": "user_456",
500 "output_path": "data.employee_details"
501 }
502 }
503 }
504 ]
505 }
506 "#;
507
508 let workflow2 = Workflow::from_json(workflow2_json)?;
509 engine.add_workflow(&workflow2);
510
511 match engine.process_message(&mut message2).await {
512 Ok(_) => {
513 println!("✅ Second message processed successfully!\n");
514 println!("📊 Results for user_456:");
515 println!("{}", serde_json::to_string_pretty(&message2.data)?);
516 }
517 Err(e) => {
518 println!("❌ Error processing second message: {:?}", e);
519 }
520 }
521
522 println!("\n🎉 Custom function examples completed!");
523
524 Ok(())
525}
Trait Implementations
impl<'de> Deserialize<'de> for Message
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
    __D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
Auto Trait Implementations
impl Freeze for Message
impl RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl UnwindSafe for Message
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more