simple_redis/
subscriber.rs

//! # subscriber
//!
//! Manages the pubsub subscriber connection and, if needed, resubscribes after a reconnection.
//!
5
6#[cfg(test)]
7#[path = "./subscriber_test.rs"]
8mod subscriber_test;
9
10use crate::types::{Interrupts, Message, RedisEmptyResult, RedisError};
11use std::time::Duration;
12
13/// The redis pubsub wrapper.
14pub(crate) struct Subscriber {
15    subscriptions: Vec<String>,
16    psubscriptions: Vec<String>,
17    redis_connection: Option<redis::Connection>,
18}
19
20fn subscribe_all<'a>(
21    subscriber: &'a mut Subscriber,
22    client: &redis::Client,
23) -> Result<redis::PubSub<'a>, RedisError> {
24    // get pubsub
25    match client.get_connection() {
26        Ok(redis_connection) => {
27            let redis_connection_ref = subscriber.redis_connection.get_or_insert(redis_connection);
28            let mut redis_pubsub = redis_connection_ref.as_pubsub();
29
30            for channel in &subscriber.subscriptions {
31                let result = redis_pubsub.subscribe(channel);
32
33                if result.is_err() {
34                    let subscription_error = match result.err() {
35                        Some(error) => Err(RedisError::RedisError(error)),
36                        None => Err(RedisError::Description("Error while subscribing.")),
37                    };
38
39                    return subscription_error;
40                }
41            }
42
43            for channel in &subscriber.psubscriptions {
44                let result = redis_pubsub.psubscribe(channel);
45
46                if result.is_err() {
47                    let subscription_error = match result.err() {
48                        Some(error) => Err(RedisError::RedisError(error)),
49                        None => Err(RedisError::Description("Error while subscribing.")),
50                    };
51
52                    return subscription_error;
53                }
54            }
55
56            Ok(redis_pubsub)
57        }
58        Err(error) => Err(RedisError::RedisError(error)),
59    }
60}
61
62fn fetch_messages(
63    mut redis_pubsub: redis::PubSub,
64    on_message: &mut dyn FnMut(Message) -> bool,
65    poll_interrupts: &mut dyn FnMut() -> Interrupts,
66) -> RedisEmptyResult {
67    loop {
68        let interrupts = poll_interrupts();
69        if interrupts.stop {
70            return Ok(());
71        } else {
72            let duration_millis = interrupts.next_polling_time.unwrap_or(5000);
73
74            let read_timeout = if duration_millis == 0 {
75                None
76            } else {
77                Some(Duration::from_millis(duration_millis))
78            };
79            if let Err(error) = redis_pubsub.set_read_timeout(read_timeout) {
80                return Err(RedisError::RedisError(error));
81            };
82
83            let message_result = redis_pubsub.get_message();
84
85            match message_result {
86                Ok(message) => {
87                    if on_message(message) {
88                        return Ok(());
89                    }
90                }
91                Err(error) => {
92                    if !error.is_timeout() {
93                        return Err(RedisError::RedisError(error));
94                    }
95                }
96            }
97        }
98    }
99}
100
101fn subscribe_and_fetch(
102    subscriber: &mut Subscriber,
103    client: &redis::Client,
104    on_message: &mut dyn FnMut(Message) -> bool,
105    poll_interrupts: &mut dyn FnMut() -> Interrupts,
106) -> RedisEmptyResult {
107    match subscribe_all(subscriber, client) {
108        Err(error) => Err(error),
109        Ok(pubsub) => fetch_messages(pubsub, on_message, poll_interrupts),
110    }
111}
112
113fn subscribe(subscriber: &mut Subscriber, channel: &str, pattern: bool) -> RedisEmptyResult {
114    if pattern {
115        subscriber.psubscriptions.push(channel.to_string());
116    } else {
117        subscriber.subscriptions.push(channel.to_string());
118    }
119
120    Ok(())
121}
122
123fn unsubscribe(subscriber: &mut Subscriber, channel: &str, pattern: bool) -> RedisEmptyResult {
124    let search_result = if pattern {
125        subscriber.psubscriptions.iter().position(|x| x == channel)
126    } else {
127        subscriber.subscriptions.iter().position(|x| x == channel)
128    };
129
130    match search_result {
131        Some(index) => {
132            if pattern {
133                subscriber.psubscriptions.remove(index);
134            } else {
135                subscriber.subscriptions.remove(index);
136            }
137
138            Ok(())
139        }
140        None => Ok(()),
141    }
142}
143
144impl Subscriber {
145    pub(crate) fn subscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
146        subscribe(self, channel, false)
147    }
148
149    pub(crate) fn psubscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
150        subscribe(self, channel, true)
151    }
152
153    pub(crate) fn unsubscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
154        unsubscribe(self, channel, false)
155    }
156
157    pub(crate) fn punsubscribe(self: &mut Subscriber, channel: &str) -> RedisEmptyResult {
158        unsubscribe(self, channel, true)
159    }
160
161    pub(crate) fn is_subscribed(self: &mut Subscriber, channel: &str) -> bool {
162        let search_result = self.subscriptions.iter().position(|x| x == channel);
163
164        match search_result {
165            None => false,
166            _ => true,
167        }
168    }
169
170    pub(crate) fn is_psubscribed(self: &mut Subscriber, channel: &str) -> bool {
171        let search_result = self.psubscriptions.iter().position(|x| x == channel);
172
173        match search_result {
174            None => false,
175            _ => true,
176        }
177    }
178
179    pub(crate) fn unsubscribe_all(self: &mut Subscriber) -> RedisEmptyResult {
180        self.subscriptions.clear();
181        self.psubscriptions.clear();
182
183        Ok(())
184    }
185
186    fn has_subscriptions(self: &Subscriber) -> bool {
187        !self.subscriptions.is_empty() || !self.psubscriptions.is_empty()
188    }
189
190    pub(crate) fn fetch_messages(
191        self: &mut Subscriber,
192        client: &redis::Client,
193        on_message: &mut dyn FnMut(Message) -> bool,
194        poll_interrupts: &mut dyn FnMut() -> Interrupts,
195    ) -> RedisEmptyResult {
196        if !self.has_subscriptions() {
197            Err(RedisError::Description("No subscriptions defined."))
198        } else {
199            subscribe_and_fetch(self, client, on_message, poll_interrupts)
200        }
201    }
202}
203
204/// Creates and returns a new connection
205pub(crate) fn create() -> Subscriber {
206    Subscriber {
207        subscriptions: vec![],
208        psubscriptions: vec![],
209        redis_connection: None,
210    }
211}