bloop_server_framework/
engine.rs

1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
22#[cfg(feature = "tokio-graceful-shutdown")]
23use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
24use tracing::{info, instrument, warn};
25use uuid::Uuid;
26
27#[derive(Debug)]
28pub enum EngineRequest {
29    Bloop { client_id: String, nfc_uid: NfcUid },
30    RetrieveAudio { id: Uuid },
31    PreloadCheck { manifest_hash: Option<DataHash> },
32}
33
34#[derive(Debug, Deserialize)]
35pub struct Throttle {
36    max_bloops: usize,
37    threshold: Duration,
38}
39
40impl Throttle {
41    pub fn new(max_bloops: usize, threshold: Duration) -> Self {
42        Self {
43            max_bloops,
44            threshold,
45        }
46    }
47}
48
49struct HotAchievement {
50    id: Uuid,
51    client_id: String,
52    until: DateTime<Utc>,
53}
54
55impl HotAchievement {
56    fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
57        Self {
58            id,
59            client_id: bloop.client_id.clone(),
60            until: bloop.recorded_at + duration,
61        }
62    }
63}
64
65pub struct Engine<Metadata, Player, State, Trigger>
66where
67    Player: PlayerInfo + PlayerMutator,
68    Trigger: Copy,
69{
70    bloop_provider: BloopProvider<Player>,
71    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
72    audio_base_path: PathBuf,
73    audio_file_hashes: HashMap<Uuid, DataHash>,
74    audio_manifest_hash: DataHash,
75    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
76    state: Arc<Mutex<State>>,
77    trigger_registry: TriggerRegistry<Trigger>,
78    hot_achievements: Vec<HotAchievement>,
79    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
80    event_tx: broadcast::Sender<Event>,
81    throttle: Option<Throttle>,
82}
83
84impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
85where
86    Player: PlayerInfo + PlayerMutator,
87    Trigger: Copy,
88{
89    pub async fn process_requests(&mut self) {
90        while let Some((request, response)) = self.network_rx.recv().await {
91            match request {
92                EngineRequest::Bloop { nfc_uid, client_id } => {
93                    self.handle_bloop(nfc_uid, client_id, response).await;
94                }
95                EngineRequest::RetrieveAudio { id } => {
96                    self.handle_retrieve_audio(id, response);
97                }
98                EngineRequest::PreloadCheck { manifest_hash } => {
99                    self.handle_preload_check(manifest_hash, response);
100                }
101            }
102        }
103    }
104
105    #[instrument(skip(self, response))]
106    async fn handle_bloop(
107        &mut self,
108        nfc_uid: NfcUid,
109        client_id: String,
110        response: oneshot::Sender<ServerMessage>,
111    ) {
112        if self
113            .trigger_registry
114            .try_activate_trigger(nfc_uid, &client_id)
115        {
116            let _ = response.send(ServerMessage::BloopAccepted {
117                achievements: Vec::new(),
118            });
119            return;
120        }
121
122        let player = {
123            let player_registry = self.player_registry.lock().await;
124            let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
125                let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
126                return;
127            };
128            player
129        };
130
131        if let Some(throttle) = self.throttle.as_ref() {
132            let player_id = player.read().unwrap().id();
133
134            let throttled = self
135                .bloop_provider
136                .for_client(&client_id)
137                .iter()
138                .filter(bloops_since(Utc::now() - throttle.threshold))
139                .take(throttle.max_bloops)
140                .all(|bloop| bloop.player_id == player_id);
141
142            if throttled {
143                let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
144                return;
145            }
146        }
147
148        player.write().unwrap().increment_bloops();
149        let bloop = Bloop::new(player.clone(), client_id, Utc::now());
150
151        let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
152        self.activate_hot_achievements(&bloop, &awarded_tracker);
153        self.inject_hot_achievements(&bloop, &mut awarded_tracker);
154
155        let player_registry = self.player_registry.lock().await;
156        awarded_tracker.remove_duplicates(player_registry);
157
158        let achievement_ids: Vec<Uuid> = awarded_tracker
159            .for_player(bloop.player_id)
160            .map_or_else(Vec::new, |set| set.iter().cloned().collect());
161
162        self.apply_awarded(awarded_tracker).await;
163
164        let processed_bloop: ProcessedBloop = (&bloop).into();
165        self.bloop_provider.add(Arc::new(bloop));
166        let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
167
168        let _ = response.send(ServerMessage::BloopAccepted {
169            achievements: achievement_ids
170                .into_iter()
171                .map(|id| AchievementRecord {
172                    id,
173                    audio_file_hash: self.audio_file_hashes.get(&id).cloned(),
174                })
175                .collect(),
176        });
177    }
178
179    async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
180        let previous_awarded: HashSet<Uuid> = {
181            let player = bloop.player();
182            player.awarded_achievements().keys().cloned().collect()
183        };
184        let metadata = self.state.lock().await;
185        let ctx = AchievementContext::new(
186            bloop,
187            &self.bloop_provider,
188            &*metadata,
189            &mut self.trigger_registry,
190        );
191
192        for achievement in self.achievements.values() {
193            if !previous_awarded.contains(&achievement.id) {
194                achievement.evaluate(&ctx);
195            }
196        }
197
198        ctx.take_awarded_tracker()
199    }
200
201    fn inject_hot_achievements(
202        &mut self,
203        bloop: &Bloop<Player>,
204        awarded_tracker: &mut AwardedTracker,
205    ) {
206        let awarded = awarded_tracker.for_player_mut(bloop.player_id);
207
208        self.hot_achievements.retain(|hot_achievement| {
209            if hot_achievement.until < bloop.recorded_at {
210                return false;
211            }
212
213            if hot_achievement.client_id == bloop.client_id {
214                awarded.insert(hot_achievement.id);
215            }
216
217            true
218        });
219    }
220
221    fn activate_hot_achievements(
222        &mut self,
223        bloop: &Bloop<Player>,
224        awarded_tracker: &AwardedTracker,
225    ) {
226        let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
227            return;
228        };
229
230        for achievement_id in awarded {
231            let Some(achievement) = self.achievements.get(achievement_id) else {
232                continue;
233            };
234
235            if let Some(hot_duration) = achievement.hot_duration {
236                self.hot_achievements.push(HotAchievement::new(
237                    *achievement_id,
238                    bloop,
239                    hot_duration,
240                ));
241            };
242        }
243    }
244
245    async fn apply_awarded(&self, tracker: AwardedTracker) {
246        let mut player_registry = self.player_registry.lock().await;
247        let batch: AchievementAwardBatch = tracker.into();
248
249        for player_awards in batch.players.iter() {
250            player_registry.mutate_by_id(player_awards.player_id, |player| {
251                for achievement_id in player_awards.achievement_ids.iter() {
252                    player.add_awarded_achievement(*achievement_id, batch.awarded_at);
253                }
254            });
255        }
256
257        let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
258    }
259
260    #[instrument(skip(self, response))]
261    fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
262        let Some(achievement) = self.achievements.get(&id) else {
263            info!("Client requested unknown achievement: {}", id);
264            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
265            return;
266        };
267
268        let Some(path) = achievement.audio_path.as_ref() else {
269            info!("Client requested audio for audio-less achievement: {}", id);
270            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
271            return;
272        };
273        let path = self.audio_base_path.join(path);
274
275        tokio::spawn(async move {
276            let mut file = match File::open(&path).await {
277                Ok(file) => file,
278                Err(err) => {
279                    warn!("Failed to open file {:?}: {:?}", &path, err);
280                    let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
281                    return;
282                }
283            };
284
285            let mut data = vec![];
286
287            if let Err(err) = file.read_to_end(&mut data).await {
288                warn!("Failed to read file {:?}: {:?}", &path, err);
289                let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
290                return;
291            }
292
293            let _ = response.send(ServerMessage::AudioData { data });
294        });
295    }
296
297    #[instrument(skip(self, response))]
298    fn handle_preload_check(
299        &self,
300        manifest_hash: Option<DataHash>,
301        response: oneshot::Sender<ServerMessage>,
302    ) {
303        if let Some(manifest_hash) = manifest_hash {
304            if manifest_hash == self.audio_manifest_hash {
305                let _ = response.send(ServerMessage::PreloadMatch);
306                return;
307            }
308        }
309
310        let _ = response.send(ServerMessage::PreloadMismatch {
311            audio_manifest_hash: self.audio_manifest_hash,
312            achievements: self
313                .achievements
314                .values()
315                .map(|achievement| AchievementRecord {
316                    id: achievement.id,
317                    audio_file_hash: self.audio_file_hashes.get(&achievement.id).cloned(),
318                })
319                .collect(),
320        });
321    }
322}
323
/// Error type of the engine subsystem.
///
/// The enum has no variants, so a value of it cannot be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
327
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Runs the request loop as a subsystem until it ends or shutdown is requested.
    ///
    /// The outcome of `cancel_on_shutdown` is discarded, so this always returns `Ok(())`.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        let request_loop = self.process_requests();
        let _ = request_loop.cancel_on_shutdown(&subsys).await;

        Ok(())
    }
}
343
344#[derive(Debug, Error)]
345pub enum BuilderError {
346    #[error("missing field: {0}")]
347    MissingField(&'static str),
348}
349
350#[derive(Debug, Default)]
351pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
352where
353    Player: PlayerInfo + PlayerMutator,
354    Metadata: Default,
355    Trigger: Copy,
356{
357    bloops: Vec<Bloop<Player>>,
358    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
359    bloop_retention: Option<Duration>,
360    audio_base_path: Option<PathBuf>,
361    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
362    state: Option<Arc<Mutex<State>>>,
363    trigger_registry: Option<TriggerRegistry<Trigger>>,
364    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
365    event_tx: Option<broadcast::Sender<Event>>,
366    throttle: Option<Throttle>,
367}
368
369impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
370where
371    Player: PlayerInfo + PlayerMutator,
372    Metadata: Default,
373    State: Default,
374    Trigger: Copy + PartialEq + Eq + Debug,
375{
376    pub fn new() -> Self {
377        Self {
378            bloops: Vec::new(),
379            achievements: Vec::new(),
380            bloop_retention: None,
381            audio_base_path: None,
382            player_registry: None,
383            state: None,
384            trigger_registry: None,
385            network_rx: None,
386            event_tx: None,
387            throttle: None,
388        }
389    }
390
391    pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
392        self.bloops = bloops;
393        self
394    }
395
396    pub fn achievements(
397        mut self,
398        achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
399    ) -> Self {
400        self.achievements = achievements;
401        self
402    }
403
404    pub fn bloop_retention(mut self, retention: Duration) -> Self {
405        self.bloop_retention = Some(retention);
406        self
407    }
408
409    pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
410        self.audio_base_path = Some(path.into());
411        self
412    }
413
414    pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
415        self.player_registry = Some(registry);
416        self
417    }
418
419    pub fn metadata(mut self, metadata: Arc<Mutex<State>>) -> Self {
420        self.state = Some(metadata);
421        self
422    }
423
424    pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
425        self.trigger_registry = Some(registry);
426        self
427    }
428
429    pub fn network_rx(
430        mut self,
431        rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
432    ) -> Self {
433        self.network_rx = Some(rx);
434        self
435    }
436
437    pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
438        self.event_tx = Some(tx);
439        self
440    }
441
442    pub fn throttle(mut self, throttle: Throttle) -> Self {
443        self.throttle = Some(throttle);
444        self
445    }
446
447    /// Consumes the builder and constructs the Engine.
448    pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
449        let bloop_retention = self
450            .bloop_retention
451            .ok_or(BuilderError::MissingField("bloop_retention"))?;
452        let audio_base_path = self
453            .audio_base_path
454            .ok_or(BuilderError::MissingField("audio_base_path"))?;
455        let player_registry = self
456            .player_registry
457            .ok_or(BuilderError::MissingField("player_registry"))?;
458        let network_rx = self
459            .network_rx
460            .ok_or(BuilderError::MissingField("network_rx"))?;
461        let event_tx = self
462            .event_tx
463            .ok_or(BuilderError::MissingField("event_tx"))?;
464
465        let (audio_hashes, global_hash) =
466            collect_audio_hashes(&audio_base_path, &self.achievements);
467        let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
468        let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
469            self.achievements.into_iter().map(|a| (a.id, a)).collect();
470        let state = self
471            .state
472            .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
473        let trigger_registry = self
474            .trigger_registry
475            .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
476        let hot_achievements = Vec::new();
477
478        Ok(Engine {
479            bloop_provider,
480            achievements,
481            audio_base_path,
482            audio_file_hashes: audio_hashes,
483            audio_manifest_hash: global_hash,
484            player_registry,
485            state,
486            trigger_registry,
487            hot_achievements,
488            network_rx,
489            event_tx,
490            throttle: self.throttle,
491        })
492    }
493}
494
495fn collect_audio_hashes<Metadata, Player, State, Trigger>(
496    audio_base_path: &Path,
497    achievements: &[Achievement<Metadata, Player, State, Trigger>],
498) -> (HashMap<Uuid, DataHash>, DataHash) {
499    let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
500        .iter()
501        .filter_map(|achievement| {
502            let path = achievement.audio_path.as_ref()?;
503            let path = audio_base_path.join(path);
504
505            let Ok(file_content) = fs::read(path.clone()) else {
506                warn!("Audio file missing: {:?}", path);
507                return None;
508            };
509
510            let digest = md5::compute(file_content);
511
512            Some((achievement.id, digest.into()))
513        })
514        .collect();
515
516    let mut entries: Vec<_> = audio_file_hashes.iter().collect();
517    entries.sort_by_key(|(id, _)| *id);
518    let mut hash_input = Vec::with_capacity(entries.len() * 32);
519
520    for (id, hash) in entries {
521        hash_input.extend(id.as_bytes());
522        hash_input.extend_from_slice(hash.as_bytes());
523    }
524
525    let manifest_hash = md5::compute(hash_input);
526
527    (audio_file_hashes, manifest_hash.into())
528}
529
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine with an empty player registry, no triggers and no throttle.
    fn build_test_engine() -> TestEngine {
        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
        let trigger_registry = TriggerRegistry::new(HashMap::new());

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(player_registry)
            .trigger_registry(trigger_registry)
            .network_rx(mpsc::channel(1).1)
            .event_tx(broadcast::channel(16).0)
            .build()
            .unwrap()
    }

    /// Adds a mock player carrying `nfc_uid` to the engine's player registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(player).unwrap().into_inner().unwrap();

        engine.player_registry.lock().await.add(player);
    }

    /// Runs `handle_bloop` and returns the message the engine sent back.
    async fn send_bloop(engine: &mut TestEngine, nfc_uid: NfcUid, client_id: &str) -> ServerMessage {
        let (resp_tx, resp_rx) = oneshot::channel();
        engine
            .handle_bloop(nfc_uid, client_id.to_string(), resp_tx)
            .await;

        resp_rx.await.unwrap()
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();

        let response = send_bloop(&mut engine, NfcUid::default(), "test-client").await;

        match response {
            ServerMessage::Error(err) => {
                assert!(matches!(err, ErrorResponse::UnknownNfcUid));
            }
            _ => panic!("Expected Error response for unknown NFC UID"),
        }
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let response = send_bloop(&mut engine, nfc_uid, "test-client").await;

        match response {
            ServerMessage::BloopAccepted { achievements } => {
                assert!(achievements.is_empty());
            }
            _ => panic!("Expected BloopAccepted response"),
        }
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        let trigger_spec = TriggerSpec {
            trigger: (),
            global: false,
            occurrence: TriggerOccurrence::Once,
        };
        let trigger_registry =
            TriggerRegistry::new(HashMap::from([(NfcUid::default(), trigger_spec)]));
        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));

        // Keep the request sender alive for the duration of the test.
        let (_tx, rx) = mpsc::channel(1);
        let (evt_tx, _) = broadcast::channel(16);

        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(player_registry)
            .trigger_registry(trigger_registry)
            .network_rx(rx)
            .event_tx(evt_tx)
            .build()
            .unwrap();

        let response = send_bloop(&mut engine, NfcUid::default(), "client").await;

        match response {
            ServerMessage::BloopAccepted { achievements } => {
                assert!(achievements.is_empty());
            }
            _ => panic!("Expected BloopAccepted response"),
        }

        assert!(
            engine
                .trigger_registry
                .check_active_trigger((), "client", Utc::now())
        );
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client";
        register_player(&engine, nfc_uid).await;

        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // Seed one recent bloop by the same player on the same client.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let bloop = Bloop::new(player, client_id.to_string(), Utc::now());
        engine.bloop_provider.add(Arc::new(bloop));

        let response = send_bloop(&mut engine, nfc_uid, client_id).await;

        match response {
            ServerMessage::Error(err) => {
                assert!(matches!(err, ErrorResponse::NfcUidThrottled));
            }
            _ => panic!("Expected throttling error"),
        }
    }
}