simple_redis/
subscriber.rs1#[cfg(test)]
7#[path = "./subscriber_test.rs"]
8mod subscriber_test;
9
10use crate::types::{Interrupts, Message, RedisEmptyResult, RedisError};
11use std::time::Duration;
12
13pub(crate) struct Subscriber {
15 subscriptions: Vec<String>,
16 psubscriptions: Vec<String>,
17 redis_connection: Option<redis::Connection>,
18}
19
20fn subscribe_all<'a>(
21 subscriber: &'a mut Subscriber,
22 client: &redis::Client,
23) -> Result<redis::PubSub<'a>, RedisError> {
24 match client.get_connection() {
26 Ok(redis_connection) => {
27 let redis_connection_ref = subscriber.redis_connection.get_or_insert(redis_connection);
28 let mut redis_pubsub = redis_connection_ref.as_pubsub();
29
30 for channel in &subscriber.subscriptions {
31 let result = redis_pubsub.subscribe(channel);
32
33 if result.is_err() {
34 let subscription_error = match result.err() {
35 Some(error) => Err(RedisError::RedisError(error)),
36 None => Err(RedisError::Description("Error while subscribing.")),
37 };
38
39 return subscription_error;
40 }
41 }
42
43 for channel in &subscriber.psubscriptions {
44 let result = redis_pubsub.psubscribe(channel);
45
46 if result.is_err() {
47 let subscription_error = match result.err() {
48 Some(error) => Err(RedisError::RedisError(error)),
49 None => Err(RedisError::Description("Error while subscribing.")),
50 };
51
52 return subscription_error;
53 }
54 }
55
56 Ok(redis_pubsub)
57 }
58 Err(error) => Err(RedisError::RedisError(error)),
59 }
60}
61
62fn fetch_messages(
63 mut redis_pubsub: redis::PubSub,
64 on_message: &mut dyn FnMut(Message) -> bool,
65 poll_interrupts: &mut dyn FnMut() -> Interrupts,
66) -> RedisEmptyResult {
67 loop {
68 let interrupts = poll_interrupts();
69 if interrupts.stop {
70 return Ok(());
71 } else {
72 let duration_millis = interrupts.next_polling_time.unwrap_or(5000);
73
74 let read_timeout = if duration_millis == 0 {
75 None
76 } else {
77 Some(Duration::from_millis(duration_millis))
78 };
79 if let Err(error) = redis_pubsub.set_read_timeout(read_timeout) {
80 return Err(RedisError::RedisError(error));
81 };
82
83 let message_result = redis_pubsub.get_message();
84
85 match message_result {
86 Ok(message) => {
87 if on_message(message) {
88 return Ok(());
89 }
90 }
91 Err(error) => {
92 if !error.is_timeout() {
93 return Err(RedisError::RedisError(error));
94 }
95 }
96 }
97 }
98 }
99}
100
101fn subscribe_and_fetch(
102 subscriber: &mut Subscriber,
103 client: &redis::Client,
104 on_message: &mut dyn FnMut(Message) -> bool,
105 poll_interrupts: &mut dyn FnMut() -> Interrupts,
106) -> RedisEmptyResult {
107 match subscribe_all(subscriber, client) {
108 Err(error) => Err(error),
109 Ok(pubsub) => fetch_messages(pubsub, on_message, poll_interrupts),
110 }
111}
112
113fn subscribe(subscriber: &mut Subscriber, channel: &str, pattern: bool) -> RedisEmptyResult {
114 if pattern {
115 subscriber.psubscriptions.push(channel.to_string());
116 } else {
117 subscriber.subscriptions.push(channel.to_string());
118 }
119
120 Ok(())
121}
122
123fn unsubscribe(subscriber: &mut Subscriber, channel: &str, pattern: bool) -> RedisEmptyResult {
124 let search_result = if pattern {
125 subscriber.psubscriptions.iter().position(|x| x == channel)
126 } else {
127 subscriber.subscriptions.iter().position(|x| x == channel)
128 };
129
130 match search_result {
131 Some(index) => {
132 if pattern {
133 subscriber.psubscriptions.remove(index);
134 } else {
135 subscriber.subscriptions.remove(index);
136 }
137
138 Ok(())
139 }
140 None => Ok(()),
141 }
142}
143
144impl Subscriber {
145 pub(crate) fn subscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
146 subscribe(self, channel, false)
147 }
148
149 pub(crate) fn psubscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
150 subscribe(self, channel, true)
151 }
152
153 pub(crate) fn unsubscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
154 unsubscribe(self, channel, false)
155 }
156
157 pub(crate) fn punsubscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
158 unsubscribe(self, channel, true)
159 }
160
161 pub(crate) fn is_subscribed(self: &mut Subscriber, channel: &str) -> bool {
162 let search_result = self.subscriptions.iter().position(|x| x == channel);
163
164 match search_result {
165 None => false,
166 _ => true,
167 }
168 }
169
170 pub(crate) fn is_psubscribed(self: &mut Subscriber, channel: &str) -> bool {
171 let search_result = self.psubscriptions.iter().position(|x| x == channel);
172
173 match search_result {
174 None => false,
175 _ => true,
176 }
177 }
178
179 pub(crate) fn unsubscribe_all(self: &mut Subscriber) -> RedisEmptyResult {
180 self.subscriptions.clear();
181 self.psubscriptions.clear();
182
183 Ok(())
184 }
185
186 fn has_subscriptions(self: &Subscriber) -> bool {
187 !self.subscriptions.is_empty() || !self.psubscriptions.is_empty()
188 }
189
190 pub(crate) fn fetch_messages(
191 self: &mut Subscriber,
192 client: &redis::Client,
193 on_message: &mut dyn FnMut(Message) -> bool,
194 poll_interrupts: &mut dyn FnMut() -> Interrupts,
195 ) -> RedisEmptyResult {
196 if !self.has_subscriptions() {
197 Err(RedisError::Description("No subscriptions defined."))
198 } else {
199 subscribe_and_fetch(self, client, on_message, poll_interrupts)
200 }
201 }
202}
203
204pub(crate) fn create() -> Subscriber {
206 Subscriber {
207 subscriptions: vec![],
208 psubscriptions: vec![],
209 redis_connection: None,
210 }
211}