1use crate::event::Event;
20use crate::player::PlayerInfo;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use std::cmp::Reverse;
24use std::collections::{HashMap, VecDeque, vec_deque};
25use std::sync::{Arc, RwLock, RwLockReadGuard};
26use std::time::Duration;
27use thiserror::Error;
28use tokio::sync::broadcast;
29use tokio::sync::broadcast::error::RecvError;
30use tokio::{select, time};
31#[cfg(feature = "tokio-graceful-shutdown")]
32use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
33use tracing::{debug, error, warn};
34use uuid::Uuid;
35
/// A single bloop event: a player was scanned by a client at a point in time.
#[derive(Debug, Clone)]
pub struct Bloop<Player> {
    // Shared handle to the player that triggered this bloop.
    player: Arc<RwLock<Player>>,

    /// Identifier of the player, cached when the bloop is constructed.
    pub player_id: Uuid,

    /// Identifier of the client that recorded this bloop.
    pub client_id: String,

    /// Timestamp at which the bloop was recorded.
    pub recorded_at: DateTime<Utc>,
}
57
58impl<Player: PlayerInfo> Bloop<Player> {
59 pub fn new(
62 player: Arc<RwLock<Player>>,
63 client_id: impl Into<String>,
64 recorded_at: DateTime<Utc>,
65 ) -> Self {
66 let player_id = player.read().unwrap().id();
67
68 Bloop {
69 player_id,
70 player,
71 client_id: client_id.into(),
72 recorded_at,
73 }
74 }
75}
76
impl<Player> Bloop<Player> {
    /// Returns a read guard on the wrapped player.
    ///
    /// Panics if the `RwLock` is poisoned.
    pub fn player(&self) -> RwLockReadGuard<Player> {
        self.player.read().unwrap()
    }
}
86
87#[inline]
89pub fn bloops_since<Player>(since: DateTime<Utc>) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
90 move |bloop| bloop.recorded_at >= since
91}
92
93#[inline]
95pub fn bloops_for_player<Player>(player_id: Uuid) -> impl Fn(&&Arc<Bloop<Player>>) -> bool {
96 move |bloop| bloop.player_id == player_id
97}
98
/// A time-bounded collection of bloops, ordered newest first.
#[derive(Debug)]
pub struct BloopCollection<Player> {
    // Bloops ordered newest (front) to oldest (back).
    bloops: VecDeque<Arc<Bloop<Player>>>,
    // Maximum age a bloop may reach before being pruned on `add`.
    max_age: Duration,
}
107
108impl<Player> BloopCollection<Player> {
109 pub fn new(max_age: Duration) -> Self {
111 Self {
112 bloops: VecDeque::new(),
113 max_age,
114 }
115 }
116
117 pub fn with_bloops(max_age: Duration, mut bloops: Vec<Arc<Bloop<Player>>>) -> Self {
119 bloops.sort_by_key(|bloop| Reverse(bloop.recorded_at));
120
121 let mut collection = Self::new(max_age);
122 collection.bloops.extend(bloops);
123
124 collection
125 }
126
127 pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
129 let threshold = Utc::now() - self.max_age;
130
131 while let Some(oldest) = self.bloops.back() {
132 if oldest.recorded_at < threshold {
133 self.bloops.pop_back();
134 } else {
135 break;
136 }
137 }
138
139 self.bloops.push_front(bloop);
140 }
141
142 pub fn iter(&self) -> impl Iterator<Item = &Arc<Bloop<Player>>> {
144 self.bloops.iter()
145 }
146}
147
/// Allows `for bloop in &collection` iteration, yielding bloops newest first.
impl<'a, Player: 'a> IntoIterator for &'a BloopCollection<Player> {
    type Item = &'a Arc<Bloop<Player>>;
    type IntoIter = vec_deque::Iter<'a, Arc<Bloop<Player>>>;

    fn into_iter(self) -> Self::IntoIter {
        self.bloops.iter()
    }
}
156
/// Holds recent bloops both globally and grouped per client.
#[derive(Debug)]
pub struct BloopProvider<Player> {
    // Collection of all bloops across every client.
    global: BloopCollection<Player>,
    // One collection per client id.
    per_client: HashMap<String, BloopCollection<Player>>,
    // Retention window applied to every collection.
    max_age: Duration,
    // Returned by `for_client` for unknown clients, so callers always get a
    // collection reference without allocating.
    empty_collection: BloopCollection<Player>,
}
165
166impl<Player> BloopProvider<Player> {
167 pub fn new(max_age: Duration) -> Self {
169 Self {
170 global: BloopCollection::new(max_age),
171 per_client: HashMap::new(),
172 max_age,
173 empty_collection: BloopCollection::new(max_age),
174 }
175 }
176
177 pub fn with_bloops(max_age: Duration, bloops: Vec<Bloop<Player>>) -> Self {
179 let bloops: Vec<Arc<Bloop<Player>>> = bloops.into_iter().map(Arc::new).collect();
180 let global_collection = BloopCollection::with_bloops(max_age, bloops.clone());
181
182 let mut per_client: HashMap<String, BloopCollection<Player>> = HashMap::new();
183 let mut client_groups: HashMap<String, Vec<Arc<Bloop<Player>>>> = HashMap::new();
184
185 for bloop in bloops {
186 client_groups
187 .entry(bloop.client_id.clone())
188 .or_default()
189 .push(bloop.clone());
190 }
191
192 for (client_id, client_bloops) in client_groups {
193 let collection = BloopCollection::with_bloops(max_age, client_bloops);
194 per_client.insert(client_id, collection);
195 }
196
197 Self {
198 global: global_collection,
199 per_client,
200 max_age,
201 empty_collection: BloopCollection::new(max_age),
202 }
203 }
204
205 pub fn add(&mut self, bloop: Arc<Bloop<Player>>) {
207 self.global.add(bloop.clone());
208
209 let client_collection = self
210 .per_client
211 .entry(bloop.client_id.clone())
212 .or_insert_with(|| BloopCollection::new(self.max_age));
213
214 client_collection.add(bloop.clone());
215 }
216
217 pub fn global(&self) -> &BloopCollection<Player> {
219 &self.global
220 }
221
222 pub fn for_client(&self, client_id: &str) -> &BloopCollection<Player> {
224 self.per_client
225 .get(client_id)
226 .unwrap_or(&self.empty_collection)
227 }
228}
229
/// Snapshot of a bloop suitable for persistence, without the player handle.
#[derive(Debug, Clone)]
pub struct ProcessedBloop {
    /// Id of the player that blooped.
    pub player_id: Uuid,

    /// Id of the client that recorded the bloop.
    pub client_id: String,

    /// Time the bloop was recorded.
    pub recorded_at: DateTime<Utc>,
}
242
243impl<Player> From<&Bloop<Player>> for ProcessedBloop {
244 fn from(bloop: &Bloop<Player>) -> Self {
245 Self {
246 player_id: bloop.player_id,
247 client_id: bloop.client_id.clone(),
248 recorded_at: bloop.recorded_at,
249 }
250 }
251}
252
/// Persistence abstraction for processed bloops.
#[async_trait]
pub trait BloopRepository {
    /// Error reported when persisting a batch fails.
    type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;

    /// Persists `bloops` as a single batch.
    async fn persist_batch(&self, bloops: &[ProcessedBloop]) -> Result<(), Self::Error>;
}
260
/// Internal error type for the sink.
///
/// NOTE(review): not referenced anywhere in this chunk of the file — confirm
/// it is used elsewhere before keeping it.
#[derive(Debug, Error)]
enum SinkError {
    /// Wraps any boxed error produced by an underlying repository.
    #[error("Repository error: {0}")]
    Repository(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
266
/// Batching sink: receives processed bloops from a broadcast channel and
/// persists them to a [`BloopRepository`] in batches.
#[derive(Debug)]
pub struct ProcessedBloopSink<R: BloopRepository> {
    // Destination for flushed batches.
    repository: R,
    // Bloops accumulated since the last successful flush.
    buffer: Vec<ProcessedBloop>,
    // Flush as soon as the buffer reaches this many bloops.
    max_batch_size: usize,
    // Flush at least this often, regardless of buffer size.
    max_batch_duration: Duration,
    // Event source; only `BloopProcessed` events are buffered.
    event_rx: broadcast::Receiver<Event>,
}
276
277impl<R: BloopRepository> ProcessedBloopSink<R> {
278 pub async fn process_events(&mut self) {
280 let mut flush_interval = time::interval(self.max_batch_duration);
281
282 loop {
283 let should_continue = select! {
284 _ = flush_interval.tick() => {
285 self.flush().await;
286 true
287 }
288
289 result = self.event_rx.recv() => {
290 self.handle_recv(result).await
291 }
292 };
293
294 if !should_continue {
295 break;
296 }
297 }
298 }
299
300 pub async fn flush(&mut self) {
302 if self.buffer.is_empty() {
303 return;
304 }
305
306 let batch = std::mem::take(&mut self.buffer);
307
308 if let Err(err) = self.repository.persist_batch(&batch).await {
309 error!("Failed to persist bloop batch: {}", err);
310 self.buffer.extend(batch);
311 } else {
312 debug!("Persisted {} bloops", batch.len());
313 }
314 }
315
316 async fn handle_recv(&mut self, result: Result<Event, RecvError>) -> bool {
317 match result {
318 Ok(Event::BloopProcessed(bloop)) => {
319 self.buffer.push(bloop);
320
321 if self.buffer.len() >= self.max_batch_size {
322 debug!("Batch size reached, flushing");
323 self.flush().await;
324 }
325 true
326 }
327 Ok(_) => true,
328 Err(RecvError::Lagged(n)) => {
329 warn!("ProcessedBloopSink lagged by {n} messages, some bloops were missed");
330 self.flush().await;
331 true
332 }
333 Err(RecvError::Closed) => {
334 debug!("ProcessedBloopSink event stream closed, exiting event loop");
335 false
336 }
337 }
338 }
339}
340
/// Uninhabited error type: the sink's subsystem `run` cannot fail.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Debug, Error)]
pub enum NeverError {}
344
#[cfg(feature = "tokio-graceful-shutdown")]
#[async_trait]
impl<R> IntoSubsystem<NeverError> for ProcessedBloopSink<R>
where
    R: BloopRepository + Send + Sync + 'static,
{
    /// Runs the event loop until shutdown is requested, then performs one
    /// final flush so buffered bloops are not dropped on shutdown.
    async fn run(mut self, subsys: SubsystemHandle) -> Result<(), NeverError> {
        // Result ignored: cancellation on shutdown is the expected exit path.
        let _ = self.process_events().cancel_on_shutdown(&subsys).await;
        self.flush().await;

        Ok(())
    }
}
358
/// Error returned by [`ProcessedBloopSinkBuilder::build`].
#[derive(Debug, Error)]
pub enum BuilderError {
    /// A required builder field was never set; carries the field name.
    #[error("missing field: {0}")]
    MissingField(&'static str),
}
364
365#[derive(Debug, Default)]
367pub struct ProcessedBloopSinkBuilder<R: BloopRepository> {
368 repository: Option<R>,
369 max_batch_size: Option<usize>,
370 max_batch_duration: Option<Duration>,
371 event_rx: Option<broadcast::Receiver<Event>>,
372}
373
374impl<R: BloopRepository> ProcessedBloopSinkBuilder<R> {
375 pub fn new() -> Self {
377 Self {
378 repository: None,
379 max_batch_size: None,
380 max_batch_duration: None,
381 event_rx: None,
382 }
383 }
384
385 pub fn repository(mut self, repository: R) -> Self {
387 self.repository = Some(repository);
388 self
389 }
390
391 pub fn max_batch_size(mut self, size: usize) -> Self {
393 self.max_batch_size = Some(size);
394 self
395 }
396
397 pub fn max_batch_duration(mut self, duration: Duration) -> Self {
399 self.max_batch_duration = Some(duration);
400 self
401 }
402
403 pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
405 self.event_rx = Some(event_rx);
406 self
407 }
408
409 pub fn build(self) -> Result<ProcessedBloopSink<R>, BuilderError> {
412 Ok(ProcessedBloopSink {
413 repository: self
414 .repository
415 .ok_or(BuilderError::MissingField("repository"))?,
416 buffer: Vec::new(),
417 max_batch_size: self
418 .max_batch_size
419 .ok_or(BuilderError::MissingField("max_batch_size"))?,
420 max_batch_duration: self
421 .max_batch_duration
422 .ok_or(BuilderError::MissingField("max_batch_duration"))?,
423 event_rx: self
424 .event_rx
425 .ok_or(BuilderError::MissingField("event_rx"))?,
426 })
427 }
428}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bloop::{Bloop, BloopCollection, BloopProvider};
    use crate::test_utils::MockPlayer;
    use ntest::timeout;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use tokio::sync::mpsc;

    // Builds a bloop for a fresh mock player, recorded by "client-1".
    fn create_mock_bloop(recorded_at: DateTime<Utc>) -> Arc<Bloop<MockPlayer>> {
        let (player, _) = MockPlayer::builder().build();
        Arc::new(Bloop::new(player, "client-1", recorded_at))
    }

    // Bloops added out of order still end up newest-first via push_front.
    #[test]
    fn bloop_collection_adds_bloops_and_preserves_order() {
        let now = Utc::now();
        let bloop1 = create_mock_bloop(now);
        let bloop2 = create_mock_bloop(now + Duration::from_secs(1));

        let mut collection = BloopCollection::new(Duration::from_secs(60));
        collection.add(bloop2.clone());
        collection.add(bloop1.clone());

        // Iteration order is insertion order reversed: last-added sits at the front.
        let bloops: Vec<_> = collection.iter().collect();
        assert_eq!(bloops.len(), 2);
        assert_eq!(bloops[0].recorded_at, bloop1.recorded_at);
        assert_eq!(bloops[1].recorded_at, bloop2.recorded_at);
    }

    // Entries older than max_age are evicted when a new bloop is added.
    #[test]
    fn bloop_collection_prunes_bloops_older_than_max_age() {
        let mut collection = BloopCollection::new(Duration::from_secs(5));
        let now = Utc::now();

        let old_bloop = create_mock_bloop(now - Duration::from_secs(10));
        let recent_bloop = create_mock_bloop(now - Duration::from_secs(2));

        collection.add(old_bloop);
        collection.add(recent_bloop.clone());

        let bloops: Vec<_> = collection.iter().collect();
        assert_eq!(bloops.len(), 1);
        assert_eq!(bloops[0].recorded_at, recent_bloop.recorded_at);
    }

    // A single add is visible through both the global and the per-client view.
    #[test]
    fn bloop_provider_adds_bloops_to_global_and_per_client_collections() {
        let mut provider = BloopProvider::new(Duration::from_secs(60));
        let now = Utc::now();
        let bloop = create_mock_bloop(now);
        let client_id = bloop.client_id.clone();

        provider.add(bloop.clone());

        let global_bloops: Vec<_> = provider.global().iter().collect();
        assert_eq!(global_bloops.len(), 1);
        assert_eq!(global_bloops[0].recorded_at, bloop.recorded_at);

        let client_bloops: Vec<_> = provider.for_client(&client_id).iter().collect();
        assert_eq!(client_bloops.len(), 1);
        assert_eq!(client_bloops[0].recorded_at, bloop.recorded_at);
    }

    // with_bloops groups pre-existing bloops correctly per client.
    #[test]
    fn bloop_provider_correctly_initializes_from_existing_bloops() {
        let now = Utc::now();
        let (player1, _) = MockPlayer::builder().build();
        let (player2, _) = MockPlayer::builder().build();

        let bloop1 = Bloop::new(player1.clone(), "client-1", now);
        let bloop2 = Bloop::new(player1, "client-1", now + Duration::from_secs(1));
        let bloop3 = Bloop::new(player2, "client-2", now);

        let provider =
            BloopProvider::with_bloops(Duration::from_secs(60), vec![bloop1, bloop2, bloop3]);

        let global_bloops: Vec<_> = provider.global().iter().collect();
        assert_eq!(global_bloops.len(), 3);

        let client1_bloops: Vec<_> = provider.for_client("client-1").iter().collect();
        assert_eq!(client1_bloops.len(), 2);
        assert!(client1_bloops.iter().any(|b| b.recorded_at == now));
        assert!(
            client1_bloops
                .iter()
                .any(|b| b.recorded_at == now + Duration::from_secs(1))
        );

        let client2_bloops: Vec<_> = provider.for_client("client-2").iter().collect();
        assert_eq!(client2_bloops.len(), 1);
        assert_eq!(client2_bloops[0].recorded_at, now);
    }

    // Predicate keeps only bloops at or after the cutoff.
    #[test]
    fn bloops_since_filter_correctly_filters_based_on_timestamp() {
        let now = Utc::now();
        let bloop1 = create_mock_bloop(now);
        let bloop2 = create_mock_bloop(now + Duration::from_secs(10));

        let bloops = vec![bloop1.clone(), bloop2.clone()];
        let since = now + Duration::from_secs(5);
        let filtered: Vec<_> = bloops.iter().filter(bloops_since(since)).collect();

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].recorded_at, bloop2.recorded_at);
    }

    // Predicate keeps only bloops belonging to the requested player.
    #[test]
    fn bloops_for_player_filter_correctly_filters_based_on_player_id() {
        let now = Utc::now();
        let bloop1 = create_mock_bloop(now);
        let (player2, player2_id) = MockPlayer::builder().build();
        let bloop2 = Arc::new(Bloop::new(player2, "client-1", now));

        let bloops = vec![bloop1.clone(), bloop2.clone()];
        let filtered: Vec<_> = bloops
            .iter()
            .filter(bloops_for_player(player2_id))
            .collect();

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].player_id, player2_id);
    }

    // Repository double: forwards persisted batches over a channel and can be
    // toggled to fail so retry behavior is observable.
    #[derive(Clone)]
    struct DummyRepo {
        sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>,
        fail_persist: Arc<Mutex<bool>>,
    }

    impl DummyRepo {
        fn new(sender: mpsc::UnboundedSender<Vec<ProcessedBloop>>) -> Self {
            Self {
                sender,
                fail_persist: Arc::new(Mutex::new(false)),
            }
        }

        // Makes subsequent persist_batch calls fail (or succeed again).
        fn set_fail(&self, fail: bool) {
            *self.fail_persist.lock().unwrap() = fail;
        }
    }

    #[async_trait::async_trait]
    impl BloopRepository for DummyRepo {
        type Error = &'static str;

        async fn persist_batch(&self, bloops: &[ProcessedBloop]) -> Result<(), Self::Error> {
            if *self.fail_persist.lock().unwrap() {
                return Err("fail");
            }
            self.sender.send(bloops.to_vec()).unwrap();
            Ok(())
        }
    }

    // A processed bloop with random player id, fixed client, current time.
    fn create_processed_bloop() -> ProcessedBloop {
        ProcessedBloop {
            player_id: Uuid::new_v4(),
            client_id: "client-1".to_string(),
            recorded_at: Utc::now(),
        }
    }

    // flush sends the whole buffer as one batch and empties it on success.
    #[tokio::test]
    async fn flush_persists_and_clears_buffer() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);

        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(10)
            .max_batch_duration(Duration::from_secs(5))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        sink.buffer.push(create_processed_bloop());
        sink.flush().await;

        let batch = rx.recv().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    // A failed flush keeps the batch buffered; the next flush delivers it.
    #[tokio::test]
    async fn flush_retries_on_failure() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);
        repo.set_fail(true);

        let (_evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(10)
            .max_batch_duration(Duration::from_secs(5))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        sink.buffer.push(create_processed_bloop());
        sink.flush().await;
        assert_eq!(sink.buffer.len(), 1);

        repo.set_fail(false);
        sink.flush().await;

        let batch = rx.recv().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert!(sink.buffer.is_empty());
    }

    // Reaching max_batch_size triggers a flush before the timer fires; the
    // timeout guards against the event loop hanging instead of flushing.
    #[tokio::test]
    #[timeout(1000)]
    async fn process_events_flushes_on_batch_size() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(2)
            .max_batch_duration(Duration::from_secs(10))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        let bloop1 = create_processed_bloop();
        let bloop2 = create_processed_bloop();

        let handle = tokio::spawn(async move { sink.process_events().await });

        evt_tx.send(Event::BloopProcessed(bloop1.clone())).unwrap();
        evt_tx.send(Event::BloopProcessed(bloop2.clone())).unwrap();

        let batch = rx.recv().await.unwrap();
        assert_eq!(batch.len(), 2);

        // Dropping the last sender closes the channel, ending the event loop.
        drop(evt_tx);

        handle.await.unwrap();
    }

    // A closed broadcast channel makes handle_recv signal loop termination.
    #[tokio::test]
    async fn handle_recv_returns_false_on_closed() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let repo = DummyRepo::new(tx);
        let (evt_tx, evt_rx) = broadcast::channel(16);

        let mut sink = ProcessedBloopSinkBuilder::new()
            .repository(repo.clone())
            .max_batch_size(10)
            .max_batch_duration(Duration::from_secs(10))
            .event_rx(evt_rx)
            .build()
            .unwrap();

        drop(evt_tx);

        let recv_result = sink.event_rx.recv().await;
        let result = sink.handle_recv(recv_result).await;
        assert!(!result);
    }
}