rustis/client/mod.rs
1/*!
2Defines types related to the clients structs and their dependencies:
3[`Client`], [`PooledClientManager`], [`Pipeline`], [`Transaction`] and how to configure them
4
5# Clients
6
7The central object in **rustis** is the [`Client`].
8
9It will allow you to connect to the Redis server, to send command requests
10and to receive command responses and push messages.
11
12The [`Client`] struct can be used in 3 different modes
13* As a single client
14* As a mutiplexer
15* In a pool of clients
16
17## The single client
18The single [`Client`] maintains a unique connection to a Redis Server or cluster.
19
20This use case of the client is not meant to be used directly in a Web application, where multiple HTTP connections access
21the Redis server at the same time in a multi-threaded architecture (like [Actix](https://actix.rs/) or [Rocket](https://rocket.rs/)).
22
23It could be used in tools where the load is minimal.
24
25```
26use rustis::{
27 client::Client,
28 commands::{FlushingMode, ServerCommands, StringCommands},
29 Result,
30};
31
32#[cfg_attr(feature = "tokio-runtime", tokio::main)]
33#[cfg_attr(feature = "async-std-runtime", async_std::main)]
34async fn main() -> Result<()> {
35 let client = Client::connect("127.0.0.1:6379").await?;
36 client.flushdb(FlushingMode::Sync).await?;
37
38 client.set("key", "value").await?;
39 let value: String = client.get("key").await?;
40 println!("value: {value:?}");
41
42 Ok(())
43}
44```
45
46## The multiplexer
47A [`Client`] instance can be cloned, allowing requests
48to be sent concurrently on the same underlying connection.
49
50The multiplexer mode is great because it offers much performance in a multi-threaded architecture, with only a single
51underlying connection. It should be the prefered mode for Web applications.
52
53### Limitations
54Beware that using [`Client`] in a multiplexer mode, by cloning an instance across multiple threads,
55is not suitable for using [blocking commands](crate::commands::BlockingCommands)
56because they monopolize the whole connection which cannot be shared anymore.
57
58Moreover using the [`watch`](crate::commands::TransactionCommands::watch) command is not compatible
59with the multiplexer mode is either. Indeed, it's the shared connection that will be watched, not only
60the [`Client`] instance through which the [`watch`](crate::commands::TransactionCommands::watch) command is sent.
61
62### Managing multiplexed subscriptions
63
64Even if the [`subscribe`][crate::commands::PubSubCommands::subscribe] monopolize the whole connection,
65it is still possible to use it in a multiplexed [`Client`].
66
67Indeed the subscribing mode of Redis still allows to share the connection between multiple clients,
68at the only condition that this connection is dedicated to subscriptions.
69
70In a Web application that requires subscriptions and regualar commands, the prefered solution
71would be to connect two multiplexed clients to the Redis server:
72* 1 for the subscriptions
73* 1 for the regular commands
74
75### See also
76[Multiplexing Explained](https://redis.com/blog/multiplexing-explained/)
77
78### Example
79```
80use rustis::{
81 client::{Client, IntoConfig},
82 commands::{FlushingMode, PubSubCommands, ServerCommands, StringCommands},
83 Result
84};
85
86#[cfg_attr(feature = "tokio-runtime", tokio::main)]
87#[cfg_attr(feature = "async-std-runtime", async_std::main)]
88async fn main() -> Result<()> {
89 let config = "127.0.0.1:6379".into_config()?;
90 let regular_client1 = Client::connect(config.clone()).await?;
91 let pub_sub_client = Client::connect(config).await?;
92
93 regular_client1.flushdb(FlushingMode::Sync).await?;
94
95 regular_client1.set("key", "value").await?;
96 let value: String = regular_client1.get("key").await?;
97 println!("value: {value:?}");
98
99 // clone a second instance on the same underlying connection
100 let regular_client2 = regular_client1.clone();
101 let value: String = regular_client2.get("key").await?;
102 println!("value: {value:?}");
103
104 // use 2nd connection to manager subscriptions
105 let pub_sub_stream = pub_sub_client.subscribe("my_channel").await?;
106 pub_sub_stream.close().await?;
107
108 Ok(())
109}
110```
111
112## The pooled client manager
113The pooled client manager holds a pool of [`Client`]s, based on [bb8](https://docs.rs/bb8/latest/bb8/).
114
115Each time a new command must be sent to the Redis Server, a client will be borrowed temporarily to the manager
116and automatically given back to it at the end of the operation.
117
118It is an alternative to multiplexing, for managing **rustis** within a Web application.
119
120The manager can be configured via [bb8](https://docs.rs/bb8/latest/bb8/) with a various of options like maximum size, maximum lifetime, etc.
121
122For you convenience, [bb8](https://docs.rs/bb8/latest/bb8/) is reexported from the **rustis** crate.
123
124```
125#[cfg(feature = "pool")]
126use rustis::{
127 client::PooledClientManager, commands::StringCommands,
128};
129use rustis::Result;
130
131#[cfg_attr(feature = "tokio-runtime", tokio::main)]
132#[cfg_attr(feature = "async-std-runtime", async_std::main)]
133async fn main() -> Result<()> {
134 #[cfg(feature = "pool")] {
135 let manager = PooledClientManager::new("127.0.0.1:6379")?;
136 let pool = rustis::bb8::Pool::builder()
137 .max_size(10)
138 .build(manager).await?;
139
140 let client1 = pool.get().await.unwrap();
141 client1.set("key1", "value1").await?;
142 let value: String = client1.get("key1").await?;
143 println!("value: {value:?}");
144
145 let client2 = pool.get().await.unwrap();
146 client2.set("key2", "value2").await?;
147 let value: String = client2.get("key2").await?;
148 println!("value: {value:?}");
149 }
150
151 Ok(())
152}
153```
154
155# Configuration
156
157A [`Client`] instance can be configured with the [`Config`] struct:
158* Authentication
159* [`TlsConfig`]
160* [`ServerConfig`] (Standalone, Sentinel or Cluster)
161
162[`IntoConfig`] is a convenient trait to convert more known types to a [`Config`] instance:
163* &[`str`](https://doc.rust-lang.org/std/primitive.str.html): host and port separated by a colon
164* `(impl Into<String>, u16)`: a pair of host and port
165* [`String`](https://doc.rust-lang.org/alloc/string/struct.String.html): host and port separated by a colon
166* [`Url`](https://docs.rs/url/latest/url/struct.Url.html): see Url syntax below.
167
168## Url Syntax
169
170The **rustis** [`Config`] can also be built from an URL
171
172### Standalone
173
174```text
175redis|rediss://[[<username>]:<password>@]<host>[:<port>][/<database>]
176```
177
178### Cluster
179
180```text
181redis|rediss[+cluster]://[[<username>]:<password>@]<host1>[:<port1>][,<host2>:[<port2>][,<hostN>:[<portN>]]]
182```
183
184### Sentinel
185
186```text
187redis|rediss[+sentinel]://[[<username>]:<password>@]<host>[:<port>]/<service>[/<database>]
188 [?wait_between_failures=<250>[&sentinel_username=<username>][&sentinel_password=<password>]]
189```
190
191`service` is the required name of the sentinel service
192
193### Schemes
194The URL scheme is used to detect the server type:
195* `redis://` - Non secure TCP connection to a standalone Redis server
196* `rediss://` - Secure (TSL) TCP connection to a standalone Redis server
197* `redis+sentinel://` or `redis-sentinel://` - Non secure TCP connection to a Redis sentinel network
198* `rediss+sentinel://` or `rediss-sentinel://` - Secure (TSL) TCP connection to a Redis sentinel network
199* `redis+cluster://` or `redis-cluster://` - Non secure TCP connection to a Redis cluster
200* `rediss+cluster://` or `rediss-cluster://` - Secure (TSL) TCP connection to a Redis cluster
201
202### QueryParameters
203Query parameters match perfectly optional configuration fields
204of the struct [`Config`] or its dependencies:
205* [`connect_timeout`](Config::connect_timeout) - The time to attempt a connection before timing out (default `10,000` ms).
206* [`command_timeout`](Config::command_timeout) - If a command does not return a reply within a set number of milliseconds,
207 a timeout error will be thrown. If set to 0, no timeout is apply (default `0`).
208* [`auto_resubscribe`](Config::auto_resubscribe) - When the client reconnects, channels subscribed in the previous connection will be
209 resubscribed automatically if `auto_resubscribe` is `true` (default `true`).
210* [`auto_remonitor`](Config::auto_remonitor) - When the client reconnects, if in `monitor` mode, the
211 [`monitor`](crate::commands::BlockingCommands::monitor) command will be resent automatically
212* [`connection_name`](Config::connection_name) - Set the name of the connection to make
213 it easier to identity the connection in client list.
214* [`keep_alive`](Config::keep_alive) - Enable/disable keep-alive functionality (default `None`)
215* [`no_delay`](Config::no_delay) - Enable/disable the use of Nagle's algorithm (default `true`)
216* [`retry_on_error`](Config::retry_on_error) - Defines the default strategy for retries on network error (default `false`).
217* [`reconnection`](Config::reconnection) - Reconnection policy configuration: Constant, Linear or Exponential (default `Constant`)
218* [`wait_between_failures`](SentinelConfig::wait_between_failures) - (Sentinel only) Waiting time after
219 failing before connecting to the next Sentinel instance (default `250` ms).
220* [`sentinel_username`](SentinelConfig::username) - (Sentinel only) Sentinel username
221* [`sentinel_password`](SentinelConfig::password) - (Sentinel only) Sentinel password
222
223### Example
224
225```
226use rustis::{client::Client, resp::cmd, Result};
227
228#[cfg_attr(feature = "tokio-runtime", tokio::main)]
229#[cfg_attr(feature = "async-std-runtime", async_std::main)]
230async fn main() -> Result<()> {
231 // standalone, host=localhost, port=6379 (default), database=1
232 let client = Client::connect("redis://localhost/1").await?;
233
234 Ok(())
235}
236```
237
238# Pipelining
239
240One of the most performant Redis feature is [pipelining](https://redis.io/docs/manual/pipelining/).
241This allow to optimize round-trip times by batching Redis commands.
242
243### API description
244
245You can create a pipeline on a [`Client`] instance by calling the associated fonction [`create_pipeline`](Client::create_pipeline).
246Be sure to store the pipeline instance in a mutable variable because a pipeline requires an exclusive access.
247
248Once the pipeline is created, you can use exactly the same commands that you would directly use on a client instance.
249This is possible because the [`Pipeline`] implements all the built-in [command traits](crate::commands).
250
251The main difference, is that you have to choose for each command:
252* to [`queue`](BatchPreparedCommand::queue) it, meaning that the [`Pipeline`] instance will queue the command in an internal
253 queue to be able to send later the batch of commands to the Redis server.
254* to [`forget`](BatchPreparedCommand::forget) it, meaning that the command will be queued as well **BUT** its response won't be awaited
255 by the [`Pipeline`] instance
256
257Finally, call the [`execute`](Pipeline::execute) associated function.
258
259It is the caller responsability to use the right type to cast the server response
260to the right tuple or collection depending on which command has been
261[queued](BatchPreparedCommand::queue) or [forgotten](BatchPreparedCommand::forget).
262
263The most generic type that can be requested as a result is `Vec<resp::Value>`
264
265### Example
266```
267use rustis::{
268 client::{Client, Pipeline, BatchPreparedCommand},
269 commands::StringCommands,
270 resp::{cmd, Value}, Result,
271};
272
273#[cfg_attr(feature = "tokio-runtime", tokio::main)]
274#[cfg_attr(feature = "async-std-runtime", async_std::main)]
275async fn main() -> Result<()> {
276 let client = Client::connect("127.0.0.1:6379").await?;
277
278 let mut pipeline = client.create_pipeline();
279 pipeline.set("key1", "value1").forget();
280 pipeline.set("key2", "value2").forget();
281 pipeline.get::<_, ()>("key1").queue();
282 pipeline.get::<_, ()>("key2").queue();
283
284 let (value1, value2): (String, String) = pipeline.execute().await?;
285 assert_eq!("value1", value1);
286 assert_eq!("value2", value2);
287
288 Ok(())
289}
290```
291
292# Transactions
293[Redis Transactions](https://redis.io/docs/manual/transactions/) allow the execution of a group of commands in a single step.
294
295All the commands in a transaction are serialized and executed sequentially.
296A request sent by another client will never be served in the middle of the execution of a Redis Transaction.
297This guarantees that the commands are executed as a single isolated operation.
298
299### API description
300
301You can create a transaction on a client instance by calling the associated fonction [`create_transaction`](Client::create_transaction).
302Be sure to store the transaction instance in a mutable variable because a transaction requires an exclusive access.
303
304Once the transaction is created, you can use exactly the same commands that you would directly use on a client instance.
305This is possible because the [`Transaction`] implements all the built-in [command traits](crate::commands).
306
307The main difference, is that you have to choose for each command:
308* to [`queue`](BatchPreparedCommand::queue) it, meaning that the [`Transaction`] instance will queue the command in an internal
309 queue to be able to send later the batch of commands to the Redis server.
310* to [`forget`](BatchPreparedCommand::forget) it, meaning that the command will be queued as well **BUT** its response won't be awaited
311 by the [`Transaction`] instance.
312
313Finally, call the [`execute`](Transaction::execute) associated function.
314
315It is the caller responsability to use the right type to cast the server response
316to the right tuple or collection depending on which command has been
317[queued](BatchPreparedCommand::queue) or [forgotten](BatchPreparedCommand::forget).
318
319The most generic type that can be requested as a result is `Vec<(resp::Value)>`
320
321### Example
322```
323use rustis::{
324 client::{Client, Transaction, BatchPreparedCommand},
325 commands::StringCommands,
326 resp::{cmd, Value}, Result,
327};
328
329#[cfg_attr(feature = "tokio-runtime", tokio::main)]
330#[cfg_attr(feature = "async-std-runtime", async_std::main)]
331async fn main() -> Result<()> {
332 let client = Client::connect("127.0.0.1:6379").await?;
333
334 let mut transaction = client.create_transaction();
335
336 transaction.set("key1", "value1").forget();
337 transaction.set("key2", "value2").forget();
338 transaction.get::<_, ()>("key1").queue();
339 let value: String = transaction.execute().await?;
340
341 assert_eq!("value1", value);
342
343 Ok(())
344}
345```
346
347# Pub/Sub
348
349[`Pub/Sub`](https://redis.io/docs/manual/pubsub/) is a Redis architecture were senders can publish messages into channels
350and subscribers can subscribe by channel names or patterns to receive messages.
351
352### Publishing
353
354To publish a message, you can call the [`publish`](crate::commands::PubSubCommands::publish)
355associated function on its dedicated trait.
356
357It also possible to use the sharded flavor of the publish function: [`spublish`](crate::commands::PubSubCommands::spublish).
358
359### Subscribing
360
361Subscribing will block the current client connection, in order to let the client wait for incoming messages.
362Consequently, **rustis** implements subsribing through an async [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html).
363
364You can create a [`PubSubStream`] by calling [`subscribe`](crate::commands::PubSubCommands::subscribe),
365[`psubscribe`](crate::commands::PubSubCommands::psubscribe), or [`ssubscribe`](crate::commands::PubSubCommands::ssubscribe).
366
367Then by calling [`next`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.next) on the pub/sub stream, you can
368wait for an incoming message in the form of the struct [`PubSubMessage`].
369
370You can also create a [`PubSubStream`] without an upfront subscription by calling [`create_pub_sub`](crate::client::Client::create_pub_sub).
371
372### Warning!
373
374Multiplexed [`Client`] instances must be dedicated to Pub/Sub once a subscribing function has been called.
375Because subscription blocks the multiplexed client shared connection other callers would be blocked when sending regular commands.
376
377### Simple Example
378
379```
380use rustis::{
381 client::{Client, ClientPreparedCommand},
382 commands::{FlushingMode, PubSubCommands, ServerCommands},
383 resp::{cmd, Value}, Result,
384};
385use futures_util::StreamExt;
386
387#[cfg_attr(feature = "tokio-runtime", tokio::main)]
388#[cfg_attr(feature = "async-std-runtime", async_std::main)]
389async fn main() -> Result<()> {
390 let subscribing_client = Client::connect("127.0.0.1:6379").await?;
391 let regular_client = Client::connect("127.0.0.1:6379").await?;
392
393 regular_client.flushdb(FlushingMode::Sync).await?;
394
395 // Create a subscription from the subscribing client:
396 let mut pub_sub_stream = subscribing_client.subscribe("mychannel").await?;
397
398 // The regular client publishes a message on the channel:
399 regular_client.publish("mychannel", "mymessage").await?;
400
401 // Let's now iterate over messages received:
402 while let Some(Ok(message)) = pub_sub_stream.next().await {
403 assert_eq!(b"mychannel".to_vec(), message.channel);
404 assert_eq!(b"mymessage".to_vec(), message.payload);
405 break;
406 }
407
408 Ok(())
409}
410```
411
412Once the stream has been created, it is still possible to add additional subscriptions
413by calling [`subscribe`](PubSubStream::subscribe), [`psubscribe`](PubSubStream::psubscribe)
414or [`ssubscribe`](PubSubStream::ssubscribe) on the [`PubSubStream`] instance.
415
416### Split Stream Example
417
418To make it easy to modify subscriptions while iterating over messages, you can use the [`split`](PubSubStream::split) method to
419split the stream into [sink](PubSubSplitSink) and [stream](PubSubSplitStream) parts. Once this is done, you call [`subscribe`](PubSubSplitSink::subscribe)
420or [`unsubscribe`](PubSubSplitSink::unsubscribe) (and related methods) on the sink while the split stream is used only for iteration. This can be useful
421when you want to split ownership between async tasks.
422
423```
424use rustis::{
425 client::{Client, ClientPreparedCommand},
426 commands::{FlushingMode, PubSubCommands, ServerCommands},
427 resp::{cmd, Value}, Result,
428};
429use futures_util::StreamExt;
430
431#[cfg_attr(feature = "tokio-runtime", tokio::main)]
432#[cfg_attr(feature = "async-std-runtime", async_std::main)]
433async fn main() -> Result<()> {
434 let subscribing_client = Client::connect("127.0.0.1:6379").await?;
435 let regular_client = Client::connect("127.0.0.1:6379").await?;
436
437 regular_client.flushdb(FlushingMode::Sync).await?;
438
439 // This time we will split the stream into sink and stream parts:
440 let (mut sink, mut stream) = subscribing_client.subscribe("mychannel").await?.split();
441
442 // You can then subscribe or unsubscribe using the sink.
443 // Typically you would pass ownership of the sink to another async task.
444 sink.subscribe("otherchannel").await?;
445 sink.psubscribe("o*").await?;
446
447 regular_client.publish("mychannel", "mymessage").await?;
448
449 // Iterate over messages using the split stream:
450 while let Some(Ok(message)) = stream.next().await {
451 assert_eq!(b"mychannel".to_vec(), message.channel);
452 assert_eq!(b"mymessage".to_vec(), message.payload);
453 break;
454 }
455
456 Ok(())
457}
458```
459*/
460
461#[allow(clippy::module_inception)]
462mod client;
463mod client_state;
464mod client_tracking_invalidation_stream;
465mod config;
466mod message;
467mod monitor_stream;
468mod pipeline;
469#[cfg_attr(docsrs, doc(cfg(feature = "pool")))]
470#[cfg(feature = "pool")]
471mod pooled_client_manager;
472mod prepared_command;
473mod pub_sub_stream;
474mod transaction;
475
476pub use client::*;
477pub use client_state::*;
478pub(crate) use client_tracking_invalidation_stream::*;
479pub use config::*;
480pub(crate) use message::*;
481pub use monitor_stream::*;
482pub use pipeline::*;
483#[cfg_attr(docsrs, doc(cfg(feature = "pool")))]
484#[cfg(feature = "pool")]
485pub use pooled_client_manager::*;
486pub use prepared_command::*;
487pub use pub_sub_stream::*;
488pub use transaction::*;