1use crate::error::FaucetError;
4use crate::pipeline::StreamPage;
5use async_trait::async_trait;
6use futures_core::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10#[async_trait]
12pub trait Source: Send + Sync {
13 async fn fetch_with_context(
21 &self,
22 context: &std::collections::HashMap<String, Value>,
23 ) -> Result<Vec<Value>, FaucetError>;
24
25 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
27 self.fetch_with_context(&std::collections::HashMap::new())
28 .await
29 }
30
31 async fn fetch_with_context_incremental(
37 &self,
38 context: &std::collections::HashMap<String, Value>,
39 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
40 let records = self.fetch_with_context(context).await?;
41 Ok((records, None))
42 }
43
44 async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
46 self.fetch_with_context_incremental(&std::collections::HashMap::new())
47 .await
48 }
49
50 fn stream_pages<'a>(
74 &'a self,
75 context: &'a std::collections::HashMap<String, Value>,
76 batch_size: usize,
77 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
78 Box::pin(async_stream::try_stream! {
79 let (records, bookmark) = self
80 .fetch_with_context_incremental(context)
81 .await?;
82 let total = records.len();
83 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
86
87 if total == 0 {
88 if bookmark.is_some() {
89 yield StreamPage {
90 records: Vec::new(),
91 bookmark,
92 };
93 }
94 return;
95 }
96
97 let mut iter = records.into_iter();
98 let mut consumed = 0usize;
99 loop {
100 let batch: Vec<Value> = iter.by_ref().take(chunk).collect();
101 if batch.is_empty() {
102 break;
103 }
104 consumed += batch.len();
105 let page_bookmark = if consumed >= total {
106 bookmark.clone()
107 } else {
108 None
109 };
110 yield StreamPage {
111 records: batch,
112 bookmark: page_bookmark,
113 };
114 }
115 })
116 }
117
118 fn config_schema(&self) -> Value {
120 serde_json::json!({"type": "object", "properties": {}})
121 }
122
123 fn state_key(&self) -> Option<String> {
135 None
136 }
137
138 async fn apply_start_bookmark(&self, _bookmark: Value) -> Result<(), FaucetError> {
147 Ok(())
148 }
149
150 fn connector_name(&self) -> &'static str {
158 crate::observability::strip_type_name(std::any::type_name::<Self>())
159 }
160
161 async fn check(
174 &self,
175 ctx: &crate::check::CheckContext,
176 ) -> Result<crate::check::CheckReport, FaucetError> {
177 use crate::check::{CheckReport, Probe};
178 use futures::StreamExt;
179
180 let empty = std::collections::HashMap::new();
181 let start = std::time::Instant::now();
182 let mut pages = self.stream_pages(&empty, 1);
183 let probe = match tokio::time::timeout(ctx.timeout, pages.next()).await {
184 Err(_) => Probe::fail("read", start.elapsed(), "timed out fetching first page"),
185 Ok(None) | Ok(Some(Ok(_))) => Probe::pass("read", start.elapsed()),
186 Ok(Some(Err(e))) => Probe::fail("read", start.elapsed(), e.to_string()),
187 };
188 Ok(CheckReport::single(probe))
189 }
190}
191
/// Outcome of writing one record: `Ok(())` on success, or the [`FaucetError`] for that record.
///
/// This is the element type of the vector returned by `Sink::write_batch_partial`.
pub type RowOutcome = Result<(), FaucetError>;
198
199#[async_trait]
201pub trait Sink: Send + Sync {
202 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError>;
206
207 async fn flush(&self) -> Result<(), FaucetError> {
212 Ok(())
213 }
214
215 async fn write_batch_partial(&self, records: &[Value]) -> Result<Vec<RowOutcome>, FaucetError> {
224 self.write_batch(records).await?;
225 Ok(records.iter().map(|_| Ok(())).collect())
226 }
227
228 fn config_schema(&self) -> Value {
236 serde_json::json!({"type": "object", "properties": {}})
237 }
238
239 fn connector_name(&self) -> &'static str {
242 crate::observability::strip_type_name(std::any::type_name::<Self>())
243 }
244
245 async fn check(
256 &self,
257 _ctx: &crate::check::CheckContext,
258 ) -> Result<crate::check::CheckReport, FaucetError> {
259 Ok(crate::check::CheckReport::not_implemented())
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::check::{CheckContext, ProbeStatus};
    use crate::pipeline::DEFAULT_BATCH_SIZE;
    use futures::StreamExt;
    use serde_json::json;
    use std::collections::HashMap;

    // ---- fixtures ---------------------------------------------------------------------------

    /// Source that returns a fixed record set and relies on every trait default.
    struct MockSource {
        records: Vec<Value>,
    }

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }
    }

    /// Source that overrides the incremental fetch to return a fixed bookmark.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }

        async fn fetch_with_context_incremental(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            let records = self.records.clone();
            let bookmark = self.bookmark.clone();
            Ok((records, Some(bookmark)))
        }
    }

    /// Source whose fetch always fails with an auth error.
    struct FailingSource;

    #[async_trait]
    impl Source for FailingSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Err(FaucetError::Auth("no credentials".into()))
        }
    }

    /// Sink that remembers every record it was asked to write.
    struct MockSink {
        written: std::sync::Mutex<Vec<Value>>,
    }

    impl MockSink {
        fn new() -> Self {
            let written = std::sync::Mutex::new(Vec::new());
            Self { written }
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            self.written.lock().unwrap().extend_from_slice(records);
            Ok(records.len())
        }
    }

    /// Sink whose write always fails.
    struct FailingSink;

    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("write failed".into()))
        }
    }

    // ---- helpers ----------------------------------------------------------------------------

    /// Builds `count` records shaped like `{"i": <index>}`.
    fn indexed_records(count: i32) -> Vec<Value> {
        (0..count).map(|i| json!({"i": i})).collect()
    }

    /// Drains `stream_pages` with an empty context, panicking on the first error item.
    async fn drain_pages<S: Source>(source: &S, batch_size: usize) -> Vec<StreamPage> {
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, batch_size);
        let mut drained = Vec::new();
        while let Some(page) = pages.next().await {
            drained.push(page.unwrap());
        }
        drained
    }

    // ---- Source: fetch ----------------------------------------------------------------------

    #[tokio::test]
    async fn source_fetch_all_returns_records() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let fetched = source.fetch_all().await.unwrap();
        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0]["id"], 1);
    }

    #[tokio::test]
    async fn source_fetch_all_empty() {
        let source = MockSource { records: vec![] };
        let fetched = source.fetch_all().await.unwrap();
        assert!(fetched.is_empty());
    }

    #[tokio::test]
    async fn source_default_incremental_returns_none_bookmark() {
        let source = MockSource {
            records: vec![json!({"id": 1})],
        };
        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert!(bookmark.is_none());
    }

    #[tokio::test]
    async fn source_custom_incremental_returns_bookmark() {
        let source = IncrementalSource {
            records: vec![json!({"id": 1})],
            bookmark: json!("2024-12-01"),
        };
        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(bookmark, Some(json!("2024-12-01")));
    }

    #[tokio::test]
    async fn source_error_propagates() {
        let outcome = FailingSource.fetch_all().await;
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Auth(_))));
    }

    #[tokio::test]
    async fn source_as_trait_object() {
        let boxed: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 42})],
        });
        let fetched = boxed.fetch_all().await.unwrap();
        assert_eq!(fetched[0]["id"], 42);
    }

    // ---- Sink: write / flush ----------------------------------------------------------------

    #[tokio::test]
    async fn sink_write_batch_returns_count() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let written = sink.write_batch(&batch).await.unwrap();
        assert_eq!(written, 3);
    }

    #[tokio::test]
    async fn sink_write_batch_empty() {
        let sink = MockSink::new();
        let written = sink.write_batch(&[]).await.unwrap();
        assert_eq!(written, 0);
    }

    #[tokio::test]
    async fn sink_accumulates_records() {
        let sink = MockSink::new();
        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2})]).await.unwrap();
        assert_eq!(sink.written.lock().unwrap().len(), 2);
    }

    #[tokio::test]
    async fn sink_default_flush_is_noop() {
        let sink = MockSink::new();
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn sink_error_propagates() {
        let outcome = FailingSink.write_batch(&[json!({"id": 1})]).await;
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn sink_as_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());
        let written = boxed.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(written, 1);
    }

    // ---- Source: default stream_pages -------------------------------------------------------

    #[tokio::test]
    async fn default_stream_pages_chunks_records() {
        let source = MockSource {
            records: indexed_records(5),
        };
        let pages = drain_pages(&source, 2).await;
        assert_eq!(pages.len(), 3);
        assert_eq!(pages[0].records.len(), 2);
        assert_eq!(pages[1].records.len(), 2);
        assert_eq!(pages[2].records.len(), 1);
    }

    #[tokio::test]
    async fn default_stream_pages_attaches_bookmark_to_final_page_only() {
        let source = IncrementalSource {
            records: indexed_records(5),
            bookmark: json!("v1"),
        };
        let pages = drain_pages(&source, 2).await;
        assert_eq!(pages.len(), 3);
        assert!(pages[0].bookmark.is_none());
        assert!(pages[1].bookmark.is_none());
        assert_eq!(pages[2].bookmark, Some(json!("v1")));
    }

    #[tokio::test]
    async fn default_stream_pages_single_page_when_batch_size_exceeds_total() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let pages = drain_pages(&source, 100).await;
        assert_eq!(pages.len(), 1);
        assert_eq!(pages[0].records.len(), 2);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_emits_single_page() {
        let source = MockSource {
            records: indexed_records(50_000),
        };
        let pages = drain_pages(&source, 0).await;
        assert_eq!(
            pages.len(),
            1,
            "batch_size=0 must emit exactly one page"
        );
        assert_eq!(pages[0].records.len(), 50_000);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_attaches_bookmark_to_sole_page() {
        let source = IncrementalSource {
            records: indexed_records(3),
            bookmark: json!("v1"),
        };
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, 0);
        let only_page = pages.next().await.unwrap().unwrap();
        assert_eq!(only_page.records.len(), 3);
        assert_eq!(only_page.bookmark, Some(json!("v1")));
        assert!(pages.next().await.is_none());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_yields_no_pages() {
        let source = MockSource { records: vec![] };
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
        assert!(pages.next().await.is_none());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_with_bookmark_yields_single_empty_page() {
        let source = IncrementalSource {
            records: vec![],
            bookmark: json!("v0"),
        };
        let pages = drain_pages(&source, DEFAULT_BATCH_SIZE).await;
        assert_eq!(pages.len(), 1);
        assert!(pages[0].records.is_empty());
        assert_eq!(pages[0].bookmark, Some(json!("v0")));
    }

    #[tokio::test]
    async fn default_stream_pages_propagates_fetch_errors() {
        let source = FailingSource;
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
        let first_item = pages.next().await.unwrap();
        assert!(matches!(first_item, Err(FaucetError::Auth(_))));
    }

    // ---- connector_name ---------------------------------------------------------------------

    #[test]
    fn source_default_connector_name_is_stripped_type_name() {
        let source = MockSource { records: vec![] };
        assert_eq!(source.connector_name(), "MockSource");
    }

    #[test]
    fn sink_default_connector_name_is_stripped_type_name() {
        assert_eq!(MockSink::new().connector_name(), "MockSink");
    }

    // ---- Sink: default write_batch_partial --------------------------------------------------

    #[tokio::test]
    async fn default_write_batch_partial_success_returns_all_ok() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let outcomes = sink.write_batch_partial(&batch).await.unwrap();
        assert_eq!(outcomes.len(), 3);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
        assert_eq!(sink.written.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn default_write_batch_partial_bubbles_outer_err() {
        let batch = vec![json!({"id": 1}), json!({"id": 2})];
        let outcome = FailingSink.write_batch_partial(&batch).await;
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn default_write_batch_partial_empty_returns_empty_vec() {
        let sink = MockSink::new();
        let outcomes = sink.write_batch_partial(&[]).await.unwrap();
        assert!(outcomes.is_empty());
    }

    #[tokio::test]
    async fn default_write_batch_partial_callable_through_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());
        let batch = vec![json!({"id": 1}), json!({"id": 2})];
        let outcomes = boxed.write_batch_partial(&batch).await.unwrap();
        assert_eq!(outcomes.len(), 2);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
    }

    // ---- check ------------------------------------------------------------------------------

    #[tokio::test]
    async fn source_default_check_pulls_first_page_and_passes() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let report = source.check(&CheckContext::default()).await.unwrap();
        assert_eq!(report.failed_count(), 0);
        let read_passed = report
            .probes
            .iter()
            .any(|p| p.name == "read" && matches!(p.status, ProbeStatus::Pass));
        assert!(read_passed);
    }

    #[tokio::test]
    async fn source_default_check_passes_on_empty_source() {
        let source = MockSource { records: vec![] };
        let report = source.check(&CheckContext::default()).await.unwrap();
        assert_eq!(report.failed_count(), 0);
    }

    #[tokio::test]
    async fn source_default_check_fails_when_fetch_errors() {
        let report = FailingSource
            .check(&CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1);
        let read_failed = report
            .probes
            .iter()
            .any(|p| p.name == "read" && matches!(p.status, ProbeStatus::Fail { .. }));
        assert!(read_failed);
    }

    #[tokio::test]
    async fn sink_default_check_is_not_implemented_skip() {
        let sink = MockSink::new();
        let report = sink.check(&CheckContext::default()).await.unwrap();
        assert_eq!(report.probes.len(), 1);
        assert!(matches!(report.probes[0].status, ProbeStatus::Skip { .. }));
    }

    #[tokio::test]
    async fn source_check_callable_through_trait_object() {
        let boxed: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 1})],
        });
        let report = boxed.check(&CheckContext::default()).await.unwrap();
        assert_eq!(report.failed_count(), 0);
    }
}