Skip to main content

drasi_source_mapping/
engine.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source mapping engine using Handlebars templates.
16
17use crate::config::{
18    EffectiveFromConfig, ElementTemplate, ElementType, MappingCondition, OperationType,
19    SourceMapping, TimestampFormat,
20};
21use anyhow::{anyhow, Result};
22use drasi_core::models::{
23    Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
24};
25use handlebars::{
26    Context, Handlebars, Helper, HelperResult, Output, RenderContext, RenderErrorReason,
27};
28use ordered_float::OrderedFloat;
29use regex::Regex;
30use serde_json::Value as JsonValue;
31use std::collections::HashMap;
32use std::sync::Arc;
33
34/// Source mapping engine that transforms payloads into graph change events.
35///
36/// Uses Handlebars templates to extract element IDs, labels, properties,
37/// and operations from arbitrary JSON contexts.
38///
39/// The context is a `serde_json::Value` that each source builds with its own
40/// variables. For example:
41/// - HTTP source: `{ "payload": ..., "headers": ..., "route": ..., "query": ... }`
42/// - Kafka source: `{ "payload": ..., "key": ..., "topic": ..., "partition": ..., "offset": ... }`
43pub struct SourceMappingEngine {
44    handlebars: Handlebars<'static>,
45    regex_cache: std::sync::Mutex<HashMap<String, Regex>>,
46}
47
48impl SourceMappingEngine {
49    /// Create a new mapping engine with custom helpers registered
50    pub fn new() -> Self {
51        let mut handlebars = Handlebars::new();
52        handlebars.set_strict_mode(false);
53
54        register_helpers(&mut handlebars);
55
56        Self {
57            handlebars,
58            regex_cache: std::sync::Mutex::new(HashMap::new()),
59        }
60    }
61
62    /// Render a template string with the given context
63    pub fn render_string(&self, template: &str, context: &JsonValue) -> Result<String> {
64        self.handlebars
65            .render_template(template, context)
66            .map_err(|e| anyhow!("Template render error: {e}"))
67    }
68
69    /// Render a template and preserve the JSON value type
70    ///
71    /// If the template is a simple variable reference like `{{payload.field}}`,
72    /// this returns the original JSON value. Otherwise, it returns the rendered string.
73    pub fn render_value(&self, template: &str, context: &JsonValue) -> Result<JsonValue> {
74        // Check if this is a simple variable reference
75        if let Some(path) = extract_simple_path(template) {
76            if let Some(value) = resolve_path(context, &path) {
77                return Ok(value.clone());
78            }
79        }
80
81        // Fall back to string rendering
82        let rendered = self.render_string(template, context)?;
83
84        // Try to parse as JSON, otherwise return as string
85        if rendered.is_empty() {
86            Ok(JsonValue::Null)
87        } else if let Ok(parsed) = serde_json::from_str::<JsonValue>(&rendered) {
88            Ok(parsed)
89        } else {
90            Ok(JsonValue::String(rendered))
91        }
92    }
93
94    /// Process a source mapping and create a SourceChange.
95    ///
96    /// The `context` should be a JSON object containing all variables available
97    /// to templates (e.g., payload, key, topic, headers, etc.).
98    pub fn process_mapping(
99        &self,
100        mapping: &SourceMapping,
101        context: &JsonValue,
102        source_id: &str,
103    ) -> Result<SourceChange> {
104        // Determine the operation
105        let operation = self.resolve_operation(mapping, context)?;
106
107        // Get effective_from timestamp
108        let effective_from = self.resolve_effective_from(mapping, context)?;
109
110        // Build the element based on type
111        let element = self.build_element(mapping, context, source_id, effective_from)?;
112
113        // Create the appropriate SourceChange
114        match operation {
115            OperationType::Insert => Ok(SourceChange::Insert { element }),
116            OperationType::Update => Ok(SourceChange::Update { element }),
117            OperationType::Delete => {
118                // For delete, we only need metadata
119                let metadata = match element {
120                    Element::Node { metadata, .. } => metadata,
121                    Element::Relation { metadata, .. } => metadata,
122                };
123                Ok(SourceChange::Delete { metadata })
124            }
125        }
126    }
127
128    /// Check if a mapping condition matches the given context.
129    ///
130    /// The `headers` parameter is optional and only used for header-based conditions.
131    pub fn condition_matches(
132        &self,
133        condition: &MappingCondition,
134        context: &JsonValue,
135        headers: Option<&std::collections::HashMap<String, String>>,
136    ) -> bool {
137        let value = if let Some(ref header_name) = condition.header {
138            // Check headers
139            headers
140                .and_then(|h| h.get(header_name))
141                .map(|v| JsonValue::String(v.clone()))
142        } else if let Some(ref field_path) = condition.field {
143            // Check context field (look in payload by default)
144            let lookup_path = if field_path.starts_with("payload.")
145                || field_path.starts_with("key")
146                || field_path.starts_with("topic")
147                || field_path.starts_with("headers.")
148                || field_path.starts_with("partition")
149                || field_path.starts_with("offset")
150                || field_path.starts_with("source_id")
151            {
152                field_path.clone()
153            } else {
154                format!("payload.{field_path}")
155            };
156            resolve_path(context, &lookup_path).cloned()
157        } else {
158            return false;
159        };
160
161        let Some(value) = value else {
162            return false;
163        };
164
165        let value_str = match &value {
166            JsonValue::String(s) => s.clone(),
167            JsonValue::Number(n) => n.to_string(),
168            JsonValue::Bool(b) => b.to_string(),
169            _ => serde_json::to_string(&value).unwrap_or_default(),
170        };
171
172        if let Some(ref equals) = condition.equals {
173            return value_str == *equals;
174        }
175
176        if let Some(ref contains) = condition.contains {
177            return value_str.contains(contains.as_str());
178        }
179
180        if let Some(ref regex_str) = condition.regex {
181            if let Ok(mut cache) = self.regex_cache.lock() {
182                let re = cache.entry(regex_str.clone()).or_insert_with(|| {
183                    regex::Regex::new(regex_str)
184                        .unwrap_or_else(|_| regex::Regex::new("(?:)").expect("infallible"))
185                });
186                return re.is_match(&value_str);
187            }
188            // Mutex poisoned — fall back to one-shot compile
189            if let Ok(re) = regex::Regex::new(regex_str) {
190                return re.is_match(&value_str);
191            }
192        }
193
194        false
195    }
196
197    /// Find the first matching mapping from a list based on conditions.
198    ///
199    /// Returns `None` if no mapping matches.
200    pub fn find_matching_mapping<'a>(
201        &self,
202        mappings: &'a [SourceMapping],
203        context: &JsonValue,
204        headers: Option<&std::collections::HashMap<String, String>>,
205    ) -> Option<&'a SourceMapping> {
206        for mapping in mappings {
207            if let Some(ref condition) = mapping.when {
208                if self.condition_matches(condition, context, headers) {
209                    return Some(mapping);
210                }
211            } else {
212                // No condition means always matches
213                return Some(mapping);
214            }
215        }
216        None
217    }
218
219    /// Resolve the operation type from mapping configuration
220    fn resolve_operation(
221        &self,
222        mapping: &SourceMapping,
223        context: &JsonValue,
224    ) -> Result<OperationType> {
225        // If static operation is defined, use it
226        if let Some(ref op) = mapping.operation {
227            return Ok(op.clone());
228        }
229
230        // Otherwise, extract from context using operation_from
231        let op_path = mapping
232            .operation_from
233            .as_ref()
234            .ok_or_else(|| anyhow!("No operation or operation_from specified"))?;
235
236        let op_map = mapping
237            .operation_map
238            .as_ref()
239            .ok_or_else(|| anyhow!("operation_map required when using operation_from"))?;
240
241        // Resolve the path value
242        let value = resolve_path(context, op_path)
243            .ok_or_else(|| anyhow!("operation_from path '{op_path}' not found in context"))?;
244
245        let value_str = match value {
246            JsonValue::String(s) => s.clone(),
247            JsonValue::Number(n) => n.to_string(),
248            JsonValue::Bool(b) => b.to_string(),
249            _ => return Err(anyhow!("operation_from value must be a string or number")),
250        };
251
252        op_map
253            .get(&value_str)
254            .cloned()
255            .ok_or_else(|| anyhow!("No operation mapping found for value '{value_str}'"))
256    }
257
258    /// Resolve effective_from timestamp
259    fn resolve_effective_from(&self, mapping: &SourceMapping, context: &JsonValue) -> Result<u64> {
260        let Some(ref config) = mapping.effective_from else {
261            return Ok(current_time_millis());
262        };
263
264        let (template, format) = match config {
265            EffectiveFromConfig::Simple(t) => (t.as_str(), None),
266            EffectiveFromConfig::Explicit { value, format } => (value.as_str(), Some(format)),
267        };
268
269        let rendered = self.render_string(template, context)?;
270        if rendered.is_empty() {
271            return Ok(current_time_millis());
272        }
273
274        parse_timestamp(&rendered, format)
275    }
276
277    /// Build an Element from the template
278    fn build_element(
279        &self,
280        mapping: &SourceMapping,
281        context: &JsonValue,
282        source_id: &str,
283        effective_from: u64,
284    ) -> Result<Element> {
285        let template = &mapping.template;
286
287        // Render ID
288        let id = self.render_string(&template.id, context)?;
289        if id.is_empty() {
290            return Err(anyhow!("Template rendered empty ID"));
291        }
292
293        // Render labels
294        let labels: Result<Vec<Arc<str>>> = template
295            .labels
296            .iter()
297            .map(|l| {
298                let rendered = self.render_string(l, context)?;
299                Ok(Arc::from(rendered.as_str()))
300            })
301            .collect();
302        let labels = labels?;
303
304        // Build metadata
305        let metadata = ElementMetadata {
306            reference: ElementReference {
307                source_id: Arc::from(source_id),
308                element_id: Arc::from(id.as_str()),
309            },
310            labels: Arc::from(labels),
311            effective_from,
312        };
313
314        // Render properties
315        let properties = self.render_properties(template, context)?;
316
317        match mapping.element_type {
318            ElementType::Node => Ok(Element::Node {
319                metadata,
320                properties,
321            }),
322            ElementType::Relation => {
323                let from_template = template
324                    .from
325                    .as_ref()
326                    .ok_or_else(|| anyhow!("Relation template missing 'from' field"))?;
327                let to_template = template
328                    .to
329                    .as_ref()
330                    .ok_or_else(|| anyhow!("Relation template missing 'to' field"))?;
331
332                let from_id = self.render_string(from_template, context)?;
333                let to_id = self.render_string(to_template, context)?;
334
335                Ok(Element::Relation {
336                    metadata,
337                    properties,
338                    in_node: ElementReference {
339                        source_id: Arc::from(source_id),
340                        element_id: Arc::from(to_id.as_str()),
341                    },
342                    out_node: ElementReference {
343                        source_id: Arc::from(source_id),
344                        element_id: Arc::from(from_id.as_str()),
345                    },
346                })
347            }
348        }
349    }
350
351    /// Render properties from template
352    fn render_properties(
353        &self,
354        template: &ElementTemplate,
355        context: &JsonValue,
356    ) -> Result<ElementPropertyMap> {
357        let mut props = ElementPropertyMap::new();
358
359        let Some(ref prop_value) = template.properties else {
360            return Ok(props);
361        };
362
363        match prop_value {
364            JsonValue::Object(obj) => {
365                for (key, value) in obj {
366                    let rendered = self.render_property_value(value, context)?;
367                    props.insert(key, rendered);
368                }
369            }
370            JsonValue::String(template_str) => {
371                // Single template that should resolve to an object
372                let rendered = self.render_value(template_str, context)?;
373                if let JsonValue::Object(obj) = rendered {
374                    for (key, value) in obj {
375                        props.insert(&key, json_to_element_value(&value)?);
376                    }
377                }
378            }
379            _ => {
380                return Err(anyhow!("Properties must be an object or a template string"));
381            }
382        }
383
384        Ok(props)
385    }
386
387    /// Render a single property value
388    fn render_property_value(
389        &self,
390        value: &JsonValue,
391        context: &JsonValue,
392    ) -> Result<ElementValue> {
393        match value {
394            JsonValue::String(template) => {
395                let rendered = self.render_value(template, context)?;
396                json_to_element_value(&rendered)
397            }
398            JsonValue::Number(n) => {
399                if let Some(i) = n.as_i64() {
400                    Ok(ElementValue::Integer(i))
401                } else if let Some(f) = n.as_f64() {
402                    Ok(ElementValue::Float(OrderedFloat(f)))
403                } else {
404                    Err(anyhow!("Invalid number"))
405                }
406            }
407            JsonValue::Bool(b) => Ok(ElementValue::Bool(*b)),
408            JsonValue::Null => Ok(ElementValue::Null),
409            JsonValue::Array(arr) => {
410                let items: Result<Vec<_>> = arr
411                    .iter()
412                    .map(|v| self.render_property_value(v, context))
413                    .collect();
414                Ok(ElementValue::List(items?))
415            }
416            JsonValue::Object(obj) => {
417                let mut map = ElementPropertyMap::new();
418                for (k, v) in obj {
419                    map.insert(k, self.render_property_value(v, context)?);
420                }
421                Ok(ElementValue::Object(map))
422            }
423        }
424    }
425}
426
427impl Default for SourceMappingEngine {
428    fn default() -> Self {
429        Self::new()
430    }
431}
432
433/// Convert JSON value to ElementValue
434pub fn json_to_element_value(value: &JsonValue) -> Result<ElementValue> {
435    match value {
436        JsonValue::Null => Ok(ElementValue::Null),
437        JsonValue::Bool(b) => Ok(ElementValue::Bool(*b)),
438        JsonValue::Number(n) => {
439            if let Some(i) = n.as_i64() {
440                Ok(ElementValue::Integer(i))
441            } else if let Some(f) = n.as_f64() {
442                Ok(ElementValue::Float(OrderedFloat(f)))
443            } else {
444                Err(anyhow!("Invalid number value"))
445            }
446        }
447        JsonValue::String(s) => Ok(ElementValue::String(Arc::from(s.as_str()))),
448        JsonValue::Array(arr) => {
449            let items: Result<Vec<_>> = arr.iter().map(json_to_element_value).collect();
450            Ok(ElementValue::List(items?))
451        }
452        JsonValue::Object(obj) => {
453            let mut map = ElementPropertyMap::new();
454            for (k, v) in obj {
455                map.insert(k, json_to_element_value(v)?);
456            }
457            Ok(ElementValue::Object(map))
458        }
459    }
460}
461
462/// Register custom Handlebars helpers
463fn register_helpers(handlebars: &mut Handlebars) {
464    // lowercase helper
465    handlebars.register_helper(
466        "lowercase",
467        Box::new(
468            |h: &Helper,
469             _: &Handlebars,
470             _: &Context,
471             _: &mut RenderContext,
472             out: &mut dyn Output|
473             -> HelperResult {
474                let param = h
475                    .param(0)
476                    .ok_or(RenderErrorReason::ParamNotFoundForIndex("lowercase", 0))?;
477                let value = param.value().as_str().unwrap_or("");
478                out.write(&value.to_lowercase())?;
479                Ok(())
480            },
481        ),
482    );
483
484    // uppercase helper
485    handlebars.register_helper(
486        "uppercase",
487        Box::new(
488            |h: &Helper,
489             _: &Handlebars,
490             _: &Context,
491             _: &mut RenderContext,
492             out: &mut dyn Output|
493             -> HelperResult {
494                let param = h
495                    .param(0)
496                    .ok_or(RenderErrorReason::ParamNotFoundForIndex("uppercase", 0))?;
497                let value = param.value().as_str().unwrap_or("");
498                out.write(&value.to_uppercase())?;
499                Ok(())
500            },
501        ),
502    );
503
504    // now helper - returns current timestamp in milliseconds
505    handlebars.register_helper(
506        "now",
507        Box::new(
508            |_: &Helper,
509             _: &Handlebars,
510             _: &Context,
511             _: &mut RenderContext,
512             out: &mut dyn Output|
513             -> HelperResult {
514                out.write(&current_time_millis().to_string())?;
515                Ok(())
516            },
517        ),
518    );
519
520    // concat helper
521    handlebars.register_helper(
522        "concat",
523        Box::new(
524            |h: &Helper,
525             _: &Handlebars,
526             _: &Context,
527             _: &mut RenderContext,
528             out: &mut dyn Output|
529             -> HelperResult {
530                let mut result = String::new();
531                for param in h.params() {
532                    if let Some(s) = param.value().as_str() {
533                        result.push_str(s);
534                    } else {
535                        result.push_str(&param.value().to_string());
536                    }
537                }
538                out.write(&result)?;
539                Ok(())
540            },
541        ),
542    );
543
544    // default helper
545    handlebars.register_helper(
546        "default",
547        Box::new(
548            |h: &Helper,
549             _: &Handlebars,
550             _: &Context,
551             _: &mut RenderContext,
552             out: &mut dyn Output|
553             -> HelperResult {
554                let value = h.param(0).map(|p| p.value());
555                let default = h.param(1).map(|p| p.value());
556
557                let output = match value {
558                    Some(v) if !v.is_null() && v.as_str() != Some("") => v,
559                    _ => default.unwrap_or(&JsonValue::Null),
560                };
561
562                if let Some(s) = output.as_str() {
563                    out.write(s)?;
564                } else {
565                    out.write(&output.to_string())?;
566                }
567                Ok(())
568            },
569        ),
570    );
571
572    // json helper - serialize value to JSON string
573    handlebars.register_helper(
574        "json",
575        Box::new(
576            |h: &Helper,
577             _: &Handlebars,
578             _: &Context,
579             _: &mut RenderContext,
580             out: &mut dyn Output|
581             -> HelperResult {
582                let param = h
583                    .param(0)
584                    .ok_or(RenderErrorReason::ParamNotFoundForIndex("json", 0))?;
585                let json_str =
586                    serde_json::to_string(param.value()).unwrap_or_else(|_| "null".to_string());
587                out.write(&json_str)?;
588                Ok(())
589            },
590        ),
591    );
592}
593
594/// Extract a simple variable path from a template like `{{payload.field}}`
595fn extract_simple_path(template: &str) -> Option<String> {
596    let trimmed = template.trim();
597    if trimmed.starts_with("{{") && trimmed.ends_with("}}") {
598        let inner = trimmed[2..trimmed.len() - 2].trim();
599        // Check if it's a simple path (no spaces, no helpers)
600        if !inner.contains(' ') && !inner.contains('#') && !inner.contains('/') {
601            return Some(inner.to_string());
602        }
603    }
604    None
605}
606
607/// Resolve a dot-separated path in a JSON value
608fn resolve_path<'a>(value: &'a JsonValue, path: &str) -> Option<&'a JsonValue> {
609    let mut current = value;
610    for part in path.split('.') {
611        current = match current {
612            JsonValue::Object(obj) => obj.get(part)?,
613            JsonValue::Array(arr) => {
614                let index: usize = part.parse().ok()?;
615                arr.get(index)?
616            }
617            _ => return None,
618        };
619    }
620    Some(current)
621}
622
623/// Get current time in milliseconds
624fn current_time_millis() -> u64 {
625    std::time::SystemTime::now()
626        .duration_since(std::time::UNIX_EPOCH)
627        .map(|d| d.as_millis() as u64)
628        .unwrap_or(0)
629}
630
631/// Parse a timestamp string into milliseconds since epoch
632fn parse_timestamp(value: &str, format: Option<&TimestampFormat>) -> Result<u64> {
633    if let Some(fmt) = format {
634        return parse_with_format(value, fmt);
635    }
636
637    // Auto-detect format
638    let trimmed = value.trim();
639
640    // Try ISO 8601 first (contains 'T' or '-')
641    if trimmed.contains('T') || (trimmed.contains('-') && !trimmed.starts_with('-')) {
642        if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(trimmed) {
643            return Ok(dt.timestamp_millis() as u64);
644        }
645        // Try without timezone
646        if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S") {
647            return Ok(dt.and_utc().timestamp_millis() as u64);
648        }
649        if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f") {
650            return Ok(dt.and_utc().timestamp_millis() as u64);
651        }
652    }
653
654    // Try parsing as number
655    if let Ok(num) = trimmed.parse::<i64>() {
656        let abs = num.unsigned_abs();
657        // Heuristic based on magnitude
658        if abs < 10_000_000_000 {
659            // Seconds (before year 2286)
660            return Ok(abs * 1000);
661        } else if abs < 10_000_000_000_000 {
662            // Milliseconds
663            return Ok(abs);
664        } else {
665            // Nanoseconds
666            return Ok(abs / 1_000_000);
667        }
668    }
669
670    Err(anyhow!(
671        "Unable to parse timestamp '{value}'. Expected ISO 8601 or Unix timestamp"
672    ))
673}
674
675/// Parse timestamp with explicit format
676fn parse_with_format(value: &str, format: &TimestampFormat) -> Result<u64> {
677    match format {
678        TimestampFormat::Iso8601 => {
679            let dt = chrono::DateTime::parse_from_rfc3339(value.trim())
680                .map_err(|e| anyhow!("Invalid ISO 8601 timestamp: {e}"))?;
681            Ok(dt.timestamp_millis() as u64)
682        }
683        TimestampFormat::UnixSeconds => {
684            let secs: i64 = value
685                .trim()
686                .parse()
687                .map_err(|e| anyhow!("Invalid Unix seconds: {e}"))?;
688            if secs < 0 {
689                return Err(anyhow!("Negative Unix timestamp not supported: {secs}"));
690            }
691            Ok((secs as u64) * 1000)
692        }
693        TimestampFormat::UnixMillis => {
694            let millis: u64 = value
695                .trim()
696                .parse()
697                .map_err(|e| anyhow!("Invalid Unix milliseconds: {e}"))?;
698            Ok(millis)
699        }
700        TimestampFormat::UnixNanos => {
701            let nanos: u64 = value
702                .trim()
703                .parse()
704                .map_err(|e| anyhow!("Invalid Unix nanoseconds: {e}"))?;
705            Ok(nanos / 1_000_000)
706        }
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use super::*;
713    use crate::config::{ElementTemplate, SourceMapping};
714
715    fn create_test_context() -> JsonValue {
716        serde_json::json!({
717            "payload": {
718                "id": "123",
719                "name": "Test Order",
720                "customer": "Alice",
721                "total": 150,
722                "status": "pending",
723                "metadata": { "source": "webhook" }
724            },
725            "key": "order-123",
726            "topic": "orders",
727            "partition": 0,
728            "offset": 42,
729            "source_id": "test-source"
730        })
731    }
732
733    #[test]
734    fn test_render_simple_template() {
735        let engine = SourceMappingEngine::new();
736        let context = create_test_context();
737
738        let result = engine.render_string("{{payload.id}}", &context).unwrap();
739        assert_eq!(result, "123");
740    }
741
742    #[test]
743    fn test_render_value_preserves_type() {
744        let engine = SourceMappingEngine::new();
745        let context = create_test_context();
746
747        let result = engine.render_value("{{payload.total}}", &context).unwrap();
748        assert_eq!(result, JsonValue::Number(150.into()));
749    }
750
751    #[test]
752    fn test_render_value_preserves_object() {
753        let engine = SourceMappingEngine::new();
754        let context = create_test_context();
755
756        let result = engine
757            .render_value("{{payload.metadata}}", &context)
758            .unwrap();
759        assert_eq!(result, serde_json::json!({"source": "webhook"}));
760    }
761
762    #[test]
763    fn test_process_mapping_node_insert() {
764        let engine = SourceMappingEngine::new();
765        let context = create_test_context();
766
767        let mapping = SourceMapping {
768            when: None,
769            operation: Some(OperationType::Insert),
770            operation_from: None,
771            operation_map: None,
772            element_type: ElementType::Node,
773            effective_from: None,
774            template: ElementTemplate {
775                id: "{{key}}".to_string(),
776                labels: vec!["Order".to_string()],
777                properties: Some(JsonValue::String("{{payload}}".to_string())),
778                from: None,
779                to: None,
780            },
781        };
782
783        let result = engine
784            .process_mapping(&mapping, &context, "test-source")
785            .unwrap();
786        match result {
787            SourceChange::Insert { element } => {
788                match element {
789                    Element::Node {
790                        metadata,
791                        properties,
792                    } => {
793                        assert_eq!(metadata.reference.element_id.as_ref(), "order-123");
794                        assert_eq!(metadata.labels[0].as_ref(), "Order");
795                        // Properties should contain fields from payload
796                        assert!(properties.get("customer").is_some());
797                        assert!(properties.get("total").is_some());
798                    }
799                    _ => panic!("Expected Node element"),
800                }
801            }
802            _ => panic!("Expected Insert"),
803        }
804    }
805
806    #[test]
807    fn test_process_mapping_with_operation_from() {
808        let engine = SourceMappingEngine::new();
809        let context = serde_json::json!({
810            "payload": {
811                "action": "updated",
812                "id": "order-1",
813                "total": 200
814            },
815            "key": "order-1"
816        });
817
818        let mut op_map = std::collections::HashMap::new();
819        op_map.insert("created".to_string(), OperationType::Insert);
820        op_map.insert("updated".to_string(), OperationType::Update);
821        op_map.insert("deleted".to_string(), OperationType::Delete);
822
823        let mapping = SourceMapping {
824            when: None,
825            operation: None,
826            operation_from: Some("payload.action".to_string()),
827            operation_map: Some(op_map),
828            element_type: ElementType::Node,
829            effective_from: None,
830            template: ElementTemplate {
831                id: "{{payload.id}}".to_string(),
832                labels: vec!["Order".to_string()],
833                properties: Some(serde_json::json!({
834                    "total": "{{payload.total}}"
835                })),
836                from: None,
837                to: None,
838            },
839        };
840
841        let result = engine
842            .process_mapping(&mapping, &context, "test-source")
843            .unwrap();
844        assert!(matches!(result, SourceChange::Update { .. }));
845    }
846
847    #[test]
848    fn test_process_mapping_relation() {
849        let engine = SourceMappingEngine::new();
850        let context = serde_json::json!({
851            "payload": {
852                "id": "rel-1",
853                "customer_id": "cust-1",
854                "order_id": "order-1",
855                "quantity": 5
856            },
857            "key": "rel-1"
858        });
859
860        let mapping = SourceMapping {
861            when: None,
862            operation: Some(OperationType::Insert),
863            operation_from: None,
864            operation_map: None,
865            element_type: ElementType::Relation,
866            effective_from: None,
867            template: ElementTemplate {
868                id: "{{payload.id}}".to_string(),
869                labels: vec!["PURCHASED".to_string()],
870                properties: Some(serde_json::json!({
871                    "quantity": "{{payload.quantity}}"
872                })),
873                from: Some("{{payload.customer_id}}".to_string()),
874                to: Some("{{payload.order_id}}".to_string()),
875            },
876        };
877
878        let result = engine
879            .process_mapping(&mapping, &context, "test-source")
880            .unwrap();
881        match result {
882            SourceChange::Insert { element } => match element {
883                Element::Relation {
884                    metadata,
885                    out_node,
886                    in_node,
887                    ..
888                } => {
889                    assert_eq!(metadata.reference.element_id.as_ref(), "rel-1");
890                    assert_eq!(metadata.labels[0].as_ref(), "PURCHASED");
891                    assert_eq!(out_node.element_id.as_ref(), "cust-1");
892                    assert_eq!(in_node.element_id.as_ref(), "order-1");
893                }
894                _ => panic!("Expected Relation element"),
895            },
896            _ => panic!("Expected Insert"),
897        }
898    }
899
900    #[test]
901    fn test_condition_matches_field_equals() {
902        let engine = SourceMappingEngine::new();
903        let context = serde_json::json!({
904            "payload": {
905                "type": "order",
906                "id": "123"
907            }
908        });
909
910        let condition = MappingCondition {
911            header: None,
912            field: Some("type".to_string()),
913            equals: Some("order".to_string()),
914            contains: None,
915            regex: None,
916        };
917
918        assert!(engine.condition_matches(&condition, &context, None));
919    }
920
921    #[test]
922    fn test_condition_matches_field_not_equals() {
923        let engine = SourceMappingEngine::new();
924        let context = serde_json::json!({
925            "payload": {
926                "type": "shipment",
927                "id": "123"
928            }
929        });
930
931        let condition = MappingCondition {
932            header: None,
933            field: Some("type".to_string()),
934            equals: Some("order".to_string()),
935            contains: None,
936            regex: None,
937        };
938
939        assert!(!engine.condition_matches(&condition, &context, None));
940    }
941
942    #[test]
943    fn test_json_to_element_value_types() {
944        let null_val = json_to_element_value(&JsonValue::Null).unwrap();
945        assert_eq!(null_val, ElementValue::Null);
946
947        let bool_val = json_to_element_value(&JsonValue::Bool(true)).unwrap();
948        assert_eq!(bool_val, ElementValue::Bool(true));
949
950        let int_val = json_to_element_value(&serde_json::json!(42)).unwrap();
951        assert_eq!(int_val, ElementValue::Integer(42));
952
953        let str_val = json_to_element_value(&serde_json::json!("hello")).unwrap();
954        assert_eq!(str_val, ElementValue::String(Arc::from("hello")));
955    }
956
957    #[test]
958    fn test_helpers_lowercase() {
959        let engine = SourceMappingEngine::new();
960        let context = serde_json::json!({"payload": {"name": "HELLO"}});
961
962        let result = engine
963            .render_string("{{lowercase payload.name}}", &context)
964            .unwrap();
965        assert_eq!(result, "hello");
966    }
967
968    #[test]
969    fn test_helpers_concat() {
970        let engine = SourceMappingEngine::new();
971        let context = serde_json::json!({"payload": {"id": "123"}});
972
973        let result = engine
974            .render_string("{{concat \"prefix-\" payload.id}}", &context)
975            .unwrap();
976        assert_eq!(result, "prefix-123");
977    }
978
979    #[test]
980    fn test_extract_simple_path_basic() {
981        assert_eq!(
982            extract_simple_path("{{payload.name}}"),
983            Some("payload.name".to_string())
984        );
985    }
986
987    #[test]
988    fn test_extract_simple_path_with_spaces_around_braces() {
989        assert_eq!(extract_simple_path("{{ key }}"), Some("key".to_string()));
990    }
991
992    #[test]
993    fn test_extract_simple_path_helper_returns_none() {
994        assert_eq!(extract_simple_path("{{#if x}}yes{{/if}}"), None);
995    }
996
997    #[test]
998    fn test_extract_simple_path_with_space_returns_none() {
999        // Contains a space inside — indicates a helper call
1000        assert_eq!(extract_simple_path("{{lowercase payload.name}}"), None);
1001    }
1002
1003    #[test]
1004    fn test_extract_simple_path_not_template() {
1005        assert_eq!(extract_simple_path("plain-text"), None);
1006    }
1007
1008    #[test]
1009    fn test_parse_with_format_iso8601() {
1010        let result = parse_with_format("2024-01-15T10:30:00Z", &TimestampFormat::Iso8601).unwrap();
1011        assert_eq!(result, 1705314600000);
1012    }
1013
1014    #[test]
1015    fn test_parse_with_format_unix_seconds() {
1016        let result = parse_with_format("1705311000", &TimestampFormat::UnixSeconds).unwrap();
1017        assert_eq!(result, 1705311000000);
1018    }
1019
1020    #[test]
1021    fn test_parse_with_format_unix_millis() {
1022        let result = parse_with_format("1705311000123", &TimestampFormat::UnixMillis).unwrap();
1023        assert_eq!(result, 1705311000123);
1024    }
1025
1026    #[test]
1027    fn test_parse_with_format_unix_nanos() {
1028        let result = parse_with_format("1705311000123456789", &TimestampFormat::UnixNanos).unwrap();
1029        assert_eq!(result, 1705311000123);
1030    }
1031
1032    #[test]
1033    fn test_parse_with_format_negative_seconds_rejected() {
1034        let result = parse_with_format("-100", &TimestampFormat::UnixSeconds);
1035        assert!(result.is_err());
1036        assert!(result.unwrap_err().to_string().contains("Negative"));
1037    }
1038
1039    #[test]
1040    fn test_parse_timestamp_auto_detect_seconds() {
1041        // Under 10 billion → treated as seconds
1042        let result = parse_timestamp("1705311000", None).unwrap();
1043        assert_eq!(result, 1705311000000);
1044    }
1045
1046    #[test]
1047    fn test_parse_timestamp_auto_detect_millis() {
1048        // Between 10B and 10T → treated as milliseconds
1049        let result = parse_timestamp("1705311000123", None).unwrap();
1050        assert_eq!(result, 1705311000123);
1051    }
1052
1053    #[test]
1054    fn test_parse_timestamp_auto_detect_nanos() {
1055        // Over 10T → treated as nanoseconds
1056        let result = parse_timestamp("1705311000123456789", None).unwrap();
1057        assert_eq!(result, 1705311000123);
1058    }
1059
1060    #[test]
1061    fn test_parse_timestamp_iso8601() {
1062        let result = parse_timestamp("2024-01-15T10:30:00Z", None).unwrap();
1063        assert_eq!(result, 1705314600000);
1064    }
1065}