Skip to main content

vgi_rpc/
access_log.rs

1//! Structured access logging as a [`DispatchHook`].
2//!
3//! Emits one JSON record per RPC call via a writer the caller supplies.
//! The schema matches the canonical Python implementation (checked by the
//! `vgi_rpc.access_log_conformance` validator), so logs are portable across
//! implementations.
6
7use std::io::Write;
8use std::sync::mpsc::{SyncSender, TrySendError};
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
12use serde_json::json;
13
14use crate::errors::RpcError;
15use crate::hooks::{CallStatistics, DispatchHook, DispatchInfo, HookToken};
16
/// Destination for formatted access-log lines.
enum Sink {
    /// Written inline: the dispatching thread takes this mutex and writes
    /// the line itself.
    Sync(Arc<Mutex<dyn Write + Send>>),
    /// Queued: the dispatching thread pushes the line into a bounded
    /// channel that a dedicated background thread drains.
    Async {
        /// Number of lines that could not be queued.
        dropped: Arc<std::sync::atomic::AtomicU64>,
        /// Producer half of the bounded queue.
        tx: SyncSender<Vec<u8>>,
    },
}
28
29/// A `DispatchHook` that writes one JSON line per call to an arbitrary
30/// `Write` sink. Entries carry the `vgi_rpc.access` logger name so the
31/// Python validator's filter (`.logger == "vgi_rpc.access"`) matches.
32///
33/// Two modes:
34/// - [`AccessLogHook::new`] / [`to_stderr`] write synchronously on the
35///   dispatch thread (acceptable for stderr or in-memory test sinks).
36/// - [`AccessLogHook::buffered`] queues into a bounded mpsc channel and
37///   drains on a background thread; on overflow it drops the entry and
38///   bumps a counter rather than blocking dispatch.
39pub struct AccessLogHook {
40    sink: Sink,
41    server_version: String,
42    /// When true, emit the full base64-encoded request batch as
43    /// `request_data` (DEBUG-equivalent — see [`Self::verbose`]).
44    /// When false (default), emit `original_request_bytes` +
45    /// `truncated: true` instead so the access-log schema's
46    /// "unary requires request_data unless truncated" invariant
47    /// still holds without ballooning every record by 8+ KiB.
48    verbose: bool,
49    /// Start instants keyed by request_id for duration tracking. For server
50    /// loads where request_id is always empty, a simple monotonically
51    /// increasing counter token is used instead.
52    starts: Mutex<std::collections::HashMap<HookToken, Instant>>,
53    next_token: std::sync::atomic::AtomicU64,
54}
55
56impl AccessLogHook {
57    /// Create an access log hook that writes synchronously to `sink`.
58    /// Suitable for stderr or in-memory sinks; for production file I/O
59    /// prefer [`AccessLogHook::buffered`] to keep dispatch threads off
60    /// the disk path.
61    pub fn new<W: Write + Send + 'static>(sink: W, server_version: impl Into<String>) -> Arc<Self> {
62        Arc::new(Self {
63            sink: Sink::Sync(Arc::new(Mutex::new(sink))),
64            server_version: server_version.into(),
65            verbose: false,
66            starts: Mutex::new(std::collections::HashMap::new()),
67            next_token: std::sync::atomic::AtomicU64::new(1),
68        })
69    }
70
71    /// Return a new `Arc<AccessLogHook>` with verbose request-data
72    /// emission enabled. Mirrors Python's
73    /// `_access_logger.isEnabledFor(logging.DEBUG)` behaviour where
74    /// the full base64-encoded request batch is included verbatim
75    /// rather than being elided via `truncated: true`.
76    pub fn with_verbose(self: Arc<Self>, verbose: bool) -> Arc<Self> {
77        if self.verbose == verbose {
78            return self;
79        }
80        let sink = match &self.sink {
81            Sink::Sync(m) => Sink::Sync(m.clone()),
82            Sink::Async { tx, dropped } => Sink::Async {
83                tx: tx.clone(),
84                dropped: dropped.clone(),
85            },
86        };
87        Arc::new(Self {
88            sink,
89            server_version: self.server_version.clone(),
90            verbose,
91            starts: Mutex::new(std::collections::HashMap::new()),
92            next_token: std::sync::atomic::AtomicU64::new(1),
93        })
94    }
95
96    /// Create a hook that writes asynchronously: the dispatch thread
97    /// pushes a formatted line into a bounded channel of `capacity`
98    /// entries and a background thread drains it into `sink`. When the
99    /// channel is full, the entry is *dropped* (counted by
100    /// [`dropped_count`](Self::dropped_count)) instead of blocking
101    /// dispatch — this is the right tradeoff for high-throughput servers
102    /// where occasional log loss is preferable to head-of-line blocking
103    /// behind a stalled disk.
104    ///
105    /// The writer thread exits when the hook is dropped (sender closes).
106    pub fn buffered<W: Write + Send + 'static>(
107        sink: W,
108        server_version: impl Into<String>,
109        capacity: usize,
110    ) -> Arc<Self> {
111        let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(capacity.max(1));
112        let dropped = Arc::new(std::sync::atomic::AtomicU64::new(0));
113        let mut sink = sink;
114        std::thread::Builder::new()
115            .name("vgi-rpc-access-log".into())
116            .spawn(move || {
117                while let Ok(line) = rx.recv() {
118                    if sink.write_all(&line).is_err() {
119                        return;
120                    }
121                    if sink.write_all(b"\n").is_err() {
122                        return;
123                    }
124                    let _ = sink.flush();
125                }
126            })
127            .expect("spawn access-log writer thread");
128        Arc::new(Self {
129            sink: Sink::Async { tx, dropped },
130            server_version: server_version.into(),
131            verbose: false,
132            starts: Mutex::new(std::collections::HashMap::new()),
133            next_token: std::sync::atomic::AtomicU64::new(1),
134        })
135    }
136
137    /// Convenience: write access logs to stderr synchronously
138    /// (one JSON line per entry).
139    pub fn to_stderr(server_version: impl Into<String>) -> Arc<Self> {
140        Self::new(std::io::stderr(), server_version)
141    }
142
143    /// Number of entries dropped because the async channel was full.
144    /// Always zero for synchronous hooks.
145    pub fn dropped_count(&self) -> u64 {
146        match &self.sink {
147            Sink::Async { dropped, .. } => dropped.load(std::sync::atomic::Ordering::Relaxed),
148            Sink::Sync(_) => 0,
149        }
150    }
151}
152
153impl DispatchHook for AccessLogHook {
154    fn on_dispatch_start(&self, _info: &DispatchInfo) -> HookToken {
155        let token = self
156            .next_token
157            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
158        self.starts.lock().unwrap().insert(token, Instant::now());
159        token
160    }
161
162    fn on_dispatch_end(
163        &self,
164        token: HookToken,
165        info: &DispatchInfo,
166        error: Option<&RpcError>,
167        stats: &CallStatistics,
168    ) {
169        let start = self.starts.lock().unwrap().remove(&token);
170        let duration_ms = start
171            .map(|t| t.elapsed().as_secs_f64() * 1000.0)
172            .unwrap_or(0.0);
173        let status = if error.is_some() { "error" } else { "ok" };
174
175        // Build the record as a JSON object — schema-aligned with
176        // docs/access-log-spec.md in the Python reference repo.
177        let mut rec = serde_json::Map::new();
178        rec.insert("timestamp".into(), json!(rfc3339_utc_millis()));
179        rec.insert("level".into(), json!("INFO"));
180        rec.insert("logger".into(), json!("vgi_rpc.access"));
181        rec.insert(
182            "message".into(),
183            json!(format!("{}.{} {}", info.protocol, info.method, status)),
184        );
185        rec.insert("server_id".into(), json!(info.server_id));
186        rec.insert("protocol".into(), json!(info.protocol));
187        rec.insert("protocol_hash".into(), json!(info.protocol_hash));
188        rec.insert("method".into(), json!(info.method));
189        rec.insert("method_type".into(), json!(info.method_type));
190        rec.insert("principal".into(), json!(info.principal));
191        rec.insert("auth_domain".into(), json!(info.auth_domain));
192        rec.insert("authenticated".into(), json!(info.authenticated));
193        rec.insert("remote_addr".into(), json!(info.remote_addr));
194        rec.insert(
195            "duration_ms".into(),
196            json!((duration_ms * 100.0).round() / 100.0),
197        );
198        rec.insert("status".into(), json!(status));
199        rec.insert(
200            "error_type".into(),
201            json!(error.map(|e| e.error_type.clone()).unwrap_or_default()),
202        );
203
204        if let Some(err) = error {
205            rec.insert("error_message".into(), json!(err.message));
206        }
207        if !self.server_version.is_empty() {
208            rec.insert("server_version".into(), json!(self.server_version));
209        }
210        if !info.protocol_version.is_empty() {
211            rec.insert("protocol_version".into(), json!(info.protocol_version));
212        }
213        if !info.request_id.is_empty() {
214            rec.insert("request_id".into(), json!(info.request_id));
215        }
216        if info.http_status > 0 {
217            rec.insert("http_status".into(), json!(info.http_status));
218        }
219        if !info.request_data.is_empty() {
220            let encoded = base64_encode(&info.request_data);
221            if self.verbose {
222                rec.insert("request_data".into(), json!(encoded));
223            } else {
224                // INFO-level default: omit the full payload but keep the
225                // schema invariant via `original_request_bytes` +
226                // `truncated: true`. Mirrors Python's
227                // `_access_logger.isEnabledFor(logging.DEBUG)` gate.
228                rec.insert("original_request_bytes".into(), json!(encoded.len()));
229                rec.insert("truncated".into(), json!(true));
230            }
231        }
232        if info.method_type == "stream" {
233            let sid = if info.stream_id.is_empty() {
234                random_stream_id()
235            } else {
236                info.stream_id.clone()
237            };
238            rec.insert("stream_id".into(), json!(sid));
239        }
240        if info.cancelled {
241            rec.insert("cancelled".into(), json!(true));
242        }
243        if stats.input_batches
244            + stats.output_batches
245            + stats.input_rows
246            + stats.output_rows
247            + stats.input_bytes
248            + stats.output_bytes
249            != 0
250        {
251            rec.insert("input_batches".into(), json!(stats.input_batches));
252            rec.insert("output_batches".into(), json!(stats.output_batches));
253            rec.insert("input_rows".into(), json!(stats.input_rows));
254            rec.insert("output_rows".into(), json!(stats.output_rows));
255            rec.insert("input_bytes".into(), json!(stats.input_bytes));
256            rec.insert("output_bytes".into(), json!(stats.output_bytes));
257        }
258
259        let line = serde_json::Value::Object(rec).to_string();
260        match &self.sink {
261            Sink::Sync(m) => {
262                let mut w = m.lock().unwrap();
263                let _ = writeln!(w, "{line}");
264                let _ = w.flush();
265            }
266            Sink::Async { tx, dropped } => {
267                if let Err(e) = tx.try_send(line.into_bytes()) {
268                    match e {
269                        TrySendError::Full(_) => {
270                            dropped.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
271                        }
272                        TrySendError::Disconnected(_) => {
273                            // Writer thread exited; treat as dropped silently.
274                            dropped.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
275                        }
276                    }
277                }
278            }
279        }
280    }
281}
282
/// Render the current wall-clock time as an RFC 3339 UTC timestamp with
/// millisecond precision (e.g. `2024-05-01T12:34:56.789Z`), the shape the
/// access-log spec's `timestamp` regex expects. A clock reading earlier
/// than the Unix epoch is treated as the epoch itself.
pub(crate) fn rfc3339_utc_millis() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Convert days since 1970-01-01 into a proleptic-Gregorian
    /// (year, month, day), following Howard Hinnant's `civil_from_days`.
    fn civil_from_days(days: i64) -> (i64, u32, u32) {
        // Shift the origin to 0000-03-01 so leap days fall at year end.
        let shifted = days + 719_468;
        let era = shifted.div_euclid(146_097);
        let day_of_era = (shifted - era * 146_097) as u32;
        let year_of_era =
            (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        // Month index counted from March = 0.
        let month_index = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * month_index + 2) / 5 + 1;
        let month = if month_index < 10 {
            month_index + 3
        } else {
            month_index - 9
        };
        let year = i64::from(year_of_era) + era * 400 + i64::from(month <= 2);
        (year, month, day)
    }

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let epoch_ms = since_epoch.as_millis() as i64;
    let secs = epoch_ms / 1000;
    let millis = (epoch_ms % 1000) as u32;

    let (year, month, day) = civil_from_days(secs.div_euclid(86_400));
    let second_of_day = secs.rem_euclid(86_400) as u32;
    format!(
        "{year:04}-{month:02}-{day:02}T{:02}:{:02}:{:02}.{millis:03}Z",
        second_of_day / 3600,
        second_of_day / 60 % 60,
        second_of_day % 60,
    )
}
316
/// Encode `bytes` as standard, padded base64 (RFC 4648 §4). Kept in-module
/// so access logging does not depend on the optional `base64` crate.
fn base64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Pick the 6-bit group of `word` that starts `shift` bits from the right.
    let sextet = |word: u32, shift: u32| ALPHABET[((word >> shift) & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity(bytes.len().div_ceil(3) * 4);
    for group in bytes.chunks(3) {
        // Missing trailing bytes of a short final group read as zero.
        let b0 = u32::from(group[0]);
        let b1 = group.get(1).copied().map_or(0, u32::from);
        let b2 = group.get(2).copied().map_or(0, u32::from);
        let word = (b0 << 16) | (b1 << 8) | b2;

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        encoded.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }
    encoded
}
350
/// Produce a 32-character lowercase hex identifier for a stream call.
/// Mint one when a stream starts and reuse it for the init request and
/// every continuation.
pub(crate) fn random_stream_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Not cryptographic — adequate for log correlation. The high 64 bits mix
    // wall-clock nanoseconds with the process id; the low 64 bits come from
    // a process-wide counter.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    let mixed = nanos ^ u64::from(std::process::id());
    format!("{mixed:016x}{seq:016x}")
}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory sink whose buffer the test keeps a handle to.
    struct BufSink(Arc<Mutex<Vec<u8>>>);

    impl Write for BufSink {
        fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(b);
            Ok(b.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// A unary `DispatchInfo` for `method` with every optional field empty.
    fn unary_info(method: &'static str, request_id: &'static str) -> DispatchInfo {
        DispatchInfo {
            method: method.into(),
            method_type: "unary",
            server_id: "srv".into(),
            request_id: request_id.into(),
            transport_metadata: Arc::new(Default::default()),
            principal: String::new(),
            auth_domain: String::new(),
            authenticated: false,
            protocol: "Test".into(),
            remote_addr: String::new(),
            http_status: 0,
            request_data: Vec::new(),
            stream_id: String::new(),
            cancelled: false,
            claims: std::collections::BTreeMap::new(),
            protocol_hash: String::new(),
            protocol_version: String::new(),
        }
    }

    /// Run one start/end cycle through `hook` with empty statistics.
    fn dispatch_once(hook: &dyn DispatchHook, info: &DispatchInfo, error: Option<&RpcError>) {
        let tok = hook.on_dispatch_start(info);
        hook.on_dispatch_end(tok, info, error, &CallStatistics::default());
    }

    /// Parse the single JSON line captured in `buf`.
    fn only_record(buf: &Mutex<Vec<u8>>) -> serde_json::Value {
        let text = String::from_utf8(buf.lock().unwrap().clone()).unwrap();
        serde_json::from_str(text.trim()).unwrap()
    }

    #[test]
    fn emits_json_line_per_call() {
        let buf = Arc::new(Mutex::new(Vec::new()));
        let hook: Arc<dyn DispatchHook> = AccessLogHook::new(BufSink(buf.clone()), "1.2.3");
        dispatch_once(&*hook, &unary_info("echo_string", "req-1"), None);

        let rec = only_record(&buf);
        assert_eq!(rec["logger"], "vgi_rpc.access");
        assert_eq!(rec["method"], "echo_string");
        assert_eq!(rec["server_version"], "1.2.3");
        assert_eq!(rec["status"], "ok");
        assert_eq!(rec["authenticated"], false);
        assert_eq!(rec["request_id"], "req-1");
    }

    #[test]
    fn buffered_writes_via_background_thread() {
        // Sink that forwards every write to the test over a channel.
        struct ChanSink(std::sync::mpsc::Sender<Vec<u8>>);
        impl Write for ChanSink {
            fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
                let _ = self.0.send(b.to_vec());
                Ok(b.len())
            }
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }
        let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
        let hook: Arc<dyn DispatchHook> = AccessLogHook::buffered(ChanSink(tx), "1.2.3", 128);
        dispatch_once(&*hook, &unary_info("echo_string", "req-1"), None);

        // Accumulate writes until the line terminator shows up; the writer
        // thread may deliver the JSON body and newline in one or more writes.
        let mut acc = Vec::new();
        while let Ok(chunk) = rx.recv_timeout(std::time::Duration::from_millis(500)) {
            acc.extend(chunk);
            if acc.contains(&b'\n') {
                break;
            }
        }
        let line = String::from_utf8(acc).unwrap();
        assert!(line.contains("\"method\":\"echo_string\""), "got: {line}");
        assert!(line.contains("\"server_version\":\"1.2.3\""), "got: {line}");
    }

    #[test]
    fn buffered_drops_when_channel_full_instead_of_blocking() {
        // Sink whose writes block forever — the writer thread will park
        // on the very first entry, leaving the channel saturated.
        struct ParkingSink;
        impl Write for ParkingSink {
            fn write(&mut self, _b: &[u8]) -> std::io::Result<usize> {
                std::thread::park();
                Ok(0)
            }
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }
        let hook = AccessLogHook::buffered(ParkingSink, "1.2.3", 1);
        let info = unary_info("m", "");
        // Push enough entries that the bounded channel overflows.
        for _ in 0..50 {
            dispatch_once(&*hook, &info, None);
        }
        // Some entries must have been dropped — this is the property under
        // test (dispatch never blocked even though the sink is wedged).
        assert!(
            hook.dropped_count() > 0,
            "expected drops on saturated buffered sink, got {}",
            hook.dropped_count()
        );
    }

    #[test]
    fn error_entries_carry_error_message() {
        let buf = Arc::new(Mutex::new(Vec::new()));
        let hook: Arc<dyn DispatchHook> = AccessLogHook::new(BufSink(buf.clone()), "1.2.3");
        let err = RpcError::value_error("boom");
        dispatch_once(&*hook, &unary_info("raise_value_error", ""), Some(&err));

        let rec = only_record(&buf);
        assert_eq!(rec["status"], "error");
        assert_eq!(rec["error_type"], "ValueError");
        assert_eq!(rec["error_message"], "boom");
    }

    #[test]
    fn request_data_is_elided_unless_verbose() {
        let mut info = unary_info("echo_string", "");
        info.request_data = b"foo".to_vec();

        let quiet_buf = Arc::new(Mutex::new(Vec::new()));
        let quiet: Arc<dyn DispatchHook> = AccessLogHook::new(BufSink(quiet_buf.clone()), "");
        dispatch_once(&*quiet, &info, None);
        let rec = only_record(&quiet_buf);
        assert_eq!(rec["truncated"], true);
        assert!(rec["original_request_bytes"].is_u64(), "got: {rec}");
        assert!(rec.get("request_data").is_none(), "got: {rec}");

        let loud_buf = Arc::new(Mutex::new(Vec::new()));
        let loud: Arc<dyn DispatchHook> =
            AccessLogHook::new(BufSink(loud_buf.clone()), "").with_verbose(true);
        dispatch_once(&*loud, &info, None);
        let rec = only_record(&loud_buf);
        assert_eq!(rec["request_data"], "Zm9v");
        assert!(rec.get("truncated").is_none(), "got: {rec}");
    }

    #[test]
    fn stream_calls_get_a_stream_id() {
        let buf = Arc::new(Mutex::new(Vec::new()));
        let hook = AccessLogHook::new(BufSink(buf.clone()), "1.2.3");
        let mut info = unary_info("stream_numbers", "");
        info.method_type = "stream";
        dispatch_once(&*hook, &info, None);

        let rec = only_record(&buf);
        let sid = rec["stream_id"].as_str().expect("stream_id present");
        assert_eq!(sid.len(), 32);
        assert!(sid.bytes().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn base64_matches_rfc4648_vectors() {
        assert_eq!(base64_encode(b""), "");
        assert_eq!(base64_encode(b"f"), "Zg==");
        assert_eq!(base64_encode(b"fo"), "Zm8=");
        assert_eq!(base64_encode(b"foo"), "Zm9v");
        assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
    }

    #[test]
    fn timestamp_has_rfc3339_millis_shape() {
        let ts = rfc3339_utc_millis();
        let b = ts.as_bytes();
        assert_eq!(ts.len(), 24, "got: {ts}");
        assert_eq!((b[4], b[7], b[10]), (b'-', b'-', b'T'), "got: {ts}");
        assert_eq!((b[13], b[16], b[19], b[23]), (b':', b':', b'.', b'Z'), "got: {ts}");
    }
}