mini_redis/blocking_client.rs
1//! Minimal blocking Redis client implementation
2//!
3//! Provides a blocking connect and methods for issuing the supported commands.
4
5use bytes::Bytes;
6use std::time::Duration;
7use tokio::net::ToSocketAddrs;
8use tokio::runtime::Runtime;
9
10pub use crate::client::Message;
11
12/// Established connection with a Redis server.
13///
14/// Backed by a single `TcpStream`, `BlockingClient` provides basic network
15/// client functionality (no pooling, retrying, ...). Connections are
16/// established using the [`connect`](fn@connect) function.
17///
18/// Requests are issued using the various methods of `Client`.
19pub struct BlockingClient {
20 /// The asynchronous `Client`.
21 inner: crate::client::Client,
22
23 /// A `current_thread` runtime for executing operations on the asynchronous
24 /// client in a blocking manner.
25 rt: Runtime,
26}
27
28/// A client that has entered pub/sub mode.
29///
30/// Once clients subscribe to a channel, they may only perform pub/sub related
31/// commands. The `BlockingClient` type is transitioned to a
32/// `BlockingSubscriber` type in order to prevent non-pub/sub methods from being
33/// called.
34pub struct BlockingSubscriber {
35 /// The asynchronous `Subscriber`.
36 inner: crate::client::Subscriber,
37
38 /// A `current_thread` runtime for executing operations on the asynchronous
39 /// `Subscriber` in a blocking manner.
40 rt: Runtime,
41}
42
43/// The iterator returned by `Subscriber::into_iter`.
44struct SubscriberIterator {
45 /// The asynchronous `Subscriber`.
46 inner: crate::client::Subscriber,
47
48 /// A `current_thread` runtime for executing operations on the asynchronous
49 /// `Subscriber` in a blocking manner.
50 rt: Runtime,
51}
52
53/// Establish a connection with the Redis server located at `addr`.
54///
55/// `addr` may be any type that can be asynchronously converted to a
56/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
57/// trait is the Tokio version and not the `std` version.
58///
59/// # Examples
60///
61/// ```no_run
62/// use mini_redis::blocking_client;
63///
64/// fn main() {
65/// let client = match blocking_client::connect("localhost:6379") {
66/// Ok(client) => client,
67/// Err(_) => panic!("failed to establish connection"),
68/// };
69/// # drop(client);
70/// }
71/// ```
72pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
73 let rt = tokio::runtime::Builder::new_current_thread()
74 .enable_all()
75 .build()?;
76
77 let inner = rt.block_on(crate::client::connect(addr))?;
78
79 Ok(BlockingClient { inner, rt })
80}
81
82impl BlockingClient {
83 /// Get the value of key.
84 ///
85 /// If the key does not exist the special value `None` is returned.
86 ///
87 /// # Examples
88 ///
89 /// Demonstrates basic usage.
90 ///
91 /// ```no_run
92 /// use mini_redis::blocking_client;
93 ///
94 /// fn main() {
95 /// let mut client = blocking_client::connect("localhost:6379").unwrap();
96 ///
97 /// let val = client.get("foo").unwrap();
98 /// println!("Got = {:?}", val);
99 /// }
100 /// ```
101 pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
102 self.rt.block_on(self.inner.get(key))
103 }
104
105 /// Set `key` to hold the given `value`.
106 ///
107 /// The `value` is associated with `key` until it is overwritten by the next
108 /// call to `set` or it is removed.
109 ///
110 /// If key already holds a value, it is overwritten. Any previous time to
111 /// live associated with the key is discarded on successful SET operation.
112 ///
113 /// # Examples
114 ///
115 /// Demonstrates basic usage.
116 ///
117 /// ```no_run
118 /// use mini_redis::blocking_client;
119 ///
120 /// fn main() {
121 /// let mut client = blocking_client::connect("localhost:6379").unwrap();
122 ///
123 /// client.set("foo", "bar".into()).unwrap();
124 ///
125 /// // Getting the value immediately works
126 /// let val = client.get("foo").unwrap().unwrap();
127 /// assert_eq!(val, "bar");
128 /// }
129 /// ```
130 pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
131 self.rt.block_on(self.inner.set(key, value))
132 }
133
134 /// Set `key` to hold the given `value`. The value expires after `expiration`
135 ///
136 /// The `value` is associated with `key` until one of the following:
137 /// - it expires.
138 /// - it is overwritten by the next call to `set`.
139 /// - it is removed.
140 ///
141 /// If key already holds a value, it is overwritten. Any previous time to
142 /// live associated with the key is discarded on a successful SET operation.
143 ///
144 /// # Examples
145 ///
146 /// Demonstrates basic usage. This example is not **guaranteed** to always
147 /// work as it relies on time based logic and assumes the client and server
148 /// stay relatively synchronized in time. The real world tends to not be so
149 /// favorable.
150 ///
151 /// ```no_run
152 /// use mini_redis::blocking_client;
153 /// use std::thread;
154 /// use std::time::Duration;
155 ///
156 /// fn main() {
157 /// let ttl = Duration::from_millis(500);
158 /// let mut client = blocking_client::connect("localhost:6379").unwrap();
159 ///
160 /// client.set_expires("foo", "bar".into(), ttl).unwrap();
161 ///
162 /// // Getting the value immediately works
163 /// let val = client.get("foo").unwrap().unwrap();
164 /// assert_eq!(val, "bar");
165 ///
166 /// // Wait for the TTL to expire
167 /// thread::sleep(ttl);
168 ///
169 /// let val = client.get("foo").unwrap();
170 /// assert!(val.is_some());
171 /// }
172 /// ```
173 pub fn set_expires(
174 &mut self,
175 key: &str,
176 value: Bytes,
177 expiration: Duration,
178 ) -> crate::Result<()> {
179 self.rt
180 .block_on(self.inner.set_expires(key, value, expiration))
181 }
182
183 /// Posts `message` to the given `channel`.
184 ///
185 /// Returns the number of subscribers currently listening on the channel.
186 /// There is no guarantee that these subscribers receive the message as they
187 /// may disconnect at any time.
188 ///
189 /// # Examples
190 ///
191 /// Demonstrates basic usage.
192 ///
193 /// ```no_run
194 /// use mini_redis::blocking_client;
195 ///
196 /// fn main() {
197 /// let mut client = blocking_client::connect("localhost:6379").unwrap();
198 ///
199 /// let val = client.publish("foo", "bar".into()).unwrap();
200 /// println!("Got = {:?}", val);
201 /// }
202 /// ```
203 pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
204 self.rt.block_on(self.inner.publish(channel, message))
205 }
206
207 /// Subscribes the client to the specified channels.
208 ///
209 /// Once a client issues a subscribe command, it may no longer issue any
210 /// non-pub/sub commands. The function consumes `self` and returns a
211 /// `BlockingSubscriber`.
212 ///
213 /// The `BlockingSubscriber` value is used to receive messages as well as
214 /// manage the list of channels the client is subscribed to.
215 pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
216 let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
217 Ok(BlockingSubscriber {
218 inner: subscriber,
219 rt: self.rt,
220 })
221 }
222}
223
224impl BlockingSubscriber {
225 /// Returns the set of channels currently subscribed to.
226 pub fn get_subscribed(&self) -> &[String] {
227 self.inner.get_subscribed()
228 }
229
230 /// Receive the next message published on a subscribed channel, waiting if
231 /// necessary.
232 ///
233 /// `None` indicates the subscription has been terminated.
234 pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
235 self.rt.block_on(self.inner.next_message())
236 }
237
238 /// Convert the subscriber into an `Iterator` yielding new messages published
239 /// on subscribed channels.
240 pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> {
241 SubscriberIterator {
242 inner: self.inner,
243 rt: self.rt,
244 }
245 }
246
247 /// Subscribe to a list of new channels
248 pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
249 self.rt.block_on(self.inner.subscribe(channels))
250 }
251
252 /// Unsubscribe to a list of new channels
253 pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
254 self.rt.block_on(self.inner.unsubscribe(channels))
255 }
256}
257
258impl Iterator for SubscriberIterator {
259 type Item = crate::Result<Message>;
260
261 fn next(&mut self) -> Option<crate::Result<Message>> {
262 self.rt.block_on(self.inner.next_message()).transpose()
263 }
264}