1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
/*!
Defines types related to the clients structs and their dependencies:
[`Client`], [`PooledClientManager`], [`Pipeline`], [`Transaction`] and how to configure them
# Clients
The central object in **rustis** is the [`Client`].
It will allow you to connect to the Redis server, to send command requests
and to receive command responses and push messages.
The [`Client`] struct can be used in 3 different modes
* As a single client
* As a mutiplexer
* In a pool of clients
## The single client
The single [`Client`] maintains a unique connection to a Redis Server or cluster.
This use case of the client is not meant to be used directly in a Web application, where multiple HTTP connections access
the Redis server at the same time in a multi-threaded architecture (like [Actix](https://actix.rs/) or [Rocket](https://rocket.rs/)).
It could be used in tools where the load is minimal.
```
use rustis::{
client::Client,
commands::{FlushingMode, ServerCommands, StringCommands},
Result,
};
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
let client = Client::connect("127.0.0.1:6379").await?;
client.flushdb(FlushingMode::Sync).await?;
client.set("key", "value").await?;
let value: String = client.get("key").await?;
println!("value: {value:?}");
Ok(())
}
```
## The multiplexer
A [`Client`] instance can be cloned, allowing multiple requests
to be sent concurrently on the same underlying connection.
Multiplexer mode is highly efficient in multi-threaded architectures because it uses only a single
underlying connection. It is the prefered mode for most Web applications.
### Managing Multiplexed Subscriptions
Because **rustis** implements the RESP3 protocol, there is no limitation when using subscriptions on a multiplexed connection.
Pub/Sub messages and regular command responses are cleanly distinguished at the protocol level,
allowing both to coexist safely on the same shared connection.
### Limitations
Becaware that using a [`Client`] in a multiplexer mode, by cloning an instance across multiple threads,
is not suitable when using [blocking commands](crate::commands::BlockingCommands).
Blocking commands monopolize the entire connection, preventing it from being shared.
In addition, the [`watch`](crate::commands::TransactionCommands::watch) command is not compatible
with multiplexer mode either. This is because the watched state applies to the shared connection itself, not just
to the particular [`Client`] instance that issued the [`watch`](crate::commands::TransactionCommands::watch) command.
## The pooled client manager
The pooled client manager holds a pool of [`Client`]s, based on [bb8](https://docs.rs/bb8/latest/bb8/).
Each time a new command must be sent to the Redis Server, a client will be borrowed temporarily to the manager
and automatically given back to it at the end of the operation.
It is an alternative to multiplexing, for managing **rustis** within a Web application.
The manager can be configured via [bb8](https://docs.rs/bb8/latest/bb8/) with a various of options like maximum size, maximum lifetime, etc.
For you convenience, [bb8](https://docs.rs/bb8/latest/bb8/) is reexported from the **rustis** crate.
```
#[cfg(feature = "pool")]
use rustis::{
client::PooledClientManager, commands::StringCommands,
};
use rustis::Result;
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
#[cfg(feature = "pool")] {
let manager = PooledClientManager::new("127.0.0.1:6379")?;
let pool = rustis::bb8::Pool::builder()
.max_size(10)
.build(manager).await?;
let client1 = pool.get().await.unwrap();
client1.set("key1", "value1").await?;
let value: String = client1.get("key1").await?;
println!("value: {value:?}");
let client2 = pool.get().await.unwrap();
client2.set("key2", "value2").await?;
let value: String = client2.get("key2").await?;
println!("value: {value:?}");
}
Ok(())
}
```
# Configuration
A [`Client`] instance can be configured with the [`Config`] struct:
* Authentication
* [`TlsConfig`]
* [`ServerConfig`] (Standalone, Sentinel or Cluster)
[`IntoConfig`] is a convenient trait to convert more known types to a [`Config`] instance:
* &[`str`](https://doc.rust-lang.org/std/primitive.str.html): host and port separated by a colon
* `(impl Into<String>, u16)`: a pair of host and port
* [`String`](https://doc.rust-lang.org/alloc/string/struct.String.html): host and port separated by a colon
* [`Url`](https://docs.rs/url/latest/url/struct.Url.html): see Url syntax below.
## Url Syntax
The **rustis** [`Config`] can also be built from an URL
### Standalone
```text
redis|rediss://[[<username>]:<password>@]<host>[:<port>][/<database>]
```
### Cluster
```text
redis|rediss[+cluster]://[[<username>]:<password>@]<host1>[:<port1>][,<host2>:[<port2>][,<hostN>:[<portN>]]]
```
### Sentinel
```text
redis|rediss[+sentinel]://[[<username>]:<password>@]<host>[:<port>]/<service>[/<database>]
[?wait_between_failures=<250>[&sentinel_username=<username>][&sentinel_password=<password>]]
```
`service` is the required name of the sentinel service
### Schemes
The URL scheme is used to detect the server type:
* `redis://` - Non secure TCP connection to a standalone Redis server
* `rediss://` - Secure (TSL) TCP connection to a standalone Redis server
* `redis+sentinel://` or `redis-sentinel://` - Non secure TCP connection to a Redis sentinel network
* `rediss+sentinel://` or `rediss-sentinel://` - Secure (TSL) TCP connection to a Redis sentinel network
* `redis+cluster://` or `redis-cluster://` - Non secure TCP connection to a Redis cluster
* `rediss+cluster://` or `rediss-cluster://` - Secure (TSL) TCP connection to a Redis cluster
### QueryParameters
Query parameters match perfectly optional configuration fields
of the struct [`Config`] or its dependencies:
* [`connect_timeout`](Config::connect_timeout) - The time to attempt a connection before timing out (default `10,000` ms).
* [`command_timeout`](Config::command_timeout) - If a command does not return a reply within a set number of milliseconds,
a timeout error will be thrown. If set to 0, no timeout is apply (default `0`).
* [`auto_resubscribe`](Config::auto_resubscribe) - When the client reconnects, channels subscribed in the previous connection will be
resubscribed automatically if `auto_resubscribe` is `true` (default `true`).
* [`auto_remonitor`](Config::auto_remonitor) - When the client reconnects, if in `monitor` mode, the
[`monitor`](crate::commands::BlockingCommands::monitor) command will be resent automatically
* [`connection_name`](Config::connection_name) - Set the name of the connection to make
it easier to identity the connection in client list.
* [`keep_alive`](Config::keep_alive) - Enable/disable keep-alive functionality (default `None`)
* [`no_delay`](Config::no_delay) - Enable/disable the use of Nagle's algorithm (default `true`)
* [`retry_on_error`](Config::retry_on_error) - Defines the default strategy for retries on network error (default `false`).
* [`reconnection`](Config::reconnection) - Reconnection policy configuration: Constant, Linear or Exponential (default `Constant`)
* [`wait_between_failures`](SentinelConfig::wait_between_failures) - (Sentinel only) Waiting time after
failing before connecting to the next Sentinel instance (default `250` ms).
* [`sentinel_username`](SentinelConfig::username) - (Sentinel only) Sentinel username
* [`sentinel_password`](SentinelConfig::password) - (Sentinel only) Sentinel password
### Example
```
use rustis::{client::Client, resp::cmd, Result};
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
// standalone, host=localhost, port=6379 (default), database=1
let client = Client::connect("redis://localhost/1").await?;
Ok(())
}
```
# Pipelining
One of the most performant Redis feature is [pipelining](https://redis.io/docs/manual/pipelining/).
This allow to optimize round-trip times by batching Redis commands.
### API description
You can create a pipeline on a [`Client`] instance by calling the associated fonction [`create_pipeline`](Client::create_pipeline).
Be sure to store the pipeline instance in a mutable variable because a pipeline requires an exclusive access.
Once the pipeline is created, you can use exactly the same commands that you would directly use on a client instance.
This is possible because the [`Pipeline`] implements all the built-in [command traits](crate::commands).
The main difference, is that you have to choose for each command:
* to [`queue`](BatchPreparedCommand::queue) it, meaning that the [`Pipeline`] instance will queue the command in an internal
queue to be able to send later the batch of commands to the Redis server.
* to [`forget`](BatchPreparedCommand::forget) it, meaning that the command will be queued as well **BUT** its response won't be awaited
by the [`Pipeline`] instance
Finally, call the [`execute`](Pipeline::execute) associated function.
It is the caller responsability to use the right type to cast the server response
to the right tuple or collection depending on which command has been
[queued](BatchPreparedCommand::queue) or [forgotten](BatchPreparedCommand::forget).
The most generic type that can be requested as a result is `Vec<resp::Value>`
### Example
```
use rustis::{
client::{Client, Pipeline, BatchPreparedCommand},
commands::StringCommands,
resp::{cmd, Value}, Result,
};
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
let client = Client::connect("127.0.0.1:6379").await?;
let mut pipeline = client.create_pipeline();
pipeline.set("key1", "value1").forget();
pipeline.set("key2", "value2").forget();
pipeline.get::<()>("key1").queue();
pipeline.get::<()>("key2").queue();
let (value1, value2): (String, String) = pipeline.execute().await?;
assert_eq!("value1", value1);
assert_eq!("value2", value2);
Ok(())
}
```
# Transactions
[Redis Transactions](https://redis.io/docs/manual/transactions/) allow the execution of a group of commands in a single step.
All the commands in a transaction are serialized and executed sequentially.
A request sent by another client will never be served in the middle of the execution of a Redis Transaction.
This guarantees that the commands are executed as a single isolated operation.
### API description
You can create a transaction on a client instance by calling the associated fonction [`create_transaction`](Client::create_transaction).
Be sure to store the transaction instance in a mutable variable because a transaction requires an exclusive access.
Once the transaction is created, you can use exactly the same commands that you would directly use on a client instance.
This is possible because the [`Transaction`] implements all the built-in [command traits](crate::commands).
The main difference, is that you have to choose for each command:
* to [`queue`](BatchPreparedCommand::queue) it, meaning that the [`Transaction`] instance will queue the command in an internal
queue to be able to send later the batch of commands to the Redis server.
* to [`forget`](BatchPreparedCommand::forget) it, meaning that the command will be queued as well **BUT** its response won't be awaited
by the [`Transaction`] instance.
Finally, call the [`execute`](Transaction::execute) associated function.
It is the caller responsability to use the right type to cast the server response
to the right tuple or collection depending on which command has been
[queued](BatchPreparedCommand::queue) or [forgotten](BatchPreparedCommand::forget).
The most generic type that can be requested as a result is `Vec<(resp::Value)>`
### Example
```
use rustis::{
client::{Client, Transaction, BatchPreparedCommand},
commands::StringCommands,
resp::{cmd, Value}, Result,
};
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
let client = Client::connect("127.0.0.1:6379").await?;
let mut transaction = client.create_transaction();
transaction.set("key1", "value1").forget();
transaction.set("key2", "value2").forget();
transaction.get::<()>("key1").queue();
let value: String = transaction.execute().await?;
assert_eq!("value1", value);
Ok(())
}
```
# Pub/Sub
[`Pub/Sub`](https://redis.io/docs/manual/pubsub/) is a Redis architecture were senders can publish messages into channels
and subscribers can subscribe by channel names or patterns to receive messages.
### Publishing
To publish a message, you can call the [`publish`](crate::commands::PubSubCommands::publish)
associated function on its dedicated trait.
It also possible to use the sharded flavor of the publish function: [`spublish`](crate::commands::PubSubCommands::spublish).
### Subscribing
**rustis** implements subsribing through an async [`Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html).
You can create a [`PubSubStream`] by calling [`subscribe`](crate::commands::PubSubCommands::subscribe),
[`psubscribe`](crate::commands::PubSubCommands::psubscribe), or [`ssubscribe`](crate::commands::PubSubCommands::ssubscribe).
Then by calling [`next`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.next) on the pub/sub stream, you can
wait for an incoming message in the form of the struct [`PubSubMessage`].
You can also create a [`PubSubStream`] without an upfront subscription by calling [`create_pub_sub`](crate::client::Client::create_pub_sub).
### Managing Multiplexed Subscriptions
Because **rustis** implements the RESP3 protocol, there is no limitation when using subscriptions on a multiplexed connection.
Pub/Sub messages and regular command responses are cleanly distinguished at the protocol level,
allowing both to coexist safely on the same shared connection.
### Simple Example
```
use rustis::{
client::{Client, ClientPreparedCommand},
commands::{FlushingMode, PubSubCommands, ServerCommands},
resp::{cmd, Value}, Result,
};
use futures_util::StreamExt;
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
let subscribing_client = Client::connect("127.0.0.1:6379").await?;
let publishing_client = Client::connect("127.0.0.1:6379").await?;
subscribing_client.flushdb(FlushingMode::Sync).await?;
// Create a subscription from the subscribing client:
let mut pub_sub_stream = subscribing_client.subscribe("mychannel").await?;
// The publishing client publishes a message on the channel:
publishing_client.publish("mychannel", "mymessage").await?;
// Let's now iterate over messages received:
while let Some(Ok(message)) = pub_sub_stream.next().await {
assert_eq!(b"mychannel".to_vec(), message.channel);
assert_eq!(b"mymessage".to_vec(), message.payload);
break;
}
Ok(())
}
```
Once the stream has been created, it is still possible to add additional subscriptions
by calling [`subscribe`](PubSubStream::subscribe), [`psubscribe`](PubSubStream::psubscribe)
or [`ssubscribe`](PubSubStream::ssubscribe) on the [`PubSubStream`] instance.
### Split Stream Example
To make it easy to modify subscriptions while iterating over messages, you can use the [`split`](PubSubStream::split) method to
split the stream into [sink](PubSubSplitSink) and [stream](PubSubSplitStream) parts. Once this is done, you call [`subscribe`](PubSubSplitSink::subscribe)
or [`unsubscribe`](PubSubSplitSink::unsubscribe) (and related methods) on the sink while the split stream is used only for iteration. This can be useful
when you want to split ownership between async tasks.
```
use rustis::{
client::{Client, ClientPreparedCommand},
commands::{FlushingMode, PubSubCommands, ServerCommands},
resp::{cmd, Value}, Result,
};
use futures_util::StreamExt;
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
async fn main() -> Result<()> {
let subscribing_client = Client::connect("127.0.0.1:6379").await?;
let regular_client = Client::connect("127.0.0.1:6379").await?;
regular_client.flushdb(FlushingMode::Sync).await?;
// This time we will split the stream into sink and stream parts:
let (mut sink, mut stream) = subscribing_client.subscribe("mychannel").await?.split();
// You can then subscribe or unsubscribe using the sink.
// Typically you would pass ownership of the sink to another async task.
sink.subscribe("otherchannel").await?;
sink.psubscribe("o*").await?;
regular_client.publish("mychannel", "mymessage").await?;
// Iterate over messages using the split stream:
while let Some(Ok(message)) = stream.next().await {
assert_eq!(b"mychannel".to_vec(), message.channel);
assert_eq!(b"mymessage".to_vec(), message.payload);
break;
}
Ok(())
}
```
*/
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;
pub use *;