Skip to main content

apimock_server/
trace.rs

1//! Live match-trace channel — RFC 006.
2//!
3//! # What this module provides
4//!
5//! A structured event stream that lets a GUI subscriber observe every
6//! incoming HTTP request alongside the rule that matched (or a "miss"
7//! if no rule matched). Events are emitted in-process via a bounded
8//! `tokio::sync::broadcast` channel.
9//!
10//! # What is deliberately stubbed / deferred
11//!
//! The in-process channel is fully implemented. The *transport layer*
//! (Unix-domain socket, named pipe, or TCP loopback) that forwards
//! events to an out-of-process GUI subscriber is stubbed: the
//! `TraceTransport::accept_loop` method always panics via
//! `unimplemented!`, with a message stating that the transport is a
//! stub. Implementing it requires OS-specific IPC plumbing that is
//! scoped to a future release.
18//!
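//! # Back-pressure
//!
//! The channel is bounded by `TRACE_CHANNEL_CAPACITY`.
//! `broadcast::Sender::send` never blocks and does not fail when the
//! buffer is full: the oldest buffered event is overwritten, and a
//! subscriber that fell behind observes `RecvError::Lagged(n)` on its
//! next receive. `send` returns `SendError` only when there are no
//! active receivers; in that case the emitter counts the event as
//! dropped so that a later event can report the loss through its
//! `dropped_count` field.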
25
use std::sync::{
    Arc,
    atomic::{AtomicU32, AtomicU64, Ordering},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use tokio::sync::broadcast;
33
/// Number of events the in-process broadcast channel can buffer.
pub const TRACE_CHANNEL_CAPACITY: usize = 1024;
36
37// ── Event types ───────────────────────────────────────────────────────
38
39/// A single request/response trace event.
40#[derive(Clone, Debug)]
41pub struct MatchTraceEvent {
42    /// Monotonically increasing event counter within this server run.
43    pub event_id: u64,
44    /// Unix timestamp (milliseconds) of when the request was received.
45    pub received_at_ms: u64,
46    /// Time taken to produce the response, in milliseconds.
47    pub duration_ms: u32,
48    /// Summary of the incoming request.
49    pub request: RequestSummary,
50    /// What the server did with the request.
51    pub outcome: Outcome,
52    /// Number of events dropped since the last successfully delivered
53    /// event (0 when no events were dropped).
54    pub dropped_count: u32,
55}
56
/// The parts of an incoming HTTP request that are recorded in a trace
/// event.
#[derive(Debug, Clone)]
pub struct RequestSummary {
    /// HTTP method of the request.
    pub method: String,
    /// Path component of the requested URL.
    pub url_path: String,
    /// A selection of the request headers as `(name, value)` pairs —
    /// not the full set (see RFC 006 §drawbacks on body capture being
    /// deferred).
    pub headers: Vec<(String, String)>,
}
66
/// The server's decision for a traced request.
#[derive(Debug, Clone)]
pub enum Outcome {
    /// A rule matched; the indices locate the rule set and the rule
    /// within that set.
    Matched { rule_set_index: usize, rule_index: usize },
    /// No rule matched, and the dynamic-route fallback served the
    /// request.
    Fallback {
        file_path: String,
        status: u16,
    },
    /// No rule matched, and the fallback produced no response either.
    Miss {
        status: u16,
    },
    /// Processing the request failed with an error.
    Error {
        kind: String,
        message: String,
    },
}
82
83// ── Channel handle ────────────────────────────────────────────────────
84
85/// Shared handle to the trace broadcast channel.
86///
87/// Clone freely — each clone refers to the same underlying channel.
88#[derive(Clone)]
89pub struct TraceEmitter {
90    sender: broadcast::Sender<MatchTraceEvent>,
91    event_counter: Arc<AtomicU32>,
92    dropped_counter: Arc<AtomicU32>,
93}
94
95impl TraceEmitter {
96    /// Create a new emitter with a fresh broadcast channel.
97    pub fn new() -> Self {
98        let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
99        Self {
100            sender,
101            event_counter: Arc::new(AtomicU32::new(0)),
102            dropped_counter: Arc::new(AtomicU32::new(0)),
103        }
104    }
105
106    /// Subscribe to the event stream. Returns a receiver that will
107    /// receive future events. Events sent before this call are not
108    /// replayed (no backfill — deferred per RFC 006 unresolved
109    /// questions §4).
110    pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
111        self.sender.subscribe()
112    }
113
114    /// Emit one event. If the channel is full, the event is dropped
115    /// and the internal drop counter incremented.
116    pub fn emit(
117        &self,
118        received_at_ms: u64,
119        duration_ms: u32,
120        request: RequestSummary,
121        outcome: Outcome,
122    ) {
123        let event_id = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
124        let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);
125
126        let event = MatchTraceEvent {
127            event_id,
128            received_at_ms,
129            duration_ms,
130            request,
131            outcome,
132            dropped_count,
133        };
134
135        if self.sender.send(event).is_err() {
136            // No active receivers OR channel full — increment dropped.
137            self.dropped_counter.fetch_add(1, Ordering::Relaxed);
138        }
139    }
140
141    /// Return `true` iff at least one subscriber is currently active.
142    pub fn has_subscribers(&self) -> bool {
143        self.sender.receiver_count() > 0
144    }
145}
146
147impl Default for TraceEmitter {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153// ── Transport stub (RFC 006 §deferred) ───────────────────────────────
154
/// Selects how trace events are to be forwarded to an out-of-process
/// subscriber.
///
/// Variants exist for a Unix-domain socket and for TCP loopback; no
/// named-pipe variant is defined. The default is
/// [`TraceTransportConfig::Disabled`].
#[derive(Debug, Clone, Default)]
pub enum TraceTransportConfig {
    /// Forward over a Unix-domain socket located at `path`.
    Uds {
        path: String,
    },
    /// Forward over TCP loopback at `addr`.
    Tcp {
        addr: String,
    },
    /// No out-of-process forwarding.
    #[default]
    Disabled,
}
169
170/// Stub transport layer.
171///
172/// The `accept_loop` method is explicitly unimplemented — it is
173/// intentionally left for a future session, as documented in the RFC 006
174/// implementation notes and the session handoff document.
175pub struct TraceTransport;
176
177impl TraceTransport {
178    /// Start accepting out-of-process subscribers and forwarding events.
179    ///
180    /// # Panics
181    ///
182    /// Always panics with "not yet implemented — RFC 006 socket I/O
183    /// transport is a stub, deferred to a future release."
184    pub async fn accept_loop(
185        _config: TraceTransportConfig,
186        _emitter: TraceEmitter,
187    ) -> ! {
188        unimplemented!(
189            "RFC 006 socket I/O transport is a stub, deferred to a future release. \
190             The in-process broadcast channel is fully functional."
191        )
192    }
193}
194
195// ── Timestamp helper ──────────────────────────────────────────────────
196
/// Wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970-01-01: fall back to a zero offset.
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
204
205// ── Tests ─────────────────────────────────────────────────────────────
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a header-less `GET` request summary for `path`.
    fn get_request(path: &str) -> RequestSummary {
        RequestSummary {
            method: String::from("GET"),
            url_path: path.to_owned(),
            headers: Vec::new(),
        }
    }

    /// An emitted event reaches an existing subscriber with all of its
    /// fields intact.
    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let emitter = TraceEmitter::new();
        let mut receiver = emitter.subscribe();

        emitter.emit(
            1_000_000,
            5,
            get_request("/api/test"),
            Outcome::Miss { status: 404 },
        );

        let received = receiver.try_recv().expect("event should be in channel");
        assert_eq!(received.event_id, 0);
        assert_eq!(received.request.method, "GET");
        assert_eq!(received.request.url_path, "/api/test");
        assert_eq!(received.duration_ms, 5);
        assert_eq!(received.dropped_count, 0);
        assert!(matches!(received.outcome, Outcome::Miss { status: 404 }));
    }

    /// An event emitted while nobody is subscribed is reported as
    /// dropped by the next delivered event.
    #[tokio::test]
    async fn emit_with_no_subscriber_increments_dropped() {
        let emitter = TraceEmitter::new();

        // Nobody is listening yet, so this send fails and is tallied.
        emitter.emit(0, 0, get_request("/"), Outcome::Miss { status: 404 });

        // With a subscriber attached, the next event carries the tally.
        let mut receiver = emitter.subscribe();
        emitter.emit(0, 0, get_request("/"), Outcome::Miss { status: 404 });

        let received = receiver
            .try_recv()
            .expect("second event visible to new subscriber");
        assert_eq!(
            received.dropped_count, 1,
            "first event should be counted as dropped"
        );
    }

    /// `has_subscribers` is false on a fresh emitter and true while a
    /// receiver is alive.
    #[test]
    fn has_subscribers_reflects_state() {
        let emitter = TraceEmitter::new();
        assert!(!emitter.has_subscribers());

        let _receiver = emitter.subscribe();
        assert!(emitter.has_subscribers());
    }
}