Skip to main content

faucet_core/
transforming_source.rs

1//! Wrap any [`Source`] with a fixed list of [`TransformStage`]s applied to
2//! every emitted record. The canonical way for library callers to attach
3//! stages (transforms wrapped via [`TransformStage::Map`], plus `Filter` /
4//! `Explode` / `Custom`); the CLI uses this same type internally.
5
6use crate::error::FaucetError;
7use crate::observability::{Labels, instrumented_apply_stages};
8use crate::pipeline::StreamPage;
9use crate::stage::{CompiledStage, TransformStage, compile_stage};
10use crate::traits::Source;
11use async_trait::async_trait;
12use futures::StreamExt;
13use futures_core::Stream;
14use serde_json::Value;
15use std::collections::HashMap;
16use std::pin::Pin;
17
/// Source decorator that applies a fixed list of compiled stages to every
/// record. Emits `faucet_transform_*` metrics per page via
/// [`instrumented_apply_stages`].
///
/// The stage list is compiled once, in [`TransformingSource::new`], and the
/// compiled form is reused for every batch of records the wrapper handles.
///
/// # Example
///
/// ```no_run
/// use faucet_core::{RecordTransform, Source, TransformingSource};
/// use faucet_core::observability::Labels;
/// use faucet_core::stage::TransformStage;
/// use faucet_core::transform::KeyCaseMode;
///
/// # fn build_inner() -> Box<dyn Source> { unimplemented!() }
/// let inner: Box<dyn Source> = build_inner();
/// let wrapped = TransformingSource::new(
///     inner,
///     vec![TransformStage::Map(RecordTransform::KeysCase { mode: KeyCaseMode::Snake })],
///     Labels::for_named("rest"),
/// ).unwrap();
/// ```
pub struct TransformingSource {
    /// The wrapped source. Fetches, state-key lookups, start-bookmark
    /// application and the connector name are all forwarded to it.
    inner: Box<dyn Source>,
    /// Stages in their compiled form, produced by `compile_stage` in `new`.
    stages: Vec<CompiledStage>,
    /// Labels passed to [`instrumented_apply_stages`] on every call.
    labels: Labels,
}
43
44impl TransformingSource {
45    /// Compile `stages` and wrap `inner`. Returns
46    /// [`FaucetError::Transform`] if any stage's compilation fails (e.g.
47    /// invalid regex in `RenameKeys`).
48    pub fn new(
49        inner: Box<dyn Source>,
50        stages: Vec<TransformStage>,
51        labels: Labels,
52    ) -> Result<Self, FaucetError> {
53        let compiled = stages
54            .iter()
55            .map(compile_stage)
56            .collect::<Result<Vec<_>, _>>()?;
57        Ok(Self {
58            inner,
59            stages: compiled,
60            labels,
61        })
62    }
63}
64
65#[async_trait]
66impl Source for TransformingSource {
67    async fn fetch_with_context(
68        &self,
69        ctx: &HashMap<String, Value>,
70    ) -> Result<Vec<Value>, FaucetError> {
71        let records = self.inner.fetch_with_context(ctx).await?;
72        instrumented_apply_stages(records, &self.stages, &self.labels)
73    }
74
75    async fn fetch_with_context_incremental(
76        &self,
77        ctx: &HashMap<String, Value>,
78    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
79        let (records, bookmark) = self.inner.fetch_with_context_incremental(ctx).await?;
80        let transformed = instrumented_apply_stages(records, &self.stages, &self.labels)?;
81        Ok((transformed, bookmark))
82    }
83
84    fn stream_pages<'a>(
85        &'a self,
86        ctx: &'a HashMap<String, Value>,
87        batch_size: usize,
88    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
89        Box::pin(async_stream::try_stream! {
90            let mut pages = self.inner.stream_pages(ctx, batch_size);
91            while let Some(page) = pages.next().await {
92                let page = page?;
93                let out = instrumented_apply_stages(
94                    page.records, &self.stages, &self.labels,
95                )?;
96                if out.is_empty() {
97                    yield StreamPage { records: vec![], bookmark: page.bookmark };
98                    continue;
99                }
100                if batch_size == 0 {
101                    yield StreamPage { records: out, bookmark: page.bookmark };
102                    continue;
103                }
104                let total = out.len();
105                let mut start = 0usize;
106                while start < total {
107                    let end = std::cmp::min(start + batch_size, total);
108                    let is_last = end == total;
109                    let chunk: Vec<Value> = out[start..end].to_vec();
110                    yield StreamPage {
111                        records: chunk,
112                        bookmark: if is_last { page.bookmark.clone() } else { None },
113                    };
114                    start = end;
115                }
116            }
117        })
118    }
119
120    fn state_key(&self) -> Option<String> {
121        self.inner.state_key()
122    }
123
124    async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
125        self.inner.apply_start_bookmark(bookmark).await
126    }
127
128    fn connector_name(&self) -> &'static str {
129        self.inner.connector_name()
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use crate::stage::TransformStage;
137    use crate::transform::{KeyCaseMode, RecordTransform};
138    use serde_json::json;
139    use std::sync::Arc;
140    use std::sync::atomic::{AtomicBool, Ordering};
141
142    struct MockSource(Vec<Value>);
143
144    #[async_trait]
145    impl Source for MockSource {
146        async fn fetch_with_context(
147            &self,
148            _ctx: &HashMap<String, Value>,
149        ) -> Result<Vec<Value>, FaucetError> {
150            Ok(self.0.clone())
151        }
152    }
153
154    #[tokio::test]
155    async fn fetch_with_context_transforms_records() {
156        let inner: Box<dyn Source> = Box::new(MockSource(vec![json!({"FooBar": 1})]));
157        let wrapped = TransformingSource::new(
158            inner,
159            vec![TransformStage::Map(RecordTransform::KeysCase {
160                mode: KeyCaseMode::Snake,
161            })],
162            Labels::for_named("test"),
163        )
164        .expect("compile succeeds");
165        let out = wrapped.fetch_with_context(&HashMap::new()).await.unwrap();
166        assert_eq!(out, vec![json!({"foo_bar": 1})]);
167    }
168
169    struct IncrementalSource {
170        records: Vec<Value>,
171        bookmark: Value,
172    }
173
174    #[async_trait]
175    impl Source for IncrementalSource {
176        async fn fetch_with_context(
177            &self,
178            _ctx: &HashMap<String, Value>,
179        ) -> Result<Vec<Value>, FaucetError> {
180            Ok(self.records.clone())
181        }
182
183        async fn fetch_with_context_incremental(
184            &self,
185            _ctx: &HashMap<String, Value>,
186        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
187            Ok((self.records.clone(), Some(self.bookmark.clone())))
188        }
189    }
190
191    #[tokio::test]
192    async fn fetch_with_context_incremental_transforms_and_preserves_bookmark() {
193        let inner: Box<dyn Source> = Box::new(IncrementalSource {
194            records: vec![json!({"FooBar": 1})],
195            bookmark: json!("2026-05-28T00:00:00Z"),
196        });
197        let wrapped = TransformingSource::new(
198            inner,
199            vec![TransformStage::Map(RecordTransform::KeysCase {
200                mode: KeyCaseMode::Snake,
201            })],
202            Labels::for_named("test"),
203        )
204        .unwrap();
205        let (records, bookmark) = wrapped
206            .fetch_with_context_incremental(&HashMap::new())
207            .await
208            .unwrap();
209        assert_eq!(records, vec![json!({"foo_bar": 1})]);
210        assert_eq!(bookmark, Some(json!("2026-05-28T00:00:00Z")));
211    }
212
213    /// Emits records as N predetermined pages with the bookmark only on the last.
214    /// Overrides `stream_pages` directly so the test catches whether the wrapper
215    /// delegates to the native streaming path (correct) or falls back to the
216    /// chunk-the-buffer default (wrong — the bug we're fixing).
217    struct PagedSource {
218        pages: Vec<Vec<Value>>,
219        final_bookmark: Value,
220    }
221
222    #[async_trait]
223    impl Source for PagedSource {
224        async fn fetch_with_context(
225            &self,
226            _ctx: &HashMap<String, Value>,
227        ) -> Result<Vec<Value>, FaucetError> {
228            Ok(self.pages.iter().flatten().cloned().collect())
229        }
230
231        fn stream_pages<'a>(
232            &'a self,
233            _ctx: &'a HashMap<String, Value>,
234            _batch_size: usize,
235        ) -> Pin<Box<dyn futures_core::Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>
236        {
237            let pages = self.pages.clone();
238            let bookmark = self.final_bookmark.clone();
239            Box::pin(async_stream::try_stream! {
240                let n = pages.len();
241                for (i, records) in pages.into_iter().enumerate() {
242                    let bm = if i + 1 == n { Some(bookmark.clone()) } else { None };
243                    yield StreamPage { records, bookmark: bm };
244                }
245            })
246        }
247    }
248
249    #[tokio::test]
250    async fn stream_pages_transforms_each_page_and_preserves_bookmarks() {
251        let inner: Box<dyn Source> = Box::new(PagedSource {
252            pages: vec![
253                vec![json!({"FooBar": 1})],
254                vec![json!({"FooBar": 2})],
255                vec![json!({"FooBar": 3})],
256            ],
257            final_bookmark: json!("v1"),
258        });
259        let wrapped = TransformingSource::new(
260            inner,
261            vec![TransformStage::Map(RecordTransform::KeysCase {
262                mode: KeyCaseMode::Snake,
263            })],
264            Labels::for_named("test"),
265        )
266        .unwrap();
267
268        let ctx = HashMap::new();
269        let mut stream = wrapped.stream_pages(&ctx, 1000);
270        let mut collected: Vec<StreamPage> = Vec::new();
271        while let Some(page) = stream.next().await {
272            collected.push(page.unwrap());
273        }
274
275        assert_eq!(collected.len(), 3);
276        assert_eq!(collected[0].records, vec![json!({"foo_bar": 1})]);
277        assert!(collected[0].bookmark.is_none());
278        assert_eq!(collected[1].records, vec![json!({"foo_bar": 2})]);
279        assert!(collected[1].bookmark.is_none());
280        assert_eq!(collected[2].records, vec![json!({"foo_bar": 3})]);
281        assert_eq!(collected[2].bookmark, Some(json!("v1")));
282    }
283
284    #[tokio::test]
285    async fn stream_pages_passes_through_empty_records_page_with_bookmark() {
286        struct EmptyWithBookmark;
287        #[async_trait]
288        impl Source for EmptyWithBookmark {
289            async fn fetch_with_context(
290                &self,
291                _ctx: &HashMap<String, Value>,
292            ) -> Result<Vec<Value>, FaucetError> {
293                Ok(Vec::new())
294            }
295            fn stream_pages<'a>(
296                &'a self,
297                _ctx: &'a HashMap<String, Value>,
298                _batch_size: usize,
299            ) -> Pin<
300                Box<dyn futures_core::Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>,
301            > {
302                Box::pin(async_stream::try_stream! {
303                    yield StreamPage { records: Vec::new(), bookmark: Some(json!("v1")) };
304                })
305            }
306        }
307        let wrapped = TransformingSource::new(
308            Box::new(EmptyWithBookmark),
309            vec![TransformStage::Map(RecordTransform::KeysCase {
310                mode: KeyCaseMode::Snake,
311            })],
312            Labels::for_named("test"),
313        )
314        .unwrap();
315        let ctx = HashMap::new();
316        let mut stream = wrapped.stream_pages(&ctx, 1000);
317        let page = stream.next().await.unwrap().unwrap();
318        assert!(page.records.is_empty());
319        assert_eq!(page.bookmark, Some(json!("v1")));
320        assert!(stream.next().await.is_none());
321    }
322
323    struct InstrumentedSource {
324        started: Arc<AtomicBool>,
325    }
326
327    #[async_trait]
328    impl Source for InstrumentedSource {
329        async fn fetch_with_context(
330            &self,
331            _ctx: &HashMap<String, Value>,
332        ) -> Result<Vec<Value>, FaucetError> {
333            Ok(vec![])
334        }
335        fn connector_name(&self) -> &'static str {
336            "instrumented"
337        }
338        fn state_key(&self) -> Option<String> {
339            Some("instrumented::key".to_string())
340        }
341        async fn apply_start_bookmark(&self, _bookmark: Value) -> Result<(), FaucetError> {
342            self.started.store(true, Ordering::Relaxed);
343            Ok(())
344        }
345    }
346
347    #[tokio::test]
348    async fn connector_name_state_key_and_start_bookmark_delegate_to_inner() {
349        let started = Arc::new(AtomicBool::new(false));
350        let inner = InstrumentedSource {
351            started: started.clone(),
352        };
353        let wrapped = TransformingSource::new(
354            Box::new(inner),
355            vec![TransformStage::Map(RecordTransform::KeysCase {
356                mode: KeyCaseMode::Snake,
357            })],
358            Labels::for_named("test"),
359        )
360        .unwrap();
361        assert_eq!(wrapped.connector_name(), "instrumented");
362        assert_eq!(wrapped.state_key(), Some("instrumented::key".to_string()));
363        wrapped.apply_start_bookmark(json!("bm")).await.unwrap();
364        assert!(started.load(Ordering::Relaxed));
365    }
366
367    #[tokio::test]
368    async fn new_fails_fast_on_invalid_regex() {
369        let inner: Box<dyn Source> = Box::new(MockSource(vec![]));
370        let result = TransformingSource::new(
371            inner,
372            vec![TransformStage::Map(RecordTransform::RenameKeys {
373                pattern: "[invalid".to_string(),
374                replacement: "x".to_string(),
375            })],
376            Labels::for_named("test"),
377        );
378        let err = match result {
379            Ok(_) => panic!("invalid regex must fail at new()"),
380            Err(e) => e,
381        };
382        assert!(matches!(err, FaucetError::Transform(_)));
383    }
384
385    #[tokio::test]
386    async fn custom_closure_transform_runs() {
387        let inner: Box<dyn Source> = Box::new(MockSource(vec![json!({"x": 1})]));
388        let wrapped = TransformingSource::new(
389            inner,
390            vec![TransformStage::Map(RecordTransform::custom(
391                |mut record| {
392                    if let Some(obj) = record.as_object_mut() {
393                        obj.insert("added".to_string(), json!(true));
394                    }
395                    record
396                },
397            ))],
398            Labels::for_named("test"),
399        )
400        .unwrap();
401        let out = wrapped.fetch_with_context(&HashMap::new()).await.unwrap();
402        assert_eq!(out, vec![json!({"x": 1, "added": true})]);
403    }
404
405    #[tokio::test]
406    async fn usable_as_boxed_dyn_source() {
407        let inner: Box<dyn Source> = Box::new(MockSource(vec![json!({"FooBar": 1})]));
408        let wrapped: Box<dyn Source> = Box::new(
409            TransformingSource::new(
410                inner,
411                vec![TransformStage::Map(RecordTransform::KeysCase {
412                    mode: KeyCaseMode::Snake,
413                })],
414                Labels::for_named("test"),
415            )
416            .unwrap(),
417        );
418        let out = wrapped.fetch_with_context(&HashMap::new()).await.unwrap();
419        assert_eq!(out, vec![json!({"foo_bar": 1})]);
420    }
421
422    /// A source that emits a single page with the given records and bookmark.
423    struct OnePageSource {
424        records: Vec<Value>,
425        bookmark: Option<Value>,
426    }
427
428    #[async_trait]
429    impl Source for OnePageSource {
430        async fn fetch_with_context(
431            &self,
432            _ctx: &HashMap<String, Value>,
433        ) -> Result<Vec<Value>, FaucetError> {
434            Ok(self.records.clone())
435        }
436        async fn fetch_with_context_incremental(
437            &self,
438            _ctx: &HashMap<String, Value>,
439        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
440            Ok((self.records.clone(), self.bookmark.clone()))
441        }
442        fn stream_pages<'a>(
443            &'a self,
444            _ctx: &'a HashMap<String, Value>,
445            _batch_size: usize,
446        ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
447            let page = StreamPage {
448                records: self.records.clone(),
449                bookmark: self.bookmark.clone(),
450            };
451            Box::pin(async_stream::stream! { yield Ok(page); })
452        }
453    }
454
455    #[cfg(feature = "transform-explode")]
456    fn explode_stage() -> TransformStage {
457        TransformStage::Explode(crate::stage::ExplodeSpec {
458            path: "items".to_owned(),
459            prefix: None,
460            separator: "_".to_owned(),
461            on_missing: crate::stage::OnMissing::Drop,
462        })
463    }
464
465    /// Build N records each with a 10-element `items` array, so explode 10×s them.
466    #[cfg(feature = "transform-explode")]
467    fn explode_10x_records(n: usize) -> Vec<Value> {
468        (0..n)
469            .map(|i| {
470                json!({
471                    "id": i,
472                    "items": (0..10).map(|j| json!({"k": j})).collect::<Vec<_>>(),
473                })
474            })
475            .collect()
476    }
477
478    #[cfg(feature = "transform-explode")]
479    #[tokio::test]
480    async fn stream_pages_rechunks_explosion_with_bookmark_on_last() {
481        let inner: Box<dyn Source> = Box::new(OnePageSource {
482            records: explode_10x_records(100), // 100 → 1000 after explode
483            bookmark: Some(json!("bm")),
484        });
485        let wrapped =
486            TransformingSource::new(inner, vec![explode_stage()], Labels::for_named("t")).unwrap();
487        let ctx = HashMap::new();
488        let mut stream = wrapped.stream_pages(&ctx, 200);
489        let mut sub_pages: Vec<StreamPage> = Vec::new();
490        while let Some(p) = stream.next().await {
491            sub_pages.push(p.unwrap());
492        }
493        assert_eq!(sub_pages.len(), 5, "1000 records / 200 batch = 5 sub-pages");
494        for (i, p) in sub_pages.iter().enumerate() {
495            assert_eq!(p.records.len(), 200, "sub-page {i} should be size 200");
496            if i < 4 {
497                assert!(
498                    p.bookmark.is_none(),
499                    "non-final sub-page {i} carries no bookmark"
500                );
501            } else {
502                assert_eq!(p.bookmark, Some(json!("bm")), "final sub-page has bookmark");
503            }
504        }
505    }
506
507    #[cfg(feature = "transform-explode")]
508    #[tokio::test]
509    async fn stream_pages_batch_size_zero_emits_one_page() {
510        let inner: Box<dyn Source> = Box::new(OnePageSource {
511            records: explode_10x_records(10), // 10 → 100 after explode
512            bookmark: Some(json!("bm")),
513        });
514        let wrapped =
515            TransformingSource::new(inner, vec![explode_stage()], Labels::for_named("t")).unwrap();
516        let ctx = HashMap::new();
517        let mut stream = wrapped.stream_pages(&ctx, 0);
518        let mut sub_pages: Vec<StreamPage> = Vec::new();
519        while let Some(p) = stream.next().await {
520            sub_pages.push(p.unwrap());
521        }
522        assert_eq!(sub_pages.len(), 1, "batch_size=0 means one sub-page");
523        assert_eq!(sub_pages[0].records.len(), 100);
524        assert_eq!(sub_pages[0].bookmark, Some(json!("bm")));
525    }
526
527    #[cfg(feature = "transform-filter")]
528    #[tokio::test]
529    async fn stream_pages_filter_drops_all_still_yields_bookmark() {
530        let inner: Box<dyn Source> = Box::new(OnePageSource {
531            records: vec![json!({"deleted": true}), json!({"deleted": true})],
532            bookmark: Some(json!("bm")),
533        });
534        let drop_all = TransformStage::Filter(crate::stage::FilterSpec {
535            path: "deleted".to_owned(),
536            op: crate::stage::FilterOp::Ne,
537            value: Some(json!(true)),
538        });
539        let wrapped =
540            TransformingSource::new(inner, vec![drop_all], Labels::for_named("t")).unwrap();
541        let ctx = HashMap::new();
542        let mut stream = wrapped.stream_pages(&ctx, 100);
543        let mut sub_pages: Vec<StreamPage> = Vec::new();
544        while let Some(p) = stream.next().await {
545            sub_pages.push(p.unwrap());
546        }
547        assert_eq!(sub_pages.len(), 1);
548        assert!(sub_pages[0].records.is_empty());
549        assert_eq!(sub_pages[0].bookmark, Some(json!("bm")));
550    }
551}