Skip to main content

faucet_core/
traits.rs

1//! Shared traits for faucet sources and sinks.
2
3use crate::error::FaucetError;
4use crate::pipeline::StreamPage;
5use async_trait::async_trait;
6use futures_core::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10/// A source fetches records from an external system.
11#[async_trait]
12pub trait Source: Send + Sync {
13    /// Primary fetch method. Receives context from a parent source's records.
14    ///
15    /// An empty context map means this is a root source (no parent).
16    /// Connectors that support being a child should use
17    /// [`substitute_context()`](crate::util::substitute_context) to resolve
18    /// `{placeholder}` tokens in their URL path, query parameters, headers,
19    /// or body. Connectors that don't need parent context ignore the map.
20    async fn fetch_with_context(
21        &self,
22        context: &std::collections::HashMap<String, Value>,
23    ) -> Result<Vec<Value>, FaucetError>;
24
25    /// Convenience: fetch with no parent context.
26    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
27        self.fetch_with_context(&std::collections::HashMap::new())
28            .await
29    }
30
31    /// Incremental fetch with parent context support.
32    ///
33    /// Returns the records and an optional bookmark value for incremental
34    /// replication. The default delegates to `fetch_with_context` and
35    /// returns `None` for the bookmark.
36    async fn fetch_with_context_incremental(
37        &self,
38        context: &std::collections::HashMap<String, Value>,
39    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
40        let records = self.fetch_with_context(context).await?;
41        Ok((records, None))
42    }
43
44    /// Convenience: incremental fetch with no parent context.
45    async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
46        self.fetch_with_context_incremental(&std::collections::HashMap::new())
47            .await
48    }
49
50    /// Stream records page-by-page so the pipeline can write to the sink as
51    /// pages arrive instead of buffering the full result set.
52    ///
53    /// `batch_size` is the *hint* the pipeline passes down; sources are free
54    /// to use a larger or smaller native chunk (e.g. one page per HTTP
55    /// response, one row-group per Parquet file) but should approximate it
56    /// where feasible. The special value `batch_size = 0` means "do not
57    /// batch — emit the entire result set in a single page." Sources that
58    /// stream natively should treat `0` as "skip the chunking layer and
59    /// yield one page after the underlying read completes" (useful for
60    /// small lookup tables or for sinks like SQL `COPY` / BigQuery load
61    /// jobs that prefer one large request).
62    ///
63    /// The default implementation fetches the full result set via
64    /// [`fetch_with_context_incremental`](Self::fetch_with_context_incremental)
65    /// and chunks it in memory by `batch_size`. The bookmark (when present)
66    /// is attached to the *final* page so the pipeline only persists after
67    /// the entire fetch has been written. Sources that can stream natively
68    /// override this method and may emit per-page bookmarks (e.g. CDC).
69    ///
70    /// An empty result with a `Some(bookmark)` still yields one empty page
71    /// carrying the bookmark, so incremental runs that produce no records
72    /// still advance their checkpoint.
73    fn stream_pages<'a>(
74        &'a self,
75        context: &'a std::collections::HashMap<String, Value>,
76        batch_size: usize,
77    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
78        Box::pin(async_stream::try_stream! {
79            let (records, bookmark) = self
80                .fetch_with_context_incremental(context)
81                .await?;
82            let total = records.len();
83            // batch_size == 0 means "no batching" — emit all records as one
84            // page. Otherwise chunk into pages of size `batch_size`.
85            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
86
87            if total == 0 {
88                if bookmark.is_some() {
89                    yield StreamPage {
90                        records: Vec::new(),
91                        bookmark,
92                    };
93                }
94                return;
95            }
96
97            let mut iter = records.into_iter();
98            let mut consumed = 0usize;
99            loop {
100                let batch: Vec<Value> = iter.by_ref().take(chunk).collect();
101                if batch.is_empty() {
102                    break;
103                }
104                consumed += batch.len();
105                let page_bookmark = if consumed >= total {
106                    bookmark.clone()
107                } else {
108                    None
109                };
110                yield StreamPage {
111                    records: batch,
112                    bookmark: page_bookmark,
113                };
114            }
115        })
116    }
117
118    /// Return a JSON Schema describing the configuration this source accepts.
119    fn config_schema(&self) -> Value {
120        serde_json::json!({"type": "object", "properties": {}})
121    }
122
123    /// Stable key under which this source's incremental-replication bookmark
124    /// should be persisted in a [`StateStore`](crate::state::StateStore).
125    ///
126    /// Returning `Some(key)` opts this source into resumable runs: when the
127    /// pipeline is configured with a state store via
128    /// [`Pipeline::with_state_store`](crate::Pipeline::with_state_store), it
129    /// reads the bookmark at `key` before fetching and writes the new
130    /// bookmark back only after the sink confirms the batch was written.
131    ///
132    /// The default returns `None`, meaning the source is not persisted.
133    /// Keys must satisfy [`validate_state_key`](crate::state::validate_state_key).
134    fn state_key(&self) -> Option<String> {
135        None
136    }
137
138    /// Apply a bookmark loaded from a [`StateStore`](crate::state::StateStore)
139    /// as this run's starting point.
140    ///
141    /// The default implementation ignores the value, which keeps existing
142    /// sources backwards-compatible. Sources that support incremental
143    /// replication override this — typically by storing the value behind
144    /// interior mutability and consulting it inside
145    /// `fetch_with_context_incremental`.
146    async fn apply_start_bookmark(&self, _bookmark: Value) -> Result<(), FaucetError> {
147        Ok(())
148    }
149
150    /// Stable identifier used as the `connector` label on metrics and the
151    /// `connector` attribute on spans. Defaults to the final segment of
152    /// `std::any::type_name::<Self>()`, e.g. `"RestSource"`. Built-in
153    /// connectors override with a short, friendly snake_case name (e.g.
154    /// `"rest"`). Must return a non-empty string; observability decorators
155    /// fall back to `"unknown"` in release builds if it is empty (and
156    /// `debug_assert!` in debug builds).
157    fn connector_name(&self) -> &'static str {
158        crate::observability::strip_type_name(std::any::type_name::<Self>())
159    }
160
161    /// Run a fast, non-mutating preflight probe (used by `faucet doctor`).
162    ///
163    /// The default pulls a **single page** via
164    /// [`stream_pages`](Self::stream_pages) and reports success/failure — it
165    /// exercises the real read path (DNS, TLS, auth, the first request, the
166    /// first-record decode) but never paginates the full dataset and never
167    /// repeats. The page stream is dropped immediately after the first page.
168    ///
169    /// Sources whose first page *blocks* waiting for inbound data (webhook,
170    /// websocket) or has *side effects* (CDC consuming WAL) override this with a
171    /// cheaper, side-effect-free probe. Probe-level failures are returned as a
172    /// [`ProbeStatus::Fail`](crate::check::ProbeStatus) inside `Ok(report)`.
173    async fn check(
174        &self,
175        ctx: &crate::check::CheckContext,
176    ) -> Result<crate::check::CheckReport, FaucetError> {
177        use crate::check::{CheckReport, Probe};
178        use futures::StreamExt;
179
180        let empty = std::collections::HashMap::new();
181        let start = std::time::Instant::now();
182        let mut pages = self.stream_pages(&empty, 1);
183        let probe = match tokio::time::timeout(ctx.timeout, pages.next()).await {
184            Err(_) => Probe::fail("read", start.elapsed(), "timed out fetching first page"),
185            Ok(None) | Ok(Some(Ok(_))) => Probe::pass("read", start.elapsed()),
186            Ok(Some(Err(e))) => Probe::fail("read", start.elapsed(), e.to_string()),
187        };
188        Ok(CheckReport::single(probe))
189    }
190}
191
192/// Per-row outcome from [`Sink::write_batch_partial`].
193///
194/// `Ok(())` — the row was durably written to the sink.
195/// `Err(_)` — the row failed; the pipeline will route it to the DLQ when
196/// one is configured.
197pub type RowOutcome = Result<(), FaucetError>;
198
199/// A sink writes records to an external system.
200#[async_trait]
201pub trait Sink: Send + Sync {
202    /// Write a batch of records to the destination.
203    ///
204    /// Returns the number of records successfully written.
205    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError>;
206
207    /// Flush any buffered data to the destination.
208    ///
209    /// The default implementation is a no-op (suitable for sinks that
210    /// write immediately in `write_batch`).
211    async fn flush(&self) -> Result<(), FaucetError> {
212        Ok(())
213    }
214
215    /// Write a batch and report per-row outcomes.
216    ///
217    /// Sinks whose underlying API exposes per-row results (BigQuery
218    /// `insertAll`, Elasticsearch `_bulk`) override this. The default
219    /// implementation delegates to [`Self::write_batch`] and maps a single success
220    /// onto a uniform all-`Ok(())` vector. An outer failure is bubbled up
221    /// unchanged so the pipeline's DLQ router can apply its `on_batch_error`
222    /// policy at a single decision point.
223    async fn write_batch_partial(&self, records: &[Value]) -> Result<Vec<RowOutcome>, FaucetError> {
224        self.write_batch(records).await?;
225        Ok(records.iter().map(|_| Ok(())).collect())
226    }
227
228    /// Return a JSON Schema describing the configuration this sink accepts.
229    ///
230    /// The schema is auto-generated from the config struct using `schemars`.
231    /// Callers can inspect it to discover required fields, types, defaults,
232    /// and descriptions before constructing the sink.
233    ///
234    /// The default returns an empty object schema.
235    fn config_schema(&self) -> Value {
236        serde_json::json!({"type": "object", "properties": {}})
237    }
238
239    /// Stable identifier used as the `connector` label on metrics and the
240    /// `connector` attribute on spans. See `Source::connector_name`.
241    fn connector_name(&self) -> &'static str {
242        crate::observability::strip_type_name(std::any::type_name::<Self>())
243    }
244
245    /// Run a fast, non-mutating preflight probe (used by `faucet doctor`).
246    ///
247    /// Unlike sources, a sink has no non-mutating "first page" equivalent
248    /// (`write_batch` mutates the destination), so the default returns
249    /// [`CheckReport::not_implemented`](crate::check::CheckReport::not_implemented).
250    /// Built-in sinks override this with a connect / auth / metadata probe.
251    ///
252    /// The probe **MUST be idempotent and side-effect-free** — no inserts, no
253    /// residual rows or objects — and must never put credentials or connection
254    /// strings in a probe `reason`/`hint`.
255    async fn check(
256        &self,
257        _ctx: &crate::check::CheckContext,
258    ) -> Result<crate::check::CheckReport, FaucetError> {
259        Ok(crate::check::CheckReport::not_implemented())
260    }
261}
262
#[cfg(test)]
mod tests {
    //! Unit tests for the default method implementations of `Source` and
    //! `Sink`, driven through small in-memory mock connectors.

    use super::*;
    use crate::check::{CheckContext, ProbeStatus};
    use crate::pipeline::DEFAULT_BATCH_SIZE;
    use futures::StreamExt;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Mutex;

    // ── Shared helpers ──────────────────────────────────────────────────────

    /// Build `count` records shaped like `{"i": 0}`, `{"i": 1}`, …
    fn numbered_records(count: i32) -> Vec<Value> {
        (0..count).map(|i| json!({"i": i})).collect()
    }

    /// Drive `stream_pages` to completion with an empty context, panicking
    /// on the first error item, and return every page in arrival order.
    async fn drain_pages(source: &impl Source, batch_size: usize) -> Vec<StreamPage> {
        let ctx = HashMap::new();
        let pages: Vec<StreamPage> = source
            .stream_pages(&ctx, batch_size)
            .map(Result::unwrap)
            .collect()
            .await;
        pages
    }

    // ── Mock Source ──────────────────────────────────────────────────────────

    /// Source that replays a fixed record list and relies on every default.
    struct MockSource {
        records: Vec<Value>,
    }

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.to_vec())
        }
    }

    /// Source that additionally overrides the incremental fetch so that it
    /// always reports `bookmark`.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.to_vec())
        }

        async fn fetch_with_context_incremental(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            let bookmark = Some(self.bookmark.clone());
            Ok((self.records.to_vec(), bookmark))
        }
    }

    /// Source whose every fetch fails with an auth error.
    struct FailingSource;

    #[async_trait]
    impl Source for FailingSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Err(FaucetError::Auth("no credentials".into()))
        }
    }

    // ── Mock Sink ───────────────────────────────────────────────────────────

    /// Sink that appends every written record to an in-memory list.
    struct MockSink {
        written: Mutex<Vec<Value>>,
    }

    impl MockSink {
        fn new() -> Self {
            let written = Mutex::new(Vec::new());
            Self { written }
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            self.written.lock().unwrap().extend_from_slice(records);
            Ok(records.len())
        }
    }

    /// Sink whose every write fails with a sink error.
    struct FailingSink;

    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("write failed".into()))
        }
    }

    // ── Source tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn source_fetch_all_returns_records() {
        let records = vec![json!({"id": 1}), json!({"id": 2})];
        let source = MockSource { records };

        let fetched = source.fetch_all().await.unwrap();

        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0]["id"], 1);
    }

    #[tokio::test]
    async fn source_fetch_all_empty() {
        let source = MockSource { records: vec![] };

        let fetched = source.fetch_all().await.unwrap();

        assert!(fetched.is_empty());
    }

    #[tokio::test]
    async fn source_default_incremental_returns_none_bookmark() {
        let source = MockSource {
            records: vec![json!({"id": 1})],
        };

        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();

        assert_eq!(fetched.len(), 1);
        assert!(bookmark.is_none());
    }

    #[tokio::test]
    async fn source_custom_incremental_returns_bookmark() {
        let source = IncrementalSource {
            records: vec![json!({"id": 1})],
            bookmark: json!("2024-12-01"),
        };

        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();

        assert_eq!(fetched.len(), 1);
        assert_eq!(bookmark, Some(json!("2024-12-01")));
    }

    #[tokio::test]
    async fn source_error_propagates() {
        let outcome = FailingSource.fetch_all().await;

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Auth(_))));
    }

    #[tokio::test]
    async fn source_as_trait_object() {
        let boxed: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 42})],
        });

        let fetched = boxed.fetch_all().await.unwrap();

        assert_eq!(fetched[0]["id"], 42);
    }

    // ── Sink tests ──────────────────────────────────────────────────────────

    #[tokio::test]
    async fn sink_write_batch_returns_count() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];

        let written = sink.write_batch(&batch).await.unwrap();

        assert_eq!(written, 3);
    }

    #[tokio::test]
    async fn sink_write_batch_empty() {
        let sink = MockSink::new();

        let written = sink.write_batch(&[]).await.unwrap();

        assert_eq!(written, 0);
    }

    #[tokio::test]
    async fn sink_accumulates_records() {
        let sink = MockSink::new();

        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2})]).await.unwrap();

        let stored = sink.written.lock().unwrap();
        assert_eq!(stored.len(), 2);
    }

    #[tokio::test]
    async fn sink_default_flush_is_noop() {
        let sink = MockSink::new();

        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn sink_error_propagates() {
        let outcome = FailingSink.write_batch(&[json!({"id": 1})]).await;

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn sink_as_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());

        let written = boxed.write_batch(&[json!({"id": 1})]).await.unwrap();

        assert_eq!(written, 1);
    }

    // ── stream_pages tests ──────────────────────────────────────────────────

    #[tokio::test]
    async fn default_stream_pages_chunks_records() {
        let source = MockSource {
            records: numbered_records(5),
        };

        let pages = drain_pages(&source, 2).await;

        // 5 records, batch_size=2 → pages of [2, 2, 1]
        let page_sizes: Vec<usize> = pages.iter().map(|page| page.records.len()).collect();
        assert_eq!(page_sizes, [2, 2, 1]);
    }

    #[tokio::test]
    async fn default_stream_pages_attaches_bookmark_to_final_page_only() {
        let source = IncrementalSource {
            records: numbered_records(5),
            bookmark: json!("v1"),
        };

        let pages = drain_pages(&source, 2).await;

        let bookmarks: Vec<Option<Value>> = pages.into_iter().map(|page| page.bookmark).collect();
        assert_eq!(bookmarks, [None, None, Some(json!("v1"))]);
    }

    #[tokio::test]
    async fn default_stream_pages_single_page_when_batch_size_exceeds_total() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };

        let pages = drain_pages(&source, 100).await;

        assert_eq!(pages.len(), 1);
        assert_eq!(pages[0].records.len(), 2);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_emits_single_page() {
        // batch_size = 0 is the "no batching" sentinel — yields every record
        // in one page regardless of total count.
        let source = MockSource {
            records: numbered_records(50_000),
        };

        let pages = drain_pages(&source, 0).await;

        assert_eq!(pages.len(), 1, "batch_size=0 must emit exactly one page");
        assert_eq!(pages[0].records.len(), 50_000);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_attaches_bookmark_to_sole_page() {
        let source = IncrementalSource {
            records: numbered_records(3),
            bookmark: json!("v1"),
        };
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, 0);

        let sole_page = pages.next().await.unwrap().unwrap();

        assert_eq!(sole_page.records.len(), 3);
        assert_eq!(sole_page.bookmark, Some(json!("v1")));
        assert!(pages.next().await.is_none());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_yields_no_pages() {
        let source = MockSource { records: vec![] };
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);

        assert!(pages.next().await.is_none());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_with_bookmark_yields_single_empty_page() {
        let source = IncrementalSource {
            records: vec![],
            bookmark: json!("v0"),
        };

        let pages = drain_pages(&source, DEFAULT_BATCH_SIZE).await;

        // One empty-records page that carries the bookmark, so the pipeline
        // still persists progress on otherwise-empty incremental runs.
        assert_eq!(pages.len(), 1);
        assert!(pages[0].records.is_empty());
        assert_eq!(pages[0].bookmark, Some(json!("v0")));
    }

    #[tokio::test]
    async fn default_stream_pages_propagates_fetch_errors() {
        let source = FailingSource;
        let ctx = HashMap::new();
        let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);

        let first_item = pages.next().await.unwrap();

        assert!(matches!(first_item, Err(FaucetError::Auth(_))));
    }

    #[test]
    fn source_default_connector_name_is_stripped_type_name() {
        // MockSource lives at `faucet_core::traits::tests::MockSource`; the
        // stripped type_name yields the trailing segment.
        let source = MockSource { records: vec![] };

        assert_eq!(source.connector_name(), "MockSource");
    }

    #[test]
    fn sink_default_connector_name_is_stripped_type_name() {
        let sink = MockSink::new();

        assert_eq!(sink.connector_name(), "MockSink");
    }

    // ── write_batch_partial tests ───────────────────────────────────────────

    #[tokio::test]
    async fn default_write_batch_partial_success_returns_all_ok() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];

        let outcomes = sink.write_batch_partial(&batch).await.unwrap();

        assert_eq!(outcomes.len(), 3);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
        assert_eq!(sink.written.lock().unwrap().len(), 3);
    }

    #[tokio::test]
    async fn default_write_batch_partial_bubbles_outer_err() {
        let batch = vec![json!({"id": 1}), json!({"id": 2})];

        let outcome = FailingSink.write_batch_partial(&batch).await;

        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn default_write_batch_partial_empty_returns_empty_vec() {
        let sink = MockSink::new();

        let outcomes = sink.write_batch_partial(&[]).await.unwrap();

        assert!(outcomes.is_empty());
    }

    #[tokio::test]
    async fn default_write_batch_partial_callable_through_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());
        let batch = vec![json!({"id": 1}), json!({"id": 2})];

        let outcomes = boxed.write_batch_partial(&batch).await.unwrap();

        assert_eq!(outcomes.len(), 2);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
    }

    // ── check() tests ─────────────────────────────────────────────────────────

    #[tokio::test]
    async fn source_default_check_pulls_first_page_and_passes() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };

        let report = source.check(&CheckContext::default()).await.unwrap();

        assert_eq!(report.failed_count(), 0);
        let has_passing_read = report
            .probes
            .iter()
            .any(|probe| probe.name == "read" && matches!(probe.status, ProbeStatus::Pass));
        assert!(has_passing_read);
    }

    #[tokio::test]
    async fn source_default_check_passes_on_empty_source() {
        let source = MockSource { records: vec![] };

        let report = source.check(&CheckContext::default()).await.unwrap();

        // Reachable but empty is still a healthy source.
        assert_eq!(report.failed_count(), 0);
    }

    #[tokio::test]
    async fn source_default_check_fails_when_fetch_errors() {
        let report = FailingSource
            .check(&CheckContext::default())
            .await
            .unwrap();

        assert_eq!(report.failed_count(), 1);
        let has_failing_read = report
            .probes
            .iter()
            .any(|probe| probe.name == "read" && matches!(probe.status, ProbeStatus::Fail { .. }));
        assert!(has_failing_read);
    }

    #[tokio::test]
    async fn sink_default_check_is_not_implemented_skip() {
        let sink = MockSink::new();

        let report = sink.check(&CheckContext::default()).await.unwrap();

        assert_eq!(report.probes.len(), 1);
        assert!(matches!(report.probes[0].status, ProbeStatus::Skip { .. }));
    }

    #[tokio::test]
    async fn source_check_callable_through_trait_object() {
        let boxed: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 1})],
        });

        let report = boxed.check(&CheckContext::default()).await.unwrap();

        assert_eq!(report.failed_count(), 0);
    }
}