bloop_server_framework/
engine.rs

1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
22#[cfg(feature = "tokio-graceful-shutdown")]
23use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
24use tracing::{info, instrument, warn};
25use uuid::Uuid;
26
27#[derive(Debug)]
28pub enum EngineRequest {
29    Bloop { client_id: String, nfc_uid: NfcUid },
30    RetrieveAudio { id: Uuid },
31    PreloadCheck { manifest_hash: Option<DataHash> },
32}
33
34#[derive(Debug, Deserialize)]
35pub struct Throttle {
36    max_bloops: usize,
37    threshold: Duration,
38}
39
40impl Throttle {
41    pub fn new(max_bloops: usize, threshold: Duration) -> Self {
42        Self {
43            max_bloops,
44            threshold,
45        }
46    }
47}
48
49struct HotAchievement {
50    id: Uuid,
51    client_id: String,
52    until: DateTime<Utc>,
53}
54
55impl HotAchievement {
56    fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
57        Self {
58            id,
59            client_id: bloop.client_id.clone(),
60            until: bloop.recorded_at + duration,
61        }
62    }
63}
64
65pub struct Engine<Metadata, Player, State, Trigger>
66where
67    Player: PlayerInfo + PlayerMutator,
68    Trigger: Copy,
69{
70    bloop_provider: BloopProvider<Player>,
71    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
72    audio_base_path: PathBuf,
73    audio_file_hashes: HashMap<Uuid, DataHash>,
74    audio_manifest_hash: DataHash,
75    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
76    state: Arc<Mutex<State>>,
77    trigger_registry: TriggerRegistry<Trigger>,
78    hot_achievements: Vec<HotAchievement>,
79    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
80    event_tx: broadcast::Sender<Event>,
81    throttle: Option<Throttle>,
82}
83
84impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
85where
86    Player: PlayerInfo + PlayerMutator,
87    Trigger: Copy,
88{
89    pub async fn process_requests(&mut self) {
90        while let Some((request, response)) = self.network_rx.recv().await {
91            match request {
92                EngineRequest::Bloop { nfc_uid, client_id } => {
93                    self.handle_bloop(nfc_uid, client_id, response).await;
94                }
95                EngineRequest::RetrieveAudio { id } => {
96                    self.handle_retrieve_audio(id, response);
97                }
98                EngineRequest::PreloadCheck { manifest_hash } => {
99                    self.handle_preload_check(manifest_hash, response);
100                }
101            }
102        }
103    }
104
105    #[instrument(skip(self, response))]
106    async fn handle_bloop(
107        &mut self,
108        nfc_uid: NfcUid,
109        client_id: String,
110        response: oneshot::Sender<ServerMessage>,
111    ) {
112        if self
113            .trigger_registry
114            .try_activate_trigger(nfc_uid, &client_id)
115        {
116            let _ = response.send(ServerMessage::BloopAccepted {
117                achievements: Vec::new(),
118            });
119            return;
120        }
121
122        let player = {
123            let player_registry = self.player_registry.lock().await;
124            let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
125                let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
126                return;
127            };
128            player
129        };
130
131        if let Some(throttle) = self.throttle.as_ref() {
132            let player_id = player.read().unwrap().id();
133
134            let recent_bloops = self
135                .bloop_provider
136                .for_client(&client_id)
137                .iter()
138                .filter(bloops_since(Utc::now() - throttle.threshold))
139                .take(throttle.max_bloops)
140                .collect::<Vec<_>>();
141
142            if recent_bloops
143                .iter()
144                .all(|bloop| bloop.player_id == player_id)
145                && recent_bloops.len() == throttle.max_bloops
146            {
147                let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
148                return;
149            }
150        }
151
152        player.write().unwrap().increment_bloops();
153        let bloop = Bloop::new(player.clone(), client_id, Utc::now());
154
155        let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
156        self.activate_hot_achievements(&bloop, &awarded_tracker);
157        self.inject_hot_achievements(&bloop, &mut awarded_tracker);
158
159        let player_registry = self.player_registry.lock().await;
160        awarded_tracker.remove_duplicates(player_registry);
161
162        let achievement_ids: Vec<Uuid> = awarded_tracker
163            .for_player(bloop.player_id)
164            .map_or_else(Vec::new, |set| set.iter().cloned().collect());
165
166        self.apply_awarded(awarded_tracker).await;
167
168        let processed_bloop: ProcessedBloop = (&bloop).into();
169        self.bloop_provider.add(Arc::new(bloop));
170        let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
171
172        let _ = response.send(ServerMessage::BloopAccepted {
173            achievements: achievement_ids
174                .into_iter()
175                .map(|id| AchievementRecord {
176                    id,
177                    audio_file_hash: self.audio_file_hashes.get(&id).cloned(),
178                })
179                .collect(),
180        });
181    }
182
183    async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
184        let previous_awarded: HashSet<Uuid> = {
185            let player = bloop.player();
186            player.awarded_achievements().keys().cloned().collect()
187        };
188        let metadata = self.state.lock().await;
189        let ctx = AchievementContext::new(
190            bloop,
191            &self.bloop_provider,
192            &*metadata,
193            &mut self.trigger_registry,
194        );
195
196        for achievement in self.achievements.values() {
197            if !previous_awarded.contains(&achievement.id) {
198                achievement.evaluate(&ctx);
199            }
200        }
201
202        ctx.take_awarded_tracker()
203    }
204
205    fn inject_hot_achievements(
206        &mut self,
207        bloop: &Bloop<Player>,
208        awarded_tracker: &mut AwardedTracker,
209    ) {
210        let awarded = awarded_tracker.for_player_mut(bloop.player_id);
211
212        self.hot_achievements.retain(|hot_achievement| {
213            if hot_achievement.until < bloop.recorded_at {
214                return false;
215            }
216
217            if hot_achievement.client_id == bloop.client_id {
218                awarded.insert(hot_achievement.id);
219            }
220
221            true
222        });
223    }
224
225    fn activate_hot_achievements(
226        &mut self,
227        bloop: &Bloop<Player>,
228        awarded_tracker: &AwardedTracker,
229    ) {
230        let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
231            return;
232        };
233
234        for achievement_id in awarded {
235            let Some(achievement) = self.achievements.get(achievement_id) else {
236                continue;
237            };
238
239            if let Some(hot_duration) = achievement.hot_duration {
240                self.hot_achievements.push(HotAchievement::new(
241                    *achievement_id,
242                    bloop,
243                    hot_duration,
244                ));
245            };
246        }
247    }
248
249    async fn apply_awarded(&self, tracker: AwardedTracker) {
250        let mut player_registry = self.player_registry.lock().await;
251        let batch: AchievementAwardBatch = tracker.into();
252
253        for player_awards in batch.players.iter() {
254            player_registry.mutate_by_id(player_awards.player_id, |player| {
255                for achievement_id in player_awards.achievement_ids.iter() {
256                    player.add_awarded_achievement(*achievement_id, batch.awarded_at);
257                }
258            });
259        }
260
261        let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
262    }
263
264    #[instrument(skip(self, response))]
265    fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
266        let Some(achievement) = self.achievements.get(&id) else {
267            info!("Client requested unknown achievement: {}", id);
268            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
269            return;
270        };
271
272        let Some(path) = achievement.audio_path.as_ref() else {
273            info!("Client requested audio for audio-less achievement: {}", id);
274            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
275            return;
276        };
277        let path = self.audio_base_path.join(path);
278
279        tokio::spawn(async move {
280            let mut file = match File::open(&path).await {
281                Ok(file) => file,
282                Err(err) => {
283                    warn!("Failed to open file {:?}: {:?}", &path, err);
284                    let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
285                    return;
286                }
287            };
288
289            let mut data = vec![];
290
291            if let Err(err) = file.read_to_end(&mut data).await {
292                warn!("Failed to read file {:?}: {:?}", &path, err);
293                let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
294                return;
295            }
296
297            let _ = response.send(ServerMessage::AudioData { data });
298        });
299    }
300
301    #[instrument(skip(self, response))]
302    fn handle_preload_check(
303        &self,
304        manifest_hash: Option<DataHash>,
305        response: oneshot::Sender<ServerMessage>,
306    ) {
307        if let Some(manifest_hash) = manifest_hash {
308            if manifest_hash == self.audio_manifest_hash {
309                let _ = response.send(ServerMessage::PreloadMatch);
310                return;
311            }
312        }
313
314        let _ = response.send(ServerMessage::PreloadMismatch {
315            audio_manifest_hash: self.audio_manifest_hash,
316            achievements: self
317                .achievements
318                .values()
319                .map(|achievement| AchievementRecord {
320                    id: achievement.id,
321                    audio_file_hash: self.audio_file_hashes.get(&achievement.id).cloned(),
322                })
323                .collect(),
324        });
325    }
326}
327
/// Error type of the engine subsystem.
///
/// The enum has no variants, so a value of it can never be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
331
/// Lets the engine run as a `tokio-graceful-shutdown` subsystem.
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Processes requests until the request loop ends or it is cancelled by a
    /// shutdown; both outcomes are reported as success.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        match self.process_requests().cancel_on_shutdown(&subsys).await {
            Ok(()) | Err(_) => Ok(()),
        }
    }
}
347
348#[derive(Debug, Error)]
349pub enum BuilderError {
350    #[error("missing field: {0}")]
351    MissingField(&'static str),
352}
353
354#[derive(Debug, Default)]
355pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
356where
357    Player: PlayerInfo + PlayerMutator,
358    Metadata: Default,
359    Trigger: Copy,
360{
361    bloops: Vec<Bloop<Player>>,
362    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
363    bloop_retention: Option<Duration>,
364    audio_base_path: Option<PathBuf>,
365    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
366    state: Option<Arc<Mutex<State>>>,
367    trigger_registry: Option<TriggerRegistry<Trigger>>,
368    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
369    event_tx: Option<broadcast::Sender<Event>>,
370    throttle: Option<Throttle>,
371}
372
373impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
374where
375    Player: PlayerInfo + PlayerMutator,
376    Metadata: Default,
377    State: Default,
378    Trigger: Copy + PartialEq + Eq + Debug,
379{
380    pub fn new() -> Self {
381        Self {
382            bloops: Vec::new(),
383            achievements: Vec::new(),
384            bloop_retention: None,
385            audio_base_path: None,
386            player_registry: None,
387            state: None,
388            trigger_registry: None,
389            network_rx: None,
390            event_tx: None,
391            throttle: None,
392        }
393    }
394
395    pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
396        self.bloops = bloops;
397        self
398    }
399
400    pub fn achievements(
401        mut self,
402        achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
403    ) -> Self {
404        self.achievements = achievements;
405        self
406    }
407
408    pub fn bloop_retention(mut self, retention: Duration) -> Self {
409        self.bloop_retention = Some(retention);
410        self
411    }
412
413    pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
414        self.audio_base_path = Some(path.into());
415        self
416    }
417
418    pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
419        self.player_registry = Some(registry);
420        self
421    }
422
423    pub fn metadata(mut self, metadata: Arc<Mutex<State>>) -> Self {
424        self.state = Some(metadata);
425        self
426    }
427
428    pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
429        self.trigger_registry = Some(registry);
430        self
431    }
432
433    pub fn network_rx(
434        mut self,
435        rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
436    ) -> Self {
437        self.network_rx = Some(rx);
438        self
439    }
440
441    pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
442        self.event_tx = Some(tx);
443        self
444    }
445
446    pub fn throttle(mut self, throttle: Throttle) -> Self {
447        self.throttle = Some(throttle);
448        self
449    }
450
451    /// Consumes the builder and constructs the Engine.
452    pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
453        let bloop_retention = self
454            .bloop_retention
455            .ok_or(BuilderError::MissingField("bloop_retention"))?;
456        let audio_base_path = self
457            .audio_base_path
458            .ok_or(BuilderError::MissingField("audio_base_path"))?;
459        let player_registry = self
460            .player_registry
461            .ok_or(BuilderError::MissingField("player_registry"))?;
462        let network_rx = self
463            .network_rx
464            .ok_or(BuilderError::MissingField("network_rx"))?;
465        let event_tx = self
466            .event_tx
467            .ok_or(BuilderError::MissingField("event_tx"))?;
468
469        let (audio_hashes, global_hash) =
470            collect_audio_hashes(&audio_base_path, &self.achievements);
471        let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
472        let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
473            self.achievements.into_iter().map(|a| (a.id, a)).collect();
474        let state = self
475            .state
476            .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
477        let trigger_registry = self
478            .trigger_registry
479            .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
480        let hot_achievements = Vec::new();
481
482        Ok(Engine {
483            bloop_provider,
484            achievements,
485            audio_base_path,
486            audio_file_hashes: audio_hashes,
487            audio_manifest_hash: global_hash,
488            player_registry,
489            state,
490            trigger_registry,
491            hot_achievements,
492            network_rx,
493            event_tx,
494            throttle: self.throttle,
495        })
496    }
497}
498
499fn collect_audio_hashes<Metadata, Player, State, Trigger>(
500    audio_base_path: &Path,
501    achievements: &[Achievement<Metadata, Player, State, Trigger>],
502) -> (HashMap<Uuid, DataHash>, DataHash) {
503    let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
504        .iter()
505        .filter_map(|achievement| {
506            let path = achievement.audio_path.as_ref()?;
507            let path = audio_base_path.join(path);
508
509            let Ok(file_content) = fs::read(path.clone()) else {
510                warn!("Audio file missing: {:?}", path);
511                return None;
512            };
513
514            let digest = md5::compute(file_content);
515
516            Some((achievement.id, digest.into()))
517        })
518        .collect();
519
520    let mut entries: Vec<_> = audio_file_hashes.iter().collect();
521    entries.sort_by_key(|(id, _)| *id);
522    let mut hash_input = Vec::with_capacity(entries.len() * 32);
523
524    for (id, hash) in entries {
525        hash_input.extend(id.as_bytes());
526        hash_input.extend_from_slice(hash.as_bytes());
527    }
528
529    let manifest_hash = md5::compute(hash_input);
530
531    (audio_file_hashes, manifest_hash.into())
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine with an empty player registry and the given trigger
    /// registry. The request sender and the event receiver are dropped, as
    /// the tests call the handlers directly.
    fn build_engine_with_triggers(trigger_registry: TriggerRegistry<()>) -> TestEngine {
        let (_request_tx, request_rx) = mpsc::channel(1);
        let (event_tx, _event_rx) = broadcast::channel(16);

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(Arc::new(Mutex::new(PlayerRegistry::new(vec![]))))
            .trigger_registry(trigger_registry)
            .network_rx(request_rx)
            .event_tx(event_tx)
            .build()
            .unwrap()
    }

    /// Builds an engine without any triggers.
    fn build_test_engine() -> TestEngine {
        build_engine_with_triggers(TriggerRegistry::new(HashMap::new()))
    }

    /// Adds a mock player with the given NFC UID to the engine's registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let mut registry = engine.player_registry.lock().await;
        let (shared_player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(shared_player)
            .unwrap()
            .into_inner()
            .unwrap();
        registry.add(player);
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();
        let (reply_tx, reply_rx) = oneshot::channel();

        engine
            .handle_bloop(NfcUid::default(), "test-client".to_string(), reply_tx)
            .await;

        let ServerMessage::Error(err) = reply_rx.await.unwrap() else {
            panic!("Expected Error response for unknown NFC UID");
        };
        assert!(matches!(err, ErrorResponse::UnknownNfcUid));
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let (reply_tx, reply_rx) = oneshot::channel();
        engine
            .handle_bloop(nfc_uid, "test-client".to_string(), reply_tx)
            .await;

        let ServerMessage::BloopAccepted { achievements } = reply_rx.await.unwrap() else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        let spec = TriggerSpec {
            trigger: (),
            global: false,
            occurrence: TriggerOccurrence::Once,
        };
        let triggers = HashMap::from([(NfcUid::default(), spec)]);
        let mut engine = build_engine_with_triggers(TriggerRegistry::new(triggers));

        let (reply_tx, reply_rx) = oneshot::channel();
        engine
            .handle_bloop(NfcUid::default(), "client".to_string(), reply_tx)
            .await;

        let ServerMessage::BloopAccepted { achievements } = reply_rx.await.unwrap() else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());

        assert!(
            engine
                .trigger_registry
                .check_active_trigger((), "client", Utc::now())
        );
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client".to_string();

        register_player(&engine, nfc_uid).await;
        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // Seed one recent bloop of the same player so the limit of one is hit.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let earlier_bloop = Bloop::new(player, client_id.clone(), Utc::now());
        engine.bloop_provider.add(Arc::new(earlier_bloop));

        let (reply_tx, reply_rx) = oneshot::channel();
        engine.handle_bloop(nfc_uid, client_id, reply_tx).await;

        let ServerMessage::Error(err) = reply_rx.await.unwrap() else {
            panic!("Expected throttling error");
        };
        assert!(matches!(err, ErrorResponse::NfcUidThrottled));
    }
}