use std::collections::VecDeque;
use std::sync::Mutex;
use tokio::sync::broadcast;
use super::event::QueryEvent;
/// Capacity used when a `LiveLog` is built through its `Default` implementation.
pub const DEFAULT_CAPACITY: usize = 1000;
/// In-memory log of recent query events with live fan-out to subscribers.
///
/// Each published event is stored in a bounded ring (the oldest entry is
/// dropped once the ring is full) and is also sent on a tokio broadcast
/// channel so that current subscribers see it as it happens.
pub struct LiveLog {
/// Retained events, oldest at the front; behind a mutex because `publish`
/// and `recent` both operate through `&self`.
ring: Mutex<VecDeque<QueryEvent>>,
/// Maximum number of events kept in `ring`; the same value sizes the
/// broadcast channel when the log is constructed.
capacity: usize,
/// Sending half of the broadcast channel; `subscribe` creates receivers from it.
tx: broadcast::Sender<QueryEvent>,
}
impl LiveLog {
    /// Creates a log that retains up to `capacity` recent events and
    /// broadcasts every published event to live subscribers.
    ///
    /// A `capacity` of zero is raised to one. The broadcast channel rejects a
    /// zero capacity by panicking, and a zero-sized ring could never satisfy
    /// its own bound, so the smallest useful log is used instead.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is larger than the broadcast channel supports, or
    /// if preallocating the ring overflows the allocator's limits.
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        let (tx, _) = broadcast::channel(capacity);
        Self {
            ring: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            tx,
        }
    }

    /// Records `event` in the ring and forwards it to all current subscribers.
    ///
    /// When the ring is already full, the oldest retained event is dropped to
    /// make room. Having no subscribers is not an error: the send result is
    /// deliberately ignored so publishing stays best-effort.
    ///
    /// The ring update and the broadcast are two separate steps; the ring lock
    /// is released before the event is sent.
    ///
    /// # Panics
    ///
    /// Panics if the ring mutex has been poisoned by a panic in another thread.
    pub fn publish(&self, event: QueryEvent) {
        // Clone before taking the lock so the critical section only covers
        // the queue manipulation itself.
        let retained = event.clone();
        {
            let mut ring = self.ring.lock().expect("ring mutex poisoned");
            // `>=` (rather than `==`) keeps the bound enforced even if the
            // ring were ever to hold more than `capacity` entries.
            if ring.len() >= self.capacity {
                ring.pop_front();
            }
            ring.push_back(retained);
        }
        // An error here only means nobody is listening right now.
        let _ = self.tx.send(event);
    }

    /// Returns a receiver that yields events published after this call.
    ///
    /// Events published earlier are not replayed on the receiver; use
    /// [`LiveLog::recent`] to obtain the retained history.
    pub fn subscribe(&self) -> broadcast::Receiver<QueryEvent> {
        self.tx.subscribe()
    }

    /// Returns a snapshot of the retained events, ordered oldest to newest.
    ///
    /// # Panics
    ///
    /// Panics if the ring mutex has been poisoned by a panic in another thread.
    pub fn recent(&self) -> Vec<QueryEvent> {
        let ring = self.ring.lock().expect("ring mutex poisoned");
        ring.iter().cloned().collect()
    }
}
impl Default for LiveLog {
fn default() -> Self {
Self::new(DEFAULT_CAPACITY)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        codec::{message::Qtype, name::Name},
        resolver::pipeline::Outcome,
        telemetry::event::QueryEvent,
    };
    use std::net::SocketAddr;
    use std::time::Duration;

    /// Builds a `Qtype::A` query event for `<label>.example.com` coming from
    /// a fixed documentation-range client address.
    fn make_event(label: &str, outcome: Outcome) -> QueryEvent {
        let client: SocketAddr = "203.0.113.5:1234".parse().unwrap();
        let qname: Name = format!("{label}.example.com").parse().unwrap();
        QueryEvent::new(client, qname, Qtype::A, outcome)
    }

    /// Receives the next broadcast event, failing the test if none arrives
    /// within one second or if the channel reports an error.
    async fn recv_or_fail(rx: &mut tokio::sync::broadcast::Receiver<QueryEvent>) -> QueryEvent {
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timeout waiting for broadcast")
            .expect("broadcast channel closed")
    }

    /// A subscriber that exists before `publish` receives the published event.
    #[tokio::test]
    async fn publish_delivers_to_subscriber() {
        let log = LiveLog::new(10);
        let mut rx = log.subscribe();
        let event = make_event("a", Outcome::Forwarded);

        log.publish(event.clone());

        let received = recv_or_fail(&mut rx).await;
        assert_eq!(received.qname.to_string(), event.qname.to_string());
    }

    /// Publishing more events than the capacity keeps only the newest ones,
    /// in publication order.
    #[test]
    fn ring_bounded_evicts_oldest() {
        let log = LiveLog::new(3);
        for i in 0..5u8 {
            log.publish(make_event(&format!("host{i}"), Outcome::Forwarded));
        }

        let recent = log.recent();
        assert_eq!(recent.len(), 3);

        let names: Vec<String> = recent.iter().map(|e| e.qname.to_string()).collect();
        assert_eq!(names[0], "host2.example.com.");
        assert_eq!(names[1], "host3.example.com.");
        assert_eq!(names[2], "host4.example.com.");
    }

    /// History is served by `recent()`, while a late subscriber only sees
    /// events published after it subscribed.
    #[tokio::test]
    async fn late_subscriber_seeding_contract() {
        let log = LiveLog::new(10);
        for i in 0..3u8 {
            log.publish(make_event(&format!("pre{i}"), Outcome::Cached));
        }

        let history = log.recent();
        assert_eq!(history.len(), 3, "recent() must return pre-publish events");

        let mut rx = log.subscribe();
        let post_event = make_event("post0", Outcome::Forwarded);
        log.publish(post_event.clone());

        let received = recv_or_fail(&mut rx).await;
        assert_eq!(
            received.qname.to_string(),
            post_event.qname.to_string(),
            "subscriber should only get the post-subscribe event"
        );

        // `publish` sends synchronously, so anything else destined for this
        // receiver would already be queued; a non-blocking poll checks that
        // deterministically instead of waiting on a wall-clock timeout.
        assert!(
            matches!(
                rx.try_recv(),
                Err(tokio::sync::broadcast::error::TryRecvError::Empty)
            ),
            "broadcast channel should be empty after consuming the one post-subscribe event"
        );
    }
}