drasi_middleware/map/
mod.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, HashMap},
17    ops::Deref,
18    str::FromStr,
19    sync::Arc,
20};
21
22use async_trait::async_trait;
23use drasi_core::{
24    interface::{MiddlewareError, MiddlewareSetupError, SourceMiddleware, SourceMiddlewareFactory},
25    models::{Element, ElementPropertyMap, ElementReference, SourceChange, SourceMiddlewareConfig},
26};
27use jsonpath_rust::{path::config::JsonPathConfig, JsonPathInst};
28use serde::{Deserialize, Deserializer};
29use serde_json::Value;
30
31#[cfg(test)]
32mod tests;
33
34#[derive(Debug, Clone, Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub struct SourceMappedOutput {
37    op: Option<MapOperation>,
38    selector: Option<JsonPathExpression>,
39    label: Option<String>,
40    id: Option<JsonPathExpression>,
41    element_type: Option<MapElementType>,
42
43    #[serde(default)]
44    properties: BTreeMap<String, JsonPathExpression>,
45}
46
47#[derive(Debug, Clone, Deserialize)]
48pub enum MapOperation {
49    Insert,
50    Update,
51    Delete,
52}
53
54#[derive(Debug, Clone, Deserialize)]
55pub enum MapElementType {
56    Node,
57    #[serde(rename_all = "camelCase")]
58    Relation {
59        in_node_id: JsonPathExpression,
60        out_node_id: JsonPathExpression,
61    },
62}
63
64#[derive(Debug, Clone, Deserialize)]
65pub struct SourceMappedOperations {
66    #[serde(default)]
67    insert: Vec<SourceMappedOutput>,
68
69    #[serde(default)]
70    update: Vec<SourceMappedOutput>,
71
72    #[serde(default)]
73    delete: Vec<SourceMappedOutput>,
74}
75
76#[derive(Clone)]
77pub struct JsonPathExpression {
78    expression: String,
79    path: JsonPathInst,
80}
81
82impl JsonPathExpression {
83    pub fn execute(&self, value: &Value) -> Vec<Value> {
84        let result = self.path.find_slice(value, JsonPathConfig::default());
85        result
86            .into_iter()
87            .map(|v| v.deref().clone())
88            .collect::<Vec<Value>>()
89    }
90
91    pub fn execute_one(&self, value: &Value) -> Option<Value> {
92        let result = self.path.find_slice(value, JsonPathConfig::default());
93        result.first().map(|v| v.deref().clone())
94    }
95}
96
97impl<'de> Deserialize<'de> for JsonPathExpression {
98    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
99    where
100        D: Deserializer<'de>,
101    {
102        let expression = String::deserialize(deserializer)?;
103        let path = match JsonPathInst::from_str(&expression) {
104            Ok(p) => p,
105            Err(e) => return Err(serde::de::Error::custom(e.to_string())),
106        };
107        Ok(JsonPathExpression { expression, path })
108    }
109}
110
111impl std::fmt::Debug for JsonPathExpression {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "{:#?}", self.expression)
114    }
115}
116
117pub struct Map {
118    mappings: HashMap<String, SourceMappedOperations>,
119}
120
121#[async_trait]
122impl SourceMiddleware for Map {
123    async fn process(
124        &self,
125        source_change: SourceChange,
126    ) -> Result<Vec<SourceChange>, MiddlewareError> {
127        let metadata = match &source_change {
128            SourceChange::Insert { element } => element.get_metadata(),
129            SourceChange::Update { element } => element.get_metadata(),
130            SourceChange::Delete { metadata } => metadata,
131            SourceChange::Future { .. } => return Ok(vec![source_change]),
132        };
133
134        let mut results = Vec::new();
135
136        for label in metadata.labels.iter() {
137            let mappings = match self.mappings.get(&label.to_string()) {
138                //todo: use Arc<str> instead of String
139                Some(mappings) => match &source_change {
140                    SourceChange::Insert { .. } => &mappings.insert,
141                    SourceChange::Update { .. } => &mappings.update,
142                    SourceChange::Delete { .. } => &mappings.delete,
143                    _ => continue,
144                },
145                None => continue,
146            };
147
148            log::info!("Processing mappings for label: {}", label);
149
150            #[allow(unused_assignments)]
151            let mut del_element_binding = Option::<Element>::None;
152
153            #[allow(clippy::unwrap_used)]
154            let element = match &source_change {
155                SourceChange::Insert { element } => element,
156                SourceChange::Update { element } => element,
157                SourceChange::Delete { metadata } => {
158                    del_element_binding = Some(Element::Node {
159                        metadata: metadata.clone(),
160                        properties: ElementPropertyMap::new(),
161                    });
162                    del_element_binding.as_ref().unwrap()
163                }
164                _ => continue,
165            };
166
167            for mapping in mappings {
168                let op = match &mapping.op {
169                    Some(op) => op,
170                    None => match &source_change {
171                        SourceChange::Insert { .. } => &MapOperation::Insert,
172                        SourceChange::Update { .. } => &MapOperation::Update,
173                        SourceChange::Delete { .. } => &MapOperation::Delete,
174                        _ => continue,
175                    },
176                };
177
178                let mut source_obj: Value = element.into();
179
180                let selected = match &mapping.selector {
181                    Some(selector) => selector.execute(&source_obj),
182                    None => vec![source_obj.clone()],
183                };
184
185                for s in selected {
186                    log::info!("Processing mapping for selector: {:#?}", s);
187                    if let Some(obj) = source_obj.as_object_mut() {
188                        obj.insert("$selected".to_string(), s);
189                    }
190
191                    let mut new_metadata = metadata.clone();
192
193                    if let Some(id) = &mapping.id {
194                        new_metadata.reference.element_id = match id.execute_one(&source_obj) {
195                            Some(id) => match id.as_str() {
196                                Some(id) => Arc::from(id),
197                                None => Arc::from(id.to_string().as_str()),
198                            },
199                            None => continue, //expression did not return an id
200                        }
201                    };
202
203                    if let Some(label) = &mapping.label {
204                        new_metadata.labels = Arc::from(vec![Arc::from(label.clone())]);
205                    }
206
207                    let mut properties = ElementPropertyMap::new();
208                    for (key, value) in &mapping.properties {
209                        match value.execute_one(&source_obj) {
210                            Some(p) => properties.insert(key.as_str(), (&p).into()),
211                            None => continue,
212                        }
213                    }
214
215                    let new_element = match &mapping.element_type {
216                        Some(MapElementType::Node) => Element::Node {
217                            metadata: new_metadata,
218                            properties,
219                        },
220                        Some(MapElementType::Relation {
221                            in_node_id,
222                            out_node_id,
223                        }) => {
224                            let in_node_id = match in_node_id.execute_one(&source_obj) {
225                                Some(id) => match id.as_str() {
226                                    Some(id) => Arc::from(id),
227                                    None => Arc::from(id.to_string().as_str()),
228                                },
229                                None => continue, //expression did not return an id
230                            };
231
232                            let out_node_id = match out_node_id.execute_one(&source_obj) {
233                                Some(id) => match id.as_str() {
234                                    Some(id) => Arc::from(id),
235                                    None => Arc::from(id.to_string().as_str()),
236                                },
237                                None => continue, //expression did not return an id
238                            };
239
240                            Element::Relation {
241                                metadata: new_metadata,
242                                in_node: ElementReference::new(
243                                    &metadata.reference.source_id,
244                                    &in_node_id,
245                                ),
246                                out_node: ElementReference::new(
247                                    &metadata.reference.source_id,
248                                    &out_node_id,
249                                ),
250                                properties,
251                            }
252                        }
253                        None => match element {
254                            Element::Node { .. } => Element::Node {
255                                metadata: new_metadata,
256                                properties,
257                            },
258                            Element::Relation {
259                                in_node, out_node, ..
260                            } => Element::Relation {
261                                metadata: new_metadata,
262                                in_node: in_node.clone(),
263                                out_node: out_node.clone(),
264                                properties,
265                            },
266                        },
267                    };
268
269                    match op {
270                        MapOperation::Insert => results.push(SourceChange::Insert {
271                            element: new_element,
272                        }),
273                        MapOperation::Update => results.push(SourceChange::Update {
274                            element: new_element,
275                        }),
276                        MapOperation::Delete => results.push(SourceChange::Delete {
277                            metadata: new_element.get_metadata().clone(),
278                        }),
279                    }
280                }
281            }
282        }
283        Ok(results)
284    }
285}
286
/// Factory that builds `Map` middleware instances. It holds no state.
#[derive(Default)]
pub struct MapFactory {}

impl MapFactory {
    /// Creates a new factory.
    pub fn new() -> Self {
        Self {}
    }
}
300
301impl SourceMiddlewareFactory for MapFactory {
302    fn name(&self) -> String {
303        "map".to_string()
304    }
305
306    fn create(
307        &self,
308        config: &SourceMiddlewareConfig,
309    ) -> Result<Arc<dyn SourceMiddleware>, MiddlewareSetupError> {
310        let cfg = Value::Object(config.config.clone());
311        let mappings = match serde_json::from_value(cfg) {
312            Ok(mappings) => mappings,
313            Err(e) => {
314                return Err(MiddlewareSetupError::InvalidConfiguration(format!(
315                    "Invalid configuration: {}",
316                    e
317                )))
318            }
319        };
320
321        log::info!(
322            "Map middleware {} mappings loaded: {:#?}",
323            config.name,
324            mappings
325        );
326
327        Ok(Arc::new(Map { mappings }))
328    }
329}