bloop_server_framework/
engine.rs

1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8use chrono::{DateTime, Utc};
9use serde::Deserialize;
10use std::collections::{HashMap, HashSet};
11use std::fmt::Debug;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::Duration;
15use thiserror::Error;
16use tokio::fs::File;
17use tokio::io::AsyncReadExt;
18use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
19#[cfg(feature = "tokio-graceful-shutdown")]
20use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
21use tracing::{info, instrument, warn};
22use uuid::Uuid;
23
24#[derive(Debug)]
25pub enum EngineRequest {
26    Bloop { client_id: String, nfc_uid: NfcUid },
27    RetrieveAudio { id: Uuid },
28    PreloadCheck { manifest_hash: Option<DataHash> },
29}
30
31#[derive(Debug, Deserialize)]
32pub struct Throttle {
33    max_bloops: usize,
34    threshold: Duration,
35}
36
37impl Throttle {
38    pub fn new(max_bloops: usize, threshold: Duration) -> Self {
39        Self {
40            max_bloops,
41            threshold,
42        }
43    }
44}
45
46struct HotAchievement {
47    id: Uuid,
48    client_id: String,
49    until: DateTime<Utc>,
50}
51
52impl HotAchievement {
53    fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
54        Self {
55            id,
56            client_id: bloop.client_id.clone(),
57            until: bloop.recorded_at + duration,
58        }
59    }
60}
61
62pub struct Engine<Metadata, Player, State, Trigger>
63where
64    Player: PlayerInfo + PlayerMutator,
65    Trigger: Copy,
66{
67    bloop_provider: BloopProvider<Player>,
68    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
69    audio_base_path: PathBuf,
70    audio_manifest_hash: DataHash,
71    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
72    state: Arc<Mutex<State>>,
73    trigger_registry: TriggerRegistry<Trigger>,
74    hot_achievements: Vec<HotAchievement>,
75    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
76    event_tx: broadcast::Sender<Event>,
77    throttle: Option<Throttle>,
78}
79
80impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
81where
82    Player: PlayerInfo + PlayerMutator,
83    Trigger: Copy,
84{
85    pub async fn process_requests(&mut self) {
86        while let Some((request, response)) = self.network_rx.recv().await {
87            match request {
88                EngineRequest::Bloop { nfc_uid, client_id } => {
89                    self.handle_bloop(nfc_uid, client_id, response).await;
90                }
91                EngineRequest::RetrieveAudio { id } => {
92                    self.handle_retrieve_audio(id, response);
93                }
94                EngineRequest::PreloadCheck { manifest_hash } => {
95                    self.handle_preload_check(manifest_hash, response);
96                }
97            }
98        }
99    }
100
101    #[instrument(skip(self, response))]
102    async fn handle_bloop(
103        &mut self,
104        nfc_uid: NfcUid,
105        client_id: String,
106        response: oneshot::Sender<ServerMessage>,
107    ) {
108        if self
109            .trigger_registry
110            .try_activate_trigger(nfc_uid, &client_id)
111        {
112            let _ = response.send(ServerMessage::BloopAccepted {
113                achievements: Vec::new(),
114            });
115            return;
116        }
117
118        let player = {
119            let player_registry = self.player_registry.lock().await;
120            let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
121                let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
122                return;
123            };
124            player
125        };
126
127        if let Some(throttle) = self.throttle.as_ref() {
128            let player_id = player.read().unwrap().id();
129
130            let recent_bloops = self
131                .bloop_provider
132                .for_client(&client_id)
133                .iter()
134                .filter(bloops_since(Utc::now() - throttle.threshold))
135                .take(throttle.max_bloops)
136                .collect::<Vec<_>>();
137
138            if recent_bloops
139                .iter()
140                .all(|bloop| bloop.player_id == player_id)
141                && recent_bloops.len() == throttle.max_bloops
142            {
143                let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
144                return;
145            }
146        }
147
148        player.write().unwrap().increment_bloops();
149        let bloop = Bloop::new(player.clone(), client_id, Utc::now());
150
151        let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
152        self.activate_hot_achievements(&bloop, &awarded_tracker);
153        self.inject_hot_achievements(&bloop, &mut awarded_tracker);
154
155        let player_registry = self.player_registry.lock().await;
156        awarded_tracker.remove_duplicates(player_registry);
157
158        let achievement_ids: Vec<Uuid> = awarded_tracker
159            .for_player(bloop.player_id)
160            .map_or_else(Vec::new, |set| set.iter().cloned().collect());
161
162        self.apply_awarded(awarded_tracker).await;
163
164        let processed_bloop: ProcessedBloop = (&bloop).into();
165        self.bloop_provider.add(Arc::new(bloop));
166        let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
167
168        let _ = response.send(ServerMessage::BloopAccepted {
169            achievements: achievement_ids
170                .into_iter()
171                .map(|id| AchievementRecord {
172                    id,
173                    audio_file_hash: self
174                        .achievements
175                        .get(&id)
176                        .unwrap()
177                        .audio_file
178                        .resolve(&self.audio_base_path)
179                        .as_ref()
180                        .map(|file| file.hash),
181                })
182                .collect(),
183        });
184    }
185
186    async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
187        let previous_awarded: HashSet<Uuid> = {
188            let player = bloop.player();
189            player.awarded_achievements().keys().cloned().collect()
190        };
191        let state = self.state.lock().await;
192        let ctx = AchievementContext::new(
193            bloop,
194            &self.bloop_provider,
195            &*state,
196            &mut self.trigger_registry,
197        );
198
199        for achievement in self.achievements.values() {
200            if !previous_awarded.contains(&achievement.id) {
201                achievement.evaluate(&ctx);
202            }
203        }
204
205        ctx.take_awarded_tracker()
206    }
207
208    fn inject_hot_achievements(
209        &mut self,
210        bloop: &Bloop<Player>,
211        awarded_tracker: &mut AwardedTracker,
212    ) {
213        let awarded = awarded_tracker.for_player_mut(bloop.player_id);
214
215        self.hot_achievements.retain(|hot_achievement| {
216            if hot_achievement.until < bloop.recorded_at {
217                return false;
218            }
219
220            if hot_achievement.client_id == bloop.client_id {
221                awarded.insert(hot_achievement.id);
222            }
223
224            true
225        });
226    }
227
228    fn activate_hot_achievements(
229        &mut self,
230        bloop: &Bloop<Player>,
231        awarded_tracker: &AwardedTracker,
232    ) {
233        let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
234            return;
235        };
236
237        for achievement_id in awarded {
238            let Some(achievement) = self.achievements.get(achievement_id) else {
239                continue;
240            };
241
242            if let Some(hot_duration) = achievement.hot_duration {
243                self.hot_achievements.push(HotAchievement::new(
244                    *achievement_id,
245                    bloop,
246                    hot_duration,
247                ));
248            };
249        }
250    }
251
252    async fn apply_awarded(&self, tracker: AwardedTracker) {
253        let mut player_registry = self.player_registry.lock().await;
254        let batch: AchievementAwardBatch = tracker.into();
255
256        for player_awards in batch.players.iter() {
257            player_registry.mutate_by_id(player_awards.player_id, |player| {
258                for achievement_id in player_awards.achievement_ids.iter() {
259                    player.add_awarded_achievement(*achievement_id, batch.awarded_at);
260                }
261            });
262        }
263
264        let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
265    }
266
267    #[instrument(skip(self, response))]
268    fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
269        let Some(achievement) = self.achievements.get(&id) else {
270            info!("Client requested unknown achievement: {}", id);
271            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
272            return;
273        };
274
275        let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
276            info!("Client requested audio for audio-less achievement: {}", id);
277            let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
278            return;
279        };
280
281        let path = audio_file.path.clone();
282
283        tokio::spawn(async move {
284            let mut file = match File::open(&path).await {
285                Ok(file) => file,
286                Err(err) => {
287                    warn!("Failed to open file {:?}: {:?}", &path, err);
288                    let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
289                    return;
290                }
291            };
292
293            let mut data = vec![];
294
295            if let Err(err) = file.read_to_end(&mut data).await {
296                warn!("Failed to read file {:?}: {:?}", &path, err);
297                let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
298                return;
299            }
300
301            let _ = response.send(ServerMessage::AudioData { data });
302        });
303    }
304
305    #[instrument(skip(self, response))]
306    fn handle_preload_check(
307        &self,
308        manifest_hash: Option<DataHash>,
309        response: oneshot::Sender<ServerMessage>,
310    ) {
311        if let Some(manifest_hash) = manifest_hash
312            && manifest_hash == self.audio_manifest_hash
313        {
314            let _ = response.send(ServerMessage::PreloadMatch);
315            return;
316        }
317
318        let _ = response.send(ServerMessage::PreloadMismatch {
319            audio_manifest_hash: self.audio_manifest_hash,
320            achievements: self
321                .achievements
322                .values()
323                .map(|achievement| AchievementRecord {
324                    id: achievement.id,
325                    audio_file_hash: achievement
326                        .audio_file
327                        .resolve(&self.audio_base_path)
328                        .as_ref()
329                        .map(|file| file.hash),
330                })
331                .collect(),
332        });
333    }
334}
335
336#[cfg(feature = "tokio-graceful-shutdown")]
337#[derive(Debug, Error)]
338pub enum NeverError {}
339
340#[cfg(feature = "tokio-graceful-shutdown")]
341impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
342    for Engine<Metadata, Player, State, Trigger>
343where
344    Metadata: Send + Sync + 'static,
345    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
346    State: Send + Sync + 'static,
347    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
348{
349    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
350        let _ = self.process_requests().cancel_on_shutdown(subsys).await;
351        Ok(())
352    }
353}
354
355#[derive(Debug, Error)]
356pub enum BuilderError {
357    #[error("missing field: {0}")]
358    MissingField(&'static str),
359}
360
361#[derive(Debug, Default)]
362pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
363where
364    Player: PlayerInfo + PlayerMutator,
365    Trigger: Copy,
366{
367    bloops: Vec<Bloop<Player>>,
368    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
369    bloop_retention: Option<Duration>,
370    audio_base_path: Option<PathBuf>,
371    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
372    state: Option<Arc<Mutex<State>>>,
373    trigger_registry: Option<TriggerRegistry<Trigger>>,
374    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
375    event_tx: Option<broadcast::Sender<Event>>,
376    throttle: Option<Throttle>,
377}
378
379impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
380where
381    Player: PlayerInfo + PlayerMutator,
382    State: Default,
383    Trigger: Copy + PartialEq + Eq + Debug,
384{
385    pub fn new() -> Self {
386        Self {
387            bloops: Vec::new(),
388            achievements: Vec::new(),
389            bloop_retention: None,
390            audio_base_path: None,
391            player_registry: None,
392            state: None,
393            trigger_registry: None,
394            network_rx: None,
395            event_tx: None,
396            throttle: None,
397        }
398    }
399
400    pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
401        self.bloops = bloops;
402        self
403    }
404
405    pub fn achievements(
406        mut self,
407        achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
408    ) -> Self {
409        self.achievements = achievements;
410        self
411    }
412
413    pub fn bloop_retention(mut self, retention: Duration) -> Self {
414        self.bloop_retention = Some(retention);
415        self
416    }
417
418    pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
419        self.audio_base_path = Some(path.into());
420        self
421    }
422
423    pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
424        self.player_registry = Some(registry);
425        self
426    }
427
428    pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
429        self.state = Some(state);
430        self
431    }
432
433    pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
434        self.trigger_registry = Some(registry);
435        self
436    }
437
438    pub fn network_rx(
439        mut self,
440        rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
441    ) -> Self {
442        self.network_rx = Some(rx);
443        self
444    }
445
446    pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
447        self.event_tx = Some(tx);
448        self
449    }
450
451    pub fn throttle(mut self, throttle: Throttle) -> Self {
452        self.throttle = Some(throttle);
453        self
454    }
455
456    /// Consumes the builder and constructs the Engine.
457    pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
458        let bloop_retention = self
459            .bloop_retention
460            .ok_or(BuilderError::MissingField("bloop_retention"))?;
461        let audio_base_path = self
462            .audio_base_path
463            .ok_or(BuilderError::MissingField("audio_base_path"))?;
464        let player_registry = self
465            .player_registry
466            .ok_or(BuilderError::MissingField("player_registry"))?;
467        let network_rx = self
468            .network_rx
469            .ok_or(BuilderError::MissingField("network_rx"))?;
470        let event_tx = self
471            .event_tx
472            .ok_or(BuilderError::MissingField("event_tx"))?;
473
474        let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
475        let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
476        let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
477            self.achievements.into_iter().map(|a| (a.id, a)).collect();
478        let state = self
479            .state
480            .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
481        let trigger_registry = self
482            .trigger_registry
483            .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
484        let hot_achievements = Vec::new();
485
486        Ok(Engine {
487            bloop_provider,
488            achievements,
489            audio_base_path,
490            audio_manifest_hash,
491            player_registry,
492            state,
493            trigger_registry,
494            hot_achievements,
495            network_rx,
496            event_tx,
497            throttle: self.throttle,
498        })
499    }
500}
501
502fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
503    audio_base_path: &Path,
504    achievements: &[Achievement<Metadata, Player, State, Trigger>],
505) -> DataHash {
506    let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
507        .iter()
508        .filter_map(|achievement| {
509            achievement
510                .audio_file
511                .resolve(audio_base_path)
512                .as_ref()
513                .map(|file| (achievement.id, file.hash))
514        })
515        .collect();
516
517    let mut entries: Vec<_> = audio_file_hashes.iter().collect();
518    entries.sort_by_key(|(id, _)| *id);
519    let mut hash_input = Vec::with_capacity(entries.len() * 32);
520
521    for (id, hash) in entries {
522        hash_input.extend(id.as_bytes());
523        hash_input.extend_from_slice(hash.as_bytes());
524    }
525
526    let manifest_hash = md5::compute(hash_input);
527    manifest_hash.into()
528}
529
530#[cfg(test)]
531mod tests {
532    use super::*;
533    use crate::test_utils::MockPlayer;
534    use crate::trigger::{TriggerOccurrence, TriggerSpec};
535
536    fn build_test_engine() -> Engine<(), MockPlayer, (), ()> {
537        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
538        let trigger_registry = TriggerRegistry::new(HashMap::new());
539
540        EngineBuilder::<MockPlayer, (), (), ()>::new()
541            .bloop_retention(Duration::from_secs(3600))
542            .audio_base_path("./audio")
543            .player_registry(player_registry)
544            .trigger_registry(trigger_registry)
545            .network_rx(mpsc::channel(1).1)
546            .event_tx(broadcast::channel(16).0)
547            .build()
548            .unwrap()
549    }
550
551    #[tokio::test]
552    async fn handle_bloop_rejects_unknown_nfc_uid() {
553        let mut engine = build_test_engine();
554        let unknown_nfc_uid = NfcUid::default();
555        let client_id = "test-client".to_string();
556
557        let (resp_tx, resp_rx) = oneshot::channel();
558        engine
559            .handle_bloop(unknown_nfc_uid, client_id.clone(), resp_tx)
560            .await;
561
562        let response = resp_rx.await.unwrap();
563        match response {
564            ServerMessage::Error(err) => {
565                assert!(matches!(err, ErrorResponse::UnknownNfcUid));
566            }
567            _ => panic!("Expected Error response for unknown NFC UID"),
568        }
569    }
570
571    #[tokio::test]
572    async fn handle_bloop_accepts_known_player() {
573        let mut engine = build_test_engine();
574        let nfc_uid = NfcUid::default();
575        let client_id = "test-client".to_string();
576
577        {
578            let mut registry = engine.player_registry.lock().await;
579            let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
580            registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
581        }
582
583        let (resp_tx, resp_rx) = oneshot::channel();
584        engine
585            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
586            .await;
587
588        let response = resp_rx.await.unwrap();
589        match response {
590            ServerMessage::BloopAccepted { achievements } => {
591                assert!(achievements.is_empty());
592            }
593            _ => panic!("Expected BloopAccepted response"),
594        }
595    }
596
597    #[tokio::test]
598    async fn handle_bloop_activates_trigger_and_responds() {
599        let mut trigger_registry = HashMap::new();
600        trigger_registry.insert(
601            NfcUid::default(),
602            TriggerSpec {
603                trigger: (),
604                global: false,
605                occurrence: TriggerOccurrence::Once,
606            },
607        );
608        let trigger_registry = TriggerRegistry::new(trigger_registry);
609
610        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
611
612        let (_tx, rx) = mpsc::channel(1);
613        let (evt_tx, _) = broadcast::channel(16);
614
615        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
616            .bloop_retention(Duration::from_secs(3600))
617            .audio_base_path("./audio")
618            .player_registry(player_registry)
619            .trigger_registry(trigger_registry)
620            .network_rx(rx)
621            .event_tx(evt_tx)
622            .build()
623            .unwrap();
624
625        let nfc_uid = NfcUid::default();
626        let client_id = "client".to_string();
627        let (resp_tx, resp_rx) = oneshot::channel();
628
629        engine
630            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
631            .await;
632
633        let response = resp_rx.await.unwrap();
634        match response {
635            ServerMessage::BloopAccepted { achievements } => {
636                assert!(achievements.is_empty());
637            }
638            _ => panic!("Expected BloopAccepted response"),
639        }
640
641        assert!(
642            engine
643                .trigger_registry
644                .check_active_trigger((), "client", Utc::now())
645        );
646    }
647
648    #[tokio::test]
649    async fn handle_bloop_respects_throttling() {
650        let mut engine = build_test_engine();
651        let nfc_uid = NfcUid::default();
652        let client_id = "test-client".to_string();
653
654        {
655            let mut registry = engine.player_registry.lock().await;
656            let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
657            registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
658        }
659
660        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));
661
662        let bloop = Bloop::new(
663            engine
664                .player_registry
665                .lock()
666                .await
667                .get_by_nfc_uid(nfc_uid)
668                .unwrap(),
669            client_id.clone(),
670            Utc::now(),
671        );
672        engine.bloop_provider.add(Arc::new(bloop));
673
674        let (resp_tx, resp_rx) = oneshot::channel();
675        engine
676            .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
677            .await;
678
679        let response = resp_rx.await.unwrap();
680        match response {
681            ServerMessage::Error(err) => {
682                assert!(matches!(err, ErrorResponse::NfcUidThrottled));
683            }
684            _ => panic!("Expected throttling error"),
685        }
686    }
687}