1use std::io::Write;
8use std::sync::mpsc::{SyncSender, TrySendError};
9use std::sync::{Arc, Mutex};
10use std::time::Instant;
11
12use serde_json::json;
13
14use crate::errors::RpcError;
15use crate::hooks::{CallStatistics, DispatchHook, DispatchInfo, HookToken};
16
/// Destination for serialized access-log records.
enum Sink {
    /// A shared writer behind a mutex; records are written by the thread
    /// that finishes the dispatch.
    Sync(Arc<Mutex<dyn Write + Send>>),
    /// A bounded channel feeding a background writer thread.
    Async {
        /// Sending half of the bounded channel; each message is one record
        /// without its trailing newline.
        tx: SyncSender<Vec<u8>>,
        /// Number of records that could not be handed to the channel.
        dropped: Arc<std::sync::atomic::AtomicU64>,
    },
}
28
29pub struct AccessLogHook {
40 sink: Sink,
41 server_version: String,
42 verbose: bool,
49 starts: Mutex<std::collections::HashMap<HookToken, Instant>>,
53 next_token: std::sync::atomic::AtomicU64,
54}
55
56impl AccessLogHook {
57 pub fn new<W: Write + Send + 'static>(sink: W, server_version: impl Into<String>) -> Arc<Self> {
62 Arc::new(Self {
63 sink: Sink::Sync(Arc::new(Mutex::new(sink))),
64 server_version: server_version.into(),
65 verbose: false,
66 starts: Mutex::new(std::collections::HashMap::new()),
67 next_token: std::sync::atomic::AtomicU64::new(1),
68 })
69 }
70
71 pub fn with_verbose(self: Arc<Self>, verbose: bool) -> Arc<Self> {
77 if self.verbose == verbose {
78 return self;
79 }
80 let sink = match &self.sink {
81 Sink::Sync(m) => Sink::Sync(m.clone()),
82 Sink::Async { tx, dropped } => Sink::Async {
83 tx: tx.clone(),
84 dropped: dropped.clone(),
85 },
86 };
87 Arc::new(Self {
88 sink,
89 server_version: self.server_version.clone(),
90 verbose,
91 starts: Mutex::new(std::collections::HashMap::new()),
92 next_token: std::sync::atomic::AtomicU64::new(1),
93 })
94 }
95
96 pub fn buffered<W: Write + Send + 'static>(
107 sink: W,
108 server_version: impl Into<String>,
109 capacity: usize,
110 ) -> Arc<Self> {
111 let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<u8>>(capacity.max(1));
112 let dropped = Arc::new(std::sync::atomic::AtomicU64::new(0));
113 let mut sink = sink;
114 std::thread::Builder::new()
115 .name("vgi-rpc-access-log".into())
116 .spawn(move || {
117 while let Ok(line) = rx.recv() {
118 if sink.write_all(&line).is_err() {
119 return;
120 }
121 if sink.write_all(b"\n").is_err() {
122 return;
123 }
124 let _ = sink.flush();
125 }
126 })
127 .expect("spawn access-log writer thread");
128 Arc::new(Self {
129 sink: Sink::Async { tx, dropped },
130 server_version: server_version.into(),
131 verbose: false,
132 starts: Mutex::new(std::collections::HashMap::new()),
133 next_token: std::sync::atomic::AtomicU64::new(1),
134 })
135 }
136
137 pub fn to_stderr(server_version: impl Into<String>) -> Arc<Self> {
140 Self::new(std::io::stderr(), server_version)
141 }
142
143 pub fn dropped_count(&self) -> u64 {
146 match &self.sink {
147 Sink::Async { dropped, .. } => dropped.load(std::sync::atomic::Ordering::Relaxed),
148 Sink::Sync(_) => 0,
149 }
150 }
151}
152
153impl DispatchHook for AccessLogHook {
154 fn on_dispatch_start(&self, _info: &DispatchInfo) -> HookToken {
155 let token = self
156 .next_token
157 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
158 self.starts.lock().unwrap().insert(token, Instant::now());
159 token
160 }
161
162 fn on_dispatch_end(
163 &self,
164 token: HookToken,
165 info: &DispatchInfo,
166 error: Option<&RpcError>,
167 stats: &CallStatistics,
168 ) {
169 let start = self.starts.lock().unwrap().remove(&token);
170 let duration_ms = start
171 .map(|t| t.elapsed().as_secs_f64() * 1000.0)
172 .unwrap_or(0.0);
173 let status = if error.is_some() { "error" } else { "ok" };
174
175 let mut rec = serde_json::Map::new();
178 rec.insert("timestamp".into(), json!(rfc3339_utc_millis()));
179 rec.insert("level".into(), json!("INFO"));
180 rec.insert("logger".into(), json!("vgi_rpc.access"));
181 rec.insert(
182 "message".into(),
183 json!(format!("{}.{} {}", info.protocol, info.method, status)),
184 );
185 rec.insert("server_id".into(), json!(info.server_id));
186 rec.insert("protocol".into(), json!(info.protocol));
187 rec.insert("protocol_hash".into(), json!(info.protocol_hash));
188 rec.insert("method".into(), json!(info.method));
189 rec.insert("method_type".into(), json!(info.method_type));
190 rec.insert("principal".into(), json!(info.principal));
191 rec.insert("auth_domain".into(), json!(info.auth_domain));
192 rec.insert("authenticated".into(), json!(info.authenticated));
193 rec.insert("remote_addr".into(), json!(info.remote_addr));
194 rec.insert(
195 "duration_ms".into(),
196 json!((duration_ms * 100.0).round() / 100.0),
197 );
198 rec.insert("status".into(), json!(status));
199 rec.insert(
200 "error_type".into(),
201 json!(error.map(|e| e.error_type.clone()).unwrap_or_default()),
202 );
203
204 if let Some(err) = error {
205 rec.insert("error_message".into(), json!(err.message));
206 }
207 if !self.server_version.is_empty() {
208 rec.insert("server_version".into(), json!(self.server_version));
209 }
210 if !info.protocol_version.is_empty() {
211 rec.insert("protocol_version".into(), json!(info.protocol_version));
212 }
213 if !info.request_id.is_empty() {
214 rec.insert("request_id".into(), json!(info.request_id));
215 }
216 if info.http_status > 0 {
217 rec.insert("http_status".into(), json!(info.http_status));
218 }
219 if !info.request_data.is_empty() {
220 let encoded = base64_encode(&info.request_data);
221 if self.verbose {
222 rec.insert("request_data".into(), json!(encoded));
223 } else {
224 rec.insert("original_request_bytes".into(), json!(encoded.len()));
229 rec.insert("truncated".into(), json!(true));
230 }
231 }
232 if info.method_type == "stream" {
233 let sid = if info.stream_id.is_empty() {
234 random_stream_id()
235 } else {
236 info.stream_id.clone()
237 };
238 rec.insert("stream_id".into(), json!(sid));
239 }
240 if info.cancelled {
241 rec.insert("cancelled".into(), json!(true));
242 }
243 if stats.input_batches
244 + stats.output_batches
245 + stats.input_rows
246 + stats.output_rows
247 + stats.input_bytes
248 + stats.output_bytes
249 != 0
250 {
251 rec.insert("input_batches".into(), json!(stats.input_batches));
252 rec.insert("output_batches".into(), json!(stats.output_batches));
253 rec.insert("input_rows".into(), json!(stats.input_rows));
254 rec.insert("output_rows".into(), json!(stats.output_rows));
255 rec.insert("input_bytes".into(), json!(stats.input_bytes));
256 rec.insert("output_bytes".into(), json!(stats.output_bytes));
257 }
258
259 let line = serde_json::Value::Object(rec).to_string();
260 match &self.sink {
261 Sink::Sync(m) => {
262 let mut w = m.lock().unwrap();
263 let _ = writeln!(w, "{line}");
264 let _ = w.flush();
265 }
266 Sink::Async { tx, dropped } => {
267 if let Err(e) = tx.try_send(line.into_bytes()) {
268 match e {
269 TrySendError::Full(_) => {
270 dropped.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
271 }
272 TrySendError::Disconnected(_) => {
273 dropped.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
275 }
276 }
277 }
278 }
279 }
280 }
281}
282
/// Formats the current system time as `YYYY-MM-DDTHH:MM:SS.mmmZ` (UTC,
/// millisecond precision).
///
/// A clock set before the Unix epoch is treated as the epoch itself.
pub(crate) fn rfc3339_utc_millis() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let epoch_ms = since_epoch.as_millis() as i64;
    let epoch_secs = epoch_ms / 1000;
    let millis = (epoch_ms % 1000) as u32;

    let epoch_day = epoch_secs.div_euclid(SECS_PER_DAY);
    let second_of_day = epoch_secs.rem_euclid(SECS_PER_DAY) as u32;

    // Days-to-date conversion on a calendar whose year starts on 1 March, so
    // the leap day falls at the end of the year.
    let shifted_day = epoch_day + 719_468;
    let era_numerator = if shifted_day >= 0 {
        shifted_day
    } else {
        shifted_day - (DAYS_PER_ERA - 1)
    };
    let era = era_numerator / DAYS_PER_ERA;
    let day_of_era = (shifted_day - era * DAYS_PER_ERA) as u32;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let march_year = i64::from(year_of_era) + era * 400;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_month + 2) / 5 + 1;
    let month = if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    };
    // January and February belong to the following civil year.
    let year = if month <= 2 { march_year + 1 } else { march_year };

    let hour = second_of_day / 3600;
    let minute = (second_of_day / 60) % 60;
    let second = second_of_day % 60;
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{millis:03}Z")
}
316
/// Encodes `bytes` as base64 using the `A-Z a-z 0-9 + /` alphabet with `=`
/// padding.
fn base64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the 6-bit group that starts `shift` bits from the bottom of the
    // 24-bit word and maps it to its alphabet character.
    let sextet = |word: u32, shift: u32| ALPHABET[((word >> shift) & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity(bytes.len().div_ceil(3) * 4);
    for group in bytes.chunks(3) {
        // Zero-pad a short final group so all groups share one code path.
        let mut padded = [0u8; 3];
        padded[..group.len()].copy_from_slice(group);
        let word =
            (u32::from(padded[0]) << 16) | (u32::from(padded[1]) << 8) | u32::from(padded[2]);

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        match group.len() {
            3 => {
                encoded.push(sextet(word, 6));
                encoded.push(sextet(word, 0));
            }
            2 => {
                encoded.push(sextet(word, 6));
                encoded.push('=');
            }
            _ => encoded.push_str("=="),
        }
    }
    encoded
}
350
/// Produces a 32-character lowercase hex identifier for a stream.
///
/// Despite the name, no random source is involved. The first 16 digits are
/// the nanoseconds since the Unix epoch XORed with the process id, or just
/// the process id if the clock reads before the epoch. The last 16 digits are
/// a process-wide call counter.
pub(crate) fn random_stream_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    let clock_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Truncation to the low 64 bits is intended.
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    let process_id = u64::from(std::process::id());
    format!("{:016x}{:016x}", clock_nanos ^ process_id, sequence)
}
366
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Writer that appends everything it receives to a shared byte buffer.
    struct BufSink(Arc<Mutex<Vec<u8>>>);

    impl Write for BufSink {
        fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(b);
            Ok(b.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Unauthenticated unary call on protocol `Test`, with every optional
    /// field left empty apart from the three arguments.
    fn sample_info(
        method: &'static str,
        server_id: &'static str,
        request_id: &'static str,
    ) -> DispatchInfo {
        DispatchInfo {
            method: method.into(),
            method_type: "unary",
            server_id: server_id.into(),
            request_id: request_id.into(),
            transport_metadata: Arc::new(Default::default()),
            principal: String::new(),
            auth_domain: String::new(),
            authenticated: false,
            protocol: "Test".into(),
            remote_addr: String::new(),
            http_status: 0,
            request_data: Vec::new(),
            stream_id: String::new(),
            cancelled: false,
            claims: std::collections::BTreeMap::new(),
            protocol_hash: String::new(),
            protocol_version: String::new(),
        }
    }

    /// Runs one start/end pair through the hook with default statistics.
    fn dispatch_once(hook: &dyn DispatchHook, info: &DispatchInfo, error: Option<&RpcError>) {
        let token = hook.on_dispatch_start(info);
        hook.on_dispatch_end(token, info, error, &CallStatistics::default());
    }

    /// Parses the whole captured buffer as a single JSON record.
    fn captured_record(captured: &Arc<Mutex<Vec<u8>>>) -> serde_json::Value {
        let text = String::from_utf8(captured.lock().unwrap().clone()).unwrap();
        serde_json::from_str(text.trim()).unwrap()
    }

    #[test]
    fn emits_json_line_per_call() {
        let captured: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
        let hook: Arc<dyn DispatchHook> = AccessLogHook::new(BufSink(captured.clone()), "1.2.3");

        let info = sample_info("echo_string", "srv", "req-1");
        dispatch_once(hook.as_ref(), &info, None);

        let rec = captured_record(&captured);
        assert_eq!(rec["logger"], "vgi_rpc.access");
        assert_eq!(rec["method"], "echo_string");
        assert_eq!(rec["server_version"], "1.2.3");
        assert_eq!(rec["status"], "ok");
        assert_eq!(rec["authenticated"], false);
    }

    #[test]
    fn buffered_writes_via_background_thread() {
        /// Forwards every write to a channel so the test thread can observe
        /// what the writer thread produced.
        struct ChanSink(std::sync::mpsc::Sender<Vec<u8>>);
        impl Write for ChanSink {
            fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
                let _ = self.0.send(b.to_vec());
                Ok(b.len())
            }
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
        let hook: Arc<dyn DispatchHook> = AccessLogHook::buffered(ChanSink(tx), "1.2.3", 128);

        let info = sample_info("echo_string", "srv", "req-1");
        dispatch_once(hook.as_ref(), &info, None);

        // Collect chunks until the newline that terminates the record shows
        // up, or until the writer stays silent for half a second.
        let mut acc = Vec::new();
        while let Ok(chunk) = rx.recv_timeout(std::time::Duration::from_millis(500)) {
            acc.extend(chunk);
            if acc.contains(&b'\n') {
                break;
            }
        }
        let line = String::from_utf8(acc).unwrap();
        assert!(line.contains("\"method\":\"echo_string\""), "got: {line}");
        assert!(line.contains("\"server_version\":\"1.2.3\""), "got: {line}");
    }

    #[test]
    fn buffered_drops_when_channel_full_instead_of_blocking() {
        /// Stalls the writer thread inside its first write so the channel
        /// cannot drain.
        struct ParkingSink;
        impl Write for ParkingSink {
            fn write(&mut self, _b: &[u8]) -> std::io::Result<usize> {
                std::thread::park();
                Ok(0)
            }
            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let hook = AccessLogHook::buffered(ParkingSink, "1.2.3", 1);
        let dyn_hook: Arc<dyn DispatchHook> = hook.clone();
        let info = sample_info("m", "s", "");

        // Far more calls than the single-slot channel can hold.
        for _ in 0..50 {
            dispatch_once(dyn_hook.as_ref(), &info, None);
        }
        assert!(
            hook.dropped_count() > 0,
            "expected drops on saturated buffered sink, got {}",
            hook.dropped_count()
        );
    }

    #[test]
    fn error_entries_carry_error_message() {
        let captured: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
        let hook: Arc<dyn DispatchHook> = AccessLogHook::new(BufSink(captured.clone()), "1.2.3");

        let info = sample_info("raise_value_error", "srv", "");
        let err = RpcError::value_error("boom");
        dispatch_once(hook.as_ref(), &info, Some(&err));

        let rec = captured_record(&captured);
        assert_eq!(rec["status"], "error");
        assert_eq!(rec["error_type"], "ValueError");
        assert_eq!(rec["error_message"], "boom");
    }
}