// gn_matchmaking_state/adapters/redis/publisher/native.rs
use redis::{Connection, FromRedisValue};

use crate::adapters::{
    redis::{NotifyOnRedisEvent, RedisAdapter, RedisIdentifiable},
    InfoPublisher, Publishable,
};
7
/// First segment of every event channel name built in this module
/// (e.g. `events:<event>` when publishing, `events:update:*:<name>` when subscribing).
const EVENT_PREFIX: &str = "events";
9#[derive(Default)]
10pub struct RedisInfoPublisher {
11 connection: Option<Connection>,
12}
13
14impl RedisInfoPublisher {
15 #[inline]
16 pub fn new(connection: Connection) -> Self {
17 Self {
18 connection: Some(connection),
19 }
20 }
21}
22
23impl InfoPublisher<redis::Connection> for RedisInfoPublisher {
24 fn publish(
25 &mut self,
26 created: &dyn Publishable<redis::Connection>,
27 event: String,
28 ) -> Result<(), Box<dyn std::error::Error>> {
29 created.publish(
30 self.connection.as_mut().unwrap(),
31 format!("{}:{}", EVENT_PREFIX, event),
32 )?;
33 Ok(())
34 }
35}
36
37fn loop_on_redis_event<T>(
38 channel: String,
39 client: redis::Client,
40 mut handler: impl FnMut(T) -> () + Send + Sync + 'static,
41) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
42where
43 T: FromRedisValue,
44{
45 let mut connection = client.get_connection()?;
46 Ok(tokio::task::spawn_blocking(move || {
47 let mut connection = connection.as_pubsub();
48 connection.psubscribe(channel).unwrap(); loop {
50 let msg = connection.get_message().unwrap();
51 let payload = msg.get_payload::<T>().unwrap();
52 handler(payload);
53 }
54 }))
55}
56
57impl<T> NotifyOnRedisEvent<redis::Connection> for T
58where
59 T: RedisIdentifiable,
60{
61 fn on_update<O>(
62 connection: &RedisAdapter<redis::Connection>,
63 handler: impl FnMut(O) -> () + Send + Sync + 'static,
64 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
65 where
66 O: FromRedisValue,
67 {
68 Ok(loop_on_redis_event(
69 format!("{}:update:*:{}", EVENT_PREFIX, T::name()),
70 connection.client.clone(),
71 handler,
72 )?)
73 }
74
75 fn on_delete<O>(
76 connection: &RedisAdapter<redis::Connection>,
77 handler: impl FnMut(O) -> () + Send + Sync + 'static,
78 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
79 where
80 O: FromRedisValue,
81 {
82 Ok(loop_on_redis_event(
83 format!("{}:delete:*:{}", EVENT_PREFIX, T::name()),
84 connection.client.clone(),
85 handler,
86 )?)
87 }
88
89 fn on_insert<O>(
90 connection: &RedisAdapter<redis::Connection>,
91 handler: impl FnMut(O) -> () + Send + Sync + 'static,
92 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
93 where
94 O: FromRedisValue,
95 {
96 Ok(loop_on_redis_event(
97 format!("{}:insert:*:{}", EVENT_PREFIX, T::name()),
98 connection.client.clone(),
99 handler,
100 )?)
101 }
102}