use std::{net::SocketAddr, time::Duration};
use crate::{
codec::{header::Rcode, message::Qtype, name::Name},
resolver::pipeline::Outcome,
time::Clock,
};
/// A single processed resolver query, captured for logging and telemetry.
///
/// Built with `QueryEvent::new` and refined through the `with_*` builder
/// methods; `QueryEvent::emit` writes it to the `sagittarius::query`
/// tracing target.
#[derive(Debug, Clone)]
pub struct QueryEvent {
/// Event timestamp. `new` fills it from `Clock::now_millis()`; the unit
/// tests expect that default to be an epoch-milliseconds value.
pub ts: i64,
/// Socket address of the client the query is attributed to.
pub client: SocketAddr,
/// Name that was queried.
pub qname: Name,
/// Query type that was asked for.
pub qtype: Qtype,
/// Pipeline outcome for the query (the tests use `Forwarded` and `Cached`).
pub outcome: Outcome,
/// Response code, when one was attached via `with_rcode`; `None` by default.
pub rcode: Option<Rcode>,
/// Upstream address, when one was attached via `with_upstream`; `None` by
/// default.
pub upstream: Option<SocketAddr>,
/// Latency attached via `with_latency`; `Duration::ZERO` by default.
/// `emit` logs it as whole milliseconds.
pub latency: Duration,
}
impl QueryEvent {
    /// Creates an event stamped with the current `Clock::now_millis()` reading.
    ///
    /// `rcode` and `upstream` start as `None` and `latency` as
    /// [`Duration::ZERO`]; use the `with_*` builders to fill them in.
    pub fn new(client: SocketAddr, qname: Name, qtype: Qtype, outcome: Outcome) -> Self {
        Self {
            ts: Clock::now_millis(),
            client,
            qname,
            qtype,
            outcome,
            rcode: None,
            upstream: None,
            latency: Duration::ZERO,
        }
    }

    /// Replaces the timestamp that `new` assigned.
    #[must_use]
    pub fn with_ts(mut self, ts: i64) -> Self {
        self.ts = ts;
        self
    }

    /// Attaches a response code to the event.
    #[must_use]
    pub fn with_rcode(mut self, rcode: Rcode) -> Self {
        self.rcode = Some(rcode);
        self
    }

    /// Attaches the upstream address to the event.
    #[must_use]
    pub fn with_upstream(mut self, upstream: SocketAddr) -> Self {
        self.upstream = Some(upstream);
        self
    }

    /// Sets the latency recorded for the event.
    #[must_use]
    pub fn with_latency(mut self, latency: Duration) -> Self {
        self.latency = latency;
        self
    }

    /// Logs the event at `INFO` level on the `sagittarius::query` tracing
    /// target.
    ///
    /// `client`, `qname`, `qtype` and `outcome` are rendered with `Display`;
    /// `rcode` and `upstream` with `Debug`; latency is reported as whole
    /// milliseconds in the `latency_ms` field.
    pub fn emit(&self) {
        // `Duration::as_millis` returns a `u128`. Saturate instead of using an
        // `as` cast, which would silently wrap for out-of-range values.
        let latency_ms = u64::try_from(self.latency.as_millis()).unwrap_or(u64::MAX);
        tracing::info!(
            target: "sagittarius::query",
            client = %self.client,
            qname = %self.qname,
            qtype = %self.qtype,
            outcome = %self.outcome,
            rcode = ?self.rcode,
            upstream = ?self.upstream,
            latency_ms,
            "query processed",
        );
    }
}
/// Capacity constant for the query-log channel: 4096 events.
///
/// NOTE(review): presumably passed to `tokio::sync::mpsc::channel` where the
/// sender handed to `TelemetrySink::with_query_log` is created; that call
/// site is not visible in this file — confirm.
pub const QUERY_LOG_CHANNEL_CAPACITY: usize = 4096;
/// Sending side of the query-log persistence queue, bundled with the state
/// consulted before each enqueue and a counter of dropped events.
#[derive(Clone)]
struct QueryLogChannel {
/// Bounded channel sender; used with `try_send`, so a full channel drops
/// the event instead of waiting.
sender: tokio::sync::mpsc::Sender<QueryEvent>,
/// Resolver state; the `query_log_enabled` flag of its settings gates
/// whether events are enqueued at all.
state: std::sync::Arc<crate::resolver::state::ResolverState>,
/// Number of events dropped because the channel was full. Held in an `Arc`,
/// so clones of this struct share one counter.
dropped: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
/// Fan-out point for query events: each recorded event is logged, counted in
/// `stats`, optionally queued for persistence, and published to `live_log`.
#[derive(Clone)]
pub struct TelemetrySink {
/// Live log that every recorded event is published to.
pub live_log: std::sync::Arc<super::live_log::LiveLog>,
/// Statistics collector updated for every recorded event.
pub stats: std::sync::Arc<super::stats::Stats>,
/// Optional persistence channel; `None` until `with_query_log` is called.
query_log: Option<QueryLogChannel>,
}
impl TelemetrySink {
    /// Builds a sink that feeds only the live log and the stats collector.
    ///
    /// No persistence channel is attached until
    /// [`TelemetrySink::with_query_log`] is called.
    pub fn new(
        live_log: std::sync::Arc<super::live_log::LiveLog>,
        stats: std::sync::Arc<super::stats::Stats>,
    ) -> Self {
        let query_log = None;
        Self {
            live_log,
            stats,
            query_log,
        }
    }

    /// Attaches the persistence channel, starting its drop counter at zero.
    ///
    /// `state` is consulted on every enqueue so the `query_log_enabled`
    /// setting is honoured at record time.
    #[must_use]
    pub fn with_query_log(
        mut self,
        sender: tokio::sync::mpsc::Sender<QueryEvent>,
        state: std::sync::Arc<crate::resolver::state::ResolverState>,
    ) -> Self {
        let dropped = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
        let channel = QueryLogChannel {
            sender,
            state,
            dropped,
        };
        self.query_log = Some(channel);
        self
    }

    /// Returns how many events were dropped because the persistence channel
    /// was full, or `0` when no channel is attached.
    #[must_use]
    pub fn dropped_query_events(&self) -> u64 {
        match &self.query_log {
            Some(channel) => channel
                .dropped
                .load(std::sync::atomic::Ordering::Relaxed),
            None => 0,
        }
    }

    /// Records one event: log it, count it, queue it for persistence (when a
    /// channel is attached and enabled), then publish it to the live log.
    pub fn record(&self, event: QueryEvent) {
        event.emit();
        self.stats.record(&event);
        self.enqueue_for_persistence(&event);
        // `publish` takes the event by value, so it runs last.
        self.live_log.publish(event);
    }

    /// Tries to hand a clone of `event` to the persistence channel without
    /// waiting.
    ///
    /// Does nothing when no channel is attached or when the current settings
    /// have `query_log_enabled` turned off. A full channel drops the event,
    /// bumps the drop counter, and warns on the first drop and on every
    /// 1000th one.
    fn enqueue_for_persistence(&self, event: &QueryEvent) {
        use std::sync::atomic::Ordering;
        use tokio::sync::mpsc::error::TrySendError;

        let channel = match self.query_log.as_ref() {
            Some(channel) if channel.state.settings().query_log_enabled => channel,
            _ => return,
        };

        match channel.sender.try_send(event.clone()) {
            // Delivered, or the receiving side is gone: nothing to do either way.
            Ok(()) | Err(TrySendError::Closed(_)) => {}
            Err(TrySendError::Full(_)) => {
                let dropped = channel.dropped.fetch_add(1, Ordering::Relaxed) + 1;
                let should_warn = dropped == 1 || dropped % 1000 == 0;
                if should_warn {
                    tracing::warn!(
                        dropped,
                        "query-log channel full; dropping events (writer task is behind)"
                    );
                }
            }
        }
    }
}
// Unit tests for `QueryEvent` builders and for `TelemetrySink::record`,
// including the optional query-log persistence channel.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
codec::{header::Rcode, message::Qtype, name::Name},
resolver::pipeline::Outcome,
telemetry::{Telemetry, live_log::LiveLog, stats::Stats},
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
// Builds an `A` query event for `example.com` from a fixed client address,
// with the given outcome and all optional fields left at their defaults.
fn make_event(outcome: Outcome) -> QueryEvent {
let client: SocketAddr = "203.0.113.5:1234".parse().unwrap();
let qname: Name = "example.com".parse().unwrap();
QueryEvent::new(client, qname, Qtype::A, outcome)
}
// Smoke test: emitting a fully populated event must not panic.
#[test]
fn emit_does_not_panic() {
// The result of `init` is deliberately ignored.
// NOTE(review): presumably because another test may already have
// initialised telemetry — confirm.
let _ = Telemetry::init();
let event = make_event(Outcome::Forwarded)
.with_rcode(Rcode::NoError)
.with_upstream("9.9.9.9:53".parse().unwrap())
.with_latency(Duration::from_millis(12));
event.emit();
}
// `with_rcode` stores the given response code.
#[test]
fn with_rcode_sets_rcode() {
let event = make_event(Outcome::Forwarded).with_rcode(Rcode::NoError);
assert_eq!(event.rcode, Some(Rcode::NoError));
}
// `with_upstream` stores the given upstream address.
#[test]
fn with_upstream_sets_upstream() {
let upstream: SocketAddr = "1.1.1.1:53".parse().unwrap();
let event = make_event(Outcome::Forwarded).with_upstream(upstream);
assert_eq!(event.upstream, Some(upstream));
}
// `with_latency` stores the given duration.
#[test]
fn with_latency_sets_latency() {
let event = make_event(Outcome::Forwarded).with_latency(Duration::from_millis(42));
assert_eq!(event.latency, Duration::from_millis(42));
}
// A freshly constructed event has no rcode, no upstream and zero latency.
#[test]
fn defaults_are_none_and_zero() {
let event = make_event(Outcome::Cached);
assert!(event.rcode.is_none());
assert!(event.upstream.is_none());
assert_eq!(event.latency, Duration::ZERO);
}
// `new` must stamp the event with a plausible epoch-milliseconds timestamp
// rather than leaving `ts` at zero.
#[test]
fn new_carries_nonzero_ts() {
let event = make_event(Outcome::Cached);
assert!(
event.ts > 1_700_000_000_000,
"ts must default to a real epoch-ms timestamp: {}",
event.ts
);
}
// `with_ts` replaces the timestamp assigned by `new`.
#[test]
fn with_ts_overrides_ts() {
let event = make_event(Outcome::Cached).with_ts(42);
assert_eq!(event.ts, 42);
}
// Without a query-log channel, `record` still updates the stats and
// publishes to live-log subscribers.
#[tokio::test]
async fn telemetry_sink_record_updates_all() {
let _ = Telemetry::init();
let live_log = Arc::new(LiveLog::default());
let stats = Arc::new(Stats::default());
let sink = TelemetrySink::new(Arc::clone(&live_log), Arc::clone(&stats));
// Subscribe before recording so the published event can be received.
let mut rx = live_log.subscribe();
let event = make_event(Outcome::Forwarded);
sink.record(event);
let snap = stats.snapshot(10);
assert_eq!(snap.total, 1);
assert_eq!(snap.forwarded, 1);
let received = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout waiting for broadcast")
.expect("broadcast channel closed");
assert_eq!(received.qname.to_string(), "example.com.");
}
// Hydrates a `ResolverState` from a temporary database.
// The `TempDir` is returned alongside the state so the caller keeps it alive.
// NOTE(review): presumably the database file lives inside it — confirm
// against `test_support::temp_db`.
async fn hydrate_state() -> (
tempfile::TempDir,
Arc<crate::resolver::state::ResolverState>,
) {
let (dir, db) = crate::test_support::temp_db().await;
let state = crate::resolver::state::ResolverState::hydrate(&db)
.await
.expect("hydrate");
(dir, state)
}
// With a channel attached and freshly hydrated settings, `record` enqueues
// the event and nothing is counted as dropped.
#[tokio::test]
async fn record_enqueues_when_enabled() {
let (_dir, state) = hydrate_state().await;
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let sink = TelemetrySink::new(Arc::new(LiveLog::default()), Arc::new(Stats::default()))
.with_query_log(tx, Arc::clone(&state));
sink.record(make_event(Outcome::Forwarded));
let queued = rx.try_recv().expect("event must be enqueued when enabled");
assert_eq!(queued.qname.to_string(), "example.com.");
assert_eq!(sink.dropped_query_events(), 0);
}
// A capacity-1 channel that is never drained accepts the first event and
// drops the next three; stats still count all four.
#[tokio::test]
async fn record_full_channel_drops_and_counts_without_blocking() {
let (_dir, state) = hydrate_state().await;
// `_rx` is bound (not discarded) so the channel stays open and fills up
// instead of reporting `Closed`.
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let sink = TelemetrySink::new(Arc::new(LiveLog::default()), Arc::new(Stats::default()))
.with_query_log(tx, Arc::clone(&state));
for _ in 0..4 {
sink.record(make_event(Outcome::Forwarded));
}
assert_eq!(
sink.dropped_query_events(),
3,
"1 enqueued, 3 dropped on the full channel"
);
assert_eq!(sink.stats.snapshot(10).total, 4);
}
// With `query_log_enabled` switched off, nothing reaches the persistence
// channel, but stats and the live-log broadcast still fire.
#[tokio::test]
async fn record_skips_enqueue_when_disabled_but_stats_and_broadcast_fire() {
use crate::resolver::state::RuntimeSettings;
let (_dir, state) = hydrate_state().await;
// Store a copy of the current settings with only the query-log flag changed.
state.store_settings(RuntimeSettings {
query_log_enabled: false,
..(*state.settings_full()).clone()
});
let live_log = Arc::new(LiveLog::default());
let stats = Arc::new(Stats::default());
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let sink = TelemetrySink::new(Arc::clone(&live_log), Arc::clone(&stats))
.with_query_log(tx, Arc::clone(&state));
// Subscribe before recording so the published event can be received.
let mut broadcast = live_log.subscribe();
sink.record(make_event(Outcome::Cached));
assert!(
matches!(
rx.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
),
"disabled logging must not enqueue"
);
assert_eq!(stats.snapshot(10).total, 1);
let got = tokio::time::timeout(std::time::Duration::from_secs(1), broadcast.recv())
.await
.expect("broadcast timeout")
.expect("broadcast closed");
assert_eq!(got.qname.to_string(), "example.com.");
}
}