// File: http_nu/logging.rs

1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
/// Boxed, thread-safe error type; used as the `Error` type of the bodies
/// wrapped by [`LoggingBody`].
type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
19/// Datastar SDK version (matches CDN URL in stdlib)
20pub const DATASTAR_VERSION: &str = "1.0.1";
21
/// Startup options to display in preamble
///
/// Carried inside [`Event::Started`] so the log handlers can report how the
/// server was launched.
#[derive(Clone, Default)]
pub struct StartupOptions {
    /// Listed as `watch` in the human-readable preamble when true.
    pub watch: bool,
    /// Listed as `tls:<value>` in the human-readable preamble when set.
    pub tls: Option<String>,
    /// Listed in the `xs` group of the preamble; also gates reporting of the
    /// xs version (only when the `cross-stream` feature is enabled).
    pub store: Option<String>,
    /// Listed as `topic:<value>` in the `xs` group of the preamble when set.
    pub topic: Option<String>,
    /// Listed verbatim in the `xs` group of the preamble when set.
    pub expose: Option<String>,
    /// Listed as `services` in the `xs` group of the preamble when true.
    pub services: bool,
    /// When true, the human-readable preamble includes the Datastar version.
    pub datastar: bool,
}
33
34// --- Token bucket rate limiter ---
35
/// Token-bucket rate limiter.
///
/// Holds at most `capacity` tokens and regains `refill_rate` tokens per
/// second of elapsed time. Time is passed in explicitly, so the bucket never
/// reads the clock itself.
struct TokenBucket {
    tokens: f64,
    capacity: f64,
    refill_rate: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a bucket that starts full, with `now` as the refill baseline.
    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
        Self {
            last_refill: now,
            refill_rate,
            capacity,
            tokens: capacity,
        }
    }

    /// Refills the bucket for the time elapsed since the previous call, then
    /// tries to take one token.
    ///
    /// Returns `true` when a token was available (and has been consumed).
    fn try_consume(&mut self, now: Instant) -> bool {
        let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64();
        let refilled = self.tokens + elapsed_secs * self.refill_rate;
        self.tokens = f64::min(refilled, self.capacity);
        self.last_refill = now;

        let granted = self.tokens >= 1.0;
        if granted {
            self.tokens -= 1.0;
        }
        granted
    }
}
66
67// --- Event enum: owned data for async broadcast ---
68
/// Owned snapshot of a request's loggable fields.
///
/// Built from a borrowed `Request` (see the `From<&Request>` impl) so it can
/// be sent through the broadcast channel. Serialized as the `request` object
/// of the JSONL `request` record.
#[derive(Clone, Serialize)]
pub struct RequestData {
    /// String form of the request's `proto` field.
    pub proto: String,
    /// String form of the request's `method` field.
    pub method: String,
    /// String form of the request's `remote_ip`, when present.
    pub remote_ip: Option<String>,
    /// The request's `remote_port`, when present.
    pub remote_port: Option<u16>,
    /// String form of the request's `trusted_ip`, when present; this is the
    /// address shown by the human-readable handler.
    pub trusted_ip: Option<String>,
    /// Header names mapped to their values as strings.
    pub headers: HashMap<String, String>,
    /// String form of the request's `uri` field.
    pub uri: String,
    /// Copy of the request's `path` field.
    pub path: String,
    /// Copy of the request's `query` map.
    pub query: HashMap<String, String>,
}
81
82impl From<&Request> for RequestData {
83    fn from(req: &Request) -> Self {
84        Self {
85            proto: req.proto.to_string(),
86            method: req.method.to_string(),
87            remote_ip: req.remote_ip.map(|ip| ip.to_string()),
88            remote_port: req.remote_port,
89            trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
90            headers: req
91                .headers
92                .iter()
93                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
94                .collect(),
95            uri: req.uri.to_string(),
96            path: req.path.clone(),
97            query: req.query.clone(),
98        }
99    }
100}
101
/// Log event sent over the broadcast channel to the handler threads.
///
/// Every variant owns its data so it can be cloned to each subscriber.
#[derive(Clone)]
pub enum Event {
    // Access events
    /// A request was received (emitted by `log_request`).
    Request {
        request_id: scru128::Scru128Id,
        request: Box<RequestData>,
    },
    /// Response status and headers are known (emitted by `log_response`).
    Response {
        request_id: scru128::Scru128Id,
        status: u16,
        headers: HashMap<String, String>,
        latency_ms: u64,
    },
    /// The request finished (emitted by `log_complete`, which
    /// `RequestGuard` calls when it is dropped).
    Complete {
        request_id: scru128::Scru128Id,
        bytes: u64,
        duration_ms: u64,
    },

    // Lifecycle events
    /// The server started; carries the data shown in the startup preamble.
    Started {
        address: String,
        startup_ms: u64,
        options: StartupOptions,
    },
    /// Emitted by `log_reloaded`.
    Reloaded,
    /// An error message to report (emitted by `log_error`).
    Error {
        error: String,
    },
    /// A message to print (emitted by `log_print`).
    Print {
        message: String,
    },
    /// Emitted by `log_stopping` with the number of in-flight connections.
    Stopping {
        inflight: usize,
    },
    /// Emitted by `log_stopped`.
    Stopped,
    /// Emitted by `log_stop_timed_out`.
    StopTimedOut,
    /// Tells the handler threads to leave their receive loops
    /// (emitted by `shutdown`).
    Shutdown,
}
141
142// --- Broadcast channel ---
143
144static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
145
146pub fn init_broadcast() -> broadcast::Receiver<Event> {
147    let (tx, rx) = broadcast::channel(65536);
148    let _ = SENDER.set(tx);
149    rx
150}
151
152pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
153    SENDER.get().map(|tx| tx.subscribe())
154}
155
156fn emit(event: Event) {
157    if let Some(tx) = SENDER.get() {
158        let _ = tx.send(event); // non-blocking, drops if no receivers
159    }
160}
161
162// --- Public emit functions ---
163
164pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
165    emit(Event::Request {
166        request_id,
167        request: Box::new(RequestData::from(request)),
168    });
169}
170
171pub fn log_response(
172    request_id: scru128::Scru128Id,
173    status: u16,
174    headers: &HeaderMap,
175    start_time: Instant,
176) {
177    let headers_map: HashMap<String, String> = headers
178        .iter()
179        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
180        .collect();
181
182    emit(Event::Response {
183        request_id,
184        status,
185        headers: headers_map,
186        latency_ms: start_time.elapsed().as_millis() as u64,
187    });
188}
189
190pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
191    emit(Event::Complete {
192        request_id,
193        bytes,
194        duration_ms: response_time.elapsed().as_millis() as u64,
195    });
196}
197
198pub fn log_started(address: &str, startup_ms: u128, options: StartupOptions) {
199    emit(Event::Started {
200        address: address.to_string(),
201        startup_ms: startup_ms as u64,
202        options,
203    });
204}
205
/// Emits [`Event::Reloaded`].
pub fn log_reloaded() {
    emit(Event::Reloaded);
}
209
210pub fn log_error(error: &str) {
211    emit(Event::Error {
212        error: error.to_string(),
213    });
214}
215
216pub fn log_print(message: &str) {
217    emit(Event::Print {
218        message: message.to_string(),
219    });
220}
221
/// Emits [`Event::Stopping`] with the number of in-flight connections.
pub fn log_stopping(inflight: usize) {
    emit(Event::Stopping { inflight });
}
225
/// Emits [`Event::Stopped`].
pub fn log_stopped() {
    emit(Event::Stopped);
}
229
/// Emits [`Event::StopTimedOut`].
pub fn log_stop_timed_out() {
    emit(Event::StopTimedOut);
}
233
/// Emits [`Event::Shutdown`], which makes the log handler threads leave
/// their receive loops.
pub fn shutdown() {
    emit(Event::Shutdown);
}
237
238// --- JSONL handler (dedicated writer thread does serialization + IO) ---
239
240pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
241    use std::io::Write;
242
243    std::thread::spawn(move || {
244        let mut rx = rx;
245        let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
246
247        loop {
248            let event = match rx.blocking_recv() {
249                Ok(event) => event,
250                Err(broadcast::error::RecvError::Lagged(n)) => {
251                    let json = serde_json::json!({
252                        "stamp": scru128::new().to_string(),
253                        "message": "lagged",
254                        "dropped": n,
255                    });
256                    if let Ok(line) = serde_json::to_string(&json) {
257                        let _ = writeln!(stdout, "{line}");
258                        let _ = stdout.flush();
259                    }
260                    continue;
261                }
262                Err(broadcast::error::RecvError::Closed) => break,
263            };
264
265            if matches!(event, Event::Shutdown) {
266                let _ = stdout.flush();
267                break;
268            }
269
270            let needs_flush = matches!(
271                &event,
272                Event::Started { .. }
273                    | Event::Stopped
274                    | Event::StopTimedOut
275                    | Event::Reloaded
276                    | Event::Error { .. }
277                    | Event::Print { .. }
278            );
279
280            let stamp = scru128::new().to_string();
281
282            let json = match event {
283                Event::Request {
284                    request_id,
285                    request,
286                } => {
287                    serde_json::json!({
288                        "stamp": stamp,
289                        "message": "request",
290                        "request_id": request_id.to_string(),
291                        "method": &request.method,
292                        "path": &request.path,
293                        "trusted_ip": &request.trusted_ip,
294                        "request": request,
295                    })
296                }
297                Event::Response {
298                    request_id,
299                    status,
300                    headers,
301                    latency_ms,
302                } => {
303                    serde_json::json!({
304                        "stamp": stamp,
305                        "message": "response",
306                        "request_id": request_id.to_string(),
307                        "status": status,
308                        "headers": headers,
309                        "latency_ms": latency_ms,
310                    })
311                }
312                Event::Complete {
313                    request_id,
314                    bytes,
315                    duration_ms,
316                } => {
317                    serde_json::json!({
318                        "stamp": stamp,
319                        "message": "complete",
320                        "request_id": request_id.to_string(),
321                        "bytes": bytes,
322                        "duration_ms": duration_ms,
323                    })
324                }
325                Event::Started {
326                    address,
327                    startup_ms,
328                    options,
329                } => {
330                    let xs_version: Option<&str> = if options.store.is_some() {
331                        #[cfg(feature = "cross-stream")]
332                        {
333                            Some(env!("XS_VERSION"))
334                        }
335                        #[cfg(not(feature = "cross-stream"))]
336                        {
337                            None
338                        }
339                    } else {
340                        None
341                    };
342                    serde_json::json!({
343                        "stamp": stamp,
344                        "message": "started",
345                        "address": address,
346                        "startup_ms": startup_ms,
347                        "watch": options.watch,
348                        "tls": options.tls,
349                        "store": options.store,
350                        "topic": options.topic,
351                        "expose": options.expose,
352                        "services": options.services,
353                        "nu_version": env!("NU_VERSION"),
354                        "xs_version": xs_version,
355                        "datastar_version": DATASTAR_VERSION,
356                    })
357                }
358                Event::Reloaded => {
359                    serde_json::json!({
360                        "stamp": stamp,
361                        "message": "reloaded",
362                    })
363                }
364                Event::Error { error } => {
365                    serde_json::json!({
366                        "stamp": stamp,
367                        "message": "error",
368                        "error": error,
369                    })
370                }
371                Event::Print { message } => {
372                    serde_json::json!({
373                        "stamp": stamp,
374                        "message": "print",
375                        "content": message,
376                    })
377                }
378                Event::Stopping { inflight } => {
379                    serde_json::json!({
380                        "stamp": stamp,
381                        "message": "stopping",
382                        "inflight": inflight,
383                    })
384                }
385                Event::Stopped => {
386                    serde_json::json!({
387                        "stamp": stamp,
388                        "message": "stopped",
389                    })
390                }
391                Event::StopTimedOut => {
392                    serde_json::json!({
393                        "stamp": stamp,
394                        "message": "stop_timed_out",
395                    })
396                }
397                Event::Shutdown => unreachable!(),
398            };
399
400            if let Ok(line) = serde_json::to_string(&json) {
401                let _ = writeln!(stdout, "{line}");
402            }
403
404            // Flush if lifecycle event or channel is empty (idle)
405            if needs_flush || rx.is_empty() {
406                let _ = stdout.flush();
407            }
408        }
409
410        let _ = stdout.flush();
411    })
412}
413
414// --- Human-readable handler (dedicated thread) ---
415
/// Per-request state tracked by the human-readable handler while a request
/// is in flight.
struct RequestState {
    method: String,
    path: String,
    trusted_ip: Option<String>,
    /// When the handler thread processed the `Request` event.
    start_time: Instant,
    /// `None` until the matching `Response` event arrives.
    status: Option<u16>,
    /// `None` until the matching `Response` event arrives.
    latency_ms: Option<u64>,
}
424
/// Shortens `s` to at most `max_len` characters by replacing its middle
/// with `...`.
///
/// Strings that already fit are returned unchanged. Otherwise the result
/// keeps `(max_len - 3) / 2` characters from each end around the ellipsis.
/// Lengths are counted in `char`s, so multi-byte UTF-8 input is never cut
/// inside a character. When `max_len` is below 3 there is no room for the
/// ellipsis and the first `max_len` characters are returned.
fn truncate_middle(s: &str, max_len: usize) -> String {
    // Byte length bounds the char count, so this covers the common case
    // without walking the string.
    if s.len() <= max_len {
        return s.to_string();
    }

    let char_count = s.chars().count();
    if char_count <= max_len {
        return s.to_string();
    }
    if max_len < 3 {
        return s.chars().take(max_len).collect();
    }

    let keep = (max_len - 3) / 2;
    let head: String = s.chars().take(keep).collect();
    let tail: String = s.chars().skip(char_count - keep).collect();
    format!("{head}...{tail}")
}
432
433struct ActiveZone {
434    stdout: io::Stdout,
435    line_count: usize,
436}
437
438impl ActiveZone {
439    fn new() -> Self {
440        Self {
441            stdout: io::stdout(),
442            line_count: 0,
443        }
444    }
445
446    /// Clear the active zone and move cursor back to start
447    fn clear(&mut self) {
448        if self.line_count > 0 {
449            let _ = execute!(
450                self.stdout,
451                cursor::MoveUp(self.line_count as u16),
452                terminal::Clear(terminal::ClearType::FromCursorDown)
453            );
454            self.line_count = 0;
455        }
456    }
457
458    /// Print a permanent line (scrolls up, not part of active zone)
459    fn print_permanent(&mut self, line: &str) {
460        self.clear();
461        println!("{line}");
462        let _ = self.stdout.flush();
463    }
464
465    /// Redraw all active requests
466    fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
467        self.line_count = 0;
468        if !active_ids.is_empty() {
469            println!("· · ·  ✈ in flight  · · ·");
470            self.line_count += 1;
471            for id in active_ids {
472                if let Some(state) = requests.get(id) {
473                    let line = format_active_line(state);
474                    println!("{line}");
475                    self.line_count += 1;
476                }
477            }
478        }
479        let _ = self.stdout.flush();
480    }
481}
482
483fn format_active_line(state: &RequestState) -> String {
484    let timestamp = Local::now().format("%H:%M:%S%.3f");
485    let ip = state.trusted_ip.as_deref().unwrap_or("-");
486    let method = &state.method;
487    let path = truncate_middle(&state.path, 40);
488    let elapsed = state.start_time.elapsed().as_secs_f64();
489
490    match (state.status, state.latency_ms) {
491        (Some(status), Some(latency)) => {
492            format!(
493                "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
494            )
495        }
496        _ => {
497            format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
498        }
499    }
500}
501
502fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
503    let timestamp = Local::now().format("%H:%M:%S%.3f");
504    let ip = state.trusted_ip.as_deref().unwrap_or("-");
505    let method = &state.method;
506    let path = truncate_middle(&state.path, 40);
507    let status = state.status.unwrap_or(0);
508    let latency = state.latency_ms.unwrap_or(0);
509
510    format!(
511        "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
512    )
513}
514
515pub fn run_human_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
516    std::thread::spawn(move || {
517        let mut rx = rx;
518        let mut zone = ActiveZone::new();
519        let mut requests: HashMap<String, RequestState> = HashMap::new();
520        let mut active_ids: Vec<String> = Vec::new();
521
522        // Rate limiting: token bucket (burst 40, refill 20/sec)
523        let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
524        let mut skipped: u64 = 0;
525        let mut lagged: u64 = 0;
526
527        loop {
528            let event = match rx.blocking_recv() {
529                Ok(event) => event,
530                Err(broadcast::error::RecvError::Lagged(n)) => {
531                    lagged += n;
532                    // Clear all pending - their Response/Complete events may have been dropped
533                    requests.clear();
534                    active_ids.clear();
535                    zone.print_permanent(&format!(
536                        "⚠ lagged: dropped {n} events, cleared in-flight"
537                    ));
538                    continue;
539                }
540                Err(broadcast::error::RecvError::Closed) => break,
541            };
542            match event {
543                Event::Started {
544                    address,
545                    startup_ms,
546                    options,
547                } => {
548                    let version = env!("CARGO_PKG_VERSION");
549                    let pid = std::process::id();
550                    let now = Local::now().to_rfc2822();
551                    zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
552                    zone.print_permanent("     __  ,");
553                    zone.print_permanent(&format!(
554                        " .--()°'.'  pid {pid} · {address} · {startup_ms}ms 💜"
555                    ));
556                    zone.print_permanent(&format!("'|, . ,'    {now}"));
557
558                    // Build options line: [http-nu opts] │ xs [store] [expose] [services]
559                    let mut http_opts = Vec::new();
560                    if options.watch {
561                        http_opts.push("watch".to_string());
562                    }
563                    if let Some(ref tls) = options.tls {
564                        http_opts.push(format!("tls:{tls}"));
565                    }
566
567                    let mut xs_opts = Vec::new();
568                    if let Some(ref store) = options.store {
569                        xs_opts.push(store.to_string());
570                    }
571                    if let Some(ref topic) = options.topic {
572                        xs_opts.push(format!("topic:{topic}"));
573                    }
574                    if let Some(ref expose) = options.expose {
575                        xs_opts.push(expose.to_string());
576                    }
577                    if options.services {
578                        xs_opts.push("services".to_string());
579                    }
580
581                    // Build versions string
582                    let mut versions = vec![format!("nu {}", env!("NU_VERSION"))];
583                    #[cfg(feature = "cross-stream")]
584                    if options.store.is_some() {
585                        versions.push(format!("xs {}", env!("XS_VERSION")));
586                    }
587                    if options.datastar {
588                        versions.push(format!("datastar {DATASTAR_VERSION}"));
589                    }
590                    let versions_str = versions.join(" · ");
591
592                    let has_opts = !http_opts.is_empty() || !xs_opts.is_empty();
593                    if has_opts {
594                        // Options on duck line, versions on next line
595                        let opts_str = match (http_opts.is_empty(), xs_opts.is_empty()) {
596                            (false, true) => http_opts.join(" "),
597                            (true, false) => format!("xs {}", xs_opts.join(" ")),
598                            (false, false) => {
599                                format!("{} │ xs {}", http_opts.join(" "), xs_opts.join(" "))
600                            }
601                            _ => unreachable!(),
602                        };
603                        zone.print_permanent(&format!(" !_-(_\\     {opts_str}"));
604                        zone.print_permanent(&format!("            {versions_str}"));
605                    } else {
606                        // No options: versions on duck line
607                        zone.print_permanent(&format!(" !_-(_\\     {versions_str}"));
608                    }
609
610                    zone.redraw(&active_ids, &requests);
611                }
612                Event::Reloaded => {
613                    zone.print_permanent("reloaded 🔄");
614                    zone.redraw(&active_ids, &requests);
615                }
616                Event::Error { error } => {
617                    zone.clear();
618                    eprintln!("ERROR: {error}");
619                    zone.redraw(&active_ids, &requests);
620                }
621                Event::Print { message } => {
622                    zone.print_permanent(&format!("PRINT: {message}"));
623                    zone.redraw(&active_ids, &requests);
624                }
625                Event::Stopping { inflight } => {
626                    zone.print_permanent(&format!(
627                        "stopping, {inflight} connection(s) in flight..."
628                    ));
629                    zone.redraw(&active_ids, &requests);
630                }
631                Event::Stopped => {
632                    let timestamp = Local::now().format("%H:%M:%S%.3f");
633                    zone.print_permanent(&format!("{timestamp} cu l8r"));
634                    zone.clear();
635                    println!("</http-nu>");
636                }
637                Event::StopTimedOut => {
638                    zone.print_permanent("stop timed out, forcing exit");
639                }
640                Event::Request {
641                    request_id,
642                    request,
643                } => {
644                    if !rate_limiter.try_consume(Instant::now()) {
645                        skipped += 1;
646                        continue;
647                    }
648
649                    // Print skip summary if any
650                    if skipped > 0 {
651                        zone.print_permanent(&format!("... skipped {skipped} requests"));
652                        skipped = 0;
653                    }
654
655                    let id = request_id.to_string();
656                    let state = RequestState {
657                        method: request.method.clone(),
658                        path: request.path.clone(),
659                        trusted_ip: request.trusted_ip.clone(),
660                        start_time: Instant::now(),
661                        status: None,
662                        latency_ms: None,
663                    };
664                    requests.insert(id.clone(), state);
665                    active_ids.push(id);
666                    zone.clear();
667                    zone.redraw(&active_ids, &requests);
668                }
669                Event::Response {
670                    request_id,
671                    status,
672                    latency_ms,
673                    ..
674                } => {
675                    let id = request_id.to_string();
676                    if let Some(state) = requests.get_mut(&id) {
677                        state.status = Some(status);
678                        state.latency_ms = Some(latency_ms);
679                        zone.clear();
680                        zone.redraw(&active_ids, &requests);
681                    }
682                }
683                Event::Complete {
684                    request_id,
685                    bytes,
686                    duration_ms,
687                } => {
688                    let id = request_id.to_string();
689                    if let Some(state) = requests.remove(&id) {
690                        active_ids.retain(|x| x != &id);
691                        let line = format_complete_line(&state, duration_ms, bytes);
692                        zone.print_permanent(&line);
693                        zone.redraw(&active_ids, &requests);
694                    }
695                }
696                Event::Shutdown => break,
697            }
698        }
699
700        // Final summaries
701        zone.clear();
702        if skipped > 0 {
703            println!("... skipped {skipped} requests");
704        }
705        if lagged > 0 {
706            println!("⚠ total lagged: {lagged} events dropped");
707        }
708    })
709}
710
711// --- RequestGuard: ensures Complete fires even on abort ---
712
713pub struct RequestGuard {
714    request_id: scru128::Scru128Id,
715    start: Instant,
716    bytes_sent: u64,
717}
718
719impl RequestGuard {
720    pub fn new(request_id: scru128::Scru128Id) -> Self {
721        Self {
722            request_id,
723            start: Instant::now(),
724            bytes_sent: 0,
725        }
726    }
727
728    pub fn request_id(&self) -> scru128::Scru128Id {
729        self.request_id
730    }
731}
732
733impl Drop for RequestGuard {
734    fn drop(&mut self) {
735        log_complete(self.request_id, self.bytes_sent, self.start);
736    }
737}
738
739// --- LoggingBody wrapper ---
740
741pub struct LoggingBody<B> {
742    inner: B,
743    guard: RequestGuard,
744}
745
746impl<B> LoggingBody<B> {
747    pub fn new(inner: B, guard: RequestGuard) -> Self {
748        Self { inner, guard }
749    }
750}
751
752impl<B> Body for LoggingBody<B>
753where
754    B: Body<Data = Bytes, Error = BoxError> + Unpin,
755{
756    type Data = Bytes;
757    type Error = BoxError;
758
759    fn poll_frame(
760        mut self: Pin<&mut Self>,
761        cx: &mut Context<'_>,
762    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
763        let inner = Pin::new(&mut self.inner);
764        match inner.poll_frame(cx) {
765            Poll::Ready(Some(Ok(frame))) => {
766                if let Some(data) = frame.data_ref() {
767                    self.guard.bytes_sent += data.len() as u64;
768                }
769                Poll::Ready(Some(Ok(frame)))
770            }
771            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
772            Poll::Ready(None) => Poll::Ready(None),
773            Poll::Pending => Poll::Pending,
774        }
775    }
776
777    fn is_end_stream(&self) -> bool {
778        self.inner.is_end_stream()
779    }
780
781    fn size_hint(&self) -> SizeHint {
782        self.inner.size_hint()
783    }
784}
785
786// No Drop impl needed - guard's Drop fires Complete
787
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Number of tokens in a full bucket from `full_bucket`.
    const BURST: usize = 40;

    /// Bucket with burst 40 and refill 20/sec, full at `now`.
    fn full_bucket(now: Instant) -> TokenBucket {
        TokenBucket::new(40.0, 20.0, now)
    }

    /// Asserts that exactly `count` tokens can be taken at instant `at`.
    fn assert_takes_exactly(bucket: &mut TokenBucket, at: Instant, count: usize) {
        for _ in 0..count {
            assert!(bucket.try_consume(at));
        }
        assert!(!bucket.try_consume(at));
    }

    #[test]
    fn token_bucket_allows_burst() {
        let start = Instant::now();
        let mut bucket = full_bucket(start);

        // A full bucket grants 40 requests at once and rejects the 41st.
        assert_takes_exactly(&mut bucket, start, BURST);
    }

    #[test]
    fn token_bucket_refills_over_time() {
        let start = Instant::now();
        let mut bucket = full_bucket(start);

        // Empty the bucket first.
        assert_takes_exactly(&mut bucket, start, BURST);

        // 100ms later two tokens are back (20/sec * 0.1s).
        let later = start + Duration::from_millis(100);
        assert_takes_exactly(&mut bucket, later, 2);
    }

    #[test]
    fn token_bucket_caps_at_capacity() {
        let start = Instant::now();
        let mut bucket = full_bucket(start);

        // Even after a long wait the bucket holds no more than 40 tokens.
        let later = start + Duration::from_secs(10);
        assert_takes_exactly(&mut bucket, later, BURST);
    }
}
836}