Skip to main content

rivven_cdc/common/
smt.rs

1//! # Single Message Transforms (SMTs)
2//!
3//! Message transformations for CDC events.
4//!
5//! ## Built-in Transforms (17 configurable)
6//!
7//! | Transform | Description |
8//! |-----------|-------------|
9//! | `ExtractNewRecordState` | Flatten envelope, extract "after" state |
10//! | `ValueToKey` | Extract key fields from value |
11//! | `TimestampConverter` | Convert timestamps (ISO8601, epoch, date) |
12//! | `TimezoneConverter` | Convert between timezones (IANA names) |
13//! | `MaskField` | Mask sensitive fields |
14//! | `ReplaceField` | Rename, include, exclude fields |
15//! | `InsertField` | Add static or computed fields |
16//! | `Filter` | Drop events based on condition |
17//! | `Cast` | Convert field types |
18//! | `Flatten` | Flatten nested structures |
19//! | `RegexRouter` | Route based on regex patterns |
20//! | `ContentRouter` | Route based on field values |
21//! | `ComputeField` | Compute new fields (concat, hash, etc.) |
22//! | `HeaderToValue` | Move envelope fields into record |
23//! | `Unwrap` | Extract nested field to top level |
24//! | `SetNull` | Conditionally nullify fields |
25//! | `ExternalizeBlob` | Store large blobs in object storage (S3/GCS/Azure) |
26//!
27//! ## Conditional Application (Predicates)
28//!
29//! Any transform can be applied conditionally using predicates.
30//! In YAML, add a `predicate:` block to any transform — the system
31//! internally wraps it with [`ConditionalSmt`].
32//!
33//! ```yaml
34//! transforms:
35//!   - type: mask_field
36//!     predicate:
37//!       table: "users"          # only apply to users table
38//!       operations: [insert, update]  # only on insert/update
39//!     config:
40//!       fields: [ssn, credit_card]
41//! ```
42//!
43//! `ConditionalSmt` is not a user-facing transform type — it is the
44//! internal wrapper that makes predicates work.
45//!
46//! ## Usage
47//!
48//! ```rust,ignore
49//! use rivven_cdc::common::smt::*;
50//!
51//! // Create transform chain
52//! let transforms = SmtChain::new()
53//!     .add(ExtractNewRecordState::new())
54//!     .add(MaskField::new(vec!["ssn", "credit_card"]))
55//!     .add(TimestampConverter::new(["created_at"], TimestampFormat::Iso8601))
56//!     .add(ContentRouter::new()
57//!         .route("priority", "high", "priority-events")
58//!         .default_topic("default-events"));
59//!
60//! // Apply to event
61//! let transformed = transforms.apply(event)?;
62//! ```
63
64use crate::common::{CdcEvent, CdcOp};
65use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
66use regex::Regex;
67use serde_json::{Map, Value};
68use std::collections::{HashMap, HashSet};
69use std::sync::Arc;
70use tracing::warn;
71
72/// Convert CdcOp to standard operation code.
73fn op_to_code(op: &CdcOp) -> &'static str {
74    match op {
75        CdcOp::Insert => "c",    // create
76        CdcOp::Update => "u",    // update
77        CdcOp::Delete => "d",    // delete
78        CdcOp::Tombstone => "d", // tombstone (treated as delete)
79        CdcOp::Truncate => "t",  // truncate
80        CdcOp::Snapshot => "r",  // read (snapshot)
81        CdcOp::Schema => "s",    // schema change (DDL)
82    }
83}
84
/// Trait for Single Message Transforms.
///
/// A transform takes ownership of one event and either hands back a
/// (possibly modified) event or drops it. Implementations must be
/// `Send + Sync`; [`SmtChain`] stores them as `Arc<dyn Smt>`.
pub trait Smt: Send + Sync {
    /// Transform a CDC event.
    ///
    /// Returns `Some(event)` to pass the event on, or `None` to drop it.
    /// When run inside an [`SmtChain`], a `None` stops the remaining
    /// transforms from running for that event.
    fn apply(&self, event: CdcEvent) -> Option<CdcEvent>;

    /// Get the transform name (a static identifier, e.g. for [`SmtChain::names`]).
    fn name(&self) -> &'static str;
}
93
/// Chain of SMT transforms applied in sequence.
///
/// Transforms run in the order they were added; the output of one is the
/// input of the next, and the first transform that returns `None` drops the
/// event for the whole chain.
pub struct SmtChain {
    /// Transforms in application order, shared behind `Arc` trait objects.
    transforms: Vec<Arc<dyn Smt>>,
}
98
99impl Default for SmtChain {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105impl SmtChain {
106    /// Create a new empty chain.
107    pub fn new() -> Self {
108        Self {
109            transforms: Vec::new(),
110        }
111    }
112
113    /// Add a transform to the chain (builder pattern).
114    #[allow(clippy::should_implement_trait)] // Builder pattern, not std::ops::Add
115    pub fn add<T: Smt + 'static>(mut self, transform: T) -> Self {
116        self.transforms.push(Arc::new(transform));
117        self
118    }
119
120    /// Add a boxed transform to the chain.
121    pub fn add_boxed(mut self, transform: Arc<dyn Smt>) -> Self {
122        self.transforms.push(transform);
123        self
124    }
125
126    /// Apply all transforms in sequence.
127    pub fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
128        for transform in &self.transforms {
129            event = transform.apply(event)?;
130        }
131        Some(event)
132    }
133
134    /// Get number of transforms.
135    pub fn len(&self) -> usize {
136        self.transforms.len()
137    }
138
139    /// Check if chain is empty.
140    pub fn is_empty(&self) -> bool {
141        self.transforms.is_empty()
142    }
143
144    /// Get transform names.
145    pub fn names(&self) -> Vec<&'static str> {
146        self.transforms.iter().map(|t| t.name()).collect()
147    }
148}
149
150// ============================================================================
151// ExtractNewRecordState - Flatten envelope to just the "after" data
152// ============================================================================
153
/// Copies selected envelope metadata into the record and applies delete handling.
///
/// CDC events carry an envelope ("before", "after", "op", ...). When this
/// transform is applied, the enabled envelope fields are written into the
/// "after" object so downstream consumers can read them from the record.
///
/// # Options
/// - `drop_tombstones`: drop events whose operation is `Delete` (default: off)
/// - `delete_handling`: what to do with `Delete` events (see [`DeleteHandling`])
/// - `add_op` / `add_table` / `add_schema` / `add_ts`: which envelope fields
///   to copy into the record (all off by default)
/// - `header_prefix`: prefix for the names of the copied fields (default `"__"`)
#[derive(Debug, Clone)]
pub struct ExtractNewRecordState {
    /// When set, `Delete` events are dropped before any other handling.
    drop_tombstones: bool,
    /// Strategy applied to `Delete` events that were not dropped.
    delete_handling: DeleteHandling,
    /// Write the operation code as `<prefix>op`.
    add_op: bool,
    /// Write the table name as `<prefix>table`.
    add_table: bool,
    /// Write the schema name as `<prefix>schema`.
    add_schema: bool,
    /// Write the event timestamp (multiplied by 1000) as `<prefix>ts_ms`.
    add_ts: bool,
    /// Prefix prepended to the names of the added envelope fields.
    header_prefix: String,
}

/// How to handle delete operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeleteHandling {
    /// Drop the delete event.
    Drop,
    /// Keep the event: the "before" image becomes the "after" record, with
    /// an extra `"__deleted": true` field.
    Rewrite,
    /// Pass delete events through without special handling (the default).
    #[default]
    None,
}

impl Default for ExtractNewRecordState {
    /// Same configuration as [`ExtractNewRecordState::new`].
    fn default() -> Self {
        ExtractNewRecordState::new()
    }
}

impl ExtractNewRecordState {
    /// Create an extractor with every option disabled, `DeleteHandling::None`
    /// and the `"__"` header prefix.
    pub fn new() -> Self {
        Self {
            header_prefix: String::from("__"),
            delete_handling: DeleteHandling::None,
            drop_tombstones: false,
            add_op: false,
            add_table: false,
            add_schema: false,
            add_ts: false,
        }
    }

    /// Enable dropping of delete events.
    pub fn drop_tombstones(self) -> Self {
        Self {
            drop_tombstones: true,
            ..self
        }
    }

    /// Choose how delete events are handled.
    pub fn delete_handling(self, mode: DeleteHandling) -> Self {
        Self {
            delete_handling: mode,
            ..self
        }
    }

    /// Include the operation code in the output record.
    pub fn add_op_field(self) -> Self {
        Self {
            add_op: true,
            ..self
        }
    }

    /// Include the table name in the output record.
    pub fn add_table_field(self) -> Self {
        Self {
            add_table: true,
            ..self
        }
    }

    /// Include the schema name in the output record.
    pub fn add_schema_field(self) -> Self {
        Self {
            add_schema: true,
            ..self
        }
    }

    /// Include the event timestamp in the output record.
    pub fn add_ts_field(self) -> Self {
        Self {
            add_ts: true,
            ..self
        }
    }

    /// Replace the prefix used for the names of the added envelope fields.
    pub fn header_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            header_prefix: prefix.into(),
            ..self
        }
    }
}
255
256impl Smt for ExtractNewRecordState {
257    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
258        // Handle deletes
259        if event.op == CdcOp::Delete {
260            if self.drop_tombstones {
261                return None;
262            }
263
264            match self.delete_handling {
265                DeleteHandling::Drop => return None,
266                DeleteHandling::Rewrite => {
267                    // Use "before" as the record and add __deleted field
268                    if let Some(before) = &event.before {
269                        let mut record = before.clone();
270                        if let Some(obj) = record.as_object_mut() {
271                            obj.insert("__deleted".to_string(), Value::Bool(true));
272                        }
273                        event.after = Some(record);
274                    }
275                }
276                DeleteHandling::None => {}
277            }
278        }
279
280        // Add envelope fields to "after"
281        if let Some(after) = &mut event.after {
282            if let Some(obj) = after.as_object_mut() {
283                if self.add_op {
284                    obj.insert(
285                        format!("{}op", self.header_prefix),
286                        Value::String(op_to_code(&event.op).to_string()),
287                    );
288                }
289                if self.add_table {
290                    obj.insert(
291                        format!("{}table", self.header_prefix),
292                        Value::String(event.table.clone()),
293                    );
294                }
295                if self.add_schema {
296                    obj.insert(
297                        format!("{}schema", self.header_prefix),
298                        Value::String(event.schema.clone()),
299                    );
300                }
301                if self.add_ts {
302                    obj.insert(
303                        format!("{}ts_ms", self.header_prefix),
304                        Value::Number((event.timestamp * 1000).into()),
305                    );
306                }
307            }
308        }
309
310        Some(event)
311    }
312
313    fn name(&self) -> &'static str {
314        "ExtractNewRecordState"
315    }
316}
317
318// ============================================================================
319// ValueToKey - Extract key fields from value
320// ============================================================================
321
/// Extract key fields from the value.
///
/// Useful for creating a message key from specific value fields.
/// The key is an object holding the configured fields that are present in
/// the record; it is stored in the "after" data as the "__key" field.
#[derive(Debug, Clone)]
pub struct ValueToKey {
    /// Names of the fields to copy into the key, in configuration order.
    fields: Vec<String>,
}
331
332impl ValueToKey {
333    /// Create a new ValueToKey with specified fields.
334    pub fn new(fields: Vec<String>) -> Self {
335        Self { fields }
336    }
337
338    /// Create from field names.
339    pub fn with_fields<I, S>(fields: I) -> Self
340    where
341        I: IntoIterator<Item = S>,
342        S: Into<String>,
343    {
344        Self {
345            fields: fields.into_iter().map(Into::into).collect(),
346        }
347    }
348
349    /// Extract key from event.
350    pub fn extract_key(&self, event: &CdcEvent) -> Option<Value> {
351        let source = event.after.as_ref().or(event.before.as_ref())?;
352
353        if let Some(obj) = source.as_object() {
354            let mut key_obj = Map::new();
355            for field in &self.fields {
356                if let Some(value) = obj.get(field) {
357                    key_obj.insert(field.clone(), value.clone());
358                }
359            }
360
361            if !key_obj.is_empty() {
362                return Some(Value::Object(key_obj));
363            }
364        }
365
366        None
367    }
368}
369
370impl Smt for ValueToKey {
371    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
372        if let Some(key) = self.extract_key(&event) {
373            // Store key in after data
374            if let Some(after) = &mut event.after {
375                if let Some(obj) = after.as_object_mut() {
376                    obj.insert("__key".to_string(), key);
377                }
378            }
379        }
380        Some(event)
381    }
382
383    fn name(&self) -> &'static str {
384        "ValueToKey"
385    }
386}
387
388// ============================================================================
389// TimestampConverter - Convert timestamp formats
390// ============================================================================
391
/// Target format for timestamp conversion.
///
/// All outputs are derived from the parsed timestamp interpreted as UTC.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimestampFormat {
    /// RFC 3339 / ISO 8601 string (2024-01-15T10:30:00Z); this is the default.
    // NOTE(review): the value is produced with chrono's `to_rfc3339`, which
    // may render the UTC offset as `+00:00` rather than `Z` — confirm.
    #[default]
    Iso8601,
    /// Unix epoch seconds (JSON number)
    EpochSeconds,
    /// Unix epoch milliseconds (JSON number)
    EpochMillis,
    /// Unix epoch microseconds (JSON number)
    EpochMicros,
    /// Date only, formatted as `%Y-%m-%d` (2024-01-15)
    DateOnly,
    /// Time only, formatted as `%H:%M:%S` (10:30:00)
    TimeOnly,
}
409
/// Convert timestamp fields between formats.
///
/// The configured fields are converted in both the "after" and "before"
/// images; values that cannot be parsed as timestamps are left unchanged.
#[derive(Debug, Clone)]
pub struct TimestampConverter {
    /// Names of the top-level fields to convert
    fields: Vec<String>,
    /// Format every successfully parsed value is rewritten to
    target_format: TimestampFormat,
}
418
419impl TimestampConverter {
420    /// Create a new converter for specific fields.
421    pub fn new<I, S>(fields: I, format: TimestampFormat) -> Self
422    where
423        I: IntoIterator<Item = S>,
424        S: Into<String>,
425    {
426        Self {
427            fields: fields.into_iter().map(Into::into).collect(),
428            target_format: format,
429        }
430    }
431
432    /// Convert a value to the target format.
433    fn convert_value(&self, value: &Value) -> Option<Value> {
434        let timestamp = self.parse_timestamp(value)?;
435
436        Some(match self.target_format {
437            TimestampFormat::Iso8601 => Value::String(timestamp.to_rfc3339()),
438            TimestampFormat::EpochSeconds => Value::Number(timestamp.timestamp().into()),
439            TimestampFormat::EpochMillis => Value::Number(timestamp.timestamp_millis().into()),
440            TimestampFormat::EpochMicros => Value::Number(timestamp.timestamp_micros().into()),
441            TimestampFormat::DateOnly => Value::String(timestamp.format("%Y-%m-%d").to_string()),
442            TimestampFormat::TimeOnly => Value::String(timestamp.format("%H:%M:%S").to_string()),
443        })
444    }
445
446    /// Parse various timestamp formats.
447    fn parse_timestamp(&self, value: &Value) -> Option<DateTime<Utc>> {
448        match value {
449            Value::String(s) => {
450                // Try ISO 8601
451                if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
452                    return Some(dt.with_timezone(&Utc));
453                }
454                // Try common formats
455                if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
456                    return Some(Utc.from_utc_datetime(&dt));
457                }
458                if let Ok(dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
459                    return Some(Utc.from_utc_datetime(&dt));
460                }
461                None
462            }
463            Value::Number(n) => {
464                // Assume milliseconds if large, seconds otherwise
465                let ts = n.as_i64()?;
466                if ts > 1_000_000_000_000 {
467                    // Milliseconds
468                    DateTime::from_timestamp_millis(ts)
469                } else {
470                    // Seconds
471                    DateTime::from_timestamp(ts, 0)
472                }
473            }
474            _ => None,
475        }
476    }
477}
478
479impl Smt for TimestampConverter {
480    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
481        // Convert in "after"
482        if let Some(after) = &mut event.after {
483            if let Some(obj) = after.as_object_mut() {
484                for field in &self.fields {
485                    if let Some(value) = obj.get(field).cloned() {
486                        if let Some(converted) = self.convert_value(&value) {
487                            obj.insert(field.clone(), converted);
488                        }
489                    }
490                }
491            }
492        }
493
494        // Convert in "before"
495        if let Some(before) = &mut event.before {
496            if let Some(obj) = before.as_object_mut() {
497                for field in &self.fields {
498                    if let Some(value) = obj.get(field).cloned() {
499                        if let Some(converted) = self.convert_value(&value) {
500                            obj.insert(field.clone(), converted);
501                        }
502                    }
503                }
504            }
505        }
506
507        Some(event)
508    }
509
510    fn name(&self) -> &'static str {
511        "TimestampConverter"
512    }
513}
514
515// ============================================================================
516// MaskField - Mask sensitive field values
517// ============================================================================
518
/// Masking strategy for sensitive fields.
#[derive(Debug, Clone, Default)]
pub enum MaskStrategy {
    /// Replace the value with this fixed string
    Fixed(String),
    /// Replace strings with asterisks matching their length (capped at 20);
    /// non-string values become `"****"`. This is the default.
    #[default]
    Asterisks,
    /// Replace the value with the hex-encoded SHA-256 of its JSON serialization
    Hash,
    /// Keep the first `keep_first` and last `keep_last` characters of a
    /// string and mask everything in between; strings no longer than
    /// `keep_first + keep_last` are masked entirely, and non-string values
    /// become `"****"`
    PartialMask { keep_first: usize, keep_last: usize },
    /// Replace the value with JSON null
    Null,
    /// Redact completely (the field is removed from the record)
    Redact,
}
536
/// Mask sensitive fields in CDC events.
///
/// Masking is applied to the configured top-level fields of both the
/// "after" and "before" images.
#[derive(Debug, Clone)]
pub struct MaskField {
    /// Names of the fields to mask
    fields: HashSet<String>,
    /// Strategy applied to every configured field
    strategy: MaskStrategy,
}
545
546impl MaskField {
547    /// Create a new masker with default asterisk strategy.
548    pub fn new<I, S>(fields: I) -> Self
549    where
550        I: IntoIterator<Item = S>,
551        S: Into<String>,
552    {
553        Self {
554            fields: fields.into_iter().map(Into::into).collect(),
555            strategy: MaskStrategy::Asterisks,
556        }
557    }
558
559    /// Set masking strategy.
560    pub fn with_strategy(mut self, strategy: MaskStrategy) -> Self {
561        self.strategy = strategy;
562        self
563    }
564
565    /// Mask a value.
566    fn mask_value(&self, value: &Value) -> Value {
567        match &self.strategy {
568            MaskStrategy::Fixed(s) => Value::String(s.clone()),
569            MaskStrategy::Asterisks => {
570                if let Some(s) = value.as_str() {
571                    Value::String("*".repeat(s.len().min(20)))
572                } else {
573                    Value::String("****".to_string())
574                }
575            }
576            MaskStrategy::Hash => {
577                use sha2::{Digest, Sha256};
578                let bytes = serde_json::to_vec(value).unwrap_or_default();
579                let hash = Sha256::digest(&bytes);
580                Value::String(hex::encode(hash))
581            }
582            MaskStrategy::PartialMask {
583                keep_first,
584                keep_last,
585            } => {
586                if let Some(s) = value.as_str() {
587                    let len = s.len();
588                    if len <= keep_first + keep_last {
589                        Value::String("*".repeat(len))
590                    } else {
591                        let first: String = s.chars().take(*keep_first).collect();
592                        let last: String = s.chars().skip(len - keep_last).collect();
593                        let middle = "*".repeat(len - keep_first - keep_last);
594                        Value::String(format!("{}{}{}", first, middle, last))
595                    }
596                } else {
597                    Value::String("****".to_string())
598                }
599            }
600            MaskStrategy::Null => Value::Null,
601            MaskStrategy::Redact => Value::Null, // Will be removed
602        }
603    }
604
605    /// Apply masking to a JSON object.
606    fn mask_object(&self, obj: &mut Map<String, Value>) {
607        for field in &self.fields {
608            if obj.contains_key(field) {
609                if matches!(self.strategy, MaskStrategy::Redact) {
610                    obj.remove(field);
611                } else if let Some(value) = obj.get(field).cloned() {
612                    obj.insert(field.clone(), self.mask_value(&value));
613                }
614            }
615        }
616    }
617}
618
619impl Smt for MaskField {
620    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
621        if let Some(after) = &mut event.after {
622            if let Some(obj) = after.as_object_mut() {
623                self.mask_object(obj);
624            }
625        }
626
627        if let Some(before) = &mut event.before {
628            if let Some(obj) = before.as_object_mut() {
629                self.mask_object(obj);
630            }
631        }
632
633        Some(event)
634    }
635
636    fn name(&self) -> &'static str {
637        "MaskField"
638    }
639}
640
641// ============================================================================
642// ReplaceField - Rename, include, or exclude fields
643// ============================================================================
644
/// Replace, rename, include, or exclude fields.
///
/// Operations run in a fixed order: renames, then the include filter, then
/// the exclude filter. Both filters therefore see the field names as they
/// are *after* renaming.
#[derive(Debug, Clone, Default)]
pub struct ReplaceField {
    /// Fields to include (whitelist); `None` means no include filtering
    include: Option<HashSet<String>>,
    /// Fields to exclude (blacklist)
    exclude: HashSet<String>,
    /// Field renames (old -> new)
    renames: HashMap<String, String>,
}
655
656impl ReplaceField {
657    /// Create a new empty replacer.
658    pub fn new() -> Self {
659        Self::default()
660    }
661
662    /// Include only these fields (whitelist).
663    pub fn include<I, S>(mut self, fields: I) -> Self
664    where
665        I: IntoIterator<Item = S>,
666        S: Into<String>,
667    {
668        self.include = Some(fields.into_iter().map(Into::into).collect());
669        self
670    }
671
672    /// Exclude these fields (blacklist).
673    pub fn exclude<I, S>(mut self, fields: I) -> Self
674    where
675        I: IntoIterator<Item = S>,
676        S: Into<String>,
677    {
678        self.exclude = fields.into_iter().map(Into::into).collect();
679        self
680    }
681
682    /// Rename a field.
683    pub fn rename(mut self, from: impl Into<String>, to: impl Into<String>) -> Self {
684        self.renames.insert(from.into(), to.into());
685        self
686    }
687
688    /// Apply replacements to an object.
689    fn replace_object(&self, obj: &mut Map<String, Value>) {
690        // Apply renames first
691        for (old, new) in &self.renames {
692            if let Some(value) = obj.remove(old) {
693                obj.insert(new.clone(), value);
694            }
695        }
696
697        // Apply include filter
698        if let Some(include) = &self.include {
699            let keys: Vec<_> = obj.keys().cloned().collect();
700            for key in keys {
701                if !include.contains(&key) {
702                    obj.remove(&key);
703                }
704            }
705        }
706
707        // Apply exclude filter
708        for field in &self.exclude {
709            obj.remove(field);
710        }
711    }
712}
713
714impl Smt for ReplaceField {
715    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
716        if let Some(after) = &mut event.after {
717            if let Some(obj) = after.as_object_mut() {
718                self.replace_object(obj);
719            }
720        }
721
722        if let Some(before) = &mut event.before {
723            if let Some(obj) = before.as_object_mut() {
724                self.replace_object(obj);
725            }
726        }
727
728        Some(event)
729    }
730
731    fn name(&self) -> &'static str {
732        "ReplaceField"
733    }
734}
735
736// ============================================================================
737// InsertField - Add fields with static or computed values
738// ============================================================================
739
/// Field value source for insertion.
#[derive(Debug, Clone)]
pub enum InsertValue {
    /// Static value, cloned into every record
    Static(Value),
    /// Current UTC time at the moment the transform runs, as an RFC 3339 string
    CurrentTimestamp,
    /// Current UTC date at the moment the transform runs (YYYY-MM-DD)
    CurrentDate,
    /// Copy of another field of the same record; JSON null when that field
    /// is absent
    CopyFrom(String),
}
752
/// Insert fields with static or computed values.
///
/// Fields are written into the "after" image only; an existing field with
/// the same name is overwritten.
#[derive(Debug, Clone)]
pub struct InsertField {
    /// Fields to insert, as (target name, value source) pairs in insertion order
    fields: Vec<(String, InsertValue)>,
}
759
760impl InsertField {
761    /// Create a new inserter.
762    pub fn new() -> Self {
763        Self { fields: Vec::new() }
764    }
765
766    /// Add a static field.
767    pub fn static_field(mut self, name: impl Into<String>, value: Value) -> Self {
768        self.fields.push((name.into(), InsertValue::Static(value)));
769        self
770    }
771
772    /// Add current timestamp field.
773    pub fn timestamp_field(mut self, name: impl Into<String>) -> Self {
774        self.fields
775            .push((name.into(), InsertValue::CurrentTimestamp));
776        self
777    }
778
779    /// Add current date field.
780    pub fn date_field(mut self, name: impl Into<String>) -> Self {
781        self.fields.push((name.into(), InsertValue::CurrentDate));
782        self
783    }
784
785    /// Copy field from another field.
786    pub fn copy_field(mut self, name: impl Into<String>, source: impl Into<String>) -> Self {
787        self.fields
788            .push((name.into(), InsertValue::CopyFrom(source.into())));
789        self
790    }
791
792    /// Get value for insertion.
793    fn get_value(&self, source: &InsertValue, obj: &Map<String, Value>) -> Value {
794        match source {
795            InsertValue::Static(v) => v.clone(),
796            InsertValue::CurrentTimestamp => Value::String(chrono::Utc::now().to_rfc3339()),
797            InsertValue::CurrentDate => {
798                Value::String(chrono::Utc::now().format("%Y-%m-%d").to_string())
799            }
800            InsertValue::CopyFrom(field) => obj.get(field).cloned().unwrap_or(Value::Null),
801        }
802    }
803}
804
805impl Default for InsertField {
806    fn default() -> Self {
807        Self::new()
808    }
809}
810
811impl Smt for InsertField {
812    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
813        if let Some(after) = &mut event.after {
814            if let Some(obj) = after.as_object_mut() {
815                // Clone obj for reading while we insert
816                let obj_clone = obj.clone();
817                for (name, source) in &self.fields {
818                    let value = self.get_value(source, &obj_clone);
819                    obj.insert(name.clone(), value);
820                }
821            }
822        }
823
824        Some(event)
825    }
826
827    fn name(&self) -> &'static str {
828        "InsertField"
829    }
830}
831
832// ============================================================================
833// Filter - Drop events based on conditions
834// ============================================================================
835
/// Filter condition.
///
/// Field-based conditions look the field up in the "after" image when it is
/// present, otherwise in "before". The notes below on missing fields
/// describe how [`Filter`] evaluates each variant.
pub enum FilterCondition {
    /// Field equals value (false when the field is missing)
    Equals { field: String, value: Value },
    /// Field not equals value (true when the field is missing)
    NotEquals { field: String, value: Value },
    /// Field is null (true when the field is missing)
    IsNull { field: String },
    /// Field is not null (false when the field is missing)
    IsNotNull { field: String },
    /// Field is a string matching the regex `pattern`; false when the field
    /// is missing, is not a string, or the pattern fails to compile
    Matches { field: String, pattern: String },
    /// Field equals one of the listed values (false when the field is missing)
    In { field: String, values: Vec<Value> },
    /// Custom predicate evaluated against the whole event
    Custom(Arc<dyn Fn(&CdcEvent) -> bool + Send + Sync>),
    /// All conditions must match (an empty list matches)
    And(Vec<FilterCondition>),
    /// Any condition must match (an empty list does not match)
    Or(Vec<FilterCondition>),
    /// Negate condition
    Not(Box<FilterCondition>),
}
859
860impl std::fmt::Debug for FilterCondition {
861    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
862        match self {
863            Self::Equals { field, value } => f
864                .debug_struct("Equals")
865                .field("field", field)
866                .field("value", value)
867                .finish(),
868            Self::NotEquals { field, value } => f
869                .debug_struct("NotEquals")
870                .field("field", field)
871                .field("value", value)
872                .finish(),
873            Self::IsNull { field } => f.debug_struct("IsNull").field("field", field).finish(),
874            Self::IsNotNull { field } => f.debug_struct("IsNotNull").field("field", field).finish(),
875            Self::Matches { field, pattern } => f
876                .debug_struct("Matches")
877                .field("field", field)
878                .field("pattern", pattern)
879                .finish(),
880            Self::In { field, values } => f
881                .debug_struct("In")
882                .field("field", field)
883                .field("values", values)
884                .finish(),
885            Self::Custom(_) => f.debug_struct("Custom").field("fn", &"<closure>").finish(),
886            Self::And(conditions) => f
887                .debug_struct("And")
888                .field("conditions", conditions)
889                .finish(),
890            Self::Or(conditions) => f
891                .debug_struct("Or")
892                .field("conditions", conditions)
893                .finish(),
894            Self::Not(condition) => f.debug_struct("Not").field("condition", condition).finish(),
895        }
896    }
897}
898
/// Filter events based on conditions.
///
/// Built with [`Filter::keep`] (pass only matching events) or
/// [`Filter::drop`] (pass only non-matching events).
pub struct Filter {
    /// Condition evaluated against each event
    condition: FilterCondition,
    /// Whether to keep matching events (true) or drop them (false)
    keep_matching: bool,
}
906
907impl Filter {
908    /// Keep events matching the condition.
909    pub fn keep(condition: FilterCondition) -> Self {
910        Self {
911            condition,
912            keep_matching: true,
913        }
914    }
915
916    /// Drop events matching the condition.
917    pub fn drop(condition: FilterCondition) -> Self {
918        Self {
919            condition,
920            keep_matching: false,
921        }
922    }
923
924    /// Check if condition matches.
925    fn matches(&self, event: &CdcEvent) -> bool {
926        self.check_condition(&self.condition, event)
927    }
928
929    fn check_condition(&self, condition: &FilterCondition, event: &CdcEvent) -> bool {
930        match condition {
931            FilterCondition::Equals { field, value } => self
932                .get_field_value(event, field)
933                .map(|v| v == value)
934                .unwrap_or(false),
935            FilterCondition::NotEquals { field, value } => self
936                .get_field_value(event, field)
937                .map(|v| v != value)
938                .unwrap_or(true),
939            FilterCondition::IsNull { field } => self
940                .get_field_value(event, field)
941                .map(|v| v.is_null())
942                .unwrap_or(true),
943            FilterCondition::IsNotNull { field } => self
944                .get_field_value(event, field)
945                .map(|v| !v.is_null())
946                .unwrap_or(false),
947            FilterCondition::Matches { field, pattern } => {
948                if let Ok(re) = Regex::new(pattern) {
949                    self.get_field_value(event, field)
950                        .and_then(|v| v.as_str().map(|s| re.is_match(s)))
951                        .unwrap_or(false)
952                } else {
953                    false
954                }
955            }
956            FilterCondition::In { field, values } => self
957                .get_field_value(event, field)
958                .map(|v| values.contains(v))
959                .unwrap_or(false),
960            FilterCondition::Custom(f) => f(event),
961            FilterCondition::And(conditions) => {
962                conditions.iter().all(|c| self.check_condition(c, event))
963            }
964            FilterCondition::Or(conditions) => {
965                conditions.iter().any(|c| self.check_condition(c, event))
966            }
967            FilterCondition::Not(c) => !self.check_condition(c, event),
968        }
969    }
970
971    fn get_field_value<'a>(&self, event: &'a CdcEvent, field: &str) -> Option<&'a Value> {
972        event
973            .after
974            .as_ref()
975            .or(event.before.as_ref())
976            .and_then(|v| v.as_object())
977            .and_then(|obj| obj.get(field))
978    }
979}
980
981impl Smt for Filter {
982    fn apply(&self, event: CdcEvent) -> Option<CdcEvent> {
983        let matches = self.matches(&event);
984        if (self.keep_matching && matches) || (!self.keep_matching && !matches) {
985            Some(event)
986        } else {
987            None
988        }
989    }
990
991    fn name(&self) -> &'static str {
992        "Filter"
993    }
994}
995
996// ============================================================================
997// Cast - Convert field types
998// ============================================================================
999
/// The type a field value is converted to by [`Cast`].
#[derive(Debug, Clone, Copy)]
pub enum CastType {
    /// Convert to a JSON string.
    String,
    /// Convert to an integer JSON number.
    Integer,
    /// Convert to a floating-point JSON number.
    Float,
    /// Convert to a JSON boolean.
    Boolean,
    /// Parse string values as JSON; other values are kept as they are.
    Json,
}
1009
1010/// Cast field values to different types.
1011#[derive(Debug, Clone)]
1012pub struct Cast {
1013    /// Field -> target type mappings
1014    casts: HashMap<String, CastType>,
1015}
1016
1017impl Cast {
1018    /// Create a new caster.
1019    pub fn new() -> Self {
1020        Self {
1021            casts: HashMap::new(),
1022        }
1023    }
1024
1025    /// Add a cast rule.
1026    pub fn field(mut self, name: impl Into<String>, to: CastType) -> Self {
1027        self.casts.insert(name.into(), to);
1028        self
1029    }
1030
1031    /// Cast a value.
1032    fn cast_value(&self, value: &Value, to: CastType) -> Value {
1033        match to {
1034            CastType::String => match value {
1035                Value::String(s) => Value::String(s.clone()),
1036                v => Value::String(v.to_string()),
1037            },
1038            CastType::Integer => match value {
1039                Value::Number(n) => {
1040                    if let Some(i) = n.as_i64() {
1041                        Value::Number(i.into())
1042                    } else if let Some(f) = n.as_f64() {
1043                        Value::Number((f as i64).into())
1044                    } else {
1045                        value.clone()
1046                    }
1047                }
1048                Value::String(s) => s
1049                    .parse::<i64>()
1050                    .map(|i| Value::Number(i.into()))
1051                    .unwrap_or(Value::Null),
1052                Value::Bool(b) => Value::Number(if *b { 1 } else { 0 }.into()),
1053                _ => Value::Null,
1054            },
1055            CastType::Float => match value {
1056                Value::Number(n) => {
1057                    if let Some(f) = n.as_f64() {
1058                        serde_json::Number::from_f64(f)
1059                            .map(Value::Number)
1060                            .unwrap_or(value.clone())
1061                    } else {
1062                        value.clone()
1063                    }
1064                }
1065                Value::String(s) => s
1066                    .parse::<f64>()
1067                    .ok()
1068                    .and_then(serde_json::Number::from_f64)
1069                    .map(Value::Number)
1070                    .unwrap_or(Value::Null),
1071                _ => Value::Null,
1072            },
1073            CastType::Boolean => match value {
1074                Value::Bool(b) => Value::Bool(*b),
1075                Value::Number(n) => Value::Bool(n.as_i64().map(|i| i != 0).unwrap_or(false)),
1076                Value::String(s) => {
1077                    let lower = s.to_lowercase();
1078                    Value::Bool(lower == "true" || lower == "1" || lower == "yes")
1079                }
1080                _ => Value::Bool(false),
1081            },
1082            CastType::Json => match value {
1083                Value::String(s) => serde_json::from_str(s).unwrap_or(Value::String(s.clone())),
1084                v => v.clone(),
1085            },
1086        }
1087    }
1088}
1089
1090impl Default for Cast {
1091    fn default() -> Self {
1092        Self::new()
1093    }
1094}
1095
1096impl Smt for Cast {
1097    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1098        if let Some(after) = &mut event.after {
1099            if let Some(obj) = after.as_object_mut() {
1100                for (field, cast_type) in &self.casts {
1101                    if let Some(value) = obj.get(field).cloned() {
1102                        obj.insert(field.clone(), self.cast_value(&value, *cast_type));
1103                    }
1104                }
1105            }
1106        }
1107
1108        if let Some(before) = &mut event.before {
1109            if let Some(obj) = before.as_object_mut() {
1110                for (field, cast_type) in &self.casts {
1111                    if let Some(value) = obj.get(field).cloned() {
1112                        obj.insert(field.clone(), self.cast_value(&value, *cast_type));
1113                    }
1114                }
1115            }
1116        }
1117
1118        Some(event)
1119    }
1120
1121    fn name(&self) -> &'static str {
1122        "Cast"
1123    }
1124}
1125
1126// ============================================================================
1127// Flatten - Flatten nested JSON structures
1128// ============================================================================
1129
/// Transform that collapses nested JSON objects into a single level.
#[derive(Debug, Clone)]
pub struct Flatten {
    /// Separator placed between parent and child keys in flattened names.
    delimiter: String,
    /// Depth limit for flattening; `0` means no limit.
    max_depth: usize,
}
1138
1139impl Default for Flatten {
1140    fn default() -> Self {
1141        Self::new()
1142    }
1143}
1144
1145impl Flatten {
1146    /// Create a new flattener with default settings.
1147    pub fn new() -> Self {
1148        Self {
1149            delimiter: ".".to_string(),
1150            max_depth: 0,
1151        }
1152    }
1153
1154    /// Set key delimiter.
1155    pub fn delimiter(mut self, delimiter: impl Into<String>) -> Self {
1156        self.delimiter = delimiter.into();
1157        self
1158    }
1159
1160    /// Set maximum depth.
1161    pub fn max_depth(mut self, depth: usize) -> Self {
1162        self.max_depth = depth;
1163        self
1164    }
1165
1166    /// Flatten a JSON object.
1167    fn flatten_object(&self, obj: &Map<String, Value>) -> Map<String, Value> {
1168        let mut result = Map::new();
1169        self.flatten_recursive(obj, "", &mut result, 0);
1170        result
1171    }
1172
1173    fn flatten_recursive(
1174        &self,
1175        obj: &Map<String, Value>,
1176        prefix: &str,
1177        result: &mut Map<String, Value>,
1178        depth: usize,
1179    ) {
1180        for (key, value) in obj {
1181            let new_key = if prefix.is_empty() {
1182                key.clone()
1183            } else {
1184                format!("{}{}{}", prefix, self.delimiter, key)
1185            };
1186
1187            if self.max_depth > 0 && depth >= self.max_depth {
1188                result.insert(new_key, value.clone());
1189            } else if let Some(nested) = value.as_object() {
1190                self.flatten_recursive(nested, &new_key, result, depth + 1);
1191            } else {
1192                result.insert(new_key, value.clone());
1193            }
1194        }
1195    }
1196}
1197
1198impl Smt for Flatten {
1199    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1200        if let Some(after) = &event.after {
1201            if let Some(obj) = after.as_object() {
1202                event.after = Some(Value::Object(self.flatten_object(obj)));
1203            }
1204        }
1205
1206        if let Some(before) = &event.before {
1207            if let Some(obj) = before.as_object() {
1208                event.before = Some(Value::Object(self.flatten_object(obj)));
1209            }
1210        }
1211
1212        Some(event)
1213    }
1214
1215    fn name(&self) -> &'static str {
1216        "Flatten"
1217    }
1218}
1219
1220// ============================================================================
1221// RegexRouter - Route events to topics based on regex patterns
1222// ============================================================================
1223
1224/// Route events to topics based on regex patterns.
1225/// Stores the target topic in the "after" data as "__topic" field.
1226#[derive(Debug, Clone)]
1227pub struct RegexRouter {
1228    /// Routing rules (pattern -> topic)
1229    rules: Vec<(Regex, String)>,
1230    /// Default topic if no rules match
1231    default_topic: String,
1232}
1233
1234impl RegexRouter {
1235    /// Create a new router with a default topic.
1236    pub fn new(default_topic: impl Into<String>) -> Self {
1237        Self {
1238            rules: Vec::new(),
1239            default_topic: default_topic.into(),
1240        }
1241    }
1242
1243    /// Add a routing rule.
1244    pub fn route(mut self, pattern: &str, topic: impl Into<String>) -> Self {
1245        if let Ok(re) = Regex::new(pattern) {
1246            self.rules.push((re, topic.into()));
1247        } else {
1248            warn!("Invalid regex pattern: {}", pattern);
1249        }
1250        self
1251    }
1252
1253    /// Get the target topic for a table.
1254    pub fn get_topic(&self, schema: &str, table: &str) -> &str {
1255        let full_name = format!("{}.{}", schema, table);
1256
1257        for (pattern, topic) in &self.rules {
1258            if pattern.is_match(&full_name) {
1259                return topic;
1260            }
1261        }
1262
1263        &self.default_topic
1264    }
1265}
1266
1267impl Smt for RegexRouter {
1268    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1269        let topic = self.get_topic(&event.schema, &event.table);
1270        // Store topic in after as metadata (can be used by downstream)
1271        if let Some(after) = &mut event.after {
1272            if let Some(obj) = after.as_object_mut() {
1273                obj.insert("__topic".to_string(), Value::String(topic.to_string()));
1274            }
1275        }
1276        Some(event)
1277    }
1278
1279    fn name(&self) -> &'static str {
1280        "RegexRouter"
1281    }
1282}
1283
1284// ============================================================================
1285// Predicate - Conditional transform application
1286// ============================================================================
1287
1288/// Predicate for conditionally applying transforms.
1289///
1290/// Predicates allow transforms to be applied only when
1291/// certain conditions are met.
1292pub enum Predicate {
1293    /// Apply only to specific tables
1294    Table { pattern: Regex },
1295    /// Apply only to specific schemas
1296    Schema { pattern: Regex },
1297    /// Apply only to specific operations
1298    Operation { ops: Vec<CdcOp> },
1299    /// Apply based on field value
1300    FieldValue { field: String, value: Value },
1301    /// Apply based on field existence
1302    FieldExists { field: String },
1303    /// Custom predicate function
1304    Custom(Arc<dyn Fn(&CdcEvent) -> bool + Send + Sync>),
1305    /// Combine predicates with AND
1306    And(Vec<Predicate>),
1307    /// Combine predicates with OR
1308    Or(Vec<Predicate>),
1309    /// Negate a predicate
1310    Not(Box<Predicate>),
1311}
1312
1313impl Predicate {
1314    /// Create a table pattern predicate.
1315    pub fn table(pattern: &str) -> Option<Self> {
1316        Regex::new(pattern)
1317            .ok()
1318            .map(|re| Predicate::Table { pattern: re })
1319    }
1320
1321    /// Create a schema pattern predicate.
1322    pub fn schema(pattern: &str) -> Option<Self> {
1323        Regex::new(pattern)
1324            .ok()
1325            .map(|re| Predicate::Schema { pattern: re })
1326    }
1327
1328    /// Create an operation predicate.
1329    pub fn operation(ops: Vec<CdcOp>) -> Self {
1330        Predicate::Operation { ops }
1331    }
1332
1333    /// Create a field value predicate.
1334    pub fn field_equals(field: impl Into<String>, value: Value) -> Self {
1335        Predicate::FieldValue {
1336            field: field.into(),
1337            value,
1338        }
1339    }
1340
1341    /// Create a field exists predicate.
1342    pub fn field_exists(field: impl Into<String>) -> Self {
1343        Predicate::FieldExists {
1344            field: field.into(),
1345        }
1346    }
1347
1348    /// Check if the predicate matches the event.
1349    pub fn matches(&self, event: &CdcEvent) -> bool {
1350        match self {
1351            Predicate::Table { pattern } => pattern.is_match(&event.table),
1352            Predicate::Schema { pattern } => pattern.is_match(&event.schema),
1353            Predicate::Operation { ops } => ops.contains(&event.op),
1354            Predicate::FieldValue { field, value } => event
1355                .after
1356                .as_ref()
1357                .or(event.before.as_ref())
1358                .and_then(|v| v.as_object())
1359                .and_then(|obj| obj.get(field))
1360                .map(|v| v == value)
1361                .unwrap_or(false),
1362            Predicate::FieldExists { field } => event
1363                .after
1364                .as_ref()
1365                .or(event.before.as_ref())
1366                .and_then(|v| v.as_object())
1367                .map(|obj| obj.contains_key(field))
1368                .unwrap_or(false),
1369            Predicate::Custom(f) => f(event),
1370            Predicate::And(predicates) => predicates.iter().all(|p| p.matches(event)),
1371            Predicate::Or(predicates) => predicates.iter().any(|p| p.matches(event)),
1372            Predicate::Not(p) => !p.matches(event),
1373        }
1374    }
1375}
1376
1377impl std::fmt::Debug for Predicate {
1378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1379        match self {
1380            Self::Table { pattern } => f
1381                .debug_struct("Table")
1382                .field("pattern", &pattern.as_str())
1383                .finish(),
1384            Self::Schema { pattern } => f
1385                .debug_struct("Schema")
1386                .field("pattern", &pattern.as_str())
1387                .finish(),
1388            Self::Operation { ops } => f.debug_struct("Operation").field("ops", ops).finish(),
1389            Self::FieldValue { field, value } => f
1390                .debug_struct("FieldValue")
1391                .field("field", field)
1392                .field("value", value)
1393                .finish(),
1394            Self::FieldExists { field } => {
1395                f.debug_struct("FieldExists").field("field", field).finish()
1396            }
1397            Self::Custom(_) => f.debug_struct("Custom").field("fn", &"<closure>").finish(),
1398            Self::And(predicates) => f
1399                .debug_struct("And")
1400                .field("predicates", predicates)
1401                .finish(),
1402            Self::Or(predicates) => f
1403                .debug_struct("Or")
1404                .field("predicates", predicates)
1405                .finish(),
1406            Self::Not(predicate) => f.debug_struct("Not").field("predicate", predicate).finish(),
1407        }
1408    }
1409}
1410
1411/// Wrapper to apply a transform only when a predicate matches.
1412pub struct ConditionalSmt {
1413    /// The transform to apply
1414    transform: Arc<dyn Smt>,
1415    /// The predicate that must match
1416    predicate: Predicate,
1417    /// Whether to negate (apply when predicate doesn't match)
1418    negate: bool,
1419}
1420
1421impl ConditionalSmt {
1422    /// Apply transform only when predicate matches.
1423    pub fn when<T: Smt + 'static>(predicate: Predicate, transform: T) -> Self {
1424        Self {
1425            transform: Arc::new(transform),
1426            predicate,
1427            negate: false,
1428        }
1429    }
1430
1431    /// Apply transform only when predicate matches (takes Arc directly).
1432    pub fn when_arc(predicate: Predicate, transform: Arc<dyn Smt>) -> Self {
1433        Self {
1434            transform,
1435            predicate,
1436            negate: false,
1437        }
1438    }
1439
1440    /// Apply transform only when predicate does NOT match.
1441    pub fn unless<T: Smt + 'static>(predicate: Predicate, transform: T) -> Self {
1442        Self {
1443            transform: Arc::new(transform),
1444            predicate,
1445            negate: true,
1446        }
1447    }
1448
1449    /// Apply transform only when predicate does NOT match (takes Arc directly).
1450    pub fn unless_arc(predicate: Predicate, transform: Arc<dyn Smt>) -> Self {
1451        Self {
1452            transform,
1453            predicate,
1454            negate: true,
1455        }
1456    }
1457}
1458
1459impl Smt for ConditionalSmt {
1460    fn apply(&self, event: CdcEvent) -> Option<CdcEvent> {
1461        let matches = self.predicate.matches(&event);
1462        let should_apply = if self.negate { !matches } else { matches };
1463
1464        if should_apply {
1465            self.transform.apply(event)
1466        } else {
1467            Some(event)
1468        }
1469    }
1470
1471    fn name(&self) -> &'static str {
1472        "ConditionalSmt"
1473    }
1474}
1475
1476// ============================================================================
1477// HeaderToValue - Move headers/metadata into record value
1478// ============================================================================
1479
1480/// Move or copy envelope fields into the record value.
1481#[derive(Debug, Clone)]
1482pub struct HeaderToValue {
1483    /// Fields to insert: (target_field_name, source)
1484    fields: Vec<(String, HeaderSource)>,
1485    /// Operation mode
1486    mode: HeaderMode,
1487}
1488
/// Which piece of event metadata a header value is taken from.
#[derive(Debug, Clone)]
pub enum HeaderSource {
    /// Type of the source database (postgres, mysql).
    SourceType,
    /// Name of the database.
    Database,
    /// Name of the schema.
    Schema,
    /// Name of the table.
    Table,
    /// Code of the operation.
    Operation,
    /// Timestamp of the event.
    Timestamp,
    /// ID of the transaction, when available.
    TransactionId,
}
1507
/// How header values are transferred into the record value.
#[derive(Debug, Clone, Copy, Default)]
pub enum HeaderMode {
    /// Copy the value, keeping it in both places (the default).
    #[default]
    Copy,
    /// Move the value so that it exists only in the target.
    Move,
}
1517
1518impl HeaderToValue {
1519    /// Create a new HeaderToValue transform.
1520    pub fn new() -> Self {
1521        Self {
1522            fields: Vec::new(),
1523            mode: HeaderMode::Copy,
1524        }
1525    }
1526
1527    /// Add a field to copy/move.
1528    pub fn field(mut self, target: impl Into<String>, source: HeaderSource) -> Self {
1529        self.fields.push((target.into(), source));
1530        self
1531    }
1532
1533    /// Set mode to move (not copy).
1534    pub fn move_mode(mut self) -> Self {
1535        self.mode = HeaderMode::Move;
1536        self
1537    }
1538
1539    /// Add all standard fields with prefixed names.
1540    pub fn all_headers(self, prefix: &str) -> Self {
1541        self.field(format!("{}source_type", prefix), HeaderSource::SourceType)
1542            .field(format!("{}database", prefix), HeaderSource::Database)
1543            .field(format!("{}schema", prefix), HeaderSource::Schema)
1544            .field(format!("{}table", prefix), HeaderSource::Table)
1545            .field(format!("{}op", prefix), HeaderSource::Operation)
1546            .field(format!("{}ts", prefix), HeaderSource::Timestamp)
1547    }
1548}
1549
1550impl Default for HeaderToValue {
1551    fn default() -> Self {
1552        Self::new()
1553    }
1554}
1555
1556impl Smt for HeaderToValue {
1557    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1558        if let Some(after) = &mut event.after {
1559            if let Some(obj) = after.as_object_mut() {
1560                for (target, source) in &self.fields {
1561                    let value = match source {
1562                        HeaderSource::SourceType => Value::String(event.source_type.clone()),
1563                        HeaderSource::Database => Value::String(event.database.clone()),
1564                        HeaderSource::Schema => Value::String(event.schema.clone()),
1565                        HeaderSource::Table => Value::String(event.table.clone()),
1566                        HeaderSource::Operation => Value::String(op_to_code(&event.op).to_string()),
1567                        HeaderSource::Timestamp => Value::Number(event.timestamp.into()),
1568                        HeaderSource::TransactionId => event
1569                            .transaction
1570                            .as_ref()
1571                            .map(|t| Value::String(t.id.clone()))
1572                            .unwrap_or(Value::Null),
1573                    };
1574                    obj.insert(target.clone(), value);
1575                }
1576            }
1577        }
1578
1579        Some(event)
1580    }
1581
1582    fn name(&self) -> &'static str {
1583        "HeaderToValue"
1584    }
1585}
1586
1587// ============================================================================
1588// Unwrap - Extract nested field to top level
1589// ============================================================================
1590
/// Transform that lifts a nested JSON field up to the top level of the record.
#[derive(Debug, Clone)]
pub struct Unwrap {
    /// Path segments leading to the field (e.g. `["payload", "data"]`).
    field_path: Vec<String>,
    /// `true`: the extracted value replaces the whole record;
    /// `false`: its members are merged into the record.
    replace: bool,
}
1599
1600impl Unwrap {
1601    /// Create a new Unwrap transform.
1602    pub fn new(path: impl Into<String>) -> Self {
1603        let path_str = path.into();
1604        Self {
1605            field_path: path_str.split('.').map(String::from).collect(),
1606            replace: true,
1607        }
1608    }
1609
1610    /// Merge unwrapped fields instead of replacing.
1611    pub fn merge(mut self) -> Self {
1612        self.replace = false;
1613        self
1614    }
1615
1616    /// Extract value at path from a JSON value.
1617    fn extract_at_path<'a>(&self, value: &'a Value) -> Option<&'a Value> {
1618        let mut current = value;
1619        for key in &self.field_path {
1620            current = current.as_object()?.get(key)?;
1621        }
1622        Some(current)
1623    }
1624}
1625
1626impl Smt for Unwrap {
1627    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1628        if let Some(after) = &event.after {
1629            if let Some(extracted) = self.extract_at_path(after) {
1630                let extracted_clone = extracted.clone();
1631                if self.replace {
1632                    event.after = Some(extracted_clone);
1633                } else if let Some(extracted_obj) = extracted_clone.as_object() {
1634                    if let Some(obj) = event.after.as_mut().and_then(|v| v.as_object_mut()) {
1635                        for (k, v) in extracted_obj {
1636                            obj.insert(k.clone(), v.clone());
1637                        }
1638                    }
1639                }
1640            }
1641        }
1642
1643        Some(event)
1644    }
1645
1646    fn name(&self) -> &'static str {
1647        "Unwrap"
1648    }
1649}
1650
1651// ============================================================================
1652// SetNull - Set fields to null based on condition
1653// ============================================================================
1654
1655/// Set fields to null based on conditions.
1656#[derive(Debug, Clone)]
1657pub struct SetNull {
1658    /// Fields to potentially nullify
1659    fields: Vec<String>,
1660    /// Condition for nullification
1661    condition: NullCondition,
1662}
1663
1664/// Condition for setting null.
1665#[derive(Debug, Clone)]
1666pub enum NullCondition {
1667    /// Always set to null
1668    Always,
1669    /// Set to null if empty string
1670    IfEmpty,
1671    /// Set to null if matches value
1672    IfEquals(Value),
1673    /// Set to null if matches pattern
1674    IfMatches(String),
1675}
1676
1677impl SetNull {
1678    /// Create a new SetNull transform.
1679    pub fn new<I, S>(fields: I) -> Self
1680    where
1681        I: IntoIterator<Item = S>,
1682        S: Into<String>,
1683    {
1684        Self {
1685            fields: fields.into_iter().map(Into::into).collect(),
1686            condition: NullCondition::Always,
1687        }
1688    }
1689
1690    /// Set condition for nullification.
1691    pub fn when(mut self, condition: NullCondition) -> Self {
1692        self.condition = condition;
1693        self
1694    }
1695
1696    /// Check if value should be nullified.
1697    fn should_nullify(&self, value: &Value) -> bool {
1698        match &self.condition {
1699            NullCondition::Always => true,
1700            NullCondition::IfEmpty => value.as_str().map(|s| s.is_empty()).unwrap_or(false),
1701            NullCondition::IfEquals(target) => value == target,
1702            NullCondition::IfMatches(pattern) => {
1703                if let (Ok(re), Some(s)) = (Regex::new(pattern), value.as_str()) {
1704                    re.is_match(s)
1705                } else {
1706                    false
1707                }
1708            }
1709        }
1710    }
1711}
1712
1713impl Smt for SetNull {
1714    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1715        if let Some(after) = &mut event.after {
1716            if let Some(obj) = after.as_object_mut() {
1717                for field in &self.fields {
1718                    if let Some(value) = obj.get(field).cloned() {
1719                        if self.should_nullify(&value) {
1720                            obj.insert(field.clone(), Value::Null);
1721                        }
1722                    }
1723                }
1724            }
1725        }
1726
1727        Some(event)
1728    }
1729
1730    fn name(&self) -> &'static str {
1731        "SetNull"
1732    }
1733}
1734
1735// ============================================================================
1736// TimezoneConverter - Convert timestamp fields between timezones
1737// ============================================================================
1738
1739/// Convert timestamp fields from one timezone to another.
1740///
1741/// # Example
1742/// ```rust,ignore
1743/// // Convert from UTC to EST
1744/// let smt = TimezoneConverter::new(["created_at", "updated_at"])
1745///     .from("UTC")
1746///     .to("America/New_York");
1747/// ```
1748#[derive(Clone)]
1749pub struct TimezoneConverter {
1750    /// Fields to convert
1751    fields: Vec<String>,
1752    /// Source timezone (default: UTC)
1753    source_tz: chrono_tz::Tz,
1754    /// Target timezone
1755    target_tz: chrono_tz::Tz,
1756    /// Include time component in output
1757    include_time: bool,
1758    /// Output format override (uses ISO8601 if None)
1759    format: Option<String>,
1760}
1761
1762impl std::fmt::Debug for TimezoneConverter {
1763    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1764        f.debug_struct("TimezoneConverter")
1765            .field("fields", &self.fields)
1766            .field("source_tz", &self.source_tz.to_string())
1767            .field("target_tz", &self.target_tz.to_string())
1768            .field("include_time", &self.include_time)
1769            .field("format", &self.format)
1770            .finish()
1771    }
1772}
1773
1774impl TimezoneConverter {
1775    /// Create a new timezone converter for specific fields.
1776    pub fn new<I, S>(fields: I) -> Self
1777    where
1778        I: IntoIterator<Item = S>,
1779        S: Into<String>,
1780    {
1781        Self {
1782            fields: fields.into_iter().map(Into::into).collect(),
1783            source_tz: chrono_tz::UTC,
1784            target_tz: chrono_tz::UTC,
1785            include_time: true,
1786            format: None,
1787        }
1788    }
1789
1790    /// Set source timezone.
1791    pub fn from(mut self, tz: &str) -> Self {
1792        if let Ok(parsed) = tz.parse::<chrono_tz::Tz>() {
1793            self.source_tz = parsed;
1794        } else {
1795            warn!("Invalid source timezone '{}', using UTC", tz);
1796        }
1797        self
1798    }
1799
1800    /// Set target timezone.
1801    pub fn to(mut self, tz: &str) -> Self {
1802        if let Ok(parsed) = tz.parse::<chrono_tz::Tz>() {
1803            self.target_tz = parsed;
1804        } else {
1805            warn!("Invalid target timezone '{}', using UTC", tz);
1806        }
1807        self
1808    }
1809
1810    /// Exclude time component (date only).
1811    pub fn date_only(mut self) -> Self {
1812        self.include_time = false;
1813        self
1814    }
1815
1816    /// Set custom output format (strftime).
1817    pub fn format(mut self, fmt: impl Into<String>) -> Self {
1818        self.format = Some(fmt.into());
1819        self
1820    }
1821
1822    /// Convert a timestamp value.
1823    fn convert_value(&self, value: &Value) -> Option<Value> {
1824        let dt = self.parse_timestamp(value)?;
1825
1826        // Convert from source to target timezone
1827        let converted = dt.with_timezone(&self.target_tz);
1828
1829        let result = if let Some(ref fmt) = self.format {
1830            converted.format(fmt).to_string()
1831        } else if self.include_time {
1832            converted.to_rfc3339()
1833        } else {
1834            converted.format("%Y-%m-%d").to_string()
1835        };
1836
1837        Some(Value::String(result))
1838    }
1839
1840    /// Parse various timestamp formats.
1841    fn parse_timestamp(&self, value: &Value) -> Option<DateTime<chrono_tz::Tz>> {
1842        match value {
1843            Value::String(s) => {
1844                // Try ISO 8601 with timezone
1845                if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1846                    return Some(dt.with_timezone(&self.source_tz));
1847                }
1848                // Try without timezone (assume source_tz)
1849                if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1850                    return self.source_tz.from_local_datetime(&naive).single();
1851                }
1852                if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1853                    return self.source_tz.from_local_datetime(&naive).single();
1854                }
1855                // Date only
1856                if let Ok(date) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1857                    let naive = date.and_hms_opt(0, 0, 0)?;
1858                    return self.source_tz.from_local_datetime(&naive).single();
1859                }
1860                None
1861            }
1862            Value::Number(n) => {
1863                let ts = n.as_i64()?;
1864                let dt = if ts > 1_000_000_000_000 {
1865                    DateTime::from_timestamp_millis(ts)?
1866                } else {
1867                    DateTime::from_timestamp(ts, 0)?
1868                };
1869                Some(dt.with_timezone(&self.source_tz))
1870            }
1871            _ => None,
1872        }
1873    }
1874}
1875
1876impl Smt for TimezoneConverter {
1877    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
1878        // Convert in "after"
1879        if let Some(after) = &mut event.after {
1880            if let Some(obj) = after.as_object_mut() {
1881                for field in &self.fields {
1882                    if let Some(value) = obj.get(field).cloned() {
1883                        if let Some(converted) = self.convert_value(&value) {
1884                            obj.insert(field.clone(), converted);
1885                        }
1886                    }
1887                }
1888            }
1889        }
1890
1891        // Convert in "before"
1892        if let Some(before) = &mut event.before {
1893            if let Some(obj) = before.as_object_mut() {
1894                for field in &self.fields {
1895                    if let Some(value) = obj.get(field).cloned() {
1896                        if let Some(converted) = self.convert_value(&value) {
1897                            obj.insert(field.clone(), converted);
1898                        }
1899                    }
1900                }
1901            }
1902        }
1903
1904        Some(event)
1905    }
1906
1907    fn name(&self) -> &'static str {
1908        "TimezoneConverter"
1909    }
1910}
1911
1912// ============================================================================
1913// ContentRouter - Route events based on field content
1914// ============================================================================
1915
/// Content-based routing SMT.
///
/// Routes events to different topics based on field values of the event's
/// `after` image. Routes are checked in the order they were added and the
/// first match wins; if none matches, the default topic (when configured) is
/// used. The chosen topic is written into the `after` object under
/// `__routing_topic` (or the name set with `topic_field`) for downstream
/// processing.
///
/// # Example
/// ```rust,ignore
/// let router = ContentRouter::new()
///     .route("priority", "high", "priority-events")
///     .route("priority", "low", "batch-events")
///     .route_pattern("category", r"^urgent", "urgent-events")
///     .default_topic("default-events");
/// ```
#[derive(Clone)]
pub struct ContentRouter {
    /// Field-value to topic mappings, evaluated in insertion order
    routes: Vec<ContentRoute>,
    /// Default topic if no route matches
    default_topic: Option<String>,
    /// Name of the field the chosen topic is written to
    topic_field: String,
}
1938
/// A single content-based route.
#[derive(Clone)]
struct ContentRoute {
    /// Name of the field whose value is tested
    field: String,
    /// How the field value is tested
    matcher: RouteMatcher,
    /// Topic selected when the matcher accepts the value
    topic: String,
}
1946
/// Route matching strategy.
#[derive(Clone)]
enum RouteMatcher {
    /// Exact value match (JSON equality, so the value type must match too)
    Exact(Value),
    /// Regex pattern match; only string values can match
    Pattern(Regex),
    /// Value in set; only string values can match
    In(HashSet<String>),
    /// Custom predicate, called with the raw field value
    Predicate(Arc<dyn Fn(&Value) -> bool + Send + Sync>),
}
1959
1960impl std::fmt::Debug for ContentRouter {
1961    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1962        f.debug_struct("ContentRouter")
1963            .field("routes_count", &self.routes.len())
1964            .field("default_topic", &self.default_topic)
1965            .field("topic_field", &self.topic_field)
1966            .finish()
1967    }
1968}
1969
impl Default for ContentRouter {
    /// Same as [`ContentRouter::new`]: no routes and no default topic.
    fn default() -> Self {
        Self::new()
    }
}
1975
1976impl ContentRouter {
1977    /// Create a new content router.
1978    pub fn new() -> Self {
1979        Self {
1980            routes: Vec::new(),
1981            default_topic: None,
1982            topic_field: "__routing_topic".to_string(),
1983        }
1984    }
1985
1986    /// Add exact value route.
1987    pub fn route(
1988        mut self,
1989        field: impl Into<String>,
1990        value: impl Into<Value>,
1991        topic: impl Into<String>,
1992    ) -> Self {
1993        self.routes.push(ContentRoute {
1994            field: field.into(),
1995            matcher: RouteMatcher::Exact(value.into()),
1996            topic: topic.into(),
1997        });
1998        self
1999    }
2000
2001    /// Add pattern-based route.
2002    pub fn route_pattern(
2003        mut self,
2004        field: impl Into<String>,
2005        pattern: &str,
2006        topic: impl Into<String>,
2007    ) -> Self {
2008        if let Ok(re) = Regex::new(pattern) {
2009            self.routes.push(ContentRoute {
2010                field: field.into(),
2011                matcher: RouteMatcher::Pattern(re),
2012                topic: topic.into(),
2013            });
2014        }
2015        self
2016    }
2017
2018    /// Add set membership route.
2019    pub fn route_in<I, S>(
2020        mut self,
2021        field: impl Into<String>,
2022        values: I,
2023        topic: impl Into<String>,
2024    ) -> Self
2025    where
2026        I: IntoIterator<Item = S>,
2027        S: Into<String>,
2028    {
2029        self.routes.push(ContentRoute {
2030            field: field.into(),
2031            matcher: RouteMatcher::In(values.into_iter().map(Into::into).collect()),
2032            topic: topic.into(),
2033        });
2034        self
2035    }
2036
2037    /// Add predicate-based route.
2038    pub fn route_if<F>(
2039        mut self,
2040        field: impl Into<String>,
2041        predicate: F,
2042        topic: impl Into<String>,
2043    ) -> Self
2044    where
2045        F: Fn(&Value) -> bool + Send + Sync + 'static,
2046    {
2047        self.routes.push(ContentRoute {
2048            field: field.into(),
2049            matcher: RouteMatcher::Predicate(Arc::new(predicate)),
2050            topic: topic.into(),
2051        });
2052        self
2053    }
2054
2055    /// Set default topic.
2056    pub fn default_topic(mut self, topic: impl Into<String>) -> Self {
2057        self.default_topic = Some(topic.into());
2058        self
2059    }
2060
2061    /// Set the field name for routing topic.
2062    pub fn topic_field(mut self, field: impl Into<String>) -> Self {
2063        self.topic_field = field.into();
2064        self
2065    }
2066
2067    /// Find matching topic for event.
2068    fn find_topic(&self, event: &CdcEvent) -> Option<&str> {
2069        let after = event.after.as_ref()?;
2070        let obj = after.as_object()?;
2071
2072        for route in &self.routes {
2073            if let Some(value) = obj.get(&route.field) {
2074                let matches = match &route.matcher {
2075                    RouteMatcher::Exact(expected) => value == expected,
2076                    RouteMatcher::Pattern(re) => {
2077                        value.as_str().map(|s| re.is_match(s)).unwrap_or(false)
2078                    }
2079                    RouteMatcher::In(set) => {
2080                        value.as_str().map(|s| set.contains(s)).unwrap_or(false)
2081                    }
2082                    RouteMatcher::Predicate(f) => f(value),
2083                };
2084
2085                if matches {
2086                    return Some(&route.topic);
2087                }
2088            }
2089        }
2090
2091        self.default_topic.as_deref()
2092    }
2093}
2094
2095impl Smt for ContentRouter {
2096    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
2097        if let Some(topic) = self.find_topic(&event) {
2098            if let Some(after) = &mut event.after {
2099                if let Some(obj) = after.as_object_mut() {
2100                    obj.insert(self.topic_field.clone(), Value::String(topic.to_string()));
2101                }
2102            }
2103        }
2104        Some(event)
2105    }
2106
2107    fn name(&self) -> &'static str {
2108        "ContentRouter"
2109    }
2110}
2111
2112// ============================================================================
2113// ComputeField - Compute new fields from expressions
2114// ============================================================================
2115
/// Compute fields from expressions.
///
/// Supports various computation types:
/// - String concatenation
/// - Numeric operations
/// - JSON path extraction
/// - Hash computation
/// - Timestamp operations
///
/// Computations run in the order they were added against the event's `after`
/// object, and each result is stored under its target field name.
///
/// # Example
/// ```rust,ignore
/// // In `concat`, parts prefixed with `$` are field references; anything
/// // else is a literal.
/// let smt = ComputeField::new()
///     .concat("full_name", ["$first_name", " ", "$last_name"])
///     .hash("user_hash", ["id", "email"])
///     .coalesce("display_name", ["nickname", "username", "email"]);
/// ```
#[derive(Clone)]
pub struct ComputeField {
    /// Computations to apply, in insertion order
    computations: Vec<FieldComputation>,
}
2137
/// A single field computation.
#[derive(Clone)]
struct FieldComputation {
    /// Name of the field the result is written to
    target: String,
    /// Operation that produces the value
    operation: ComputeOp,
}
2144
/// Computation operation.
///
/// `String` payloads are source field names unless noted otherwise.
#[derive(Clone)]
enum ComputeOp {
    /// Concatenate fields/literals
    Concat(Vec<ConcatPart>),
    /// Hash multiple fields (SHA-256)
    Hash(Vec<String>),
    /// First non-null value
    Coalesce(Vec<String>),
    /// Numeric sum
    Sum(Vec<String>),
    /// String length
    Length(String),
    /// Uppercase
    Upper(String),
    /// Lowercase
    Lower(String),
    /// Substring: (field, start, optional length), counted in characters
    Substring(String, usize, Option<usize>),
    /// Replace pattern: (field, regex pattern, replacement)
    Replace(String, String, String),
    /// Current timestamp
    CurrentTimestamp,
    /// UUID v4
    Uuid,
    /// JSON path extraction: (field, dot-separated path)
    JsonPath(String, String),
    /// Conditional value: (condition, value if it holds, value otherwise)
    Conditional(ComputeCondition, Value, Value),
}
2175
/// Part of a concatenation.
#[derive(Clone)]
enum ConcatPart {
    /// Name of a field whose value is appended
    Field(String),
    /// Text appended verbatim
    Literal(String),
}
2182
/// Condition for conditional compute.
#[derive(Clone)]
pub enum ComputeCondition {
    /// Field equals value: (field, expected value); a missing field never matches
    FieldEquals(String, Value),
    /// Field is null; a missing field also counts as null
    FieldIsNull(String),
    /// Field matches pattern: (field, regex pattern); only string values can match
    FieldMatches(String, String),
}
2193
2194impl std::fmt::Debug for ComputeField {
2195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2196        f.debug_struct("ComputeField")
2197            .field("computations_count", &self.computations.len())
2198            .finish()
2199    }
2200}
2201
impl Default for ComputeField {
    /// Same as [`ComputeField::new`]: a transform with no computations.
    fn default() -> Self {
        Self::new()
    }
}
2207
2208impl ComputeField {
2209    /// Create a new compute field transform.
2210    pub fn new() -> Self {
2211        Self {
2212            computations: Vec::new(),
2213        }
2214    }
2215
2216    /// Concatenate fields and literals.
2217    pub fn concat<I>(mut self, target: impl Into<String>, parts: I) -> Self
2218    where
2219        I: IntoIterator,
2220        I::Item: Into<String>,
2221    {
2222        let parts: Vec<ConcatPart> = parts
2223            .into_iter()
2224            .map(|p| {
2225                let s: String = p.into();
2226                // If starts with $, it's a field reference
2227                if let Some(field_name) = s.strip_prefix('$') {
2228                    ConcatPart::Field(field_name.to_string())
2229                } else {
2230                    ConcatPart::Literal(s)
2231                }
2232            })
2233            .collect();
2234
2235        self.computations.push(FieldComputation {
2236            target: target.into(),
2237            operation: ComputeOp::Concat(parts),
2238        });
2239        self
2240    }
2241
2242    /// Hash multiple fields together (SHA-256).
2243    pub fn hash<I, S>(mut self, target: impl Into<String>, fields: I) -> Self
2244    where
2245        I: IntoIterator<Item = S>,
2246        S: Into<String>,
2247    {
2248        self.computations.push(FieldComputation {
2249            target: target.into(),
2250            operation: ComputeOp::Hash(fields.into_iter().map(Into::into).collect()),
2251        });
2252        self
2253    }
2254
2255    /// First non-null value from fields.
2256    pub fn coalesce<I, S>(mut self, target: impl Into<String>, fields: I) -> Self
2257    where
2258        I: IntoIterator<Item = S>,
2259        S: Into<String>,
2260    {
2261        self.computations.push(FieldComputation {
2262            target: target.into(),
2263            operation: ComputeOp::Coalesce(fields.into_iter().map(Into::into).collect()),
2264        });
2265        self
2266    }
2267
2268    /// Sum numeric fields.
2269    pub fn sum<I, S>(mut self, target: impl Into<String>, fields: I) -> Self
2270    where
2271        I: IntoIterator<Item = S>,
2272        S: Into<String>,
2273    {
2274        self.computations.push(FieldComputation {
2275            target: target.into(),
2276            operation: ComputeOp::Sum(fields.into_iter().map(Into::into).collect()),
2277        });
2278        self
2279    }
2280
2281    /// String length of field.
2282    pub fn length(mut self, target: impl Into<String>, field: impl Into<String>) -> Self {
2283        self.computations.push(FieldComputation {
2284            target: target.into(),
2285            operation: ComputeOp::Length(field.into()),
2286        });
2287        self
2288    }
2289
2290    /// Uppercase field value.
2291    pub fn upper(mut self, target: impl Into<String>, field: impl Into<String>) -> Self {
2292        self.computations.push(FieldComputation {
2293            target: target.into(),
2294            operation: ComputeOp::Upper(field.into()),
2295        });
2296        self
2297    }
2298
2299    /// Lowercase field value.
2300    pub fn lower(mut self, target: impl Into<String>, field: impl Into<String>) -> Self {
2301        self.computations.push(FieldComputation {
2302            target: target.into(),
2303            operation: ComputeOp::Lower(field.into()),
2304        });
2305        self
2306    }
2307
2308    /// Substring of field.
2309    pub fn substring(
2310        mut self,
2311        target: impl Into<String>,
2312        field: impl Into<String>,
2313        start: usize,
2314        len: Option<usize>,
2315    ) -> Self {
2316        self.computations.push(FieldComputation {
2317            target: target.into(),
2318            operation: ComputeOp::Substring(field.into(), start, len),
2319        });
2320        self
2321    }
2322
2323    /// Replace pattern in field.
2324    pub fn replace(
2325        mut self,
2326        target: impl Into<String>,
2327        field: impl Into<String>,
2328        pattern: impl Into<String>,
2329        replacement: impl Into<String>,
2330    ) -> Self {
2331        self.computations.push(FieldComputation {
2332            target: target.into(),
2333            operation: ComputeOp::Replace(field.into(), pattern.into(), replacement.into()),
2334        });
2335        self
2336    }
2337
2338    /// Add current timestamp.
2339    pub fn current_timestamp(mut self, target: impl Into<String>) -> Self {
2340        self.computations.push(FieldComputation {
2341            target: target.into(),
2342            operation: ComputeOp::CurrentTimestamp,
2343        });
2344        self
2345    }
2346
2347    /// Add UUID v4.
2348    pub fn uuid(mut self, target: impl Into<String>) -> Self {
2349        self.computations.push(FieldComputation {
2350            target: target.into(),
2351            operation: ComputeOp::Uuid,
2352        });
2353        self
2354    }
2355
2356    /// Extract value using JSON path.
2357    pub fn json_path(
2358        mut self,
2359        target: impl Into<String>,
2360        field: impl Into<String>,
2361        path: impl Into<String>,
2362    ) -> Self {
2363        self.computations.push(FieldComputation {
2364            target: target.into(),
2365            operation: ComputeOp::JsonPath(field.into(), path.into()),
2366        });
2367        self
2368    }
2369
2370    /// Conditional value.
2371    pub fn conditional(
2372        mut self,
2373        target: impl Into<String>,
2374        condition: ComputeCondition,
2375        then_value: impl Into<Value>,
2376        else_value: impl Into<Value>,
2377    ) -> Self {
2378        self.computations.push(FieldComputation {
2379            target: target.into(),
2380            operation: ComputeOp::Conditional(condition, then_value.into(), else_value.into()),
2381        });
2382        self
2383    }
2384
2385    /// Execute a computation.
2386    fn compute(&self, op: &ComputeOp, obj: &Map<String, Value>) -> Option<Value> {
2387        match op {
2388            ComputeOp::Concat(parts) => {
2389                let mut result = String::new();
2390                for part in parts {
2391                    match part {
2392                        ConcatPart::Literal(s) => result.push_str(s),
2393                        ConcatPart::Field(f) => {
2394                            if let Some(v) = obj.get(f) {
2395                                match v {
2396                                    Value::String(s) => result.push_str(s),
2397                                    Value::Number(n) => result.push_str(&n.to_string()),
2398                                    Value::Bool(b) => result.push_str(&b.to_string()),
2399                                    _ => {}
2400                                }
2401                            }
2402                        }
2403                    }
2404                }
2405                Some(Value::String(result))
2406            }
2407            ComputeOp::Hash(fields) => {
2408                use sha2::{Digest, Sha256};
2409                let mut data = Vec::new();
2410                for field in fields {
2411                    if let Some(v) = obj.get(field) {
2412                        data.extend_from_slice(
2413                            serde_json::to_string(v).unwrap_or_default().as_bytes(),
2414                        );
2415                    }
2416                }
2417                let hash = Sha256::digest(&data);
2418                Some(Value::String(hex::encode(hash)))
2419            }
2420            ComputeOp::Coalesce(fields) => {
2421                for field in fields {
2422                    if let Some(v) = obj.get(field) {
2423                        if !v.is_null() {
2424                            return Some(v.clone());
2425                        }
2426                    }
2427                }
2428                None
2429            }
2430            ComputeOp::Sum(fields) => {
2431                let mut sum = 0.0;
2432                for field in fields {
2433                    if let Some(v) = obj.get(field) {
2434                        if let Some(n) = v.as_f64() {
2435                            sum += n;
2436                        }
2437                    }
2438                }
2439                Some(Value::from(sum))
2440            }
2441            ComputeOp::Length(field) => obj
2442                .get(field)
2443                .and_then(|v| v.as_str())
2444                .map(|s| Value::from(s.len() as i64)),
2445            ComputeOp::Upper(field) => obj
2446                .get(field)
2447                .and_then(|v| v.as_str())
2448                .map(|s| Value::String(s.to_uppercase())),
2449            ComputeOp::Lower(field) => obj
2450                .get(field)
2451                .and_then(|v| v.as_str())
2452                .map(|s| Value::String(s.to_lowercase())),
2453            ComputeOp::Substring(field, start, len) => {
2454                obj.get(field).and_then(|v| v.as_str()).map(|s| {
2455                    let chars: Vec<char> = s.chars().collect();
2456                    let end = len
2457                        .map(|l| (*start + l).min(chars.len()))
2458                        .unwrap_or(chars.len());
2459                    let substr: String = chars[*start.min(&chars.len())..end].iter().collect();
2460                    Value::String(substr)
2461                })
2462            }
2463            ComputeOp::Replace(field, pattern, replacement) => {
2464                if let Ok(re) = Regex::new(pattern) {
2465                    obj.get(field)
2466                        .and_then(|v| v.as_str())
2467                        .map(|s| Value::String(re.replace_all(s, replacement.as_str()).to_string()))
2468                } else {
2469                    None
2470                }
2471            }
2472            ComputeOp::CurrentTimestamp => Some(Value::String(Utc::now().to_rfc3339())),
2473            ComputeOp::Uuid => {
2474                // Generate UUID v4 using the uuid crate
2475                Some(Value::String(uuid::Uuid::new_v4().to_string()))
2476            }
2477            ComputeOp::JsonPath(field, path) => {
2478                // Simple JSON path: "nested.field.value"
2479                obj.get(field).and_then(|v| {
2480                    let parts: Vec<&str> = path.split('.').collect();
2481                    let mut current = v;
2482                    for part in parts {
2483                        current = current.get(part)?;
2484                    }
2485                    Some(current.clone())
2486                })
2487            }
2488            ComputeOp::Conditional(cond, then_val, else_val) => {
2489                let matches = match cond {
2490                    ComputeCondition::FieldEquals(f, v) => {
2491                        obj.get(f).map(|fv| fv == v).unwrap_or(false)
2492                    }
2493                    ComputeCondition::FieldIsNull(f) => {
2494                        obj.get(f).map(|v| v.is_null()).unwrap_or(true)
2495                    }
2496                    ComputeCondition::FieldMatches(f, pattern) => {
2497                        if let (Some(v), Ok(re)) = (obj.get(f), Regex::new(pattern)) {
2498                            v.as_str().map(|s| re.is_match(s)).unwrap_or(false)
2499                        } else {
2500                            false
2501                        }
2502                    }
2503                };
2504                Some(if matches {
2505                    then_val.clone()
2506                } else {
2507                    else_val.clone()
2508                })
2509            }
2510        }
2511    }
2512}
2513
2514impl Smt for ComputeField {
2515    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
2516        if let Some(after) = &mut event.after {
2517            if let Some(obj) = after.as_object_mut() {
2518                for comp in &self.computations {
2519                    // Clone the object for read-only access
2520                    let obj_clone = obj.clone();
2521                    if let Some(value) = self.compute(&comp.operation, &obj_clone) {
2522                        obj.insert(comp.target.clone(), value);
2523                    }
2524                }
2525            }
2526        }
2527
2528        Some(event)
2529    }
2530
2531    fn name(&self) -> &'static str {
2532        "ComputeField"
2533    }
2534}
2535
2536// ============================================================================
2537// ExternalizeBlob - Store large blobs in object storage
2538// ============================================================================
2539
/// Externalize large blob values to object storage.
///
/// This SMT detects fields containing large binary data (base64-encoded strings
/// or large JSON values) and uploads them to object storage (S3/GCS/Azure/local),
/// replacing the original value with a reference containing the storage URL.
///
/// # Use Cases
///
/// - Reduce message size for topics with blob fields (images, documents, binary data)
/// - Improve Kafka/Rivven throughput by offloading large payloads
/// - Enable efficient storage of binary data with proper content-type handling
///
/// # Configuration Options
///
/// Each option is set with the builder method of the same name.
///
/// - **size_threshold**: Minimum byte size to externalize (default: 10KB)
/// - **fields**: Specific fields to check, or all if empty
/// - **prefix**: Object key prefix in storage
/// - **url_scheme**: Scheme of the reference URL (`s3://`, `gs://`, `https://`, `file://`)
///
/// # Reference Format
///
/// The original field value is replaced with a JSON object:
///
/// ```json
/// {
///   "__externalized": true,
///   "url": "s3://bucket/prefix/table/field/abc123.bin",
///   "size": 1048576,
///   "content_type": "application/octet-stream",
///   "sha256": "a1b2c3..."
/// }
/// ```
///
/// # Example
///
/// ```rust,ignore
/// use rivven_cdc::common::smt::ExternalizeBlob;
///
/// // Using pre-configured object store; the bucket name is used for URL generation
/// let smt = ExternalizeBlob::new(object_store, "my-bucket")
///     .size_threshold(1024)  // 1KB minimum
///     .fields(["image_data", "document"])
///     .prefix("cdc-blobs/");
///
/// let transformed = smt.apply(event);
/// ```
///
/// # Storage Keys
///
/// Objects are stored with keys in the format:
/// `{prefix}/{table}/{field}/{timestamp}_{uuid}.bin`
///
/// This ensures uniqueness and allows easy browsing by table/field.
#[cfg(feature = "cloud-storage")]
#[derive(Debug, Clone)]
pub struct ExternalizeBlob {
    /// Object store backend
    store: Arc<dyn object_store::ObjectStore>,
    /// Bucket/container name (for URL generation)
    bucket: String,
    /// Minimum size in bytes to externalize (default: 10KB)
    size_threshold: usize,
    /// Fields to check (empty = all fields)
    fields: HashSet<String>,
    /// Object key prefix
    prefix: String,
    /// URL scheme for references (s3://, gs://, https://, etc.)
    url_scheme: String,
}
2609
2610#[cfg(feature = "cloud-storage")]
2611impl ExternalizeBlob {
2612    /// Create a new ExternalizeBlob SMT with the given object store.
2613    ///
2614    /// # Arguments
2615    ///
2616    /// * `store` - The object store backend (S3, GCS, Azure, or local)
2617    /// * `bucket` - Bucket/container name for URL generation
2618    pub fn new(store: Arc<dyn object_store::ObjectStore>, bucket: impl Into<String>) -> Self {
2619        Self {
2620            store,
2621            bucket: bucket.into(),
2622            size_threshold: 10 * 1024, // 10KB default
2623            fields: HashSet::new(),
2624            prefix: String::new(),
2625            url_scheme: "s3://".to_string(),
2626        }
2627    }
2628
2629    /// Create an ExternalizeBlob for Amazon S3.
2630    #[cfg(feature = "s3")]
2631    pub fn s3(
2632        bucket: impl Into<String>,
2633        region: impl Into<String>,
2634    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
2635        use object_store::aws::AmazonS3Builder;
2636
2637        let bucket = bucket.into();
2638        let store = AmazonS3Builder::new()
2639            .with_bucket_name(&bucket)
2640            .with_region(region)
2641            .with_allow_http(false)
2642            .build()?;
2643
2644        Ok(Self {
2645            store: Arc::new(store),
2646            bucket,
2647            size_threshold: 10 * 1024,
2648            fields: HashSet::new(),
2649            prefix: String::new(),
2650            url_scheme: "s3://".to_string(),
2651        })
2652    }
2653
2654    /// Create an ExternalizeBlob for Google Cloud Storage.
2655    #[cfg(feature = "gcs")]
2656    pub fn gcs(
2657        bucket: impl Into<String>,
2658    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
2659        use object_store::gcp::GoogleCloudStorageBuilder;
2660
2661        let bucket = bucket.into();
2662        let store = GoogleCloudStorageBuilder::new()
2663            .with_bucket_name(&bucket)
2664            .build()?;
2665
2666        Ok(Self {
2667            store: Arc::new(store),
2668            bucket,
2669            size_threshold: 10 * 1024,
2670            fields: HashSet::new(),
2671            prefix: String::new(),
2672            url_scheme: "gs://".to_string(),
2673        })
2674    }
2675
2676    /// Create an ExternalizeBlob for Azure Blob Storage.
2677    #[cfg(feature = "azure")]
2678    pub fn azure(
2679        account: impl Into<String>,
2680        container: impl Into<String>,
2681    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
2682        use object_store::azure::MicrosoftAzureBuilder;
2683
2684        let container = container.into();
2685        let store = MicrosoftAzureBuilder::new()
2686            .with_account(account)
2687            .with_container_name(&container)
2688            .build()?;
2689
2690        Ok(Self {
2691            store: Arc::new(store),
2692            bucket: container,
2693            size_threshold: 10 * 1024,
2694            fields: HashSet::new(),
2695            prefix: String::new(),
2696            url_scheme: "https://".to_string(),
2697        })
2698    }
2699
2700    /// Create an ExternalizeBlob using local filesystem (for testing).
2701    pub fn local(
2702        path: impl Into<std::path::PathBuf>,
2703    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
2704        use object_store::local::LocalFileSystem;
2705
2706        let path: std::path::PathBuf = path.into();
2707        let store = LocalFileSystem::new_with_prefix(&path)?;
2708
2709        Ok(Self {
2710            store: Arc::new(store),
2711            bucket: path.to_string_lossy().to_string(),
2712            size_threshold: 10 * 1024,
2713            fields: HashSet::new(),
2714            prefix: String::new(),
2715            url_scheme: "file://".to_string(),
2716        })
2717    }
2718
2719    /// Set the minimum size threshold for externalization.
2720    ///
2721    /// Fields with values below this size will not be externalized.
2722    /// Default: 10KB (10240 bytes).
2723    pub fn size_threshold(mut self, bytes: usize) -> Self {
2724        self.size_threshold = bytes;
2725        self
2726    }
2727
2728    /// Specify which fields to check for externalization.
2729    ///
2730    /// If empty, all fields are checked. Fields not in this list are ignored.
2731    pub fn fields<I, S>(mut self, fields: I) -> Self
2732    where
2733        I: IntoIterator<Item = S>,
2734        S: Into<String>,
2735    {
2736        self.fields = fields.into_iter().map(Into::into).collect();
2737        self
2738    }
2739
2740    /// Set the object key prefix.
2741    ///
2742    /// Objects are stored at: `{prefix}/{table}/{field}/{id}.bin`
2743    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
2744        self.prefix = prefix.into();
2745        self
2746    }
2747
2748    /// Set the URL scheme for references.
2749    ///
2750    /// Common values: "s3://", "gs://", "https://", "file://"
2751    pub fn url_scheme(mut self, scheme: impl Into<String>) -> Self {
2752        self.url_scheme = scheme.into();
2753        self
2754    }
2755
2756    /// Check if a field should be externalized.
2757    fn should_externalize(&self, field: &str, value: &Value) -> bool {
2758        // If fields list is specified, only check those fields
2759        if !self.fields.is_empty() && !self.fields.contains(field) {
2760            return false;
2761        }
2762
2763        // Check size
2764        let size = self.estimate_size(value);
2765        size >= self.size_threshold
2766    }
2767
2768    /// Estimate the byte size of a JSON value.
2769    fn estimate_size(&self, value: &Value) -> usize {
2770        match value {
2771            Value::String(s) => s.len(),
2772            Value::Object(_) | Value::Array(_) => {
2773                // Serialize to get accurate size
2774                serde_json::to_vec(value).map(|v| v.len()).unwrap_or(0)
2775            }
2776            Value::Number(_) => 8,
2777            Value::Bool(_) => 1,
2778            Value::Null => 0,
2779        }
2780    }
2781
2782    /// Decode base64 value if applicable.
2783    fn decode_blob(&self, value: &Value) -> Option<Vec<u8>> {
2784        match value {
2785            Value::String(s) => {
2786                // Try base64 decoding
2787                use base64::Engine;
2788                // Try base64 decoding first, fall back to raw string bytes
2789                base64::engine::general_purpose::STANDARD
2790                    .decode(s)
2791                    .ok()
2792                    .or_else(|| Some(s.as_bytes().to_vec()))
2793            }
2794            _ => {
2795                // For non-string values, serialize to JSON bytes
2796                serde_json::to_vec(value).ok()
2797            }
2798        }
2799    }
2800
2801    /// Generate object key for storing the blob.
2802    fn generate_key(&self, table: &str, field: &str) -> String {
2803        let timestamp = chrono::Utc::now().timestamp_millis();
2804        let uuid = uuid::Uuid::new_v4();
2805
2806        let mut key = String::new();
2807        if !self.prefix.is_empty() {
2808            key.push_str(&self.prefix);
2809            if !self.prefix.ends_with('/') {
2810                key.push('/');
2811            }
2812        }
2813        key.push_str(&format!("{}/{}/{}_{}.bin", table, field, timestamp, uuid));
2814        key
2815    }
2816
2817    /// Create a reference object for the externalized blob.
2818    fn create_reference(&self, url: &str, size: usize, sha256: &str, content_type: &str) -> Value {
2819        serde_json::json!({
2820            "__externalized": true,
2821            "url": url,
2822            "size": size,
2823            "content_type": content_type,
2824            "sha256": sha256
2825        })
2826    }
2827
2828    /// Upload blob and return reference.
2829    fn externalize_value(&self, table: &str, field: &str, value: &Value) -> Option<Value> {
2830        // Decode the blob data
2831        let blob_data = self.decode_blob(value)?;
2832        let size = blob_data.len();
2833
2834        // Calculate SHA-256 hash
2835        use sha2::{Digest, Sha256};
2836        let hash = Sha256::digest(&blob_data);
2837        let sha256 = hex::encode(hash);
2838
2839        // Generate storage key
2840        let key = self.generate_key(table, field);
2841
2842        // Detect content type
2843        let content_type = if value.is_string() {
2844            "application/octet-stream"
2845        } else {
2846            "application/json"
2847        };
2848
2849        // Upload using blocking - we're in an async runtime context
2850        let store = self.store.clone();
2851        let key_path = object_store::path::Path::from(key.clone());
2852        let payload = object_store::PutPayload::from(blob_data);
2853
2854        // Use tokio's Handle to run async code from sync context
2855        let upload_result = match tokio::runtime::Handle::try_current() {
2856            Ok(handle) => {
2857                // We're in an async runtime, use block_in_place
2858                tokio::task::block_in_place(|| {
2859                    handle.block_on(async { store.put(&key_path, payload).await })
2860                })
2861            }
2862            Err(_) => {
2863                // Not in async runtime - create a new one (for testing)
2864                let rt = match tokio::runtime::Runtime::new() {
2865                    Ok(rt) => rt,
2866                    Err(e) => {
2867                        warn!("Failed to create runtime for blob upload: {}", e);
2868                        return None;
2869                    }
2870                };
2871                rt.block_on(async { store.put(&key_path, payload).await })
2872            }
2873        };
2874
2875        match upload_result {
2876            Ok(_) => {
2877                // Generate URL
2878                let url = format!("{}{}/{}", self.url_scheme, self.bucket, key);
2879                Some(self.create_reference(&url, size, &sha256, content_type))
2880            }
2881            Err(e) => {
2882                warn!("Failed to externalize blob for {}.{}: {}", table, field, e);
2883                None
2884            }
2885        }
2886    }
2887
2888    /// Process a JSON object, externalizing large blobs.
2889    fn process_object(&self, table: &str, obj: &mut Map<String, Value>) {
2890        let fields_to_process: Vec<String> = obj.keys().cloned().collect();
2891
2892        for field in fields_to_process {
2893            if let Some(value) = obj.get(&field) {
2894                if self.should_externalize(&field, value) {
2895                    if let Some(reference) = self.externalize_value(table, &field, value) {
2896                        obj.insert(field, reference);
2897                    }
2898                }
2899            }
2900        }
2901    }
2902}
2903
#[cfg(feature = "cloud-storage")]
impl Smt for ExternalizeBlob {
    /// Externalize large fields in the event's `after` and then `before` records.
    ///
    /// Records that are absent or are not JSON objects are left untouched.
    /// The event is always passed on; this transform never drops events.
    fn apply(&self, mut event: CdcEvent) -> Option<CdcEvent> {
        let table = event.table.clone();

        // New row state (INSERT/UPDATE).
        if let Some(record) = event.after.as_mut().and_then(|v| v.as_object_mut()) {
            self.process_object(&table, record);
        }

        // Old row state (UPDATE/DELETE), handled the same way for consistency.
        if let Some(record) = event.before.as_mut().and_then(|v| v.as_object_mut()) {
            self.process_object(&table, record);
        }

        Some(event)
    }

    /// Name under which this transform is reported.
    fn name(&self) -> &'static str {
        "ExternalizeBlob"
    }
}
2930
2931// ============================================================================
2932// Tests
2933// ============================================================================
2934
2935#[cfg(test)]
2936mod tests {
2937    use super::*;
2938    use serde_json::json;
2939
2940    fn make_event(op: CdcOp, before: Option<Value>, after: Option<Value>) -> CdcEvent {
2941        CdcEvent {
2942            source_type: "postgres".to_string(),
2943            database: "testdb".to_string(),
2944            schema: "public".to_string(),
2945            table: "users".to_string(),
2946            op,
2947            before,
2948            after,
2949            timestamp: chrono::Utc::now().timestamp(),
2950            transaction: None,
2951        }
2952    }
2953
2954    // SMT Chain tests
2955    #[test]
2956    fn test_smt_chain_empty() {
2957        let chain = SmtChain::new();
2958        assert!(chain.is_empty());
2959        assert_eq!(chain.len(), 0);
2960    }
2961
2962    #[test]
2963    fn test_smt_chain_apply() {
2964        let event = make_event(
2965            CdcOp::Insert,
2966            None,
2967            Some(json!({"name": "Alice", "email": "alice@test.com"})),
2968        );
2969
2970        let chain = SmtChain::new()
2971            .add(MaskField::new(["email"]))
2972            .add(InsertField::new().static_field("version", json!(1)));
2973
2974        let result = chain.apply(event).unwrap();
2975        let after = result.after.unwrap();
2976
2977        assert_eq!(after["name"], "Alice");
2978        assert_ne!(after["email"], "alice@test.com"); // Masked
2979        assert_eq!(after["version"], 1);
2980    }
2981
2982    #[test]
2983    fn test_smt_chain_names() {
2984        let chain = SmtChain::new()
2985            .add(MaskField::new(["email"]))
2986            .add(Flatten::new());
2987
2988        let names = chain.names();
2989        assert_eq!(names, vec!["MaskField", "Flatten"]);
2990    }
2991
2992    // ExtractNewRecordState tests
2993    #[test]
2994    fn test_extract_new_record_state() {
2995        let event = make_event(
2996            CdcOp::Update,
2997            Some(json!({"id": 1, "name": "Old"})),
2998            Some(json!({"id": 1, "name": "New"})),
2999        );
3000
3001        let smt = ExtractNewRecordState::new()
3002            .add_op_field()
3003            .add_table_field();
3004
3005        let result = smt.apply(event).unwrap();
3006        let after = result.after.unwrap();
3007
3008        assert_eq!(after["__op"], "u");
3009        assert_eq!(after["__table"], "users");
3010    }
3011
3012    #[test]
3013    fn test_extract_drop_tombstones() {
3014        let event = make_event(CdcOp::Delete, Some(json!({"id": 1})), None);
3015
3016        let smt = ExtractNewRecordState::new().drop_tombstones();
3017        assert!(smt.apply(event).is_none());
3018    }
3019
3020    #[test]
3021    fn test_extract_rewrite_deletes() {
3022        let event = make_event(
3023            CdcOp::Delete,
3024            Some(json!({"id": 1, "name": "Deleted"})),
3025            None,
3026        );
3027
3028        let smt = ExtractNewRecordState::new().delete_handling(DeleteHandling::Rewrite);
3029
3030        let result = smt.apply(event).unwrap();
3031        let after = result.after.unwrap();
3032
3033        assert_eq!(after["__deleted"], true);
3034        assert_eq!(after["id"], 1);
3035    }
3036
3037    #[test]
3038    fn test_extract_custom_prefix() {
3039        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3040
3041        let smt = ExtractNewRecordState::new()
3042            .header_prefix("_cdc_")
3043            .add_op_field()
3044            .add_schema_field();
3045
3046        let result = smt.apply(event).unwrap();
3047        let after = result.after.unwrap();
3048
3049        assert_eq!(after["_cdc_op"], "c");
3050        assert_eq!(after["_cdc_schema"], "public");
3051    }
3052
3053    // ValueToKey tests
3054    #[test]
3055    fn test_value_to_key() {
3056        let event = make_event(
3057            CdcOp::Insert,
3058            None,
3059            Some(json!({"id": 42, "name": "Alice", "email": "alice@test.com"})),
3060        );
3061
3062        let smt = ValueToKey::with_fields(["id"]);
3063        let result = smt.apply(event).unwrap();
3064        let after = result.after.unwrap();
3065
3066        assert_eq!(after["__key"], json!({"id": 42}));
3067    }
3068
3069    #[test]
3070    fn test_value_to_key_multiple_fields() {
3071        let event = make_event(
3072            CdcOp::Insert,
3073            None,
3074            Some(json!({"org_id": 1, "user_id": 2, "name": "Alice"})),
3075        );
3076
3077        let smt = ValueToKey::with_fields(["org_id", "user_id"]);
3078        let result = smt.apply(event).unwrap();
3079        let after = result.after.unwrap();
3080
3081        assert_eq!(after["__key"], json!({"org_id": 1, "user_id": 2}));
3082    }
3083
3084    // TimestampConverter tests
3085    #[test]
3086    fn test_timestamp_converter_to_iso() {
3087        let event = make_event(
3088            CdcOp::Insert,
3089            None,
3090            Some(json!({"created_at": 1705320000000i64})), // 2024-01-15T10:00:00Z in millis
3091        );
3092
3093        let smt = TimestampConverter::new(["created_at"], TimestampFormat::Iso8601);
3094        let result = smt.apply(event).unwrap();
3095        let after = result.after.unwrap();
3096
3097        assert!(after["created_at"].as_str().unwrap().contains("2024-01-15"));
3098    }
3099
3100    #[test]
3101    fn test_timestamp_converter_to_epoch() {
3102        let event = make_event(
3103            CdcOp::Insert,
3104            None,
3105            Some(json!({"created_at": "2024-01-15T10:00:00Z"})),
3106        );
3107
3108        let smt = TimestampConverter::new(["created_at"], TimestampFormat::EpochMillis);
3109        let result = smt.apply(event).unwrap();
3110        let after = result.after.unwrap();
3111
3112        assert!(after["created_at"].is_number());
3113    }
3114
3115    #[test]
3116    fn test_timestamp_converter_date_only() {
3117        let event = make_event(
3118            CdcOp::Insert,
3119            None,
3120            Some(json!({"created_at": 1705320000000i64})),
3121        );
3122
3123        let smt = TimestampConverter::new(["created_at"], TimestampFormat::DateOnly);
3124        let result = smt.apply(event).unwrap();
3125        let after = result.after.unwrap();
3126
3127        assert_eq!(after["created_at"], "2024-01-15");
3128    }
3129
3130    // MaskField tests
3131    #[test]
3132    fn test_mask_field_asterisks() {
3133        let event = make_event(
3134            CdcOp::Insert,
3135            None,
3136            Some(json!({"name": "Alice", "ssn": "123-45-6789"})),
3137        );
3138
3139        let smt = MaskField::new(["ssn"]);
3140        let result = smt.apply(event).unwrap();
3141        let after = result.after.unwrap();
3142
3143        assert_eq!(after["name"], "Alice");
3144        assert_eq!(after["ssn"], "***********");
3145    }
3146
3147    #[test]
3148    fn test_mask_field_partial() {
3149        let event = make_event(
3150            CdcOp::Insert,
3151            None,
3152            Some(json!({"card": "4111111111111111"})),
3153        );
3154
3155        let smt = MaskField::new(["card"]).with_strategy(MaskStrategy::PartialMask {
3156            keep_first: 4,
3157            keep_last: 4,
3158        });
3159
3160        let result = smt.apply(event).unwrap();
3161        let after = result.after.unwrap();
3162
3163        assert_eq!(after["card"], "4111********1111");
3164    }
3165
3166    #[test]
3167    fn test_mask_field_hash() {
3168        let event = make_event(
3169            CdcOp::Insert,
3170            None,
3171            Some(json!({"email": "alice@test.com"})),
3172        );
3173
3174        let smt = MaskField::new(["email"]).with_strategy(MaskStrategy::Hash);
3175        let result = smt.apply(event).unwrap();
3176        let after = result.after.unwrap();
3177
3178        // Should be a hex string
3179        assert!(after["email"]
3180            .as_str()
3181            .unwrap()
3182            .chars()
3183            .all(|c| c.is_ascii_hexdigit()));
3184    }
3185
3186    #[test]
3187    fn test_mask_field_redact() {
3188        let event = make_event(
3189            CdcOp::Insert,
3190            None,
3191            Some(json!({"name": "Alice", "password": "secret"})),
3192        );
3193
3194        let smt = MaskField::new(["password"]).with_strategy(MaskStrategy::Redact);
3195        let result = smt.apply(event).unwrap();
3196        let after = result.after.unwrap();
3197
3198        assert!(!after.as_object().unwrap().contains_key("password"));
3199    }
3200
3201    #[test]
3202    fn test_mask_field_fixed() {
3203        let event = make_event(
3204            CdcOp::Insert,
3205            None,
3206            Some(json!({"email": "alice@test.com"})),
3207        );
3208
3209        let smt =
3210            MaskField::new(["email"]).with_strategy(MaskStrategy::Fixed("[REDACTED]".to_string()));
3211        let result = smt.apply(event).unwrap();
3212        let after = result.after.unwrap();
3213
3214        assert_eq!(after["email"], "[REDACTED]");
3215    }
3216
3217    // ReplaceField tests
3218    #[test]
3219    fn test_replace_field_rename() {
3220        let event = make_event(
3221            CdcOp::Insert,
3222            None,
3223            Some(json!({"first_name": "Alice", "last_name": "Smith"})),
3224        );
3225
3226        let smt = ReplaceField::new()
3227            .rename("first_name", "firstName")
3228            .rename("last_name", "lastName");
3229
3230        let result = smt.apply(event).unwrap();
3231        let after = result.after.unwrap();
3232
3233        assert_eq!(after["firstName"], "Alice");
3234        assert_eq!(after["lastName"], "Smith");
3235        assert!(!after.as_object().unwrap().contains_key("first_name"));
3236    }
3237
3238    #[test]
3239    fn test_replace_field_include() {
3240        let event = make_event(
3241            CdcOp::Insert,
3242            None,
3243            Some(json!({"id": 1, "name": "Alice", "internal_field": "secret"})),
3244        );
3245
3246        let smt = ReplaceField::new().include(["id", "name"]);
3247        let result = smt.apply(event).unwrap();
3248        let after = result.after.unwrap();
3249
3250        assert_eq!(after["id"], 1);
3251        assert_eq!(after["name"], "Alice");
3252        assert!(!after.as_object().unwrap().contains_key("internal_field"));
3253    }
3254
3255    #[test]
3256    fn test_replace_field_exclude() {
3257        let event = make_event(
3258            CdcOp::Insert,
3259            None,
3260            Some(json!({"id": 1, "name": "Alice", "password_hash": "xxx"})),
3261        );
3262
3263        let smt = ReplaceField::new().exclude(["password_hash"]);
3264        let result = smt.apply(event).unwrap();
3265        let after = result.after.unwrap();
3266
3267        assert!(after.as_object().unwrap().contains_key("id"));
3268        assert!(!after.as_object().unwrap().contains_key("password_hash"));
3269    }
3270
3271    // InsertField tests
3272    #[test]
3273    fn test_insert_field_static() {
3274        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
3275
3276        let smt = InsertField::new()
3277            .static_field("version", json!(1))
3278            .static_field("source", json!("cdc"));
3279
3280        let result = smt.apply(event).unwrap();
3281        let after = result.after.unwrap();
3282
3283        assert_eq!(after["version"], 1);
3284        assert_eq!(after["source"], "cdc");
3285    }
3286
3287    #[test]
3288    fn test_insert_field_timestamp() {
3289        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
3290
3291        let smt = InsertField::new().timestamp_field("processed_at");
3292        let result = smt.apply(event).unwrap();
3293        let after = result.after.unwrap();
3294
3295        assert!(after["processed_at"].as_str().is_some());
3296    }
3297
3298    #[test]
3299    fn test_insert_field_copy() {
3300        let event = make_event(
3301            CdcOp::Insert,
3302            None,
3303            Some(json!({"id": 42, "name": "Alice"})),
3304        );
3305
3306        let smt = InsertField::new().copy_field("original_id", "id");
3307        let result = smt.apply(event).unwrap();
3308        let after = result.after.unwrap();
3309
3310        assert_eq!(after["original_id"], 42);
3311    }
3312
3313    // Filter tests
3314    #[test]
3315    fn test_filter_keep_equals() {
3316        let event = make_event(
3317            CdcOp::Insert,
3318            None,
3319            Some(json!({"status": "active", "name": "Alice"})),
3320        );
3321
3322        let smt = Filter::keep(FilterCondition::Equals {
3323            field: "status".to_string(),
3324            value: json!("active"),
3325        });
3326
3327        assert!(smt.apply(event).is_some());
3328    }
3329
3330    #[test]
3331    fn test_filter_drop_equals() {
3332        let event = make_event(
3333            CdcOp::Insert,
3334            None,
3335            Some(json!({"status": "deleted", "name": "Alice"})),
3336        );
3337
3338        let smt = Filter::drop(FilterCondition::Equals {
3339            field: "status".to_string(),
3340            value: json!("deleted"),
3341        });
3342
3343        assert!(smt.apply(event).is_none());
3344    }
3345
3346    #[test]
3347    fn test_filter_regex() {
3348        let event = make_event(
3349            CdcOp::Insert,
3350            None,
3351            Some(json!({"email": "alice@example.com"})),
3352        );
3353
3354        let smt = Filter::keep(FilterCondition::Matches {
3355            field: "email".to_string(),
3356            pattern: r".*@example\.com$".to_string(),
3357        });
3358
3359        assert!(smt.apply(event).is_some());
3360    }
3361
3362    #[test]
3363    fn test_filter_and() {
3364        let event = make_event(
3365            CdcOp::Insert,
3366            None,
3367            Some(json!({"status": "active", "role": "admin"})),
3368        );
3369
3370        let smt = Filter::keep(FilterCondition::And(vec![
3371            FilterCondition::Equals {
3372                field: "status".to_string(),
3373                value: json!("active"),
3374            },
3375            FilterCondition::Equals {
3376                field: "role".to_string(),
3377                value: json!("admin"),
3378            },
3379        ]));
3380
3381        assert!(smt.apply(event).is_some());
3382    }
3383
3384    #[test]
3385    fn test_filter_or() {
3386        let event = make_event(
3387            CdcOp::Insert,
3388            None,
3389            Some(json!({"status": "deleted", "role": "admin"})),
3390        );
3391
3392        let smt = Filter::keep(FilterCondition::Or(vec![
3393            FilterCondition::Equals {
3394                field: "status".to_string(),
3395                value: json!("active"),
3396            },
3397            FilterCondition::Equals {
3398                field: "role".to_string(),
3399                value: json!("admin"),
3400            },
3401        ]));
3402
3403        assert!(smt.apply(event).is_some());
3404    }
3405
3406    #[test]
3407    fn test_filter_not() {
3408        let event = make_event(CdcOp::Insert, None, Some(json!({"status": "active"})));
3409
3410        let smt = Filter::keep(FilterCondition::Not(Box::new(FilterCondition::Equals {
3411            field: "status".to_string(),
3412            value: json!("deleted"),
3413        })));
3414
3415        assert!(smt.apply(event).is_some());
3416    }
3417
3418    #[test]
3419    fn test_filter_is_null() {
3420        let event = make_event(
3421            CdcOp::Insert,
3422            None,
3423            Some(json!({"name": "Alice", "email": null})),
3424        );
3425
3426        let smt = Filter::keep(FilterCondition::IsNull {
3427            field: "email".to_string(),
3428        });
3429
3430        assert!(smt.apply(event).is_some());
3431    }
3432
3433    #[test]
3434    fn test_filter_in() {
3435        let event = make_event(CdcOp::Insert, None, Some(json!({"status": "pending"})));
3436
3437        let smt = Filter::keep(FilterCondition::In {
3438            field: "status".to_string(),
3439            values: vec![json!("pending"), json!("active"), json!("approved")],
3440        });
3441
3442        assert!(smt.apply(event).is_some());
3443    }
3444
3445    // Cast tests
3446    #[test]
3447    fn test_cast_to_string() {
3448        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 42, "active": true})));
3449
3450        let smt = Cast::new()
3451            .field("id", CastType::String)
3452            .field("active", CastType::String);
3453
3454        let result = smt.apply(event).unwrap();
3455        let after = result.after.unwrap();
3456
3457        assert_eq!(after["id"], "42");
3458        assert_eq!(after["active"], "true");
3459    }
3460
3461    #[test]
3462    fn test_cast_string_to_int() {
3463        let event = make_event(CdcOp::Insert, None, Some(json!({"count": "42"})));
3464
3465        let smt = Cast::new().field("count", CastType::Integer);
3466        let result = smt.apply(event).unwrap();
3467        let after = result.after.unwrap();
3468
3469        assert_eq!(after["count"], 42);
3470    }
3471
3472    #[test]
3473    fn test_cast_to_boolean() {
3474        let event = make_event(
3475            CdcOp::Insert,
3476            None,
3477            Some(json!({"flag1": "true", "flag2": 1, "flag3": "yes"})),
3478        );
3479
3480        let smt = Cast::new()
3481            .field("flag1", CastType::Boolean)
3482            .field("flag2", CastType::Boolean)
3483            .field("flag3", CastType::Boolean);
3484
3485        let result = smt.apply(event).unwrap();
3486        let after = result.after.unwrap();
3487
3488        assert_eq!(after["flag1"], true);
3489        assert_eq!(after["flag2"], true);
3490        assert_eq!(after["flag3"], true);
3491    }
3492
3493    #[test]
3494    fn test_cast_json_string() {
3495        let event = make_event(
3496            CdcOp::Insert,
3497            None,
3498            Some(json!({"config": "{\"key\":\"value\"}"})),
3499        );
3500
3501        let smt = Cast::new().field("config", CastType::Json);
3502        let result = smt.apply(event).unwrap();
3503        let after = result.after.unwrap();
3504
3505        assert_eq!(after["config"]["key"], "value");
3506    }
3507
3508    // Flatten tests
3509    #[test]
3510    fn test_flatten() {
3511        let event = make_event(
3512            CdcOp::Insert,
3513            None,
3514            Some(json!({
3515                "user": {
3516                    "name": "Alice",
3517                    "address": {
3518                        "city": "NYC",
3519                        "zip": "10001"
3520                    }
3521                }
3522            })),
3523        );
3524
3525        let smt = Flatten::new();
3526        let result = smt.apply(event).unwrap();
3527        let after = result.after.unwrap();
3528
3529        assert_eq!(after["user.name"], "Alice");
3530        assert_eq!(after["user.address.city"], "NYC");
3531    }
3532
3533    #[test]
3534    fn test_flatten_max_depth() {
3535        let event = make_event(
3536            CdcOp::Insert,
3537            None,
3538            Some(json!({
3539                "level1": {
3540                    "level2": {
3541                        "level3": "deep"
3542                    }
3543                }
3544            })),
3545        );
3546
3547        let smt = Flatten::new().max_depth(1);
3548        let result = smt.apply(event).unwrap();
3549        let after = result.after.unwrap();
3550
3551        // Should only flatten one level
3552        assert!(after.as_object().unwrap().contains_key("level1.level2"));
3553    }
3554
3555    #[test]
3556    fn test_flatten_custom_delimiter() {
3557        let event = make_event(
3558            CdcOp::Insert,
3559            None,
3560            Some(json!({
3561                "user": {
3562                    "name": "Alice"
3563                }
3564            })),
3565        );
3566
3567        let smt = Flatten::new().delimiter("_");
3568        let result = smt.apply(event).unwrap();
3569        let after = result.after.unwrap();
3570
3571        assert_eq!(after["user_name"], "Alice");
3572    }
3573
3574    // RegexRouter tests
3575    #[test]
3576    fn test_regex_router() {
3577        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3578
3579        let smt = RegexRouter::new("cdc.default")
3580            .route(r"^public\.users$", "cdc.users")
3581            .route(r"^public\.orders.*", "cdc.orders");
3582
3583        let result = smt.apply(event).unwrap();
3584        let after = result.after.unwrap();
3585        assert_eq!(after["__topic"], "cdc.users");
3586    }
3587
3588    #[test]
3589    fn test_regex_router_default() {
3590        let mut event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3591        event.table = "unknown_table".to_string();
3592
3593        let smt = RegexRouter::new("cdc.default").route(r"^public\.users$", "cdc.users");
3594
3595        let result = smt.apply(event).unwrap();
3596        let after = result.after.unwrap();
3597        assert_eq!(after["__topic"], "cdc.default");
3598    }
3599
3600    // Chain combination tests
3601    #[test]
3602    fn test_full_transform_chain() {
3603        let event = make_event(
3604            CdcOp::Update,
3605            Some(json!({"id": 1, "ssn": "123-45-6789", "name": "Old"})),
3606            Some(
3607                json!({"id": 1, "ssn": "123-45-6789", "name": "New", "created_at": 1705320000000i64}),
3608            ),
3609        );
3610
3611        let chain = SmtChain::new()
3612            .add(
3613                ExtractNewRecordState::new()
3614                    .add_op_field()
3615                    .add_table_field(),
3616            )
3617            .add(MaskField::new(["ssn"]))
3618            .add(TimestampConverter::new(
3619                ["created_at"],
3620                TimestampFormat::Iso8601,
3621            ))
3622            .add(InsertField::new().static_field("_version", json!(1)));
3623
3624        let result = chain.apply(event).unwrap();
3625        let after = result.after.unwrap();
3626
3627        assert_eq!(after["__op"], "u");
3628        assert_eq!(after["__table"], "users");
3629        assert_eq!(after["ssn"], "***********");
3630        assert!(after["created_at"].as_str().unwrap().contains("2024"));
3631        assert_eq!(after["_version"], 1);
3632    }
3633
3634    #[test]
3635    fn test_filter_drops_chain() {
3636        let event = make_event(CdcOp::Insert, None, Some(json!({"status": "deleted"})));
3637
3638        let chain = SmtChain::new()
3639            .add(Filter::drop(FilterCondition::Equals {
3640                field: "status".to_string(),
3641                value: json!("deleted"),
3642            }))
3643            .add(InsertField::new().static_field("processed", json!(true)));
3644
3645        // Should be dropped by filter, so InsertField never runs
3646        assert!(chain.apply(event).is_none());
3647    }
3648
3649    // Predicate tests
3650    #[test]
3651    fn test_predicate_table() {
3652        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3653
3654        let predicate = Predicate::table(r"^users$").unwrap();
3655        assert!(predicate.matches(&event));
3656
3657        let predicate = Predicate::table(r"^orders$").unwrap();
3658        assert!(!predicate.matches(&event));
3659    }
3660
3661    #[test]
3662    fn test_predicate_operation() {
3663        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3664
3665        let predicate = Predicate::operation(vec![CdcOp::Insert, CdcOp::Update]);
3666        assert!(predicate.matches(&event));
3667
3668        let predicate = Predicate::operation(vec![CdcOp::Delete]);
3669        assert!(!predicate.matches(&event));
3670    }
3671
3672    #[test]
3673    fn test_predicate_field_value() {
3674        let event = make_event(
3675            CdcOp::Insert,
3676            None,
3677            Some(json!({"status": "active", "id": 1})),
3678        );
3679
3680        let predicate = Predicate::field_equals("status", json!("active"));
3681        assert!(predicate.matches(&event));
3682
3683        let predicate = Predicate::field_equals("status", json!("deleted"));
3684        assert!(!predicate.matches(&event));
3685    }
3686
3687    #[test]
3688    fn test_predicate_and() {
3689        let event = make_event(CdcOp::Insert, None, Some(json!({"status": "active"})));
3690
3691        let predicate = Predicate::And(vec![
3692            Predicate::operation(vec![CdcOp::Insert]),
3693            Predicate::field_equals("status", json!("active")),
3694        ]);
3695        assert!(predicate.matches(&event));
3696    }
3697
3698    #[test]
3699    fn test_predicate_or() {
3700        let event = make_event(CdcOp::Delete, None, Some(json!({"id": 1})));
3701
3702        let predicate = Predicate::Or(vec![
3703            Predicate::operation(vec![CdcOp::Insert]),
3704            Predicate::operation(vec![CdcOp::Delete]),
3705        ]);
3706        assert!(predicate.matches(&event));
3707    }
3708
3709    #[test]
3710    fn test_predicate_not() {
3711        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3712
3713        let predicate = Predicate::Not(Box::new(Predicate::operation(vec![CdcOp::Delete])));
3714        assert!(predicate.matches(&event));
3715    }
3716
3717    #[test]
3718    fn test_conditional_smt_when() {
3719        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
3720
3721        // Only mask when inserting
3722        let smt = ConditionalSmt::when(
3723            Predicate::operation(vec![CdcOp::Insert]),
3724            InsertField::new().static_field("_inserted", json!(true)),
3725        );
3726
3727        let result = smt.apply(event).unwrap();
3728        let after = result.after.unwrap();
3729        assert_eq!(after["_inserted"], true);
3730    }
3731
3732    #[test]
3733    fn test_conditional_smt_unless() {
3734        let event = make_event(CdcOp::Delete, Some(json!({"id": 1})), None);
3735
3736        // Don't add field on deletes
3737        let smt = ConditionalSmt::unless(
3738            Predicate::operation(vec![CdcOp::Delete]),
3739            InsertField::new().static_field("_processed", json!(true)),
3740        );
3741
3742        let result = smt.apply(event).unwrap();
3743        // Delete event should pass through unchanged (no after to modify anyway)
3744        assert!(result.after.is_none());
3745    }
3746
3747    #[test]
3748    fn test_conditional_smt_table_predicate() {
3749        let event = make_event(CdcOp::Insert, None, Some(json!({"ssn": "123-45-6789"})));
3750
3751        // Only mask SSN for users table
3752        let smt = ConditionalSmt::when(
3753            Predicate::table(r"^users$").unwrap(),
3754            MaskField::new(["ssn"]),
3755        );
3756
3757        let result = smt.apply(event).unwrap();
3758        let after = result.after.unwrap();
3759        assert_eq!(after["ssn"], "***********");
3760    }
3761
3762    // HeaderToValue tests
3763    #[test]
3764    fn test_header_to_value() {
3765        let event = make_event(CdcOp::Insert, None, Some(json!({"id": 1})));
3766
3767        let smt = HeaderToValue::new()
3768            .field("_source", HeaderSource::SourceType)
3769            .field("_table", HeaderSource::Table)
3770            .field("_op", HeaderSource::Operation);
3771
3772        let result = smt.apply(event).unwrap();
3773        let after = result.after.unwrap();
3774
3775        assert_eq!(after["_source"], "postgres");
3776        assert_eq!(after["_table"], "users");
3777        assert_eq!(after["_op"], "c");
3778    }
3779
3780    #[test]
3781    fn test_header_to_value_all() {
3782        let event = make_event(CdcOp::Update, None, Some(json!({"id": 1})));
3783
3784        let smt = HeaderToValue::new().all_headers("__");
3785
3786        let result = smt.apply(event).unwrap();
3787        let after = result.after.unwrap();
3788
3789        assert_eq!(after["__source_type"], "postgres");
3790        assert_eq!(after["__database"], "testdb");
3791        assert_eq!(after["__schema"], "public");
3792        assert_eq!(after["__table"], "users");
3793        assert_eq!(after["__op"], "u");
3794        assert!(after["__ts"].is_number());
3795    }
3796
3797    // Unwrap tests
3798    #[test]
3799    fn test_unwrap_nested() {
3800        let event = make_event(
3801            CdcOp::Insert,
3802            None,
3803            Some(json!({
3804                "payload": {
3805                    "data": {
3806                        "name": "Alice",
3807                        "email": "alice@test.com"
3808                    }
3809                }
3810            })),
3811        );
3812
3813        let smt = Unwrap::new("payload.data");
3814        let result = smt.apply(event).unwrap();
3815        let after = result.after.unwrap();
3816
3817        assert_eq!(after["name"], "Alice");
3818        assert_eq!(after["email"], "alice@test.com");
3819        // Original payload should be gone (replaced)
3820        assert!(after.get("payload").is_none());
3821    }
3822
3823    #[test]
3824    fn test_unwrap_merge() {
3825        let event = make_event(
3826            CdcOp::Insert,
3827            None,
3828            Some(json!({
3829                "id": 1,
3830                "nested": {
3831                    "name": "Alice"
3832                }
3833            })),
3834        );
3835
3836        let smt = Unwrap::new("nested").merge();
3837        let result = smt.apply(event).unwrap();
3838        let after = result.after.unwrap();
3839
3840        assert_eq!(after["id"], 1);
3841        assert_eq!(after["name"], "Alice");
3842    }
3843
3844    // SetNull tests
3845    #[test]
3846    fn test_set_null_always() {
3847        let event = make_event(
3848            CdcOp::Insert,
3849            None,
3850            Some(json!({"name": "Alice", "password": "secret"})),
3851        );
3852
3853        let smt = SetNull::new(["password"]);
3854        let result = smt.apply(event).unwrap();
3855        let after = result.after.unwrap();
3856
3857        assert_eq!(after["name"], "Alice");
3858        assert!(after["password"].is_null());
3859    }
3860
3861    #[test]
3862    fn test_set_null_if_empty() {
3863        let event = make_event(
3864            CdcOp::Insert,
3865            None,
3866            Some(json!({"name": "Alice", "nickname": ""})),
3867        );
3868
3869        let smt = SetNull::new(["nickname"]).when(NullCondition::IfEmpty);
3870        let result = smt.apply(event).unwrap();
3871        let after = result.after.unwrap();
3872
3873        assert!(after["nickname"].is_null());
3874    }
3875
3876    #[test]
3877    fn test_set_null_if_equals() {
3878        let event = make_event(
3879            CdcOp::Insert,
3880            None,
3881            Some(json!({"status": "N/A", "name": "Alice"})),
3882        );
3883
3884        let smt = SetNull::new(["status"]).when(NullCondition::IfEquals(json!("N/A")));
3885        let result = smt.apply(event).unwrap();
3886        let after = result.after.unwrap();
3887
3888        assert!(after["status"].is_null());
3889        assert_eq!(after["name"], "Alice");
3890    }
3891
3892    #[test]
3893    fn test_set_null_if_matches() {
3894        let event = make_event(
3895            CdcOp::Insert,
3896            None,
3897            Some(json!({"phone": "000-000-0000", "name": "Alice"})),
3898        );
3899
3900        let smt = SetNull::new(["phone"]).when(NullCondition::IfMatches(r"^0+-0+-0+$".to_string()));
3901        let result = smt.apply(event).unwrap();
3902        let after = result.after.unwrap();
3903
3904        assert!(after["phone"].is_null());
3905    }
3906
3907    // Combined advanced tests
3908    #[test]
3909    fn test_advanced_chain_with_predicates() {
3910        let event = make_event(
3911            CdcOp::Insert,
3912            None,
3913            Some(json!({"name": "Alice", "ssn": "123-45-6789", "status": "active"})),
3914        );
3915
3916        let chain = SmtChain::new()
3917            // Only mask SSN for inserts
3918            .add(ConditionalSmt::when(
3919                Predicate::operation(vec![CdcOp::Insert]),
3920                MaskField::new(["ssn"]),
3921            ))
3922            // Add metadata
3923            .add(
3924                HeaderToValue::new()
3925                    .field("_table", HeaderSource::Table)
3926                    .field("_op", HeaderSource::Operation),
3927            )
3928            // Add version
3929            .add(InsertField::new().static_field("_version", json!(1)));
3930
3931        let result = chain.apply(event).unwrap();
3932        let after = result.after.unwrap();
3933
3934        assert_eq!(after["name"], "Alice");
3935        assert_eq!(after["ssn"], "***********"); // Masked
3936        assert_eq!(after["_table"], "users");
3937        assert_eq!(after["_op"], "c");
3938        assert_eq!(after["_version"], 1);
3939    }
3940
3941    // =========================================================================
3942    // TimezoneConverter tests
3943    // =========================================================================
3944
3945    #[test]
3946    fn test_timezone_converter_utc_to_est() {
3947        let event = make_event(
3948            CdcOp::Insert,
3949            None,
3950            Some(json!({"created_at": "2024-01-15T15:00:00Z"})),
3951        );
3952
3953        let smt = TimezoneConverter::new(["created_at"])
3954            .from("UTC")
3955            .to("America/New_York");
3956
3957        let result = smt.apply(event).unwrap();
3958        let after = result.after.unwrap();
3959
3960        // UTC 15:00 should be EST 10:00 (EST is UTC-5 in January)
3961        let ts = after["created_at"].as_str().unwrap();
3962        assert!(ts.contains("10:00:00"));
3963        assert!(ts.contains("-05:00") || ts.contains("New_York"));
3964    }
3965
3966    #[test]
3967    fn test_timezone_converter_date_only() {
3968        let event = make_event(
3969            CdcOp::Insert,
3970            None,
3971            Some(json!({"created_at": "2024-01-15T23:00:00Z"})),
3972        );
3973
3974        let smt = TimezoneConverter::new(["created_at"])
3975            .from("UTC")
3976            .to("America/Los_Angeles")
3977            .date_only();
3978
3979        let result = smt.apply(event).unwrap();
3980        let after = result.after.unwrap();
3981
3982        // UTC 23:00 is 15:00 PT (UTC-8), same day
3983        assert_eq!(after["created_at"], "2024-01-15");
3984    }
3985
3986    #[test]
3987    fn test_timezone_converter_custom_format() {
3988        let event = make_event(
3989            CdcOp::Insert,
3990            None,
3991            Some(json!({"created_at": "2024-01-15T12:30:00Z"})),
3992        );
3993
3994        let smt = TimezoneConverter::new(["created_at"])
3995            .from("UTC")
3996            .to("Europe/London")
3997            .format("%Y-%m-%d %H:%M");
3998
3999        let result = smt.apply(event).unwrap();
4000        let after = result.after.unwrap();
4001
4002        // London is UTC+0 in January
4003        assert_eq!(after["created_at"], "2024-01-15 12:30");
4004    }
4005
4006    #[test]
4007    fn test_timezone_converter_epoch_input() {
4008        let event = make_event(
4009            CdcOp::Insert,
4010            None,
4011            Some(json!({"created_at": 1705320000000i64})), // 2024-01-15T10:00:00Z
4012        );
4013
4014        let smt = TimezoneConverter::new(["created_at"])
4015            .from("UTC")
4016            .to("Asia/Tokyo"); // JST is UTC+9
4017
4018        let result = smt.apply(event).unwrap();
4019        let after = result.after.unwrap();
4020
4021        // UTC 10:00 -> JST 19:00
4022        let ts = after["created_at"].as_str().unwrap();
4023        // The timestamp should contain Tokyo timezone indicator and correct time
4024        assert!(ts.contains("2024-01-15") && ts.contains("+09:00"));
4025    }
4026
4027    #[test]
4028    fn test_timezone_converter_multiple_fields() {
4029        let event = make_event(
4030            CdcOp::Insert,
4031            None,
4032            Some(json!({
4033                "created_at": "2024-01-15T10:00:00Z",
4034                "updated_at": "2024-01-15T15:30:00Z"
4035            })),
4036        );
4037
4038        let smt = TimezoneConverter::new(["created_at", "updated_at"])
4039            .from("UTC")
4040            .to("Europe/Paris"); // CET is UTC+1 in January
4041
4042        let result = smt.apply(event).unwrap();
4043        let after = result.after.unwrap();
4044
4045        // UTC 10:00 -> CET 11:00
4046        assert!(after["created_at"].as_str().unwrap().contains("11:00:00"));
4047        // UTC 15:30 -> CET 16:30
4048        assert!(after["updated_at"].as_str().unwrap().contains("16:30:00"));
4049    }
4050
4051    // =========================================================================
4052    // ContentRouter tests
4053    // =========================================================================
4054
4055    #[test]
4056    fn test_content_router_exact_match() {
4057        let event = make_event(
4058            CdcOp::Insert,
4059            None,
4060            Some(json!({"priority": "high", "message": "urgent"})),
4061        );
4062
4063        let smt = ContentRouter::new()
4064            .route("priority", "high", "priority-events")
4065            .route("priority", "low", "batch-events")
4066            .default_topic("default-events");
4067
4068        let result = smt.apply(event).unwrap();
4069        let after = result.after.unwrap();
4070
4071        assert_eq!(after["__routing_topic"], "priority-events");
4072    }
4073
4074    #[test]
4075    fn test_content_router_pattern_match() {
4076        let event = make_event(
4077            CdcOp::Insert,
4078            None,
4079            Some(json!({"category": "urgent-alert-123", "data": "test"})),
4080        );
4081
4082        let smt = ContentRouter::new()
4083            .route_pattern("category", r"^urgent", "urgent-events")
4084            .route_pattern("category", r"^normal", "normal-events");
4085
4086        let result = smt.apply(event).unwrap();
4087        let after = result.after.unwrap();
4088
4089        assert_eq!(after["__routing_topic"], "urgent-events");
4090    }
4091
4092    #[test]
4093    fn test_content_router_in_set() {
4094        let event = make_event(
4095            CdcOp::Insert,
4096            None,
4097            Some(json!({"status": "active", "name": "Alice"})),
4098        );
4099
4100        let smt = ContentRouter::new()
4101            .route_in("status", ["active", "pending"], "active-events")
4102            .route_in("status", ["archived", "deleted"], "archive-events");
4103
4104        let result = smt.apply(event).unwrap();
4105        let after = result.after.unwrap();
4106
4107        assert_eq!(after["__routing_topic"], "active-events");
4108    }
4109
4110    #[test]
4111    fn test_content_router_predicate() {
4112        let event = make_event(
4113            CdcOp::Insert,
4114            None,
4115            Some(json!({"amount": 1500, "currency": "USD"})),
4116        );
4117
4118        let smt = ContentRouter::new()
4119            .route_if(
4120                "amount",
4121                |v| v.as_i64().map(|n| n > 1000).unwrap_or(false),
4122                "high-value",
4123            )
4124            .route_if(
4125                "amount",
4126                |v| v.as_i64().map(|n| n <= 1000).unwrap_or(false),
4127                "normal-value",
4128            );
4129
4130        let result = smt.apply(event).unwrap();
4131        let after = result.after.unwrap();
4132
4133        assert_eq!(after["__routing_topic"], "high-value");
4134    }
4135
4136    #[test]
4137    fn test_content_router_default_topic() {
4138        let event = make_event(
4139            CdcOp::Insert,
4140            None,
4141            Some(json!({"type": "unknown", "data": "test"})),
4142        );
4143
4144        let smt = ContentRouter::new()
4145            .route("type", "order", "order-events")
4146            .route("type", "user", "user-events")
4147            .default_topic("other-events");
4148
4149        let result = smt.apply(event).unwrap();
4150        let after = result.after.unwrap();
4151
4152        assert_eq!(after["__routing_topic"], "other-events");
4153    }
4154
4155    #[test]
4156    fn test_content_router_custom_field() {
4157        let event = make_event(
4158            CdcOp::Insert,
4159            None,
4160            Some(json!({"priority": "high", "message": "test"})),
4161        );
4162
4163        let smt = ContentRouter::new()
4164            .route("priority", "high", "priority-events")
4165            .topic_field("_target_topic");
4166
4167        let result = smt.apply(event).unwrap();
4168        let after = result.after.unwrap();
4169
4170        assert_eq!(after["_target_topic"], "priority-events");
4171        assert!(after.get("__routing_topic").is_none());
4172    }
4173
4174    // =========================================================================
4175    // ComputeField tests
4176    // =========================================================================
4177
4178    #[test]
4179    fn test_compute_field_concat() {
4180        let event = make_event(
4181            CdcOp::Insert,
4182            None,
4183            Some(json!({"first_name": "Alice", "last_name": "Smith"})),
4184        );
4185
4186        let smt = ComputeField::new().concat("full_name", ["$first_name", " ", "$last_name"]);
4187
4188        let result = smt.apply(event).unwrap();
4189        let after = result.after.unwrap();
4190
4191        assert_eq!(after["full_name"], "Alice Smith");
4192    }
4193
4194    #[test]
4195    fn test_compute_field_hash() {
4196        let event = make_event(
4197            CdcOp::Insert,
4198            None,
4199            Some(json!({"id": 123, "email": "alice@test.com"})),
4200        );
4201
4202        let smt = ComputeField::new().hash("user_hash", ["id", "email"]);
4203
4204        let result = smt.apply(event).unwrap();
4205        let after = result.after.unwrap();
4206
4207        // Should be a hex hash
4208        let hash = after["user_hash"].as_str().unwrap();
4209        assert_eq!(hash.len(), 64); // SHA-256 is 64 hex chars
4210        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
4211    }
4212
4213    #[test]
4214    fn test_compute_field_coalesce() {
4215        let event = make_event(
4216            CdcOp::Insert,
4217            None,
4218            Some(json!({"nickname": null, "username": "alice123", "email": "alice@test.com"})),
4219        );
4220
4221        let smt = ComputeField::new().coalesce("display_name", ["nickname", "username", "email"]);
4222
4223        let result = smt.apply(event).unwrap();
4224        let after = result.after.unwrap();
4225
4226        assert_eq!(after["display_name"], "alice123");
4227    }
4228
4229    #[test]
4230    fn test_compute_field_sum() {
4231        let event = make_event(
4232            CdcOp::Insert,
4233            None,
4234            Some(json!({"price": 100.0, "tax": 10.0, "shipping": 5.0})),
4235        );
4236
4237        let smt = ComputeField::new().sum("total", ["price", "tax", "shipping"]);
4238
4239        let result = smt.apply(event).unwrap();
4240        let after = result.after.unwrap();
4241
4242        assert_eq!(after["total"].as_f64().unwrap(), 115.0);
4243    }
4244
4245    #[test]
4246    fn test_compute_field_length() {
4247        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
4248
4249        let smt = ComputeField::new().length("name_length", "name");
4250
4251        let result = smt.apply(event).unwrap();
4252        let after = result.after.unwrap();
4253
4254        assert_eq!(after["name_length"], 5);
4255    }
4256
4257    #[test]
4258    fn test_compute_field_upper_lower() {
4259        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice Smith"})));
4260
4261        let smt = ComputeField::new()
4262            .upper("name_upper", "name")
4263            .lower("name_lower", "name");
4264
4265        let result = smt.apply(event).unwrap();
4266        let after = result.after.unwrap();
4267
4268        assert_eq!(after["name_upper"], "ALICE SMITH");
4269        assert_eq!(after["name_lower"], "alice smith");
4270    }
4271
4272    #[test]
4273    fn test_compute_field_substring() {
4274        let event = make_event(
4275            CdcOp::Insert,
4276            None,
4277            Some(json!({"phone": "+1-555-123-4567"})),
4278        );
4279
4280        let smt = ComputeField::new().substring("area_code", "phone", 3, Some(3));
4281
4282        let result = smt.apply(event).unwrap();
4283        let after = result.after.unwrap();
4284
4285        assert_eq!(after["area_code"], "555");
4286    }
4287
4288    #[test]
4289    fn test_compute_field_replace() {
4290        let event = make_event(CdcOp::Insert, None, Some(json!({"phone": "555-123-4567"})));
4291
4292        let smt = ComputeField::new().replace("phone_clean", "phone", r"-", "");
4293
4294        let result = smt.apply(event).unwrap();
4295        let after = result.after.unwrap();
4296
4297        assert_eq!(after["phone_clean"], "5551234567");
4298    }
4299
4300    #[test]
4301    fn test_compute_field_current_timestamp() {
4302        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
4303
4304        let smt = ComputeField::new().current_timestamp("processed_at");
4305
4306        let result = smt.apply(event).unwrap();
4307        let after = result.after.unwrap();
4308
4309        // Should be ISO8601 timestamp
4310        assert!(after["processed_at"].as_str().unwrap().contains("20"));
4311    }
4312
4313    #[test]
4314    fn test_compute_field_uuid() {
4315        let event = make_event(CdcOp::Insert, None, Some(json!({"name": "Alice"})));
4316
4317        let smt = ComputeField::new().uuid("request_id");
4318
4319        let result = smt.apply(event).unwrap();
4320        let after = result.after.unwrap();
4321
4322        // Should be UUID format (8-4-4-4-12)
4323        let uuid = after["request_id"].as_str().unwrap();
4324        let parts: Vec<&str> = uuid.split('-').collect();
4325        assert_eq!(parts.len(), 5);
4326        assert_eq!(parts[0].len(), 8);
4327        assert_eq!(parts[1].len(), 4);
4328        assert_eq!(parts[2].len(), 4);
4329        assert_eq!(parts[3].len(), 4);
4330        assert_eq!(parts[4].len(), 12);
4331    }
4332
4333    #[test]
4334    fn test_compute_field_json_path() {
4335        let event = make_event(
4336            CdcOp::Insert,
4337            None,
4338            Some(json!({
4339                "metadata": {
4340                    "user": {
4341                        "name": "Alice"
4342                    }
4343                }
4344            })),
4345        );
4346
4347        let smt = ComputeField::new().json_path("user_name", "metadata", "user.name");
4348
4349        let result = smt.apply(event).unwrap();
4350        let after = result.after.unwrap();
4351
4352        assert_eq!(after["user_name"], "Alice");
4353    }
4354
4355    #[test]
4356    fn test_compute_field_conditional() {
4357        let event = make_event(
4358            CdcOp::Insert,
4359            None,
4360            Some(json!({"status": "active", "name": "Alice"})),
4361        );
4362
4363        let smt = ComputeField::new().conditional(
4364            "is_active",
4365            ComputeCondition::FieldEquals("status".to_string(), json!("active")),
4366            json!(true),
4367            json!(false),
4368        );
4369
4370        let result = smt.apply(event).unwrap();
4371        let after = result.after.unwrap();
4372
4373        assert_eq!(after["is_active"], true);
4374    }
4375
4376    #[test]
4377    fn test_compute_field_conditional_null_check() {
4378        let event = make_event(
4379            CdcOp::Insert,
4380            None,
4381            Some(json!({"name": "Alice", "deleted_at": null})),
4382        );
4383
4384        let smt = ComputeField::new().conditional(
4385            "status",
4386            ComputeCondition::FieldIsNull("deleted_at".to_string()),
4387            json!("active"),
4388            json!("deleted"),
4389        );
4390
4391        let result = smt.apply(event).unwrap();
4392        let after = result.after.unwrap();
4393
4394        assert_eq!(after["status"], "active");
4395    }
4396
4397    #[test]
4398    fn test_compute_field_conditional_pattern() {
4399        let event = make_event(
4400            CdcOp::Insert,
4401            None,
4402            Some(json!({"email": "admin@company.com", "name": "Admin"})),
4403        );
4404
4405        let smt = ComputeField::new().conditional(
4406            "is_admin",
4407            ComputeCondition::FieldMatches("email".to_string(), r"^admin@".to_string()),
4408            json!(true),
4409            json!(false),
4410        );
4411
4412        let result = smt.apply(event).unwrap();
4413        let after = result.after.unwrap();
4414
4415        assert_eq!(after["is_admin"], true);
4416    }
4417
4418    #[test]
4419    fn test_compute_field_chain() {
4420        let event = make_event(
4421            CdcOp::Insert,
4422            None,
4423            Some(json!({
4424                "first_name": "alice",
4425                "last_name": "smith",
4426                "amount": 100.0
4427            })),
4428        );
4429
4430        let smt = ComputeField::new()
4431            .upper("first_upper", "first_name")
4432            .upper("last_upper", "last_name")
4433            .concat("full_name", ["$first_upper", " ", "$last_upper"])
4434            .sum("total_with_tax", ["amount"]);
4435
4436        let result = smt.apply(event).unwrap();
4437        let after = result.after.unwrap();
4438
4439        assert_eq!(after["first_upper"], "ALICE");
4440        assert_eq!(after["last_upper"], "SMITH");
4441        // Concat uses computed values since we clone obj after each computation
4442        assert_eq!(after["full_name"], "ALICE SMITH");
4443    }
4444
4445    // =========================================================================
4446    // Integration tests - combining new SMTs
4447    // =========================================================================
4448
4449    #[test]
4450    fn test_full_pipeline_with_new_smts() {
4451        let event = make_event(
4452            CdcOp::Insert,
4453            None,
4454            Some(json!({
4455                "first_name": "Alice",
4456                "last_name": "Smith",
4457                "email": "alice@test.com",
4458                "priority": "high",
4459                "created_at": "2024-01-15T10:00:00Z"
4460            })),
4461        );
4462
4463        let chain = SmtChain::new()
4464            // Compute full name
4465            .add(
4466                ComputeField::new()
4467                    .concat("full_name", ["$first_name", " ", "$last_name"])
4468                    .hash("email_hash", ["email"])
4469                    .uuid("event_id"),
4470            )
4471            // Mask email
4472            .add(MaskField::new(["email"]))
4473            // Route based on priority
4474            .add(
4475                ContentRouter::new()
4476                    .route("priority", "high", "priority-events")
4477                    .default_topic("normal-events"),
4478            )
4479            // Convert timezone
4480            .add(
4481                TimezoneConverter::new(["created_at"])
4482                    .from("UTC")
4483                    .to("America/New_York"),
4484            );
4485
4486        let result = chain.apply(event).unwrap();
4487        let after = result.after.unwrap();
4488
4489        // Check all transforms applied
4490        assert_eq!(after["full_name"], "Alice Smith");
4491        assert_eq!(after["email_hash"].as_str().unwrap().len(), 64);
4492        assert!(after["event_id"].as_str().unwrap().contains("-"));
4493        assert_ne!(after["email"], "alice@test.com"); // Masked
4494        assert_eq!(after["__routing_topic"], "priority-events");
4495        // Timezone should be converted (EST is UTC-5)
4496        let ts = after["created_at"].as_str().unwrap();
4497        assert!(ts.contains("05:00:00") || ts.contains("-05:00"));
4498    }
4499}
4500
4501// ============================================================================
4502// ExternalizeBlob Tests (feature-gated)
4503// ============================================================================
4504
4505#[cfg(all(test, feature = "cloud-storage"))]
4506mod externalize_blob_tests {
4507    use super::*;
4508    use serde_json::json;
4509    use tempfile::TempDir;
4510
4511    fn make_event(op: CdcOp, before: Option<Value>, after: Option<Value>) -> CdcEvent {
4512        CdcEvent {
4513            source_type: "postgres".to_string(),
4514            database: "testdb".to_string(),
4515            schema: "public".to_string(),
4516            table: "users".to_string(),
4517            op,
4518            before,
4519            after,
4520            timestamp: chrono::Utc::now().timestamp(),
4521            transaction: None,
4522        }
4523    }
4524
4525    #[test]
4526    fn test_externalize_small_value_unchanged() {
4527        let temp_dir = TempDir::new().unwrap();
4528        let smt = ExternalizeBlob::local(temp_dir.path())
4529            .unwrap()
4530            .size_threshold(1000); // 1KB threshold
4531
4532        let event = make_event(
4533            CdcOp::Insert,
4534            None,
4535            Some(json!({
4536                "id": 1,
4537                "name": "Alice",
4538                "small_data": "short string"
4539            })),
4540        );
4541
4542        let result = smt.apply(event).unwrap();
4543        let after = result.after.unwrap();
4544
4545        // Small value should not be externalized
4546        assert_eq!(after["small_data"], "short string");
4547        assert!(after["small_data"].get("__externalized").is_none());
4548    }
4549
4550    #[test]
4551    fn test_externalize_large_value() {
4552        let temp_dir = TempDir::new().unwrap();
4553        let smt = ExternalizeBlob::local(temp_dir.path())
4554            .unwrap()
4555            .size_threshold(50); // Low threshold for testing
4556
4557        // Create a large base64-encoded value
4558        let large_data = "A".repeat(100);
4559
4560        let event = make_event(
4561            CdcOp::Insert,
4562            None,
4563            Some(json!({
4564                "id": 1,
4565                "blob_data": large_data
4566            })),
4567        );
4568
4569        let result = smt.apply(event).unwrap();
4570        let after = result.after.unwrap();
4571
4572        // Large value should be externalized
4573        let blob_ref = after.get("blob_data").unwrap();
4574        assert_eq!(blob_ref["__externalized"], true);
4575        assert!(blob_ref["url"].as_str().unwrap().starts_with("file://"));
4576        assert!(blob_ref["size"].as_u64().unwrap() > 0);
4577        assert!(blob_ref["sha256"].as_str().unwrap().len() == 64);
4578        assert_eq!(blob_ref["content_type"], "application/octet-stream");
4579    }
4580
4581    #[test]
4582    fn test_externalize_specific_fields_only() {
4583        let temp_dir = TempDir::new().unwrap();
4584        let smt = ExternalizeBlob::local(temp_dir.path())
4585            .unwrap()
4586            .size_threshold(10)
4587            .fields(["image_data"]); // Only externalize image_data
4588
4589        let event = make_event(
4590            CdcOp::Insert,
4591            None,
4592            Some(json!({
4593                "id": 1,
4594                "name": "very long name that exceeds threshold",
4595                "image_data": "large image data here..."
4596            })),
4597        );
4598
4599        let result = smt.apply(event).unwrap();
4600        let after = result.after.unwrap();
4601
4602        // name should not be externalized (not in fields list)
4603        assert_eq!(after["name"], "very long name that exceeds threshold");
4604
4605        // image_data should be externalized
4606        let image_ref = after.get("image_data").unwrap();
4607        assert_eq!(image_ref["__externalized"], true);
4608    }
4609
4610    #[test]
4611    fn test_externalize_with_prefix() {
4612        let temp_dir = TempDir::new().unwrap();
4613        let smt = ExternalizeBlob::local(temp_dir.path())
4614            .unwrap()
4615            .size_threshold(10)
4616            .prefix("cdc-blobs/production");
4617
4618        let large_data = "B".repeat(100);
4619        let event = make_event(
4620            CdcOp::Insert,
4621            None,
4622            Some(json!({
4623                "document": large_data
4624            })),
4625        );
4626
4627        let result = smt.apply(event).unwrap();
4628        let after = result.after.unwrap();
4629
4630        let doc_ref = after.get("document").unwrap();
4631        let url = doc_ref["url"].as_str().unwrap();
4632        assert!(url.contains("cdc-blobs/production"));
4633        assert!(url.contains("users")); // table name
4634        assert!(url.contains("document")); // field name
4635    }
4636
4637    #[test]
4638    fn test_externalize_base64_data() {
4639        use base64::Engine;
4640
4641        let temp_dir = TempDir::new().unwrap();
4642        let smt = ExternalizeBlob::local(temp_dir.path())
4643            .unwrap()
4644            .size_threshold(10);
4645
4646        // Valid base64 data
4647        let raw_bytes: Vec<u8> = (0..100).collect();
4648        let base64_data = base64::engine::general_purpose::STANDARD.encode(&raw_bytes);
4649
4650        let event = make_event(
4651            CdcOp::Insert,
4652            None,
4653            Some(json!({
4654                "binary_field": base64_data
4655            })),
4656        );
4657
4658        let result = smt.apply(event).unwrap();
4659        let after = result.after.unwrap();
4660
4661        let blob_ref = after.get("binary_field").unwrap();
4662        assert_eq!(blob_ref["__externalized"], true);
4663        // Size should reflect decoded bytes
4664        assert_eq!(blob_ref["size"].as_u64().unwrap(), 100);
4665    }
4666
4667    #[test]
4668    fn test_externalize_json_object() {
4669        let temp_dir = TempDir::new().unwrap();
4670        let smt = ExternalizeBlob::local(temp_dir.path())
4671            .unwrap()
4672            .size_threshold(20);
4673
4674        // Large JSON object
4675        let large_obj = json!({
4676            "nested": {
4677                "data": "large nested object with lots of data here",
4678                "more": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
4679            }
4680        });
4681
4682        let event = make_event(
4683            CdcOp::Insert,
4684            None,
4685            Some(json!({
4686                "id": 1,
4687                "metadata": large_obj
4688            })),
4689        );
4690
4691        let result = smt.apply(event).unwrap();
4692        let after = result.after.unwrap();
4693
4694        let meta_ref = after.get("metadata").unwrap();
4695        assert_eq!(meta_ref["__externalized"], true);
4696        assert_eq!(meta_ref["content_type"], "application/json");
4697    }
4698
4699    #[test]
4700    fn test_externalize_update_event_both_before_after() {
4701        let temp_dir = TempDir::new().unwrap();
4702        let smt = ExternalizeBlob::local(temp_dir.path())
4703            .unwrap()
4704            .size_threshold(20);
4705
4706        let old_data = "X".repeat(50);
4707        let new_data = "Y".repeat(50);
4708
4709        let event = make_event(
4710            CdcOp::Update,
4711            Some(json!({ "data": old_data })),
4712            Some(json!({ "data": new_data })),
4713        );
4714
4715        let result = smt.apply(event).unwrap();
4716
4717        // Both before and after should be externalized
4718        let before = result.before.unwrap();
4719        let after = result.after.unwrap();
4720
4721        assert_eq!(before["data"]["__externalized"], true);
4722        assert_eq!(after["data"]["__externalized"], true);
4723
4724        // URLs should be different
4725        let before_url = before["data"]["url"].as_str().unwrap();
4726        let after_url = after["data"]["url"].as_str().unwrap();
4727        assert_ne!(before_url, after_url);
4728    }
4729
4730    #[test]
4731    fn test_externalize_preserves_other_fields() {
4732        let temp_dir = TempDir::new().unwrap();
4733        let smt = ExternalizeBlob::local(temp_dir.path())
4734            .unwrap()
4735            .size_threshold(20);
4736
4737        let large_data = "Z".repeat(100);
4738        let event = make_event(
4739            CdcOp::Insert,
4740            None,
4741            Some(json!({
4742                "id": 123,
4743                "name": "Alice",
4744                "active": true,
4745                "score": 95.5,
4746                "large_blob": large_data
4747            })),
4748        );
4749
4750        let result = smt.apply(event).unwrap();
4751        let after = result.after.unwrap();
4752
4753        // Other fields unchanged
4754        assert_eq!(after["id"], 123);
4755        assert_eq!(after["name"], "Alice");
4756        assert_eq!(after["active"], true);
4757        assert_eq!(after["score"], 95.5);
4758
4759        // Large blob externalized
4760        assert_eq!(after["large_blob"]["__externalized"], true);
4761    }
4762
4763    #[test]
4764    fn test_externalize_smt_name() {
4765        let temp_dir = TempDir::new().unwrap();
4766        let smt = ExternalizeBlob::local(temp_dir.path()).unwrap();
4767        assert_eq!(smt.name(), "ExternalizeBlob");
4768    }
4769
4770    #[test]
4771    fn test_externalize_chain_with_other_smts() {
4772        let temp_dir = TempDir::new().unwrap();
4773
4774        let chain = SmtChain::new()
4775            // First mask sensitive data
4776            .add(MaskField::new(["ssn"]))
4777            // Then externalize large blobs
4778            .add(
4779                ExternalizeBlob::local(temp_dir.path())
4780                    .unwrap()
4781                    .size_threshold(20),
4782            )
4783            // Add computed field
4784            .add(ComputeField::new().uuid("event_id"));
4785
4786        let large_data = "A".repeat(50);
4787        let event = make_event(
4788            CdcOp::Insert,
4789            None,
4790            Some(json!({
4791                "id": 1,
4792                "ssn": "123-45-6789",
4793                "document": large_data
4794            })),
4795        );
4796
4797        let result = chain.apply(event).unwrap();
4798        let after = result.after.unwrap();
4799
4800        // SSN should be masked
4801        assert_ne!(after["ssn"], "123-45-6789");
4802        // Document should be externalized
4803        assert_eq!(after["document"]["__externalized"], true);
4804        // Event ID should be added
4805        assert!(after["event_id"].as_str().unwrap().contains("-"));
4806    }
4807
4808    #[test]
4809    fn test_externalize_file_written_to_disk() {
4810        use base64::Engine;
4811
4812        let temp_dir = TempDir::new().unwrap();
4813        let smt = ExternalizeBlob::local(temp_dir.path())
4814            .unwrap()
4815            .size_threshold(10);
4816
4817        let raw_bytes: Vec<u8> = vec![
4818            0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21,
4819        ]; // "Hello World!"
4820        let base64_data = base64::engine::general_purpose::STANDARD.encode(&raw_bytes);
4821
4822        let event = make_event(
4823            CdcOp::Insert,
4824            None,
4825            Some(json!({
4826                "greeting": base64_data
4827            })),
4828        );
4829
4830        let result = smt.apply(event).unwrap();
4831        let after = result.after.unwrap();
4832
4833        let blob_ref = after.get("greeting").unwrap();
4834        let url = blob_ref["url"].as_str().unwrap();
4835
4836        // Extract path after file:// and bucket
4837        let file_path = url.strip_prefix("file://").unwrap();
4838        // The path should exist
4839        assert!(std::path::Path::new(file_path).exists());
4840
4841        // Verify content
4842        let content = std::fs::read(file_path).unwrap();
4843        assert_eq!(content, raw_bytes);
4844    }
4845}