1use crate::config::{
18 EffectiveFromConfig, ElementTemplate, ElementType, MappingCondition, OperationType,
19 SourceMapping, TimestampFormat,
20};
21use anyhow::{anyhow, Result};
22use drasi_core::models::{
23 Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
24};
25use handlebars::{
26 Context, Handlebars, Helper, HelperResult, Output, RenderContext, RenderErrorReason,
27};
28use ordered_float::OrderedFloat;
29use regex::Regex;
30use serde_json::Value as JsonValue;
31use std::collections::HashMap;
32use std::sync::Arc;
33
34pub struct SourceMappingEngine {
44 handlebars: Handlebars<'static>,
45 regex_cache: std::sync::Mutex<HashMap<String, Regex>>,
46}
47
48impl SourceMappingEngine {
49 pub fn new() -> Self {
51 let mut handlebars = Handlebars::new();
52 handlebars.set_strict_mode(false);
53
54 register_helpers(&mut handlebars);
55
56 Self {
57 handlebars,
58 regex_cache: std::sync::Mutex::new(HashMap::new()),
59 }
60 }
61
62 pub fn render_string(&self, template: &str, context: &JsonValue) -> Result<String> {
64 self.handlebars
65 .render_template(template, context)
66 .map_err(|e| anyhow!("Template render error: {e}"))
67 }
68
69 pub fn render_value(&self, template: &str, context: &JsonValue) -> Result<JsonValue> {
74 if let Some(path) = extract_simple_path(template) {
76 if let Some(value) = resolve_path(context, &path) {
77 return Ok(value.clone());
78 }
79 }
80
81 let rendered = self.render_string(template, context)?;
83
84 if rendered.is_empty() {
86 Ok(JsonValue::Null)
87 } else if let Ok(parsed) = serde_json::from_str::<JsonValue>(&rendered) {
88 Ok(parsed)
89 } else {
90 Ok(JsonValue::String(rendered))
91 }
92 }
93
94 pub fn process_mapping(
99 &self,
100 mapping: &SourceMapping,
101 context: &JsonValue,
102 source_id: &str,
103 ) -> Result<SourceChange> {
104 let operation = self.resolve_operation(mapping, context)?;
106
107 let effective_from = self.resolve_effective_from(mapping, context)?;
109
110 let element = self.build_element(mapping, context, source_id, effective_from)?;
112
113 match operation {
115 OperationType::Insert => Ok(SourceChange::Insert { element }),
116 OperationType::Update => Ok(SourceChange::Update { element }),
117 OperationType::Delete => {
118 let metadata = match element {
120 Element::Node { metadata, .. } => metadata,
121 Element::Relation { metadata, .. } => metadata,
122 };
123 Ok(SourceChange::Delete { metadata })
124 }
125 }
126 }
127
128 pub fn condition_matches(
132 &self,
133 condition: &MappingCondition,
134 context: &JsonValue,
135 headers: Option<&std::collections::HashMap<String, String>>,
136 ) -> bool {
137 let value = if let Some(ref header_name) = condition.header {
138 headers
140 .and_then(|h| h.get(header_name))
141 .map(|v| JsonValue::String(v.clone()))
142 } else if let Some(ref field_path) = condition.field {
143 let lookup_path = if field_path.starts_with("payload.")
145 || field_path.starts_with("key")
146 || field_path.starts_with("topic")
147 || field_path.starts_with("headers.")
148 || field_path.starts_with("partition")
149 || field_path.starts_with("offset")
150 || field_path.starts_with("source_id")
151 {
152 field_path.clone()
153 } else {
154 format!("payload.{field_path}")
155 };
156 resolve_path(context, &lookup_path).cloned()
157 } else {
158 return false;
159 };
160
161 let Some(value) = value else {
162 return false;
163 };
164
165 let value_str = match &value {
166 JsonValue::String(s) => s.clone(),
167 JsonValue::Number(n) => n.to_string(),
168 JsonValue::Bool(b) => b.to_string(),
169 _ => serde_json::to_string(&value).unwrap_or_default(),
170 };
171
172 if let Some(ref equals) = condition.equals {
173 return value_str == *equals;
174 }
175
176 if let Some(ref contains) = condition.contains {
177 return value_str.contains(contains.as_str());
178 }
179
180 if let Some(ref regex_str) = condition.regex {
181 if let Ok(mut cache) = self.regex_cache.lock() {
182 let re = cache.entry(regex_str.clone()).or_insert_with(|| {
183 regex::Regex::new(regex_str)
184 .unwrap_or_else(|_| regex::Regex::new("(?:)").expect("infallible"))
185 });
186 return re.is_match(&value_str);
187 }
188 if let Ok(re) = regex::Regex::new(regex_str) {
190 return re.is_match(&value_str);
191 }
192 }
193
194 false
195 }
196
197 pub fn find_matching_mapping<'a>(
201 &self,
202 mappings: &'a [SourceMapping],
203 context: &JsonValue,
204 headers: Option<&std::collections::HashMap<String, String>>,
205 ) -> Option<&'a SourceMapping> {
206 for mapping in mappings {
207 if let Some(ref condition) = mapping.when {
208 if self.condition_matches(condition, context, headers) {
209 return Some(mapping);
210 }
211 } else {
212 return Some(mapping);
214 }
215 }
216 None
217 }
218
219 fn resolve_operation(
221 &self,
222 mapping: &SourceMapping,
223 context: &JsonValue,
224 ) -> Result<OperationType> {
225 if let Some(ref op) = mapping.operation {
227 return Ok(op.clone());
228 }
229
230 let op_path = mapping
232 .operation_from
233 .as_ref()
234 .ok_or_else(|| anyhow!("No operation or operation_from specified"))?;
235
236 let op_map = mapping
237 .operation_map
238 .as_ref()
239 .ok_or_else(|| anyhow!("operation_map required when using operation_from"))?;
240
241 let value = resolve_path(context, op_path)
243 .ok_or_else(|| anyhow!("operation_from path '{op_path}' not found in context"))?;
244
245 let value_str = match value {
246 JsonValue::String(s) => s.clone(),
247 JsonValue::Number(n) => n.to_string(),
248 JsonValue::Bool(b) => b.to_string(),
249 _ => return Err(anyhow!("operation_from value must be a string or number")),
250 };
251
252 op_map
253 .get(&value_str)
254 .cloned()
255 .ok_or_else(|| anyhow!("No operation mapping found for value '{value_str}'"))
256 }
257
258 fn resolve_effective_from(&self, mapping: &SourceMapping, context: &JsonValue) -> Result<u64> {
260 let Some(ref config) = mapping.effective_from else {
261 return Ok(current_time_millis());
262 };
263
264 let (template, format) = match config {
265 EffectiveFromConfig::Simple(t) => (t.as_str(), None),
266 EffectiveFromConfig::Explicit { value, format } => (value.as_str(), Some(format)),
267 };
268
269 let rendered = self.render_string(template, context)?;
270 if rendered.is_empty() {
271 return Ok(current_time_millis());
272 }
273
274 parse_timestamp(&rendered, format)
275 }
276
277 fn build_element(
279 &self,
280 mapping: &SourceMapping,
281 context: &JsonValue,
282 source_id: &str,
283 effective_from: u64,
284 ) -> Result<Element> {
285 let template = &mapping.template;
286
287 let id = self.render_string(&template.id, context)?;
289 if id.is_empty() {
290 return Err(anyhow!("Template rendered empty ID"));
291 }
292
293 let labels: Result<Vec<Arc<str>>> = template
295 .labels
296 .iter()
297 .map(|l| {
298 let rendered = self.render_string(l, context)?;
299 Ok(Arc::from(rendered.as_str()))
300 })
301 .collect();
302 let labels = labels?;
303
304 let metadata = ElementMetadata {
306 reference: ElementReference {
307 source_id: Arc::from(source_id),
308 element_id: Arc::from(id.as_str()),
309 },
310 labels: Arc::from(labels),
311 effective_from,
312 };
313
314 let properties = self.render_properties(template, context)?;
316
317 match mapping.element_type {
318 ElementType::Node => Ok(Element::Node {
319 metadata,
320 properties,
321 }),
322 ElementType::Relation => {
323 let from_template = template
324 .from
325 .as_ref()
326 .ok_or_else(|| anyhow!("Relation template missing 'from' field"))?;
327 let to_template = template
328 .to
329 .as_ref()
330 .ok_or_else(|| anyhow!("Relation template missing 'to' field"))?;
331
332 let from_id = self.render_string(from_template, context)?;
333 let to_id = self.render_string(to_template, context)?;
334
335 Ok(Element::Relation {
336 metadata,
337 properties,
338 in_node: ElementReference {
339 source_id: Arc::from(source_id),
340 element_id: Arc::from(to_id.as_str()),
341 },
342 out_node: ElementReference {
343 source_id: Arc::from(source_id),
344 element_id: Arc::from(from_id.as_str()),
345 },
346 })
347 }
348 }
349 }
350
351 fn render_properties(
353 &self,
354 template: &ElementTemplate,
355 context: &JsonValue,
356 ) -> Result<ElementPropertyMap> {
357 let mut props = ElementPropertyMap::new();
358
359 let Some(ref prop_value) = template.properties else {
360 return Ok(props);
361 };
362
363 match prop_value {
364 JsonValue::Object(obj) => {
365 for (key, value) in obj {
366 let rendered = self.render_property_value(value, context)?;
367 props.insert(key, rendered);
368 }
369 }
370 JsonValue::String(template_str) => {
371 let rendered = self.render_value(template_str, context)?;
373 if let JsonValue::Object(obj) = rendered {
374 for (key, value) in obj {
375 props.insert(&key, json_to_element_value(&value)?);
376 }
377 }
378 }
379 _ => {
380 return Err(anyhow!("Properties must be an object or a template string"));
381 }
382 }
383
384 Ok(props)
385 }
386
387 fn render_property_value(
389 &self,
390 value: &JsonValue,
391 context: &JsonValue,
392 ) -> Result<ElementValue> {
393 match value {
394 JsonValue::String(template) => {
395 let rendered = self.render_value(template, context)?;
396 json_to_element_value(&rendered)
397 }
398 JsonValue::Number(n) => {
399 if let Some(i) = n.as_i64() {
400 Ok(ElementValue::Integer(i))
401 } else if let Some(f) = n.as_f64() {
402 Ok(ElementValue::Float(OrderedFloat(f)))
403 } else {
404 Err(anyhow!("Invalid number"))
405 }
406 }
407 JsonValue::Bool(b) => Ok(ElementValue::Bool(*b)),
408 JsonValue::Null => Ok(ElementValue::Null),
409 JsonValue::Array(arr) => {
410 let items: Result<Vec<_>> = arr
411 .iter()
412 .map(|v| self.render_property_value(v, context))
413 .collect();
414 Ok(ElementValue::List(items?))
415 }
416 JsonValue::Object(obj) => {
417 let mut map = ElementPropertyMap::new();
418 for (k, v) in obj {
419 map.insert(k, self.render_property_value(v, context)?);
420 }
421 Ok(ElementValue::Object(map))
422 }
423 }
424 }
425}
426
427impl Default for SourceMappingEngine {
428 fn default() -> Self {
429 Self::new()
430 }
431}
432
433pub fn json_to_element_value(value: &JsonValue) -> Result<ElementValue> {
435 match value {
436 JsonValue::Null => Ok(ElementValue::Null),
437 JsonValue::Bool(b) => Ok(ElementValue::Bool(*b)),
438 JsonValue::Number(n) => {
439 if let Some(i) = n.as_i64() {
440 Ok(ElementValue::Integer(i))
441 } else if let Some(f) = n.as_f64() {
442 Ok(ElementValue::Float(OrderedFloat(f)))
443 } else {
444 Err(anyhow!("Invalid number value"))
445 }
446 }
447 JsonValue::String(s) => Ok(ElementValue::String(Arc::from(s.as_str()))),
448 JsonValue::Array(arr) => {
449 let items: Result<Vec<_>> = arr.iter().map(json_to_element_value).collect();
450 Ok(ElementValue::List(items?))
451 }
452 JsonValue::Object(obj) => {
453 let mut map = ElementPropertyMap::new();
454 for (k, v) in obj {
455 map.insert(k, json_to_element_value(v)?);
456 }
457 Ok(ElementValue::Object(map))
458 }
459 }
460}
461
462fn register_helpers(handlebars: &mut Handlebars) {
464 handlebars.register_helper(
466 "lowercase",
467 Box::new(
468 |h: &Helper,
469 _: &Handlebars,
470 _: &Context,
471 _: &mut RenderContext,
472 out: &mut dyn Output|
473 -> HelperResult {
474 let param = h
475 .param(0)
476 .ok_or(RenderErrorReason::ParamNotFoundForIndex("lowercase", 0))?;
477 let value = param.value().as_str().unwrap_or("");
478 out.write(&value.to_lowercase())?;
479 Ok(())
480 },
481 ),
482 );
483
484 handlebars.register_helper(
486 "uppercase",
487 Box::new(
488 |h: &Helper,
489 _: &Handlebars,
490 _: &Context,
491 _: &mut RenderContext,
492 out: &mut dyn Output|
493 -> HelperResult {
494 let param = h
495 .param(0)
496 .ok_or(RenderErrorReason::ParamNotFoundForIndex("uppercase", 0))?;
497 let value = param.value().as_str().unwrap_or("");
498 out.write(&value.to_uppercase())?;
499 Ok(())
500 },
501 ),
502 );
503
504 handlebars.register_helper(
506 "now",
507 Box::new(
508 |_: &Helper,
509 _: &Handlebars,
510 _: &Context,
511 _: &mut RenderContext,
512 out: &mut dyn Output|
513 -> HelperResult {
514 out.write(¤t_time_millis().to_string())?;
515 Ok(())
516 },
517 ),
518 );
519
520 handlebars.register_helper(
522 "concat",
523 Box::new(
524 |h: &Helper,
525 _: &Handlebars,
526 _: &Context,
527 _: &mut RenderContext,
528 out: &mut dyn Output|
529 -> HelperResult {
530 let mut result = String::new();
531 for param in h.params() {
532 if let Some(s) = param.value().as_str() {
533 result.push_str(s);
534 } else {
535 result.push_str(¶m.value().to_string());
536 }
537 }
538 out.write(&result)?;
539 Ok(())
540 },
541 ),
542 );
543
544 handlebars.register_helper(
546 "default",
547 Box::new(
548 |h: &Helper,
549 _: &Handlebars,
550 _: &Context,
551 _: &mut RenderContext,
552 out: &mut dyn Output|
553 -> HelperResult {
554 let value = h.param(0).map(|p| p.value());
555 let default = h.param(1).map(|p| p.value());
556
557 let output = match value {
558 Some(v) if !v.is_null() && v.as_str() != Some("") => v,
559 _ => default.unwrap_or(&JsonValue::Null),
560 };
561
562 if let Some(s) = output.as_str() {
563 out.write(s)?;
564 } else {
565 out.write(&output.to_string())?;
566 }
567 Ok(())
568 },
569 ),
570 );
571
572 handlebars.register_helper(
574 "json",
575 Box::new(
576 |h: &Helper,
577 _: &Handlebars,
578 _: &Context,
579 _: &mut RenderContext,
580 out: &mut dyn Output|
581 -> HelperResult {
582 let param = h
583 .param(0)
584 .ok_or(RenderErrorReason::ParamNotFoundForIndex("json", 0))?;
585 let json_str =
586 serde_json::to_string(param.value()).unwrap_or_else(|_| "null".to_string());
587 out.write(&json_str)?;
588 Ok(())
589 },
590 ),
591 );
592}
593
594fn extract_simple_path(template: &str) -> Option<String> {
596 let trimmed = template.trim();
597 if trimmed.starts_with("{{") && trimmed.ends_with("}}") {
598 let inner = trimmed[2..trimmed.len() - 2].trim();
599 if !inner.contains(' ') && !inner.contains('#') && !inner.contains('/') {
601 return Some(inner.to_string());
602 }
603 }
604 None
605}
606
607fn resolve_path<'a>(value: &'a JsonValue, path: &str) -> Option<&'a JsonValue> {
609 let mut current = value;
610 for part in path.split('.') {
611 current = match current {
612 JsonValue::Object(obj) => obj.get(part)?,
613 JsonValue::Array(arr) => {
614 let index: usize = part.parse().ok()?;
615 arr.get(index)?
616 }
617 _ => return None,
618 };
619 }
620 Some(current)
621}
622
623fn current_time_millis() -> u64 {
625 std::time::SystemTime::now()
626 .duration_since(std::time::UNIX_EPOCH)
627 .map(|d| d.as_millis() as u64)
628 .unwrap_or(0)
629}
630
631fn parse_timestamp(value: &str, format: Option<&TimestampFormat>) -> Result<u64> {
633 if let Some(fmt) = format {
634 return parse_with_format(value, fmt);
635 }
636
637 let trimmed = value.trim();
639
640 if trimmed.contains('T') || (trimmed.contains('-') && !trimmed.starts_with('-')) {
642 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(trimmed) {
643 return Ok(dt.timestamp_millis() as u64);
644 }
645 if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S") {
647 return Ok(dt.and_utc().timestamp_millis() as u64);
648 }
649 if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f") {
650 return Ok(dt.and_utc().timestamp_millis() as u64);
651 }
652 }
653
654 if let Ok(num) = trimmed.parse::<i64>() {
656 let abs = num.unsigned_abs();
657 if abs < 10_000_000_000 {
659 return Ok(abs * 1000);
661 } else if abs < 10_000_000_000_000 {
662 return Ok(abs);
664 } else {
665 return Ok(abs / 1_000_000);
667 }
668 }
669
670 Err(anyhow!(
671 "Unable to parse timestamp '{value}'. Expected ISO 8601 or Unix timestamp"
672 ))
673}
674
675fn parse_with_format(value: &str, format: &TimestampFormat) -> Result<u64> {
677 match format {
678 TimestampFormat::Iso8601 => {
679 let dt = chrono::DateTime::parse_from_rfc3339(value.trim())
680 .map_err(|e| anyhow!("Invalid ISO 8601 timestamp: {e}"))?;
681 Ok(dt.timestamp_millis() as u64)
682 }
683 TimestampFormat::UnixSeconds => {
684 let secs: i64 = value
685 .trim()
686 .parse()
687 .map_err(|e| anyhow!("Invalid Unix seconds: {e}"))?;
688 if secs < 0 {
689 return Err(anyhow!("Negative Unix timestamp not supported: {secs}"));
690 }
691 Ok((secs as u64) * 1000)
692 }
693 TimestampFormat::UnixMillis => {
694 let millis: u64 = value
695 .trim()
696 .parse()
697 .map_err(|e| anyhow!("Invalid Unix milliseconds: {e}"))?;
698 Ok(millis)
699 }
700 TimestampFormat::UnixNanos => {
701 let nanos: u64 = value
702 .trim()
703 .parse()
704 .map_err(|e| anyhow!("Invalid Unix nanoseconds: {e}"))?;
705 Ok(nanos / 1_000_000)
706 }
707 }
708}
709
710#[cfg(test)]
711mod tests {
712 use super::*;
713 use crate::config::{ElementTemplate, SourceMapping};
714
715 fn create_test_context() -> JsonValue {
716 serde_json::json!({
717 "payload": {
718 "id": "123",
719 "name": "Test Order",
720 "customer": "Alice",
721 "total": 150,
722 "status": "pending",
723 "metadata": { "source": "webhook" }
724 },
725 "key": "order-123",
726 "topic": "orders",
727 "partition": 0,
728 "offset": 42,
729 "source_id": "test-source"
730 })
731 }
732
733 #[test]
734 fn test_render_simple_template() {
735 let engine = SourceMappingEngine::new();
736 let context = create_test_context();
737
738 let result = engine.render_string("{{payload.id}}", &context).unwrap();
739 assert_eq!(result, "123");
740 }
741
742 #[test]
743 fn test_render_value_preserves_type() {
744 let engine = SourceMappingEngine::new();
745 let context = create_test_context();
746
747 let result = engine.render_value("{{payload.total}}", &context).unwrap();
748 assert_eq!(result, JsonValue::Number(150.into()));
749 }
750
751 #[test]
752 fn test_render_value_preserves_object() {
753 let engine = SourceMappingEngine::new();
754 let context = create_test_context();
755
756 let result = engine
757 .render_value("{{payload.metadata}}", &context)
758 .unwrap();
759 assert_eq!(result, serde_json::json!({"source": "webhook"}));
760 }
761
762 #[test]
763 fn test_process_mapping_node_insert() {
764 let engine = SourceMappingEngine::new();
765 let context = create_test_context();
766
767 let mapping = SourceMapping {
768 when: None,
769 operation: Some(OperationType::Insert),
770 operation_from: None,
771 operation_map: None,
772 element_type: ElementType::Node,
773 effective_from: None,
774 template: ElementTemplate {
775 id: "{{key}}".to_string(),
776 labels: vec!["Order".to_string()],
777 properties: Some(JsonValue::String("{{payload}}".to_string())),
778 from: None,
779 to: None,
780 },
781 };
782
783 let result = engine
784 .process_mapping(&mapping, &context, "test-source")
785 .unwrap();
786 match result {
787 SourceChange::Insert { element } => {
788 match element {
789 Element::Node {
790 metadata,
791 properties,
792 } => {
793 assert_eq!(metadata.reference.element_id.as_ref(), "order-123");
794 assert_eq!(metadata.labels[0].as_ref(), "Order");
795 assert!(properties.get("customer").is_some());
797 assert!(properties.get("total").is_some());
798 }
799 _ => panic!("Expected Node element"),
800 }
801 }
802 _ => panic!("Expected Insert"),
803 }
804 }
805
806 #[test]
807 fn test_process_mapping_with_operation_from() {
808 let engine = SourceMappingEngine::new();
809 let context = serde_json::json!({
810 "payload": {
811 "action": "updated",
812 "id": "order-1",
813 "total": 200
814 },
815 "key": "order-1"
816 });
817
818 let mut op_map = std::collections::HashMap::new();
819 op_map.insert("created".to_string(), OperationType::Insert);
820 op_map.insert("updated".to_string(), OperationType::Update);
821 op_map.insert("deleted".to_string(), OperationType::Delete);
822
823 let mapping = SourceMapping {
824 when: None,
825 operation: None,
826 operation_from: Some("payload.action".to_string()),
827 operation_map: Some(op_map),
828 element_type: ElementType::Node,
829 effective_from: None,
830 template: ElementTemplate {
831 id: "{{payload.id}}".to_string(),
832 labels: vec!["Order".to_string()],
833 properties: Some(serde_json::json!({
834 "total": "{{payload.total}}"
835 })),
836 from: None,
837 to: None,
838 },
839 };
840
841 let result = engine
842 .process_mapping(&mapping, &context, "test-source")
843 .unwrap();
844 assert!(matches!(result, SourceChange::Update { .. }));
845 }
846
847 #[test]
848 fn test_process_mapping_relation() {
849 let engine = SourceMappingEngine::new();
850 let context = serde_json::json!({
851 "payload": {
852 "id": "rel-1",
853 "customer_id": "cust-1",
854 "order_id": "order-1",
855 "quantity": 5
856 },
857 "key": "rel-1"
858 });
859
860 let mapping = SourceMapping {
861 when: None,
862 operation: Some(OperationType::Insert),
863 operation_from: None,
864 operation_map: None,
865 element_type: ElementType::Relation,
866 effective_from: None,
867 template: ElementTemplate {
868 id: "{{payload.id}}".to_string(),
869 labels: vec!["PURCHASED".to_string()],
870 properties: Some(serde_json::json!({
871 "quantity": "{{payload.quantity}}"
872 })),
873 from: Some("{{payload.customer_id}}".to_string()),
874 to: Some("{{payload.order_id}}".to_string()),
875 },
876 };
877
878 let result = engine
879 .process_mapping(&mapping, &context, "test-source")
880 .unwrap();
881 match result {
882 SourceChange::Insert { element } => match element {
883 Element::Relation {
884 metadata,
885 out_node,
886 in_node,
887 ..
888 } => {
889 assert_eq!(metadata.reference.element_id.as_ref(), "rel-1");
890 assert_eq!(metadata.labels[0].as_ref(), "PURCHASED");
891 assert_eq!(out_node.element_id.as_ref(), "cust-1");
892 assert_eq!(in_node.element_id.as_ref(), "order-1");
893 }
894 _ => panic!("Expected Relation element"),
895 },
896 _ => panic!("Expected Insert"),
897 }
898 }
899
900 #[test]
901 fn test_condition_matches_field_equals() {
902 let engine = SourceMappingEngine::new();
903 let context = serde_json::json!({
904 "payload": {
905 "type": "order",
906 "id": "123"
907 }
908 });
909
910 let condition = MappingCondition {
911 header: None,
912 field: Some("type".to_string()),
913 equals: Some("order".to_string()),
914 contains: None,
915 regex: None,
916 };
917
918 assert!(engine.condition_matches(&condition, &context, None));
919 }
920
921 #[test]
922 fn test_condition_matches_field_not_equals() {
923 let engine = SourceMappingEngine::new();
924 let context = serde_json::json!({
925 "payload": {
926 "type": "shipment",
927 "id": "123"
928 }
929 });
930
931 let condition = MappingCondition {
932 header: None,
933 field: Some("type".to_string()),
934 equals: Some("order".to_string()),
935 contains: None,
936 regex: None,
937 };
938
939 assert!(!engine.condition_matches(&condition, &context, None));
940 }
941
942 #[test]
943 fn test_json_to_element_value_types() {
944 let null_val = json_to_element_value(&JsonValue::Null).unwrap();
945 assert_eq!(null_val, ElementValue::Null);
946
947 let bool_val = json_to_element_value(&JsonValue::Bool(true)).unwrap();
948 assert_eq!(bool_val, ElementValue::Bool(true));
949
950 let int_val = json_to_element_value(&serde_json::json!(42)).unwrap();
951 assert_eq!(int_val, ElementValue::Integer(42));
952
953 let str_val = json_to_element_value(&serde_json::json!("hello")).unwrap();
954 assert_eq!(str_val, ElementValue::String(Arc::from("hello")));
955 }
956
957 #[test]
958 fn test_helpers_lowercase() {
959 let engine = SourceMappingEngine::new();
960 let context = serde_json::json!({"payload": {"name": "HELLO"}});
961
962 let result = engine
963 .render_string("{{lowercase payload.name}}", &context)
964 .unwrap();
965 assert_eq!(result, "hello");
966 }
967
968 #[test]
969 fn test_helpers_concat() {
970 let engine = SourceMappingEngine::new();
971 let context = serde_json::json!({"payload": {"id": "123"}});
972
973 let result = engine
974 .render_string("{{concat \"prefix-\" payload.id}}", &context)
975 .unwrap();
976 assert_eq!(result, "prefix-123");
977 }
978
979 #[test]
980 fn test_extract_simple_path_basic() {
981 assert_eq!(
982 extract_simple_path("{{payload.name}}"),
983 Some("payload.name".to_string())
984 );
985 }
986
987 #[test]
988 fn test_extract_simple_path_with_spaces_around_braces() {
989 assert_eq!(extract_simple_path("{{ key }}"), Some("key".to_string()));
990 }
991
992 #[test]
993 fn test_extract_simple_path_helper_returns_none() {
994 assert_eq!(extract_simple_path("{{#if x}}yes{{/if}}"), None);
995 }
996
997 #[test]
998 fn test_extract_simple_path_with_space_returns_none() {
999 assert_eq!(extract_simple_path("{{lowercase payload.name}}"), None);
1001 }
1002
1003 #[test]
1004 fn test_extract_simple_path_not_template() {
1005 assert_eq!(extract_simple_path("plain-text"), None);
1006 }
1007
1008 #[test]
1009 fn test_parse_with_format_iso8601() {
1010 let result = parse_with_format("2024-01-15T10:30:00Z", &TimestampFormat::Iso8601).unwrap();
1011 assert_eq!(result, 1705314600000);
1012 }
1013
1014 #[test]
1015 fn test_parse_with_format_unix_seconds() {
1016 let result = parse_with_format("1705311000", &TimestampFormat::UnixSeconds).unwrap();
1017 assert_eq!(result, 1705311000000);
1018 }
1019
1020 #[test]
1021 fn test_parse_with_format_unix_millis() {
1022 let result = parse_with_format("1705311000123", &TimestampFormat::UnixMillis).unwrap();
1023 assert_eq!(result, 1705311000123);
1024 }
1025
1026 #[test]
1027 fn test_parse_with_format_unix_nanos() {
1028 let result = parse_with_format("1705311000123456789", &TimestampFormat::UnixNanos).unwrap();
1029 assert_eq!(result, 1705311000123);
1030 }
1031
1032 #[test]
1033 fn test_parse_with_format_negative_seconds_rejected() {
1034 let result = parse_with_format("-100", &TimestampFormat::UnixSeconds);
1035 assert!(result.is_err());
1036 assert!(result.unwrap_err().to_string().contains("Negative"));
1037 }
1038
1039 #[test]
1040 fn test_parse_timestamp_auto_detect_seconds() {
1041 let result = parse_timestamp("1705311000", None).unwrap();
1043 assert_eq!(result, 1705311000000);
1044 }
1045
1046 #[test]
1047 fn test_parse_timestamp_auto_detect_millis() {
1048 let result = parse_timestamp("1705311000123", None).unwrap();
1050 assert_eq!(result, 1705311000123);
1051 }
1052
1053 #[test]
1054 fn test_parse_timestamp_auto_detect_nanos() {
1055 let result = parse_timestamp("1705311000123456789", None).unwrap();
1057 assert_eq!(result, 1705311000123);
1058 }
1059
1060 #[test]
1061 fn test_parse_timestamp_iso8601() {
1062 let result = parse_timestamp("2024-01-15T10:30:00Z", None).unwrap();
1063 assert_eq!(result, 1705314600000);
1064 }
1065}