Skip to main content

bloop_server_framework/
bloop.rs

//! Module for handling NFC scan events ("bloops").
//!
//! This module defines:
//!
//! - [`Bloop`]: Represents a single NFC scan event tied to a player and client.
//! - [`BloopCollection`]: A time-limited collection of bloops with pruning of
//!   old entries.
//! - [`BloopProvider`]: Provides access to bloops globally and per client.
//! - [`ProcessedBloop`]: A simplified representation for persistence.
//! - [`BloopRepository`]: Async trait abstraction for persisting processed
//!   bloops.
//! - [`ProcessedBloopSink`]: Buffers and persists bloops in batches driven by
//!   event notifications.
//!
//! The system enables tracking and managing bloops efficiently with
//! asynchronous persistence. It also offers optional integration with graceful
//! shutdown via the `tokio-graceful-shutdown` crate.
18
19use crate::event::Event;
20use crate::player::PlayerInfo;
21use chrono::{DateTime, Utc};
22use std::cmp::Reverse;
23use std::collections::{HashMap, VecDeque, vec_deque};
24use std::sync::{Arc, RwLock, RwLockReadGuard};
25use std::time::Duration;
26use thiserror::Error;
27use tokio::sync::broadcast;
28use tokio::sync::broadcast::error::RecvError;
29use tokio::{select, time};
30#[cfg(feature = "tokio-graceful-shutdown")]
31use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
32use tracing::{debug, error, warn};
33use uuid::Uuid;
34
35/// Represents a single NFC tag scan ("bloop") event.
36///
37/// Each `Bloop` captures the details of an NFC scan performed by a player on a
38/// specific client device, including when the scan was recorded.
39#[derive(Debug, Clone)]
40pub struct Bloop<Player> {
41    /// The player who blooped,
42    player: Arc<RwLock<Player>>,
43
44    /// Player ID extracted from the assigned player.
45    ///
46    /// The player ID is accessible individually as it is immutable and thus does not require a read
47    /// lock on the player to be obtained.
48    pub player_id: Uuid,
49
50    /// The client ID this bloop occurred at.
51    pub client_id: String,
52
53    /// The time this bloop was recorded,
54    pub recorded_at: DateTime<Utc>,
55}
56
57impl<Player: PlayerInfo> Bloop<Player> {
58    /// Creates a new `Bloop` instance, extracting the player ID from the provided
59    /// player.
60    pub fn new(
61        player: Arc<RwLock<Player>>,
62        client_id: impl Into<String>,
63        recorded_at: DateTime<Utc>,
64    ) -> Self {
65        let player_id = player.read().unwrap().id();
66
67        Bloop {
68            player_id,
69            player,
70            client_id: client_id.into(),
71            recorded_at,
72        }
73    }
74}
75
76impl<Player> Bloop<Player> {
77    /// Returns a read lock guard to the player associated with this bloop.
78    ///
79    /// If only the player ID is needed, use the `player_id` field directly.
80    /// For multiple accesses, cache the guard to minimize locking overhead.
81    pub fn player(&self) -> RwLockReadGuard<'_, Player> {
82        self.player.read().unwrap()
83    }
84}
85
86/// Returns a closure to filter bloops recorded at or after a given timestamp.
87#[inline]
88pub fn bloops_since<Player>(since: DateTime<Utc>) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
89    move |bloop| bloop.recorded_at >= since
90}
91
92/// Returns a closure to filter bloops belonging to a specific player.
93#[inline]
94pub fn bloops_for_player<Player>(player_id: Uuid) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
95    move |bloop| bloop.player_id == player_id
96}
97
98/// A collection of bloops with a maximum age threshold.
99///
100/// Automatically prunes bloops which are older than the specified maximum age.
101#[derive(Debug)]
102pub struct BloopCollection<Player> {
103    bloops: VecDeque<Arc<Bloop<Player>>>,
104    max_age: Duration,
105}
106
107impl<Player> BloopCollection<Player> {
108    /// Creates an empty collection with the specified maximum age.
109    pub fn new(max_age: Duration) -> Self {
110        Self {
111            bloops: VecDeque::new(),
112            max_age,
113        }
114    }
115
116    /// Creates a collection pre-populated with bloops, sorted by most recent.
117    pub fn with_bloops(max_age: Duration, mut bloops: Vec<Arc<Bloop<Player>>>) -> Self {
118        bloops.sort_by_key(|bloop| Reverse(bloop.recorded_at));
119
120        let mut collection = Self::new(max_age);
121        collection.bloops.extend(bloops);
122
123        collection
124    }
125
126    /// Adds a bloop to the collection, pruning old bloops exceeding the max age.
127    pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
128        let threshold = Utc::now() - self.max_age;
129
130        while let Some(oldest) = self.bloops.back() {
131            if oldest.recorded_at < threshold {
132                self.bloops.pop_back();
133            } else {
134                break;
135            }
136        }
137
138        self.bloops.push_front(bloop);
139    }
140
141    /// Returns an iterator over all bloops in the collection.
142    pub fn iter(&self) -> impl Iterator<Item = &Arc<Bloop<Player>>> {
143        self.bloops.iter()
144    }
145}
146
147impl<'a, Player: 'a> IntoIterator for &'a BloopCollection<Player> {
148    type Item = &'a Arc<Bloop<Player>>;
149    type IntoIter = vec_deque::Iter<'a, Arc<Bloop<Player>>>;
150
151    fn into_iter(self) -> Self::IntoIter {
152        self.bloops.iter()
153    }
154}
155
156/// Provides access to bloops globally and per client, respecting a maximum age.
157#[derive(Debug)]
158pub struct BloopProvider<Player> {
159    global: BloopCollection<Player>,
160    per_client: HashMap<String, BloopCollection<Player>>,
161    max_age: Duration,
162    empty_collection: BloopCollection<Player>,
163}
164
165impl<Player> BloopProvider<Player> {
166    /// Creates a new provider with no bloops.
167    pub fn new(max_age: Duration) -> Self {
168        Self {
169            global: BloopCollection::new(max_age),
170            per_client: HashMap::new(),
171            max_age,
172            empty_collection: BloopCollection::new(max_age),
173        }
174    }
175
176    /// Creates a provider initialized with a set of bloops.
177    pub fn with_bloops(max_age: Duration, bloops: Vec<Bloop<Player>>) -> Self {
178        let bloops: Vec<Arc<Bloop<Player>>> = bloops.into_iter().map(Arc::new).collect();
179        let global_collection = BloopCollection::with_bloops(max_age, bloops.clone());
180
181        let mut per_client: HashMap<String, BloopCollection<Player>> = HashMap::new();
182        let mut client_groups: HashMap<String, Vec<Arc<Bloop<Player>>>> = HashMap::new();
183
184        for bloop in bloops {
185            client_groups
186                .entry(bloop.client_id.clone())
187                .or_default()
188                .push(bloop.clone());
189        }
190
191        for (client_id, client_bloops) in client_groups {
192            let collection = BloopCollection::with_bloops(max_age, client_bloops);
193            per_client.insert(client_id, collection);
194        }
195
196        Self {
197            global: global_collection,
198            per_client,
199            max_age,
200            empty_collection: BloopCollection::new(max_age),
201        }
202    }
203
204    /// Adds a bloop to the global and client-specific collections.
205    pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
206        self.global.add(bloop.clone());
207
208        let client_collection = self
209            .per_client
210            .entry(bloop.client_id.clone())
211            .or_insert_with(|| BloopCollection::new(self.max_age));
212
213        client_collection.add(bloop.clone());
214    }
215
216    /// Returns a reference to the global collection of bloops.
217    pub fn global(&self) -> &BloopCollection<Player> {
218        &self.global
219    }
220
221    /// Returns a reference to the collection of bloops for a specific client.
222    pub fn for_client(&self, client_id: &str) -> &BloopCollection<Player> {
223        self.per_client
224            .get(client_id)
225            .unwrap_or(&self.empty_collection)
226    }
227}
228
229/// A simplified representation of a bloop suitable for persistence.
230#[derive(Debug, Clone)]
231pub struct ProcessedBloop {
232    /// Player ID associated with the bloop.
233    pub player_id: Uuid,
234
235    /// The client ID this bloop occurred at.
236    pub client_id: String,
237
238    /// The time this bloop was recorded.
239    pub recorded_at: DateTime<Utc>,
240}
241
242impl<Player> From<&Bloop<Player>> for ProcessedBloop {
243    fn from(bloop: &Bloop<Player>) -> Self {
244        Self {
245            player_id: bloop.player_id,
246            client_id: bloop.client_id.clone(),
247            recorded_at: bloop.recorded_at,
248        }
249    }
250}
251
252/// Interface for persisting processed bloops asynchronously.
253pub trait BloopRepository {
254    type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;
255
256    fn persist_batch(
257        &self,
258        bloops: &[ProcessedBloop],
259    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
260}
261
262/// A sink that buffers processed bloops and persists them in batches.
263#[derive(Debug)]
264pub struct ProcessedBloopSink<R: BloopRepository> {
265    repository: R,
266    buffer: Vec<ProcessedBloop>,
267    max_batch_size: usize,
268    max_batch_duration: Duration,
269    event_rx: broadcast::Receiver<Event>,
270}
271
272impl<R: BloopRepository> ProcessedBloopSink<R> {
273    /// Processes incoming events, buffering bloops and persisting in batches.
274    pub async fn process_events(&mut self) {
275        let mut flush_interval = time::interval(self.max_batch_duration);
276
277        loop {
278            let should_continue = select! {
279                _ = flush_interval.tick() => {
280                    self.flush().await;
281                    true
282                }
283
284                result = self.event_rx.recv() => {
285                    self.handle_recv(result).await
286                }
287            };
288
289            if !should_continue {
290                break;
291            }
292        }
293    }
294
295    /// Flushes the buffered bloops to the repository.
296    pub async fn flush(&mut self) {
297        if self.buffer.is_empty() {
298            return;
299        }
300
301        let batch = std::mem::take(&mut self.buffer);
302
303        if let Err(err) = self.repository.persist_batch(&batch).await {
304            error!("Failed to persist bloop batch: {}", err);
305            self.buffer.extend(batch);
306        } else {
307            debug!("Persisted {} bloops", batch.len());
308        }
309    }
310
311    async fn handle_recv(&mut self, result: Result<Event, RecvError>) -> bool {
312        match result {
313            Ok(Event::BloopProcessed(bloop)) => {
314                self.buffer.push(bloop);
315
316                if self.buffer.len() >= self.max_batch_size {
317                    debug!("Batch size reached, flushing");
318                    self.flush().await;
319                }
320                true
321            }
322            Ok(_) => true,
323            Err(RecvError::Lagged(n)) => {
324                warn!("ProcessedBloopSink lagged by {n} messages, some bloops were missed");
325                self.flush().await;
326                true
327            }
328            Err(RecvError::Closed) => {
329                debug!("ProcessedBloopSink event stream closed, exiting event loop");
330                false
331            }
332        }
333    }
334}
335
/// Error type without any variants, so a value of it can never be constructed.
///
/// Used as the error type of the subsystem implementation for
/// [`ProcessedBloopSink`].
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
339
#[cfg(feature = "tokio-graceful-shutdown")]
impl<R> IntoSubsystem<NeverError> for ProcessedBloopSink<R>
where
    R: BloopRepository + Send + Sync + 'static,
{
    /// Runs the event loop wrapped in `cancel_on_shutdown`, then performs one
    /// final flush of whatever is still buffered.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        // Whether the loop ended on its own or was cancelled makes no
        // difference here, so the outcome is deliberately discarded.
        let event_loop = self.process_events().cancel_on_shutdown(subsys);
        let _outcome = event_loop.await;

        self.flush().await;

        Ok(())
    }
}
352
353#[derive(Debug, Error)]
354pub enum BuilderError {
355    #[error("missing field: {0}")]
356    MissingField(&'static str),
357}
358
359/// Builder for constructing a [`ProcessedBloopSink`].
360#[derive(Debug, Default)]
361pub struct ProcessedBloopSinkBuilder<R: BloopRepository> {
362    repository: Option<R>,
363    max_batch_size: Option<usize>,
364    max_batch_duration: Option<Duration>,
365    event_rx: Option<broadcast::Receiver<Event>>,
366}
367
368impl<R: BloopRepository> ProcessedBloopSinkBuilder<R> {
369    /// Creates a new, empty builder.
370    pub fn new() -> Self {
371        Self {
372            repository: None,
373            max_batch_size: None,
374            max_batch_duration: None,
375            event_rx: None,
376        }
377    }
378
379    /// Sets the repository used by the sink.
380    pub fn repository(mut self, repository: R) -> Self {
381        self.repository = Some(repository);
382        self
383    }
384
385    /// Sets the maximum number of bloops to buffer before flushing.
386    pub fn max_batch_size(mut self, size: usize) -> Self {
387        self.max_batch_size = Some(size);
388        self
389    }
390
391    /// Sets the maximum duration to wait before flushing the buffered bloops.
392    pub fn max_batch_duration(mut self, duration: Duration) -> Self {
393        self.max_batch_duration = Some(duration);
394        self
395    }
396
397    /// Sets the event receiver from which bloops are received.
398    pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
399        self.event_rx = Some(event_rx);
400        self
401    }
402
403    /// Attempts to build the sink, returning an error if any required fields are
404    /// missing.
405    pub fn build(self) -> Result<ProcessedBloopSink<R>, BuilderError> {
406        Ok(ProcessedBloopSink {
407            repository: self
408                .repository
409                .ok_or(BuilderError::MissingField("repository"))?,
410            buffer: Vec::new(),
411            max_batch_size: self
412                .max_batch_size
413                .ok_or(BuilderError::MissingField("max_batch_size"))?,
414            max_batch_duration: self
415                .max_batch_duration
416                .ok_or(BuilderError::MissingField("max_batch_duration"))?,
417            event_rx: self
418                .event_rx
419                .ok_or(BuilderError::MissingField("event_rx"))?,
420        })
421    }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bloop::{Bloop, BloopCollection, BloopProvider};
    use crate::test_utils::MockPlayer;
    use ntest::timeout;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// Builds a bloop for a fresh mock player at "client-1".
    fn create_mock_bloop(recorded_at: DateTime<Utc>) -> Arc<Bloop<MockPlayer>> {
        let (player, _) = MockPlayer::builder().build();
        let bloop = Bloop::new(player, "client-1", recorded_at);
        Arc::new(bloop)
    }

    #[test]
    fn bloop_collection_adds_bloops_and_preserves_order() {
        let base = Utc::now();
        let earlier = create_mock_bloop(base);
        let later = create_mock_bloop(base + Duration::from_secs(1));

        let mut collection = BloopCollection::new(Duration::from_secs(60));
        // The bloop added last ends up at the front.
        collection.add(Arc::clone(&later));
        collection.add(Arc::clone(&earlier));

        let stored: Vec<_> = collection.iter().collect();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].recorded_at, earlier.recorded_at);
        assert_eq!(stored[1].recorded_at, later.recorded_at);
    }

    #[test]
    fn bloop_collection_prunes_bloops_older_than_max_age() {
        let base = Utc::now();
        let stale = create_mock_bloop(base - Duration::from_secs(10));
        let fresh = create_mock_bloop(base - Duration::from_secs(2));

        let mut collection = BloopCollection::new(Duration::from_secs(5));
        collection.add(stale);
        collection.add(Arc::clone(&fresh));

        let stored: Vec<_> = collection.iter().collect();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].recorded_at, fresh.recorded_at);
    }

    #[test]
    fn bloop_provider_adds_bloops_to_global_and_per_client_collections() {
        let bloop = create_mock_bloop(Utc::now());
        let client_id = bloop.client_id.clone();

        let mut provider = BloopProvider::new(Duration::from_secs(60));
        provider.add(Arc::clone(&bloop));

        let in_global: Vec<_> = provider.global().iter().collect();
        assert_eq!(in_global.len(), 1);
        assert_eq!(in_global[0].recorded_at, bloop.recorded_at);

        let in_client: Vec<_> = provider.for_client(&client_id).iter().collect();
        assert_eq!(in_client.len(), 1);
        assert_eq!(in_client[0].recorded_at, bloop.recorded_at);
    }

    #[test]
    fn bloop_provider_correctly_initializes_from_existing_bloops() {
        let base = Utc::now();
        let one_second_later = base + Duration::from_secs(1);
        let (first_player, _) = MockPlayer::builder().build();
        let (second_player, _) = MockPlayer::builder().build();

        let initial = vec![
            Bloop::new(first_player.clone(), "client-1", base),
            Bloop::new(first_player, "client-1", one_second_later),
            Bloop::new(second_player, "client-2", base),
        ];

        let provider = BloopProvider::with_bloops(Duration::from_secs(60), initial);

        let in_global: Vec<_> = provider.global().iter().collect();
        assert_eq!(in_global.len(), 3);

        let at_client1: Vec<_> = provider.for_client("client-1").iter().collect();
        assert_eq!(at_client1.len(), 2);
        assert!(at_client1.iter().any(|b| b.recorded_at == base));
        assert!(
            at_client1
                .iter()
                .any(|b| b.recorded_at == one_second_later)
        );

        let at_client2: Vec<_> = provider.for_client("client-2").iter().collect();
        assert_eq!(at_client2.len(), 1);
        assert_eq!(at_client2[0].recorded_at, base);
    }

    #[test]
    fn bloops_since_filter_correctly_filters_based_on_timestamp() {
        let base = Utc::now();
        let before_cutoff = create_mock_bloop(base);
        let after_cutoff = create_mock_bloop(base + Duration::from_secs(10));

        let bloops = vec![Arc::clone(&before_cutoff), Arc::clone(&after_cutoff)];
        let cutoff = base + Duration::from_secs(5);
        let kept: Vec<_> = bloops.iter().filter(bloops_since(cutoff)).collect();

        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].recorded_at, after_cutoff.recorded_at);
    }

    #[test]
    fn bloops_for_player_filter_correctly_filters_based_on_player_id() {
        let base = Utc::now();
        let other_bloop = create_mock_bloop(base);
        let (wanted_player, wanted_id) = MockPlayer::builder().build();
        let wanted_bloop = Arc::new(Bloop::new(wanted_player, "client-1", base));

        let bloops = vec![Arc::clone(&other_bloop), Arc::clone(&wanted_bloop)];
        let kept: Vec<_> = bloops
            .iter()
            .filter(bloops_for_player(wanted_id))
            .collect();

        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].player_id, wanted_id);
    }

    /// Repository double: forwards every persisted batch over a channel and
    /// can be switched into a failing mode.
    #[derive(Clone)]
    struct DummyRepo {
        sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>,
        fail_persist: Arc<Mutex<bool>>,
    }

    impl DummyRepo {
        fn new(sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>) -> Self {
            let fail_persist = Arc::new(Mutex::new(false));

            Self {
                sender,
                fail_persist,
            }
        }

        fn set_fail(&self, fail: bool) {
            let mut flag = self.fail_persist.lock().unwrap();
            *flag = fail;
        }
    }

    impl BloopRepository for DummyRepo {
        type Error = &'static str;

        async fn persist_batch(&self, bloops: &[ProcessedBloop]) -> Result<(), Self::Error> {
            let should_fail = *self.fail_persist.lock().unwrap();
            if should_fail {
                return Err("fail");
            }

            self.sender.send(bloops.to_vec()).unwrap();
            Ok(())
        }
    }

    fn create_processed_bloop() -> ProcessedBloop {
        ProcessedBloop {
            player_id: Uuid::new_v4(),
            client_id: "client-1".to_string(),
            recorded_at: Utc::now(),
        }
    }

    /// Builds a sink around a clone of `repo` with the given settings.
    fn build_sink(
        repo: &DummyRepo,
        event_rx: broadcast::Receiver<Event>,
        max_batch_size: usize,
        max_batch_duration: Duration,
    ) -> ProcessedBloopSink<DummyRepo> {
        ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(max_batch_size)
            .max_batch_duration(max_batch_duration)
            .event_rx(event_rx)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn flush_persists_and_clears_buffer() {
        let (batch_tx, mut batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        // The sender is kept alive so the event channel stays open.
        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 10, Duration::from_secs(5));

        sink.buffer.push(create_processed_bloop());
        sink.flush().await;

        let persisted = batch_rx.recv().await.unwrap();
        assert_eq!(persisted.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    async fn flush_retries_on_failure() {
        let (batch_tx, mut batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        repo.set_fail(true);
        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 10, Duration::from_secs(5));

        // A failed flush must leave the bloop in the buffer.
        sink.buffer.push(create_processed_bloop());
        sink.flush().await;
        assert_eq!(sink.buffer.len(), 1);

        // Once the repository recovers, the same bloop is persisted.
        repo.set_fail(false);
        sink.flush().await;

        let persisted = batch_rx.recv().await.unwrap();
        assert_eq!(persisted.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    #[timeout(1000)]
    async fn process_events_flushes_on_batch_size() {
        let (batch_tx, mut batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 2, Duration::from_secs(10));

        let event_loop = tokio::spawn(async move { sink.process_events().await });

        for _ in 0..2 {
            evt_tx
                .send(Event::BloopProcessed(create_processed_bloop()))
                .unwrap();
        }

        let persisted = batch_rx.recv().await.unwrap();
        assert_eq!(persisted.len(), 2);

        // Closing the event channel makes the event loop exit.
        drop(evt_tx);

        event_loop.await.unwrap();
    }

    #[tokio::test]
    async fn handle_recv_returns_false_on_closed() {
        let (batch_tx, _batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 10, Duration::from_secs(10));

        drop(evt_tx);

        let received = sink.event_rx.recv().await;
        let keep_running = sink.handle_recv(received).await;
        assert!(!keep_running);
    }
}