// http_nu/logging.rs

1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
/// Boxed, thread-safe (`Send + Sync`) error trait object; used in this file as
/// the error type of the response-body wrapper's `Body` implementation.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
/// Datastar SDK version (matches CDN URL in stdlib).
///
/// Reported by both log handlers when the server announces that it started.
pub const DATASTAR_VERSION: &str = "1.0-RC.8";
21
/// Startup options to display in preamble.
///
/// The values are only echoed by the log handlers in this file (banner line in
/// the human-readable output, fields of the `started` record in JSONL output).
#[derive(Clone, Default)]
pub struct StartupOptions {
    /// Shown as `watch` in the banner and as the `watch` field in JSONL.
    pub watch: bool,
    /// Shown as `tls:<value>` in the banner when set.
    pub tls: Option<String>,
    /// Listed in the `xs` group of the banner when set; its presence also
    /// controls whether the xs version is reported (with the `cross-stream`
    /// feature enabled).
    pub store: Option<String>,
    /// Shown as `topic:<value>` in the `xs` group of the banner when set.
    pub topic: Option<String>,
    /// Listed verbatim in the `xs` group of the banner when set.
    pub expose: Option<String>,
    /// Shown as `services` in the `xs` group of the banner when true.
    pub services: bool,
}
32
33// --- Token bucket rate limiter ---
34
/// Token-bucket rate limiter driven by caller-supplied timestamps.
///
/// The bucket starts full, refills continuously at `refill_rate` tokens per
/// second of elapsed time and never holds more than `capacity` tokens.
struct TokenBucket {
    /// Tokens currently available (fractional while refilling).
    tokens: f64,
    /// Upper bound on `tokens`, i.e. the largest burst that can pass at once.
    capacity: f64,
    /// Tokens credited per second of elapsed time.
    refill_rate: f64,
    /// Latest timestamp up to which refill has already been credited.
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a full bucket whose refill clock starts at `now`.
    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
        Self {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill: now,
        }
    }

    /// Credits the time elapsed since the last refill, then tries to take one
    /// token.
    ///
    /// Returns `true` when a token was available (and has been consumed),
    /// `false` otherwise.
    ///
    /// A `now` that lies before the last credited timestamp adds no tokens and
    /// does not move the refill clock backwards; otherwise the same interval
    /// would be credited a second time on the next call.
    fn try_consume(&mut self, now: Instant) -> bool {
        if let Some(elapsed) = now.checked_duration_since(self.last_refill) {
            let refilled = self.tokens + elapsed.as_secs_f64() * self.refill_rate;
            self.tokens = refilled.min(self.capacity);
            self.last_refill = now;
        }

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
65
66// --- Event enum: owned data for async broadcast ---
67
68#[derive(Clone, Serialize)]
69pub struct RequestData {
70    pub proto: String,
71    pub method: String,
72    pub remote_ip: Option<String>,
73    pub remote_port: Option<u16>,
74    pub trusted_ip: Option<String>,
75    pub headers: HashMap<String, String>,
76    pub uri: String,
77    pub path: String,
78    pub query: HashMap<String, String>,
79}
80
81impl From<&Request> for RequestData {
82    fn from(req: &Request) -> Self {
83        Self {
84            proto: req.proto.to_string(),
85            method: req.method.to_string(),
86            remote_ip: req.remote_ip.map(|ip| ip.to_string()),
87            remote_port: req.remote_port,
88            trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
89            headers: req
90                .headers
91                .iter()
92                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
93                .collect(),
94            uri: req.uri.to_string(),
95            path: req.path.clone(),
96            query: req.query.clone(),
97        }
98    }
99}
100
/// Log event sent over the broadcast channel to the output handlers.
///
/// All payloads are owned so an event can be cloned to every subscriber.
#[derive(Clone)]
pub enum Event {
    // Access events
    /// A request was received; emitted by `log_request`.
    Request {
        request_id: scru128::Scru128Id,
        request: Box<RequestData>,
    },
    /// Response status and headers are known; emitted by `log_response`.
    Response {
        request_id: scru128::Scru128Id,
        status: u16,
        headers: HashMap<String, String>,
        /// Milliseconds elapsed since the start time passed to `log_response`.
        latency_ms: u64,
    },
    /// The request finished; emitted by `log_complete`.
    Complete {
        request_id: scru128::Scru128Id,
        /// Byte count passed to `log_complete`.
        bytes: u64,
        /// Milliseconds elapsed since the instant passed to `log_complete`.
        duration_ms: u64,
    },

    // Lifecycle events
    /// The server started; emitted by `log_started`.
    Started {
        address: String,
        startup_ms: u64,
        options: StartupOptions,
    },
    /// Emitted by `log_reloaded`.
    Reloaded,
    /// An error message to report; emitted by `log_error`.
    Error {
        error: String,
    },
    /// A free-form message to print; emitted by `log_print`.
    Print {
        message: String,
    },
    /// Emitted by `log_stopping` with the number of in-flight connections.
    Stopping {
        inflight: usize,
    },
    /// Emitted by `log_stopped`.
    Stopped,
    /// Emitted by `log_stop_timed_out`.
    StopTimedOut,
    /// Tells the handler threads to leave their receive loop; emitted by
    /// `shutdown`.
    Shutdown,
}
140
141// --- Broadcast channel ---
142
143static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
144
145pub fn init_broadcast() -> broadcast::Receiver<Event> {
146    let (tx, rx) = broadcast::channel(65536);
147    let _ = SENDER.set(tx);
148    rx
149}
150
151pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
152    SENDER.get().map(|tx| tx.subscribe())
153}
154
155fn emit(event: Event) {
156    if let Some(tx) = SENDER.get() {
157        let _ = tx.send(event); // non-blocking, drops if no receivers
158    }
159}
160
161// --- Public emit functions ---
162
163pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
164    emit(Event::Request {
165        request_id,
166        request: Box::new(RequestData::from(request)),
167    });
168}
169
170pub fn log_response(
171    request_id: scru128::Scru128Id,
172    status: u16,
173    headers: &HeaderMap,
174    start_time: Instant,
175) {
176    let headers_map: HashMap<String, String> = headers
177        .iter()
178        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
179        .collect();
180
181    emit(Event::Response {
182        request_id,
183        status,
184        headers: headers_map,
185        latency_ms: start_time.elapsed().as_millis() as u64,
186    });
187}
188
189pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
190    emit(Event::Complete {
191        request_id,
192        bytes,
193        duration_ms: response_time.elapsed().as_millis() as u64,
194    });
195}
196
197pub fn log_started(address: &str, startup_ms: u128, options: StartupOptions) {
198    emit(Event::Started {
199        address: address.to_string(),
200        startup_ms: startup_ms as u64,
201        options,
202    });
203}
204
205pub fn log_reloaded() {
206    emit(Event::Reloaded);
207}
208
209pub fn log_error(error: &str) {
210    emit(Event::Error {
211        error: error.to_string(),
212    });
213}
214
215pub fn log_print(message: &str) {
216    emit(Event::Print {
217        message: message.to_string(),
218    });
219}
220
221pub fn log_stopping(inflight: usize) {
222    emit(Event::Stopping { inflight });
223}
224
225pub fn log_stopped() {
226    emit(Event::Stopped);
227}
228
229pub fn log_stop_timed_out() {
230    emit(Event::StopTimedOut);
231}
232
233pub fn shutdown() {
234    emit(Event::Shutdown);
235}
236
237// --- JSONL handler (dedicated writer thread does serialization + IO) ---
238
239pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
240    use std::io::Write;
241
242    std::thread::spawn(move || {
243        let mut rx = rx;
244        let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
245
246        loop {
247            let event = match rx.blocking_recv() {
248                Ok(event) => event,
249                Err(broadcast::error::RecvError::Lagged(n)) => {
250                    let json = serde_json::json!({
251                        "stamp": scru128::new().to_string(),
252                        "message": "lagged",
253                        "dropped": n,
254                    });
255                    if let Ok(line) = serde_json::to_string(&json) {
256                        let _ = writeln!(stdout, "{line}");
257                        let _ = stdout.flush();
258                    }
259                    continue;
260                }
261                Err(broadcast::error::RecvError::Closed) => break,
262            };
263
264            if matches!(event, Event::Shutdown) {
265                let _ = stdout.flush();
266                break;
267            }
268
269            let needs_flush = matches!(
270                &event,
271                Event::Started { .. }
272                    | Event::Stopped
273                    | Event::StopTimedOut
274                    | Event::Reloaded
275                    | Event::Error { .. }
276                    | Event::Print { .. }
277            );
278
279            let stamp = scru128::new().to_string();
280
281            let json = match event {
282                Event::Request {
283                    request_id,
284                    request,
285                } => {
286                    serde_json::json!({
287                        "stamp": stamp,
288                        "message": "request",
289                        "request_id": request_id.to_string(),
290                        "method": &request.method,
291                        "path": &request.path,
292                        "trusted_ip": &request.trusted_ip,
293                        "request": request,
294                    })
295                }
296                Event::Response {
297                    request_id,
298                    status,
299                    headers,
300                    latency_ms,
301                } => {
302                    serde_json::json!({
303                        "stamp": stamp,
304                        "message": "response",
305                        "request_id": request_id.to_string(),
306                        "status": status,
307                        "headers": headers,
308                        "latency_ms": latency_ms,
309                    })
310                }
311                Event::Complete {
312                    request_id,
313                    bytes,
314                    duration_ms,
315                } => {
316                    serde_json::json!({
317                        "stamp": stamp,
318                        "message": "complete",
319                        "request_id": request_id.to_string(),
320                        "bytes": bytes,
321                        "duration_ms": duration_ms,
322                    })
323                }
324                Event::Started {
325                    address,
326                    startup_ms,
327                    options,
328                } => {
329                    let xs_version: Option<&str> = if options.store.is_some() {
330                        #[cfg(feature = "cross-stream")]
331                        {
332                            Some(env!("XS_VERSION"))
333                        }
334                        #[cfg(not(feature = "cross-stream"))]
335                        {
336                            None
337                        }
338                    } else {
339                        None
340                    };
341                    serde_json::json!({
342                        "stamp": stamp,
343                        "message": "started",
344                        "address": address,
345                        "startup_ms": startup_ms,
346                        "watch": options.watch,
347                        "tls": options.tls,
348                        "store": options.store,
349                        "topic": options.topic,
350                        "expose": options.expose,
351                        "services": options.services,
352                        "nu_version": env!("NU_VERSION"),
353                        "xs_version": xs_version,
354                        "datastar_version": DATASTAR_VERSION,
355                    })
356                }
357                Event::Reloaded => {
358                    serde_json::json!({
359                        "stamp": stamp,
360                        "message": "reloaded",
361                    })
362                }
363                Event::Error { error } => {
364                    serde_json::json!({
365                        "stamp": stamp,
366                        "message": "error",
367                        "error": error,
368                    })
369                }
370                Event::Print { message } => {
371                    serde_json::json!({
372                        "stamp": stamp,
373                        "message": "print",
374                        "content": message,
375                    })
376                }
377                Event::Stopping { inflight } => {
378                    serde_json::json!({
379                        "stamp": stamp,
380                        "message": "stopping",
381                        "inflight": inflight,
382                    })
383                }
384                Event::Stopped => {
385                    serde_json::json!({
386                        "stamp": stamp,
387                        "message": "stopped",
388                    })
389                }
390                Event::StopTimedOut => {
391                    serde_json::json!({
392                        "stamp": stamp,
393                        "message": "stop_timed_out",
394                    })
395                }
396                Event::Shutdown => unreachable!(),
397            };
398
399            if let Ok(line) = serde_json::to_string(&json) {
400                let _ = writeln!(stdout, "{line}");
401            }
402
403            // Flush if lifecycle event or channel is empty (idle)
404            if needs_flush || rx.is_empty() {
405                let _ = stdout.flush();
406            }
407        }
408
409        let _ = stdout.flush();
410    })
411}
412
413// --- Human-readable handler (dedicated thread) ---
414
/// What the human-readable handler remembers about a request between its
/// `Request` event and its `Complete` event.
struct RequestState {
    method: String,
    path: String,
    trusted_ip: Option<String>,
    /// When the handler thread processed the `Request` event; used for the
    /// running elapsed time of in-flight lines.
    start_time: Instant,
    /// Filled in from the `Response` event; `None` until it arrives.
    status: Option<u16>,
    /// Filled in from the `Response` event; `None` until it arrives.
    latency_ms: Option<u64>,
}
423
/// Shortens `s` to at most `max_len` characters by replacing its middle with
/// `...`.
///
/// Strings that already fit are returned unchanged. Otherwise the result keeps
/// `(max_len - 3) / 2` characters from each end around the ellipsis. Lengths
/// are counted in characters, not bytes, so multi-byte UTF-8 input is never
/// split inside a character. When `max_len` is too small to hold the ellipsis
/// (`max_len < 3`), the first `max_len` characters are returned instead.
fn truncate_middle(s: &str, max_len: usize) -> String {
    // The byte length is an upper bound on the character count, so this cheap
    // check settles the common short/ASCII case.
    if s.len() <= max_len || s.chars().count() <= max_len {
        return s.to_string();
    }
    if max_len < 3 {
        return s.chars().take(max_len).collect();
    }

    let keep = (max_len - 3) / 2;
    // Byte offset just past the first `keep` characters.
    let head_end = s.char_indices().nth(keep).map_or(s.len(), |(idx, _)| idx);
    // Byte offset where the last `keep` characters begin.
    let tail_start = match keep {
        0 => s.len(),
        _ => s
            .char_indices()
            .rev()
            .nth(keep - 1)
            .map_or(0, |(idx, _)| idx),
    };

    format!("{}...{}", &s[..head_end], &s[tail_start..])
}
431
432struct ActiveZone {
433    stdout: io::Stdout,
434    line_count: usize,
435}
436
437impl ActiveZone {
438    fn new() -> Self {
439        Self {
440            stdout: io::stdout(),
441            line_count: 0,
442        }
443    }
444
445    /// Clear the active zone and move cursor back to start
446    fn clear(&mut self) {
447        if self.line_count > 0 {
448            let _ = execute!(
449                self.stdout,
450                cursor::MoveUp(self.line_count as u16),
451                terminal::Clear(terminal::ClearType::FromCursorDown)
452            );
453            self.line_count = 0;
454        }
455    }
456
457    /// Print a permanent line (scrolls up, not part of active zone)
458    fn print_permanent(&mut self, line: &str) {
459        self.clear();
460        println!("{line}");
461        let _ = self.stdout.flush();
462    }
463
464    /// Redraw all active requests
465    fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
466        self.line_count = 0;
467        if !active_ids.is_empty() {
468            println!("· · ·  ✈ in flight  · · ·");
469            self.line_count += 1;
470            for id in active_ids {
471                if let Some(state) = requests.get(id) {
472                    let line = format_active_line(state);
473                    println!("{line}");
474                    self.line_count += 1;
475                }
476            }
477        }
478        let _ = self.stdout.flush();
479    }
480}
481
482fn format_active_line(state: &RequestState) -> String {
483    let timestamp = Local::now().format("%H:%M:%S%.3f");
484    let ip = state.trusted_ip.as_deref().unwrap_or("-");
485    let method = &state.method;
486    let path = truncate_middle(&state.path, 40);
487    let elapsed = state.start_time.elapsed().as_secs_f64();
488
489    match (state.status, state.latency_ms) {
490        (Some(status), Some(latency)) => {
491            format!(
492                "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
493            )
494        }
495        _ => {
496            format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
497        }
498    }
499}
500
501fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
502    let timestamp = Local::now().format("%H:%M:%S%.3f");
503    let ip = state.trusted_ip.as_deref().unwrap_or("-");
504    let method = &state.method;
505    let path = truncate_middle(&state.path, 40);
506    let status = state.status.unwrap_or(0);
507    let latency = state.latency_ms.unwrap_or(0);
508
509    format!(
510        "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
511    )
512}
513
514pub fn run_human_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
515    std::thread::spawn(move || {
516        let mut rx = rx;
517        let mut zone = ActiveZone::new();
518        let mut requests: HashMap<String, RequestState> = HashMap::new();
519        let mut active_ids: Vec<String> = Vec::new();
520
521        // Rate limiting: token bucket (burst 40, refill 20/sec)
522        let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
523        let mut skipped: u64 = 0;
524        let mut lagged: u64 = 0;
525
526        loop {
527            let event = match rx.blocking_recv() {
528                Ok(event) => event,
529                Err(broadcast::error::RecvError::Lagged(n)) => {
530                    lagged += n;
531                    // Clear all pending - their Response/Complete events may have been dropped
532                    requests.clear();
533                    active_ids.clear();
534                    zone.print_permanent(&format!(
535                        "⚠ lagged: dropped {n} events, cleared in-flight"
536                    ));
537                    continue;
538                }
539                Err(broadcast::error::RecvError::Closed) => break,
540            };
541            match event {
542                Event::Started {
543                    address,
544                    startup_ms,
545                    options,
546                } => {
547                    let version = env!("CARGO_PKG_VERSION");
548                    let pid = std::process::id();
549                    let now = Local::now().to_rfc2822();
550                    zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
551                    zone.print_permanent("     __  ,");
552                    zone.print_permanent(&format!(
553                        " .--()°'.'  pid {pid} · {address} · {startup_ms}ms 💜"
554                    ));
555                    zone.print_permanent(&format!("'|, . ,'    {now}"));
556
557                    // Build options line: [http-nu opts] │ xs [store] [expose] [services]
558                    let mut http_opts = Vec::new();
559                    if options.watch {
560                        http_opts.push("watch".to_string());
561                    }
562                    if let Some(ref tls) = options.tls {
563                        http_opts.push(format!("tls:{tls}"));
564                    }
565
566                    let mut xs_opts = Vec::new();
567                    if let Some(ref store) = options.store {
568                        xs_opts.push(store.to_string());
569                    }
570                    if let Some(ref topic) = options.topic {
571                        xs_opts.push(format!("topic:{topic}"));
572                    }
573                    if let Some(ref expose) = options.expose {
574                        xs_opts.push(expose.to_string());
575                    }
576                    if options.services {
577                        xs_opts.push("services".to_string());
578                    }
579
580                    // Build versions string
581                    let mut versions = vec![format!("nu {}", env!("NU_VERSION"))];
582                    #[cfg(feature = "cross-stream")]
583                    if options.store.is_some() {
584                        versions.push(format!("xs {}", env!("XS_VERSION")));
585                    }
586                    versions.push(format!("datastar {DATASTAR_VERSION}"));
587                    let versions_str = versions.join(" · ");
588
589                    let has_opts = !http_opts.is_empty() || !xs_opts.is_empty();
590                    if has_opts {
591                        // Options on duck line, versions on next line
592                        let opts_str = match (http_opts.is_empty(), xs_opts.is_empty()) {
593                            (false, true) => http_opts.join(" "),
594                            (true, false) => format!("xs {}", xs_opts.join(" ")),
595                            (false, false) => {
596                                format!("{} │ xs {}", http_opts.join(" "), xs_opts.join(" "))
597                            }
598                            _ => unreachable!(),
599                        };
600                        zone.print_permanent(&format!(" !_-(_\\     {opts_str}"));
601                        zone.print_permanent(&format!("            {versions_str}"));
602                    } else {
603                        // No options: versions on duck line
604                        zone.print_permanent(&format!(" !_-(_\\     {versions_str}"));
605                    }
606
607                    zone.redraw(&active_ids, &requests);
608                }
609                Event::Reloaded => {
610                    zone.print_permanent("reloaded 🔄");
611                    zone.redraw(&active_ids, &requests);
612                }
613                Event::Error { error } => {
614                    zone.clear();
615                    eprintln!("ERROR: {error}");
616                    zone.redraw(&active_ids, &requests);
617                }
618                Event::Print { message } => {
619                    zone.print_permanent(&format!("PRINT: {message}"));
620                    zone.redraw(&active_ids, &requests);
621                }
622                Event::Stopping { inflight } => {
623                    zone.print_permanent(&format!(
624                        "stopping, {inflight} connection(s) in flight..."
625                    ));
626                    zone.redraw(&active_ids, &requests);
627                }
628                Event::Stopped => {
629                    let timestamp = Local::now().format("%H:%M:%S%.3f");
630                    zone.print_permanent(&format!("{timestamp} cu l8r"));
631                    zone.clear();
632                    println!("</http-nu>");
633                }
634                Event::StopTimedOut => {
635                    zone.print_permanent("stop timed out, forcing exit");
636                }
637                Event::Request {
638                    request_id,
639                    request,
640                } => {
641                    if !rate_limiter.try_consume(Instant::now()) {
642                        skipped += 1;
643                        continue;
644                    }
645
646                    // Print skip summary if any
647                    if skipped > 0 {
648                        zone.print_permanent(&format!("... skipped {skipped} requests"));
649                        skipped = 0;
650                    }
651
652                    let id = request_id.to_string();
653                    let state = RequestState {
654                        method: request.method.clone(),
655                        path: request.path.clone(),
656                        trusted_ip: request.trusted_ip.clone(),
657                        start_time: Instant::now(),
658                        status: None,
659                        latency_ms: None,
660                    };
661                    requests.insert(id.clone(), state);
662                    active_ids.push(id);
663                    zone.clear();
664                    zone.redraw(&active_ids, &requests);
665                }
666                Event::Response {
667                    request_id,
668                    status,
669                    latency_ms,
670                    ..
671                } => {
672                    let id = request_id.to_string();
673                    if let Some(state) = requests.get_mut(&id) {
674                        state.status = Some(status);
675                        state.latency_ms = Some(latency_ms);
676                        zone.clear();
677                        zone.redraw(&active_ids, &requests);
678                    }
679                }
680                Event::Complete {
681                    request_id,
682                    bytes,
683                    duration_ms,
684                } => {
685                    let id = request_id.to_string();
686                    if let Some(state) = requests.remove(&id) {
687                        active_ids.retain(|x| x != &id);
688                        let line = format_complete_line(&state, duration_ms, bytes);
689                        zone.print_permanent(&line);
690                        zone.redraw(&active_ids, &requests);
691                    }
692                }
693                Event::Shutdown => break,
694            }
695        }
696
697        // Final summaries
698        zone.clear();
699        if skipped > 0 {
700            println!("... skipped {skipped} requests");
701        }
702        if lagged > 0 {
703            println!("⚠ total lagged: {lagged} events dropped");
704        }
705    })
706}
707
708// --- RequestGuard: ensures Complete fires even on abort ---
709
710pub struct RequestGuard {
711    request_id: scru128::Scru128Id,
712    start: Instant,
713    bytes_sent: u64,
714}
715
716impl RequestGuard {
717    pub fn new(request_id: scru128::Scru128Id) -> Self {
718        Self {
719            request_id,
720            start: Instant::now(),
721            bytes_sent: 0,
722        }
723    }
724
725    pub fn request_id(&self) -> scru128::Scru128Id {
726        self.request_id
727    }
728}
729
730impl Drop for RequestGuard {
731    fn drop(&mut self) {
732        log_complete(self.request_id, self.bytes_sent, self.start);
733    }
734}
735
736// --- LoggingBody wrapper ---
737
738pub struct LoggingBody<B> {
739    inner: B,
740    guard: RequestGuard,
741}
742
743impl<B> LoggingBody<B> {
744    pub fn new(inner: B, guard: RequestGuard) -> Self {
745        Self { inner, guard }
746    }
747}
748
749impl<B> Body for LoggingBody<B>
750where
751    B: Body<Data = Bytes, Error = BoxError> + Unpin,
752{
753    type Data = Bytes;
754    type Error = BoxError;
755
756    fn poll_frame(
757        mut self: Pin<&mut Self>,
758        cx: &mut Context<'_>,
759    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
760        let inner = Pin::new(&mut self.inner);
761        match inner.poll_frame(cx) {
762            Poll::Ready(Some(Ok(frame))) => {
763                if let Some(data) = frame.data_ref() {
764                    self.guard.bytes_sent += data.len() as u64;
765                }
766                Poll::Ready(Some(Ok(frame)))
767            }
768            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
769            Poll::Ready(None) => Poll::Ready(None),
770            Poll::Pending => Poll::Pending,
771        }
772    }
773
774    fn is_end_stream(&self) -> bool {
775        self.inner.is_end_stream()
776    }
777
778    fn size_hint(&self) -> SizeHint {
779        self.inner.size_hint()
780    }
781}
782
783// No Drop impl needed - guard's Drop fires Complete
784
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Bucket configured like the human handler's limiter: burst 40, 20/sec.
    fn full_bucket(at: Instant) -> TokenBucket {
        TokenBucket::new(40.0, 20.0, at)
    }

    #[test]
    fn token_bucket_allows_burst() {
        let t0 = Instant::now();
        let mut bucket = full_bucket(t0);

        // The whole burst of 40 passes without any time elapsing...
        assert!((0..40).all(|_| bucket.try_consume(t0)));
        // ...and the 41st request is rejected.
        assert!(!bucket.try_consume(t0));
    }

    #[test]
    fn token_bucket_refills_over_time() {
        let t0 = Instant::now();
        let mut bucket = full_bucket(t0);

        // Empty the bucket first.
        (0..40).for_each(|_| {
            bucket.try_consume(t0);
        });
        assert!(!bucket.try_consume(t0));

        // 100ms at 20 tokens/sec yields exactly two tokens.
        let t1 = t0 + Duration::from_millis(100);
        assert!(bucket.try_consume(t1));
        assert!(bucket.try_consume(t1));
        assert!(!bucket.try_consume(t1));
    }

    #[test]
    fn token_bucket_caps_at_capacity() {
        let t0 = Instant::now();
        let mut bucket = full_bucket(t0);

        // A long idle period must not accumulate more than 40 tokens.
        let t1 = t0 + Duration::from_secs(10);
        assert!((0..40).all(|_| bucket.try_consume(t1)));
        assert!(!bucket.try_consume(t1));
    }
}
833}