bloop_server_framework/
bloop.rs

//! Module for handling NFC scan events ("bloops").
//!
//! This module defines:
//!
//! - [`Bloop`]: Represents a single NFC scan event tied to a player and client.
//! - [`BloopCollection`]: A time-limited collection of bloops with pruning of
//!   old entries.
//! - [`BloopProvider`]: Provides access to bloops globally and per client.
//! - [`ProcessedBloop`]: A simplified representation for persistence.
//! - [`BloopRepository`]: Async trait abstraction for persisting processed
//!   bloops.
//! - [`ProcessedBloopSink`]: Buffers and persists bloops in batches driven by
//!   event notifications.
//!
//! The system enables tracking and managing bloops efficiently with
//! asynchronous persistence. It optionally integrates with graceful shutdown
//! via the `tokio-graceful-shutdown` crate.
18
19use crate::event::Event;
20use crate::player::PlayerInfo;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use std::cmp::Reverse;
24use std::collections::{HashMap, VecDeque, vec_deque};
25use std::sync::{Arc, RwLock, RwLockReadGuard};
26use std::time::Duration;
27use thiserror::Error;
28use tokio::sync::broadcast;
29use tokio::sync::broadcast::error::RecvError;
30use tokio::{select, time};
31#[cfg(feature = "tokio-graceful-shutdown")]
32use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
33use tracing::{debug, error, warn};
34use uuid::Uuid;
35
36/// Represents a single NFC tag scan ("bloop") event.
37///
38/// Each `Bloop` captures the details of an NFC scan performed by a player on a
39/// specific client device, including when the scan was recorded.
40#[derive(Debug, Clone)]
41pub struct Bloop<Player> {
42    /// The player who blooped,
43    player: Arc<RwLock<Player>>,
44
45    /// Player ID extracted from the assigned player.
46    ///
47    /// The player ID is accessible individually as it is immutable and thus does not require a read
48    /// lock on the player to be obtained.
49    pub player_id: Uuid,
50
51    /// The client ID this bloop occurred at.
52    pub client_id: String,
53
54    /// The time this bloop was recorded,
55    pub recorded_at: DateTime<Utc>,
56}
57
58impl<Player: PlayerInfo> Bloop<Player> {
59    /// Creates a new `Bloop` instance, extracting the player ID from the provided
60    /// player.
61    pub fn new(
62        player: Arc<RwLock<Player>>,
63        client_id: impl Into<String>,
64        recorded_at: DateTime<Utc>,
65    ) -> Self {
66        let player_id = player.read().unwrap().id();
67
68        Bloop {
69            player_id,
70            player,
71            client_id: client_id.into(),
72            recorded_at,
73        }
74    }
75}
76
77impl<Player> Bloop<Player> {
78    /// Returns a read lock guard to the player associated with this bloop.
79    ///
80    /// If only the player ID is needed, use the `player_id` field directly.
81    /// For multiple accesses, cache the guard to minimize locking overhead.
82    pub fn player(&self) -> RwLockReadGuard<Player> {
83        self.player.read().unwrap()
84    }
85}
86
87/// Returns a closure to filter bloops recorded at or after a given timestamp.
88#[inline]
89pub fn bloops_since<Player>(since: DateTime<Utc>) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
90    move |bloop| bloop.recorded_at >= since
91}
92
93/// Returns a closure to filter bloops belonging to a specific player.
94#[inline]
95pub fn bloops_for_player<Player>(player_id: Uuid) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
96    move |bloop| bloop.player_id == player_id
97}
98
99/// A collection of bloops with a maximum age threshold.
100///
101/// Automatically prunes bloops which are older than the specified maximum age.
102#[derive(Debug)]
103pub struct BloopCollection<Player> {
104    bloops: VecDeque<Arc<Bloop<Player>>>,
105    max_age: Duration,
106}
107
108impl<Player> BloopCollection<Player> {
109    /// Creates an empty collection with the specified maximum age.
110    pub fn new(max_age: Duration) -> Self {
111        Self {
112            bloops: VecDeque::new(),
113            max_age,
114        }
115    }
116
117    /// Creates a collection pre-populated with bloops, sorted by most recent.
118    pub fn with_bloops(max_age: Duration, mut bloops: Vec<Arc<Bloop<Player>>>) -> Self {
119        bloops.sort_by_key(|bloop| Reverse(bloop.recorded_at));
120
121        let mut collection = Self::new(max_age);
122        collection.bloops.extend(bloops);
123
124        collection
125    }
126
127    /// Adds a bloop to the collection, pruning old bloops exceeding the max age.
128    pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
129        let threshold = Utc::now() - self.max_age;
130
131        while let Some(oldest) = self.bloops.back() {
132            if oldest.recorded_at < threshold {
133                self.bloops.pop_back();
134            } else {
135                break;
136            }
137        }
138
139        self.bloops.push_front(bloop);
140    }
141
142    /// Returns an iterator over all bloops in the collection.
143    pub fn iter(&self) -> impl Iterator<Item = &Arc<Bloop<Player>>> {
144        self.bloops.iter()
145    }
146}
147
148impl<'a, Player: 'a> IntoIterator for &'a BloopCollection<Player> {
149    type Item = &'a Arc<Bloop<Player>>;
150    type IntoIter = vec_deque::Iter<'a, Arc<Bloop<Player>>>;
151
152    fn into_iter(self) -> Self::IntoIter {
153        self.bloops.iter()
154    }
155}
156
157/// Provides access to bloops globally and per client, respecting a maximum age.
158#[derive(Debug)]
159pub struct BloopProvider<Player> {
160    global: BloopCollection<Player>,
161    per_client: HashMap<String, BloopCollection<Player>>,
162    max_age: Duration,
163    empty_collection: BloopCollection<Player>,
164}
165
166impl<Player> BloopProvider<Player> {
167    /// Creates a new provider with no bloops.
168    pub fn new(max_age: Duration) -> Self {
169        Self {
170            global: BloopCollection::new(max_age),
171            per_client: HashMap::new(),
172            max_age,
173            empty_collection: BloopCollection::new(max_age),
174        }
175    }
176
177    /// Creates a provider initialized with a set of bloops.
178    pub fn with_bloops(max_age: Duration, bloops: Vec<Bloop<Player>>) -> Self {
179        let bloops: Vec<Arc<Bloop<Player>>> = bloops.into_iter().map(Arc::new).collect();
180        let global_collection = BloopCollection::with_bloops(max_age, bloops.clone());
181
182        let mut per_client: HashMap<String, BloopCollection<Player>> = HashMap::new();
183        let mut client_groups: HashMap<String, Vec<Arc<Bloop<Player>>>> = HashMap::new();
184
185        for bloop in bloops {
186            client_groups
187                .entry(bloop.client_id.clone())
188                .or_default()
189                .push(bloop.clone());
190        }
191
192        for (client_id, client_bloops) in client_groups {
193            let collection = BloopCollection::with_bloops(max_age, client_bloops);
194            per_client.insert(client_id, collection);
195        }
196
197        Self {
198            global: global_collection,
199            per_client,
200            max_age,
201            empty_collection: BloopCollection::new(max_age),
202        }
203    }
204
205    /// Adds a bloop to the global and client-specific collections.
206    pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
207        self.global.add(bloop.clone());
208
209        let client_collection = self
210            .per_client
211            .entry(bloop.client_id.clone())
212            .or_insert_with(|| BloopCollection::new(self.max_age));
213
214        client_collection.add(bloop.clone());
215    }
216
217    /// Returns a reference to the global collection of bloops.
218    pub fn global(&self) -> &BloopCollection<Player> {
219        &self.global
220    }
221
222    /// Returns a reference to the collection of bloops for a specific client.
223    pub fn for_client(&self, client_id: &str) -> &BloopCollection<Player> {
224        self.per_client
225            .get(client_id)
226            .unwrap_or(&self.empty_collection)
227    }
228}
229
230/// A simplified representation of a bloop suitable for persistence.
231#[derive(Debug, Clone)]
232pub struct ProcessedBloop {
233    /// Player ID associated with the bloop.
234    pub player_id: Uuid,
235
236    /// The client ID this bloop occurred at.
237    pub client_id: String,
238
239    /// The time this bloop was recorded.
240    pub recorded_at: DateTime<Utc>,
241}
242
243impl<Player> From<&Bloop<Player>> for ProcessedBloop {
244    fn from(bloop: &Bloop<Player>) -> Self {
245        Self {
246            player_id: bloop.player_id,
247            client_id: bloop.client_id.clone(),
248            recorded_at: bloop.recorded_at,
249        }
250    }
251}
252
253/// Interface for persisting processed bloops asynchronously.
254#[async_trait]
255pub trait BloopRepository {
256    type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;
257
258    async fn persist_batch(&self, bloops: &[ProcessedBloop]) -> Result<(), Self::Error>;
259}
260
261#[derive(Debug, Error)]
262enum SinkError {
263    #[error("Repository error: {0}")]
264    Repository(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
265}
266
267/// Sink that buffers processed bloops and persists them in batches.
268#[derive(Debug)]
269pub struct ProcessedBloopSink<R: BloopRepository> {
270    repository: R,
271    buffer: Vec<ProcessedBloop>,
272    max_batch_size: usize,
273    max_batch_duration: Duration,
274    event_rx: broadcast::Receiver<Event>,
275}
276
277impl<R: BloopRepository> ProcessedBloopSink<R> {
278    /// Processes incoming events, buffering bloops and persisting in batches.
279    pub async fn process_events(&mut self) {
280        let mut flush_interval = time::interval(self.max_batch_duration);
281
282        loop {
283            let should_continue = select! {
284                _ = flush_interval.tick() => {
285                    self.flush().await;
286                    true
287                }
288
289                result = self.event_rx.recv() => {
290                    self.handle_recv(result).await
291                }
292            };
293
294            if !should_continue {
295                break;
296            }
297        }
298    }
299
300    /// Flushes the buffered bloops to the repository.
301    pub async fn flush(&mut self) {
302        if self.buffer.is_empty() {
303            return;
304        }
305
306        let batch = std::mem::take(&mut self.buffer);
307
308        if let Err(err) = self.repository.persist_batch(&batch).await {
309            error!("Failed to persist bloop batch: {}", err);
310            self.buffer.extend(batch);
311        } else {
312            debug!("Persisted {} bloops", batch.len());
313        }
314    }
315
316    async fn handle_recv(&mut self, result: Result<Event, RecvError>) -> bool {
317        match result {
318            Ok(Event::BloopProcessed(bloop)) => {
319                self.buffer.push(bloop);
320
321                if self.buffer.len() >= self.max_batch_size {
322                    debug!("Batch size reached, flushing");
323                    self.flush().await;
324                }
325                true
326            }
327            Ok(_) => true,
328            Err(RecvError::Lagged(n)) => {
329                warn!("ProcessedBloopSink lagged by {n} messages, some bloops were missed");
330                self.flush().await;
331                true
332            }
333            Err(RecvError::Closed) => {
334                debug!("ProcessedBloopSink event stream closed, exiting event loop");
335                false
336            }
337        }
338    }
339}
340
/// Error type of the sink subsystem.
///
/// The enum has no variants, so a value of it can never be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
344
/// Runs the sink as a `tokio-graceful-shutdown` subsystem.
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<R> IntoSubsystem<NeverError> for ProcessedBloopSink<R>
where
    R: BloopRepository + Send + Sync + 'static,
{
    /// Processes events with `cancel_on_shutdown` applied to the event loop,
    /// then flushes whatever is still buffered. Always returns `Ok(())`.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        let event_loop = self.process_events().cancel_on_shutdown(&subsys);
        // The outcome is deliberately ignored: the final flush below runs
        // whichever way the event loop came to an end.
        let _ = event_loop.await;

        self.flush().await;

        Ok(())
    }
}
358
359#[derive(Debug, Error)]
360pub enum BuilderError {
361    #[error("missing field: {0}")]
362    MissingField(&'static str),
363}
364
365/// Builder for constructing a [`ProcessedBloopSink`].
366#[derive(Debug, Default)]
367pub struct ProcessedBloopSinkBuilder<R: BloopRepository> {
368    repository: Option<R>,
369    max_batch_size: Option<usize>,
370    max_batch_duration: Option<Duration>,
371    event_rx: Option<broadcast::Receiver<Event>>,
372}
373
374impl<R: BloopRepository> ProcessedBloopSinkBuilder<R> {
375    /// Creates a new, empty builder.
376    pub fn new() -> Self {
377        Self {
378            repository: None,
379            max_batch_size: None,
380            max_batch_duration: None,
381            event_rx: None,
382        }
383    }
384
385    /// Sets the repository used by the sink.
386    pub fn repository(mut self, repository: R) -> Self {
387        self.repository = Some(repository);
388        self
389    }
390
391    /// Sets the maximum number of bloops to buffer before flushing.
392    pub fn max_batch_size(mut self, size: usize) -> Self {
393        self.max_batch_size = Some(size);
394        self
395    }
396
397    /// Sets the maximum duration to wait before flushing the buffered bloops.
398    pub fn max_batch_duration(mut self, duration: Duration) -> Self {
399        self.max_batch_duration = Some(duration);
400        self
401    }
402
403    /// Sets the event receiver from which bloops are received.
404    pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
405        self.event_rx = Some(event_rx);
406        self
407    }
408
409    /// Attempts to build the sink, returning an error if any required fields are
410    /// missing.
411    pub fn build(self) -> Result<ProcessedBloopSink<R>, BuilderError> {
412        Ok(ProcessedBloopSink {
413            repository: self
414                .repository
415                .ok_or(BuilderError::MissingField("repository"))?,
416            buffer: Vec::new(),
417            max_batch_size: self
418                .max_batch_size
419                .ok_or(BuilderError::MissingField("max_batch_size"))?,
420            max_batch_duration: self
421                .max_batch_duration
422                .ok_or(BuilderError::MissingField("max_batch_duration"))?,
423            event_rx: self
424                .event_rx
425                .ok_or(BuilderError::MissingField("event_rx"))?,
426        })
427    }
428}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bloop::{Bloop, BloopCollection, BloopProvider};
    use crate::test_utils::MockPlayer;
    use ntest::timeout;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// Builds a shared bloop of a fresh mock player at client `client-1`.
    fn create_mock_bloop(recorded_at: DateTime<Utc>) -> Arc<Bloop<MockPlayer>> {
        let (player, _) = MockPlayer::builder().build();
        let bloop = Bloop::new(player, "client-1", recorded_at);
        Arc::new(bloop)
    }

    #[test]
    fn bloop_collection_adds_bloops_and_preserves_order() {
        let base = Utc::now();
        let earlier = create_mock_bloop(base);
        let later = create_mock_bloop(base + Duration::from_secs(1));

        let mut collection = BloopCollection::new(Duration::from_secs(60));
        collection.add(later.clone());
        collection.add(earlier.clone());

        // The most recently *added* bloop comes first.
        let stored: Vec<_> = collection.iter().collect();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].recorded_at, earlier.recorded_at);
        assert_eq!(stored[1].recorded_at, later.recorded_at);
    }

    #[test]
    fn bloop_collection_prunes_bloops_older_than_max_age() {
        let mut collection = BloopCollection::new(Duration::from_secs(5));
        let base = Utc::now();

        let stale = create_mock_bloop(base - Duration::from_secs(10));
        let fresh = create_mock_bloop(base - Duration::from_secs(2));

        collection.add(stale);
        collection.add(fresh.clone());

        // Adding the second bloop prunes the stale one.
        let stored: Vec<_> = collection.iter().collect();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].recorded_at, fresh.recorded_at);
    }

    #[test]
    fn bloop_provider_adds_bloops_to_global_and_per_client_collections() {
        let mut provider = BloopProvider::new(Duration::from_secs(60));
        let bloop = create_mock_bloop(Utc::now());
        let client_id = bloop.client_id.clone();

        provider.add(bloop.clone());

        let in_global: Vec<_> = provider.global().iter().collect();
        assert_eq!(in_global.len(), 1);
        assert_eq!(in_global[0].recorded_at, bloop.recorded_at);

        let in_client: Vec<_> = provider.for_client(&client_id).iter().collect();
        assert_eq!(in_client.len(), 1);
        assert_eq!(in_client[0].recorded_at, bloop.recorded_at);
    }

    #[test]
    fn bloop_provider_correctly_initializes_from_existing_bloops() {
        let base = Utc::now();
        let one_second_later = base + Duration::from_secs(1);
        let (first_player, _) = MockPlayer::builder().build();
        let (second_player, _) = MockPlayer::builder().build();

        let initial = vec![
            Bloop::new(first_player.clone(), "client-1", base),
            Bloop::new(first_player, "client-1", one_second_later),
            Bloop::new(second_player, "client-2", base),
        ];

        let provider = BloopProvider::with_bloops(Duration::from_secs(60), initial);

        let in_global: Vec<_> = provider.global().iter().collect();
        assert_eq!(in_global.len(), 3);

        let in_client1: Vec<_> = provider.for_client("client-1").iter().collect();
        assert_eq!(in_client1.len(), 2);
        assert!(in_client1.iter().any(|b| b.recorded_at == base));
        assert!(
            in_client1
                .iter()
                .any(|b| b.recorded_at == one_second_later)
        );

        let in_client2: Vec<_> = provider.for_client("client-2").iter().collect();
        assert_eq!(in_client2.len(), 1);
        assert_eq!(in_client2[0].recorded_at, base);
    }

    #[test]
    fn bloops_since_filter_correctly_filters_based_on_timestamp() {
        let base = Utc::now();
        let before_cutoff = create_mock_bloop(base);
        let after_cutoff = create_mock_bloop(base + Duration::from_secs(10));

        let all = vec![before_cutoff.clone(), after_cutoff.clone()];
        let cutoff = base + Duration::from_secs(5);
        let kept: Vec<_> = all.iter().filter(bloops_since(cutoff)).collect();

        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].recorded_at, after_cutoff.recorded_at);
    }

    #[test]
    fn bloops_for_player_filter_correctly_filters_based_on_player_id() {
        let base = Utc::now();
        let other_players_bloop = create_mock_bloop(base);
        let (wanted_player, wanted_id) = MockPlayer::builder().build();
        let wanted_bloop = Arc::new(Bloop::new(wanted_player, "client-1", base));

        let all = vec![other_players_bloop.clone(), wanted_bloop.clone()];
        let kept: Vec<_> = all.iter().filter(bloops_for_player(wanted_id)).collect();

        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0].player_id, wanted_id);
    }

    /// Repository double that forwards persisted batches over a channel and
    /// can be switched into a failing mode.
    #[derive(Clone)]
    struct DummyRepo {
        sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>,
        fail_persist: Arc<Mutex<bool>>,
    }

    impl DummyRepo {
        fn new(sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>) -> Self {
            let fail_persist = Arc::new(Mutex::new(false));
            Self {
                sender,
                fail_persist,
            }
        }

        fn set_fail(&self, fail: bool) {
            let mut flag = self.fail_persist.lock().unwrap();
            *flag = fail;
        }
    }

    #[async_trait::async_trait]
    impl BloopRepository for DummyRepo {
        type Error = &'static str;

        async fn persist_batch(&self, bloops: &[ProcessedBloop]) -> Result<(), Self::Error> {
            if *self.fail_persist.lock().unwrap() {
                return Err("fail");
            }
            self.sender.send(bloops.to_vec()).unwrap();
            Ok(())
        }
    }

    fn create_processed_bloop() -> ProcessedBloop {
        ProcessedBloop {
            player_id: Uuid::new_v4(),
            client_id: "client-1".to_string(),
            recorded_at: Utc::now(),
        }
    }

    /// Builds a sink around a clone of `repo` with the given limits.
    fn build_sink(
        repo: &DummyRepo,
        event_rx: broadcast::Receiver<Event>,
        max_batch_size: usize,
        max_batch_duration: Duration,
    ) -> ProcessedBloopSink<DummyRepo> {
        ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(max_batch_size)
            .max_batch_duration(max_batch_duration)
            .event_rx(event_rx)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn flush_persists_and_clears_buffer() {
        let (batch_tx, mut batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);

        // The sender is kept alive for the duration of the test.
        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 10, Duration::from_secs(5));

        sink.buffer.push(create_processed_bloop());
        sink.flush().await;

        let persisted = batch_rx.recv().await.unwrap();
        assert_eq!(persisted.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    async fn flush_retries_on_failure() {
        let (batch_tx, mut batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        repo.set_fail(true);

        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 10, Duration::from_secs(5));

        // A failed flush keeps the bloop buffered ...
        sink.buffer.push(create_processed_bloop());
        sink.flush().await;
        assert_eq!(sink.buffer.len(), 1);

        // ... and the next successful flush persists it.
        repo.set_fail(false);
        sink.flush().await;

        let persisted = batch_rx.recv().await.unwrap();
        assert_eq!(persisted.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    #[timeout(1000)]
    async fn process_events_flushes_on_batch_size() {
        let (batch_tx, mut batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 2, Duration::from_secs(10));

        let first = create_processed_bloop();
        let second = create_processed_bloop();

        let handle = tokio::spawn(async move { sink.process_events().await });

        evt_tx.send(Event::BloopProcessed(first.clone())).unwrap();
        evt_tx.send(Event::BloopProcessed(second.clone())).unwrap();

        let persisted = batch_rx.recv().await.unwrap();
        assert_eq!(persisted.len(), 2);

        // Closing the event channel ends the event loop.
        drop(evt_tx);

        handle.await.unwrap();
    }

    #[tokio::test]
    async fn handle_recv_returns_false_on_closed() {
        let (batch_tx, _batch_rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(batch_tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = build_sink(&repo, evt_rx, 10, Duration::from_secs(10));

        drop(evt_tx);

        let recv_result = sink.event_rx.recv().await;
        let should_continue = sink.handle_recv(recv_result).await;
        assert!(!should_continue);
    }
}
696        assert!(!result);
697    }
698}