Struct MapFunction

Source
pub struct MapFunction {}
Expand description

A mapping function that transforms data using JSONLogic expressions.

This function allows mapping data from one location to another within a message, applying transformations using JSONLogic expressions.

Implementations§

Source§

impl MapFunction

Source

pub fn new() -> Self

Create a new MapFunction

Examples found in repository?
examples/custom_function.rs (line 327)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {e:?}");
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{separator}");
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {e:?}");
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}

Trait Implementations§

Source§

impl AsyncFunctionHandler for MapFunction

Source§

fn execute<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, message: &'life1 mut Message, input: &'life2 Value, ) -> Pin<Box<dyn Future<Output = Result<(usize, Vec<Change>)>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Execute the function asynchronously on a message with input parameters Read more
Source§

impl Default for MapFunction

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,