1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
/// Boxed, thread-safe error type; used as the `Error` type of bodies wrapped by
/// `LoggingBody`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
/// Token-bucket rate limiter driven by caller-supplied timestamps.
///
/// The bucket starts full, refills continuously at `refill_rate` tokens per second up to
/// `capacity`, and each successful [`TokenBucket::try_consume`] removes one token.
struct TokenBucket {
    /// Tokens currently available (fractional while refilling).
    tokens: f64,
    /// Maximum number of tokens the bucket can hold.
    capacity: f64,
    /// Tokens added per second of elapsed time.
    refill_rate: f64,
    /// Latest timestamp for which the refill has already been credited.
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a full bucket whose refill clock starts at `now`.
    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
        Self {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill: now,
        }
    }

    /// Credits the time elapsed since the last refill, then tries to take one token.
    ///
    /// Returns `true` when a token was available (and consumed), `false` otherwise.
    ///
    /// A `now` that is older than the last credited timestamp adds nothing and does not
    /// rewind the refill clock; otherwise the same interval would be credited a second
    /// time on the next call.
    fn try_consume(&mut self, now: Instant) -> bool {
        // `checked_duration_since` yields `None` for a stale timestamp instead of relying
        // on the saturating behaviour of `duration_since`.
        if let Some(elapsed) = now.checked_duration_since(self.last_refill) {
            let refilled = self.tokens + elapsed.as_secs_f64() * self.refill_rate;
            self.tokens = refilled.min(self.capacity);
            self.last_refill = now;
        }

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
51
52#[derive(Clone, Serialize)]
55pub struct RequestData {
56 pub proto: String,
57 pub method: String,
58 pub remote_ip: Option<String>,
59 pub remote_port: Option<u16>,
60 pub trusted_ip: Option<String>,
61 pub headers: HashMap<String, String>,
62 pub uri: String,
63 pub path: String,
64 pub query: HashMap<String, String>,
65}
66
67impl From<&Request> for RequestData {
68 fn from(req: &Request) -> Self {
69 Self {
70 proto: req.proto.to_string(),
71 method: req.method.to_string(),
72 remote_ip: req.remote_ip.map(|ip| ip.to_string()),
73 remote_port: req.remote_port,
74 trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
75 headers: req
76 .headers
77 .iter()
78 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
79 .collect(),
80 uri: req.uri.to_string(),
81 path: req.path.clone(),
82 query: req.query.clone(),
83 }
84 }
85}
86
87#[derive(Clone)]
88pub enum Event {
89 Request {
91 request_id: scru128::Scru128Id,
92 request: Box<RequestData>,
93 },
94 Response {
95 request_id: scru128::Scru128Id,
96 status: u16,
97 headers: HashMap<String, String>,
98 latency_ms: u64,
99 },
100 Complete {
101 request_id: scru128::Scru128Id,
102 bytes: u64,
103 duration_ms: u64,
104 },
105
106 Started {
108 address: String,
109 startup_ms: u64,
110 },
111 Reloaded,
112 Error {
113 error: String,
114 },
115 Stopping {
116 inflight: usize,
117 },
118 Stopped,
119 StopTimedOut,
120}
121
122static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
125
126pub fn init_broadcast() -> broadcast::Receiver<Event> {
127 let (tx, rx) = broadcast::channel(65536);
128 let _ = SENDER.set(tx);
129 rx
130}
131
132pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
133 SENDER.get().map(|tx| tx.subscribe())
134}
135
136fn emit(event: Event) {
137 if let Some(tx) = SENDER.get() {
138 let _ = tx.send(event); }
140}
141
142pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
145 emit(Event::Request {
146 request_id,
147 request: Box::new(RequestData::from(request)),
148 });
149}
150
151pub fn log_response(
152 request_id: scru128::Scru128Id,
153 status: u16,
154 headers: &HeaderMap,
155 start_time: Instant,
156) {
157 let headers_map: HashMap<String, String> = headers
158 .iter()
159 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
160 .collect();
161
162 emit(Event::Response {
163 request_id,
164 status,
165 headers: headers_map,
166 latency_ms: start_time.elapsed().as_millis() as u64,
167 });
168}
169
170pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
171 emit(Event::Complete {
172 request_id,
173 bytes,
174 duration_ms: response_time.elapsed().as_millis() as u64,
175 });
176}
177
178pub fn log_started(address: &str, startup_ms: u128) {
179 emit(Event::Started {
180 address: address.to_string(),
181 startup_ms: startup_ms as u64,
182 });
183}
184
185pub fn log_reloaded() {
186 emit(Event::Reloaded);
187}
188
189pub fn log_error(error: &str) {
190 emit(Event::Error {
191 error: error.to_string(),
192 });
193}
194
195pub fn log_stopping(inflight: usize) {
196 emit(Event::Stopping { inflight });
197}
198
199pub fn log_stopped() {
200 emit(Event::Stopped);
201}
202
203pub fn log_stop_timed_out() {
204 emit(Event::StopTimedOut);
205}
206
207pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) {
210 use std::io::Write;
211
212 std::thread::spawn(move || {
213 let mut rx = rx;
214 let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
215
216 loop {
217 let event = match rx.blocking_recv() {
218 Ok(event) => event,
219 Err(broadcast::error::RecvError::Lagged(n)) => {
220 let json = serde_json::json!({
221 "stamp": scru128::new().to_string(),
222 "message": "lagged",
223 "dropped": n,
224 });
225 if let Ok(line) = serde_json::to_string(&json) {
226 let _ = writeln!(stdout, "{line}");
227 let _ = stdout.flush();
228 }
229 continue;
230 }
231 Err(broadcast::error::RecvError::Closed) => break,
232 };
233
234 let needs_flush = matches!(
235 &event,
236 Event::Started { .. }
237 | Event::Stopped
238 | Event::StopTimedOut
239 | Event::Reloaded
240 | Event::Error { .. }
241 );
242
243 let stamp = scru128::new().to_string();
244
245 let json = match event {
246 Event::Request {
247 request_id,
248 request,
249 } => {
250 serde_json::json!({
251 "stamp": stamp,
252 "message": "request",
253 "request_id": request_id.to_string(),
254 "method": &request.method,
255 "path": &request.path,
256 "trusted_ip": &request.trusted_ip,
257 "request": request,
258 })
259 }
260 Event::Response {
261 request_id,
262 status,
263 headers,
264 latency_ms,
265 } => {
266 serde_json::json!({
267 "stamp": stamp,
268 "message": "response",
269 "request_id": request_id.to_string(),
270 "status": status,
271 "headers": headers,
272 "latency_ms": latency_ms,
273 })
274 }
275 Event::Complete {
276 request_id,
277 bytes,
278 duration_ms,
279 } => {
280 serde_json::json!({
281 "stamp": stamp,
282 "message": "complete",
283 "request_id": request_id.to_string(),
284 "bytes": bytes,
285 "duration_ms": duration_ms,
286 })
287 }
288 Event::Started {
289 address,
290 startup_ms,
291 } => {
292 serde_json::json!({
293 "stamp": stamp,
294 "message": "started",
295 "address": address,
296 "startup_ms": startup_ms,
297 })
298 }
299 Event::Reloaded => {
300 serde_json::json!({
301 "stamp": stamp,
302 "message": "reloaded",
303 })
304 }
305 Event::Error { error } => {
306 serde_json::json!({
307 "stamp": stamp,
308 "message": "error",
309 "error": error,
310 })
311 }
312 Event::Stopping { inflight } => {
313 serde_json::json!({
314 "stamp": stamp,
315 "message": "stopping",
316 "inflight": inflight,
317 })
318 }
319 Event::Stopped => {
320 serde_json::json!({
321 "stamp": stamp,
322 "message": "stopped",
323 })
324 }
325 Event::StopTimedOut => {
326 serde_json::json!({
327 "stamp": stamp,
328 "message": "stop_timed_out",
329 })
330 }
331 };
332
333 if let Ok(line) = serde_json::to_string(&json) {
334 let _ = writeln!(stdout, "{line}");
335 }
336
337 if needs_flush || rx.is_empty() {
339 let _ = stdout.flush();
340 }
341 }
342
343 let _ = stdout.flush();
344 });
345}
346
/// What the human-readable handler remembers about a request that has not completed yet.
struct RequestState {
    method: String,
    path: String,
    trusted_ip: Option<String>,
    /// When the handler first saw the request; used for the live "elapsed" column.
    start_time: Instant,
    /// Filled in once the matching response event arrives.
    status: Option<u16>,
    /// Filled in once the matching response event arrives.
    latency_ms: Option<u64>,
}
357
/// Shortens `s` to at most `max_len` characters by replacing its middle with `...`.
///
/// Lengths are counted in `char`s and cuts are made only on character boundaries, so
/// multi-byte UTF-8 input (for example a non-ASCII request path) cannot cause a slicing
/// panic. For ASCII input and `max_len >= 3` the result is the same as the previous
/// byte-based implementation.
///
/// When `max_len` is smaller than the ellipsis itself, the first `max_len` characters are
/// returned instead of underflowing the length arithmetic.
fn truncate_middle(s: &str, max_len: usize) -> String {
    // The byte length is an upper bound on the character count, so this settles the
    // common case without walking the string.
    if s.len() <= max_len {
        return s.to_string();
    }
    let char_count = s.chars().count();
    if char_count <= max_len {
        return s.to_string();
    }

    // Not enough room for "...": fall back to a plain prefix.
    let Some(budget) = max_len.checked_sub(3) else {
        return s.chars().take(max_len).collect();
    };

    let keep = budget / 2;
    let head: String = s.chars().take(keep).collect();
    let tail: String = s.chars().skip(char_count - keep).collect();
    format!("{head}...{tail}")
}
365
366struct ActiveZone {
367 stdout: io::Stdout,
368 line_count: usize,
369}
370
371impl ActiveZone {
372 fn new() -> Self {
373 Self {
374 stdout: io::stdout(),
375 line_count: 0,
376 }
377 }
378
379 fn clear(&mut self) {
381 if self.line_count > 0 {
382 let _ = execute!(
383 self.stdout,
384 cursor::MoveUp(self.line_count as u16),
385 terminal::Clear(terminal::ClearType::FromCursorDown)
386 );
387 self.line_count = 0;
388 }
389 }
390
391 fn print_permanent(&mut self, line: &str) {
393 self.clear();
394 println!("{line}");
395 let _ = self.stdout.flush();
396 }
397
398 fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
400 self.line_count = 0;
401 if !active_ids.is_empty() {
402 println!("· · · ✈ in flight · · ·");
403 self.line_count += 1;
404 for id in active_ids {
405 if let Some(state) = requests.get(id) {
406 let line = format_active_line(state);
407 println!("{line}");
408 self.line_count += 1;
409 }
410 }
411 }
412 let _ = self.stdout.flush();
413 }
414}
415
416fn format_active_line(state: &RequestState) -> String {
417 let timestamp = Local::now().format("%H:%M:%S%.3f");
418 let ip = state.trusted_ip.as_deref().unwrap_or("-");
419 let method = &state.method;
420 let path = truncate_middle(&state.path, 40);
421 let elapsed = state.start_time.elapsed().as_secs_f64();
422
423 match (state.status, state.latency_ms) {
424 (Some(status), Some(latency)) => {
425 format!(
426 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
427 )
428 }
429 _ => {
430 format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
431 }
432 }
433}
434
435fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
436 let timestamp = Local::now().format("%H:%M:%S%.3f");
437 let ip = state.trusted_ip.as_deref().unwrap_or("-");
438 let method = &state.method;
439 let path = truncate_middle(&state.path, 40);
440 let status = state.status.unwrap_or(0);
441 let latency = state.latency_ms.unwrap_or(0);
442
443 format!(
444 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
445 )
446}
447
448pub fn run_human_handler(rx: broadcast::Receiver<Event>) {
449 std::thread::spawn(move || {
450 let mut rx = rx;
451 let mut zone = ActiveZone::new();
452 let mut requests: HashMap<String, RequestState> = HashMap::new();
453 let mut active_ids: Vec<String> = Vec::new();
454
455 let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
457 let mut skipped: u64 = 0;
458 let mut lagged: u64 = 0;
459
460 loop {
461 let event = match rx.blocking_recv() {
462 Ok(event) => event,
463 Err(broadcast::error::RecvError::Lagged(n)) => {
464 lagged += n;
465 requests.clear();
467 active_ids.clear();
468 zone.print_permanent(&format!(
469 "⚠ lagged: dropped {n} events, cleared in-flight"
470 ));
471 continue;
472 }
473 Err(broadcast::error::RecvError::Closed) => break,
474 };
475 match event {
476 Event::Started {
477 address,
478 startup_ms,
479 } => {
480 let version = env!("CARGO_PKG_VERSION");
481 let pid = std::process::id();
482 let now = Local::now().to_rfc2822();
483 zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
484 zone.print_permanent(" __ ,");
485 zone.print_permanent(&format!(
486 " .--()°'.' pid {pid} · {address} · {startup_ms}ms 💜"
487 ));
488 zone.print_permanent(&format!("'|, . ,' {now}"));
489 zone.print_permanent(" !_-(_\\");
490 zone.redraw(&active_ids, &requests);
491 }
492 Event::Reloaded => {
493 zone.print_permanent("reloaded 🔄");
494 zone.redraw(&active_ids, &requests);
495 }
496 Event::Error { error } => {
497 zone.clear();
498 eprintln!("ERROR: {error}");
499 zone.redraw(&active_ids, &requests);
500 }
501 Event::Stopping { inflight } => {
502 zone.print_permanent(&format!(
503 "stopping, {inflight} connection(s) in flight..."
504 ));
505 zone.redraw(&active_ids, &requests);
506 }
507 Event::Stopped => {
508 zone.clear();
509 println!("cu l8r </http-nu>");
510 }
511 Event::StopTimedOut => {
512 zone.print_permanent("stop timed out, forcing exit");
513 }
514 Event::Request {
515 request_id,
516 request,
517 } => {
518 if !rate_limiter.try_consume(Instant::now()) {
519 skipped += 1;
520 continue;
521 }
522
523 if skipped > 0 {
525 zone.print_permanent(&format!("... skipped {skipped} requests"));
526 skipped = 0;
527 }
528
529 let id = request_id.to_string();
530 let state = RequestState {
531 method: request.method.clone(),
532 path: request.path.clone(),
533 trusted_ip: request.trusted_ip.clone(),
534 start_time: Instant::now(),
535 status: None,
536 latency_ms: None,
537 };
538 requests.insert(id.clone(), state);
539 active_ids.push(id);
540 zone.clear();
541 zone.redraw(&active_ids, &requests);
542 }
543 Event::Response {
544 request_id,
545 status,
546 latency_ms,
547 ..
548 } => {
549 let id = request_id.to_string();
550 if let Some(state) = requests.get_mut(&id) {
551 state.status = Some(status);
552 state.latency_ms = Some(latency_ms);
553 zone.clear();
554 zone.redraw(&active_ids, &requests);
555 }
556 }
557 Event::Complete {
558 request_id,
559 bytes,
560 duration_ms,
561 } => {
562 let id = request_id.to_string();
563 if let Some(state) = requests.remove(&id) {
564 active_ids.retain(|x| x != &id);
565 let line = format_complete_line(&state, duration_ms, bytes);
566 zone.print_permanent(&line);
567 zone.redraw(&active_ids, &requests);
568 }
569 }
570 }
571 }
572
573 zone.clear();
575 if skipped > 0 {
576 println!("... skipped {skipped} requests");
577 }
578 if lagged > 0 {
579 println!("⚠ total lagged: {lagged} events dropped");
580 }
581 });
582}
583
584pub struct RequestGuard {
587 request_id: scru128::Scru128Id,
588 start: Instant,
589 bytes_sent: u64,
590}
591
592impl RequestGuard {
593 pub fn new(request_id: scru128::Scru128Id) -> Self {
594 Self {
595 request_id,
596 start: Instant::now(),
597 bytes_sent: 0,
598 }
599 }
600
601 pub fn request_id(&self) -> scru128::Scru128Id {
602 self.request_id
603 }
604}
605
606impl Drop for RequestGuard {
607 fn drop(&mut self) {
608 log_complete(self.request_id, self.bytes_sent, self.start);
609 }
610}
611
612pub struct LoggingBody<B> {
615 inner: B,
616 guard: RequestGuard,
617}
618
619impl<B> LoggingBody<B> {
620 pub fn new(inner: B, guard: RequestGuard) -> Self {
621 Self { inner, guard }
622 }
623}
624
625impl<B> Body for LoggingBody<B>
626where
627 B: Body<Data = Bytes, Error = BoxError> + Unpin,
628{
629 type Data = Bytes;
630 type Error = BoxError;
631
632 fn poll_frame(
633 mut self: Pin<&mut Self>,
634 cx: &mut Context<'_>,
635 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
636 let inner = Pin::new(&mut self.inner);
637 match inner.poll_frame(cx) {
638 Poll::Ready(Some(Ok(frame))) => {
639 if let Some(data) = frame.data_ref() {
640 self.guard.bytes_sent += data.len() as u64;
641 }
642 Poll::Ready(Some(Ok(frame)))
643 }
644 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
645 Poll::Ready(None) => Poll::Ready(None),
646 Poll::Pending => Poll::Pending,
647 }
648 }
649
650 fn is_end_stream(&self) -> bool {
651 self.inner.is_end_stream()
652 }
653
654 fn size_hint(&self) -> SizeHint {
655 self.inner.size_hint()
656 }
657}
658
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds the 40-token, 20-tokens-per-second bucket used by every test.
    fn full_bucket(now: Instant) -> TokenBucket {
        TokenBucket::new(40.0, 20.0, now)
    }

    #[test]
    fn token_bucket_allows_burst() {
        let t0 = Instant::now();
        let mut bucket = full_bucket(t0);

        // A full bucket grants exactly its capacity without any time passing.
        assert!((0..40).all(|_| bucket.try_consume(t0)));
        assert!(!bucket.try_consume(t0));
    }

    #[test]
    fn token_bucket_refills_over_time() {
        let t0 = Instant::now();
        let mut bucket = full_bucket(t0);

        (0..40).for_each(|_| {
            bucket.try_consume(t0);
        });
        assert!(!bucket.try_consume(t0));

        // 100 ms at 20 tokens per second is worth two tokens.
        let t1 = t0 + Duration::from_millis(100);
        assert!(bucket.try_consume(t1));
        assert!(bucket.try_consume(t1));
        assert!(!bucket.try_consume(t1));
    }

    #[test]
    fn token_bucket_caps_at_capacity() {
        let t0 = Instant::now();
        let mut bucket = full_bucket(t0);

        // Ten idle seconds would be worth 200 tokens, but the bucket holds only 40.
        let t1 = t0 + Duration::from_secs(10);
        assert!((0..40).all(|_| bucket.try_consume(t1)));
        assert!(!bucket.try_consume(t1));
    }
}