http_nu/
logging.rs

1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
17type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
19// --- Token bucket rate limiter ---
20
21struct TokenBucket {
22    tokens: f64,
23    capacity: f64,
24    refill_rate: f64,
25    last_refill: Instant,
26}
27
28impl TokenBucket {
29    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
30        Self {
31            tokens: capacity,
32            capacity,
33            refill_rate,
34            last_refill: now,
35        }
36    }
37
38    fn try_consume(&mut self, now: Instant) -> bool {
39        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
40        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
41        self.last_refill = now;
42
43        if self.tokens >= 1.0 {
44            self.tokens -= 1.0;
45            true
46        } else {
47            false
48        }
49    }
50}
51
52// --- Event enum: owned data for async broadcast ---
53
54#[derive(Clone, Serialize)]
55pub struct RequestData {
56    pub proto: String,
57    pub method: String,
58    pub remote_ip: Option<String>,
59    pub remote_port: Option<u16>,
60    pub trusted_ip: Option<String>,
61    pub headers: HashMap<String, String>,
62    pub uri: String,
63    pub path: String,
64    pub query: HashMap<String, String>,
65}
66
67impl From<&Request> for RequestData {
68    fn from(req: &Request) -> Self {
69        Self {
70            proto: req.proto.to_string(),
71            method: req.method.to_string(),
72            remote_ip: req.remote_ip.map(|ip| ip.to_string()),
73            remote_port: req.remote_port,
74            trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
75            headers: req
76                .headers
77                .iter()
78                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
79                .collect(),
80            uri: req.uri.to_string(),
81            path: req.path.clone(),
82            query: req.query.clone(),
83        }
84    }
85}
86
87#[derive(Clone)]
88pub enum Event {
89    // Access events
90    Request {
91        request_id: scru128::Scru128Id,
92        request: Box<RequestData>,
93    },
94    Response {
95        request_id: scru128::Scru128Id,
96        status: u16,
97        headers: HashMap<String, String>,
98        latency_ms: u64,
99    },
100    Complete {
101        request_id: scru128::Scru128Id,
102        bytes: u64,
103        duration_ms: u64,
104    },
105
106    // Lifecycle events
107    Started {
108        address: String,
109        startup_ms: u64,
110    },
111    Reloaded,
112    Error {
113        error: String,
114    },
115    Stopping {
116        inflight: usize,
117    },
118    Stopped,
119    StopTimedOut,
120}
121
122// --- Broadcast channel ---
123
124static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
125
126pub fn init_broadcast() -> broadcast::Receiver<Event> {
127    let (tx, rx) = broadcast::channel(65536);
128    let _ = SENDER.set(tx);
129    rx
130}
131
132pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
133    SENDER.get().map(|tx| tx.subscribe())
134}
135
136fn emit(event: Event) {
137    if let Some(tx) = SENDER.get() {
138        let _ = tx.send(event); // non-blocking, drops if no receivers
139    }
140}
141
142// --- Public emit functions ---
143
144pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
145    emit(Event::Request {
146        request_id,
147        request: Box::new(RequestData::from(request)),
148    });
149}
150
151pub fn log_response(
152    request_id: scru128::Scru128Id,
153    status: u16,
154    headers: &HeaderMap,
155    start_time: Instant,
156) {
157    let headers_map: HashMap<String, String> = headers
158        .iter()
159        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
160        .collect();
161
162    emit(Event::Response {
163        request_id,
164        status,
165        headers: headers_map,
166        latency_ms: start_time.elapsed().as_millis() as u64,
167    });
168}
169
170pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
171    emit(Event::Complete {
172        request_id,
173        bytes,
174        duration_ms: response_time.elapsed().as_millis() as u64,
175    });
176}
177
178pub fn log_started(address: &str, startup_ms: u128) {
179    emit(Event::Started {
180        address: address.to_string(),
181        startup_ms: startup_ms as u64,
182    });
183}
184
185pub fn log_reloaded() {
186    emit(Event::Reloaded);
187}
188
189pub fn log_error(error: &str) {
190    emit(Event::Error {
191        error: error.to_string(),
192    });
193}
194
195pub fn log_stopping(inflight: usize) {
196    emit(Event::Stopping { inflight });
197}
198
199pub fn log_stopped() {
200    emit(Event::Stopped);
201}
202
203pub fn log_stop_timed_out() {
204    emit(Event::StopTimedOut);
205}
206
207// --- JSONL handler (dedicated writer thread does serialization + IO) ---
208
209pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) {
210    use std::io::Write;
211
212    std::thread::spawn(move || {
213        let mut rx = rx;
214        let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
215
216        loop {
217            let event = match rx.blocking_recv() {
218                Ok(event) => event,
219                Err(broadcast::error::RecvError::Lagged(n)) => {
220                    let json = serde_json::json!({
221                        "stamp": scru128::new().to_string(),
222                        "message": "lagged",
223                        "dropped": n,
224                    });
225                    if let Ok(line) = serde_json::to_string(&json) {
226                        let _ = writeln!(stdout, "{line}");
227                        let _ = stdout.flush();
228                    }
229                    continue;
230                }
231                Err(broadcast::error::RecvError::Closed) => break,
232            };
233
234            let needs_flush = matches!(
235                &event,
236                Event::Started { .. }
237                    | Event::Stopped
238                    | Event::StopTimedOut
239                    | Event::Reloaded
240                    | Event::Error { .. }
241            );
242
243            let stamp = scru128::new().to_string();
244
245            let json = match event {
246                Event::Request {
247                    request_id,
248                    request,
249                } => {
250                    serde_json::json!({
251                        "stamp": stamp,
252                        "message": "request",
253                        "request_id": request_id.to_string(),
254                        "method": &request.method,
255                        "path": &request.path,
256                        "trusted_ip": &request.trusted_ip,
257                        "request": request,
258                    })
259                }
260                Event::Response {
261                    request_id,
262                    status,
263                    headers,
264                    latency_ms,
265                } => {
266                    serde_json::json!({
267                        "stamp": stamp,
268                        "message": "response",
269                        "request_id": request_id.to_string(),
270                        "status": status,
271                        "headers": headers,
272                        "latency_ms": latency_ms,
273                    })
274                }
275                Event::Complete {
276                    request_id,
277                    bytes,
278                    duration_ms,
279                } => {
280                    serde_json::json!({
281                        "stamp": stamp,
282                        "message": "complete",
283                        "request_id": request_id.to_string(),
284                        "bytes": bytes,
285                        "duration_ms": duration_ms,
286                    })
287                }
288                Event::Started {
289                    address,
290                    startup_ms,
291                } => {
292                    serde_json::json!({
293                        "stamp": stamp,
294                        "message": "started",
295                        "address": address,
296                        "startup_ms": startup_ms,
297                    })
298                }
299                Event::Reloaded => {
300                    serde_json::json!({
301                        "stamp": stamp,
302                        "message": "reloaded",
303                    })
304                }
305                Event::Error { error } => {
306                    serde_json::json!({
307                        "stamp": stamp,
308                        "message": "error",
309                        "error": error,
310                    })
311                }
312                Event::Stopping { inflight } => {
313                    serde_json::json!({
314                        "stamp": stamp,
315                        "message": "stopping",
316                        "inflight": inflight,
317                    })
318                }
319                Event::Stopped => {
320                    serde_json::json!({
321                        "stamp": stamp,
322                        "message": "stopped",
323                    })
324                }
325                Event::StopTimedOut => {
326                    serde_json::json!({
327                        "stamp": stamp,
328                        "message": "stop_timed_out",
329                    })
330                }
331            };
332
333            if let Ok(line) = serde_json::to_string(&json) {
334                let _ = writeln!(stdout, "{line}");
335            }
336
337            // Flush if lifecycle event or channel is empty (idle)
338            if needs_flush || rx.is_empty() {
339                let _ = stdout.flush();
340            }
341        }
342
343        let _ = stdout.flush();
344    });
345}
346
347// --- Human-readable handler (dedicated thread) ---
348
349struct RequestState {
350    method: String,
351    path: String,
352    trusted_ip: Option<String>,
353    start_time: Instant,
354    status: Option<u16>,
355    latency_ms: Option<u64>,
356}
357
358fn truncate_middle(s: &str, max_len: usize) -> String {
359    if s.len() <= max_len {
360        return s.to_string();
361    }
362    let keep = (max_len - 3) / 2;
363    format!("{}...{}", &s[..keep], &s[s.len() - keep..])
364}
365
366struct ActiveZone {
367    stdout: io::Stdout,
368    line_count: usize,
369}
370
371impl ActiveZone {
372    fn new() -> Self {
373        Self {
374            stdout: io::stdout(),
375            line_count: 0,
376        }
377    }
378
379    /// Clear the active zone and move cursor back to start
380    fn clear(&mut self) {
381        if self.line_count > 0 {
382            let _ = execute!(
383                self.stdout,
384                cursor::MoveUp(self.line_count as u16),
385                terminal::Clear(terminal::ClearType::FromCursorDown)
386            );
387            self.line_count = 0;
388        }
389    }
390
391    /// Print a permanent line (scrolls up, not part of active zone)
392    fn print_permanent(&mut self, line: &str) {
393        self.clear();
394        println!("{line}");
395        let _ = self.stdout.flush();
396    }
397
398    /// Redraw all active requests
399    fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
400        self.line_count = 0;
401        if !active_ids.is_empty() {
402            println!("· · ·  ✈ in flight  · · ·");
403            self.line_count += 1;
404            for id in active_ids {
405                if let Some(state) = requests.get(id) {
406                    let line = format_active_line(state);
407                    println!("{line}");
408                    self.line_count += 1;
409                }
410            }
411        }
412        let _ = self.stdout.flush();
413    }
414}
415
416fn format_active_line(state: &RequestState) -> String {
417    let timestamp = Local::now().format("%H:%M:%S%.3f");
418    let ip = state.trusted_ip.as_deref().unwrap_or("-");
419    let method = &state.method;
420    let path = truncate_middle(&state.path, 40);
421    let elapsed = state.start_time.elapsed().as_secs_f64();
422
423    match (state.status, state.latency_ms) {
424        (Some(status), Some(latency)) => {
425            format!(
426                "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
427            )
428        }
429        _ => {
430            format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
431        }
432    }
433}
434
435fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
436    let timestamp = Local::now().format("%H:%M:%S%.3f");
437    let ip = state.trusted_ip.as_deref().unwrap_or("-");
438    let method = &state.method;
439    let path = truncate_middle(&state.path, 40);
440    let status = state.status.unwrap_or(0);
441    let latency = state.latency_ms.unwrap_or(0);
442
443    format!(
444        "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
445    )
446}
447
448pub fn run_human_handler(rx: broadcast::Receiver<Event>) {
449    std::thread::spawn(move || {
450        let mut rx = rx;
451        let mut zone = ActiveZone::new();
452        let mut requests: HashMap<String, RequestState> = HashMap::new();
453        let mut active_ids: Vec<String> = Vec::new();
454
455        // Rate limiting: token bucket (burst 40, refill 20/sec)
456        let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
457        let mut skipped: u64 = 0;
458        let mut lagged: u64 = 0;
459
460        loop {
461            let event = match rx.blocking_recv() {
462                Ok(event) => event,
463                Err(broadcast::error::RecvError::Lagged(n)) => {
464                    lagged += n;
465                    // Clear all pending - their Response/Complete events may have been dropped
466                    requests.clear();
467                    active_ids.clear();
468                    zone.print_permanent(&format!(
469                        "⚠ lagged: dropped {n} events, cleared in-flight"
470                    ));
471                    continue;
472                }
473                Err(broadcast::error::RecvError::Closed) => break,
474            };
475            match event {
476                Event::Started {
477                    address,
478                    startup_ms,
479                } => {
480                    let version = env!("CARGO_PKG_VERSION");
481                    let pid = std::process::id();
482                    let now = Local::now().to_rfc2822();
483                    zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
484                    zone.print_permanent("     __  ,");
485                    zone.print_permanent(&format!(
486                        " .--()°'.'  pid {pid} · {address} · {startup_ms}ms 💜"
487                    ));
488                    zone.print_permanent(&format!("'|, . ,'    {now}"));
489                    zone.print_permanent(" !_-(_\\");
490                    zone.redraw(&active_ids, &requests);
491                }
492                Event::Reloaded => {
493                    zone.print_permanent("reloaded 🔄");
494                    zone.redraw(&active_ids, &requests);
495                }
496                Event::Error { error } => {
497                    zone.clear();
498                    eprintln!("ERROR: {error}");
499                    zone.redraw(&active_ids, &requests);
500                }
501                Event::Stopping { inflight } => {
502                    zone.print_permanent(&format!(
503                        "stopping, {inflight} connection(s) in flight..."
504                    ));
505                    zone.redraw(&active_ids, &requests);
506                }
507                Event::Stopped => {
508                    zone.clear();
509                    println!("cu l8r </http-nu>");
510                }
511                Event::StopTimedOut => {
512                    zone.print_permanent("stop timed out, forcing exit");
513                }
514                Event::Request {
515                    request_id,
516                    request,
517                } => {
518                    if !rate_limiter.try_consume(Instant::now()) {
519                        skipped += 1;
520                        continue;
521                    }
522
523                    // Print skip summary if any
524                    if skipped > 0 {
525                        zone.print_permanent(&format!("... skipped {skipped} requests"));
526                        skipped = 0;
527                    }
528
529                    let id = request_id.to_string();
530                    let state = RequestState {
531                        method: request.method.clone(),
532                        path: request.path.clone(),
533                        trusted_ip: request.trusted_ip.clone(),
534                        start_time: Instant::now(),
535                        status: None,
536                        latency_ms: None,
537                    };
538                    requests.insert(id.clone(), state);
539                    active_ids.push(id);
540                    zone.clear();
541                    zone.redraw(&active_ids, &requests);
542                }
543                Event::Response {
544                    request_id,
545                    status,
546                    latency_ms,
547                    ..
548                } => {
549                    let id = request_id.to_string();
550                    if let Some(state) = requests.get_mut(&id) {
551                        state.status = Some(status);
552                        state.latency_ms = Some(latency_ms);
553                        zone.clear();
554                        zone.redraw(&active_ids, &requests);
555                    }
556                }
557                Event::Complete {
558                    request_id,
559                    bytes,
560                    duration_ms,
561                } => {
562                    let id = request_id.to_string();
563                    if let Some(state) = requests.remove(&id) {
564                        active_ids.retain(|x| x != &id);
565                        let line = format_complete_line(&state, duration_ms, bytes);
566                        zone.print_permanent(&line);
567                        zone.redraw(&active_ids, &requests);
568                    }
569                }
570            }
571        }
572
573        // Final summaries
574        zone.clear();
575        if skipped > 0 {
576            println!("... skipped {skipped} requests");
577        }
578        if lagged > 0 {
579            println!("⚠ total lagged: {lagged} events dropped");
580        }
581    });
582}
583
584// --- RequestGuard: ensures Complete fires even on abort ---
585
586pub struct RequestGuard {
587    request_id: scru128::Scru128Id,
588    start: Instant,
589    bytes_sent: u64,
590}
591
592impl RequestGuard {
593    pub fn new(request_id: scru128::Scru128Id) -> Self {
594        Self {
595            request_id,
596            start: Instant::now(),
597            bytes_sent: 0,
598        }
599    }
600
601    pub fn request_id(&self) -> scru128::Scru128Id {
602        self.request_id
603    }
604}
605
606impl Drop for RequestGuard {
607    fn drop(&mut self) {
608        log_complete(self.request_id, self.bytes_sent, self.start);
609    }
610}
611
612// --- LoggingBody wrapper ---
613
614pub struct LoggingBody<B> {
615    inner: B,
616    guard: RequestGuard,
617}
618
619impl<B> LoggingBody<B> {
620    pub fn new(inner: B, guard: RequestGuard) -> Self {
621        Self { inner, guard }
622    }
623}
624
625impl<B> Body for LoggingBody<B>
626where
627    B: Body<Data = Bytes, Error = BoxError> + Unpin,
628{
629    type Data = Bytes;
630    type Error = BoxError;
631
632    fn poll_frame(
633        mut self: Pin<&mut Self>,
634        cx: &mut Context<'_>,
635    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
636        let inner = Pin::new(&mut self.inner);
637        match inner.poll_frame(cx) {
638            Poll::Ready(Some(Ok(frame))) => {
639                if let Some(data) = frame.data_ref() {
640                    self.guard.bytes_sent += data.len() as u64;
641                }
642                Poll::Ready(Some(Ok(frame)))
643            }
644            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
645            Poll::Ready(None) => Poll::Ready(None),
646            Poll::Pending => Poll::Pending,
647        }
648    }
649
650    fn is_end_stream(&self) -> bool {
651        self.inner.is_end_stream()
652    }
653
654    fn size_hint(&self) -> SizeHint {
655        self.inner.size_hint()
656    }
657}
658
659// No Drop impl needed - guard's Drop fires Complete
660
661#[cfg(test)]
662mod tests {
663    use super::*;
664    use std::time::Duration;
665
666    #[test]
667    fn token_bucket_allows_burst() {
668        let start = Instant::now();
669        let mut bucket = TokenBucket::new(40.0, 20.0, start);
670
671        // Should allow 40 requests immediately
672        for _ in 0..40 {
673            assert!(bucket.try_consume(start));
674        }
675        // 41st should fail
676        assert!(!bucket.try_consume(start));
677    }
678
679    #[test]
680    fn token_bucket_refills_over_time() {
681        let start = Instant::now();
682        let mut bucket = TokenBucket::new(40.0, 20.0, start);
683
684        // Drain the bucket
685        for _ in 0..40 {
686            bucket.try_consume(start);
687        }
688        assert!(!bucket.try_consume(start));
689
690        // After 100ms, should have 2 tokens (20/sec * 0.1s)
691        let later = start + Duration::from_millis(100);
692        assert!(bucket.try_consume(later));
693        assert!(bucket.try_consume(later));
694        assert!(!bucket.try_consume(later));
695    }
696
697    #[test]
698    fn token_bucket_caps_at_capacity() {
699        let start = Instant::now();
700        let mut bucket = TokenBucket::new(40.0, 20.0, start);
701
702        // Wait a long time - should still cap at 40
703        let later = start + Duration::from_secs(10);
704        for _ in 0..40 {
705            assert!(bucket.try_consume(later));
706        }
707        assert!(!bucket.try_consume(later));
708    }
709}