1use std::{marker::PhantomData, sync::Arc};
3
4use futures::StreamExt;
5use log::error;
6use redis::{AsyncCommands, Msg};
7use serde::Serialize;
8use tokio::sync::mpsc;
9use serde::de::DeserializeOwned;
10use tracing::instrument;
11
12use crate::{retry_call, ErrorTypes, RedisObjects};
13
14pub struct ListenerBuilder {
17 store: Arc<RedisObjects>,
18 channels: Vec<String>,
19 patterns: Vec<String>,
20}
21
22impl ListenerBuilder {
23
24 pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
25 ListenerBuilder { store, channels: vec![], patterns: vec![] }
26 }
27
28 pub fn subscribe(mut self, channel: String) -> Self {
30 self.channels.push(channel); self
31 }
32
33 pub fn psubscribe(mut self, channel: String) -> Self {
35 self.patterns.push(channel); self
36 }
37
38 pub async fn listen(self) -> mpsc::Receiver<Option<Msg>> {
40
41 let (message_sender, message_receiver) = mpsc::channel(64);
42 let started = Arc::new(tokio::sync::Notify::new());
43 let notify_started = started.clone();
44
45 tokio::spawn(async move {
46 const STARTING_EXPONENT: f64 = -8.0;
47 let mut exponent = STARTING_EXPONENT;
48 let maximum = 3.0;
49
50 'reconnect: loop {
51 if exponent > STARTING_EXPONENT {
52 log::warn!("No connection to Redis, reconnecting...");
53 tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf(exponent))).await;
54 }
55 exponent = (exponent + 1.0).min(maximum);
56
57 let mut pubsub = match self.store.client.get_async_pubsub().await {
58 Ok(connection) => connection,
59 Err(connection_error) => {
60 error!("Error connecting to pubsub: {connection_error}");
61 continue 'reconnect;
62 }
63 };
64
65 for channel in &self.channels {
66 if pubsub.subscribe(channel).await.is_err() {
67 continue 'reconnect;
68 }
69 }
70 for pattern in &self.patterns {
71 if pubsub.psubscribe(pattern).await.is_err() {
72 continue 'reconnect;
73 }
74 }
75 notify_started.notify_one();
76
77 let mut stream = pubsub.on_message();
78 while let Some(message) = stream.next().await {
79 if message_sender.send(Some(message)).await.is_err() {
82 break 'reconnect
83 }
84 exponent = STARTING_EXPONENT + 1.0;
85 }
86 if message_sender.send(None).await.is_err() {
87 break 'reconnect
88 }
89 }
90 });
91
92 started.notified().await;
93 message_receiver
94 }
95}
96
97
98
99pub struct JsonListenerBuilder<Message: DeserializeOwned> {
102 store: Arc<RedisObjects>,
103 channels: Vec<String>,
104 patterns: Vec<String>,
105 _data: PhantomData<Message>
106}
107
108impl<Message: DeserializeOwned + Send + 'static> JsonListenerBuilder<Message> {
109
110 pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
111 JsonListenerBuilder { store, channels: vec![], patterns: vec![], _data: Default::default() }
112 }
113
114 pub fn subscribe(mut self, channel: String) -> Self {
116 self.channels.push(channel); self
117 }
118
119 pub fn psubscribe(mut self, channel: String) -> Self {
121 self.patterns.push(channel); self
122 }
123
124 pub async fn listen(self) -> mpsc::Receiver<Option<Message>> {
126
127 let (parsed_sender, parsed_receiver) = mpsc::channel(2);
128
129 let mut message_reciever = ListenerBuilder {
130 store: self.store,
131 channels: self.channels,
132 patterns: self.patterns
133 }.listen().await;
134
135 tokio::spawn(async move {
136 while let Some(message) = message_reciever.recv().await {
137 let message = match message {
138 Some(message) => message,
139 None => {
140 if parsed_sender.send(None).await.is_err() {
141 break
142 }
143 continue
144 }
145 };
146
147 let result = match serde_json::from_slice(message.get_payload_bytes()) {
148 Ok(message) => parsed_sender.send(Some(message)).await,
149 Err(err) => {
150 error!("Could not process pubsub message: {err}");
151 parsed_sender.send(None).await
152 }
153 };
154
155 if result.is_err() {
156 break
157 }
158 }
159 });
160
161 parsed_receiver
162 }
163}
164
165pub struct Publisher {
167 store: Arc<RedisObjects>,
168 channel: String,
169}
170
171impl std::fmt::Debug for Publisher {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 f.debug_struct("Publisher").field("store", &self.store).field("channel", &self.channel).finish()
174 }
175}
176
177impl Publisher {
178 pub (crate) fn new(store: Arc<RedisObjects>, channel: String) -> Self {
179 Publisher { store, channel }
180 }
181
182 #[instrument(skip(data))]
184 pub async fn publish<T: Serialize>(&self, data: &T) -> Result<(), ErrorTypes> {
185 self.publish_data(&serde_json::to_vec(data)?).await
186 }
187
188 #[instrument(skip(data))]
190 pub async fn publish_data(&self, data: &[u8]) -> Result<(), ErrorTypes> {
191 retry_call!(self.store.pool, publish, &self.channel, data)
192 }
193}