use crate::error::FaucetError;
use crate::observability::labels::Labels;
use crate::observability::timer::DurationGuard;
use crate::pipeline::StreamPage;
use crate::traits::{Sink, Source};
use async_trait::async_trait;
use futures::FutureExt;
use futures_core::Stream;
use metrics::{Label, SharedString, counter, gauge};
use serde_json::Value;
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::{Instrument, info_span};
/// Substitutes `"unknown"` for an empty connector name so that span fields and
/// metric labels never carry an empty value; any other name is returned as-is.
fn guarded_connector_name(raw: &'static str) -> &'static str {
    match raw {
        "" => "unknown",
        name => name,
    }
}
/// Builds the label set shared by every metric a wrapper emits:
/// `pipeline`, `row` and `connector`, in that order.
fn base_metric_labels(labels: &Labels, connector: &SharedString) -> Vec<Label> {
    let pipeline = SharedString::from(labels.pipeline.to_string());
    let row = SharedString::from(labels.row.to_string());
    [
        ("pipeline", pipeline),
        ("row", row),
        ("connector", connector.clone()),
    ]
    .into_iter()
    .map(|(key, value)| Label::new(key, value))
    .collect()
}
/// Wraps a borrowed [`Source`], delegating every call to it and adding tracing
/// spans and `metrics` instrumentation to its page stream.
pub struct InstrumentedSource<'a, S: Source + ?Sized> {
    /// The wrapped source.
    inner: &'a S,
    /// Pipeline, row and run identifiers used as span fields; pipeline and row
    /// also become metric labels.
    labels: Labels,
    /// Connector name for spans and labels; `"unknown"` if the inner source
    /// reports an empty name.
    connector: SharedString,
    /// `pipeline`/`row`/`connector` labels computed once in
    /// [`InstrumentedSource::new`].
    base_labels: Vec<Label>,
    /// Source of the `page_index` span field. Shared by every stream this
    /// wrapper produces and advanced once per poll of the inner stream,
    /// including the final end-of-stream poll.
    page_index: Arc<AtomicUsize>,
}

impl<'a, S: Source + ?Sized> InstrumentedSource<'a, S> {
    /// Wraps `inner`, resolving its connector name once and precomputing the
    /// base metric labels.
    ///
    /// Debug builds assert that `inner.connector_name()` is non-empty;
    /// otherwise an empty name is replaced with `"unknown"`.
    pub fn new(inner: &'a S, labels: Labels) -> Self {
        let reported = inner.connector_name();
        debug_assert!(
            !reported.is_empty(),
            "connector_name() must return a non-empty string"
        );
        let connector = SharedString::const_str(guarded_connector_name(reported));
        Self {
            base_labels: base_metric_labels(&labels, &connector),
            page_index: Arc::default(),
            inner,
            labels,
            connector,
        }
    }

    /// A fresh copy of the base `pipeline`/`row`/`connector` labels.
    fn metric_labels(&self) -> Vec<Label> {
        self.base_labels.to_vec()
    }

    /// Base labels plus a `kind` label, as used on `*_errors_total` counters.
    // Not called at present: `stream_pages` builds the equivalent labels
    // inline from values it captures. Kept for parity with the sink wrapper.
    #[allow(dead_code)]
    fn error_labels(&self, kind: &'static str) -> Vec<Label> {
        self.metric_labels()
            .into_iter()
            .chain(std::iter::once(Label::new(
                "kind",
                SharedString::const_str(kind),
            )))
            .collect()
    }
}
/// Transparent [`Source`] implementation: every method delegates to the inner
/// source, and `stream_pages` additionally emits per-page spans and metrics.
#[async_trait]
impl<'a, S: Source + ?Sized> Source for InstrumentedSource<'a, S> {
    /// The inner source's connector name, or `"unknown"` if it is empty.
    fn connector_name(&self) -> &'static str {
        guarded_connector_name(self.inner.connector_name())
    }

    fn state_key(&self) -> Option<String> {
        self.inner.state_key()
    }

    async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
        self.inner.apply_start_bookmark(bookmark).await
    }

    async fn fetch_with_context(
        &self,
        context: &HashMap<String, Value>,
    ) -> Result<Vec<Value>, FaucetError> {
        self.inner.fetch_with_context(context).await
    }

    async fn fetch_with_context_incremental(
        &self,
        context: &HashMap<String, Value>,
    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
        self.inner.fetch_with_context_incremental(context).await
    }

    /// Returns the inner source's page stream wrapped with instrumentation.
    ///
    /// For every poll of the inner stream this:
    /// * advances the shared `page_index` counter and runs the poll inside a
    ///   `faucet.source.page` span;
    /// * records the poll's latency in `faucet_source_page_duration_seconds`
    ///   (no sample for the final end-of-stream poll);
    /// * on a page, increments `faucet_source_pages_total` and adds the page's
    ///   record count to `faucet_source_records_total`;
    /// * on an error or a caught panic, increments `faucet_source_errors_total`
    ///   with a `kind` label (`error_kind(..)` or `"Panic"`), yields the error
    ///   and ends the stream. A panic is surfaced as `FaucetError::Custom`.
    ///
    /// `faucet_source_in_flight` is raised by one from the first poll of the
    /// returned stream until the stream finishes or is dropped.
    fn stream_pages<'b>(
        &'b self,
        context: &'b HashMap<String, Value>,
        batch_size: usize,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'b>> {
        let inner_stream = self.inner.stream_pages(context, batch_size);
        let labels = self.labels.clone();
        let connector = self.connector.clone();
        let page_index = Arc::clone(&self.page_index);
        let metric_labels = self.metric_labels();
        let pipeline = self.labels.pipeline.clone();
        let row = self.labels.row.clone();
        Box::pin(async_stream::try_stream! {
            // Lowers the in-flight gauge when the generator finishes or is dropped.
            struct InFlightGuard(Vec<Label>);
            impl Drop for InFlightGuard {
                fn drop(&mut self) {
                    gauge!("faucet_source_in_flight", self.0.clone()).decrement(1.0);
                }
            }
            gauge!("faucet_source_in_flight", metric_labels.clone()).increment(1.0);
            let _in_flight = InFlightGuard(metric_labels.clone());
            let mut inner = inner_stream;
            loop {
                let idx = page_index.fetch_add(1, Ordering::Relaxed);
                let span = info_span!(
                    "faucet.source.page",
                    pipeline = %pipeline,
                    row = %row,
                    run_id = %labels.run_id,
                    connector = %connector,
                    page_index = idx,
                );
                let mut timer = DurationGuard::new(
                    "faucet_source_page_duration_seconds",
                    metric_labels.clone(),
                );
                let next = AssertUnwindSafe(async {
                    use futures::StreamExt;
                    inner.next().await
                })
                .catch_unwind()
                .instrument(span)
                .await;
                // Stop the clock as soon as the inner poll resolves. If the guard
                // lived to the end of the loop body it would span the `yield`
                // below, so each sample would also measure how long the consumer
                // took before polling again.
                if matches!(next, Ok(None)) {
                    // The end-of-stream poll is not a page: record no sample.
                    timer.disarm();
                } else {
                    drop(timer);
                }
                match next {
                    Ok(Some(Ok(page))) => {
                        counter!("faucet_source_pages_total", metric_labels.clone()).increment(1);
                        counter!("faucet_source_records_total", metric_labels.clone())
                            .increment(page.records.len() as u64);
                        yield page;
                    }
                    Ok(Some(Err(e))) => {
                        let mut l = metric_labels.clone();
                        l.push(Label::new("kind", SharedString::const_str(error_kind(&e))));
                        counter!("faucet_source_errors_total", l).increment(1);
                        // `?` inside `try_stream!` yields the error and ends the stream.
                        Err(e)?;
                    }
                    Ok(None) => break,
                    Err(panic) => {
                        let mut l = metric_labels.clone();
                        l.push(Label::new("kind", SharedString::const_str("Panic")));
                        counter!("faucet_source_errors_total", l).increment(1);
                        let msg = panic.downcast_ref::<&'static str>().map(|s| (*s).to_string())
                            .or_else(|| panic.downcast_ref::<String>().cloned())
                            .unwrap_or_else(|| "<non-string panic payload>".to_string());
                        // The inner stream is not polled again after a panic.
                        Err(FaucetError::Custom(format!("panic in source: {msg}").into()))?;
                    }
                }
            }
        })
    }
}
/// Returns the name of `e`'s variant, used as the `kind` label on the
/// `faucet_*_errors_total` counters.
///
/// The match is deliberately exhaustive (no `_` arm): adding a `FaucetError`
/// variant fails to compile here until it is given a label.
pub(crate) fn error_kind(e: &FaucetError) -> &'static str {
    match *e {
        FaucetError::Http(_) => "Http",
        FaucetError::HttpStatus { .. } => "HttpStatus",
        FaucetError::Json(_) => "Json",
        FaucetError::JsonPath(_) => "JsonPath",
        FaucetError::Auth(_) => "Auth",
        FaucetError::RateLimited { .. } => "RateLimited",
        FaucetError::Url(_) => "Url",
        FaucetError::Transform(_) => "Transform",
        FaucetError::Config(_) => "Config",
        FaucetError::Source(_) => "Source",
        FaucetError::Sink(_) => "Sink",
        FaucetError::QualityFailure { .. } => "QualityFailure",
        FaucetError::State(_) => "State",
        FaucetError::Custom(_) => "Custom",
    }
}
/// Wraps a borrowed [`Sink`], adding tracing spans, `metrics` instrumentation
/// and panic capture around the write and flush calls it forwards.
pub struct InstrumentedSink<'a, S: Sink + ?Sized> {
    /// The wrapped sink.
    inner: &'a S,
    /// Pipeline, row and run identifiers used as span fields; pipeline and row
    /// also become metric labels.
    labels: Labels,
    /// Connector name for spans and labels; `"unknown"` if the inner sink
    /// reports an empty name.
    connector: SharedString,
    /// `pipeline`/`row`/`connector` labels computed once in
    /// [`InstrumentedSink::new`].
    base_labels: Vec<Label>,
}

impl<'a, S: Sink + ?Sized> InstrumentedSink<'a, S> {
    /// Wraps `inner`, resolving its connector name once and precomputing the
    /// base metric labels.
    ///
    /// Debug builds assert that `inner.connector_name()` is non-empty;
    /// otherwise an empty name is replaced with `"unknown"`.
    pub fn new(inner: &'a S, labels: Labels) -> Self {
        let reported = inner.connector_name();
        debug_assert!(
            !reported.is_empty(),
            "connector_name() must return a non-empty string"
        );
        let connector = SharedString::const_str(guarded_connector_name(reported));
        Self {
            base_labels: base_metric_labels(&labels, &connector),
            inner,
            labels,
            connector,
        }
    }

    /// A fresh copy of the base `pipeline`/`row`/`connector` labels.
    fn metric_labels(&self) -> Vec<Label> {
        self.base_labels.to_vec()
    }

    /// Base labels plus a `kind` label, as used on `faucet_sink_errors_total`.
    fn error_labels(&self, kind: &'static str) -> Vec<Label> {
        self.metric_labels()
            .into_iter()
            .chain(std::iter::once(Label::new(
                "kind",
                SharedString::const_str(kind),
            )))
            .collect()
    }
}
/// Transparent [`Sink`] implementation. Each write or flush is forwarded to
/// the inner sink inside a tracing span, timed, counted, and shielded from
/// panics. A panic raised while the inner future is polled is caught with
/// `catch_unwind`, counted under `faucet_sink_errors_total{kind="Panic"}`, and
/// returned as `FaucetError::Custom`.
#[async_trait]
impl<'a, S: Sink + ?Sized> Sink for InstrumentedSink<'a, S> {
    /// The inner sink's connector name, or `"unknown"` if it is empty.
    fn connector_name(&self) -> &'static str {
        let reported = self.inner.connector_name();
        guarded_connector_name(reported)
    }

    /// Forwards to the inner `write_batch`.
    ///
    /// While the call is outstanding, `faucet_sink_in_flight` is raised by one.
    /// The call's duration is recorded in `faucet_sink_write_duration_seconds`.
    /// On success, `faucet_sink_writes_total` is incremented and
    /// `faucet_sink_records_total` grows by the count the inner sink returned.
    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
        // Raises the in-flight gauge when created and lowers it when dropped,
        // so every exit path (including cancellation of this future) restores it.
        struct InFlight(Vec<Label>);
        impl InFlight {
            fn enter(labels: Vec<Label>) -> InFlight {
                gauge!("faucet_sink_in_flight", labels.clone()).increment(1.0);
                InFlight(labels)
            }
        }
        impl Drop for InFlight {
            fn drop(&mut self) {
                gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
            }
        }

        let span = info_span!(
            "faucet.sink.write",
            pipeline = %self.labels.pipeline,
            row = %self.labels.row,
            run_id = %self.labels.run_id,
            connector = %self.connector,
            records = records.len(),
        );
        let tags = self.metric_labels();
        let _in_flight = InFlight::enter(tags.clone());
        let _timer = DurationGuard::new("faucet_sink_write_duration_seconds", tags.clone());
        let caught = AssertUnwindSafe(self.inner.write_batch(records))
            .catch_unwind()
            .instrument(span)
            .await;
        match caught {
            Err(payload) => {
                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
                let msg = if let Some(text) = payload.downcast_ref::<&'static str>() {
                    (*text).to_string()
                } else if let Some(text) = payload.downcast_ref::<String>() {
                    text.clone()
                } else {
                    "<non-string panic payload>".to_string()
                };
                Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
            }
            Ok(Err(e)) => {
                counter!("faucet_sink_errors_total", self.error_labels(error_kind(&e)))
                    .increment(1);
                Err(e)
            }
            Ok(Ok(n)) => {
                counter!("faucet_sink_writes_total", tags.clone()).increment(1);
                counter!("faucet_sink_records_total", tags).increment(n as u64);
                Ok(n)
            }
        }
    }

    /// Forwards to the inner `write_batch_partial`, with the same span, gauge,
    /// timer and panic handling as `write_batch`.
    ///
    /// On success, `faucet_sink_records_total` grows only by the number of
    /// `Ok` row outcomes; failed rows are not counted there.
    async fn write_batch_partial(
        &self,
        records: &[Value],
    ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
        // Raises the in-flight gauge when created and lowers it when dropped.
        struct InFlight(Vec<Label>);
        impl InFlight {
            fn enter(labels: Vec<Label>) -> InFlight {
                gauge!("faucet_sink_in_flight", labels.clone()).increment(1.0);
                InFlight(labels)
            }
        }
        impl Drop for InFlight {
            fn drop(&mut self) {
                gauge!("faucet_sink_in_flight", self.0.clone()).decrement(1.0);
            }
        }

        let span = info_span!(
            "faucet.sink.write_partial",
            pipeline = %self.labels.pipeline,
            row = %self.labels.row,
            run_id = %self.labels.run_id,
            connector = %self.connector,
            records = records.len(),
        );
        let tags = self.metric_labels();
        let _in_flight = InFlight::enter(tags.clone());
        let _timer = DurationGuard::new("faucet_sink_write_duration_seconds", tags.clone());
        let caught = AssertUnwindSafe(self.inner.write_batch_partial(records))
            .catch_unwind()
            .instrument(span)
            .await;
        match caught {
            Err(payload) => {
                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
                let msg = if let Some(text) = payload.downcast_ref::<&'static str>() {
                    (*text).to_string()
                } else if let Some(text) = payload.downcast_ref::<String>() {
                    text.clone()
                } else {
                    "<non-string panic payload>".to_string()
                };
                Err(FaucetError::Custom(format!("panic in sink: {msg}").into()))
            }
            Ok(Err(e)) => {
                counter!("faucet_sink_errors_total", self.error_labels(error_kind(&e)))
                    .increment(1);
                Err(e)
            }
            Ok(Ok(outcomes)) => {
                let succeeded = outcomes.iter().filter(|outcome| outcome.is_ok()).count();
                counter!("faucet_sink_writes_total", tags.clone()).increment(1);
                counter!("faucet_sink_records_total", tags).increment(succeeded as u64);
                Ok(outcomes)
            }
        }
    }

    /// Forwards to the inner `flush` inside a `faucet.sink.flush` span and
    /// records its duration in `faucet_sink_flush_duration_seconds`.
    ///
    /// Only failures are counted (in `faucet_sink_errors_total`); there is no
    /// in-flight gauge for flushes.
    async fn flush(&self) -> Result<(), FaucetError> {
        let span = info_span!(
            "faucet.sink.flush",
            pipeline = %self.labels.pipeline,
            row = %self.labels.row,
            run_id = %self.labels.run_id,
            connector = %self.connector,
        );
        let _timer =
            DurationGuard::new("faucet_sink_flush_duration_seconds", self.metric_labels());
        let caught = AssertUnwindSafe(self.inner.flush())
            .catch_unwind()
            .instrument(span)
            .await;
        let failure = match caught {
            Ok(Ok(())) => return Ok(()),
            Ok(Err(e)) => {
                counter!("faucet_sink_errors_total", self.error_labels(error_kind(&e)))
                    .increment(1);
                e
            }
            Err(payload) => {
                counter!("faucet_sink_errors_total", self.error_labels("Panic")).increment(1);
                let msg = if let Some(text) = payload.downcast_ref::<&'static str>() {
                    (*text).to_string()
                } else if let Some(text) = payload.downcast_ref::<String>() {
                    text.clone()
                } else {
                    "<non-string panic payload>".to_string()
                };
                FaucetError::Custom(format!("panic in flush: {msg}").into())
            }
        };
        Err(failure)
    }
}
#[cfg(test)]
pub(crate) mod source_tests {
use super::*;
use async_trait::async_trait;
use futures::StreamExt;
use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
use serde_json::json;
use std::sync::{Mutex, OnceLock};
pub(crate) static LOCK: Mutex<()> = Mutex::new(());
static SNAPSHOTTER: OnceLock<Snapshotter> = OnceLock::new();
pub(crate) fn snapshotter() -> &'static Snapshotter {
SNAPSHOTTER.get_or_init(|| {
let recorder = DebuggingRecorder::new();
let snap = recorder.snapshotter();
let _ = metrics::set_global_recorder(recorder);
snap
})
}
pub(in crate::observability) fn labels() -> Labels {
Labels::new("p", "r", "rid")
}
struct MockSource(Vec<Value>);
#[async_trait]
impl Source for MockSource {
async fn fetch_with_context(
&self,
_: &HashMap<String, Value>,
) -> Result<Vec<Value>, FaucetError> {
Ok(self.0.clone())
}
fn connector_name(&self) -> &'static str {
"mock"
}
}
struct PanickingSource;
#[async_trait]
impl Source for PanickingSource {
async fn fetch_with_context(
&self,
_: &HashMap<String, Value>,
) -> Result<Vec<Value>, FaucetError> {
panic!("kaboom")
}
fn connector_name(&self) -> &'static str {
"panic-test"
}
}
struct EmptyNameSource;
#[async_trait]
impl Source for EmptyNameSource {
async fn fetch_with_context(
&self,
_: &HashMap<String, Value>,
) -> Result<Vec<Value>, FaucetError> {
Ok(vec![])
}
fn connector_name(&self) -> &'static str {
""
}
}
#[test]
fn empty_inner_connector_name_falls_back_to_unknown() {
let inner = EmptyNameSource;
let wrapped = InstrumentedSource {
inner: &inner,
labels: labels(),
connector: SharedString::const_str("unknown"),
base_labels: Vec::new(),
page_index: Arc::new(AtomicUsize::new(0)),
};
assert_eq!(
Source::connector_name(&wrapped),
"unknown",
"instrumented source must not leak an empty connector name"
);
}
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn records_records_counter_per_page() {
let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
let snap = snapshotter();
let inner = MockSource((0..5).map(|i| json!({"i": i})).collect());
let wrapped = InstrumentedSource::new(&inner, labels());
let ctx = HashMap::new();
let mut s = wrapped.stream_pages(&ctx, 2);
while s.next().await.is_some() {}
let snapshot = snap.snapshot();
let records: u64 = snapshot
.into_vec()
.into_iter()
.filter_map(|(key, _u, _d, v)| {
if key.key().name() == "faucet_source_records_total"
&& let DebugValue::Counter(c) = v
{
return Some(c);
}
None
})
.sum();
assert!(
records >= 5,
"expected at least 5 records counted, got {records}"
);
}
struct PageCountSource(Vec<Value>);
#[async_trait]
impl Source for PageCountSource {
async fn fetch_with_context(
&self,
_: &HashMap<String, Value>,
) -> Result<Vec<Value>, FaucetError> {
Ok(self.0.clone())
}
fn connector_name(&self) -> &'static str {
"page-count-probe"
}
}
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn page_duration_records_one_sample_per_yielded_page() {
let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
let snap = snapshotter();
let inner = PageCountSource((0..5).map(|i| json!({"i": i})).collect());
let wrapped = InstrumentedSource::new(&inner, labels());
let ctx = HashMap::new();
let mut s = wrapped.stream_pages(&ctx, 2);
let mut pages = 0usize;
while s.next().await.is_some() {
pages += 1;
}
assert_eq!(pages, 3, "expected 3 yielded pages");
let snapshot = snap.snapshot();
let samples: usize = snapshot
.into_vec()
.into_iter()
.filter_map(|(key, _u, _d, v)| {
if key.key().name() == "faucet_source_page_duration_seconds"
&& key
.key()
.labels()
.any(|l| l.key() == "connector" && l.value() == "page-count-probe")
&& let DebugValue::Histogram(h) = v
{
return Some(h.len());
}
None
})
.sum();
assert_eq!(
samples, pages,
"page-duration histogram must have exactly one sample per yielded \
page ({pages}), not page+1 (no spurious terminal sample)"
);
}
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn maps_panic_to_custom_error_with_kind_panic() {
let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
let _snap = snapshotter();
let inner = PanickingSource;
let wrapped = InstrumentedSource::new(&inner, labels());
let ctx = HashMap::new();
let mut s = wrapped.stream_pages(&ctx, 10);
let first = s
.next()
.await
.expect("stream yields at least one item before terminating");
assert!(matches!(first, Err(FaucetError::Custom(_))));
}
}
#[cfg(test)]
mod sink_tests {
    use super::source_tests::{LOCK, labels, snapshotter};
    use super::*;
    use async_trait::async_trait;
    use metrics_util::debugging::{DebugValue, Snapshotter};
    use serde_json::json;

    /// Sums the current value of every counter named `name` whose label set
    /// contains all `(key, value)` pairs in `want`.
    ///
    /// The recorder is process-global and shared by every test, so callers
    /// compare a reading taken before the action with one taken after it.
    fn counter_value(snap: &Snapshotter, name: &str, want: &[(&str, &str)]) -> u64 {
        snap.snapshot()
            .into_vec()
            .into_iter()
            .filter_map(|(composite, _unit, _desc, value)| {
                let key = composite.key();
                let labels_match = want
                    .iter()
                    .all(|(k, v)| key.labels().any(|l| l.key() == *k && l.value() == *v));
                match value {
                    DebugValue::Counter(c) if key.name() == name && labels_match => Some(c),
                    _ => None,
                }
            })
            .sum()
    }

    struct MockSink(std::sync::Mutex<Vec<Value>>);
    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            self.0.lock().unwrap().extend(records.iter().cloned());
            Ok(records.len())
        }
        fn connector_name(&self) -> &'static str {
            "mock-sink"
        }
    }

    struct FailingSink;
    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("nope".into()))
        }
        fn connector_name(&self) -> &'static str {
            "failing-sink"
        }
    }

    struct EmptyNameSink;
    #[async_trait]
    impl Sink for EmptyNameSink {
        async fn write_batch(&self, _: &[Value]) -> Result<usize, FaucetError> {
            Ok(0)
        }
        fn connector_name(&self) -> &'static str {
            ""
        }
    }

    #[test]
    fn empty_inner_connector_name_falls_back_to_unknown() {
        // Built by hand: `InstrumentedSink::new` debug-asserts a non-empty name.
        let inner = EmptyNameSink;
        let wrapped = InstrumentedSink {
            inner: &inner,
            labels: labels(),
            connector: SharedString::const_str("unknown"),
            base_labels: Vec::new(),
        };
        assert_eq!(
            Sink::connector_name(&wrapped),
            "unknown",
            "instrumented sink must not leak an empty connector name"
        );
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn records_writes_and_records_counters() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let want = [("connector", "mock-sink")];
        let writes_before = counter_value(snap, "faucet_sink_writes_total", &want);
        let records_before = counter_value(snap, "faucet_sink_records_total", &want);
        let inner = MockSink(std::sync::Mutex::new(Vec::new()));
        let wrapped = InstrumentedSink::new(&inner, labels());
        let written = wrapped
            .write_batch(&[json!({"a": 1}), json!({"a": 2})])
            .await
            .unwrap();
        assert_eq!(written, 2);
        assert_eq!(inner.0.lock().unwrap().len(), 2, "inner sink must receive both rows");
        let writes = counter_value(snap, "faucet_sink_writes_total", &want) - writes_before;
        let records = counter_value(snap, "faucet_sink_records_total", &want) - records_before;
        assert!(writes >= 1, "expected at least one write counted, got {writes}");
        assert!(records >= 2, "expected at least 2 records counted, got {records}");
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn error_increments_errors_total_with_kind() {
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let want = [("connector", "failing-sink"), ("kind", "Sink")];
        let before = counter_value(snap, "faucet_sink_errors_total", &want);
        let inner = FailingSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let result = wrapped.write_batch(&[json!({})]).await;
        assert!(
            matches!(result, Err(FaucetError::Sink(_))),
            "inner error must be passed through unchanged"
        );
        let after = counter_value(snap, "faucet_sink_errors_total", &want);
        assert!(
            after > before,
            "expected sink_errors_total{{connector=failing-sink,kind=Sink}} to increase"
        );
    }

    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn instrumented_sink_write_batch_partial_counts_successful_outcomes() {
        use crate::traits::RowOutcome;
        struct MixedSink;
        #[async_trait]
        impl Sink for MixedSink {
            async fn write_batch(&self, _r: &[Value]) -> Result<usize, FaucetError> {
                unreachable!()
            }
            async fn write_batch_partial(
                &self,
                _r: &[Value],
            ) -> Result<Vec<RowOutcome>, FaucetError> {
                Ok(vec![
                    Ok(()),
                    Err(FaucetError::Sink("bad row".into())),
                    Ok(()),
                ])
            }
            fn connector_name(&self) -> &'static str {
                "mixed"
            }
        }
        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let snap = snapshotter();
        let want = [("connector", "mixed")];
        let before = counter_value(snap, "faucet_sink_records_total", &want);
        let inner = MixedSink;
        let wrapped = InstrumentedSink::new(&inner, labels());
        let outcomes = wrapped
            .write_batch_partial(&[json!({}), json!({}), json!({})])
            .await
            .unwrap();
        assert_eq!(outcomes.len(), 3);
        let counted = counter_value(snap, "faucet_sink_records_total", &want) - before;
        // Exact equality: counting all 3 rows (including the failed one) is the
        // bug this test guards against, and `>= 2` would not catch it.
        assert_eq!(
            counted, 2,
            "expected faucet_sink_records_total{{connector=mixed}} to grow by exactly 2"
        );
    }
}