use crate::{
logging::{LogEvent, LogLevel},
metrics::SiftStreamMetrics,
};
use std::sync::Arc;
#[derive(Default)]
pub(crate) struct TelemetryVisitor {
pub message: String,
pub fields: Vec<(String, String)>,
}
impl tracing::field::Visit for TelemetryVisitor {
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == "message" {
self.message = value.to_owned();
} else {
self.fields
.push((field.name().to_owned(), value.to_owned()));
}
}
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
let s = format!("{value:?}");
if field.name() == "message" {
self.message = s;
} else {
self.fields.push((field.name().to_owned(), s));
}
}
}
pub(crate) struct SiftTelemetryLayer {
tx: async_channel::Sender<LogEvent>,
level_filter: LogLevel,
metrics: Arc<SiftStreamMetrics>,
}
impl SiftTelemetryLayer {
pub(crate) fn new(
tx: async_channel::Sender<LogEvent>,
level_filter: LogLevel,
metrics: Arc<SiftStreamMetrics>,
) -> Self {
Self {
tx,
level_filter,
metrics,
}
}
}
impl<S: tracing::Subscriber> tracing_subscriber::layer::Layer<S> for SiftTelemetryLayer {
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let metadata = event.metadata();
if !metadata.target().starts_with("sift_stream") {
return;
}
let level = LogLevel::from(metadata.level());
if level > self.level_filter {
return;
}
let mut visitor = TelemetryVisitor::default();
event.record(&mut visitor);
let log_event = LogEvent {
level,
target: metadata.target(),
file: metadata.file().unwrap_or(""),
line: metadata.line().unwrap_or(0),
message: visitor.message,
fields: visitor.fields,
timestamp: std::time::SystemTime::now(),
};
if self.tx.try_send(log_event).is_err() {
self.metrics.logs_dropped_channel_full.increment();
}
}
}
pub(crate) struct DispatchForwardingLayer(pub tracing::Dispatch);
impl<S: tracing::Subscriber> tracing_subscriber::layer::Layer<S> for DispatchForwardingLayer {
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
if event.metadata().target().starts_with("sift_stream") && self.0.enabled(event.metadata())
{
self.0.event(event);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::SiftStreamMetrics;
use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};
use tracing_subscriber::layer::SubscriberExt;
fn make_layer(
capacity: usize,
level: LogLevel,
) -> (
SiftTelemetryLayer,
async_channel::Receiver<LogEvent>,
Arc<SiftStreamMetrics>,
) {
let (tx, rx) = async_channel::bounded(capacity);
let metrics = Arc::new(SiftStreamMetrics::new());
let layer = SiftTelemetryLayer::new(tx, level, metrics.clone());
(layer, rx, metrics)
}
fn with_layer<F: FnOnce()>(layer: SiftTelemetryLayer, f: F) {
let subscriber = tracing_subscriber::registry().with(layer);
tracing::dispatcher::with_default(&tracing::Dispatch::new(subscriber), f);
}
struct EventCounter(Arc<AtomicU32>);
impl tracing::Subscriber for EventCounter {
fn enabled(&self, _: &tracing::Metadata<'_>) -> bool {
true
}
fn new_span(&self, _: &tracing::span::Attributes<'_>) -> tracing::span::Id {
tracing::span::Id::from_u64(1)
}
fn record(&self, _: &tracing::span::Id, _: &tracing::span::Record<'_>) {}
fn record_follows_from(&self, _: &tracing::span::Id, _: &tracing::span::Id) {}
fn event(&self, _: &tracing::Event<'_>) {
self.0.fetch_add(1, Ordering::Relaxed);
}
fn enter(&self, _: &tracing::span::Id) {}
fn exit(&self, _: &tracing::span::Id) {}
}
#[test]
fn layer_captures_sift_stream_event() {
let (layer, rx, _) = make_layer(8, LogLevel::Info);
with_layer(layer, || {
tracing::event!(
target: "sift_stream::tasks::ingestion",
tracing::Level::INFO,
"hello from sift"
);
});
assert_eq!(rx.len(), 1);
let ev = rx.try_recv().unwrap();
assert_eq!(ev.message, "hello from sift");
assert_eq!(ev.level, LogLevel::Info);
assert_eq!(ev.target, "sift_stream::tasks::ingestion");
}
#[test]
fn layer_ignores_non_sift_stream_target() {
let (layer, rx, _) = make_layer(8, LogLevel::Info);
with_layer(layer, || {
tracing::event!(target: "h2::proto::connection", tracing::Level::INFO, "h2 noise");
tracing::event!(target: "tonic::transport", tracing::Level::WARN, "tonic noise");
tracing::event!(target: "hyper::client", tracing::Level::ERROR, "hyper noise");
});
assert_eq!(rx.len(), 0, "non-sift_stream events must not be captured");
}
#[test]
fn layer_filters_events_below_level_threshold() {
let (layer, rx, _) = make_layer(8, LogLevel::Info);
with_layer(layer, || {
tracing::event!(
target: "sift_stream::tasks",
tracing::Level::DEBUG,
"debug noise"
);
tracing::event!(
target: "sift_stream::tasks",
tracing::Level::TRACE,
"trace noise"
);
});
assert_eq!(
rx.len(),
0,
"DEBUG/TRACE must be dropped with an INFO filter"
);
}
#[test]
fn layer_passes_events_at_and_above_level_threshold() {
let (layer, rx, _) = make_layer(8, LogLevel::Info);
with_layer(layer, || {
tracing::event!(target: "sift_stream::tasks", tracing::Level::INFO, "info");
tracing::event!(target: "sift_stream::tasks", tracing::Level::WARN, "warn");
tracing::event!(target: "sift_stream::tasks", tracing::Level::ERROR, "error");
});
assert_eq!(
rx.len(),
3,
"INFO, WARN, ERROR must all pass an INFO filter"
);
}
#[test]
fn layer_captures_structured_fields() {
let (layer, rx, _) = make_layer(8, LogLevel::Info);
with_layer(layer, || {
tracing::event!(
target: "sift_stream::tasks",
tracing::Level::INFO,
user = "alice",
request_id = "req-42",
"user login"
);
});
let ev = rx.try_recv().unwrap();
assert_eq!(ev.message, "user login");
assert!(
ev.fields.iter().any(|(k, v)| k == "user" && v == "alice"),
"user field not captured: {:?}",
ev.fields
);
assert!(
ev.fields
.iter()
.any(|(k, v)| k == "request_id" && v == "req-42"),
"request_id field not captured: {:?}",
ev.fields
);
}
#[test]
fn layer_increments_dropped_metric_on_full_channel() {
let (layer, rx, metrics) = make_layer(1, LogLevel::Info);
with_layer(layer, || {
tracing::event!(target: "sift_stream::tasks", tracing::Level::INFO, "first");
tracing::event!(target: "sift_stream::tasks", tracing::Level::INFO, "overflow");
});
assert_eq!(rx.len(), 1, "only the first event should be in the channel");
assert_eq!(
metrics.logs_dropped_channel_full.get(),
1,
"exactly one drop should be counted"
);
}
#[test]
fn forwarding_layer_forwards_sift_stream_events_to_base() {
let count = Arc::new(AtomicU32::new(0));
let base = tracing::Dispatch::new(EventCounter(count.clone()));
let layer = DispatchForwardingLayer(base);
let subscriber = tracing_subscriber::registry().with(layer);
tracing::dispatcher::with_default(&tracing::Dispatch::new(subscriber), || {
tracing::event!(
target: "sift_stream::tasks::ingestion",
tracing::Level::INFO,
"sift event"
);
tracing::event!(
target: "sift_stream::stream::builder",
tracing::Level::WARN,
"another sift event"
);
});
assert_eq!(
count.load(Ordering::Relaxed),
2,
"both sift_stream events must reach the base dispatch"
);
}
#[test]
fn forwarding_layer_does_not_forward_non_sift_stream_events() {
let count = Arc::new(AtomicU32::new(0));
let base = tracing::Dispatch::new(EventCounter(count.clone()));
let layer = DispatchForwardingLayer(base);
let subscriber = tracing_subscriber::registry().with(layer);
tracing::dispatcher::with_default(&tracing::Dispatch::new(subscriber), || {
tracing::event!(
target: "h2::proto::connection",
tracing::Level::TRACE,
"h2 spam"
);
tracing::event!(target: "tonic::transport", tracing::Level::DEBUG, "tonic noise");
tracing::event!(target: "hyper::client", tracing::Level::INFO, "hyper log");
});
assert_eq!(
count.load(Ordering::Relaxed),
0,
"third-party library events must not reach the base dispatch"
);
}
}