// mockforge_observability/log_shipper.rs
//! Structured request log shipper for hosted-mock deployments.
//!
//! `mockforge-cli serve` runs this in-container when configured by the
//! orchestrator, capturing one event per HTTP request and forwarding the
//! batch to MockForge Cloud's log-ingest endpoint. The Cloud admin UI then
//! reads them back via the per-deployment "Requests" tab.
//!
//! Phase 2 (#224) gave hosted mocks Fly's container stdout/stderr. That's
//! good for "did the app boot" but not for "what URL did the user just hit
//! and what did we return." This module fills that gap.
//!
//! ## Configuration
//!
//! All env vars are optional — when any required one is missing the shipper
//! is a no-op and `enqueue` calls drop their events. The orchestrator sets
//! these on hosted-mock Fly machines:
//!
//! - `MOCKFORGE_LOG_INGEST_URL` — full URL to the ingest endpoint, e.g.
//!   `https://api.mockforge.dev/api/v1/hosted-mocks/<id>/log-ingest`.
//! - `MOCKFORGE_LOG_INGEST_TOKEN` — short-lived JWT scoped to the deployment.
//! - `MOCKFORGE_LOG_INGEST_BATCH_SIZE` — events per POST (default 50).
//! - `MOCKFORGE_LOG_INGEST_FLUSH_MS` — max batch age before flush (default 2000).
//! - `MOCKFORGE_LOG_INGEST_BUFFER` — bounded channel capacity (default 1024).
//!   When full, newly arriving events are dropped (`try_send` fails and the
//!   event is discarded) — observability code must never block the request
//!   path.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, warn};

34/// One captured request/response pair. Fields kept thin so the shipper has
35/// no opinion on storage schema — the cloud side decides what to keep.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RequestLogEvent {
38    pub timestamp: DateTime<Utc>,
39    pub method: String,
40    pub path: String,
41    pub status: u16,
42    pub latency_ms: u32,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub matched_route: Option<String>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub client_ip: Option<String>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub user_agent: Option<String>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub request_id: Option<String>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub bytes_in: Option<u64>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub bytes_out: Option<u64>,
55}
56
57#[derive(Debug, Serialize)]
58struct IngestPayload<'a> {
59    events: &'a [RequestLogEvent],
60}
61
62/// Cheap cloneable handle. `enqueue` is non-blocking and never fails — it's
63/// safe to call from the request path. When the shipper isn't configured
64/// the handle is `None` and calls are zero-cost.
65#[derive(Clone)]
66pub struct LogShipperHandle {
67    inner: Option<Arc<Inner>>,
68}
69
70struct Inner {
71    sender: mpsc::Sender<RequestLogEvent>,
72}
73
74impl LogShipperHandle {
75    /// Construct a no-op handle. Used when the shipper isn't configured —
76    /// the request middleware can still call `enqueue` without checking.
77    pub fn disabled() -> Self {
78        Self { inner: None }
79    }
80
81    /// Non-blocking enqueue. Drops the event silently when the buffer is
82    /// full, which is the right tradeoff for a request-path component:
83    /// observability never blocks user traffic.
84    pub fn enqueue(&self, event: RequestLogEvent) {
85        if let Some(inner) = &self.inner {
86            let _ = inner.sender.try_send(event);
87        }
88    }
89
90    /// True if the shipper is actually running. Useful for skipping work
91    /// in middleware (e.g., decoding the user-agent header).
92    pub fn is_active(&self) -> bool {
93        self.inner.is_some()
94    }
95}
96
97/// Construct a shipper from environment variables. Returns a no-op handle
98/// when required env vars are missing (this is the common case in local
99/// dev). Spawns a background task that drains the channel and POSTs to the
100/// configured ingest URL; the task runs for the lifetime of the process.
101pub fn from_env() -> LogShipperHandle {
102    let url = match std::env::var("MOCKFORGE_LOG_INGEST_URL") {
103        Ok(u) if !u.trim().is_empty() => u,
104        _ => return LogShipperHandle::disabled(),
105    };
106    let token = match std::env::var("MOCKFORGE_LOG_INGEST_TOKEN") {
107        Ok(t) if !t.trim().is_empty() => t,
108        _ => return LogShipperHandle::disabled(),
109    };
110    let batch_size: usize = std::env::var("MOCKFORGE_LOG_INGEST_BATCH_SIZE")
111        .ok()
112        .and_then(|s| s.parse().ok())
113        .unwrap_or(50);
114    let flush_ms: u64 = std::env::var("MOCKFORGE_LOG_INGEST_FLUSH_MS")
115        .ok()
116        .and_then(|s| s.parse().ok())
117        .unwrap_or(2000);
118    let buffer: usize = std::env::var("MOCKFORGE_LOG_INGEST_BUFFER")
119        .ok()
120        .and_then(|s| s.parse().ok())
121        .unwrap_or(1024);
122
123    let client = match reqwest::Client::builder().timeout(Duration::from_secs(5)).build() {
124        Ok(c) => c,
125        Err(e) => {
126            warn!("LogShipper HTTP client init failed: {}", e);
127            return LogShipperHandle::disabled();
128        }
129    };
130
131    let (sender, receiver) = mpsc::channel::<RequestLogEvent>(buffer);
132    let inner = Arc::new(Inner { sender });
133
134    tokio::spawn(run(receiver, client, url, token, batch_size, flush_ms));
135
136    LogShipperHandle { inner: Some(inner) }
137}
138
/// Background task: batch by count or by elapsed time, then POST. Errors
/// are warn-logged and dropped — request volume on a healthy mock is far
/// higher than retry capacity is worth, and the cloud-side ingest is the
/// canonical store; we don't need at-least-once.
async fn run(
    mut receiver: mpsc::Receiver<RequestLogEvent>,
    client: reqwest::Client,
    url: String,
    token: String,
    batch_size: usize,
    flush_ms: u64,
) {
    let flush_after = Duration::from_millis(flush_ms);
    let mut batch: Vec<RequestLogEvent> = Vec::with_capacity(batch_size);
    // Absolute deadline for the next time-based flush. Reset after every
    // flush (count- or timer-triggered), so a quiet channel still ships
    // partial batches at most `flush_after` after the previous flush.
    let mut deadline = tokio::time::Instant::now() + flush_after;

    loop {
        // A fresh sleep future each iteration, pinned so `select!` can
        // poll it by mutable reference.
        let timeout = tokio::time::sleep_until(deadline);
        tokio::pin!(timeout);

        tokio::select! {
            // `biased` polls branches in declaration order: prefer
            // draining the channel over firing the timer when both are
            // ready. Starvation is bounded because a full batch flushes
            // inline below.
            biased;
            evt = receiver.recv() => {
                match evt {
                    Some(e) => {
                        batch.push(e);
                        // Count-based flush: ship as soon as the batch is
                        // full, then restart the age clock.
                        if batch.len() >= batch_size {
                            send_batch(&client, &url, &token, &batch).await;
                            batch.clear();
                            deadline = tokio::time::Instant::now() + flush_after;
                        }
                    }
                    None => {
                        // Channel closed (process shutting down). Best-effort
                        // final flush.
                        if !batch.is_empty() {
                            send_batch(&client, &url, &token, &batch).await;
                        }
                        return;
                    }
                }
            }
            // Time-based flush: the deadline passed with a partial batch.
            _ = &mut timeout => {
                if !batch.is_empty() {
                    send_batch(&client, &url, &token, &batch).await;
                    batch.clear();
                }
                deadline = tokio::time::Instant::now() + flush_after;
            }
        }
    }
}

192async fn send_batch(client: &reqwest::Client, url: &str, token: &str, events: &[RequestLogEvent]) {
193    let payload = IngestPayload { events };
194    debug!(count = events.len(), "Shipping log batch");
195    match client.post(url).bearer_auth(token).json(&payload).send().await {
196        Ok(resp) if resp.status().is_success() => {}
197        Ok(resp) => {
198            warn!(
199                status = %resp.status(),
200                count = events.len(),
201                "Log ingest non-success; dropping batch"
202            );
203        }
204        Err(e) => {
205            warn!(error = %e, count = events.len(), "Log ingest send failed; dropping batch");
206        }
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// A disabled handle must report inactive and swallow events without
    /// panicking or blocking.
    #[test]
    fn disabled_handle_is_inert() {
        let h = LogShipperHandle::disabled();
        assert!(!h.is_active());
        // Should not panic — non-blocking, drops silently.
        h.enqueue(RequestLogEvent {
            timestamp: Utc::now(),
            method: "GET".into(),
            path: "/test".into(),
            status: 200,
            latency_ms: 1,
            matched_route: None,
            client_ip: None,
            user_agent: None,
            request_id: None,
            bytes_in: None,
            bytes_out: None,
        });
    }

    /// Without the required URL/token env vars, `from_env` must return a
    /// no-op handle instead of spawning a shipper.
    ///
    /// NOTE(review): mutating process env races with any parallel test
    /// that reads these vars, and `remove_var` is `unsafe` under the Rust
    /// 2024 edition — consider serializing env-touching tests; confirm the
    /// crate's edition.
    #[test]
    fn from_env_returns_disabled_without_url_or_token() {
        std::env::remove_var("MOCKFORGE_LOG_INGEST_URL");
        std::env::remove_var("MOCKFORGE_LOG_INGEST_TOKEN");
        assert!(!from_env().is_active());
    }
}