Skip to main content

bloop_server_framework/
statistics.rs

1use crate::bloop::ProcessedBloop;
2use crate::event::Event;
3use bytes::Bytes;
4use chrono::{DateTime, NaiveDate, Timelike, Utc};
5use chrono_tz::Tz;
6use http_body_util::Full;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper::{Request, Response};
10use hyper_util::rt::{TokioIo, TokioTimer};
11use serde::Serialize;
12use std::collections::HashMap;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use thiserror::Error;
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast::error::RecvError;
19use tokio::sync::{RwLock, broadcast};
20use tokio::{io, select, task};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{debug, error, warn};
24
25const MINUTES_IN_DAY: usize = 60 * 24;
26
27/// Stats collected per client to track "bloops" over time.
28///
29/// This structure holds aggregated counters for the last 24 hours and beyond,
30/// to efficiently support queries like "how many bloops in the last hour" or
31/// "bloops per day". It is designed to be seeded from a database on startup.
32///
33/// # Seeding from PostgreSQL
34///
35/// To efficiently load these stats from a Postgres database with potentially
36/// thousands of bloops per client, the following SQL queries are recommended.
37///
38/// ## Recommended index:
39///
40/// ```sql
41/// CREATE INDEX idx_bloops_client_recorded_at ON bloops (
42///     client_id,
43///     recorded_at
44/// );
45/// ```
46///
47/// This index speeds up queries filtering by client and time range.
48///
49/// ## Total bloops per client:
50///
51/// ```sql
52/// SELECT client_id, COUNT(*) as total_bloops
53/// FROM bloops
54/// GROUP BY client_id;
55/// ```
56///
57/// ## Bloops per minute for the last 24 hours (UTC):
58///
59/// ```sql
60/// SELECT client_id,
61///        DATE_TRUNC(
62///            'minute',
63///            recorded_at AT TIME ZONE 'UTC'
64///        ) AS minute_bucket,
65///        COUNT(*) AS count
66/// FROM bloops
67/// WHERE recorded_at >= NOW() - INTERVAL '24 hours'
68/// GROUP BY client_id, minute_bucket;
69/// ```
70///
71/// - `minute_bucket` contains the UTC timestamp truncated to the minute.
72/// - You can map `minute_bucket` to your in-memory bucket index like this:
73///
74/// ### Mapping to In-Memory Buckets:
75///
76/// You can map the `minute_bucket` to your in-memory buckets like this:
77///
78/// ```no_run
79/// use chrono::{NaiveDateTime, Timelike};
80/// use bloop_server_framework::statistics::{minute_index, ClientStats};
81/// use bloop_server_framework::test_utils::Utc;
82///
83/// // Load these from your database
84/// let minute_bucket = Utc::now().naive_utc();
85/// let count = 0;
86///
87/// let mut client_stats = ClientStats::new();
88/// let idx = minute_index(minute_bucket);
89/// let date = minute_bucket.date();
90///
91/// client_stats.per_minute_dates[idx] = date;
92/// client_stats.per_minute_bloops[idx] = count as u8;
93/// ```
94///
95/// ## Bloops per hour for the last 24 hours (in local time):
96///
97/// ```sql
98/// SELECT client_id,
99///        EXTRACT(hour FROM recorded_at AT TIME ZONE 'your_timezone') AS hour,
100///        COUNT(*) AS count
101/// FROM bloops
102/// WHERE recorded_at >= NOW() - INTERVAL '24 hours'
103/// GROUP BY client_id, hour;
104/// ```
105///
106/// ## Bloops per day (in local time):
107///
108/// ```sql
109/// SELECT client_id,
110///        DATE(recorded_at AT TIME ZONE 'your_timezone') AS day,
111///        COUNT(*) AS count
112/// FROM bloops
113/// GROUP BY client_id, day;
114/// ```
115///
116/// After executing these queries, aggregate the counts per client into the
117/// fields of this struct.
118///
119/// # Example usage:
120///
121/// ```no_run
122/// use std::collections::HashMap;
123/// use bloop_server_framework::statistics::ClientStats;
124///
125/// let mut stats_map: HashMap<String, ClientStats> = HashMap::new();
126///
127/// // For each row of minute-buckets query:
128/// // 1. Parse `minute_bucket` into NaiveDate and index.
129/// // 2. Insert count into `per_minute_bloops` and `per_minute_dates`.
130///
131/// // Similarly for other queries...
132/// ```
133#[derive(Debug)]
134pub struct ClientStats {
135    /// Total bloops observed for this client.
136    pub total_bloops: u64,
137
138    /// Count of bloops per minute in a rolling 24-hour buffer.
139    ///
140    /// Indexed by minute of day (0..1439).
141    pub per_minute_bloops: [u8; MINUTES_IN_DAY],
142
143    /// The date associated with each minute bucket, used to invalidate stale data.
144    pub per_minute_dates: [NaiveDate; MINUTES_IN_DAY],
145
146    /// Count of bloops per hour in the local timezone.
147    pub bloops_per_hour: [u32; 24],
148
149    /// Count of bloops per day, keyed by date in local timezone.
150    pub bloops_per_day: HashMap<NaiveDate, u32>,
151}
152
153impl Default for ClientStats {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159impl ClientStats {
160    /// Creates a new empty statistics struct.
161    pub fn new() -> Self {
162        Self {
163            total_bloops: 0,
164            per_minute_bloops: [0; MINUTES_IN_DAY],
165            per_minute_dates: [NaiveDate::MIN; MINUTES_IN_DAY],
166            bloops_per_hour: [0; 24],
167            bloops_per_day: HashMap::new(),
168        }
169    }
170
171    fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
172        let utc_date = bloop.recorded_at.date_naive();
173        let time = bloop.recorded_at.time();
174        let idx = (time.hour() * 60 + time.minute()) as usize;
175
176        if self.per_minute_dates[idx] != utc_date {
177            self.per_minute_dates[idx] = utc_date;
178            self.per_minute_bloops[idx] = 1;
179        } else {
180            self.per_minute_bloops[idx] = self.per_minute_bloops[idx].saturating_add(1);
181        }
182
183        let local_dt = bloop.recorded_at.with_timezone(tz);
184        let local_hour = local_dt.hour() as usize;
185        let local_date = local_dt.date_naive();
186
187        self.total_bloops += 1;
188        self.bloops_per_hour[local_hour] += 1;
189        *self.bloops_per_day.entry(local_date).or_insert(0) += 1;
190    }
191
192    fn count_last_minutes(&self, now: DateTime<Utc>, minutes: usize) -> u32 {
193        let now_minute = (now.hour() * 60 + now.minute()) as usize;
194        let mut total = 0;
195
196        for i in 0..minutes {
197            let idx = (now_minute + MINUTES_IN_DAY - i) % MINUTES_IN_DAY;
198            let expected_date = (now - chrono::Duration::minutes(i as i64)).date_naive();
199
200            if self.per_minute_dates[idx] == expected_date {
201                total += self.per_minute_bloops[idx] as u32;
202            }
203        }
204
205        total
206    }
207}
208
209/// Calculates the minute index within a day (0..1439) for a given `NaiveTime`.
210///
211/// This is used to map events into per-minute buckets.
212///
213/// # Examples
214///
215/// ```
216/// use chrono::NaiveTime;
217/// use bloop_server_framework::statistics::minute_index;
218///
219/// let time = NaiveTime::from_hms_opt(13, 45, 0).unwrap();
220/// let idx = minute_index(time);
221///
222/// assert_eq!(idx, 13 * 60 + 45);
223/// ```
224#[inline]
225pub fn minute_index(time: impl Timelike) -> usize {
226    time.hour() as usize * 60 + time.minute() as usize
227}
228
229#[derive(Debug, Clone, Serialize)]
230#[serde(rename_all = "camelCase")]
231struct StatsSummary {
232    pub total_bloops: u64,
233    pub bloops_last_hour: u32,
234    pub bloops_last_24_hours: u32,
235    pub bloops_per_hour: [u32; 24],
236    pub bloops_per_day: HashMap<NaiveDate, u32>,
237}
238
239#[derive(Debug, Clone, Serialize)]
240#[serde(rename_all = "camelCase")]
241struct StatsSnapshot {
242    pub created_at: DateTime<Utc>,
243    pub clients: HashMap<String, StatsSummary>,
244    pub global: StatsSummary,
245}
246
247#[derive(Debug, Default)]
248pub struct StatsTracker {
249    clients: HashMap<String, ClientStats>,
250    cached_snapshot: RwLock<Option<StatsSnapshot>>,
251}
252
253impl From<HashMap<String, ClientStats>> for StatsTracker {
254    fn from(clients: HashMap<String, ClientStats>) -> Self {
255        Self::new(clients)
256    }
257}
258
259impl StatsTracker {
260    pub fn new(clients: HashMap<String, ClientStats>) -> Self {
261        Self {
262            clients,
263            cached_snapshot: RwLock::new(None),
264        }
265    }
266
267    fn record_bloop(&mut self, bloop: ProcessedBloop, tz: &Tz) {
268        let client = self.clients.entry(bloop.client_id.clone()).or_default();
269
270        client.record_bloop(bloop, tz);
271    }
272
273    async fn snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
274        if let Some(snapshot) = self.cached_snapshot.read().await.as_ref()
275            && snapshot.created_at > now - chrono::Duration::minutes(1)
276        {
277            return snapshot.clone();
278        }
279
280        let snapshot = self.compute_snapshot(now);
281        self.cached_snapshot.write().await.replace(snapshot.clone());
282
283        snapshot
284    }
285
286    fn compute_snapshot(&self, now: DateTime<Utc>) -> StatsSnapshot {
287        let mut clients_snapshots = HashMap::new();
288        let mut global = StatsSummary {
289            total_bloops: 0,
290            bloops_last_hour: 0,
291            bloops_last_24_hours: 0,
292            bloops_per_hour: [0; 24],
293            bloops_per_day: HashMap::new(),
294        };
295
296        for (client_id, client) in &self.clients {
297            let bloops_last_hour = client.count_last_minutes(now, 60);
298            let bloops_last_24_hours = client.count_last_minutes(now, MINUTES_IN_DAY);
299
300            let snapshot = StatsSummary {
301                total_bloops: client.total_bloops,
302                bloops_last_hour,
303                bloops_last_24_hours,
304                bloops_per_hour: client.bloops_per_hour,
305                bloops_per_day: client.bloops_per_day.clone(),
306            };
307
308            global.total_bloops += snapshot.total_bloops;
309            global.bloops_last_hour += snapshot.bloops_last_hour;
310            global.bloops_last_24_hours += snapshot.bloops_last_24_hours;
311
312            for hour in 0..24 {
313                global.bloops_per_hour[hour] += snapshot.bloops_per_hour[hour];
314            }
315
316            for (day, count) in snapshot.bloops_per_day.iter() {
317                *global.bloops_per_day.entry(*day).or_insert(0) += count;
318            }
319
320            clients_snapshots.insert(client_id.clone(), snapshot);
321        }
322
323        StatsSnapshot {
324            created_at: now,
325            clients: clients_snapshots,
326            global,
327        }
328    }
329}
330
331/// A background service that collects statistics and exposes them over HTTP.
332#[derive(Debug)]
333pub struct StatisticsServer {
334    addr: SocketAddr,
335    stats: Arc<RwLock<StatsTracker>>,
336    event_rx: broadcast::Receiver<Event>,
337    tz: Tz,
338    #[cfg(test)]
339    pub test_notify_event_processed: Arc<tokio::sync::Notify>,
340}
341
342impl StatisticsServer {
343    /// Starts the statistics server and begins listening for events and HTTP requests.
344    ///
345    /// This runs the main event loop until the broadcast channel is closed.
346    pub async fn listen(&mut self) -> Result<(), io::Error> {
347        let listener = TcpListener::bind(self.addr).await?;
348
349        loop {
350            let should_continue = select! {
351                conn = listener.accept() => {
352                    if let Ok((stream, _)) = conn {
353                        self.handle_connection(stream).await;
354                    }
355
356                    true
357                }
358
359                result = self.event_rx.recv() => {
360                    self.handle_recv(result).await
361                }
362            };
363
364            if !should_continue {
365                break;
366            }
367        }
368
369        Ok(())
370    }
371
372    /// Handles a single incoming HTTP connection and serves current stats as JSON.
373    async fn handle_connection(&self, stream: TcpStream) {
374        let io = TokioIo::new(stream);
375        let stats = self.stats.clone();
376
377        task::spawn(async move {
378            let result = http1::Builder::new()
379                .timer(TokioTimer::new())
380                .serve_connection(
381                    io,
382                    service_fn(move |_: Request<_>| {
383                        let stats = stats.clone();
384
385                        async move {
386                            let snapshot = stats.read().await.snapshot(Utc::now()).await;
387
388                            let body = match serde_json::to_string(&snapshot) {
389                                Ok(body) => body,
390                                Err(err) => {
391                                    error!("failed to serialize statistics: {:?}", err);
392                                    return Ok::<_, Infallible>(
393                                        Response::builder()
394                                            .status(500)
395                                            .body(Full::default())
396                                            .unwrap(),
397                                    );
398                                }
399                            };
400
401                            Ok::<_, Infallible>(
402                                Response::builder()
403                                    .header("Content-Type", "application/json")
404                                    .body(Full::new(Bytes::from(body)))
405                                    .unwrap(),
406                            )
407                        }
408                    }),
409                )
410                .await;
411
412            if let Err(err) = result {
413                error!("failed to serve statistics request: {:?}", err);
414            }
415        });
416    }
417
418    /// Handles a received event from the broadcast channel.
419    ///
420    /// If the event is a `BloopProcessed`, the statistics are updated. Returns
421    /// `false` when the channel is closed, indicating the server should shut down.
422    async fn handle_recv(&self, result: Result<Event, RecvError>) -> bool {
423        let should_continue = match result {
424            Ok(Event::BloopProcessed(bloop)) => {
425                self.stats.write().await.record_bloop(bloop, &self.tz);
426                true
427            }
428            Ok(_) => true,
429            Err(RecvError::Lagged(n)) => {
430                warn!("StatisticsServer lagged by {n} messages, some bloops were missed");
431                true
432            }
433            Err(RecvError::Closed) => {
434                debug!("StatisticsServer event stream closed, exiting event loop");
435                false
436            }
437        };
438
439        #[cfg(test)]
440        self.test_notify_event_processed.notify_one();
441
442        should_continue
443    }
444}
445
446#[cfg(feature = "tokio-graceful-shutdown")]
447#[derive(Debug, Error)]
448pub enum NeverError {}
449
450#[cfg(feature = "tokio-graceful-shutdown")]
451impl IntoSubsystem<NeverError> for StatisticsServer {
452    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
453        let _ = self.listen().cancel_on_shutdown(subsys).await;
454
455        Ok(())
456    }
457}
458
459#[derive(Debug, Error)]
460pub enum BuilderError {
461    #[error("missing field: {0}")]
462    MissingField(&'static str),
463
464    #[error(transparent)]
465    AddrParse(#[from] std::net::AddrParseError),
466}
467
468/// Builder for creating a [StatisticsServer] instance.
469#[derive(Debug, Default)]
470pub struct StatisticsServerBuilder {
471    address: Option<String>,
472    tz: Option<Tz>,
473    stats: Option<HashMap<String, ClientStats>>,
474    event_rx: Option<broadcast::Receiver<Event>>,
475}
476
477impl StatisticsServerBuilder {
478    /// Creates a new empty builder.
479    pub fn new() -> Self {
480        Self {
481            address: None,
482            tz: None,
483            stats: None,
484            event_rx: None,
485        }
486    }
487
488    /// Sets the listening address for the statistics server.
489    pub fn address(mut self, address: impl Into<String>) -> Self {
490        self.address = Some(address.into());
491        self
492    }
493
494    /// Sets the timezone to time specific statistics.
495    pub fn tz(mut self, tz: Tz) -> Self {
496        self.tz = Some(tz);
497        self
498    }
499
500    /// Sets the initial statistics.
501    ///
502    /// This is typically loaded from persistent storage (e.g., database) at startup.
503    pub fn stats(mut self, stats: HashMap<String, ClientStats>) -> Self {
504        self.stats = Some(stats);
505        self
506    }
507
508    /// Sets the event receiver the server listens to for tracking events.
509    pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
510        self.event_rx = Some(event_rx);
511        self
512    }
513
514    /// Consumes the builder and produces a configured [StatisticsServer].
515    pub fn build(self) -> Result<StatisticsServer, BuilderError> {
516        let addr: SocketAddr = self
517            .address
518            .ok_or(BuilderError::MissingField("address"))?
519            .parse()?;
520
521        Ok(StatisticsServer {
522            addr,
523            tz: self.tz.unwrap_or(Tz::UTC),
524            stats: Arc::new(RwLock::new(StatsTracker::new(
525                self.stats.unwrap_or_default(),
526            ))),
527            event_rx: self
528                .event_rx
529                .ok_or(BuilderError::MissingField("event_rx"))?,
530            #[cfg(test)]
531            test_notify_event_processed: Arc::new(tokio::sync::Notify::new()),
532        })
533    }
534}
535
536#[cfg(test)]
537mod tests {
538    use super::*;
539    use chrono::{Duration, NaiveTime, TimeZone};
540    use chrono_tz::UTC;
541    use ntest::timeout;
542    use tokio::io::{AsyncReadExt, AsyncWriteExt};
543    use uuid::Uuid;
544
545    fn make_bloop(client_id: &str, recorded_at: DateTime<Utc>) -> ProcessedBloop {
546        ProcessedBloop {
547            client_id: client_id.to_string(),
548            player_id: Uuid::new_v4(),
549            recorded_at,
550        }
551    }
552
553    #[test]
554    fn minute_index_calculation() {
555        let time = NaiveTime::from_hms_opt(13, 45, 0).unwrap();
556        assert_eq!(minute_index(time), 13 * 60 + 45);
557
558        let time = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
559        assert_eq!(minute_index(time), 0);
560    }
561
562    #[test]
563    fn record_bloop_basic() {
564        let tz = UTC;
565        let mut stats = ClientStats::new();
566        let now = Utc.with_ymd_and_hms(2025, 7, 5, 12, 30, 0).unwrap();
567
568        let bloop = make_bloop("client1", now);
569        stats.record_bloop(bloop, &tz);
570
571        assert_eq!(stats.total_bloops, 1);
572
573        let idx = minute_index(now.time());
574        assert_eq!(stats.per_minute_bloops[idx], 1);
575        assert_eq!(stats.per_minute_dates[idx], now.date_naive());
576
577        let hour = now.hour() as usize;
578        assert_eq!(stats.bloops_per_hour[hour], 1);
579        assert_eq!(stats.bloops_per_day[&now.date_naive()], 1);
580    }
581
582    #[test]
583    fn record_bloop_accumulation_and_wraparound() {
584        let tz = UTC;
585        let mut stats = ClientStats::new();
586        let now = Utc.with_ymd_and_hms(2025, 7, 5, 0, 1, 0).unwrap();
587
588        let bloop1 = make_bloop("client1", now);
589        stats.record_bloop(bloop1, &tz);
590        stats.record_bloop(make_bloop("client1", now), &tz);
591
592        let idx = minute_index(now.time());
593        assert_eq!(stats.per_minute_bloops[idx], 2);
594        assert_eq!(stats.per_minute_dates[idx], now.date_naive());
595
596        let yesterday = now - Duration::days(1);
597        let idx_wrap = minute_index(yesterday.time());
598        stats.record_bloop(make_bloop("client1", yesterday), &tz);
599
600        assert_eq!(stats.per_minute_dates[idx_wrap], yesterday.date_naive());
601        assert_eq!(stats.per_minute_bloops[idx_wrap], 1);
602    }
603
604    #[test]
605    fn count_last_minutes() {
606        let tz = UTC;
607        let mut stats = ClientStats::new();
608        let now = Utc.with_ymd_and_hms(2025, 7, 5, 10, 0, 0).unwrap();
609
610        stats.record_bloop(make_bloop("c", now), &tz);
611        stats.record_bloop(make_bloop("c", now - Duration::minutes(1)), &tz);
612        stats.record_bloop(make_bloop("c", now - Duration::minutes(2)), &tz);
613
614        let count = stats.count_last_minutes(now, 3);
615        assert_eq!(count, 3);
616
617        let count_short = stats.count_last_minutes(now, 1);
618        assert_eq!(count_short, 1);
619    }
620
621    #[test]
622    fn stats_tracker_snapshot() {
623        let tz = UTC;
624        let mut tracker = StatsTracker::default();
625
626        let now = Utc.with_ymd_and_hms(2025, 7, 5, 15, 0, 0).unwrap();
627        let bloop = make_bloop("client-a", now);
628        tracker.record_bloop(bloop, &tz);
629
630        let snapshot = tracker.compute_snapshot(now);
631
632        assert!(snapshot.clients.contains_key("client-a"));
633        let client_stats = &snapshot.clients["client-a"];
634
635        assert_eq!(client_stats.total_bloops, 1);
636        assert!(client_stats.bloops_last_hour >= 1);
637        assert!(client_stats.bloops_last_24_hours >= 1);
638        assert!(client_stats.bloops_per_hour.iter().any(|&x| x >= 1));
639        assert!(client_stats.bloops_per_day.contains_key(&now.date_naive()));
640
641        assert_eq!(snapshot.global.total_bloops, 1);
642    }
643
644    #[test]
645    fn count_last_minutes_across_midnight() {
646        // now_minute = 1 (00:01 UTC), so counting back 3 minutes wraps across midnight
647        let tz = UTC;
648        let mut stats = ClientStats::new();
649        let now = Utc.with_ymd_and_hms(2025, 7, 6, 0, 1, 0).unwrap();
650
651        // 00:01 today
652        stats.record_bloop(make_bloop("c", now), &tz);
653        // 00:00 today
654        stats.record_bloop(make_bloop("c", now - Duration::minutes(1)), &tz);
655        // 23:59 yesterday
656        stats.record_bloop(make_bloop("c", now - Duration::minutes(2)), &tz);
657
658        let count = stats.count_last_minutes(now, 3);
659        assert_eq!(
660            count, 3,
661            "should count all 3 minutes even when wrapping past midnight"
662        );
663    }
664
665    #[test]
666    fn record_bloop_uses_local_timezone_for_bloops_per_day() {
667        // Use a timezone that is ahead of UTC (e.g. UTC+2) so that a bloop
668        // recorded at 23:30 UTC on day D is actually local day D+1.
669        let tz = chrono_tz::Europe::Helsinki; // UTC+2 / UTC+3
670        let mut stats = ClientStats::new();
671
672        // 2025-07-05 23:30 UTC == 2025-07-06 01:30 Helsinki (EEST, UTC+3)
673        let recorded_at = Utc.with_ymd_and_hms(2025, 7, 5, 23, 30, 0).unwrap();
674        let bloop = make_bloop("client1", recorded_at);
675        stats.record_bloop(bloop, &tz);
676
677        let local_date = recorded_at.with_timezone(&tz).date_naive();
678        let utc_date = recorded_at.date_naive();
679
680        // The local date must differ from the UTC date for this test to be meaningful
681        assert_ne!(local_date, utc_date);
682        // bloops_per_day must be keyed by local date, not UTC date
683        assert_eq!(stats.bloops_per_day.get(&local_date), Some(&1));
684        assert_eq!(stats.bloops_per_day.get(&utc_date), None);
685    }
686
687    fn dummy_stats() -> HashMap<String, ClientStats> {
688        let mut map = HashMap::new();
689        map.insert("client1".to_string(), Default::default());
690        map
691    }
692
693    fn dummy_event_rx() -> broadcast::Receiver<Event> {
694        // Create a broadcast channel and take a receiver for testing
695        let (_tx, rx) = broadcast::channel(16);
696        rx
697    }
698
699    #[test]
700    fn build_succeeds_with_all_fields() {
701        let builder = StatisticsServerBuilder::new()
702            .address("127.0.0.1:12345")
703            .tz(chrono_tz::Europe::London)
704            .stats(dummy_stats())
705            .event_rx(dummy_event_rx());
706
707        let server = builder.build().unwrap();
708        assert_eq!(server.addr, "127.0.0.1:12345".parse().unwrap());
709        assert_eq!(server.tz, chrono_tz::Europe::London);
710    }
711
712    #[test]
713    fn build_fails_if_addr_missing() {
714        let builder = StatisticsServerBuilder::new()
715            .stats(dummy_stats())
716            .event_rx(dummy_event_rx());
717
718        let err = builder.build().unwrap_err();
719        assert!(matches!(err, BuilderError::MissingField(field) if field == "address"));
720    }
721    #[test]
722    fn build_fails_if_event_rx_missing() {
723        let builder = StatisticsServerBuilder::new()
724            .address("127.0.0.1:12345")
725            .stats(dummy_stats());
726
727        let err = builder.build().unwrap_err();
728        assert!(matches!(err, BuilderError::MissingField(field) if field == "event_rx"));
729    }
730
731    #[test]
732    fn build_defaults_tz_to_utc() {
733        let builder = StatisticsServerBuilder::new()
734            .address("127.0.0.1:12345")
735            .stats(dummy_stats())
736            .event_rx(dummy_event_rx());
737
738        let server = builder.build().unwrap();
739        assert_eq!(server.tz, UTC);
740    }
741
742    #[tokio::test]
743    #[timeout(1000)]
744    async fn server_handles_bloop_processed_event() {
745        let tz = Tz::UTC;
746        let stats_map = HashMap::<String, ClientStats>::new();
747
748        let (sender, event_rx) = broadcast::channel(16);
749
750        let mut server = StatisticsServerBuilder::new()
751            .address("127.0.0.1:12345")
752            .tz(tz)
753            .stats(stats_map)
754            .event_rx(event_rx)
755            .build()
756            .unwrap();
757        let notify = server.test_notify_event_processed.clone();
758        let stats = server.stats.clone();
759
760        tokio::spawn(async move {
761            let _ = server.listen().await;
762        });
763
764        let bloop = Event::BloopProcessed(make_bloop("client", Utc::now()));
765        sender.send(bloop).unwrap();
766        notify.notified().await;
767
768        let stats = stats.read().await;
769        let snapshot = stats.snapshot(Utc::now()).await;
770
771        assert_eq!(snapshot.clients["client"].total_bloops, 1);
772    }
773
774    #[tokio::test]
775    #[timeout(1000)]
776    async fn server_serves_stats_over_http() {
777        let (sender, event_rx) = broadcast::channel(16);
778
779        let local_addr = TcpListener::bind("127.0.0.1:0")
780            .await
781            .unwrap()
782            .local_addr()
783            .unwrap();
784
785        let mut server = StatisticsServerBuilder::new()
786            .address(local_addr.to_string())
787            .event_rx(event_rx)
788            .build()
789            .unwrap();
790        let notify = server.test_notify_event_processed.clone();
791
792        tokio::spawn(async move {
793            let _ = server.listen().await;
794        });
795
796        let bloop = Event::BloopProcessed(ProcessedBloop {
797            player_id: Uuid::new_v4(),
798            client_id: "client".to_string(),
799            recorded_at: Utc::now(),
800        });
801        sender.send(bloop).unwrap();
802        notify.notified().await;
803
804        let mut client = TcpStream::connect(local_addr).await.unwrap();
805        let request = b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
806        client.write_all(request).await.unwrap();
807
808        let mut buffer = vec![0; 1024];
809        let bytes_read = client.read(&mut buffer).await.unwrap();
810
811        let response = String::from_utf8_lossy(&buffer[..bytes_read]);
812        assert!(response.contains("200 OK"));
813        assert!(response.contains("application/json"));
814    }
815}