1use std::collections::HashMap;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::sync::OnceLock;
5use std::task::{Context, Poll};
6use std::time::Instant;
7
8use chrono::Local;
9use crossterm::{cursor, execute, terminal};
10use hyper::body::{Body, Bytes, Frame, SizeHint};
11use hyper::header::HeaderMap;
12use serde::Serialize;
13use tokio::sync::broadcast;
14
15use crate::request::Request;
16
/// Boxed, thread-safe dynamic error. `LoggingBody` requires this error type of
/// the body it wraps and reports the same type from its own `Body` impl.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
18
/// Datastar version string reported in the startup output of both log handlers
/// (`datastar_version` in the JSONL record, `datastar …` in the human banner).
pub const DATASTAR_VERSION: &str = "1.0-RC.8";
21
/// Options the server was started with, carried by `Event::Started` so the
/// log handlers can report them.
///
/// The handlers in this file only echo these values; what each option does is
/// decided by the code that constructs the struct.
#[derive(Clone, Debug, Default)]
pub struct StartupOptions {
    /// Reported as `watch` in the startup output when set.
    pub watch: bool,
    /// Reported as `tls:<value>` in the human banner when present.
    pub tls: Option<String>,
    /// Reported verbatim when present; the handlers also report the xs version
    /// when this is set and the `cross-stream` feature is enabled.
    pub store: Option<String>,
    /// Reported as `topic:<value>` in the human banner when present.
    pub topic: Option<String>,
    /// Reported verbatim when present.
    pub expose: Option<String>,
    /// Reported as `services` in the startup output when set.
    pub services: bool,
}
32
/// Token-bucket rate limiter driven by caller-supplied timestamps.
///
/// The bucket starts full, gains `refill_rate` tokens per second of elapsed
/// time, and never holds more than `capacity` tokens.
struct TokenBucket {
    /// Tokens currently available (fractional while refilling).
    tokens: f64,
    /// Upper bound on `tokens`.
    capacity: f64,
    /// Tokens added per second.
    refill_rate: f64,
    /// Timestamp of the most recent `try_consume` call (or construction).
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a full bucket whose refill clock starts at `now`.
    fn new(capacity: f64, refill_rate: f64, now: Instant) -> Self {
        Self {
            last_refill: now,
            refill_rate,
            capacity,
            tokens: capacity,
        }
    }

    /// Refills the bucket for the time elapsed up to `now`, then takes one
    /// token if a whole one is available.
    ///
    /// Returns `true` when a token was taken.
    fn try_consume(&mut self, now: Instant) -> bool {
        let seconds_idle = now.duration_since(self.last_refill).as_secs_f64();
        let refilled = self.tokens + seconds_idle * self.refill_rate;
        self.last_refill = now;
        self.tokens = refilled.min(self.capacity);

        let granted = self.tokens >= 1.0;
        if granted {
            self.tokens -= 1.0;
        }
        granted
    }
}
65
66#[derive(Clone, Serialize)]
69pub struct RequestData {
70 pub proto: String,
71 pub method: String,
72 pub remote_ip: Option<String>,
73 pub remote_port: Option<u16>,
74 pub trusted_ip: Option<String>,
75 pub headers: HashMap<String, String>,
76 pub uri: String,
77 pub path: String,
78 pub query: HashMap<String, String>,
79}
80
81impl From<&Request> for RequestData {
82 fn from(req: &Request) -> Self {
83 Self {
84 proto: req.proto.to_string(),
85 method: req.method.to_string(),
86 remote_ip: req.remote_ip.map(|ip| ip.to_string()),
87 remote_port: req.remote_port,
88 trusted_ip: req.trusted_ip.map(|ip| ip.to_string()),
89 headers: req
90 .headers
91 .iter()
92 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
93 .collect(),
94 uri: req.uri.to_string(),
95 path: req.path.clone(),
96 query: req.query.clone(),
97 }
98 }
99}
100
/// Log events broadcast from the server to the log handler threads.
#[derive(Clone)]
pub enum Event {
    /// A request was received (emitted by `log_request`).
    Request {
        request_id: scru128::Scru128Id,
        // Boxed snapshot of the request.
        request: Box<RequestData>,
    },
    /// Response status and headers are available (emitted by `log_response`).
    Response {
        request_id: scru128::Scru128Id,
        status: u16,
        headers: HashMap<String, String>,
        // Milliseconds elapsed since the `start_time` given to `log_response`.
        latency_ms: u64,
    },
    /// The request finished (emitted by `log_complete`, which `RequestGuard`
    /// calls when it is dropped).
    Complete {
        request_id: scru128::Scru128Id,
        // Byte count supplied to `log_complete`.
        bytes: u64,
        // Milliseconds elapsed since the instant given to `log_complete`.
        duration_ms: u64,
    },

    /// The server started (emitted by `log_started`).
    Started {
        address: String,
        startup_ms: u64,
        options: StartupOptions,
    },
    /// Emitted by `log_reloaded`.
    Reloaded,
    /// An error message to report (emitted by `log_error`).
    Error {
        error: String,
    },
    /// A message to print (emitted by `log_print`).
    Print {
        message: String,
    },
    /// Emitted by `log_stopping` with the number of in-flight connections.
    Stopping {
        inflight: usize,
    },
    /// Emitted by `log_stopped`.
    Stopped,
    /// Emitted by `log_stop_timed_out`.
    StopTimedOut,
    /// Tells the handler threads to leave their receive loop; it is not
    /// rendered as a log record (emitted by `shutdown`).
    Shutdown,
}
140
/// Process-wide sender half of the event channel. It is set by
/// `init_broadcast` and read by `subscribe` and `emit`; while it is unset,
/// `emit` does nothing and `subscribe` returns `None`.
static SENDER: OnceLock<broadcast::Sender<Event>> = OnceLock::new();
144
145pub fn init_broadcast() -> broadcast::Receiver<Event> {
146 let (tx, rx) = broadcast::channel(65536);
147 let _ = SENDER.set(tx);
148 rx
149}
150
151pub fn subscribe() -> Option<broadcast::Receiver<Event>> {
152 SENDER.get().map(|tx| tx.subscribe())
153}
154
155fn emit(event: Event) {
156 if let Some(tx) = SENDER.get() {
157 let _ = tx.send(event); }
159}
160
161pub fn log_request(request_id: scru128::Scru128Id, request: &Request) {
164 emit(Event::Request {
165 request_id,
166 request: Box::new(RequestData::from(request)),
167 });
168}
169
170pub fn log_response(
171 request_id: scru128::Scru128Id,
172 status: u16,
173 headers: &HeaderMap,
174 start_time: Instant,
175) {
176 let headers_map: HashMap<String, String> = headers
177 .iter()
178 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
179 .collect();
180
181 emit(Event::Response {
182 request_id,
183 status,
184 headers: headers_map,
185 latency_ms: start_time.elapsed().as_millis() as u64,
186 });
187}
188
189pub fn log_complete(request_id: scru128::Scru128Id, bytes: u64, response_time: Instant) {
190 emit(Event::Complete {
191 request_id,
192 bytes,
193 duration_ms: response_time.elapsed().as_millis() as u64,
194 });
195}
196
197pub fn log_started(address: &str, startup_ms: u128, options: StartupOptions) {
198 emit(Event::Started {
199 address: address.to_string(),
200 startup_ms: startup_ms as u64,
201 options,
202 });
203}
204
205pub fn log_reloaded() {
206 emit(Event::Reloaded);
207}
208
209pub fn log_error(error: &str) {
210 emit(Event::Error {
211 error: error.to_string(),
212 });
213}
214
215pub fn log_print(message: &str) {
216 emit(Event::Print {
217 message: message.to_string(),
218 });
219}
220
221pub fn log_stopping(inflight: usize) {
222 emit(Event::Stopping { inflight });
223}
224
225pub fn log_stopped() {
226 emit(Event::Stopped);
227}
228
229pub fn log_stop_timed_out() {
230 emit(Event::StopTimedOut);
231}
232
233pub fn shutdown() {
234 emit(Event::Shutdown);
235}
236
237pub fn run_jsonl_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
240 use std::io::Write;
241
242 std::thread::spawn(move || {
243 let mut rx = rx;
244 let mut stdout = std::io::BufWriter::new(std::io::stdout().lock());
245
246 loop {
247 let event = match rx.blocking_recv() {
248 Ok(event) => event,
249 Err(broadcast::error::RecvError::Lagged(n)) => {
250 let json = serde_json::json!({
251 "stamp": scru128::new().to_string(),
252 "message": "lagged",
253 "dropped": n,
254 });
255 if let Ok(line) = serde_json::to_string(&json) {
256 let _ = writeln!(stdout, "{line}");
257 let _ = stdout.flush();
258 }
259 continue;
260 }
261 Err(broadcast::error::RecvError::Closed) => break,
262 };
263
264 if matches!(event, Event::Shutdown) {
265 let _ = stdout.flush();
266 break;
267 }
268
269 let needs_flush = matches!(
270 &event,
271 Event::Started { .. }
272 | Event::Stopped
273 | Event::StopTimedOut
274 | Event::Reloaded
275 | Event::Error { .. }
276 | Event::Print { .. }
277 );
278
279 let stamp = scru128::new().to_string();
280
281 let json = match event {
282 Event::Request {
283 request_id,
284 request,
285 } => {
286 serde_json::json!({
287 "stamp": stamp,
288 "message": "request",
289 "request_id": request_id.to_string(),
290 "method": &request.method,
291 "path": &request.path,
292 "trusted_ip": &request.trusted_ip,
293 "request": request,
294 })
295 }
296 Event::Response {
297 request_id,
298 status,
299 headers,
300 latency_ms,
301 } => {
302 serde_json::json!({
303 "stamp": stamp,
304 "message": "response",
305 "request_id": request_id.to_string(),
306 "status": status,
307 "headers": headers,
308 "latency_ms": latency_ms,
309 })
310 }
311 Event::Complete {
312 request_id,
313 bytes,
314 duration_ms,
315 } => {
316 serde_json::json!({
317 "stamp": stamp,
318 "message": "complete",
319 "request_id": request_id.to_string(),
320 "bytes": bytes,
321 "duration_ms": duration_ms,
322 })
323 }
324 Event::Started {
325 address,
326 startup_ms,
327 options,
328 } => {
329 let xs_version: Option<&str> = if options.store.is_some() {
330 #[cfg(feature = "cross-stream")]
331 {
332 Some(env!("XS_VERSION"))
333 }
334 #[cfg(not(feature = "cross-stream"))]
335 {
336 None
337 }
338 } else {
339 None
340 };
341 serde_json::json!({
342 "stamp": stamp,
343 "message": "started",
344 "address": address,
345 "startup_ms": startup_ms,
346 "watch": options.watch,
347 "tls": options.tls,
348 "store": options.store,
349 "topic": options.topic,
350 "expose": options.expose,
351 "services": options.services,
352 "nu_version": env!("NU_VERSION"),
353 "xs_version": xs_version,
354 "datastar_version": DATASTAR_VERSION,
355 })
356 }
357 Event::Reloaded => {
358 serde_json::json!({
359 "stamp": stamp,
360 "message": "reloaded",
361 })
362 }
363 Event::Error { error } => {
364 serde_json::json!({
365 "stamp": stamp,
366 "message": "error",
367 "error": error,
368 })
369 }
370 Event::Print { message } => {
371 serde_json::json!({
372 "stamp": stamp,
373 "message": "print",
374 "content": message,
375 })
376 }
377 Event::Stopping { inflight } => {
378 serde_json::json!({
379 "stamp": stamp,
380 "message": "stopping",
381 "inflight": inflight,
382 })
383 }
384 Event::Stopped => {
385 serde_json::json!({
386 "stamp": stamp,
387 "message": "stopped",
388 })
389 }
390 Event::StopTimedOut => {
391 serde_json::json!({
392 "stamp": stamp,
393 "message": "stop_timed_out",
394 })
395 }
396 Event::Shutdown => unreachable!(),
397 };
398
399 if let Ok(line) = serde_json::to_string(&json) {
400 let _ = writeln!(stdout, "{line}");
401 }
402
403 if needs_flush || rx.is_empty() {
405 let _ = stdout.flush();
406 }
407 }
408
409 let _ = stdout.flush();
410 })
411}
412
/// Display state the human-readable handler keeps for one in-flight request.
struct RequestState {
    method: String,
    path: String,
    trusted_ip: Option<String>,
    // Set when the handler thread processes the request event, not when the
    // server received the request.
    start_time: Instant,
    // `None` until the matching response event has been processed.
    status: Option<u16>,
    // `None` until the matching response event has been processed.
    latency_ms: Option<u64>,
}
423
/// Shortens `s` to at most `max_len` characters by replacing its middle with
/// `...`.
///
/// Lengths are counted in `char`s, not bytes, so multi-byte UTF-8 input is
/// never split inside a character. The previous byte-slicing version could
/// panic on such input.
///
/// * A string of at most `max_len` characters is returned unchanged.
/// * Otherwise the result keeps `(max_len - 3) / 2` characters from each end
///   around the ellipsis (so it may be one character shorter than `max_len`).
/// * When `max_len < 3` there is no room for the ellipsis, so the first
///   `max_len` characters are returned instead of underflowing.
fn truncate_middle(s: &str, max_len: usize) -> String {
    const ELLIPSIS_LEN: usize = 3;

    let char_count = s.chars().count();
    if char_count <= max_len {
        return s.to_string();
    }
    if max_len < ELLIPSIS_LEN {
        return s.chars().take(max_len).collect();
    }

    let keep = (max_len - ELLIPSIS_LEN) / 2;
    let head: String = s.chars().take(keep).collect();
    // `keep <= max_len < char_count`, so this subtraction cannot underflow.
    let tail: String = s.chars().skip(char_count - keep).collect();
    format!("{head}...{tail}")
}
431
/// Tracks the transient "in flight" lines that the human-readable handler
/// redraws at the end of its output.
struct ActiveZone {
    // Handle used for cursor control and flushing.
    stdout: io::Stdout,
    // Number of lines written by the last `redraw`; `clear` moves the cursor
    // up by this many lines before erasing.
    line_count: usize,
}
436
437impl ActiveZone {
438 fn new() -> Self {
439 Self {
440 stdout: io::stdout(),
441 line_count: 0,
442 }
443 }
444
445 fn clear(&mut self) {
447 if self.line_count > 0 {
448 let _ = execute!(
449 self.stdout,
450 cursor::MoveUp(self.line_count as u16),
451 terminal::Clear(terminal::ClearType::FromCursorDown)
452 );
453 self.line_count = 0;
454 }
455 }
456
457 fn print_permanent(&mut self, line: &str) {
459 self.clear();
460 println!("{line}");
461 let _ = self.stdout.flush();
462 }
463
464 fn redraw(&mut self, active_ids: &[String], requests: &HashMap<String, RequestState>) {
466 self.line_count = 0;
467 if !active_ids.is_empty() {
468 println!("· · · ✈ in flight · · ·");
469 self.line_count += 1;
470 for id in active_ids {
471 if let Some(state) = requests.get(id) {
472 let line = format_active_line(state);
473 println!("{line}");
474 self.line_count += 1;
475 }
476 }
477 }
478 let _ = self.stdout.flush();
479 }
480}
481
482fn format_active_line(state: &RequestState) -> String {
483 let timestamp = Local::now().format("%H:%M:%S%.3f");
484 let ip = state.trusted_ip.as_deref().unwrap_or("-");
485 let method = &state.method;
486 let path = truncate_middle(&state.path, 40);
487 let elapsed = state.start_time.elapsed().as_secs_f64();
488
489 match (state.status, state.latency_ms) {
490 (Some(status), Some(latency)) => {
491 format!(
492 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {elapsed:>6.1}s"
493 )
494 }
495 _ => {
496 format!("{timestamp} {ip:>15} {method:<6} {path:<40} ... {elapsed:>6.1}s")
497 }
498 }
499}
500
501fn format_complete_line(state: &RequestState, duration_ms: u64, bytes: u64) -> String {
502 let timestamp = Local::now().format("%H:%M:%S%.3f");
503 let ip = state.trusted_ip.as_deref().unwrap_or("-");
504 let method = &state.method;
505 let path = truncate_middle(&state.path, 40);
506 let status = state.status.unwrap_or(0);
507 let latency = state.latency_ms.unwrap_or(0);
508
509 format!(
510 "{timestamp} {ip:>15} {method:<6} {path:<40} {status} {latency:>6}ms {duration_ms:>6}ms {bytes:>8}b"
511 )
512}
513
514pub fn run_human_handler(rx: broadcast::Receiver<Event>) -> std::thread::JoinHandle<()> {
515 std::thread::spawn(move || {
516 let mut rx = rx;
517 let mut zone = ActiveZone::new();
518 let mut requests: HashMap<String, RequestState> = HashMap::new();
519 let mut active_ids: Vec<String> = Vec::new();
520
521 let mut rate_limiter = TokenBucket::new(40.0, 20.0, Instant::now());
523 let mut skipped: u64 = 0;
524 let mut lagged: u64 = 0;
525
526 loop {
527 let event = match rx.blocking_recv() {
528 Ok(event) => event,
529 Err(broadcast::error::RecvError::Lagged(n)) => {
530 lagged += n;
531 requests.clear();
533 active_ids.clear();
534 zone.print_permanent(&format!(
535 "⚠ lagged: dropped {n} events, cleared in-flight"
536 ));
537 continue;
538 }
539 Err(broadcast::error::RecvError::Closed) => break,
540 };
541 match event {
542 Event::Started {
543 address,
544 startup_ms,
545 options,
546 } => {
547 let version = env!("CARGO_PKG_VERSION");
548 let pid = std::process::id();
549 let now = Local::now().to_rfc2822();
550 zone.print_permanent(&format!("<http-nu version=\"{version}\">"));
551 zone.print_permanent(" __ ,");
552 zone.print_permanent(&format!(
553 " .--()°'.' pid {pid} · {address} · {startup_ms}ms 💜"
554 ));
555 zone.print_permanent(&format!("'|, . ,' {now}"));
556
557 let mut http_opts = Vec::new();
559 if options.watch {
560 http_opts.push("watch".to_string());
561 }
562 if let Some(ref tls) = options.tls {
563 http_opts.push(format!("tls:{tls}"));
564 }
565
566 let mut xs_opts = Vec::new();
567 if let Some(ref store) = options.store {
568 xs_opts.push(store.to_string());
569 }
570 if let Some(ref topic) = options.topic {
571 xs_opts.push(format!("topic:{topic}"));
572 }
573 if let Some(ref expose) = options.expose {
574 xs_opts.push(expose.to_string());
575 }
576 if options.services {
577 xs_opts.push("services".to_string());
578 }
579
580 let mut versions = vec![format!("nu {}", env!("NU_VERSION"))];
582 #[cfg(feature = "cross-stream")]
583 if options.store.is_some() {
584 versions.push(format!("xs {}", env!("XS_VERSION")));
585 }
586 versions.push(format!("datastar {DATASTAR_VERSION}"));
587 let versions_str = versions.join(" · ");
588
589 let has_opts = !http_opts.is_empty() || !xs_opts.is_empty();
590 if has_opts {
591 let opts_str = match (http_opts.is_empty(), xs_opts.is_empty()) {
593 (false, true) => http_opts.join(" "),
594 (true, false) => format!("xs {}", xs_opts.join(" ")),
595 (false, false) => {
596 format!("{} │ xs {}", http_opts.join(" "), xs_opts.join(" "))
597 }
598 _ => unreachable!(),
599 };
600 zone.print_permanent(&format!(" !_-(_\\ {opts_str}"));
601 zone.print_permanent(&format!(" {versions_str}"));
602 } else {
603 zone.print_permanent(&format!(" !_-(_\\ {versions_str}"));
605 }
606
607 zone.redraw(&active_ids, &requests);
608 }
609 Event::Reloaded => {
610 zone.print_permanent("reloaded 🔄");
611 zone.redraw(&active_ids, &requests);
612 }
613 Event::Error { error } => {
614 zone.clear();
615 eprintln!("ERROR: {error}");
616 zone.redraw(&active_ids, &requests);
617 }
618 Event::Print { message } => {
619 zone.print_permanent(&format!("PRINT: {message}"));
620 zone.redraw(&active_ids, &requests);
621 }
622 Event::Stopping { inflight } => {
623 zone.print_permanent(&format!(
624 "stopping, {inflight} connection(s) in flight..."
625 ));
626 zone.redraw(&active_ids, &requests);
627 }
628 Event::Stopped => {
629 let timestamp = Local::now().format("%H:%M:%S%.3f");
630 zone.print_permanent(&format!("{timestamp} cu l8r"));
631 zone.clear();
632 println!("</http-nu>");
633 }
634 Event::StopTimedOut => {
635 zone.print_permanent("stop timed out, forcing exit");
636 }
637 Event::Request {
638 request_id,
639 request,
640 } => {
641 if !rate_limiter.try_consume(Instant::now()) {
642 skipped += 1;
643 continue;
644 }
645
646 if skipped > 0 {
648 zone.print_permanent(&format!("... skipped {skipped} requests"));
649 skipped = 0;
650 }
651
652 let id = request_id.to_string();
653 let state = RequestState {
654 method: request.method.clone(),
655 path: request.path.clone(),
656 trusted_ip: request.trusted_ip.clone(),
657 start_time: Instant::now(),
658 status: None,
659 latency_ms: None,
660 };
661 requests.insert(id.clone(), state);
662 active_ids.push(id);
663 zone.clear();
664 zone.redraw(&active_ids, &requests);
665 }
666 Event::Response {
667 request_id,
668 status,
669 latency_ms,
670 ..
671 } => {
672 let id = request_id.to_string();
673 if let Some(state) = requests.get_mut(&id) {
674 state.status = Some(status);
675 state.latency_ms = Some(latency_ms);
676 zone.clear();
677 zone.redraw(&active_ids, &requests);
678 }
679 }
680 Event::Complete {
681 request_id,
682 bytes,
683 duration_ms,
684 } => {
685 let id = request_id.to_string();
686 if let Some(state) = requests.remove(&id) {
687 active_ids.retain(|x| x != &id);
688 let line = format_complete_line(&state, duration_ms, bytes);
689 zone.print_permanent(&line);
690 zone.redraw(&active_ids, &requests);
691 }
692 }
693 Event::Shutdown => break,
694 }
695 }
696
697 zone.clear();
699 if skipped > 0 {
700 println!("... skipped {skipped} requests");
701 }
702 if lagged > 0 {
703 println!("⚠ total lagged: {lagged} events dropped");
704 }
705 })
706}
707
/// Guard that logs an `Event::Complete` for its request when dropped.
pub struct RequestGuard {
    request_id: scru128::Scru128Id,
    // Captured when the guard is created; the logged duration is measured
    // from here.
    start: Instant,
    // Running byte count; `LoggingBody` adds to it as data frames are polled.
    bytes_sent: u64,
}
715
716impl RequestGuard {
717 pub fn new(request_id: scru128::Scru128Id) -> Self {
718 Self {
719 request_id,
720 start: Instant::now(),
721 bytes_sent: 0,
722 }
723 }
724
725 pub fn request_id(&self) -> scru128::Scru128Id {
726 self.request_id
727 }
728}
729
730impl Drop for RequestGuard {
731 fn drop(&mut self) {
732 log_complete(self.request_id, self.bytes_sent, self.start);
733 }
734}
735
/// Body wrapper that counts the bytes of the data frames it yields.
///
/// The count is kept in the embedded `RequestGuard`, which logs completion
/// when this body (and with it the guard) is dropped.
pub struct LoggingBody<B> {
    // The wrapped body that frames are polled from.
    inner: B,
    // Holds the byte counter and emits the completion event on drop.
    guard: RequestGuard,
}
742
743impl<B> LoggingBody<B> {
744 pub fn new(inner: B, guard: RequestGuard) -> Self {
745 Self { inner, guard }
746 }
747}
748
749impl<B> Body for LoggingBody<B>
750where
751 B: Body<Data = Bytes, Error = BoxError> + Unpin,
752{
753 type Data = Bytes;
754 type Error = BoxError;
755
756 fn poll_frame(
757 mut self: Pin<&mut Self>,
758 cx: &mut Context<'_>,
759 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
760 let inner = Pin::new(&mut self.inner);
761 match inner.poll_frame(cx) {
762 Poll::Ready(Some(Ok(frame))) => {
763 if let Some(data) = frame.data_ref() {
764 self.guard.bytes_sent += data.len() as u64;
765 }
766 Poll::Ready(Some(Ok(frame)))
767 }
768 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
769 Poll::Ready(None) => Poll::Ready(None),
770 Poll::Pending => Poll::Pending,
771 }
772 }
773
774 fn is_end_stream(&self) -> bool {
775 self.inner.is_end_stream()
776 }
777
778 fn size_hint(&self) -> SizeHint {
779 self.inner.size_hint()
780 }
781}
782
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds the bucket used by every test: 40-token burst, 20 tokens/second.
    fn full_bucket(now: Instant) -> TokenBucket {
        TokenBucket::new(40.0, 20.0, now)
    }

    /// Attempts `attempts` consumptions at `now` and returns how many succeeded.
    fn take(bucket: &mut TokenBucket, now: Instant, attempts: usize) -> usize {
        (0..attempts).filter(|_| bucket.try_consume(now)).count()
    }

    #[test]
    fn token_bucket_allows_burst() {
        let start = Instant::now();
        let mut bucket = full_bucket(start);

        // A full bucket grants exactly its capacity with no time passing.
        assert_eq!(take(&mut bucket, start, 40), 40);
        assert!(!bucket.try_consume(start));
    }

    #[test]
    fn token_bucket_refills_over_time() {
        let start = Instant::now();
        let mut bucket = full_bucket(start);

        // Drain the bucket completely.
        let _ = take(&mut bucket, start, 40);
        assert!(!bucket.try_consume(start));

        // 100 ms at 20 tokens/second yields two tokens.
        let later = start + Duration::from_millis(100);
        assert!(bucket.try_consume(later));
        assert!(bucket.try_consume(later));
        assert!(!bucket.try_consume(later));
    }

    #[test]
    fn token_bucket_caps_at_capacity() {
        let start = Instant::now();
        let mut bucket = full_bucket(start);

        // Ten idle seconds must not push the bucket past its capacity.
        let later = start + Duration::from_secs(10);
        assert_eq!(take(&mut bucket, later, 40), 40);
        assert!(!bucket.try_consume(later));
    }
}