use std::{net::SocketAddr, time::Duration};
use crate::{
codec::{header::Rcode, message::Qtype, name::Name},
resolver::pipeline::Outcome,
};
/// Telemetry record describing one processed query.
///
/// Construct it with [`QueryEvent::new`], which fills in the mandatory
/// fields, then attach the optional ones through the `with_*` builder
/// methods.
#[derive(Clone, Debug)]
pub struct QueryEvent {
    /// Socket address recorded as the client for this query.
    pub client: SocketAddr,
    /// Name that was queried.
    pub qname: Name,
    /// Query type that was requested.
    pub qtype: Qtype,
    /// Outcome reported by the resolver pipeline.
    pub outcome: Outcome,
    /// Response code, if one was attached; `None` until `with_rcode` is called.
    pub rcode: Option<Rcode>,
    /// Upstream socket address, if one was attached; `None` until
    /// `with_upstream` is called.
    pub upstream: Option<SocketAddr>,
    /// Latency attributed to the query; `Duration::ZERO` until
    /// `with_latency` is called.
    pub latency: Duration,
}
impl QueryEvent {
    /// Creates an event from the fields every query carries.
    ///
    /// `rcode` and `upstream` start as `None` and `latency` as
    /// [`Duration::ZERO`]; use the `with_*` methods to fill them in.
    #[must_use]
    pub fn new(client: SocketAddr, qname: Name, qtype: Qtype, outcome: Outcome) -> Self {
        Self {
            client,
            qname,
            qtype,
            outcome,
            rcode: None,
            upstream: None,
            latency: Duration::ZERO,
        }
    }

    /// Returns the event with `rcode` set to `Some(rcode)`.
    #[must_use]
    pub fn with_rcode(mut self, rcode: Rcode) -> Self {
        self.rcode = Some(rcode);
        self
    }

    /// Returns the event with `upstream` set to `Some(upstream)`.
    #[must_use]
    pub fn with_upstream(mut self, upstream: SocketAddr) -> Self {
        self.upstream = Some(upstream);
        self
    }

    /// Returns the event with `latency` replaced by the given duration.
    #[must_use]
    pub fn with_latency(mut self, latency: Duration) -> Self {
        self.latency = latency;
        self
    }

    /// Logs the event as a single `INFO`-level `tracing` event on the
    /// `sagittarius::query` target.
    ///
    /// Every field of the event is attached as a structured field, with
    /// the latency reported in whole milliseconds as `latency_ms`.
    pub fn emit(&self) {
        // `as_millis` returns `u128`; saturate rather than silently wrap
        // when the value does not fit the `u64` field.
        let latency_ms = u64::try_from(self.latency.as_millis()).unwrap_or(u64::MAX);
        tracing::info!(
            target: "sagittarius::query",
            client = %self.client,
            qname = %self.qname,
            qtype = ?self.qtype,
            outcome = %self.outcome,
            rcode = ?self.rcode,
            upstream = ?self.upstream,
            latency_ms = latency_ms,
            "query processed",
        );
    }
}
/// Bundle of the shared telemetry destinations that query events are
/// recorded into.
///
/// Both fields are `Arc` handles, so cloning the sink clones the handles
/// rather than the underlying `LiveLog` / `Stats` values.
#[derive(Clone)]
pub struct TelemetrySink {
    /// Shared live log that recorded events are published to.
    pub live_log: std::sync::Arc<super::live_log::LiveLog>,
    /// Shared statistics collector that recorded events are counted in.
    pub stats: std::sync::Arc<super::stats::Stats>,
}
impl TelemetrySink {
pub fn new(
live_log: std::sync::Arc<super::live_log::LiveLog>,
stats: std::sync::Arc<super::stats::Stats>,
) -> Self {
Self { live_log, stats }
}
pub fn record(&self, event: QueryEvent) {
event.emit();
self.stats.record(&event);
self.live_log.publish(event);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        codec::{header::Rcode, message::Qtype, name::Name},
        resolver::pipeline::Outcome,
        telemetry::{Telemetry, live_log::LiveLog, stats::Stats},
    };
    use std::{net::SocketAddr, sync::Arc, time::Duration};

    /// Builds a bare `A` query event for `example.com` with the given
    /// outcome and no optional fields set.
    fn make_event(outcome: Outcome) -> QueryEvent {
        let client = "203.0.113.5:1234".parse::<SocketAddr>().unwrap();
        let qname = "example.com".parse::<Name>().unwrap();
        QueryEvent::new(client, qname, Qtype::A, outcome)
    }

    /// Emitting a fully populated event must not panic.
    #[test]
    fn emit_does_not_panic() {
        let _ = Telemetry::init();
        let upstream: SocketAddr = "9.9.9.9:53".parse().unwrap();
        let populated = make_event(Outcome::Forwarded)
            .with_rcode(Rcode::NoError)
            .with_upstream(upstream)
            .with_latency(Duration::from_millis(12));
        populated.emit();
    }

    /// `with_rcode` stores the response code.
    #[test]
    fn with_rcode_sets_rcode() {
        let built = make_event(Outcome::Forwarded).with_rcode(Rcode::NoError);
        assert_eq!(built.rcode, Some(Rcode::NoError));
    }

    /// `with_upstream` stores the upstream address.
    #[test]
    fn with_upstream_sets_upstream() {
        let addr = "1.1.1.1:53".parse::<SocketAddr>().unwrap();
        let built = make_event(Outcome::Forwarded).with_upstream(addr);
        assert_eq!(built.upstream, Some(addr));
    }

    /// `with_latency` stores the latency.
    #[test]
    fn with_latency_sets_latency() {
        let expected = Duration::from_millis(42);
        let built = make_event(Outcome::Forwarded).with_latency(expected);
        assert_eq!(built.latency, expected);
    }

    /// A freshly constructed event has no optional data attached.
    #[test]
    fn defaults_are_none_and_zero() {
        let fresh = make_event(Outcome::Cached);
        assert!(fresh.rcode.is_none());
        assert!(fresh.upstream.is_none());
        assert_eq!(fresh.latency, Duration::ZERO);
    }

    /// Recording through the sink updates the stats and reaches live-log
    /// subscribers.
    #[tokio::test]
    async fn telemetry_sink_record_updates_all() {
        let _ = Telemetry::init();
        let live_log = Arc::new(LiveLog::default());
        let stats = Arc::new(Stats::default());
        let sink = TelemetrySink::new(Arc::clone(&live_log), Arc::clone(&stats));

        // Subscribe before recording so the event below is observed.
        let mut rx = live_log.subscribe();
        sink.record(make_event(Outcome::Forwarded));

        let snapshot = stats.snapshot(10);
        assert_eq!(snapshot.total, 1);
        assert_eq!(snapshot.forwarded, 1);

        let delivered = tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timeout waiting for broadcast")
            .expect("broadcast channel closed");
        assert_eq!(delivered.qname.to_string(), "example.com.");
    }
}