1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
22#[cfg(feature = "tokio-graceful-shutdown")]
23use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
24use tracing::{info, instrument, warn};
25use uuid::Uuid;
26
27#[derive(Debug)]
28pub enum EngineRequest {
29 Bloop { client_id: String, nfc_uid: NfcUid },
30 RetrieveAudio { id: Uuid },
31 PreloadCheck { manifest_hash: Option<DataHash> },
32}
33
34#[derive(Debug, Deserialize)]
35pub struct Throttle {
36 max_bloops: usize,
37 threshold: Duration,
38}
39
40impl Throttle {
41 pub fn new(max_bloops: usize, threshold: Duration) -> Self {
42 Self {
43 max_bloops,
44 threshold,
45 }
46 }
47}
48
49struct HotAchievement {
50 id: Uuid,
51 client_id: String,
52 until: DateTime<Utc>,
53}
54
55impl HotAchievement {
56 fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
57 Self {
58 id,
59 client_id: bloop.client_id.clone(),
60 until: bloop.recorded_at + duration,
61 }
62 }
63}
64
65pub struct Engine<Metadata, Player, State, Trigger>
66where
67 Player: PlayerInfo + PlayerMutator,
68 Trigger: Copy,
69{
70 bloop_provider: BloopProvider<Player>,
71 achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
72 audio_base_path: PathBuf,
73 audio_file_hashes: HashMap<Uuid, DataHash>,
74 audio_manifest_hash: DataHash,
75 player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
76 state: Arc<Mutex<State>>,
77 trigger_registry: TriggerRegistry<Trigger>,
78 hot_achievements: Vec<HotAchievement>,
79 network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
80 event_tx: broadcast::Sender<Event>,
81 throttle: Option<Throttle>,
82}
83
84impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
85where
86 Player: PlayerInfo + PlayerMutator,
87 Trigger: Copy,
88{
89 pub async fn process_requests(&mut self) {
90 while let Some((request, response)) = self.network_rx.recv().await {
91 match request {
92 EngineRequest::Bloop { nfc_uid, client_id } => {
93 self.handle_bloop(nfc_uid, client_id, response).await;
94 }
95 EngineRequest::RetrieveAudio { id } => {
96 self.handle_retrieve_audio(id, response);
97 }
98 EngineRequest::PreloadCheck { manifest_hash } => {
99 self.handle_preload_check(manifest_hash, response);
100 }
101 }
102 }
103 }
104
105 #[instrument(skip(self, response))]
106 async fn handle_bloop(
107 &mut self,
108 nfc_uid: NfcUid,
109 client_id: String,
110 response: oneshot::Sender<ServerMessage>,
111 ) {
112 if self
113 .trigger_registry
114 .try_activate_trigger(nfc_uid, &client_id)
115 {
116 let _ = response.send(ServerMessage::BloopAccepted {
117 achievements: Vec::new(),
118 });
119 return;
120 }
121
122 let player = {
123 let player_registry = self.player_registry.lock().await;
124 let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
125 let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
126 return;
127 };
128 player
129 };
130
131 if let Some(throttle) = self.throttle.as_ref() {
132 let player_id = player.read().unwrap().id();
133
134 let throttled = self
135 .bloop_provider
136 .for_client(&client_id)
137 .iter()
138 .filter(bloops_since(Utc::now() - throttle.threshold))
139 .take(throttle.max_bloops)
140 .all(|bloop| bloop.player_id == player_id);
141
142 if throttled {
143 let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
144 return;
145 }
146 }
147
148 player.write().unwrap().increment_bloops();
149 let bloop = Bloop::new(player.clone(), client_id, Utc::now());
150
151 let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
152 self.activate_hot_achievements(&bloop, &awarded_tracker);
153 self.inject_hot_achievements(&bloop, &mut awarded_tracker);
154
155 let player_registry = self.player_registry.lock().await;
156 awarded_tracker.remove_duplicates(player_registry);
157
158 let achievement_ids: Vec<Uuid> = awarded_tracker
159 .for_player(bloop.player_id)
160 .map_or_else(Vec::new, |set| set.iter().cloned().collect());
161
162 self.apply_awarded(awarded_tracker).await;
163
164 let processed_bloop: ProcessedBloop = (&bloop).into();
165 self.bloop_provider.add(Arc::new(bloop));
166 let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
167
168 let _ = response.send(ServerMessage::BloopAccepted {
169 achievements: achievement_ids
170 .into_iter()
171 .map(|id| AchievementRecord {
172 id,
173 audio_file_hash: self.audio_file_hashes.get(&id).cloned(),
174 })
175 .collect(),
176 });
177 }
178
179 async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
180 let previous_awarded: HashSet<Uuid> = {
181 let player = bloop.player();
182 player.awarded_achievements().keys().cloned().collect()
183 };
184 let metadata = self.state.lock().await;
185 let ctx = AchievementContext::new(
186 bloop,
187 &self.bloop_provider,
188 &*metadata,
189 &mut self.trigger_registry,
190 );
191
192 for achievement in self.achievements.values() {
193 if !previous_awarded.contains(&achievement.id) {
194 achievement.evaluate(&ctx);
195 }
196 }
197
198 ctx.take_awarded_tracker()
199 }
200
201 fn inject_hot_achievements(
202 &mut self,
203 bloop: &Bloop<Player>,
204 awarded_tracker: &mut AwardedTracker,
205 ) {
206 let awarded = awarded_tracker.for_player_mut(bloop.player_id);
207
208 self.hot_achievements.retain(|hot_achievement| {
209 if hot_achievement.until < bloop.recorded_at {
210 return false;
211 }
212
213 if hot_achievement.client_id == bloop.client_id {
214 awarded.insert(hot_achievement.id);
215 }
216
217 true
218 });
219 }
220
221 fn activate_hot_achievements(
222 &mut self,
223 bloop: &Bloop<Player>,
224 awarded_tracker: &AwardedTracker,
225 ) {
226 let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
227 return;
228 };
229
230 for achievement_id in awarded {
231 let Some(achievement) = self.achievements.get(achievement_id) else {
232 continue;
233 };
234
235 if let Some(hot_duration) = achievement.hot_duration {
236 self.hot_achievements.push(HotAchievement::new(
237 *achievement_id,
238 bloop,
239 hot_duration,
240 ));
241 };
242 }
243 }
244
245 async fn apply_awarded(&self, tracker: AwardedTracker) {
246 let mut player_registry = self.player_registry.lock().await;
247 let batch: AchievementAwardBatch = tracker.into();
248
249 for player_awards in batch.players.iter() {
250 player_registry.mutate_by_id(player_awards.player_id, |player| {
251 for achievement_id in player_awards.achievement_ids.iter() {
252 player.add_awarded_achievement(*achievement_id, batch.awarded_at);
253 }
254 });
255 }
256
257 let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
258 }
259
260 #[instrument(skip(self, response))]
261 fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
262 let Some(achievement) = self.achievements.get(&id) else {
263 info!("Client requested unknown achievement: {}", id);
264 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
265 return;
266 };
267
268 let Some(path) = achievement.audio_path.as_ref() else {
269 info!("Client requested audio for audio-less achievement: {}", id);
270 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
271 return;
272 };
273 let path = self.audio_base_path.join(path);
274
275 tokio::spawn(async move {
276 let mut file = match File::open(&path).await {
277 Ok(file) => file,
278 Err(err) => {
279 warn!("Failed to open file {:?}: {:?}", &path, err);
280 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
281 return;
282 }
283 };
284
285 let mut data = vec![];
286
287 if let Err(err) = file.read_to_end(&mut data).await {
288 warn!("Failed to read file {:?}: {:?}", &path, err);
289 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
290 return;
291 }
292
293 let _ = response.send(ServerMessage::AudioData { data });
294 });
295 }
296
297 #[instrument(skip(self, response))]
298 fn handle_preload_check(
299 &self,
300 manifest_hash: Option<DataHash>,
301 response: oneshot::Sender<ServerMessage>,
302 ) {
303 if let Some(manifest_hash) = manifest_hash {
304 if manifest_hash == self.audio_manifest_hash {
305 let _ = response.send(ServerMessage::PreloadMatch);
306 return;
307 }
308 }
309
310 let _ = response.send(ServerMessage::PreloadMismatch {
311 audio_manifest_hash: self.audio_manifest_hash,
312 achievements: self
313 .achievements
314 .values()
315 .map(|achievement| AchievementRecord {
316 id: achievement.id,
317 audio_file_hash: self.audio_file_hashes.get(&achievement.id).cloned(),
318 })
319 .collect(),
320 });
321 }
322}
323
324#[cfg(feature = "tokio-graceful-shutdown")]
325#[derive(Debug, Error)]
326pub enum NeverError {}
327
328#[cfg(feature = "tokio-graceful-shutdown")]
329#[async_trait]
330impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
331 for Engine<Metadata, Player, State, Trigger>
332where
333 Metadata: Send + Sync + 'static,
334 Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
335 State: Send + Sync + 'static,
336 Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
337{
338 async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
339 let _ = self.process_requests().cancel_on_shutdown(&subsys).await;
340 Ok(())
341 }
342}
343
344#[derive(Debug, Error)]
345pub enum BuilderError {
346 #[error("missing field: {0}")]
347 MissingField(&'static str),
348}
349
350#[derive(Debug, Default)]
351pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
352where
353 Player: PlayerInfo + PlayerMutator,
354 Metadata: Default,
355 Trigger: Copy,
356{
357 bloops: Vec<Bloop<Player>>,
358 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
359 bloop_retention: Option<Duration>,
360 audio_base_path: Option<PathBuf>,
361 player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
362 state: Option<Arc<Mutex<State>>>,
363 trigger_registry: Option<TriggerRegistry<Trigger>>,
364 network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
365 event_tx: Option<broadcast::Sender<Event>>,
366 throttle: Option<Throttle>,
367}
368
369impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
370where
371 Player: PlayerInfo + PlayerMutator,
372 Metadata: Default,
373 State: Default,
374 Trigger: Copy + PartialEq + Eq + Debug,
375{
376 pub fn new() -> Self {
377 Self {
378 bloops: Vec::new(),
379 achievements: Vec::new(),
380 bloop_retention: None,
381 audio_base_path: None,
382 player_registry: None,
383 state: None,
384 trigger_registry: None,
385 network_rx: None,
386 event_tx: None,
387 throttle: None,
388 }
389 }
390
391 pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
392 self.bloops = bloops;
393 self
394 }
395
396 pub fn achievements(
397 mut self,
398 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
399 ) -> Self {
400 self.achievements = achievements;
401 self
402 }
403
404 pub fn bloop_retention(mut self, retention: Duration) -> Self {
405 self.bloop_retention = Some(retention);
406 self
407 }
408
409 pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
410 self.audio_base_path = Some(path.into());
411 self
412 }
413
414 pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
415 self.player_registry = Some(registry);
416 self
417 }
418
419 pub fn metadata(mut self, metadata: Arc<Mutex<State>>) -> Self {
420 self.state = Some(metadata);
421 self
422 }
423
424 pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
425 self.trigger_registry = Some(registry);
426 self
427 }
428
429 pub fn network_rx(
430 mut self,
431 rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
432 ) -> Self {
433 self.network_rx = Some(rx);
434 self
435 }
436
437 pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
438 self.event_tx = Some(tx);
439 self
440 }
441
442 pub fn throttle(mut self, throttle: Throttle) -> Self {
443 self.throttle = Some(throttle);
444 self
445 }
446
447 pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
449 let bloop_retention = self
450 .bloop_retention
451 .ok_or(BuilderError::MissingField("bloop_retention"))?;
452 let audio_base_path = self
453 .audio_base_path
454 .ok_or(BuilderError::MissingField("audio_base_path"))?;
455 let player_registry = self
456 .player_registry
457 .ok_or(BuilderError::MissingField("player_registry"))?;
458 let network_rx = self
459 .network_rx
460 .ok_or(BuilderError::MissingField("network_rx"))?;
461 let event_tx = self
462 .event_tx
463 .ok_or(BuilderError::MissingField("event_tx"))?;
464
465 let (audio_hashes, global_hash) =
466 collect_audio_hashes(&audio_base_path, &self.achievements);
467 let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
468 let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
469 self.achievements.into_iter().map(|a| (a.id, a)).collect();
470 let state = self
471 .state
472 .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
473 let trigger_registry = self
474 .trigger_registry
475 .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
476 let hot_achievements = Vec::new();
477
478 Ok(Engine {
479 bloop_provider,
480 achievements,
481 audio_base_path,
482 audio_file_hashes: audio_hashes,
483 audio_manifest_hash: global_hash,
484 player_registry,
485 state,
486 trigger_registry,
487 hot_achievements,
488 network_rx,
489 event_tx,
490 throttle: self.throttle,
491 })
492 }
493}
494
495fn collect_audio_hashes<Metadata, Player, State, Trigger>(
496 audio_base_path: &Path,
497 achievements: &[Achievement<Metadata, Player, State, Trigger>],
498) -> (HashMap<Uuid, DataHash>, DataHash) {
499 let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
500 .iter()
501 .filter_map(|achievement| {
502 let path = achievement.audio_path.as_ref()?;
503 let path = audio_base_path.join(path);
504
505 let Ok(file_content) = fs::read(path.clone()) else {
506 warn!("Audio file missing: {:?}", path);
507 return None;
508 };
509
510 let digest = md5::compute(file_content);
511
512 Some((achievement.id, digest.into()))
513 })
514 .collect();
515
516 let mut entries: Vec<_> = audio_file_hashes.iter().collect();
517 entries.sort_by_key(|(id, _)| *id);
518 let mut hash_input = Vec::with_capacity(entries.len() * 32);
519
520 for (id, hash) in entries {
521 hash_input.extend(id.as_bytes());
522 hash_input.extend_from_slice(hash.as_bytes());
523 }
524
525 let manifest_hash = md5::compute(hash_input);
526
527 (audio_file_hashes, manifest_hash.into())
528}
529
530#[cfg(test)]
531mod tests {
532 use super::*;
533 use crate::test_utils::MockPlayer;
534 use crate::trigger::{TriggerOccurrence, TriggerSpec};
535
536 fn build_test_engine() -> Engine<(), MockPlayer, (), ()> {
537 let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
538 let trigger_registry = TriggerRegistry::new(HashMap::new());
539
540 EngineBuilder::<MockPlayer, (), (), ()>::new()
541 .bloop_retention(Duration::from_secs(3600))
542 .audio_base_path("./audio")
543 .player_registry(player_registry)
544 .trigger_registry(trigger_registry)
545 .network_rx(mpsc::channel(1).1)
546 .event_tx(broadcast::channel(16).0)
547 .build()
548 .unwrap()
549 }
550
551 #[tokio::test]
552 async fn handle_bloop_rejects_unknown_nfc_uid() {
553 let mut engine = build_test_engine();
554 let unknown_nfc_uid = NfcUid::default();
555 let client_id = "test-client".to_string();
556
557 let (resp_tx, resp_rx) = oneshot::channel();
558 engine
559 .handle_bloop(unknown_nfc_uid, client_id.clone(), resp_tx)
560 .await;
561
562 let response = resp_rx.await.unwrap();
563 match response {
564 ServerMessage::Error(err) => {
565 assert!(matches!(err, ErrorResponse::UnknownNfcUid));
566 }
567 _ => panic!("Expected Error response for unknown NFC UID"),
568 }
569 }
570
571 #[tokio::test]
572 async fn handle_bloop_accepts_known_player() {
573 let mut engine = build_test_engine();
574 let nfc_uid = NfcUid::default();
575 let client_id = "test-client".to_string();
576
577 {
578 let mut registry = engine.player_registry.lock().await;
579 let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
580 registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
581 }
582
583 let (resp_tx, resp_rx) = oneshot::channel();
584 engine
585 .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
586 .await;
587
588 let response = resp_rx.await.unwrap();
589 match response {
590 ServerMessage::BloopAccepted { achievements } => {
591 assert!(achievements.is_empty());
592 }
593 _ => panic!("Expected BloopAccepted response"),
594 }
595 }
596
597 #[tokio::test]
598 async fn handle_bloop_activates_trigger_and_responds() {
599 let mut trigger_registry = HashMap::new();
600 trigger_registry.insert(
601 NfcUid::default(),
602 TriggerSpec {
603 trigger: (),
604 global: false,
605 occurrence: TriggerOccurrence::Once,
606 },
607 );
608 let trigger_registry = TriggerRegistry::new(trigger_registry);
609
610 let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
611
612 let (_tx, rx) = mpsc::channel(1);
613 let (evt_tx, _) = broadcast::channel(16);
614
615 let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
616 .bloop_retention(Duration::from_secs(3600))
617 .audio_base_path("./audio")
618 .player_registry(player_registry)
619 .trigger_registry(trigger_registry)
620 .network_rx(rx)
621 .event_tx(evt_tx)
622 .build()
623 .unwrap();
624
625 let nfc_uid = NfcUid::default();
626 let client_id = "client".to_string();
627 let (resp_tx, resp_rx) = oneshot::channel();
628
629 engine
630 .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
631 .await;
632
633 let response = resp_rx.await.unwrap();
634 match response {
635 ServerMessage::BloopAccepted { achievements } => {
636 assert!(achievements.is_empty());
637 }
638 _ => panic!("Expected BloopAccepted response"),
639 }
640
641 assert!(
642 engine
643 .trigger_registry
644 .check_active_trigger((), "client", Utc::now())
645 );
646 }
647
648 #[tokio::test]
649 async fn handle_bloop_respects_throttling() {
650 let mut engine = build_test_engine();
651 let nfc_uid = NfcUid::default();
652 let client_id = "test-client".to_string();
653
654 {
655 let mut registry = engine.player_registry.lock().await;
656 let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
657 registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
658 }
659
660 engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));
661
662 let bloop = Bloop::new(
663 engine
664 .player_registry
665 .lock()
666 .await
667 .get_by_nfc_uid(nfc_uid)
668 .unwrap(),
669 client_id.clone(),
670 Utc::now(),
671 );
672 engine.bloop_provider.add(Arc::new(bloop));
673
674 let (resp_tx, resp_rx) = oneshot::channel();
675 engine
676 .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
677 .await;
678
679 let response = resp_rx.await.unwrap();
680 match response {
681 ServerMessage::Error(err) => {
682 assert!(matches!(err, ErrorResponse::NfcUidThrottled));
683 }
684 _ => panic!("Expected throttling error"),
685 }
686 }
687}