redis_objects/
pubsub.rs

1//! objects for interacting with pubsubs
2use std::{marker::PhantomData, sync::Arc};
3
4use futures::StreamExt;
5use log::error;
6use redis::{AsyncCommands, Msg};
7use serde::Serialize;
8use tokio::sync::mpsc;
9use serde::de::DeserializeOwned;
10use tracing::instrument;
11
12use crate::{retry_call, ErrorTypes, RedisObjects};
13
14/// Struct to setup a stream reading from a pubsub
15/// The content of the pubsub is not processed
16pub struct ListenerBuilder {
17    store: Arc<RedisObjects>,
18    channels: Vec<String>,
19    patterns: Vec<String>,
20}
21
22impl ListenerBuilder {
23
24    pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
25        ListenerBuilder { store, channels: vec![], patterns: vec![] }
26    }
27
28    /// Subscribe to a fixed channel 
29    pub fn subscribe(mut self, channel: String) -> Self {
30        self.channels.push(channel); self
31    }
32
33    /// Subscribe to all channels matching this pattern
34    pub fn psubscribe(mut self, channel: String) -> Self {
35        self.patterns.push(channel); self
36    }
37
38    /// Launch the task reading from the pubsub
39    pub async fn listen(self) -> mpsc::Receiver<Option<Msg>> {
40
41        let (message_sender, message_receiver) = mpsc::channel(64);
42        let started = Arc::new(tokio::sync::Notify::new());
43        let notify_started = started.clone();
44
45        tokio::spawn(async move {
46            const STARTING_EXPONENT: f64 = -8.0;
47            let mut exponent = STARTING_EXPONENT;
48            let maximum = 3.0;
49
50            'reconnect: loop {
51                if exponent > STARTING_EXPONENT {
52                    log::warn!("No connection to Redis, reconnecting...");
53                    tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf(exponent))).await;
54                }
55                exponent = (exponent + 1.0).min(maximum);
56
57                let mut pubsub = match self.store.client.get_async_pubsub().await {
58                    Ok(connection) => connection,
59                    Err(connection_error) => {
60                        error!("Error connecting to pubsub: {connection_error}");
61                        continue 'reconnect;
62                    }
63                };
64
65                for channel in &self.channels {
66                    if pubsub.subscribe(channel).await.is_err() {
67                        continue 'reconnect;
68                    }
69                }
70                for pattern in &self.patterns {
71                    if pubsub.psubscribe(pattern).await.is_err() {
72                        continue 'reconnect;
73                    }
74                }
75                notify_started.notify_one();
76
77                let mut stream = pubsub.on_message();
78                while let Some(message) = stream.next().await {
79                    // if the send fails it means the other end of the channel has dropped 
80                    // and we can stop listening 
81                    if message_sender.send(Some(message)).await.is_err() {
82                        break 'reconnect
83                    }
84                    exponent = STARTING_EXPONENT + 1.0;
85                }
86                if message_sender.send(None).await.is_err() {
87                    break 'reconnect
88                }
89            }
90        });
91
92        started.notified().await;
93        message_receiver
94    }
95}
96
97
98
99/// Struct to setup a stream reading from a pubsub
100/// The content of the Pubsub must be a JSON serialized object
101pub struct JsonListenerBuilder<Message: DeserializeOwned> {
102    store: Arc<RedisObjects>,
103    channels: Vec<String>,
104    patterns: Vec<String>,
105    _data: PhantomData<Message>
106}
107
108impl<Message: DeserializeOwned + Send + 'static> JsonListenerBuilder<Message> {
109
110    pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
111        JsonListenerBuilder { store, channels: vec![], patterns: vec![], _data: Default::default() }
112    }
113
114    /// Subscribe to a fixed channel 
115    pub fn subscribe(mut self, channel: String) -> Self {
116        self.channels.push(channel); self
117    }
118
119    /// Subscribe to all channels matching this pattern
120    pub fn psubscribe(mut self, channel: String) -> Self {
121        self.patterns.push(channel); self
122    }
123
124    /// Launch the task reading from the pubsub
125    pub async fn listen(self) -> mpsc::Receiver<Option<Message>> {
126
127        let (parsed_sender, parsed_receiver) = mpsc::channel(2);
128
129        let mut message_reciever = ListenerBuilder {
130            store: self.store,
131            channels: self.channels,
132            patterns: self.patterns
133        }.listen().await;
134
135        tokio::spawn(async move {
136            while let Some(message) = message_reciever.recv().await {
137                let message = match message {
138                    Some(message) => message,
139                    None => {
140                        if parsed_sender.send(None).await.is_err() {
141                            break
142                        }
143                        continue
144                    }
145                };
146
147                let result = match serde_json::from_slice(message.get_payload_bytes()) {
148                    Ok(message) => parsed_sender.send(Some(message)).await,
149                    Err(err) => {
150                        error!("Could not process pubsub message: {err}");
151                        parsed_sender.send(None).await
152                    }
153                };
154
155                if result.is_err() {
156                    break
157                }
158            }
159        });
160
161        parsed_receiver
162    }
163}
164
165/// Hold connection and channel name for publishing to a pubsub
166pub struct Publisher {
167    store: Arc<RedisObjects>,
168    channel: String,
169}
170
171impl std::fmt::Debug for Publisher {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        f.debug_struct("Publisher").field("store", &self.store).field("channel", &self.channel).finish()
174    }
175}
176
177impl Publisher {
178    pub (crate) fn new(store: Arc<RedisObjects>, channel: String) -> Self {
179        Publisher { store, channel }
180    }
181
182    /// Publish a message in a serializable type
183    #[instrument(skip(data))]
184    pub async fn publish<T: Serialize>(&self, data: &T) -> Result<(), ErrorTypes> {
185        self.publish_data(&serde_json::to_vec(data)?).await
186    }
187
188    /// Publish raw data as a pubsub message
189    #[instrument(skip(data))]
190    pub async fn publish_data(&self, data: &[u8]) -> Result<(), ErrorTypes> {
191        retry_call!(self.store.pool, publish, &self.channel, data)
192    }
193}