bloop_server_framework/
engine.rs

1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use thiserror::Error;
18use tokio::fs::File;
19use tokio::io::AsyncReadExt;
20use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{info, instrument, warn};
24use uuid::Uuid;
25
26#[derive(Debug)]
27pub enum EngineRequest {
28    Bloop { client_id: String, nfc_uid: NfcUid },
29    RetrieveAudio { id: Uuid },
30    PreloadCheck { manifest_hash: Option<DataHash> },
31}
32
33#[derive(Debug, Deserialize)]
34pub struct Throttle {
35    max_bloops: usize,
36    threshold: Duration,
37}
38
39impl Throttle {
40    pub fn new(max_bloops: usize, threshold: Duration) -> Self {
41        Self {
42            max_bloops,
43            threshold,
44        }
45    }
46}
47
48struct HotAchievement {
49    id: Uuid,
50    client_id: String,
51    until: DateTime<Utc>,
52}
53
54impl HotAchievement {
55    fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
56        Self {
57            id,
58            client_id: bloop.client_id.clone(),
59            until: bloop.recorded_at + duration,
60        }
61    }
62}
63
64pub struct Engine<Metadata, Player, State, Trigger>
65where
66    Player: PlayerInfo + PlayerMutator,
67    Trigger: Copy,
68{
69    bloop_provider: BloopProvider<Player>,
70    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
71    audio_base_path: PathBuf,
72    audio_manifest_hash: DataHash,
73    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
74    state: Arc<Mutex<State>>,
75    trigger_registry: TriggerRegistry<Trigger>,
76    hot_achievements: Vec<HotAchievement>,
77    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
78    event_tx: broadcast::Sender<Event>,
79    throttle: Option<Throttle>,
80}
81
82impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
83where
84    Player: PlayerInfo + PlayerMutator,
85    Trigger: Copy,
86{
87    pub async fn process_requests(&mut self) {
88        while let Some((request, response)) = self.network_rx.recv().await {
89            match request {
90                EngineRequest::Bloop { nfc_uid, client_id } => {
91                    self.handle_bloop(nfc_uid, client_id, response).await;
92                }
93                EngineRequest::RetrieveAudio { id } => {
94                    self.handle_retrieve_audio(id, response);
95                }
96                EngineRequest::PreloadCheck { manifest_hash } => {
97                    self.handle_preload_check(manifest_hash, response);
98                }
99            }
100        }
101    }
102
103    #[instrument(skip(self, response))]
104    async fn handle_bloop(
105        &mut self,
106        nfc_uid: NfcUid,
107        client_id: String,
108        response: oneshot::Sender<ServerMessage>,
109    ) {
110        if self
111            .trigger_registry
112            .try_activate_trigger(nfc_uid, &client_id)
113        {
114            let _ = response.send(ServerMessage::BloopAccepted {
115                achievements: Vec::new(),
116            });
117            return;
118        }
119
120        let player = {
121            let player_registry = self.player_registry.lock().await;
122            let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
123                let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
124                return;
125            };
126            player
127        };
128
129        if let Some(throttle) = self.throttle.as_ref() {
130            let player_id = player.read().unwrap().id();
131
132            let recent_bloops = self
133                .bloop_provider
134                .for_client(&client_id)
135                .iter()
136                .filter(bloops_since(Utc::now() - throttle.threshold))
137                .take(throttle.max_bloops)
138                .collect::<Vec<_>>();
139
140            if recent_bloops
141                .iter()
142                .all(|bloop| bloop.player_id == player_id)
143                && recent_bloops.len() == throttle.max_bloops
144            {
145                let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
146                return;
147            }
148        }
149
150        player.write().unwrap().increment_bloops();
151        let bloop = Bloop::new(player.clone(), client_id, Utc::now());
152
153        let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
154        self.activate_hot_achievements(&bloop, &awarded_tracker);
155        self.inject_hot_achievements(&bloop, &mut awarded_tracker);
156
157        let player_registry = self.player_registry.lock().await;
158        awarded_tracker.remove_duplicates(player_registry);
159
160        let achievement_ids: Vec<Uuid> = awarded_tracker
161            .for_player(bloop.player_id)
162            .map_or_else(Vec::new, |set| set.iter().cloned().collect());
163
164        self.apply_awarded(awarded_tracker).await;
165
166        let processed_bloop: ProcessedBloop = (&bloop).into();
167        self.bloop_provider.add(Arc::new(bloop));
168        let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
169
170        let _ = response.send(ServerMessage::BloopAccepted {
171            achievements: achievement_ids
172                .into_iter()
173                .map(|id| AchievementRecord {
174                    id,
175                    audio_file_hash: self
176                        .achievements
177                        .get(&id)
178                        .unwrap()
179                        .audio_file
180                        .resolve(&self.audio_base_path)
181                        .as_ref()
182                        .map(|file| file.hash),
183                })
184                .collect(),
185        });
186    }
187
188    async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
189        let previous_awarded: HashSet<Uuid> = {
190            let player = bloop.player();
191            player.awarded_achievements().keys().cloned().collect()
192        };
193        let state = self.state.lock().await;
194        let ctx = AchievementContext::new(
195            bloop,
196            &self.bloop_provider,
197            &*state,
198            &mut self.trigger_registry,
199        );
200
201        for achievement in self.achievements.values() {
202            if !previous_awarded.contains(&achievement.id) {
203                achievement.evaluate(&ctx);
204            }
205        }
206
207        ctx.take_awarded_tracker()
208    }
209
210    fn inject_hot_achievements(
211        &mut self,
212        bloop: &Bloop<Player>,
213        awarded_tracker: &mut AwardedTracker,
214    ) {
215        let awarded = awarded_tracker.for_player_mut(bloop.player_id);
216
217        self.hot_achievements.retain(|hot_achievement| {
218            if hot_achievement.until < bloop.recorded_at {
219                return false;
220            }
221
222            if hot_achievement.client_id == bloop.client_id {
223                awarded.insert(hot_achievement.id);
224            }
225
226            true
227        });
228    }
229
230    fn activate_hot_achievements(
231        &mut self,
232        bloop: &Bloop<Player>,
233        awarded_tracker: &AwardedTracker,
234    ) {
235        let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
236            return;
237        };
238
239        for achievement_id in awarded {
240            let Some(achievement) = self.achievements.get(achievement_id) else {
241                continue;
242            };
243
244            if let Some(hot_duration) = achievement.hot_duration {
245                self.hot_achievements.push(HotAchievement::new(
246                    *achievement_id,
247                    bloop,
248                    hot_duration,
249                ));
250            };
251        }
252    }
253
254    async fn apply_awarded(&self, tracker: AwardedTracker) {
255        let mut player_registry = self.player_registry.lock().await;
256        let batch: AchievementAwardBatch = tracker.into();
257
258        for player_awards in batch.players.iter() {
259            player_registry.mutate_by_id(player_awards.player_id, |player| {
260                for achievement_id in player_awards.achievement_ids.iter() {
261                    player.add_awarded_achievement(*achievement_id, batch.awarded_at);
262                }
263            });
264        }
265
266        let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
267    }
268
269    #[instrument(skip(self, response))]
270    fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
271        let Some(achievement) = self.achievements.get(&id) else {
272            info!("Client requested unknown achievement: {}", id);
273            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
274            return;
275        };
276
277        let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
278            info!("Client requested audio for audio-less achievement: {}", id);
279            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
280            return;
281        };
282
283        let path = audio_file.path.clone();
284
285        tokio::spawn(async move {
286            let mut file = match File::open(&path).await {
287                Ok(file) => file,
288                Err(err) => {
289                    warn!("Failed to open file {:?}: {:?}", &path, err);
290                    let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
291                    return;
292                }
293            };
294
295            let mut data = vec![];
296
297            if let Err(err) = file.read_to_end(&mut data).await {
298                warn!("Failed to read file {:?}: {:?}", &path, err);
299                let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
300                return;
301            }
302
303            let _ = response.send(ServerMessage::AudioData { data });
304        });
305    }
306
307    #[instrument(skip(self, response))]
308    fn handle_preload_check(
309        &self,
310        manifest_hash: Option<DataHash>,
311        response: oneshot::Sender<ServerMessage>,
312    ) {
313        if let Some(manifest_hash) = manifest_hash
314            && manifest_hash == self.audio_manifest_hash
315        {
316            let _ = response.send(ServerMessage::PreloadMatch);
317            return;
318        }
319
320        let _ = response.send(ServerMessage::PreloadMismatch {
321            audio_manifest_hash: self.audio_manifest_hash,
322            achievements: self
323                .achievements
324                .values()
325                .map(|achievement| AchievementRecord {
326                    id: achievement.id,
327                    audio_file_hash: achievement
328                        .audio_file
329                        .resolve(&self.audio_base_path)
330                        .as_ref()
331                        .map(|file| file.hash),
332                })
333                .collect(),
334        });
335    }
336}
337
338#[cfg(feature = "tokio-graceful-shutdown")]
339#[derive(Debug, Error)]
340pub enum NeverError {}
341
342#[cfg(feature = "tokio-graceful-shutdown")]
343#[async_trait]
344impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
345    for Engine<Metadata, Player, State, Trigger>
346where
347    Metadata: Send + Sync + 'static,
348    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
349    State: Send + Sync + 'static,
350    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
351{
352    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
353        let _ = self.process_requests().cancel_on_shutdown(&subsys).await;
354        Ok(())
355    }
356}
357
358#[derive(Debug, Error)]
359pub enum BuilderError {
360    #[error("missing field: {0}")]
361    MissingField(&'static str),
362}
363
364#[derive(Debug, Default)]
365pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
366where
367    Player: PlayerInfo + PlayerMutator,
368    Trigger: Copy,
369{
370    bloops: Vec<Bloop<Player>>,
371    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
372    bloop_retention: Option<Duration>,
373    audio_base_path: Option<PathBuf>,
374    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
375    state: Option<Arc<Mutex<State>>>,
376    trigger_registry: Option<TriggerRegistry<Trigger>>,
377    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
378    event_tx: Option<broadcast::Sender<Event>>,
379    throttle: Option<Throttle>,
380}
381
382impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
383where
384    Player: PlayerInfo + PlayerMutator,
385    State: Default,
386    Trigger: Copy + PartialEq + Eq + Debug,
387{
388    pub fn new() -> Self {
389        Self {
390            bloops: Vec::new(),
391            achievements: Vec::new(),
392            bloop_retention: None,
393            audio_base_path: None,
394            player_registry: None,
395            state: None,
396            trigger_registry: None,
397            network_rx: None,
398            event_tx: None,
399            throttle: None,
400        }
401    }
402
403    pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
404        self.bloops = bloops;
405        self
406    }
407
408    pub fn achievements(
409        mut self,
410        achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
411    ) -> Self {
412        self.achievements = achievements;
413        self
414    }
415
416    pub fn bloop_retention(mut self, retention: Duration) -> Self {
417        self.bloop_retention = Some(retention);
418        self
419    }
420
421    pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
422        self.audio_base_path = Some(path.into());
423        self
424    }
425
426    pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
427        self.player_registry = Some(registry);
428        self
429    }
430
431    pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
432        self.state = Some(state);
433        self
434    }
435
436    pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
437        self.trigger_registry = Some(registry);
438        self
439    }
440
441    pub fn network_rx(
442        mut self,
443        rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
444    ) -> Self {
445        self.network_rx = Some(rx);
446        self
447    }
448
449    pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
450        self.event_tx = Some(tx);
451        self
452    }
453
454    pub fn throttle(mut self, throttle: Throttle) -> Self {
455        self.throttle = Some(throttle);
456        self
457    }
458
459    /// Consumes the builder and constructs the Engine.
460    pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
461        let bloop_retention = self
462            .bloop_retention
463            .ok_or(BuilderError::MissingField("bloop_retention"))?;
464        let audio_base_path = self
465            .audio_base_path
466            .ok_or(BuilderError::MissingField("audio_base_path"))?;
467        let player_registry = self
468            .player_registry
469            .ok_or(BuilderError::MissingField("player_registry"))?;
470        let network_rx = self
471            .network_rx
472            .ok_or(BuilderError::MissingField("network_rx"))?;
473        let event_tx = self
474            .event_tx
475            .ok_or(BuilderError::MissingField("event_tx"))?;
476
477        let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
478        let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
479        let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
480            self.achievements.into_iter().map(|a| (a.id, a)).collect();
481        let state = self
482            .state
483            .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
484        let trigger_registry = self
485            .trigger_registry
486            .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
487        let hot_achievements = Vec::new();
488
489        Ok(Engine {
490            bloop_provider,
491            achievements,
492            audio_base_path,
493            audio_manifest_hash,
494            player_registry,
495            state,
496            trigger_registry,
497            hot_achievements,
498            network_rx,
499            event_tx,
500            throttle: self.throttle,
501        })
502    }
503}
504
505fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
506    audio_base_path: &Path,
507    achievements: &[Achievement<Metadata, Player, State, Trigger>],
508) -> DataHash {
509    let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
510        .iter()
511        .filter_map(|achievement| {
512            achievement
513                .audio_file
514                .resolve(audio_base_path)
515                .as_ref()
516                .map(|file| (achievement.id, file.hash))
517        })
518        .collect();
519
520    let mut entries: Vec<_> = audio_file_hashes.iter().collect();
521    entries.sort_by_key(|(id, _)| *id);
522    let mut hash_input = Vec::with_capacity(entries.len() * 32);
523
524    for (id, hash) in entries {
525        hash_input.extend(id.as_bytes());
526        hash_input.extend_from_slice(hash.as_bytes());
527    }
528
529    let manifest_hash = md5::compute(hash_input);
530    manifest_hash.into()
531}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536    use crate::test_utils::MockPlayer;
537    use crate::trigger::{TriggerOccurrence, TriggerSpec};
538
539    fn build_test_engine() -> Engine<(), MockPlayer, (), ()> {
540        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
541        let trigger_registry = TriggerRegistry::new(HashMap::new());
542
543        EngineBuilder::<MockPlayer, (), (), ()>::new()
544            .bloop_retention(Duration::from_secs(3600))
545            .audio_base_path("./audio")
546            .player_registry(player_registry)
547            .trigger_registry(trigger_registry)
548            .network_rx(mpsc::channel(1).1)
549            .event_tx(broadcast::channel(16).0)
550            .build()
551            .unwrap()
552    }
553
554    #[tokio::test]
555    async fn handle_bloop_rejects_unknown_nfc_uid() {
556        let mut engine = build_test_engine();
557        let unknown_nfc_uid = NfcUid::default();
558        let client_id = "test-client".to_string();
559
560        let (resp_tx, resp_rx) = oneshot::channel();
561        engine
562            .handle_bloop(unknown_nfc_uid, client_id.clone(), resp_tx)
563            .await;
564
565        let response = resp_rx.await.unwrap();
566        match response {
567            ServerMessage::Error(err) => {
568                assert!(matches!(err, ErrorResponse::UnknownNfcUid));
569            }
570            _ => panic!("Expected Error response for unknown NFC UID"),
571        }
572    }
573
574    #[tokio::test]
575    async fn handle_bloop_accepts_known_player() {
576        let mut engine = build_test_engine();
577        let nfc_uid = NfcUid::default();
578        let client_id = "test-client".to_string();
579
580        {
581            let mut registry = engine.player_registry.lock().await;
582            let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
583            registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
584        }
585
586        let (resp_tx, resp_rx) = oneshot::channel();
587        engine
588            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
589            .await;
590
591        let response = resp_rx.await.unwrap();
592        match response {
593            ServerMessage::BloopAccepted { achievements } => {
594                assert!(achievements.is_empty());
595            }
596            _ => panic!("Expected BloopAccepted response"),
597        }
598    }
599
600    #[tokio::test]
601    async fn handle_bloop_activates_trigger_and_responds() {
602        let mut trigger_registry = HashMap::new();
603        trigger_registry.insert(
604            NfcUid::default(),
605            TriggerSpec {
606                trigger: (),
607                global: false,
608                occurrence: TriggerOccurrence::Once,
609            },
610        );
611        let trigger_registry = TriggerRegistry::new(trigger_registry);
612
613        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
614
615        let (_tx, rx) = mpsc::channel(1);
616        let (evt_tx, _) = broadcast::channel(16);
617
618        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
619            .bloop_retention(Duration::from_secs(3600))
620            .audio_base_path("./audio")
621            .player_registry(player_registry)
622            .trigger_registry(trigger_registry)
623            .network_rx(rx)
624            .event_tx(evt_tx)
625            .build()
626            .unwrap();
627
628        let nfc_uid = NfcUid::default();
629        let client_id = "client".to_string();
630        let (resp_tx, resp_rx) = oneshot::channel();
631
632        engine
633            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
634            .await;
635
636        let response = resp_rx.await.unwrap();
637        match response {
638            ServerMessage::BloopAccepted { achievements } => {
639                assert!(achievements.is_empty());
640            }
641            _ => panic!("Expected BloopAccepted response"),
642        }
643
644        assert!(
645            engine
646                .trigger_registry
647                .check_active_trigger((), "client", Utc::now())
648        );
649    }
650
651    #[tokio::test]
652    async fn handle_bloop_respects_throttling() {
653        let mut engine = build_test_engine();
654        let nfc_uid = NfcUid::default();
655        let client_id = "test-client".to_string();
656
657        {
658            let mut registry = engine.player_registry.lock().await;
659            let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
660            registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
661        }
662
663        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));
664
665        let bloop = Bloop::new(
666            engine
667                .player_registry
668                .lock()
669                .await
670                .get_by_nfc_uid(nfc_uid)
671                .unwrap(),
672            client_id.clone(),
673            Utc::now(),
674        );
675        engine.bloop_provider.add(Arc::new(bloop));
676
677        let (resp_tx, resp_rx) = oneshot::channel();
678        engine
679            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
680            .await;
681
682        let response = resp_rx.await.unwrap();
683        match response {
684            ServerMessage::Error(err) => {
685                assert!(matches!(err, ErrorResponse::NfcUidThrottled));
686            }
687            _ => panic!("Expected throttling error"),
688        }
689    }
690}