1use std::collections::HashMap;
2use std::pin::Pin;
3use std::sync::OnceLock;
4use std::task::{Context, Poll};
5use std::time::Instant;
6
7use chrono::Local;
8use hyper::body::{Body, Bytes, Frame, SizeHint};
9use hyper::header::HeaderMap;
10use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
11use serde::Serialize;
12use tokio::sync::broadcast;
13
14use crate::request::Request;
15
/// Boxed, thread-safe error type used as the error of bodies wrapped by this module.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
17
18#[derive(Clone, Serialize)]
21pub struct RequestData {
22 pub proto: String,
23 pub method: String,
24 pub remote_ip: Option<String>,
25 pub remote_port: Option<u16>,
26 pub trusted_ip: Option<String>,
27 pub headers: HashMap<String, String>,
28 pub uri: String,
29 pub path: String,
30 pub query: HashMap<String, String>,
31}
32
33impl From<&Request> for RequestData {
34 fn from(req: &Request) -> Self {
35 Self {
36 proto: req.proto.to_string(),
37 method: req.method.to_string(),
38 remote_ip: req.remote_ip.map(|ip| ip.to_string()),
39 remote_port: req.remote_port,
40 trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
41 headers: req
42 .headers
43 .iter()
44 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
45 .collect(),
46 uri: req.uri.to_string(),
47 path: req.path.clone(),
48 query: req.query.clone(),
49 }
50 }
51}
52
53#[derive(Clone)]
54pub enum Event {
55 Request {
57 request_id: scru128::Scru128Id,
58 request: Box<RequestData>,
59 },
60 Response {
61 request_id: scru128::Scru128Id,
62 status: u16,
63 headers: HashMap<String, String>,
64 latency_ms: u64,
65 },
66 Complete {
67 request_id: scru128::Scru128Id,
68 bytes: u64,
69 duration_ms: u64,
70 },
71
72 Started {
74 address: String,
75 startup_ms: u64,
76 },
77 Reloaded,
78 Error {
79 error: String,
80 },
81 Stopping {
82 inflight: usize,
83 },
84 Stopped,
85 StopTimedOut,
86}
87
88static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
91
92pub fn init_broadcast() -> broadcast::Receiver<Event> {
93 let (tx, rx) = broadcast::channel(65536);
94 let _ = SENDER.set(tx);
95 rx
96}
97
98pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
99 SENDER.get().map(|tx| tx.subscribe())
100}
101
102fn emit(event: Event) {
103 if let Some(tx) = SENDER.get() {
104 let _ = tx.send(event); }
106}
107
108pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
111 emit(Event::Request {
112 request_id,
113 request: Box::new(RequestData::from(request)),
114 });
115}
116
117pub fn log_response(
118 request_id: scru128::Scru128Id,
119 status: u16,
120 headers: &HeaderMap,
121 start_time: Instant,
122) {
123 let headers_map: HashMap<String, String> = headers
124 .iter()
125 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
126 .collect();
127
128 emit(Event::Response {
129 request_id,
130 status,
131 headers: headers_map,
132 latency_ms: start_time.elapsed().as_millis() as u64,
133 });
134}
135
136pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
137 emit(Event::Complete {
138 request_id,
139 bytes,
140 duration_ms: response_time.elapsed().as_millis() as u64,
141 });
142}
143
144pub fn log_started(address: &str, startup_ms: u128) {
145 emit(Event::Started {
146 address: address.to_string(),
147 startup_ms: startup_ms as u64,
148 });
149}
150
151pub fn log_reloaded() {
152 emit(Event::Reloaded);
153}
154
155pub fn log_error(error: &str) {
156 emit(Event::Error {
157 error: error.to_string(),
158 });
159}
160
161pub fn log_stopping(inflight: usize) {
162 emit(Event::Stopping { inflight });
163}
164
165pub fn log_stopped() {
166 emit(Event::Stopped);
167}
168
169pub fn log_stop_timed_out() {
170 emit(Event::StopTimedOut);
171}
172
173pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) {
176 use std::io::Write;
177
178 std::thread::spawn(move || {
179 let mut rx = rx;
180 let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
181
182 loop {
183 let event = match rx.blocking_recv() {
184 Ok(event) => event,
185 Err(broadcast::error::RecvError::Lagged(n)) => {
186 let json = serde_json::json!({
187 "stamp": scru128::new().to_string(),
188 "message": "lagged",
189 "dropped": n,
190 });
191 if let Ok(line) = serde_json::to_string(&json) {
192 let _ = writeln!(stdout, "{line}");
193 let _ = stdout.flush();
194 }
195 continue;
196 }
197 Err(broadcast::error::RecvError::Closed) => break,
198 };
199
200 let needs_flush = matches!(
201 &event,
202 Event::Started { .. }
203 | Event::Stopped
204 | Event::StopTimedOut
205 | Event::Reloaded
206 | Event::Error { .. }
207 );
208
209 let stamp = scru128::new().to_string();
210
211 let json = match event {
212 Event::Request {
213 request_id,
214 request,
215 } => {
216 serde_json::json!({
217 "stamp": stamp,
218 "message": "request",
219 "request_id": request_id.to_string(),
220 "method": &request.method,
221 "path": &request.path,
222 "trusted_ip": &request.trusted_ip,
223 "request": request,
224 })
225 }
226 Event::Response {
227 request_id,
228 status,
229 headers,
230 latency_ms,
231 } => {
232 serde_json::json!({
233 "stamp": stamp,
234 "message": "response",
235 "request_id": request_id.to_string(),
236 "status": status,
237 "headers": headers,
238 "latency_ms": latency_ms,
239 })
240 }
241 Event::Complete {
242 request_id,
243 bytes,
244 duration_ms,
245 } => {
246 serde_json::json!({
247 "stamp": stamp,
248 "message": "complete",
249 "request_id": request_id.to_string(),
250 "bytes": bytes,
251 "duration_ms": duration_ms,
252 })
253 }
254 Event::Started {
255 address,
256 startup_ms,
257 } => {
258 serde_json::json!({
259 "stamp": stamp,
260 "message": "started",
261 "address": address,
262 "startup_ms": startup_ms,
263 })
264 }
265 Event::Reloaded => {
266 serde_json::json!({
267 "stamp": stamp,
268 "message": "reloaded",
269 })
270 }
271 Event::Error { error } => {
272 serde_json::json!({
273 "stamp": stamp,
274 "message": "error",
275 "error": error,
276 })
277 }
278 Event::Stopping { inflight } => {
279 serde_json::json!({
280 "stamp": stamp,
281 "message": "stopping",
282 "inflight": inflight,
283 })
284 }
285 Event::Stopped => {
286 serde_json::json!({
287 "stamp": stamp,
288 "message": "stopped",
289 })
290 }
291 Event::StopTimedOut => {
292 serde_json::json!({
293 "stamp": stamp,
294 "message": "stop_timed_out",
295 })
296 }
297 };
298
299 if let Ok(line) = serde_json::to_string(&json) {
300 let _ = writeln!(stdout, "{line}");
301 }
302
303 if needs_flush || rx.is_empty() {
305 let _ = stdout.flush();
306 }
307 }
308
309 let _ = stdout.flush();
310 });
311}
312
313struct RequestState {
316 pb: ProgressBar,
317 method: String,
318 path: String,
319 trusted_ip: Option<String>,
320 status: Option<u16>,
321 latency_ms: Option<u64>,
322}
323
/// Shortens `s` to at most `max_len` characters by replacing its middle with `...`.
///
/// Strings that already fit are returned unchanged. Otherwise the result keeps
/// `(max_len - 3) / 2` characters from each end around the ellipsis. When
/// `max_len` is smaller than the ellipsis itself, the first `max_len`
/// characters are returned without an ellipsis.
///
/// Lengths are counted in characters rather than bytes, so cuts always fall on
/// character boundaries and multibyte input cannot cause a slicing panic. For
/// ASCII input with `max_len >= 3` the output is the same as before.
fn truncate_middle(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    let char_count = s.chars().count();
    if char_count <= max_len {
        return s.to_string();
    }

    // No room for the ellipsis: fall back to a plain prefix instead of underflowing.
    let Some(budget) = max_len.checked_sub(ELLIPSIS.len()) else {
        return s.chars().take(max_len).collect();
    };

    let keep = budget / 2;
    // Translate character counts into byte offsets that sit on char boundaries.
    let byte_offset = |char_idx: usize| {
        s.char_indices()
            .nth(char_idx)
            .map_or(s.len(), |(offset, _)| offset)
    };
    let head_end = byte_offset(keep);
    let tail_start = byte_offset(char_count - keep);

    format!("{}{ELLIPSIS}{}", &s[..head_end], &s[tail_start..])
}
331
332fn format_line(state: &RequestState, duration_ms: Option<u64>, bytes: Option<u64>) -> String {
333 let timestamp = Local::now().format("%H:%M:%S%.3f");
334 let ip = state.trusted_ip.as_deref().unwrap_or("-");
335 let method = &state.method;
336 let path = truncate_middle(&state.path, 40);
337
338 match (state.status, state.latency_ms, duration_ms, bytes) {
339 (Some(status), Some(latency), Some(dur), Some(b)) => {
341 format!(
342 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {dur:>6}ms {b:>8}b"
343 )
344 }
345 (Some(status), Some(latency), None, None) => {
347 format!("{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms")
348 }
349 _ => {
351 format!("{timestamp} {ip:>15} {method:<6} {path:<40} ...")
352 }
353 }
354}
355
356pub fn run_human_handler(rx: broadcast::Receiver<Event>) {
357 std::thread::spawn(move || {
358 let mut rx = rx;
359 let mp = MultiProgress::new();
360 let mut requests: HashMap<String, RequestState> = HashMap::new();
361
362 let min_interval = std::time::Duration::from_millis(100);
364 let mut last_shown = std::time::Instant::now();
365 let mut skipped: u64 = 0;
366
367 loop {
368 let event = match rx.blocking_recv() {
369 Ok(event) => event,
370 Err(broadcast::error::RecvError::Lagged(n)) => {
371 skipped += n;
372 continue;
373 }
374 Err(broadcast::error::RecvError::Closed) => break,
375 };
376 match event {
377 Event::Started {
378 address,
379 startup_ms,
380 } => {
381 let version = env!("CARGO_PKG_VERSION");
382 let pid = std::process::id();
383 let now = Local::now().to_rfc2822();
384 println!("<http-nu version=\"{version}\">");
385 println!(" __ ,");
386 println!(" .--()°'.' pid {pid} · {address} · {startup_ms}ms 💜");
387 println!("'|, . ,' {now}");
388 println!(" !_-(_\\");
389 }
390 Event::Reloaded => {
391 println!("reloaded 🔄");
392 }
393 Event::Error { error } => {
394 eprintln!("{error}");
395 }
396 Event::Stopping { inflight } => {
397 println!("stopping, {inflight} connection(s) in flight...");
398 }
399 Event::Stopped => {
400 println!("cu l8r </http-nu>");
401 }
402 Event::StopTimedOut => {
403 println!("stop timed out, forcing exit");
404 }
405 Event::Request {
406 request_id,
407 request,
408 } => {
409 let now = std::time::Instant::now();
410 if now.duration_since(last_shown) < min_interval {
411 skipped += 1;
412 continue;
413 }
414 last_shown = now;
415
416 if skipped > 0 {
418 println!("... skipped {skipped} requests");
419 skipped = 0;
420 }
421
422 let pb = mp.add(ProgressBar::new_spinner());
423 pb.set_style(ProgressStyle::default_spinner().template("{msg}").unwrap());
424
425 let state = RequestState {
426 pb: pb.clone(),
427 method: request.method.clone(),
428 path: request.path.clone(),
429 trusted_ip: request.trusted_ip.clone(),
430 status: None,
431 latency_ms: None,
432 };
433 pb.set_message(format_line(&state, None, None));
434 requests.insert(request_id.to_string(), state);
435 }
436 Event::Response {
437 request_id,
438 status,
439 latency_ms,
440 ..
441 } => {
442 if let Some(state) = requests.get_mut(&request_id.to_string()) {
443 state.status = Some(status);
444 state.latency_ms = Some(latency_ms);
445 state.pb.set_message(format_line(state, None, None));
446 }
447 }
448 Event::Complete {
449 request_id,
450 bytes,
451 duration_ms,
452 } => {
453 if let Some(state) = requests.remove(&request_id.to_string()) {
454 state.pb.finish_with_message(format_line(
455 &state,
456 Some(duration_ms),
457 Some(bytes),
458 ));
459 }
460 }
461 }
462 }
463
464 if skipped > 0 {
466 println!("... skipped {skipped} requests");
467 }
468 });
469}
470
471pub struct LoggingBody<B> {
474 inner: B,
475 request_id: scru128::Scru128Id,
476 response_time: Instant,
477 bytes_sent: u64,
478 logged_complete: bool,
479}
480
481impl<B> LoggingBody<B> {
482 pub fn new(inner: B, request_id: scru128::Scru128Id) -> Self {
483 Self {
484 inner,
485 request_id,
486 response_time: Instant::now(),
487 bytes_sent: 0,
488 logged_complete: false,
489 }
490 }
491
492 fn do_log_complete(&mut self) {
493 if !self.logged_complete {
494 self.logged_complete = true;
495 log_complete(self.request_id, self.bytes_sent, self.response_time);
496 }
497 }
498}
499
500impl<B> Body for LoggingBody<B>
501where
502 B: Body<Data = Bytes, Error = BoxError> + Unpin,
503{
504 type Data = Bytes;
505 type Error = BoxError;
506
507 fn poll_frame(
508 mut self: Pin<&mut Self>,
509 cx: &mut Context<'_>,
510 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
511 let inner = Pin::new(&mut self.inner);
512 match inner.poll_frame(cx) {
513 Poll::Ready(Some(Ok(frame))) => {
514 if let Some(data) = frame.data_ref() {
515 self.bytes_sent += data.len() as u64;
516 }
517 Poll::Ready(Some(Ok(frame)))
518 }
519 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
520 Poll::Ready(None) => {
521 self.do_log_complete();
522 Poll::Ready(None)
523 }
524 Poll::Pending => Poll::Pending,
525 }
526 }
527
528 fn is_end_stream(&self) -> bool {
529 self.inner.is_end_stream()
530 }
531
532 fn size_hint(&self) -> SizeHint {
533 self.inner.size_hint()
534 }
535}
536
537impl<B> Drop for LoggingBody<B> {
538 fn drop(&mut self) {
539 self.do_log_complete();
540 }
541}