dataflow_rs/engine/functions/
map.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
8/// A mapping function that transforms data using JSONLogic expressions.
9///
10/// This function allows mapping data from one location to another within
11/// a message, applying transformations using JSONLogic expressions.
12pub struct MapFunction {
13    // No longer needs data_logic field
14}
15
16impl Default for MapFunction {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl MapFunction {
23    /// Create a new MapFunction
24    pub fn new() -> Self {
25        Self {}
26    }
27
28    /// Set a value at the specified path within the target object
29    fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30        let mut current = target;
31        let mut old_value = Value::Null;
32        let path_parts: Vec<&str> = path.split('.').collect();
33
34        // Navigate to the parent of the target location
35        for (i, part) in path_parts.iter().enumerate() {
36            if i == path_parts.len() - 1 {
37                // We're at the last part, so set the value
38                if current.is_object() {
39                    if let Value::Object(map) = current {
40                        // Save the old value before replacing
41                        old_value = map.get(*part).cloned().unwrap_or(Value::Null);
42                        map.insert(part.to_string(), value.clone());
43                    }
44                } else if current.is_array() {
45                    // Handle array indices with special care
46                    if let Ok(index) = part.parse::<usize>() {
47                        if let Value::Array(arr) = current {
48                            // Extend array if needed
49                            while arr.len() <= index {
50                                arr.push(Value::Null);
51                            }
52                            // Save old value
53                            old_value = arr[index].clone();
54                            arr[index] = value.clone();
55                        }
56                    } else {
57                        return Err(DataflowError::Validation(format!(
58                            "Invalid array index: {}",
59                            part
60                        )));
61                    }
62                } else {
63                    return Err(DataflowError::Validation(format!(
64                        "Cannot set property '{}' on non-object value",
65                        part
66                    )));
67                }
68            } else {
69                // We need to navigate deeper
70                match current {
71                    Value::Object(map) => {
72                        if !map.contains_key(*part) {
73                            map.insert(part.to_string(), json!({}));
74                        }
75                        current = map.get_mut(*part).unwrap();
76                    }
77                    Value::Array(arr) => {
78                        if let Ok(index) = part.parse::<usize>() {
79                            // Extend array if needed
80                            while arr.len() <= index {
81                                arr.push(json!({}));
82                            }
83                            current = &mut arr[index];
84                        } else {
85                            return Err(DataflowError::Validation(format!(
86                                "Invalid array index: {}",
87                                part
88                            )));
89                        }
90                    }
91                    _ => {
92                        return Err(DataflowError::Validation(format!(
93                            "Cannot navigate path '{}' on non-object value",
94                            part
95                        )));
96                    }
97                }
98            }
99        }
100
101        Ok(old_value)
102    }
103}
104
105#[async_trait]
106impl AsyncFunctionHandler for MapFunction {
107    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
108        // Extract mappings array from input
109        let mappings = input.get("mappings").ok_or_else(|| {
110            DataflowError::Validation("Missing 'mappings' array in input".to_string())
111        })?;
112
113        let mappings_arr = mappings
114            .as_array()
115            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
116
117        let mut changes = Vec::new();
118
119        // Process each mapping
120        for mapping in mappings_arr {
121            // Get path where to store the result
122            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
123                DataflowError::Validation("Missing 'path' in mapping".to_string())
124            })?;
125
126            // Get the logic to evaluate
127            let logic = mapping.get("logic").ok_or_else(|| {
128                DataflowError::Validation("Missing 'logic' in mapping".to_string())
129            })?;
130
131            // Clone message data for evaluation context
132            let data_clone = message.data.clone();
133            let metadata_clone = message.metadata.clone();
134            let temp_data_clone = message.temp_data.clone();
135
136            // Create a combined data object with message fields for evaluation
137            let data_for_eval = json!({
138                "data": data_clone,
139                "metadata": metadata_clone,
140                "temp_data": temp_data_clone,
141            });
142
143            // Determine target object based on path prefix
144            let (target_object, adjusted_path) =
145                if let Some(path) = target_path.strip_prefix("data.") {
146                    (&mut message.data, path)
147                } else if let Some(path) = target_path.strip_prefix("metadata.") {
148                    (&mut message.metadata, path)
149                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
150                    (&mut message.temp_data, path)
151                } else if target_path == "data" {
152                    (&mut message.data, "")
153                } else if target_path == "metadata" {
154                    (&mut message.metadata, "")
155                } else if target_path == "temp_data" {
156                    (&mut message.temp_data, "")
157                } else {
158                    // Default to data
159                    (&mut message.data, target_path)
160                };
161
162            // Evaluate the logic using thread-local DataLogic
163            let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
164                let data_logic = data_logic_cell.borrow_mut();
165
166                data_logic
167                    .evaluate_json(logic, &data_for_eval, None)
168                    .map_err(|e| {
169                        DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
170                    })
171            })?;
172
173            // Set the result at the target path
174            if adjusted_path.is_empty() {
175                // Replace the entire object
176                let old_value = std::mem::replace(target_object, result.clone());
177                changes.push(Change {
178                    path: target_path.to_string(),
179                    old_value,
180                    new_value: result,
181                });
182            } else {
183                // Set a specific path
184                let old_value =
185                    self.set_value_at_path(target_object, adjusted_path, result.clone())?;
186                changes.push(Change {
187                    path: target_path.to_string(),
188                    old_value,
189                    new_value: result,
190                });
191            }
192        }
193
194        Ok((200, changes))
195    }
196}