Skip to main content

faucet_core/observability/
decorator.rs

1//! Pipeline-internal decorators that emit spans + metrics around every
2//! source / sink trait call. See the design spec for the full vocabulary.
3
4use crate::error::FaucetError;
5use crate::observability::labels::Labels;
6use crate::observability::timer::DurationGuard;
7use crate::pipeline::StreamPage;
8use crate::traits::{Sink, Source};
9use async_trait::async_trait;
10use futures::FutureExt;
11use futures_core::Stream;
12use metrics::{Label, SharedString, counter, gauge};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::panic::AssertUnwindSafe;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use tracing::{Instrument, info_span};
20
/// Normalise the name reported by an inner connector's `connector_name()`:
/// an empty string becomes the `"unknown"` fallback, any other value is
/// returned untouched. Shared by the `connector` metric label and the
/// `connector_name()` passthrough so the two always report the same value.
fn guarded_connector_name(raw: &'static str) -> &'static str {
    match raw {
        "" => "unknown",
        name => name,
    }
}
27
28/// Build the base `pipeline` / `row` / `connector` label vec once. The two
29/// `pipeline` / `row` heap allocations and the vec construction happen a single
30/// time at decorator construction; per-call sites `clone()` this instead of
31/// rebuilding from the `Arc<str>` labels on every page / write / flush.
32fn base_metric_labels(labels: &Labels, connector: &SharedString) -> Vec<Label> {
33    vec![
34        Label::new("pipeline", SharedString::from(labels.pipeline.to_string())),
35        Label::new("row", SharedString::from(labels.row.to_string())),
36        Label::new("connector", connector.clone()),
37    ]
38}
39
/// Wraps a `&dyn Source` (or any `&S: Source`) and emits spans + metrics
/// around every call. Constructed by `Pipeline::run` and never exposed to
/// end users; the wrapped source remains the user-facing object.
pub struct InstrumentedSource<'a, S: Source + ?Sized> {
    /// Borrowed inner source that every trait call delegates to.
    inner: &'a S,
    /// Pipeline / row / run identifiers recorded as span fields.
    labels: Labels,
    /// Guarded connector name (`"unknown"` when the inner name is empty),
    /// recorded as the `connector` span field.
    connector: SharedString,
    /// Precomputed `pipeline` / `row` / `connector` labels, cloned per call.
    base_labels: Vec<Label>,
    /// Counter supplying the `page_index` span field. It is advanced once per
    /// poll of the inner stream and shared (via the `Arc`) by every stream
    /// this wrapper hands out.
    page_index: Arc<AtomicUsize>,
}
51
52impl<'a, S: Source + ?Sized> InstrumentedSource<'a, S> {
53    pub fn new(inner: &'a S, labels: Labels) -> Self {
54        let raw = inner.connector_name();
55        debug_assert!(
56            !raw.is_empty(),
57            "connector_name() must return a non-empty string"
58        );
59        let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
60        let base_labels = base_metric_labels(&labels, &connector);
61        Self {
62            inner,
63            labels,
64            connector,
65            base_labels,
66            page_index: Arc::new(AtomicUsize::new(0)),
67        }
68    }
69
70    fn metric_labels(&self) -> Vec<Label> {
71        self.base_labels.clone()
72    }
73
74    /// Returns `metric_labels()` with an additional `kind` label appended.
75    /// Used by `InstrumentedSink::write_batch` (Task 9) and any future
76    /// instrumentation paths where `self` is in scope.
77    #[allow(dead_code)]
78    fn error_labels(&self, kind: &'static str) -> Vec<Label> {
79        let mut l = self.metric_labels();
80        l.push(Label::new("kind", SharedString::const_str(kind)));
81        l
82    }
83}
84
85#[async_trait]
86impl<'a, S: Source + ?Sized> Source for InstrumentedSource<'a, S> {
87    fn connector_name(&self) -> &'static str {
88        // Return the guarded name so an inner connector that returns "" maps to
89        // the "unknown" fallback — keeping this passthrough consistent with the
90        // `connector` metric label rather than leaking an empty string.
91        guarded_connector_name(self.inner.connector_name())
92    }
93
94    fn state_key(&self) -> Option<String> {
95        self.inner.state_key()
96    }
97
98    async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
99        self.inner.apply_start_bookmark(bookmark).await
100    }
101
102    async fn fetch_with_context(
103        &self,
104        context: &HashMap<String, Value>,
105    ) -> Result<Vec<Value>, FaucetError> {
106        // Library-call path; the pipeline drives through stream_pages.
107        self.inner.fetch_with_context(context).await
108    }
109
110    async fn fetch_with_context_incremental(
111        &self,
112        context: &HashMap<String, Value>,
113    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
114        self.inner.fetch_with_context_incremental(context).await
115    }
116
117    fn stream_pages<'b>(
118        &'b self,
119        context: &'b HashMap<String, Value>,
120        batch_size: usize,
121    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'b>> {
122        let inner_stream = self.inner.stream_pages(context, batch_size);
123        let labels = self.labels.clone();
124        let connector = self.connector.clone();
125        let page_index = Arc::clone(&self.page_index);
126        let metric_labels = self.metric_labels();
127        let pipeline = self.labels.pipeline.clone();
128        let row = self.labels.row.clone();
129
130        Box::pin(async_stream::try_stream! {
131            // In-flight gauge tracks open streams. Decrement on drop so
132            // cancellation leaves the gauge consistent.
133            struct InFlightGuard(Vec<Label>);
134            impl Drop for InFlightGuard {
135                fn drop(&mut self) {
136                    gauge!("faucet_source_in_flight", self.0.clone()).decrement(1.0);
137                }
138            }
139            gauge!("faucet_source_in_flight", metric_labels.clone()).increment(1.0);
140            let _in_flight = InFlightGuard(metric_labels.clone());
141
142            let mut inner = inner_stream;
143            loop {
144                let idx = page_index.fetch_add(1, Ordering::Relaxed);
145                let span = info_span!(
146                    "faucet.source.page",
147                    pipeline = %pipeline,
148                    row = %row,
149                    run_id = %labels.run_id,
150                    connector = %connector,
151                    page_index = idx,
152                );
153                // Armed across the poll so a cancelled / panicking page-fetch
154                // still records the time spent. Disarmed on the terminal empty
155                // poll (`Ok(None)`) so end-of-stream doesn't record a spurious
156                // ~0 sample into the page-duration histogram.
157                let mut _timer = DurationGuard::new(
158                    "faucet_source_page_duration_seconds",
159                    metric_labels.clone(),
160                );
161
162                let next = AssertUnwindSafe(async {
163                    use futures::StreamExt;
164                    inner.next().await
165                })
166                .catch_unwind()
167                .instrument(span)
168                .await;
169
170                match next {
171                    Ok(Some(Ok(page))) => {
172                        counter!("faucet_source_pages_total", metric_labels.clone()).increment(1);
173                        counter!("faucet_source_records_total", metric_labels.clone())
174                            .increment(page.records.len() as u64);
175                        yield page;
176                    }
177                    Ok(Some(Err(e))) => {
178                        let mut l = metric_labels.clone();
179                        l.push(Label::new("kind", SharedString::const_str(error_kind(&e))));
180                        counter!("faucet_source_errors_total", l).increment(1);
181                        Err(e)?;
182                    }
183                    Ok(None) => {
184                        _timer.disarm();
185                        break;
186                    }
187                    Err(panic) => {
188                        let mut l = metric_labels.clone();
189                        l.push(Label::new("kind", SharedString::const_str("Panic")));
190                        counter!("faucet_source_errors_total", l).increment(1);
191                        let msg = panic.downcast_ref::<&'static str>().map(|s| (*s).to_string())
192                            .or_else(|| panic.downcast_ref::<String>().cloned())
193                            .unwrap_or_else(|| "<non-string panic payload>".to_string());
194                        Err(FaucetError::Custom(format!("panic in source: {msg}").into()))?;
195                    }
196                }
197            }
198        })
199    }
200}
201
202/// Map a `FaucetError` variant to its stable `kind` label value. The match
203/// must be exhaustive; update when new variants are added.
204pub(crate) fn error_kind(e: &FaucetError) -> &'static str {
205    match e {
206        FaucetError::Http(_) => "Http",
207        FaucetError::HttpStatus { .. } => "HttpStatus",
208        FaucetError::Json(_) => "Json",
209        FaucetError::JsonPath(_) => "JsonPath",
210        FaucetError::Auth(_) => "Auth",
211        FaucetError::RateLimited { .. } => "RateLimited",
212        FaucetError::Url(_) => "Url",
213        FaucetError::Transform(_) => "Transform",
214        FaucetError::Config(_) => "Config",
215        FaucetError::Source(_) => "Source",
216        FaucetError::Sink(_) => "Sink",
217        FaucetError::QualityFailure { .. } => "QualityFailure",
218        FaucetError::State(_) => "State",
219        FaucetError::Custom(_) => "Custom",
220    }
221}
222
/// Wraps a `&dyn Sink` (or any `&S: Sink`) and emits spans + metrics around
/// `write_batch` and `flush`. Constructed by `Pipeline::run`.
pub struct InstrumentedSink<'a, S: Sink + ?Sized> {
    /// Borrowed inner sink that every trait call delegates to.
    inner: &'a S,
    /// Pipeline / row / run identifiers recorded as span fields.
    labels: Labels,
    /// Guarded connector name (`"unknown"` when the inner name is empty),
    /// recorded as the `connector` span field.
    connector: SharedString,
    /// Precomputed `pipeline` / `row` / `connector` labels, cloned per call.
    base_labels: Vec<Label>,
}
232
233impl<'a, S: Sink + ?Sized> InstrumentedSink<'a, S> {
234    pub fn new(inner: &'a S, labels: Labels) -> Self {
235        let raw = inner.connector_name();
236        debug_assert!(
237            !raw.is_empty(),
238            "connector_name() must return a non-empty string"
239        );
240        let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
241        let base_labels = base_metric_labels(&labels, &connector);
242        Self {
243            inner,
244            labels,
245            connector,
246            base_labels,
247        }
248    }
249
250    fn metric_labels(&self) -> Vec<Label> {
251        self.base_labels.clone()
252    }
253
254    fn error_labels(&self, kind: &'static str) -> Vec<Label> {
255        let mut l = self.metric_labels();
256        l.push(Label::new("kind", SharedString::const_str(kind)));
257        l
258    }
259}
260
261#[async_trait]
262impl<'a, S: Sink + ?Sized> Sink for InstrumentedSink<'a, S> {
263    fn connector_name(&self) -> &'static str {
264        // Return the guarded name so an inner connector that returns "" maps to
265        // the "unknown" fallback — keeping this passthrough consistent with the
266        // `connector` metric label rather than leaking an empty string.
267        guarded_connector_name(self.inner.connector_name())
268    }
269
270    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
271        let span = info_span!(
272            "faucet.sink.write",
273            pipeline = %self.labels.pipeline,
274            row = %self.labels.row,
275            run_id = %self.labels.run_id,
276            connector = %self.connector,
277            records = records.len(),
278        );
279        let metric_labels = self.metric_labels();
280        gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
281
282        // RAII guard ensures the gauge is decremented even if write_batch
283        // panics or the future is cancelled.
284        struct InFlightGuard(Vec<Label>);
285        impl Drop for InFlightGuard {
286            fn drop(&mut self) {
287                gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
288            }
289        }
290        let _in_flight = InFlightGuard(metric_labels.clone());
291
292        let _timer =
293            DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
294
295        let result = AssertUnwindSafe(self.inner.write_batch(records))
296            .catch_unwind()
297            .instrument(span)
298            .await;
299
300        match result {
301            Ok(Ok(n)) => {
302                counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
303                counter!("faucet_sink_records_total", metric_labels.clone()).increment(n as u64);
304                Ok(n)
305            }
306            Ok(Err(e)) => {
307                counter!(
308                    "faucet_sink_errors_total",
309                    self.error_labels(error_kind(&e))
310                )
311                .increment(1);
312                Err(e)
313            }
314            Err(panic) => {
315                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
316                let msg = panic
317                    .downcast_ref::<&'static str>()
318                    .map(|s| (*s).to_string())
319                    .or_else(|| panic.downcast_ref::<String>().cloned())
320                    .unwrap_or_else(|| "<non-string panic payload>".to_string());
321                Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
322            }
323        }
324    }
325
326    async fn write_batch_partial(
327        &self,
328        records: &[Value],
329    ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
330        let span = info_span!(
331            "faucet.sink.write_partial",
332            pipeline = %self.labels.pipeline,
333            row = %self.labels.row,
334            run_id = %self.labels.run_id,
335            connector = %self.connector,
336            records = records.len(),
337        );
338        let metric_labels = self.metric_labels();
339        gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
340
341        // RAII guard ensures the gauge is decremented even if write_batch_partial
342        // panics or the future is cancelled.
343        struct InFlightGuard(Vec<Label>);
344        impl Drop for InFlightGuard {
345            fn drop(&mut self) {
346                gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
347            }
348        }
349        let _in_flight = InFlightGuard(metric_labels.clone());
350
351        let _timer =
352            DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
353
354        let result = AssertUnwindSafe(self.inner.write_batch_partial(records))
355            .catch_unwind()
356            .instrument(span)
357            .await;
358
359        match result {
360            Ok(Ok(outcomes)) => {
361                let success_count = outcomes.iter().filter(|o| o.is_ok()).count();
362                counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
363                counter!("faucet_sink_records_total", metric_labels.clone())
364                    .increment(success_count as u64);
365                Ok(outcomes)
366            }
367            Ok(Err(e)) => {
368                counter!(
369                    "faucet_sink_errors_total",
370                    self.error_labels(error_kind(&e))
371                )
372                .increment(1);
373                Err(e)
374            }
375            Err(panic) => {
376                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
377                let msg = panic
378                    .downcast_ref::<&'static str>()
379                    .map(|s| (*s).to_string())
380                    .or_else(|| panic.downcast_ref::<String>().cloned())
381                    .unwrap_or_else(|| "<non-string panic payload>".to_string());
382                Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
383            }
384        }
385    }
386
387    async fn flush(&self) -> Result<(), FaucetError> {
388        let span = info_span!(
389            "faucet.sink.flush",
390            pipeline = %self.labels.pipeline,
391            row = %self.labels.row,
392            run_id = %self.labels.run_id,
393            connector = %self.connector,
394        );
395        let metric_labels = self.metric_labels();
396        let _timer =
397            DurationGuard::new("faucet_sink_flush_duration_seconds", metric_labels.clone());
398
399        let result = AssertUnwindSafe(self.inner.flush())
400            .catch_unwind()
401            .instrument(span)
402            .await;
403
404        match result {
405            Ok(Ok(())) => Ok(()),
406            Ok(Err(e)) => {
407                counter!(
408                    "faucet_sink_errors_total",
409                    self.error_labels(error_kind(&e))
410                )
411                .increment(1);
412                Err(e)
413            }
414            Err(panic) => {
415                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
416                let msg = panic
417                    .downcast_ref::<&'static str>()
418                    .map(|s| (*s).to_string())
419                    .or_else(|| panic.downcast_ref::<String>().cloned())
420                    .unwrap_or_else(|| "<non-string panic payload>".to_string());
421                Err(FaucetError::Custom(format!("panic in flush: {msg}").into()))
422            }
423        }
424    }
425}
426
427#[cfg(test)]
428pub(crate) mod source_tests {
429    use super::*;
430    use async_trait::async_trait;
431    use futures::StreamExt;
432    use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
433    use serde_json::json;
434    use std::sync::{Mutex, OnceLock};
435
436    // Process-global recorder shared across all observability tests in this
437    // crate. Task 5 established the same pattern.
438    pub(crate) static LOCK: Mutex<()> = Mutex::new(());
439    static SNAPSHOTTER: OnceLock<Snapshotter> = OnceLock::new();
440
441    pub(crate) fn snapshotter() -> &'static Snapshotter {
442        SNAPSHOTTER.get_or_init(|| {
443            let recorder = DebuggingRecorder::new();
444            let snap = recorder.snapshotter();
445            // First test installs; the OnceLock guarantees we never install
446            // twice. If something else (e.g. the timer test) already installed
447            // a recorder, `set_global_recorder` will Err — but in that case
448            // *our* snapshotter is disconnected from the live recorder. The
449            // workaround is for all observability tests to share one source of
450            // truth — this file. If a future test elsewhere installs a
451            // recorder first, restructure so all tests share this OnceLock.
452            let _ = metrics::set_global_recorder(recorder);
453            snap
454        })
455    }
456
457    pub(in crate::observability) fn labels() -> Labels {
458        Labels::new("p", "r", "rid")
459    }
460
461    struct MockSource(Vec<Value>);
462    #[async_trait]
463    impl Source for MockSource {
464        async fn fetch_with_context(
465            &self,
466            _: &HashMap<String, Value>,
467        ) -> Result<Vec<Value>, FaucetError> {
468            Ok(self.0.clone())
469        }
470        fn connector_name(&self) -> &'static str {
471            "mock"
472        }
473    }
474
475    struct PanickingSource;
476    #[async_trait]
477    impl Source for PanickingSource {
478        async fn fetch_with_context(
479            &self,
480            _: &HashMap<String, Value>,
481        ) -> Result<Vec<Value>, FaucetError> {
482            panic!("kaboom")
483        }
484        fn connector_name(&self) -> &'static str {
485            "panic-test"
486        }
487    }
488
489    // Inner connector that returns an empty name. The instrumented wrapper must
490    // map this to the `"unknown"` fallback so the `connector_name()` passthrough
491    // never disagrees with the `connector` metric label.
492    struct EmptyNameSource;
493    #[async_trait]
494    impl Source for EmptyNameSource {
495        async fn fetch_with_context(
496            &self,
497            _: &HashMap<String, Value>,
498        ) -> Result<Vec<Value>, FaucetError> {
499            Ok(vec![])
500        }
501        fn connector_name(&self) -> &'static str {
502            ""
503        }
504    }
505
506    #[test]
507    fn empty_inner_connector_name_falls_back_to_unknown() {
508        let inner = EmptyNameSource;
509        // `InstrumentedSource::new` debug_asserts on an empty inner name, so
510        // build the wrapper directly with the fallback name to exercise the
511        // passthrough without tripping the assertion in debug builds.
512        let wrapped = InstrumentedSource {
513            inner: &inner,
514            labels: labels(),
515            connector: SharedString::const_str("unknown"),
516            base_labels: Vec::new(),
517            page_index: Arc::new(AtomicUsize::new(0)),
518        };
519        assert_eq!(
520            Source::connector_name(&wrapped),
521            "unknown",
522            "instrumented source must not leak an empty connector name"
523        );
524    }
525
526    #[tokio::test]
527    #[allow(clippy::await_holding_lock)]
528    async fn records_records_counter_per_page() {
529        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
530        let snap = snapshotter();
531        let inner = MockSource((0..5).map(|i| json!({"i": i})).collect());
532        let wrapped = InstrumentedSource::new(&inner, labels());
533        let ctx = HashMap::new();
534        let mut s = wrapped.stream_pages(&ctx, 2);
535        while s.next().await.is_some() {}
536        let snapshot = snap.snapshot();
537        let records: u64 = snapshot
538            .into_vec()
539            .into_iter()
540            .filter_map(|(key, _u, _d, v)| {
541                if key.key().name() == "faucet_source_records_total"
542                    && let DebugValue::Counter(c) = v
543                {
544                    return Some(c);
545                }
546                None
547            })
548            .sum();
549        assert!(
550            records >= 5,
551            "expected at least 5 records counted, got {records}"
552        );
553    }
554
555    // Source with a unique connector name so the page-duration histogram for
556    // this run can be isolated in the shared global recorder.
557    struct PageCountSource(Vec<Value>);
558    #[async_trait]
559    impl Source for PageCountSource {
560        async fn fetch_with_context(
561            &self,
562            _: &HashMap<String, Value>,
563        ) -> Result<Vec<Value>, FaucetError> {
564            Ok(self.0.clone())
565        }
566        fn connector_name(&self) -> &'static str {
567            "page-count-probe"
568        }
569    }
570
571    #[tokio::test]
572    #[allow(clippy::await_holding_lock)]
573    async fn page_duration_records_one_sample_per_yielded_page() {
574        // 5 records at batch_size 2 → pages [2, 2, 1] = 3 yielded pages. The
575        // terminal empty poll must NOT add a 4th (spurious ~0) sample.
576        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
577        let snap = snapshotter();
578        let inner = PageCountSource((0..5).map(|i| json!({"i": i})).collect());
579        let wrapped = InstrumentedSource::new(&inner, labels());
580        let ctx = HashMap::new();
581        let mut s = wrapped.stream_pages(&ctx, 2);
582        let mut pages = 0usize;
583        while s.next().await.is_some() {
584            pages += 1;
585        }
586        assert_eq!(pages, 3, "expected 3 yielded pages");
587
588        let snapshot = snap.snapshot();
589        let samples: usize = snapshot
590            .into_vec()
591            .into_iter()
592            .filter_map(|(key, _u, _d, v)| {
593                if key.key().name() == "faucet_source_page_duration_seconds"
594                    && key
595                        .key()
596                        .labels()
597                        .any(|l| l.key() == "connector" && l.value() == "page-count-probe")
598                    && let DebugValue::Histogram(h) = v
599                {
600                    return Some(h.len());
601                }
602                None
603            })
604            .sum();
605        assert_eq!(
606            samples, pages,
607            "page-duration histogram must have exactly one sample per yielded \
608             page ({pages}), not page+1 (no spurious terminal sample)"
609        );
610    }
611
612    #[tokio::test]
613    #[allow(clippy::await_holding_lock)]
614    async fn maps_panic_to_custom_error_with_kind_panic() {
615        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
616        let _snap = snapshotter();
617        let inner = PanickingSource;
618        let wrapped = InstrumentedSource::new(&inner, labels());
619        let ctx = HashMap::new();
620        let mut s = wrapped.stream_pages(&ctx, 10);
621        let first = s
622            .next()
623            .await
624            .expect("stream yields at least one item before terminating");
625        assert!(matches!(first, Err(FaucetError::Custom(_))));
626        // Process did not abort — implicit by reaching this line.
627    }
628}
629
630#[cfg(test)]
631mod sink_tests {
632    use super::source_tests::{LOCK, labels, snapshotter};
633    use super::*;
634    use async_trait::async_trait;
635    use metrics_util::debugging::DebugValue;
636    use serde_json::json;
637
638    struct MockSink(std::sync::Mutex<Vec<Value>>);
639    #[async_trait]
640    impl Sink for MockSink {
641        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
642            self.0.lock().unwrap().extend(records.iter().cloned());
643            Ok(records.len())
644        }
645        fn connector_name(&self) -> &'static str {
646            "mock-sink"
647        }
648    }
649
650    struct FailingSink;
651    #[async_trait]
652    impl Sink for FailingSink {
653        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
654            Err(FaucetError::Sink("nope".into()))
655        }
656        fn connector_name(&self) -> &'static str {
657            "failing-sink"
658        }
659    }
660
661    struct EmptyNameSink;
662    #[async_trait]
663    impl Sink for EmptyNameSink {
664        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
665            Ok(0)
666        }
667        fn connector_name(&self) -> &'static str {
668            ""
669        }
670    }
671
672    #[test]
673    fn empty_inner_connector_name_falls_back_to_unknown() {
674        let inner = EmptyNameSink;
675        // `InstrumentedSink::new` debug_asserts on an empty inner name, so build
676        // the wrapper directly with the fallback name to exercise the
677        // passthrough without tripping the assertion in debug builds.
678        let wrapped = InstrumentedSink {
679            inner: &inner,
680            labels: labels(),
681            connector: SharedString::const_str("unknown"),
682            base_labels: Vec::new(),
683        };
684        assert_eq!(
685            Sink::connector_name(&wrapped),
686            "unknown",
687            "instrumented sink must not leak an empty connector name"
688        );
689    }
690
691    #[tokio::test]
692    #[allow(clippy::await_holding_lock)]
693    async fn records_writes_and_records_counters() {
694        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
695        let snap = snapshotter();
696        let inner = MockSink(std::sync::Mutex::new(Vec::new()));
697        let wrapped = InstrumentedSink::new(&inner, labels());
698        wrapped
699            .write_batch(&[json!({"a": 1}), json!({"a": 2})])
700            .await
701            .unwrap();
702        let snapshot = snap.snapshot();
703        let writes: u64 = snapshot
704            .into_vec()
705            .into_iter()
706            .filter_map(|(key, _u, _d, v)| {
707                if key.key().name() == "faucet_sink_writes_total"
708                    && let DebugValue::Counter(c) = v
709                {
710                    return Some(c);
711                }
712                None
713            })
714            .sum();
715        assert!(writes >= 1, "expected at least one write counted");
716    }
717
718    #[tokio::test]
719    #[allow(clippy::await_holding_lock)]
720    async fn error_increments_errors_total_with_kind() {
721        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
722        let snap = snapshotter();
723        let inner = FailingSink;
724        let wrapped = InstrumentedSink::new(&inner, labels());
725        let _ = wrapped.write_batch(&[json!({})]).await;
726        let snapshot = snap.snapshot();
727        let found = snapshot.into_vec().into_iter().any(|(key, _u, _d, v)| {
728            key.key().name() == "faucet_sink_errors_total"
729                && key
730                    .key()
731                    .labels()
732                    .any(|l| l.key() == "kind" && l.value() == "Sink")
733                && matches!(v, DebugValue::Counter(c) if c >= 1)
734        });
735        assert!(found, "expected sink_errors_total with kind=Sink");
736    }
737
738    #[tokio::test]
739    #[allow(clippy::await_holding_lock)]
740    async fn instrumented_sink_write_batch_partial_counts_successful_outcomes() {
741        use crate::traits::RowOutcome;
742        use metrics_util::debugging::DebugValue;
743
744        // Sink that returns 2 Ok + 1 Err.
745        struct MixedSink;
746        #[async_trait]
747        impl Sink for MixedSink {
748            async fn write_batch(&self, _r: &[Value]) -> Result<usize, FaucetError> {
749                unreachable!()
750            }
751            async fn write_batch_partial(
752                &self,
753                _r: &[Value],
754            ) -> Result<Vec<RowOutcome>, FaucetError> {
755                Ok(vec![
756                    Ok(()),
757                    Err(FaucetError::Sink("bad row".into())),
758                    Ok(()),
759                ])
760            }
761            fn connector_name(&self) -> &'static str {
762                "mixed"
763            }
764        }
765
766        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
767        let snap = snapshotter();
768
769        let inner = MixedSink;
770        let wrapped = InstrumentedSink::new(&inner, labels());
771        let _ = wrapped
772            .write_batch_partial(&[json!({}), json!({}), json!({})])
773            .await
774            .unwrap();
775
776        // faucet_sink_records_total should reflect 2 (Ok count), not 3.
777        // Filter to this test's own labels (connector="mixed") — prior tests in
778        // the same `mod sink_tests` (e.g. records_writes_and_records_counters
779        // for connector="mock-sink") leave entries in the shared global
780        // recorder, and the HashMap-iteration order of `Snapshot::into_vec()`
781        // is non-deterministic, so a naïve `find_map` returns an arbitrary
782        // entry.
783        let snapshot = snap.snapshot();
784        let records: u64 = snapshot
785            .into_vec()
786            .into_iter()
787            .filter_map(|(k, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
788                if k.key().name() == "faucet_sink_records_total"
789                    && k.key()
790                        .labels()
791                        .any(|l| l.key() == "connector" && l.value() == "mixed")
792                    && let DebugValue::Counter(c) = v
793                {
794                    Some(c)
795                } else {
796                    None
797                }
798            })
799            .sum();
800        assert!(
801            records >= 2,
802            "expected faucet_sink_records_total{{connector=mixed}} >= 2, got {records}"
803        );
804    }
805}