Skip to main content

faucet_core/observability/
decorator.rs

1//! Pipeline-internal decorators that emit spans + metrics around every
2//! source / sink trait call. See the design spec for the full vocabulary.
3
4use crate::error::FaucetError;
5use crate::observability::labels::Labels;
6use crate::observability::timer::DurationGuard;
7use crate::pipeline::StreamPage;
8use crate::traits::{Sink, Source};
9use async_trait::async_trait;
10use futures::FutureExt;
11use futures_core::Stream;
12use metrics::{Label, SharedString, counter, gauge};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::panic::AssertUnwindSafe;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use tracing::{Instrument, info_span};
20
/// Normalises the name reported by an inner connector's `connector_name()`:
/// an empty string becomes the `"unknown"` fallback, any other value is
/// returned untouched. Both the `connector` metric label and the wrappers'
/// `connector_name()` passthrough go through this function, so the two can
/// never disagree.
fn guarded_connector_name(raw: &'static str) -> &'static str {
    match raw {
        "" => "unknown",
        name => name,
    }
}
27
28/// Build the base `pipeline` / `row` / `connector` label vec once. The two
29/// `pipeline` / `row` heap allocations and the vec construction happen a single
30/// time at decorator construction; per-call sites `clone()` this instead of
31/// rebuilding from the `Arc<str>` labels on every page / write / flush.
32fn base_metric_labels(labels: &Labels, connector: &SharedString) -> Vec<Label> {
33    vec![
34        Label::new("pipeline", SharedString::from(labels.pipeline.to_string())),
35        Label::new("row", SharedString::from(labels.row.to_string())),
36        Label::new("connector", connector.clone()),
37    ]
38}
39
/// Wraps a `&dyn Source` (or any `&S: Source`) and emits spans + metrics
/// around every call. Constructed by `Pipeline::run` and never exposed to
/// end users; the wrapped source remains the user-facing object.
pub struct InstrumentedSource<'a, S: Source + ?Sized> {
    /// Borrowed source that every trait call is forwarded to.
    inner: &'a S,
    /// Pipeline / row / run identifiers; read as span fields by `stream_pages`.
    labels: Labels,
    /// Connector name with the empty-string → `"unknown"` fallback applied.
    connector: SharedString,
    /// Precomputed `pipeline` / `row` / `connector` labels, cloned per call.
    base_labels: Vec<Label>,
    /// Source of the `page_index` span field. It is bumped once per poll of
    /// the inner stream and shared through the `Arc` by every stream created
    /// from this wrapper.
    page_index: Arc<AtomicUsize>,
}
51
52impl<'a, S: Source + ?Sized> InstrumentedSource<'a, S> {
53    pub fn new(inner: &'a S, labels: Labels) -> Self {
54        let raw = inner.connector_name();
55        debug_assert!(
56            !raw.is_empty(),
57            "connector_name() must return a non-empty string"
58        );
59        let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
60        let base_labels = base_metric_labels(&labels, &connector);
61        Self {
62            inner,
63            labels,
64            connector,
65            base_labels,
66            page_index: Arc::new(AtomicUsize::new(0)),
67        }
68    }
69
70    fn metric_labels(&self) -> Vec<Label> {
71        self.base_labels.clone()
72    }
73
74    /// Returns `metric_labels()` with an additional `kind` label appended.
75    /// Used by `InstrumentedSink::write_batch` (Task 9) and any future
76    /// instrumentation paths where `self` is in scope.
77    #[allow(dead_code)]
78    fn error_labels(&self, kind: &'static str) -> Vec<Label> {
79        let mut l = self.metric_labels();
80        l.push(Label::new("kind", SharedString::const_str(kind)));
81        l
82    }
83}
84
85#[async_trait]
86impl<'a, S: Source + ?Sized> Source for InstrumentedSource<'a, S> {
87    fn connector_name(&self) -> &'static str {
88        // Return the guarded name so an inner connector that returns "" maps to
89        // the "unknown" fallback — keeping this passthrough consistent with the
90        // `connector` metric label rather than leaking an empty string.
91        guarded_connector_name(self.inner.connector_name())
92    }
93
94    fn state_key(&self) -> Option<String> {
95        self.inner.state_key()
96    }
97
98    async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
99        self.inner.apply_start_bookmark(bookmark).await
100    }
101
102    async fn fetch_with_context(
103        &self,
104        context: &HashMap<String, Value>,
105    ) -> Result<Vec<Value>, FaucetError> {
106        // Library-call path; the pipeline drives through stream_pages.
107        self.inner.fetch_with_context(context).await
108    }
109
110    async fn fetch_with_context_incremental(
111        &self,
112        context: &HashMap<String, Value>,
113    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
114        self.inner.fetch_with_context_incremental(context).await
115    }
116
117    fn stream_pages<'b>(
118        &'b self,
119        context: &'b HashMap<String, Value>,
120        batch_size: usize,
121    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'b>> {
122        let inner_stream = self.inner.stream_pages(context, batch_size);
123        let labels = self.labels.clone();
124        let connector = self.connector.clone();
125        let page_index = Arc::clone(&self.page_index);
126        let metric_labels = self.metric_labels();
127        let pipeline = self.labels.pipeline.clone();
128        let row = self.labels.row.clone();
129
130        Box::pin(async_stream::try_stream! {
131            // In-flight gauge tracks open streams. Decrement on drop so
132            // cancellation leaves the gauge consistent.
133            struct InFlightGuard(Vec<Label>);
134            impl Drop for InFlightGuard {
135                fn drop(&mut self) {
136                    gauge!("faucet_source_in_flight", self.0.clone()).decrement(1.0);
137                }
138            }
139            gauge!("faucet_source_in_flight", metric_labels.clone()).increment(1.0);
140            let _in_flight = InFlightGuard(metric_labels.clone());
141
142            let mut inner = inner_stream;
143            loop {
144                let idx = page_index.fetch_add(1, Ordering::Relaxed);
145                let span = info_span!(
146                    "faucet.source.page",
147                    pipeline = %pipeline,
148                    row = %row,
149                    run_id = %labels.run_id,
150                    connector = %connector,
151                    page_index = idx,
152                );
153                // Armed across the poll so a cancelled / panicking page-fetch
154                // still records the time spent. Disarmed on the terminal empty
155                // poll (`Ok(None)`) so end-of-stream doesn't record a spurious
156                // ~0 sample into the page-duration histogram.
157                let mut _timer = DurationGuard::new(
158                    "faucet_source_page_duration_seconds",
159                    metric_labels.clone(),
160                );
161
162                let next = AssertUnwindSafe(async {
163                    use futures::StreamExt;
164                    inner.next().await
165                })
166                .catch_unwind()
167                .instrument(span)
168                .await;
169
170                match next {
171                    Ok(Some(Ok(page))) => {
172                        counter!("faucet_source_pages_total", metric_labels.clone()).increment(1);
173                        counter!("faucet_source_records_total", metric_labels.clone())
174                            .increment(page.records.len() as u64);
175                        yield page;
176                    }
177                    Ok(Some(Err(e))) => {
178                        let mut l = metric_labels.clone();
179                        l.push(Label::new("kind", SharedString::const_str(error_kind(&e))));
180                        counter!("faucet_source_errors_total", l).increment(1);
181                        Err(e)?;
182                    }
183                    Ok(None) => {
184                        _timer.disarm();
185                        break;
186                    }
187                    Err(panic) => {
188                        let mut l = metric_labels.clone();
189                        l.push(Label::new("kind", SharedString::const_str("Panic")));
190                        counter!("faucet_source_errors_total", l).increment(1);
191                        let msg = panic.downcast_ref::<&'static str>().map(|s| (*s).to_string())
192                            .or_else(|| panic.downcast_ref::<String>().cloned())
193                            .unwrap_or_else(|| "<non-string panic payload>".to_string());
194                        Err(FaucetError::Custom(format!("panic in source: {msg}").into()))?;
195                    }
196                }
197            }
198        })
199    }
200}
201
202/// Map a `FaucetError` variant to its stable `kind` label value. The match
203/// must be exhaustive; update when new variants are added.
204pub(crate) fn error_kind(e: &FaucetError) -> &'static str {
205    match e {
206        FaucetError::Http(_) => "Http",
207        FaucetError::HttpStatus { .. } => "HttpStatus",
208        FaucetError::Json(_) => "Json",
209        FaucetError::JsonPath(_) => "JsonPath",
210        FaucetError::Auth(_) => "Auth",
211        FaucetError::RateLimited { .. } => "RateLimited",
212        FaucetError::Url(_) => "Url",
213        FaucetError::Transform(_) => "Transform",
214        FaucetError::Config(_) => "Config",
215        FaucetError::Source(_) => "Source",
216        FaucetError::Sink(_) => "Sink",
217        FaucetError::QualityFailure { .. } => "QualityFailure",
218        FaucetError::State(_) => "State",
219        FaucetError::Custom(_) => "Custom",
220    }
221}
222
/// Wraps a `&dyn Sink` (or any `&S: Sink`) and emits spans + metrics around
/// `write_batch` and `flush`. Constructed by `Pipeline::run`.
pub struct InstrumentedSink<'a, S: Sink + ?Sized> {
    /// Borrowed sink that every trait call is forwarded to.
    inner: &'a S,
    /// Pipeline / row / run identifiers; read as span fields by the write and
    /// flush paths.
    labels: Labels,
    /// Connector name with the empty-string → `"unknown"` fallback applied.
    connector: SharedString,
    /// Precomputed `pipeline` / `row` / `connector` labels, cloned per call.
    base_labels: Vec<Label>,
}
232
233impl<'a, S: Sink + ?Sized> InstrumentedSink<'a, S> {
234    pub fn new(inner: &'a S, labels: Labels) -> Self {
235        let raw = inner.connector_name();
236        debug_assert!(
237            !raw.is_empty(),
238            "connector_name() must return a non-empty string"
239        );
240        let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
241        let base_labels = base_metric_labels(&labels, &connector);
242        Self {
243            inner,
244            labels,
245            connector,
246            base_labels,
247        }
248    }
249
250    fn metric_labels(&self) -> Vec<Label> {
251        self.base_labels.clone()
252    }
253
254    fn error_labels(&self, kind: &'static str) -> Vec<Label> {
255        let mut l = self.metric_labels();
256        l.push(Label::new("kind", SharedString::const_str(kind)));
257        l
258    }
259}
260
261#[async_trait]
262impl<'a, S: Sink + ?Sized> Sink for InstrumentedSink<'a, S> {
263    fn connector_name(&self) -> &'static str {
264        // Return the guarded name so an inner connector that returns "" maps to
265        // the "unknown" fallback — keeping this passthrough consistent with the
266        // `connector` metric label rather than leaking an empty string.
267        guarded_connector_name(self.inner.connector_name())
268    }
269
270    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
271        let span = info_span!(
272            "faucet.sink.write",
273            pipeline = %self.labels.pipeline,
274            row = %self.labels.row,
275            run_id = %self.labels.run_id,
276            connector = %self.connector,
277            records = records.len(),
278        );
279        let metric_labels = self.metric_labels();
280        gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
281
282        // RAII guard ensures the gauge is decremented even if write_batch
283        // panics or the future is cancelled.
284        struct InFlightGuard(Vec<Label>);
285        impl Drop for InFlightGuard {
286            fn drop(&mut self) {
287                gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
288            }
289        }
290        let _in_flight = InFlightGuard(metric_labels.clone());
291
292        let _timer =
293            DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
294
295        let result = AssertUnwindSafe(self.inner.write_batch(records))
296            .catch_unwind()
297            .instrument(span)
298            .await;
299
300        match result {
301            Ok(Ok(n)) => {
302                counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
303                counter!("faucet_sink_records_total", metric_labels.clone()).increment(n as u64);
304                Ok(n)
305            }
306            Ok(Err(e)) => {
307                counter!(
308                    "faucet_sink_errors_total",
309                    self.error_labels(error_kind(&e))
310                )
311                .increment(1);
312                Err(e)
313            }
314            Err(panic) => {
315                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
316                let msg = panic
317                    .downcast_ref::<&'static str>()
318                    .map(|s| (*s).to_string())
319                    .or_else(|| panic.downcast_ref::<String>().cloned())
320                    .unwrap_or_else(|| "<non-string panic payload>".to_string());
321                Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
322            }
323        }
324    }
325
326    async fn write_batch_partial(
327        &self,
328        records: &[Value],
329    ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
330        let span = info_span!(
331            "faucet.sink.write_partial",
332            pipeline = %self.labels.pipeline,
333            row = %self.labels.row,
334            run_id = %self.labels.run_id,
335            connector = %self.connector,
336            records = records.len(),
337        );
338        let metric_labels = self.metric_labels();
339        gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
340
341        // RAII guard ensures the gauge is decremented even if write_batch_partial
342        // panics or the future is cancelled.
343        struct InFlightGuard(Vec<Label>);
344        impl Drop for InFlightGuard {
345            fn drop(&mut self) {
346                gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
347            }
348        }
349        let _in_flight = InFlightGuard(metric_labels.clone());
350
351        let _timer =
352            DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
353
354        let result = AssertUnwindSafe(self.inner.write_batch_partial(records))
355            .catch_unwind()
356            .instrument(span)
357            .await;
358
359        match result {
360            Ok(Ok(outcomes)) => {
361                let success_count = outcomes.iter().filter(|o| o.is_ok()).count();
362                counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
363                counter!("faucet_sink_records_total", metric_labels.clone())
364                    .increment(success_count as u64);
365                Ok(outcomes)
366            }
367            Ok(Err(e)) => {
368                counter!(
369                    "faucet_sink_errors_total",
370                    self.error_labels(error_kind(&e))
371                )
372                .increment(1);
373                Err(e)
374            }
375            Err(panic) => {
376                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
377                let msg = panic
378                    .downcast_ref::<&'static str>()
379                    .map(|s| (*s).to_string())
380                    .or_else(|| panic.downcast_ref::<String>().cloned())
381                    .unwrap_or_else(|| "<non-string panic payload>".to_string());
382                Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
383            }
384        }
385    }
386
387    async fn flush(&self) -> Result<(), FaucetError> {
388        let span = info_span!(
389            "faucet.sink.flush",
390            pipeline = %self.labels.pipeline,
391            row = %self.labels.row,
392            run_id = %self.labels.run_id,
393            connector = %self.connector,
394        );
395        let metric_labels = self.metric_labels();
396        let _timer =
397            DurationGuard::new("faucet_sink_flush_duration_seconds", metric_labels.clone());
398
399        let result = AssertUnwindSafe(self.inner.flush())
400            .catch_unwind()
401            .instrument(span)
402            .await;
403
404        match result {
405            Ok(Ok(())) => Ok(()),
406            Ok(Err(e)) => {
407                counter!(
408                    "faucet_sink_errors_total",
409                    self.error_labels(error_kind(&e))
410                )
411                .increment(1);
412                Err(e)
413            }
414            Err(panic) => {
415                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
416                let msg = panic
417                    .downcast_ref::<&'static str>()
418                    .map(|s| (*s).to_string())
419                    .or_else(|| panic.downcast_ref::<String>().cloned())
420                    .unwrap_or_else(|| "<non-string panic payload>".to_string());
421                Err(FaucetError::Custom(format!("panic in flush: {msg}").into()))
422            }
423        }
424    }
425}
426
427#[cfg(test)]
428pub(crate) mod source_tests {
429    use super::*;
430    use async_trait::async_trait;
431    use futures::StreamExt;
432    use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
433    use serde_json::json;
434    use std::sync::{Mutex, OnceLock};
435
436    // Process-global recorder shared across all observability tests in this
437    // crate. Task 5 established the same pattern.
438    pub(crate) static LOCK: Mutex<()> = Mutex::new(());
439    static SNAPSHOTTER: OnceLock<Snapshotter> = OnceLock::new();
440
441    pub(crate) fn snapshotter() -> &'static Snapshotter {
442        SNAPSHOTTER.get_or_init(|| {
443            let recorder = DebuggingRecorder::new();
444            let snap = recorder.snapshotter();
445            // First test installs; the OnceLock guarantees we never install
446            // twice. If something else (e.g. the timer test) already installed
447            // a recorder, `set_global_recorder` will Err — but in that case
448            // *our* snapshotter is disconnected from the live recorder. The
449            // workaround is for all observability tests to share one source of
450            // truth — this file. If a future test elsewhere installs a
451            // recorder first, restructure so all tests share this OnceLock.
452            let _ = metrics::set_global_recorder(recorder);
453            snap
454        })
455    }
456
457    pub(in crate::observability) fn labels() -> Labels {
458        Labels::new("p", "r", "rid")
459    }
460
461    struct MockSource(Vec<Value>);
462    #[async_trait]
463    impl Source for MockSource {
464        async fn fetch_with_context(
465            &self,
466            _: &HashMap<String, Value>,
467        ) -> Result<Vec<Value>, FaucetError> {
468            Ok(self.0.clone())
469        }
470        fn connector_name(&self) -> &'static str {
471            "mock"
472        }
473    }
474
475    struct PanickingSource;
476    #[async_trait]
477    impl Source for PanickingSource {
478        async fn fetch_with_context(
479            &self,
480            _: &HashMap<String, Value>,
481        ) -> Result<Vec<Value>, FaucetError> {
482            panic!("kaboom")
483        }
484        fn connector_name(&self) -> &'static str {
485            "panic-test"
486        }
487    }
488
489    // Inner connector that returns an empty name. The instrumented wrapper must
490    // map this to the `"unknown"` fallback so the `connector_name()` passthrough
491    // never disagrees with the `connector` metric label.
492    struct EmptyNameSource;
493    #[async_trait]
494    impl Source for EmptyNameSource {
495        async fn fetch_with_context(
496            &self,
497            _: &HashMap<String, Value>,
498        ) -> Result<Vec<Value>, FaucetError> {
499            Ok(vec![])
500        }
501        fn connector_name(&self) -> &'static str {
502            ""
503        }
504    }
505
506    #[test]
507    fn empty_inner_connector_name_falls_back_to_unknown() {
508        let inner = EmptyNameSource;
509        // `InstrumentedSource::new` debug_asserts on an empty inner name, so
510        // build the wrapper directly with the fallback name to exercise the
511        // passthrough without tripping the assertion in debug builds.
512        let wrapped = InstrumentedSource {
513            inner: &inner,
514            labels: labels(),
515            connector: SharedString::const_str("unknown"),
516            base_labels: Vec::new(),
517            page_index: Arc::new(AtomicUsize::new(0)),
518        };
519        assert_eq!(
520            Source::connector_name(&wrapped),
521            "unknown",
522            "instrumented source must not leak an empty connector name"
523        );
524    }
525
526    #[tokio::test]
527    #[allow(clippy::await_holding_lock)]
528    async fn records_records_counter_per_page() {
529        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
530        let snap = snapshotter();
531        let inner = MockSource((0..5).map(|i| json!({"i": i})).collect());
532        let wrapped = InstrumentedSource::new(&inner, labels());
533        let ctx = HashMap::new();
534        let mut s = wrapped.stream_pages(&ctx, 2);
535        while s.next().await.is_some() {}
536        let snapshot = snap.snapshot();
537        let records: u64 = snapshot
538            .into_vec()
539            .into_iter()
540            .filter_map(|(key, _u, _d, v)| {
541                if key.key().name() == "faucet_source_records_total"
542                    && let DebugValue::Counter(c) = v
543                {
544                    return Some(c);
545                }
546                None
547            })
548            .sum();
549        assert!(
550            records >= 5,
551            "expected at least 5 records counted, got {records}"
552        );
553    }
554
555    // Source with a unique connector name so the page-duration histogram for
556    // this run can be isolated in the shared global recorder.
557    struct PageCountSource(Vec<Value>);
558    #[async_trait]
559    impl Source for PageCountSource {
560        async fn fetch_with_context(
561            &self,
562            _: &HashMap<String, Value>,
563        ) -> Result<Vec<Value>, FaucetError> {
564            Ok(self.0.clone())
565        }
566        fn connector_name(&self) -> &'static str {
567            "page-count-probe"
568        }
569    }
570
571    #[tokio::test]
572    #[allow(clippy::await_holding_lock)]
573    async fn page_duration_records_one_sample_per_yielded_page() {
574        // 5 records at batch_size 2 → pages [2, 2, 1] = 3 yielded pages. The
575        // terminal empty poll must NOT add a 4th (spurious ~0) sample.
576        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
577        let snap = snapshotter();
578        let inner = PageCountSource((0..5).map(|i| json!({"i": i})).collect());
579        let wrapped = InstrumentedSource::new(&inner, labels());
580        let ctx = HashMap::new();
581        let mut s = wrapped.stream_pages(&ctx, 2);
582        let mut pages = 0usize;
583        while s.next().await.is_some() {
584            pages += 1;
585        }
586        assert_eq!(pages, 3, "expected 3 yielded pages");
587
588        let snapshot = snap.snapshot();
589        let samples: usize = snapshot
590            .into_vec()
591            .into_iter()
592            .filter_map(|(key, _u, _d, v)| {
593                if key.key().name() == "faucet_source_page_duration_seconds"
594                    && key
595                        .key()
596                        .labels()
597                        .any(|l| l.key() == "connector" && l.value() == "page-count-probe")
598                    && let DebugValue::Histogram(h) = v
599                {
600                    return Some(h.len());
601                }
602                None
603            })
604            .sum();
605        assert_eq!(
606            samples, pages,
607            "page-duration histogram must have exactly one sample per yielded \
608             page ({pages}), not page+1 (no spurious terminal sample)"
609        );
610    }
611
612    #[tokio::test]
613    #[allow(clippy::await_holding_lock)]
614    async fn maps_panic_to_custom_error_with_kind_panic() {
615        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
616        let _snap = snapshotter();
617        let inner = PanickingSource;
618        let wrapped = InstrumentedSource::new(&inner, labels());
619        let ctx = HashMap::new();
620        let mut s = wrapped.stream_pages(&ctx, 10);
621        let first = s
622            .next()
623            .await
624            .expect("stream yields at least one item before terminating");
625        assert!(matches!(first, Err(FaucetError::Custom(_))));
626        // Process did not abort — implicit by reaching this line.
627    }
628
629    // ── error_kind: exhaustive variant → label mapping ───────────────────────
630
631    #[test]
632    fn error_kind_covers_all_variants() {
633        use std::time::Duration;
634        // Build one of every non-`Http` FaucetError variant and assert its
635        // stable label. (`Http` wraps a `reqwest::Error`, which has no public
636        // constructor; it is exercised through the live request paths in the
637        // connector crates' tests.)
638        let cases: Vec<(FaucetError, &str)> = vec![
639            (
640                FaucetError::HttpStatus {
641                    status: 500,
642                    url: "u".into(),
643                    body: "b".into(),
644                },
645                "HttpStatus",
646            ),
647            (
648                FaucetError::Json(serde_json::from_str::<Value>("nope").unwrap_err()),
649                "Json",
650            ),
651            (FaucetError::JsonPath("bad".into()), "JsonPath"),
652            (FaucetError::Auth("a".into()), "Auth"),
653            (
654                FaucetError::RateLimited(Duration::from_secs(1)),
655                "RateLimited",
656            ),
657            (FaucetError::Url("bad url".into()), "Url"),
658            (FaucetError::Transform("t".into()), "Transform"),
659            (FaucetError::Config("c".into()), "Config"),
660            (FaucetError::Source("s".into()), "Source"),
661            (FaucetError::Sink("s".into()), "Sink"),
662            (
663                FaucetError::QualityFailure {
664                    check: "chk".into(),
665                    message: "m".into(),
666                },
667                "QualityFailure",
668            ),
669            (FaucetError::State("st".into()), "State"),
670            (
671                FaucetError::Custom(Box::new(std::io::Error::other("boom"))),
672                "Custom",
673            ),
674        ];
675        for (err, expected) in cases {
676            assert_eq!(error_kind(&err), expected, "mismatch for {err:?}");
677        }
678    }
679
680    // ── Source passthrough methods ───────────────────────────────────────────
681
682    // A source that overrides every passthrough so the instrumented wrapper's
683    // delegating methods (state_key / apply_start_bookmark / fetch_with_context
684    // / fetch_with_context_incremental) are exercised.
685    struct PassthroughSource {
686        seen_bookmark: Mutex<Option<Value>>,
687    }
688    #[async_trait]
689    impl Source for PassthroughSource {
690        async fn fetch_with_context(
691            &self,
692            _: &HashMap<String, Value>,
693        ) -> Result<Vec<Value>, FaucetError> {
694            Ok(vec![json!({"fwc": 1})])
695        }
696        async fn fetch_with_context_incremental(
697            &self,
698            _: &HashMap<String, Value>,
699        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
700            Ok((vec![json!({"inc": 1})], Some(json!("bm"))))
701        }
702        fn state_key(&self) -> Option<String> {
703            Some("passthrough_key".into())
704        }
705        async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
706            *self.seen_bookmark.lock().unwrap() = Some(bookmark);
707            Ok(())
708        }
709        fn connector_name(&self) -> &'static str {
710            "passthrough"
711        }
712    }
713
714    #[tokio::test]
715    async fn source_passthroughs_delegate_to_inner() {
716        let inner = PassthroughSource {
717            seen_bookmark: Mutex::new(None),
718        };
719        let wrapped = InstrumentedSource::new(&inner, labels());
720
721        // state_key passthrough
722        assert_eq!(wrapped.state_key(), Some("passthrough_key".to_string()));
723
724        // fetch_with_context passthrough
725        let ctx = HashMap::new();
726        assert_eq!(
727            wrapped.fetch_with_context(&ctx).await.unwrap(),
728            vec![json!({"fwc": 1})]
729        );
730
731        // fetch_with_context_incremental passthrough
732        let (recs, bm) = wrapped.fetch_with_context_incremental(&ctx).await.unwrap();
733        assert_eq!(recs, vec![json!({"inc": 1})]);
734        assert_eq!(bm, Some(json!("bm")));
735
736        // apply_start_bookmark passthrough
737        wrapped.apply_start_bookmark(json!("resume")).await.unwrap();
738        assert_eq!(
739            *inner.seen_bookmark.lock().unwrap(),
740            Some(json!("resume")),
741            "apply_start_bookmark must reach the inner source"
742        );
743    }
744}
745
746#[cfg(test)]
747mod sink_tests {
748    use super::source_tests::{LOCK, labels, snapshotter};
749    use super::*;
750    use async_trait::async_trait;
751    use metrics_util::debugging::DebugValue;
752    use serde_json::json;
753
754    struct MockSink(std::sync::Mutex<Vec<Value>>);
755    #[async_trait]
756    impl Sink for MockSink {
757        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
758            self.0.lock().unwrap().extend(records.iter().cloned());
759            Ok(records.len())
760        }
761        fn connector_name(&self) -> &'static str {
762            "mock-sink"
763        }
764    }
765
766    struct FailingSink;
767    #[async_trait]
768    impl Sink for FailingSink {
769        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
770            Err(FaucetError::Sink("nope".into()))
771        }
772        fn connector_name(&self) -> &'static str {
773            "failing-sink"
774        }
775    }
776
777    struct EmptyNameSink;
778    #[async_trait]
779    impl Sink for EmptyNameSink {
780        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
781            Ok(0)
782        }
783        fn connector_name(&self) -> &'static str {
784            ""
785        }
786    }
787
788    #[test]
789    fn empty_inner_connector_name_falls_back_to_unknown() {
790        let inner = EmptyNameSink;
791        // `InstrumentedSink::new` debug_asserts on an empty inner name, so build
792        // the wrapper directly with the fallback name to exercise the
793        // passthrough without tripping the assertion in debug builds.
794        let wrapped = InstrumentedSink {
795            inner: &inner,
796            labels: labels(),
797            connector: SharedString::const_str("unknown"),
798            base_labels: Vec::new(),
799        };
800        assert_eq!(
801            Sink::connector_name(&wrapped),
802            "unknown",
803            "instrumented sink must not leak an empty connector name"
804        );
805    }
806
807    #[tokio::test]
808    #[allow(clippy::await_holding_lock)]
809    async fn records_writes_and_records_counters() {
810        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
811        let snap = snapshotter();
812        let inner = MockSink(std::sync::Mutex::new(Vec::new()));
813        let wrapped = InstrumentedSink::new(&inner, labels());
814        wrapped
815            .write_batch(&[json!({"a": 1}), json!({"a": 2})])
816            .await
817            .unwrap();
818        let snapshot = snap.snapshot();
819        let writes: u64 = snapshot
820            .into_vec()
821            .into_iter()
822            .filter_map(|(key, _u, _d, v)| {
823                if key.key().name() == "faucet_sink_writes_total"
824                    && let DebugValue::Counter(c) = v
825                {
826                    return Some(c);
827                }
828                None
829            })
830            .sum();
831        assert!(writes >= 1, "expected at least one write counted");
832    }
833
834    #[tokio::test]
835    #[allow(clippy::await_holding_lock)]
836    async fn error_increments_errors_total_with_kind() {
837        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
838        let snap = snapshotter();
839        let inner = FailingSink;
840        let wrapped = InstrumentedSink::new(&inner, labels());
841        let _ = wrapped.write_batch(&[json!({})]).await;
842        let snapshot = snap.snapshot();
843        let found = snapshot.into_vec().into_iter().any(|(key, _u, _d, v)| {
844            key.key().name() == "faucet_sink_errors_total"
845                && key
846                    .key()
847                    .labels()
848                    .any(|l| l.key() == "kind" && l.value() == "Sink")
849                && matches!(v, DebugValue::Counter(c) if c >= 1)
850        });
851        assert!(found, "expected sink_errors_total with kind=Sink");
852    }
853
854    #[tokio::test]
855    #[allow(clippy::await_holding_lock)]
856    async fn instrumented_sink_write_batch_partial_counts_successful_outcomes() {
857        use crate::traits::RowOutcome;
858        use metrics_util::debugging::DebugValue;
859
860        // Sink that returns 2 Ok + 1 Err.
861        struct MixedSink;
862        #[async_trait]
863        impl Sink for MixedSink {
864            async fn write_batch(&self, _r: &[Value]) -> Result<usize, FaucetError> {
865                unreachable!()
866            }
867            async fn write_batch_partial(
868                &self,
869                _r: &[Value],
870            ) -> Result<Vec<RowOutcome>, FaucetError> {
871                Ok(vec![
872                    Ok(()),
873                    Err(FaucetError::Sink("bad row".into())),
874                    Ok(()),
875                ])
876            }
877            fn connector_name(&self) -> &'static str {
878                "mixed"
879            }
880        }
881
882        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
883        let snap = snapshotter();
884
885        let inner = MixedSink;
886        let wrapped = InstrumentedSink::new(&inner, labels());
887        let _ = wrapped
888            .write_batch_partial(&[json!({}), json!({}), json!({})])
889            .await
890            .unwrap();
891
892        // faucet_sink_records_total should reflect 2 (Ok count), not 3.
893        // Filter to this test's own labels (connector="mixed") — prior tests in
894        // the same `mod sink_tests` (e.g. records_writes_and_records_counters
895        // for connector="mock-sink") leave entries in the shared global
896        // recorder, and the HashMap-iteration order of `Snapshot::into_vec()`
897        // is non-deterministic, so a naïve `find_map` returns an arbitrary
898        // entry.
899        let snapshot = snap.snapshot();
900        let records: u64 = snapshot
901            .into_vec()
902            .into_iter()
903            .filter_map(|(k, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
904                if k.key().name() == "faucet_sink_records_total"
905                    && k.key()
906                        .labels()
907                        .any(|l| l.key() == "connector" && l.value() == "mixed")
908                    && let DebugValue::Counter(c) = v
909                {
910                    Some(c)
911                } else {
912                    None
913                }
914            })
915            .sum();
916        assert!(
917            records >= 2,
918            "expected faucet_sink_records_total{{connector=mixed}} >= 2, got {records}"
919        );
920    }
921
922    // ── flush error path ─────────────────────────────────────────────────────
923
924    #[tokio::test]
925    #[allow(clippy::await_holding_lock)]
926    async fn flush_error_increments_errors_total_and_propagates() {
927        // A sink whose flush() returns Err must surface the error and emit
928        // faucet_sink_errors_total with the matching kind label.
929        struct FlushFailSink;
930        #[async_trait]
931        impl Sink for FlushFailSink {
932            async fn write_batch(&self, r: &[Value]) -> Result<usize, FaucetError> {
933                Ok(r.len())
934            }
935            async fn flush(&self) -> Result<(), FaucetError> {
936                Err(FaucetError::Sink("flush boom".into()))
937            }
938            fn connector_name(&self) -> &'static str {
939                "flush-fail-sink"
940            }
941        }
942
943        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
944        let snap = snapshotter();
945        let inner = FlushFailSink;
946        let wrapped = InstrumentedSink::new(&inner, labels());
947        let err = wrapped.flush().await.unwrap_err();
948        assert!(matches!(&err, FaucetError::Sink(m) if m.contains("flush boom")));
949
950        let snapshot = snap.snapshot();
951        let found = snapshot.into_vec().into_iter().any(|(key, _u, _d, v)| {
952            key.key().name() == "faucet_sink_errors_total"
953                && key
954                    .key()
955                    .labels()
956                    .any(|l| l.key() == "connector" && l.value() == "flush-fail-sink")
957                && key
958                    .key()
959                    .labels()
960                    .any(|l| l.key() == "kind" && l.value() == "Sink")
961                && matches!(v, DebugValue::Counter(c) if c >= 1)
962        });
963        assert!(
964            found,
965            "expected sink_errors_total{{connector=flush-fail-sink,kind=Sink}}"
966        );
967    }
968
969    // ── panic isolation on every sink call ───────────────────────────────────
970
971    struct PanickingSink;
972    #[async_trait]
973    impl Sink for PanickingSink {
974        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
975            panic!("write kaboom")
976        }
977        async fn write_batch_partial(
978            &self,
979            _: &[Value],
980        ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
981            panic!("partial kaboom")
982        }
983        async fn flush(&self) -> Result<(), FaucetError> {
984            panic!("flush kaboom")
985        }
986        fn connector_name(&self) -> &'static str {
987            "panic-sink"
988        }
989    }
990
991    #[tokio::test]
992    #[allow(clippy::await_holding_lock)]
993    async fn write_batch_panic_maps_to_custom_error() {
994        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
995        let _snap = snapshotter();
996        let inner = PanickingSink;
997        let wrapped = InstrumentedSink::new(&inner, labels());
998        let err = wrapped.write_batch(&[json!({})]).await.unwrap_err();
999        match err {
1000            FaucetError::Custom(b) => {
1001                assert!(b.to_string().contains("panic in sink: write kaboom"))
1002            }
1003            other => panic!("expected Custom panic error, got {other:?}"),
1004        }
1005    }
1006
1007    #[tokio::test]
1008    #[allow(clippy::await_holding_lock)]
1009    async fn write_batch_partial_panic_maps_to_custom_error() {
1010        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1011        let _snap = snapshotter();
1012        let inner = PanickingSink;
1013        let wrapped = InstrumentedSink::new(&inner, labels());
1014        let err = wrapped.write_batch_partial(&[json!({})]).await.unwrap_err();
1015        match err {
1016            FaucetError::Custom(b) => {
1017                assert!(b.to_string().contains("panic in sink: partial kaboom"))
1018            }
1019            other => panic!("expected Custom panic error, got {other:?}"),
1020        }
1021    }
1022
1023    #[tokio::test]
1024    #[allow(clippy::await_holding_lock)]
1025    async fn flush_panic_maps_to_custom_error() {
1026        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1027        let _snap = snapshotter();
1028        let inner = PanickingSink;
1029        let wrapped = InstrumentedSink::new(&inner, labels());
1030        let err = wrapped.flush().await.unwrap_err();
1031        match err {
1032            FaucetError::Custom(b) => {
1033                assert!(b.to_string().contains("panic in flush: flush kaboom"))
1034            }
1035            other => panic!("expected Custom panic error, got {other:?}"),
1036        }
1037    }
1038}