1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
/// Boxed, thread-safe error type; used as the body error type of `LoggingBody`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Datastar version string reported by the log handlers in their startup output.
pub const DATASTAR_VERSION: &str = "1.0.1";
21
/// Startup options carried by `Event::Started` so the log handlers can
/// report how the server was launched.
#[derive(Clone, Default)]
pub struct StartupOptions {
    /// Shown as `watch` in the human banner; emitted as `"watch"` in JSONL.
    pub watch: bool,
    /// Shown as `tls:<value>` in the human banner when present.
    pub tls: Option<String>,
    /// Listed among the `xs` options in the human banner; its presence also
    /// gates reporting of the xs version.
    pub store: Option<String>,
    /// Shown as `topic:<value>` among the `xs` options when present.
    pub topic: Option<String>,
    /// Listed verbatim among the `xs` options when present.
    pub expose: Option<String>,
    /// Shown as `services` among the `xs` options when set.
    pub services: bool,
    /// When set, the human banner includes the datastar version.
    pub datastar: bool,
}
33
/// Token-bucket limiter: holds at most `capacity` tokens and regains
/// `refill_rate` tokens per second of elapsed time.
struct TokenBucket {
    tokens: f64,
    capacity: f64,
    refill_rate: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a full bucket whose refill clock starts at `now`.
    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
        Self {
            last_refill: now,
            refill_rate,
            capacity,
            tokens: capacity,
        }
    }

    /// Credits tokens for the time elapsed since the previous call (capped at
    /// `capacity`), then spends one token if at least one is available.
    ///
    /// Returns `true` when a token was spent and `false` otherwise.
    fn try_consume(&mut self, now: Instant) -> bool {
        let seconds = now.duration_since(self.last_refill).as_secs_f64();
        let refilled = self.tokens + seconds * self.refill_rate;
        self.tokens = refilled.min(self.capacity);
        self.last_refill = now;

        let allowed = self.tokens >= 1.0;
        if allowed {
            self.tokens -= 1.0;
        }
        allowed
    }
}
66
/// Owned, serializable snapshot of a `Request`, built by the
/// `From<&Request>` impl and carried inside `Event::Request`.
#[derive(Clone, Serialize)]
pub struct RequestData {
    /// String form of the request's `proto` field.
    pub proto: String,
    /// String form of the request's `method` field.
    pub method: String,
    /// String form of the request's `remote_ip`, when present.
    pub remote_ip: Option<String>,
    /// Copied from the request's `remote_port`.
    pub remote_port: Option<u16>,
    /// String form of the request's `trusted_ip`, when present.
    pub trusted_ip: Option<String>,
    /// Header name to value; a value whose `to_str()` fails is stored as `""`.
    pub headers: HashMap<String, String>,
    /// String form of the request's `uri` field.
    pub uri: String,
    /// Copy of the request's `path` field.
    pub path: String,
    /// Copy of the request's `query` map.
    pub query: HashMap<String, String>,
}
81
82impl From<&Request> for RequestData {
83 fn from(req: &Request) -> Self {
84 Self {
85 proto: req.proto.to_string(),
86 method: req.method.to_string(),
87 remote_ip: req.remote_ip.map(|ip| ip.to_string()),
88 remote_port: req.remote_port,
89 trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
90 headers: req
91 .headers
92 .iter()
93 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
94 .collect(),
95 uri: req.uri.to_string(),
96 path: req.path.clone(),
97 query: req.query.clone(),
98 }
99 }
100}
101
/// Everything that can be published on the logging broadcast channel.
#[derive(Clone)]
pub enum Event {
    /// A request was received; emitted by `log_request`.
    Request {
        request_id: scru128::Scru128Id,
        // Boxed snapshot of the request fields.
        request: Box<RequestData>,
    },
    /// Response status and headers are known; emitted by `log_response`.
    Response {
        request_id: scru128::Scru128Id,
        status: u16,
        headers: HashMap<String, String>,
        // Milliseconds elapsed since the `start_time` passed to `log_response`.
        latency_ms: u64,
    },
    /// The request finished; emitted by `log_complete` (called when a
    /// `RequestGuard` is dropped).
    Complete {
        request_id: scru128::Scru128Id,
        bytes: u64,
        duration_ms: u64,
    },

    /// Startup report; emitted by `log_started`.
    Started {
        address: String,
        startup_ms: u64,
        options: StartupOptions,
    },
    /// Emitted by `log_reloaded`.
    Reloaded,
    /// An error message; emitted by `log_error`.
    Error {
        error: String,
    },
    /// A message to print; emitted by `log_print`.
    Print {
        message: String,
    },
    /// Emitted by `log_stopping` with the number of in-flight connections.
    Stopping {
        inflight: usize,
    },
    /// Emitted by `log_stopped`.
    Stopped,
    /// Emitted by `log_stop_timed_out`.
    StopTimedOut,
    /// Tells the handler threads to flush and exit; emitted by `shutdown`.
    Shutdown,
}
141
/// Process-wide sender half of the event broadcast channel; set once by
/// `init_broadcast` and read by `subscribe` and `emit`.
static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
145
146pub fn init_broadcast() -> broadcast::Receiver<Event> {
147 let (tx, rx) = broadcast::channel(65536);
148 let _ = SENDER.set(tx);
149 rx
150}
151
152pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
153 SENDER.get().map(|tx| tx.subscribe())
154}
155
156fn emit(event: Event) {
157 if let Some(tx) = SENDER.get() {
158 let _ = tx.send(event); }
160}
161
162pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
165 emit(Event::Request {
166 request_id,
167 request: Box::new(RequestData::from(request)),
168 });
169}
170
171pub fn log_response(
172 request_id: scru128::Scru128Id,
173 status: u16,
174 headers: &HeaderMap,
175 start_time: Instant,
176) {
177 let headers_map: HashMap<String, String> = headers
178 .iter()
179 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
180 .collect();
181
182 emit(Event::Response {
183 request_id,
184 status,
185 headers: headers_map,
186 latency_ms: start_time.elapsed().as_millis() as u64,
187 });
188}
189
190pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
191 emit(Event::Complete {
192 request_id,
193 bytes,
194 duration_ms: response_time.elapsed().as_millis() as u64,
195 });
196}
197
198pub fn log_started(address: &str, startup_ms: u128, options: StartupOptions) {
199 emit(Event::Started {
200 address: address.to_string(),
201 startup_ms: startup_ms as u64,
202 options,
203 });
204}
205
206pub fn log_reloaded() {
207 emit(Event::Reloaded);
208}
209
210pub fn log_error(error: &str) {
211 emit(Event::Error {
212 error: error.to_string(),
213 });
214}
215
216pub fn log_print(message: &str) {
217 emit(Event::Print {
218 message: message.to_string(),
219 });
220}
221
222pub fn log_stopping(inflight: usize) {
223 emit(Event::Stopping { inflight });
224}
225
226pub fn log_stopped() {
227 emit(Event::Stopped);
228}
229
230pub fn log_stop_timed_out() {
231 emit(Event::StopTimedOut);
232}
233
234pub fn shutdown() {
235 emit(Event::Shutdown);
236}
237
238pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
241 use std::io::Write;
242
243 std::thread::spawn(move || {
244 let mut rx = rx;
245 let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
246
247 loop {
248 let event = match rx.blocking_recv() {
249 Ok(event) => event,
250 Err(broadcast::error::RecvError::Lagged(n)) => {
251 let json = serde_json::json!({
252 "stamp": scru128::new().to_string(),
253 "message": "lagged",
254 "dropped": n,
255 });
256 if let Ok(line) = serde_json::to_string(&json) {
257 let _ = writeln!(stdout, "{line}");
258 let _ = stdout.flush();
259 }
260 continue;
261 }
262 Err(broadcast::error::RecvError::Closed) => break,
263 };
264
265 if matches!(event, Event::Shutdown) {
266 let _ = stdout.flush();
267 break;
268 }
269
270 let needs_flush = matches!(
271 &event,
272 Event::Started { .. }
273 | Event::Stopped
274 | Event::StopTimedOut
275 | Event::Reloaded
276 | Event::Error { .. }
277 | Event::Print { .. }
278 );
279
280 let stamp = scru128::new().to_string();
281
282 let json = match event {
283 Event::Request {
284 request_id,
285 request,
286 } => {
287 serde_json::json!({
288 "stamp": stamp,
289 "message": "request",
290 "request_id": request_id.to_string(),
291 "method": &request.method,
292 "path": &request.path,
293 "trusted_ip": &request.trusted_ip,
294 "request": request,
295 })
296 }
297 Event::Response {
298 request_id,
299 status,
300 headers,
301 latency_ms,
302 } => {
303 serde_json::json!({
304 "stamp": stamp,
305 "message": "response",
306 "request_id": request_id.to_string(),
307 "status": status,
308 "headers": headers,
309 "latency_ms": latency_ms,
310 })
311 }
312 Event::Complete {
313 request_id,
314 bytes,
315 duration_ms,
316 } => {
317 serde_json::json!({
318 "stamp": stamp,
319 "message": "complete",
320 "request_id": request_id.to_string(),
321 "bytes": bytes,
322 "duration_ms": duration_ms,
323 })
324 }
325 Event::Started {
326 address,
327 startup_ms,
328 options,
329 } => {
330 let xs_version: Option<&str> = if options.store.is_some() {
331 #[cfg(feature = "cross-stream")]
332 {
333 Some(env!("XS_VERSION"))
334 }
335 #[cfg(not(feature = "cross-stream"))]
336 {
337 None
338 }
339 } else {
340 None
341 };
342 serde_json::json!({
343 "stamp": stamp,
344 "message": "started",
345 "address": address,
346 "startup_ms": startup_ms,
347 "watch": options.watch,
348 "tls": options.tls,
349 "store": options.store,
350 "topic": options.topic,
351 "expose": options.expose,
352 "services": options.services,
353 "nu_version": env!("NU_VERSION"),
354 "xs_version": xs_version,
355 "datastar_version": DATASTAR_VERSION,
356 })
357 }
358 Event::Reloaded => {
359 serde_json::json!({
360 "stamp": stamp,
361 "message": "reloaded",
362 })
363 }
364 Event::Error { error } => {
365 serde_json::json!({
366 "stamp": stamp,
367 "message": "error",
368 "error": error,
369 })
370 }
371 Event::Print { message } => {
372 serde_json::json!({
373 "stamp": stamp,
374 "message": "print",
375 "content": message,
376 })
377 }
378 Event::Stopping { inflight } => {
379 serde_json::json!({
380 "stamp": stamp,
381 "message": "stopping",
382 "inflight": inflight,
383 })
384 }
385 Event::Stopped => {
386 serde_json::json!({
387 "stamp": stamp,
388 "message": "stopped",
389 })
390 }
391 Event::StopTimedOut => {
392 serde_json::json!({
393 "stamp": stamp,
394 "message": "stop_timed_out",
395 })
396 }
397 Event::Shutdown => unreachable!(),
398 };
399
400 if let Ok(line) = serde_json::to_string(&json) {
401 let _ = writeln!(stdout, "{line}");
402 }
403
404 if needs_flush || rx.is_empty() {
406 let _ = stdout.flush();
407 }
408 }
409
410 let _ = stdout.flush();
411 })
412}
413
/// What the human-readable handler tracks for a request that is still in flight.
struct RequestState {
    method: String,
    path: String,
    trusted_ip: Option<String>,
    // Set when the handler processes the `Request` event, not when the
    // request itself arrived.
    start_time: Instant,
    // `None` until the matching `Response` event has been processed.
    status: Option<u16>,
    // `None` until the matching `Response` event has been processed.
    latency_ms: Option<u64>,
}
424
/// Shortens `s` to at most `max_len` characters by replacing its middle
/// with `...`.
///
/// Strings that already fit are returned unchanged. Otherwise the result
/// keeps `(max_len - 3) / 2` characters from each end around the ellipsis.
/// When `max_len` is below 3 there is no room for the ellipsis, so the
/// first `max_len` characters are returned instead.
///
/// Lengths are counted in characters rather than bytes. The earlier
/// byte-offset slicing panicked when an offset landed inside a multi-byte
/// UTF-8 character, and `max_len - 3` underflowed for `max_len < 3`.
fn truncate_middle(s: &str, max_len: usize) -> String {
    // Byte length is an upper bound on the character count, so this
    // check avoids counting characters for strings that clearly fit.
    if s.len() <= max_len {
        return s.to_string();
    }

    let char_count = s.chars().count();
    if char_count <= max_len {
        return s.to_string();
    }

    if max_len < 3 {
        return s.chars().take(max_len).collect();
    }

    let keep = (max_len - 3) / 2;
    let head: String = s.chars().take(keep).collect();
    let tail: String = s.chars().skip(char_count - keep).collect();
    format!("{head}...{tail}")
}
432
433struct ActiveZone {
434 stdout: io::Stdout,
435 line_count: usize,
436}
437
438impl ActiveZone {
439 fn new() -> Self {
440 Self {
441 stdout: io::stdout(),
442 line_count: 0,
443 }
444 }
445
446 fn clear(&mut self) {
448 if self.line_count > 0 {
449 let _ = execute!(
450 self.stdout,
451 cursor::MoveUp(self.line_count as u16),
452 terminal::Clear(terminal::ClearType::FromCursorDown)
453 );
454 self.line_count = 0;
455 }
456 }
457
458 fn print_permanent(&mut self, line: &str) {
460 self.clear();
461 println!("{line}");
462 let _ = self.stdout.flush();
463 }
464
465 fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
467 self.line_count = 0;
468 if !active_ids.is_empty() {
469 println!("· · · ✈ in flight · · ·");
470 self.line_count += 1;
471 for id in active_ids {
472 if let Some(state) = requests.get(id) {
473 let line = format_active_line(state);
474 println!("{line}");
475 self.line_count += 1;
476 }
477 }
478 }
479 let _ = self.stdout.flush();
480 }
481}
482
483fn format_active_line(state: &RequestState) -> String {
484 let timestamp = Local::now().format("%H:%M:%S%.3f");
485 let ip = state.trusted_ip.as_deref().unwrap_or("-");
486 let method = &state.method;
487 let path = truncate_middle(&state.path, 40);
488 let elapsed = state.start_time.elapsed().as_secs_f64();
489
490 match (state.status, state.latency_ms) {
491 (Some(status), Some(latency)) => {
492 format!(
493 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
494 )
495 }
496 _ => {
497 format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
498 }
499 }
500}
501
502fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
503 let timestamp = Local::now().format("%H:%M:%S%.3f");
504 let ip = state.trusted_ip.as_deref().unwrap_or("-");
505 let method = &state.method;
506 let path = truncate_middle(&state.path, 40);
507 let status = state.status.unwrap_or(0);
508 let latency = state.latency_ms.unwrap_or(0);
509
510 format!(
511 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
512 )
513}
514
/// Spawns a thread that renders events for a human reading the terminal.
///
/// Finished requests and lifecycle messages are printed as permanent lines.
/// Requests still in flight are listed in an `ActiveZone` that is erased and
/// redrawn around every permanent line. The thread ends on `Event::Shutdown`
/// or when the channel closes.
pub fn run_human_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        let mut rx = rx;
        let mut zone = ActiveZone::new();
        // In-flight requests keyed by the string form of their id.
        let mut requests: HashMap<String, RequestState> = HashMap::new();
        // Ids in arrival order; this is the order the zone lists them in.
        let mut active_ids: Vec<String> = Vec::new();

        // Limits how many requests get displayed: burst of 40, refilling at 20 per second.
        let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
        // Requests dropped by the limiter since the last "skipped" notice.
        let mut skipped: u64 = 0;
        // Total events the receiver reported as lost to lag.
        let mut lagged: u64 = 0;

        loop {
            let event = match rx.blocking_recv() {
                Ok(event) => event,
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    lagged += n;
                    // Events were lost, so the tracked in-flight state is
                    // discarded rather than trusted.
                    requests.clear();
                    active_ids.clear();
                    zone.print_permanent(&format!(
                        "⚠ lagged: dropped {n} events, cleared in-flight"
                    ));
                    continue;
                }
                Err(broadcast::error::RecvError::Closed) => break,
            };
            match event {
                Event::Started {
                    address,
                    startup_ms,
                    options,
                } => {
                    // Startup banner: opening tag, ASCII art, pid/address/timing.
                    let version = env!("CARGO_PKG_VERSION");
                    let pid = std::process::id();
                    let now = Local::now().to_rfc2822();
                    zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
                    zone.print_permanent(" __ ,");
                    zone.print_permanent(&format!(
                        " .--()°'.' pid {pid} · {address} · {startup_ms}ms 💜"
                    ));
                    zone.print_permanent(&format!("'|, . ,' {now}"));

                    // Options shown on the left of the separator.
                    let mut http_opts = Vec::new();
                    if options.watch {
                        http_opts.push("watch".to_string());
                    }
                    if let Some(ref tls) = options.tls {
                        http_opts.push(format!("tls:{tls}"));
                    }

                    // Options shown after the `xs` label.
                    let mut xs_opts = Vec::new();
                    if let Some(ref store) = options.store {
                        xs_opts.push(store.to_string());
                    }
                    if let Some(ref topic) = options.topic {
                        xs_opts.push(format!("topic:{topic}"));
                    }
                    if let Some(ref expose) = options.expose {
                        xs_opts.push(expose.to_string());
                    }
                    if options.services {
                        xs_opts.push("services".to_string());
                    }

                    // Version list: nu always; xs only in `cross-stream` builds
                    // with a store; datastar only when the option is set.
                    let mut versions = vec![format!("nu {}", env!("NU_VERSION"))];
                    #[cfg(feature = "cross-stream")]
                    if options.store.is_some() {
                        versions.push(format!("xs {}", env!("XS_VERSION")));
                    }
                    if options.datastar {
                        versions.push(format!("datastar {DATASTAR_VERSION}"));
                    }
                    let versions_str = versions.join(" · ");

                    let has_opts = !http_opts.is_empty() || !xs_opts.is_empty();
                    if has_opts {
                        let opts_str = match (http_opts.is_empty(), xs_opts.is_empty()) {
                            (false, true) => http_opts.join(" "),
                            (true, false) => format!("xs {}", xs_opts.join(" ")),
                            (false, false) => {
                                format!("{} │ xs {}", http_opts.join(" "), xs_opts.join(" "))
                            }
                            // Both empty is excluded by `has_opts`.
                            _ => unreachable!(),
                        };
                        zone.print_permanent(&format!(" !_-(_\\ {opts_str}"));
                        zone.print_permanent(&format!(" {versions_str}"));
                    } else {
                        // No options: the versions take the options' place on the art line.
                        zone.print_permanent(&format!(" !_-(_\\ {versions_str}"));
                    }

                    zone.redraw(&active_ids, &requests);
                }
                Event::Reloaded => {
                    zone.print_permanent("reloaded 🔄");
                    zone.redraw(&active_ids, &requests);
                }
                Event::Error { error } => {
                    // Errors go to stderr; the zone is cleared first and
                    // redrawn afterwards.
                    zone.clear();
                    eprintln!("ERROR: {error}");
                    zone.redraw(&active_ids, &requests);
                }
                Event::Print { message } => {
                    zone.print_permanent(&format!("PRINT: {message}"));
                    zone.redraw(&active_ids, &requests);
                }
                Event::Stopping { inflight } => {
                    zone.print_permanent(&format!(
                        "stopping, {inflight} connection(s) in flight..."
                    ));
                    zone.redraw(&active_ids, &requests);
                }
                Event::Stopped => {
                    let timestamp = Local::now().format("%H:%M:%S%.3f");
                    zone.print_permanent(&format!("{timestamp} cu l8r"));
                    zone.clear();
                    // Closes the tag opened by the startup banner.
                    println!("</http-nu>");
                }
                Event::StopTimedOut => {
                    zone.print_permanent("stop timed out, forcing exit");
                }
                Event::Request {
                    request_id,
                    request,
                } => {
                    // Over the rate limit: the request is not tracked, so its
                    // later Response/Complete events find no entry and are ignored.
                    if !rate_limiter.try_consume(Instant::now()) {
                        skipped += 1;
                        continue;
                    }

                    // Report how many requests were skipped before this one.
                    if skipped > 0 {
                        zone.print_permanent(&format!("... skipped {skipped} requests"));
                        skipped = 0;
                    }

                    let id = request_id.to_string();
                    let state = RequestState {
                        method: request.method.clone(),
                        path: request.path.clone(),
                        trusted_ip: request.trusted_ip.clone(),
                        start_time: Instant::now(),
                        status: None,
                        latency_ms: None,
                    };
                    requests.insert(id.clone(), state);
                    active_ids.push(id);
                    zone.clear();
                    zone.redraw(&active_ids, &requests);
                }
                Event::Response {
                    request_id,
                    status,
                    latency_ms,
                    ..
                } => {
                    let id = request_id.to_string();
                    // Only requests that are being tracked are updated.
                    if let Some(state) = requests.get_mut(&id) {
                        state.status = Some(status);
                        state.latency_ms = Some(latency_ms);
                        zone.clear();
                        zone.redraw(&active_ids, &requests);
                    }
                }
                Event::Complete {
                    request_id,
                    bytes,
                    duration_ms,
                } => {
                    let id = request_id.to_string();
                    // Move the request from the zone to a permanent line.
                    if let Some(state) = requests.remove(&id) {
                        active_ids.retain(|x| x != &id);
                        let line = format_complete_line(&state, duration_ms, bytes);
                        zone.print_permanent(&line);
                        zone.redraw(&active_ids, &requests);
                    }
                }
                Event::Shutdown => break,
            }
        }

        zone.clear();
        // Final summary of anything that was not displayed.
        if skipped > 0 {
            println!("... skipped {skipped} requests");
        }
        if lagged > 0 {
            println!("⚠ total lagged: {lagged} events dropped");
        }
    })
}
710
711pub struct RequestGuard {
714 request_id: scru128::Scru128Id,
715 start: Instant,
716 bytes_sent: u64,
717}
718
719impl RequestGuard {
720 pub fn new(request_id: scru128::Scru128Id) -> Self {
721 Self {
722 request_id,
723 start: Instant::now(),
724 bytes_sent: 0,
725 }
726 }
727
728 pub fn request_id(&self) -> scru128::Scru128Id {
729 self.request_id
730 }
731}
732
733impl Drop for RequestGuard {
734 fn drop(&mut self) {
735 log_complete(self.request_id, self.bytes_sent, self.start);
736 }
737}
738
739pub struct LoggingBody<B> {
742 inner: B,
743 guard: RequestGuard,
744}
745
746impl<B> LoggingBody<B> {
747 pub fn new(inner: B, guard: RequestGuard) -> Self {
748 Self { inner, guard }
749 }
750}
751
752impl<B> Body for LoggingBody<B>
753where
754 B: Body<Data = Bytes, Error = BoxError> + Unpin,
755{
756 type Data = Bytes;
757 type Error = BoxError;
758
759 fn poll_frame(
760 mut self: Pin<&mut Self>,
761 cx: &mut Context<'_>,
762 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
763 let inner = Pin::new(&mut self.inner);
764 match inner.poll_frame(cx) {
765 Poll::Ready(Some(Ok(frame))) => {
766 if let Some(data) = frame.data_ref() {
767 self.guard.bytes_sent += data.len() as u64;
768 }
769 Poll::Ready(Some(Ok(frame)))
770 }
771 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
772 Poll::Ready(None) => Poll::Ready(None),
773 Poll::Pending => Poll::Pending,
774 }
775 }
776
777 fn is_end_stream(&self) -> bool {
778 self.inner.is_end_stream()
779 }
780
781 fn size_hint(&self) -> SizeHint {
782 self.inner.size_hint()
783 }
784}
785
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fresh bucket serves exactly `capacity` requests at a single instant.
    #[test]
    fn token_bucket_allows_burst() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(40.0, 20.0, t0);

        // The whole burst fits.
        assert!((0..40).all(|_| bucket.try_consume(t0)));
        // The 41st request at the same instant is refused.
        assert!(!bucket.try_consume(t0));
    }

    /// After draining, 100ms at 20 tokens per second yields two more tokens.
    #[test]
    fn token_bucket_refills_over_time() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(40.0, 20.0, t0);

        // Drain the bucket.
        for _ in 0..40 {
            bucket.try_consume(t0);
        }
        assert!(!bucket.try_consume(t0));

        let t1 = t0 + Duration::from_millis(100);
        // Two tokens came back, no more.
        assert!(bucket.try_consume(t1));
        assert!(bucket.try_consume(t1));
        assert!(!bucket.try_consume(t1));
    }

    /// A long idle period never credits more than `capacity` tokens.
    #[test]
    fn token_bucket_caps_at_capacity() {
        let t0 = Instant::now();
        let mut bucket = TokenBucket::new(40.0, 20.0, t0);

        let t1 = t0 + Duration::from_secs(10);
        // Ten idle seconds still leave only 40 tokens.
        assert!((0..40).all(|_| bucket.try_consume(t1)));
        assert!(!bucket.try_consume(t1));
    }
}