1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
22#[cfg(feature = "tokio-graceful-shutdown")]
23use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
24use tracing::{info, instrument, warn};
25use uuid::Uuid;
26
27#[derive(Debug)]
28pub enum EngineRequest {
29 Bloop { client_id: String, nfc_uid: NfcUid },
30 RetrieveAudio { id: Uuid },
31 PreloadCheck { manifest_hash: Option<DataHash> },
32}
33
34#[derive(Debug, Deserialize)]
35pub struct Throttle {
36 max_bloops: usize,
37 threshold: Duration,
38}
39
40impl Throttle {
41 pub fn new(max_bloops: usize, threshold: Duration) -> Self {
42 Self {
43 max_bloops,
44 threshold,
45 }
46 }
47}
48
49struct HotAchievement {
50 id: Uuid,
51 client_id: String,
52 until: DateTime<Utc>,
53}
54
55impl HotAchievement {
56 fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
57 Self {
58 id,
59 client_id: bloop.client_id.clone(),
60 until: bloop.recorded_at + duration,
61 }
62 }
63}
64
65pub struct Engine<Metadata, Player, State, Trigger>
66where
67 Player: PlayerInfo + PlayerMutator,
68 Trigger: Copy,
69{
70 bloop_provider: BloopProvider<Player>,
71 achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
72 audio_base_path: PathBuf,
73 audio_file_hashes: HashMap<Uuid, DataHash>,
74 audio_manifest_hash: DataHash,
75 player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
76 state: Arc<Mutex<State>>,
77 trigger_registry: TriggerRegistry<Trigger>,
78 hot_achievements: Vec<HotAchievement>,
79 network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
80 event_tx: broadcast::Sender<Event>,
81 throttle: Option<Throttle>,
82}
83
84impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
85where
86 Player: PlayerInfo + PlayerMutator,
87 Trigger: Copy,
88{
89 pub async fn process_requests(&mut self) {
90 while let Some((request, response)) = self.network_rx.recv().await {
91 match request {
92 EngineRequest::Bloop { nfc_uid, client_id } => {
93 self.handle_bloop(nfc_uid, client_id, response).await;
94 }
95 EngineRequest::RetrieveAudio { id } => {
96 self.handle_retrieve_audio(id, response);
97 }
98 EngineRequest::PreloadCheck { manifest_hash } => {
99 self.handle_preload_check(manifest_hash, response);
100 }
101 }
102 }
103 }
104
105 #[instrument(skip(self, response))]
106 async fn handle_bloop(
107 &mut self,
108 nfc_uid: NfcUid,
109 client_id: String,
110 response: oneshot::Sender<ServerMessage>,
111 ) {
112 if self
113 .trigger_registry
114 .try_activate_trigger(nfc_uid, &client_id)
115 {
116 let _ = response.send(ServerMessage::BloopAccepted {
117 achievements: Vec::new(),
118 });
119 return;
120 }
121
122 let player = {
123 let player_registry = self.player_registry.lock().await;
124 let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
125 let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
126 return;
127 };
128 player
129 };
130
131 if let Some(throttle) = self.throttle.as_ref() {
132 let player_id = player.read().unwrap().id();
133
134 let recent_bloops = self
135 .bloop_provider
136 .for_client(&client_id)
137 .iter()
138 .filter(bloops_since(Utc::now() - throttle.threshold))
139 .take(throttle.max_bloops)
140 .collect::<Vec<_>>();
141
142 if recent_bloops
143 .iter()
144 .all(|bloop| bloop.player_id == player_id)
145 && recent_bloops.len() == throttle.max_bloops
146 {
147 let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
148 return;
149 }
150 }
151
152 player.write().unwrap().increment_bloops();
153 let bloop = Bloop::new(player.clone(), client_id, Utc::now());
154
155 let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
156 self.activate_hot_achievements(&bloop, &awarded_tracker);
157 self.inject_hot_achievements(&bloop, &mut awarded_tracker);
158
159 let player_registry = self.player_registry.lock().await;
160 awarded_tracker.remove_duplicates(player_registry);
161
162 let achievement_ids: Vec<Uuid> = awarded_tracker
163 .for_player(bloop.player_id)
164 .map_or_else(Vec::new, |set| set.iter().cloned().collect());
165
166 self.apply_awarded(awarded_tracker).await;
167
168 let processed_bloop: ProcessedBloop = (&bloop).into();
169 self.bloop_provider.add(Arc::new(bloop));
170 let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
171
172 let _ = response.send(ServerMessage::BloopAccepted {
173 achievements: achievement_ids
174 .into_iter()
175 .map(|id| AchievementRecord {
176 id,
177 audio_file_hash: self.audio_file_hashes.get(&id).cloned(),
178 })
179 .collect(),
180 });
181 }
182
183 async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
184 let previous_awarded: HashSet<Uuid> = {
185 let player = bloop.player();
186 player.awarded_achievements().keys().cloned().collect()
187 };
188 let metadata = self.state.lock().await;
189 let ctx = AchievementContext::new(
190 bloop,
191 &self.bloop_provider,
192 &*metadata,
193 &mut self.trigger_registry,
194 );
195
196 for achievement in self.achievements.values() {
197 if !previous_awarded.contains(&achievement.id) {
198 achievement.evaluate(&ctx);
199 }
200 }
201
202 ctx.take_awarded_tracker()
203 }
204
205 fn inject_hot_achievements(
206 &mut self,
207 bloop: &Bloop<Player>,
208 awarded_tracker: &mut AwardedTracker,
209 ) {
210 let awarded = awarded_tracker.for_player_mut(bloop.player_id);
211
212 self.hot_achievements.retain(|hot_achievement| {
213 if hot_achievement.until < bloop.recorded_at {
214 return false;
215 }
216
217 if hot_achievement.client_id == bloop.client_id {
218 awarded.insert(hot_achievement.id);
219 }
220
221 true
222 });
223 }
224
225 fn activate_hot_achievements(
226 &mut self,
227 bloop: &Bloop<Player>,
228 awarded_tracker: &AwardedTracker,
229 ) {
230 let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
231 return;
232 };
233
234 for achievement_id in awarded {
235 let Some(achievement) = self.achievements.get(achievement_id) else {
236 continue;
237 };
238
239 if let Some(hot_duration) = achievement.hot_duration {
240 self.hot_achievements.push(HotAchievement::new(
241 *achievement_id,
242 bloop,
243 hot_duration,
244 ));
245 };
246 }
247 }
248
249 async fn apply_awarded(&self, tracker: AwardedTracker) {
250 let mut player_registry = self.player_registry.lock().await;
251 let batch: AchievementAwardBatch = tracker.into();
252
253 for player_awards in batch.players.iter() {
254 player_registry.mutate_by_id(player_awards.player_id, |player| {
255 for achievement_id in player_awards.achievement_ids.iter() {
256 player.add_awarded_achievement(*achievement_id, batch.awarded_at);
257 }
258 });
259 }
260
261 let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
262 }
263
264 #[instrument(skip(self, response))]
265 fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
266 let Some(achievement) = self.achievements.get(&id) else {
267 info!("Client requested unknown achievement: {}", id);
268 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
269 return;
270 };
271
272 let Some(path) = achievement.audio_path.as_ref() else {
273 info!("Client requested audio for audio-less achievement: {}", id);
274 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
275 return;
276 };
277 let path = self.audio_base_path.join(path);
278
279 tokio::spawn(async move {
280 let mut file = match File::open(&path).await {
281 Ok(file) => file,
282 Err(err) => {
283 warn!("Failed to open file {:?}: {:?}", &path, err);
284 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
285 return;
286 }
287 };
288
289 let mut data = vec![];
290
291 if let Err(err) = file.read_to_end(&mut data).await {
292 warn!("Failed to read file {:?}: {:?}", &path, err);
293 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
294 return;
295 }
296
297 let _ = response.send(ServerMessage::AudioData { data });
298 });
299 }
300
301 #[instrument(skip(self, response))]
302 fn handle_preload_check(
303 &self,
304 manifest_hash: Option<DataHash>,
305 response: oneshot::Sender<ServerMessage>,
306 ) {
307 if let Some(manifest_hash) = manifest_hash {
308 if manifest_hash == self.audio_manifest_hash {
309 let _ = response.send(ServerMessage::PreloadMatch);
310 return;
311 }
312 }
313
314 let _ = response.send(ServerMessage::PreloadMismatch {
315 audio_manifest_hash: self.audio_manifest_hash,
316 achievements: self
317 .achievements
318 .values()
319 .map(|achievement| AchievementRecord {
320 id: achievement.id,
321 audio_file_hash: self.audio_file_hashes.get(&achievement.id).cloned(),
322 })
323 .collect(),
324 });
325 }
326}
327
/// Error type of the engine subsystem.
///
/// The enum has no variants, so no value of it can be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
331
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Runs the request loop as a subsystem, wrapped in `cancel_on_shutdown`.
    ///
    /// Always returns `Ok(())`; whether the loop finished or was cancelled is ignored.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        let processing = self.process_requests().cancel_on_shutdown(&subsys);
        let _ = processing.await;

        Ok(())
    }
}
347
348#[derive(Debug, Error)]
349pub enum BuilderError {
350 #[error("missing field: {0}")]
351 MissingField(&'static str),
352}
353
354#[derive(Debug, Default)]
355pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
356where
357 Player: PlayerInfo + PlayerMutator,
358 Metadata: Default,
359 Trigger: Copy,
360{
361 bloops: Vec<Bloop<Player>>,
362 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
363 bloop_retention: Option<Duration>,
364 audio_base_path: Option<PathBuf>,
365 player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
366 state: Option<Arc<Mutex<State>>>,
367 trigger_registry: Option<TriggerRegistry<Trigger>>,
368 network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
369 event_tx: Option<broadcast::Sender<Event>>,
370 throttle: Option<Throttle>,
371}
372
373impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
374where
375 Player: PlayerInfo + PlayerMutator,
376 Metadata: Default,
377 State: Default,
378 Trigger: Copy + PartialEq + Eq + Debug,
379{
380 pub fn new() -> Self {
381 Self {
382 bloops: Vec::new(),
383 achievements: Vec::new(),
384 bloop_retention: None,
385 audio_base_path: None,
386 player_registry: None,
387 state: None,
388 trigger_registry: None,
389 network_rx: None,
390 event_tx: None,
391 throttle: None,
392 }
393 }
394
395 pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
396 self.bloops = bloops;
397 self
398 }
399
400 pub fn achievements(
401 mut self,
402 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
403 ) -> Self {
404 self.achievements = achievements;
405 self
406 }
407
408 pub fn bloop_retention(mut self, retention: Duration) -> Self {
409 self.bloop_retention = Some(retention);
410 self
411 }
412
413 pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
414 self.audio_base_path = Some(path.into());
415 self
416 }
417
418 pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
419 self.player_registry = Some(registry);
420 self
421 }
422
423 pub fn metadata(mut self, metadata: Arc<Mutex<State>>) -> Self {
424 self.state = Some(metadata);
425 self
426 }
427
428 pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
429 self.trigger_registry = Some(registry);
430 self
431 }
432
433 pub fn network_rx(
434 mut self,
435 rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
436 ) -> Self {
437 self.network_rx = Some(rx);
438 self
439 }
440
441 pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
442 self.event_tx = Some(tx);
443 self
444 }
445
446 pub fn throttle(mut self, throttle: Throttle) -> Self {
447 self.throttle = Some(throttle);
448 self
449 }
450
451 pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
453 let bloop_retention = self
454 .bloop_retention
455 .ok_or(BuilderError::MissingField("bloop_retention"))?;
456 let audio_base_path = self
457 .audio_base_path
458 .ok_or(BuilderError::MissingField("audio_base_path"))?;
459 let player_registry = self
460 .player_registry
461 .ok_or(BuilderError::MissingField("player_registry"))?;
462 let network_rx = self
463 .network_rx
464 .ok_or(BuilderError::MissingField("network_rx"))?;
465 let event_tx = self
466 .event_tx
467 .ok_or(BuilderError::MissingField("event_tx"))?;
468
469 let (audio_hashes, global_hash) =
470 collect_audio_hashes(&audio_base_path, &self.achievements);
471 let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
472 let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
473 self.achievements.into_iter().map(|a| (a.id, a)).collect();
474 let state = self
475 .state
476 .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
477 let trigger_registry = self
478 .trigger_registry
479 .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
480 let hot_achievements = Vec::new();
481
482 Ok(Engine {
483 bloop_provider,
484 achievements,
485 audio_base_path,
486 audio_file_hashes: audio_hashes,
487 audio_manifest_hash: global_hash,
488 player_registry,
489 state,
490 trigger_registry,
491 hot_achievements,
492 network_rx,
493 event_tx,
494 throttle: self.throttle,
495 })
496 }
497}
498
499fn collect_audio_hashes<Metadata, Player, State, Trigger>(
500 audio_base_path: &Path,
501 achievements: &[Achievement<Metadata, Player, State, Trigger>],
502) -> (HashMap<Uuid, DataHash>, DataHash) {
503 let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
504 .iter()
505 .filter_map(|achievement| {
506 let path = achievement.audio_path.as_ref()?;
507 let path = audio_base_path.join(path);
508
509 let Ok(file_content) = fs::read(path.clone()) else {
510 warn!("Audio file missing: {:?}", path);
511 return None;
512 };
513
514 let digest = md5::compute(file_content);
515
516 Some((achievement.id, digest.into()))
517 })
518 .collect();
519
520 let mut entries: Vec<_> = audio_file_hashes.iter().collect();
521 entries.sort_by_key(|(id, _)| *id);
522 let mut hash_input = Vec::with_capacity(entries.len() * 32);
523
524 for (id, hash) in entries {
525 hash_input.extend(id.as_bytes());
526 hash_input.extend_from_slice(hash.as_bytes());
527 }
528
529 let manifest_hash = md5::compute(hash_input);
530
531 (audio_file_hashes, manifest_hash.into())
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine without players or achievements that uses the given trigger registry.
    fn build_engine_with_triggers(trigger_registry: TriggerRegistry<()>) -> TestEngine {
        // The tests call the handlers directly, so the sending half is not needed.
        let (_, network_rx) = mpsc::channel(1);
        let (event_tx, _) = broadcast::channel(16);

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(Arc::new(Mutex::new(PlayerRegistry::new(vec![]))))
            .trigger_registry(trigger_registry)
            .network_rx(network_rx)
            .event_tx(event_tx)
            .build()
            .unwrap()
    }

    /// Builds an engine without players, achievements or triggers.
    fn build_test_engine() -> TestEngine {
        build_engine_with_triggers(TriggerRegistry::new(HashMap::new()))
    }

    /// Adds a mock player carrying `nfc_uid` to the engine's player registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(player).unwrap().into_inner().unwrap();

        engine.player_registry.lock().await.add(player);
    }

    /// Runs `handle_bloop` for the given UID and client and returns the engine's response.
    async fn send_bloop(engine: &mut TestEngine, nfc_uid: NfcUid, client_id: &str) -> ServerMessage {
        let (resp_tx, resp_rx) = oneshot::channel();
        engine
            .handle_bloop(nfc_uid, client_id.to_string(), resp_tx)
            .await;

        resp_rx.await.unwrap()
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();

        let response = send_bloop(&mut engine, NfcUid::default(), "test-client").await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected Error response for unknown NFC UID");
        };
        assert!(matches!(err, ErrorResponse::UnknownNfcUid));
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let response = send_bloop(&mut engine, nfc_uid, "test-client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        let trigger_specs = HashMap::from([(
            NfcUid::default(),
            TriggerSpec {
                trigger: (),
                global: false,
                occurrence: TriggerOccurrence::Once,
            },
        )]);
        let mut engine = build_engine_with_triggers(TriggerRegistry::new(trigger_specs));

        let response = send_bloop(&mut engine, NfcUid::default(), "client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());

        assert!(
            engine
                .trigger_registry
                .check_active_trigger((), "client", Utc::now())
        );
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client";
        register_player(&engine, nfc_uid).await;

        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // One earlier bloop by the same player already fills the window of size one.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let earlier_bloop = Bloop::new(player, client_id.to_string(), Utc::now());
        engine.bloop_provider.add(Arc::new(earlier_bloop));

        let response = send_bloop(&mut engine, nfc_uid, client_id).await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected throttling error");
        };
        assert!(matches!(err, ErrorResponse::NfcUidThrottled));
    }
}