dataflow_rs/engine/functions/
map.rs

1use crate::engine::error::{DataflowError, Result};
2use serde::Deserialize;
3use serde_json::Value;
4
5/// Pre-parsed configuration for map function
6#[derive(Debug, Clone, Deserialize)]
7pub struct MapConfig {
8    pub mappings: Vec<MapMapping>,
9}
10
11#[derive(Debug, Clone, Deserialize)]
12pub struct MapMapping {
13    pub path: String,
14    pub logic: Value,
15    #[serde(skip)]
16    pub logic_index: Option<usize>,
17}
18
19impl MapConfig {
20    pub fn from_json(input: &Value) -> Result<Self> {
21        let mappings = input.get("mappings").ok_or_else(|| {
22            DataflowError::Validation("Missing 'mappings' array in input".to_string())
23        })?;
24
25        let mappings_arr = mappings
26            .as_array()
27            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
28
29        let mut parsed_mappings = Vec::new();
30
31        for mapping in mappings_arr {
32            let path = mapping
33                .get("path")
34                .and_then(Value::as_str)
35                .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
36                .to_string();
37
38            let logic = mapping
39                .get("logic")
40                .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
41                .clone();
42
43            parsed_mappings.push(MapMapping {
44                path,
45                logic,
46                logic_index: None,
47            });
48        }
49
50        Ok(MapConfig {
51            mappings: parsed_mappings,
52        })
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use super::*;
59    use serde_json::json;
60
61    #[test]
62    fn test_map_config_from_json() {
63        let input = json!({
64            "mappings": [
65                {
66                    "path": "data.field1",
67                    "logic": {"var": "data.source"}
68                },
69                {
70                    "path": "data.field2",
71                    "logic": "static_value"
72                }
73            ]
74        });
75
76        let config = MapConfig::from_json(&input).unwrap();
77        assert_eq!(config.mappings.len(), 2);
78        assert_eq!(config.mappings[0].path, "data.field1");
79        assert_eq!(config.mappings[1].path, "data.field2");
80    }
81
82    #[test]
83    fn test_map_config_missing_mappings() {
84        let input = json!({});
85        let result = MapConfig::from_json(&input);
86        assert!(result.is_err());
87    }
88
89    #[test]
90    fn test_map_config_invalid_mappings() {
91        let input = json!({
92            "mappings": "not_an_array"
93        });
94        let result = MapConfig::from_json(&input);
95        assert!(result.is_err());
96    }
97
98    #[test]
99    fn test_map_config_missing_path() {
100        let input = json!({
101            "mappings": [
102                {
103                    "logic": {"var": "data.source"}
104                }
105            ]
106        });
107        let result = MapConfig::from_json(&input);
108        assert!(result.is_err());
109    }
110
111    #[test]
112    fn test_map_config_missing_logic() {
113        let input = json!({
114            "mappings": [
115                {
116                    "path": "data.field1"
117                }
118            ]
119        });
120        let result = MapConfig::from_json(&input);
121        assert!(result.is_err());
122    }
123}