use std::sync::{
    Arc,
    atomic::{AtomicU32, AtomicU64, Ordering},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast;
/// Capacity handed to `broadcast::channel` when a `TraceEmitter` is created.
pub const TRACE_CHANNEL_CAPACITY: usize = 1024;
/// One trace record published by `TraceEmitter::emit`.
#[derive(Debug, Clone)]
pub struct MatchTraceEvent {
    /// Identifier taken from the emitter's event counter when the event was built.
    pub event_id: u64,
    /// Timestamp supplied by the caller of `emit`.
    // NOTE(review): presumably milliseconds since the Unix epoch (cf. `now_ms`) — confirm.
    pub received_at_ms: u64,
    /// Duration supplied by the caller of `emit`; milliseconds, going by the name.
    pub duration_ms: u32,
    /// Summary of the request this event describes.
    pub request: RequestSummary,
    /// What happened to the request.
    pub outcome: Outcome,
    /// Value of the emitter's dropped-event counter, read (and reset to zero)
    /// when this event was built.
    pub dropped_count: u32,
}
/// The parts of a request that are carried inside a trace event.
#[derive(Debug, Clone)]
pub struct RequestSummary {
    /// Request method, e.g. `"GET"`.
    pub method: String,
    /// Path component of the request URL.
    pub url_path: String,
    /// Header `(name, value)` pairs.
    pub headers: Vec<(String, String)>,
}
/// Result recorded for a traced request.
#[derive(Debug, Clone)]
pub enum Outcome {
    /// A rule matched.
    // NOTE(review): the indices presumably address a rule set and a rule inside
    // it; the collections they index into are not visible here — confirm.
    Matched {
        rule_set_index: usize,
        rule_index: usize,
    },
    /// A fallback file was used; carries its path and the response status.
    Fallback { file_path: String, status: u16 },
    /// Nothing matched; carries the response status.
    Miss { status: u16 },
    /// Handling failed; `kind` and `message` describe the error as free-form text.
    Error { kind: String, message: String },
}
#[derive(Clone)]
pub struct TraceEmitter {
sender: broadcast::Sender<MatchTraceEvent>,
event_counter: Arc<AtomicU32>,
dropped_counter: Arc<AtomicU32>,
}
impl TraceEmitter {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
Self {
sender,
event_counter: Arc::new(AtomicU32::new(0)),
dropped_counter: Arc::new(AtomicU32::new(0)),
}
}
pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
self.sender.subscribe()
}
pub fn emit(
&self,
received_at_ms: u64,
duration_ms: u32,
request: RequestSummary,
outcome: Outcome,
) {
let event_id = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);
let event = MatchTraceEvent {
event_id,
received_at_ms,
duration_ms,
request,
outcome,
dropped_count,
};
if self.sender.send(event).is_err() {
self.dropped_counter.fetch_add(1, Ordering::Relaxed);
}
}
pub fn has_subscribers(&self) -> bool {
self.sender.receiver_count() > 0
}
}
impl Default for TraceEmitter {
fn default() -> Self {
Self::new()
}
}
/// Selects which transport, if any, the trace stream should use.
///
/// The `Default` value is [`TraceTransportConfig::Disabled`].
#[derive(Debug, Clone, Default)]
pub enum TraceTransportConfig {
    // NOTE(review): presumably a Unix domain socket at `path` — confirm against RFC 006.
    Uds { path: String },
    // NOTE(review): presumably a TCP listener bound to `addr` — confirm against RFC 006.
    Tcp { addr: String },
    /// No transport.
    #[default]
    Disabled,
}
/// Field-less type whose only associated function is `accept_loop`.
pub struct TraceTransport;
impl TraceTransport {
    /// Stub for the socket transport's accept loop.
    ///
    /// Both arguments are ignored.
    ///
    /// # Panics
    ///
    /// The body is a single `unimplemented!`, so the returned future panics
    /// as soon as it is polled.
    pub async fn accept_loop(_config: TraceTransportConfig, _emitter: TraceEmitter) -> ! {
        unimplemented!(
            "RFC 006 socket I/O transport is a stub, deferred to a future release. The in-process broadcast channel is fully functional."
        )
    }
}
/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0`.
pub fn now_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: report zero rather than fail.
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header-less `GET` request summary for `path`.
    fn get_request(path: &str) -> RequestSummary {
        RequestSummary {
            method: "GET".into(),
            url_path: path.into(),
            headers: Vec::new(),
        }
    }

    /// An event emitted while a receiver is attached arrives with its fields intact.
    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let emitter = TraceEmitter::new();
        let mut receiver = emitter.subscribe();

        emitter.emit(
            1_000_000,
            5,
            get_request("/api/test"),
            Outcome::Miss { status: 404 },
        );

        let received = receiver.try_recv().expect("event should be in channel");
        assert_eq!(received.event_id, 0);
        assert_eq!(received.request.method, "GET");
        assert_eq!(received.request.url_path, "/api/test");
        assert_eq!(received.duration_ms, 5);
        assert_eq!(received.dropped_count, 0);
        assert!(matches!(received.outcome, Outcome::Miss { status: 404 }));
    }

    /// An event emitted with nobody listening shows up in the next event's `dropped_count`.
    #[tokio::test]
    async fn emit_with_no_subscriber_increments_dropped() {
        let emitter = TraceEmitter::new();

        // No receiver exists yet, so this send fails.
        emitter.emit(0, 0, get_request("/"), Outcome::Miss { status: 404 });

        let mut receiver = emitter.subscribe();
        emitter.emit(0, 0, get_request("/"), Outcome::Miss { status: 404 });

        let received = receiver
            .try_recv()
            .expect("second event visible to new subscriber");
        assert_eq!(
            received.dropped_count, 1,
            "first event should be counted as dropped"
        );
    }

    /// `has_subscribers` is false until a receiver is created.
    #[test]
    fn has_subscribers_reflects_state() {
        let emitter = TraceEmitter::new();
        assert!(!emitter.has_subscribers());

        let _receiver = emitter.subscribe();
        assert!(emitter.has_subscribers());
    }
}