1use crate::error::FaucetError;
4use crate::pipeline::StreamPage;
5use async_trait::async_trait;
6use futures_core::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10#[async_trait]
12pub trait Source: Send + Sync {
13 async fn fetch_with_context(
21 &self,
22 context: &std::collections::HashMap<String, Value>,
23 ) -> Result<Vec<Value>, FaucetError>;
24
25 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
27 self.fetch_with_context(&std::collections::HashMap::new())
28 .await
29 }
30
31 async fn fetch_with_context_incremental(
37 &self,
38 context: &std::collections::HashMap<String, Value>,
39 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
40 let records = self.fetch_with_context(context).await?;
41 Ok((records, None))
42 }
43
44 async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
46 self.fetch_with_context_incremental(&std::collections::HashMap::new())
47 .await
48 }
49
50 fn stream_pages<'a>(
74 &'a self,
75 context: &'a std::collections::HashMap<String, Value>,
76 batch_size: usize,
77 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
78 Box::pin(async_stream::try_stream! {
79 let (records, bookmark) = self
80 .fetch_with_context_incremental(context)
81 .await?;
82 let total = records.len();
83 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
86
87 if total == 0 {
88 if bookmark.is_some() {
89 yield StreamPage {
90 records: Vec::new(),
91 bookmark,
92 };
93 }
94 return;
95 }
96
97 let mut iter = records.into_iter();
98 let mut consumed = 0usize;
99 loop {
100 let batch: Vec<Value> = iter.by_ref().take(chunk).collect();
101 if batch.is_empty() {
102 break;
103 }
104 consumed += batch.len();
105 let page_bookmark = if consumed >= total {
106 bookmark.clone()
107 } else {
108 None
109 };
110 yield StreamPage {
111 records: batch,
112 bookmark: page_bookmark,
113 };
114 }
115 })
116 }
117
118 fn config_schema(&self) -> Value {
120 serde_json::json!({"type": "object", "properties": {}})
121 }
122
123 fn state_key(&self) -> Option<String> {
135 None
136 }
137
138 async fn apply_start_bookmark(&self, _bookmark: Value) -> Result<(), FaucetError> {
147 Ok(())
148 }
149
150 fn supports_exactly_once(&self) -> bool {
159 false
160 }
161
162 fn connector_name(&self) -> &'static str {
170 crate::observability::strip_type_name(std::any::type_name::<Self>())
171 }
172
173 fn dataset_uri(&self) -> String {
181 format!("{}://unknown", self.connector_name())
182 }
183
184 async fn check(
197 &self,
198 ctx: &crate::check::CheckContext,
199 ) -> Result<crate::check::CheckReport, FaucetError> {
200 use crate::check::{CheckReport, Probe};
201 use futures::StreamExt;
202
203 let empty = std::collections::HashMap::new();
204 let start = std::time::Instant::now();
205 let mut pages = self.stream_pages(&empty, 1);
206 let probe = match tokio::time::timeout(ctx.timeout, pages.next()).await {
207 Err(_) => Probe::fail("read", start.elapsed(), "timed out fetching first page"),
208 Ok(None) | Ok(Some(Ok(_))) => Probe::pass("read", start.elapsed()),
209 Ok(Some(Err(e))) => Probe::fail("read", start.elapsed(), e.to_string()),
210 };
211 Ok(CheckReport::single(probe))
212 }
213}
214
215pub type RowOutcome = Result<(), FaucetError>;
221
222#[async_trait]
224pub trait Sink: Send + Sync {
225 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError>;
229
230 async fn flush(&self) -> Result<(), FaucetError> {
235 Ok(())
236 }
237
238 async fn write_batch_partial(&self, records: &[Value]) -> Result<Vec<RowOutcome>, FaucetError> {
247 self.write_batch(records).await?;
248 Ok(records.iter().map(|_| Ok(())).collect())
249 }
250
251 fn supports_idempotent_writes(&self) -> bool {
258 false
259 }
260
261 fn supported_write_modes(&self) -> &'static [crate::write_mode::WriteMode] {
267 &[crate::write_mode::WriteMode::Append]
268 }
269
270 async fn write_batch_idempotent(
280 &self,
281 records: &[Value],
282 scope: &str,
283 token: &str,
284 ) -> Result<usize, FaucetError> {
285 let _ = (scope, token);
286 self.write_batch(records).await
287 }
288
289 async fn last_committed_token(&self, scope: &str) -> Result<Option<String>, FaucetError> {
292 let _ = scope;
293 Ok(None)
294 }
295
296 fn config_schema(&self) -> Value {
304 serde_json::json!({"type": "object", "properties": {}})
305 }
306
307 fn connector_name(&self) -> &'static str {
310 crate::observability::strip_type_name(std::any::type_name::<Self>())
311 }
312
313 fn dataset_uri(&self) -> String {
321 format!("{}://unknown", self.connector_name())
322 }
323
324 async fn check(
335 &self,
336 _ctx: &crate::check::CheckContext,
337 ) -> Result<crate::check::CheckReport, FaucetError> {
338 Ok(crate::check::CheckReport::not_implemented())
339 }
340}
341
#[cfg(test)]
mod tests {
    // Unit tests for the default method implementations on `Source` and `Sink`.
    use super::*;
    use crate::pipeline::DEFAULT_BATCH_SIZE;
    use futures::StreamExt;
    use serde_json::json;

    /// Source that returns a fixed record set and relies on every default.
    struct MockSource {
        records: Vec<Value>,
    }

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_with_context(
            &self,
            _context: &std::collections::HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }
    }

    /// Source that overrides the incremental fetch to report a fixed bookmark.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_with_context(
            &self,
            _context: &std::collections::HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }

        async fn fetch_with_context_incremental(
            &self,
            _context: &std::collections::HashMap<String, Value>,
        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            Ok((self.records.clone(), Some(self.bookmark.clone())))
        }
    }

    /// Source whose fetch always fails with an auth error.
    struct FailingSource;

    #[async_trait]
    impl Source for FailingSource {
        async fn fetch_with_context(
            &self,
            _context: &std::collections::HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Err(FaucetError::Auth("no credentials".into()))
        }
    }

    /// Source whose fetch takes far longer than the check timeout used in these tests.
    struct SlowSource;

    #[async_trait]
    impl Source for SlowSource {
        async fn fetch_with_context(
            &self,
            _context: &std::collections::HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            Ok(Vec::new())
        }
    }

    /// Sink that keeps every record it is asked to write.
    struct MockSink {
        written: std::sync::Mutex<Vec<Value>>,
    }

    impl MockSink {
        fn new() -> Self {
            Self {
                written: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            let mut w = self.written.lock().unwrap();
            w.extend(records.iter().cloned());
            Ok(records.len())
        }
    }

    /// Sink whose writes always fail.
    struct FailingSink;

    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("write failed".into()))
        }
    }

    /// Drains `source.stream_pages` with an empty context, panicking on any page error.
    async fn collect_pages(source: &dyn Source, batch_size: usize) -> Vec<StreamPage> {
        let ctx = std::collections::HashMap::new();
        let mut pages = source.stream_pages(&ctx, batch_size);
        let mut collected = Vec::new();
        while let Some(page) = pages.next().await {
            collected.push(page.unwrap());
        }
        collected
    }

    #[tokio::test]
    async fn source_fetch_all_returns_records() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let records = source.fetch_all().await.unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["id"], 1);
    }

    #[tokio::test]
    async fn source_fetch_all_empty() {
        let source = MockSource { records: vec![] };
        let records = source.fetch_all().await.unwrap();
        assert!(records.is_empty());
    }

    #[tokio::test]
    async fn source_default_incremental_returns_none_bookmark() {
        let source = MockSource {
            records: vec![json!({"id": 1})],
        };
        let (records, bookmark) = source.fetch_all_incremental().await.unwrap();
        assert_eq!(records.len(), 1);
        assert!(bookmark.is_none());
    }

    #[tokio::test]
    async fn source_custom_incremental_returns_bookmark() {
        let source = IncrementalSource {
            records: vec![json!({"id": 1})],
            bookmark: json!("2024-12-01"),
        };
        let (records, bookmark) = source.fetch_all_incremental().await.unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(bookmark, Some(json!("2024-12-01")));
    }

    #[tokio::test]
    async fn source_error_propagates() {
        let result = FailingSource.fetch_all().await;
        assert!(matches!(result, Err(FaucetError::Auth(_))));
    }

    #[tokio::test]
    async fn source_as_trait_object() {
        let source: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 42})],
        });
        let records = source.fetch_all().await.unwrap();
        assert_eq!(records[0]["id"], 42);
    }

    #[tokio::test]
    async fn sink_write_batch_returns_count() {
        let sink = MockSink::new();
        let records = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let count = sink.write_batch(&records).await.unwrap();
        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn sink_write_batch_empty() {
        let sink = MockSink::new();
        let count = sink.write_batch(&[]).await.unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn sink_accumulates_records() {
        let sink = MockSink::new();
        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2})]).await.unwrap();
        let written = sink.written.lock().unwrap();
        assert_eq!(written.len(), 2);
    }

    #[tokio::test]
    async fn sink_default_flush_is_noop() {
        let sink = MockSink::new();
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn sink_error_propagates() {
        let result = FailingSink.write_batch(&[json!({"id": 1})]).await;
        assert!(matches!(result, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn sink_as_trait_object() {
        let sink: Box<dyn Sink> = Box::new(MockSink::new());
        let count = sink.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn default_stream_pages_chunks_records() {
        let source = MockSource {
            records: (0..5).map(|i| json!({"i": i})).collect(),
        };
        let all = collect_pages(&source, 2).await;
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].records.len(), 2);
        assert_eq!(all[1].records.len(), 2);
        assert_eq!(all[2].records.len(), 1);
    }

    #[tokio::test]
    async fn default_stream_pages_attaches_bookmark_to_final_page_only() {
        let source = IncrementalSource {
            records: (0..5).map(|i| json!({"i": i})).collect(),
            bookmark: json!("v1"),
        };
        let collected = collect_pages(&source, 2).await;
        assert_eq!(collected.len(), 3);
        assert!(collected[0].bookmark.is_none());
        assert!(collected[1].bookmark.is_none());
        assert_eq!(collected[2].bookmark, Some(json!("v1")));
    }

    #[tokio::test]
    async fn default_stream_pages_exact_multiple_attaches_bookmark_to_last_page() {
        // Boundary case: the record count is an exact multiple of the batch size, so the
        // final page is full and no trailing empty page may be emitted.
        let source = IncrementalSource {
            records: (0..4).map(|i| json!({"i": i})).collect(),
            bookmark: json!("v2"),
        };
        let collected = collect_pages(&source, 2).await;
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0].records.len(), 2);
        assert!(collected[0].bookmark.is_none());
        assert_eq!(collected[1].records.len(), 2);
        assert_eq!(collected[1].bookmark, Some(json!("v2")));
    }

    #[tokio::test]
    async fn default_stream_pages_single_page_when_batch_size_exceeds_total() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let collected = collect_pages(&source, 100).await;
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].records.len(), 2);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_emits_single_page() {
        let source = MockSource {
            records: (0..50_000).map(|i| json!({"i": i})).collect(),
        };
        let collected = collect_pages(&source, 0).await;
        assert_eq!(
            collected.len(),
            1,
            "batch_size=0 must emit exactly one page"
        );
        assert_eq!(collected[0].records.len(), 50_000);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_attaches_bookmark_to_sole_page() {
        let source = IncrementalSource {
            records: (0..3).map(|i| json!({"i": i})).collect(),
            bookmark: json!("v1"),
        };
        let ctx = std::collections::HashMap::new();
        let mut pages = source.stream_pages(&ctx, 0);
        let page = pages.next().await.unwrap().unwrap();
        assert_eq!(page.records.len(), 3);
        assert_eq!(page.bookmark, Some(json!("v1")));
        assert!(pages.next().await.is_none());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_yields_no_pages() {
        let source = MockSource { records: vec![] };
        let ctx = std::collections::HashMap::new();
        let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
        assert!(pages.next().await.is_none());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_with_bookmark_yields_single_empty_page() {
        let source = IncrementalSource {
            records: vec![],
            bookmark: json!("v0"),
        };
        let collected = collect_pages(&source, DEFAULT_BATCH_SIZE).await;
        assert_eq!(collected.len(), 1);
        assert!(collected[0].records.is_empty());
        assert_eq!(collected[0].bookmark, Some(json!("v0")));
    }

    #[tokio::test]
    async fn default_stream_pages_propagates_fetch_errors() {
        let source = FailingSource;
        let ctx = std::collections::HashMap::new();
        let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
        let first = pages.next().await.unwrap();
        assert!(matches!(first, Err(FaucetError::Auth(_))));
    }

    #[test]
    fn source_default_connector_name_is_stripped_type_name() {
        let source = MockSource { records: vec![] };
        assert_eq!(source.connector_name(), "MockSource");
    }

    #[test]
    fn sink_default_connector_name_is_stripped_type_name() {
        let sink = MockSink::new();
        assert_eq!(sink.connector_name(), "MockSink");
    }

    #[test]
    fn source_default_dataset_uri_uses_connector_name() {
        let source = MockSource { records: vec![] };
        assert_eq!(source.dataset_uri(), "MockSource://unknown");
    }

    #[test]
    fn sink_default_dataset_uri_uses_connector_name() {
        let sink = MockSink::new();
        assert_eq!(sink.dataset_uri(), "MockSink://unknown");
    }

    #[tokio::test]
    async fn default_write_batch_partial_success_returns_all_ok() {
        let sink = MockSink::new();
        let records = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let outcomes = sink.write_batch_partial(&records).await.unwrap();
        assert_eq!(outcomes.len(), 3);
        assert!(outcomes.iter().all(|o| o.is_ok()));
        assert_eq!(sink.written.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn default_write_batch_partial_bubbles_outer_err() {
        let records = vec![json!({"id": 1}), json!({"id": 2})];
        let result = FailingSink.write_batch_partial(&records).await;
        assert!(matches!(result, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn default_write_batch_partial_empty_returns_empty_vec() {
        let sink = MockSink::new();
        let outcomes = sink.write_batch_partial(&[]).await.unwrap();
        assert!(outcomes.is_empty());
    }

    #[tokio::test]
    async fn default_write_batch_partial_callable_through_trait_object() {
        let sink: Box<dyn Sink> = Box::new(MockSink::new());
        let records = vec![json!({"id": 1}), json!({"id": 2})];
        let outcomes = sink.write_batch_partial(&records).await.unwrap();
        assert_eq!(outcomes.len(), 2);
        assert!(outcomes.iter().all(|o| o.is_ok()));
    }

    #[tokio::test]
    async fn source_default_check_pulls_first_page_and_passes() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let report = source
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert!(
            report
                .probes
                .iter()
                .any(|p| p.name == "read" && matches!(p.status, crate::check::ProbeStatus::Pass))
        );
    }

    #[tokio::test]
    async fn source_default_check_passes_on_empty_source() {
        let source = MockSource { records: vec![] };
        let report = source
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
    }

    #[tokio::test]
    async fn source_default_check_fails_when_fetch_errors() {
        let report = FailingSource
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1);
        assert!(report.probes.iter().any(
            |p| p.name == "read" && matches!(p.status, crate::check::ProbeStatus::Fail { .. })
        ));
    }

    #[tokio::test]
    async fn source_default_check_fails_on_timeout() {
        // The fetch sleeps for seconds; a short timeout must turn that into a failed probe
        // rather than blocking the check.
        let mut ctx = crate::check::CheckContext::default();
        ctx.timeout = std::time::Duration::from_millis(20);
        let report = SlowSource.check(&ctx).await.unwrap();
        assert_eq!(report.failed_count(), 1);
        assert!(report.probes.iter().any(
            |p| p.name == "read" && matches!(p.status, crate::check::ProbeStatus::Fail { .. })
        ));
    }

    #[tokio::test]
    async fn sink_default_check_is_not_implemented_skip() {
        let sink = MockSink::new();
        let report = sink
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.probes.len(), 1);
        assert!(matches!(
            report.probes[0].status,
            crate::check::ProbeStatus::Skip { .. }
        ));
    }

    #[tokio::test]
    async fn source_check_callable_through_trait_object() {
        let source: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 1})],
        });
        let report = source
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
    }

    #[tokio::test]
    async fn sink_default_is_not_idempotent() {
        let sink = MockSink::new();
        assert!(!sink.supports_idempotent_writes());
        let n = sink
            .write_batch_idempotent(&[json!({"id": 1})], "scope::a", "00000000000000000001")
            .await
            .unwrap();
        assert_eq!(n, 1);
        assert_eq!(sink.last_committed_token("scope::a").await.unwrap(), None);
        assert_eq!(sink.written.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn sink_default_write_batch_idempotent_propagates_write_errors() {
        let result = FailingSink
            .write_batch_idempotent(&[json!({"id": 1})], "scope::a", "00000000000000000001")
            .await;
        assert!(matches!(result, Err(FaucetError::Sink(_))));
    }

    #[test]
    fn source_default_does_not_support_exactly_once() {
        let source = MockSource { records: vec![] };
        assert!(!source.supports_exactly_once());
    }

    #[test]
    fn sink_default_supported_write_modes_is_append_only() {
        use crate::write_mode::WriteMode;
        let sink = MockSink::new();
        assert_eq!(sink.supported_write_modes(), &[WriteMode::Append]);
    }

    #[test]
    fn supported_write_modes_callable_through_trait_object() {
        use crate::write_mode::WriteMode;
        let sink: Box<dyn Sink> = Box::new(MockSink::new());
        assert!(sink.supported_write_modes().contains(&WriteMode::Append));
    }
}