dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
/// Handler that applies a list of path/logic mappings to a message.
///
/// The type carries no state; all inputs arrive through the call to `execute`.
pub struct MapFunction {}
15
16impl Default for MapFunction {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl MapFunction {
23 pub fn new() -> Self {
25 Self {}
26 }
27
28 fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30 let mut current = target;
31 let mut old_value = Value::Null;
32 let path_parts: Vec<&str> = path.split('.').collect();
33
34 for (i, part) in path_parts.iter().enumerate() {
36 if i == path_parts.len() - 1 {
37 if current.is_object() {
39 if let Value::Object(map) = current {
40 old_value = map.get(*part).cloned().unwrap_or(Value::Null);
42 map.insert(part.to_string(), value.clone());
43 }
44 } else if current.is_array() {
45 if let Ok(index) = part.parse::<usize>() {
47 if let Value::Array(arr) = current {
48 while arr.len() <= index {
50 arr.push(Value::Null);
51 }
52 old_value = arr[index].clone();
54 arr[index] = value.clone();
55 }
56 } else {
57 return Err(DataflowError::Validation(format!(
58 "Invalid array index: {}",
59 part
60 )));
61 }
62 } else {
63 return Err(DataflowError::Validation(format!(
64 "Cannot set property '{}' on non-object value",
65 part
66 )));
67 }
68 } else {
69 match current {
71 Value::Object(map) => {
72 if !map.contains_key(*part) {
73 map.insert(part.to_string(), json!({}));
74 }
75 current = map.get_mut(*part).unwrap();
76 }
77 Value::Array(arr) => {
78 if let Ok(index) = part.parse::<usize>() {
79 while arr.len() <= index {
81 arr.push(json!({}));
82 }
83 current = &mut arr[index];
84 } else {
85 return Err(DataflowError::Validation(format!(
86 "Invalid array index: {}",
87 part
88 )));
89 }
90 }
91 _ => {
92 return Err(DataflowError::Validation(format!(
93 "Cannot navigate path '{}' on non-object value",
94 part
95 )));
96 }
97 }
98 }
99 }
100
101 Ok(old_value)
102 }
103}
104
105#[async_trait]
106impl AsyncFunctionHandler for MapFunction {
107 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
108 let mappings = input.get("mappings").ok_or_else(|| {
110 DataflowError::Validation("Missing 'mappings' array in input".to_string())
111 })?;
112
113 let mappings_arr = mappings
114 .as_array()
115 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
116
117 let mut changes = Vec::new();
118
119 for mapping in mappings_arr {
121 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
123 DataflowError::Validation("Missing 'path' in mapping".to_string())
124 })?;
125
126 let logic = mapping.get("logic").ok_or_else(|| {
128 DataflowError::Validation("Missing 'logic' in mapping".to_string())
129 })?;
130
131 let data_clone = message.data.clone();
133 let metadata_clone = message.metadata.clone();
134 let temp_data_clone = message.temp_data.clone();
135
136 let data_for_eval = json!({
138 "data": data_clone,
139 "metadata": metadata_clone,
140 "temp_data": temp_data_clone,
141 });
142
143 let (target_object, adjusted_path) =
145 if let Some(path) = target_path.strip_prefix("data.") {
146 (&mut message.data, path)
147 } else if let Some(path) = target_path.strip_prefix("metadata.") {
148 (&mut message.metadata, path)
149 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
150 (&mut message.temp_data, path)
151 } else if target_path == "data" {
152 (&mut message.data, "")
153 } else if target_path == "metadata" {
154 (&mut message.metadata, "")
155 } else if target_path == "temp_data" {
156 (&mut message.temp_data, "")
157 } else {
158 (&mut message.data, target_path)
160 };
161
162 let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
164 let data_logic = data_logic_cell.borrow_mut();
165
166 data_logic
167 .evaluate_json(logic, &data_for_eval, None)
168 .map_err(|e| {
169 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
170 })
171 })?;
172
173 if adjusted_path.is_empty() {
175 let old_value = std::mem::replace(target_object, result.clone());
177 changes.push(Change {
178 path: target_path.to_string(),
179 old_value,
180 new_value: result,
181 });
182 } else {
183 let old_value =
185 self.set_value_at_path(target_object, adjusted_path, result.clone())?;
186 changes.push(Change {
187 path: target_path.to_string(),
188 old_value,
189 new_value: result,
190 });
191 }
192 }
193
194 Ok((200, changes))
195 }
196}