thruster_socketio/
redis_pubsub.rs

1use futures_util::StreamExt;
2use log::{debug, error};
3use std::sync::RwLock;
4use tokio;
5use trezm_redis::AsyncCommands;
6use trezm_redis::RedisResult;
7
8use tokio::sync::broadcast::channel as unbounded;
9use tokio::sync::broadcast::Sender;
10
11use crate::rooms::get_sockets_for_room;
12use crate::sid::generate_sid;
13use crate::socketio::{InternalMessage, SocketIOAdapter};
14use crate::socketio_message::SocketIOMessage;
15
16lazy_static! {
17    static ref CHANNEL: RwLock<Vec<Sender<SocketIOToRedisMessage>>> = RwLock::new(Vec::new());
18}
19
20#[derive(Clone)]
21pub struct RedisAdapter {}
22
23impl SocketIOAdapter for RedisAdapter {
24    fn incoming(&self, room_id: &str, message: &SocketIOMessage) {
25        // Here we need to relay the message to the redis pubsub
26        // This is client -> us -> redis
27        send_message(room_id, message.clone())
28    }
29
30    fn outgoing(&self, _room_id: &str, _message: &SocketIOMessage) {
31        // Here we need to forward the message
32        // This is redis -> us -> client
33        // Automagically handled by the listener
34    }
35}
36
37#[derive(Clone)]
38struct SocketIOToRedisMessage {
39    room_id: String,
40    socket_io_message: SocketIOMessage,
41}
42
43#[derive(Serialize, Deserialize, Debug)]
44struct RedisMessage {
45    channel: String,
46    room_id: String,
47    event: String,
48    message: String,
49    sending_id: String,
50}
51
52pub fn send_message(room_id: &str, message: SocketIOMessage) {
53    for sender in &*CHANNEL.read().unwrap() {
54        let socket_io_to_redis_message = match message {
55            SocketIOMessage::Message(ref event, ref message) => Some(SocketIOToRedisMessage {
56                room_id: room_id.to_owned(),
57                socket_io_message: SocketIOMessage::Message(event.clone(), message.clone()),
58            }),
59            SocketIOMessage::SendMessage(ref event, ref message) => Some(SocketIOToRedisMessage {
60                room_id: room_id.to_owned(),
61                socket_io_message: SocketIOMessage::SendMessage(event.clone(), message.clone()),
62            }),
63            SocketIOMessage::Join(_) => None,
64            SocketIOMessage::AddListener(_, _) => None,
65            _ => {
66                error!(
67                    "Received a message that was not RawMessage, Message, or SendMessage: {}",
68                    message
69                );
70                None
71            }
72        };
73
74        if let Some(val) = socket_io_to_redis_message {
75            let _ = sender.send(val);
76        }
77    }
78}
79
80/// Connect to a redis host using a particular channel name (in redis) in order to pass messages
81/// between servers or processes.
82pub async fn connect_to_pubsub(redis_host: &str, channel_name: &str) -> RedisResult<()> {
83    connect_to_pubsub_with_capacity(redis_host, channel_name, 16).await
84}
85
86/// Connect to a redis host using a particular channel name (in redis) in order to pass messages
87/// between servers or processes.
88///
89/// Capacity represents the maximum number of in-flight messages before processing has occurred.
90pub async fn connect_to_pubsub_with_capacity(
91    redis_host: &str,
92    channel_name: &str,
93    message_capacity: usize,
94) -> RedisResult<()> {
95    let redis_host = redis_host.to_string();
96    let channel_name = channel_name.to_string();
97
98    let client = trezm_redis::Client::open(redis_host).unwrap();
99    let mut publish_conn = client.get_async_connection().await?;
100
101    let (sender, mut receiver) = unbounded(message_capacity);
102
103    CHANNEL.write().unwrap().push(sender);
104
105    let channel_name = channel_name.to_string();
106    let channel_name_outgoing = channel_name.clone();
107    let channel_name_incoming = channel_name;
108    let sending_id = generate_sid();
109    let sending_id_outgoing = sending_id.clone();
110    let sending_id_incoming = sending_id;
111
112    // Handle pubbing local requests into redis
113    tokio::spawn(async move {
114        while let Ok(val) = receiver.recv().await {
115            debug!("local -> redis: {} {}", val.room_id, val.socket_io_message);
116
117            match val.socket_io_message {
118                SocketIOMessage::SendMessage(event, message) => {
119                    let _ = publish_conn
120                        .publish::<'_, _, _, String>(
121                            channel_name_outgoing.clone(),
122                            serde_json::to_string(&RedisMessage {
123                                channel: channel_name_outgoing.clone(),
124                                room_id: val.room_id.clone(),
125                                event,
126                                message,
127                                sending_id: sending_id_outgoing.clone(),
128                            })
129                            .unwrap(),
130                        )
131                        .await;
132                }
133                SocketIOMessage::Message(event, message) => {
134                    let _ = publish_conn
135                        .publish::<'_, _, _, String>(
136                            channel_name_outgoing.clone(),
137                            serde_json::to_string(&RedisMessage {
138                                channel: channel_name_outgoing.clone(),
139                                room_id: val.room_id.clone(),
140                                event,
141                                message,
142                                sending_id: sending_id_outgoing.clone(),
143                            })
144                            .unwrap(),
145                        )
146                        .await;
147                }
148                _ => (),
149            }
150        }
151    });
152
153    // Handle subbing local requests from redis
154    tokio::spawn(async move {
155        let mut pubsub_conn = client.get_async_connection().await.unwrap().into_pubsub();
156
157        println!("channel_name_incoming {}", channel_name_incoming);
158        println!("{:#?}", pubsub_conn.subscribe(channel_name_incoming).await);
159        // .expect("Was unable to subscribe to incoming channel in redis");
160        let mut pubsub_stream = pubsub_conn.on_message();
161
162        while let Some(msg) = pubsub_stream.next().await {
163            let message: RedisMessage =
164                serde_json::from_str(&msg.get_payload::<String>().unwrap()).unwrap();
165
166            debug!(
167                "redis -> local: {} {} {}",
168                message.room_id, message.event, message.message
169            );
170
171            if message.sending_id != sending_id_incoming {
172                if let Some(sockets) = get_sockets_for_room(&message.room_id) {
173                    for socket in &*sockets {
174                        socket.send(InternalMessage::IO(SocketIOMessage::SendMessage(
175                            message.event.to_string(),
176                            message.message.to_string(),
177                        )));
178                    }
179                };
180            }
181        }
182    });
183
184    Ok(())
185}