1use std::{marker::PhantomData, sync::Arc};
3
4use futures::StreamExt;
5use log::error;
6use redis::{AsyncCommands, Msg};
7use serde::Serialize;
8use tokio::sync::mpsc;
9use serde::de::DeserializeOwned;
10
11use crate::{retry_call, ErrorTypes, RedisObjects};
12
13pub struct ListenerBuilder {
16 store: Arc<RedisObjects>,
17 channels: Vec<String>,
18 patterns: Vec<String>,
19}
20
21impl ListenerBuilder {
22
23 pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
24 ListenerBuilder { store, channels: vec![], patterns: vec![] }
25 }
26
27 pub fn subscribe(mut self, channel: String) -> Self {
29 self.channels.push(channel); self
30 }
31
32 pub fn psubscribe(mut self, channel: String) -> Self {
34 self.patterns.push(channel); self
35 }
36
37 pub async fn listen(self) -> mpsc::Receiver<Option<Msg>> {
39
40 let (message_sender, message_receiver) = mpsc::channel(64);
41 let started = Arc::new(tokio::sync::Notify::new());
42 let notify_started = started.clone();
43
44 tokio::spawn(async move {
45 const STARTING_EXPONENT: f64 = -8.0;
46 let mut exponent = STARTING_EXPONENT;
47 let maximum = 3.0;
48
49 'reconnect: loop {
50 if exponent > STARTING_EXPONENT {
51 log::warn!("No connection to Redis, reconnecting...");
52 tokio::time::sleep(tokio::time::Duration::from_secs_f64(2f64.powf(exponent))).await;
53 }
54 exponent = (exponent + 1.0).min(maximum);
55
56 let mut pubsub = match self.store.client.get_async_pubsub().await {
57 Ok(connection) => connection,
58 Err(connection_error) => {
59 error!("Error connecting to pubsub: {connection_error}");
60 continue 'reconnect;
61 }
62 };
63
64 for channel in &self.channels {
65 if pubsub.subscribe(channel).await.is_err() {
66 continue 'reconnect;
67 }
68 }
69 for pattern in &self.patterns {
70 if pubsub.psubscribe(pattern).await.is_err() {
71 continue 'reconnect;
72 }
73 }
74 notify_started.notify_waiters();
75
76 let mut stream = pubsub.on_message();
77 while let Some(message) = stream.next().await {
78 if message_sender.send(Some(message)).await.is_err() {
81 break 'reconnect
82 }
83 exponent = STARTING_EXPONENT + 1.0;
84 }
85 if message_sender.send(None).await.is_err() {
86 break 'reconnect
87 }
88 }
89 });
90
91 started.notified().await;
92 message_receiver
93 }
94}
95
96
97
98pub struct JsonListenerBuilder<Message: DeserializeOwned> {
101 store: Arc<RedisObjects>,
102 channels: Vec<String>,
103 patterns: Vec<String>,
104 _data: PhantomData<Message>
105}
106
107impl<Message: DeserializeOwned + Send + 'static> JsonListenerBuilder<Message> {
108
109 pub (crate) fn new(store: Arc<RedisObjects>) -> Self {
110 JsonListenerBuilder { store, channels: vec![], patterns: vec![], _data: Default::default() }
111 }
112
113 pub fn subscribe(mut self, channel: String) -> Self {
115 self.channels.push(channel); self
116 }
117
118 pub fn psubscribe(mut self, channel: String) -> Self {
120 self.patterns.push(channel); self
121 }
122
123 pub async fn listen(self) -> mpsc::Receiver<Option<Message>> {
125
126 let (parsed_sender, parsed_receiver) = mpsc::channel(2);
127
128 let mut message_reciever = ListenerBuilder {
129 store: self.store,
130 channels: self.channels,
131 patterns: self.patterns
132 }.listen().await;
133
134 tokio::spawn(async move {
135 while let Some(message) = message_reciever.recv().await {
136 let message = match message {
137 Some(message) => message,
138 None => {
139 if parsed_sender.send(None).await.is_err() {
140 break
141 }
142 continue
143 }
144 };
145
146 let result = match serde_json::from_slice(message.get_payload_bytes()) {
147 Ok(message) => parsed_sender.send(Some(message)).await,
148 Err(err) => {
149 error!("Could not process pubsub message: {err}");
150 parsed_sender.send(None).await
151 }
152 };
153
154 if result.is_err() {
155 break
156 }
157 }
158 });
159
160 parsed_receiver
161 }
162}
163
164pub struct Publisher {
166 store: Arc<RedisObjects>,
167 channel: String,
168}
169
170impl Publisher {
171 pub (crate) fn new(store: Arc<RedisObjects>, channel: String) -> Self {
172 Publisher { store, channel }
173 }
174
175 pub async fn publish<T: Serialize>(&self, data: &T) -> Result<(), ErrorTypes> {
177 self.publish_data(&serde_json::to_vec(data)?).await
178 }
179
180 pub async fn publish_data(&self, data: &[u8]) -> Result<(), ErrorTypes> {
182 retry_call!(self.store.pool, publish, &self.channel, data)
183 }
184}