apimock-server 5.10.0

HTTP(S) server runtime for apimock: listener loop, request handling, response building.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
//! Live match-trace channel — RFC 006 (in-process) + RFC 009 (transport).
//!
//! # Architecture
//!
//! ```text
//!  HTTP handler ──► TraceEmitter::emit()
//!//!              tokio::sync::broadcast (bounded, 1024)
//!//!           ┌─────────────┴──────────────┐
//!      in-process                  TraceTransport::accept_loop
//!      subscriber                  (UDS on Unix, TCP fallback)
//!//!                                  up to 4 GUI connections
//!                                  (newline-delimited JSON)
//! ```
//!
//! # Transport variants
//!
//! | `TraceTransportConfig` | Platform | Notes |
//! |---|---|---|
//! | `Uds { path }` | Unix/macOS | Default when available |
//! | `Tcp { addr }` | All | Portable fallback; `addr = "127.0.0.1:0"` assigns ephemeral port |
//! | `Disabled` | All | No out-of-process forwarding (default) |
//!
//! # Back-pressure
//!
//! The broadcast channel is bounded by [`TRACE_CHANNEL_CAPACITY`]. When
//! the channel is full, `emit` drops the event and increments an internal
//! counter; the count is reported as `dropped_count` on the next event.
//!
//! Slow out-of-process subscribers receive a `RecvError::Lagged` from the
//! broadcast channel; the gap is reported in the next JSON line via
//! `dropped_count`.
//!
//! # Subscriber cap
//!
//! At most [`MAX_SUBSCRIBERS`] out-of-process connections are accepted.
//! A fifth connection receives `{"error":"max_subscribers_reached"}` and
//! is then closed.

use std::sync::{
    Arc,
    atomic::{AtomicU32, AtomicUsize, Ordering},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use serde::Serialize;
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;

/// Capacity of the broadcast channel (events).
pub const TRACE_CHANNEL_CAPACITY: usize = 1_024;
/// Maximum concurrent out-of-process subscriber connections.
pub const MAX_SUBSCRIBERS: usize = 4;

// ── Event schema ──────────────────────────────────────────────────────

/// A single request/response trace event.
#[derive(Clone, Debug, Serialize)]
pub struct MatchTraceEvent {
    /// Monotonically increasing event counter within this server run.
    pub event_id: u64,
    /// Schema version — bumped on breaking changes.
    pub schema_version: u8,
    /// Unix timestamp (milliseconds) when the request was received.
    pub received_at_ms: u64,
    /// Processing time in milliseconds.
    pub duration_ms: u32,
    /// Key fields from the incoming request.
    pub request: RequestSummary,
    /// What the server decided to do with the request.
    pub outcome: Outcome,
    /// Events dropped since the last successfully delivered event.
    pub dropped_count: u32,
}

/// Key fields from the incoming HTTP request.
#[derive(Clone, Debug, Serialize)]
pub struct RequestSummary {
    pub method: String,
    pub url_path: String,
    /// Selected request headers (display-only; body capture deferred).
    pub headers: Vec<(String, String)>,
}

/// What the server decided to do with the request.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Outcome {
    Matched { rule_set_index: usize, rule_index: usize },
    Fallback { file_path: String, status: u16 },
    Miss    { status: u16 },
    Error   { kind: String, message: String },
}

// ── Emitter ───────────────────────────────────────────────────────────

/// Shared handle to the trace broadcast channel.
///
/// Clone freely — each clone refers to the same underlying channel.
#[derive(Clone)]
pub struct TraceEmitter {
    sender:          broadcast::Sender<MatchTraceEvent>,
    event_counter:   Arc<AtomicU32>,
    dropped_counter: Arc<AtomicU32>,
}

impl TraceEmitter {
    pub fn new() -> Self {
        let (sender, _) = broadcast::channel(TRACE_CHANNEL_CAPACITY);
        Self {
            sender,
            event_counter:   Arc::new(AtomicU32::new(0)),
            dropped_counter: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Subscribe to the event stream (in-process).
    pub fn subscribe(&self) -> broadcast::Receiver<MatchTraceEvent> {
        self.sender.subscribe()
    }

    /// Emit one event.  If the channel is full, the event is dropped and
    /// the internal drop counter incremented.
    pub fn emit(
        &self,
        received_at_ms: u64,
        duration_ms: u32,
        request: RequestSummary,
        outcome: Outcome,
    ) {
        let event_id    = self.event_counter.fetch_add(1, Ordering::Relaxed) as u64;
        let dropped_count = self.dropped_counter.swap(0, Ordering::Relaxed);

        let event = MatchTraceEvent {
            event_id,
            schema_version: 1,
            received_at_ms,
            duration_ms,
            request,
            outcome,
            dropped_count,
        };

        if self.sender.send(event).is_err() {
            self.dropped_counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Returns `true` iff at least one receiver is currently active.
    pub fn has_subscribers(&self) -> bool {
        self.sender.receiver_count() > 0
    }
}

impl Default for TraceEmitter { fn default() -> Self { Self::new() } }

// ── Transport configuration ───────────────────────────────────────────

/// Configuration for the out-of-process transport layer.
#[derive(Clone, Debug, Default)]
pub enum TraceTransportConfig {
    /// Unix-domain socket at the given path (Unix/macOS only).
    #[cfg(unix)]
    Uds { path: String },
    /// TCP loopback socket (portable fallback).
    Tcp { addr: String },
    /// No out-of-process forwarding.
    #[default]
    Disabled,
}

// ── Transport implementation ──────────────────────────────────────────

pub struct TraceTransport;

impl TraceTransport {
    /// Start accepting out-of-process subscriber connections and forwarding
    /// events as newline-delimited JSON.
    ///
    /// This future runs forever (until the process exits or the socket
    /// errors fatally). Spawn it with `tokio::spawn`.
    ///
    /// # Subscriber cap
    ///
    /// At most [`MAX_SUBSCRIBERS`] connections are served simultaneously.
    /// Connection #`MAX_SUBSCRIBERS + 1` receives a JSON error line and
    /// is closed.
    pub async fn accept_loop(config: TraceTransportConfig, emitter: TraceEmitter) {
        match config {
            #[cfg(unix)]
            TraceTransportConfig::Uds { path } => {
                Self::uds_accept_loop(path, emitter).await
            }
            TraceTransportConfig::Tcp { addr } => {
                Self::tcp_accept_loop(addr, emitter).await
            }
            TraceTransportConfig::Disabled => {
                // No-op — transport disabled; in-process channel still works.
            }
        }
    }

    // ── TCP accept loop ───────────────────────────────────────────────

    async fn tcp_accept_loop(addr: String, emitter: TraceEmitter) {
        let listener = match tokio::net::TcpListener::bind(&addr).await {
            Ok(l) => {
                let bound = l.local_addr().map(|a| a.to_string())
                    .unwrap_or_else(|_| addr.clone());
                log::info!("trace transport: TCP listening on {}", bound);
                l
            }
            Err(e) => {
                log::error!("trace transport: failed to bind TCP {}: {}", addr, e);
                return;
            }
        };

        let active = Arc::new(AtomicUsize::new(0));
        loop {
            match listener.accept().await {
                Ok((stream, peer)) => {
                    let count = active.fetch_add(1, Ordering::Relaxed) + 1;
                    if count > MAX_SUBSCRIBERS {
                        active.fetch_sub(1, Ordering::Relaxed);
                        let active_clone = active.clone();
                        tokio::spawn(async move {
                            let (_, mut writer) = tokio::io::split(stream);
                            let _ = writer
                                .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
                                .await;
                            drop(active_clone);
                        });
                        continue;
                    }
                    log::debug!("trace: TCP subscriber connected from {}", peer);
                    let rx = emitter.subscribe();
                    let active_clone = active.clone();
                    tokio::spawn(async move {
                        let (_, writer) = tokio::io::split(stream);
                        Self::forward_events(writer, rx).await;
                        active_clone.fetch_sub(1, Ordering::Relaxed);
                        log::debug!("trace: TCP subscriber {} disconnected", peer);
                    });
                }
                Err(e) => {
                    log::error!("trace: TCP accept error: {}", e);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }

    // ── UDS accept loop (Unix only) ───────────────────────────────────

    #[cfg(unix)]
    async fn uds_accept_loop(path: String, emitter: TraceEmitter) {
        // Remove stale socket file from a previous run.
        let _ = std::fs::remove_file(&path);

        let listener = match tokio::net::UnixListener::bind(&path) {
            Ok(l) => {
                log::info!("trace transport: UDS listening at {}", path);
                l
            }
            Err(e) => {
                log::error!("trace transport: failed to bind UDS {}: {}", path, e);
                return;
            }
        };

        let active = Arc::new(AtomicUsize::new(0));
        loop {
            match listener.accept().await {
                Ok((stream, _)) => {
                    let count = active.fetch_add(1, Ordering::Relaxed) + 1;
                    if count > MAX_SUBSCRIBERS {
                        active.fetch_sub(1, Ordering::Relaxed);
                        tokio::spawn(async move {
                            let (_, mut writer) = tokio::io::split(stream);
                            let _ = writer
                                .write_all(b"{\"error\":\"max_subscribers_reached\"}\n")
                                .await;
                        });
                        continue;
                    }
                    log::debug!("trace: UDS subscriber connected");
                    let rx = emitter.subscribe();
                    let active_clone = active.clone();
                    tokio::spawn(async move {
                        let (_, writer) = tokio::io::split(stream);
                        Self::forward_events(writer, rx).await;
                        active_clone.fetch_sub(1, Ordering::Relaxed);
                        log::debug!("trace: UDS subscriber disconnected");
                    });
                }
                Err(e) => {
                    log::error!("trace: UDS accept error: {}", e);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }

    // ── Event forwarder (shared by UDS and TCP) ───────────────────────

    /// Read events from `rx` and write each as a JSON line to `writer`
    /// until the connection closes or the channel is closed.
    async fn forward_events<W>(mut writer: W, mut rx: broadcast::Receiver<MatchTraceEvent>)
    where
        W: tokio::io::AsyncWrite + Unpin,
    {
        loop {
            let event = match rx.recv().await {
                Ok(e) => e,
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    // Receiver was too slow; `n` events were dropped.
                    // The next event will carry `dropped_count` so the
                    // subscriber can detect the gap. Continue.
                    log::debug!("trace: subscriber lagged, {} events dropped", n);
                    continue;
                }
                Err(broadcast::error::RecvError::Closed) => break,
            };

            let mut line = match serde_json::to_string(&event) {
                Ok(s) => s,
                Err(e) => {
                    log::error!("trace: serialise error: {}", e);
                    continue;
                }
            };
            line.push('\n');

            if writer.write_all(line.as_bytes()).await.is_err() {
                break; // subscriber disconnected
            }
        }
    }
}

// ── Timestamp helper ──────────────────────────────────────────────────

/// Current Unix time in milliseconds.
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_millis() as u64
}

// ── Tests ─────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn emit_received_by_subscriber() {
        let emitter = TraceEmitter::new();
        let mut rx = emitter.subscribe();

        emitter.emit(
            1_000_000, 5,
            RequestSummary { method: "GET".into(), url_path: "/api/test".into(), headers: vec![] },
            Outcome::Miss { status: 404 },
        );

        let event = rx.try_recv().expect("event in channel");
        assert_eq!(event.event_id, 0);
        assert_eq!(event.schema_version, 1);
        assert_eq!(event.request.method, "GET");
        assert_eq!(event.duration_ms, 5);
        assert_eq!(event.dropped_count, 0);
        assert!(matches!(event.outcome, Outcome::Miss { status: 404 }));
    }

    #[tokio::test]
    async fn emit_no_subscriber_increments_dropped() {
        let emitter = TraceEmitter::new();
        emitter.emit(0, 0,
            RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
            Outcome::Miss { status: 404 },
        );
        let mut rx = emitter.subscribe();
        emitter.emit(0, 0,
            RequestSummary { method: "GET".into(), url_path: "/".into(), headers: vec![] },
            Outcome::Miss { status: 200 },
        );
        let event = rx.try_recv().expect("second event visible");
        assert_eq!(event.dropped_count, 1, "first event should be counted dropped");
    }

    #[test]
    fn has_subscribers_reflects_state() {
        let emitter = TraceEmitter::new();
        assert!(!emitter.has_subscribers());
        let _rx = emitter.subscribe();
        assert!(emitter.has_subscribers());
    }

    #[tokio::test]
    async fn outcome_serialises_correctly() {
        let event = MatchTraceEvent {
            event_id: 7, schema_version: 1, received_at_ms: 0, duration_ms: 0,
            request: RequestSummary { method: "POST".into(), url_path: "/x".into(), headers: vec![] },
            outcome: Outcome::Matched { rule_set_index: 0, rule_index: 2 },
            dropped_count: 0,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"matched\""));
        assert!(json.contains("\"rule_index\":2"));
        assert!(json.contains("\"schema_version\":1"));
    }

    #[tokio::test]
    async fn tcp_transport_delivers_events() {
        let emitter = TraceEmitter::new();
        let emitter_clone = emitter.clone();

        // Bind on an ephemeral port.
        let config = TraceTransportConfig::Tcp { addr: "127.0.0.1:0".to_owned() };

        // We need to know the actual bound port before connecting.
        // Bind the listener ourselves to capture the address, then hand
        // the address to the transport accept loop via a channel.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let bound_addr = listener.local_addr().unwrap();

        // Spawn a simplified accept loop that uses our pre-bound listener.
        tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let rx = emitter_clone.subscribe();
            let (_, writer) = tokio::io::split(stream);
            TraceTransport::forward_events(writer, rx).await;
        });

        // Connect a subscriber.
        let mut client = tokio::net::TcpStream::connect(bound_addr).await.unwrap();

        // Give the subscriber task a moment to subscribe before emitting.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        emitter.emit(
            42, 3,
            RequestSummary { method: "GET".into(), url_path: "/ping".into(), headers: vec![] },
            Outcome::Miss { status: 404 },
        );

        // Read one JSON line from the TCP connection.
        use tokio::io::AsyncBufReadExt;
        let mut reader = tokio::io::BufReader::new(&mut client);
        let mut line = String::new();
        tokio::time::timeout(
            std::time::Duration::from_secs(2),
            reader.read_line(&mut line),
        )
        .await
        .expect("timeout")
        .expect("read ok");

        let value: serde_json::Value = serde_json::from_str(line.trim()).expect("valid JSON");
        assert_eq!(value["request"]["url_path"], "/ping");
        assert_eq!(value["outcome"]["type"], "miss");
        assert_eq!(value["schema_version"], 1);
    }
}