1use crate::bloop::ProcessedBloop;
2use crate::event::Event;
3use bytes::Bytes;
4use chrono::{DateTime, NaiveDate, Timelike, Utc};
5use chrono_tz::Tz;
6use http_body_util::Full;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper::{Request, Response};
10use hyper_util::rt::{TokioIo, TokioTimer};
11use serde::Serialize;
12use std::collections::HashMap;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use thiserror::Error;
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast::error::RecvError;
19use tokio::sync::{RwLock, broadcast};
20use tokio::{io, select, task};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{debug, error, warn};
24
/// Number of minutes in one day; the length of the per-minute arrays in [`ClientStats`].
const MINUTES_IN_DAY: usize = 60 * 24;
26
/// Bloop counters tracked for a single client.
#[derive(Debug)]
pub struct ClientStats {
    /// Number of bloops recorded for this client so far.
    pub total_bloops: u64,

    /// Bloop count per minute-of-day slot (index = `hour * 60 + minute` of the bloop's
    /// `recorded_at` time). Each slot saturates at `u8::MAX`.
    pub per_minute_bloops: [u8; MINUTES_IN_DAY],

    /// Date for which the matching slot in `per_minute_bloops` was last written.
    /// A slot only counts towards a time window when its date matches the expected one.
    pub per_minute_dates: [NaiveDate; MINUTES_IN_DAY],

    /// Bloop count per hour of day (0-23), using the hour in the time zone passed
    /// when the bloop is recorded.
    pub bloops_per_hour: [u32; 24],

    /// Bloop count per date, keyed by the date of the bloop's `recorded_at`.
    pub bloops_per_day: HashMap<NaiveDate, u32>,
}
152
153impl Default for ClientStats {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159impl ClientStats {
160 pub fn new() -> Self {
162 Self {
163 total_bloops: 0,
164 per_minute_bloops: [0; MINUTES_IN_DAY],
165 per_minute_dates: [NaiveDate::MIN; MINUTES_IN_DAY],
166 bloops_per_hour: [0; 24],
167 bloops_per_day: HashMap::new(),
168 }
169 }
170
171 fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
172 let date = bloop.recorded_at.date_naive();
173 let time = bloop.recorded_at.time();
174 let idx = (time.hour() * 60 + time.minute()) as usize;
175
176 if self.per_minute_dates[idx] != date {
177 self.per_minute_dates[idx] = date;
178 self.per_minute_bloops[idx] = 1;
179 } else {
180 self.per_minute_bloops[idx] = self.per_minute_bloops[idx].saturating_add(1);
181 }
182
183 let local_hour = bloop.recorded_at.with_timezone(tz).hour() as usize;
184
185 self.total_bloops += 1;
186 self.bloops_per_hour[local_hour] += 1;
187 *self.bloops_per_day.entry(date).or_insert(0) += 1;
188 }
189
190 fn count_last_minutes(&self, now: DateTime<Utc>, minutes: usize) -> u32 {
191 let now_minute = (now.hour() * 60 + now.minute()) as usize;
192 let mut total = 0;
193
194 for i in 0..minutes {
195 let idx = now_minute.wrapping_sub(i) % MINUTES_IN_DAY;
196 let expected_date = (now - chrono::Duration::minutes(i as i64)).date_naive();
197
198 if self.per_minute_dates[idx] == expected_date {
199 total += self.per_minute_bloops[idx] as u32;
200 }
201 }
202
203 total
204 }
205}
206
207#[inline]
223pub fn minute_index(time: impl Timelike) -> usize {
224 time.hour() as usize * 60 + time.minute() as usize
225}
226
/// Aggregated counters for one client, or for all clients combined, as exposed in a
/// [`StatsSnapshot`]. Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct StatsSummary {
    /// Total number of recorded bloops.
    pub total_bloops: u64,
    /// Bloops counted over the 60 one-minute slots ending at snapshot time.
    pub bloops_last_hour: u32,
    /// Bloops counted over the `MINUTES_IN_DAY` one-minute slots ending at snapshot time.
    pub bloops_last_24_hours: u32,
    /// Bloop count per hour of day (index 0-23).
    pub bloops_per_hour: [u32; 24],
    /// Bloop count per date.
    pub bloops_per_day: HashMap<NaiveDate, u32>,
}
236
/// Point-in-time view of all statistics. Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct StatsSnapshot {
    /// The `now` value the snapshot was computed for; also used to decide cache freshness.
    pub created_at: DateTime<Utc>,
    /// Per-client summaries, keyed by client id.
    pub clients: HashMap<String, StatsSummary>,
    /// Sum of all per-client summaries.
    pub global: StatsSummary,
}
244
/// Holds the statistics of every client together with a cached snapshot of them.
#[derive(Debug, Default)]
pub struct StatsTracker {
    /// Per-client counters, keyed by the client id of the recorded bloops.
    clients: HashMap<String, ClientStats>,
    /// Most recently computed snapshot, if any; reused while it is less than a minute old.
    cached_snapshot: RwLock<Option<StatsSnapshot>>,
}
250
251impl From<HashMap<String, ClientStats>> for StatsTracker {
252 fn from(clients: HashMap<String, ClientStats>) -> Self {
253 Self::new(clients)
254 }
255}
256
257impl StatsTracker {
258 pub fn new(clients: HashMap<String, ClientStats>) -> Self {
259 Self {
260 clients,
261 cached_snapshot: RwLock::new(None),
262 }
263 }
264
265 fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
266 let client = self.clients.entry(bloop.client_id.clone()).or_default();
267
268 client.record_bloop(bloop, tz);
269 }
270
271 async fn snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
272 if let Some(snapshot) = self.cached_snapshot.read().await.as_ref()
273 && snapshot.created_at > now - chrono::Duration::minutes(1)
274 {
275 return snapshot.clone();
276 }
277
278 let snapshot = self.compute_snapshot(now);
279 self.cached_snapshot.write().await.replace(snapshot.clone());
280
281 snapshot
282 }
283
284 fn compute_snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
285 let mut clients_snapshots = HashMap::new();
286 let mut global = StatsSummary {
287 total_bloops: 0,
288 bloops_last_hour: 0,
289 bloops_last_24_hours: 0,
290 bloops_per_hour: [0; 24],
291 bloops_per_day: HashMap::new(),
292 };
293
294 for (client_id, client) in &self.clients {
295 let bloops_last_hour = client.count_last_minutes(now, 60);
296 let bloops_last_24_hours = client.count_last_minutes(now, MINUTES_IN_DAY);
297
298 let snapshot = StatsSummary {
299 total_bloops: client.total_bloops,
300 bloops_last_hour,
301 bloops_last_24_hours,
302 bloops_per_hour: client.bloops_per_hour,
303 bloops_per_day: client.bloops_per_day.clone(),
304 };
305
306 global.total_bloops += snapshot.total_bloops;
307 global.bloops_last_hour += snapshot.bloops_last_hour;
308 global.bloops_last_24_hours += snapshot.bloops_last_24_hours;
309
310 for hour in 0..24 {
311 global.bloops_per_hour[hour] += snapshot.bloops_per_hour[hour];
312 }
313
314 for (day, count) in snapshot.bloops_per_day.iter() {
315 *global.bloops_per_day.entry(*day).or_insert(0) += count;
316 }
317
318 clients_snapshots.insert(client_id.clone(), snapshot);
319 }
320
321 StatsSnapshot {
322 created_at: now,
323 clients: clients_snapshots,
324 global,
325 }
326 }
327}
328
/// Server that records `Event::BloopProcessed` events into shared statistics and
/// serves a JSON snapshot of those statistics over HTTP/1.
#[derive(Debug)]
pub struct StatisticsServer {
    /// Socket address the HTTP listener binds to.
    addr: SocketAddr,
    /// Statistics shared between the event loop (writer) and request handlers (readers).
    stats: Arc<RwLock<StatsTracker>>,
    /// Receiver for the events from which statistics are built.
    event_rx: broadcast::Receiver<Event>,
    /// Time zone used for the per-hour buckets when recording bloops.
    tz: Tz,
    /// Test-only hook that is notified after each received event has been handled.
    #[cfg(test)]
    pub test_notify_event_processed: Arc<tokio::sync::Notify>,
}
339
340impl StatisticsServer {
341 pub async fn listen(&mut self) -> Result<(), io::Error> {
345 let listener = TcpListener::bind(self.addr).await?;
346
347 loop {
348 let should_continue = select! {
349 conn = listener.accept() => {
350 if let Ok((stream, _)) = conn {
351 self.handle_connection(stream).await;
352 }
353
354 true
355 }
356
357 result = self.event_rx.recv() => {
358 self.handle_recv(result).await
359 }
360 };
361
362 if !should_continue {
363 break;
364 }
365 }
366
367 Ok(())
368 }
369
370 async fn handle_connection(&self, stream: TcpStream) {
372 let io = TokioIo::new(stream);
373 let stats = self.stats.clone();
374
375 task::spawn(async move {
376 let result = http1::Builder::new()
377 .timer(TokioTimer::new())
378 .serve_connection(
379 io,
380 service_fn(move |_: Request<_>| {
381 let stats = stats.clone();
382
383 async move {
384 let snapshot = stats.read().await.snapshot(Utc::now()).await;
385
386 let body = match serde_json::to_string(&snapshot) {
387 Ok(body) => body,
388 Err(err) => {
389 error!("failed to serialize statistics: {:?}", err);
390 return Ok::<_, Infallible>(
391 Response::builder()
392 .status(500)
393 .body(Full::default())
394 .unwrap(),
395 );
396 }
397 };
398
399 Ok::<_, Infallible>(
400 Response::builder()
401 .header("Content-Type", "application/json")
402 .body(Full::new(Bytes::from(body)))
403 .unwrap(),
404 )
405 }
406 }),
407 )
408 .await;
409
410 if let Err(err) = result {
411 error!("failed to serve statistics request: {:?}", err);
412 }
413 });
414 }
415
416 async fn handle_recv(&self, result: Result<Event, RecvError>) -> bool {
421 let should_continue = match result {
422 Ok(Event::BloopProcessed(bloop)) => {
423 self.stats.write().await.record_bloop(bloop, &self.tz);
424 true
425 }
426 Ok(_) => true,
427 Err(RecvError::Lagged(n)) => {
428 warn!("StatisticsServer lagged by {n} messages, some bloops were missed");
429 true
430 }
431 Err(RecvError::Closed) => {
432 debug!("StatisticsServer event stream closed, exiting event loop");
433 false
434 }
435 };
436
437 #[cfg(test)]
438 self.test_notify_event_processed.notify_one();
439
440 should_continue
441 }
442}
443
/// Error type with no variants, so no value of it can ever be constructed.
/// Used as the error type of the [`IntoSubsystem`] implementation for `StatisticsServer`.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
447
#[cfg(feature = "tokio-graceful-shutdown")]
impl IntoSubsystem<NeverError> for StatisticsServer {
    /// Runs [`StatisticsServer::listen`] until it finishes or a shutdown is requested.
    ///
    /// Always returns `Ok(())`. An I/O failure of the server (for example a failed
    /// bind) is logged rather than silently discarded.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        match self.listen().cancel_on_shutdown(subsys).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => error!("statistics server stopped with an I/O error: {:?}", err),
            Err(_) => debug!("statistics server cancelled by shutdown request"),
        }

        Ok(())
    }
}
456
/// Errors returned when building a `StatisticsServer` from its builder.
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was not set; carries the field's name.
    #[error("missing field: {0}")]
    MissingField(&'static str),

    /// The configured address could not be parsed as a socket address.
    #[error(transparent)]
    AddrParse(#[from] std::net::AddrParseError),
}
465
/// Builder for [`StatisticsServer`]. `address` and `event_rx` are required; the
/// remaining fields fall back to defaults in `build`.
#[derive(Debug, Default)]
pub struct StatisticsServerBuilder {
    /// Address to listen on, as text; parsed into a `SocketAddr` by `build`.
    address: Option<String>,
    /// Time zone for the per-hour statistics; `build` falls back to UTC.
    tz: Option<Tz>,
    /// Initial per-client statistics; `build` falls back to an empty map.
    stats: Option<HashMap<String, ClientStats>>,
    /// Receiver for the events the server consumes.
    event_rx: Option<broadcast::Receiver<Event>>,
}
474
475impl StatisticsServerBuilder {
476 pub fn new() -> Self {
478 Self {
479 address: None,
480 tz: None,
481 stats: None,
482 event_rx: None,
483 }
484 }
485
486 pub fn address(mut self, address: impl Into<String>) -> Self {
488 self.address = Some(address.into());
489 self
490 }
491
492 pub fn tz(mut self, tz: Tz) -> Self {
494 self.tz = Some(tz);
495 self
496 }
497
498 pub fn stats(mut self, stats: HashMap<String, ClientStats>) -> Self {
502 self.stats = Some(stats);
503 self
504 }
505
506 pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
508 self.event_rx = Some(event_rx);
509 self
510 }
511
512 pub fn build(self) -> Result<StatisticsServer, BuilderError> {
514 let addr: SocketAddr = self
515 .address
516 .ok_or(BuilderError::MissingField("address"))?
517 .parse()?;
518
519 Ok(StatisticsServer {
520 addr,
521 tz: self.tz.unwrap_or(Tz::UTC),
522 stats: Arc::new(RwLock::new(StatsTracker::new(
523 self.stats.unwrap_or_default(),
524 ))),
525 event_rx: self
526 .event_rx
527 .ok_or(BuilderError::MissingField("event_rx"))?,
528 #[cfg(test)]
529 test_notify_event_processed: Arc::new(tokio::sync::Notify::new()),
530 })
531 }
532}
533
/// Unit tests for the statistics counters, the builder and the server event loop.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, NaiveTime, TimeZone};
    use chrono_tz::UTC;
    use ntest::timeout;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use uuid::Uuid;

    /// Builds a processed bloop for `client_id` with a random player id.
    fn make_bloop(client_id: &str, recorded_at: DateTime<Utc>) -> ProcessedBloop {
        ProcessedBloop {
            client_id: client_id.to_string(),
            player_id: Uuid::new_v4(),
            recorded_at,
        }
    }

    #[test]
    fn minute_index_calculation() {
        let time = NaiveTime::from_hms_opt(13, 45, 0).unwrap();
        assert_eq!(minute_index(time), 13 * 60 + 45);

        let time = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
        assert_eq!(minute_index(time), 0);
    }

    #[test]
    fn record_bloop_basic() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 5, 12, 30, 0).unwrap();

        let bloop = make_bloop("client1", now);
        stats.record_bloop(bloop, &tz);

        assert_eq!(stats.total_bloops, 1);

        let idx = minute_index(now.time());
        assert_eq!(stats.per_minute_bloops[idx], 1);
        assert_eq!(stats.per_minute_dates[idx], now.date_naive());

        let hour = now.hour() as usize;
        assert_eq!(stats.bloops_per_hour[hour], 1);
        assert_eq!(stats.bloops_per_day[&now.date_naive()], 1);
    }

    #[test]
    fn record_bloop_accumulation_and_wraparound() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 5, 0, 1, 0).unwrap();

        let bloop1 = make_bloop("client1", now);
        stats.record_bloop(bloop1, &tz);
        stats.record_bloop(make_bloop("client1", now), &tz);

        let idx = minute_index(now.time());
        assert_eq!(stats.per_minute_bloops[idx], 2);
        assert_eq!(stats.per_minute_dates[idx], now.date_naive());

        // Same minute slot, different date: the slot must be reset, not accumulated.
        let yesterday = now - Duration::days(1);
        let idx_wrap = minute_index(yesterday.time());
        stats.record_bloop(make_bloop("client1", yesterday), &tz);

        assert_eq!(stats.per_minute_dates[idx_wrap], yesterday.date_naive());
        assert_eq!(stats.per_minute_bloops[idx_wrap], 1);
    }

    #[test]
    fn count_last_minutes() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 5, 10, 0, 0).unwrap();

        stats.record_bloop(make_bloop("c", now), &tz);
        stats.record_bloop(make_bloop("c", now - Duration::minutes(1)), &tz);
        stats.record_bloop(make_bloop("c", now - Duration::minutes(2)), &tz);

        let count = stats.count_last_minutes(now, 3);
        assert_eq!(count, 3);

        let count_short = stats.count_last_minutes(now, 1);
        assert_eq!(count_short, 1);
    }

    #[test]
    fn stats_tracker_snapshot() {
        let tz = UTC;
        let mut tracker = StatsTracker::default();

        let now = Utc.with_ymd_and_hms(2025, 7, 5, 15, 0, 0).unwrap();
        let bloop = make_bloop("client-a", now);
        tracker.record_bloop(bloop, &tz);

        let snapshot = tracker.compute_snapshot(now);

        assert!(snapshot.clients.contains_key("client-a"));
        let client_stats = &snapshot.clients["client-a"];

        assert_eq!(client_stats.total_bloops, 1);
        assert!(client_stats.bloops_last_hour >= 1);
        assert!(client_stats.bloops_last_24_hours >= 1);
        assert!(client_stats.bloops_per_hour.iter().any(|&x| x >= 1));
        assert!(client_stats.bloops_per_day.contains_key(&now.date_naive()));

        assert_eq!(snapshot.global.total_bloops, 1);
    }

    /// Statistics map containing a single client with empty counters.
    fn dummy_stats() -> HashMap<String, ClientStats> {
        let mut map = HashMap::new();
        map.insert("client1".to_string(), Default::default());
        map
    }

    /// Receiver whose sender is dropped immediately; sufficient for builder-only tests.
    fn dummy_event_rx() -> broadcast::Receiver<Event> {
        let (_tx, rx) = broadcast::channel(16);
        rx
    }

    #[test]
    fn build_succeeds_with_all_fields() {
        let builder = StatisticsServerBuilder::new()
            .address("127.0.0.1:12345")
            .tz(chrono_tz::Europe::London)
            .stats(dummy_stats())
            .event_rx(dummy_event_rx());

        let server = builder.build().unwrap();
        assert_eq!(server.addr, "127.0.0.1:12345".parse().unwrap());
        assert_eq!(server.tz, chrono_tz::Europe::London);
    }

    #[test]
    fn build_fails_if_addr_missing() {
        let builder = StatisticsServerBuilder::new()
            .stats(dummy_stats())
            .event_rx(dummy_event_rx());

        let err = builder.build().unwrap_err();
        assert!(matches!(err, BuilderError::MissingField(field) if field == "address"));
    }

    #[test]
    fn build_fails_if_event_rx_missing() {
        let builder = StatisticsServerBuilder::new()
            .address("127.0.0.1:12345")
            .stats(dummy_stats());

        let err = builder.build().unwrap_err();
        assert!(matches!(err, BuilderError::MissingField(field) if field == "event_rx"));
    }

    #[test]
    fn build_defaults_tz_to_utc() {
        let builder = StatisticsServerBuilder::new()
            .address("127.0.0.1:12345")
            .stats(dummy_stats())
            .event_rx(dummy_event_rx());

        let server = builder.build().unwrap();
        assert_eq!(server.tz, UTC);
    }

    #[tokio::test]
    #[timeout(1000)]
    async fn server_handles_bloop_processed_event() {
        let tz = Tz::UTC;
        let stats_map = HashMap::<String, ClientStats>::new();

        let (sender, event_rx) = broadcast::channel(16);

        // Port 0 lets the OS pick a free port. This test never connects over TCP, so a
        // fixed port would only add the risk of a bind failure when it is already in use.
        let mut server = StatisticsServerBuilder::new()
            .address("127.0.0.1:0")
            .tz(tz)
            .stats(stats_map)
            .event_rx(event_rx)
            .build()
            .unwrap();
        let notify = server.test_notify_event_processed.clone();
        let stats = server.stats.clone();

        tokio::spawn(async move {
            let _ = server.listen().await;
        });

        let bloop = Event::BloopProcessed(make_bloop("client", Utc::now()));
        sender.send(bloop).unwrap();
        notify.notified().await;

        let stats = stats.read().await;
        let snapshot = stats.snapshot(Utc::now()).await;

        assert_eq!(snapshot.clients["client"].total_bloops, 1);
    }

    #[tokio::test]
    #[timeout(1000)]
    async fn server_serves_stats_over_http() {
        let (sender, event_rx) = broadcast::channel(16);

        // Bind to port 0 to discover a free port; the temporary listener is dropped at
        // the end of the statement so the server can bind the same address.
        let local_addr = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap()
            .local_addr()
            .unwrap();

        let mut server = StatisticsServerBuilder::new()
            .address(local_addr.to_string())
            .event_rx(event_rx)
            .build()
            .unwrap();
        let notify = server.test_notify_event_processed.clone();

        tokio::spawn(async move {
            let _ = server.listen().await;
        });

        let bloop = Event::BloopProcessed(ProcessedBloop {
            player_id: Uuid::new_v4(),
            client_id: "client".to_string(),
            recorded_at: Utc::now(),
        });
        sender.send(bloop).unwrap();
        // Once the event has been handled the listener is guaranteed to be bound.
        notify.notified().await;

        let mut client = TcpStream::connect(local_addr).await.unwrap();
        let request = b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
        client.write_all(request).await.unwrap();

        let mut buffer = vec![0; 1024];
        let bytes_read = client.read(&mut buffer).await.unwrap();

        let response = String::from_utf8_lossy(&buffer[..bytes_read]);
        assert!(response.contains("200 OK"));
        assert!(response.contains("application/json"));
    }
}