Skip to main content

gn_matchmaking_state/adapters/redis/publisher/
native.rs

1use redis::{Connection, FromRedisValue};
2
3use crate::adapters::{
4    redis::{NotifyOnRedisEvent, RedisAdapter, RedisIdentifiable},
5    InfoPublisher, Publishable,
6};
7
8const EVENT_PREFIX: &str = "events";
9#[derive(Default)]
10pub struct RedisInfoPublisher {
11    connection: Option<Connection>,
12}
13
14impl RedisInfoPublisher {
15    #[inline]
16    pub fn new(connection: Connection) -> Self {
17        Self {
18            connection: Some(connection),
19        }
20    }
21}
22
23impl InfoPublisher<redis::Connection> for RedisInfoPublisher {
24    fn publish(
25        &mut self,
26        created: &dyn Publishable<redis::Connection>,
27        event: String,
28    ) -> Result<(), Box<dyn std::error::Error>> {
29        created.publish(
30            self.connection.as_mut().unwrap(),
31            format!("{}:{}", EVENT_PREFIX, event),
32        )?;
33        Ok(())
34    }
35}
36
37fn loop_on_redis_event<T>(
38    channel: String,
39    client: redis::Client,
40    mut handler: impl FnMut(T) -> () + Send + Sync + 'static,
41) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
42where
43    T: FromRedisValue,
44{
45    let mut connection = client.get_connection()?;
46    Ok(tokio::task::spawn_blocking(move || {
47        let mut connection = connection.as_pubsub();
48        connection.psubscribe(channel).unwrap(); // TODO: Handle error
49        loop {
50            let msg = connection.get_message().unwrap();
51            let payload = msg.get_payload::<T>().unwrap();
52            handler(payload);
53        }
54    }))
55}
56
57impl<T> NotifyOnRedisEvent<redis::Connection> for T
58where
59    T: RedisIdentifiable,
60{
61    fn on_update<O>(
62        connection: &RedisAdapter<redis::Connection>,
63        handler: impl FnMut(O) -> () + Send + Sync + 'static,
64    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
65    where
66        O: FromRedisValue,
67    {
68        Ok(loop_on_redis_event(
69            format!("{}:update:*:{}", EVENT_PREFIX, T::name()),
70            connection.client.clone(),
71            handler,
72        )?)
73    }
74
75    fn on_delete<O>(
76        connection: &RedisAdapter<redis::Connection>,
77        handler: impl FnMut(O) -> () + Send + Sync + 'static,
78    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
79    where
80        O: FromRedisValue,
81    {
82        Ok(loop_on_redis_event(
83            format!("{}:delete:*:{}", EVENT_PREFIX, T::name()),
84            connection.client.clone(),
85            handler,
86        )?)
87    }
88
89    fn on_insert<O>(
90        connection: &RedisAdapter<redis::Connection>,
91        handler: impl FnMut(O) -> () + Send + Sync + 'static,
92    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>>
93    where
94        O: FromRedisValue,
95    {
96        Ok(loop_on_redis_event(
97            format!("{}:insert:*:{}", EVENT_PREFIX, T::name()),
98            connection.client.clone(),
99            handler,
100        )?)
101    }
102}