bloop_server_framework/
engine.rs

1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use thiserror::Error;
18use tokio::fs::File;
19use tokio::io::AsyncReadExt;
20use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{info, instrument, warn};
24use uuid::Uuid;
25
26#[derive(Debug)]
27pub enum EngineRequest {
28    Bloop { client_id: String, nfc_uid: NfcUid },
29    RetrieveAudio { id: Uuid },
30    PreloadCheck { manifest_hash: Option<DataHash> },
31}
32
33#[derive(Debug, Deserialize)]
34pub struct Throttle {
35    max_bloops: usize,
36    threshold: Duration,
37}
38
39impl Throttle {
40    pub fn new(max_bloops: usize, threshold: Duration) -> Self {
41        Self {
42            max_bloops,
43            threshold,
44        }
45    }
46}
47
48struct HotAchievement {
49    id: Uuid,
50    client_id: String,
51    until: DateTime<Utc>,
52}
53
54impl HotAchievement {
55    fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
56        Self {
57            id,
58            client_id: bloop.client_id.clone(),
59            until: bloop.recorded_at + duration,
60        }
61    }
62}
63
64pub struct Engine<Metadata, Player, State, Trigger>
65where
66    Player: PlayerInfo + PlayerMutator,
67    Trigger: Copy,
68{
69    bloop_provider: BloopProvider<Player>,
70    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
71    audio_base_path: PathBuf,
72    audio_manifest_hash: DataHash,
73    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
74    state: Arc<Mutex<State>>,
75    trigger_registry: TriggerRegistry<Trigger>,
76    hot_achievements: Vec<HotAchievement>,
77    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
78    event_tx: broadcast::Sender<Event>,
79    throttle: Option<Throttle>,
80}
81
82impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
83where
84    Player: PlayerInfo + PlayerMutator,
85    Trigger: Copy,
86{
87    pub async fn process_requests(&mut self) {
88        while let Some((request, response)) = self.network_rx.recv().await {
89            match request {
90                EngineRequest::Bloop { nfc_uid, client_id } => {
91                    self.handle_bloop(nfc_uid, client_id, response).await;
92                }
93                EngineRequest::RetrieveAudio { id } => {
94                    self.handle_retrieve_audio(id, response);
95                }
96                EngineRequest::PreloadCheck { manifest_hash } => {
97                    self.handle_preload_check(manifest_hash, response);
98                }
99            }
100        }
101    }
102
103    #[instrument(skip(self, response))]
104    async fn handle_bloop(
105        &mut self,
106        nfc_uid: NfcUid,
107        client_id: String,
108        response: oneshot::Sender<ServerMessage>,
109    ) {
110        if self
111            .trigger_registry
112            .try_activate_trigger(nfc_uid, &client_id)
113        {
114            let _ = response.send(ServerMessage::BloopAccepted {
115                achievements: Vec::new(),
116            });
117            return;
118        }
119
120        let player = {
121            let player_registry = self.player_registry.lock().await;
122            let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
123                let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
124                return;
125            };
126            player
127        };
128
129        if let Some(throttle) = self.throttle.as_ref() {
130            let player_id = player.read().unwrap().id();
131
132            let recent_bloops = self
133                .bloop_provider
134                .for_client(&client_id)
135                .iter()
136                .filter(bloops_since(Utc::now() - throttle.threshold))
137                .take(throttle.max_bloops)
138                .collect::<Vec<_>>();
139
140            if recent_bloops
141                .iter()
142                .all(|bloop| bloop.player_id == player_id)
143                && recent_bloops.len() == throttle.max_bloops
144            {
145                let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
146                return;
147            }
148        }
149
150        player.write().unwrap().increment_bloops();
151        let bloop = Bloop::new(player.clone(), client_id, Utc::now());
152
153        let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
154        self.activate_hot_achievements(&bloop, &awarded_tracker);
155        self.inject_hot_achievements(&bloop, &mut awarded_tracker);
156
157        let player_registry = self.player_registry.lock().await;
158        awarded_tracker.remove_duplicates(player_registry);
159
160        let achievement_ids: Vec<Uuid> = awarded_tracker
161            .for_player(bloop.player_id)
162            .map_or_else(Vec::new, |set| set.iter().cloned().collect());
163
164        self.apply_awarded(awarded_tracker).await;
165
166        let processed_bloop: ProcessedBloop = (&bloop).into();
167        self.bloop_provider.add(Arc::new(bloop));
168        let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
169
170        let _ = response.send(ServerMessage::BloopAccepted {
171            achievements: achievement_ids
172                .into_iter()
173                .map(|id| AchievementRecord {
174                    id,
175                    audio_file_hash: self
176                        .achievements
177                        .get(&id)
178                        .unwrap()
179                        .audio_file
180                        .resolve(&self.audio_base_path)
181                        .as_ref()
182                        .map(|file| file.hash),
183                })
184                .collect(),
185        });
186    }
187
188    async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
189        let previous_awarded: HashSet<Uuid> = {
190            let player = bloop.player();
191            player.awarded_achievements().keys().cloned().collect()
192        };
193        let state = self.state.lock().await;
194        let ctx = AchievementContext::new(
195            bloop,
196            &self.bloop_provider,
197            &*state,
198            &mut self.trigger_registry,
199        );
200
201        for achievement in self.achievements.values() {
202            if !previous_awarded.contains(&achievement.id) {
203                achievement.evaluate(&ctx);
204            }
205        }
206
207        ctx.take_awarded_tracker()
208    }
209
210    fn inject_hot_achievements(
211        &mut self,
212        bloop: &Bloop<Player>,
213        awarded_tracker: &mut AwardedTracker,
214    ) {
215        let awarded = awarded_tracker.for_player_mut(bloop.player_id);
216
217        self.hot_achievements.retain(|hot_achievement| {
218            if hot_achievement.until < bloop.recorded_at {
219                return false;
220            }
221
222            if hot_achievement.client_id == bloop.client_id {
223                awarded.insert(hot_achievement.id);
224            }
225
226            true
227        });
228    }
229
230    fn activate_hot_achievements(
231        &mut self,
232        bloop: &Bloop<Player>,
233        awarded_tracker: &AwardedTracker,
234    ) {
235        let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
236            return;
237        };
238
239        for achievement_id in awarded {
240            let Some(achievement) = self.achievements.get(achievement_id) else {
241                continue;
242            };
243
244            if let Some(hot_duration) = achievement.hot_duration {
245                self.hot_achievements.push(HotAchievement::new(
246                    *achievement_id,
247                    bloop,
248                    hot_duration,
249                ));
250            };
251        }
252    }
253
254    async fn apply_awarded(&self, tracker: AwardedTracker) {
255        let mut player_registry = self.player_registry.lock().await;
256        let batch: AchievementAwardBatch = tracker.into();
257
258        for player_awards in batch.players.iter() {
259            player_registry.mutate_by_id(player_awards.player_id, |player| {
260                for achievement_id in player_awards.achievement_ids.iter() {
261                    player.add_awarded_achievement(*achievement_id, batch.awarded_at);
262                }
263            });
264        }
265
266        let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
267    }
268
269    #[instrument(skip(self, response))]
270    fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
271        let Some(achievement) = self.achievements.get(&id) else {
272            info!("Client requested unknown achievement: {}", id);
273            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
274            return;
275        };
276
277        let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
278            info!("Client requested audio for audio-less achievement: {}", id);
279            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
280            return;
281        };
282
283        let path = audio_file.path.clone();
284
285        tokio::spawn(async move {
286            let mut file = match File::open(&path).await {
287                Ok(file) => file,
288                Err(err) => {
289                    warn!("Failed to open file {:?}: {:?}", &path, err);
290                    let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
291                    return;
292                }
293            };
294
295            let mut data = vec![];
296
297            if let Err(err) = file.read_to_end(&mut data).await {
298                warn!("Failed to read file {:?}: {:?}", &path, err);
299                let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
300                return;
301            }
302
303            let _ = response.send(ServerMessage::AudioData { data });
304        });
305    }
306
307    #[instrument(skip(self, response))]
308    fn handle_preload_check(
309        &self,
310        manifest_hash: Option<DataHash>,
311        response: oneshot::Sender<ServerMessage>,
312    ) {
313        if let Some(manifest_hash) = manifest_hash
314            && manifest_hash == self.audio_manifest_hash
315        {
316            let _ = response.send(ServerMessage::PreloadMatch);
317            return;
318        }
319
320        let _ = response.send(ServerMessage::PreloadMismatch {
321            audio_manifest_hash: self.audio_manifest_hash,
322            achievements: self
323                .achievements
324                .values()
325                .map(|achievement| AchievementRecord {
326                    id: achievement.id,
327                    audio_file_hash: achievement
328                        .audio_file
329                        .resolve(&self.audio_base_path)
330                        .as_ref()
331                        .map(|file| file.hash),
332                })
333                .collect(),
334        });
335    }
336}
337
338#[cfg(feature = "tokio-graceful-shutdown")]
339#[derive(Debug, Error)]
340pub enum NeverError {}
341
342#[cfg(feature = "tokio-graceful-shutdown")]
343#[async_trait]
344impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
345    for Engine<Metadata, Player, State, Trigger>
346where
347    Metadata: Send + Sync + 'static,
348    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
349    State: Send + Sync + 'static,
350    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
351{
352    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
353        let _ = self.process_requests().cancel_on_shutdown(&subsys).await;
354        Ok(())
355    }
356}
357
358#[derive(Debug, Error)]
359pub enum BuilderError {
360    #[error("missing field: {0}")]
361    MissingField(&'static str),
362}
363
364#[derive(Debug, Default)]
365pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
366where
367    Player: PlayerInfo + PlayerMutator,
368    Metadata: Default,
369    Trigger: Copy,
370{
371    bloops: Vec<Bloop<Player>>,
372    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
373    bloop_retention: Option<Duration>,
374    audio_base_path: Option<PathBuf>,
375    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
376    state: Option<Arc<Mutex<State>>>,
377    trigger_registry: Option<TriggerRegistry<Trigger>>,
378    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
379    event_tx: Option<broadcast::Sender<Event>>,
380    throttle: Option<Throttle>,
381}
382
383impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
384where
385    Player: PlayerInfo + PlayerMutator,
386    Metadata: Default,
387    State: Default,
388    Trigger: Copy + PartialEq + Eq + Debug,
389{
390    pub fn new() -> Self {
391        Self {
392            bloops: Vec::new(),
393            achievements: Vec::new(),
394            bloop_retention: None,
395            audio_base_path: None,
396            player_registry: None,
397            state: None,
398            trigger_registry: None,
399            network_rx: None,
400            event_tx: None,
401            throttle: None,
402        }
403    }
404
405    pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
406        self.bloops = bloops;
407        self
408    }
409
410    pub fn achievements(
411        mut self,
412        achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
413    ) -> Self {
414        self.achievements = achievements;
415        self
416    }
417
418    pub fn bloop_retention(mut self, retention: Duration) -> Self {
419        self.bloop_retention = Some(retention);
420        self
421    }
422
423    pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
424        self.audio_base_path = Some(path.into());
425        self
426    }
427
428    pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
429        self.player_registry = Some(registry);
430        self
431    }
432
433    pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
434        self.state = Some(state);
435        self
436    }
437
438    pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
439        self.trigger_registry = Some(registry);
440        self
441    }
442
443    pub fn network_rx(
444        mut self,
445        rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
446    ) -> Self {
447        self.network_rx = Some(rx);
448        self
449    }
450
451    pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
452        self.event_tx = Some(tx);
453        self
454    }
455
456    pub fn throttle(mut self, throttle: Throttle) -> Self {
457        self.throttle = Some(throttle);
458        self
459    }
460
461    /// Consumes the builder and constructs the Engine.
462    pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
463        let bloop_retention = self
464            .bloop_retention
465            .ok_or(BuilderError::MissingField("bloop_retention"))?;
466        let audio_base_path = self
467            .audio_base_path
468            .ok_or(BuilderError::MissingField("audio_base_path"))?;
469        let player_registry = self
470            .player_registry
471            .ok_or(BuilderError::MissingField("player_registry"))?;
472        let network_rx = self
473            .network_rx
474            .ok_or(BuilderError::MissingField("network_rx"))?;
475        let event_tx = self
476            .event_tx
477            .ok_or(BuilderError::MissingField("event_tx"))?;
478
479        let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
480        let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
481        let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
482            self.achievements.into_iter().map(|a| (a.id, a)).collect();
483        let state = self
484            .state
485            .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
486        let trigger_registry = self
487            .trigger_registry
488            .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
489        let hot_achievements = Vec::new();
490
491        Ok(Engine {
492            bloop_provider,
493            achievements,
494            audio_base_path,
495            audio_manifest_hash,
496            player_registry,
497            state,
498            trigger_registry,
499            hot_achievements,
500            network_rx,
501            event_tx,
502            throttle: self.throttle,
503        })
504    }
505}
506
507fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
508    audio_base_path: &Path,
509    achievements: &[Achievement<Metadata, Player, State, Trigger>],
510) -> DataHash {
511    let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
512        .iter()
513        .filter_map(|achievement| {
514            achievement
515                .audio_file
516                .resolve(audio_base_path)
517                .as_ref()
518                .map(|file| (achievement.id, file.hash))
519        })
520        .collect();
521
522    let mut entries: Vec<_> = audio_file_hashes.iter().collect();
523    entries.sort_by_key(|(id, _)| *id);
524    let mut hash_input = Vec::with_capacity(entries.len() * 32);
525
526    for (id, hash) in entries {
527        hash_input.extend(id.as_bytes());
528        hash_input.extend_from_slice(hash.as_bytes());
529    }
530
531    let manifest_hash = md5::compute(hash_input);
532    manifest_hash.into()
533}
534
535#[cfg(test)]
536mod tests {
537    use super::*;
538    use crate::test_utils::MockPlayer;
539    use crate::trigger::{TriggerOccurrence, TriggerSpec};
540
541    fn build_test_engine() -> Engine<(), MockPlayer, (), ()> {
542        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
543        let trigger_registry = TriggerRegistry::new(HashMap::new());
544
545        EngineBuilder::<MockPlayer, (), (), ()>::new()
546            .bloop_retention(Duration::from_secs(3600))
547            .audio_base_path("./audio")
548            .player_registry(player_registry)
549            .trigger_registry(trigger_registry)
550            .network_rx(mpsc::channel(1).1)
551            .event_tx(broadcast::channel(16).0)
552            .build()
553            .unwrap()
554    }
555
556    #[tokio::test]
557    async fn handle_bloop_rejects_unknown_nfc_uid() {
558        let mut engine = build_test_engine();
559        let unknown_nfc_uid = NfcUid::default();
560        let client_id = "test-client".to_string();
561
562        let (resp_tx, resp_rx) = oneshot::channel();
563        engine
564            .handle_bloop(unknown_nfc_uid, client_id.clone(), resp_tx)
565            .await;
566
567        let response = resp_rx.await.unwrap();
568        match response {
569            ServerMessage::Error(err) => {
570                assert!(matches!(err, ErrorResponse::UnknownNfcUid));
571            }
572            _ => panic!("Expected Error response for unknown NFC UID"),
573        }
574    }
575
576    #[tokio::test]
577    async fn handle_bloop_accepts_known_player() {
578        let mut engine = build_test_engine();
579        let nfc_uid = NfcUid::default();
580        let client_id = "test-client".to_string();
581
582        {
583            let mut registry = engine.player_registry.lock().await;
584            let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
585            registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
586        }
587
588        let (resp_tx, resp_rx) = oneshot::channel();
589        engine
590            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
591            .await;
592
593        let response = resp_rx.await.unwrap();
594        match response {
595            ServerMessage::BloopAccepted { achievements } => {
596                assert!(achievements.is_empty());
597            }
598            _ => panic!("Expected BloopAccepted response"),
599        }
600    }
601
602    #[tokio::test]
603    async fn handle_bloop_activates_trigger_and_responds() {
604        let mut trigger_registry = HashMap::new();
605        trigger_registry.insert(
606            NfcUid::default(),
607            TriggerSpec {
608                trigger: (),
609                global: false,
610                occurrence: TriggerOccurrence::Once,
611            },
612        );
613        let trigger_registry = TriggerRegistry::new(trigger_registry);
614
615        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
616
617        let (_tx, rx) = mpsc::channel(1);
618        let (evt_tx, _) = broadcast::channel(16);
619
620        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
621            .bloop_retention(Duration::from_secs(3600))
622            .audio_base_path("./audio")
623            .player_registry(player_registry)
624            .trigger_registry(trigger_registry)
625            .network_rx(rx)
626            .event_tx(evt_tx)
627            .build()
628            .unwrap();
629
630        let nfc_uid = NfcUid::default();
631        let client_id = "client".to_string();
632        let (resp_tx, resp_rx) = oneshot::channel();
633
634        engine
635            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
636            .await;
637
638        let response = resp_rx.await.unwrap();
639        match response {
640            ServerMessage::BloopAccepted { achievements } => {
641                assert!(achievements.is_empty());
642            }
643            _ => panic!("Expected BloopAccepted response"),
644        }
645
646        assert!(
647            engine
648                .trigger_registry
649                .check_active_trigger((), "client", Utc::now())
650        );
651    }
652
653    #[tokio::test]
654    async fn handle_bloop_respects_throttling() {
655        let mut engine = build_test_engine();
656        let nfc_uid = NfcUid::default();
657        let client_id = "test-client".to_string();
658
659        {
660            let mut registry = engine.player_registry.lock().await;
661            let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
662            registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
663        }
664
665        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));
666
667        let bloop = Bloop::new(
668            engine
669                .player_registry
670                .lock()
671                .await
672                .get_by_nfc_uid(nfc_uid)
673                .unwrap(),
674            client_id.clone(),
675            Utc::now(),
676        );
677        engine.bloop_provider.add(Arc::new(bloop));
678
679        let (resp_tx, resp_rx) = oneshot::channel();
680        engine
681            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
682            .await;
683
684        let response = resp_rx.await.unwrap();
685        match response {
686            ServerMessage::Error(err) => {
687                assert!(matches!(err, ErrorResponse::NfcUidThrottled));
688            }
689            _ => panic!("Expected throttling error"),
690        }
691    }
692}