1use crate::error::FaucetError;
7use crate::observability::{Labels, instrumented_apply_stages};
8use crate::pipeline::StreamPage;
9use crate::stage::{CompiledStage, TransformStage, compile_stage};
10use crate::traits::Source;
11use async_trait::async_trait;
12use futures::StreamExt;
13use futures_core::Stream;
14use serde_json::Value;
15use std::collections::HashMap;
16use std::pin::Pin;
17
/// A [`Source`] decorator that runs every record produced by an inner source
/// through a list of transform stages.
///
/// Stages are compiled once in `TransformingSource::new`, so configuration
/// errors (such as an invalid regex) are reported at construction time rather
/// than on the first fetch. State keys, start bookmarks and the connector name
/// are delegated to the inner source unchanged.
pub struct TransformingSource {
    /// The wrapped source that actually produces records.
    inner: Box<dyn Source>,
    /// Stages compiled from the `TransformStage` list given to `new`; handed to
    /// `instrumented_apply_stages` for every fetched batch/page.
    stages: Vec<CompiledStage>,
    /// Observability labels passed to `instrumented_apply_stages`.
    labels: Labels,
}
43
44impl TransformingSource {
45 pub fn new(
49 inner: Box<dyn Source>,
50 stages: Vec<TransformStage>,
51 labels: Labels,
52 ) -> Result<Self, FaucetError> {
53 let compiled = stages
54 .iter()
55 .map(compile_stage)
56 .collect::<Result<Vec<_>, _>>()?;
57 Ok(Self {
58 inner,
59 stages: compiled,
60 labels,
61 })
62 }
63}
64
65#[async_trait]
66impl Source for TransformingSource {
67 async fn fetch_with_context(
68 &self,
69 ctx: &HashMap<String, Value>,
70 ) -> Result<Vec<Value>, FaucetError> {
71 let records = self.inner.fetch_with_context(ctx).await?;
72 instrumented_apply_stages(records, &self.stages, &self.labels)
73 }
74
75 async fn fetch_with_context_incremental(
76 &self,
77 ctx: &HashMap<String, Value>,
78 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
79 let (records, bookmark) = self.inner.fetch_with_context_incremental(ctx).await?;
80 let transformed = instrumented_apply_stages(records, &self.stages, &self.labels)?;
81 Ok((transformed, bookmark))
82 }
83
84 fn stream_pages<'a>(
85 &'a self,
86 ctx: &'a HashMap<String, Value>,
87 batch_size: usize,
88 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
89 Box::pin(async_stream::try_stream! {
90 let mut pages = self.inner.stream_pages(ctx, batch_size);
91 while let Some(page) = pages.next().await {
92 let page = page?;
93 let out = instrumented_apply_stages(
94 page.records, &self.stages, &self.labels,
95 )?;
96 if out.is_empty() {
97 yield StreamPage { records: vec![], bookmark: page.bookmark };
98 continue;
99 }
100 if batch_size == 0 {
101 yield StreamPage { records: out, bookmark: page.bookmark };
102 continue;
103 }
104 let total = out.len();
105 let mut start = 0usize;
106 while start < total {
107 let end = std::cmp::min(start + batch_size, total);
108 let is_last = end == total;
109 let chunk: Vec<Value> = out[start..end].to_vec();
110 yield StreamPage {
111 records: chunk,
112 bookmark: if is_last { page.bookmark.clone() } else { None },
113 };
114 start = end;
115 }
116 }
117 })
118 }
119
120 fn state_key(&self) -> Option<String> {
121 self.inner.state_key()
122 }
123
124 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
125 self.inner.apply_start_bookmark(bookmark).await
126 }
127
128 fn connector_name(&self) -> &'static str {
129 self.inner.connector_name()
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stage::TransformStage;
    use crate::transform::{KeyCaseMode, RecordTransform};
    use serde_json::json;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// The `KeysCase { Snake }` map stage most tests use (`FooBar` -> `foo_bar`).
    fn snake_stage() -> TransformStage {
        TransformStage::Map(RecordTransform::KeysCase {
            mode: KeyCaseMode::Snake,
        })
    }

    /// Drains `wrapped.stream_pages(.., batch_size)` into a `Vec`, panicking on
    /// the first stream error.
    async fn collect_pages(wrapped: &TransformingSource, batch_size: usize) -> Vec<StreamPage> {
        let ctx = HashMap::new();
        let mut stream = wrapped.stream_pages(&ctx, batch_size);
        let mut pages = Vec::new();
        while let Some(page) = stream.next().await {
            pages.push(page.expect("stream page should be Ok"));
        }
        pages
    }

    /// Source that implements only `fetch_with_context`, returning a fixed list.
    struct MockSource(Vec<Value>);

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_with_context(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.0.clone())
        }
    }

    #[tokio::test]
    async fn fetch_with_context_transforms_records() {
        let inner: Box<dyn Source> = Box::new(MockSource(vec![json!({"FooBar": 1})]));
        let wrapped = TransformingSource::new(inner, vec![snake_stage()], Labels::for_named("test"))
            .expect("compile succeeds");
        let out = wrapped.fetch_with_context(&HashMap::new()).await.unwrap();
        assert_eq!(out, vec![json!({"foo_bar": 1})]);
    }

    /// Source whose incremental fetch returns fixed records plus a fixed bookmark.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_with_context(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }

        async fn fetch_with_context_incremental(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            Ok((self.records.clone(), Some(self.bookmark.clone())))
        }
    }

    #[tokio::test]
    async fn fetch_with_context_incremental_transforms_and_preserves_bookmark() {
        let inner: Box<dyn Source> = Box::new(IncrementalSource {
            records: vec![json!({"FooBar": 1})],
            bookmark: json!("2026-05-28T00:00:00Z"),
        });
        let wrapped =
            TransformingSource::new(inner, vec![snake_stage()], Labels::for_named("test")).unwrap();
        let (records, bookmark) = wrapped
            .fetch_with_context_incremental(&HashMap::new())
            .await
            .unwrap();
        assert_eq!(records, vec![json!({"foo_bar": 1})]);
        assert_eq!(bookmark, Some(json!("2026-05-28T00:00:00Z")));
    }

    /// Source that streams `pages` in order, attaching `final_bookmark` only to
    /// the last page (ignores `batch_size`).
    struct PagedSource {
        pages: Vec<Vec<Value>>,
        final_bookmark: Value,
    }

    #[async_trait]
    impl Source for PagedSource {
        async fn fetch_with_context(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.pages.iter().flatten().cloned().collect())
        }

        fn stream_pages<'a>(
            &'a self,
            _ctx: &'a HashMap<String, Value>,
            _batch_size: usize,
        ) -> Pin<Box<dyn futures_core::Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>>
        {
            let pages = self.pages.clone();
            let bookmark = self.final_bookmark.clone();
            Box::pin(async_stream::try_stream! {
                let n = pages.len();
                for (i, records) in pages.into_iter().enumerate() {
                    let bm = if i + 1 == n { Some(bookmark.clone()) } else { None };
                    yield StreamPage { records, bookmark: bm };
                }
            })
        }
    }

    #[tokio::test]
    async fn stream_pages_transforms_each_page_and_preserves_bookmarks() {
        let inner: Box<dyn Source> = Box::new(PagedSource {
            pages: vec![
                vec![json!({"FooBar": 1})],
                vec![json!({"FooBar": 2})],
                vec![json!({"FooBar": 3})],
            ],
            final_bookmark: json!("v1"),
        });
        let wrapped =
            TransformingSource::new(inner, vec![snake_stage()], Labels::for_named("test")).unwrap();

        let collected = collect_pages(&wrapped, 1000).await;

        assert_eq!(collected.len(), 3);
        assert_eq!(collected[0].records, vec![json!({"foo_bar": 1})]);
        assert!(collected[0].bookmark.is_none());
        assert_eq!(collected[1].records, vec![json!({"foo_bar": 2})]);
        assert!(collected[1].bookmark.is_none());
        assert_eq!(collected[2].records, vec![json!({"foo_bar": 3})]);
        assert_eq!(collected[2].bookmark, Some(json!("v1")));
    }

    #[tokio::test]
    async fn stream_pages_passes_through_empty_records_page_with_bookmark() {
        /// Streams a single empty page that still carries a bookmark.
        struct EmptyWithBookmark;

        #[async_trait]
        impl Source for EmptyWithBookmark {
            async fn fetch_with_context(
                &self,
                _ctx: &HashMap<String, Value>,
            ) -> Result<Vec<Value>, FaucetError> {
                Ok(Vec::new())
            }

            fn stream_pages<'a>(
                &'a self,
                _ctx: &'a HashMap<String, Value>,
                _batch_size: usize,
            ) -> Pin<
                Box<dyn futures_core::Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>,
            > {
                Box::pin(async_stream::try_stream! {
                    yield StreamPage { records: Vec::new(), bookmark: Some(json!("v1")) };
                })
            }
        }

        let wrapped = TransformingSource::new(
            Box::new(EmptyWithBookmark),
            vec![snake_stage()],
            Labels::for_named("test"),
        )
        .unwrap();
        let pages = collect_pages(&wrapped, 1000).await;
        assert_eq!(pages.len(), 1, "exactly one (empty) page is forwarded");
        assert!(pages[0].records.is_empty());
        assert_eq!(pages[0].bookmark, Some(json!("v1")));
    }

    /// Source that records whether `apply_start_bookmark` was called and
    /// reports a fixed connector name and state key.
    struct InstrumentedSource {
        started: Arc<AtomicBool>,
    }

    #[async_trait]
    impl Source for InstrumentedSource {
        async fn fetch_with_context(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(vec![])
        }
        fn connector_name(&self) -> &'static str {
            "instrumented"
        }
        fn state_key(&self) -> Option<String> {
            Some("instrumented::key".to_string())
        }
        async fn apply_start_bookmark(&self, _bookmark: Value) -> Result<(), FaucetError> {
            self.started.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    #[tokio::test]
    async fn connector_name_state_key_and_start_bookmark_delegate_to_inner() {
        let started = Arc::new(AtomicBool::new(false));
        let inner = InstrumentedSource {
            started: started.clone(),
        };
        let wrapped =
            TransformingSource::new(Box::new(inner), vec![snake_stage()], Labels::for_named("test"))
                .unwrap();
        assert_eq!(wrapped.connector_name(), "instrumented");
        assert_eq!(wrapped.state_key(), Some("instrumented::key".to_string()));
        wrapped.apply_start_bookmark(json!("bm")).await.unwrap();
        assert!(started.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn new_fails_fast_on_invalid_regex() {
        let inner: Box<dyn Source> = Box::new(MockSource(vec![]));
        let result = TransformingSource::new(
            inner,
            vec![TransformStage::Map(RecordTransform::RenameKeys {
                pattern: "[invalid".to_string(),
                replacement: "x".to_string(),
            })],
            Labels::for_named("test"),
        );
        let err = match result {
            Ok(_) => panic!("invalid regex must fail at new()"),
            Err(e) => e,
        };
        assert!(matches!(err, FaucetError::Transform(_)));
    }

    #[tokio::test]
    async fn custom_closure_transform_runs() {
        let inner: Box<dyn Source> = Box::new(MockSource(vec![json!({"x": 1})]));
        let wrapped = TransformingSource::new(
            inner,
            vec![TransformStage::Map(RecordTransform::custom(
                |mut record| {
                    if let Some(obj) = record.as_object_mut() {
                        obj.insert("added".to_string(), json!(true));
                    }
                    record
                },
            ))],
            Labels::for_named("test"),
        )
        .unwrap();
        let out = wrapped.fetch_with_context(&HashMap::new()).await.unwrap();
        assert_eq!(out, vec![json!({"x": 1, "added": true})]);
    }

    #[tokio::test]
    async fn usable_as_boxed_dyn_source() {
        let inner: Box<dyn Source> = Box::new(MockSource(vec![json!({"FooBar": 1})]));
        let wrapped: Box<dyn Source> = Box::new(
            TransformingSource::new(inner, vec![snake_stage()], Labels::for_named("test")).unwrap(),
        );
        let out = wrapped.fetch_with_context(&HashMap::new()).await.unwrap();
        assert_eq!(out, vec![json!({"foo_bar": 1})]);
    }

    /// Source that streams exactly one page (`records` + `bookmark`),
    /// ignoring `batch_size`, so any re-chunking observed comes from the
    /// wrapper.
    struct OnePageSource {
        records: Vec<Value>,
        bookmark: Option<Value>,
    }

    #[async_trait]
    impl Source for OnePageSource {
        async fn fetch_with_context(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }
        async fn fetch_with_context_incremental(
            &self,
            _ctx: &HashMap<String, Value>,
        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            Ok((self.records.clone(), self.bookmark.clone()))
        }
        fn stream_pages<'a>(
            &'a self,
            _ctx: &'a HashMap<String, Value>,
            _batch_size: usize,
        ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
            let page = StreamPage {
                records: self.records.clone(),
                bookmark: self.bookmark.clone(),
            };
            Box::pin(async_stream::stream! { yield Ok(page); })
        }
    }

    /// Runs without any optional transform feature: a single 5-record page
    /// re-chunked at batch size 2 must become 2 + 2 + 1 records, in order, with
    /// the bookmark only on the final sub-page.
    #[tokio::test]
    async fn stream_pages_rechunks_uneven_page_with_bookmark_on_last_only() {
        let inner: Box<dyn Source> = Box::new(OnePageSource {
            records: (0..5).map(|i| json!({"FooBar": i})).collect(),
            bookmark: Some(json!("bm")),
        });
        let wrapped =
            TransformingSource::new(inner, vec![snake_stage()], Labels::for_named("t")).unwrap();

        let sub_pages = collect_pages(&wrapped, 2).await;

        let sizes: Vec<usize> = sub_pages.iter().map(|p| p.records.len()).collect();
        assert_eq!(sizes, vec![2, 2, 1], "5 records / batch 2 = 2 + 2 + 1");
        let bookmarks: Vec<Option<Value>> =
            sub_pages.iter().map(|p| p.bookmark.clone()).collect();
        assert_eq!(bookmarks, vec![None, None, Some(json!("bm"))]);
        // Records keep their original order across sub-pages and are transformed.
        let flattened: Vec<Value> = sub_pages.into_iter().flat_map(|p| p.records).collect();
        let expected: Vec<Value> = (0..5).map(|i| json!({"foo_bar": i})).collect();
        assert_eq!(flattened, expected);
    }

    /// Explode stage over the `items` array, dropping records that lack it.
    #[cfg(feature = "transform-explode")]
    fn explode_stage() -> TransformStage {
        TransformStage::Explode(crate::stage::ExplodeSpec {
            path: "items".to_owned(),
            prefix: None,
            separator: "_".to_owned(),
            on_missing: crate::stage::OnMissing::Drop,
        })
    }

    /// Builds `n` records, each with a 10-element `items` array, so exploding
    /// them yields `10 * n` records.
    #[cfg(feature = "transform-explode")]
    fn explode_10x_records(n: usize) -> Vec<Value> {
        (0..n)
            .map(|i| {
                json!({
                    "id": i,
                    "items": (0..10).map(|j| json!({"k": j})).collect::<Vec<_>>(),
                })
            })
            .collect()
    }

    #[cfg(feature = "transform-explode")]
    #[tokio::test]
    async fn stream_pages_rechunks_explosion_with_bookmark_on_last() {
        let inner: Box<dyn Source> = Box::new(OnePageSource {
            records: explode_10x_records(100),
            bookmark: Some(json!("bm")),
        });
        let wrapped =
            TransformingSource::new(inner, vec![explode_stage()], Labels::for_named("t")).unwrap();
        let sub_pages = collect_pages(&wrapped, 200).await;
        assert_eq!(sub_pages.len(), 5, "1000 records / 200 batch = 5 sub-pages");
        for (i, p) in sub_pages.iter().enumerate() {
            assert_eq!(p.records.len(), 200, "sub-page {i} should be size 200");
            if i < 4 {
                assert!(
                    p.bookmark.is_none(),
                    "non-final sub-page {i} carries no bookmark"
                );
            } else {
                assert_eq!(p.bookmark, Some(json!("bm")), "final sub-page has bookmark");
            }
        }
    }

    #[cfg(feature = "transform-explode")]
    #[tokio::test]
    async fn stream_pages_batch_size_zero_emits_one_page() {
        let inner: Box<dyn Source> = Box::new(OnePageSource {
            records: explode_10x_records(10),
            bookmark: Some(json!("bm")),
        });
        let wrapped =
            TransformingSource::new(inner, vec![explode_stage()], Labels::for_named("t")).unwrap();
        let sub_pages = collect_pages(&wrapped, 0).await;
        assert_eq!(sub_pages.len(), 1, "batch_size=0 means one sub-page");
        assert_eq!(sub_pages[0].records.len(), 100);
        assert_eq!(sub_pages[0].bookmark, Some(json!("bm")));
    }

    #[cfg(feature = "transform-filter")]
    #[tokio::test]
    async fn stream_pages_filter_drops_all_still_yields_bookmark() {
        let inner: Box<dyn Source> = Box::new(OnePageSource {
            records: vec![json!({"deleted": true}), json!({"deleted": true})],
            bookmark: Some(json!("bm")),
        });
        // Keeps only records whose `deleted` is not `true`: drops every record here.
        let drop_all = TransformStage::Filter(crate::stage::FilterSpec {
            path: "deleted".to_owned(),
            op: crate::stage::FilterOp::Ne,
            value: Some(json!(true)),
        });
        let wrapped =
            TransformingSource::new(inner, vec![drop_all], Labels::for_named("t")).unwrap();
        let sub_pages = collect_pages(&wrapped, 100).await;
        assert_eq!(sub_pages.len(), 1);
        assert!(sub_pages[0].records.is_empty());
        assert_eq!(sub_pages[0].bookmark, Some(json!("bm")));
    }
}