mini_redis/
blocking_client.rs

1//! Minimal blocking Redis client implementation
2//!
3//! Provides a blocking connect and methods for issuing the supported commands.
4
5use bytes::Bytes;
6use std::time::Duration;
7use tokio::net::ToSocketAddrs;
8use tokio::runtime::Runtime;
9
10pub use crate::client::Message;
11
12/// Established connection with a Redis server.
13///
14/// Backed by a single `TcpStream`, `BlockingClient` provides basic network
15/// client functionality (no pooling, retrying, ...). Connections are
16/// established using the [`connect`](fn@connect) function.
17///
18/// Requests are issued using the various methods of `Client`.
19pub struct BlockingClient {
20    /// The asynchronous `Client`.
21    inner: crate::client::Client,
22
23    /// A `current_thread` runtime for executing operations on the asynchronous
24    /// client in a blocking manner.
25    rt: Runtime,
26}
27
28/// A client that has entered pub/sub mode.
29///
30/// Once clients subscribe to a channel, they may only perform pub/sub related
31/// commands. The `BlockingClient` type is transitioned to a
32/// `BlockingSubscriber` type in order to prevent non-pub/sub methods from being
33/// called.
34pub struct BlockingSubscriber {
35    /// The asynchronous `Subscriber`.
36    inner: crate::client::Subscriber,
37
38    /// A `current_thread` runtime for executing operations on the asynchronous
39    /// `Subscriber` in a blocking manner.
40    rt: Runtime,
41}
42
43/// The iterator returned by `Subscriber::into_iter`.
44struct SubscriberIterator {
45    /// The asynchronous `Subscriber`.
46    inner: crate::client::Subscriber,
47
48    /// A `current_thread` runtime for executing operations on the asynchronous
49    /// `Subscriber` in a blocking manner.
50    rt: Runtime,
51}
52
53/// Establish a connection with the Redis server located at `addr`.
54///
55/// `addr` may be any type that can be asynchronously converted to a
56/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
57/// trait is the Tokio version and not the `std` version.
58///
59/// # Examples
60///
61/// ```no_run
62/// use mini_redis::blocking_client;
63///
64/// fn main() {
65///     let client = match blocking_client::connect("localhost:6379") {
66///         Ok(client) => client,
67///         Err(_) => panic!("failed to establish connection"),
68///     };
69/// # drop(client);
70/// }
71/// ```
72pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
73    let rt = tokio::runtime::Builder::new_current_thread()
74        .enable_all()
75        .build()?;
76
77    let inner = rt.block_on(crate::client::connect(addr))?;
78
79    Ok(BlockingClient { inner, rt })
80}
81
82impl BlockingClient {
83    /// Get the value of key.
84    ///
85    /// If the key does not exist the special value `None` is returned.
86    ///
87    /// # Examples
88    ///
89    /// Demonstrates basic usage.
90    ///
91    /// ```no_run
92    /// use mini_redis::blocking_client;
93    ///
94    /// fn main() {
95    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
96    ///
97    ///     let val = client.get("foo").unwrap();
98    ///     println!("Got = {:?}", val);
99    /// }
100    /// ```
101    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
102        self.rt.block_on(self.inner.get(key))
103    }
104
105    /// Set `key` to hold the given `value`.
106    ///
107    /// The `value` is associated with `key` until it is overwritten by the next
108    /// call to `set` or it is removed.
109    ///
110    /// If key already holds a value, it is overwritten. Any previous time to
111    /// live associated with the key is discarded on successful SET operation.
112    ///
113    /// # Examples
114    ///
115    /// Demonstrates basic usage.
116    ///
117    /// ```no_run
118    /// use mini_redis::blocking_client;
119    ///
120    /// fn main() {
121    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
122    ///
123    ///     client.set("foo", "bar".into()).unwrap();
124    ///
125    ///     // Getting the value immediately works
126    ///     let val = client.get("foo").unwrap().unwrap();
127    ///     assert_eq!(val, "bar");
128    /// }
129    /// ```
130    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
131        self.rt.block_on(self.inner.set(key, value))
132    }
133
134    /// Set `key` to hold the given `value`. The value expires after `expiration`
135    ///
136    /// The `value` is associated with `key` until one of the following:
137    /// - it expires.
138    /// - it is overwritten by the next call to `set`.
139    /// - it is removed.
140    ///
141    /// If key already holds a value, it is overwritten. Any previous time to
142    /// live associated with the key is discarded on a successful SET operation.
143    ///
144    /// # Examples
145    ///
146    /// Demonstrates basic usage. This example is not **guaranteed** to always
147    /// work as it relies on time based logic and assumes the client and server
148    /// stay relatively synchronized in time. The real world tends to not be so
149    /// favorable.
150    ///
151    /// ```no_run
152    /// use mini_redis::blocking_client;
153    /// use std::thread;
154    /// use std::time::Duration;
155    ///
156    /// fn main() {
157    ///     let ttl = Duration::from_millis(500);
158    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
159    ///
160    ///     client.set_expires("foo", "bar".into(), ttl).unwrap();
161    ///
162    ///     // Getting the value immediately works
163    ///     let val = client.get("foo").unwrap().unwrap();
164    ///     assert_eq!(val, "bar");
165    ///
166    ///     // Wait for the TTL to expire
167    ///     thread::sleep(ttl);
168    ///
169    ///     let val = client.get("foo").unwrap();
170    ///     assert!(val.is_some());
171    /// }
172    /// ```
173    pub fn set_expires(
174        &mut self,
175        key: &str,
176        value: Bytes,
177        expiration: Duration,
178    ) -> crate::Result<()> {
179        self.rt
180            .block_on(self.inner.set_expires(key, value, expiration))
181    }
182
183    /// Posts `message` to the given `channel`.
184    ///
185    /// Returns the number of subscribers currently listening on the channel.
186    /// There is no guarantee that these subscribers receive the message as they
187    /// may disconnect at any time.
188    ///
189    /// # Examples
190    ///
191    /// Demonstrates basic usage.
192    ///
193    /// ```no_run
194    /// use mini_redis::blocking_client;
195    ///
196    /// fn main() {
197    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
198    ///
199    ///     let val = client.publish("foo", "bar".into()).unwrap();
200    ///     println!("Got = {:?}", val);
201    /// }
202    /// ```
203    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
204        self.rt.block_on(self.inner.publish(channel, message))
205    }
206
207    /// Subscribes the client to the specified channels.
208    ///
209    /// Once a client issues a subscribe command, it may no longer issue any
210    /// non-pub/sub commands. The function consumes `self` and returns a
211    /// `BlockingSubscriber`.
212    ///
213    /// The `BlockingSubscriber` value is used to receive messages as well as
214    /// manage the list of channels the client is subscribed to.
215    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
216        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
217        Ok(BlockingSubscriber {
218            inner: subscriber,
219            rt: self.rt,
220        })
221    }
222}
223
224impl BlockingSubscriber {
225    /// Returns the set of channels currently subscribed to.
226    pub fn get_subscribed(&self) -> &[String] {
227        self.inner.get_subscribed()
228    }
229
230    /// Receive the next message published on a subscribed channel, waiting if
231    /// necessary.
232    ///
233    /// `None` indicates the subscription has been terminated.
234    pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
235        self.rt.block_on(self.inner.next_message())
236    }
237
238    /// Convert the subscriber into an `Iterator` yielding new messages published
239    /// on subscribed channels.
240    pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> {
241        SubscriberIterator {
242            inner: self.inner,
243            rt: self.rt,
244        }
245    }
246
247    /// Subscribe to a list of new channels
248    pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
249        self.rt.block_on(self.inner.subscribe(channels))
250    }
251
252    /// Unsubscribe to a list of new channels
253    pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
254        self.rt.block_on(self.inner.unsubscribe(channels))
255    }
256}
257
258impl Iterator for SubscriberIterator {
259    type Item = crate::Result<Message>;
260
261    fn next(&mut self) -> Option<crate::Result<Message>> {
262        self.rt.block_on(self.inner.next_message()).transpose()
263    }
264}