1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8#[cfg(feature = "tokio-graceful-shutdown")]
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::Deserialize;
12use std::collections::{HashMap, HashSet};
13use std::fmt::Debug;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use thiserror::Error;
18use tokio::fs::File;
19use tokio::io::AsyncReadExt;
20use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
21#[cfg(feature = "tokio-graceful-shutdown")]
22use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
23use tracing::{info, instrument, warn};
24use uuid::Uuid;
25
/// A request handed to the engine through its request channel.
///
/// `Engine::process_requests` receives every request together with a one-shot
/// sender and answers on that sender with a `ServerMessage`.
#[derive(Debug)]
pub enum EngineRequest {
    /// A bloop reported by the client `client_id` for the NFC UID `nfc_uid`.
    Bloop { client_id: String, nfc_uid: NfcUid },
    /// Asks for the audio data belonging to the achievement with this id.
    RetrieveAudio { id: Uuid },
    /// Asks the engine to compare `manifest_hash` (if the client has one) with
    /// its own audio manifest hash.
    PreloadCheck { manifest_hash: Option<DataHash> },
}
32
33#[derive(Debug, Deserialize)]
34pub struct Throttle {
35 max_bloops: usize,
36 threshold: Duration,
37}
38
39impl Throttle {
40 pub fn new(max_bloops: usize, threshold: Duration) -> Self {
41 Self {
42 max_bloops,
43 threshold,
44 }
45 }
46}
47
48struct HotAchievement {
49 id: Uuid,
50 client_id: String,
51 until: DateTime<Utc>,
52}
53
54impl HotAchievement {
55 fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
56 Self {
57 id,
58 client_id: bloop.client_id.clone(),
59 until: bloop.recorded_at + duration,
60 }
61 }
62}
63
/// Processes incoming [`EngineRequest`]s: records bloops, evaluates and awards
/// achievements, and serves achievement audio.
///
/// Instances are assembled through `EngineBuilder`.
pub struct Engine<Metadata, Player, State, Trigger>
where
    Player: PlayerInfo + PlayerMutator,
    Trigger: Copy,
{
    /// Recorded bloops; consulted for throttling and handed to achievement evaluation.
    bloop_provider: BloopProvider<Player>,
    /// All known achievements, keyed by their id.
    achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
    /// Base path against which achievement audio files are resolved.
    audio_base_path: PathBuf,
    /// Hash over the resolvable audio files, computed once when the engine is built.
    audio_manifest_hash: DataHash,
    /// Shared registry used to look players up by NFC UID and to store their awards.
    player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
    /// Shared state made available to achievements while they are evaluated.
    state: Arc<Mutex<State>>,
    /// Triggers that a bloop's NFC UID may activate instead of recording a bloop.
    trigger_registry: TriggerRegistry<Trigger>,
    /// Achievements that are currently also awarded to later bloops on the same client.
    hot_achievements: Vec<HotAchievement>,
    /// Incoming requests, each paired with the sender used for its reply.
    network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
    /// Broadcasts processed bloops and awarded achievements.
    event_tx: broadcast::Sender<Event>,
    /// Optional limit on repeated bloops by one player on one client.
    throttle: Option<Throttle>,
}
81
82impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
83where
84 Player: PlayerInfo + PlayerMutator,
85 Trigger: Copy,
86{
87 pub async fn process_requests(&mut self) {
88 while let Some((request, response)) = self.network_rx.recv().await {
89 match request {
90 EngineRequest::Bloop { nfc_uid, client_id } => {
91 self.handle_bloop(nfc_uid, client_id, response).await;
92 }
93 EngineRequest::RetrieveAudio { id } => {
94 self.handle_retrieve_audio(id, response);
95 }
96 EngineRequest::PreloadCheck { manifest_hash } => {
97 self.handle_preload_check(manifest_hash, response);
98 }
99 }
100 }
101 }
102
103 #[instrument(skip(self, response))]
104 async fn handle_bloop(
105 &mut self,
106 nfc_uid: NfcUid,
107 client_id: String,
108 response: oneshot::Sender<ServerMessage>,
109 ) {
110 if self
111 .trigger_registry
112 .try_activate_trigger(nfc_uid, &client_id)
113 {
114 let _ = response.send(ServerMessage::BloopAccepted {
115 achievements: Vec::new(),
116 });
117 return;
118 }
119
120 let player = {
121 let player_registry = self.player_registry.lock().await;
122 let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
123 let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
124 return;
125 };
126 player
127 };
128
129 if let Some(throttle) = self.throttle.as_ref() {
130 let player_id = player.read().unwrap().id();
131
132 let recent_bloops = self
133 .bloop_provider
134 .for_client(&client_id)
135 .iter()
136 .filter(bloops_since(Utc::now() - throttle.threshold))
137 .take(throttle.max_bloops)
138 .collect::<Vec<_>>();
139
140 if recent_bloops
141 .iter()
142 .all(|bloop| bloop.player_id == player_id)
143 && recent_bloops.len() == throttle.max_bloops
144 {
145 let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
146 return;
147 }
148 }
149
150 player.write().unwrap().increment_bloops();
151 let bloop = Bloop::new(player.clone(), client_id, Utc::now());
152
153 let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
154 self.activate_hot_achievements(&bloop, &awarded_tracker);
155 self.inject_hot_achievements(&bloop, &mut awarded_tracker);
156
157 let player_registry = self.player_registry.lock().await;
158 awarded_tracker.remove_duplicates(player_registry);
159
160 let achievement_ids: Vec<Uuid> = awarded_tracker
161 .for_player(bloop.player_id)
162 .map_or_else(Vec::new, |set| set.iter().cloned().collect());
163
164 self.apply_awarded(awarded_tracker).await;
165
166 let processed_bloop: ProcessedBloop = (&bloop).into();
167 self.bloop_provider.add(Arc::new(bloop));
168 let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
169
170 let _ = response.send(ServerMessage::BloopAccepted {
171 achievements: achievement_ids
172 .into_iter()
173 .map(|id| AchievementRecord {
174 id,
175 audio_file_hash: self
176 .achievements
177 .get(&id)
178 .unwrap()
179 .audio_file
180 .resolve(&self.audio_base_path)
181 .as_ref()
182 .map(|file| file.hash),
183 })
184 .collect(),
185 });
186 }
187
188 async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
189 let previous_awarded: HashSet<Uuid> = {
190 let player = bloop.player();
191 player.awarded_achievements().keys().cloned().collect()
192 };
193 let state = self.state.lock().await;
194 let ctx = AchievementContext::new(
195 bloop,
196 &self.bloop_provider,
197 &*state,
198 &mut self.trigger_registry,
199 );
200
201 for achievement in self.achievements.values() {
202 if !previous_awarded.contains(&achievement.id) {
203 achievement.evaluate(&ctx);
204 }
205 }
206
207 ctx.take_awarded_tracker()
208 }
209
210 fn inject_hot_achievements(
211 &mut self,
212 bloop: &Bloop<Player>,
213 awarded_tracker: &mut AwardedTracker,
214 ) {
215 let awarded = awarded_tracker.for_player_mut(bloop.player_id);
216
217 self.hot_achievements.retain(|hot_achievement| {
218 if hot_achievement.until < bloop.recorded_at {
219 return false;
220 }
221
222 if hot_achievement.client_id == bloop.client_id {
223 awarded.insert(hot_achievement.id);
224 }
225
226 true
227 });
228 }
229
230 fn activate_hot_achievements(
231 &mut self,
232 bloop: &Bloop<Player>,
233 awarded_tracker: &AwardedTracker,
234 ) {
235 let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
236 return;
237 };
238
239 for achievement_id in awarded {
240 let Some(achievement) = self.achievements.get(achievement_id) else {
241 continue;
242 };
243
244 if let Some(hot_duration) = achievement.hot_duration {
245 self.hot_achievements.push(HotAchievement::new(
246 *achievement_id,
247 bloop,
248 hot_duration,
249 ));
250 };
251 }
252 }
253
254 async fn apply_awarded(&self, tracker: AwardedTracker) {
255 let mut player_registry = self.player_registry.lock().await;
256 let batch: AchievementAwardBatch = tracker.into();
257
258 for player_awards in batch.players.iter() {
259 player_registry.mutate_by_id(player_awards.player_id, |player| {
260 for achievement_id in player_awards.achievement_ids.iter() {
261 player.add_awarded_achievement(*achievement_id, batch.awarded_at);
262 }
263 });
264 }
265
266 let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
267 }
268
269 #[instrument(skip(self, response))]
270 fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
271 let Some(achievement) = self.achievements.get(&id) else {
272 info!("Client requested unknown achievement: {}", id);
273 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
274 return;
275 };
276
277 let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
278 info!("Client requested audio for audio-less achievement: {}", id);
279 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
280 return;
281 };
282
283 let path = audio_file.path.clone();
284
285 tokio::spawn(async move {
286 let mut file = match File::open(&path).await {
287 Ok(file) => file,
288 Err(err) => {
289 warn!("Failed to open file {:?}: {:?}", &path, err);
290 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
291 return;
292 }
293 };
294
295 let mut data = vec![];
296
297 if let Err(err) = file.read_to_end(&mut data).await {
298 warn!("Failed to read file {:?}: {:?}", &path, err);
299 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
300 return;
301 }
302
303 let _ = response.send(ServerMessage::AudioData { data });
304 });
305 }
306
307 #[instrument(skip(self, response))]
308 fn handle_preload_check(
309 &self,
310 manifest_hash: Option<DataHash>,
311 response: oneshot::Sender<ServerMessage>,
312 ) {
313 if let Some(manifest_hash) = manifest_hash
314 && manifest_hash == self.audio_manifest_hash
315 {
316 let _ = response.send(ServerMessage::PreloadMatch);
317 return;
318 }
319
320 let _ = response.send(ServerMessage::PreloadMismatch {
321 audio_manifest_hash: self.audio_manifest_hash,
322 achievements: self
323 .achievements
324 .values()
325 .map(|achievement| AchievementRecord {
326 id: achievement.id,
327 audio_file_hash: achievement
328 .audio_file
329 .resolve(&self.audio_base_path)
330 .as_ref()
331 .map(|file| file.hash),
332 })
333 .collect(),
334 });
335 }
336}
337
/// Error type of the engine subsystem.
///
/// The enum has no variants, so a value of it can never be constructed.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
341
/// Lets the engine run as a `tokio-graceful-shutdown` subsystem.
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Processes requests until the request channel closes or shutdown is
    /// requested; always returns `Ok(())`.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        match self.process_requests().cancel_on_shutdown(&subsys).await {
            // A drained channel and a cancellation both end the subsystem normally.
            Ok(()) | Err(_) => Ok(()),
        }
    }
}
357
/// Errors returned when an engine cannot be built from its builder.
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was never set; the payload is the field's name.
    #[error("missing field: {0}")]
    MissingField(&'static str),
}
363
/// Step-by-step builder for [`Engine`].
///
/// `bloop_retention`, `audio_base_path`, `player_registry`, `network_rx` and
/// `event_tx` must be set before building; the remaining fields fall back to
/// defaults.
#[derive(Debug, Default)]
pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
where
    Player: PlayerInfo + PlayerMutator,
    Metadata: Default,
    Trigger: Copy,
{
    /// Bloops the engine's bloop provider starts out with.
    bloops: Vec<Bloop<Player>>,
    /// Achievements the engine evaluates.
    achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
    /// Retention passed to the bloop provider (required).
    bloop_retention: Option<Duration>,
    /// Base path for resolving achievement audio files (required).
    audio_base_path: Option<PathBuf>,
    /// Shared player registry (required).
    player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
    /// Shared achievement state; a default state is created when unset.
    state: Option<Arc<Mutex<State>>>,
    /// Trigger registry; an empty registry is created when unset.
    trigger_registry: Option<TriggerRegistry<Trigger>>,
    /// Receiving end of the request channel (required).
    network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
    /// Sender for engine events (required).
    event_tx: Option<broadcast::Sender<Event>>,
    /// Optional bloop throttle.
    throttle: Option<Throttle>,
}
382
383impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
384where
385 Player: PlayerInfo + PlayerMutator,
386 Metadata: Default,
387 State: Default,
388 Trigger: Copy + PartialEq + Eq + Debug,
389{
390 pub fn new() -> Self {
391 Self {
392 bloops: Vec::new(),
393 achievements: Vec::new(),
394 bloop_retention: None,
395 audio_base_path: None,
396 player_registry: None,
397 state: None,
398 trigger_registry: None,
399 network_rx: None,
400 event_tx: None,
401 throttle: None,
402 }
403 }
404
405 pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
406 self.bloops = bloops;
407 self
408 }
409
410 pub fn achievements(
411 mut self,
412 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
413 ) -> Self {
414 self.achievements = achievements;
415 self
416 }
417
418 pub fn bloop_retention(mut self, retention: Duration) -> Self {
419 self.bloop_retention = Some(retention);
420 self
421 }
422
423 pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
424 self.audio_base_path = Some(path.into());
425 self
426 }
427
428 pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
429 self.player_registry = Some(registry);
430 self
431 }
432
433 pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
434 self.state = Some(state);
435 self
436 }
437
438 pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
439 self.trigger_registry = Some(registry);
440 self
441 }
442
443 pub fn network_rx(
444 mut self,
445 rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
446 ) -> Self {
447 self.network_rx = Some(rx);
448 self
449 }
450
451 pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
452 self.event_tx = Some(tx);
453 self
454 }
455
456 pub fn throttle(mut self, throttle: Throttle) -> Self {
457 self.throttle = Some(throttle);
458 self
459 }
460
461 pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
463 let bloop_retention = self
464 .bloop_retention
465 .ok_or(BuilderError::MissingField("bloop_retention"))?;
466 let audio_base_path = self
467 .audio_base_path
468 .ok_or(BuilderError::MissingField("audio_base_path"))?;
469 let player_registry = self
470 .player_registry
471 .ok_or(BuilderError::MissingField("player_registry"))?;
472 let network_rx = self
473 .network_rx
474 .ok_or(BuilderError::MissingField("network_rx"))?;
475 let event_tx = self
476 .event_tx
477 .ok_or(BuilderError::MissingField("event_tx"))?;
478
479 let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
480 let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
481 let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
482 self.achievements.into_iter().map(|a| (a.id, a)).collect();
483 let state = self
484 .state
485 .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
486 let trigger_registry = self
487 .trigger_registry
488 .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
489 let hot_achievements = Vec::new();
490
491 Ok(Engine {
492 bloop_provider,
493 achievements,
494 audio_base_path,
495 audio_manifest_hash,
496 player_registry,
497 state,
498 trigger_registry,
499 hot_achievements,
500 network_rx,
501 event_tx,
502 throttle: self.throttle,
503 })
504 }
505}
506
507fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
508 audio_base_path: &Path,
509 achievements: &[Achievement<Metadata, Player, State, Trigger>],
510) -> DataHash {
511 let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
512 .iter()
513 .filter_map(|achievement| {
514 achievement
515 .audio_file
516 .resolve(audio_base_path)
517 .as_ref()
518 .map(|file| (achievement.id, file.hash))
519 })
520 .collect();
521
522 let mut entries: Vec<_> = audio_file_hashes.iter().collect();
523 entries.sort_by_key(|(id, _)| *id);
524 let mut hash_input = Vec::with_capacity(entries.len() * 32);
525
526 for (id, hash) in entries {
527 hash_input.extend(id.as_bytes());
528 hash_input.extend_from_slice(hash.as_bytes());
529 }
530
531 let manifest_hash = md5::compute(hash_input);
532 manifest_hash.into()
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine with an empty player registry, no triggers and no throttle.
    fn build_test_engine() -> TestEngine {
        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
        let trigger_registry = TriggerRegistry::new(HashMap::new());

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(player_registry)
            .trigger_registry(trigger_registry)
            .network_rx(mpsc::channel(1).1)
            .event_tx(broadcast::channel(16).0)
            .build()
            .unwrap()
    }

    /// Adds a mock player with the given NFC UID to the engine's registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(player).unwrap().into_inner().unwrap();
        engine.player_registry.lock().await.add(player);
    }

    /// Sends one bloop straight to `handle_bloop` and returns the engine's reply.
    async fn send_bloop(
        engine: &mut TestEngine,
        nfc_uid: NfcUid,
        client_id: &str,
    ) -> ServerMessage {
        let (resp_tx, resp_rx) = oneshot::channel();
        engine
            .handle_bloop(nfc_uid, client_id.to_string(), resp_tx)
            .await;
        resp_rx.await.unwrap()
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();

        let response = send_bloop(&mut engine, NfcUid::default(), "test-client").await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected Error response for unknown NFC UID");
        };
        assert!(matches!(err, ErrorResponse::UnknownNfcUid));
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let response = send_bloop(&mut engine, nfc_uid, "test-client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        let mut triggers = HashMap::new();
        triggers.insert(
            NfcUid::default(),
            TriggerSpec {
                trigger: (),
                global: false,
                occurrence: TriggerOccurrence::Once,
            },
        );
        let trigger_registry = TriggerRegistry::new(triggers);

        let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));

        let (_tx, rx) = mpsc::channel(1);
        let (evt_tx, _) = broadcast::channel(16);

        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(player_registry)
            .trigger_registry(trigger_registry)
            .network_rx(rx)
            .event_tx(evt_tx)
            .build()
            .unwrap();

        let response = send_bloop(&mut engine, NfcUid::default(), "client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());

        assert!(
            engine
                .trigger_registry
                .check_active_trigger((), "client", Utc::now())
        );
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client".to_string();
        register_player(&engine, nfc_uid).await;

        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // One recent bloop by the same player on the same client already fills the window.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let earlier_bloop = Bloop::new(player, client_id.clone(), Utc::now());
        engine.bloop_provider.add(Arc::new(earlier_bloop));

        let response = send_bloop(&mut engine, nfc_uid, &client_id).await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected throttling error");
        };
        assert!(matches!(err, ErrorResponse::NfcUidThrottled));
    }
}