1use crate::event::Event;
20use crate::player::PlayerInfo;
21use chrono::{DateTime, Utc};
22use std::cmp::Reverse;
23use std::collections::{HashMap, VecDeque, vec_deque};
24use std::sync::{Arc, RwLock, RwLockReadGuard};
25use std::time::Duration;
26use thiserror::Error;
27use tokio::sync::broadcast;
28use tokio::sync::broadcast::error::RecvError;
29use tokio::{select, time};
30#[cfg(feature = "tokio-graceful-shutdown")]
31use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
32use tracing::{debug, error, warn};
33use uuid::Uuid;
34
35#[derive(Debug, Clone)]
40pub struct Bloop<Player> {
41 player: Arc<RwLock<Player>>,
43
44 pub player_id: Uuid,
49
50 pub client_id: String,
52
53 pub recorded_at: DateTime<Utc>,
55}
56
57impl<Player: PlayerInfo> Bloop<Player> {
58 pub fn new(
61 player: Arc<RwLock<Player>>,
62 client_id: impl Into<String>,
63 recorded_at: DateTime<Utc>,
64 ) -> Self {
65 let player_id = player.read().unwrap().id();
66
67 Bloop {
68 player_id,
69 player,
70 client_id: client_id.into(),
71 recorded_at,
72 }
73 }
74}
75
impl<Player> Bloop<Player> {
    /// Returns a read guard on the originating player.
    ///
    /// Panics if the lock is poisoned (a writer panicked while holding it).
    pub fn player(&self) -> RwLockReadGuard<'_, Player> {
        self.player.read().unwrap()
    }
}
85
86#[inline]
88pub fn bloops_since<Player>(since: DateTime<Utc>) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
89 move |bloop| bloop.recorded_at >= since
90}
91
92#[inline]
94pub fn bloops_for_player<Player>(player_id: Uuid) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
95 move |bloop| bloop.player_id == player_id
96}
97
/// A time-bounded list of bloops.
#[derive(Debug)]
pub struct BloopCollection<Player> {
    // Front = most recently added, back = oldest (given chronological adds).
    bloops: VecDeque<Arc<Bloop<Player>>>,
    // Entries older than `now - max_age` are pruned lazily on `add`.
    max_age: Duration,
}
106
107impl<Player> BloopCollection<Player> {
108 pub fn new(max_age: Duration) -> Self {
110 Self {
111 bloops: VecDeque::new(),
112 max_age,
113 }
114 }
115
116 pub fn with_bloops(max_age: Duration, mut bloops: Vec<Arc<Bloop<Player>>>) -> Self {
118 bloops.sort_by_key(|bloop| Reverse(bloop.recorded_at));
119
120 let mut collection = Self::new(max_age);
121 collection.bloops.extend(bloops);
122
123 collection
124 }
125
126 pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
128 let threshold = Utc::now() - self.max_age;
129
130 while let Some(oldest) = self.bloops.back() {
131 if oldest.recorded_at < threshold {
132 self.bloops.pop_back();
133 } else {
134 break;
135 }
136 }
137
138 self.bloops.push_front(bloop);
139 }
140
141 pub fn iter(&self) -> impl Iterator<Item = &Arc<Bloop<Player>>> {
143 self.bloops.iter()
144 }
145}
146
147impl<'a, Player: 'a> IntoIterator for &'a BloopCollection<Player> {
148 type Item = &'a Arc<Bloop<Player>>;
149 type IntoIter = vec_deque::Iter<'a, Arc<Bloop<Player>>>;
150
151 fn into_iter(self) -> Self::IntoIter {
152 self.bloops.iter()
153 }
154}
155
/// Holds recent bloops, both globally and partitioned per client.
#[derive(Debug)]
pub struct BloopProvider<Player> {
    // All bloops across every client.
    global: BloopCollection<Player>,
    // Bloops keyed by the client id that recorded them.
    per_client: HashMap<String, BloopCollection<Player>>,
    // Retention window applied to every collection created here.
    max_age: Duration,
    // Shared fallback returned by `for_client` for unknown client ids.
    empty_collection: BloopCollection<Player>,
}
164
165impl<Player> BloopProvider<Player> {
166 pub fn new(max_age: Duration) -> Self {
168 Self {
169 global: BloopCollection::new(max_age),
170 per_client: HashMap::new(),
171 max_age,
172 empty_collection: BloopCollection::new(max_age),
173 }
174 }
175
176 pub fn with_bloops(max_age: Duration, bloops: Vec<Bloop<Player>>) -> Self {
178 let bloops: Vec<Arc<Bloop<Player>>> = bloops.into_iter().map(Arc::new).collect();
179 let global_collection = BloopCollection::with_bloops(max_age, bloops.clone());
180
181 let mut per_client: HashMap<String, BloopCollection<Player>> = HashMap::new();
182 let mut client_groups: HashMap<String, Vec<Arc<Bloop<Player>>>> = HashMap::new();
183
184 for bloop in bloops {
185 client_groups
186 .entry(bloop.client_id.clone())
187 .or_default()
188 .push(bloop.clone());
189 }
190
191 for (client_id, client_bloops) in client_groups {
192 let collection = BloopCollection::with_bloops(max_age, client_bloops);
193 per_client.insert(client_id, collection);
194 }
195
196 Self {
197 global: global_collection,
198 per_client,
199 max_age,
200 empty_collection: BloopCollection::new(max_age),
201 }
202 }
203
204 pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
206 self.global.add(bloop.clone());
207
208 let client_collection = self
209 .per_client
210 .entry(bloop.client_id.clone())
211 .or_insert_with(|| BloopCollection::new(self.max_age));
212
213 client_collection.add(bloop.clone());
214 }
215
216 pub fn global(&self) -> &BloopCollection<Player> {
218 &self.global
219 }
220
221 pub fn for_client(&self, client_id: &str) -> &BloopCollection<Player> {
223 self.per_client
224 .get(client_id)
225 .unwrap_or(&self.empty_collection)
226 }
227}
228
/// A fully processed bloop, detached from the live player handle and
/// suitable for persistence.
#[derive(Debug, Clone)]
pub struct ProcessedBloop {
    /// Id of the player that produced the bloop.
    pub player_id: Uuid,

    /// Identifier of the client that recorded the bloop.
    pub client_id: String,

    /// When the bloop was recorded.
    pub recorded_at: DateTime<Utc>,
}
241
242impl<Player> From<&Bloop<Player>> for ProcessedBloop {
243 fn from(bloop: &Bloop<Player>) -> Self {
244 Self {
245 player_id: bloop.player_id,
246 client_id: bloop.client_id.clone(),
247 recorded_at: bloop.recorded_at,
248 }
249 }
250}
251
/// Storage backend for processed bloops.
// NOTE(review): uses return-position `impl Trait` in a trait (RPITIT,
// Rust 1.75+), and the unqualified `Future` relies on it being in scope
// (2024-edition prelude) — confirm against the crate's edition.
pub trait BloopRepository {
    /// Backend-specific error; `Display` is used when logging failures.
    type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;

    /// Persists a batch of bloops atomically from the caller's point of
    /// view: on `Err`, the caller re-buffers the whole batch for retry.
    fn persist_batch(
        &self,
        bloops: &[ProcessedBloop],
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
261
/// Buffers `Event::BloopProcessed` events from a broadcast channel and
/// persists them in batches via a [`BloopRepository`].
#[derive(Debug)]
pub struct ProcessedBloopSink<R: BloopRepository> {
    // Destination for persisted batches.
    repository: R,
    // Bloops accumulated since the last successful flush.
    buffer: Vec<ProcessedBloop>,
    // Flush as soon as the buffer reaches this many entries…
    max_batch_size: usize,
    // …or when this much time has passed, whichever comes first.
    max_batch_duration: Duration,
    // Source of events to buffer.
    event_rx: broadcast::Receiver<Event>,
}
271
272impl<R: BloopRepository> ProcessedBloopSink<R> {
273 pub async fn process_events(&mut self) {
275 let mut flush_interval = time::interval(self.max_batch_duration);
276
277 loop {
278 let should_continue = select! {
279 _ = flush_interval.tick() => {
280 self.flush().await;
281 true
282 }
283
284 result = self.event_rx.recv() => {
285 self.handle_recv(result).await
286 }
287 };
288
289 if !should_continue {
290 break;
291 }
292 }
293 }
294
295 pub async fn flush(&mut self) {
297 if self.buffer.is_empty() {
298 return;
299 }
300
301 let batch = std::mem::take(&mut self.buffer);
302
303 if let Err(err) = self.repository.persist_batch(&batch).await {
304 error!("Failed to persist bloop batch: {}", err);
305 self.buffer.extend(batch);
306 } else {
307 debug!("Persisted {} bloops", batch.len());
308 }
309 }
310
311 async fn handle_recv(&mut self, result: Result<Event, RecvError>) -> bool {
312 match result {
313 Ok(Event::BloopProcessed(bloop)) => {
314 self.buffer.push(bloop);
315
316 if self.buffer.len() >= self.max_batch_size {
317 debug!("Batch size reached, flushing");
318 self.flush().await;
319 }
320 true
321 }
322 Ok(_) => true,
323 Err(RecvError::Lagged(n)) => {
324 warn!("ProcessedBloopSink lagged by {n} messages, some bloops were missed");
325 self.flush().await;
326 true
327 }
328 Err(RecvError::Closed) => {
329 debug!("ProcessedBloopSink event stream closed, exiting event loop");
330 false
331 }
332 }
333 }
334}
335
/// Uninhabited error type: the sink subsystem cannot fail, but
/// `IntoSubsystem` requires an error type.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
339
#[cfg(feature = "tokio-graceful-shutdown")]
impl<R> IntoSubsystem<NeverError> for ProcessedBloopSink<R>
where
    R: BloopRepository + Send + Sync + 'static,
{
    /// Runs the sink until shutdown, then flushes any remaining bloops so
    /// they are not lost.
    // NOTE(review): `run(&mut SubsystemHandle)` must match the pinned
    // tokio-graceful-shutdown version — newer releases take the handle by
    // value; confirm against Cargo.lock.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        // Cancellation result is irrelevant: either the loop exited on its
        // own (channel closed) or shutdown cut it off — we flush either way.
        let _ = self.process_events().cancel_on_shutdown(subsys).await;
        self.flush().await;

        Ok(())
    }
}
352
/// Error returned by [`ProcessedBloopSinkBuilder::build`] when a required
/// field was not set.
#[derive(Debug, Error)]
pub enum BuilderError {
    /// Names the builder field that was left unset.
    #[error("missing field: {0}")]
    MissingField(&'static str),
}
358
359#[derive(Debug, Default)]
361pub struct ProcessedBloopSinkBuilder<R: BloopRepository> {
362 repository: Option<R>,
363 max_batch_size: Option<usize>,
364 max_batch_duration: Option<Duration>,
365 event_rx: Option<broadcast::Receiver<Event>>,
366}
367
368impl<R: BloopRepository> ProcessedBloopSinkBuilder<R> {
369 pub fn new() -> Self {
371 Self {
372 repository: None,
373 max_batch_size: None,
374 max_batch_duration: None,
375 event_rx: None,
376 }
377 }
378
379 pub fn repository(mut self, repository: R) -> Self {
381 self.repository = Some(repository);
382 self
383 }
384
385 pub fn max_batch_size(mut self, size: usize) -> Self {
387 self.max_batch_size = Some(size);
388 self
389 }
390
391 pub fn max_batch_duration(mut self, duration: Duration) -> Self {
393 self.max_batch_duration = Some(duration);
394 self
395 }
396
397 pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
399 self.event_rx = Some(event_rx);
400 self
401 }
402
403 pub fn build(self) -> Result<ProcessedBloopSink<R>, BuilderError> {
406 Ok(ProcessedBloopSink {
407 repository: self
408 .repository
409 .ok_or(BuilderError::MissingField("repository"))?,
410 buffer: Vec::new(),
411 max_batch_size: self
412 .max_batch_size
413 .ok_or(BuilderError::MissingField("max_batch_size"))?,
414 max_batch_duration: self
415 .max_batch_duration
416 .ok_or(BuilderError::MissingField("max_batch_duration"))?,
417 event_rx: self
418 .event_rx
419 .ok_or(BuilderError::MissingField("event_rx"))?,
420 })
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bloop::{Bloop, BloopCollection, BloopProvider};
    use crate::test_utils::MockPlayer;
    use ntest::timeout;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use tokio::sync::mpsc;

    // Builds a bloop for a fresh mock player under the fixed id "client-1".
    fn create_mock_bloop(recorded_at: DateTime<Utc>) -> Arc<Bloop<MockPlayer>> {
        let (player, _) = MockPlayer::builder().build();
        Arc::new(Bloop::new(player, "client-1", recorded_at))
    }

    // Insertion order (not timestamp order) determines iteration order:
    // `add` always pushes to the front, so the last-added bloop comes first.
    #[test]
    fn bloop_collection_adds_bloops_and_preserves_order() {
        let now = Utc::now();
        let bloop1 = create_mock_bloop(now);
        let bloop2 = create_mock_bloop(now + Duration::from_secs(1));

        let mut collection = BloopCollection::new(Duration::from_secs(60));
        collection.add(bloop2.clone());
        collection.add(bloop1.clone());

        let bloops: Vec<_> = collection.iter().collect();
        assert_eq!(bloops.len(), 2);
        assert_eq!(bloops[0].recorded_at, bloop1.recorded_at);
        assert_eq!(bloops[1].recorded_at, bloop2.recorded_at);
    }

    // Adding a fresh bloop should evict entries past the retention window.
    #[test]
    fn bloop_collection_prunes_bloops_older_than_max_age() {
        let mut collection = BloopCollection::new(Duration::from_secs(5));
        let now = Utc::now();

        let old_bloop = create_mock_bloop(now - Duration::from_secs(10));
        let recent_bloop = create_mock_bloop(now - Duration::from_secs(2));

        collection.add(old_bloop);
        collection.add(recent_bloop.clone());

        let bloops: Vec<_> = collection.iter().collect();
        assert_eq!(bloops.len(), 1);
        assert_eq!(bloops[0].recorded_at, recent_bloop.recorded_at);
    }

    // A single `add` must land in both the global and the per-client view.
    #[test]
    fn bloop_provider_adds_bloops_to_global_and_per_client_collections() {
        let mut provider = BloopProvider::new(Duration::from_secs(60));
        let now = Utc::now();
        let bloop = create_mock_bloop(now);
        let client_id = bloop.client_id.clone();

        provider.add(bloop.clone());

        let global_bloops: Vec<_> = provider.global().iter().collect();
        assert_eq!(global_bloops.len(), 1);
        assert_eq!(global_bloops[0].recorded_at, bloop.recorded_at);

        let client_bloops: Vec<_> = provider.for_client(&client_id).iter().collect();
        assert_eq!(client_bloops.len(), 1);
        assert_eq!(client_bloops[0].recorded_at, bloop.recorded_at);
    }

    // `with_bloops` must partition existing bloops by client id.
    #[test]
    fn bloop_provider_correctly_initializes_from_existing_bloops() {
        let now = Utc::now();
        let (player1, _) = MockPlayer::builder().build();
        let (player2, _) = MockPlayer::builder().build();

        let bloop1 = Bloop::new(player1.clone(), "client-1", now);
        let bloop2 = Bloop::new(player1, "client-1", now + Duration::from_secs(1));
        let bloop3 = Bloop::new(player2, "client-2", now);

        let provider =
            BloopProvider::with_bloops(Duration::from_secs(60), vec![bloop1, bloop2, bloop3]);

        let global_bloops: Vec<_> = provider.global().iter().collect();
        assert_eq!(global_bloops.len(), 3);

        let client1_bloops: Vec<_> = provider.for_client("client-1").iter().collect();
        assert_eq!(client1_bloops.len(), 2);
        assert!(client1_bloops.iter().any(|b| b.recorded_at == now));
        assert!(
            client1_bloops
                .iter()
                .any(|b| b.recorded_at == now + Duration::from_secs(1))
        );

        let client2_bloops: Vec<_> = provider.for_client("client-2").iter().collect();
        assert_eq!(client2_bloops.len(), 1);
        assert_eq!(client2_bloops[0].recorded_at, now);
    }

    // The `since` bound is inclusive; only bloop2 (at +10s) passes +5s.
    #[test]
    fn bloops_since_filter_correctly_filters_based_on_timestamp() {
        let now = Utc::now();
        let bloop1 = create_mock_bloop(now);
        let bloop2 = create_mock_bloop(now + Duration::from_secs(10));

        let bloops = vec![bloop1.clone(), bloop2.clone()];
        let since = now + Duration::from_secs(5);
        let filtered: Vec<_> = bloops.iter().filter(bloops_since(since)).collect();

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].recorded_at, bloop2.recorded_at);
    }

    #[test]
    fn bloops_for_player_filter_correctly_filters_based_on_player_id() {
        let now = Utc::now();
        let bloop1 = create_mock_bloop(now);
        let (player2, player2_id) = MockPlayer::builder().build();
        let bloop2 = Arc::new(Bloop::new(player2, "client-1", now));

        let bloops = vec![bloop1.clone(), bloop2.clone()];
        let filtered: Vec<_> = bloops
            .iter()
            .filter(bloops_for_player(player2_id))
            .collect();

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].player_id, player2_id);
    }

    // In-memory repository: forwards persisted batches over a channel and
    // can be toggled to fail, for exercising the flush retry path.
    #[derive(Clone)]
    struct DummyRepo {
        sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>,
        fail_persist: Arc<Mutex<bool>>,
    }

    impl DummyRepo {
        fn new(sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>) -> Self {
            Self {
                sender,
                fail_persist: Arc::new(Mutex::new(false)),
            }
        }

        // Makes subsequent `persist_batch` calls fail (or succeed again).
        fn set_fail(&self, fail: bool) {
            *self.fail_persist.lock().unwrap() = fail;
        }
    }

    impl BloopRepository for DummyRepo {
        type Error = &'static str;

        async fn persist_batch(&self, bloops: &[ProcessedBloop]) -> Result<(), Self::Error> {
            if *self.fail_persist.lock().unwrap() {
                return Err("fail");
            }
            self.sender.send(bloops.to_vec()).unwrap();
            Ok(())
        }
    }

    fn create_processed_bloop() -> ProcessedBloop {
        ProcessedBloop {
            player_id: Uuid::new_v4(),
            client_id: "client-1".to_string(),
            recorded_at: Utc::now(),
        }
    }

    #[tokio::test]
    async fn flush_persists_and_clears_buffer() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);

        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(10)
            .max_batch_duration(Duration::from_secs(5))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        sink.buffer.push(create_processed_bloop());
        sink.flush().await;

        let batch = rx.recv().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    // On persist failure the batch must stay buffered, then succeed on retry.
    #[tokio::test]
    async fn flush_retries_on_failure() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);
        repo.set_fail(true);

        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(10)
            .max_batch_duration(Duration::from_secs(5))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        sink.buffer.push(create_processed_bloop());
        sink.flush().await;
        assert_eq!(sink.buffer.len(), 1);

        repo.set_fail(false);
        sink.flush().await;

        let batch = rx.recv().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    // Batch-size flush: two events with max_batch_size == 2 must persist
    // without waiting for the (10s) timer; timeout guards against a hang.
    #[tokio::test]
    #[timeout(1000)]
    async fn process_events_flushes_on_batch_size() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(2)
            .max_batch_duration(Duration::from_secs(10))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        let bloop1 = create_processed_bloop();
        let bloop2 = create_processed_bloop();

        let handle = tokio::spawn(async move { sink.process_events().await });

        evt_tx.send(Event::BloopProcessed(bloop1.clone())).unwrap();
        evt_tx.send(Event::BloopProcessed(bloop2.clone())).unwrap();

        let batch = rx.recv().await.unwrap();
        assert_eq!(batch.len(), 2);

        // Dropping the sender closes the channel, ending process_events.
        drop(evt_tx);

        handle.await.unwrap();
    }

    #[tokio::test]
    async fn handle_recv_returns_false_on_closed() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(10)
            .max_batch_duration(Duration::from_secs(10))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        // Closing the channel makes recv() yield RecvError::Closed.
        drop(evt_tx);

        let recv_result = sink.event_rx.recv().await;
        let result = sink.handle_recv(recv_result).await;
        assert!(!result);
    }
}