pub struct MapFunction {}
Expand description
A mapping function that transforms data using JSONLogic expressions.
This function allows mapping data from one location to another within a message, applying transformations using JSONLogic expressions.
Implementations§
Source§impl MapFunction
impl MapFunction
Sourcepub fn new() -> Self
pub fn new() -> Self
Create a new MapFunction
Examples found in repository?
examples/custom_function.rs (line 327)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308 println!("=== Custom Function Example ===\n");
309
310 // Create engine without built-in functions to demonstrate custom ones
311 let mut engine = Engine::new_empty();
312
313 // Register our custom functions
314 engine.register_task_function(
315 "statistics".to_string(),
316 Box::new(StatisticsFunction::new()),
317 );
318
319 engine.register_task_function(
320 "enrich_data".to_string(),
321 Box::new(DataEnrichmentFunction::new()),
322 );
323
324 // Also register built-in map function for data preparation
325 engine.register_task_function(
326 "map".to_string(),
327 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328 );
329
330 // Define a workflow that uses our custom functions
331 let workflow_json = r#"
332 {
333 "id": "custom_function_demo",
334 "name": "Custom Function Demo",
335 "description": "Demonstrates custom async functions in workflow",
336 "condition": { "==": [true, true] },
337 "tasks": [
338 {
339 "id": "prepare_data",
340 "name": "Prepare Data",
341 "description": "Extract and prepare data for analysis",
342 "function": {
343 "name": "map",
344 "input": {
345 "mappings": [
346 {
347 "path": "data.numbers",
348 "logic": { "var": "temp_data.measurements" }
349 },
350 {
351 "path": "data.user_id",
352 "logic": { "var": "temp_data.user_id" }
353 }
354 ]
355 }
356 }
357 },
358 {
359 "id": "calculate_stats",
360 "name": "Calculate Statistics",
361 "description": "Calculate statistical measures from numeric data",
362 "function": {
363 "name": "statistics",
364 "input": {
365 "data_path": "data.numbers",
366 "output_path": "data.stats"
367 }
368 }
369 },
370 {
371 "id": "enrich_user_data",
372 "name": "Enrich User Data",
373 "description": "Add additional user information",
374 "function": {
375 "name": "enrich_data",
376 "input": {
377 "lookup_field": "user_id",
378 "lookup_value": "user_123",
379 "output_path": "data.user_info"
380 }
381 }
382 }
383 ]
384 }
385 "#;
386
387 // Parse and add the workflow
388 let workflow = Workflow::from_json(workflow_json)?;
389 engine.add_workflow(&workflow);
390
391 // Create sample data
392 let sample_data = json!({
393 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394 "user_id": "user_123",
395 "timestamp": "2024-01-15T10:30:00Z"
396 });
397
398 // Create and process message
399 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400 message.temp_data = sample_data;
401 message.data = json!({});
402
403 println!("Processing message with custom functions...\n");
404
405 // Process the message through our custom workflow
406 match engine.process_message(&mut message).await {
407 Ok(_) => {
408 println!("✅ Message processed successfully!\n");
409
410 println!("📊 Final Results:");
411 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413 println!("📋 Audit Trail:");
414 for (i, audit) in message.audit_trail.iter().enumerate() {
415 println!(
416 "{}. Task: {} (Status: {})",
417 i + 1,
418 audit.task_id,
419 audit.status_code
420 );
421 println!(" Timestamp: {}", audit.timestamp);
422 println!(" Changes: {} field(s) modified", audit.changes.len());
423 }
424
425 if message.has_errors() {
426 println!("\n⚠️ Errors encountered:");
427 for error in &message.errors {
428 println!(
429 " - {}: {:?}",
430 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431 error.error_message
432 );
433 }
434 }
435 }
436 Err(e) => {
437 println!("❌ Error processing message: {e:?}");
438 }
439 }
440
441 // Demonstrate another example with different data
442 let separator = "=".repeat(50);
443 println!("\n{separator}");
444 println!("=== Second Example with Different User ===\n");
445
446 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447 message2.temp_data = json!({
448 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449 "user_id": "user_456",
450 "timestamp": "2024-01-15T11:00:00Z"
451 });
452 message2.data = json!({});
453
454 // Create a workflow for the second user
455 let workflow2_json = r#"
456 {
457 "id": "custom_function_demo_2",
458 "name": "Custom Function Demo 2",
459 "description": "Second demo with different user",
460 "condition": { "==": [true, true] },
461 "tasks": [
462 {
463 "id": "prepare_data",
464 "name": "Prepare Data",
465 "function": {
466 "name": "map",
467 "input": {
468 "mappings": [
469 {
470 "path": "data.numbers",
471 "logic": { "var": "temp_data.measurements" }
472 },
473 {
474 "path": "data.user_id",
475 "logic": { "var": "temp_data.user_id" }
476 }
477 ]
478 }
479 }
480 },
481 {
482 "id": "calculate_stats",
483 "name": "Calculate Statistics",
484 "function": {
485 "name": "statistics",
486 "input": {
487 "data_path": "data.numbers",
488 "output_path": "data.analysis"
489 }
490 }
491 },
492 {
493 "id": "enrich_user_data",
494 "name": "Enrich User Data",
495 "function": {
496 "name": "enrich_data",
497 "input": {
498 "lookup_field": "user_id",
499 "lookup_value": "user_456",
500 "output_path": "data.employee_details"
501 }
502 }
503 }
504 ]
505 }
506 "#;
507
508 let workflow2 = Workflow::from_json(workflow2_json)?;
509 engine.add_workflow(&workflow2);
510
511 match engine.process_message(&mut message2).await {
512 Ok(_) => {
513 println!("✅ Second message processed successfully!\n");
514 println!("📊 Results for user_456:");
515 println!("{}", serde_json::to_string_pretty(&message2.data)?);
516 }
517 Err(e) => {
518 println!("❌ Error processing second message: {e:?}");
519 }
520 }
521
522 println!("\n🎉 Custom function examples completed!");
523
524 Ok(())
525}
Trait Implementations§
Source§impl AsyncFunctionHandler for MapFunction
impl AsyncFunctionHandler for MapFunction
Source§fn execute<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
message: &'life1 mut Message,
input: &'life2 Value,
) -> Pin<Box<dyn Future<Output = Result<(usize, Vec<Change>)>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn execute<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
message: &'life1 mut Message,
input: &'life2 Value,
) -> Pin<Box<dyn Future<Output = Result<(usize, Vec<Change>)>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Execute the function asynchronously on a message with input parameters Read more
Auto Trait Implementations§
impl Freeze for MapFunction
impl RefUnwindSafe for MapFunction
impl Send for MapFunction
impl Sync for MapFunction
impl Unpin for MapFunction
impl UnwindSafe for MapFunction
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more