thruster_socketio/
redis_pubsub.rs1use futures_util::StreamExt;
2use log::{debug, error};
3use std::sync::RwLock;
4use tokio;
5use trezm_redis::AsyncCommands;
6use trezm_redis::RedisResult;
7
8use tokio::sync::broadcast::channel as unbounded;
9use tokio::sync::broadcast::Sender;
10
11use crate::rooms::get_sockets_for_room;
12use crate::sid::generate_sid;
13use crate::socketio::{InternalMessage, SocketIOAdapter};
14use crate::socketio_message::SocketIOMessage;
15
16lazy_static! {
17 static ref CHANNEL: RwLock<Vec<Sender<SocketIOToRedisMessage>>> = RwLock::new(Vec::new());
18}
19
20#[derive(Clone)]
21pub struct RedisAdapter {}
22
23impl SocketIOAdapter for RedisAdapter {
24 fn incoming(&self, room_id: &str, message: &SocketIOMessage) {
25 send_message(room_id, message.clone())
28 }
29
30 fn outgoing(&self, _room_id: &str, _message: &SocketIOMessage) {
31 }
35}
36
37#[derive(Clone)]
38struct SocketIOToRedisMessage {
39 room_id: String,
40 socket_io_message: SocketIOMessage,
41}
42
43#[derive(Serialize, Deserialize, Debug)]
44struct RedisMessage {
45 channel: String,
46 room_id: String,
47 event: String,
48 message: String,
49 sending_id: String,
50}
51
52pub fn send_message(room_id: &str, message: SocketIOMessage) {
53 for sender in &*CHANNEL.read().unwrap() {
54 let socket_io_to_redis_message = match message {
55 SocketIOMessage::Message(ref event, ref message) => Some(SocketIOToRedisMessage {
56 room_id: room_id.to_owned(),
57 socket_io_message: SocketIOMessage::Message(event.clone(), message.clone()),
58 }),
59 SocketIOMessage::SendMessage(ref event, ref message) => Some(SocketIOToRedisMessage {
60 room_id: room_id.to_owned(),
61 socket_io_message: SocketIOMessage::SendMessage(event.clone(), message.clone()),
62 }),
63 SocketIOMessage::Join(_) => None,
64 SocketIOMessage::AddListener(_, _) => None,
65 _ => {
66 error!(
67 "Received a message that was not RawMessage, Message, or SendMessage: {}",
68 message
69 );
70 None
71 }
72 };
73
74 if let Some(val) = socket_io_to_redis_message {
75 let _ = sender.send(val);
76 }
77 }
78}
79
80pub async fn connect_to_pubsub(redis_host: &str, channel_name: &str) -> RedisResult<()> {
83 connect_to_pubsub_with_capacity(redis_host, channel_name, 16).await
84}
85
86pub async fn connect_to_pubsub_with_capacity(
91 redis_host: &str,
92 channel_name: &str,
93 message_capacity: usize,
94) -> RedisResult<()> {
95 let redis_host = redis_host.to_string();
96 let channel_name = channel_name.to_string();
97
98 let client = trezm_redis::Client::open(redis_host).unwrap();
99 let mut publish_conn = client.get_async_connection().await?;
100
101 let (sender, mut receiver) = unbounded(message_capacity);
102
103 CHANNEL.write().unwrap().push(sender);
104
105 let channel_name = channel_name.to_string();
106 let channel_name_outgoing = channel_name.clone();
107 let channel_name_incoming = channel_name;
108 let sending_id = generate_sid();
109 let sending_id_outgoing = sending_id.clone();
110 let sending_id_incoming = sending_id;
111
112 tokio::spawn(async move {
114 while let Ok(val) = receiver.recv().await {
115 debug!("local -> redis: {} {}", val.room_id, val.socket_io_message);
116
117 match val.socket_io_message {
118 SocketIOMessage::SendMessage(event, message) => {
119 let _ = publish_conn
120 .publish::<'_, _, _, String>(
121 channel_name_outgoing.clone(),
122 serde_json::to_string(&RedisMessage {
123 channel: channel_name_outgoing.clone(),
124 room_id: val.room_id.clone(),
125 event,
126 message,
127 sending_id: sending_id_outgoing.clone(),
128 })
129 .unwrap(),
130 )
131 .await;
132 }
133 SocketIOMessage::Message(event, message) => {
134 let _ = publish_conn
135 .publish::<'_, _, _, String>(
136 channel_name_outgoing.clone(),
137 serde_json::to_string(&RedisMessage {
138 channel: channel_name_outgoing.clone(),
139 room_id: val.room_id.clone(),
140 event,
141 message,
142 sending_id: sending_id_outgoing.clone(),
143 })
144 .unwrap(),
145 )
146 .await;
147 }
148 _ => (),
149 }
150 }
151 });
152
153 tokio::spawn(async move {
155 let mut pubsub_conn = client.get_async_connection().await.unwrap().into_pubsub();
156
157 println!("channel_name_incoming {}", channel_name_incoming);
158 println!("{:#?}", pubsub_conn.subscribe(channel_name_incoming).await);
159 let mut pubsub_stream = pubsub_conn.on_message();
161
162 while let Some(msg) = pubsub_stream.next().await {
163 let message: RedisMessage =
164 serde_json::from_str(&msg.get_payload::<String>().unwrap()).unwrap();
165
166 debug!(
167 "redis -> local: {} {} {}",
168 message.room_id, message.event, message.message
169 );
170
171 if message.sending_id != sending_id_incoming {
172 if let Some(sockets) = get_sockets_for_room(&message.room_id) {
173 for socket in &*sockets {
174 socket.send(InternalMessage::IO(SocketIOMessage::SendMessage(
175 message.event.to_string(),
176 message.message.to_string(),
177 )));
178 }
179 };
180 }
181 }
182 });
183
184 Ok(())
185}