1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
17type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
19struct TokenBucket {
22 tokens: f64,
23 capacity: f64,
24 refill_rate: f64,
25 last_refill: Instant,
26}
27
28impl TokenBucket {
29 fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
30 Self {
31 tokens: capacity,
32 capacity,
33 refill_rate,
34 last_refill: now,
35 }
36 }
37
38 fn try_consume(&mut self, now: Instant) -> bool {
39 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
40 self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
41 self.last_refill = now;
42
43 if self.tokens >= 1.0 {
44 self.tokens -= 1.0;
45 true
46 } else {
47 false
48 }
49 }
50}
51
52#[derive(Clone, Serialize)]
55pub struct RequestData {
56 pub proto: String,
57 pub method: String,
58 pub remote_ip: Option<String>,
59 pub remote_port: Option<u16>,
60 pub trusted_ip: Option<String>,
61 pub headers: HashMap<String, String>,
62 pub uri: String,
63 pub path: String,
64 pub query: HashMap<String, String>,
65}
66
67impl From<&Request> for RequestData {
68 fn from(req: &Request) -> Self {
69 Self {
70 proto: req.proto.to_string(),
71 method: req.method.to_string(),
72 remote_ip: req.remote_ip.map(|ip| ip.to_string()),
73 remote_port: req.remote_port,
74 trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
75 headers: req
76 .headers
77 .iter()
78 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
79 .collect(),
80 uri: req.uri.to_string(),
81 path: req.path.clone(),
82 query: req.query.clone(),
83 }
84 }
85}
86
87#[derive(Clone)]
88pub enum Event {
89 Request {
91 request_id: scru128::Scru128Id,
92 request: Box<RequestData>,
93 },
94 Response {
95 request_id: scru128::Scru128Id,
96 status: u16,
97 headers: HashMap<String, String>,
98 latency_ms: u64,
99 },
100 Complete {
101 request_id: scru128::Scru128Id,
102 bytes: u64,
103 duration_ms: u64,
104 },
105
106 Started {
108 address: String,
109 startup_ms: u64,
110 },
111 Reloaded,
112 Error {
113 error: String,
114 },
115 Print {
116 message: String,
117 },
118 Stopping {
119 inflight: usize,
120 },
121 Stopped,
122 StopTimedOut,
123 Shutdown,
124}
125
126static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
129
130pub fn init_broadcast() -> broadcast::Receiver<Event> {
131 let (tx, rx) = broadcast::channel(65536);
132 let _ = SENDER.set(tx);
133 rx
134}
135
136pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
137 SENDER.get().map(|tx| tx.subscribe())
138}
139
140fn emit(event: Event) {
141 if let Some(tx) = SENDER.get() {
142 let _ = tx.send(event); }
144}
145
146pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
149 emit(Event::Request {
150 request_id,
151 request: Box::new(RequestData::from(request)),
152 });
153}
154
155pub fn log_response(
156 request_id: scru128::Scru128Id,
157 status: u16,
158 headers: &HeaderMap,
159 start_time: Instant,
160) {
161 let headers_map: HashMap<String, String> = headers
162 .iter()
163 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
164 .collect();
165
166 emit(Event::Response {
167 request_id,
168 status,
169 headers: headers_map,
170 latency_ms: start_time.elapsed().as_millis() as u64,
171 });
172}
173
174pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
175 emit(Event::Complete {
176 request_id,
177 bytes,
178 duration_ms: response_time.elapsed().as_millis() as u64,
179 });
180}
181
182pub fn log_started(address: &str, startup_ms: u128) {
183 emit(Event::Started {
184 address: address.to_string(),
185 startup_ms: startup_ms as u64,
186 });
187}
188
189pub fn log_reloaded() {
190 emit(Event::Reloaded);
191}
192
193pub fn log_error(error: &str) {
194 emit(Event::Error {
195 error: error.to_string(),
196 });
197}
198
199pub fn log_print(message: &str) {
200 emit(Event::Print {
201 message: message.to_string(),
202 });
203}
204
205pub fn log_stopping(inflight: usize) {
206 emit(Event::Stopping { inflight });
207}
208
209pub fn log_stopped() {
210 emit(Event::Stopped);
211}
212
213pub fn log_stop_timed_out() {
214 emit(Event::StopTimedOut);
215}
216
217pub fn shutdown() {
218 emit(Event::Shutdown);
219}
220
221pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
224 use std::io::Write;
225
226 std::thread::spawn(move || {
227 let mut rx = rx;
228 let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
229
230 loop {
231 let event = match rx.blocking_recv() {
232 Ok(event) => event,
233 Err(broadcast::error::RecvError::Lagged(n)) => {
234 let json = serde_json::json!({
235 "stamp": scru128::new().to_string(),
236 "message": "lagged",
237 "dropped": n,
238 });
239 if let Ok(line) = serde_json::to_string(&json) {
240 let _ = writeln!(stdout, "{line}");
241 let _ = stdout.flush();
242 }
243 continue;
244 }
245 Err(broadcast::error::RecvError::Closed) => break,
246 };
247
248 if matches!(event, Event::Shutdown) {
249 let _ = stdout.flush();
250 break;
251 }
252
253 let needs_flush = matches!(
254 &event,
255 Event::Started { .. }
256 | Event::Stopped
257 | Event::StopTimedOut
258 | Event::Reloaded
259 | Event::Error { .. }
260 | Event::Print { .. }
261 );
262
263 let stamp = scru128::new().to_string();
264
265 let json = match event {
266 Event::Request {
267 request_id,
268 request,
269 } => {
270 serde_json::json!({
271 "stamp": stamp,
272 "message": "request",
273 "request_id": request_id.to_string(),
274 "method": &request.method,
275 "path": &request.path,
276 "trusted_ip": &request.trusted_ip,
277 "request": request,
278 })
279 }
280 Event::Response {
281 request_id,
282 status,
283 headers,
284 latency_ms,
285 } => {
286 serde_json::json!({
287 "stamp": stamp,
288 "message": "response",
289 "request_id": request_id.to_string(),
290 "status": status,
291 "headers": headers,
292 "latency_ms": latency_ms,
293 })
294 }
295 Event::Complete {
296 request_id,
297 bytes,
298 duration_ms,
299 } => {
300 serde_json::json!({
301 "stamp": stamp,
302 "message": "complete",
303 "request_id": request_id.to_string(),
304 "bytes": bytes,
305 "duration_ms": duration_ms,
306 })
307 }
308 Event::Started {
309 address,
310 startup_ms,
311 } => {
312 serde_json::json!({
313 "stamp": stamp,
314 "message": "started",
315 "address": address,
316 "startup_ms": startup_ms,
317 })
318 }
319 Event::Reloaded => {
320 serde_json::json!({
321 "stamp": stamp,
322 "message": "reloaded",
323 })
324 }
325 Event::Error { error } => {
326 serde_json::json!({
327 "stamp": stamp,
328 "message": "error",
329 "error": error,
330 })
331 }
332 Event::Print { message } => {
333 serde_json::json!({
334 "stamp": stamp,
335 "message": "print",
336 "content": message,
337 })
338 }
339 Event::Stopping { inflight } => {
340 serde_json::json!({
341 "stamp": stamp,
342 "message": "stopping",
343 "inflight": inflight,
344 })
345 }
346 Event::Stopped => {
347 serde_json::json!({
348 "stamp": stamp,
349 "message": "stopped",
350 })
351 }
352 Event::StopTimedOut => {
353 serde_json::json!({
354 "stamp": stamp,
355 "message": "stop_timed_out",
356 })
357 }
358 Event::Shutdown => unreachable!(),
359 };
360
361 if let Ok(line) = serde_json::to_string(&json) {
362 let _ = writeln!(stdout, "{line}");
363 }
364
365 if needs_flush || rx.is_empty() {
367 let _ = stdout.flush();
368 }
369 }
370
371 let _ = stdout.flush();
372 })
373}
374
375struct RequestState {
378 method: String,
379 path: String,
380 trusted_ip: Option<String>,
381 start_time: Instant,
382 status: Option<u16>,
383 latency_ms: Option<u64>,
384}
385
386fn truncate_middle(s: &str, max_len: usize) -> String {
387 if s.len() <= max_len {
388 return s.to_string();
389 }
390 let keep = (max_len - 3) / 2;
391 format!("{}...{}", &s[..keep], &s[s.len() - keep..])
392}
393
394struct ActiveZone {
395 stdout: io::Stdout,
396 line_count: usize,
397}
398
399impl ActiveZone {
400 fn new() -> Self {
401 Self {
402 stdout: io::stdout(),
403 line_count: 0,
404 }
405 }
406
407 fn clear(&mut self) {
409 if self.line_count > 0 {
410 let _ = execute!(
411 self.stdout,
412 cursor::MoveUp(self.line_count as u16),
413 terminal::Clear(terminal::ClearType::FromCursorDown)
414 );
415 self.line_count = 0;
416 }
417 }
418
419 fn print_permanent(&mut self, line: &str) {
421 self.clear();
422 println!("{line}");
423 let _ = self.stdout.flush();
424 }
425
426 fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
428 self.line_count = 0;
429 if !active_ids.is_empty() {
430 println!("· · · ✈ in flight · · ·");
431 self.line_count += 1;
432 for id in active_ids {
433 if let Some(state) = requests.get(id) {
434 let line = format_active_line(state);
435 println!("{line}");
436 self.line_count += 1;
437 }
438 }
439 }
440 let _ = self.stdout.flush();
441 }
442}
443
444fn format_active_line(state: &RequestState) -> String {
445 let timestamp = Local::now().format("%H:%M:%S%.3f");
446 let ip = state.trusted_ip.as_deref().unwrap_or("-");
447 let method = &state.method;
448 let path = truncate_middle(&state.path, 40);
449 let elapsed = state.start_time.elapsed().as_secs_f64();
450
451 match (state.status, state.latency_ms) {
452 (Some(status), Some(latency)) => {
453 format!(
454 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
455 )
456 }
457 _ => {
458 format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
459 }
460 }
461}
462
463fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
464 let timestamp = Local::now().format("%H:%M:%S%.3f");
465 let ip = state.trusted_ip.as_deref().unwrap_or("-");
466 let method = &state.method;
467 let path = truncate_middle(&state.path, 40);
468 let status = state.status.unwrap_or(0);
469 let latency = state.latency_ms.unwrap_or(0);
470
471 format!(
472 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
473 )
474}
475
476pub fn run_human_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
477 std::thread::spawn(move || {
478 let mut rx = rx;
479 let mut zone = ActiveZone::new();
480 let mut requests: HashMap<String, RequestState> = HashMap::new();
481 let mut active_ids: Vec<String> = Vec::new();
482
483 let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
485 let mut skipped: u64 = 0;
486 let mut lagged: u64 = 0;
487
488 loop {
489 let event = match rx.blocking_recv() {
490 Ok(event) => event,
491 Err(broadcast::error::RecvError::Lagged(n)) => {
492 lagged += n;
493 requests.clear();
495 active_ids.clear();
496 zone.print_permanent(&format!(
497 "⚠ lagged: dropped {n} events, cleared in-flight"
498 ));
499 continue;
500 }
501 Err(broadcast::error::RecvError::Closed) => break,
502 };
503 match event {
504 Event::Started {
505 address,
506 startup_ms,
507 } => {
508 let version = env!("CARGO_PKG_VERSION");
509 let pid = std::process::id();
510 let now = Local::now().to_rfc2822();
511 zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
512 zone.print_permanent(" __ ,");
513 zone.print_permanent(&format!(
514 " .--()°'.' pid {pid} · {address} · {startup_ms}ms 💜"
515 ));
516 zone.print_permanent(&format!("'|, . ,' {now}"));
517 zone.print_permanent(" !_-(_\\");
518 zone.redraw(&active_ids, &requests);
519 }
520 Event::Reloaded => {
521 zone.print_permanent("reloaded 🔄");
522 zone.redraw(&active_ids, &requests);
523 }
524 Event::Error { error } => {
525 zone.clear();
526 eprintln!("ERROR: {error}");
527 zone.redraw(&active_ids, &requests);
528 }
529 Event::Print { message } => {
530 zone.print_permanent(&format!("PRINT: {message}"));
531 zone.redraw(&active_ids, &requests);
532 }
533 Event::Stopping { inflight } => {
534 zone.print_permanent(&format!(
535 "stopping, {inflight} connection(s) in flight..."
536 ));
537 zone.redraw(&active_ids, &requests);
538 }
539 Event::Stopped => {
540 let timestamp = Local::now().format("%H:%M:%S%.3f");
541 zone.print_permanent(&format!("{timestamp} cu l8r"));
542 zone.clear();
543 println!("</http-nu>");
544 }
545 Event::StopTimedOut => {
546 zone.print_permanent("stop timed out, forcing exit");
547 }
548 Event::Request {
549 request_id,
550 request,
551 } => {
552 if !rate_limiter.try_consume(Instant::now()) {
553 skipped += 1;
554 continue;
555 }
556
557 if skipped > 0 {
559 zone.print_permanent(&format!("... skipped {skipped} requests"));
560 skipped = 0;
561 }
562
563 let id = request_id.to_string();
564 let state = RequestState {
565 method: request.method.clone(),
566 path: request.path.clone(),
567 trusted_ip: request.trusted_ip.clone(),
568 start_time: Instant::now(),
569 status: None,
570 latency_ms: None,
571 };
572 requests.insert(id.clone(), state);
573 active_ids.push(id);
574 zone.clear();
575 zone.redraw(&active_ids, &requests);
576 }
577 Event::Response {
578 request_id,
579 status,
580 latency_ms,
581 ..
582 } => {
583 let id = request_id.to_string();
584 if let Some(state) = requests.get_mut(&id) {
585 state.status = Some(status);
586 state.latency_ms = Some(latency_ms);
587 zone.clear();
588 zone.redraw(&active_ids, &requests);
589 }
590 }
591 Event::Complete {
592 request_id,
593 bytes,
594 duration_ms,
595 } => {
596 let id = request_id.to_string();
597 if let Some(state) = requests.remove(&id) {
598 active_ids.retain(|x| x != &id);
599 let line = format_complete_line(&state, duration_ms, bytes);
600 zone.print_permanent(&line);
601 zone.redraw(&active_ids, &requests);
602 }
603 }
604 Event::Shutdown => break,
605 }
606 }
607
608 zone.clear();
610 if skipped > 0 {
611 println!("... skipped {skipped} requests");
612 }
613 if lagged > 0 {
614 println!("⚠ total lagged: {lagged} events dropped");
615 }
616 })
617}
618
619pub struct RequestGuard {
622 request_id: scru128::Scru128Id,
623 start: Instant,
624 bytes_sent: u64,
625}
626
627impl RequestGuard {
628 pub fn new(request_id: scru128::Scru128Id) -> Self {
629 Self {
630 request_id,
631 start: Instant::now(),
632 bytes_sent: 0,
633 }
634 }
635
636 pub fn request_id(&self) -> scru128::Scru128Id {
637 self.request_id
638 }
639}
640
641impl Drop for RequestGuard {
642 fn drop(&mut self) {
643 log_complete(self.request_id, self.bytes_sent, self.start);
644 }
645}
646
647pub struct LoggingBody<B> {
650 inner: B,
651 guard: RequestGuard,
652}
653
654impl<B> LoggingBody<B> {
655 pub fn new(inner: B, guard: RequestGuard) -> Self {
656 Self { inner, guard }
657 }
658}
659
660impl<B> Body for LoggingBody<B>
661where
662 B: Body<Data = Bytes, Error = BoxError> + Unpin,
663{
664 type Data = Bytes;
665 type Error = BoxError;
666
667 fn poll_frame(
668 mut self: Pin<&mut Self>,
669 cx: &mut Context<'_>,
670 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
671 let inner = Pin::new(&mut self.inner);
672 match inner.poll_frame(cx) {
673 Poll::Ready(Some(Ok(frame))) => {
674 if let Some(data) = frame.data_ref() {
675 self.guard.bytes_sent += data.len() as u64;
676 }
677 Poll::Ready(Some(Ok(frame)))
678 }
679 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
680 Poll::Ready(None) => Poll::Ready(None),
681 Poll::Pending => Poll::Pending,
682 }
683 }
684
685 fn is_end_stream(&self) -> bool {
686 self.inner.is_end_stream()
687 }
688
689 fn size_hint(&self) -> SizeHint {
690 self.inner.size_hint()
691 }
692}
693
694#[cfg(test)]
697mod tests {
698 use super::*;
699 use std::time::Duration;
700
701 #[test]
702 fn token_bucket_allows_burst() {
703 let start = Instant::now();
704 let mut bucket = TokenBucket::new(40.0, 20.0, start);
705
706 for _ in 0..40 {
708 assert!(bucket.try_consume(start));
709 }
710 assert!(!bucket.try_consume(start));
712 }
713
714 #[test]
715 fn token_bucket_refills_over_time() {
716 let start = Instant::now();
717 let mut bucket = TokenBucket::new(40.0, 20.0, start);
718
719 for _ in 0..40 {
721 bucket.try_consume(start);
722 }
723 assert!(!bucket.try_consume(start));
724
725 let later = start + Duration::from_millis(100);
727 assert!(bucket.try_consume(later));
728 assert!(bucket.try_consume(later));
729 assert!(!bucket.try_consume(later));
730 }
731
732 #[test]
733 fn token_bucket_caps_at_capacity() {
734 let start = Instant::now();
735 let mut bucket = TokenBucket::new(40.0, 20.0, start);
736
737 let later = start + Duration::from_secs(10);
739 for _ in 0..40 {
740 assert!(bucket.try_consume(later));
741 }
742 assert!(!bucket.try_consume(later));
743 }
744}