redis_objects/
pubsub.rs

//! Builders for listening to Redis pubsub channels and a publisher for sending messages to them.
2use std::{marker::PhantomData, sync::Arc};
3
4use futures::StreamExt;
5use log::error;
6use redis::{AsyncCommands, Msg};
7use serde::Serialize;
8use tokio::sync::mpsc;
9use serde::de::DeserializeOwned;
10
11use crate::{retry_call, ErrorTypes, RedisObjects};
12
13/// Struct to setup a stream reading from a pubsub
14/// The content of the pubsub is not processed
15pub struct ListenerBuilder {
16    store: Arc<RedisObjects>,
17    channels: Vec<String>,
18    patterns: Vec<String>,
19}
20
21impl ListenerBuilder {
22
23    pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
24        ListenerBuilder { store, channels: vec![], patterns: vec![] }
25    }
26
27    /// Subscribe to a fixed channel 
28    pub fn subscribe(mut self, channel: String) -> Self {
29        self.channels.push(channel); self
30    }
31
32    /// Subscribe to all channels matching this pattern
33    pub fn psubscribe(mut self, channel: String) -> Self {
34        self.patterns.push(channel); self
35    }
36
37    /// Launch the task reading from the pubsub
38    pub async fn listen(self) -> mpsc::Receiver<Option<Msg>> {
39
40        let (message_sender, message_receiver) = mpsc::channel(64);
41        let started = Arc::new(tokio::sync::Notify::new());
42        let notify_started = started.clone();
43
44        tokio::spawn(async move {
45            const STARTING_EXPONENT: f64 = -8.0;
46            let mut exponent = STARTING_EXPONENT;
47            let maximum = 3.0;
48
49            'reconnect: loop {
50                if exponent > STARTING_EXPONENT {
51                    log::warn!("No connection to Redis, reconnecting...");
52                    tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf(exponent))).await;
53                }
54                exponent = (exponent + 1.0).min(maximum);
55
56                let mut pubsub = match self.store.client.get_async_pubsub().await {
57                    Ok(connection) => connection,
58                    Err(connection_error) => {
59                        error!("Error connecting to pubsub: {connection_error}");
60                        continue 'reconnect;
61                    }
62                };
63
64                for channel in &self.channels {
65                    if pubsub.subscribe(channel).await.is_err() {
66                        continue 'reconnect;
67                    }
68                }
69                for pattern in &self.patterns {
70                    if pubsub.psubscribe(pattern).await.is_err() {
71                        continue 'reconnect;
72                    }
73                }
74                notify_started.notify_waiters();
75
76                let mut stream = pubsub.on_message();
77                while let Some(message) = stream.next().await {
78                    // if the send fails it means the other end of the channel has dropped 
79                    // and we can stop listening 
80                    if message_sender.send(Some(message)).await.is_err() {
81                        break 'reconnect
82                    }
83                    exponent = STARTING_EXPONENT + 1.0;
84                }
85                if message_sender.send(None).await.is_err() {
86                    break 'reconnect
87                }
88            }
89        });
90
91        started.notified().await;
92        message_receiver
93    }
94}
95
96
97
98/// Struct to setup a stream reading from a pubsub
99/// The content of the Pubsub must be a JSON serialized object
100pub struct JsonListenerBuilder<Message: DeserializeOwned> {
101    store: Arc<RedisObjects>,
102    channels: Vec<String>,
103    patterns: Vec<String>,
104    _data: PhantomData<Message>
105}
106
107impl<Message: DeserializeOwned + Send + 'static> JsonListenerBuilder<Message> {
108
109    pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
110        JsonListenerBuilder { store, channels: vec![], patterns: vec![], _data: Default::default() }
111    }
112
113    /// Subscribe to a fixed channel 
114    pub fn subscribe(mut self, channel: String) -> Self {
115        self.channels.push(channel); self
116    }
117
118    /// Subscribe to all channels matching this pattern
119    pub fn psubscribe(mut self, channel: String) -> Self {
120        self.patterns.push(channel); self
121    }
122
123    /// Launch the task reading from the pubsub
124    pub async fn listen(self) -> mpsc::Receiver<Option<Message>> {
125
126        let (parsed_sender, parsed_receiver) = mpsc::channel(2);
127
128        let mut message_reciever = ListenerBuilder {
129            store: self.store,
130            channels: self.channels,
131            patterns: self.patterns
132        }.listen().await;
133
134        tokio::spawn(async move {
135            while let Some(message) = message_reciever.recv().await {
136                let message = match message {
137                    Some(message) => message,
138                    None => {
139                        if parsed_sender.send(None).await.is_err() {
140                            break
141                        }
142                        continue
143                    }
144                };
145
146                let result = match serde_json::from_slice(message.get_payload_bytes()) {
147                    Ok(message) => parsed_sender.send(Some(message)).await,
148                    Err(err) => {
149                        error!("Could not process pubsub message: {err}");
150                        parsed_sender.send(None).await
151                    }
152                };
153
154                if result.is_err() {
155                    break
156                }
157            }
158        });
159
160        parsed_receiver
161    }
162}
163
164/// Hold connection and channel name for publishing to a pubsub
165pub struct Publisher {
166    store: Arc<RedisObjects>,
167    channel: String,
168}
169
170impl Publisher {
171    pub (crate) fn new(store: Arc<RedisObjects>, channel: String) -> Self {
172        Publisher { store, channel }
173    }
174
175    /// Publish a message in a serializable type
176    pub async fn publish<T: Serialize>(&self, data: &T) -> Result<(), ErrorTypes> {
177        self.publish_data(&serde_json::to_vec(data)?).await
178    }
179
180    /// Publish raw data as a pubsub message
181    pub async fn publish_data(&self, data: &[u8]) -> Result<(), ErrorTypes> {
182        retry_call!(self.store.pool, publish, &self.channel, data)
183    }
184}