apimock_server/trace.rs
1//! Live match-trace channel — RFC 006.
2//!
3//! # What this module provides
4//!
5//! A structured event stream that lets a GUI subscriber observe every
6//! incoming HTTP request alongside the rule that matched (or a "miss"
7//! if no rule matched). Events are emitted in-process via a bounded
8//! `tokio::sync::broadcast` channel.
9//!
10//! # What is deliberately stubbed / deferred
11//!
12//! The in-process channel is fully implemented. The *transport layer*
13//! (Unix-domain socket, named pipe, or TCP loopback) that forwards
14//! events to an out-of-process GUI subscriber is stubbed — the
15//! `TraceTransport::accept_loop` method panics with an explicit
16//! "not yet implemented" message. Implementing it requires
17//! OS-specific IPC plumbing that is scoped to a future release.
18//!
19//! # Back-pressure
20//!
21//! The channel is bounded by `TRACE_CHANNEL_CAPACITY`. When the
22//! channel is full (subscriber too slow), `broadcast::Sender::send`
23//! returns `SendError`; the emitter drops the event and increments
24//! `dropped_count` so the next event can report how many were lost.
25
26use std::sync::{
27 Arc,
28 atomic::{AtomicU32, Ordering},
29};
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
32use tokio::sync::broadcast;
33
34/// Capacity of the in-process broadcast channel.
35pub const TRACE_CHANNEL_CAPACITY: usize = 1_024;
36
37// ── Event types ───────────────────────────────────────────────────────
38
39/// A single request/response trace event.
40#[derive(Clone, Debug)]
41pub struct MatchTraceEvent {
42 /// Monotonically increasing event counter within this server run.
43 pub event_id: u64,
44 /// Unix timestamp (milliseconds) of when the request was received.
45 pub received_at_ms: u64,
46 /// Time taken to produce the response, in milliseconds.
47 pub duration_ms: u32,
48 /// Summary of the incoming request.
49 pub request: RequestSummary,
50 /// What the server did with the request.
51 pub outcome: Outcome,
52 /// Number of events dropped since the last successfully delivered
53 /// event (0 when no events were dropped).
54 pub dropped_count: u32,
55}
56
57/// Key fields from the incoming HTTP request.
58#[derive(Clone, Debug)]
59pub struct RequestSummary {
60 pub method: String,
61 pub url_path: String,
62 /// Selected request headers (not all — see RFC 006 §drawbacks on
63 /// body capture being deferred).
64 pub headers: Vec<(String, String)>,
65}
66
67/// What the server decided to do with the request.
68#[derive(Clone, Debug)]
69pub enum Outcome {
70 /// A rule in a rule set matched.
71 Matched {
72 rule_set_index: usize,
73 rule_index: usize,
74 },
75 /// No rule matched; the dynamic-route fallback served the request.
76 Fallback { file_path: String, status: u16 },
77 /// No rule matched and the fallback produced no response.
78 Miss { status: u16 },
79 /// An error occurred while processing the request.
80 Error { kind: String, message: String },
81}
82
83// ── Channel handle ────────────────────────────────────────────────────
84
85/// Shared handle to the trace broadcast channel.
86///
87/// Clone freely — each clone refers to the same underlying channel.
88#[derive(Clone)]
89pub struct TraceEmitter {
90 sender: broadcast::Sender<MatchTraceEvent>,
91 event_counter: Arc<AtomicU32>,
92 dropped_counter: Arc<AtomicU32>,
93}
94
95impl TraceEmitter {
96 /// Create a new emitter with a fresh broadcast channel.
97 pub fn new() -> Self {
98 let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
99 Self {
100 sender,
101 event_counter: Arc::new(AtomicU32::new(0)),
102 dropped_counter: Arc::new(AtomicU32::new(0)),
103 }
104 }
105
106 /// Subscribe to the event stream. Returns a receiver that will
107 /// receive future events. Events sent before this call are not
108 /// replayed (no backfill — deferred per RFC 006 unresolved
109 /// questions §4).
110 pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
111 self.sender.subscribe()
112 }
113
114 /// Emit one event. If the channel is full, the event is dropped
115 /// and the internal drop counter incremented.
116 pub fn emit(
117 &self,
118 received_at_ms: u64,
119 duration_ms: u32,
120 request: RequestSummary,
121 outcome: Outcome,
122 ) {
123 let event_id = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
124 let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);
125
126 let event = MatchTraceEvent {
127 event_id,
128 received_at_ms,
129 duration_ms,
130 request,
131 outcome,
132 dropped_count,
133 };
134
135 if self.sender.send(event).is_err() {
136 // No active receivers OR channel full — increment dropped.
137 self.dropped_counter.fetch_add(1, Ordering::Relaxed);
138 }
139 }
140
141 /// Return `true` iff at least one subscriber is currently active.
142 pub fn has_subscribers(&self) -> bool {
143 self.sender.receiver_count() > 0
144 }
145}
146
147impl Default for TraceEmitter {
148 fn default() -> Self {
149 Self::new()
150 }
151}
152
153// ── Transport stub (RFC 006 §deferred) ───────────────────────────────
154
155/// Configuration for the out-of-process trace transport.
156///
157/// Currently only UDS is listed; TCP and named-pipe variants are
158/// reserved for the future implementation.
159#[derive(Clone, Debug, Default)]
160pub enum TraceTransportConfig {
161 /// Unix-domain socket at the given path.
162 Uds { path: String },
163 /// TCP loopback on the given address.
164 Tcp { addr: String },
165 /// Disabled — no out-of-process forwarding.
166 #[default]
167 Disabled,
168}
169
170/// Stub transport layer.
171///
172/// The `accept_loop` method is explicitly unimplemented — it is
173/// intentionally left for a future session, as documented in the RFC 006
174/// implementation notes and the session handoff document.
175pub struct TraceTransport;
176
177impl TraceTransport {
178 /// Start accepting out-of-process subscribers and forwarding events.
179 ///
180 /// # Panics
181 ///
182 /// Always panics with "not yet implemented — RFC 006 socket I/O
183 /// transport is a stub, deferred to a future release."
184 pub async fn accept_loop(
185 _config: TraceTransportConfig,
186 _emitter: TraceEmitter,
187 ) -> ! {
188 unimplemented!(
189 "RFC 006 socket I/O transport is a stub, deferred to a future release. \
190 The in-process broadcast channel is fully functional."
191 )
192 }
193}
194
195// ── Timestamp helper ──────────────────────────────────────────────────
196
197/// Current time as Unix milliseconds.
198pub fn now_ms() -> u64 {
199 SystemTime::now()
200 .duration_since(UNIX_EPOCH)
201 .unwrap_or(Duration::ZERO)
202 .as_millis() as u64
203}
204
205// ── Tests ─────────────────────────────────────────────────────────────
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210
211 #[tokio::test]
212 async fn emit_received_by_subscriber() {
213 let emitter = TraceEmitter::new();
214 let mut rx = emitter.subscribe();
215
216 emitter.emit(
217 1_000_000,
218 5,
219 RequestSummary {
220 method: "GET".into(),
221 url_path: "/api/test".into(),
222 headers: vec![],
223 },
224 Outcome::Miss { status: 404 },
225 );
226
227 let event = rx.try_recv().expect("event should be in channel");
228 assert_eq!(event.event_id, 0);
229 assert_eq!(event.request.method, "GET");
230 assert_eq!(event.request.url_path, "/api/test");
231 assert_eq!(event.duration_ms, 5);
232 assert_eq!(event.dropped_count, 0);
233 assert!(matches!(event.outcome, Outcome::Miss { status: 404 }));
234 }
235
236 #[tokio::test]
237 async fn emit_with_no_subscriber_increments_dropped() {
238 let emitter = TraceEmitter::new();
239 // No subscriber — send should fail and increment dropped.
240 emitter.emit(
241 0,
242 0,
243 RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
244 Outcome::Miss { status: 404 },
245 );
246
247 // Now subscribe and emit again; dropped_count on the second
248 // event should reflect the one we lost.
249 let mut rx = emitter.subscribe();
250 emitter.emit(
251 0,
252 0,
253 RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
254 Outcome::Miss { status: 404 },
255 );
256
257 let event = rx.try_recv().expect("second event visible to new subscriber");
258 assert_eq!(event.dropped_count, 1, "first event should be counted as dropped");
259 }
260
261 #[test]
262 fn has_subscribers_reflects_state() {
263 let emitter = TraceEmitter::new();
264 assert!(!emitter.has_subscribers());
265 let _rx = emitter.subscribe();
266 assert!(emitter.has_subscribers());
267 }
268}