1use std::sync::{
43 Arc,
44 atomic::{AtomicU32, AtomicUsize, Ordering},
45};
46use std::time::{Duration, SystemTime, UNIX_EPOCH};
47
48use serde::Serialize;
49use tokio::io::AsyncWriteExt;
50use tokio::sync::broadcast;
51
52pub const TRACE_CHANNEL_CAPACITY: usize = 1_024;
54pub const MAX_SUBSCRIBERS: usize = 4;
56
57#[derive(Clone, Debug, Serialize)]
61pub struct MatchTraceEvent {
62 pub event_id: u64,
64 pub schema_version: u8,
66 pub received_at_ms: u64,
68 pub duration_ms: u32,
70 pub request: RequestSummary,
72 pub outcome: Outcome,
74 pub dropped_count: u32,
76}
77
78#[derive(Clone, Debug, Serialize)]
80pub struct RequestSummary {
81 pub method: String,
82 pub url_path: String,
83 pub headers: Vec<(String, String)>,
85}
86
87#[derive(Clone, Debug, Serialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum Outcome {
91 Matched { rule_set_index: usize, rule_index: usize },
92 Fallback { file_path: String, status: u16 },
93 Miss { status: u16 },
94 Error { kind: String, message: String },
95}
96
97#[derive(Clone)]
103pub struct TraceEmitter {
104 sender: broadcast::Sender<MatchTraceEvent>,
105 event_counter: Arc<AtomicU32>,
106 dropped_counter: Arc<AtomicU32>,
107}
108
109impl TraceEmitter {
110 pub fn new() -> Self {
111 let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
112 Self {
113 sender,
114 event_counter: Arc::new(AtomicU32::new(0)),
115 dropped_counter: Arc::new(AtomicU32::new(0)),
116 }
117 }
118
119 pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
121 self.sender.subscribe()
122 }
123
124 pub fn emit(
127 &self,
128 received_at_ms: u64,
129 duration_ms: u32,
130 request: RequestSummary,
131 outcome: Outcome,
132 ) {
133 let event_id = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
134 let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);
135
136 let event = MatchTraceEvent {
137 event_id,
138 schema_version: 1,
139 received_at_ms,
140 duration_ms,
141 request,
142 outcome,
143 dropped_count,
144 };
145
146 if self.sender.send(event).is_err() {
147 self.dropped_counter.fetch_add(1, Ordering::Relaxed);
148 }
149 }
150
151 pub fn has_subscribers(&self) -> bool {
153 self.sender.receiver_count() > 0
154 }
155}
156
157impl Default for TraceEmitter { fn default() -> Self { Self::new() } }
158
159#[derive(Clone, Debug, Default)]
163pub enum TraceTransportConfig {
164 #[cfg(unix)]
166 Uds { path: String },
167 Tcp { addr: String },
169 #[default]
171 Disabled,
172}
173
174pub struct TraceTransport;
177
178impl TraceTransport {
179 pub async fn accept_loop(config: TraceTransportConfig, emitter: TraceEmitter) {
191 match config {
192 #[cfg(unix)]
193 TraceTransportConfig::Uds { path } => {
194 Self::uds_accept_loop(path, emitter).await
195 }
196 TraceTransportConfig::Tcp { addr } => {
197 Self::tcp_accept_loop(addr, emitter).await
198 }
199 TraceTransportConfig::Disabled => {
200 }
202 }
203 }
204
205 async fn tcp_accept_loop(addr: String, emitter: TraceEmitter) {
208 let listener = match tokio::net::TcpListener::bind(&addr).await {
209 Ok(l) => {
210 let bound = l.local_addr().map(|a| a.to_string())
211 .unwrap_or_else(|_| addr.clone());
212 log::info!("trace transport: TCP listening on {}", bound);
213 l
214 }
215 Err(e) => {
216 log::error!("trace transport: failed to bind TCP {}: {}", addr, e);
217 return;
218 }
219 };
220
221 let active = Arc::new(AtomicUsize::new(0));
222 loop {
223 match listener.accept().await {
224 Ok((stream, peer)) => {
225 let count = active.fetch_add(1, Ordering::Relaxed) + 1;
226 if count > MAX_SUBSCRIBERS {
227 active.fetch_sub(1, Ordering::Relaxed);
228 let active_clone = active.clone();
229 tokio::spawn(async move {
230 let (_, mut writer) = tokio::io::split(stream);
231 let _ = writer
232 .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
233 .await;
234 drop(active_clone);
235 });
236 continue;
237 }
238 log::debug!("trace: TCP subscriber connected from {}", peer);
239 let rx = emitter.subscribe();
240 let active_clone = active.clone();
241 tokio::spawn(async move {
242 let (_, writer) = tokio::io::split(stream);
243 Self::forward_events(writer, rx).await;
244 active_clone.fetch_sub(1, Ordering::Relaxed);
245 log::debug!("trace: TCP subscriber {} disconnected", peer);
246 });
247 }
248 Err(e) => {
249 log::error!("trace: TCP accept error: {}", e);
250 tokio::time::sleep(Duration::from_millis(100)).await;
251 }
252 }
253 }
254 }
255
256 #[cfg(unix)]
259 async fn uds_accept_loop(path: String, emitter: TraceEmitter) {
260 let _ = std::fs::remove_file(&path);
262
263 let listener = match tokio::net::UnixListener::bind(&path) {
264 Ok(l) => {
265 log::info!("trace transport: UDS listening at {}", path);
266 l
267 }
268 Err(e) => {
269 log::error!("trace transport: failed to bind UDS {}: {}", path, e);
270 return;
271 }
272 };
273
274 let active = Arc::new(AtomicUsize::new(0));
275 loop {
276 match listener.accept().await {
277 Ok((stream, _)) => {
278 let count = active.fetch_add(1, Ordering::Relaxed) + 1;
279 if count > MAX_SUBSCRIBERS {
280 active.fetch_sub(1, Ordering::Relaxed);
281 tokio::spawn(async move {
282 let (_, mut writer) = tokio::io::split(stream);
283 let _ = writer
284 .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
285 .await;
286 });
287 continue;
288 }
289 log::debug!("trace: UDS subscriber connected");
290 let rx = emitter.subscribe();
291 let active_clone = active.clone();
292 tokio::spawn(async move {
293 let (_, writer) = tokio::io::split(stream);
294 Self::forward_events(writer, rx).await;
295 active_clone.fetch_sub(1, Ordering::Relaxed);
296 log::debug!("trace: UDS subscriber disconnected");
297 });
298 }
299 Err(e) => {
300 log::error!("trace: UDS accept error: {}", e);
301 tokio::time::sleep(Duration::from_millis(100)).await;
302 }
303 }
304 }
305 }
306
307 async fn forward_events<W>(mut writer: W, mut rx: broadcast::Receiver<MatchTraceEvent>)
312 where
313 W: tokio::io::AsyncWrite + Unpin,
314 {
315 loop {
316 let event = match rx.recv().await {
317 Ok(e) => e,
318 Err(broadcast::error::RecvError::Lagged(n)) => {
319 log::debug!("trace: subscriber lagged, {} events dropped", n);
323 continue;
324 }
325 Err(broadcast::error::RecvError::Closed) => break,
326 };
327
328 let mut line = match serde_json::to_string(&event) {
329 Ok(s) => s,
330 Err(e) => {
331 log::error!("trace: serialise error: {}", e);
332 continue;
333 }
334 };
335 line.push('\n');
336
337 if writer.write_all(line.as_bytes()).await.is_err() {
338 break; }
340 }
341 }
342}
343
344pub fn now_ms() -> u64 {
348 SystemTime::now()
349 .duration_since(UNIX_EPOCH)
350 .unwrap_or(Duration::ZERO)
351 .as_millis() as u64
352}
353
354#[cfg(test)]
357mod tests {
358 use super::*;
359
360 #[tokio::test]
361 async fn emit_received_by_subscriber() {
362 let emitter = TraceEmitter::new();
363 let mut rx = emitter.subscribe();
364
365 emitter.emit(
366 1_000_000, 5,
367 RequestSummary { method: "GET".into(), url_path: "/api/test".into(), headers: vec![] },
368 Outcome::Miss { status: 404 },
369 );
370
371 let event = rx.try_recv().expect("event in channel");
372 assert_eq!(event.event_id, 0);
373 assert_eq!(event.schema_version, 1);
374 assert_eq!(event.request.method, "GET");
375 assert_eq!(event.duration_ms, 5);
376 assert_eq!(event.dropped_count, 0);
377 assert!(matches!(event.outcome, Outcome::Miss { status: 404 }));
378 }
379
380 #[tokio::test]
381 async fn emit_no_subscriber_increments_dropped() {
382 let emitter = TraceEmitter::new();
383 emitter.emit(0, 0,
384 RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
385 Outcome::Miss { status: 404 },
386 );
387 let mut rx = emitter.subscribe();
388 emitter.emit(0, 0,
389 RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
390 Outcome::Miss { status: 200 },
391 );
392 let event = rx.try_recv().expect("second event visible");
393 assert_eq!(event.dropped_count, 1, "first event should be counted dropped");
394 }
395
396 #[test]
397 fn has_subscribers_reflects_state() {
398 let emitter = TraceEmitter::new();
399 assert!(!emitter.has_subscribers());
400 let _rx = emitter.subscribe();
401 assert!(emitter.has_subscribers());
402 }
403
404 #[tokio::test]
405 async fn outcome_serialises_correctly() {
406 let event = MatchTraceEvent {
407 event_id: 7, schema_version: 1, received_at_ms: 0, duration_ms: 0,
408 request: RequestSummary { method: "POST".into(), url_path: "/x".into(), headers: vec![] },
409 outcome: Outcome::Matched { rule_set_index: 0, rule_index: 2 },
410 dropped_count: 0,
411 };
412 let json = serde_json::to_string(&event).unwrap();
413 assert!(json.contains("\"type\":\"matched\""));
414 assert!(json.contains("\"rule_index\":2"));
415 assert!(json.contains("\"schema_version\":1"));
416 }
417
418 #[tokio::test]
419 async fn tcp_transport_delivers_events() {
420 let emitter = TraceEmitter::new();
421 let emitter_clone = emitter.clone();
422
423 let config = TraceTransportConfig::Tcp { addr: "127.0.0.1:0".to_owned() };
425
426 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
430 let bound_addr = listener.local_addr().unwrap();
431
432 tokio::spawn(async move {
434 let (stream, _) = listener.accept().await.unwrap();
435 let rx = emitter_clone.subscribe();
436 let (_, writer) = tokio::io::split(stream);
437 TraceTransport::forward_events(writer, rx).await;
438 });
439
440 let mut client = tokio::net::TcpStream::connect(bound_addr).await.unwrap();
442
443 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
445
446 emitter.emit(
447 42, 3,
448 RequestSummary { method: "GET".into(), url_path: "/ping".into(), headers: vec![] },
449 Outcome::Miss { status: 404 },
450 );
451
452 use tokio::io::AsyncBufReadExt;
454 let mut reader = tokio::io::BufReader::new(&mut client);
455 let mut line = String::new();
456 tokio::time::timeout(
457 std::time::Duration::from_secs(2),
458 reader.read_line(&mut line),
459 )
460 .await
461 .expect("timeout")
462 .expect("read ok");
463
464 let value: serde_json::Value = serde_json::from_str(line.trim()).expect("valid JSON");
465 assert_eq!(value["request"]["url_path"], "/ping");
466 assert_eq!(value["outcome"]["type"], "miss");
467 assert_eq!(value["schema_version"], 1);
468 }
469}