1use crate::common::{CdcEvent, CdcOp};
65use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
66use regex::Regex;
67use serde_json::{Map, Value};
68use std::collections::{HashMap, HashSet};
69use std::sync::Arc;
70use tracing::warn;
71
72fn op_to_code(op: &CdcOp) -> &'static str {
74 match op {
75 CdcOp::Insert => "c", CdcOp::Update => "u", CdcOp::Delete => "d", CdcOp::Tombstone => "d", CdcOp::Truncate => "t", CdcOp::Snapshot => "r", CdcOp::Schema => "s", }
83}
84
/// A single message transform applied to a [`CdcEvent`].
///
/// Implementors must be `Send + Sync`.
pub trait Smt: Send + Sync {
    /// Transforms `event`.
    ///
    /// Returns `Some` with the (possibly modified) event, or `None` to drop it.
    fn apply(&self, event: CdcEvent) -> Option<CdcEvent>;

    /// Returns a static name identifying this transform.
    fn name(&self) -> &'static str;
}
93
/// An ordered list of transforms that are applied one after another.
pub struct SmtChain {
    /// Transforms in the order they were added; shared via `Arc`.
    transforms: Vec<Arc<dyn Smt>>,
}
98
99impl Default for SmtChain {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl SmtChain {
106 pub fn new() -> Self {
108 Self {
109 transforms: Vec::new(),
110 }
111 }
112
113 #[allow(clippy::should_implement_trait)] pub fn add<T: Smt + 'static>(mut self, transform: T) -> Self {
116 self.transforms.push(Arc::new(transform));
117 self
118 }
119
120 pub fn add_boxed(mut self, transform: Arc<dyn Smt>) -> Self {
122 self.transforms.push(transform);
123 self
124 }
125
126 pub fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
128 for transform in &self.transforms {
129 event = transform.apply(event)?;
130 }
131 Some(event)
132 }
133
134 pub fn len(&self) -> usize {
136 self.transforms.len()
137 }
138
139 pub fn is_empty(&self) -> bool {
141 self.transforms.is_empty()
142 }
143
144 pub fn names(&self) -> Vec<&'static str> {
146 self.transforms.iter().map(|t| t.name()).collect()
147 }
148}
149
/// Transform that handles delete events and optionally annotates the `after`
/// record with event metadata fields.
#[derive(Debug, Clone)]
pub struct ExtractNewRecordState {
    /// When `true`, delete events are dropped before `delete_handling` is consulted.
    drop_tombstones: bool,
    /// What to do with delete events that were not dropped above.
    delete_handling: DeleteHandling,
    /// Add a `{header_prefix}op` field holding the operation code.
    add_op: bool,
    /// Add a `{header_prefix}table` field holding the table name.
    add_table: bool,
    /// Add a `{header_prefix}schema` field holding the schema name.
    add_schema: bool,
    /// Add a `{header_prefix}ts_ms` field derived from the event timestamp.
    add_ts: bool,
    /// Prefix for the metadata field names above.
    header_prefix: String,
}
180
/// How [`ExtractNewRecordState`] treats delete events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeleteHandling {
    /// Drop the delete event entirely.
    Drop,
    /// Copy `before` into `after` and mark it with `"__deleted": true`.
    Rewrite,
    /// Leave the delete event untouched (default).
    #[default]
    None,
}
192
193impl Default for ExtractNewRecordState {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199impl ExtractNewRecordState {
200 pub fn new() -> Self {
202 Self {
203 drop_tombstones: false,
204 delete_handling: DeleteHandling::None,
205 add_op: false,
206 add_table: false,
207 add_schema: false,
208 add_ts: false,
209 header_prefix: "__".to_string(),
210 }
211 }
212
213 pub fn drop_tombstones(mut self) -> Self {
215 self.drop_tombstones = true;
216 self
217 }
218
219 pub fn delete_handling(mut self, mode: DeleteHandling) -> Self {
221 self.delete_handling = mode;
222 self
223 }
224
225 pub fn add_op_field(mut self) -> Self {
227 self.add_op = true;
228 self
229 }
230
231 pub fn add_table_field(mut self) -> Self {
233 self.add_table = true;
234 self
235 }
236
237 pub fn add_schema_field(mut self) -> Self {
239 self.add_schema = true;
240 self
241 }
242
243 pub fn add_ts_field(mut self) -> Self {
245 self.add_ts = true;
246 self
247 }
248
249 pub fn header_prefix(mut self, prefix: impl Into<String>) -> Self {
251 self.header_prefix = prefix.into();
252 self
253 }
254}
255
256impl Smt for ExtractNewRecordState {
257 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
258 if event.op == CdcOp::Delete {
260 if self.drop_tombstones {
261 return None;
262 }
263
264 match self.delete_handling {
265 DeleteHandling::Drop => return None,
266 DeleteHandling::Rewrite => {
267 if let Some(before) = &event.before {
269 let mut record = before.clone();
270 if let Some(obj) = record.as_object_mut() {
271 obj.insert("__deleted".to_string(), Value::Bool(true));
272 }
273 event.after = Some(record);
274 }
275 }
276 DeleteHandling::None => {}
277 }
278 }
279
280 if let Some(after) = &mut event.after {
282 if let Some(obj) = after.as_object_mut() {
283 if self.add_op {
284 obj.insert(
285 format!("{}op", self.header_prefix),
286 Value::String(op_to_code(&event.op).to_string()),
287 );
288 }
289 if self.add_table {
290 obj.insert(
291 format!("{}table", self.header_prefix),
292 Value::String(event.table.clone()),
293 );
294 }
295 if self.add_schema {
296 obj.insert(
297 format!("{}schema", self.header_prefix),
298 Value::String(event.schema.clone()),
299 );
300 }
301 if self.add_ts {
302 obj.insert(
303 format!("{}ts_ms", self.header_prefix),
304 Value::Number((event.timestamp * 1000).into()),
305 );
306 }
307 }
308 }
309
310 Some(event)
311 }
312
313 fn name(&self) -> &'static str {
314 "ExtractNewRecordState"
315 }
316}
317
/// Transform that builds a key object from selected record fields.
#[derive(Debug, Clone)]
pub struct ValueToKey {
    /// Names of the record fields copied into the key.
    fields: Vec<String>,
}
331
332impl ValueToKey {
333 pub fn new(fields: Vec<String>) -> Self {
335 Self { fields }
336 }
337
338 pub fn with_fields<I, S>(fields: I) -> Self
340 where
341 I: IntoIterator<Item = S>,
342 S: Into<String>,
343 {
344 Self {
345 fields: fields.into_iter().map(Into::into).collect(),
346 }
347 }
348
349 pub fn extract_key(&self, event: &CdcEvent) -> Option<Value> {
351 let source = event.after.as_ref().or(event.before.as_ref())?;
352
353 if let Some(obj) = source.as_object() {
354 let mut key_obj = Map::new();
355 for field in &self.fields {
356 if let Some(value) = obj.get(field) {
357 key_obj.insert(field.clone(), value.clone());
358 }
359 }
360
361 if !key_obj.is_empty() {
362 return Some(Value::Object(key_obj));
363 }
364 }
365
366 None
367 }
368}
369
370impl Smt for ValueToKey {
371 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
372 if let Some(key) = self.extract_key(&event) {
373 if let Some(after) = &mut event.after {
375 if let Some(obj) = after.as_object_mut() {
376 obj.insert("__key".to_string(), key);
377 }
378 }
379 }
380 Some(event)
381 }
382
383 fn name(&self) -> &'static str {
384 "ValueToKey"
385 }
386}
387
/// Output format produced by [`TimestampConverter`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFormat {
    /// RFC 3339 string (default).
    #[default]
    Iso8601,
    /// Integer seconds since the Unix epoch.
    EpochSeconds,
    /// Integer milliseconds since the Unix epoch.
    EpochMillis,
    /// Integer microseconds since the Unix epoch.
    EpochMicros,
    /// Date string formatted as `%Y-%m-%d`.
    DateOnly,
    /// Time string formatted as `%H:%M:%S`.
    TimeOnly,
}
409
/// Transform that rewrites timestamp fields into a single target format.
#[derive(Debug, Clone)]
pub struct TimestampConverter {
    /// Names of the fields to convert.
    fields: Vec<String>,
    /// Format every converted field is written in.
    target_format: TimestampFormat,
}
418
419impl TimestampConverter {
420 pub fn new<I, S>(fields: I, format: TimestampFormat) -> Self
422 where
423 I: IntoIterator<Item = S>,
424 S: Into<String>,
425 {
426 Self {
427 fields: fields.into_iter().map(Into::into).collect(),
428 target_format: format,
429 }
430 }
431
432 fn convert_value(&self, value: &Value) -> Option<Value> {
434 let timestamp = self.parse_timestamp(value)?;
435
436 Some(match self.target_format {
437 TimestampFormat::Iso8601 => Value::String(timestamp.to_rfc3339()),
438 TimestampFormat::EpochSeconds => Value::Number(timestamp.timestamp().into()),
439 TimestampFormat::EpochMillis => Value::Number(timestamp.timestamp_millis().into()),
440 TimestampFormat::EpochMicros => Value::Number(timestamp.timestamp_micros().into()),
441 TimestampFormat::DateOnly => Value::String(timestamp.format("%Y-%m-%d").to_string()),
442 TimestampFormat::TimeOnly => Value::String(timestamp.format("%H:%M:%S").to_string()),
443 })
444 }
445
446 fn parse_timestamp(&self, value: &Value) -> Option<DateTime<Utc>> {
448 match value {
449 Value::String(s) => {
450 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
452 return Some(dt.with_timezone(&Utc));
453 }
454 if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
456 return Some(Utc.from_utc_datetime(&dt));
457 }
458 if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
459 return Some(Utc.from_utc_datetime(&dt));
460 }
461 None
462 }
463 Value::Number(n) => {
464 let ts = n.as_i64()?;
466 if ts > 1_000_000_000_000 {
467 DateTime::from_timestamp_millis(ts)
469 } else {
470 DateTime::from_timestamp(ts, 0)
472 }
473 }
474 _ => None,
475 }
476 }
477}
478
479impl Smt for TimestampConverter {
480 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
481 if let Some(after) = &mut event.after {
483 if let Some(obj) = after.as_object_mut() {
484 for field in &self.fields {
485 if let Some(value) = obj.get(field).cloned() {
486 if let Some(converted) = self.convert_value(&value) {
487 obj.insert(field.clone(), converted);
488 }
489 }
490 }
491 }
492 }
493
494 if let Some(before) = &mut event.before {
496 if let Some(obj) = before.as_object_mut() {
497 for field in &self.fields {
498 if let Some(value) = obj.get(field).cloned() {
499 if let Some(converted) = self.convert_value(&value) {
500 obj.insert(field.clone(), converted);
501 }
502 }
503 }
504 }
505 }
506
507 Some(event)
508 }
509
510 fn name(&self) -> &'static str {
511 "TimestampConverter"
512 }
513}
514
/// How [`MaskField`] hides a field's value.
#[derive(Debug, Clone, Default)]
pub enum MaskStrategy {
    /// Replace the value with this fixed string.
    Fixed(String),
    /// Replace string values with `*` characters (at most 20); other values
    /// become `"****"` (default).
    #[default]
    Asterisks,
    /// Replace the value with the hex-encoded SHA-256 of its JSON serialization.
    Hash,
    /// Keep the first `keep_first` and last `keep_last` characters of a string
    /// and star the rest; non-string values become `"****"`.
    PartialMask { keep_first: usize, keep_last: usize },
    /// Replace the value with JSON `null`.
    Null,
    /// Remove the field from the record.
    Redact,
}
536
/// Transform that masks selected fields using a [`MaskStrategy`].
#[derive(Debug, Clone)]
pub struct MaskField {
    /// Names of the fields to mask.
    fields: HashSet<String>,
    /// Strategy applied to every masked field.
    strategy: MaskStrategy,
}
545
546impl MaskField {
547 pub fn new<I, S>(fields: I) -> Self
549 where
550 I: IntoIterator<Item = S>,
551 S: Into<String>,
552 {
553 Self {
554 fields: fields.into_iter().map(Into::into).collect(),
555 strategy: MaskStrategy::Asterisks,
556 }
557 }
558
559 pub fn with_strategy(mut self, strategy: MaskStrategy) -> Self {
561 self.strategy = strategy;
562 self
563 }
564
565 fn mask_value(&self, value: &Value) -> Value {
567 match &self.strategy {
568 MaskStrategy::Fixed(s) => Value::String(s.clone()),
569 MaskStrategy::Asterisks => {
570 if let Some(s) = value.as_str() {
571 Value::String("*".repeat(s.len().min(20)))
572 } else {
573 Value::String("****".to_string())
574 }
575 }
576 MaskStrategy::Hash => {
577 use sha2::{Digest, Sha256};
578 let bytes = serde_json::to_vec(value).unwrap_or_default();
579 let hash = Sha256::digest(&bytes);
580 Value::String(hex::encode(hash))
581 }
582 MaskStrategy::PartialMask {
583 keep_first,
584 keep_last,
585 } => {
586 if let Some(s) = value.as_str() {
587 let len = s.len();
588 if len <= keep_first + keep_last {
589 Value::String("*".repeat(len))
590 } else {
591 let first: String = s.chars().take(*keep_first).collect();
592 let last: String = s.chars().skip(len - keep_last).collect();
593 let middle = "*".repeat(len - keep_first - keep_last);
594 Value::String(format!("{}{}{}", first, middle, last))
595 }
596 } else {
597 Value::String("****".to_string())
598 }
599 }
600 MaskStrategy::Null => Value::Null,
601 MaskStrategy::Redact => Value::Null, }
603 }
604
605 fn mask_object(&self, obj: &mut Map<String, Value>) {
607 for field in &self.fields {
608 if obj.contains_key(field) {
609 if matches!(self.strategy, MaskStrategy::Redact) {
610 obj.remove(field);
611 } else if let Some(value) = obj.get(field).cloned() {
612 obj.insert(field.clone(), self.mask_value(&value));
613 }
614 }
615 }
616 }
617}
618
619impl Smt for MaskField {
620 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
621 if let Some(after) = &mut event.after {
622 if let Some(obj) = after.as_object_mut() {
623 self.mask_object(obj);
624 }
625 }
626
627 if let Some(before) = &mut event.before {
628 if let Some(obj) = before.as_object_mut() {
629 self.mask_object(obj);
630 }
631 }
632
633 Some(event)
634 }
635
636 fn name(&self) -> &'static str {
637 "MaskField"
638 }
639}
640
/// Transform that renames, whitelists and removes record fields.
#[derive(Debug, Clone, Default)]
pub struct ReplaceField {
    /// When set, only these fields are kept (checked after renames).
    include: Option<HashSet<String>>,
    /// Fields that are always removed.
    exclude: HashSet<String>,
    /// Old name → new name mappings, applied first.
    renames: HashMap<String, String>,
}
655
656impl ReplaceField {
657 pub fn new() -> Self {
659 Self::default()
660 }
661
662 pub fn include<I, S>(mut self, fields: I) -> Self
664 where
665 I: IntoIterator<Item = S>,
666 S: Into<String>,
667 {
668 self.include = Some(fields.into_iter().map(Into::into).collect());
669 self
670 }
671
672 pub fn exclude<I, S>(mut self, fields: I) -> Self
674 where
675 I: IntoIterator<Item = S>,
676 S: Into<String>,
677 {
678 self.exclude = fields.into_iter().map(Into::into).collect();
679 self
680 }
681
682 pub fn rename(mut self, from: impl Into<String>, to: impl Into<String>) -> Self {
684 self.renames.insert(from.into(), to.into());
685 self
686 }
687
688 fn replace_object(&self, obj: &mut Map<String, Value>) {
690 for (old, new) in &self.renames {
692 if let Some(value) = obj.remove(old) {
693 obj.insert(new.clone(), value);
694 }
695 }
696
697 if let Some(include) = &self.include {
699 let keys: Vec<_> = obj.keys().cloned().collect();
700 for key in keys {
701 if !include.contains(&key) {
702 obj.remove(&key);
703 }
704 }
705 }
706
707 for field in &self.exclude {
709 obj.remove(field);
710 }
711 }
712}
713
714impl Smt for ReplaceField {
715 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
716 if let Some(after) = &mut event.after {
717 if let Some(obj) = after.as_object_mut() {
718 self.replace_object(obj);
719 }
720 }
721
722 if let Some(before) = &mut event.before {
723 if let Some(obj) = before.as_object_mut() {
724 self.replace_object(obj);
725 }
726 }
727
728 Some(event)
729 }
730
731 fn name(&self) -> &'static str {
732 "ReplaceField"
733 }
734}
735
/// Source of the value written by [`InsertField`].
#[derive(Debug, Clone)]
pub enum InsertValue {
    /// A fixed JSON value.
    Static(Value),
    /// The current UTC time as an RFC 3339 string.
    CurrentTimestamp,
    /// The current UTC date formatted as `%Y-%m-%d`.
    CurrentDate,
    /// The value of another field in the same record, or `null` when absent.
    CopyFrom(String),
}
752
/// Transform that inserts additional fields into the `after` record.
#[derive(Debug, Clone)]
pub struct InsertField {
    /// `(target field name, value source)` pairs, applied in order.
    fields: Vec<(String, InsertValue)>,
}
759
760impl InsertField {
761 pub fn new() -> Self {
763 Self { fields: Vec::new() }
764 }
765
766 pub fn static_field(mut self, name: impl Into<String>, value: Value) -> Self {
768 self.fields.push((name.into(), InsertValue::Static(value)));
769 self
770 }
771
772 pub fn timestamp_field(mut self, name: impl Into<String>) -> Self {
774 self.fields
775 .push((name.into(), InsertValue::CurrentTimestamp));
776 self
777 }
778
779 pub fn date_field(mut self, name: impl Into<String>) -> Self {
781 self.fields.push((name.into(), InsertValue::CurrentDate));
782 self
783 }
784
785 pub fn copy_field(mut self, name: impl Into<String>, source: impl Into<String>) -> Self {
787 self.fields
788 .push((name.into(), InsertValue::CopyFrom(source.into())));
789 self
790 }
791
792 fn get_value(&self, source: &InsertValue, obj: &Map<String, Value>) -> Value {
794 match source {
795 InsertValue::Static(v) => v.clone(),
796 InsertValue::CurrentTimestamp => Value::String(chrono::Utc::now().to_rfc3339()),
797 InsertValue::CurrentDate => {
798 Value::String(chrono::Utc::now().format("%Y-%m-%d").to_string())
799 }
800 InsertValue::CopyFrom(field) => obj.get(field).cloned().unwrap_or(Value::Null),
801 }
802 }
803}
804
805impl Default for InsertField {
806 fn default() -> Self {
807 Self::new()
808 }
809}
810
811impl Smt for InsertField {
812 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
813 if let Some(after) = &mut event.after {
814 if let Some(obj) = after.as_object_mut() {
815 let obj_clone = obj.clone();
817 for (name, source) in &self.fields {
818 let value = self.get_value(source, &obj_clone);
819 obj.insert(name.clone(), value);
820 }
821 }
822 }
823
824 Some(event)
825 }
826
827 fn name(&self) -> &'static str {
828 "InsertField"
829 }
830}
831
/// Condition evaluated by [`Filter`] against an event's record.
pub enum FilterCondition {
    /// The field exists and equals `value`.
    Equals { field: String, value: Value },
    /// The field is missing or differs from `value`.
    NotEquals { field: String, value: Value },
    /// The field is missing or JSON `null`.
    IsNull { field: String },
    /// The field exists and is not JSON `null`.
    IsNotNull { field: String },
    /// The field is a string matching the regular expression `pattern`.
    Matches { field: String, pattern: String },
    /// The field exists and equals one of `values`.
    In { field: String, values: Vec<Value> },
    /// Arbitrary predicate over the whole event.
    Custom(Arc<dyn Fn(&CdcEvent) -> bool + Send + Sync>),
    /// All nested conditions hold.
    And(Vec<FilterCondition>),
    /// At least one nested condition holds.
    Or(Vec<FilterCondition>),
    /// The nested condition does not hold.
    Not(Box<FilterCondition>),
}
859
860impl std::fmt::Debug for FilterCondition {
861 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
862 match self {
863 Self::Equals { field, value } => f
864 .debug_struct("Equals")
865 .field("field", field)
866 .field("value", value)
867 .finish(),
868 Self::NotEquals { field, value } => f
869 .debug_struct("NotEquals")
870 .field("field", field)
871 .field("value", value)
872 .finish(),
873 Self::IsNull { field } => f.debug_struct("IsNull").field("field", field).finish(),
874 Self::IsNotNull { field } => f.debug_struct("IsNotNull").field("field", field).finish(),
875 Self::Matches { field, pattern } => f
876 .debug_struct("Matches")
877 .field("field", field)
878 .field("pattern", pattern)
879 .finish(),
880 Self::In { field, values } => f
881 .debug_struct("In")
882 .field("field", field)
883 .field("values", values)
884 .finish(),
885 Self::Custom(_) => f.debug_struct("Custom").field("fn", &"<closure>").finish(),
886 Self::And(conditions) => f
887 .debug_struct("And")
888 .field("conditions", conditions)
889 .finish(),
890 Self::Or(conditions) => f
891 .debug_struct("Or")
892 .field("conditions", conditions)
893 .finish(),
894 Self::Not(condition) => f.debug_struct("Not").field("condition", condition).finish(),
895 }
896 }
897}
898
/// Transform that keeps or drops events based on a [`FilterCondition`].
pub struct Filter {
    /// Condition evaluated for every event.
    condition: FilterCondition,
    /// `true`: keep matching events; `false`: drop matching events.
    keep_matching: bool,
}
906
907impl Filter {
908 pub fn keep(condition: FilterCondition) -> Self {
910 Self {
911 condition,
912 keep_matching: true,
913 }
914 }
915
916 pub fn drop(condition: FilterCondition) -> Self {
918 Self {
919 condition,
920 keep_matching: false,
921 }
922 }
923
924 fn matches(&self, event: &CdcEvent) -> bool {
926 self.check_condition(&self.condition, event)
927 }
928
929 fn check_condition(&self, condition: &FilterCondition, event: &CdcEvent) -> bool {
930 match condition {
931 FilterCondition::Equals { field, value } => self
932 .get_field_value(event, field)
933 .map(|v| v == value)
934 .unwrap_or(false),
935 FilterCondition::NotEquals { field, value } => self
936 .get_field_value(event, field)
937 .map(|v| v != value)
938 .unwrap_or(true),
939 FilterCondition::IsNull { field } => self
940 .get_field_value(event, field)
941 .map(|v| v.is_null())
942 .unwrap_or(true),
943 FilterCondition::IsNotNull { field } => self
944 .get_field_value(event, field)
945 .map(|v| !v.is_null())
946 .unwrap_or(false),
947 FilterCondition::Matches { field, pattern } => {
948 if let Ok(re) = Regex::new(pattern) {
949 self.get_field_value(event, field)
950 .and_then(|v| v.as_str().map(|s| re.is_match(s)))
951 .unwrap_or(false)
952 } else {
953 false
954 }
955 }
956 FilterCondition::In { field, values } => self
957 .get_field_value(event, field)
958 .map(|v| values.contains(v))
959 .unwrap_or(false),
960 FilterCondition::Custom(f) => f(event),
961 FilterCondition::And(conditions) => {
962 conditions.iter().all(|c| self.check_condition(c, event))
963 }
964 FilterCondition::Or(conditions) => {
965 conditions.iter().any(|c| self.check_condition(c, event))
966 }
967 FilterCondition::Not(c) => !self.check_condition(c, event),
968 }
969 }
970
971 fn get_field_value<'a>(&self, event: &'a CdcEvent, field: &str) -> Option<&'a Value> {
972 event
973 .after
974 .as_ref()
975 .or(event.before.as_ref())
976 .and_then(|v| v.as_object())
977 .and_then(|obj| obj.get(field))
978 }
979}
980
981impl Smt for Filter {
982 fn apply(&self, event: CdcEvent) -> Option<CdcEvent> {
983 let matches = self.matches(&event);
984 if (self.keep_matching && matches) || (!self.keep_matching && !matches) {
985 Some(event)
986 } else {
987 None
988 }
989 }
990
991 fn name(&self) -> &'static str {
992 "Filter"
993 }
994}
995
/// Target type for a [`Cast`] conversion.
#[derive(Debug, Clone, Copy)]
pub enum CastType {
    /// JSON string.
    String,
    /// JSON integer number.
    Integer,
    /// JSON floating-point number.
    Float,
    /// JSON boolean.
    Boolean,
    /// Parse a string as JSON; other values are left unchanged.
    Json,
}
1009
/// Transform that converts selected fields to a target JSON type.
#[derive(Debug, Clone)]
pub struct Cast {
    /// Field name → target type.
    casts: HashMap<String, CastType>,
}
1016
1017impl Cast {
1018 pub fn new() -> Self {
1020 Self {
1021 casts: HashMap::new(),
1022 }
1023 }
1024
1025 pub fn field(mut self, name: impl Into<String>, to: CastType) -> Self {
1027 self.casts.insert(name.into(), to);
1028 self
1029 }
1030
1031 fn cast_value(&self, value: &Value, to: CastType) -> Value {
1033 match to {
1034 CastType::String => match value {
1035 Value::String(s) => Value::String(s.clone()),
1036 v => Value::String(v.to_string()),
1037 },
1038 CastType::Integer => match value {
1039 Value::Number(n) => {
1040 if let Some(i) = n.as_i64() {
1041 Value::Number(i.into())
1042 } else if let Some(f) = n.as_f64() {
1043 Value::Number((f as i64).into())
1044 } else {
1045 value.clone()
1046 }
1047 }
1048 Value::String(s) => s
1049 .parse::<i64>()
1050 .map(|i| Value::Number(i.into()))
1051 .unwrap_or(Value::Null),
1052 Value::Bool(b) => Value::Number(if *b { 1 } else { 0 }.into()),
1053 _ => Value::Null,
1054 },
1055 CastType::Float => match value {
1056 Value::Number(n) => {
1057 if let Some(f) = n.as_f64() {
1058 serde_json::Number::from_f64(f)
1059 .map(Value::Number)
1060 .unwrap_or(value.clone())
1061 } else {
1062 value.clone()
1063 }
1064 }
1065 Value::String(s) => s
1066 .parse::<f64>()
1067 .ok()
1068 .and_then(serde_json::Number::from_f64)
1069 .map(Value::Number)
1070 .unwrap_or(Value::Null),
1071 _ => Value::Null,
1072 },
1073 CastType::Boolean => match value {
1074 Value::Bool(b) => Value::Bool(*b),
1075 Value::Number(n) => Value::Bool(n.as_i64().map(|i| i != 0).unwrap_or(false)),
1076 Value::String(s) => {
1077 let lower = s.to_lowercase();
1078 Value::Bool(lower == "true" || lower == "1" || lower == "yes")
1079 }
1080 _ => Value::Bool(false),
1081 },
1082 CastType::Json => match value {
1083 Value::String(s) => serde_json::from_str(s).unwrap_or(Value::String(s.clone())),
1084 v => v.clone(),
1085 },
1086 }
1087 }
1088}
1089
1090impl Default for Cast {
1091 fn default() -> Self {
1092 Self::new()
1093 }
1094}
1095
1096impl Smt for Cast {
1097 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1098 if let Some(after) = &mut event.after {
1099 if let Some(obj) = after.as_object_mut() {
1100 for (field, cast_type) in &self.casts {
1101 if let Some(value) = obj.get(field).cloned() {
1102 obj.insert(field.clone(), self.cast_value(&value, *cast_type));
1103 }
1104 }
1105 }
1106 }
1107
1108 if let Some(before) = &mut event.before {
1109 if let Some(obj) = before.as_object_mut() {
1110 for (field, cast_type) in &self.casts {
1111 if let Some(value) = obj.get(field).cloned() {
1112 obj.insert(field.clone(), self.cast_value(&value, *cast_type));
1113 }
1114 }
1115 }
1116 }
1117
1118 Some(event)
1119 }
1120
1121 fn name(&self) -> &'static str {
1122 "Cast"
1123 }
1124}
1125
/// Transform that flattens nested JSON objects into delimiter-joined keys.
#[derive(Debug, Clone)]
pub struct Flatten {
    /// Separator placed between parent and child key names.
    delimiter: String,
    /// Maximum nesting depth to flatten; `0` means unlimited.
    max_depth: usize,
}
1138
1139impl Default for Flatten {
1140 fn default() -> Self {
1141 Self::new()
1142 }
1143}
1144
1145impl Flatten {
1146 pub fn new() -> Self {
1148 Self {
1149 delimiter: ".".to_string(),
1150 max_depth: 0,
1151 }
1152 }
1153
1154 pub fn delimiter(mut self, delimiter: impl Into<String>) -> Self {
1156 self.delimiter = delimiter.into();
1157 self
1158 }
1159
1160 pub fn max_depth(mut self, depth: usize) -> Self {
1162 self.max_depth = depth;
1163 self
1164 }
1165
1166 fn flatten_object(&self, obj: &Map<String, Value>) -> Map<String, Value> {
1168 let mut result = Map::new();
1169 self.flatten_recursive(obj, "", &mut result, 0);
1170 result
1171 }
1172
1173 fn flatten_recursive(
1174 &self,
1175 obj: &Map<String, Value>,
1176 prefix: &str,
1177 result: &mut Map<String, Value>,
1178 depth: usize,
1179 ) {
1180 for (key, value) in obj {
1181 let new_key = if prefix.is_empty() {
1182 key.clone()
1183 } else {
1184 format!("{}{}{}", prefix, self.delimiter, key)
1185 };
1186
1187 if self.max_depth > 0 && depth >= self.max_depth {
1188 result.insert(new_key, value.clone());
1189 } else if let Some(nested) = value.as_object() {
1190 self.flatten_recursive(nested, &new_key, result, depth + 1);
1191 } else {
1192 result.insert(new_key, value.clone());
1193 }
1194 }
1195 }
1196}
1197
1198impl Smt for Flatten {
1199 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1200 if let Some(after) = &event.after {
1201 if let Some(obj) = after.as_object() {
1202 event.after = Some(Value::Object(self.flatten_object(obj)));
1203 }
1204 }
1205
1206 if let Some(before) = &event.before {
1207 if let Some(obj) = before.as_object() {
1208 event.before = Some(Value::Object(self.flatten_object(obj)));
1209 }
1210 }
1211
1212 Some(event)
1213 }
1214
1215 fn name(&self) -> &'static str {
1216 "Flatten"
1217 }
1218}
1219
/// Transform that picks a topic for an event by matching `schema.table`
/// against regex rules.
#[derive(Debug, Clone)]
pub struct RegexRouter {
    /// `(pattern, topic)` rules, checked in insertion order; first match wins.
    rules: Vec<(Regex, String)>,
    /// Topic used when no rule matches.
    default_topic: String,
}
1233
1234impl RegexRouter {
1235 pub fn new(default_topic: impl Into<String>) -> Self {
1237 Self {
1238 rules: Vec::new(),
1239 default_topic: default_topic.into(),
1240 }
1241 }
1242
1243 pub fn route(mut self, pattern: &str, topic: impl Into<String>) -> Self {
1245 if let Ok(re) = Regex::new(pattern) {
1246 self.rules.push((re, topic.into()));
1247 } else {
1248 warn!("Invalid regex pattern: {}", pattern);
1249 }
1250 self
1251 }
1252
1253 pub fn get_topic(&self, schema: &str, table: &str) -> &str {
1255 let full_name = format!("{}.{}", schema, table);
1256
1257 for (pattern, topic) in &self.rules {
1258 if pattern.is_match(&full_name) {
1259 return topic;
1260 }
1261 }
1262
1263 &self.default_topic
1264 }
1265}
1266
1267impl Smt for RegexRouter {
1268 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1269 let topic = self.get_topic(&event.schema, &event.table);
1270 if let Some(after) = &mut event.after {
1272 if let Some(obj) = after.as_object_mut() {
1273 obj.insert("__topic".to_string(), Value::String(topic.to_string()));
1274 }
1275 }
1276 Some(event)
1277 }
1278
1279 fn name(&self) -> &'static str {
1280 "RegexRouter"
1281 }
1282}
1283
/// Condition on an event, used by [`ConditionalSmt`] to decide whether to
/// apply a transform.
pub enum Predicate {
    /// The table name matches `pattern`.
    Table { pattern: Regex },
    /// The schema name matches `pattern`.
    Schema { pattern: Regex },
    /// The event's operation is one of `ops`.
    Operation { ops: Vec<CdcOp> },
    /// The record field exists and equals `value`.
    FieldValue { field: String, value: Value },
    /// The record contains `field`.
    FieldExists { field: String },
    /// Arbitrary predicate over the whole event.
    Custom(Arc<dyn Fn(&CdcEvent) -> bool + Send + Sync>),
    /// All nested predicates hold.
    And(Vec<Predicate>),
    /// At least one nested predicate holds.
    Or(Vec<Predicate>),
    /// The nested predicate does not hold.
    Not(Box<Predicate>),
}
1312
1313impl Predicate {
1314 pub fn table(pattern: &str) -> Option<Self> {
1316 Regex::new(pattern)
1317 .ok()
1318 .map(|re| Predicate::Table { pattern: re })
1319 }
1320
1321 pub fn schema(pattern: &str) -> Option<Self> {
1323 Regex::new(pattern)
1324 .ok()
1325 .map(|re| Predicate::Schema { pattern: re })
1326 }
1327
1328 pub fn operation(ops: Vec<CdcOp>) -> Self {
1330 Predicate::Operation { ops }
1331 }
1332
1333 pub fn field_equals(field: impl Into<String>, value: Value) -> Self {
1335 Predicate::FieldValue {
1336 field: field.into(),
1337 value,
1338 }
1339 }
1340
1341 pub fn field_exists(field: impl Into<String>) -> Self {
1343 Predicate::FieldExists {
1344 field: field.into(),
1345 }
1346 }
1347
1348 pub fn matches(&self, event: &CdcEvent) -> bool {
1350 match self {
1351 Predicate::Table { pattern } => pattern.is_match(&event.table),
1352 Predicate::Schema { pattern } => pattern.is_match(&event.schema),
1353 Predicate::Operation { ops } => ops.contains(&event.op),
1354 Predicate::FieldValue { field, value } => event
1355 .after
1356 .as_ref()
1357 .or(event.before.as_ref())
1358 .and_then(|v| v.as_object())
1359 .and_then(|obj| obj.get(field))
1360 .map(|v| v == value)
1361 .unwrap_or(false),
1362 Predicate::FieldExists { field } => event
1363 .after
1364 .as_ref()
1365 .or(event.before.as_ref())
1366 .and_then(|v| v.as_object())
1367 .map(|obj| obj.contains_key(field))
1368 .unwrap_or(false),
1369 Predicate::Custom(f) => f(event),
1370 Predicate::And(predicates) => predicates.iter().all(|p| p.matches(event)),
1371 Predicate::Or(predicates) => predicates.iter().any(|p| p.matches(event)),
1372 Predicate::Not(p) => !p.matches(event),
1373 }
1374 }
1375}
1376
1377impl std::fmt::Debug for Predicate {
1378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1379 match self {
1380 Self::Table { pattern } => f
1381 .debug_struct("Table")
1382 .field("pattern", &pattern.as_str())
1383 .finish(),
1384 Self::Schema { pattern } => f
1385 .debug_struct("Schema")
1386 .field("pattern", &pattern.as_str())
1387 .finish(),
1388 Self::Operation { ops } => f.debug_struct("Operation").field("ops", ops).finish(),
1389 Self::FieldValue { field, value } => f
1390 .debug_struct("FieldValue")
1391 .field("field", field)
1392 .field("value", value)
1393 .finish(),
1394 Self::FieldExists { field } => {
1395 f.debug_struct("FieldExists").field("field", field).finish()
1396 }
1397 Self::Custom(_) => f.debug_struct("Custom").field("fn", &"<closure>").finish(),
1398 Self::And(predicates) => f
1399 .debug_struct("And")
1400 .field("predicates", predicates)
1401 .finish(),
1402 Self::Or(predicates) => f
1403 .debug_struct("Or")
1404 .field("predicates", predicates)
1405 .finish(),
1406 Self::Not(predicate) => f.debug_struct("Not").field("predicate", predicate).finish(),
1407 }
1408 }
1409}
1410
/// Wrapper that applies a transform only when a [`Predicate`] allows it.
pub struct ConditionalSmt {
    /// The wrapped transform.
    transform: Arc<dyn Smt>,
    /// Predicate evaluated for every event.
    predicate: Predicate,
    /// When `true`, the transform runs for events that do NOT match.
    negate: bool,
}
1420
1421impl ConditionalSmt {
1422 pub fn when<T: Smt + 'static>(predicate: Predicate, transform: T) -> Self {
1424 Self {
1425 transform: Arc::new(transform),
1426 predicate,
1427 negate: false,
1428 }
1429 }
1430
1431 pub fn when_arc(predicate: Predicate, transform: Arc<dyn Smt>) -> Self {
1433 Self {
1434 transform,
1435 predicate,
1436 negate: false,
1437 }
1438 }
1439
1440 pub fn unless<T: Smt + 'static>(predicate: Predicate, transform: T) -> Self {
1442 Self {
1443 transform: Arc::new(transform),
1444 predicate,
1445 negate: true,
1446 }
1447 }
1448
1449 pub fn unless_arc(predicate: Predicate, transform: Arc<dyn Smt>) -> Self {
1451 Self {
1452 transform,
1453 predicate,
1454 negate: true,
1455 }
1456 }
1457}
1458
1459impl Smt for ConditionalSmt {
1460 fn apply(&self, event: CdcEvent) -> Option<CdcEvent> {
1461 let matches = self.predicate.matches(&event);
1462 let should_apply = if self.negate { !matches } else { matches };
1463
1464 if should_apply {
1465 self.transform.apply(event)
1466 } else {
1467 Some(event)
1468 }
1469 }
1470
1471 fn name(&self) -> &'static str {
1472 "ConditionalSmt"
1473 }
1474}
1475
/// Transform that copies event metadata into fields of the `after` record.
#[derive(Debug, Clone)]
pub struct HeaderToValue {
    /// `(target field name, metadata source)` pairs, applied in order.
    fields: Vec<(String, HeaderSource)>,
    /// Copy or move mode.
    // NOTE(review): the `Smt::apply` implementation visible in this file never
    // reads `mode` — confirm whether `Move` is meant to behave differently.
    mode: HeaderMode,
}
1488
/// Piece of event metadata that [`HeaderToValue`] can write into the record.
#[derive(Debug, Clone)]
pub enum HeaderSource {
    /// The event's `source_type`.
    SourceType,
    /// The event's `database`.
    Database,
    /// The event's `schema`.
    Schema,
    /// The event's `table`.
    Table,
    /// The operation as a single-letter code.
    Operation,
    /// The event's `timestamp`, written as a number.
    Timestamp,
    /// The transaction id, or `null` when the event has no transaction.
    TransactionId,
}
1507
/// Mode setting stored on [`HeaderToValue`].
#[derive(Debug, Clone, Copy, Default)]
pub enum HeaderMode {
    /// Default mode.
    #[default]
    Copy,
    /// Selected by `HeaderToValue::move_mode`.
    Move,
}
1517
1518impl HeaderToValue {
1519 pub fn new() -> Self {
1521 Self {
1522 fields: Vec::new(),
1523 mode: HeaderMode::Copy,
1524 }
1525 }
1526
1527 pub fn field(mut self, target: impl Into<String>, source: HeaderSource) -> Self {
1529 self.fields.push((target.into(), source));
1530 self
1531 }
1532
1533 pub fn move_mode(mut self) -> Self {
1535 self.mode = HeaderMode::Move;
1536 self
1537 }
1538
1539 pub fn all_headers(self, prefix: &str) -> Self {
1541 self.field(format!("{}source_type", prefix), HeaderSource::SourceType)
1542 .field(format!("{}database", prefix), HeaderSource::Database)
1543 .field(format!("{}schema", prefix), HeaderSource::Schema)
1544 .field(format!("{}table", prefix), HeaderSource::Table)
1545 .field(format!("{}op", prefix), HeaderSource::Operation)
1546 .field(format!("{}ts", prefix), HeaderSource::Timestamp)
1547 }
1548}
1549
1550impl Default for HeaderToValue {
1551 fn default() -> Self {
1552 Self::new()
1553 }
1554}
1555
1556impl Smt for HeaderToValue {
1557 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1558 if let Some(after) = &mut event.after {
1559 if let Some(obj) = after.as_object_mut() {
1560 for (target, source) in &self.fields {
1561 let value = match source {
1562 HeaderSource::SourceType => Value::String(event.source_type.clone()),
1563 HeaderSource::Database => Value::String(event.database.clone()),
1564 HeaderSource::Schema => Value::String(event.schema.clone()),
1565 HeaderSource::Table => Value::String(event.table.clone()),
1566 HeaderSource::Operation => Value::String(op_to_code(&event.op).to_string()),
1567 HeaderSource::Timestamp => Value::Number(event.timestamp.into()),
1568 HeaderSource::TransactionId => event
1569 .transaction
1570 .as_ref()
1571 .map(|t| Value::String(t.id.clone()))
1572 .unwrap_or(Value::Null),
1573 };
1574 obj.insert(target.clone(), value);
1575 }
1576 }
1577 }
1578
1579 Some(event)
1580 }
1581
1582 fn name(&self) -> &'static str {
1583 "HeaderToValue"
1584 }
1585}
1586
1587#[derive(Debug, Clone)]
1593pub struct Unwrap {
1594 field_path: Vec<String>,
1596 replace: bool,
1598}
1599
1600impl Unwrap {
1601 pub fn new(path: impl Into<String>) -> Self {
1603 let path_str = path.into();
1604 Self {
1605 field_path: path_str.split('.').map(String::from).collect(),
1606 replace: true,
1607 }
1608 }
1609
1610 pub fn merge(mut self) -> Self {
1612 self.replace = false;
1613 self
1614 }
1615
1616 fn extract_at_path<'a>(&self, value: &'a Value) -> Option<&'a Value> {
1618 let mut current = value;
1619 for key in &self.field_path {
1620 current = current.as_object()?.get(key)?;
1621 }
1622 Some(current)
1623 }
1624}
1625
1626impl Smt for Unwrap {
1627 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1628 if let Some(after) = &event.after {
1629 if let Some(extracted) = self.extract_at_path(after) {
1630 let extracted_clone = extracted.clone();
1631 if self.replace {
1632 event.after = Some(extracted_clone);
1633 } else if let Some(extracted_obj) = extracted_clone.as_object() {
1634 if let Some(obj) = event.after.as_mut().and_then(|v| v.as_object_mut()) {
1635 for (k, v) in extracted_obj {
1636 obj.insert(k.clone(), v.clone());
1637 }
1638 }
1639 }
1640 }
1641 }
1642
1643 Some(event)
1644 }
1645
1646 fn name(&self) -> &'static str {
1647 "Unwrap"
1648 }
1649}
1650
1651#[derive(Debug, Clone)]
1657pub struct SetNull {
1658 fields: Vec<String>,
1660 condition: NullCondition,
1662}
1663
1664#[derive(Debug, Clone)]
1666pub enum NullCondition {
1667 Always,
1669 IfEmpty,
1671 IfEquals(Value),
1673 IfMatches(String),
1675}
1676
1677impl SetNull {
1678 pub fn new<I, S>(fields: I) -> Self
1680 where
1681 I: IntoIterator<Item = S>,
1682 S: Into<String>,
1683 {
1684 Self {
1685 fields: fields.into_iter().map(Into::into).collect(),
1686 condition: NullCondition::Always,
1687 }
1688 }
1689
1690 pub fn when(mut self, condition: NullCondition) -> Self {
1692 self.condition = condition;
1693 self
1694 }
1695
1696 fn should_nullify(&self, value: &Value) -> bool {
1698 match &self.condition {
1699 NullCondition::Always => true,
1700 NullCondition::IfEmpty => value.as_str().map(|s| s.is_empty()).unwrap_or(false),
1701 NullCondition::IfEquals(target) => value == target,
1702 NullCondition::IfMatches(pattern) => {
1703 if let (Ok(re), Some(s)) = (Regex::new(pattern), value.as_str()) {
1704 re.is_match(s)
1705 } else {
1706 false
1707 }
1708 }
1709 }
1710 }
1711}
1712
1713impl Smt for SetNull {
1714 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1715 if let Some(after) = &mut event.after {
1716 if let Some(obj) = after.as_object_mut() {
1717 for field in &self.fields {
1718 if let Some(value) = obj.get(field).cloned() {
1719 if self.should_nullify(&value) {
1720 obj.insert(field.clone(), Value::Null);
1721 }
1722 }
1723 }
1724 }
1725 }
1726
1727 Some(event)
1728 }
1729
1730 fn name(&self) -> &'static str {
1731 "SetNull"
1732 }
1733}
1734
1735#[derive(Clone)]
1749pub struct TimezoneConverter {
1750 fields: Vec<String>,
1752 source_tz: chrono_tz::Tz,
1754 target_tz: chrono_tz::Tz,
1756 include_time: bool,
1758 format: Option<String>,
1760}
1761
1762impl std::fmt::Debug for TimezoneConverter {
1763 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1764 f.debug_struct("TimezoneConverter")
1765 .field("fields", &self.fields)
1766 .field("source_tz", &self.source_tz.to_string())
1767 .field("target_tz", &self.target_tz.to_string())
1768 .field("include_time", &self.include_time)
1769 .field("format", &self.format)
1770 .finish()
1771 }
1772}
1773
1774impl TimezoneConverter {
1775 pub fn new<I, S>(fields: I) -> Self
1777 where
1778 I: IntoIterator<Item = S>,
1779 S: Into<String>,
1780 {
1781 Self {
1782 fields: fields.into_iter().map(Into::into).collect(),
1783 source_tz: chrono_tz::UTC,
1784 target_tz: chrono_tz::UTC,
1785 include_time: true,
1786 format: None,
1787 }
1788 }
1789
1790 pub fn from(mut self, tz: &str) -> Self {
1792 if let Ok(parsed) = tz.parse::<chrono_tz::Tz>() {
1793 self.source_tz = parsed;
1794 } else {
1795 warn!("Invalid source timezone '{}', using UTC", tz);
1796 }
1797 self
1798 }
1799
1800 pub fn to(mut self, tz: &str) -> Self {
1802 if let Ok(parsed) = tz.parse::<chrono_tz::Tz>() {
1803 self.target_tz = parsed;
1804 } else {
1805 warn!("Invalid target timezone '{}', using UTC", tz);
1806 }
1807 self
1808 }
1809
1810 pub fn date_only(mut self) -> Self {
1812 self.include_time = false;
1813 self
1814 }
1815
1816 pub fn format(mut self, fmt: impl Into<String>) -> Self {
1818 self.format = Some(fmt.into());
1819 self
1820 }
1821
1822 fn convert_value(&self, value: &Value) -> Option<Value> {
1824 let dt = self.parse_timestamp(value)?;
1825
1826 let converted = dt.with_timezone(&self.target_tz);
1828
1829 let result = if let Some(ref fmt) = self.format {
1830 converted.format(fmt).to_string()
1831 } else if self.include_time {
1832 converted.to_rfc3339()
1833 } else {
1834 converted.format("%Y-%m-%d").to_string()
1835 };
1836
1837 Some(Value::String(result))
1838 }
1839
1840 fn parse_timestamp(&self, value: &Value) -> Option<DateTime<chrono_tz::Tz>> {
1842 match value {
1843 Value::String(s) => {
1844 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1846 return Some(dt.with_timezone(&self.source_tz));
1847 }
1848 if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1850 return self.source_tz.from_local_datetime(&naive).single();
1851 }
1852 if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1853 return self.source_tz.from_local_datetime(&naive).single();
1854 }
1855 if let Ok(date) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1857 let naive = date.and_hms_opt(0, 0, 0)?;
1858 return self.source_tz.from_local_datetime(&naive).single();
1859 }
1860 None
1861 }
1862 Value::Number(n) => {
1863 let ts = n.as_i64()?;
1864 let dt = if ts > 1_000_000_000_000 {
1865 DateTime::from_timestamp_millis(ts)?
1866 } else {
1867 DateTime::from_timestamp(ts, 0)?
1868 };
1869 Some(dt.with_timezone(&self.source_tz))
1870 }
1871 _ => None,
1872 }
1873 }
1874}
1875
1876impl Smt for TimezoneConverter {
1877 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1878 if let Some(after) = &mut event.after {
1880 if let Some(obj) = after.as_object_mut() {
1881 for field in &self.fields {
1882 if let Some(value) = obj.get(field).cloned() {
1883 if let Some(converted) = self.convert_value(&value) {
1884 obj.insert(field.clone(), converted);
1885 }
1886 }
1887 }
1888 }
1889 }
1890
1891 if let Some(before) = &mut event.before {
1893 if let Some(obj) = before.as_object_mut() {
1894 for field in &self.fields {
1895 if let Some(value) = obj.get(field).cloned() {
1896 if let Some(converted) = self.convert_value(&value) {
1897 obj.insert(field.clone(), converted);
1898 }
1899 }
1900 }
1901 }
1902 }
1903
1904 Some(event)
1905 }
1906
1907 fn name(&self) -> &'static str {
1908 "TimezoneConverter"
1909 }
1910}
1911
1912#[derive(Clone)]
1930pub struct ContentRouter {
1931 routes: Vec<ContentRoute>,
1933 default_topic: Option<String>,
1935 topic_field: String,
1937}
1938
1939#[derive(Clone)]
1941struct ContentRoute {
1942 field: String,
1943 matcher: RouteMatcher,
1944 topic: String,
1945}
1946
1947#[derive(Clone)]
1949enum RouteMatcher {
1950 Exact(Value),
1952 Pattern(Regex),
1954 In(HashSet<String>),
1956 Predicate(Arc<dyn Fn(&Value) -> bool + Send + Sync>),
1958}
1959
1960impl std::fmt::Debug for ContentRouter {
1961 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1962 f.debug_struct("ContentRouter")
1963 .field("routes_count", &self.routes.len())
1964 .field("default_topic", &self.default_topic)
1965 .field("topic_field", &self.topic_field)
1966 .finish()
1967 }
1968}
1969
1970impl Default for ContentRouter {
1971 fn default() -> Self {
1972 Self::new()
1973 }
1974}
1975
1976impl ContentRouter {
1977 pub fn new() -> Self {
1979 Self {
1980 routes: Vec::new(),
1981 default_topic: None,
1982 topic_field: "__routing_topic".to_string(),
1983 }
1984 }
1985
1986 pub fn route(
1988 mut self,
1989 field: impl Into<String>,
1990 value: impl Into<Value>,
1991 topic: impl Into<String>,
1992 ) -> Self {
1993 self.routes.push(ContentRoute {
1994 field: field.into(),
1995 matcher: RouteMatcher::Exact(value.into()),
1996 topic: topic.into(),
1997 });
1998 self
1999 }
2000
2001 pub fn route_pattern(
2003 mut self,
2004 field: impl Into<String>,
2005 pattern: &str,
2006 topic: impl Into<String>,
2007 ) -> Self {
2008 if let Ok(re) = Regex::new(pattern) {
2009 self.routes.push(ContentRoute {
2010 field: field.into(),
2011 matcher: RouteMatcher::Pattern(re),
2012 topic: topic.into(),
2013 });
2014 }
2015 self
2016 }
2017
2018 pub fn route_in<I, S>(
2020 mut self,
2021 field: impl Into<String>,
2022 values: I,
2023 topic: impl Into<String>,
2024 ) -> Self
2025 where
2026 I: IntoIterator<Item = S>,
2027 S: Into<String>,
2028 {
2029 self.routes.push(ContentRoute {
2030 field: field.into(),
2031 matcher: RouteMatcher::In(values.into_iter().map(Into::into).collect()),
2032 topic: topic.into(),
2033 });
2034 self
2035 }
2036
2037 pub fn route_if<F>(
2039 mut self,
2040 field: impl Into<String>,
2041 predicate: F,
2042 topic: impl Into<String>,
2043 ) -> Self
2044 where
2045 F: Fn(&Value) -> bool + Send + Sync + 'static,
2046 {
2047 self.routes.push(ContentRoute {
2048 field: field.into(),
2049 matcher: RouteMatcher::Predicate(Arc::new(predicate)),
2050 topic: topic.into(),
2051 });
2052 self
2053 }
2054
2055 pub fn default_topic(mut self, topic: impl Into<String>) -> Self {
2057 self.default_topic = Some(topic.into());
2058 self
2059 }
2060
2061 pub fn topic_field(mut self, field: impl Into<String>) -> Self {
2063 self.topic_field = field.into();
2064 self
2065 }
2066
2067 fn find_topic(&self, event: &CdcEvent) -> Option<&str> {
2069 let after = event.after.as_ref()?;
2070 let obj = after.as_object()?;
2071
2072 for route in &self.routes {
2073 if let Some(value) = obj.get(&route.field) {
2074 let matches = match &route.matcher {
2075 RouteMatcher::Exact(expected) => value == expected,
2076 RouteMatcher::Pattern(re) => {
2077 value.as_str().map(|s| re.is_match(s)).unwrap_or(false)
2078 }
2079 RouteMatcher::In(set) => {
2080 value.as_str().map(|s| set.contains(s)).unwrap_or(false)
2081 }
2082 RouteMatcher::Predicate(f) => f(value),
2083 };
2084
2085 if matches {
2086 return Some(&route.topic);
2087 }
2088 }
2089 }
2090
2091 self.default_topic.as_deref()
2092 }
2093}
2094
2095impl Smt for ContentRouter {
2096 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
2097 if let Some(topic) = self.find_topic(&event) {
2098 if let Some(after) = &mut event.after {
2099 if let Some(obj) = after.as_object_mut() {
2100 obj.insert(self.topic_field.clone(), Value::String(topic.to_string()));
2101 }
2102 }
2103 }
2104 Some(event)
2105 }
2106
2107 fn name(&self) -> &'static str {
2108 "ContentRouter"
2109 }
2110}
2111
2112#[derive(Clone)]
2133pub struct ComputeField {
2134 computations: Vec<FieldComputation>,
2136}
2137
2138#[derive(Clone)]
2140struct FieldComputation {
2141 target: String,
2142 operation: ComputeOp,
2143}
2144
2145#[derive(Clone)]
2147enum ComputeOp {
2148 Concat(Vec<ConcatPart>),
2150 Hash(Vec<String>),
2152 Coalesce(Vec<String>),
2154 Sum(Vec<String>),
2156 Length(String),
2158 Upper(String),
2160 Lower(String),
2162 Substring(String, usize, Option<usize>),
2164 Replace(String, String, String),
2166 CurrentTimestamp,
2168 Uuid,
2170 JsonPath(String, String),
2172 Conditional(ComputeCondition, Value, Value),
2174}
2175
2176#[derive(Clone)]
2178enum ConcatPart {
2179 Field(String),
2180 Literal(String),
2181}
2182
2183#[derive(Clone)]
2185pub enum ComputeCondition {
2186 FieldEquals(String, Value),
2188 FieldIsNull(String),
2190 FieldMatches(String, String),
2192}
2193
2194impl std::fmt::Debug for ComputeField {
2195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2196 f.debug_struct("ComputeField")
2197 .field("computations_count", &self.computations.len())
2198 .finish()
2199 }
2200}
2201
2202impl Default for ComputeField {
2203 fn default() -> Self {
2204 Self::new()
2205 }
2206}
2207
2208impl ComputeField {
2209 pub fn new() -> Self {
2211 Self {
2212 computations: Vec::new(),
2213 }
2214 }
2215
2216 pub fn concat<I>(mut self, target: impl Into<String>, parts: I) -> Self
2218 where
2219 I: IntoIterator,
2220 I::Item: Into<String>,
2221 {
2222 let parts: Vec<ConcatPart> = parts
2223 .into_iter()
2224 .map(|p| {
2225 let s: String = p.into();
2226 if let Some(field_name) = s.strip_prefix('$') {
2228 ConcatPart::Field(field_name.to_string())
2229 } else {
2230 ConcatPart::Literal(s)
2231 }
2232 })
2233 .collect();
2234
2235 self.computations.push(FieldComputation {
2236 target: target.into(),
2237 operation: ComputeOp::Concat(parts),
2238 });
2239 self
2240 }
2241
2242 pub fn hash<I, S>(mut self, target: impl Into<String>, fields: I) -> Self
2244 where
2245 I: IntoIterator<Item = S>,
2246 S: Into<String>,
2247 {
2248 self.computations.push(FieldComputation {
2249 target: target.into(),
2250 operation: ComputeOp::Hash(fields.into_iter().map(Into::into).collect()),
2251 });
2252 self
2253 }
2254
2255 pub fn coalesce<I, S>(mut self, target: impl Into<String>, fields: I) -> Self
2257 where
2258 I: IntoIterator<Item = S>,
2259 S: Into<String>,
2260 {
2261 self.computations.push(FieldComputation {
2262 target: target.into(),
2263 operation: ComputeOp::Coalesce(fields.into_iter().map(Into::into).collect()),
2264 });
2265 self
2266 }
2267
2268 pub fn sum<I, S>(mut self, target: impl Into<String>, fields: I) -> Self
2270 where
2271 I: IntoIterator<Item = S>,
2272 S: Into<String>,
2273 {
2274 self.computations.push(FieldComputation {
2275 target: target.into(),
2276 operation: ComputeOp::Sum(fields.into_iter().map(Into::into).collect()),
2277 });
2278 self
2279 }
2280
2281 pub fn length(mut self, target: impl Into<String>, field: impl Into<String>) -> Self {
2283 self.computations.push(FieldComputation {
2284 target: target.into(),
2285 operation: ComputeOp::Length(field.into()),
2286 });
2287 self
2288 }
2289
2290 pub fn upper(mut self, target: impl Into<String>, field: impl Into<String>) -> Self {
2292 self.computations.push(FieldComputation {
2293 target: target.into(),
2294 operation: ComputeOp::Upper(field.into()),
2295 });
2296 self
2297 }
2298
2299 pub fn lower(mut self, target: impl Into<String>, field: impl Into<String>) -> Self {
2301 self.computations.push(FieldComputation {
2302 target: target.into(),
2303 operation: ComputeOp::Lower(field.into()),
2304 });
2305 self
2306 }
2307
2308 pub fn substring(
2310 mut self,
2311 target: impl Into<String>,
2312 field: impl Into<String>,
2313 start: usize,
2314 len: Option<usize>,
2315 ) -> Self {
2316 self.computations.push(FieldComputation {
2317 target: target.into(),
2318 operation: ComputeOp::Substring(field.into(), start, len),
2319 });
2320 self
2321 }
2322
2323 pub fn replace(
2325 mut self,
2326 target: impl Into<String>,
2327 field: impl Into<String>,
2328 pattern: impl Into<String>,
2329 replacement: impl Into<String>,
2330 ) -> Self {
2331 self.computations.push(FieldComputation {
2332 target: target.into(),
2333 operation: ComputeOp::Replace(field.into(), pattern.into(), replacement.into()),
2334 });
2335 self
2336 }
2337
2338 pub fn current_timestamp(mut self, target: impl Into<String>) -> Self {
2340 self.computations.push(FieldComputation {
2341 target: target.into(),
2342 operation: ComputeOp::CurrentTimestamp,
2343 });
2344 self
2345 }
2346
2347 pub fn uuid(mut self, target: impl Into<String>) -> Self {
2349 self.computations.push(FieldComputation {
2350 target: target.into(),
2351 operation: ComputeOp::Uuid,
2352 });
2353 self
2354 }
2355
2356 pub fn json_path(
2358 mut self,
2359 target: impl Into<String>,
2360 field: impl Into<String>,
2361 path: impl Into<String>,
2362 ) -> Self {
2363 self.computations.push(FieldComputation {
2364 target: target.into(),
2365 operation: ComputeOp::JsonPath(field.into(), path.into()),
2366 });
2367 self
2368 }
2369
2370 pub fn conditional(
2372 mut self,
2373 target: impl Into<String>,
2374 condition: ComputeCondition,
2375 then_value: impl Into<Value>,
2376 else_value: impl Into<Value>,
2377 ) -> Self {
2378 self.computations.push(FieldComputation {
2379 target: target.into(),
2380 operation: ComputeOp::Conditional(condition, then_value.into(), else_value.into()),
2381 });
2382 self
2383 }
2384
2385 fn compute(&self, op: &ComputeOp, obj: &Map<String, Value>) -> Option<Value> {
2387 match op {
2388 ComputeOp::Concat(parts) => {
2389 let mut result = String::new();
2390 for part in parts {
2391 match part {
2392 ConcatPart::Literal(s) => result.push_str(s),
2393 ConcatPart::Field(f) => {
2394 if let Some(v) = obj.get(f) {
2395 match v {
2396 Value::String(s) => result.push_str(s),
2397 Value::Number(n) => result.push_str(&n.to_string()),
2398 Value::Bool(b) => result.push_str(&b.to_string()),
2399 _ => {}
2400 }
2401 }
2402 }
2403 }
2404 }
2405 Some(Value::String(result))
2406 }
2407 ComputeOp::Hash(fields) => {
2408 use sha2::{Digest, Sha256};
2409 let mut data = Vec::new();
2410 for field in fields {
2411 if let Some(v) = obj.get(field) {
2412 data.extend_from_slice(
2413 serde_json::to_string(v).unwrap_or_default().as_bytes(),
2414 );
2415 }
2416 }
2417 let hash = Sha256::digest(&data);
2418 Some(Value::String(hex::encode(hash)))
2419 }
2420 ComputeOp::Coalesce(fields) => {
2421 for field in fields {
2422 if let Some(v) = obj.get(field) {
2423 if !v.is_null() {
2424 return Some(v.clone());
2425 }
2426 }
2427 }
2428 None
2429 }
2430 ComputeOp::Sum(fields) => {
2431 let mut sum = 0.0;
2432 for field in fields {
2433 if let Some(v) = obj.get(field) {
2434 if let Some(n) = v.as_f64() {
2435 sum += n;
2436 }
2437 }
2438 }
2439 Some(Value::from(sum))
2440 }
2441 ComputeOp::Length(field) => obj
2442 .get(field)
2443 .and_then(|v| v.as_str())
2444 .map(|s| Value::from(s.len() as i64)),
2445 ComputeOp::Upper(field) => obj
2446 .get(field)
2447 .and_then(|v| v.as_str())
2448 .map(|s| Value::String(s.to_uppercase())),
2449 ComputeOp::Lower(field) => obj
2450 .get(field)
2451 .and_then(|v| v.as_str())
2452 .map(|s| Value::String(s.to_lowercase())),
2453 ComputeOp::Substring(field, start, len) => {
2454 obj.get(field).and_then(|v| v.as_str()).map(|s| {
2455 let chars: Vec<char> = s.chars().collect();
2456 let end = len
2457 .map(|l| (*start + l).min(chars.len()))
2458 .unwrap_or(chars.len());
2459 let substr: String = chars[*start.min(&chars.len())..end].iter().collect();
2460 Value::String(substr)
2461 })
2462 }
2463 ComputeOp::Replace(field, pattern, replacement) => {
2464 if let Ok(re) = Regex::new(pattern) {
2465 obj.get(field)
2466 .and_then(|v| v.as_str())
2467 .map(|s| Value::String(re.replace_all(s, replacement.as_str()).to_string()))
2468 } else {
2469 None
2470 }
2471 }
2472 ComputeOp::CurrentTimestamp => Some(Value::String(Utc::now().to_rfc3339())),
2473 ComputeOp::Uuid => {
2474 Some(Value::String(uuid::Uuid::new_v4().to_string()))
2476 }
2477 ComputeOp::JsonPath(field, path) => {
2478 obj.get(field).and_then(|v| {
2480 let parts: Vec<&str> = path.split('.').collect();
2481 let mut current = v;
2482 for part in parts {
2483 current = current.get(part)?;
2484 }
2485 Some(current.clone())
2486 })
2487 }
2488 ComputeOp::Conditional(cond, then_val, else_val) => {
2489 let matches = match cond {
2490 ComputeCondition::FieldEquals(f, v) => {
2491 obj.get(f).map(|fv| fv == v).unwrap_or(false)
2492 }
2493 ComputeCondition::FieldIsNull(f) => {
2494 obj.get(f).map(|v| v.is_null()).unwrap_or(true)
2495 }
2496 ComputeCondition::FieldMatches(f, pattern) => {
2497 if let (Some(v), Ok(re)) = (obj.get(f), Regex::new(pattern)) {
2498 v.as_str().map(|s| re.is_match(s)).unwrap_or(false)
2499 } else {
2500 false
2501 }
2502 }
2503 };
2504 Some(if matches {
2505 then_val.clone()
2506 } else {
2507 else_val.clone()
2508 })
2509 }
2510 }
2511 }
2512}
2513
2514impl Smt for ComputeField {
2515 fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
2516 if let Some(after) = &mut event.after {
2517 if let Some(obj) = after.as_object_mut() {
2518 for comp in &self.computations {
2519 let obj_clone = obj.clone();
2521 if let Some(value) = self.compute(&comp.operation, &obj_clone) {
2522 obj.insert(comp.target.clone(), value);
2523 }
2524 }
2525 }
2526 }
2527
2528 Some(event)
2529 }
2530
2531 fn name(&self) -> &'static str {
2532 "ComputeField"
2533 }
2534}
2535
/// Transform that uploads large field values to an object store and replaces
/// them in the record with a small reference object.
#[cfg(feature = "cloud-storage")]
#[derive(Debug, Clone)]
pub struct ExternalizeBlob {
    /// Destination object store.
    store: Arc<dyn object_store::ObjectStore>,
    /// Bucket/container name (or base path for the local store); used only to
    /// build the reference URL.
    bucket: String,
    /// Minimum estimated size, in bytes, at which a value is externalized.
    size_threshold: usize,
    /// Fields eligible for externalization; empty means every field.
    fields: HashSet<String>,
    /// Optional key prefix for uploaded objects.
    prefix: String,
    /// Scheme prepended to `bucket/key` in the reference URL.
    url_scheme: String,
}

#[cfg(feature = "cloud-storage")]
impl ExternalizeBlob {
    /// Default externalization threshold: 10 KiB.
    const DEFAULT_SIZE_THRESHOLD: usize = 10 * 1024;

    /// Builds an instance with default threshold, no field filter and no
    /// prefix; shared by all public constructors.
    fn with_store(
        store: Arc<dyn object_store::ObjectStore>,
        bucket: String,
        url_scheme: &str,
    ) -> Self {
        Self {
            store,
            bucket,
            size_threshold: Self::DEFAULT_SIZE_THRESHOLD,
            fields: HashSet::new(),
            prefix: String::new(),
            url_scheme: url_scheme.to_string(),
        }
    }

    /// Creates a transform over a caller-supplied object store; reference URLs
    /// use the `s3://` scheme unless changed with [`ExternalizeBlob::url_scheme`].
    pub fn new(store: Arc<dyn object_store::ObjectStore>, bucket: impl Into<String>) -> Self {
        Self::with_store(store, bucket.into(), "s3://")
    }

    /// Creates a transform backed by Amazon S3 (HTTP disallowed).
    ///
    /// # Errors
    /// Returns the builder error when the S3 client cannot be constructed.
    #[cfg(feature = "s3")]
    pub fn s3(
        bucket: impl Into<String>,
        region: impl Into<String>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        use object_store::aws::AmazonS3Builder;

        let bucket = bucket.into();
        let store = AmazonS3Builder::new()
            .with_bucket_name(&bucket)
            .with_region(region)
            .with_allow_http(false)
            .build()?;

        Ok(Self::with_store(Arc::new(store), bucket, "s3://"))
    }

    /// Creates a transform backed by Google Cloud Storage.
    ///
    /// # Errors
    /// Returns the builder error when the GCS client cannot be constructed.
    #[cfg(feature = "gcs")]
    pub fn gcs(
        bucket: impl Into<String>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        use object_store::gcp::GoogleCloudStorageBuilder;

        let bucket = bucket.into();
        let store = GoogleCloudStorageBuilder::new()
            .with_bucket_name(&bucket)
            .build()?;

        Ok(Self::with_store(Arc::new(store), bucket, "gs://"))
    }

    /// Creates a transform backed by Azure Blob Storage.
    ///
    /// NOTE(review): reference URLs are built as `https://<container>/<key>`,
    /// without the storage account host — confirm this is what consumers
    /// expect, or override with [`ExternalizeBlob::url_scheme`].
    ///
    /// # Errors
    /// Returns the builder error when the Azure client cannot be constructed.
    #[cfg(feature = "azure")]
    pub fn azure(
        account: impl Into<String>,
        container: impl Into<String>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        use object_store::azure::MicrosoftAzureBuilder;

        let container = container.into();
        let store = MicrosoftAzureBuilder::new()
            .with_account(account)
            .with_container_name(&container)
            .build()?;

        Ok(Self::with_store(Arc::new(store), container, "https://"))
    }

    /// Creates a transform that writes blobs below a local directory.
    ///
    /// # Errors
    /// Returns the error from the local file-system store when `path` cannot
    /// be used as its root.
    pub fn local(
        path: impl Into<std::path::PathBuf>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        use object_store::local::LocalFileSystem;

        let path: std::path::PathBuf = path.into();
        let store = LocalFileSystem::new_with_prefix(&path)?;

        Ok(Self::with_store(
            Arc::new(store),
            path.to_string_lossy().to_string(),
            "file://",
        ))
    }

    /// Sets the minimum estimated size, in bytes, at which values are externalized.
    pub fn size_threshold(mut self, bytes: usize) -> Self {
        self.size_threshold = bytes;
        self
    }

    /// Restricts externalization to the given fields (replaces any earlier list).
    pub fn fields<I, S>(mut self, fields: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.fields = fields.into_iter().map(Into::into).collect();
        self
    }

    /// Sets the key prefix for uploaded objects.
    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Overrides the scheme used in generated reference URLs.
    pub fn url_scheme(mut self, scheme: impl Into<String>) -> Self {
        self.url_scheme = scheme.into();
        self
    }

    /// Returns `true` when `field` passes the field filter and the estimated
    /// size of `value` reaches the threshold.
    fn should_externalize(&self, field: &str, value: &Value) -> bool {
        let selected = self.fields.is_empty() || self.fields.contains(field);
        selected && self.estimate_size(value) >= self.size_threshold
    }

    /// Estimates the size of `value` in bytes: string length, JSON-encoded
    /// length for arrays/objects, and small constants for scalars.
    fn estimate_size(&self, value: &Value) -> usize {
        match value {
            Value::String(s) => s.len(),
            Value::Object(_) | Value::Array(_) => {
                serde_json::to_vec(value).map(|bytes| bytes.len()).unwrap_or(0)
            }
            Value::Number(_) => 8,
            Value::Bool(_) => 1,
            Value::Null => 0,
        }
    }

    /// Turns `value` into the bytes to upload.
    ///
    /// Strings are base64-decoded when they are valid standard base64 and used
    /// as raw UTF-8 bytes otherwise; every other value is JSON-encoded.
    fn decode_blob(&self, value: &Value) -> Option<Vec<u8>> {
        match value {
            Value::String(s) => {
                use base64::Engine;
                let bytes = base64::engine::general_purpose::STANDARD
                    .decode(s)
                    .unwrap_or_else(|_| s.as_bytes().to_vec());
                Some(bytes)
            }
            _ => serde_json::to_vec(value).ok(),
        }
    }

    /// Builds a unique object key:
    /// `[<prefix>/]<table>/<field>/<millis>_<uuid>.bin`.
    fn generate_key(&self, table: &str, field: &str) -> String {
        let timestamp = chrono::Utc::now().timestamp_millis();
        let uuid = uuid::Uuid::new_v4();
        let object = format!("{}/{}/{}_{}.bin", table, field, timestamp, uuid);

        if self.prefix.is_empty() {
            object
        } else if self.prefix.ends_with('/') {
            format!("{}{}", self.prefix, object)
        } else {
            format!("{}/{}", self.prefix, object)
        }
    }

    /// Builds the JSON reference object that replaces an externalized value.
    fn create_reference(&self, url: &str, size: usize, sha256: &str, content_type: &str) -> Value {
        serde_json::json!({
            "__externalized": true,
            "url": url,
            "size": size,
            "content_type": content_type,
            "sha256": sha256
        })
    }

    /// Uploads `value` and returns the reference object describing the upload.
    ///
    /// The upload is awaited synchronously:
    /// * inside a multi-threaded Tokio runtime, via `block_in_place`;
    /// * inside a current-thread Tokio runtime, on a scoped helper thread with
    ///   its own runtime, because `block_in_place` panics there;
    /// * outside any runtime, on a temporary runtime.
    ///
    /// Returns `None` (after logging a warning) when the value cannot be
    /// encoded or the upload fails; the caller then keeps the original value.
    fn externalize_value(&self, table: &str, field: &str, value: &Value) -> Option<Value> {
        use sha2::{Digest, Sha256};
        use tokio::runtime::{Builder, Handle, Runtime, RuntimeFlavor};

        let blob_data = self.decode_blob(value)?;
        let size = blob_data.len();
        let sha256 = hex::encode(Sha256::digest(&blob_data));

        let key = self.generate_key(table, field);

        let content_type = if value.is_string() {
            "application/octet-stream"
        } else {
            "application/json"
        };

        let store = Arc::clone(&self.store);
        let key_path = object_store::path::Path::from(key.clone());
        let payload = object_store::PutPayload::from(blob_data);

        let upload_result = match Handle::try_current() {
            Ok(handle) if matches!(handle.runtime_flavor(), RuntimeFlavor::CurrentThread) => {
                let joined = std::thread::scope(|scope| {
                    scope
                        .spawn(|| {
                            Builder::new_current_thread()
                                .enable_all()
                                .build()
                                .map(|rt| {
                                    rt.block_on(async { store.put(&key_path, payload).await })
                                })
                        })
                        .join()
                });
                match joined {
                    Ok(Ok(result)) => result,
                    Ok(Err(e)) => {
                        warn!("Failed to create runtime for blob upload: {}", e);
                        return None;
                    }
                    Err(_) => {
                        warn!("Blob upload thread panicked for {}.{}", table, field);
                        return None;
                    }
                }
            }
            Ok(handle) => tokio::task::block_in_place(|| {
                handle.block_on(async { store.put(&key_path, payload).await })
            }),
            Err(_) => {
                let rt = match Runtime::new() {
                    Ok(rt) => rt,
                    Err(e) => {
                        warn!("Failed to create runtime for blob upload: {}", e);
                        return None;
                    }
                };
                rt.block_on(async { store.put(&key_path, payload).await })
            }
        };

        match upload_result {
            Ok(_) => {
                let url = format!("{}{}/{}", self.url_scheme, self.bucket, key);
                Some(self.create_reference(&url, size, &sha256, content_type))
            }
            Err(e) => {
                warn!("Failed to externalize blob for {}.{}: {}", table, field, e);
                None
            }
        }
    }

    /// Externalizes every qualifying field of `obj` in place.
    ///
    /// Fields whose upload fails keep their original value.
    fn process_object(&self, table: &str, obj: &mut Map<String, Value>) {
        // Values are replaced through the mutable iterator, so the key set
        // does not have to be cloned up front.
        for (field, value) in obj.iter_mut() {
            if !self.should_externalize(field, value) {
                continue;
            }
            if let Some(reference) = self.externalize_value(table, field, value) {
                *value = reference;
            }
        }
    }
}
2903
#[cfg(feature = "cloud-storage")]
impl Smt for ExternalizeBlob {
    /// Externalizes qualifying fields in the `after` image and then in the
    /// `before` image.
    ///
    /// Images that are missing or are not JSON objects are skipped. The event
    /// is never dropped.
    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
        let table = event.table.clone();

        for image in [&mut event.after, &mut event.before] {
            if let Some(record) = image.as_mut().and_then(Value::as_object_mut) {
                self.process_object(&table, record);
            }
        }

        Some(event)
    }

    /// Name reported for this transform.
    fn name(&self) -> &'static str {
        "ExternalizeBlob"
    }
}
2930
2931#[cfg(test)]
2936mod tests {
2937 use super::*;
2938 use serde_json::json;
2939
2940 fn make_event(op: CdcOp, before: Option<Value>, after: Option<Value>) -> CdcEvent {
2941 CdcEvent {
2942 source_type: "postgres".to_string(),
2943 database: "testdb".to_string(),
2944 schema: "public".to_string(),
2945 table: "users".to_string(),
2946 op,
2947 before,
2948 after,
2949 timestamp: chrono::Utc::now().timestamp(),
2950 transaction: None,
2951 }
2952 }
2953
2954 #[test]
2956 fn test_smt_chain_empty() {
2957 let chain = SmtChain::new();
2958 assert!(chain.is_empty());
2959 assert_eq!(chain.len(), 0);
2960 }
2961
2962 #[test]
2963 fn test_smt_chain_apply() {
2964 let event = make_event(
2965 CdcOp::Insert,
2966 None,
2967 Some(json!({"name": "Alice", "email": "alice@test.com"})),
2968 );
2969
2970 let chain = SmtChain::new()
2971 .add(MaskField::new(["email"]))
2972 .add(InsertField::new().static_field("version", json!(1)));
2973
2974 let result = chain.apply(event).unwrap();
2975 let after = result.after.unwrap();
2976
2977 assert_eq!(after["name"], "Alice");
2978 assert_ne!(after["email"], "alice@test.com"); assert_eq!(after["version"], 1);
2980 }
2981
2982 #[test]
2983 fn test_smt_chain_names() {
2984 let chain = SmtChain::new()
2985 .add(MaskField::new(["email"]))
2986 .add(Flatten::new());
2987
2988 let names = chain.names();
2989 assert_eq!(names, vec!["MaskField", "Flatten"]);
2990 }
2991
2992 #[test]
2994 fn test_extract_new_record_state() {
2995 let event = make_event(
2996 CdcOp::Update,
2997 Some(json!({"id": 1, "name": "Old"})),
2998 Some(json!({"id": 1, "name": "New"})),
2999 );
3000
3001 let smt = ExtractNewRecordState::new()
3002 .add_op_field()
3003 .add_table_field();
3004
3005 let result = smt.apply(event).unwrap();
3006 let after = result.after.unwrap();
3007
3008 assert_eq!(after["__op"], "u");
3009 assert_eq!(after["__table"], "users");
3010 }
3011
3012 #[test]
3013 fn test_extract_drop_tombstones() {
3014 let event = make_event(CdcOp::Delete, Some(json!({"id": 1})), None);
3015
3016 let smt = ExtractNewRecordState::new().drop_tombstones();
3017 assert!(smt.apply(event).is_none());
3018 }
3019
3020 #[test]
3021 fn test_extract_rewrite_deletes() {
3022 let event = make_event(
3023 CdcOp::Delete,
3024 Some(json!({"id": 1, "name": "Deleted"})),
3025 None,
3026 );
3027
3028 let smt = ExtractNewRecordState::new().delete_handling(DeleteHandling::Rewrite);
3029
3030 let result = smt.apply(event).unwrap();
3031 let after = result.after.unwrap();
3032
3033 assert_eq!(after["__deleted"], true);
3034 assert_eq!(after["id"], 1);
3035 }
3036
3037 #[test]
3038 fn test_extract_custom_prefix() {
3039 let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3040
3041 let smt = ExtractNewRecordState::new()
3042 .header_prefix("_cdc_")
3043 .add_op_field()
3044 .add_schema_field();
3045
3046 let result = smt.apply(event).unwrap();
3047 let after = result.after.unwrap();
3048
3049 assert_eq!(after["_cdc_op"], "c");
3050 assert_eq!(after["_cdc_schema"], "public");
3051 }
3052
3053 #[test]
3055 fn test_value_to_key() {
3056 let event = make_event(
3057 CdcOp::Insert,
3058 None,
3059 Some(json!({"id": 42, "name": "Alice", "email": "alice@test.com"})),
3060 );
3061
3062 let smt = ValueToKey::with_fields(["id"]);
3063 let result = smt.apply(event).unwrap();
3064 let after = result.after.unwrap();
3065
3066 assert_eq!(after["__key"], json!({"id": 42}));
3067 }
3068
/// `ValueToKey` with several fields builds a composite `__key` object
/// holding each requested field.
#[test]
fn test_value_to_key_multiple_fields() {
    let payload = json!({"org_id": 1, "user_id": 2, "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = ValueToKey::with_fields(["org_id", "user_id"]);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__key"], json!({"org_id": 1, "user_id": 2}));
}
3083
/// A numeric `created_at` converted with `TimestampFormat::Iso8601` becomes
/// a string that contains the date `2024-01-15`.
#[test]
fn test_timestamp_converter_to_iso() {
    let payload = json!({"created_at": 1705320000000i64});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = TimestampConverter::new(["created_at"], TimestampFormat::Iso8601);

    let row = transform.apply(input).unwrap().after.unwrap();

    let rendered = row["created_at"].as_str().unwrap();
    assert!(rendered.contains("2024-01-15"));
}
3099
/// An ISO-8601 string converted with `TimestampFormat::EpochMillis` ends up
/// as a JSON number.
#[test]
fn test_timestamp_converter_to_epoch() {
    let payload = json!({"created_at": "2024-01-15T10:00:00Z"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = TimestampConverter::new(["created_at"], TimestampFormat::EpochMillis);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert!(row["created_at"].is_number());
}
3114
/// A numeric `created_at` converted with `TimestampFormat::DateOnly` is
/// rendered as exactly `2024-01-15`.
#[test]
fn test_timestamp_converter_date_only() {
    let payload = json!({"created_at": 1705320000000i64});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = TimestampConverter::new(["created_at"], TimestampFormat::DateOnly);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["created_at"], "2024-01-15");
}
3129
/// The default `MaskField` strategy replaces the masked value with
/// asterisks and leaves unlisted fields untouched.
#[test]
fn test_mask_field_asterisks() {
    let payload = json!({"name": "Alice", "ssn": "123-45-6789"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = MaskField::new(["ssn"]);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["name"], "Alice");
    assert_eq!(row["ssn"], "***********");
}
3146
/// `MaskStrategy::PartialMask` keeps the first and last four characters of
/// the card number and masks everything in between.
#[test]
fn test_mask_field_partial() {
    let strategy = MaskStrategy::PartialMask {
        keep_first: 4,
        keep_last: 4,
    };
    let transform = MaskField::new(["card"]).with_strategy(strategy);
    let input = make_event(
        CdcOp::Insert,
        None,
        Some(json!({"card": "4111111111111111"})),
    );

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["card"], "4111********1111");
}
3165
/// `MaskStrategy::Hash` replaces the value with a string made up solely of
/// ASCII hex digits.
#[test]
fn test_mask_field_hash() {
    let payload = json!({"email": "alice@test.com"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = MaskField::new(["email"]).with_strategy(MaskStrategy::Hash);

    let row = transform.apply(input).unwrap().after.unwrap();

    let digest = row["email"].as_str().unwrap();
    assert!(digest.chars().all(|ch| ch.is_ascii_hexdigit()));
}
3185
/// `MaskStrategy::Redact` removes the masked key from the output object.
#[test]
fn test_mask_field_redact() {
    let payload = json!({"name": "Alice", "password": "secret"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = MaskField::new(["password"]).with_strategy(MaskStrategy::Redact);

    let row = transform.apply(input).unwrap().after.unwrap();

    let fields = row.as_object().unwrap();
    assert!(!fields.contains_key("password"));
}
3200
/// `MaskStrategy::Fixed` substitutes the configured literal for the value.
#[test]
fn test_mask_field_fixed() {
    let strategy = MaskStrategy::Fixed(String::from("[REDACTED]"));
    let transform = MaskField::new(["email"]).with_strategy(strategy);
    let input = make_event(
        CdcOp::Insert,
        None,
        Some(json!({"email": "alice@test.com"})),
    );

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["email"], "[REDACTED]");
}
3216
/// `ReplaceField::rename` moves values to their new keys and drops the old
/// `first_name` key.
#[test]
fn test_replace_field_rename() {
    let transform = ReplaceField::new()
        .rename("first_name", "firstName")
        .rename("last_name", "lastName");
    let payload = json!({"first_name": "Alice", "last_name": "Smith"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["firstName"], "Alice");
    assert_eq!(row["lastName"], "Smith");
    let fields = row.as_object().unwrap();
    assert!(!fields.contains_key("first_name"));
}
3237
/// `ReplaceField::include` keeps the listed fields and drops the rest.
#[test]
fn test_replace_field_include() {
    let payload = json!({"id": 1, "name": "Alice", "internal_field": "secret"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = ReplaceField::new().include(["id", "name"]);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["id"], 1);
    assert_eq!(row["name"], "Alice");
    let fields = row.as_object().unwrap();
    assert!(!fields.contains_key("internal_field"));
}
3254
/// `ReplaceField::exclude` removes the listed field while `id` survives.
#[test]
fn test_replace_field_exclude() {
    let payload = json!({"id": 1, "name": "Alice", "password_hash": "xxx"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = ReplaceField::new().exclude(["password_hash"]);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert!(row.as_object().unwrap().contains_key("id"));
    assert!(!row.as_object().unwrap().contains_key("password_hash"));
}
3270
/// `InsertField::static_field` adds each configured constant to the row.
#[test]
fn test_insert_field_static() {
    let transform = InsertField::new()
        .static_field("version", json!(1))
        .static_field("source", json!("cdc"));
    let input = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["version"], 1);
    assert_eq!(row["source"], "cdc");
}
3286
/// `InsertField::timestamp_field` adds a string-valued `processed_at`.
#[test]
fn test_insert_field_timestamp() {
    let transform = InsertField::new().timestamp_field("processed_at");
    let input = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    let stamp = row["processed_at"].as_str();
    assert!(stamp.is_some());
}
3297
/// `InsertField::copy_field` duplicates the value of `id` under
/// `original_id`.
#[test]
fn test_insert_field_copy() {
    let payload = json!({"id": 42, "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = InsertField::new().copy_field("original_id", "id");

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["original_id"], 42);
}
3312
/// `Filter::keep` lets an event through when the equality condition holds.
#[test]
fn test_filter_keep_equals() {
    let condition = FilterCondition::Equals {
        field: String::from("status"),
        value: json!("active"),
    };
    let transform = Filter::keep(condition);
    let payload = json!({"status": "active", "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3329
/// `Filter::drop` discards an event when the equality condition holds.
#[test]
fn test_filter_drop_equals() {
    let condition = FilterCondition::Equals {
        field: String::from("status"),
        value: json!("deleted"),
    };
    let transform = Filter::drop(condition);
    let payload = json!({"status": "deleted", "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let outcome = transform.apply(input);
    assert!(outcome.is_none());
}
3345
/// `FilterCondition::Matches` keeps an event whose `email` matches the
/// configured pattern.
#[test]
fn test_filter_regex() {
    let condition = FilterCondition::Matches {
        field: String::from("email"),
        pattern: String::from(r".*@example\.com$"),
    };
    let transform = Filter::keep(condition);
    let input = make_event(
        CdcOp::Insert,
        None,
        Some(json!({"email": "alice@example.com"})),
    );

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3361
/// `FilterCondition::And` keeps an event when both sub-conditions hold.
#[test]
fn test_filter_and() {
    let is_active = FilterCondition::Equals {
        field: String::from("status"),
        value: json!("active"),
    };
    let is_admin = FilterCondition::Equals {
        field: String::from("role"),
        value: json!("admin"),
    };
    let transform = Filter::keep(FilterCondition::And(vec![is_active, is_admin]));

    let payload = json!({"status": "active", "role": "admin"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3383
/// `FilterCondition::Or` keeps an event when only one of the
/// sub-conditions holds (here `role`, while `status` does not match).
#[test]
fn test_filter_or() {
    let is_active = FilterCondition::Equals {
        field: String::from("status"),
        value: json!("active"),
    };
    let is_admin = FilterCondition::Equals {
        field: String::from("role"),
        value: json!("admin"),
    };
    let transform = Filter::keep(FilterCondition::Or(vec![is_active, is_admin]));

    let payload = json!({"status": "deleted", "role": "admin"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3405
/// `FilterCondition::Not` keeps an event when the wrapped condition does
/// not hold.
#[test]
fn test_filter_not() {
    let is_deleted = FilterCondition::Equals {
        field: String::from("status"),
        value: json!("deleted"),
    };
    let transform = Filter::keep(FilterCondition::Not(Box::new(is_deleted)));
    let input = make_event(CdcOp::Insert, None, Some(json!({"status": "active"})));

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3417
/// `FilterCondition::IsNull` keeps an event whose `email` is JSON `null`.
#[test]
fn test_filter_is_null() {
    let condition = FilterCondition::IsNull {
        field: String::from("email"),
    };
    let transform = Filter::keep(condition);
    let payload = json!({"name": "Alice", "email": null});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3432
/// `FilterCondition::In` keeps an event whose `status` is one of the
/// listed values.
#[test]
fn test_filter_in() {
    let allowed = vec![json!("pending"), json!("active"), json!("approved")];
    let transform = Filter::keep(FilterCondition::In {
        field: String::from("status"),
        values: allowed,
    });
    let input = make_event(CdcOp::Insert, None, Some(json!({"status": "pending"})));

    let outcome = transform.apply(input);
    assert!(outcome.is_some());
}
3444
/// `CastType::String` turns a number and a boolean into their string
/// forms.
#[test]
fn test_cast_to_string() {
    let transform = Cast::new()
        .field("id", CastType::String)
        .field("active", CastType::String);
    let payload = json!({"id": 42, "active": true});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["id"], "42");
    assert_eq!(row["active"], "true");
}
3460
/// `CastType::Integer` parses a numeric string into a JSON integer.
#[test]
fn test_cast_string_to_int() {
    let transform = Cast::new().field("count", CastType::Integer);
    let input = make_event(CdcOp::Insert, None, Some(json!({"count": "42"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["count"], 42);
}
3471
/// `CastType::Boolean` maps `"true"`, `1` and `"yes"` to `true`.
#[test]
fn test_cast_to_boolean() {
    let transform = Cast::new()
        .field("flag1", CastType::Boolean)
        .field("flag2", CastType::Boolean)
        .field("flag3", CastType::Boolean);
    let payload = json!({"flag1": "true", "flag2": 1, "flag3": "yes"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["flag1"], true);
    assert_eq!(row["flag2"], true);
    assert_eq!(row["flag3"], true);
}
3492
/// `CastType::Json` parses a JSON-encoded string into a nested value that
/// can be indexed by key.
#[test]
fn test_cast_json_string() {
    let payload = json!({"config": "{\"key\":\"value\"}"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = Cast::new().field("config", CastType::Json);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["config"]["key"], "value");
}
3507
/// The default `Flatten` collapses nested objects into dot-separated keys.
#[test]
fn test_flatten() {
    let payload = json!({
        "user": {
            "name": "Alice",
            "address": {
                "city": "NYC",
                "zip": "10001"
            }
        }
    });
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = Flatten::new();

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["user.name"], "Alice");
    assert_eq!(row["user.address.city"], "NYC");
}
3532
/// With `max_depth(1)` the flattened output is expected to contain the key
/// `level1.level2`.
#[test]
fn test_flatten_max_depth() {
    let payload = json!({
        "level1": {
            "level2": {
                "level3": "deep"
            }
        }
    });
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = Flatten::new().max_depth(1);

    let row = transform.apply(input).unwrap().after.unwrap();

    let fields = row.as_object().unwrap();
    assert!(fields.contains_key("level1.level2"));
}
3554
/// A custom delimiter (`_`) is used to join nested key segments.
#[test]
fn test_flatten_custom_delimiter() {
    let payload = json!({
        "user": {
            "name": "Alice"
        }
    });
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = Flatten::new().delimiter("_");

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["user_name"], "Alice");
}
3573
/// `RegexRouter` writes the topic of the matching route into `__topic`;
/// the fixture event is expected to match the `public.users` route.
#[test]
fn test_regex_router() {
    let transform = RegexRouter::new("cdc.default")
        .route(r"^public\.users$", "cdc.users")
        .route(r"^public\.orders.*", "cdc.orders");
    let input = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__topic"], "cdc.users");
}
3587
/// When no route matches (table overridden to `unknown_table`),
/// `RegexRouter` falls back to its default topic.
#[test]
fn test_regex_router_default() {
    let transform = RegexRouter::new("cdc.default").route(r"^public\.users$", "cdc.users");

    let mut input = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
    input.table = "unknown_table".to_string();

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__topic"], "cdc.default");
}
3599
/// End-to-end chain: extract new state (with op/table fields), mask `ssn`,
/// convert `created_at` to ISO-8601, then add a static `_version`.
#[test]
fn test_full_transform_chain() {
    let before_image = json!({"id": 1, "ssn": "123-45-6789", "name": "Old"});
    let after_image =
        json!({"id": 1, "ssn": "123-45-6789", "name": "New", "created_at": 1705320000000i64});
    let input = make_event(CdcOp::Update, Some(before_image), Some(after_image));

    // Stages are named individually, then added in the same order as before.
    let extract = ExtractNewRecordState::new()
        .add_op_field()
        .add_table_field();
    let mask = MaskField::new(["ssn"]);
    let to_iso = TimestampConverter::new(["created_at"], TimestampFormat::Iso8601);
    let version = InsertField::new().static_field("_version", json!(1));

    let pipeline = SmtChain::new()
        .add(extract)
        .add(mask)
        .add(to_iso)
        .add(version);

    let row = pipeline.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__op"], "u");
    assert_eq!(row["__table"], "users");
    assert_eq!(row["ssn"], "***********");
    let created = row["created_at"].as_str().unwrap();
    assert!(created.contains("2024"));
    assert_eq!(row["_version"], 1);
}
3633
/// A dropping filter early in a chain makes the whole chain yield `None`.
#[test]
fn test_filter_drops_chain() {
    let drop_deleted = Filter::drop(FilterCondition::Equals {
        field: String::from("status"),
        value: json!("deleted"),
    });
    let mark_processed = InsertField::new().static_field("processed", json!(true));
    let pipeline = SmtChain::new().add(drop_deleted).add(mark_processed);

    let input = make_event(CdcOp::Insert, None, Some(json!({"status": "deleted"})));

    let outcome = pipeline.apply(input);
    assert!(outcome.is_none());
}
3648
/// `Predicate::table` matches the fixture event for `^users$` and rejects
/// it for `^orders$`.
#[test]
fn test_predicate_table() {
    let input = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));

    let users_only = Predicate::table(r"^users$").unwrap();
    let orders_only = Predicate::table(r"^orders$").unwrap();

    assert!(users_only.matches(&input));
    assert!(!orders_only.matches(&input));
}
3660
/// `Predicate::operation` matches when the event's op is in the list and
/// rejects it otherwise.
#[test]
fn test_predicate_operation() {
    let input = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));

    let writes = Predicate::operation(vec![CdcOp::Insert, CdcOp::Update]);
    let deletes = Predicate::operation(vec![CdcOp::Delete]);

    assert!(writes.matches(&input));
    assert!(!deletes.matches(&input));
}
3671
/// `Predicate::field_equals` compares a field of the event payload against
/// an expected JSON value.
#[test]
fn test_predicate_field_value() {
    let payload = json!({"status": "active", "id": 1});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let is_active = Predicate::field_equals("status", json!("active"));
    let is_deleted = Predicate::field_equals("status", json!("deleted"));

    assert!(is_active.matches(&input));
    assert!(!is_deleted.matches(&input));
}
3686
/// `Predicate::And` matches when every nested predicate matches.
#[test]
fn test_predicate_and() {
    let is_insert = Predicate::operation(vec![CdcOp::Insert]);
    let is_active = Predicate::field_equals("status", json!("active"));
    let combined = Predicate::And(vec![is_insert, is_active]);

    let input = make_event(CdcOp::Insert, None, Some(json!({"status": "active"})));

    assert!(combined.matches(&input));
}
3697
/// `Predicate::Or` matches when at least one nested predicate matches
/// (here the delete branch).
#[test]
fn test_predicate_or() {
    let is_insert = Predicate::operation(vec![CdcOp::Insert]);
    let is_delete = Predicate::operation(vec![CdcOp::Delete]);
    let combined = Predicate::Or(vec![is_insert, is_delete]);

    let input = make_event(CdcOp::Delete, None, Some(json!({"id": 1})));

    assert!(combined.matches(&input));
}
3708
/// `Predicate::Not` matches when the wrapped predicate does not.
#[test]
fn test_predicate_not() {
    let is_delete = Predicate::operation(vec![CdcOp::Delete]);
    let negated = Predicate::Not(Box::new(is_delete));

    let input = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));

    assert!(negated.matches(&input));
}
3716
/// `ConditionalSmt::when` applies the wrapped transform when the predicate
/// matches, so `_inserted` shows up on an insert event.
#[test]
fn test_conditional_smt_when() {
    let on_insert = Predicate::operation(vec![CdcOp::Insert]);
    let tag_inserted = InsertField::new().static_field("_inserted", json!(true));
    let transform = ConditionalSmt::when(on_insert, tag_inserted);

    let input = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));

    let row = transform.apply(input).unwrap().after.unwrap();
    assert_eq!(row["_inserted"], true);
}
3731
/// `ConditionalSmt::unless` with a matching predicate still passes the
/// event through, and its `after` image stays `None` (presumably because
/// the wrapped transform is skipped).
#[test]
fn test_conditional_smt_unless() {
    let on_delete = Predicate::operation(vec![CdcOp::Delete]);
    let tag_processed = InsertField::new().static_field("_processed", json!(true));
    let transform = ConditionalSmt::unless(on_delete, tag_processed);

    let input = make_event(CdcOp::Delete, Some(json!({"id": 1})), None);

    let output = transform.apply(input).unwrap();
    assert!(output.after.is_none());
}
3746
/// A table predicate gates `MaskField`: the fixture event matches
/// `^users$`, so `ssn` comes out masked.
#[test]
fn test_conditional_smt_table_predicate() {
    let users_only = Predicate::table(r"^users$").unwrap();
    let transform = ConditionalSmt::when(users_only, MaskField::new(["ssn"]));

    let input = make_event(CdcOp::Insert, None, Some(json!({"ssn": "123-45-6789"})));

    let row = transform.apply(input).unwrap().after.unwrap();
    assert_eq!(row["ssn"], "***********");
}
3761
/// `HeaderToValue::field` copies selected event metadata (source type,
/// table, operation code) into named payload fields.
#[test]
fn test_header_to_value() {
    let transform = HeaderToValue::new()
        .field("_source", HeaderSource::SourceType)
        .field("_table", HeaderSource::Table)
        .field("_op", HeaderSource::Operation);
    let input = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["_source"], "postgres");
    assert_eq!(row["_table"], "users");
    assert_eq!(row["_op"], "c");
}
3779
/// `HeaderToValue::all_headers("__")` injects every metadata header under
/// the `__` prefix, including a numeric `__ts`.
#[test]
fn test_header_to_value_all() {
    let transform = HeaderToValue::new().all_headers("__");
    let input = make_event(CdcOp::Update, None, Some(json!({"id": 1})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__source_type"], "postgres");
    assert_eq!(row["__database"], "testdb");
    assert_eq!(row["__schema"], "public");
    assert_eq!(row["__table"], "users");
    assert_eq!(row["__op"], "u");
    assert!(row["__ts"].is_number());
}
3796
/// `Unwrap::new("payload.data")` promotes the nested object to the top
/// level; the original `payload` wrapper is no longer present.
#[test]
fn test_unwrap_nested() {
    let payload = json!({
        "payload": {
            "data": {
                "name": "Alice",
                "email": "alice@test.com"
            }
        }
    });
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = Unwrap::new("payload.data");

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["name"], "Alice");
    assert_eq!(row["email"], "alice@test.com");
    assert!(row.get("payload").is_none());
}
3822
/// `Unwrap::merge` lifts nested fields to the top level while keeping the
/// existing top-level `id`.
#[test]
fn test_unwrap_merge() {
    let payload = json!({
        "id": 1,
        "nested": {
            "name": "Alice"
        }
    });
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = Unwrap::new("nested").merge();

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["id"], 1);
    assert_eq!(row["name"], "Alice");
}
3843
/// The default `SetNull` nulls the listed field and leaves others alone.
#[test]
fn test_set_null_always() {
    let payload = json!({"name": "Alice", "password": "secret"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = SetNull::new(["password"]);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["name"], "Alice");
    assert!(row["password"].is_null());
}
3860
/// `NullCondition::IfEmpty` turns an empty-string field into `null`.
#[test]
fn test_set_null_if_empty() {
    let payload = json!({"name": "Alice", "nickname": ""});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let transform = SetNull::new(["nickname"]).when(NullCondition::IfEmpty);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert!(row["nickname"].is_null());
}
3875
/// `NullCondition::IfEquals` nulls a field that equals the sentinel value
/// and leaves other fields untouched.
#[test]
fn test_set_null_if_equals() {
    let payload = json!({"status": "N/A", "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let sentinel = NullCondition::IfEquals(json!("N/A"));
    let transform = SetNull::new(["status"]).when(sentinel);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert!(row["status"].is_null());
    assert_eq!(row["name"], "Alice");
}
3891
/// `NullCondition::IfMatches` nulls a field whose value matches the
/// configured pattern.
#[test]
fn test_set_null_if_matches() {
    let payload = json!({"phone": "000-000-0000", "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));
    let all_zeros = NullCondition::IfMatches(String::from(r"^0+-0+-0+$"));
    let transform = SetNull::new(["phone"]).when(all_zeros);

    let row = transform.apply(input).unwrap().after.unwrap();

    assert!(row["phone"].is_null());
}
3906
/// Chain mixing a conditional mask (insert-only), header injection and a
/// static field; every stage's effect is visible in the final row.
#[test]
fn test_advanced_chain_with_predicates() {
    let payload = json!({"name": "Alice", "ssn": "123-45-6789", "status": "active"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    // Stages are named individually, then added in the same order as before.
    let mask_on_insert = ConditionalSmt::when(
        Predicate::operation(vec![CdcOp::Insert]),
        MaskField::new(["ssn"]),
    );
    let headers = HeaderToValue::new()
        .field("_table", HeaderSource::Table)
        .field("_op", HeaderSource::Operation);
    let version = InsertField::new().static_field("_version", json!(1));

    let pipeline = SmtChain::new()
        .add(mask_on_insert)
        .add(headers)
        .add(version);

    let row = pipeline.apply(input).unwrap().after.unwrap();

    assert_eq!(row["name"], "Alice");
    assert_eq!(row["ssn"], "***********");
    assert_eq!(row["_table"], "users");
    assert_eq!(row["_op"], "c");
    assert_eq!(row["_version"], 1);
}
3940
/// 15:00 UTC converted to `America/New_York` is expected to read 10:00:00
/// and to carry either a `-05:00` offset or the zone name.
#[test]
fn test_timezone_converter_utc_to_est() {
    let transform = TimezoneConverter::new(["created_at"])
        .from("UTC")
        .to("America/New_York");
    let payload = json!({"created_at": "2024-01-15T15:00:00Z"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();
    let rendered = row["created_at"].as_str().unwrap();

    assert!(rendered.contains("10:00:00"));
    let has_offset = rendered.contains("-05:00");
    let has_zone_name = rendered.contains("New_York");
    assert!(has_offset || has_zone_name);
}
3965
/// With `date_only()`, 23:00 UTC on 2024-01-15 converted to
/// `America/Los_Angeles` renders as the date `2024-01-15`.
#[test]
fn test_timezone_converter_date_only() {
    let transform = TimezoneConverter::new(["created_at"])
        .from("UTC")
        .to("America/Los_Angeles")
        .date_only();
    let payload = json!({"created_at": "2024-01-15T23:00:00Z"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["created_at"], "2024-01-15");
}
3985
/// A custom output format string is honoured when converting UTC to
/// `Europe/London`.
#[test]
fn test_timezone_converter_custom_format() {
    let transform = TimezoneConverter::new(["created_at"])
        .from("UTC")
        .to("Europe/London")
        .format("%Y-%m-%d %H:%M");
    let payload = json!({"created_at": "2024-01-15T12:30:00Z"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["created_at"], "2024-01-15 12:30");
}
4005
/// A numeric `created_at` converted to `Asia/Tokyo` renders as a string
/// containing the date `2024-01-15` and the `+09:00` offset.
#[test]
fn test_timezone_converter_epoch_input() {
    let transform = TimezoneConverter::new(["created_at"])
        .from("UTC")
        .to("Asia/Tokyo");
    let payload = json!({"created_at": 1705320000000i64});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();
    let rendered = row["created_at"].as_str().unwrap();

    assert!(rendered.contains("2024-01-15"));
    assert!(rendered.contains("+09:00"));
}
4026
/// Every listed field is converted: both timestamps move one hour ahead
/// when going from UTC to `Europe/Paris`.
#[test]
fn test_timezone_converter_multiple_fields() {
    let transform = TimezoneConverter::new(["created_at", "updated_at"])
        .from("UTC")
        .to("Europe/Paris");
    let payload = json!({
        "created_at": "2024-01-15T10:00:00Z",
        "updated_at": "2024-01-15T15:30:00Z"
    });
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    let created = row["created_at"].as_str().unwrap();
    assert!(created.contains("11:00:00"));
    let updated = row["updated_at"].as_str().unwrap();
    assert!(updated.contains("16:30:00"));
}
4050
/// `ContentRouter::route` picks the topic whose exact field value matches
/// and stores it in `__routing_topic`.
#[test]
fn test_content_router_exact_match() {
    let transform = ContentRouter::new()
        .route("priority", "high", "priority-events")
        .route("priority", "low", "batch-events")
        .default_topic("default-events");
    let payload = json!({"priority": "high", "message": "urgent"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__routing_topic"], "priority-events");
}
4073
/// `ContentRouter::route_pattern` routes on a regex over the field value.
#[test]
fn test_content_router_pattern_match() {
    let transform = ContentRouter::new()
        .route_pattern("category", r"^urgent", "urgent-events")
        .route_pattern("category", r"^normal", "normal-events");
    let payload = json!({"category": "urgent-alert-123", "data": "test"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__routing_topic"], "urgent-events");
}
4091
/// `ContentRouter::route_in` routes when the field value belongs to the
/// given set.
#[test]
fn test_content_router_in_set() {
    let transform = ContentRouter::new()
        .route_in("status", ["active", "pending"], "active-events")
        .route_in("status", ["archived", "deleted"], "archive-events");
    let payload = json!({"status": "active", "name": "Alice"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__routing_topic"], "active-events");
}
4109
/// `ContentRouter::route_if` routes using a caller-supplied closure over
/// the field value; an amount above 1000 selects `high-value`.
#[test]
fn test_content_router_predicate() {
    let transform = ContentRouter::new()
        .route_if(
            "amount",
            |v| matches!(v.as_i64(), Some(n) if n > 1000),
            "high-value",
        )
        .route_if(
            "amount",
            |v| matches!(v.as_i64(), Some(n) if n <= 1000),
            "normal-value",
        );
    let payload = json!({"amount": 1500, "currency": "USD"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__routing_topic"], "high-value");
}
4135
/// When no route matches, `ContentRouter` falls back to the configured
/// default topic.
#[test]
fn test_content_router_default_topic() {
    let transform = ContentRouter::new()
        .route("type", "order", "order-events")
        .route("type", "user", "user-events")
        .default_topic("other-events");
    let payload = json!({"type": "unknown", "data": "test"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["__routing_topic"], "other-events");
}
4154
/// `ContentRouter::topic_field` changes the output key; the default
/// `__routing_topic` key is then absent.
#[test]
fn test_content_router_custom_field() {
    let transform = ContentRouter::new()
        .route("priority", "high", "priority-events")
        .topic_field("_target_topic");
    let payload = json!({"priority": "high", "message": "test"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["_target_topic"], "priority-events");
    assert!(row.get("__routing_topic").is_none());
}
4173
/// `ComputeField::concat` joins `$`-prefixed field references and literal
/// parts into one string.
#[test]
fn test_compute_field_concat() {
    let transform = ComputeField::new().concat("full_name", ["$first_name", " ", "$last_name"]);
    let payload = json!({"first_name": "Alice", "last_name": "Smith"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["full_name"], "Alice Smith");
}
4193
/// `ComputeField::hash` produces a 64-character string of hex digits from
/// the listed fields.
#[test]
fn test_compute_field_hash() {
    let transform = ComputeField::new().hash("user_hash", ["id", "email"]);
    let payload = json!({"id": 123, "email": "alice@test.com"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();
    let digest = row["user_hash"].as_str().unwrap();

    assert_eq!(digest.len(), 64);
    assert!(digest.chars().all(|ch| ch.is_ascii_hexdigit()));
}
4212
/// `ComputeField::coalesce` picks the first non-null candidate: `nickname`
/// is null here, so `username` wins.
#[test]
fn test_compute_field_coalesce() {
    let transform =
        ComputeField::new().coalesce("display_name", ["nickname", "username", "email"]);
    let payload = json!({"nickname": null, "username": "alice123", "email": "alice@test.com"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["display_name"], "alice123");
}
4228
/// `ComputeField::sum` adds up the listed numeric fields.
#[test]
fn test_compute_field_sum() {
    let transform = ComputeField::new().sum("total", ["price", "tax", "shipping"]);
    let payload = json!({"price": 100.0, "tax": 10.0, "shipping": 5.0});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    let total = row["total"].as_f64().unwrap();
    assert_eq!(total, 115.0);
}
4244
/// `ComputeField::length` stores the length of a string field.
#[test]
fn test_compute_field_length() {
    let transform = ComputeField::new().length("name_length", "name");
    let input = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["name_length"], 5);
}
4256
/// `ComputeField::upper` and `lower` derive case-converted copies of a
/// field.
#[test]
fn test_compute_field_upper_lower() {
    let transform = ComputeField::new()
        .upper("name_upper", "name")
        .lower("name_lower", "name");
    let input = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice Smith"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["name_upper"], "ALICE SMITH");
    assert_eq!(row["name_lower"], "alice smith");
}
4271
/// `ComputeField::substring` with start 3 and length `Some(3)` extracts
/// `555` from the phone number.
#[test]
fn test_compute_field_substring() {
    let transform = ComputeField::new().substring("area_code", "phone", 3, Some(3));
    let payload = json!({"phone": "+1-555-123-4567"});
    let input = make_event(CdcOp::Insert, None, Some(payload));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["area_code"], "555");
}
4287
/// `ComputeField::replace` strips every `-` from the phone number.
#[test]
fn test_compute_field_replace() {
    let transform = ComputeField::new().replace("phone_clean", "phone", r"-", "");
    let input = make_event(CdcOp::Insert, None, Some(json!({"phone": "555-123-4567"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    assert_eq!(row["phone_clean"], "5551234567");
}
4299
/// `ComputeField::current_timestamp` adds a string field; the test only
/// checks that it contains `20`.
#[test]
fn test_compute_field_current_timestamp() {
    let transform = ComputeField::new().current_timestamp("processed_at");
    let input = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));

    let row = transform.apply(input).unwrap().after.unwrap();

    let stamp = row["processed_at"].as_str().unwrap();
    assert!(stamp.contains("20"));
}
4312
4313 #[test]
4314 fn test_compute_field_uuid() {
4315 let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
4316
4317 let smt = ComputeField::new().uuid("request_id");
4318
4319 let result = smt.apply(event).unwrap();
4320 let after = result.after.unwrap();
4321
4322 let uuid = after["request_id"].as_str().unwrap();
4324 let parts: Vec<&str> = uuid.split('-').collect();
4325 assert_eq!(parts.len(), 5);
4326 assert_eq!(parts[0].len(), 8);
4327 assert_eq!(parts[1].len(), 4);
4328 assert_eq!(parts[2].len(), 4);
4329 assert_eq!(parts[3].len(), 4);
4330 assert_eq!(parts[4].len(), 12);
4331 }
4332
4333 #[test]
4334 fn test_compute_field_json_path() {
4335 let event = make_event(
4336 CdcOp::Insert,
4337 None,
4338 Some(json!({
4339 "metadata": {
4340 "user": {
4341 "name": "Alice"
4342 }
4343 }
4344 })),
4345 );
4346
4347 let smt = ComputeField::new().json_path("user_name", "metadata", "user.name");
4348
4349 let result = smt.apply(event).unwrap();
4350 let after = result.after.unwrap();
4351
4352 assert_eq!(after["user_name"], "Alice");
4353 }
4354
4355 #[test]
4356 fn test_compute_field_conditional() {
4357 let event = make_event(
4358 CdcOp::Insert,
4359 None,
4360 Some(json!({"status": "active", "name": "Alice"})),
4361 );
4362
4363 let smt = ComputeField::new().conditional(
4364 "is_active",
4365 ComputeCondition::FieldEquals("status".to_string(), json!("active")),
4366 json!(true),
4367 json!(false),
4368 );
4369
4370 let result = smt.apply(event).unwrap();
4371 let after = result.after.unwrap();
4372
4373 assert_eq!(after["is_active"], true);
4374 }
4375
4376 #[test]
4377 fn test_compute_field_conditional_null_check() {
4378 let event = make_event(
4379 CdcOp::Insert,
4380 None,
4381 Some(json!({"name": "Alice", "deleted_at": null})),
4382 );
4383
4384 let smt = ComputeField::new().conditional(
4385 "status",
4386 ComputeCondition::FieldIsNull("deleted_at".to_string()),
4387 json!("active"),
4388 json!("deleted"),
4389 );
4390
4391 let result = smt.apply(event).unwrap();
4392 let after = result.after.unwrap();
4393
4394 assert_eq!(after["status"], "active");
4395 }
4396
4397 #[test]
4398 fn test_compute_field_conditional_pattern() {
4399 let event = make_event(
4400 CdcOp::Insert,
4401 None,
4402 Some(json!({"email": "admin@company.com", "name": "Admin"})),
4403 );
4404
4405 let smt = ComputeField::new().conditional(
4406 "is_admin",
4407 ComputeCondition::FieldMatches("email".to_string(), r"^admin@".to_string()),
4408 json!(true),
4409 json!(false),
4410 );
4411
4412 let result = smt.apply(event).unwrap();
4413 let after = result.after.unwrap();
4414
4415 assert_eq!(after["is_admin"], true);
4416 }
4417
4418 #[test]
4419 fn test_compute_field_chain() {
4420 let event = make_event(
4421 CdcOp::Insert,
4422 None,
4423 Some(json!({
4424 "first_name": "alice",
4425 "last_name": "smith",
4426 "amount": 100.0
4427 })),
4428 );
4429
4430 let smt = ComputeField::new()
4431 .upper("first_upper", "first_name")
4432 .upper("last_upper", "last_name")
4433 .concat("full_name", ["$first_upper", " ", "$last_upper"])
4434 .sum("total_with_tax", ["amount"]);
4435
4436 let result = smt.apply(event).unwrap();
4437 let after = result.after.unwrap();
4438
4439 assert_eq!(after["first_upper"], "ALICE");
4440 assert_eq!(after["last_upper"], "SMITH");
4441 assert_eq!(after["full_name"], "ALICE SMITH");
4443 }
4444
4445 #[test]
4450 fn test_full_pipeline_with_new_smts() {
4451 let event = make_event(
4452 CdcOp::Insert,
4453 None,
4454 Some(json!({
4455 "first_name": "Alice",
4456 "last_name": "Smith",
4457 "email": "alice@test.com",
4458 "priority": "high",
4459 "created_at": "2024-01-15T10:00:00Z"
4460 })),
4461 );
4462
4463 let chain = SmtChain::new()
4464 .add(
4466 ComputeField::new()
4467 .concat("full_name", ["$first_name", " ", "$last_name"])
4468 .hash("email_hash", ["email"])
4469 .uuid("event_id"),
4470 )
4471 .add(MaskField::new(["email"]))
4473 .add(
4475 ContentRouter::new()
4476 .route("priority", "high", "priority-events")
4477 .default_topic("normal-events"),
4478 )
4479 .add(
4481 TimezoneConverter::new(["created_at"])
4482 .from("UTC")
4483 .to("America/New_York"),
4484 );
4485
4486 let result = chain.apply(event).unwrap();
4487 let after = result.after.unwrap();
4488
4489 assert_eq!(after["full_name"], "Alice Smith");
4491 assert_eq!(after["email_hash"].as_str().unwrap().len(), 64);
4492 assert!(after["event_id"].as_str().unwrap().contains("-"));
4493 assert_ne!(after["email"], "alice@test.com"); assert_eq!(after["__routing_topic"], "priority-events");
4495 let ts = after["created_at"].as_str().unwrap();
4497 assert!(ts.contains("05:00:00") || ts.contains("-05:00"));
4498 }
4499}
4500
/// Tests for `ExternalizeBlob` backed by a local temporary directory.
/// Compiled only for test builds with the `cloud-storage` feature enabled.
#[cfg(all(test, feature = "cloud-storage"))]
mod externalize_blob_tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    /// Builds a `CdcEvent` for `testdb.public.users` (source type `postgres`)
    /// with the given operation and row images, stamped with the current
    /// UTC timestamp and no transaction.
    fn make_event(op: CdcOp, before: Option<Value>, after: Option<Value>) -> CdcEvent {
        let timestamp = chrono::Utc::now().timestamp();
        CdcEvent {
            source_type: String::from("postgres"),
            database: String::from("testdb"),
            schema: String::from("public"),
            table: String::from("users"),
            op,
            before,
            after,
            timestamp,
            transaction: None,
        }
    }

    /// Shorthand for an insert event that carries only an `after` image.
    fn insert_event(after: Value) -> CdcEvent {
        make_event(CdcOp::Insert, None, Some(after))
    }

    /// A value below the size threshold is passed through untouched.
    #[test]
    fn test_externalize_small_value_unchanged() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(1000);

        let input = insert_event(json!({
            "id": 1,
            "name": "Alice",
            "small_data": "short string"
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        assert_eq!(row["small_data"], "short string");
        assert!(row["small_data"].get("__externalized").is_none());
    }

    /// A value above the threshold is replaced by a reference object with
    /// `__externalized`, `url`, `size`, `sha256` and `content_type`.
    #[test]
    fn test_externalize_large_value() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(50);

        let payload = "A".repeat(100);
        let input = insert_event(json!({
            "id": 1,
            "blob_data": payload
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        let reference = row.get("blob_data").unwrap();
        assert_eq!(reference["__externalized"], true);
        assert!(reference["url"].as_str().unwrap().starts_with("file://"));
        assert!(reference["size"].as_u64().unwrap() > 0);
        assert!(reference["sha256"].as_str().unwrap().len() == 64);
        assert_eq!(reference["content_type"], "application/octet-stream");
    }

    /// With an explicit field list, only listed fields are externalized even
    /// if other fields also exceed the threshold.
    #[test]
    fn test_externalize_specific_fields_only() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(10)
            .fields(["image_data"]);

        let input = insert_event(json!({
            "id": 1,
            "name": "very long name that exceeds threshold",
            "image_data": "large image data here..."
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        // Not in the field list, so left inline despite its size.
        assert_eq!(row["name"], "very long name that exceeds threshold");

        let image = row.get("image_data").unwrap();
        assert_eq!(image["__externalized"], true);
    }

    /// The configured prefix, the table name and the field name all appear in
    /// the generated URL.
    #[test]
    fn test_externalize_with_prefix() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(10)
            .prefix("cdc-blobs/production");

        let payload = "B".repeat(100);
        let input = insert_event(json!({
            "document": payload
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        let location = row.get("document").unwrap()["url"].as_str().unwrap();
        assert!(location.contains("cdc-blobs/production"));
        assert!(location.contains("users"));
        assert!(location.contains("document"));
    }

    /// Base64 input is reported with the size of the decoded bytes (100), not
    /// the length of the encoded string.
    #[test]
    fn test_externalize_base64_data() {
        use base64::Engine;

        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(10);

        let decoded: Vec<u8> = (0..100).collect();
        let encoded = base64::engine::general_purpose::STANDARD.encode(&decoded);

        let input = insert_event(json!({
            "binary_field": encoded
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        let reference = row.get("binary_field").unwrap();
        assert_eq!(reference["__externalized"], true);
        assert_eq!(reference["size"].as_u64().unwrap(), 100);
    }

    /// A large nested JSON object is externalized with content type
    /// `application/json`.
    #[test]
    fn test_externalize_json_object() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(20);

        let nested = json!({
            "nested": {
                "data": "large nested object with lots of data here",
                "more": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            }
        });

        let input = insert_event(json!({
            "id": 1,
            "metadata": nested
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        let reference = row.get("metadata").unwrap();
        assert_eq!(reference["__externalized"], true);
        assert_eq!(reference["content_type"], "application/json");
    }

    /// On an update, both the `before` and `after` images are externalized
    /// and receive distinct URLs.
    #[test]
    fn test_externalize_update_event_both_before_after() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(20);

        let previous = "X".repeat(50);
        let current = "Y".repeat(50);

        let input = make_event(
            CdcOp::Update,
            Some(json!({ "data": previous })),
            Some(json!({ "data": current })),
        );

        let output = transform.apply(input).unwrap();
        let old_row = output.before.unwrap();
        let new_row = output.after.unwrap();

        assert_eq!(old_row["data"]["__externalized"], true);
        assert_eq!(new_row["data"]["__externalized"], true);

        let old_url = old_row["data"]["url"].as_str().unwrap();
        let new_url = new_row["data"]["url"].as_str().unwrap();
        assert_ne!(old_url, new_url);
    }

    /// Fields below the threshold keep their original values while the large
    /// field is externalized.
    #[test]
    fn test_externalize_preserves_other_fields() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(20);

        let payload = "Z".repeat(100);
        let input = insert_event(json!({
            "id": 123,
            "name": "Alice",
            "active": true,
            "score": 95.5,
            "large_blob": payload
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        assert_eq!(row["id"], 123);
        assert_eq!(row["name"], "Alice");
        assert_eq!(row["active"], true);
        assert_eq!(row["score"], 95.5);

        assert_eq!(row["large_blob"]["__externalized"], true);
    }

    /// The transform reports its name as `ExternalizeBlob`.
    #[test]
    fn test_externalize_smt_name() {
        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path()).unwrap();
        assert_eq!(transform.name(), "ExternalizeBlob");
    }

    /// `ExternalizeBlob` composes with other transforms in an `SmtChain`:
    /// masking, externalizing and UUID generation all take effect.
    #[test]
    fn test_externalize_chain_with_other_smts() {
        let dir = TempDir::new().unwrap();

        let externalize = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(20);
        let pipeline = SmtChain::new()
            .add(MaskField::new(["ssn"]))
            .add(externalize)
            .add(ComputeField::new().uuid("event_id"));

        let payload = "A".repeat(50);
        let input = insert_event(json!({
            "id": 1,
            "ssn": "123-45-6789",
            "document": payload
        }));

        let row = pipeline.apply(input).unwrap().after.unwrap();

        assert_ne!(row["ssn"], "123-45-6789");
        assert_eq!(row["document"]["__externalized"], true);
        assert!(row["event_id"].as_str().unwrap().contains("-"));
    }

    /// The externalized bytes are written to the path named by the `file://`
    /// URL and match the decoded input.
    #[test]
    fn test_externalize_file_written_to_disk() {
        use base64::Engine;

        let dir = TempDir::new().unwrap();
        let transform = ExternalizeBlob::local(dir.path())
            .unwrap()
            .size_threshold(10);

        // The ASCII bytes of "Hello World!".
        let decoded: Vec<u8> = b"Hello World!".to_vec();
        let encoded = base64::engine::general_purpose::STANDARD.encode(&decoded);

        let input = insert_event(json!({
            "greeting": encoded
        }));

        let row = transform.apply(input).unwrap().after.unwrap();

        let location = row.get("greeting").unwrap()["url"].as_str().unwrap();
        let on_disk = location.strip_prefix("file://").unwrap();
        assert!(std::path::Path::new(on_disk).exists());

        let written = std::fs::read(on_disk).unwrap();
        assert_eq!(written, decoded);
    }
}