http_nu/
logging.rs

1use std::collections::HashMap;
2use std::pin::Pin;
3use std::sync::OnceLock;
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use chrono::Local;
8use hyper::body::{Body, Bytes, Frame, SizeHint};
9use hyper::header::HeaderMap;
10use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
11use serde::Serialize;
12use tokio::sync::broadcast;
13
14use crate::request::Request;
15
/// Boxed, thread-safe dynamic error; the error type required of bodies wrapped by `LoggingBody`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
17
18// --- Event enum: owned data for async broadcast ---
19
20#[derive(Clone, Serialize)]
21pub struct RequestData {
22    pub proto: String,
23    pub method: String,
24    pub remote_ip: Option<String>,
25    pub remote_port: Option<u16>,
26    pub trusted_ip: Option<String>,
27    pub headers: HashMap<String, String>,
28    pub uri: String,
29    pub path: String,
30    pub query: HashMap<String, String>,
31}
32
33impl From<&Request> for RequestData {
34    fn from(req: &Request) -> Self {
35        Self {
36            proto: req.proto.to_string(),
37            method: req.method.to_string(),
38            remote_ip: req.remote_ip.map(|ip| ip.to_string()),
39            remote_port: req.remote_port,
40            trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
41            headers: req
42                .headers
43                .iter()
44                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
45                .collect(),
46            uri: req.uri.to_string(),
47            path: req.path.clone(),
48            query: req.query.clone(),
49        }
50    }
51}
52
53#[derive(Clone)]
54pub enum Event {
55    // Access events
56    Request {
57        request_id: scru128::Scru128Id,
58        request: Box<RequestData>,
59    },
60    Response {
61        request_id: scru128::Scru128Id,
62        status: u16,
63        headers: HashMap<String, String>,
64        latency_ms: u64,
65    },
66    Complete {
67        request_id: scru128::Scru128Id,
68        bytes: u64,
69        duration_ms: u64,
70    },
71
72    // Lifecycle events
73    Started {
74        address: String,
75        startup_ms: u64,
76    },
77    Reloaded,
78    Error {
79        error: String,
80    },
81    Stopping {
82        inflight: usize,
83    },
84    Stopped,
85    StopTimedOut,
86}
87
88// --- Broadcast channel ---
89
90static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
91
92pub fn init_broadcast() -> broadcast::Receiver<Event> {
93    let (tx, rx) = broadcast::channel(65536);
94    let _ = SENDER.set(tx);
95    rx
96}
97
98pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
99    SENDER.get().map(|tx| tx.subscribe())
100}
101
102fn emit(event: Event) {
103    if let Some(tx) = SENDER.get() {
104        let _ = tx.send(event); // non-blocking, drops if no receivers
105    }
106}
107
108// --- Public emit functions ---
109
110pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
111    emit(Event::Request {
112        request_id,
113        request: Box::new(RequestData::from(request)),
114    });
115}
116
117pub fn log_response(
118    request_id: scru128::Scru128Id,
119    status: u16,
120    headers: &HeaderMap,
121    start_time: Instant,
122) {
123    let headers_map: HashMap<String, String> = headers
124        .iter()
125        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
126        .collect();
127
128    emit(Event::Response {
129        request_id,
130        status,
131        headers: headers_map,
132        latency_ms: start_time.elapsed().as_millis() as u64,
133    });
134}
135
136pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
137    emit(Event::Complete {
138        request_id,
139        bytes,
140        duration_ms: response_time.elapsed().as_millis() as u64,
141    });
142}
143
144pub fn log_started(address: &str, startup_ms: u128) {
145    emit(Event::Started {
146        address: address.to_string(),
147        startup_ms: startup_ms as u64,
148    });
149}
150
151pub fn log_reloaded() {
152    emit(Event::Reloaded);
153}
154
155pub fn log_error(error: &str) {
156    emit(Event::Error {
157        error: error.to_string(),
158    });
159}
160
161pub fn log_stopping(inflight: usize) {
162    emit(Event::Stopping { inflight });
163}
164
165pub fn log_stopped() {
166    emit(Event::Stopped);
167}
168
169pub fn log_stop_timed_out() {
170    emit(Event::StopTimedOut);
171}
172
173// --- JSONL handler (dedicated writer thread does serialization + IO) ---
174
175pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) {
176    use std::io::Write;
177
178    std::thread::spawn(move || {
179        let mut rx = rx;
180        let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
181
182        loop {
183            let event = match rx.blocking_recv() {
184                Ok(event) => event,
185                Err(broadcast::error::RecvError::Lagged(n)) => {
186                    let json = serde_json::json!({
187                        "stamp": scru128::new().to_string(),
188                        "message": "lagged",
189                        "dropped": n,
190                    });
191                    if let Ok(line) = serde_json::to_string(&json) {
192                        let _ = writeln!(stdout, "{line}");
193                        let _ = stdout.flush();
194                    }
195                    continue;
196                }
197                Err(broadcast::error::RecvError::Closed) => break,
198            };
199
200            let needs_flush = matches!(
201                &event,
202                Event::Started { .. }
203                    | Event::Stopped
204                    | Event::StopTimedOut
205                    | Event::Reloaded
206                    | Event::Error { .. }
207            );
208
209            let stamp = scru128::new().to_string();
210
211            let json = match event {
212                Event::Request {
213                    request_id,
214                    request,
215                } => {
216                    serde_json::json!({
217                        "stamp": stamp,
218                        "message": "request",
219                        "request_id": request_id.to_string(),
220                        "method": &request.method,
221                        "path": &request.path,
222                        "trusted_ip": &request.trusted_ip,
223                        "request": request,
224                    })
225                }
226                Event::Response {
227                    request_id,
228                    status,
229                    headers,
230                    latency_ms,
231                } => {
232                    serde_json::json!({
233                        "stamp": stamp,
234                        "message": "response",
235                        "request_id": request_id.to_string(),
236                        "status": status,
237                        "headers": headers,
238                        "latency_ms": latency_ms,
239                    })
240                }
241                Event::Complete {
242                    request_id,
243                    bytes,
244                    duration_ms,
245                } => {
246                    serde_json::json!({
247                        "stamp": stamp,
248                        "message": "complete",
249                        "request_id": request_id.to_string(),
250                        "bytes": bytes,
251                        "duration_ms": duration_ms,
252                    })
253                }
254                Event::Started {
255                    address,
256                    startup_ms,
257                } => {
258                    serde_json::json!({
259                        "stamp": stamp,
260                        "message": "started",
261                        "address": address,
262                        "startup_ms": startup_ms,
263                    })
264                }
265                Event::Reloaded => {
266                    serde_json::json!({
267                        "stamp": stamp,
268                        "message": "reloaded",
269                    })
270                }
271                Event::Error { error } => {
272                    serde_json::json!({
273                        "stamp": stamp,
274                        "message": "error",
275                        "error": error,
276                    })
277                }
278                Event::Stopping { inflight } => {
279                    serde_json::json!({
280                        "stamp": stamp,
281                        "message": "stopping",
282                        "inflight": inflight,
283                    })
284                }
285                Event::Stopped => {
286                    serde_json::json!({
287                        "stamp": stamp,
288                        "message": "stopped",
289                    })
290                }
291                Event::StopTimedOut => {
292                    serde_json::json!({
293                        "stamp": stamp,
294                        "message": "stop_timed_out",
295                    })
296                }
297            };
298
299            if let Ok(line) = serde_json::to_string(&json) {
300                let _ = writeln!(stdout, "{line}");
301            }
302
303            // Flush if lifecycle event or channel is empty (idle)
304            if needs_flush || rx.is_empty() {
305                let _ = stdout.flush();
306            }
307        }
308
309        let _ = stdout.flush();
310    });
311}
312
313// --- Human-readable handler (dedicated thread) ---
314
315struct RequestState {
316    pb: ProgressBar,
317    method: String,
318    path: String,
319    trusted_ip: Option<String>,
320    status: Option<u16>,
321    latency_ms: Option<u64>,
322}
323
/// Shortens `s` to at most `max_len` characters by replacing its middle with `...`.
///
/// Lengths are measured in characters (Unicode scalar values), which matches
/// how `format!` width padding counts, and all cuts fall on character
/// boundaries, so multi-byte input never panics.
///
/// * If `s` already fits, it is returned unchanged.
/// * If `max_len` is smaller than the ellipsis itself, the first `max_len`
///   characters of `s` are returned without an ellipsis.
/// * Otherwise `(max_len - 3) / 2` characters are kept from each end.
fn truncate_middle(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    // The byte length bounds the character count, so check it first as a
    // cheap fast path before counting characters.
    if s.len() <= max_len || s.chars().count() <= max_len {
        return s.to_string();
    }

    // No room for an ellipsis: hard-cut to the budget instead of underflowing.
    if max_len < ELLIPSIS.len() {
        return s.chars().take(max_len).collect();
    }

    let keep = (max_len - ELLIPSIS.len()) / 2;

    // Byte offset just past the first `keep` characters.
    let head_end = s.char_indices().nth(keep).map_or(s.len(), |(idx, _)| idx);
    // Byte offset where the last `keep` characters begin.
    let tail_start = match keep {
        0 => s.len(),
        _ => s
            .char_indices()
            .rev()
            .nth(keep - 1)
            .map_or(0, |(idx, _)| idx),
    };

    format!("{}{}{}", &s[..head_end], ELLIPSIS, &s[tail_start..])
}
331
332fn format_line(state: &RequestState, duration_ms: Option<u64>, bytes: Option<u64>) -> String {
333    let timestamp = Local::now().format("%H:%M:%S%.3f");
334    let ip = state.trusted_ip.as_deref().unwrap_or("-");
335    let method = &state.method;
336    let path = truncate_middle(&state.path, 40);
337
338    match (state.status, state.latency_ms, duration_ms, bytes) {
339        // Complete: status, latency, duration, bytes
340        (Some(status), Some(latency), Some(dur), Some(b)) => {
341            format!(
342                "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {dur:>6}ms {b:>8}b"
343            )
344        }
345        // Response: status, latency
346        (Some(status), Some(latency), None, None) => {
347            format!("{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms")
348        }
349        // Request pending
350        _ => {
351            format!("{timestamp} {ip:>15} {method:<6} {path:<40} ...")
352        }
353    }
354}
355
356pub fn run_human_handler(rx: broadcast::Receiver<Event>) {
357    std::thread::spawn(move || {
358        let mut rx = rx;
359        let mp = MultiProgress::new();
360        let mut requests: HashMap<String, RequestState> = HashMap::new();
361
362        // Rate limiting: ~10 requests/sec visible
363        let min_interval = std::time::Duration::from_millis(100);
364        let mut last_shown = std::time::Instant::now();
365        let mut skipped: u64 = 0;
366
367        loop {
368            let event = match rx.blocking_recv() {
369                Ok(event) => event,
370                Err(broadcast::error::RecvError::Lagged(n)) => {
371                    skipped += n;
372                    continue;
373                }
374                Err(broadcast::error::RecvError::Closed) => break,
375            };
376            match event {
377                Event::Started {
378                    address,
379                    startup_ms,
380                } => {
381                    let version = env!("CARGO_PKG_VERSION");
382                    let pid = std::process::id();
383                    let now = Local::now().to_rfc2822();
384                    println!("<http-nu version=\"{version}\">");
385                    println!("     __  ,");
386                    println!(" .--()°'.'  pid {pid} · {address} · {startup_ms}ms 💜");
387                    println!("'|, . ,'    {now}");
388                    println!(" !_-(_\\");
389                }
390                Event::Reloaded => {
391                    println!("reloaded 🔄");
392                }
393                Event::Error { error } => {
394                    eprintln!("{error}");
395                }
396                Event::Stopping { inflight } => {
397                    println!("stopping, {inflight} connection(s) in flight...");
398                }
399                Event::Stopped => {
400                    println!("cu l8r </http-nu>");
401                }
402                Event::StopTimedOut => {
403                    println!("stop timed out, forcing exit");
404                }
405                Event::Request {
406                    request_id,
407                    request,
408                } => {
409                    let now = std::time::Instant::now();
410                    if now.duration_since(last_shown) < min_interval {
411                        skipped += 1;
412                        continue;
413                    }
414                    last_shown = now;
415
416                    // Print skip summary if any
417                    if skipped > 0 {
418                        println!("... skipped {skipped} requests");
419                        skipped = 0;
420                    }
421
422                    let pb = mp.add(ProgressBar::new_spinner());
423                    pb.set_style(ProgressStyle::default_spinner().template("{msg}").unwrap());
424
425                    let state = RequestState {
426                        pb: pb.clone(),
427                        method: request.method.clone(),
428                        path: request.path.clone(),
429                        trusted_ip: request.trusted_ip.clone(),
430                        status: None,
431                        latency_ms: None,
432                    };
433                    pb.set_message(format_line(&state, None, None));
434                    requests.insert(request_id.to_string(), state);
435                }
436                Event::Response {
437                    request_id,
438                    status,
439                    latency_ms,
440                    ..
441                } => {
442                    if let Some(state) = requests.get_mut(&request_id.to_string()) {
443                        state.status = Some(status);
444                        state.latency_ms = Some(latency_ms);
445                        state.pb.set_message(format_line(state, None, None));
446                    }
447                }
448                Event::Complete {
449                    request_id,
450                    bytes,
451                    duration_ms,
452                } => {
453                    if let Some(state) = requests.remove(&request_id.to_string()) {
454                        state.pb.finish_with_message(format_line(
455                            &state,
456                            Some(duration_ms),
457                            Some(bytes),
458                        ));
459                    }
460                }
461            }
462        }
463
464        // Final skip summary
465        if skipped > 0 {
466            println!("... skipped {skipped} requests");
467        }
468    });
469}
470
471// --- LoggingBody wrapper ---
472
473pub struct LoggingBody<B> {
474    inner: B,
475    request_id: scru128::Scru128Id,
476    response_time: Instant,
477    bytes_sent: u64,
478    logged_complete: bool,
479}
480
481impl<B> LoggingBody<B> {
482    pub fn new(inner: B, request_id: scru128::Scru128Id) -> Self {
483        Self {
484            inner,
485            request_id,
486            response_time: Instant::now(),
487            bytes_sent: 0,
488            logged_complete: false,
489        }
490    }
491
492    fn do_log_complete(&mut self) {
493        if !self.logged_complete {
494            self.logged_complete = true;
495            log_complete(self.request_id, self.bytes_sent, self.response_time);
496        }
497    }
498}
499
500impl<B> Body for LoggingBody<B>
501where
502    B: Body<Data = Bytes, Error = BoxError> + Unpin,
503{
504    type Data = Bytes;
505    type Error = BoxError;
506
507    fn poll_frame(
508        mut self: Pin<&mut Self>,
509        cx: &mut Context<'_>,
510    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
511        let inner = Pin::new(&mut self.inner);
512        match inner.poll_frame(cx) {
513            Poll::Ready(Some(Ok(frame))) => {
514                if let Some(data) = frame.data_ref() {
515                    self.bytes_sent += data.len() as u64;
516                }
517                Poll::Ready(Some(Ok(frame)))
518            }
519            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
520            Poll::Ready(None) => {
521                self.do_log_complete();
522                Poll::Ready(None)
523            }
524            Poll::Pending => Poll::Pending,
525        }
526    }
527
528    fn is_end_stream(&self) -> bool {
529        self.inner.is_end_stream()
530    }
531
532    fn size_hint(&self) -> SizeHint {
533        self.inner.size_hint()
534    }
535}
536
537impl<B> Drop for LoggingBody<B> {
538    fn drop(&mut self) {
539        self.do_log_complete();
540    }
541}