1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use thiserror::Error;
18use tokio::fs::File;
19use tokio::io::AsyncReadExt;
20use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{info, instrument, warn};
24use uuid::Uuid;
25
/// A request handed to the [`Engine`] through its `network_rx` channel.
///
/// Every request travels together with a oneshot sender on which the engine
/// sends the resulting `ServerMessage`.
#[derive(Debug)]
pub enum EngineRequest {
    /// A bloop for the NFC UID `nfc_uid`, reported by the client `client_id`.
    Bloop { client_id: String, nfc_uid: NfcUid },
    /// Asks for the audio data of the achievement identified by `id`.
    RetrieveAudio { id: Uuid },
    /// Compares the manifest hash the client holds (if any) with the engine's
    /// audio manifest hash.
    PreloadCheck { manifest_hash: Option<DataHash> },
}
32
/// Configuration for rejecting repeated bloops of one player at one client.
///
/// When handling a bloop, the engine inspects up to `max_bloops` of the
/// client's bloops that are newer than `now - threshold`. If it finds exactly
/// `max_bloops` of them and all belong to the blooping player, the bloop is
/// rejected with `ErrorResponse::NfcUidThrottled`.
#[derive(Debug, Deserialize)]
pub struct Throttle {
    // Number of bloops inside the window that triggers the throttle.
    max_bloops: usize,
    // Length of the look-back window, measured from the time of the check.
    threshold: Duration,
}
38
39impl Throttle {
40 pub fn new(max_bloops: usize, threshold: Duration) -> Self {
41 Self {
42 max_bloops,
43 threshold,
44 }
45 }
46}
47
/// An achievement that is temporarily "hot" at one client.
///
/// While an entry has not expired, the engine also awards its achievement to
/// players whose bloops are recorded at the same client.
struct HotAchievement {
    // Id of the achievement to hand out.
    id: Uuid,
    // Client at which the achievement is hot.
    client_id: String,
    // Entries whose `until` lies before a bloop's `recorded_at` are discarded.
    until: DateTime<Utc>,
}
53
54impl HotAchievement {
55 fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
56 Self {
57 id,
58 client_id: bloop.client_id.clone(),
59 until: bloop.recorded_at + duration,
60 }
61 }
62}
63
/// Processes bloops and the other client requests received over a channel.
///
/// The engine evaluates achievements for every accepted bloop, records the
/// awards in the player registry, publishes events on a broadcast channel and
/// answers each request through the oneshot sender that accompanies it.
pub struct Engine<Metadata, Player, State, Trigger>
where
    Player: PlayerInfo + PlayerMutator,
    Trigger: Copy,
{
    // Bloops known to the engine; consulted for throttling, handed to the
    // achievement context, and extended with every processed bloop.
    bloop_provider: BloopProvider<Player>,
    // All achievements, keyed by their id.
    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
    // Base path against which each achievement's audio file is resolved.
    audio_base_path: PathBuf,
    // Hash over the resolved audio files, computed once when the engine is built.
    audio_manifest_hash: DataHash,
    // Shared registry used to look players up by NFC UID and to record awards.
    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
    // Shared state made available to achievements during evaluation.
    state: Arc<Mutex<State>>,
    // Triggers that a bloop may activate instead of being processed as a bloop.
    trigger_registry: TriggerRegistry<Trigger>,
    // Currently active hot achievements; expired entries are pruned per bloop.
    hot_achievements: Vec<HotAchievement>,
    // Incoming requests, each paired with the sender for its response.
    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
    // Outgoing events (`BloopProcessed`, `AchievementsAwarded`).
    event_tx: broadcast::Sender<Event>,
    // Optional throttle; `None` disables throttling.
    throttle: Option<Throttle>,
}
81
82impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
83where
84 Player: PlayerInfo + PlayerMutator,
85 Trigger: Copy,
86{
87 pub async fn process_requests(&mut self) {
88 while let Some((request, response)) = self.network_rx.recv().await {
89 match request {
90 EngineRequest::Bloop { nfc_uid, client_id } => {
91 self.handle_bloop(nfc_uid, client_id, response).await;
92 }
93 EngineRequest::RetrieveAudio { id } => {
94 self.handle_retrieve_audio(id, response);
95 }
96 EngineRequest::PreloadCheck { manifest_hash } => {
97 self.handle_preload_check(manifest_hash, response);
98 }
99 }
100 }
101 }
102
103 #[instrument(skip(self, response))]
104 async fn handle_bloop(
105 &mut self,
106 nfc_uid: NfcUid,
107 client_id: String,
108 response: oneshot::Sender<ServerMessage>,
109 ) {
110 if self
111 .trigger_registry
112 .try_activate_trigger(nfc_uid, &client_id)
113 {
114 let _ = response.send(ServerMessage::BloopAccepted {
115 achievements: Vec::new(),
116 });
117 return;
118 }
119
120 let player = {
121 let player_registry = self.player_registry.lock().await;
122 let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
123 let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
124 return;
125 };
126 player
127 };
128
129 if let Some(throttle) = self.throttle.as_ref() {
130 let player_id = player.read().unwrap().id();
131
132 let recent_bloops = self
133 .bloop_provider
134 .for_client(&client_id)
135 .iter()
136 .filter(bloops_since(Utc::now() - throttle.threshold))
137 .take(throttle.max_bloops)
138 .collect::<Vec<_>>();
139
140 if recent_bloops
141 .iter()
142 .all(|bloop| bloop.player_id == player_id)
143 && recent_bloops.len() == throttle.max_bloops
144 {
145 let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
146 return;
147 }
148 }
149
150 player.write().unwrap().increment_bloops();
151 let bloop = Bloop::new(player.clone(), client_id, Utc::now());
152
153 let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
154 self.activate_hot_achievements(&bloop, &awarded_tracker);
155 self.inject_hot_achievements(&bloop, &mut awarded_tracker);
156
157 let player_registry = self.player_registry.lock().await;
158 awarded_tracker.remove_duplicates(player_registry);
159
160 let achievement_ids: Vec<Uuid> = awarded_tracker
161 .for_player(bloop.player_id)
162 .map_or_else(Vec::new, |set| set.iter().cloned().collect());
163
164 self.apply_awarded(awarded_tracker).await;
165
166 let processed_bloop: ProcessedBloop = (&bloop).into();
167 self.bloop_provider.add(Arc::new(bloop));
168 let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
169
170 let _ = response.send(ServerMessage::BloopAccepted {
171 achievements: achievement_ids
172 .into_iter()
173 .map(|id| AchievementRecord {
174 id,
175 audio_file_hash: self
176 .achievements
177 .get(&id)
178 .unwrap()
179 .audio_file
180 .resolve(&self.audio_base_path)
181 .as_ref()
182 .map(|file| file.hash),
183 })
184 .collect(),
185 });
186 }
187
188 async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
189 let previous_awarded: HashSet<Uuid> = {
190 let player = bloop.player();
191 player.awarded_achievements().keys().cloned().collect()
192 };
193 let state = self.state.lock().await;
194 let ctx = AchievementContext::new(
195 bloop,
196 &self.bloop_provider,
197 &*state,
198 &mut self.trigger_registry,
199 );
200
201 for achievement in self.achievements.values() {
202 if !previous_awarded.contains(&achievement.id) {
203 achievement.evaluate(&ctx);
204 }
205 }
206
207 ctx.take_awarded_tracker()
208 }
209
210 fn inject_hot_achievements(
211 &mut self,
212 bloop: &Bloop<Player>,
213 awarded_tracker: &mut AwardedTracker,
214 ) {
215 let awarded = awarded_tracker.for_player_mut(bloop.player_id);
216
217 self.hot_achievements.retain(|hot_achievement| {
218 if hot_achievement.until < bloop.recorded_at {
219 return false;
220 }
221
222 if hot_achievement.client_id == bloop.client_id {
223 awarded.insert(hot_achievement.id);
224 }
225
226 true
227 });
228 }
229
230 fn activate_hot_achievements(
231 &mut self,
232 bloop: &Bloop<Player>,
233 awarded_tracker: &AwardedTracker,
234 ) {
235 let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
236 return;
237 };
238
239 for achievement_id in awarded {
240 let Some(achievement) = self.achievements.get(achievement_id) else {
241 continue;
242 };
243
244 if let Some(hot_duration) = achievement.hot_duration {
245 self.hot_achievements.push(HotAchievement::new(
246 *achievement_id,
247 bloop,
248 hot_duration,
249 ));
250 };
251 }
252 }
253
254 async fn apply_awarded(&self, tracker: AwardedTracker) {
255 let mut player_registry = self.player_registry.lock().await;
256 let batch: AchievementAwardBatch = tracker.into();
257
258 for player_awards in batch.players.iter() {
259 player_registry.mutate_by_id(player_awards.player_id, |player| {
260 for achievement_id in player_awards.achievement_ids.iter() {
261 player.add_awarded_achievement(*achievement_id, batch.awarded_at);
262 }
263 });
264 }
265
266 let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
267 }
268
269 #[instrument(skip(self, response))]
270 fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
271 let Some(achievement) = self.achievements.get(&id) else {
272 info!("Client requested unknown achievement: {}", id);
273 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
274 return;
275 };
276
277 let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
278 info!("Client requested audio for audio-less achievement: {}", id);
279 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
280 return;
281 };
282
283 let path = audio_file.path.clone();
284
285 tokio::spawn(async move {
286 let mut file = match File::open(&path).await {
287 Ok(file) => file,
288 Err(err) => {
289 warn!("Failed to open file {:?}: {:?}", &path, err);
290 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
291 return;
292 }
293 };
294
295 let mut data = vec![];
296
297 if let Err(err) = file.read_to_end(&mut data).await {
298 warn!("Failed to read file {:?}: {:?}", &path, err);
299 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
300 return;
301 }
302
303 let _ = response.send(ServerMessage::AudioData { data });
304 });
305 }
306
307 #[instrument(skip(self, response))]
308 fn handle_preload_check(
309 &self,
310 manifest_hash: Option<DataHash>,
311 response: oneshot::Sender<ServerMessage>,
312 ) {
313 if let Some(manifest_hash) = manifest_hash
314 && manifest_hash == self.audio_manifest_hash
315 {
316 let _ = response.send(ServerMessage::PreloadMatch);
317 return;
318 }
319
320 let _ = response.send(ServerMessage::PreloadMismatch {
321 audio_manifest_hash: self.audio_manifest_hash,
322 achievements: self
323 .achievements
324 .values()
325 .map(|achievement| AchievementRecord {
326 id: achievement.id,
327 audio_file_hash: achievement
328 .audio_file
329 .resolve(&self.audio_base_path)
330 .as_ref()
331 .map(|file| file.hash),
332 })
333 .collect(),
334 });
335 }
336}
337
/// Error type of the engine subsystem.
///
/// The enum has no variants, so a value of this type cannot be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
341
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Runs the request loop until it ends on its own or a shutdown is requested.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        // The loop finishing and the loop being cancelled by a shutdown
        // request are both treated as a clean exit.
        match self.process_requests().cancel_on_shutdown(&subsys).await {
            Ok(()) | Err(_) => Ok(()),
        }
    }
}
357
/// Errors returned by [`EngineBuilder::build`].
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was never set; the payload names the field.
    #[error("missing field: {0}")]
    MissingField(&'static str),
}
363
/// Step-by-step builder for an [`Engine`].
///
/// `bloop_retention`, `audio_base_path`, `player_registry`, `network_rx` and
/// `event_tx` must be set before calling `build`; all other fields fall back
/// to a default.
#[derive(Debug, Default)]
pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
where
    Player: PlayerInfo + PlayerMutator,
    Trigger: Copy,
{
    // Bloops the bloop provider is seeded with; empty unless set.
    bloops: Vec<Bloop<Player>>,
    // Achievements the engine evaluates; empty unless set.
    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
    // Required: retention passed to the bloop provider.
    bloop_retention: Option<Duration>,
    // Required: base path against which audio files are resolved.
    audio_base_path: Option<PathBuf>,
    // Required: shared player registry.
    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
    // Optional: shared state; `build` creates a default state when unset.
    state: Option<Arc<Mutex<State>>>,
    // Optional: trigger registry; `build` creates an empty one when unset.
    trigger_registry: Option<TriggerRegistry<Trigger>>,
    // Required: receiver for incoming requests and their response senders.
    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
    // Required: sender for engine events.
    event_tx: Option<broadcast::Sender<Event>>,
    // Optional: throttle configuration; no throttling when unset.
    throttle: Option<Throttle>,
}
381
382impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
383where
384 Player: PlayerInfo + PlayerMutator,
385 State: Default,
386 Trigger: Copy + PartialEq + Eq + Debug,
387{
388 pub fn new() -> Self {
389 Self {
390 bloops: Vec::new(),
391 achievements: Vec::new(),
392 bloop_retention: None,
393 audio_base_path: None,
394 player_registry: None,
395 state: None,
396 trigger_registry: None,
397 network_rx: None,
398 event_tx: None,
399 throttle: None,
400 }
401 }
402
403 pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
404 self.bloops = bloops;
405 self
406 }
407
408 pub fn achievements(
409 mut self,
410 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
411 ) -> Self {
412 self.achievements = achievements;
413 self
414 }
415
416 pub fn bloop_retention(mut self, retention: Duration) -> Self {
417 self.bloop_retention = Some(retention);
418 self
419 }
420
421 pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
422 self.audio_base_path = Some(path.into());
423 self
424 }
425
426 pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
427 self.player_registry = Some(registry);
428 self
429 }
430
431 pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
432 self.state = Some(state);
433 self
434 }
435
436 pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
437 self.trigger_registry = Some(registry);
438 self
439 }
440
441 pub fn network_rx(
442 mut self,
443 rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
444 ) -> Self {
445 self.network_rx = Some(rx);
446 self
447 }
448
449 pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
450 self.event_tx = Some(tx);
451 self
452 }
453
454 pub fn throttle(mut self, throttle: Throttle) -> Self {
455 self.throttle = Some(throttle);
456 self
457 }
458
459 pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
461 let bloop_retention = self
462 .bloop_retention
463 .ok_or(BuilderError::MissingField("bloop_retention"))?;
464 let audio_base_path = self
465 .audio_base_path
466 .ok_or(BuilderError::MissingField("audio_base_path"))?;
467 let player_registry = self
468 .player_registry
469 .ok_or(BuilderError::MissingField("player_registry"))?;
470 let network_rx = self
471 .network_rx
472 .ok_or(BuilderError::MissingField("network_rx"))?;
473 let event_tx = self
474 .event_tx
475 .ok_or(BuilderError::MissingField("event_tx"))?;
476
477 let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
478 let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
479 let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
480 self.achievements.into_iter().map(|a| (a.id, a)).collect();
481 let state = self
482 .state
483 .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
484 let trigger_registry = self
485 .trigger_registry
486 .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
487 let hot_achievements = Vec::new();
488
489 Ok(Engine {
490 bloop_provider,
491 achievements,
492 audio_base_path,
493 audio_manifest_hash,
494 player_registry,
495 state,
496 trigger_registry,
497 hot_achievements,
498 network_rx,
499 event_tx,
500 throttle: self.throttle,
501 })
502 }
503}
504
505fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
506 audio_base_path: &Path,
507 achievements: &[Achievement<Metadata, Player, State, Trigger>],
508) -> DataHash {
509 let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
510 .iter()
511 .filter_map(|achievement| {
512 achievement
513 .audio_file
514 .resolve(audio_base_path)
515 .as_ref()
516 .map(|file| (achievement.id, file.hash))
517 })
518 .collect();
519
520 let mut entries: Vec<_> = audio_file_hashes.iter().collect();
521 entries.sort_by_key(|(id, _)| *id);
522 let mut hash_input = Vec::with_capacity(entries.len() * 32);
523
524 for (id, hash) in entries {
525 hash_input.extend(id.as_bytes());
526 hash_input.extend_from_slice(hash.as_bytes());
527 }
528
529 let manifest_hash = md5::compute(hash_input);
530 manifest_hash.into()
531}
532
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine with an empty player registry, the given triggers and
    /// no throttle. Its request channel has no sender and its event channel
    /// has no receiver once this function returns.
    fn engine_with_triggers(trigger_registry: TriggerRegistry<()>) -> TestEngine {
        let (_request_tx, request_rx) = mpsc::channel(1);
        let (event_tx, _event_rx) = broadcast::channel(16);
        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(player_registry)
            .trigger_registry(trigger_registry)
            .network_rx(request_rx)
            .event_tx(event_tx)
            .build()
            .unwrap()
    }

    /// Builds an engine without any triggers.
    fn build_test_engine() -> TestEngine {
        engine_with_triggers(TriggerRegistry::new(HashMap::new()))
    }

    /// Adds a mock player carrying `nfc_uid` to the engine's player registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(player).unwrap().into_inner().unwrap();

        engine.player_registry.lock().await.add(player);
    }

    /// Sends one bloop through `handle_bloop` and returns the engine's reply.
    async fn send_bloop(engine: &mut TestEngine, nfc_uid: NfcUid, client_id: &str) -> ServerMessage {
        let (resp_tx, resp_rx) = oneshot::channel();

        engine
            .handle_bloop(nfc_uid, client_id.to_string(), resp_tx)
            .await;

        resp_rx.await.unwrap()
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();

        let response = send_bloop(&mut engine, NfcUid::default(), "test-client").await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected Error response for unknown NFC UID");
        };
        assert!(matches!(err, ErrorResponse::UnknownNfcUid));
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let response = send_bloop(&mut engine, nfc_uid, "test-client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        let once_trigger = TriggerSpec {
            trigger: (),
            global: false,
            occurrence: TriggerOccurrence::Once,
        };
        let trigger_registry =
            TriggerRegistry::new(HashMap::from([(NfcUid::default(), once_trigger)]));
        let mut engine = engine_with_triggers(trigger_registry);

        let response = send_bloop(&mut engine, NfcUid::default(), "client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());

        // The bloop must have left the trigger active for this client.
        let trigger_active = engine
            .trigger_registry
            .check_active_trigger((), "client", Utc::now());
        assert!(trigger_active);
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client";
        register_player(&engine, nfc_uid).await;

        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // Seed the provider with one recent bloop of the same player at the
        // same client, which already exhausts the limit of one.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let earlier_bloop = Bloop::new(player, client_id.to_string(), Utc::now());
        engine.bloop_provider.add(Arc::new(earlier_bloop));

        let response = send_bloop(&mut engine, nfc_uid, client_id).await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected throttling error");
        };
        assert!(matches!(err, ErrorResponse::NfcUidThrottled));
    }
}