1use crate::error::FaucetError;
5use crate::observability::labels::Labels;
6use crate::observability::timer::DurationGuard;
7use crate::pipeline::StreamPage;
8use crate::traits::{Sink, Source};
9use async_trait::async_trait;
10use futures::FutureExt;
11use futures_core::Stream;
12use metrics::{Label, SharedString, counter, gauge};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::panic::AssertUnwindSafe;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use tracing::{Instrument, info_span};
20
/// Normalises a connector name for use in spans and metric labels.
///
/// An empty name becomes the placeholder `"unknown"`; every other name is
/// returned unchanged, so telemetry never carries a blank `connector` value.
fn guarded_connector_name(raw: &'static str) -> &'static str {
    match raw {
        "" => "unknown",
        name => name,
    }
}
27
28fn base_metric_labels(labels: &Labels, connector: &SharedString) -> Vec<Label> {
33 vec![
34 Label::new("pipeline", SharedString::from(labels.pipeline.to_string())),
35 Label::new("row", SharedString::from(labels.row.to_string())),
36 Label::new("connector", connector.clone()),
37 ]
38}
39
40pub struct InstrumentedSource<'a, S: Source + ?Sized> {
44 inner: &'a S,
45 labels: Labels,
46 connector: SharedString,
47 base_labels: Vec<Label>,
49 page_index: Arc<AtomicUsize>,
50}
51
52impl<'a, S: Source + ?Sized> InstrumentedSource<'a, S> {
53 pub fn new(inner: &'a S, labels: Labels) -> Self {
54 let raw = inner.connector_name();
55 debug_assert!(
56 !raw.is_empty(),
57 "connector_name() must return a non-empty string"
58 );
59 let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
60 let base_labels = base_metric_labels(&labels, &connector);
61 Self {
62 inner,
63 labels,
64 connector,
65 base_labels,
66 page_index: Arc::new(AtomicUsize::new(0)),
67 }
68 }
69
70 fn metric_labels(&self) -> Vec<Label> {
71 self.base_labels.clone()
72 }
73
74 #[allow(dead_code)]
78 fn error_labels(&self, kind: &'static str) -> Vec<Label> {
79 let mut l = self.metric_labels();
80 l.push(Label::new("kind", SharedString::const_str(kind)));
81 l
82 }
83}
84
85#[async_trait]
86impl<'a, S: Source + ?Sized> Source for InstrumentedSource<'a, S> {
87 fn connector_name(&self) -> &'static str {
88 guarded_connector_name(self.inner.connector_name())
92 }
93
94 fn state_key(&self) -> Option<String> {
95 self.inner.state_key()
96 }
97
98 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
99 self.inner.apply_start_bookmark(bookmark).await
100 }
101
102 async fn fetch_with_context(
103 &self,
104 context: &HashMap<String, Value>,
105 ) -> Result<Vec<Value>, FaucetError> {
106 self.inner.fetch_with_context(context).await
108 }
109
110 async fn fetch_with_context_incremental(
111 &self,
112 context: &HashMap<String, Value>,
113 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
114 self.inner.fetch_with_context_incremental(context).await
115 }
116
117 fn stream_pages<'b>(
118 &'b self,
119 context: &'b HashMap<String, Value>,
120 batch_size: usize,
121 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'b>> {
122 let inner_stream = self.inner.stream_pages(context, batch_size);
123 let labels = self.labels.clone();
124 let connector = self.connector.clone();
125 let page_index = Arc::clone(&self.page_index);
126 let metric_labels = self.metric_labels();
127 let pipeline = self.labels.pipeline.clone();
128 let row = self.labels.row.clone();
129
130 Box::pin(async_stream::try_stream! {
131 struct InFlightGuard(Vec<Label>);
134 impl Drop for InFlightGuard {
135 fn drop(&mut self) {
136 gauge!("faucet_source_in_flight", self.0.clone()).decrement(1.0);
137 }
138 }
139 gauge!("faucet_source_in_flight", metric_labels.clone()).increment(1.0);
140 let _in_flight = InFlightGuard(metric_labels.clone());
141
142 let mut inner = inner_stream;
143 loop {
144 let idx = page_index.fetch_add(1, Ordering::Relaxed);
145 let span = info_span!(
146 "faucet.source.page",
147 pipeline = %pipeline,
148 row = %row,
149 run_id = %labels.run_id,
150 connector = %connector,
151 page_index = idx,
152 );
153 let mut _timer = DurationGuard::new(
158 "faucet_source_page_duration_seconds",
159 metric_labels.clone(),
160 );
161
162 let next = AssertUnwindSafe(async {
163 use futures::StreamExt;
164 inner.next().await
165 })
166 .catch_unwind()
167 .instrument(span)
168 .await;
169
170 match next {
171 Ok(Some(Ok(page))) => {
172 counter!("faucet_source_pages_total", metric_labels.clone()).increment(1);
173 counter!("faucet_source_records_total", metric_labels.clone())
174 .increment(page.records.len() as u64);
175 yield page;
176 }
177 Ok(Some(Err(e))) => {
178 let mut l = metric_labels.clone();
179 l.push(Label::new("kind", SharedString::const_str(error_kind(&e))));
180 counter!("faucet_source_errors_total", l).increment(1);
181 Err(e)?;
182 }
183 Ok(None) => {
184 _timer.disarm();
185 break;
186 }
187 Err(panic) => {
188 let mut l = metric_labels.clone();
189 l.push(Label::new("kind", SharedString::const_str("Panic")));
190 counter!("faucet_source_errors_total", l).increment(1);
191 let msg = panic.downcast_ref::<&'static str>().map(|s| (*s).to_string())
192 .or_else(|| panic.downcast_ref::<String>().cloned())
193 .unwrap_or_else(|| "<non-string panic payload>".to_string());
194 Err(FaucetError::Custom(format!("panic in source: {msg}").into()))?;
195 }
196 }
197 }
198 })
199 }
200}
201
202pub(crate) fn error_kind(e: &FaucetError) -> &'static str {
205 match e {
206 FaucetError::Http(_) => "Http",
207 FaucetError::HttpStatus { .. } => "HttpStatus",
208 FaucetError::Json(_) => "Json",
209 FaucetError::JsonPath(_) => "JsonPath",
210 FaucetError::Auth(_) => "Auth",
211 FaucetError::RateLimited { .. } => "RateLimited",
212 FaucetError::Url(_) => "Url",
213 FaucetError::Transform(_) => "Transform",
214 FaucetError::Config(_) => "Config",
215 FaucetError::Source(_) => "Source",
216 FaucetError::Sink(_) => "Sink",
217 FaucetError::QualityFailure { .. } => "QualityFailure",
218 FaucetError::State(_) => "State",
219 FaucetError::Custom(_) => "Custom",
220 }
221}
222
223pub struct InstrumentedSink<'a, S: Sink + ?Sized> {
226 inner: &'a S,
227 labels: Labels,
228 connector: SharedString,
229 base_labels: Vec<Label>,
231}
232
233impl<'a, S: Sink + ?Sized> InstrumentedSink<'a, S> {
234 pub fn new(inner: &'a S, labels: Labels) -> Self {
235 let raw = inner.connector_name();
236 debug_assert!(
237 !raw.is_empty(),
238 "connector_name() must return a non-empty string"
239 );
240 let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
241 let base_labels = base_metric_labels(&labels, &connector);
242 Self {
243 inner,
244 labels,
245 connector,
246 base_labels,
247 }
248 }
249
250 fn metric_labels(&self) -> Vec<Label> {
251 self.base_labels.clone()
252 }
253
254 fn error_labels(&self, kind: &'static str) -> Vec<Label> {
255 let mut l = self.metric_labels();
256 l.push(Label::new("kind", SharedString::const_str(kind)));
257 l
258 }
259}
260
261#[async_trait]
262impl<'a, S: Sink + ?Sized> Sink for InstrumentedSink<'a, S> {
263 fn connector_name(&self) -> &'static str {
264 guarded_connector_name(self.inner.connector_name())
268 }
269
270 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
271 let span = info_span!(
272 "faucet.sink.write",
273 pipeline = %self.labels.pipeline,
274 row = %self.labels.row,
275 run_id = %self.labels.run_id,
276 connector = %self.connector,
277 records = records.len(),
278 );
279 let metric_labels = self.metric_labels();
280 gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
281
282 struct InFlightGuard(Vec<Label>);
285 impl Drop for InFlightGuard {
286 fn drop(&mut self) {
287 gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
288 }
289 }
290 let _in_flight = InFlightGuard(metric_labels.clone());
291
292 let _timer =
293 DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
294
295 let result = AssertUnwindSafe(self.inner.write_batch(records))
296 .catch_unwind()
297 .instrument(span)
298 .await;
299
300 match result {
301 Ok(Ok(n)) => {
302 counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
303 counter!("faucet_sink_records_total", metric_labels.clone()).increment(n as u64);
304 Ok(n)
305 }
306 Ok(Err(e)) => {
307 counter!(
308 "faucet_sink_errors_total",
309 self.error_labels(error_kind(&e))
310 )
311 .increment(1);
312 Err(e)
313 }
314 Err(panic) => {
315 counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
316 let msg = panic
317 .downcast_ref::<&'static str>()
318 .map(|s| (*s).to_string())
319 .or_else(|| panic.downcast_ref::<String>().cloned())
320 .unwrap_or_else(|| "<non-string panic payload>".to_string());
321 Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
322 }
323 }
324 }
325
326 async fn write_batch_partial(
327 &self,
328 records: &[Value],
329 ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
330 let span = info_span!(
331 "faucet.sink.write_partial",
332 pipeline = %self.labels.pipeline,
333 row = %self.labels.row,
334 run_id = %self.labels.run_id,
335 connector = %self.connector,
336 records = records.len(),
337 );
338 let metric_labels = self.metric_labels();
339 gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
340
341 struct InFlightGuard(Vec<Label>);
344 impl Drop for InFlightGuard {
345 fn drop(&mut self) {
346 gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
347 }
348 }
349 let _in_flight = InFlightGuard(metric_labels.clone());
350
351 let _timer =
352 DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
353
354 let result = AssertUnwindSafe(self.inner.write_batch_partial(records))
355 .catch_unwind()
356 .instrument(span)
357 .await;
358
359 match result {
360 Ok(Ok(outcomes)) => {
361 let success_count = outcomes.iter().filter(|o| o.is_ok()).count();
362 counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
363 counter!("faucet_sink_records_total", metric_labels.clone())
364 .increment(success_count as u64);
365 Ok(outcomes)
366 }
367 Ok(Err(e)) => {
368 counter!(
369 "faucet_sink_errors_total",
370 self.error_labels(error_kind(&e))
371 )
372 .increment(1);
373 Err(e)
374 }
375 Err(panic) => {
376 counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
377 let msg = panic
378 .downcast_ref::<&'static str>()
379 .map(|s| (*s).to_string())
380 .or_else(|| panic.downcast_ref::<String>().cloned())
381 .unwrap_or_else(|| "<non-string panic payload>".to_string());
382 Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
383 }
384 }
385 }
386
387 async fn flush(&self) -> Result<(), FaucetError> {
388 let span = info_span!(
389 "faucet.sink.flush",
390 pipeline = %self.labels.pipeline,
391 row = %self.labels.row,
392 run_id = %self.labels.run_id,
393 connector = %self.connector,
394 );
395 let metric_labels = self.metric_labels();
396 let _timer =
397 DurationGuard::new("faucet_sink_flush_duration_seconds", metric_labels.clone());
398
399 let result = AssertUnwindSafe(self.inner.flush())
400 .catch_unwind()
401 .instrument(span)
402 .await;
403
404 match result {
405 Ok(Ok(())) => Ok(()),
406 Ok(Err(e)) => {
407 counter!(
408 "faucet_sink_errors_total",
409 self.error_labels(error_kind(&e))
410 )
411 .increment(1);
412 Err(e)
413 }
414 Err(panic) => {
415 counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
416 let msg = panic
417 .downcast_ref::<&'static str>()
418 .map(|s| (*s).to_string())
419 .or_else(|| panic.downcast_ref::<String>().cloned())
420 .unwrap_or_else(|| "<non-string panic payload>".to_string());
421 Err(FaucetError::Custom(format!("panic in flush: {msg}").into()))
422 }
423 }
424 }
425}
426
427#[cfg(test)]
428pub(crate) mod source_tests {
429 use super::*;
430 use async_trait::async_trait;
431 use futures::StreamExt;
432 use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
433 use serde_json::json;
434 use std::sync::{Mutex, OnceLock};
435
436 pub(crate) static LOCK: Mutex<()> = Mutex::new(());
439 static SNAPSHOTTER: OnceLock<Snapshotter> = OnceLock::new();
440
441 pub(crate) fn snapshotter() -> &'static Snapshotter {
442 SNAPSHOTTER.get_or_init(|| {
443 let recorder = DebuggingRecorder::new();
444 let snap = recorder.snapshotter();
445 let _ = metrics::set_global_recorder(recorder);
453 snap
454 })
455 }
456
457 pub(in crate::observability) fn labels() -> Labels {
458 Labels::new("p", "r", "rid")
459 }
460
461 struct MockSource(Vec<Value>);
462 #[async_trait]
463 impl Source for MockSource {
464 async fn fetch_with_context(
465 &self,
466 _: &HashMap<String, Value>,
467 ) -> Result<Vec<Value>, FaucetError> {
468 Ok(self.0.clone())
469 }
470 fn connector_name(&self) -> &'static str {
471 "mock"
472 }
473 }
474
475 struct PanickingSource;
476 #[async_trait]
477 impl Source for PanickingSource {
478 async fn fetch_with_context(
479 &self,
480 _: &HashMap<String, Value>,
481 ) -> Result<Vec<Value>, FaucetError> {
482 panic!("kaboom")
483 }
484 fn connector_name(&self) -> &'static str {
485 "panic-test"
486 }
487 }
488
489 struct EmptyNameSource;
493 #[async_trait]
494 impl Source for EmptyNameSource {
495 async fn fetch_with_context(
496 &self,
497 _: &HashMap<String, Value>,
498 ) -> Result<Vec<Value>, FaucetError> {
499 Ok(vec![])
500 }
501 fn connector_name(&self) -> &'static str {
502 ""
503 }
504 }
505
506 #[test]
507 fn empty_inner_connector_name_falls_back_to_unknown() {
508 let inner = EmptyNameSource;
509 let wrapped = InstrumentedSource {
513 inner: &inner,
514 labels: labels(),
515 connector: SharedString::const_str("unknown"),
516 base_labels: Vec::new(),
517 page_index: Arc::new(AtomicUsize::new(0)),
518 };
519 assert_eq!(
520 Source::connector_name(&wrapped),
521 "unknown",
522 "instrumented source must not leak an empty connector name"
523 );
524 }
525
526 #[tokio::test]
527 #[allow(clippy::await_holding_lock)]
528 async fn records_records_counter_per_page() {
529 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
530 let snap = snapshotter();
531 let inner = MockSource((0..5).map(|i| json!({"i": i})).collect());
532 let wrapped = InstrumentedSource::new(&inner, labels());
533 let ctx = HashMap::new();
534 let mut s = wrapped.stream_pages(&ctx, 2);
535 while s.next().await.is_some() {}
536 let snapshot = snap.snapshot();
537 let records: u64 = snapshot
538 .into_vec()
539 .into_iter()
540 .filter_map(|(key, _u, _d, v)| {
541 if key.key().name() == "faucet_source_records_total"
542 && let DebugValue::Counter(c) = v
543 {
544 return Some(c);
545 }
546 None
547 })
548 .sum();
549 assert!(
550 records >= 5,
551 "expected at least 5 records counted, got {records}"
552 );
553 }
554
555 struct PageCountSource(Vec<Value>);
558 #[async_trait]
559 impl Source for PageCountSource {
560 async fn fetch_with_context(
561 &self,
562 _: &HashMap<String, Value>,
563 ) -> Result<Vec<Value>, FaucetError> {
564 Ok(self.0.clone())
565 }
566 fn connector_name(&self) -> &'static str {
567 "page-count-probe"
568 }
569 }
570
571 #[tokio::test]
572 #[allow(clippy::await_holding_lock)]
573 async fn page_duration_records_one_sample_per_yielded_page() {
574 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
577 let snap = snapshotter();
578 let inner = PageCountSource((0..5).map(|i| json!({"i": i})).collect());
579 let wrapped = InstrumentedSource::new(&inner, labels());
580 let ctx = HashMap::new();
581 let mut s = wrapped.stream_pages(&ctx, 2);
582 let mut pages = 0usize;
583 while s.next().await.is_some() {
584 pages += 1;
585 }
586 assert_eq!(pages, 3, "expected 3 yielded pages");
587
588 let snapshot = snap.snapshot();
589 let samples: usize = snapshot
590 .into_vec()
591 .into_iter()
592 .filter_map(|(key, _u, _d, v)| {
593 if key.key().name() == "faucet_source_page_duration_seconds"
594 && key
595 .key()
596 .labels()
597 .any(|l| l.key() == "connector" && l.value() == "page-count-probe")
598 && let DebugValue::Histogram(h) = v
599 {
600 return Some(h.len());
601 }
602 None
603 })
604 .sum();
605 assert_eq!(
606 samples, pages,
607 "page-duration histogram must have exactly one sample per yielded \
608 page ({pages}), not page+1 (no spurious terminal sample)"
609 );
610 }
611
612 #[tokio::test]
613 #[allow(clippy::await_holding_lock)]
614 async fn maps_panic_to_custom_error_with_kind_panic() {
615 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
616 let _snap = snapshotter();
617 let inner = PanickingSource;
618 let wrapped = InstrumentedSource::new(&inner, labels());
619 let ctx = HashMap::new();
620 let mut s = wrapped.stream_pages(&ctx, 10);
621 let first = s
622 .next()
623 .await
624 .expect("stream yields at least one item before terminating");
625 assert!(matches!(first, Err(FaucetError::Custom(_))));
626 }
628
629 #[test]
632 fn error_kind_covers_all_variants() {
633 use std::time::Duration;
634 let cases: Vec<(FaucetError, &str)> = vec![
639 (
640 FaucetError::HttpStatus {
641 status: 500,
642 url: "u".into(),
643 body: "b".into(),
644 },
645 "HttpStatus",
646 ),
647 (
648 FaucetError::Json(serde_json::from_str::<Value>("nope").unwrap_err()),
649 "Json",
650 ),
651 (FaucetError::JsonPath("bad".into()), "JsonPath"),
652 (FaucetError::Auth("a".into()), "Auth"),
653 (
654 FaucetError::RateLimited(Duration::from_secs(1)),
655 "RateLimited",
656 ),
657 (FaucetError::Url("bad url".into()), "Url"),
658 (FaucetError::Transform("t".into()), "Transform"),
659 (FaucetError::Config("c".into()), "Config"),
660 (FaucetError::Source("s".into()), "Source"),
661 (FaucetError::Sink("s".into()), "Sink"),
662 (
663 FaucetError::QualityFailure {
664 check: "chk".into(),
665 message: "m".into(),
666 },
667 "QualityFailure",
668 ),
669 (FaucetError::State("st".into()), "State"),
670 (
671 FaucetError::Custom(Box::new(std::io::Error::other("boom"))),
672 "Custom",
673 ),
674 ];
675 for (err, expected) in cases {
676 assert_eq!(error_kind(&err), expected, "mismatch for {err:?}");
677 }
678 }
679
680 struct PassthroughSource {
686 seen_bookmark: Mutex<Option<Value>>,
687 }
688 #[async_trait]
689 impl Source for PassthroughSource {
690 async fn fetch_with_context(
691 &self,
692 _: &HashMap<String, Value>,
693 ) -> Result<Vec<Value>, FaucetError> {
694 Ok(vec![json!({"fwc": 1})])
695 }
696 async fn fetch_with_context_incremental(
697 &self,
698 _: &HashMap<String, Value>,
699 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
700 Ok((vec![json!({"inc": 1})], Some(json!("bm"))))
701 }
702 fn state_key(&self) -> Option<String> {
703 Some("passthrough_key".into())
704 }
705 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
706 *self.seen_bookmark.lock().unwrap() = Some(bookmark);
707 Ok(())
708 }
709 fn connector_name(&self) -> &'static str {
710 "passthrough"
711 }
712 }
713
714 #[tokio::test]
715 async fn source_passthroughs_delegate_to_inner() {
716 let inner = PassthroughSource {
717 seen_bookmark: Mutex::new(None),
718 };
719 let wrapped = InstrumentedSource::new(&inner, labels());
720
721 assert_eq!(wrapped.state_key(), Some("passthrough_key".to_string()));
723
724 let ctx = HashMap::new();
726 assert_eq!(
727 wrapped.fetch_with_context(&ctx).await.unwrap(),
728 vec![json!({"fwc": 1})]
729 );
730
731 let (recs, bm) = wrapped.fetch_with_context_incremental(&ctx).await.unwrap();
733 assert_eq!(recs, vec![json!({"inc": 1})]);
734 assert_eq!(bm, Some(json!("bm")));
735
736 wrapped.apply_start_bookmark(json!("resume")).await.unwrap();
738 assert_eq!(
739 *inner.seen_bookmark.lock().unwrap(),
740 Some(json!("resume")),
741 "apply_start_bookmark must reach the inner source"
742 );
743 }
744}
745
#[cfg(test)]
mod sink_tests {
    use super::source_tests::{LOCK, labels, snapshotter};
    use super::*;
    use async_trait::async_trait;
    use metrics_util::debugging::{DebugValue, Snapshotter};
    use serde_json::json;

    /// Sums every counter named `name` whose label set contains all `want` pairs.
    ///
    /// Filtering by label matters because the recorder is process-global:
    /// counters emitted by other tests accumulate alongside this test's.
    fn counter_total(snap: &Snapshotter, name: &str, want: &[(&str, &str)]) -> u64 {
        snap.snapshot()
            .into_vec()
            .into_iter()
            .filter_map(|(composite, _unit, _desc, value)| {
                let key = composite.key();
                let wanted = key.name() == name
                    && want
                        .iter()
                        .all(|&(k, v)| key.labels().any(|l| l.key() == k && l.value() == v));
                match value {
                    DebugValue::Counter(c) if wanted => Some(c),
                    _ => None,
                }
            })
            .sum()
    }

    /// Asserts that `PanickingSink` has at least one `kind=Panic` error recorded.
    fn assert_panic_counted(snap: &Snapshotter) {
        let panics = counter_total(
            snap,
            "faucet_sink_errors_total",
            &[("connector", "panic-sink"), ("kind", "Panic")],
        );
        assert!(
            panics >= 1,
            "expected faucet_sink_errors_total{{connector=panic-sink,kind=Panic}} >= 1"
        );
    }

    /// Records every written row so delivery to the inner sink can be checked.
    struct MockSink(std::sync::Mutex<Vec<Value>>);
    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            self.0.lock().unwrap().extend(records.iter().cloned());
            Ok(records.len())
        }
        fn connector_name(&self) -> &'static str {
            "mock-sink"
        }
    }

    struct FailingSink;
    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("nope".into()))
        }
        fn connector_name(&self) -> &'static str {
            "failing-sink"
        }
    }

    /// A sink that violates the non-empty `connector_name()` contract.
    struct EmptyNameSink;
    #[async_trait]
    impl Sink for EmptyNameSink {
        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
            Ok(0)
        }
        fn connector_name(&self) -> &'static str {
            ""
        }
    }

    #[test]
    fn empty_inner_connector_name_falls_back_to_unknown() {
        let inner = EmptyNameSink;
        // Built directly rather than via `new`, whose `debug_assert!` would
        // reject the empty name in debug builds.
        let wrapped = InstrumentedSink {
            inner: &inner,
            labels: labels(),
            connector: SharedString::const_str("unknown"),
            base_labels: Vec::new(),
        };
        assert_eq!(
            Sink::connector_name(&wrapped),
            "unknown",
            "instrumented sink must not leak an empty connector name"
        );
    }

    // `await_holding_lock` is allowed in the tests below because LOCK is held
    // across awaits on purpose, to serialise access to the global recorder.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn records_writes_and_records_counters() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let inner = MockSink(std::sync::Mutex::new(Vec::new()));
        let wrapped = InstrumentedSink::new(&inner, labels());
        let written = wrapped
            .write_batch(&[json!({"a": 1}), json!({"a": 2})])
            .await
            .unwrap();
        assert_eq!(written, 2);
        assert_eq!(
            inner.0.lock().unwrap().len(),
            2,
            "records must reach the inner sink"
        );

        let scope = [("connector", "mock-sink")];
        let writes = counter_total(snap, "faucet_sink_writes_total", &scope);
        assert!(writes >= 1, "expected at least one write counted, got {writes}");
        let records = counter_total(snap, "faucet_sink_records_total", &scope);
        assert!(records >= 2, "expected at least two records counted, got {records}");
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn error_increments_errors_total_with_kind() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let inner = FailingSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let err = wrapped.write_batch(&[json!({})]).await.unwrap_err();
        assert!(matches!(&err, FaucetError::Sink(m) if m.contains("nope")));

        let errors = counter_total(
            snap,
            "faucet_sink_errors_total",
            &[("connector", "failing-sink"), ("kind", "Sink")],
        );
        assert!(
            errors >= 1,
            "expected sink_errors_total{{connector=failing-sink,kind=Sink}} >= 1"
        );
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn instrumented_sink_write_batch_partial_counts_successful_outcomes() {
        use crate::traits::RowOutcome;

        /// Returns two successes and one per-row failure from `write_batch_partial`.
        struct MixedSink;
        #[async_trait]
        impl Sink for MixedSink {
            async fn write_batch(&self, _r: &[Value]) -> Result<usize, FaucetError> {
                unreachable!()
            }
            async fn write_batch_partial(
                &self,
                _r: &[Value],
            ) -> Result<Vec<RowOutcome>, FaucetError> {
                Ok(vec![
                    Ok(()),
                    Err(FaucetError::Sink("bad row".into())),
                    Ok(()),
                ])
            }
            fn connector_name(&self) -> &'static str {
                "mixed"
            }
        }

        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();

        let inner = MixedSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let outcomes = wrapped
            .write_batch_partial(&[json!({}), json!({}), json!({})])
            .await
            .unwrap();
        assert_eq!(outcomes.len(), 3, "outcomes must be passed through unchanged");
        assert_eq!(outcomes.iter().filter(|o| o.is_ok()).count(), 2);

        let records = counter_total(
            snap,
            "faucet_sink_records_total",
            &[("connector", "mixed")],
        );
        assert!(
            records >= 2,
            "expected faucet_sink_records_total{{connector=mixed}} >= 2, got {records}"
        );
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn flush_error_increments_errors_total_and_propagates() {
        /// Writes succeed; flush always fails.
        struct FlushFailSink;
        #[async_trait]
        impl Sink for FlushFailSink {
            async fn write_batch(&self, r: &[Value]) -> Result<usize, FaucetError> {
                Ok(r.len())
            }
            async fn flush(&self) -> Result<(), FaucetError> {
                Err(FaucetError::Sink("flush boom".into()))
            }
            fn connector_name(&self) -> &'static str {
                "flush-fail-sink"
            }
        }

        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let inner = FlushFailSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let err = wrapped.flush().await.unwrap_err();
        assert!(matches!(&err, FaucetError::Sink(m) if m.contains("flush boom")));

        let errors = counter_total(
            snap,
            "faucet_sink_errors_total",
            &[("connector", "flush-fail-sink"), ("kind", "Sink")],
        );
        assert!(
            errors >= 1,
            "expected sink_errors_total{{connector=flush-fail-sink,kind=Sink}}"
        );
    }

    /// Panics from every instrumented `Sink` method.
    struct PanickingSink;
    #[async_trait]
    impl Sink for PanickingSink {
        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
            panic!("write kaboom")
        }
        async fn write_batch_partial(
            &self,
            _: &[Value],
        ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
            panic!("partial kaboom")
        }
        async fn flush(&self) -> Result<(), FaucetError> {
            panic!("flush kaboom")
        }
        fn connector_name(&self) -> &'static str {
            "panic-sink"
        }
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn write_batch_panic_maps_to_custom_error() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let inner = PanickingSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let err = wrapped.write_batch(&[json!({})]).await.unwrap_err();
        match err {
            FaucetError::Custom(b) => {
                assert!(b.to_string().contains("panic in sink: write kaboom"))
            }
            other => panic!("expected Custom panic error, got {other:?}"),
        }
        assert_panic_counted(snap);
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn write_batch_partial_panic_maps_to_custom_error() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let inner = PanickingSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let err = wrapped.write_batch_partial(&[json!({})]).await.unwrap_err();
        match err {
            FaucetError::Custom(b) => {
                assert!(b.to_string().contains("panic in sink: partial kaboom"))
            }
            other => panic!("expected Custom panic error, got {other:?}"),
        }
        assert_panic_counted(snap);
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn flush_panic_maps_to_custom_error() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let inner = PanickingSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let err = wrapped.flush().await.unwrap_err();
        match err {
            FaucetError::Custom(b) => {
                assert!(b.to_string().contains("panic in flush: flush kaboom"))
            }
            other => panic!("expected Custom panic error, got {other:?}"),
        }
        assert_panic_counted(snap);
    }
}