cluster
and aio
only.Expand description
This module provides async functionality for connecting to Redis / Valkey Clusters.
The cluster connection is meant to abstract the fact that a cluster is composed of multiple nodes, and to provide an API which is as close as possible to that of a single node connection. In order to do that, the cluster connection maintains connections to each node in the Redis/ Valkey cluster, and can route requests automatically to the relevant nodes. In cases that the cluster connection receives indications that the cluster topology has changed, it will query nodes in order to find the current cluster topology. If it disconnects from some nodes, it will automatically reconnect to those nodes.
By default, ClusterConnection
makes use of MultiplexedConnection
and maintains a pool
of connections to each node in the cluster.
§Example
use redis::cluster::ClusterClient;
use redis::AsyncCommands;
async fn fetch_an_integer() -> String {
let nodes = vec!["redis://127.0.0.1/"];
let client = ClusterClient::new(nodes).unwrap();
let mut connection = client.get_async_connection().await.unwrap();
let _: () = connection.set("test", "test_data").await.unwrap();
let rv: String = connection.get("test").await.unwrap();
return rv;
}
§Pipelining
use redis::cluster::ClusterClient;
use redis::{Value, AsyncCommands};
async fn fetch_an_integer() -> redis::RedisResult<()> {
let nodes = vec!["redis://127.0.0.1/"];
let client = ClusterClient::new(nodes).unwrap();
let mut connection = client.get_async_connection().await.unwrap();
let key = "test";
redis::pipe()
.rpush(key, "123").ignore()
.ltrim(key, -10, -1).ignore()
.expire(key, 60).ignore()
.exec_async(&mut connection).await
}
§Pubsub
Pubsub, and generally receiving push messages from the cluster nodes, is now supported when defining a connection with crate::ProtocolVersion::RESP3 and some crate::aio::AsyncPushSender to receive the messages on.
use redis::cluster::ClusterClientBuilder;
use redis::{Value, AsyncCommands};
async fn fetch_an_integer() -> redis::RedisResult<()> {
let nodes = vec!["redis://127.0.0.1/?protocol=3"];
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let client = ClusterClientBuilder::new(nodes)
.use_protocol(redis::ProtocolVersion::RESP3)
.push_sender(tx).build()?;
let mut connection = client.get_async_connection().await?;
connection.subscribe("channel").await?;
while let Some(msg) = rx.recv().await {
println!("Got: {:?}", msg);
}
Ok(())
}
§Sending request to specific node
In some cases you’d want to send a request to a specific node in the cluster, instead of letting the cluster connection decide by itself to which node it should send the request. This can happen, for example, if you want to send SCAN commands to each node in the cluster.
use redis::cluster::ClusterClient;
use redis::{Value, AsyncCommands};
use redis::cluster_routing::{ RoutingInfo, SingleNodeRoutingInfo };
async fn fetch_an_integer() -> redis::RedisResult<Value> {
let nodes = vec!["redis://127.0.0.1/"];
let client = ClusterClient::new(nodes)?;
let mut connection = client.get_async_connection().await?;
let routing_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress{
host: "redis://127.0.0.1".to_string(),
port: 6378
});
connection.route_command(&redis::cmd("PING"), routing_info).await
}
Structs§
- Cluster
Connection - This represents an async Redis Cluster connection.
Traits§
- Connect
- Implements the process of connecting to a Redis server and obtaining a connection handle.