1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8use chrono::{DateTime, Utc};
9use serde::Deserialize;
10use std::collections::{HashMap, HashSet};
11use std::fmt::Debug;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::Duration;
15use thiserror::Error;
16use tokio::fs::File;
17use tokio::io::AsyncReadExt;
18use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
19#[cfg(feature = "tokio-graceful-shutdown")]
20use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
21use tracing::{info, instrument, warn};
22use uuid::Uuid;
23
24#[derive(Debug)]
25pub enum EngineRequest {
26 Bloop { client_id: String, nfc_uid: NfcUid },
27 RetrieveAudio { id: Uuid },
28 PreloadCheck { manifest_hash: Option<DataHash> },
29}
30
31#[derive(Debug, Deserialize)]
32pub struct Throttle {
33 max_bloops: usize,
34 threshold: Duration,
35}
36
37impl Throttle {
38 pub fn new(max_bloops: usize, threshold: Duration) -> Self {
39 Self {
40 max_bloops,
41 threshold,
42 }
43 }
44}
45
46struct HotAchievement {
47 id: Uuid,
48 client_id: String,
49 until: DateTime<Utc>,
50}
51
52impl HotAchievement {
53 fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
54 Self {
55 id,
56 client_id: bloop.client_id.clone(),
57 until: bloop.recorded_at + duration,
58 }
59 }
60}
61
62pub struct Engine<Metadata, Player, State, Trigger>
63where
64 Player: PlayerInfo + PlayerMutator,
65 Trigger: Copy,
66{
67 bloop_provider: BloopProvider<Player>,
68 achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
69 audio_base_path: PathBuf,
70 audio_manifest_hash: DataHash,
71 player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
72 state: Arc<Mutex<State>>,
73 trigger_registry: TriggerRegistry<Trigger>,
74 hot_achievements: Vec<HotAchievement>,
75 network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
76 event_tx: broadcast::Sender<Event>,
77 throttle: Option<Throttle>,
78}
79
80impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
81where
82 Player: PlayerInfo + PlayerMutator,
83 Trigger: Copy,
84{
85 pub async fn process_requests(&mut self) {
86 while let Some((request, response)) = self.network_rx.recv().await {
87 match request {
88 EngineRequest::Bloop { nfc_uid, client_id } => {
89 self.handle_bloop(nfc_uid, client_id, response).await;
90 }
91 EngineRequest::RetrieveAudio { id } => {
92 self.handle_retrieve_audio(id, response);
93 }
94 EngineRequest::PreloadCheck { manifest_hash } => {
95 self.handle_preload_check(manifest_hash, response);
96 }
97 }
98 }
99 }
100
101 #[instrument(skip(self, response))]
102 async fn handle_bloop(
103 &mut self,
104 nfc_uid: NfcUid,
105 client_id: String,
106 response: oneshot::Sender<ServerMessage>,
107 ) {
108 if self
109 .trigger_registry
110 .try_activate_trigger(nfc_uid, &client_id)
111 {
112 let _ = response.send(ServerMessage::BloopAccepted {
113 achievements: Vec::new(),
114 });
115 return;
116 }
117
118 let player = {
119 let player_registry = self.player_registry.lock().await;
120 let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
121 let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
122 return;
123 };
124 player
125 };
126
127 if let Some(throttle) = self.throttle.as_ref() {
128 let player_id = player.read().unwrap().id();
129
130 let recent_bloops = self
131 .bloop_provider
132 .for_client(&client_id)
133 .iter()
134 .filter(bloops_since(Utc::now() - throttle.threshold))
135 .take(throttle.max_bloops)
136 .collect::<Vec<_>>();
137
138 if recent_bloops
139 .iter()
140 .all(|bloop| bloop.player_id == player_id)
141 && recent_bloops.len() == throttle.max_bloops
142 {
143 let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
144 return;
145 }
146 }
147
148 player.write().unwrap().increment_bloops();
149 let bloop = Bloop::new(player.clone(), client_id, Utc::now());
150
151 let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
152 self.activate_hot_achievements(&bloop, &awarded_tracker);
153 self.inject_hot_achievements(&bloop, &mut awarded_tracker);
154
155 let player_registry = self.player_registry.lock().await;
156 awarded_tracker.remove_duplicates(player_registry);
157
158 let achievement_ids: Vec<Uuid> = awarded_tracker
159 .for_player(bloop.player_id)
160 .map_or_else(Vec::new, |set| set.iter().cloned().collect());
161
162 self.apply_awarded(awarded_tracker).await;
163
164 let processed_bloop: ProcessedBloop = (&bloop).into();
165 self.bloop_provider.add(Arc::new(bloop));
166 let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
167
168 let _ = response.send(ServerMessage::BloopAccepted {
169 achievements: achievement_ids
170 .into_iter()
171 .map(|id| AchievementRecord {
172 id,
173 audio_file_hash: self
174 .achievements
175 .get(&id)
176 .unwrap()
177 .audio_file
178 .resolve(&self.audio_base_path)
179 .as_ref()
180 .map(|file| file.hash),
181 })
182 .collect(),
183 });
184 }
185
186 async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
187 let previous_awarded: HashSet<Uuid> = {
188 let player = bloop.player();
189 player.awarded_achievements().keys().cloned().collect()
190 };
191 let state = self.state.lock().await;
192 let ctx = AchievementContext::new(
193 bloop,
194 &self.bloop_provider,
195 &*state,
196 &mut self.trigger_registry,
197 );
198
199 for achievement in self.achievements.values() {
200 if !previous_awarded.contains(&achievement.id) {
201 achievement.evaluate(&ctx);
202 }
203 }
204
205 ctx.take_awarded_tracker()
206 }
207
208 fn inject_hot_achievements(
209 &mut self,
210 bloop: &Bloop<Player>,
211 awarded_tracker: &mut AwardedTracker,
212 ) {
213 let awarded = awarded_tracker.for_player_mut(bloop.player_id);
214
215 self.hot_achievements.retain(|hot_achievement| {
216 if hot_achievement.until < bloop.recorded_at {
217 return false;
218 }
219
220 if hot_achievement.client_id == bloop.client_id {
221 awarded.insert(hot_achievement.id);
222 }
223
224 true
225 });
226 }
227
228 fn activate_hot_achievements(
229 &mut self,
230 bloop: &Bloop<Player>,
231 awarded_tracker: &AwardedTracker,
232 ) {
233 let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
234 return;
235 };
236
237 for achievement_id in awarded {
238 let Some(achievement) = self.achievements.get(achievement_id) else {
239 continue;
240 };
241
242 if let Some(hot_duration) = achievement.hot_duration {
243 self.hot_achievements.push(HotAchievement::new(
244 *achievement_id,
245 bloop,
246 hot_duration,
247 ));
248 };
249 }
250 }
251
252 async fn apply_awarded(&self, tracker: AwardedTracker) {
253 let mut player_registry = self.player_registry.lock().await;
254 let batch: AchievementAwardBatch = tracker.into();
255
256 for player_awards in batch.players.iter() {
257 player_registry.mutate_by_id(player_awards.player_id, |player| {
258 for achievement_id in player_awards.achievement_ids.iter() {
259 player.add_awarded_achievement(*achievement_id, batch.awarded_at);
260 }
261 });
262 }
263
264 let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
265 }
266
267 #[instrument(skip(self, response))]
268 fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
269 let Some(achievement) = self.achievements.get(&id) else {
270 info!("Client requested unknown achievement: {}", id);
271 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
272 return;
273 };
274
275 let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
276 info!("Client requested audio for audio-less achievement: {}", id);
277 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
278 return;
279 };
280
281 let path = audio_file.path.clone();
282
283 tokio::spawn(async move {
284 let mut file = match File::open(&path).await {
285 Ok(file) => file,
286 Err(err) => {
287 warn!("Failed to open file {:?}: {:?}", &path, err);
288 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
289 return;
290 }
291 };
292
293 let mut data = vec![];
294
295 if let Err(err) = file.read_to_end(&mut data).await {
296 warn!("Failed to read file {:?}: {:?}", &path, err);
297 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
298 return;
299 }
300
301 let _ = response.send(ServerMessage::AudioData { data });
302 });
303 }
304
305 #[instrument(skip(self, response))]
306 fn handle_preload_check(
307 &self,
308 manifest_hash: Option<DataHash>,
309 response: oneshot::Sender<ServerMessage>,
310 ) {
311 if let Some(manifest_hash) = manifest_hash
312 && manifest_hash == self.audio_manifest_hash
313 {
314 let _ = response.send(ServerMessage::PreloadMatch);
315 return;
316 }
317
318 let _ = response.send(ServerMessage::PreloadMismatch {
319 audio_manifest_hash: self.audio_manifest_hash,
320 achievements: self
321 .achievements
322 .values()
323 .map(|achievement| AchievementRecord {
324 id: achievement.id,
325 audio_file_hash: achievement
326 .audio_file
327 .resolve(&self.audio_base_path)
328 .as_ref()
329 .map(|file| file.hash),
330 })
331 .collect(),
332 });
333 }
334}
335
336#[cfg(feature = "tokio-graceful-shutdown")]
337#[derive(Debug, Error)]
338pub enum NeverError {}
339
340#[cfg(feature = "tokio-graceful-shutdown")]
341impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
342 for Engine<Metadata, Player, State, Trigger>
343where
344 Metadata: Send + Sync + 'static,
345 Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
346 State: Send + Sync + 'static,
347 Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
348{
349 async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
350 let _ = self.process_requests().cancel_on_shutdown(subsys).await;
351 Ok(())
352 }
353}
354
355#[derive(Debug, Error)]
356pub enum BuilderError {
357 #[error("missing field: {0}")]
358 MissingField(&'static str),
359}
360
361#[derive(Debug, Default)]
362pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
363where
364 Player: PlayerInfo + PlayerMutator,
365 Trigger: Copy,
366{
367 bloops: Vec<Bloop<Player>>,
368 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
369 bloop_retention: Option<Duration>,
370 audio_base_path: Option<PathBuf>,
371 player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
372 state: Option<Arc<Mutex<State>>>,
373 trigger_registry: Option<TriggerRegistry<Trigger>>,
374 network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
375 event_tx: Option<broadcast::Sender<Event>>,
376 throttle: Option<Throttle>,
377}
378
379impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
380where
381 Player: PlayerInfo + PlayerMutator,
382 State: Default,
383 Trigger: Copy + PartialEq + Eq + Debug,
384{
385 pub fn new() -> Self {
386 Self {
387 bloops: Vec::new(),
388 achievements: Vec::new(),
389 bloop_retention: None,
390 audio_base_path: None,
391 player_registry: None,
392 state: None,
393 trigger_registry: None,
394 network_rx: None,
395 event_tx: None,
396 throttle: None,
397 }
398 }
399
400 pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
401 self.bloops = bloops;
402 self
403 }
404
405 pub fn achievements(
406 mut self,
407 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
408 ) -> Self {
409 self.achievements = achievements;
410 self
411 }
412
413 pub fn bloop_retention(mut self, retention: Duration) -> Self {
414 self.bloop_retention = Some(retention);
415 self
416 }
417
418 pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
419 self.audio_base_path = Some(path.into());
420 self
421 }
422
423 pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
424 self.player_registry = Some(registry);
425 self
426 }
427
428 pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
429 self.state = Some(state);
430 self
431 }
432
433 pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
434 self.trigger_registry = Some(registry);
435 self
436 }
437
438 pub fn network_rx(
439 mut self,
440 rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
441 ) -> Self {
442 self.network_rx = Some(rx);
443 self
444 }
445
446 pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
447 self.event_tx = Some(tx);
448 self
449 }
450
451 pub fn throttle(mut self, throttle: Throttle) -> Self {
452 self.throttle = Some(throttle);
453 self
454 }
455
456 pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
458 let bloop_retention = self
459 .bloop_retention
460 .ok_or(BuilderError::MissingField("bloop_retention"))?;
461 let audio_base_path = self
462 .audio_base_path
463 .ok_or(BuilderError::MissingField("audio_base_path"))?;
464 let player_registry = self
465 .player_registry
466 .ok_or(BuilderError::MissingField("player_registry"))?;
467 let network_rx = self
468 .network_rx
469 .ok_or(BuilderError::MissingField("network_rx"))?;
470 let event_tx = self
471 .event_tx
472 .ok_or(BuilderError::MissingField("event_tx"))?;
473
474 let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
475 let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
476 let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
477 self.achievements.into_iter().map(|a| (a.id, a)).collect();
478 let state = self
479 .state
480 .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
481 let trigger_registry = self
482 .trigger_registry
483 .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
484 let hot_achievements = Vec::new();
485
486 Ok(Engine {
487 bloop_provider,
488 achievements,
489 audio_base_path,
490 audio_manifest_hash,
491 player_registry,
492 state,
493 trigger_registry,
494 hot_achievements,
495 network_rx,
496 event_tx,
497 throttle: self.throttle,
498 })
499 }
500}
501
502fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
503 audio_base_path: &Path,
504 achievements: &[Achievement<Metadata, Player, State, Trigger>],
505) -> DataHash {
506 let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
507 .iter()
508 .filter_map(|achievement| {
509 achievement
510 .audio_file
511 .resolve(audio_base_path)
512 .as_ref()
513 .map(|file| (achievement.id, file.hash))
514 })
515 .collect();
516
517 let mut entries: Vec<_> = audio_file_hashes.iter().collect();
518 entries.sort_by_key(|(id, _)| *id);
519 let mut hash_input = Vec::with_capacity(entries.len() * 32);
520
521 for (id, hash) in entries {
522 hash_input.extend(id.as_bytes());
523 hash_input.extend_from_slice(hash.as_bytes());
524 }
525
526 let manifest_hash = md5::compute(hash_input);
527 manifest_hash.into()
528}
529
530#[cfg(test)]
531mod tests {
532 use super::*;
533 use crate::test_utils::MockPlayer;
534 use crate::trigger::{TriggerOccurrence, TriggerSpec};
535
536 fn build_test_engine() -> Engine<(), MockPlayer, (), ()> {
537 let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
538 let trigger_registry = TriggerRegistry::new(HashMap::new());
539
540 EngineBuilder::<MockPlayer, (), (), ()>::new()
541 .bloop_retention(Duration::from_secs(3600))
542 .audio_base_path("./audio")
543 .player_registry(player_registry)
544 .trigger_registry(trigger_registry)
545 .network_rx(mpsc::channel(1).1)
546 .event_tx(broadcast::channel(16).0)
547 .build()
548 .unwrap()
549 }
550
551 #[tokio::test]
552 async fn handle_bloop_rejects_unknown_nfc_uid() {
553 let mut engine = build_test_engine();
554 let unknown_nfc_uid = NfcUid::default();
555 let client_id = "test-client".to_string();
556
557 let (resp_tx, resp_rx) = oneshot::channel();
558 engine
559 .handle_bloop(unknown_nfc_uid, client_id.clone(), resp_tx)
560 .await;
561
562 let response = resp_rx.await.unwrap();
563 match response {
564 ServerMessage::Error(err) => {
565 assert!(matches!(err, ErrorResponse::UnknownNfcUid));
566 }
567 _ => panic!("Expected Error response for unknown NFC UID"),
568 }
569 }
570
571 #[tokio::test]
572 async fn handle_bloop_accepts_known_player() {
573 let mut engine = build_test_engine();
574 let nfc_uid = NfcUid::default();
575 let client_id = "test-client".to_string();
576
577 {
578 let mut registry = engine.player_registry.lock().await;
579 let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
580 registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
581 }
582
583 let (resp_tx, resp_rx) = oneshot::channel();
584 engine
585 .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
586 .await;
587
588 let response = resp_rx.await.unwrap();
589 match response {
590 ServerMessage::BloopAccepted { achievements } => {
591 assert!(achievements.is_empty());
592 }
593 _ => panic!("Expected BloopAccepted response"),
594 }
595 }
596
597 #[tokio::test]
598 async fn handle_bloop_activates_trigger_and_responds() {
599 let mut trigger_registry = HashMap::new();
600 trigger_registry.insert(
601 NfcUid::default(),
602 TriggerSpec {
603 trigger: (),
604 global: false,
605 occurrence: TriggerOccurrence::Once,
606 },
607 );
608 let trigger_registry = TriggerRegistry::new(trigger_registry);
609
610 let player_registry = Arc::new(Mutex::new(PlayerRegistry::new(vec![])));
611
612 let (_tx, rx) = mpsc::channel(1);
613 let (evt_tx, _) = broadcast::channel(16);
614
615 let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
616 .bloop_retention(Duration::from_secs(3600))
617 .audio_base_path("./audio")
618 .player_registry(player_registry)
619 .trigger_registry(trigger_registry)
620 .network_rx(rx)
621 .event_tx(evt_tx)
622 .build()
623 .unwrap();
624
625 let nfc_uid = NfcUid::default();
626 let client_id = "client".to_string();
627 let (resp_tx, resp_rx) = oneshot::channel();
628
629 engine
630 .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
631 .await;
632
633 let response = resp_rx.await.unwrap();
634 match response {
635 ServerMessage::BloopAccepted { achievements } => {
636 assert!(achievements.is_empty());
637 }
638 _ => panic!("Expected BloopAccepted response"),
639 }
640
641 assert!(
642 engine
643 .trigger_registry
644 .check_active_trigger((), "client", Utc::now())
645 );
646 }
647
648 #[tokio::test]
649 async fn handle_bloop_respects_throttling() {
650 let mut engine = build_test_engine();
651 let nfc_uid = NfcUid::default();
652 let client_id = "test-client".to_string();
653
654 {
655 let mut registry = engine.player_registry.lock().await;
656 let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
657 registry.add(Arc::into_inner(player).unwrap().into_inner().unwrap());
658 }
659
660 engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));
661
662 let bloop = Bloop::new(
663 engine
664 .player_registry
665 .lock()
666 .await
667 .get_by_nfc_uid(nfc_uid)
668 .unwrap(),
669 client_id.clone(),
670 Utc::now(),
671 );
672 engine.bloop_provider.add(Arc::new(bloop));
673
674 let (resp_tx, resp_rx) = oneshot::channel();
675 engine
676 .handle_bloop(nfc_uid, client_id.clone(), resp_tx)
677 .await;
678
679 let response = resp_rx.await.unwrap();
680 match response {
681 ServerMessage::Error(err) => {
682 assert!(matches!(err, ErrorResponse::NfcUidThrottled));
683 }
684 _ => panic!("Expected throttling error"),
685 }
686 }
687}