1use std::{
16 collections::{BTreeMap, HashMap},
17 ops::Deref,
18 str::FromStr,
19 sync::Arc,
20};
21
22use async_trait::async_trait;
23use drasi_core::{
24 interface::{MiddlewareError, MiddlewareSetupError, SourceMiddleware, SourceMiddlewareFactory},
25 models::{Element, ElementPropertyMap, ElementReference, SourceChange, SourceMiddlewareConfig},
26};
27use jsonpath_rust::{path::config::JsonPathConfig, JsonPathInst};
28use serde::{Deserialize, Deserializer};
29use serde_json::Value;
30
31#[cfg(test)]
32mod tests;
33
34#[derive(Debug, Clone, Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub struct SourceMappedOutput {
37 op: Option<MapOperation>,
38 selector: Option<JsonPathExpression>,
39 label: Option<String>,
40 id: Option<JsonPathExpression>,
41 element_type: Option<MapElementType>,
42
43 #[serde(default)]
44 properties: BTreeMap<String, JsonPathExpression>,
45}
46
47#[derive(Debug, Clone, Deserialize)]
48pub enum MapOperation {
49 Insert,
50 Update,
51 Delete,
52}
53
54#[derive(Debug, Clone, Deserialize)]
55pub enum MapElementType {
56 Node,
57 #[serde(rename_all = "camelCase")]
58 Relation {
59 in_node_id: JsonPathExpression,
60 out_node_id: JsonPathExpression,
61 },
62}
63
64#[derive(Debug, Clone, Deserialize)]
65pub struct SourceMappedOperations {
66 #[serde(default)]
67 insert: Vec<SourceMappedOutput>,
68
69 #[serde(default)]
70 update: Vec<SourceMappedOutput>,
71
72 #[serde(default)]
73 delete: Vec<SourceMappedOutput>,
74}
75
76#[derive(Clone)]
77pub struct JsonPathExpression {
78 expression: String,
79 path: JsonPathInst,
80}
81
82impl JsonPathExpression {
83 pub fn execute(&self, value: &Value) -> Vec<Value> {
84 let result = self.path.find_slice(value, JsonPathConfig::default());
85 result
86 .into_iter()
87 .map(|v| v.deref().clone())
88 .collect::<Vec<Value>>()
89 }
90
91 pub fn execute_one(&self, value: &Value) -> Option<Value> {
92 let result = self.path.find_slice(value, JsonPathConfig::default());
93 result.first().map(|v| v.deref().clone())
94 }
95}
96
97impl<'de> Deserialize<'de> for JsonPathExpression {
98 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
99 where
100 D: Deserializer<'de>,
101 {
102 let expression = String::deserialize(deserializer)?;
103 let path = match JsonPathInst::from_str(&expression) {
104 Ok(p) => p,
105 Err(e) => return Err(serde::de::Error::custom(e.to_string())),
106 };
107 Ok(JsonPathExpression { expression, path })
108 }
109}
110
111impl std::fmt::Debug for JsonPathExpression {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{:#?}", self.expression)
114 }
115}
116
117pub struct Map {
118 mappings: HashMap<String, SourceMappedOperations>,
119}
120
121#[async_trait]
122impl SourceMiddleware for Map {
123 async fn process(
124 &self,
125 source_change: SourceChange,
126 ) -> Result<Vec<SourceChange>, MiddlewareError> {
127 let metadata = match &source_change {
128 SourceChange::Insert { element } => element.get_metadata(),
129 SourceChange::Update { element } => element.get_metadata(),
130 SourceChange::Delete { metadata } => metadata,
131 SourceChange::Future { .. } => return Ok(vec![source_change]),
132 };
133
134 let mut results = Vec::new();
135
136 for label in metadata.labels.iter() {
137 let mappings = match self.mappings.get(&label.to_string()) {
138 Some(mappings) => match &source_change {
140 SourceChange::Insert { .. } => &mappings.insert,
141 SourceChange::Update { .. } => &mappings.update,
142 SourceChange::Delete { .. } => &mappings.delete,
143 _ => continue,
144 },
145 None => continue,
146 };
147
148 log::info!("Processing mappings for label: {}", label);
149
150 #[allow(unused_assignments)]
151 let mut del_element_binding = Option::<Element>::None;
152
153 #[allow(clippy::unwrap_used)]
154 let element = match &source_change {
155 SourceChange::Insert { element } => element,
156 SourceChange::Update { element } => element,
157 SourceChange::Delete { metadata } => {
158 del_element_binding = Some(Element::Node {
159 metadata: metadata.clone(),
160 properties: ElementPropertyMap::new(),
161 });
162 del_element_binding.as_ref().unwrap()
163 }
164 _ => continue,
165 };
166
167 for mapping in mappings {
168 let op = match &mapping.op {
169 Some(op) => op,
170 None => match &source_change {
171 SourceChange::Insert { .. } => &MapOperation::Insert,
172 SourceChange::Update { .. } => &MapOperation::Update,
173 SourceChange::Delete { .. } => &MapOperation::Delete,
174 _ => continue,
175 },
176 };
177
178 let mut source_obj: Value = element.into();
179
180 let selected = match &mapping.selector {
181 Some(selector) => selector.execute(&source_obj),
182 None => vec![source_obj.clone()],
183 };
184
185 for s in selected {
186 log::info!("Processing mapping for selector: {:#?}", s);
187 if let Some(obj) = source_obj.as_object_mut() {
188 obj.insert("$selected".to_string(), s);
189 }
190
191 let mut new_metadata = metadata.clone();
192
193 if let Some(id) = &mapping.id {
194 new_metadata.reference.element_id = match id.execute_one(&source_obj) {
195 Some(id) => match id.as_str() {
196 Some(id) => Arc::from(id),
197 None => Arc::from(id.to_string().as_str()),
198 },
199 None => continue, }
201 };
202
203 if let Some(label) = &mapping.label {
204 new_metadata.labels = Arc::from(vec![Arc::from(label.clone())]);
205 }
206
207 let mut properties = ElementPropertyMap::new();
208 for (key, value) in &mapping.properties {
209 match value.execute_one(&source_obj) {
210 Some(p) => properties.insert(key.as_str(), (&p).into()),
211 None => continue,
212 }
213 }
214
215 let new_element = match &mapping.element_type {
216 Some(MapElementType::Node) => Element::Node {
217 metadata: new_metadata,
218 properties,
219 },
220 Some(MapElementType::Relation {
221 in_node_id,
222 out_node_id,
223 }) => {
224 let in_node_id = match in_node_id.execute_one(&source_obj) {
225 Some(id) => match id.as_str() {
226 Some(id) => Arc::from(id),
227 None => Arc::from(id.to_string().as_str()),
228 },
229 None => continue, };
231
232 let out_node_id = match out_node_id.execute_one(&source_obj) {
233 Some(id) => match id.as_str() {
234 Some(id) => Arc::from(id),
235 None => Arc::from(id.to_string().as_str()),
236 },
237 None => continue, };
239
240 Element::Relation {
241 metadata: new_metadata,
242 in_node: ElementReference::new(
243 &metadata.reference.source_id,
244 &in_node_id,
245 ),
246 out_node: ElementReference::new(
247 &metadata.reference.source_id,
248 &out_node_id,
249 ),
250 properties,
251 }
252 }
253 None => match element {
254 Element::Node { .. } => Element::Node {
255 metadata: new_metadata,
256 properties,
257 },
258 Element::Relation {
259 in_node, out_node, ..
260 } => Element::Relation {
261 metadata: new_metadata,
262 in_node: in_node.clone(),
263 out_node: out_node.clone(),
264 properties,
265 },
266 },
267 };
268
269 match op {
270 MapOperation::Insert => results.push(SourceChange::Insert {
271 element: new_element,
272 }),
273 MapOperation::Update => results.push(SourceChange::Update {
274 element: new_element,
275 }),
276 MapOperation::Delete => results.push(SourceChange::Delete {
277 metadata: new_element.get_metadata().clone(),
278 }),
279 }
280 }
281 }
282 }
283 Ok(results)
284 }
285}
286
287pub struct MapFactory {}
288
289impl Default for MapFactory {
290 fn default() -> Self {
291 Self::new()
292 }
293}
294
295impl MapFactory {
296 pub fn new() -> Self {
297 MapFactory {}
298 }
299}
300
301impl SourceMiddlewareFactory for MapFactory {
302 fn name(&self) -> String {
303 "map".to_string()
304 }
305
306 fn create(
307 &self,
308 config: &SourceMiddlewareConfig,
309 ) -> Result<Arc<dyn SourceMiddleware>, MiddlewareSetupError> {
310 let cfg = Value::Object(config.config.clone());
311 let mappings = match serde_json::from_value(cfg) {
312 Ok(mappings) => mappings,
313 Err(e) => {
314 return Err(MiddlewareSetupError::InvalidConfiguration(format!(
315 "Invalid configuration: {}",
316 e
317 )))
318 }
319 };
320
321 log::info!(
322 "Map middleware {} mappings loaded: {:#?}",
323 config.name,
324 mappings
325 );
326
327 Ok(Arc::new(Map { mappings }))
328 }
329}