Skip to main content

apimock_server/
trace.rs

1//! Live match-trace channel — RFC 006 (in-process) + RFC 009 (transport).
2//!
3//! # Architecture
4//!
5//! ```text
6//!  HTTP handler ──► TraceEmitter::emit()
7//!                         │
8//!              tokio::sync::broadcast (bounded, 1024)
9//!                         │
10//!           ┌─────────────┴──────────────┐
11//!      in-process                  TraceTransport::accept_loop
12//!      subscriber                  (UDS on Unix, TCP fallback)
13//!                                        │
14//!                                  up to 4 GUI connections
15//!                                  (newline-delimited JSON)
16//! ```
17//!
18//! # Transport variants
19//!
20//! | `TraceTransportConfig` | Platform | Notes |
21//! |---|---|---|
22//! | `Uds { path }` | Unix/macOS | Default when available |
23//! | `Tcp { addr }` | All | Portable fallback; `addr = "127.0.0.1:0"` assigns ephemeral port |
24//! | `Disabled` | All | No out-of-process forwarding (default) |
25//!
26//! # Back-pressure
27//!
//! The broadcast channel is bounded by [`TRACE_CHANNEL_CAPACITY`]. `emit`
//! never blocks: if no receiver is subscribed, the event is dropped and an
//! internal counter is incremented; the count is reported as
//! `dropped_count` on the next event. When the channel is full, the oldest
//! queued event is overwritten instead.
31//!
32//! Slow out-of-process subscribers receive a `RecvError::Lagged` from the
33//! broadcast channel; the gap is reported in the next JSON line via
34//! `dropped_count`.
35//!
36//! # Subscriber cap
37//!
38//! At most [`MAX_SUBSCRIBERS`] out-of-process connections are accepted.
39//! A fifth connection receives `{"error":"max_subscribers_reached"}` and
40//! is then closed.
41
42use std::sync::{
43    Arc,
44    atomic::{AtomicU32, AtomicUsize, Ordering},
45};
46use std::time::{Duration, SystemTime, UNIX_EPOCH};
47
48use serde::Serialize;
49use tokio::io::AsyncWriteExt;
50use tokio::sync::broadcast;
51
/// Number of events the broadcast channel buffers.
pub const TRACE_CHANNEL_CAPACITY: usize = 1024;
/// Upper bound on simultaneously served out-of-process subscriber connections.
pub const MAX_SUBSCRIBERS: usize = 4;
56
57// ── Event schema ──────────────────────────────────────────────────────
58
59/// A single request/response trace event.
60#[derive(Clone, Debug, Serialize)]
61pub struct MatchTraceEvent {
62    /// Monotonically increasing event counter within this server run.
63    pub event_id: u64,
64    /// Schema version — bumped on breaking changes.
65    pub schema_version: u8,
66    /// Unix timestamp (milliseconds) when the request was received.
67    pub received_at_ms: u64,
68    /// Processing time in milliseconds.
69    pub duration_ms: u32,
70    /// Key fields from the incoming request.
71    pub request: RequestSummary,
72    /// What the server decided to do with the request.
73    pub outcome: Outcome,
74    /// Events dropped since the last successfully delivered event.
75    pub dropped_count: u32,
76}
77
78/// Key fields from the incoming HTTP request.
79#[derive(Clone, Debug, Serialize)]
80pub struct RequestSummary {
81    pub method: String,
82    pub url_path: String,
83    /// Selected request headers (display-only; body capture deferred).
84    pub headers: Vec<(String, String)>,
85}
86
87/// What the server decided to do with the request.
88#[derive(Clone, Debug, Serialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum Outcome {
91    Matched { rule_set_index: usize, rule_index: usize },
92    Fallback { file_path: String, status: u16 },
93    Miss    { status: u16 },
94    Error   { kind: String, message: String },
95}
96
97// ── Emitter ───────────────────────────────────────────────────────────
98
99/// Shared handle to the trace broadcast channel.
100///
101/// Clone freely — each clone refers to the same underlying channel.
102#[derive(Clone)]
103pub struct TraceEmitter {
104    sender:          broadcast::Sender<MatchTraceEvent>,
105    event_counter:   Arc<AtomicU32>,
106    dropped_counter: Arc<AtomicU32>,
107}
108
109impl TraceEmitter {
110    pub fn new() -> Self {
111        let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
112        Self {
113            sender,
114            event_counter:   Arc::new(AtomicU32::new(0)),
115            dropped_counter: Arc::new(AtomicU32::new(0)),
116        }
117    }
118
119    /// Subscribe to the event stream (in-process).
120    pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
121        self.sender.subscribe()
122    }
123
124    /// Emit one event.  If the channel is full, the event is dropped and
125    /// the internal drop counter incremented.
126    pub fn emit(
127        &self,
128        received_at_ms: u64,
129        duration_ms: u32,
130        request: RequestSummary,
131        outcome: Outcome,
132    ) {
133        let event_id    = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
134        let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);
135
136        let event = MatchTraceEvent {
137            event_id,
138            schema_version: 1,
139            received_at_ms,
140            duration_ms,
141            request,
142            outcome,
143            dropped_count,
144        };
145
146        if self.sender.send(event).is_err() {
147            self.dropped_counter.fetch_add(1, Ordering::Relaxed);
148        }
149    }
150
151    /// Returns `true` iff at least one receiver is currently active.
152    pub fn has_subscribers(&self) -> bool {
153        self.sender.receiver_count() > 0
154    }
155}
156
157impl Default for TraceEmitter { fn default() -> Self { Self::new() } }
158
159// ── Transport configuration ───────────────────────────────────────────
160
/// Selects how (and whether) trace events are forwarded out of process.
///
/// `Default::default()` yields [`TraceTransportConfig::Disabled`].
#[derive(Debug, Clone, Default)]
pub enum TraceTransportConfig {
    /// Listen on a Unix-domain socket at `path`. Only compiled on Unix
    /// targets (including macOS).
    #[cfg(unix)]
    Uds { path: String },
    /// Listen on a TCP socket bound to `addr` (portable fallback).
    Tcp { addr: String },
    /// Do not forward events out of process.
    #[default]
    Disabled,
}
173
174// ── Transport implementation ──────────────────────────────────────────
175
/// Out-of-process transport for the trace channel.
///
/// Carries no state: every operation is an associated function that takes
/// the configuration and the [`TraceEmitter`] it needs as arguments.
pub struct TraceTransport;
177
178impl TraceTransport {
179    /// Start accepting out-of-process subscriber connections and forwarding
180    /// events as newline-delimited JSON.
181    ///
182    /// This future runs forever (until the process exits or the socket
183    /// errors fatally). Spawn it with `tokio::spawn`.
184    ///
185    /// # Subscriber cap
186    ///
187    /// At most [`MAX_SUBSCRIBERS`] connections are served simultaneously.
188    /// Connection #`MAX_SUBSCRIBERS + 1` receives a JSON error line and
189    /// is closed.
190    pub async fn accept_loop(config: TraceTransportConfig, emitter: TraceEmitter) {
191        match config {
192            #[cfg(unix)]
193            TraceTransportConfig::Uds { path } => {
194                Self::uds_accept_loop(path, emitter).await
195            }
196            TraceTransportConfig::Tcp { addr } => {
197                Self::tcp_accept_loop(addr, emitter).await
198            }
199            TraceTransportConfig::Disabled => {
200                // No-op — transport disabled; in-process channel still works.
201            }
202        }
203    }
204
205    // ── TCP accept loop ───────────────────────────────────────────────
206
207    async fn tcp_accept_loop(addr: String, emitter: TraceEmitter) {
208        let listener = match tokio::net::TcpListener::bind(&addr).await {
209            Ok(l) => {
210                let bound = l.local_addr().map(|a| a.to_string())
211                    .unwrap_or_else(|_| addr.clone());
212                log::info!("trace transport: TCP listening on {}", bound);
213                l
214            }
215            Err(e) => {
216                log::error!("trace transport: failed to bind TCP {}: {}", addr, e);
217                return;
218            }
219        };
220
221        let active = Arc::new(AtomicUsize::new(0));
222        loop {
223            match listener.accept().await {
224                Ok((stream, peer)) => {
225                    let count = active.fetch_add(1, Ordering::Relaxed) + 1;
226                    if count > MAX_SUBSCRIBERS {
227                        active.fetch_sub(1, Ordering::Relaxed);
228                        let active_clone = active.clone();
229                        tokio::spawn(async move {
230                            let (_, mut writer) = tokio::io::split(stream);
231                            let _ = writer
232                                .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
233                                .await;
234                            drop(active_clone);
235                        });
236                        continue;
237                    }
238                    log::debug!("trace: TCP subscriber connected from {}", peer);
239                    let rx = emitter.subscribe();
240                    let active_clone = active.clone();
241                    tokio::spawn(async move {
242                        let (_, writer) = tokio::io::split(stream);
243                        Self::forward_events(writer, rx).await;
244                        active_clone.fetch_sub(1, Ordering::Relaxed);
245                        log::debug!("trace: TCP subscriber {} disconnected", peer);
246                    });
247                }
248                Err(e) => {
249                    log::error!("trace: TCP accept error: {}", e);
250                    tokio::time::sleep(Duration::from_millis(100)).await;
251                }
252            }
253        }
254    }
255
256    // ── UDS accept loop (Unix only) ───────────────────────────────────
257
258    #[cfg(unix)]
259    async fn uds_accept_loop(path: String, emitter: TraceEmitter) {
260        // Remove stale socket file from a previous run.
261        let _ = std::fs::remove_file(&path);
262
263        let listener = match tokio::net::UnixListener::bind(&path) {
264            Ok(l) => {
265                log::info!("trace transport: UDS listening at {}", path);
266                l
267            }
268            Err(e) => {
269                log::error!("trace transport: failed to bind UDS {}: {}", path, e);
270                return;
271            }
272        };
273
274        let active = Arc::new(AtomicUsize::new(0));
275        loop {
276            match listener.accept().await {
277                Ok((stream, _)) => {
278                    let count = active.fetch_add(1, Ordering::Relaxed) + 1;
279                    if count > MAX_SUBSCRIBERS {
280                        active.fetch_sub(1, Ordering::Relaxed);
281                        tokio::spawn(async move {
282                            let (_, mut writer) = tokio::io::split(stream);
283                            let _ = writer
284                                .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
285                                .await;
286                        });
287                        continue;
288                    }
289                    log::debug!("trace: UDS subscriber connected");
290                    let rx = emitter.subscribe();
291                    let active_clone = active.clone();
292                    tokio::spawn(async move {
293                        let (_, writer) = tokio::io::split(stream);
294                        Self::forward_events(writer, rx).await;
295                        active_clone.fetch_sub(1, Ordering::Relaxed);
296                        log::debug!("trace: UDS subscriber disconnected");
297                    });
298                }
299                Err(e) => {
300                    log::error!("trace: UDS accept error: {}", e);
301                    tokio::time::sleep(Duration::from_millis(100)).await;
302                }
303            }
304        }
305    }
306
307    // ── Event forwarder (shared by UDS and TCP) ───────────────────────
308
309    /// Read events from `rx` and write each as a JSON line to `writer`
310    /// until the connection closes or the channel is closed.
311    async fn forward_events<W>(mut writer: W, mut rx: broadcast::Receiver<MatchTraceEvent>)
312    where
313        W: tokio::io::AsyncWrite + Unpin,
314    {
315        loop {
316            let event = match rx.recv().await {
317                Ok(e) => e,
318                Err(broadcast::error::RecvError::Lagged(n)) => {
319                    // Receiver was too slow; `n` events were dropped.
320                    // The next event will carry `dropped_count` so the
321                    // subscriber can detect the gap. Continue.
322                    log::debug!("trace: subscriber lagged, {} events dropped", n);
323                    continue;
324                }
325                Err(broadcast::error::RecvError::Closed) => break,
326            };
327
328            let mut line = match serde_json::to_string(&event) {
329                Ok(s) => s,
330                Err(e) => {
331                    log::error!("trace: serialise error: {}", e);
332                    continue;
333                }
334            };
335            line.push('\n');
336
337            if writer.write_all(line.as_bytes()).await.is_err() {
338                break; // subscriber disconnected
339            }
340        }
341    }
342}
343
344// ── Timestamp helper ──────────────────────────────────────────────────
345
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` if the clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => Duration::ZERO.as_millis() as u64,
    }
}
353
354// ── Tests ─────────────────────────────────────────────────────────────
355
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a header-less request summary.
    fn request(method: &str, url_path: &str) -> RequestSummary {
        RequestSummary {
            method: method.into(),
            url_path: url_path.into(),
            headers: vec![],
        }
    }

    /// An emitted event reaches an in-process subscriber with its fields intact.
    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let emitter = TraceEmitter::new();
        let mut rx = emitter.subscribe();

        emitter.emit(
            1_000_000,
            5,
            request("GET", "/api/test"),
            Outcome::Miss { status: 404 },
        );

        let event = rx.try_recv().expect("event in channel");
        assert_eq!(event.event_id, 0);
        assert_eq!(event.schema_version, 1);
        assert_eq!(event.request.method, "GET");
        assert_eq!(event.duration_ms, 5);
        assert_eq!(event.dropped_count, 0);
        assert!(matches!(event.outcome, Outcome::Miss { status: 404 }));
    }

    /// An event emitted with nobody listening is reported as dropped on the
    /// next delivered event.
    #[tokio::test]
    async fn emit_no_subscriber_increments_dropped() {
        let emitter = TraceEmitter::new();
        emitter.emit(0, 0, request("GET", "/"), Outcome::Miss { status: 404 });

        let mut rx = emitter.subscribe();
        emitter.emit(0, 0, request("GET", "/"), Outcome::Miss { status: 200 });

        let event = rx.try_recv().expect("second event visible");
        assert_eq!(event.dropped_count, 1, "first event should be counted dropped");
    }

    /// `has_subscribers` tracks whether a receiver exists.
    #[test]
    fn has_subscribers_reflects_state() {
        let emitter = TraceEmitter::new();
        assert!(!emitter.has_subscribers());
        let _rx = emitter.subscribe();
        assert!(emitter.has_subscribers());
    }

    /// The outcome is serialised with a snake_case `type` tag.
    #[tokio::test]
    async fn outcome_serialises_correctly() {
        let event = MatchTraceEvent {
            event_id: 7,
            schema_version: 1,
            received_at_ms: 0,
            duration_ms: 0,
            request: request("POST", "/x"),
            outcome: Outcome::Matched { rule_set_index: 0, rule_index: 2 },
            dropped_count: 0,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"matched\""));
        assert!(json.contains("\"rule_index\":2"));
        assert!(json.contains("\"schema_version\":1"));
    }

    /// An emitted event arrives at a TCP client as one JSON line.
    #[tokio::test]
    async fn tcp_transport_delivers_events() {
        let emitter = TraceEmitter::new();
        // Subscribe up front: the event emitted below is then buffered in
        // the channel however late the forwarding task gets scheduled, so
        // no timing-dependent sleep is needed.
        let rx = emitter.subscribe();

        // Bind the listener here so the test knows the ephemeral port, and
        // drive `forward_events` directly on the accepted connection.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let (_, writer) = tokio::io::split(stream);
            TraceTransport::forward_events(writer, rx).await;
        });

        // Connect a subscriber.
        let mut client = tokio::net::TcpStream::connect(bound_addr).await.unwrap();

        emitter.emit(42, 3, request("GET", "/ping"), Outcome::Miss { status: 404 });

        // Read one JSON line from the TCP connection.
        use tokio::io::AsyncBufReadExt;
        let mut reader = tokio::io::BufReader::new(&mut client);
        let mut line = String::new();
        tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut line))
            .await
            .expect("timeout")
            .expect("read ok");

        let value: serde_json::Value = serde_json::from_str(line.trim()).expect("valid JSON");
        assert_eq!(value["request"]["url_path"], "/ping");
        assert_eq!(value["outcome"]["type"], "miss");
        assert_eq!(value["schema_version"], 1);
    }
}
469}