1use crate::achievement::{Achievement, AchievementAwardBatch, AchievementContext, AwardedTracker};
2use crate::bloop::{Bloop, BloopProvider, ProcessedBloop, bloops_since};
3use crate::event::Event;
4use crate::message::{AchievementRecord, DataHash, ErrorResponse, ServerMessage};
5use crate::nfc_uid::NfcUid;
6use crate::player::{PlayerInfo, PlayerMutator, PlayerRegistry};
7use crate::trigger::TriggerRegistry;
8use chrono::{DateTime, Utc};
9use serde::Deserialize;
10use std::collections::{HashMap, HashSet};
11use std::fmt::Debug;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::Duration;
15use thiserror::Error;
16use tokio::fs::File;
17use tokio::io::AsyncReadExt;
18use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
19#[cfg(feature = "tokio-graceful-shutdown")]
20use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
21use tracing::{info, instrument, warn};
22use uuid::Uuid;
23
24#[derive(Debug)]
25pub enum EngineRequest {
26 Bloop { client_id: String, nfc_uid: NfcUid },
27 RetrieveAudio { id: Uuid },
28 PreloadCheck { manifest_hash: Option<DataHash> },
29}
30
31#[derive(Debug, Deserialize)]
32pub struct Throttle {
33 max_bloops: usize,
34 #[serde(with = "humantime_serde")]
35 threshold: Duration,
36}
37
38impl Throttle {
39 pub fn new(max_bloops: usize, threshold: Duration) -> Self {
40 Self {
41 max_bloops,
42 threshold,
43 }
44 }
45}
46
47struct HotAchievement {
48 id: Uuid,
49 client_id: String,
50 until: DateTime<Utc>,
51}
52
53impl HotAchievement {
54 fn new<Player: PlayerInfo>(id: Uuid, bloop: &Bloop<Player>, duration: Duration) -> Self {
55 Self {
56 id,
57 client_id: bloop.client_id.clone(),
58 until: bloop.recorded_at + duration,
59 }
60 }
61}
62
63pub struct Engine<Metadata, Player, State, Trigger>
64where
65 Player: PlayerInfo + PlayerMutator,
66 Trigger: Copy,
67{
68 bloop_provider: BloopProvider<Player>,
69 achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>>,
70 audio_base_path: PathBuf,
71 audio_manifest_hash: DataHash,
72 player_registry: Arc<Mutex<PlayerRegistry<Player>>>,
73 state: Arc<Mutex<State>>,
74 trigger_registry: TriggerRegistry<Trigger>,
75 hot_achievements: Vec<HotAchievement>,
76 network_rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
77 event_tx: broadcast::Sender<Event>,
78 throttle: Option<Throttle>,
79}
80
81impl<Metadata, Player, State, Trigger> Engine<Metadata, Player, State, Trigger>
82where
83 Player: PlayerInfo + PlayerMutator,
84 Trigger: Copy,
85{
86 pub async fn process_requests(&mut self) {
87 while let Some((request, response)) = self.network_rx.recv().await {
88 match request {
89 EngineRequest::Bloop { nfc_uid, client_id } => {
90 self.handle_bloop(nfc_uid, client_id, response).await;
91 }
92 EngineRequest::RetrieveAudio { id } => {
93 self.handle_retrieve_audio(id, response);
94 }
95 EngineRequest::PreloadCheck { manifest_hash } => {
96 self.handle_preload_check(manifest_hash, response);
97 }
98 }
99 }
100 }
101
102 #[instrument(skip(self, response))]
103 async fn handle_bloop(
104 &mut self,
105 nfc_uid: NfcUid,
106 client_id: String,
107 response: oneshot::Sender<ServerMessage>,
108 ) {
109 if self
110 .trigger_registry
111 .try_activate_trigger(nfc_uid, &client_id)
112 {
113 let _ = response.send(ServerMessage::BloopAccepted {
114 achievements: Vec::new(),
115 });
116 return;
117 }
118
119 let player = {
120 let player_registry = self.player_registry.lock().await;
121 let Some(player) = player_registry.get_by_nfc_uid(nfc_uid) else {
122 let _ = response.send(ServerMessage::Error(ErrorResponse::UnknownNfcUid));
123 return;
124 };
125 player
126 };
127
128 if let Some(throttle) = self.throttle.as_ref() {
129 let player_id = player.read().unwrap().id();
130
131 let recent_bloops = self
132 .bloop_provider
133 .for_client(&client_id)
134 .iter()
135 .filter(bloops_since(Utc::now() - throttle.threshold))
136 .take(throttle.max_bloops)
137 .collect::<Vec<_>>();
138
139 if recent_bloops
140 .iter()
141 .all(|bloop| bloop.player_id == player_id)
142 && recent_bloops.len() == throttle.max_bloops
143 {
144 let _ = response.send(ServerMessage::Error(ErrorResponse::NfcUidThrottled));
145 return;
146 }
147 }
148
149 player.write().unwrap().increment_bloops();
150 let bloop = Bloop::new(player.clone(), client_id, Utc::now());
151
152 let mut awarded_tracker = self.evaluate_achievements(&bloop).await;
153 self.activate_hot_achievements(&bloop, &awarded_tracker);
154 self.inject_hot_achievements(&bloop, &mut awarded_tracker);
155
156 let player_registry = self.player_registry.lock().await;
157 awarded_tracker.remove_duplicates(player_registry);
158
159 let achievement_ids: Vec<Uuid> = awarded_tracker
160 .for_player(bloop.player_id)
161 .map_or_else(Vec::new, |set| set.iter().cloned().collect());
162
163 self.apply_awarded(awarded_tracker).await;
164
165 let processed_bloop: ProcessedBloop = (&bloop).into();
166 self.bloop_provider.add(Arc::new(bloop));
167 let _ = self.event_tx.send(Event::BloopProcessed(processed_bloop));
168
169 let _ = response.send(ServerMessage::BloopAccepted {
170 achievements: achievement_ids
171 .into_iter()
172 .map(|id| AchievementRecord {
173 id,
174 audio_file_hash: self
175 .achievements
176 .get(&id)
177 .unwrap()
178 .audio_file
179 .resolve(&self.audio_base_path)
180 .as_ref()
181 .map(|file| file.hash),
182 })
183 .collect(),
184 });
185 }
186
187 async fn evaluate_achievements(&mut self, bloop: &Bloop<Player>) -> AwardedTracker {
188 let previous_awarded: HashSet<Uuid> = {
189 let player = bloop.player();
190 player.awarded_achievements().keys().cloned().collect()
191 };
192 let state = self.state.lock().await;
193 let ctx = AchievementContext::new(
194 bloop,
195 &self.bloop_provider,
196 &*state,
197 &mut self.trigger_registry,
198 );
199
200 for achievement in self.achievements.values() {
201 if !previous_awarded.contains(&achievement.id) {
202 achievement.evaluate(&ctx);
203 }
204 }
205
206 ctx.take_awarded_tracker()
207 }
208
209 fn inject_hot_achievements(
210 &mut self,
211 bloop: &Bloop<Player>,
212 awarded_tracker: &mut AwardedTracker,
213 ) {
214 let awarded = awarded_tracker.for_player_mut(bloop.player_id);
215
216 self.hot_achievements.retain(|hot_achievement| {
217 if hot_achievement.until < bloop.recorded_at {
218 return false;
219 }
220
221 if hot_achievement.client_id == bloop.client_id {
222 awarded.insert(hot_achievement.id);
223 }
224
225 true
226 });
227 }
228
229 fn activate_hot_achievements(
230 &mut self,
231 bloop: &Bloop<Player>,
232 awarded_tracker: &AwardedTracker,
233 ) {
234 let Some(awarded) = awarded_tracker.for_player(bloop.player_id) else {
235 return;
236 };
237
238 for achievement_id in awarded {
239 let Some(achievement) = self.achievements.get(achievement_id) else {
240 continue;
241 };
242
243 if let Some(hot_duration) = achievement.hot_duration {
244 self.hot_achievements.push(HotAchievement::new(
245 *achievement_id,
246 bloop,
247 hot_duration,
248 ));
249 };
250 }
251 }
252
253 async fn apply_awarded(&self, tracker: AwardedTracker) {
254 let mut player_registry = self.player_registry.lock().await;
255 let batch: AchievementAwardBatch = tracker.into();
256
257 for player_awards in batch.players.iter() {
258 player_registry.mutate_by_id(player_awards.player_id, |player| {
259 for achievement_id in player_awards.achievement_ids.iter() {
260 player.add_awarded_achievement(*achievement_id, batch.awarded_at);
261 }
262 });
263 }
264
265 let _ = self.event_tx.send(Event::AchievementsAwarded(batch));
266 }
267
268 #[instrument(skip(self, response))]
269 fn handle_retrieve_audio(&self, id: Uuid, response: oneshot::Sender<ServerMessage>) {
270 let Some(achievement) = self.achievements.get(&id) else {
271 info!("Client requested unknown achievement: {}", id);
272 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
273 return;
274 };
275
276 let Some(audio_file) = achievement.audio_file.resolve(&self.audio_base_path) else {
277 info!("Client requested audio for audio-less achievement: {}", id);
278 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
279 return;
280 };
281
282 let path = audio_file.path.clone();
283
284 tokio::spawn(async move {
285 let mut file = match File::open(&path).await {
286 Ok(file) => file,
287 Err(err) => {
288 warn!("Failed to open file {:?}: {:?}", &path, err);
289 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
290 return;
291 }
292 };
293
294 let mut data = vec![];
295
296 if let Err(err) = file.read_to_end(&mut data).await {
297 warn!("Failed to read file {:?}: {:?}", &path, err);
298 let _ = response.send(ServerMessage::Error(ErrorResponse::AudioUnavailable));
299 return;
300 }
301
302 let _ = response.send(ServerMessage::AudioData { data });
303 });
304 }
305
306 #[instrument(skip(self, response))]
307 fn handle_preload_check(
308 &self,
309 manifest_hash: Option<DataHash>,
310 response: oneshot::Sender<ServerMessage>,
311 ) {
312 if let Some(manifest_hash) = manifest_hash
313 && manifest_hash == self.audio_manifest_hash
314 {
315 let _ = response.send(ServerMessage::PreloadMatch);
316 return;
317 }
318
319 let _ = response.send(ServerMessage::PreloadMismatch {
320 audio_manifest_hash: self.audio_manifest_hash,
321 achievements: self
322 .achievements
323 .values()
324 .map(|achievement| AchievementRecord {
325 id: achievement.id,
326 audio_file_hash: achievement
327 .audio_file
328 .resolve(&self.audio_base_path)
329 .as_ref()
330 .map(|file| file.hash),
331 })
332 .collect(),
333 });
334 }
335}
336
/// Error type of the engine subsystem.
///
/// The enum has no variants, so a value of it can never be constructed: the
/// subsystem cannot fail.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
340
#[cfg(feature = "tokio-graceful-shutdown")]
impl<Metadata, Player, State, Trigger> IntoSubsystem<NeverError>
    for Engine<Metadata, Player, State, Trigger>
where
    Metadata: Send + Sync + 'static,
    Player: PlayerInfo + PlayerMutator + Send + Sync + 'static,
    State: Send + Sync + 'static,
    Trigger: Copy + PartialEq + Eq + Debug + Send + Sync + 'static,
{
    /// Runs the request loop as a subsystem.
    ///
    /// Returns `Ok(())` both when the loop ends on its own and when it is
    /// cancelled by a shutdown.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        match self.process_requests().cancel_on_shutdown(subsys).await {
            // Finished or cancelled: either way the subsystem ends cleanly.
            Ok(()) | Err(_) => Ok(()),
        }
    }
}
355
356#[derive(Debug, Error)]
357pub enum BuilderError {
358 #[error("missing field: {0}")]
359 MissingField(&'static str),
360}
361
362#[derive(Debug, Default)]
363pub struct EngineBuilder<Player, State = (), Trigger = (), Metadata = ()>
364where
365 Player: PlayerInfo + PlayerMutator,
366 Trigger: Copy,
367{
368 bloops: Vec<Bloop<Player>>,
369 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
370 bloop_retention: Option<Duration>,
371 audio_base_path: Option<PathBuf>,
372 player_registry: Option<Arc<Mutex<PlayerRegistry<Player>>>>,
373 state: Option<Arc<Mutex<State>>>,
374 trigger_registry: Option<TriggerRegistry<Trigger>>,
375 network_rx: Option<mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>>,
376 event_tx: Option<broadcast::Sender<Event>>,
377 throttle: Option<Throttle>,
378}
379
380impl<Player, State, Trigger, Metadata> EngineBuilder<Player, State, Trigger, Metadata>
381where
382 Player: PlayerInfo + PlayerMutator,
383 State: Default,
384 Trigger: Copy + PartialEq + Eq + Debug,
385{
386 pub fn new() -> Self {
387 Self {
388 bloops: Vec::new(),
389 achievements: Vec::new(),
390 bloop_retention: None,
391 audio_base_path: None,
392 player_registry: None,
393 state: None,
394 trigger_registry: None,
395 network_rx: None,
396 event_tx: None,
397 throttle: None,
398 }
399 }
400
401 pub fn bloops(mut self, bloops: Vec<Bloop<Player>>) -> Self {
402 self.bloops = bloops;
403 self
404 }
405
406 pub fn achievements(
407 mut self,
408 achievements: Vec<Achievement<Metadata, Player, State, Trigger>>,
409 ) -> Self {
410 self.achievements = achievements;
411 self
412 }
413
414 pub fn bloop_retention(mut self, retention: Duration) -> Self {
415 self.bloop_retention = Some(retention);
416 self
417 }
418
419 pub fn audio_base_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
420 self.audio_base_path = Some(path.into());
421 self
422 }
423
424 pub fn player_registry(mut self, registry: Arc<Mutex<PlayerRegistry<Player>>>) -> Self {
425 self.player_registry = Some(registry);
426 self
427 }
428
429 pub fn state(mut self, state: Arc<Mutex<State>>) -> Self {
430 self.state = Some(state);
431 self
432 }
433
434 pub fn trigger_registry(mut self, registry: TriggerRegistry<Trigger>) -> Self {
435 self.trigger_registry = Some(registry);
436 self
437 }
438
439 pub fn network_rx(
440 mut self,
441 rx: mpsc::Receiver<(EngineRequest, oneshot::Sender<ServerMessage>)>,
442 ) -> Self {
443 self.network_rx = Some(rx);
444 self
445 }
446
447 pub fn event_tx(mut self, tx: broadcast::Sender<Event>) -> Self {
448 self.event_tx = Some(tx);
449 self
450 }
451
452 pub fn throttle(mut self, throttle: Throttle) -> Self {
453 self.throttle = Some(throttle);
454 self
455 }
456
457 pub fn build(self) -> Result<Engine<Metadata, Player, State, Trigger>, BuilderError> {
459 let bloop_retention = self
460 .bloop_retention
461 .ok_or(BuilderError::MissingField("bloop_retention"))?;
462 let audio_base_path = self
463 .audio_base_path
464 .ok_or(BuilderError::MissingField("audio_base_path"))?;
465 let player_registry = self
466 .player_registry
467 .ok_or(BuilderError::MissingField("player_registry"))?;
468 let network_rx = self
469 .network_rx
470 .ok_or(BuilderError::MissingField("network_rx"))?;
471 let event_tx = self
472 .event_tx
473 .ok_or(BuilderError::MissingField("event_tx"))?;
474
475 let audio_manifest_hash = calculate_manifest_hash(&audio_base_path, &self.achievements);
476 let bloop_provider = BloopProvider::with_bloops(bloop_retention, self.bloops);
477 let achievements: HashMap<Uuid, Achievement<Metadata, Player, State, Trigger>> =
478 self.achievements.into_iter().map(|a| (a.id, a)).collect();
479 let state = self
480 .state
481 .unwrap_or_else(|| Arc::new(Mutex::new(Default::default())));
482 let trigger_registry = self
483 .trigger_registry
484 .unwrap_or_else(|| TriggerRegistry::new(HashMap::new()));
485 let hot_achievements = Vec::new();
486
487 Ok(Engine {
488 bloop_provider,
489 achievements,
490 audio_base_path,
491 audio_manifest_hash,
492 player_registry,
493 state,
494 trigger_registry,
495 hot_achievements,
496 network_rx,
497 event_tx,
498 throttle: self.throttle,
499 })
500 }
501}
502
503fn calculate_manifest_hash<Metadata, Player, State, Trigger>(
504 audio_base_path: &Path,
505 achievements: &[Achievement<Metadata, Player, State, Trigger>],
506) -> DataHash {
507 let audio_file_hashes: HashMap<Uuid, DataHash> = achievements
508 .iter()
509 .filter_map(|achievement| {
510 achievement
511 .audio_file
512 .resolve(audio_base_path)
513 .as_ref()
514 .map(|file| (achievement.id, file.hash))
515 })
516 .collect();
517
518 let mut entries: Vec<_> = audio_file_hashes.iter().collect();
519 entries.sort_by_key(|(id, _)| *id);
520 let mut hash_input = Vec::with_capacity(entries.len() * 32);
521
522 for (id, hash) in entries {
523 hash_input.extend(id.as_bytes());
524 hash_input.extend_from_slice(hash.as_bytes());
525 }
526
527 let manifest_hash = md5::compute(hash_input);
528 manifest_hash.into()
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::MockPlayer;
    use crate::trigger::{TriggerOccurrence, TriggerSpec};

    /// Engine type used throughout these tests.
    type TestEngine = Engine<(), MockPlayer, (), ()>;

    /// Builds an engine with an empty player registry, no triggers, no
    /// achievements and no throttle.
    fn build_test_engine() -> TestEngine {
        let (_, network_rx) = mpsc::channel(1);
        let (event_tx, _) = broadcast::channel(16);

        EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(Arc::new(Mutex::new(PlayerRegistry::new(vec![]))))
            .trigger_registry(TriggerRegistry::new(HashMap::new()))
            .network_rx(network_rx)
            .event_tx(event_tx)
            .build()
            .unwrap()
    }

    /// Adds a mock player with the given NFC UID to the engine's registry.
    async fn register_player(engine: &TestEngine, nfc_uid: NfcUid) {
        let (player, _) = MockPlayer::builder().nfc_uid(nfc_uid).build();
        let player = Arc::into_inner(player).unwrap().into_inner().unwrap();
        engine.player_registry.lock().await.add(player);
    }

    /// Sends one bloop through `handle_bloop` and returns the engine's reply.
    async fn send_bloop(
        engine: &mut TestEngine,
        nfc_uid: NfcUid,
        client_id: &str,
    ) -> ServerMessage {
        let (resp_tx, resp_rx) = oneshot::channel();
        engine
            .handle_bloop(nfc_uid, client_id.to_string(), resp_tx)
            .await;
        resp_rx.await.unwrap()
    }

    #[tokio::test]
    async fn handle_bloop_rejects_unknown_nfc_uid() {
        let mut engine = build_test_engine();

        let response = send_bloop(&mut engine, NfcUid::default(), "test-client").await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected Error response for unknown NFC UID");
        };
        assert!(matches!(err, ErrorResponse::UnknownNfcUid));
    }

    #[tokio::test]
    async fn handle_bloop_accepts_known_player() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        register_player(&engine, nfc_uid).await;

        let response = send_bloop(&mut engine, nfc_uid, "test-client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());
    }

    #[tokio::test]
    async fn handle_bloop_activates_trigger_and_responds() {
        let nfc_uid = NfcUid::default();

        // Register the UID as a one-shot, non-global trigger.
        let mut trigger_specs = HashMap::new();
        trigger_specs.insert(
            nfc_uid,
            TriggerSpec {
                trigger: (),
                global: false,
                occurrence: TriggerOccurrence::Once,
            },
        );

        let (_tx, rx) = mpsc::channel(1);
        let (evt_tx, _) = broadcast::channel(16);

        let mut engine = EngineBuilder::<MockPlayer, (), (), ()>::new()
            .bloop_retention(Duration::from_secs(3600))
            .audio_base_path("./audio")
            .player_registry(Arc::new(Mutex::new(PlayerRegistry::new(vec![]))))
            .trigger_registry(TriggerRegistry::new(trigger_specs))
            .network_rx(rx)
            .event_tx(evt_tx)
            .build()
            .unwrap();

        let response = send_bloop(&mut engine, nfc_uid, "client").await;

        let ServerMessage::BloopAccepted { achievements } = response else {
            panic!("Expected BloopAccepted response");
        };
        assert!(achievements.is_empty());

        assert!(
            engine
                .trigger_registry
                .check_active_trigger((), "client", Utc::now())
        );
    }

    #[tokio::test]
    async fn handle_bloop_respects_throttling() {
        let mut engine = build_test_engine();
        let nfc_uid = NfcUid::default();
        let client_id = "test-client";
        register_player(&engine, nfc_uid).await;

        engine.throttle = Some(Throttle::new(1, Duration::from_secs(10)));

        // One bloop of the same player at the same client already fills the
        // throttle window.
        let player = engine
            .player_registry
            .lock()
            .await
            .get_by_nfc_uid(nfc_uid)
            .unwrap();
        let previous_bloop = Bloop::new(player, client_id.to_string(), Utc::now());
        engine.bloop_provider.add(Arc::new(previous_bloop));

        let response = send_bloop(&mut engine, nfc_uid, client_id).await;

        let ServerMessage::Error(err) = response else {
            panic!("Expected throttling error");
        };
        assert!(matches!(err, ErrorResponse::NfcUidThrottled));
    }
}