Skip to main content

bloop_server_framework/
statistics.rs

1use crate::bloop::ProcessedBloop;
2use crate::event::Event;
3use bytes::Bytes;
4use chrono::{DateTime, NaiveDate, Timelike, Utc};
5use chrono_tz::Tz;
6use http_body_util::Full;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper::{Request, Response};
10use hyper_util::rt::{TokioIo, TokioTimer};
11use serde::Serialize;
12use std::collections::HashMap;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use thiserror::Error;
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast::error::RecvError;
19use tokio::sync::{RwLock, broadcast};
20use tokio::{io, select, task};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{debug, error, warn};
24
25const MINUTES_IN_DAY: usize = 60 * 24;
26
27/// Stats collected per client to track "bloops" over time.
28///
29/// This structure holds aggregated counters for the last 24 hours and beyond,
30/// to efficiently support queries like "how many bloops in the last hour" or
31/// "bloops per day". It is designed to be seeded from a database on startup.
32///
33/// # Seeding from PostgreSQL
34///
35/// To efficiently load these stats from a Postgres database with potentially
36/// thousands of bloops per client, the following SQL queries are recommended.
37///
38/// ## Recommended index:
39///
40/// ```sql
41/// CREATE INDEX idx_bloops_client_recorded_at ON bloops (
42///     client_id,
43///     recorded_at
44/// );
45/// ```
46///
47/// This index speeds up queries filtering by client and time range.
48///
49/// ## Total bloops per client:
50///
51/// ```sql
52/// SELECT client_id, COUNT(*) as total_bloops
53/// FROM bloops
54/// GROUP BY client_id;
55/// ```
56///
57/// ## Bloops per minute for the last 24 hours (UTC):
58///
59/// ```sql
60/// SELECT client_id,
61///        DATE_TRUNC(
62///            'minute',
63///            recorded_at AT TIME ZONE 'UTC'
64///        ) AS minute_bucket,
65///        COUNT(*) AS count
66/// FROM bloops
67/// WHERE recorded_at >= NOW() - INTERVAL '24 hours'
68/// GROUP BY client_id, minute_bucket;
69/// ```
70///
71/// - `minute_bucket` contains the UTC timestamp truncated to the minute.
72/// - You can map `minute_bucket` to your in-memory bucket index like this:
73///
74/// ### Mapping to In-Memory Buckets:
75///
76/// You can map the `minute_bucket` to your in-memory buckets like this:
77///
78/// ```no_run
79/// use chrono::{NaiveDateTime, Timelike};
80/// use bloop_server_framework::statistics::{minute_index, ClientStats};
81/// use bloop_server_framework::test_utils::Utc;
82///
83/// // Load these from your database
84/// let minute_bucket = Utc::now().naive_utc();
85/// let count = 0;
86///
87/// let mut client_stats = ClientStats::new();
88/// let idx = minute_index(minute_bucket);
89/// let date = minute_bucket.date();
90///
91/// client_stats.per_minute_dates[idx] = date;
92/// client_stats.per_minute_bloops[idx] = count as u8;
93/// ```
94///
95/// ## Bloops per hour for the last 24 hours (in local time):
96///
97/// ```sql
98/// SELECT client_id,
99///        EXTRACT(hour FROM recorded_at AT TIME ZONE 'your_timezone') AS hour,
100///        COUNT(*) AS count
101/// FROM bloops
102/// WHERE recorded_at >= NOW() - INTERVAL '24 hours'
103/// GROUP BY client_id, hour;
104/// ```
105///
106/// ## Bloops per day (in local time):
107///
108/// ```sql
109/// SELECT client_id,
110///        DATE(recorded_at AT TIME ZONE 'your_timezone') AS day,
111///        COUNT(*) AS count
112/// FROM bloops
113/// GROUP BY client_id, day;
114/// ```
115///
116/// After executing these queries, aggregate the counts per client into the
117/// fields of this struct.
118///
119/// # Example usage:
120///
121/// ```no_run
122/// use std::collections::HashMap;
123/// use bloop_server_framework::statistics::ClientStats;
124///
125/// let mut stats_map: HashMap<String, ClientStats> = HashMap::new();
126///
127/// // For each row of minute-buckets query:
128/// // 1. Parse `minute_bucket` into NaiveDate and index.
129/// // 2. Insert count into `per_minute_bloops` and `per_minute_dates`.
130///
131/// // Similarly for other queries...
132/// ```
133#[derive(Debug)]
134pub struct ClientStats {
135    /// Total bloops observed for this client.
136    pub total_bloops: u64,
137
138    /// Count of bloops per minute in a rolling 24-hour buffer.
139    ///
140    /// Indexed by minute of day (0..1439).
141    pub per_minute_bloops: [u8; MINUTES_IN_DAY],
142
143    /// The date associated with each minute bucket, used to invalidate stale data.
144    pub per_minute_dates: [NaiveDate; MINUTES_IN_DAY],
145
146    /// Count of bloops per hour in the local timezone.
147    pub bloops_per_hour: [u32; 24],
148
149    /// Count of bloops per day, keyed by date in local timezone.
150    pub bloops_per_day: HashMap<NaiveDate, u32>,
151}
152
153impl Default for ClientStats {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159impl ClientStats {
160    /// Creates a new empty statistics struct.
161    pub fn new() -> Self {
162        Self {
163            total_bloops: 0,
164            per_minute_bloops: [0; MINUTES_IN_DAY],
165            per_minute_dates: [NaiveDate::MIN; MINUTES_IN_DAY],
166            bloops_per_hour: [0; 24],
167            bloops_per_day: HashMap::new(),
168        }
169    }
170
171    fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
172        let date = bloop.recorded_at.date_naive();
173        let time = bloop.recorded_at.time();
174        let idx = (time.hour() * 60 + time.minute()) as usize;
175
176        if self.per_minute_dates[idx] != date {
177            self.per_minute_dates[idx] = date;
178            self.per_minute_bloops[idx] = 1;
179        } else {
180            self.per_minute_bloops[idx] = self.per_minute_bloops[idx].saturating_add(1);
181        }
182
183        let local_hour = bloop.recorded_at.with_timezone(tz).hour() as usize;
184
185        self.total_bloops += 1;
186        self.bloops_per_hour[local_hour] += 1;
187        *self.bloops_per_day.entry(date).or_insert(0) += 1;
188    }
189
190    fn count_last_minutes(&self, now: DateTime<Utc>, minutes: usize) -> u32 {
191        let now_minute = (now.hour() * 60 + now.minute()) as usize;
192        let mut total = 0;
193
194        for i in 0..minutes {
195            let idx = now_minute.wrapping_sub(i) % MINUTES_IN_DAY;
196            let expected_date = (now - chrono::Duration::minutes(i as i64)).date_naive();
197
198            if self.per_minute_dates[idx] == expected_date {
199                total += self.per_minute_bloops[idx] as u32;
200            }
201        }
202
203        total
204    }
205}
206
207/// Calculates the minute index within a day (0..1439) for a given `NaiveTime`.
208///
209/// This is used to map events into per-minute buckets.
210///
211/// # Examples
212///
213/// ```
214/// use chrono::NaiveTime;
215/// use bloop_server_framework::statistics::minute_index;
216///
217/// let time = NaiveTime::from_hms_opt(13, 45, 0).unwrap();
218/// let idx = minute_index(time);
219///
220/// assert_eq!(idx, 13 * 60 + 45);
221/// ```
222#[inline]
223pub fn minute_index(time: impl Timelike) -> usize {
224    time.hour() as usize * 60 + time.minute() as usize
225}
226
227#[derive(Debug, Clone, Serialize)]
228#[serde(rename_all = "camelCase")]
229struct StatsSummary {
230    pub total_bloops: u64,
231    pub bloops_last_hour: u32,
232    pub bloops_last_24_hours: u32,
233    pub bloops_per_hour: [u32; 24],
234    pub bloops_per_day: HashMap<NaiveDate, u32>,
235}
236
237#[derive(Debug, Clone, Serialize)]
238#[serde(rename_all = "camelCase")]
239struct StatsSnapshot {
240    pub created_at: DateTime<Utc>,
241    pub clients: HashMap<String, StatsSummary>,
242    pub global: StatsSummary,
243}
244
245#[derive(Debug, Default)]
246pub struct StatsTracker {
247    clients: HashMap<String, ClientStats>,
248    cached_snapshot: RwLock<Option<StatsSnapshot>>,
249}
250
251impl From<HashMap<String, ClientStats>> for StatsTracker {
252    fn from(clients: HashMap<String, ClientStats>) -> Self {
253        Self::new(clients)
254    }
255}
256
257impl StatsTracker {
258    pub fn new(clients: HashMap<String, ClientStats>) -> Self {
259        Self {
260            clients,
261            cached_snapshot: RwLock::new(None),
262        }
263    }
264
265    fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
266        let client = self.clients.entry(bloop.client_id.clone()).or_default();
267
268        client.record_bloop(bloop, tz);
269    }
270
271    async fn snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
272        if let Some(snapshot) = self.cached_snapshot.read().await.as_ref()
273            && snapshot.created_at > now - chrono::Duration::minutes(1)
274        {
275            return snapshot.clone();
276        }
277
278        let snapshot = self.compute_snapshot(now);
279        self.cached_snapshot.write().await.replace(snapshot.clone());
280
281        snapshot
282    }
283
284    fn compute_snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
285        let mut clients_snapshots = HashMap::new();
286        let mut global = StatsSummary {
287            total_bloops: 0,
288            bloops_last_hour: 0,
289            bloops_last_24_hours: 0,
290            bloops_per_hour: [0; 24],
291            bloops_per_day: HashMap::new(),
292        };
293
294        for (client_id, client) in &self.clients {
295            let bloops_last_hour = client.count_last_minutes(now, 60);
296            let bloops_last_24_hours = client.count_last_minutes(now, MINUTES_IN_DAY);
297
298            let snapshot = StatsSummary {
299                total_bloops: client.total_bloops,
300                bloops_last_hour,
301                bloops_last_24_hours,
302                bloops_per_hour: client.bloops_per_hour,
303                bloops_per_day: client.bloops_per_day.clone(),
304            };
305
306            global.total_bloops += snapshot.total_bloops;
307            global.bloops_last_hour += snapshot.bloops_last_hour;
308            global.bloops_last_24_hours += snapshot.bloops_last_24_hours;
309
310            for hour in 0..24 {
311                global.bloops_per_hour[hour] += snapshot.bloops_per_hour[hour];
312            }
313
314            for (day, count) in snapshot.bloops_per_day.iter() {
315                *global.bloops_per_day.entry(*day).or_insert(0) += count;
316            }
317
318            clients_snapshots.insert(client_id.clone(), snapshot);
319        }
320
321        StatsSnapshot {
322            created_at: now,
323            clients: clients_snapshots,
324            global,
325        }
326    }
327}
328
329/// A background service that collects statistics and exposes them over HTTP.
330#[derive(Debug)]
331pub struct StatisticsServer {
332    addr: SocketAddr,
333    stats: Arc<RwLock<StatsTracker>>,
334    event_rx: broadcast::Receiver<Event>,
335    tz: Tz,
336    #[cfg(test)]
337    pub test_notify_event_processed: Arc<tokio::sync::Notify>,
338}
339
340impl StatisticsServer {
341    /// Starts the statistics server and begins listening for events and HTTP requests.
342    ///
343    /// This runs the main event loop until the broadcast channel is closed.
344    pub async fn listen(&mut self) -> Result<(), io::Error> {
345        let listener = TcpListener::bind(self.addr).await?;
346
347        loop {
348            let should_continue = select! {
349                conn = listener.accept() => {
350                    if let Ok((stream, _)) = conn {
351                        self.handle_connection(stream).await;
352                    }
353
354                    true
355                }
356
357                result = self.event_rx.recv() => {
358                    self.handle_recv(result).await
359                }
360            };
361
362            if !should_continue {
363                break;
364            }
365        }
366
367        Ok(())
368    }
369
370    /// Handles a single incoming HTTP connection and serves current stats as JSON.
371    async fn handle_connection(&self, stream: TcpStream) {
372        let io = TokioIo::new(stream);
373        let stats = self.stats.clone();
374
375        task::spawn(async move {
376            let result = http1::Builder::new()
377                .timer(TokioTimer::new())
378                .serve_connection(
379                    io,
380                    service_fn(move |_: Request<_>| {
381                        let stats = stats.clone();
382
383                        async move {
384                            let snapshot = stats.read().await.snapshot(Utc::now()).await;
385
386                            let body = match serde_json::to_string(&snapshot) {
387                                Ok(body) => body,
388                                Err(err) => {
389                                    error!("failed to serialize statistics: {:?}", err);
390                                    return Ok::<_, Infallible>(
391                                        Response::builder()
392                                            .status(500)
393                                            .body(Full::default())
394                                            .unwrap(),
395                                    );
396                                }
397                            };
398
399                            Ok::<_, Infallible>(
400                                Response::builder()
401                                    .header("Content-Type", "application/json")
402                                    .body(Full::new(Bytes::from(body)))
403                                    .unwrap(),
404                            )
405                        }
406                    }),
407                )
408                .await;
409
410            if let Err(err) = result {
411                error!("failed to serve statistics request: {:?}", err);
412            }
413        });
414    }
415
416    /// Handles a received event from the broadcast channel.
417    ///
418    /// If the event is a `BloopProcessed`, the statistics are updated. Returns
419    /// `false` when the channel is closed, indicating the server should shut down.
420    async fn handle_recv(&self, result: Result<Event, RecvError>) -> bool {
421        let should_continue = match result {
422            Ok(Event::BloopProcessed(bloop)) => {
423                self.stats.write().await.record_bloop(bloop, &self.tz);
424                true
425            }
426            Ok(_) => true,
427            Err(RecvError::Lagged(n)) => {
428                warn!("StatisticsServer lagged by {n} messages, some bloops were missed");
429                true
430            }
431            Err(RecvError::Closed) => {
432                debug!("StatisticsServer event stream closed, exiting event loop");
433                false
434            }
435        };
436
437        #[cfg(test)]
438        self.test_notify_event_processed.notify_one();
439
440        should_continue
441    }
442}
443
444#[cfg(feature = "tokio-graceful-shutdown")]
445#[derive(Debug, Error)]
446pub enum NeverError {}
447
448#[cfg(feature = "tokio-graceful-shutdown")]
449impl IntoSubsystem<NeverError> for StatisticsServer {
450    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
451        let _ = self.listen().cancel_on_shutdown(subsys).await;
452
453        Ok(())
454    }
455}
456
457#[derive(Debug, Error)]
458pub enum BuilderError {
459    #[error("missing field: {0}")]
460    MissingField(&'static str),
461
462    #[error(transparent)]
463    AddrParse(#[from] std::net::AddrParseError),
464}
465
466/// Builder for creating a [StatisticsServer] instance.
467#[derive(Debug, Default)]
468pub struct StatisticsServerBuilder {
469    address: Option<String>,
470    tz: Option<Tz>,
471    stats: Option<HashMap<String, ClientStats>>,
472    event_rx: Option<broadcast::Receiver<Event>>,
473}
474
475impl StatisticsServerBuilder {
476    /// Creates a new empty builder.
477    pub fn new() -> Self {
478        Self {
479            address: None,
480            tz: None,
481            stats: None,
482            event_rx: None,
483        }
484    }
485
486    /// Sets the listening address for the statistics server.
487    pub fn address(mut self, address: impl Into<String>) -> Self {
488        self.address = Some(address.into());
489        self
490    }
491
492    /// Sets the timezone to time specific statistics.
493    pub fn tz(mut self, tz: Tz) -> Self {
494        self.tz = Some(tz);
495        self
496    }
497
498    /// Sets the initial statistics.
499    ///
500    /// This is typically loaded from persistent storage (e.g., database) at startup.
501    pub fn stats(mut self, stats: HashMap<String, ClientStats>) -> Self {
502        self.stats = Some(stats);
503        self
504    }
505
506    /// Sets the event receiver the server listens to for tracking events.
507    pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
508        self.event_rx = Some(event_rx);
509        self
510    }
511
512    /// Consumes the builder and produces a configured [StatisticsServer].
513    pub fn build(self) -> Result<StatisticsServer, BuilderError> {
514        let addr: SocketAddr = self
515            .address
516            .ok_or(BuilderError::MissingField("address"))?
517            .parse()?;
518
519        Ok(StatisticsServer {
520            addr,
521            tz: self.tz.unwrap_or(Tz::UTC),
522            stats: Arc::new(RwLock::new(StatsTracker::new(
523                self.stats.unwrap_or_default(),
524            ))),
525            event_rx: self
526                .event_rx
527                .ok_or(BuilderError::MissingField("event_rx"))?,
528            #[cfg(test)]
529            test_notify_event_processed: Arc::new(tokio::sync::Notify::new()),
530        })
531    }
532}
533
534#[cfg(test)]
535mod tests {
536    use super::*;
537    use chrono::{Duration, NaiveTime, TimeZone};
538    use chrono_tz::UTC;
539    use ntest::timeout;
540    use tokio::io::{AsyncReadExt, AsyncWriteExt};
541    use uuid::Uuid;
542
543    fn make_bloop(client_id: &str, recorded_at: DateTime<Utc>) -> ProcessedBloop {
544        ProcessedBloop {
545            client_id: client_id.to_string(),
546            player_id: Uuid::new_v4(),
547            recorded_at,
548        }
549    }
550
551    #[test]
552    fn minute_index_calculation() {
553        let time = NaiveTime::from_hms_opt(13, 45, 0).unwrap();
554        assert_eq!(minute_index(time), 13 * 60 + 45);
555
556        let time = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
557        assert_eq!(minute_index(time), 0);
558    }
559
560    #[test]
561    fn record_bloop_basic() {
562        let tz = UTC;
563        let mut stats = ClientStats::new();
564        let now = Utc.with_ymd_and_hms(2025, 7, 5, 12, 30, 0).unwrap();
565
566        let bloop = make_bloop("client1", now);
567        stats.record_bloop(bloop, &tz);
568
569        assert_eq!(stats.total_bloops, 1);
570
571        let idx = minute_index(now.time());
572        assert_eq!(stats.per_minute_bloops[idx], 1);
573        assert_eq!(stats.per_minute_dates[idx], now.date_naive());
574
575        let hour = now.hour() as usize;
576        assert_eq!(stats.bloops_per_hour[hour], 1);
577        assert_eq!(stats.bloops_per_day[&now.date_naive()], 1);
578    }
579
580    #[test]
581    fn record_bloop_accumulation_and_wraparound() {
582        let tz = UTC;
583        let mut stats = ClientStats::new();
584        let now = Utc.with_ymd_and_hms(2025, 7, 5, 0, 1, 0).unwrap();
585
586        let bloop1 = make_bloop("client1", now);
587        stats.record_bloop(bloop1, &tz);
588        stats.record_bloop(make_bloop("client1", now), &tz);
589
590        let idx = minute_index(now.time());
591        assert_eq!(stats.per_minute_bloops[idx], 2);
592        assert_eq!(stats.per_minute_dates[idx], now.date_naive());
593
594        let yesterday = now - Duration::days(1);
595        let idx_wrap = minute_index(yesterday.time());
596        stats.record_bloop(make_bloop("client1", yesterday), &tz);
597
598        assert_eq!(stats.per_minute_dates[idx_wrap], yesterday.date_naive());
599        assert_eq!(stats.per_minute_bloops[idx_wrap], 1);
600    }
601
602    #[test]
603    fn count_last_minutes() {
604        let tz = UTC;
605        let mut stats = ClientStats::new();
606        let now = Utc.with_ymd_and_hms(2025, 7, 5, 10, 0, 0).unwrap();
607
608        stats.record_bloop(make_bloop("c", now), &tz);
609        stats.record_bloop(make_bloop("c", now - Duration::minutes(1)), &tz);
610        stats.record_bloop(make_bloop("c", now - Duration::minutes(2)), &tz);
611
612        let count = stats.count_last_minutes(now, 3);
613        assert_eq!(count, 3);
614
615        let count_short = stats.count_last_minutes(now, 1);
616        assert_eq!(count_short, 1);
617    }
618
619    #[test]
620    fn stats_tracker_snapshot() {
621        let tz = UTC;
622        let mut tracker = StatsTracker::default();
623
624        let now = Utc.with_ymd_and_hms(2025, 7, 5, 15, 0, 0).unwrap();
625        let bloop = make_bloop("client-a", now);
626        tracker.record_bloop(bloop, &tz);
627
628        let snapshot = tracker.compute_snapshot(now);
629
630        assert!(snapshot.clients.contains_key("client-a"));
631        let client_stats = &snapshot.clients["client-a"];
632
633        assert_eq!(client_stats.total_bloops, 1);
634        assert!(client_stats.bloops_last_hour >= 1);
635        assert!(client_stats.bloops_last_24_hours >= 1);
636        assert!(client_stats.bloops_per_hour.iter().any(|&x| x >= 1));
637        assert!(client_stats.bloops_per_day.contains_key(&now.date_naive()));
638
639        assert_eq!(snapshot.global.total_bloops, 1);
640    }
641
642    fn dummy_stats() -> HashMap<String, ClientStats> {
643        let mut map = HashMap::new();
644        map.insert("client1".to_string(), Default::default());
645        map
646    }
647
648    fn dummy_event_rx() -> broadcast::Receiver<Event> {
649        // Create a broadcast channel and take a receiver for testing
650        let (_tx, rx) = broadcast::channel(16);
651        rx
652    }
653
654    #[test]
655    fn build_succeeds_with_all_fields() {
656        let builder = StatisticsServerBuilder::new()
657            .address("127.0.0.1:12345")
658            .tz(chrono_tz::Europe::London)
659            .stats(dummy_stats())
660            .event_rx(dummy_event_rx());
661
662        let server = builder.build().unwrap();
663        assert_eq!(server.addr, "127.0.0.1:12345".parse().unwrap());
664        assert_eq!(server.tz, chrono_tz::Europe::London);
665    }
666
667    #[test]
668    fn build_fails_if_addr_missing() {
669        let builder = StatisticsServerBuilder::new()
670            .stats(dummy_stats())
671            .event_rx(dummy_event_rx());
672
673        let err = builder.build().unwrap_err();
674        assert!(matches!(err, BuilderError::MissingField(field) if field == "address"));
675    }
676    #[test]
677    fn build_fails_if_event_rx_missing() {
678        let builder = StatisticsServerBuilder::new()
679            .address("127.0.0.1:12345")
680            .stats(dummy_stats());
681
682        let err = builder.build().unwrap_err();
683        assert!(matches!(err, BuilderError::MissingField(field) if field == "event_rx"));
684    }
685
686    #[test]
687    fn build_defaults_tz_to_utc() {
688        let builder = StatisticsServerBuilder::new()
689            .address("127.0.0.1:12345")
690            .stats(dummy_stats())
691            .event_rx(dummy_event_rx());
692
693        let server = builder.build().unwrap();
694        assert_eq!(server.tz, UTC);
695    }
696
697    #[tokio::test]
698    #[timeout(1000)]
699    async fn server_handles_bloop_processed_event() {
700        let tz = Tz::UTC;
701        let stats_map = HashMap::<String, ClientStats>::new();
702
703        let (sender, event_rx) = broadcast::channel(16);
704
705        let mut server = StatisticsServerBuilder::new()
706            .address("127.0.0.1:12345")
707            .tz(tz)
708            .stats(stats_map)
709            .event_rx(event_rx)
710            .build()
711            .unwrap();
712        let notify = server.test_notify_event_processed.clone();
713        let stats = server.stats.clone();
714
715        tokio::spawn(async move {
716            let _ = server.listen().await;
717        });
718
719        let bloop = Event::BloopProcessed(make_bloop("client", Utc::now()));
720        sender.send(bloop).unwrap();
721        notify.notified().await;
722
723        let stats = stats.read().await;
724        let snapshot = stats.snapshot(Utc::now()).await;
725
726        assert_eq!(snapshot.clients["client"].total_bloops, 1);
727    }
728
729    #[tokio::test]
730    #[timeout(1000)]
731    async fn server_serves_stats_over_http() {
732        let (sender, event_rx) = broadcast::channel(16);
733
734        let local_addr = TcpListener::bind("127.0.0.1:0")
735            .await
736            .unwrap()
737            .local_addr()
738            .unwrap();
739
740        let mut server = StatisticsServerBuilder::new()
741            .address(local_addr.to_string())
742            .event_rx(event_rx)
743            .build()
744            .unwrap();
745        let notify = server.test_notify_event_processed.clone();
746
747        tokio::spawn(async move {
748            let _ = server.listen().await;
749        });
750
751        let bloop = Event::BloopProcessed(ProcessedBloop {
752            player_id: Uuid::new_v4(),
753            client_id: "client".to_string(),
754            recorded_at: Utc::now(),
755        });
756        sender.send(bloop).unwrap();
757        notify.notified().await;
758
759        let mut client = TcpStream::connect(local_addr).await.unwrap();
760        let request = b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
761        client.write_all(request).await.unwrap();
762
763        let mut buffer = vec![0; 1024];
764        let bytes_read = client.read(&mut buffer).await.unwrap();
765
766        let response = String::from_utf8_lossy(&buffer[..bytes_read]);
767        assert!(response.contains("200 OK"));
768        assert!(response.contains("application/json"));
769    }
770}