1use crate::error::FaucetError;
5use crate::observability::labels::Labels;
6use crate::observability::timer::DurationGuard;
7use crate::pipeline::StreamPage;
8use crate::traits::{Sink, Source};
9use async_trait::async_trait;
10use futures::FutureExt;
11use futures_core::Stream;
12use metrics::{Label, SharedString, counter, gauge};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::panic::AssertUnwindSafe;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use tracing::{Instrument, info_span};
20
21fn guarded_connector_name(raw: &'static str) -> &'static str {
25 if raw.is_empty() { "unknown" } else { raw }
26}
27
28fn base_metric_labels(labels: &Labels, connector: &SharedString) -> Vec<Label> {
33 vec![
34 Label::new("pipeline", SharedString::from(labels.pipeline.to_string())),
35 Label::new("row", SharedString::from(labels.row.to_string())),
36 Label::new("connector", connector.clone()),
37 ]
38}
39
40pub struct InstrumentedSource<'a, S: Source + ?Sized> {
44 inner: &'a S,
45 labels: Labels,
46 connector: SharedString,
47 base_labels: Vec<Label>,
49 page_index: Arc<AtomicUsize>,
50}
51
52impl<'a, S: Source + ?Sized> InstrumentedSource<'a, S> {
53 pub fn new(inner: &'a S, labels: Labels) -> Self {
54 let raw = inner.connector_name();
55 debug_assert!(
56 !raw.is_empty(),
57 "connector_name() must return a non-empty string"
58 );
59 let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
60 let base_labels = base_metric_labels(&labels, &connector);
61 Self {
62 inner,
63 labels,
64 connector,
65 base_labels,
66 page_index: Arc::new(AtomicUsize::new(0)),
67 }
68 }
69
70 fn metric_labels(&self) -> Vec<Label> {
71 self.base_labels.clone()
72 }
73
74 #[allow(dead_code)]
78 fn error_labels(&self, kind: &'static str) -> Vec<Label> {
79 let mut l = self.metric_labels();
80 l.push(Label::new("kind", SharedString::const_str(kind)));
81 l
82 }
83}
84
85#[async_trait]
86impl<'a, S: Source + ?Sized> Source for InstrumentedSource<'a, S> {
87 fn connector_name(&self) -> &'static str {
88 guarded_connector_name(self.inner.connector_name())
92 }
93
94 fn state_key(&self) -> Option<String> {
95 self.inner.state_key()
96 }
97
98 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
99 self.inner.apply_start_bookmark(bookmark).await
100 }
101
102 async fn fetch_with_context(
103 &self,
104 context: &HashMap<String, Value>,
105 ) -> Result<Vec<Value>, FaucetError> {
106 self.inner.fetch_with_context(context).await
108 }
109
110 async fn fetch_with_context_incremental(
111 &self,
112 context: &HashMap<String, Value>,
113 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
114 self.inner.fetch_with_context_incremental(context).await
115 }
116
117 fn stream_pages<'b>(
118 &'b self,
119 context: &'b HashMap<String, Value>,
120 batch_size: usize,
121 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'b>> {
122 let inner_stream = self.inner.stream_pages(context, batch_size);
123 let labels = self.labels.clone();
124 let connector = self.connector.clone();
125 let page_index = Arc::clone(&self.page_index);
126 let metric_labels = self.metric_labels();
127 let pipeline = self.labels.pipeline.clone();
128 let row = self.labels.row.clone();
129
130 Box::pin(async_stream::try_stream! {
131 struct InFlightGuard(Vec<Label>);
134 impl Drop for InFlightGuard {
135 fn drop(&mut self) {
136 gauge!("faucet_source_in_flight", self.0.clone()).decrement(1.0);
137 }
138 }
139 gauge!("faucet_source_in_flight", metric_labels.clone()).increment(1.0);
140 let _in_flight = InFlightGuard(metric_labels.clone());
141
142 let mut inner = inner_stream;
143 loop {
144 let idx = page_index.fetch_add(1, Ordering::Relaxed);
145 let span = info_span!(
146 "faucet.source.page",
147 pipeline = %pipeline,
148 row = %row,
149 run_id = %labels.run_id,
150 connector = %connector,
151 page_index = idx,
152 );
153 let mut _timer = DurationGuard::new(
158 "faucet_source_page_duration_seconds",
159 metric_labels.clone(),
160 );
161
162 let next = AssertUnwindSafe(async {
163 use futures::StreamExt;
164 inner.next().await
165 })
166 .catch_unwind()
167 .instrument(span)
168 .await;
169
170 match next {
171 Ok(Some(Ok(page))) => {
172 counter!("faucet_source_pages_total", metric_labels.clone()).increment(1);
173 counter!("faucet_source_records_total", metric_labels.clone())
174 .increment(page.records.len() as u64);
175 yield page;
176 }
177 Ok(Some(Err(e))) => {
178 let mut l = metric_labels.clone();
179 l.push(Label::new("kind", SharedString::const_str(error_kind(&e))));
180 counter!("faucet_source_errors_total", l).increment(1);
181 Err(e)?;
182 }
183 Ok(None) => {
184 _timer.disarm();
185 break;
186 }
187 Err(panic) => {
188 let mut l = metric_labels.clone();
189 l.push(Label::new("kind", SharedString::const_str("Panic")));
190 counter!("faucet_source_errors_total", l).increment(1);
191 let msg = panic.downcast_ref::<&'static str>().map(|s| (*s).to_string())
192 .or_else(|| panic.downcast_ref::<String>().cloned())
193 .unwrap_or_else(|| "<non-string panic payload>".to_string());
194 Err(FaucetError::Custom(format!("panic in source: {msg}").into()))?;
195 }
196 }
197 }
198 })
199 }
200}
201
202pub(crate) fn error_kind(e: &FaucetError) -> &'static str {
205 match e {
206 FaucetError::Http(_) => "Http",
207 FaucetError::HttpStatus { .. } => "HttpStatus",
208 FaucetError::Json(_) => "Json",
209 FaucetError::JsonPath(_) => "JsonPath",
210 FaucetError::Auth(_) => "Auth",
211 FaucetError::RateLimited { .. } => "RateLimited",
212 FaucetError::Url(_) => "Url",
213 FaucetError::Transform(_) => "Transform",
214 FaucetError::Config(_) => "Config",
215 FaucetError::Source(_) => "Source",
216 FaucetError::Sink(_) => "Sink",
217 FaucetError::QualityFailure { .. } => "QualityFailure",
218 FaucetError::State(_) => "State",
219 FaucetError::Custom(_) => "Custom",
220 }
221}
222
223pub struct InstrumentedSink<'a, S: Sink + ?Sized> {
226 inner: &'a S,
227 labels: Labels,
228 connector: SharedString,
229 base_labels: Vec<Label>,
231}
232
233impl<'a, S: Sink + ?Sized> InstrumentedSink<'a, S> {
234 pub fn new(inner: &'a S, labels: Labels) -> Self {
235 let raw = inner.connector_name();
236 debug_assert!(
237 !raw.is_empty(),
238 "connector_name() must return a non-empty string"
239 );
240 let connector: SharedString = SharedString::const_str(guarded_connector_name(raw));
241 let base_labels = base_metric_labels(&labels, &connector);
242 Self {
243 inner,
244 labels,
245 connector,
246 base_labels,
247 }
248 }
249
250 fn metric_labels(&self) -> Vec<Label> {
251 self.base_labels.clone()
252 }
253
254 fn error_labels(&self, kind: &'static str) -> Vec<Label> {
255 let mut l = self.metric_labels();
256 l.push(Label::new("kind", SharedString::const_str(kind)));
257 l
258 }
259}
260
261#[async_trait]
262impl<'a, S: Sink + ?Sized> Sink for InstrumentedSink<'a, S> {
263 fn connector_name(&self) -> &'static str {
264 guarded_connector_name(self.inner.connector_name())
268 }
269
270 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
271 let span = info_span!(
272 "faucet.sink.write",
273 pipeline = %self.labels.pipeline,
274 row = %self.labels.row,
275 run_id = %self.labels.run_id,
276 connector = %self.connector,
277 records = records.len(),
278 );
279 let metric_labels = self.metric_labels();
280 gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
281
282 struct InFlightGuard(Vec<Label>);
285 impl Drop for InFlightGuard {
286 fn drop(&mut self) {
287 gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
288 }
289 }
290 let _in_flight = InFlightGuard(metric_labels.clone());
291
292 let _timer =
293 DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
294
295 let result = AssertUnwindSafe(self.inner.write_batch(records))
296 .catch_unwind()
297 .instrument(span)
298 .await;
299
300 match result {
301 Ok(Ok(n)) => {
302 counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
303 counter!("faucet_sink_records_total", metric_labels.clone()).increment(n as u64);
304 Ok(n)
305 }
306 Ok(Err(e)) => {
307 counter!(
308 "faucet_sink_errors_total",
309 self.error_labels(error_kind(&e))
310 )
311 .increment(1);
312 Err(e)
313 }
314 Err(panic) => {
315 counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
316 let msg = panic
317 .downcast_ref::<&'static str>()
318 .map(|s| (*s).to_string())
319 .or_else(|| panic.downcast_ref::<String>().cloned())
320 .unwrap_or_else(|| "<non-string panic payload>".to_string());
321 Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
322 }
323 }
324 }
325
326 async fn write_batch_partial(
327 &self,
328 records: &[Value],
329 ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
330 let span = info_span!(
331 "faucet.sink.write_partial",
332 pipeline = %self.labels.pipeline,
333 row = %self.labels.row,
334 run_id = %self.labels.run_id,
335 connector = %self.connector,
336 records = records.len(),
337 );
338 let metric_labels = self.metric_labels();
339 gauge!("faucet_sink_in_flight", metric_labels.clone()).increment(1.0);
340
341 struct InFlightGuard(Vec<Label>);
344 impl Drop for InFlightGuard {
345 fn drop(&mut self) {
346 gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
347 }
348 }
349 let _in_flight = InFlightGuard(metric_labels.clone());
350
351 let _timer =
352 DurationGuard::new("faucet_sink_write_duration_seconds", metric_labels.clone());
353
354 let result = AssertUnwindSafe(self.inner.write_batch_partial(records))
355 .catch_unwind()
356 .instrument(span)
357 .await;
358
359 match result {
360 Ok(Ok(outcomes)) => {
361 let success_count = outcomes.iter().filter(|o| o.is_ok()).count();
362 counter!("faucet_sink_writes_total", metric_labels.clone()).increment(1);
363 counter!("faucet_sink_records_total", metric_labels.clone())
364 .increment(success_count as u64);
365 Ok(outcomes)
366 }
367 Ok(Err(e)) => {
368 counter!(
369 "faucet_sink_errors_total",
370 self.error_labels(error_kind(&e))
371 )
372 .increment(1);
373 Err(e)
374 }
375 Err(panic) => {
376 counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
377 let msg = panic
378 .downcast_ref::<&'static str>()
379 .map(|s| (*s).to_string())
380 .or_else(|| panic.downcast_ref::<String>().cloned())
381 .unwrap_or_else(|| "<non-string panic payload>".to_string());
382 Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
383 }
384 }
385 }
386
387 async fn flush(&self) -> Result<(), FaucetError> {
388 let span = info_span!(
389 "faucet.sink.flush",
390 pipeline = %self.labels.pipeline,
391 row = %self.labels.row,
392 run_id = %self.labels.run_id,
393 connector = %self.connector,
394 );
395 let metric_labels = self.metric_labels();
396 let _timer =
397 DurationGuard::new("faucet_sink_flush_duration_seconds", metric_labels.clone());
398
399 let result = AssertUnwindSafe(self.inner.flush())
400 .catch_unwind()
401 .instrument(span)
402 .await;
403
404 match result {
405 Ok(Ok(())) => Ok(()),
406 Ok(Err(e)) => {
407 counter!(
408 "faucet_sink_errors_total",
409 self.error_labels(error_kind(&e))
410 )
411 .increment(1);
412 Err(e)
413 }
414 Err(panic) => {
415 counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
416 let msg = panic
417 .downcast_ref::<&'static str>()
418 .map(|s| (*s).to_string())
419 .or_else(|| panic.downcast_ref::<String>().cloned())
420 .unwrap_or_else(|| "<non-string panic payload>".to_string());
421 Err(FaucetError::Custom(format!("panic in flush: {msg}").into()))
422 }
423 }
424 }
425}
426
427#[cfg(test)]
428pub(crate) mod source_tests {
429 use super::*;
430 use async_trait::async_trait;
431 use futures::StreamExt;
432 use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
433 use serde_json::json;
434 use std::sync::{Mutex, OnceLock};
435
436 pub(crate) static LOCK: Mutex<()> = Mutex::new(());
439 static SNAPSHOTTER: OnceLock<Snapshotter> = OnceLock::new();
440
441 pub(crate) fn snapshotter() -> &'static Snapshotter {
442 SNAPSHOTTER.get_or_init(|| {
443 let recorder = DebuggingRecorder::new();
444 let snap = recorder.snapshotter();
445 let _ = metrics::set_global_recorder(recorder);
453 snap
454 })
455 }
456
457 pub(in crate::observability) fn labels() -> Labels {
458 Labels::new("p", "r", "rid")
459 }
460
461 struct MockSource(Vec<Value>);
462 #[async_trait]
463 impl Source for MockSource {
464 async fn fetch_with_context(
465 &self,
466 _: &HashMap<String, Value>,
467 ) -> Result<Vec<Value>, FaucetError> {
468 Ok(self.0.clone())
469 }
470 fn connector_name(&self) -> &'static str {
471 "mock"
472 }
473 }
474
475 struct PanickingSource;
476 #[async_trait]
477 impl Source for PanickingSource {
478 async fn fetch_with_context(
479 &self,
480 _: &HashMap<String, Value>,
481 ) -> Result<Vec<Value>, FaucetError> {
482 panic!("kaboom")
483 }
484 fn connector_name(&self) -> &'static str {
485 "panic-test"
486 }
487 }
488
489 struct EmptyNameSource;
493 #[async_trait]
494 impl Source for EmptyNameSource {
495 async fn fetch_with_context(
496 &self,
497 _: &HashMap<String, Value>,
498 ) -> Result<Vec<Value>, FaucetError> {
499 Ok(vec![])
500 }
501 fn connector_name(&self) -> &'static str {
502 ""
503 }
504 }
505
506 #[test]
507 fn empty_inner_connector_name_falls_back_to_unknown() {
508 let inner = EmptyNameSource;
509 let wrapped = InstrumentedSource {
513 inner: &inner,
514 labels: labels(),
515 connector: SharedString::const_str("unknown"),
516 base_labels: Vec::new(),
517 page_index: Arc::new(AtomicUsize::new(0)),
518 };
519 assert_eq!(
520 Source::connector_name(&wrapped),
521 "unknown",
522 "instrumented source must not leak an empty connector name"
523 );
524 }
525
526 #[tokio::test]
527 #[allow(clippy::await_holding_lock)]
528 async fn records_records_counter_per_page() {
529 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
530 let snap = snapshotter();
531 let inner = MockSource((0..5).map(|i| json!({"i": i})).collect());
532 let wrapped = InstrumentedSource::new(&inner, labels());
533 let ctx = HashMap::new();
534 let mut s = wrapped.stream_pages(&ctx, 2);
535 while s.next().await.is_some() {}
536 let snapshot = snap.snapshot();
537 let records: u64 = snapshot
538 .into_vec()
539 .into_iter()
540 .filter_map(|(key, _u, _d, v)| {
541 if key.key().name() == "faucet_source_records_total"
542 && let DebugValue::Counter(c) = v
543 {
544 return Some(c);
545 }
546 None
547 })
548 .sum();
549 assert!(
550 records >= 5,
551 "expected at least 5 records counted, got {records}"
552 );
553 }
554
555 struct PageCountSource(Vec<Value>);
558 #[async_trait]
559 impl Source for PageCountSource {
560 async fn fetch_with_context(
561 &self,
562 _: &HashMap<String, Value>,
563 ) -> Result<Vec<Value>, FaucetError> {
564 Ok(self.0.clone())
565 }
566 fn connector_name(&self) -> &'static str {
567 "page-count-probe"
568 }
569 }
570
571 #[tokio::test]
572 #[allow(clippy::await_holding_lock)]
573 async fn page_duration_records_one_sample_per_yielded_page() {
574 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
577 let snap = snapshotter();
578 let inner = PageCountSource((0..5).map(|i| json!({"i": i})).collect());
579 let wrapped = InstrumentedSource::new(&inner, labels());
580 let ctx = HashMap::new();
581 let mut s = wrapped.stream_pages(&ctx, 2);
582 let mut pages = 0usize;
583 while s.next().await.is_some() {
584 pages += 1;
585 }
586 assert_eq!(pages, 3, "expected 3 yielded pages");
587
588 let snapshot = snap.snapshot();
589 let samples: usize = snapshot
590 .into_vec()
591 .into_iter()
592 .filter_map(|(key, _u, _d, v)| {
593 if key.key().name() == "faucet_source_page_duration_seconds"
594 && key
595 .key()
596 .labels()
597 .any(|l| l.key() == "connector" && l.value() == "page-count-probe")
598 && let DebugValue::Histogram(h) = v
599 {
600 return Some(h.len());
601 }
602 None
603 })
604 .sum();
605 assert_eq!(
606 samples, pages,
607 "page-duration histogram must have exactly one sample per yielded \
608 page ({pages}), not page+1 (no spurious terminal sample)"
609 );
610 }
611
612 #[tokio::test]
613 #[allow(clippy::await_holding_lock)]
614 async fn maps_panic_to_custom_error_with_kind_panic() {
615 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
616 let _snap = snapshotter();
617 let inner = PanickingSource;
618 let wrapped = InstrumentedSource::new(&inner, labels());
619 let ctx = HashMap::new();
620 let mut s = wrapped.stream_pages(&ctx, 10);
621 let first = s
622 .next()
623 .await
624 .expect("stream yields at least one item before terminating");
625 assert!(matches!(first, Err(FaucetError::Custom(_))));
626 }
628}
629
630#[cfg(test)]
631mod sink_tests {
632 use super::source_tests::{LOCK, labels, snapshotter};
633 use super::*;
634 use async_trait::async_trait;
635 use metrics_util::debugging::DebugValue;
636 use serde_json::json;
637
638 struct MockSink(std::sync::Mutex<Vec<Value>>);
639 #[async_trait]
640 impl Sink for MockSink {
641 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
642 self.0.lock().unwrap().extend(records.iter().cloned());
643 Ok(records.len())
644 }
645 fn connector_name(&self) -> &'static str {
646 "mock-sink"
647 }
648 }
649
650 struct FailingSink;
651 #[async_trait]
652 impl Sink for FailingSink {
653 async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
654 Err(FaucetError::Sink("nope".into()))
655 }
656 fn connector_name(&self) -> &'static str {
657 "failing-sink"
658 }
659 }
660
661 struct EmptyNameSink;
662 #[async_trait]
663 impl Sink for EmptyNameSink {
664 async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
665 Ok(0)
666 }
667 fn connector_name(&self) -> &'static str {
668 ""
669 }
670 }
671
672 #[test]
673 fn empty_inner_connector_name_falls_back_to_unknown() {
674 let inner = EmptyNameSink;
675 let wrapped = InstrumentedSink {
679 inner: &inner,
680 labels: labels(),
681 connector: SharedString::const_str("unknown"),
682 base_labels: Vec::new(),
683 };
684 assert_eq!(
685 Sink::connector_name(&wrapped),
686 "unknown",
687 "instrumented sink must not leak an empty connector name"
688 );
689 }
690
691 #[tokio::test]
692 #[allow(clippy::await_holding_lock)]
693 async fn records_writes_and_records_counters() {
694 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
695 let snap = snapshotter();
696 let inner = MockSink(std::sync::Mutex::new(Vec::new()));
697 let wrapped = InstrumentedSink::new(&inner, labels());
698 wrapped
699 .write_batch(&[json!({"a": 1}), json!({"a": 2})])
700 .await
701 .unwrap();
702 let snapshot = snap.snapshot();
703 let writes: u64 = snapshot
704 .into_vec()
705 .into_iter()
706 .filter_map(|(key, _u, _d, v)| {
707 if key.key().name() == "faucet_sink_writes_total"
708 && let DebugValue::Counter(c) = v
709 {
710 return Some(c);
711 }
712 None
713 })
714 .sum();
715 assert!(writes >= 1, "expected at least one write counted");
716 }
717
718 #[tokio::test]
719 #[allow(clippy::await_holding_lock)]
720 async fn error_increments_errors_total_with_kind() {
721 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
722 let snap = snapshotter();
723 let inner = FailingSink;
724 let wrapped = InstrumentedSink::new(&inner, labels());
725 let _ = wrapped.write_batch(&[json!({})]).await;
726 let snapshot = snap.snapshot();
727 let found = snapshot.into_vec().into_iter().any(|(key, _u, _d, v)| {
728 key.key().name() == "faucet_sink_errors_total"
729 && key
730 .key()
731 .labels()
732 .any(|l| l.key() == "kind" && l.value() == "Sink")
733 && matches!(v, DebugValue::Counter(c) if c >= 1)
734 });
735 assert!(found, "expected sink_errors_total with kind=Sink");
736 }
737
738 #[tokio::test]
739 #[allow(clippy::await_holding_lock)]
740 async fn instrumented_sink_write_batch_partial_counts_successful_outcomes() {
741 use crate::traits::RowOutcome;
742 use metrics_util::debugging::DebugValue;
743
744 struct MixedSink;
746 #[async_trait]
747 impl Sink for MixedSink {
748 async fn write_batch(&self, _r: &[Value]) -> Result<usize, FaucetError> {
749 unreachable!()
750 }
751 async fn write_batch_partial(
752 &self,
753 _r: &[Value],
754 ) -> Result<Vec<RowOutcome>, FaucetError> {
755 Ok(vec![
756 Ok(()),
757 Err(FaucetError::Sink("bad row".into())),
758 Ok(()),
759 ])
760 }
761 fn connector_name(&self) -> &'static str {
762 "mixed"
763 }
764 }
765
766 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
767 let snap = snapshotter();
768
769 let inner = MixedSink;
770 let wrapped = InstrumentedSink::new(&inner, labels());
771 let _ = wrapped
772 .write_batch_partial(&[json!({}), json!({}), json!({})])
773 .await
774 .unwrap();
775
776 let snapshot = snap.snapshot();
784 let records: u64 = snapshot
785 .into_vec()
786 .into_iter()
787 .filter_map(|(k, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
788 if k.key().name() == "faucet_sink_records_total"
789 && k.key()
790 .labels()
791 .any(|l| l.key() == "connector" && l.value() == "mixed")
792 && let DebugValue::Counter(c) = v
793 {
794 Some(c)
795 } else {
796 None
797 }
798 })
799 .sum();
800 assert!(
801 records >= 2,
802 "expected faucet_sink_records_total{{connector=mixed}} >= 2, got {records}"
803 );
804 }
805}