Skip to main content

faucet_stream/
transform.rs

1//! Record transformation pipeline.
2//!
3//! ## Built-in transforms (optional Cargo features)
4//!
5//! | Variant | Feature flag | Default |
6//! |---------|-------------|---------|
7//! | [`RecordTransform::Flatten`] | `transform-flatten` | enabled |
8//! | [`RecordTransform::RenameKeys`] | `transform-rename-keys` | enabled |
9//! | [`RecordTransform::KeysToSnakeCase`] | `transform-snake-case` | enabled |
10//!
11//! Disable a transform (and its dependencies) by opting out of its feature:
12//!
13//! ```toml
14//! [dependencies]
15//! faucet-stream = { version = "*", default-features = false,
16//!                   features = ["transform-flatten"] }
17//! ```
18//!
19//! ## Custom transforms
20//!
21//! [`RecordTransform::Custom`] is always available regardless of features.
22//! Pass any closure or function pointer via [`RecordTransform::custom`].
23
24use crate::error::FaucetError;
25#[cfg(any(
26    feature = "transform-flatten",
27    feature = "transform-rename-keys",
28    feature = "transform-snake-case"
29))]
30use serde_json::Map;
31use serde_json::Value;
32use std::fmt;
33use std::sync::Arc;
34
35#[cfg(any(feature = "transform-rename-keys", feature = "transform-snake-case"))]
36use regex::Regex;
37
38#[cfg(feature = "transform-snake-case")]
39use std::sync::LazyLock;
40
41// ── Public config-facing type ─────────────────────────────────────────────────
42
43/// A transformation applied to every record fetched by a [`crate::stream::RestStream`].
44///
45/// Transforms are applied in the order they are added via
46/// [`crate::config::RestStreamConfig::add_transform`].
47///
48/// The three built-in variants are each guarded by a Cargo feature flag
49/// (all enabled by default — see module-level docs).
50/// [`RecordTransform::Custom`] is always available and accepts any closure.
51pub enum RecordTransform {
52    /// Flatten nested JSON objects into a single-level map.
53    ///
54    /// Nested key paths are joined with `separator`.  Arrays are left as-is.
55    ///
56    /// _Requires feature `transform-flatten` (default)._
57    ///
58    /// # Example
59    ///
60    /// ```text
61    /// {"user": {"id": 1, "addr": {"city": "NYC"}}}  →  (separator = "__")
62    /// {"user__id": 1, "user__addr__city": "NYC"}
63    /// ```
64    #[cfg(feature = "transform-flatten")]
65    Flatten { separator: String },
66
67    /// Apply a single regex substitution to every key in the record.
68    ///
69    /// Keys in nested objects and objects inside arrays are also renamed
70    /// recursively.  `pattern` is a Rust regex; `replacement` may reference
71    /// capture groups with `$1`, `${name}`, etc.  Chain multiple `RenameKeys`
72    /// transforms for multi-step pipelines.
73    ///
74    /// _Requires feature `transform-rename-keys` (default)._
75    ///
76    /// # Example
77    ///
78    /// ```text
79    /// pattern = r"^_sdc_", replacement = ""   →   strip "_sdc_" prefix
80    /// ```
81    #[cfg(feature = "transform-rename-keys")]
82    RenameKeys {
83        pattern: String,
84        replacement: String,
85    },
86
87    /// Convert all keys to `snake_case` using the same algorithm as Meltano's
88    /// default key normaliser:
89    ///
90    /// 1. Strip characters that are neither alphanumeric nor whitespace.
91    /// 2. Trim edges, then replace whitespace runs with `_`.
92    /// 3. Collapse consecutive underscores.
93    /// 4. Lowercase and trim leading/trailing underscores.
94    ///
95    /// _Requires feature `transform-snake-case` (default)._
96    ///
97    /// | Input key      | Output key     |
98    /// |----------------|----------------|
99    /// | `"First Name"` | `"first_name"` |
100    /// | `"last-name"`  | `"lastname"`   |
101    /// | `"price ($)"`  | `"price"`      |
102    /// | `"ID"`         | `"id"`         |
103    #[cfg(feature = "transform-snake-case")]
104    KeysToSnakeCase,
105
106    /// A user-supplied transformation function.
107    ///
108    /// The function receives each record as a [`Value`] and returns the
109    /// (possibly modified) record.  Construct one with [`RecordTransform::custom`].
110    ///
111    /// Always available — not guarded by any feature flag.
112    Custom(Arc<dyn Fn(Value) -> Value + Send + Sync>),
113}
114
115impl fmt::Debug for RecordTransform {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        match self {
118            #[cfg(feature = "transform-flatten")]
119            Self::Flatten { separator } => f
120                .debug_struct("Flatten")
121                .field("separator", separator)
122                .finish(),
123            #[cfg(feature = "transform-rename-keys")]
124            Self::RenameKeys {
125                pattern,
126                replacement,
127            } => f
128                .debug_struct("RenameKeys")
129                .field("pattern", pattern)
130                .field("replacement", replacement)
131                .finish(),
132            #[cfg(feature = "transform-snake-case")]
133            Self::KeysToSnakeCase => write!(f, "KeysToSnakeCase"),
134            Self::Custom(_) => write!(f, "Custom(<fn>)"),
135        }
136    }
137}
138
139// Arc<dyn Fn> is Clone (bumps refcount) but #[derive(Clone)] can't see that,
140// so we implement Clone manually.
141impl Clone for RecordTransform {
142    fn clone(&self) -> Self {
143        match self {
144            #[cfg(feature = "transform-flatten")]
145            Self::Flatten { separator } => Self::Flatten {
146                separator: separator.clone(),
147            },
148            #[cfg(feature = "transform-rename-keys")]
149            Self::RenameKeys {
150                pattern,
151                replacement,
152            } => Self::RenameKeys {
153                pattern: pattern.clone(),
154                replacement: replacement.clone(),
155            },
156            #[cfg(feature = "transform-snake-case")]
157            Self::KeysToSnakeCase => Self::KeysToSnakeCase,
158            Self::Custom(f) => Self::Custom(Arc::clone(f)),
159        }
160    }
161}
162
163impl RecordTransform {
164    /// Create a custom transform from any function or closure.
165    ///
166    /// The closure receives each record as a [`Value`] and must return a
167    /// [`Value`] (the transformed record).  It is called once per record and
168    /// may perform any manipulation — adding fields, removing fields, renaming,
169    /// type coercion, etc.
170    ///
171    /// Custom transforms are always available regardless of feature flags.
172    ///
173    /// # Example
174    ///
175    /// ```rust
176    /// use faucet_stream::RecordTransform;
177    /// use serde_json::{Value, json};
178    ///
179    /// // Inject a constant "source" field into every record.
180    /// let stamp = RecordTransform::custom(|mut record| {
181    ///     if let Value::Object(ref mut map) = record {
182    ///         map.insert("_source".to_string(), json!("my-api"));
183    ///     }
184    ///     record
185    /// });
186    /// ```
187    pub fn custom<F>(f: F) -> Self
188    where
189        F: Fn(Value) -> Value + Send + Sync + 'static,
190    {
191        Self::Custom(Arc::new(f))
192    }
193}
194
195// ── Internal compiled representation ─────────────────────────────────────────
196
197/// Pre-compiled form of a [`RecordTransform`].
198///
199/// Stored inside [`crate::stream::RestStream`] so that regex patterns are
200/// compiled exactly once (at [`crate::stream::RestStream::new`] time) rather
201/// than once per record.
202pub(crate) enum CompiledTransform {
203    #[cfg(feature = "transform-flatten")]
204    Flatten {
205        separator: String,
206    },
207    #[cfg(feature = "transform-rename-keys")]
208    RenameKeys {
209        re: Regex,
210        replacement: String,
211    },
212    #[cfg(feature = "transform-snake-case")]
213    KeysToSnakeCase,
214    Custom(Arc<dyn Fn(Value) -> Value + Send + Sync>),
215}
216
217/// Compile a [`RecordTransform`] into its [`CompiledTransform`] form.
218///
219/// Returns [`FaucetError::Transform`] if a regex pattern is invalid.
220pub(crate) fn compile(t: &RecordTransform) -> Result<CompiledTransform, FaucetError> {
221    match t {
222        #[cfg(feature = "transform-flatten")]
223        RecordTransform::Flatten { separator } => Ok(CompiledTransform::Flatten {
224            separator: separator.clone(),
225        }),
226        #[cfg(feature = "transform-rename-keys")]
227        RecordTransform::RenameKeys {
228            pattern,
229            replacement,
230        } => {
231            let re = Regex::new(pattern)
232                .map_err(|e| FaucetError::Transform(format!("invalid regex '{pattern}': {e}")))?;
233            Ok(CompiledTransform::RenameKeys {
234                re,
235                replacement: replacement.clone(),
236            })
237        }
238        #[cfg(feature = "transform-snake-case")]
239        RecordTransform::KeysToSnakeCase => Ok(CompiledTransform::KeysToSnakeCase),
240        RecordTransform::Custom(f) => Ok(CompiledTransform::Custom(Arc::clone(f))),
241    }
242}
243
244/// Apply a slice of pre-compiled transforms to a record, in order.
245pub(crate) fn apply_all(record: Value, transforms: &[CompiledTransform]) -> Value {
246    transforms.iter().fold(record, apply_one)
247}
248
249fn apply_one(value: Value, t: &CompiledTransform) -> Value {
250    match t {
251        #[cfg(feature = "transform-flatten")]
252        CompiledTransform::Flatten { separator } => flatten(value, separator),
253        #[cfg(feature = "transform-rename-keys")]
254        CompiledTransform::RenameKeys { re, replacement } => rename_keys(value, re, replacement),
255        #[cfg(feature = "transform-snake-case")]
256        CompiledTransform::KeysToSnakeCase => keys_to_snake_case(value),
257        CompiledTransform::Custom(f) => f(value),
258    }
259}
260
261// ── Flatten ───────────────────────────────────────────────────────────────────
262
/// Flatten a record into a single-level object.
///
/// * A top-level object has each of its entries flattened via
///   [`flatten_into`]; nested key paths are joined with `separator`.
/// * Any other value (array, scalar, null) is returned unchanged.
///
/// The top-level entries are walked here, rather than by calling
/// `flatten_into` with an empty prefix, so that a genuinely empty key (`""`)
/// is never mistaken for "no prefix yet": `{"": {"b": 1}}` with separator
/// `"__"` becomes `{"__b": 1}`, not `{"b": 1}`.
#[cfg(feature = "transform-flatten")]
fn flatten(value: Value, separator: &str) -> Value {
    match value {
        Value::Object(map) => {
            let mut out = Map::new();
            for (key, child) in map {
                flatten_into(child, &key, separator, &mut out);
            }
            Value::Object(out)
        }
        other => other,
    }
}

/// Write `value` into `out` under the key path `prefix`.
///
/// * A non-empty object is descended into: each child is written under
///   `{prefix}{separator}{child_key}`.
/// * Everything else — scalars, arrays, and *empty* objects — is stored as-is
///   under `prefix`.  Keeping empty objects means a field such as
///   `"meta": {}` survives flattening instead of silently vanishing.
///
/// If two different paths produce the same flattened key, the one inserted
/// later replaces the earlier one.
#[cfg(feature = "transform-flatten")]
fn flatten_into(value: Value, prefix: &str, separator: &str, out: &mut Map<String, Value>) {
    match value {
        Value::Object(map) if !map.is_empty() => {
            for (k, v) in map {
                let key = format!("{prefix}{separator}{k}");
                flatten_into(v, &key, separator, out);
            }
        }
        other => {
            out.insert(prefix.to_string(), other);
        }
    }
}
293
294// ── Rename keys ───────────────────────────────────────────────────────────────
295
/// Recursively rewrite object keys by replacing every match of `re` in a key
/// with `replacement`.
///
/// Objects have their keys renamed and their values processed recursively;
/// arrays have each element processed; all other values pass through
/// unchanged.  When two keys of the same object end up with the same new
/// name, only one entry survives in the resulting map.
#[cfg(feature = "transform-rename-keys")]
fn rename_keys(value: Value, re: &Regex, replacement: &str) -> Value {
    match value {
        Value::Array(items) => {
            let mut renamed = Vec::with_capacity(items.len());
            for item in items {
                renamed.push(rename_keys(item, re, replacement));
            }
            Value::Array(renamed)
        }
        Value::Object(entries) => {
            let mut renamed = Map::new();
            for (old_key, child) in entries {
                let new_key = re.replace_all(&old_key, replacement).into_owned();
                renamed.insert(new_key, rename_keys(child, re, replacement));
            }
            Value::Object(renamed)
        }
        scalar => scalar,
    }
}
317
318// ── Keys to snake_case ────────────────────────────────────────────────────────
319
/// Matches any single character that is not an ASCII letter, an ASCII digit
/// or whitespace (this includes `_` and `-`).
#[cfg(feature = "transform-snake-case")]
static RE_SPECIAL: LazyLock<Regex> = LazyLock::new(|| {
    let compiled = Regex::new(r"[^a-zA-Z0-9\s]");
    compiled.expect("static regex")
});

/// Matches a run of one or more whitespace characters.
#[cfg(feature = "transform-snake-case")]
static RE_WHITESPACE: LazyLock<Regex> = LazyLock::new(|| {
    let compiled = Regex::new(r"\s+");
    compiled.expect("static regex")
});

/// Matches a run of one or more underscores.
#[cfg(feature = "transform-snake-case")]
static RE_MULTI_UNDERSCORE: LazyLock<Regex> = LazyLock::new(|| {
    let compiled = Regex::new(r"_+");
    compiled.expect("static regex")
});
330
/// Normalise one key to snake_case (mirrors Meltano's algorithm).
///
/// Steps, in order:
/// 1. drop every character matched by `RE_SPECIAL` (anything other than
///    ASCII letters, ASCII digits and whitespace — so `_` and `-` go too);
/// 2. trim the ends and turn each whitespace run into a single `_`;
/// 3. collapse runs of `_` into one;
/// 4. lowercase, then strip leading/trailing `_`.
///
/// A key made only of stripped characters yields an empty string.
#[cfg(feature = "transform-snake-case")]
pub(crate) fn to_snake_case(key: &str) -> String {
    let stripped = RE_SPECIAL.replace_all(key, "");
    let underscored = RE_WHITESPACE.replace_all(stripped.trim(), "_");
    let collapsed = RE_MULTI_UNDERSCORE.replace_all(&underscored, "_");
    let lowered = collapsed.to_lowercase();
    lowered.trim_matches('_').to_string()
}
339
/// Recursively apply [`to_snake_case`] to every object key in `value`.
///
/// Objects get their keys converted and their values processed recursively;
/// arrays have each element processed; other values are returned unchanged.
#[cfg(feature = "transform-snake-case")]
fn keys_to_snake_case(value: Value) -> Value {
    match value {
        Value::Array(items) => {
            let mut converted = Vec::with_capacity(items.len());
            for item in items {
                converted.push(keys_to_snake_case(item));
            }
            Value::Array(converted)
        }
        Value::Object(entries) => {
            let mut converted = Map::new();
            for (key, child) in entries {
                converted.insert(to_snake_case(&key), keys_to_snake_case(child));
            }
            Value::Object(converted)
        }
        scalar => scalar,
    }
}
354
355// ── Tests ─────────────────────────────────────────────────────────────────────
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360    use serde_json::json;
361
362    fn compiled(transforms: &[RecordTransform]) -> Vec<CompiledTransform> {
363        transforms.iter().map(|t| compile(t).unwrap()).collect()
364    }
365
366    // ── Custom (always available) ─────────────────────────────────────────────
367
368    #[test]
369    fn test_custom_adds_field() {
370        let record = json!({"id": 1});
371        let result = apply_all(
372            record,
373            &compiled(&[RecordTransform::custom(|mut v| {
374                if let Value::Object(ref mut m) = v {
375                    m.insert("added".to_string(), json!(true));
376                }
377                v
378            })]),
379        );
380        assert_eq!(result["id"], 1);
381        assert_eq!(result["added"], true);
382    }
383
384    #[test]
385    fn test_custom_removes_field() {
386        let record = json!({"id": 1, "secret": "drop_me"});
387        let result = apply_all(
388            record,
389            &compiled(&[RecordTransform::custom(|mut v| {
390                if let Value::Object(ref mut m) = v {
391                    m.remove("secret");
392                }
393                v
394            })]),
395        );
396        assert_eq!(result["id"], 1);
397        assert!(result.get("secret").is_none());
398    }
399
400    #[test]
401    fn test_no_transforms_is_identity() {
402        let record = json!({"id": 1, "name": "Alice"});
403        let result = apply_all(record.clone(), &[]);
404        assert_eq!(result, record);
405    }
406
407    // ── Flatten ───────────────────────────────────────────────────────────────
408
409    #[cfg(feature = "transform-flatten")]
410    #[test]
411    fn test_flatten_nested_object() {
412        let record = json!({"a": {"b": 1, "c": {"d": 2}}, "e": 3});
413        let result = apply_all(
414            record,
415            &compiled(&[RecordTransform::Flatten {
416                separator: "__".into(),
417            }]),
418        );
419        assert_eq!(result["a__b"], 1);
420        assert_eq!(result["a__c__d"], 2);
421        assert_eq!(result["e"], 3);
422        assert!(result.get("a").is_none(), "nested key should be removed");
423    }
424
425    #[cfg(feature = "transform-flatten")]
426    #[test]
427    fn test_flatten_leaves_arrays_intact() {
428        let record = json!({"tags": ["rust", "api"], "meta": {"count": 2}});
429        let result = apply_all(
430            record,
431            &compiled(&[RecordTransform::Flatten {
432                separator: ".".into(),
433            }]),
434        );
435        assert_eq!(result["tags"], json!(["rust", "api"]));
436        assert_eq!(result["meta.count"], 2);
437    }
438
439    #[cfg(feature = "transform-flatten")]
440    #[test]
441    fn test_flatten_already_flat() {
442        let record = json!({"id": 1, "name": "Alice"});
443        let result = apply_all(
444            record.clone(),
445            &compiled(&[RecordTransform::Flatten {
446                separator: "__".into(),
447            }]),
448        );
449        assert_eq!(result, record);
450    }
451
452    #[cfg(feature = "transform-flatten")]
453    #[test]
454    fn test_flatten_empty_separator() {
455        let record = json!({"a": {"b": 1}});
456        let result = apply_all(
457            record,
458            &compiled(&[RecordTransform::Flatten {
459                separator: "".into(),
460            }]),
461        );
462        assert_eq!(result["ab"], 1);
463    }
464
465    // ── RenameKeys ────────────────────────────────────────────────────────────
466
467    #[cfg(feature = "transform-rename-keys")]
468    #[test]
469    fn test_rename_keys_strips_prefix() {
470        let record = json!({"_prefix_id": 1, "_prefix_name": "Alice"});
471        let result = apply_all(
472            record,
473            &compiled(&[RecordTransform::RenameKeys {
474                pattern: r"^_prefix_".into(),
475                replacement: "".into(),
476            }]),
477        );
478        assert_eq!(result["id"], 1);
479        assert_eq!(result["name"], "Alice");
480    }
481
482    #[cfg(feature = "transform-rename-keys")]
483    #[test]
484    fn test_rename_keys_uppercase_to_placeholder() {
485        let record = json!({"OUTER": {"INNER": 42}});
486        let result = apply_all(
487            record,
488            &compiled(&[RecordTransform::RenameKeys {
489                pattern: r"[A-Z]+".into(),
490                replacement: "x".into(),
491            }]),
492        );
493        assert_eq!(result["x"]["x"], 42);
494    }
495
496    #[cfg(feature = "transform-rename-keys")]
497    #[test]
498    fn test_rename_keys_in_array_elements() {
499        let record = json!({"items": [{"KEY": 1}, {"KEY": 2}]});
500        let result = apply_all(
501            record,
502            &compiled(&[RecordTransform::RenameKeys {
503                pattern: r"KEY".into(),
504                replacement: "key".into(),
505            }]),
506        );
507        assert_eq!(result["items"][0]["key"], 1);
508        assert_eq!(result["items"][1]["key"], 2);
509    }
510
511    #[cfg(feature = "transform-rename-keys")]
512    #[test]
513    fn test_rename_keys_invalid_regex_errors_at_compile() {
514        let err = compile(&RecordTransform::RenameKeys {
515            pattern: "[invalid".into(),
516            replacement: "".into(),
517        });
518        assert!(err.is_err());
519        assert!(matches!(err, Err(FaucetError::Transform(_))));
520    }
521
522    #[cfg(feature = "transform-rename-keys")]
523    #[test]
524    fn test_rename_keys_chained() {
525        let record = json!({"__camelCase__": 1});
526        let result = apply_all(
527            record,
528            &compiled(&[
529                RecordTransform::RenameKeys {
530                    pattern: r"^_+|_+$".into(),
531                    replacement: "".into(),
532                },
533                RecordTransform::RenameKeys {
534                    pattern: r"[A-Z]".into(),
535                    replacement: "_".into(),
536                },
537            ]),
538        );
539        let key = result.as_object().unwrap().keys().next().unwrap().clone();
540        assert_eq!(key, "camel_ase");
541    }
542
543    // ── KeysToSnakeCase ───────────────────────────────────────────────────────
544
545    #[cfg(feature = "transform-snake-case")]
546    #[test]
547    fn test_snake_case_spaces_to_underscores() {
548        assert_eq!(to_snake_case("First Name"), "first_name");
549        assert_eq!(to_snake_case("last name"), "last_name");
550    }
551
552    #[cfg(feature = "transform-snake-case")]
553    #[test]
554    fn test_snake_case_removes_hyphens_and_special_chars() {
555        assert_eq!(to_snake_case("last-name"), "lastname");
556        assert_eq!(to_snake_case("price ($)"), "price");
557    }
558
559    #[cfg(feature = "transform-snake-case")]
560    #[test]
561    fn test_snake_case_trims_edge_whitespace() {
562        assert_eq!(to_snake_case("  id  "), "id");
563        assert_eq!(to_snake_case("  first name  "), "first_name");
564    }
565
566    #[cfg(feature = "transform-snake-case")]
567    #[test]
568    fn test_snake_case_lowercases() {
569        assert_eq!(to_snake_case("ID"), "id");
570        assert_eq!(to_snake_case("UserName"), "username");
571    }
572
573    #[cfg(feature = "transform-snake-case")]
574    #[test]
575    fn test_snake_case_collapses_underscores_from_spaces() {
576        assert_eq!(to_snake_case("foo   bar"), "foo_bar");
577    }
578
579    #[cfg(feature = "transform-snake-case")]
580    #[test]
581    fn test_snake_case_empty_after_stripping() {
582        assert_eq!(to_snake_case("!@#"), "");
583        assert_eq!(to_snake_case("---"), "");
584    }
585
586    #[cfg(feature = "transform-snake-case")]
587    #[test]
588    fn test_keys_to_snake_case_on_record() {
589        let record = json!({
590            "First Name": "Alice",
591            "last-name": "Smith",
592            "price ($)": 9.99,
593            "  id  ": 1,
594        });
595        let result = apply_all(record, &compiled(&[RecordTransform::KeysToSnakeCase]));
596        assert_eq!(result["first_name"], "Alice");
597        assert_eq!(result["lastname"], "Smith");
598        assert_eq!(result["price"], 9.99);
599        assert_eq!(result["id"], 1);
600    }
601
602    #[cfg(feature = "transform-snake-case")]
603    #[test]
604    fn test_keys_to_snake_case_nested() {
605        let record = json!({"Outer Key": {"Inner Key": 42}});
606        let result = apply_all(record, &compiled(&[RecordTransform::KeysToSnakeCase]));
607        assert_eq!(result["outer_key"]["inner_key"], 42);
608    }
609
610    #[cfg(feature = "transform-snake-case")]
611    #[test]
612    fn test_keys_to_snake_case_in_array() {
613        let record = json!({"items": [{"MY KEY": 1}, {"MY KEY": 2}]});
614        let result = apply_all(record, &compiled(&[RecordTransform::KeysToSnakeCase]));
615        assert_eq!(result["items"][0]["my_key"], 1);
616        assert_eq!(result["items"][1]["my_key"], 2);
617    }
618
619    // ── Chaining ──────────────────────────────────────────────────────────────
620
621    #[cfg(all(feature = "transform-snake-case", feature = "transform-flatten"))]
622    #[test]
623    fn test_snake_case_then_flatten() {
624        let record = json!({"User Info": {"First Name": "Alice", "Last Name": "Smith"}});
625        let result = apply_all(
626            record,
627            &compiled(&[
628                RecordTransform::KeysToSnakeCase,
629                RecordTransform::Flatten {
630                    separator: "_".into(),
631                },
632            ]),
633        );
634        assert_eq!(result["user_info_first_name"], "Alice");
635        assert_eq!(result["user_info_last_name"], "Smith");
636    }
637
638    #[test]
639    fn test_custom_chained_with_builtin() {
640        // Custom runs before (or after) built-ins — ordering is preserved.
641        let record = json!({"id": 1, "raw_value": 100});
642        let result = apply_all(
643            record,
644            &compiled(&[
645                // Step 1: custom — double raw_value
646                RecordTransform::custom(|mut v| {
647                    if let Some(n) = v.get("raw_value").and_then(|n| n.as_i64())
648                        && let Value::Object(ref mut m) = v
649                    {
650                        m.insert("raw_value".to_string(), json!(n * 2));
651                    }
652                    v
653                }),
654                // Step 2: custom — rename raw_value → value
655                RecordTransform::custom(|mut v| {
656                    if let Value::Object(ref mut m) = v
657                        && let Some(val) = m.remove("raw_value")
658                    {
659                        m.insert("value".to_string(), val);
660                    }
661                    v
662                }),
663            ]),
664        );
665        assert_eq!(result["id"], 1);
666        assert_eq!(result["value"], 200);
667        assert!(result.get("raw_value").is_none());
668    }
669}