Skip to main content

dataflow_rs/engine/functions/
map.rs

//! # Map Function Module
//!
//! Data transformation via JSONLogic expressions. Each mapping evaluates a
//! compiled JSONLogic rule against the message's context (`OwnedDataValue`)
//! and assigns the result to a target path. The result stays an
//! `OwnedDataValue` throughout — there is no `serde_json::Value` intermediate.
//!
//! ## Features
//!
//! - JSONLogic-driven transformations
//! - Dot-path target paths with auto-creation
//! - Root-field merge semantics for `data` / `metadata` / `temp_data`
//! - Null results skip assignment
//! - Audit-trail change tracking
15
16use crate::engine::error::{DataflowError, Result};
17use crate::engine::executor::{ArenaContext, with_arena};
18use crate::engine::message::{Change, Message};
19use crate::engine::task_outcome::TaskOutcome;
20use crate::engine::utils::{get_nested_value_parts, set_nested_value_parts};
21use datalogic_rs::{Engine, Logic};
22use datavalue::OwnedDataValue;
23use log::{debug, error};
24use serde::Deserialize;
25use serde_json::Value;
26use std::sync::Arc;
27
28/// Configuration for the map function containing a list of mappings.
29#[derive(Debug, Clone, Deserialize)]
30pub struct MapConfig {
31    /// List of mappings to execute in order.
32    pub mappings: Vec<MapMapping>,
33}
34
35/// A single mapping that transforms and assigns data.
36#[derive(Debug, Clone, Deserialize, Default)]
37pub struct MapMapping {
38    /// Target path where the result will be stored (e.g., "data.user.name").
39    /// Supports dot notation for nested paths and `#` prefix for numeric field names.
40    pub path: String,
41
42    /// JSONLogic expression (kept as `serde_json::Value` since this is the
43    /// shape the compiler accepts; not runtime data).
44    pub logic: Value,
45
46    /// Engine-internal: pre-compiled JSONLogic, populated by `LogicCompiler`.
47    /// `None` is logged as an error during execute (the compiler should always
48    /// populate it). Not part of the stable API.
49    #[doc(hidden)]
50    #[serde(skip)]
51    pub compiled_logic: Option<Arc<Logic>>,
52
53    /// Engine-internal: `Arc<str>` mirror of `path`, populated by
54    /// `LogicCompiler`. Cloned (refcount bump) into `Change.path` per audit
55    /// emission so the hot path avoids `Arc::from(&path)` allocations.
56    /// Not part of the stable API.
57    #[doc(hidden)]
58    #[serde(skip)]
59    pub path_arc: Arc<str>,
60
61    /// Engine-internal: pre-split path segments (with the `#`-prefix escape
62    /// already applied, matching `utils::strip_hash_prefix`). Populated by
63    /// `LogicCompiler`. The hot path consumes this directly instead of running
64    /// `path.split('.')` — saves ~3% on `CharSearcher::next_match` per the
65    /// flamegraph. Not part of the stable API.
66    #[doc(hidden)]
67    #[serde(skip)]
68    pub path_parts: Arc<[Arc<str>]>,
69}
70
71impl MapConfig {
72    /// Parses a `MapConfig` from a JSON value.
73    pub fn from_json(input: &Value) -> Result<Self> {
74        let mappings = input.get("mappings").ok_or_else(|| {
75            DataflowError::Validation("Missing 'mappings' array in input".to_string())
76        })?;
77
78        let mappings_arr = mappings
79            .as_array()
80            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
81
82        let mut parsed_mappings = Vec::new();
83
84        for mapping in mappings_arr {
85            let path = mapping
86                .get("path")
87                .and_then(Value::as_str)
88                .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
89                .to_string();
90
91            let logic = mapping
92                .get("logic")
93                .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
94                .clone();
95
96            parsed_mappings.push(MapMapping {
97                path_arc: Arc::from(path.as_str()),
98                path_parts: Arc::from(Vec::<Arc<str>>::new().into_boxed_slice()),
99                path,
100                logic,
101                compiled_logic: None,
102            });
103        }
104
105        Ok(MapConfig {
106            mappings: parsed_mappings,
107        })
108    }
109
110    /// Executes all map transformations using pre-compiled logic.
111    ///
112    /// # Arguments
113    /// * `message` - The message to transform (modified in place)
114    /// * `engine` - Datalogic v5 engine for evaluation
115    pub fn execute(
116        &self,
117        message: &mut Message,
118        engine: &Arc<Engine>,
119    ) -> Result<(TaskOutcome, Vec<Change>)> {
120        // Default path: open the arena, build a fresh ArenaContext from the
121        // current `message.context`, run mappings. Used when no outer
122        // workflow-level arena session is available.
123        with_arena(|arena| {
124            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
125            self.execute_in_arena(message, &mut arena_ctx, engine, None)
126        })
127    }
128
129    /// Mappings-loop run against an externally-provided `ArenaContext`.
130    /// Used by the workflow-level sync-stretch executor so the
131    /// `OwnedDataValue → arena` conversion done by an earlier task in the
132    /// same workflow stretch is reused.
133    ///
134    /// `trace_snapshots` (when `Some`) collects a `serde_json::Value` snapshot
135    /// of `message.context` *before* each mapping runs — the trace
136    /// surface uses this for per-mapping debugging. `None` skips the snapshot
137    /// work entirely (the production path).
138    pub(crate) fn execute_in_arena(
139        &self,
140        message: &mut Message,
141        arena_ctx: &mut ArenaContext<'_>,
142        engine: &Arc<Engine>,
143        mut trace_snapshots: Option<&mut Vec<Value>>,
144    ) -> Result<(TaskOutcome, Vec<Change>)> {
145        let mut changes = Vec::new();
146        let mut errors_encountered = false;
147
148        debug!("Map: Executing {} mappings", self.mappings.len());
149
150        let arena = arena_ctx.arena();
151        for mapping in &self.mappings {
152            debug!("Processing mapping to path: {}", mapping.path);
153
154            // Trace mode: snapshot the context as a serde_json::Value *before*
155            // applying this mapping. Bridge cost is acceptable on the debug
156            // surface; production callers pass `None` and skip it entirely.
157            if let Some(buf) = trace_snapshots.as_deref_mut() {
158                buf.push(Value::from(&message.context));
159            }
160
161            // Pre-compiled `Arc<Logic>` lives on the mapping; the workflow
162            // compiler always populates it. `None` only happens for mappings
163            // constructed directly without compilation (test surface) —
164            // logged and skipped here.
165            let compiled_logic = match &mapping.compiled_logic {
166                Some(logic) => logic,
167                None => {
168                    error!("Map: Logic not compiled for mapping to {}", mapping.path);
169                    errors_encountered = true;
170                    continue;
171                }
172            };
173
174            let ctx_av = arena_ctx.as_data_value();
175            let result_av = match engine.evaluate(compiled_logic, ctx_av, arena) {
176                Ok(av) => av,
177                Err(e) => {
178                    error!(
179                        "Map: Error evaluating logic for path {}: {:?}",
180                        mapping.path, e
181                    );
182                    errors_encountered = true;
183                    continue;
184                }
185            };
186
187            let transformed_value = result_av.to_owned();
188            debug!(
189                "Map: Evaluated logic for path {} resulted in: {:?}",
190                mapping.path, transformed_value
191            );
192
193            if matches!(transformed_value, OwnedDataValue::Null) {
194                debug!(
195                    "Map: Skipping mapping for path {} as result is null",
196                    mapping.path
197                );
198                continue;
199            }
200
201            // Compiler populates `path_parts`. For callers that build a
202            // `MapConfig` directly (the test surface and a few in-tree
203            // helpers) fall back to splitting on the fly — same semantics,
204            // one extra allocation per mapping per call.
205            let fallback_parts: Vec<Arc<str>>;
206            let parts: &[Arc<str>] = if mapping.path_parts.is_empty() && !mapping.path.is_empty() {
207                fallback_parts = mapping.path.split('.').map(Arc::from).collect();
208                &fallback_parts
209            } else {
210                &mapping.path_parts
211            };
212            let path_arc: Arc<str> = if mapping.path_arc.is_empty() && !mapping.path.is_empty() {
213                Arc::from(mapping.path.as_str())
214            } else {
215                Arc::clone(&mapping.path_arc)
216            };
217
218            if message.capture_changes {
219                // Audit-on: capture old/new values directly into the `Change`.
220                // `Change` owns `OwnedDataValue`s (not `Arc<…>`) — one fewer
221                // heap allocation per recorded mutation.
222                let old_value = get_nested_value_parts(&message.context, parts)
223                    .cloned()
224                    .unwrap_or(OwnedDataValue::Null);
225                let new_value = transformed_value.clone();
226
227                changes.push(Change {
228                    path: path_arc,
229                    old_value,
230                    new_value,
231                });
232            }
233            arena_ctx.apply_mutation_parts(&mut message.context, parts, |ctx| {
234                apply_mapping_parts(ctx, parts, &mapping.path, transformed_value);
235            });
236            debug!("Successfully mapped to path: {}", mapping.path);
237        }
238
239        let outcome = if errors_encountered {
240            TaskOutcome::Status(500)
241        } else {
242            TaskOutcome::Success
243        };
244        Ok((outcome, changes))
245    }
246}
247
248/// Pre-split variant of `apply_mapping`. Consumes `parts` for the
249/// `set_nested_value` walk; `full_path` is only needed for the root-merge
250/// detection (which checks the exact, un-split string).
251fn apply_mapping_parts(
252    context: &mut OwnedDataValue,
253    parts: &[Arc<str>],
254    full_path: &str,
255    new_value: OwnedDataValue,
256) {
257    if parts.len() == 1 && matches!(full_path, "data" | "metadata" | "temp_data") {
258        merge_root_field(context, full_path, new_value);
259    } else {
260        set_nested_value_parts(context, parts, new_value);
261    }
262}
263
264/// Merge `new_value` into the existing root-field slot named `path` on the
265/// context object. If both sides are objects, merge keys (new wins for
266/// collisions). Otherwise, overwrite.
267fn merge_root_field(context: &mut OwnedDataValue, path: &str, new_value: OwnedDataValue) {
268    let OwnedDataValue::Object(ctx_pairs) = context else {
269        // The canonical context is always an Object; if somehow not, replace.
270        *context = wrap_root(path, new_value);
271        return;
272    };
273
274    let slot_idx = ctx_pairs.iter().position(|(k, _)| k == path);
275    match slot_idx {
276        Some(idx) => {
277            let slot = &mut ctx_pairs[idx].1;
278            match (slot, new_value) {
279                (OwnedDataValue::Object(existing), OwnedDataValue::Object(new_pairs)) => {
280                    for (k, v) in new_pairs {
281                        if let Some(s) = existing.iter_mut().find(|(ek, _)| ek == &k) {
282                            s.1 = v;
283                        } else {
284                            existing.push((k, v));
285                        }
286                    }
287                }
288                (slot, new) => *slot = new,
289            }
290        }
291        None => {
292            ctx_pairs.push((path.to_string(), new_value));
293        }
294    }
295}
296
297/// Fallback wrap when the top-level context isn't an Object (shouldn't happen
298/// in normal flow but kept for defence in depth).
299fn wrap_root(path: &str, value: OwnedDataValue) -> OwnedDataValue {
300    OwnedDataValue::Object(vec![(path.to_string(), value)])
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use crate::engine::utils::set_nested_value;
    use serde_json::json;

    /// Converts a `serde_json::Value` into the engine's `OwnedDataValue`.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    /// Builds a message whose context's `data` field starts as `initial`.
    fn fresh_message(initial: serde_json::Value) -> Message {
        let mut m = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(&mut m.context, "data", dv(initial));
        m
    }

    #[test]
    fn test_map_config_from_json() {
        let input = json!({
            "mappings": [
                { "path": "data.field1", "logic": {"var": "data.source"} },
                { "path": "data.field2", "logic": "static_value" }
            ]
        });

        let config = MapConfig::from_json(&input).unwrap();
        assert_eq!(config.mappings.len(), 2);
        assert_eq!(config.mappings[0].path, "data.field1");
        assert_eq!(config.mappings[1].path, "data.field2");

        // Parsing alone leaves compilation to `LogicCompiler`, but mirrors
        // the path into `path_arc`.
        assert!(config.mappings[0].compiled_logic.is_none());
        assert_eq!(config.mappings[0].path_arc.as_ref(), "data.field1");
    }

    #[test]
    fn test_map_config_missing_mappings() {
        assert!(MapConfig::from_json(&json!({})).is_err());
    }

    #[test]
    fn test_map_config_invalid_mappings() {
        assert!(MapConfig::from_json(&json!({"mappings": "not_an_array"})).is_err());
    }

    #[test]
    fn test_map_config_non_object_mapping_entry() {
        assert!(MapConfig::from_json(&json!({"mappings": ["not_an_object"]})).is_err());
    }

    #[test]
    fn test_map_config_missing_path() {
        let input = json!({"mappings": [{"logic": {"var": "data.source"}}]});
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_config_missing_logic() {
        let input = json!({"mappings": [{"path": "data.field1"}]});
        assert!(MapConfig::from_json(&input).is_err());
    }

    /// Compiles each mapping's `logic` and stores the resulting `Arc<Logic>`
    /// in `compiled_logic`, mirroring what `LogicCompiler` does at engine
    /// construction.
    fn compile_mappings(engine: &Arc<Engine>, config: &mut MapConfig) {
        for mapping in &mut config.mappings {
            mapping.compiled_logic = Some(engine.compile_arc(&mapping.logic).unwrap());
        }
    }

    #[test]
    fn test_map_metadata_assignment() {
        let engine = Arc::new(Engine::builder().with_templating(true).build());

        let mut message = fresh_message(json!({
            "SwiftMT": { "message_type": "103" }
        }));

        let mut config = MapConfig {
            mappings: vec![MapMapping {
                path: "metadata.SwiftMT.message_type".to_string(),
                logic: json!({"var": "data.SwiftMT.message_type"}),
                ..Default::default()
            }],
        };
        compile_mappings(&engine, &mut config);

        let result = config.execute(&mut message, &engine);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);

        assert_eq!(
            message.context["metadata"]
                .get("SwiftMT")
                .and_then(|v| v.get("message_type")),
            Some(&dv(json!("103")))
        );
    }

    #[test]
    fn test_map_uncompiled_logic_reports_error_status() {
        let engine = Arc::new(Engine::builder().with_templating(true).build());
        let mut message = fresh_message(json!({}));

        // Deliberately not compiled: `compiled_logic` stays `None`.
        let config = MapConfig {
            mappings: vec![MapMapping {
                path: "data.never_written".to_string(),
                logic: json!("value"),
                ..Default::default()
            }],
        };

        let (outcome, changes) = config.execute(&mut message, &engine).unwrap();
        assert_eq!(outcome, TaskOutcome::Status(500));
        assert!(changes.is_empty());
        assert_eq!(message.context["data"].get("never_written"), None);
    }

    #[test]
    fn test_map_null_values_skip_assignment() {
        let engine = Arc::new(Engine::builder().with_templating(true).build());

        let mut message = fresh_message(json!({ "existing_field": "should_remain" }));
        set_nested_value(
            &mut message.context,
            "metadata",
            dv(json!({"existing_meta": "should_remain"})),
        );

        let mut config = MapConfig {
            mappings: vec![
                MapMapping {
                    path: "data.new_field".to_string(),
                    logic: json!({"var": "data.non_existent_field"}),
                    ..Default::default()
                },
                MapMapping {
                    path: "metadata.new_meta".to_string(),
                    logic: json!({"var": "data.another_non_existent"}),
                    ..Default::default()
                },
                MapMapping {
                    path: "data.actual_field".to_string(),
                    logic: json!("actual_value"),
                    ..Default::default()
                },
            ],
        };
        compile_mappings(&engine, &mut config);

        let result = config.execute(&mut message, &engine);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.actual_field");

        assert_eq!(message.context["data"].get("new_field"), None);
        assert_eq!(message.context["metadata"].get("new_meta"), None);

        assert_eq!(
            message.context["data"].get("existing_field"),
            Some(&dv(json!("should_remain")))
        );
        assert_eq!(
            message.context["metadata"].get("existing_meta"),
            Some(&dv(json!("should_remain")))
        );

        assert_eq!(
            message.context["data"].get("actual_field"),
            Some(&dv(json!("actual_value")))
        );
    }

    #[test]
    fn test_map_execute_with_trace_captures_context_snapshots() {
        let engine = Arc::new(Engine::builder().with_templating(true).build());

        let mut message = fresh_message(json!({ "first": "Alice", "last": "Smith" }));

        let mut config = MapConfig {
            mappings: vec![
                MapMapping {
                    path: "data.full_name".to_string(),
                    logic: json!({"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}),
                    ..Default::default()
                },
                MapMapping {
                    path: "data.greeting".to_string(),
                    logic: json!({"cat": ["Hello, ", {"var": "data.full_name"}]}),
                    ..Default::default()
                },
            ],
        };
        compile_mappings(&engine, &mut config);

        let mut context_snapshots: Vec<Value> = Vec::new();
        let result = with_arena(|arena| {
            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
            config.execute_in_arena(
                &mut message,
                &mut arena_ctx,
                &engine,
                Some(&mut context_snapshots),
            )
        });
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 2);
        assert_eq!(context_snapshots.len(), 2);

        // Snapshots are `serde_json::Value` for the trace surface.
        assert!(context_snapshots[0]["data"].get("full_name").is_none());
        assert_eq!(
            context_snapshots[1]["data"].get("full_name"),
            Some(&json!("Alice Smith"))
        );
    }

    #[test]
    fn test_map_multiple_fields_including_metadata() {
        let engine = Arc::new(Engine::builder().with_templating(true).build());

        let mut message = fresh_message(json!({
            "ISO20022_MX": {
                "document": {
                    "TxInf": {
                        "OrgnlGrpInf": { "OrgnlMsgNmId": "pacs.008.001.08" }
                    }
                }
            },
            "SwiftMT": { "message_type": "103" }
        }));

        let mut config = MapConfig {
            mappings: vec![
                MapMapping {
                    path: "data.SwiftMT.message_type".to_string(),
                    logic: json!("103"),
                    ..Default::default()
                },
                MapMapping {
                    path: "metadata.SwiftMT.message_type".to_string(),
                    logic: json!({"var": "data.SwiftMT.message_type"}),
                    ..Default::default()
                },
                MapMapping {
                    path: "temp_data.original_msg_type".to_string(),
                    logic: json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
                    ..Default::default()
                },
            ],
        };
        compile_mappings(&engine, &mut config);

        let result = config.execute(&mut message, &engine);
        assert!(result.is_ok());

        let (outcome, changes) = result.unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert_eq!(changes.len(), 3);

        assert_eq!(
            message.context["data"]
                .get("SwiftMT")
                .and_then(|v| v.get("message_type")),
            Some(&dv(json!("103")))
        );
        assert_eq!(
            message.context["metadata"]
                .get("SwiftMT")
                .and_then(|v| v.get("message_type")),
            Some(&dv(json!("103")))
        );
        assert_eq!(
            message.context["temp_data"].get("original_msg_type"),
            Some(&dv(json!("pacs.008.001.08")))
        );
    }
}
560            message.context["temp_data"].get("original_msg_type"),
561            Some(&dv(json!("pacs.008.001.08")))
562        );
563    }
564}