Skip to main content

bloop_server_framework/
engine.rs

1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8use chrono::{DateTime, Utc};
9use serde::Deserialize;
10use std::collections::{HashMap, HashSet};
11use std::fmt::Debug;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::Duration;
15use thiserror::Error;
16use tokio::fs::File;
17use tokio::io::AsyncReadExt;
18use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
19#[cfg(feature = "tokio-graceful-shutdown")]
20use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
21use tracing::{info, instrument, warn};
22use uuid::Uuid;
23
/// A request delivered to the engine through its `network_rx` channel.
///
/// Each request travels together with a `oneshot::Sender<ServerMessage>` on which the
/// engine sends its answer.
#[derive(Debug)]
pub enum EngineRequest {
    /// The tag `nfc_uid` was presented at the client identified by `client_id`.
    Bloop { client_id: String, nfc_uid: NfcUid },
    /// Asks for the audio data belonging to the achievement with the given `id`.
    RetrieveAudio { id: Uuid },
    /// Asks whether the client's audio manifest hash (if it has one) equals the engine's.
    PreloadCheck { manifest_hash: Option<DataHash> },
}
30
31#[derive(Debug, Deserialize)]
32pub struct Throttle {
33    max_bloops: usize,
34    #[serde(with = "humantime_serde")]
35    threshold: Duration,
36}
37
38impl Throttle {
39    pub fn new(max_bloops: usize, threshold: Duration) -> Self {
40        Self {
41            max_bloops,
42            threshold,
43        }
44    }
45}
46
47struct HotAchievement {
48    id: Uuid,
49    client_id: String,
50    until: DateTime<Utc>,
51}
52
53impl HotAchievement {
54    fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
55        Self {
56            id,
57            client_id: bloop.client_id.clone(),
58            until: bloop.recorded_at + duration,
59        }
60    }
61}
62
/// The request-processing core of the server.
///
/// It receives [`EngineRequest`]s from `network_rx`, answers each one on the accompanying
/// oneshot channel and publishes [`Event`]s on `event_tx`.
pub struct Engine<Metadata, Player, State, Trigger>
where
    Player: PlayerInfo + PlayerMutator,
    Trigger: Copy,
{
    /// Store of processed bloops; queried per client for throttling and handed to
    /// achievement evaluation.
    bloop_provider: BloopProvider<Player>,
    /// All registered achievements, keyed by their id.
    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
    /// Base path against which the achievements' audio files are resolved.
    audio_base_path: PathBuf,
    /// Hash over the audio files of all achievements, compared in preload checks.
    audio_manifest_hash: DataHash,
    /// Shared registry used to look up players and to record awarded achievements.
    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
    /// Shared state that is locked and passed to achievement evaluation.
    state: Arc<Mutex<State>>,
    /// Registry consulted first for every bloop and passed to achievement evaluation.
    trigger_registry: TriggerRegistry<Trigger>,
    /// Currently active hot achievements; expired entries are pruned while bloops are handled.
    hot_achievements: Vec<HotAchievement>,
    /// Incoming requests, each paired with the channel for its response.
    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
    /// Broadcast channel on which processed bloops and awarded achievements are announced.
    event_tx: broadcast::Sender<Event>,
    /// Optional throttle; `None` disables throttling.
    throttle: Option<Throttle>,
}
80
81impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
82where
83    Player: PlayerInfo + PlayerMutator,
84    Trigger: Copy,
85{
86    pub async fn process_requests(&mut self) {
87        while let Some((request, response)) = self.network_rx.recv().await {
88            match request {
89                EngineRequest::Bloop { nfc_uid, client_id } => {
90                    self.handle_bloop(nfc_uid, client_id, response).await;
91                }
92                EngineRequest::RetrieveAudio { id } => {
93                    self.handle_retrieve_audio(id, response);
94                }
95                EngineRequest::PreloadCheck { manifest_hash } => {
96                    self.handle_preload_check(manifest_hash, response);
97                }
98            }
99        }
100    }
101
102    #[instrument(skip(self, response))]
103    async fn handle_bloop(
104        &mut self,
105        nfc_uid: NfcUid,
106        client_id: String,
107        response: oneshot::Sender<ServerMessage>,
108    ) {
109        if self
110            .trigger_registry
111            .try_activate_trigger(nfc_uid, &client_id)
112        {
113            let _ = response.send(ServerMessage::BloopAccepted {
114                achievements: Vec::new(),
115            });
116            return;
117        }
118
119        let player = {
120            let player_registry = self.player_registry.lock().await;
121            let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
122                let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
123                return;
124            };
125            player
126        };
127
128        if let Some(throttle) = self.throttle.as_ref() {
129            let player_id = player.read().unwrap().id();
130
131            let recent_bloops = self
132                .bloop_provider
133                .for_client(&client_id)
134                .iter()
135                .filter(bloops_since(Utc::now() - throttle.threshold))
136                .take(throttle.max_bloops)
137                .collect::<Vec<_>>();
138
139            if recent_bloops
140                .iter()
141                .all(|bloop| bloop.player_id == player_id)
142                && recent_bloops.len() == throttle.max_bloops
143            {
144                let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
145                return;
146            }
147        }
148
149        player.write().unwrap().increment_bloops();
150        let bloop = Bloop::new(player.clone(), client_id, Utc::now());
151
152        let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
153        self.activate_hot_achievements(&bloop, &awarded_tracker);
154        self.inject_hot_achievements(&bloop, &mut awarded_tracker);
155
156        let player_registry = self.player_registry.lock().await;
157        awarded_tracker.remove_duplicates(player_registry);
158
159        let achievement_ids: Vec<Uuid> = awarded_tracker
160            .for_player(bloop.player_id)
161            .map_or_else(Vec::new, |set| set.iter().cloned().collect());
162
163        self.apply_awarded(awarded_tracker).await;
164
165        let processed_bloop: ProcessedBloop = (&bloop).into();
166        self.bloop_provider.add(Arc::new(bloop));
167        let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
168
169        let _ = response.send(ServerMessage::BloopAccepted {
170            achievements: achievement_ids
171                .into_iter()
172                .map(|id| AchievementRecord {
173                    id,
174                    audio_file_hash: self
175                        .achievements
176                        .get(&id)
177                        .unwrap()
178                        .audio_file
179                        .resolve(&self.audio_base_path)
180                        .as_ref()
181                        .map(|file| file.hash),
182                })
183                .collect(),
184        });
185    }
186
187    async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
188        let previous_awarded: HashSet<Uuid> = {
189            let player = bloop.player();
190            player.awarded_achievements().keys().cloned().collect()
191        };
192        let state = self.state.lock().await;
193        let ctx = AchievementContext::new(
194            bloop,
195            &self.bloop_provider,
196            &*state,
197            &mut self.trigger_registry,
198        );
199
200        for achievement in self.achievements.values() {
201            if !previous_awarded.contains(&achievement.id) {
202                achievement.evaluate(&ctx);
203            }
204        }
205
206        ctx.take_awarded_tracker()
207    }
208
209    fn inject_hot_achievements(
210        &mut self,
211        bloop: &Bloop<Player>,
212        awarded_tracker: &mut AwardedTracker,
213    ) {
214        let awarded = awarded_tracker.for_player_mut(bloop.player_id);
215
216        self.hot_achievements.retain(|hot_achievement| {
217            if hot_achievement.until < bloop.recorded_at {
218                return false;
219            }
220
221            if hot_achievement.client_id == bloop.client_id {
222                awarded.insert(hot_achievement.id);
223            }
224
225            true
226        });
227    }
228
229    fn activate_hot_achievements(
230        &mut self,
231        bloop: &Bloop<Player>,
232        awarded_tracker: &AwardedTracker,
233    ) {
234        let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
235            return;
236        };
237
238        for achievement_id in awarded {
239            let Some(achievement) = self.achievements.get(achievement_id) else {
240                continue;
241            };
242
243            if let Some(hot_duration) = achievement.hot_duration {
244                self.hot_achievements.push(HotAchievement::new(
245                    *achievement_id,
246                    bloop,
247                    hot_duration,
248                ));
249            };
250        }
251    }
252
253    async fn apply_awarded(&self, tracker: AwardedTracker) {
254        let mut player_registry = self.player_registry.lock().await;
255        let batch: AchievementAwardBatch = tracker.into();
256
257        for player_awards in batch.players.iter() {
258            player_registry.mutate_by_id(player_awards.player_id, |player| {
259                for achievement_id in player_awards.achievement_ids.iter() {
260                    player.add_awarded_achievement(*achievement_id, batch.awarded_at);
261                }
262            });
263        }
264
265        let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
266    }
267
268    #[instrument(skip(self, response))]
269    fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
270        let Some(achievement) = self.achievements.get(&id) else {
271            info!("Client requested unknown achievement: {}", id);
272            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
273            return;
274        };
275
276        let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
277            info!("Client requested audio for audio-less achievement: {}", id);
278            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
279            return;
280        };
281
282        let path = audio_file.path.clone();
283
284        tokio::spawn(async move {
285            let mut file = match File::open(&path).await {
286                Ok(file) => file,
287                Err(err) => {
288                    warn!("Failed to open file {:?}: {:?}", &path, err);
289                    let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
290                    return;
291                }
292            };
293
294            let mut data = vec![];
295
296            if let Err(err) = file.read_to_end(&mut data).await {
297                warn!("Failed to read file {:?}: {:?}", &path, err);
298                let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
299                return;
300            }
301
302            let _ = response.send(ServerMessage::AudioData { data });
303        });
304    }
305
306    #[instrument(skip(self, response))]
307    fn handle_preload_check(
308        &self,
309        manifest_hash: Option<DataHash>,
310        response: oneshot::Sender<ServerMessage>,
311    ) {
312        if let Some(manifest_hash) = manifest_hash
313            && manifest_hash == self.audio_manifest_hash
314        {
315            let _ = response.send(ServerMessage::PreloadMatch);
316            return;
317        }
318
319        let _ = response.send(ServerMessage::PreloadMismatch {
320            audio_manifest_hash: self.audio_manifest_hash,
321            achievements: self
322                .achievements
323                .values()
324                .map(|achievement| AchievementRecord {
325                    id: achievement.id,
326                    audio_file_hash: achievement
327                        .audio_file
328                        .resolve(&self.audio_base_path)
329                        .as_ref()
330                        .map(|file| file.hash),
331                })
332                .collect(),
333        });
334    }
335}
336
/// Error type of the engine's subsystem implementation.
///
/// The enum has no variants, so no value of it can ever be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
340
#[cfg(feature = "tokio-graceful-shutdown")]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Runs the request loop as a subsystem until it ends or shutdown cancels it.
    ///
    /// Both outcomes are treated as a regular stop, so this always returns `Ok(())`.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        match self.process_requests().cancel_on_shutdown(subsys).await {
            Ok(()) | Err(_) => Ok(()),
        }
    }
}
355
/// Errors returned by [`EngineBuilder::build`].
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was never set; the payload is the field's name.
    #[error("missing field: {0}")]
    MissingField(&'static str),
}
361
/// Builder for [`Engine`].
///
/// `bloop_retention`, `audio_base_path`, `player_registry`, `network_rx` and `event_tx`
/// must be set before calling `build`; all other fields have fallbacks.
#[derive(Debug, Default)]
pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
where
    Player: PlayerInfo + PlayerMutator,
    Trigger: Copy,
{
    /// Bloops the engine's bloop provider is seeded with.
    bloops: Vec<Bloop<Player>>,
    /// Achievements to register with the engine.
    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
    /// Retention passed to the bloop provider (required).
    bloop_retention: Option<Duration>,
    /// Base path for resolving audio files (required).
    audio_base_path: Option<PathBuf>,
    /// Shared player registry (required).
    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
    /// Shared state; a default-constructed state is used when unset.
    state: Option<Arc<Mutex<State>>>,
    /// Trigger registry; an empty registry is used when unset.
    trigger_registry: Option<TriggerRegistry<Trigger>>,
    /// Receiving end of the request channel (required).
    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
    /// Sending end of the event broadcast channel (required).
    event_tx: Option<broadcast::Sender<Event>>,
    /// Optional throttle configuration.
    throttle: Option<Throttle>,
}
379
380impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
381where
382    Player: PlayerInfo + PlayerMutator,
383    State: Default,
384    Trigger: Copy + PartialEq + Eq + Debug,
385{
386    pub fn new() -> Self {
387        Self {
388            bloops: Vec::new(),
389            achievements: Vec::new(),
390            bloop_retention: None,
391            audio_base_path: None,
392            player_registry: None,
393            state: None,
394            trigger_registry: None,
395            network_rx: None,
396            event_tx: None,
397            throttle: None,
398        }
399    }
400
401    pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
402        self.bloops = bloops;
403        self
404    }
405
406    pub fn achievements(
407        mut self,
408        achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
409    ) -> Self {
410        self.achievements = achievements;
411        self
412    }
413
414    pub fn bloop_retention(mut self, retention: Duration) -> Self {
415        self.bloop_retention = Some(retention);
416        self
417    }
418
419    pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
420        self.audio_base_path = Some(path.into());
421        self
422    }
423
424    pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
425        self.player_registry = Some(registry);
426        self
427    }
428
429    pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
430        self.state = Some(state);
431        self
432    }
433
434    pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
435        self.trigger_registry = Some(registry);
436        self
437    }
438
439    pub fn network_rx(
440        mut self,
441        rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
442    ) -> Self {
443        self.network_rx = Some(rx);
444        self
445    }
446
447    pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
448        self.event_tx = Some(tx);
449        self
450    }
451
452    pub fn throttle(mut self, throttle: Throttle) -> Self {
453        self.throttle = Some(throttle);
454        self
455    }
456
457    /// Consumes the builder and constructs the Engine.
458    pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
459        let bloop_retention = self
460            .bloop_retention
461            .ok_or(BuilderError::MissingField("bloop_retention"))?;
462        let audio_base_path = self
463            .audio_base_path
464            .ok_or(BuilderError::MissingField("audio_base_path"))?;
465        let player_registry = self
466            .player_registry
467            .ok_or(BuilderError::MissingField("player_registry"))?;
468        let network_rx = self
469            .network_rx
470            .ok_or(BuilderError::MissingField("network_rx"))?;
471        let event_tx = self
472            .event_tx
473            .ok_or(BuilderError::MissingField("event_tx"))?;
474
475        let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
476        let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
477        let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
478            self.achievements.into_iter().map(|a| (a.id, a)).collect();
479        let state = self
480            .state
481            .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
482        let trigger_registry = self
483            .trigger_registry
484            .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
485        let hot_achievements = Vec::new();
486
487        Ok(Engine {
488            bloop_provider,
489            achievements,
490            audio_base_path,
491            audio_manifest_hash,
492            player_registry,
493            state,
494            trigger_registry,
495            hot_achievements,
496            network_rx,
497            event_tx,
498            throttle: self.throttle,
499        })
500    }
501}
502
503fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
504    audio_base_path: &Path,
505    achievements: &[Achievement<Metadata, Player, State, Trigger>],
506) -> DataHash {
507    let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
508        .iter()
509        .filter_map(|achievement| {
510            achievement
511                .audio_file
512                .resolve(audio_base_path)
513                .as_ref()
514                .map(|file| (achievement.id, file.hash))
515        })
516        .collect();
517
518    let mut entries: Vec<_> = audio_file_hashes.iter().collect();
519    entries.sort_by_key(|(id, _)| *id);
520    let mut hash_input = Vec::with_capacity(entries.len() * 32);
521
522    for (id, hash) in entries {
523        hash_input.extend(id.as_bytes());
524        hash_input.extend_from_slice(hash.as_bytes());
525    }
526
527    let manifest_hash = md5::compute(hash_input);
528    manifest_hash.into()
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine with an empty player registry and an empty trigger registry.
    fn build_test_engine() -> TestEngine {
        let (_request_tx, request_rx) = mpsc::channel(1);
        let (event_tx, _event_rx) = broadcast::channel(16);

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(Arc::new(Mutex::new(PlayerRegistry::new(vec![]))))
            .trigger_registry(TriggerRegistry::new(HashMap::new()))
            .network_rx(request_rx)
            .event_tx(event_tx)
            .build()
            .unwrap()
    }

    /// Adds a mock player carrying `nfc_uid` to the engine's registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(player).unwrap().into_inner().unwrap();
        engine.player_registry.lock().await.add(player);
    }

    /// Sends one bloop through `handle_bloop` and returns the engine's response.
    async fn bloop_response(
        engine: &mut TestEngine,
        nfc_uid: NfcUid,
        client_id: &str,
    ) -> ServerMessage {
        let (resp_tx, resp_rx) = oneshot::channel();
        engine
            .handle_bloop(nfc_uid, client_id.to_string(), resp_tx)
            .await;
        resp_rx.await.unwrap()
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();

        let response = bloop_response(&mut engine, NfcUid::default(), "test-client").await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected Error response for unknown NFC UID");
        };
        assert!(matches!(err, ErrorResponse::UnknownNfcUid));
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let response = bloop_response(&mut engine, nfc_uid, "test-client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        // The default NFC UID is registered as a one-shot, client-local trigger.
        let mut trigger_specs = HashMap::new();
        trigger_specs.insert(
            NfcUid::default(),
            TriggerSpec {
                trigger: (),
                global: false,
                occurrence: TriggerOccurrence::Once,
            },
        );

        let (_request_tx, request_rx) = mpsc::channel(1);
        let (event_tx, _) = broadcast::channel(16);

        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(Arc::new(Mutex::new(PlayerRegistry::new(vec![]))))
            .trigger_registry(TriggerRegistry::new(trigger_specs))
            .network_rx(request_rx)
            .event_tx(event_tx)
            .build()
            .unwrap();

        let response = bloop_response(&mut engine, NfcUid::default(), "client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());

        assert!(
            engine
                .trigger_registry
                .check_active_trigger((), "client", Utc::now())
        );
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client";
        register_player(&engine, nfc_uid).await;

        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // One fresh bloop by the same player already fills the throttle window.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let earlier_bloop = Bloop::new(player, client_id.to_string(), Utc::now());
        engine.bloop_provider.add(Arc::new(earlier_bloop));

        let response = bloop_response(&mut engine, nfc_uid, client_id).await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected throttling error");
        };
        assert!(matches!(err, ErrorResponse::NfcUidThrottled));
    }
}