Skip to main content

apimock_server/
trace.rs

1//! Live match-trace channel — RFC 006 (in-process) + RFC 009 (transport).
2//!
3//! # Architecture
4//!
5//! ```text
6//!  HTTP handler ──► TraceEmitter::emit()
7//!                         │
8//!              tokio::sync::broadcast (bounded, 1024)
9//!                         │
10//!           ┌─────────────┴──────────────┐
11//!      in-process                  TraceTransport::accept_loop
12//!      subscriber                  (UDS on Unix, TCP fallback)
13//!                                        │
14//!                                  up to 4 GUI connections
15//!                                  (newline-delimited JSON)
16//! ```
17//!
18//! # Transport variants
19//!
20//! | `TraceTransportConfig` | Platform | Notes |
21//! |---|---|---|
22//! | `Uds { path }` | Unix/macOS | Default when available |
23//! | `Tcp { addr }` | All | Portable fallback; `addr = "127.0.0.1:0"` assigns ephemeral port |
24//! | `Disabled` | All | No out-of-process forwarding (default) |
25//!
26//! # Back-pressure
27//!
28//! The broadcast channel is bounded by [`TRACE_CHANNEL_CAPACITY`]. When
29//! the channel is full, `emit` drops the event and increments an internal
30//! counter; the count is reported as `dropped_count` on the next event.
31//!
32//! Slow out-of-process subscribers receive a `RecvError::Lagged` from the
33//! broadcast channel; the gap is reported in the next JSON line via
34//! `dropped_count`.
35//!
36//! # Subscriber cap
37//!
38//! At most [`MAX_SUBSCRIBERS`] out-of-process connections are accepted.
39//! A fifth connection receives `{"error":"max_subscribers_reached"}` and
40//! is then closed.
41
42use std::sync::{
43    Arc,
44    atomic::{AtomicU32, AtomicUsize, Ordering},
45};
46use std::time::{Duration, SystemTime, UNIX_EPOCH};
47
48use serde::Serialize;
49use tokio::io::AsyncWriteExt;
50use tokio::sync::broadcast;
51
52/// Capacity of the broadcast channel (events).
53pub const TRACE_CHANNEL_CAPACITY: usize = 1_024;
54/// Maximum concurrent out-of-process subscriber connections.
55pub const MAX_SUBSCRIBERS: usize = 4;
56
57// ── Event schema ──────────────────────────────────────────────────────
58
59/// A single request/response trace event.
60#[derive(Clone, Debug, Serialize)]
61pub struct MatchTraceEvent {
62    /// Monotonically increasing event counter within this server run.
63    pub event_id: u64,
64    /// Schema version — bumped on breaking changes.
65    pub schema_version: u8,
66    /// Unix timestamp (milliseconds) when the request was received.
67    pub received_at_ms: u64,
68    /// Processing time in milliseconds.
69    pub duration_ms: u32,
70    /// Key fields from the incoming request.
71    pub request: RequestSummary,
72    /// What the server decided to do with the request.
73    pub outcome: Outcome,
74    /// Events dropped since the last successfully delivered event.
75    pub dropped_count: u32,
76}
77
78/// Key fields from the incoming HTTP request.
79#[derive(Clone, Debug, Serialize)]
80pub struct RequestSummary {
81    pub method: String,
82    pub url_path: String,
83    /// Selected request headers (display-only; body capture deferred).
84    pub headers: Vec<(String, String)>,
85}
86
87/// What the server decided to do with the request.
88#[derive(Clone, Debug, Serialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum Outcome {
91    Matched { rule_set_index: usize, rule_index: usize },
92    Fallback { file_path: String, status: u16 },
93    Miss    { status: u16 },
94    Error   { kind: String, message: String },
95}
96
97// ── Emitter ───────────────────────────────────────────────────────────
98
99/// Shared handle to the trace broadcast channel.
100///
101/// Clone freely — each clone refers to the same underlying channel.
102#[derive(Clone)]
103pub struct TraceEmitter {
104    sender:          broadcast::Sender<MatchTraceEvent>,
105    event_counter:   Arc<AtomicU32>,
106    dropped_counter: Arc<AtomicU32>,
107}
108
109impl TraceEmitter {
110    pub fn new() -> Self {
111        let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
112        Self {
113            sender,
114            event_counter:   Arc::new(AtomicU32::new(0)),
115            dropped_counter: Arc::new(AtomicU32::new(0)),
116        }
117    }
118
119    /// Subscribe to the event stream (in-process).
120    pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
121        self.sender.subscribe()
122    }
123
124    /// Emit one event.  If the channel is full, the event is dropped and
125    /// the internal drop counter incremented.
126    pub fn emit(
127        &self,
128        received_at_ms: u64,
129        duration_ms: u32,
130        request: RequestSummary,
131        outcome: Outcome,
132    ) {
133        let event_id    = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
134        let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);
135
136        let event = MatchTraceEvent {
137            event_id,
138            schema_version: 1,
139            received_at_ms,
140            duration_ms,
141            request,
142            outcome,
143            dropped_count,
144        };
145
146        if self.sender.send(event).is_err() {
147            self.dropped_counter.fetch_add(1, Ordering::Relaxed);
148        }
149    }
150
151    /// Returns `true` iff at least one receiver is currently active.
152    pub fn has_subscribers(&self) -> bool {
153        self.sender.receiver_count() > 0
154    }
155}
156
157impl Default for TraceEmitter { fn default() -> Self { Self::new() } }
158
159// ── Transport configuration ───────────────────────────────────────────
160
161/// Configuration for the out-of-process transport layer.
162#[derive(Clone, Debug, Default)]
163pub enum TraceTransportConfig {
164    /// Unix-domain socket at the given path (Unix/macOS only).
165    #[cfg(unix)]
166    Uds { path: String },
167    /// TCP loopback socket (portable fallback).
168    Tcp { addr: String },
169    /// No out-of-process forwarding.
170    #[default]
171    Disabled,
172}
173
174// ── Transport implementation ──────────────────────────────────────────
175
176pub struct TraceTransport;
177
178impl TraceTransport {
179    /// Start accepting out-of-process subscriber connections and forwarding
180    /// events as newline-delimited JSON.
181    ///
182    /// This future runs forever (until the process exits or the socket
183    /// errors fatally). Spawn it with `tokio::spawn`.
184    ///
185    /// # Subscriber cap
186    ///
187    /// At most [`MAX_SUBSCRIBERS`] connections are served simultaneously.
188    /// Connection #`MAX_SUBSCRIBERS + 1` receives a JSON error line and
189    /// is closed.
190    pub async fn accept_loop(config: TraceTransportConfig, emitter: TraceEmitter) {
191        match config {
192            #[cfg(unix)]
193            TraceTransportConfig::Uds { path } => {
194                Self::uds_accept_loop(path, emitter).await
195            }
196            TraceTransportConfig::Tcp { addr } => {
197                Self::tcp_accept_loop(addr, emitter).await
198            }
199            TraceTransportConfig::Disabled => {
200                // No-op — transport disabled; in-process channel still works.
201            }
202        }
203    }
204
205    // ── TCP accept loop ───────────────────────────────────────────────
206
207    async fn tcp_accept_loop(addr: String, emitter: TraceEmitter) {
208        let listener = match tokio::net::TcpListener::bind(&addr).await {
209            Ok(l) => {
210                let bound = l.local_addr().map(|a| a.to_string())
211                    .unwrap_or_else(|_| addr.clone());
212                log::info!("trace transport: TCP listening on {}", bound);
213                l
214            }
215            Err(e) => {
216                log::error!("trace transport: failed to bind TCP {}: {}", addr, e);
217                return;
218            }
219        };
220
221        let active = Arc::new(AtomicUsize::new(0));
222        loop {
223            match listener.accept().await {
224                Ok((stream, peer)) => {
225                    let count = active.fetch_add(1, Ordering::Relaxed) + 1;
226                    if count > MAX_SUBSCRIBERS {
227                        active.fetch_sub(1, Ordering::Relaxed);
228                        let emitter_clone = emitter.clone();
229                        let active_clone = active.clone();
230                        tokio::spawn(async move {
231                            let (_, mut writer) = tokio::io::split(stream);
232                            let _ = writer
233                                .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
234                                .await;
235                            drop(active_clone);
236                        });
237                        continue;
238                    }
239                    log::debug!("trace: TCP subscriber connected from {}", peer);
240                    let rx = emitter.subscribe();
241                    let active_clone = active.clone();
242                    tokio::spawn(async move {
243                        let (_, writer) = tokio::io::split(stream);
244                        Self::forward_events(writer, rx).await;
245                        active_clone.fetch_sub(1, Ordering::Relaxed);
246                        log::debug!("trace: TCP subscriber {} disconnected", peer);
247                    });
248                }
249                Err(e) => {
250                    log::error!("trace: TCP accept error: {}", e);
251                    tokio::time::sleep(Duration::from_millis(100)).await;
252                }
253            }
254        }
255    }
256
257    // ── UDS accept loop (Unix only) ───────────────────────────────────
258
259    #[cfg(unix)]
260    async fn uds_accept_loop(path: String, emitter: TraceEmitter) {
261        // Remove stale socket file from a previous run.
262        let _ = std::fs::remove_file(&path);
263
264        let listener = match tokio::net::UnixListener::bind(&path) {
265            Ok(l) => {
266                log::info!("trace transport: UDS listening at {}", path);
267                l
268            }
269            Err(e) => {
270                log::error!("trace transport: failed to bind UDS {}: {}", path, e);
271                return;
272            }
273        };
274
275        let active = Arc::new(AtomicUsize::new(0));
276        loop {
277            match listener.accept().await {
278                Ok((stream, _)) => {
279                    let count = active.fetch_add(1, Ordering::Relaxed) + 1;
280                    if count > MAX_SUBSCRIBERS {
281                        active.fetch_sub(1, Ordering::Relaxed);
282                        tokio::spawn(async move {
283                            let (_, mut writer) = tokio::io::split(stream);
284                            let _ = writer
285                                .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
286                                .await;
287                        });
288                        continue;
289                    }
290                    log::debug!("trace: UDS subscriber connected");
291                    let rx = emitter.subscribe();
292                    let active_clone = active.clone();
293                    tokio::spawn(async move {
294                        let (_, writer) = tokio::io::split(stream);
295                        Self::forward_events(writer, rx).await;
296                        active_clone.fetch_sub(1, Ordering::Relaxed);
297                        log::debug!("trace: UDS subscriber disconnected");
298                    });
299                }
300                Err(e) => {
301                    log::error!("trace: UDS accept error: {}", e);
302                    tokio::time::sleep(Duration::from_millis(100)).await;
303                }
304            }
305        }
306    }
307
308    // ── Event forwarder (shared by UDS and TCP) ───────────────────────
309
310    /// Read events from `rx` and write each as a JSON line to `writer`
311    /// until the connection closes or the channel is closed.
312    async fn forward_events<W>(mut writer: W, mut rx: broadcast::Receiver<MatchTraceEvent>)
313    where
314        W: tokio::io::AsyncWrite + Unpin,
315    {
316        loop {
317            let event = match rx.recv().await {
318                Ok(e) => e,
319                Err(broadcast::error::RecvError::Lagged(n)) => {
320                    // Receiver was too slow; `n` events were dropped.
321                    // The next event will carry `dropped_count` so the
322                    // subscriber can detect the gap. Continue.
323                    log::debug!("trace: subscriber lagged, {} events dropped", n);
324                    continue;
325                }
326                Err(broadcast::error::RecvError::Closed) => break,
327            };
328
329            let mut line = match serde_json::to_string(&event) {
330                Ok(s) => s,
331                Err(e) => {
332                    log::error!("trace: serialise error: {}", e);
333                    continue;
334                }
335            };
336            line.push('\n');
337
338            if writer.write_all(line.as_bytes()).await.is_err() {
339                break; // subscriber disconnected
340            }
341        }
342    }
343}
344
345// ── Timestamp helper ──────────────────────────────────────────────────
346
347/// Current Unix time in milliseconds.
348pub fn now_ms() -> u64 {
349    SystemTime::now()
350        .duration_since(UNIX_EPOCH)
351        .unwrap_or(Duration::ZERO)
352        .as_millis() as u64
353}
354
355// ── Tests ─────────────────────────────────────────────────────────────
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360
361    #[tokio::test]
362    async fn emit_received_by_subscriber() {
363        let emitter = TraceEmitter::new();
364        let mut rx = emitter.subscribe();
365
366        emitter.emit(
367            1_000_000, 5,
368            RequestSummary { method: "GET".into(), url_path: "/api/test".into(), headers: vec![] },
369            Outcome::Miss { status: 404 },
370        );
371
372        let event = rx.try_recv().expect("event in channel");
373        assert_eq!(event.event_id, 0);
374        assert_eq!(event.schema_version, 1);
375        assert_eq!(event.request.method, "GET");
376        assert_eq!(event.duration_ms, 5);
377        assert_eq!(event.dropped_count, 0);
378        assert!(matches!(event.outcome, Outcome::Miss { status: 404 }));
379    }
380
381    #[tokio::test]
382    async fn emit_no_subscriber_increments_dropped() {
383        let emitter = TraceEmitter::new();
384        emitter.emit(0, 0,
385            RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
386            Outcome::Miss { status: 404 },
387        );
388        let mut rx = emitter.subscribe();
389        emitter.emit(0, 0,
390            RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
391            Outcome::Miss { status: 200 },
392        );
393        let event = rx.try_recv().expect("second event visible");
394        assert_eq!(event.dropped_count, 1, "first event should be counted dropped");
395    }
396
397    #[test]
398    fn has_subscribers_reflects_state() {
399        let emitter = TraceEmitter::new();
400        assert!(!emitter.has_subscribers());
401        let _rx = emitter.subscribe();
402        assert!(emitter.has_subscribers());
403    }
404
405    #[tokio::test]
406    async fn outcome_serialises_correctly() {
407        let event = MatchTraceEvent {
408            event_id: 7, schema_version: 1, received_at_ms: 0, duration_ms: 0,
409            request: RequestSummary { method: "POST".into(), url_path: "/x".into(), headers: vec![] },
410            outcome: Outcome::Matched { rule_set_index: 0, rule_index: 2 },
411            dropped_count: 0,
412        };
413        let json = serde_json::to_string(&event).unwrap();
414        assert!(json.contains("\"type\":\"matched\""));
415        assert!(json.contains("\"rule_index\":2"));
416        assert!(json.contains("\"schema_version\":1"));
417    }
418
419    #[tokio::test]
420    async fn tcp_transport_delivers_events() {
421        let emitter = TraceEmitter::new();
422        let emitter_clone = emitter.clone();
423
424        // Bind on an ephemeral port.
425        let config = TraceTransportConfig::Tcp { addr: "127.0.0.1:0".to_owned() };
426
427        // We need to know the actual bound port before connecting.
428        // Bind the listener ourselves to capture the address, then hand
429        // the address to the transport accept loop via a channel.
430        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
431        let bound_addr = listener.local_addr().unwrap();
432
433        // Spawn a simplified accept loop that uses our pre-bound listener.
434        tokio::spawn(async move {
435            let (stream, _) = listener.accept().await.unwrap();
436            let rx = emitter_clone.subscribe();
437            let (_, writer) = tokio::io::split(stream);
438            TraceTransport::forward_events(writer, rx).await;
439        });
440
441        // Connect a subscriber.
442        let mut client = tokio::net::TcpStream::connect(bound_addr).await.unwrap();
443
444        // Give the subscriber task a moment to subscribe before emitting.
445        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
446
447        emitter.emit(
448            42, 3,
449            RequestSummary { method: "GET".into(), url_path: "/ping".into(), headers: vec![] },
450            Outcome::Miss { status: 404 },
451        );
452
453        // Read one JSON line from the TCP connection.
454        use tokio::io::AsyncBufReadExt;
455        let mut reader = tokio::io::BufReader::new(&mut client);
456        let mut line = String::new();
457        tokio::time::timeout(
458            std::time::Duration::from_secs(2),
459            reader.read_line(&mut line),
460        )
461        .await
462        .expect("timeout")
463        .expect("read ok");
464
465        let value: serde_json::Value = serde_json::from_str(line.trim()).expect("valid JSON");
466        assert_eq!(value["request"]["url_path"], "/ping");
467        assert_eq!(value["outcome"]["type"], "miss");
468        assert_eq!(value["schema_version"], 1);
469    }
470}