1use crate::bloop::ProcessedBloop;
2use crate::event::Event;
3use bytes::Bytes;
4use chrono::{DateTime, NaiveDate, Timelike, Utc};
5use chrono_tz::Tz;
6use http_body_util::Full;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper::{Request, Response};
10use hyper_util::rt::{TokioIo, TokioTimer};
11use serde::Serialize;
12use std::collections::HashMap;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use thiserror::Error;
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast::error::RecvError;
19use tokio::sync::{RwLock, broadcast};
20use tokio::{io, select, task};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{debug, error, warn};
24
/// Number of minutes in one day; the length of the per-minute ring buffers.
const MINUTES_IN_DAY: usize = 24 * 60;
26
/// Bloop counters tracked for a single client.
#[derive(Debug)]
pub struct ClientStats {
    /// Number of bloops recorded for this client; incremented once per recorded bloop.
    pub total_bloops: u64,

    /// Bloop count for each minute of the day, indexed by `hour * 60 + minute` of the
    /// bloop's `recorded_at` timestamp. A slot saturates at `u8::MAX` and restarts at 1
    /// when a bloop arrives on a different date than the one stored for that slot in
    /// `per_minute_dates`.
    pub per_minute_bloops: [u8; MINUTES_IN_DAY],

    /// Date of the most recent write to the matching `per_minute_bloops` slot.
    /// Every slot starts out as `NaiveDate::MIN`.
    pub per_minute_dates: [NaiveDate; MINUTES_IN_DAY],

    /// Bloop count per hour of the day, using the hour in the time zone passed when
    /// the bloop was recorded.
    pub bloops_per_hour: [u32; 24],

    /// Bloop count per calendar date, using the date in the time zone passed when
    /// the bloop was recorded.
    pub bloops_per_day: HashMap<NaiveDate, u32>,
}
152
153impl Default for ClientStats {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159impl ClientStats {
160 pub fn new() -> Self {
162 Self {
163 total_bloops: 0,
164 per_minute_bloops: [0; MINUTES_IN_DAY],
165 per_minute_dates: [NaiveDate::MIN; MINUTES_IN_DAY],
166 bloops_per_hour: [0; 24],
167 bloops_per_day: HashMap::new(),
168 }
169 }
170
171 fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
172 let utc_date = bloop.recorded_at.date_naive();
173 let time = bloop.recorded_at.time();
174 let idx = (time.hour() * 60 + time.minute()) as usize;
175
176 if self.per_minute_dates[idx] != utc_date {
177 self.per_minute_dates[idx] = utc_date;
178 self.per_minute_bloops[idx] = 1;
179 } else {
180 self.per_minute_bloops[idx] = self.per_minute_bloops[idx].saturating_add(1);
181 }
182
183 let local_dt = bloop.recorded_at.with_timezone(tz);
184 let local_hour = local_dt.hour() as usize;
185 let local_date = local_dt.date_naive();
186
187 self.total_bloops += 1;
188 self.bloops_per_hour[local_hour] += 1;
189 *self.bloops_per_day.entry(local_date).or_insert(0) += 1;
190 }
191
192 fn count_last_minutes(&self, now: DateTime<Utc>, minutes: usize) -> u32 {
193 let now_minute = (now.hour() * 60 + now.minute()) as usize;
194 let mut total = 0;
195
196 for i in 0..minutes {
197 let idx = (now_minute + MINUTES_IN_DAY - i) % MINUTES_IN_DAY;
198 let expected_date = (now - chrono::Duration::minutes(i as i64)).date_naive();
199
200 if self.per_minute_dates[idx] == expected_date {
201 total += self.per_minute_bloops[idx] as u32;
202 }
203 }
204
205 total
206 }
207}
208
209#[inline]
225pub fn minute_index(time: impl Timelike) -> usize {
226 time.hour() as usize * 60 + time.minute() as usize
227}
228
/// Aggregated bloop counts for one client, or for all clients combined.
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct StatsSummary {
    /// Total number of recorded bloops.
    pub total_bloops: u64,
    /// Bloops counted over the 60 most recent minute slots at snapshot time.
    pub bloops_last_hour: u32,
    /// Bloops counted over the `MINUTES_IN_DAY` most recent minute slots at snapshot time.
    pub bloops_last_24_hours: u32,
    /// Bloop count per hour of the day.
    pub bloops_per_hour: [u32; 24],
    /// Bloop count per calendar date.
    pub bloops_per_day: HashMap<NaiveDate, u32>,
}
238
/// Point-in-time view of all statistics, serialized as the HTTP response body.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct StatsSnapshot {
    /// The `now` value the snapshot was computed for; also used to judge cache freshness.
    pub created_at: DateTime<Utc>,
    /// Per-client summaries keyed by client id.
    pub clients: HashMap<String, StatsSummary>,
    /// Sum of all client summaries.
    pub global: StatsSummary,
}
246
/// Holds the statistics of every known client plus the most recently computed snapshot.
#[derive(Debug, Default)]
pub struct StatsTracker {
    /// Statistics keyed by client id.
    clients: HashMap<String, ClientStats>,
    /// Last computed snapshot, reused by `snapshot` while it is less than a minute old.
    cached_snapshot: RwLock<Option<StatsSnapshot>>,
}
252
253impl From<HashMap<String, ClientStats>> for StatsTracker {
254 fn from(clients: HashMap<String, ClientStats>) -> Self {
255 Self::new(clients)
256 }
257}
258
259impl StatsTracker {
260 pub fn new(clients: HashMap<String, ClientStats>) -> Self {
261 Self {
262 clients,
263 cached_snapshot: RwLock::new(None),
264 }
265 }
266
267 fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
268 let client = self.clients.entry(bloop.client_id.clone()).or_default();
269
270 client.record_bloop(bloop, tz);
271 }
272
273 async fn snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
274 if let Some(snapshot) = self.cached_snapshot.read().await.as_ref()
275 && snapshot.created_at > now - chrono::Duration::minutes(1)
276 {
277 return snapshot.clone();
278 }
279
280 let snapshot = self.compute_snapshot(now);
281 self.cached_snapshot.write().await.replace(snapshot.clone());
282
283 snapshot
284 }
285
286 fn compute_snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
287 let mut clients_snapshots = HashMap::new();
288 let mut global = StatsSummary {
289 total_bloops: 0,
290 bloops_last_hour: 0,
291 bloops_last_24_hours: 0,
292 bloops_per_hour: [0; 24],
293 bloops_per_day: HashMap::new(),
294 };
295
296 for (client_id, client) in &self.clients {
297 let bloops_last_hour = client.count_last_minutes(now, 60);
298 let bloops_last_24_hours = client.count_last_minutes(now, MINUTES_IN_DAY);
299
300 let snapshot = StatsSummary {
301 total_bloops: client.total_bloops,
302 bloops_last_hour,
303 bloops_last_24_hours,
304 bloops_per_hour: client.bloops_per_hour,
305 bloops_per_day: client.bloops_per_day.clone(),
306 };
307
308 global.total_bloops += snapshot.total_bloops;
309 global.bloops_last_hour += snapshot.bloops_last_hour;
310 global.bloops_last_24_hours += snapshot.bloops_last_24_hours;
311
312 for hour in 0..24 {
313 global.bloops_per_hour[hour] += snapshot.bloops_per_hour[hour];
314 }
315
316 for (day, count) in snapshot.bloops_per_day.iter() {
317 *global.bloops_per_day.entry(*day).or_insert(0) += count;
318 }
319
320 clients_snapshots.insert(client_id.clone(), snapshot);
321 }
322
323 StatsSnapshot {
324 created_at: now,
325 clients: clients_snapshots,
326 global,
327 }
328 }
329}
330
/// HTTP server that serves bloop statistics as JSON and keeps them up to date from
/// the event stream.
#[derive(Debug)]
pub struct StatisticsServer {
    /// Address the HTTP listener binds to.
    addr: SocketAddr,
    /// Statistics shared between the event loop and the spawned connection tasks.
    stats: Arc<RwLock<StatsTracker>>,
    /// Receiver for application events; `BloopProcessed` events update the statistics.
    event_rx: broadcast::Receiver<Event>,
    /// Time zone used for the per-hour and per-day aggregates.
    tz: Tz,
    /// Test-only hook that is notified after each received event result has been handled.
    #[cfg(test)]
    pub test_notify_event_processed: Arc<tokio::sync::Notify>,
}
341
342impl StatisticsServer {
343 pub async fn listen(&mut self) -> Result<(), io::Error> {
347 let listener = TcpListener::bind(self.addr).await?;
348
349 loop {
350 let should_continue = select! {
351 conn = listener.accept() => {
352 if let Ok((stream, _)) = conn {
353 self.handle_connection(stream).await;
354 }
355
356 true
357 }
358
359 result = self.event_rx.recv() => {
360 self.handle_recv(result).await
361 }
362 };
363
364 if !should_continue {
365 break;
366 }
367 }
368
369 Ok(())
370 }
371
372 async fn handle_connection(&self, stream: TcpStream) {
374 let io = TokioIo::new(stream);
375 let stats = self.stats.clone();
376
377 task::spawn(async move {
378 let result = http1::Builder::new()
379 .timer(TokioTimer::new())
380 .serve_connection(
381 io,
382 service_fn(move |_: Request<_>| {
383 let stats = stats.clone();
384
385 async move {
386 let snapshot = stats.read().await.snapshot(Utc::now()).await;
387
388 let body = match serde_json::to_string(&snapshot) {
389 Ok(body) => body,
390 Err(err) => {
391 error!("failed to serialize statistics: {:?}", err);
392 return Ok::<_, Infallible>(
393 Response::builder()
394 .status(500)
395 .body(Full::default())
396 .unwrap(),
397 );
398 }
399 };
400
401 Ok::<_, Infallible>(
402 Response::builder()
403 .header("Content-Type", "application/json")
404 .body(Full::new(Bytes::from(body)))
405 .unwrap(),
406 )
407 }
408 }),
409 )
410 .await;
411
412 if let Err(err) = result {
413 error!("failed to serve statistics request: {:?}", err);
414 }
415 });
416 }
417
418 async fn handle_recv(&self, result: Result<Event, RecvError>) -> bool {
423 let should_continue = match result {
424 Ok(Event::BloopProcessed(bloop)) => {
425 self.stats.write().await.record_bloop(bloop, &self.tz);
426 true
427 }
428 Ok(_) => true,
429 Err(RecvError::Lagged(n)) => {
430 warn!("StatisticsServer lagged by {n} messages, some bloops were missed");
431 true
432 }
433 Err(RecvError::Closed) => {
434 debug!("StatisticsServer event stream closed, exiting event loop");
435 false
436 }
437 };
438
439 #[cfg(test)]
440 self.test_notify_event_processed.notify_one();
441
442 should_continue
443 }
444}
445
/// Error type with no variants, used as the subsystem error type of
/// [`StatisticsServer`]; a value of this type cannot be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
449
#[cfg(feature = "tokio-graceful-shutdown")]
impl IntoSubsystem<NeverError> for StatisticsServer {
    /// Runs the server until it stops on its own or shutdown is requested.
    ///
    /// Always returns `Ok(())`. A failure reported by [`StatisticsServer::listen`]
    /// (for example a failed bind) is logged instead of being dropped silently.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        // Outer `Err` means the future was cancelled by shutdown, which is expected;
        // the inner `Err` is a genuine failure of the server itself.
        if let Ok(Err(err)) = self.listen().cancel_on_shutdown(subsys).await {
            error!("statistics server stopped with an error: {:?}", err);
        }

        Ok(())
    }
}
458
/// Errors returned by [`StatisticsServerBuilder::build`].
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was not set; carries the field's name.
    #[error("missing field: {0}")]
    MissingField(&'static str),

    /// The configured address could not be parsed as a socket address.
    #[error(transparent)]
    AddrParse(#[from] std::net::AddrParseError),
}
467
/// Builder for [`StatisticsServer`].
#[derive(Debug, Default)]
pub struct StatisticsServerBuilder {
    /// Listen address as text; required, parsed into a `SocketAddr` by `build`.
    address: Option<String>,
    /// Time zone for the aggregates; `build` falls back to UTC when unset.
    tz: Option<Tz>,
    /// Initial per-client statistics; `build` falls back to an empty map when unset.
    stats: Option<HashMap<String, ClientStats>>,
    /// Event receiver; required.
    event_rx: Option<broadcast::Receiver<Event>>,
}
476
477impl StatisticsServerBuilder {
478 pub fn new() -> Self {
480 Self {
481 address: None,
482 tz: None,
483 stats: None,
484 event_rx: None,
485 }
486 }
487
488 pub fn address(mut self, address: impl Into<String>) -> Self {
490 self.address = Some(address.into());
491 self
492 }
493
494 pub fn tz(mut self, tz: Tz) -> Self {
496 self.tz = Some(tz);
497 self
498 }
499
500 pub fn stats(mut self, stats: HashMap<String, ClientStats>) -> Self {
504 self.stats = Some(stats);
505 self
506 }
507
508 pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
510 self.event_rx = Some(event_rx);
511 self
512 }
513
514 pub fn build(self) -> Result<StatisticsServer, BuilderError> {
516 let addr: SocketAddr = self
517 .address
518 .ok_or(BuilderError::MissingField("address"))?
519 .parse()?;
520
521 Ok(StatisticsServer {
522 addr,
523 tz: self.tz.unwrap_or(Tz::UTC),
524 stats: Arc::new(RwLock::new(StatsTracker::new(
525 self.stats.unwrap_or_default(),
526 ))),
527 event_rx: self
528 .event_rx
529 .ok_or(BuilderError::MissingField("event_rx"))?,
530 #[cfg(test)]
531 test_notify_event_processed: Arc::new(tokio::sync::Notify::new()),
532 })
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, NaiveTime, TimeZone};
    use chrono_tz::UTC;
    use ntest::timeout;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use uuid::Uuid;

    /// Builds a bloop for `client_id` with a random player id.
    fn make_bloop(client_id: &str, recorded_at: DateTime<Utc>) -> ProcessedBloop {
        ProcessedBloop {
            client_id: client_id.to_string(),
            player_id: Uuid::new_v4(),
            recorded_at,
        }
    }

    #[test]
    fn minute_index_calculation() {
        let time = NaiveTime::from_hms_opt(13, 45, 0).unwrap();
        assert_eq!(minute_index(time), 13 * 60 + 45);

        let time = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
        assert_eq!(minute_index(time), 0);
    }

    #[test]
    fn record_bloop_basic() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 5, 12, 30, 0).unwrap();

        let bloop = make_bloop("client1", now);
        stats.record_bloop(bloop, &tz);

        assert_eq!(stats.total_bloops, 1);

        let idx = minute_index(now.time());
        assert_eq!(stats.per_minute_bloops[idx], 1);
        assert_eq!(stats.per_minute_dates[idx], now.date_naive());

        let hour = now.hour() as usize;
        assert_eq!(stats.bloops_per_hour[hour], 1);
        assert_eq!(stats.bloops_per_day[&now.date_naive()], 1);
    }

    #[test]
    fn record_bloop_accumulation_and_wraparound() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 5, 0, 1, 0).unwrap();

        let bloop1 = make_bloop("client1", now);
        stats.record_bloop(bloop1, &tz);
        stats.record_bloop(make_bloop("client1", now), &tz);

        let idx = minute_index(now.time());
        assert_eq!(stats.per_minute_bloops[idx], 2);
        assert_eq!(stats.per_minute_dates[idx], now.date_naive());

        // Same minute slot, different date: the slot must be reset rather than accumulated.
        let yesterday = now - Duration::days(1);
        let idx_wrap = minute_index(yesterday.time());
        stats.record_bloop(make_bloop("client1", yesterday), &tz);

        assert_eq!(stats.per_minute_dates[idx_wrap], yesterday.date_naive());
        assert_eq!(stats.per_minute_bloops[idx_wrap], 1);
    }

    #[test]
    fn count_last_minutes() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 5, 10, 0, 0).unwrap();

        stats.record_bloop(make_bloop("c", now), &tz);
        stats.record_bloop(make_bloop("c", now - Duration::minutes(1)), &tz);
        stats.record_bloop(make_bloop("c", now - Duration::minutes(2)), &tz);

        let count = stats.count_last_minutes(now, 3);
        assert_eq!(count, 3);

        let count_short = stats.count_last_minutes(now, 1);
        assert_eq!(count_short, 1);
    }

    #[test]
    fn stats_tracker_snapshot() {
        let tz = UTC;
        let mut tracker = StatsTracker::default();

        let now = Utc.with_ymd_and_hms(2025, 7, 5, 15, 0, 0).unwrap();
        let bloop = make_bloop("client-a", now);
        tracker.record_bloop(bloop, &tz);

        let snapshot = tracker.compute_snapshot(now);

        assert!(snapshot.clients.contains_key("client-a"));
        let client_stats = &snapshot.clients["client-a"];

        assert_eq!(client_stats.total_bloops, 1);
        assert!(client_stats.bloops_last_hour >= 1);
        assert!(client_stats.bloops_last_24_hours >= 1);
        assert!(client_stats.bloops_per_hour.iter().any(|&x| x >= 1));
        assert!(client_stats.bloops_per_day.contains_key(&now.date_naive()));

        assert_eq!(snapshot.global.total_bloops, 1);
    }

    #[test]
    fn count_last_minutes_across_midnight() {
        let tz = UTC;
        let mut stats = ClientStats::new();
        let now = Utc.with_ymd_and_hms(2025, 7, 6, 0, 1, 0).unwrap();

        stats.record_bloop(make_bloop("c", now), &tz);
        stats.record_bloop(make_bloop("c", now - Duration::minutes(1)), &tz);
        stats.record_bloop(make_bloop("c", now - Duration::minutes(2)), &tz);

        let count = stats.count_last_minutes(now, 3);
        assert_eq!(
            count, 3,
            "should count all 3 minutes even when wrapping past midnight"
        );
    }

    #[test]
    fn record_bloop_uses_local_timezone_for_bloops_per_day() {
        let tz = chrono_tz::Europe::Helsinki;
        let mut stats = ClientStats::new();

        // Late evening in UTC is already the next calendar day in Helsinki.
        let recorded_at = Utc.with_ymd_and_hms(2025, 7, 5, 23, 30, 0).unwrap();
        let bloop = make_bloop("client1", recorded_at);
        stats.record_bloop(bloop, &tz);

        let local_date = recorded_at.with_timezone(&tz).date_naive();
        let utc_date = recorded_at.date_naive();

        assert_ne!(local_date, utc_date);
        assert_eq!(stats.bloops_per_day.get(&local_date), Some(&1));
        assert_eq!(stats.bloops_per_day.get(&utc_date), None);
    }

    /// Statistics map containing a single, empty client.
    fn dummy_stats() -> HashMap<String, ClientStats> {
        let mut map = HashMap::new();
        map.insert("client1".to_string(), Default::default());
        map
    }

    /// Receiver whose sender is dropped immediately; sufficient for builder-only tests.
    fn dummy_event_rx() -> broadcast::Receiver<Event> {
        let (_tx, rx) = broadcast::channel(16);
        rx
    }

    #[test]
    fn build_succeeds_with_all_fields() {
        let builder = StatisticsServerBuilder::new()
            .address("127.0.0.1:12345")
            .tz(chrono_tz::Europe::London)
            .stats(dummy_stats())
            .event_rx(dummy_event_rx());

        let server = builder.build().unwrap();
        assert_eq!(server.addr, "127.0.0.1:12345".parse().unwrap());
        assert_eq!(server.tz, chrono_tz::Europe::London);
    }

    #[test]
    fn build_fails_if_addr_missing() {
        let builder = StatisticsServerBuilder::new()
            .stats(dummy_stats())
            .event_rx(dummy_event_rx());

        let err = builder.build().unwrap_err();
        assert!(matches!(err, BuilderError::MissingField(field) if field == "address"));
    }

    #[test]
    fn build_fails_if_event_rx_missing() {
        let builder = StatisticsServerBuilder::new()
            .address("127.0.0.1:12345")
            .stats(dummy_stats());

        let err = builder.build().unwrap_err();
        assert!(matches!(err, BuilderError::MissingField(field) if field == "event_rx"));
    }

    #[test]
    fn build_defaults_tz_to_utc() {
        let builder = StatisticsServerBuilder::new()
            .address("127.0.0.1:12345")
            .stats(dummy_stats())
            .event_rx(dummy_event_rx());

        let server = builder.build().unwrap();
        assert_eq!(server.tz, UTC);
    }

    #[tokio::test]
    #[timeout(1000)]
    async fn server_handles_bloop_processed_event() {
        let tz = Tz::UTC;
        let stats_map = HashMap::<String, ClientStats>::new();

        let (sender, event_rx) = broadcast::channel(16);

        // Port 0 lets the OS pick a free port. A fixed port could already be in use,
        // in which case `listen` fails, its error is ignored below, and the test
        // would hang until the timeout.
        let mut server = StatisticsServerBuilder::new()
            .address("127.0.0.1:0")
            .tz(tz)
            .stats(stats_map)
            .event_rx(event_rx)
            .build()
            .unwrap();
        let notify = server.test_notify_event_processed.clone();
        let stats = server.stats.clone();

        tokio::spawn(async move {
            let _ = server.listen().await;
        });

        let bloop = Event::BloopProcessed(make_bloop("client", Utc::now()));
        sender.send(bloop).unwrap();
        notify.notified().await;

        let stats = stats.read().await;
        let snapshot = stats.snapshot(Utc::now()).await;

        assert_eq!(snapshot.clients["client"].total_bloops, 1);
    }

    #[tokio::test]
    #[timeout(1000)]
    async fn server_serves_stats_over_http() {
        let (sender, event_rx) = broadcast::channel(16);

        // Bind to port 0 to discover a free port; the temporary listener is dropped
        // at the end of this statement and the server binds the same address again.
        let local_addr = TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap()
            .local_addr()
            .unwrap();

        let mut server = StatisticsServerBuilder::new()
            .address(local_addr.to_string())
            .event_rx(event_rx)
            .build()
            .unwrap();
        let notify = server.test_notify_event_processed.clone();

        tokio::spawn(async move {
            let _ = server.listen().await;
        });

        let bloop = Event::BloopProcessed(ProcessedBloop {
            player_id: Uuid::new_v4(),
            client_id: "client".to_string(),
            recorded_at: Utc::now(),
        });
        sender.send(bloop).unwrap();
        // Once the event has been handled the server loop is running, so the
        // listener is bound and ready for the connection below.
        notify.notified().await;

        let mut client = TcpStream::connect(local_addr).await.unwrap();
        let request = b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
        client.write_all(request).await.unwrap();

        let mut buffer = vec![0; 1024];
        let bytes_read = client.read(&mut buffer).await.unwrap();

        let response = String::from_utf8_lossy(&buffer[..bytes_read]);
        assert!(response.contains("200 OK"));
        assert!(response.contains("application/json"));
    }
}