Skip to main content

http_nu/
logging.rs

1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
17type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
19// --- Token bucket rate limiter ---
20
21struct TokenBucket {
22    tokens: f64,
23    capacity: f64,
24    refill_rate: f64,
25    last_refill: Instant,
26}
27
28impl TokenBucket {
29    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
30        Self {
31            tokens: capacity,
32            capacity,
33            refill_rate,
34            last_refill: now,
35        }
36    }
37
38    fn try_consume(&mut self, now: Instant) -> bool {
39        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
40        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
41        self.last_refill = now;
42
43        if self.tokens >= 1.0 {
44            self.tokens -= 1.0;
45            true
46        } else {
47            false
48        }
49    }
50}
51
52// --- Event enum: owned data for async broadcast ---
53
54#[derive(Clone, Serialize)]
55pub struct RequestData {
56    pub proto: String,
57    pub method: String,
58    pub remote_ip: Option<String>,
59    pub remote_port: Option<u16>,
60    pub trusted_ip: Option<String>,
61    pub headers: HashMap<String, String>,
62    pub uri: String,
63    pub path: String,
64    pub query: HashMap<String, String>,
65}
66
67impl From<&Request> for RequestData {
68    fn from(req: &Request) -> Self {
69        Self {
70            proto: req.proto.to_string(),
71            method: req.method.to_string(),
72            remote_ip: req.remote_ip.map(|ip| ip.to_string()),
73            remote_port: req.remote_port,
74            trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
75            headers: req
76                .headers
77                .iter()
78                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
79                .collect(),
80            uri: req.uri.to_string(),
81            path: req.path.clone(),
82            query: req.query.clone(),
83        }
84    }
85}
86
87#[derive(Clone)]
88pub enum Event {
89    // Access events
90    Request {
91        request_id: scru128::Scru128Id,
92        request: Box<RequestData>,
93    },
94    Response {
95        request_id: scru128::Scru128Id,
96        status: u16,
97        headers: HashMap<String, String>,
98        latency_ms: u64,
99    },
100    Complete {
101        request_id: scru128::Scru128Id,
102        bytes: u64,
103        duration_ms: u64,
104    },
105
106    // Lifecycle events
107    Started {
108        address: String,
109        startup_ms: u64,
110    },
111    Reloaded,
112    Error {
113        error: String,
114    },
115    Print {
116        message: String,
117    },
118    Stopping {
119        inflight: usize,
120    },
121    Stopped,
122    StopTimedOut,
123    Shutdown,
124}
125
126// --- Broadcast channel ---
127
128static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
129
130pub fn init_broadcast() -> broadcast::Receiver<Event> {
131    let (tx, rx) = broadcast::channel(65536);
132    let _ = SENDER.set(tx);
133    rx
134}
135
136pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
137    SENDER.get().map(|tx| tx.subscribe())
138}
139
140fn emit(event: Event) {
141    if let Some(tx) = SENDER.get() {
142        let _ = tx.send(event); // non-blocking, drops if no receivers
143    }
144}
145
146// --- Public emit functions ---
147
148pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
149    emit(Event::Request {
150        request_id,
151        request: Box::new(RequestData::from(request)),
152    });
153}
154
155pub fn log_response(
156    request_id: scru128::Scru128Id,
157    status: u16,
158    headers: &HeaderMap,
159    start_time: Instant,
160) {
161    let headers_map: HashMap<String, String> = headers
162        .iter()
163        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
164        .collect();
165
166    emit(Event::Response {
167        request_id,
168        status,
169        headers: headers_map,
170        latency_ms: start_time.elapsed().as_millis() as u64,
171    });
172}
173
174pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
175    emit(Event::Complete {
176        request_id,
177        bytes,
178        duration_ms: response_time.elapsed().as_millis() as u64,
179    });
180}
181
182pub fn log_started(address: &str, startup_ms: u128) {
183    emit(Event::Started {
184        address: address.to_string(),
185        startup_ms: startup_ms as u64,
186    });
187}
188
189pub fn log_reloaded() {
190    emit(Event::Reloaded);
191}
192
193pub fn log_error(error: &str) {
194    emit(Event::Error {
195        error: error.to_string(),
196    });
197}
198
199pub fn log_print(message: &str) {
200    emit(Event::Print {
201        message: message.to_string(),
202    });
203}
204
205pub fn log_stopping(inflight: usize) {
206    emit(Event::Stopping { inflight });
207}
208
209pub fn log_stopped() {
210    emit(Event::Stopped);
211}
212
213pub fn log_stop_timed_out() {
214    emit(Event::StopTimedOut);
215}
216
217pub fn shutdown() {
218    emit(Event::Shutdown);
219}
220
221// --- JSONL handler (dedicated writer thread does serialization + IO) ---
222
223pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
224    use std::io::Write;
225
226    std::thread::spawn(move || {
227        let mut rx = rx;
228        let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
229
230        loop {
231            let event = match rx.blocking_recv() {
232                Ok(event) => event,
233                Err(broadcast::error::RecvError::Lagged(n)) => {
234                    let json = serde_json::json!({
235                        "stamp": scru128::new().to_string(),
236                        "message": "lagged",
237                        "dropped": n,
238                    });
239                    if let Ok(line) = serde_json::to_string(&json) {
240                        let _ = writeln!(stdout, "{line}");
241                        let _ = stdout.flush();
242                    }
243                    continue;
244                }
245                Err(broadcast::error::RecvError::Closed) => break,
246            };
247
248            if matches!(event, Event::Shutdown) {
249                let _ = stdout.flush();
250                break;
251            }
252
253            let needs_flush = matches!(
254                &event,
255                Event::Started { .. }
256                    | Event::Stopped
257                    | Event::StopTimedOut
258                    | Event::Reloaded
259                    | Event::Error { .. }
260                    | Event::Print { .. }
261            );
262
263            let stamp = scru128::new().to_string();
264
265            let json = match event {
266                Event::Request {
267                    request_id,
268                    request,
269                } => {
270                    serde_json::json!({
271                        "stamp": stamp,
272                        "message": "request",
273                        "request_id": request_id.to_string(),
274                        "method": &request.method,
275                        "path": &request.path,
276                        "trusted_ip": &request.trusted_ip,
277                        "request": request,
278                    })
279                }
280                Event::Response {
281                    request_id,
282                    status,
283                    headers,
284                    latency_ms,
285                } => {
286                    serde_json::json!({
287                        "stamp": stamp,
288                        "message": "response",
289                        "request_id": request_id.to_string(),
290                        "status": status,
291                        "headers": headers,
292                        "latency_ms": latency_ms,
293                    })
294                }
295                Event::Complete {
296                    request_id,
297                    bytes,
298                    duration_ms,
299                } => {
300                    serde_json::json!({
301                        "stamp": stamp,
302                        "message": "complete",
303                        "request_id": request_id.to_string(),
304                        "bytes": bytes,
305                        "duration_ms": duration_ms,
306                    })
307                }
308                Event::Started {
309                    address,
310                    startup_ms,
311                } => {
312                    serde_json::json!({
313                        "stamp": stamp,
314                        "message": "started",
315                        "address": address,
316                        "startup_ms": startup_ms,
317                    })
318                }
319                Event::Reloaded => {
320                    serde_json::json!({
321                        "stamp": stamp,
322                        "message": "reloaded",
323                    })
324                }
325                Event::Error { error } => {
326                    serde_json::json!({
327                        "stamp": stamp,
328                        "message": "error",
329                        "error": error,
330                    })
331                }
332                Event::Print { message } => {
333                    serde_json::json!({
334                        "stamp": stamp,
335                        "message": "print",
336                        "content": message,
337                    })
338                }
339                Event::Stopping { inflight } => {
340                    serde_json::json!({
341                        "stamp": stamp,
342                        "message": "stopping",
343                        "inflight": inflight,
344                    })
345                }
346                Event::Stopped => {
347                    serde_json::json!({
348                        "stamp": stamp,
349                        "message": "stopped",
350                    })
351                }
352                Event::StopTimedOut => {
353                    serde_json::json!({
354                        "stamp": stamp,
355                        "message": "stop_timed_out",
356                    })
357                }
358                Event::Shutdown => unreachable!(),
359            };
360
361            if let Ok(line) = serde_json::to_string(&json) {
362                let _ = writeln!(stdout, "{line}");
363            }
364
365            // Flush if lifecycle event or channel is empty (idle)
366            if needs_flush || rx.is_empty() {
367                let _ = stdout.flush();
368            }
369        }
370
371        let _ = stdout.flush();
372    })
373}
374
375// --- Human-readable handler (dedicated thread) ---
376
377struct RequestState {
378    method: String,
379    path: String,
380    trusted_ip: Option<String>,
381    start_time: Instant,
382    status: Option<u16>,
383    latency_ms: Option<u64>,
384}
385
386fn truncate_middle(s: &str, max_len: usize) -> String {
387    if s.len() <= max_len {
388        return s.to_string();
389    }
390    let keep = (max_len - 3) / 2;
391    format!("{}...{}", &s[..keep], &s[s.len() - keep..])
392}
393
394struct ActiveZone {
395    stdout: io::Stdout,
396    line_count: usize,
397}
398
399impl ActiveZone {
400    fn new() -> Self {
401        Self {
402            stdout: io::stdout(),
403            line_count: 0,
404        }
405    }
406
407    /// Clear the active zone and move cursor back to start
408    fn clear(&mut self) {
409        if self.line_count > 0 {
410            let _ = execute!(
411                self.stdout,
412                cursor::MoveUp(self.line_count as u16),
413                terminal::Clear(terminal::ClearType::FromCursorDown)
414            );
415            self.line_count = 0;
416        }
417    }
418
419    /// Print a permanent line (scrolls up, not part of active zone)
420    fn print_permanent(&mut self, line: &str) {
421        self.clear();
422        println!("{line}");
423        let _ = self.stdout.flush();
424    }
425
426    /// Redraw all active requests
427    fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
428        self.line_count = 0;
429        if !active_ids.is_empty() {
430            println!("· · ·  ✈ in flight  · · ·");
431            self.line_count += 1;
432            for id in active_ids {
433                if let Some(state) = requests.get(id) {
434                    let line = format_active_line(state);
435                    println!("{line}");
436                    self.line_count += 1;
437                }
438            }
439        }
440        let _ = self.stdout.flush();
441    }
442}
443
444fn format_active_line(state: &RequestState) -> String {
445    let timestamp = Local::now().format("%H:%M:%S%.3f");
446    let ip = state.trusted_ip.as_deref().unwrap_or("-");
447    let method = &state.method;
448    let path = truncate_middle(&state.path, 40);
449    let elapsed = state.start_time.elapsed().as_secs_f64();
450
451    match (state.status, state.latency_ms) {
452        (Some(status), Some(latency)) => {
453            format!(
454                "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
455            )
456        }
457        _ => {
458            format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
459        }
460    }
461}
462
463fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
464    let timestamp = Local::now().format("%H:%M:%S%.3f");
465    let ip = state.trusted_ip.as_deref().unwrap_or("-");
466    let method = &state.method;
467    let path = truncate_middle(&state.path, 40);
468    let status = state.status.unwrap_or(0);
469    let latency = state.latency_ms.unwrap_or(0);
470
471    format!(
472        "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
473    )
474}
475
476pub fn run_human_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
477    std::thread::spawn(move || {
478        let mut rx = rx;
479        let mut zone = ActiveZone::new();
480        let mut requests: HashMap<String, RequestState> = HashMap::new();
481        let mut active_ids: Vec<String> = Vec::new();
482
483        // Rate limiting: token bucket (burst 40, refill 20/sec)
484        let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
485        let mut skipped: u64 = 0;
486        let mut lagged: u64 = 0;
487
488        loop {
489            let event = match rx.blocking_recv() {
490                Ok(event) => event,
491                Err(broadcast::error::RecvError::Lagged(n)) => {
492                    lagged += n;
493                    // Clear all pending - their Response/Complete events may have been dropped
494                    requests.clear();
495                    active_ids.clear();
496                    zone.print_permanent(&format!(
497                        "⚠ lagged: dropped {n} events, cleared in-flight"
498                    ));
499                    continue;
500                }
501                Err(broadcast::error::RecvError::Closed) => break,
502            };
503            match event {
504                Event::Started {
505                    address,
506                    startup_ms,
507                } => {
508                    let version = env!("CARGO_PKG_VERSION");
509                    let pid = std::process::id();
510                    let now = Local::now().to_rfc2822();
511                    zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
512                    zone.print_permanent("     __  ,");
513                    zone.print_permanent(&format!(
514                        " .--()°'.'  pid {pid} · {address} · {startup_ms}ms 💜"
515                    ));
516                    zone.print_permanent(&format!("'|, . ,'    {now}"));
517                    zone.print_permanent(" !_-(_\\");
518                    zone.redraw(&active_ids, &requests);
519                }
520                Event::Reloaded => {
521                    zone.print_permanent("reloaded 🔄");
522                    zone.redraw(&active_ids, &requests);
523                }
524                Event::Error { error } => {
525                    zone.clear();
526                    eprintln!("ERROR: {error}");
527                    zone.redraw(&active_ids, &requests);
528                }
529                Event::Print { message } => {
530                    zone.print_permanent(&format!("PRINT: {message}"));
531                    zone.redraw(&active_ids, &requests);
532                }
533                Event::Stopping { inflight } => {
534                    zone.print_permanent(&format!(
535                        "stopping, {inflight} connection(s) in flight..."
536                    ));
537                    zone.redraw(&active_ids, &requests);
538                }
539                Event::Stopped => {
540                    let timestamp = Local::now().format("%H:%M:%S%.3f");
541                    zone.print_permanent(&format!("{timestamp} cu l8r"));
542                    zone.clear();
543                    println!("</http-nu>");
544                }
545                Event::StopTimedOut => {
546                    zone.print_permanent("stop timed out, forcing exit");
547                }
548                Event::Request {
549                    request_id,
550                    request,
551                } => {
552                    if !rate_limiter.try_consume(Instant::now()) {
553                        skipped += 1;
554                        continue;
555                    }
556
557                    // Print skip summary if any
558                    if skipped > 0 {
559                        zone.print_permanent(&format!("... skipped {skipped} requests"));
560                        skipped = 0;
561                    }
562
563                    let id = request_id.to_string();
564                    let state = RequestState {
565                        method: request.method.clone(),
566                        path: request.path.clone(),
567                        trusted_ip: request.trusted_ip.clone(),
568                        start_time: Instant::now(),
569                        status: None,
570                        latency_ms: None,
571                    };
572                    requests.insert(id.clone(), state);
573                    active_ids.push(id);
574                    zone.clear();
575                    zone.redraw(&active_ids, &requests);
576                }
577                Event::Response {
578                    request_id,
579                    status,
580                    latency_ms,
581                    ..
582                } => {
583                    let id = request_id.to_string();
584                    if let Some(state) = requests.get_mut(&id) {
585                        state.status = Some(status);
586                        state.latency_ms = Some(latency_ms);
587                        zone.clear();
588                        zone.redraw(&active_ids, &requests);
589                    }
590                }
591                Event::Complete {
592                    request_id,
593                    bytes,
594                    duration_ms,
595                } => {
596                    let id = request_id.to_string();
597                    if let Some(state) = requests.remove(&id) {
598                        active_ids.retain(|x| x != &id);
599                        let line = format_complete_line(&state, duration_ms, bytes);
600                        zone.print_permanent(&line);
601                        zone.redraw(&active_ids, &requests);
602                    }
603                }
604                Event::Shutdown => break,
605            }
606        }
607
608        // Final summaries
609        zone.clear();
610        if skipped > 0 {
611            println!("... skipped {skipped} requests");
612        }
613        if lagged > 0 {
614            println!("⚠ total lagged: {lagged} events dropped");
615        }
616    })
617}
618
619// --- RequestGuard: ensures Complete fires even on abort ---
620
621pub struct RequestGuard {
622    request_id: scru128::Scru128Id,
623    start: Instant,
624    bytes_sent: u64,
625}
626
627impl RequestGuard {
628    pub fn new(request_id: scru128::Scru128Id) -> Self {
629        Self {
630            request_id,
631            start: Instant::now(),
632            bytes_sent: 0,
633        }
634    }
635
636    pub fn request_id(&self) -> scru128::Scru128Id {
637        self.request_id
638    }
639}
640
641impl Drop for RequestGuard {
642    fn drop(&mut self) {
643        log_complete(self.request_id, self.bytes_sent, self.start);
644    }
645}
646
647// --- LoggingBody wrapper ---
648
649pub struct LoggingBody<B> {
650    inner: B,
651    guard: RequestGuard,
652}
653
654impl<B> LoggingBody<B> {
655    pub fn new(inner: B, guard: RequestGuard) -> Self {
656        Self { inner, guard }
657    }
658}
659
660impl<B> Body for LoggingBody<B>
661where
662    B: Body<Data = Bytes, Error = BoxError> + Unpin,
663{
664    type Data = Bytes;
665    type Error = BoxError;
666
667    fn poll_frame(
668        mut self: Pin<&mut Self>,
669        cx: &mut Context<'_>,
670    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
671        let inner = Pin::new(&mut self.inner);
672        match inner.poll_frame(cx) {
673            Poll::Ready(Some(Ok(frame))) => {
674                if let Some(data) = frame.data_ref() {
675                    self.guard.bytes_sent += data.len() as u64;
676                }
677                Poll::Ready(Some(Ok(frame)))
678            }
679            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
680            Poll::Ready(None) => Poll::Ready(None),
681            Poll::Pending => Poll::Pending,
682        }
683    }
684
685    fn is_end_stream(&self) -> bool {
686        self.inner.is_end_stream()
687    }
688
689    fn size_hint(&self) -> SizeHint {
690        self.inner.size_hint()
691    }
692}
693
694// No Drop impl needed - guard's Drop fires Complete
695
696#[cfg(test)]
697mod tests {
698    use super::*;
699    use std::time::Duration;
700
701    #[test]
702    fn token_bucket_allows_burst() {
703        let start = Instant::now();
704        let mut bucket = TokenBucket::new(40.0, 20.0, start);
705
706        // Should allow 40 requests immediately
707        for _ in 0..40 {
708            assert!(bucket.try_consume(start));
709        }
710        // 41st should fail
711        assert!(!bucket.try_consume(start));
712    }
713
714    #[test]
715    fn token_bucket_refills_over_time() {
716        let start = Instant::now();
717        let mut bucket = TokenBucket::new(40.0, 20.0, start);
718
719        // Drain the bucket
720        for _ in 0..40 {
721            bucket.try_consume(start);
722        }
723        assert!(!bucket.try_consume(start));
724
725        // After 100ms, should have 2 tokens (20/sec * 0.1s)
726        let later = start + Duration::from_millis(100);
727        assert!(bucket.try_consume(later));
728        assert!(bucket.try_consume(later));
729        assert!(!bucket.try_consume(later));
730    }
731
732    #[test]
733    fn token_bucket_caps_at_capacity() {
734        let start = Instant::now();
735        let mut bucket = TokenBucket::new(40.0, 20.0, start);
736
737        // Wait a long time - should still cap at 40
738        let later = start + Duration::from_secs(10);
739        for _ in 0..40 {
740            assert!(bucket.try_consume(later));
741        }
742        assert!(!bucket.try_consume(later));
743    }
744}