dataflow_rs/engine/functions/
mod.rs

1use crate::engine::error::Result;
2use crate::engine::message::{Change, Message};
3use async_trait::async_trait;
4use serde_json::Value;
5use std::cell::RefCell;
6
7pub mod validation;
8pub use validation::ValidationFunction;
9
10pub mod http;
11pub use http::*;
12
13pub mod map;
14pub use map::MapFunction;
15
// Thread-local DataLogic instance to avoid mutex contention.
//
// Every thread that touches this static gets its own `DataLogic`, created on
// first access via `DataLogic::with_preserve_structure()`. The value is wrapped
// in a `RefCell` so it can be borrowed mutably through the shared thread-local
// handle; use `FUNCTION_DATA_LOGIC.with(|cell| ...)` and then `borrow()` /
// `borrow_mut()` on the cell to reach it.
thread_local! {
    pub static FUNCTION_DATA_LOGIC: RefCell<datalogic_rs::DataLogic> = RefCell::new(
        datalogic_rs::DataLogic::with_preserve_structure()
    );
}
22
23// Re-export all built-in functions for easier access
24pub mod builtins {
25    use super::*;
26
27    // Standard function names used for registering built-ins
28    pub const VALIDATION_FUNCTION: &str = "validate";
29    pub const MAP_FUNCTION: &str = "map";
30    pub const HTTP_FUNCTION: &str = "http";
31
32    // Get all built-in functions with their standard names
33    pub fn get_all_functions() -> Vec<(String, Box<dyn AsyncFunctionHandler + Send + Sync>)> {
34        vec![
35            // Create validation function with thread-local DataLogic
36            (
37                VALIDATION_FUNCTION.to_string(),
38                Box::new(ValidationFunction::new()),
39            ),
40            // Create map function with thread-local DataLogic
41            (MAP_FUNCTION.to_string(), Box::new(MapFunction::new())),
42            // Create HTTP function with 30-second timeout
43            (HTTP_FUNCTION.to_string(), Box::new(HttpFunction::new(30))),
44        ]
45    }
46
47    // Get all built-in async functions with their standard names
48    // This is the same as get_all_functions but separated to maintain
49    // API compatibility for AsyncEngine
50    pub fn get_all_async_functions() -> Vec<(String, Box<dyn AsyncFunctionHandler + Send + Sync>)> {
51        get_all_functions()
52    }
53}
54
/// Async interface for task functions that operate on messages.
///
/// This trait defines how async task functions process a [`Message`] with
/// given input parameters. It is particularly useful for IO-bound operations
/// like HTTP requests, file operations, and database queries.
///
/// The `Send + Sync` supertraits mean every implementor can be shared across
/// threads, which is what allows handlers to be stored as boxed trait objects.
#[async_trait]
pub trait AsyncFunctionHandler: Send + Sync {
    /// Execute the function asynchronously on a message with input parameters.
    ///
    /// # Arguments
    ///
    /// * `message` - The message to process; it is passed mutably, so an
    ///   implementation may modify it in place
    /// * `input` - Function input parameters, supplied as a JSON value
    ///
    /// # Returns
    ///
    /// * `Result<(usize, Vec<Change>)>` - On success, a status code paired with
    ///   the list of changes reported by the function
    ///
    /// # Errors
    ///
    /// Returns the engine's error type when the function fails; the exact
    /// failure conditions are defined by each implementation.
    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)>;
}