Skip to main content

faucet_core/
traits.rs

1//! Shared traits for faucet sources and sinks.
2
3use crate::error::FaucetError;
4use crate::pipeline::StreamPage;
5use async_trait::async_trait;
6use futures_core::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10/// A source fetches records from an external system.
11#[async_trait]
12pub trait Source: Send + Sync {
13    /// Primary fetch method. Receives context from a parent source's records.
14    ///
15    /// An empty context map means this is a root source (no parent).
16    /// Connectors that support being a child should use
17    /// [`substitute_context()`](crate::util::substitute_context) to resolve
18    /// `{placeholder}` tokens in their URL path, query parameters, headers,
19    /// or body. Connectors that don't need parent context ignore the map.
20    async fn fetch_with_context(
21        &self,
22        context: &std::collections::HashMap<String, Value>,
23    ) -> Result<Vec<Value>, FaucetError>;
24
25    /// Convenience: fetch with no parent context.
26    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
27        self.fetch_with_context(&std::collections::HashMap::new())
28            .await
29    }
30
31    /// Incremental fetch with parent context support.
32    ///
33    /// Returns the records and an optional bookmark value for incremental
34    /// replication. The default delegates to `fetch_with_context` and
35    /// returns `None` for the bookmark.
36    async fn fetch_with_context_incremental(
37        &self,
38        context: &std::collections::HashMap<String, Value>,
39    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
40        let records = self.fetch_with_context(context).await?;
41        Ok((records, None))
42    }
43
44    /// Convenience: incremental fetch with no parent context.
45    async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
46        self.fetch_with_context_incremental(&std::collections::HashMap::new())
47            .await
48    }
49
50    /// Stream records page-by-page so the pipeline can write to the sink as
51    /// pages arrive instead of buffering the full result set.
52    ///
53    /// `batch_size` is the *hint* the pipeline passes down; sources are free
54    /// to use a larger or smaller native chunk (e.g. one page per HTTP
55    /// response, one row-group per Parquet file) but should approximate it
56    /// where feasible. The special value `batch_size = 0` means "do not
57    /// batch — emit the entire result set in a single page." Sources that
58    /// stream natively should treat `0` as "skip the chunking layer and
59    /// yield one page after the underlying read completes" (useful for
60    /// small lookup tables or for sinks like SQL `COPY` / BigQuery load
61    /// jobs that prefer one large request).
62    ///
63    /// The default implementation fetches the full result set via
64    /// [`fetch_with_context_incremental`](Self::fetch_with_context_incremental)
65    /// and chunks it in memory by `batch_size`. The bookmark (when present)
66    /// is attached to the *final* page so the pipeline only persists after
67    /// the entire fetch has been written. Sources that can stream natively
68    /// override this method and may emit per-page bookmarks (e.g. CDC).
69    ///
70    /// An empty result with a `Some(bookmark)` still yields one empty page
71    /// carrying the bookmark, so incremental runs that produce no records
72    /// still advance their checkpoint.
73    fn stream_pages<'a>(
74        &'a self,
75        context: &'a std::collections::HashMap<String, Value>,
76        batch_size: usize,
77    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
78        Box::pin(async_stream::try_stream! {
79            let (records, bookmark) = self
80                .fetch_with_context_incremental(context)
81                .await?;
82            let total = records.len();
83            // batch_size == 0 means "no batching" — emit all records as one
84            // page. Otherwise chunk into pages of size `batch_size`.
85            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
86
87            if total == 0 {
88                if bookmark.is_some() {
89                    yield StreamPage {
90                        records: Vec::new(),
91                        bookmark,
92                    };
93                }
94                return;
95            }
96
97            let mut iter = records.into_iter();
98            let mut consumed = 0usize;
99            loop {
100                let batch: Vec<Value> = iter.by_ref().take(chunk).collect();
101                if batch.is_empty() {
102                    break;
103                }
104                consumed += batch.len();
105                let page_bookmark = if consumed >= total {
106                    bookmark.clone()
107                } else {
108                    None
109                };
110                yield StreamPage {
111                    records: batch,
112                    bookmark: page_bookmark,
113                };
114            }
115        })
116    }
117
118    /// Return a JSON Schema describing the configuration this source accepts.
119    fn config_schema(&self) -> Value {
120        serde_json::json!({"type": "object", "properties": {}})
121    }
122
123    /// Stable key under which this source's incremental-replication bookmark
124    /// should be persisted in a [`StateStore`](crate::state::StateStore).
125    ///
126    /// Returning `Some(key)` opts this source into resumable runs: when the
127    /// pipeline is configured with a state store via
128    /// [`Pipeline::with_state_store`](crate::Pipeline::with_state_store), it
129    /// reads the bookmark at `key` before fetching and writes the new
130    /// bookmark back only after the sink confirms the batch was written.
131    ///
132    /// The default returns `None`, meaning the source is not persisted.
133    /// Keys must satisfy [`validate_state_key`](crate::state::validate_state_key).
134    fn state_key(&self) -> Option<String> {
135        None
136    }
137
138    /// Apply a bookmark loaded from a [`StateStore`](crate::state::StateStore)
139    /// as this run's starting point.
140    ///
141    /// The default implementation ignores the value, which keeps existing
142    /// sources backwards-compatible. Sources that support incremental
143    /// replication override this — typically by storing the value behind
144    /// interior mutability and consulting it inside
145    /// `fetch_with_context_incremental`.
146    async fn apply_start_bookmark(&self, _bookmark: Value) -> Result<(), FaucetError> {
147        Ok(())
148    }
149
150    /// Whether this source **deterministically replays** the same page sequence
151    /// from a given bookmark — the requirement for exactly-once delivery (a
152    /// non-deterministic replay could cause the pipeline to skip a page whose
153    /// contents differ from the one already committed). Default: `false`.
154    ///
155    /// Sources with a durable monotonic position and per-page bookmarks (CDC)
156    /// override this to return `true`. The pipeline rejects
157    /// `DeliveryMode::ExactlyOnce` against a source that returns `false`.
158    fn supports_exactly_once(&self) -> bool {
159        false
160    }
161
162    /// Stable identifier used as the `connector` label on metrics and the
163    /// `connector` attribute on spans. Defaults to the final segment of
164    /// `std::any::type_name::<Self>()`, e.g. `"RestSource"`. Built-in
165    /// connectors override with a short, friendly snake_case name (e.g.
166    /// `"rest"`). Must return a non-empty string; observability decorators
167    /// fall back to `"unknown"` in release builds if it is empty (and
168    /// `debug_assert!` in debug builds).
169    fn connector_name(&self) -> &'static str {
170        crate::observability::strip_type_name(std::any::type_name::<Self>())
171    }
172
173    /// Logical dataset identity for lineage emission, following OpenLineage
174    /// naming conventions (<https://openlineage.io/docs/spec/naming>).
175    ///
176    /// The default returns `"<connector_name>://unknown"`. Built-in connectors
177    /// override with a credential-free URI derived from their config. Strip any
178    /// credentials with [`redact_uri_credentials`](crate::redact_uri_credentials).
179    /// Informational metadata only — never used for I/O.
180    fn dataset_uri(&self) -> String {
181        format!("{}://unknown", self.connector_name())
182    }
183
184    /// Run a fast, non-mutating preflight probe (used by `faucet doctor`).
185    ///
186    /// The default pulls a **single page** via
187    /// [`stream_pages`](Self::stream_pages) and reports success/failure — it
188    /// exercises the real read path (DNS, TLS, auth, the first request, the
189    /// first-record decode) but never paginates the full dataset and never
190    /// repeats. The page stream is dropped immediately after the first page.
191    ///
192    /// Sources whose first page *blocks* waiting for inbound data (webhook,
193    /// websocket) or has *side effects* (CDC consuming WAL) override this with a
194    /// cheaper, side-effect-free probe. Probe-level failures are returned as a
195    /// [`ProbeStatus::Fail`](crate::check::ProbeStatus) inside `Ok(report)`.
196    async fn check(
197        &self,
198        ctx: &crate::check::CheckContext,
199    ) -> Result<crate::check::CheckReport, FaucetError> {
200        use crate::check::{CheckReport, Probe};
201        use futures::StreamExt;
202
203        let empty = std::collections::HashMap::new();
204        let start = std::time::Instant::now();
205        let mut pages = self.stream_pages(&empty, 1);
206        let probe = match tokio::time::timeout(ctx.timeout, pages.next()).await {
207            Err(_) => Probe::fail("read", start.elapsed(), "timed out fetching first page"),
208            Ok(None) | Ok(Some(Ok(_))) => Probe::pass("read", start.elapsed()),
209            Ok(Some(Err(e))) => Probe::fail("read", start.elapsed(), e.to_string()),
210        };
211        Ok(CheckReport::single(probe))
212    }
213}
214
215/// Per-row outcome from [`Sink::write_batch_partial`].
216///
217/// `Ok(())` — the row was durably written to the sink.
218/// `Err(_)` — the row failed; the pipeline will route it to the DLQ when
219/// one is configured.
220pub type RowOutcome = Result<(), FaucetError>;
221
222/// A sink writes records to an external system.
223#[async_trait]
224pub trait Sink: Send + Sync {
225    /// Write a batch of records to the destination.
226    ///
227    /// Returns the number of records successfully written.
228    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError>;
229
230    /// Flush any buffered data to the destination.
231    ///
232    /// The default implementation is a no-op (suitable for sinks that
233    /// write immediately in `write_batch`).
234    async fn flush(&self) -> Result<(), FaucetError> {
235        Ok(())
236    }
237
238    /// Write a batch and report per-row outcomes.
239    ///
240    /// Sinks whose underlying API exposes per-row results (BigQuery
241    /// `insertAll`, Elasticsearch `_bulk`) override this. The default
242    /// implementation delegates to [`Self::write_batch`] and maps a single success
243    /// onto a uniform all-`Ok(())` vector. An outer failure is bubbled up
244    /// unchanged so the pipeline's DLQ router can apply its `on_batch_error`
245    /// policy at a single decision point.
246    async fn write_batch_partial(&self, records: &[Value]) -> Result<Vec<RowOutcome>, FaucetError> {
247        self.write_batch(records).await?;
248        Ok(records.iter().map(|_| Ok(())).collect())
249    }
250
251    /// Whether this sink can durably commit a page's rows **and** a commit token
252    /// in a single atomic transaction. Default: `false` (at-least-once only).
253    ///
254    /// Only return `true` from a sink that genuinely commits both atomically —
255    /// see [`write_batch_idempotent`](Self::write_batch_idempotent). The pipeline
256    /// rejects `DeliveryMode::ExactlyOnce` against a sink that returns `false`.
257    fn supports_idempotent_writes(&self) -> bool {
258        false
259    }
260
261    /// Write modes this sink can apply. Default: append-only. Sinks that
262    /// implement key-based merge override this to include
263    /// [`WriteMode::Upsert`](crate::write_mode::WriteMode) /
264    /// [`WriteMode::Delete`](crate::write_mode::WriteMode). The CLI rejects a
265    /// configured mode that is not in this set at config-load time.
266    fn supported_write_modes(&self) -> &'static [crate::write_mode::WriteMode] {
267        &[crate::write_mode::WriteMode::Append]
268    }
269
270    /// Write `records` AND durably record `token` for `scope`, atomically.
271    ///
272    /// `scope` namespaces the watermark (the pipeline passes the per-row state
273    /// key, e.g. `"{name}::{row_id}"`). `token` is a monotonic, fixed-width
274    /// string (see [`format_token`](crate::format_token)).
275    ///
276    /// The default is **not** idempotent: it ignores the token and delegates to
277    /// [`write_batch`](Self::write_batch). Override only when the commit is
278    /// genuinely atomic (and return `true` from `supports_idempotent_writes`).
279    async fn write_batch_idempotent(
280        &self,
281        records: &[Value],
282        scope: &str,
283        token: &str,
284    ) -> Result<usize, FaucetError> {
285        let _ = (scope, token);
286        self.write_batch(records).await
287    }
288
289    /// The last token durably committed for `scope`, or `None` if this sink has
290    /// never committed under that scope. Default: `None`.
291    async fn last_committed_token(&self, scope: &str) -> Result<Option<String>, FaucetError> {
292        let _ = scope;
293        Ok(None)
294    }
295
296    /// Return a JSON Schema describing the configuration this sink accepts.
297    ///
298    /// The schema is auto-generated from the config struct using `schemars`.
299    /// Callers can inspect it to discover required fields, types, defaults,
300    /// and descriptions before constructing the sink.
301    ///
302    /// The default returns an empty object schema.
303    fn config_schema(&self) -> Value {
304        serde_json::json!({"type": "object", "properties": {}})
305    }
306
307    /// Stable identifier used as the `connector` label on metrics and the
308    /// `connector` attribute on spans. See `Source::connector_name`.
309    fn connector_name(&self) -> &'static str {
310        crate::observability::strip_type_name(std::any::type_name::<Self>())
311    }
312
313    /// Logical dataset identity for lineage emission, following OpenLineage
314    /// naming conventions (<https://openlineage.io/docs/spec/naming>).
315    ///
316    /// The default returns `"<connector_name>://unknown"`. Built-in connectors
317    /// override with a credential-free URI derived from their config. Strip any
318    /// credentials with [`redact_uri_credentials`](crate::redact_uri_credentials).
319    /// Informational metadata only — never used for I/O.
320    fn dataset_uri(&self) -> String {
321        format!("{}://unknown", self.connector_name())
322    }
323
324    /// Run a fast, non-mutating preflight probe (used by `faucet doctor`).
325    ///
326    /// Unlike sources, a sink has no non-mutating "first page" equivalent
327    /// (`write_batch` mutates the destination), so the default returns
328    /// [`CheckReport::not_implemented`](crate::check::CheckReport::not_implemented).
329    /// Built-in sinks override this with a connect / auth / metadata probe.
330    ///
331    /// The probe **MUST be idempotent and side-effect-free** — no inserts, no
332    /// residual rows or objects — and must never put credentials or connection
333    /// strings in a probe `reason`/`hint`.
334    async fn check(
335        &self,
336        _ctx: &crate::check::CheckContext,
337    ) -> Result<crate::check::CheckReport, FaucetError> {
338        Ok(crate::check::CheckReport::not_implemented())
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::check::{CheckContext, CheckReport, ProbeStatus};
    use crate::pipeline::DEFAULT_BATCH_SIZE;
    use crate::write_mode::WriteMode;
    use futures::StreamExt;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Mutex;

    // ── Shared helpers ──────────────────────────────────────────────────────

    /// Builds `count` records shaped like `{"i": <index>}`.
    fn indexed_records(count: i32) -> Vec<Value> {
        (0..count).map(|i| json!({"i": i})).collect()
    }

    /// Runs `stream_pages` with an empty context and gathers every page,
    /// panicking if any page is an error.
    async fn drain_pages<S: Source>(source: &S, batch_size: usize) -> Vec<StreamPage> {
        let no_parent = HashMap::new();
        let pages: Vec<StreamPage> = source
            .stream_pages(&no_parent, batch_size)
            .map(|page| page.unwrap())
            .collect()
            .await;
        pages
    }

    /// Runs `Source::check` with a default context and unwraps the report.
    async fn default_check<S: Source>(source: &S) -> CheckReport {
        let ctx = CheckContext::default();
        source.check(&ctx).await.unwrap()
    }

    // ── Mock Source ──────────────────────────────────────────────────────────

    /// Source that always hands back a fixed set of records.
    struct MockSource {
        records: Vec<Value>,
    }

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.to_vec())
        }
    }

    /// Source that overrides the incremental fetch to also return a bookmark.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.to_vec())
        }

        async fn fetch_with_context_incremental(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            let bookmark = Some(self.bookmark.clone());
            Ok((self.records.to_vec(), bookmark))
        }
    }

    /// Source whose fetch always fails with an auth error.
    struct FailingSource;

    #[async_trait]
    impl Source for FailingSource {
        async fn fetch_with_context(
            &self,
            _context: &HashMap<String, Value>,
        ) -> Result<Vec<Value>, FaucetError> {
            Err(FaucetError::Auth("no credentials".into()))
        }
    }

    // ── Mock Sink ───────────────────────────────────────────────────────────

    /// Sink that remembers every record it was asked to write.
    struct MockSink {
        written: Mutex<Vec<Value>>,
    }

    impl MockSink {
        fn new() -> Self {
            let written = Mutex::new(Vec::new());
            Self { written }
        }

        /// Number of records accumulated so far.
        fn written_len(&self) -> usize {
            self.written.lock().unwrap().len()
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            self.written.lock().unwrap().extend_from_slice(records);
            Ok(records.len())
        }
    }

    /// Sink whose writes always fail.
    struct FailingSink;

    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("write failed".into()))
        }
    }

    // ── Source tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn source_fetch_all_returns_records() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let fetched = source.fetch_all().await.unwrap();
        assert_eq!(fetched.len(), 2);
        assert_eq!(fetched[0]["id"], 1);
    }

    #[tokio::test]
    async fn source_fetch_all_empty() {
        let source = MockSource { records: vec![] };
        let fetched = source.fetch_all().await.unwrap();
        assert!(fetched.is_empty());
    }

    #[tokio::test]
    async fn source_default_incremental_returns_none_bookmark() {
        let source = MockSource {
            records: vec![json!({"id": 1})],
        };
        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert!(bookmark.is_none());
    }

    #[tokio::test]
    async fn source_custom_incremental_returns_bookmark() {
        let source = IncrementalSource {
            records: vec![json!({"id": 1})],
            bookmark: json!("2024-12-01"),
        };
        let (fetched, bookmark) = source.fetch_all_incremental().await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(bookmark, Some(json!("2024-12-01")));
    }

    #[tokio::test]
    async fn source_error_propagates() {
        let outcome = FailingSource.fetch_all().await;
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Auth(_))));
    }

    #[tokio::test]
    async fn source_as_trait_object() {
        let boxed: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 42})],
        });
        let fetched = boxed.fetch_all().await.unwrap();
        assert_eq!(fetched[0]["id"], 42);
    }

    // ── Sink tests ──────────────────────────────────────────────────────────

    #[tokio::test]
    async fn sink_write_batch_returns_count() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let written = sink.write_batch(&batch).await.unwrap();
        assert_eq!(written, 3);
    }

    #[tokio::test]
    async fn sink_write_batch_empty() {
        let sink = MockSink::new();
        let written = sink.write_batch(&[]).await.unwrap();
        assert_eq!(written, 0);
    }

    #[tokio::test]
    async fn sink_accumulates_records() {
        let sink = MockSink::new();
        sink.write_batch(&[json!({"a": 1})]).await.unwrap();
        sink.write_batch(&[json!({"b": 2})]).await.unwrap();
        assert_eq!(sink.written_len(), 2);
    }

    #[tokio::test]
    async fn sink_default_flush_is_noop() {
        let sink = MockSink::new();
        assert!(sink.flush().await.is_ok());
    }

    #[tokio::test]
    async fn sink_error_propagates() {
        let outcome = FailingSink.write_batch(&[json!({"id": 1})]).await;
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn sink_as_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());
        let written = boxed.write_batch(&[json!({"id": 1})]).await.unwrap();
        assert_eq!(written, 1);
    }

    // ── stream_pages tests ──────────────────────────────────────────────────

    #[tokio::test]
    async fn default_stream_pages_chunks_records() {
        let source = MockSource {
            records: indexed_records(5),
        };
        let pages = drain_pages(&source, 2).await;
        // 5 records, batch_size=2 → pages of [2, 2, 1]
        let sizes: Vec<usize> = pages.iter().map(|page| page.records.len()).collect();
        assert_eq!(sizes, vec![2, 2, 1]);
    }

    #[tokio::test]
    async fn default_stream_pages_attaches_bookmark_to_final_page_only() {
        let source = IncrementalSource {
            records: indexed_records(5),
            bookmark: json!("v1"),
        };
        let pages = drain_pages(&source, 2).await;
        assert_eq!(pages.len(), 3);
        assert!(pages[0].bookmark.is_none());
        assert!(pages[1].bookmark.is_none());
        assert_eq!(pages[2].bookmark, Some(json!("v1")));
    }

    #[tokio::test]
    async fn default_stream_pages_single_page_when_batch_size_exceeds_total() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let pages = drain_pages(&source, 100).await;
        assert_eq!(pages.len(), 1);
        assert_eq!(pages[0].records.len(), 2);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_emits_single_page() {
        // batch_size = 0 is the "no batching" sentinel — yields every record
        // in one page regardless of total count.
        let source = MockSource {
            records: indexed_records(50_000),
        };
        let pages = drain_pages(&source, 0).await;
        assert_eq!(pages.len(), 1, "batch_size=0 must emit exactly one page");
        assert_eq!(pages[0].records.len(), 50_000);
    }

    #[tokio::test]
    async fn default_stream_pages_batch_size_zero_attaches_bookmark_to_sole_page() {
        let source = IncrementalSource {
            records: indexed_records(3),
            bookmark: json!("v1"),
        };
        let pages = drain_pages(&source, 0).await;
        assert_eq!(pages.len(), 1);
        assert_eq!(pages[0].records.len(), 3);
        assert_eq!(pages[0].bookmark, Some(json!("v1")));
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_yields_no_pages() {
        let source = MockSource { records: vec![] };
        let pages = drain_pages(&source, DEFAULT_BATCH_SIZE).await;
        assert!(pages.is_empty());
    }

    #[tokio::test]
    async fn default_stream_pages_empty_source_with_bookmark_yields_single_empty_page() {
        let source = IncrementalSource {
            records: vec![],
            bookmark: json!("v0"),
        };
        let pages = drain_pages(&source, DEFAULT_BATCH_SIZE).await;
        // One empty-records page that carries the bookmark, so the pipeline
        // still persists progress on otherwise-empty incremental runs.
        assert_eq!(pages.len(), 1);
        assert!(pages[0].records.is_empty());
        assert_eq!(pages[0].bookmark, Some(json!("v0")));
    }

    #[tokio::test]
    async fn default_stream_pages_propagates_fetch_errors() {
        let source = FailingSource;
        let no_parent = HashMap::new();
        let mut pages = source.stream_pages(&no_parent, DEFAULT_BATCH_SIZE);
        let first = pages.next().await.unwrap();
        assert!(matches!(first, Err(FaucetError::Auth(_))));
    }

    // ── connector_name / dataset_uri tests ──────────────────────────────────

    #[test]
    fn source_default_connector_name_is_stripped_type_name() {
        // MockSource lives at `faucet_core::traits::tests::MockSource`; the
        // stripped type_name yields the trailing segment.
        let source = MockSource { records: vec![] };
        assert_eq!(source.connector_name(), "MockSource");
    }

    #[test]
    fn sink_default_connector_name_is_stripped_type_name() {
        assert_eq!(MockSink::new().connector_name(), "MockSink");
    }

    #[test]
    fn source_default_dataset_uri_uses_connector_name() {
        let source = MockSource { records: vec![] };
        assert_eq!(source.dataset_uri(), "MockSource://unknown");
    }

    #[test]
    fn sink_default_dataset_uri_uses_connector_name() {
        assert_eq!(MockSink::new().dataset_uri(), "MockSink://unknown");
    }

    // ── write_batch_partial tests ───────────────────────────────────────────

    #[tokio::test]
    async fn default_write_batch_partial_success_returns_all_ok() {
        let sink = MockSink::new();
        let batch = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let outcomes = sink.write_batch_partial(&batch).await.unwrap();
        assert_eq!(outcomes.len(), 3);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
        assert_eq!(sink.written_len(), 3);
    }

    #[tokio::test]
    async fn default_write_batch_partial_bubbles_outer_err() {
        let batch = vec![json!({"id": 1}), json!({"id": 2})];
        let outcome = FailingSink.write_batch_partial(&batch).await;
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[tokio::test]
    async fn default_write_batch_partial_empty_returns_empty_vec() {
        let sink = MockSink::new();
        let outcomes = sink.write_batch_partial(&[]).await.unwrap();
        assert!(outcomes.is_empty());
    }

    #[tokio::test]
    async fn default_write_batch_partial_callable_through_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());
        let batch = vec![json!({"id": 1}), json!({"id": 2})];
        let outcomes = boxed.write_batch_partial(&batch).await.unwrap();
        assert_eq!(outcomes.len(), 2);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
    }

    // ── check() tests ─────────────────────────────────────────────────────────

    #[tokio::test]
    async fn source_default_check_pulls_first_page_and_passes() {
        let source = MockSource {
            records: vec![json!({"id": 1}), json!({"id": 2})],
        };
        let report = default_check(&source).await;
        assert_eq!(report.failed_count(), 0);
        let read_passed = report
            .probes
            .iter()
            .any(|probe| probe.name == "read" && matches!(probe.status, ProbeStatus::Pass));
        assert!(read_passed);
    }

    #[tokio::test]
    async fn source_default_check_passes_on_empty_source() {
        let source = MockSource { records: vec![] };
        let report = default_check(&source).await;
        // Reachable but empty is still a healthy source.
        assert_eq!(report.failed_count(), 0);
    }

    #[tokio::test]
    async fn source_default_check_fails_when_fetch_errors() {
        let report = default_check(&FailingSource).await;
        assert_eq!(report.failed_count(), 1);
        let read_failed = report
            .probes
            .iter()
            .any(|probe| probe.name == "read" && matches!(probe.status, ProbeStatus::Fail { .. }));
        assert!(read_failed);
    }

    #[tokio::test]
    async fn sink_default_check_is_not_implemented_skip() {
        let sink = MockSink::new();
        let ctx = CheckContext::default();
        let report = sink.check(&ctx).await.unwrap();
        assert_eq!(report.probes.len(), 1);
        assert!(matches!(report.probes[0].status, ProbeStatus::Skip { .. }));
    }

    #[tokio::test]
    async fn source_check_callable_through_trait_object() {
        let boxed: Box<dyn Source> = Box::new(MockSource {
            records: vec![json!({"id": 1})],
        });
        let ctx = CheckContext::default();
        let report = boxed.check(&ctx).await.unwrap();
        assert_eq!(report.failed_count(), 0);
    }

    // ── idempotent-write / exactly-once capability tests ──────────────────────

    #[tokio::test]
    async fn sink_default_is_not_idempotent() {
        let sink = MockSink::new();
        assert!(!sink.supports_idempotent_writes());
        // Default write_batch_idempotent ignores the token and delegates.
        let batch = [json!({"id": 1})];
        let written = sink
            .write_batch_idempotent(&batch, "scope::a", "00000000000000000001")
            .await
            .unwrap();
        assert_eq!(written, 1);
        let committed = sink.last_committed_token("scope::a").await.unwrap();
        assert_eq!(committed, None);
        assert_eq!(sink.written_len(), 1);
    }

    #[test]
    fn source_default_does_not_support_exactly_once() {
        let source = MockSource { records: vec![] };
        assert!(!source.supports_exactly_once());
    }

    #[test]
    fn sink_default_supported_write_modes_is_append_only() {
        let sink = MockSink::new();
        assert_eq!(sink.supported_write_modes(), &[WriteMode::Append]);
    }

    #[test]
    fn supported_write_modes_callable_through_trait_object() {
        let boxed: Box<dyn Sink> = Box::new(MockSink::new());
        assert!(boxed.supported_write_modes().contains(&WriteMode::Append));
    }
}