Module cluster_async

Available on crate features cluster and aio only.

This module provides async functionality for connecting to Redis / Valkey Clusters.

The cluster connection abstracts the fact that a cluster is composed of multiple nodes, and provides an API that is as close as possible to that of a single-node connection. To do so, it maintains connections to each node in the Redis / Valkey cluster and routes requests automatically to the relevant nodes. When the cluster connection receives indications that the cluster topology has changed, it queries nodes to find the current topology. If it disconnects from some nodes, it automatically reconnects to them.
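To make "routing requests to the relevant nodes" concrete, here is a minimal, standard-library-only sketch of how the Redis Cluster specification maps a key to one of 16384 hash slots (CRC16/XMODEM of the key, or of its hash tag, modulo 16384). The function names `crc16_xmodem`, `hash_tag`, and `key_slot` are illustrative and are not part of this crate's API; the cluster connection performs the equivalent computation internally.

```rust
/// CRC16/XMODEM (polynomial 0x1021, initial value 0), the checksum the
/// Redis Cluster specification uses for key hashing.
pub fn crc16_xmodem(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

/// Returns the part of the key that is hashed. If the key contains a
/// non-empty `{...}` section, only the text between the first `{` and the
/// first `}` after it is used; otherwise the whole key is used.
pub fn hash_tag(key: &[u8]) -> &[u8] {
    if let Some(open) = key.iter().position(|&b| b == b'{') {
        if let Some(len) = key[open + 1..].iter().position(|&b| b == b'}') {
            if len > 0 {
                return &key[open + 1..open + 1 + len];
            }
        }
    }
    key
}

/// Hash slot (0..16384) that owns the given key.
pub fn key_slot(key: &[u8]) -> u16 {
    crc16_xmodem(hash_tag(key)) % 16384
}
```

Keys that share a hash tag, such as `{user1000}.following` and `{user1000}.followers`, land in the same slot and are therefore served by the same node, which is what allows multi-key commands and pipelines on them to work in a cluster.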

By default, ClusterConnection makes use of MultiplexedConnection and maintains a pool of connections to each node in the cluster.

§Example

use redis::cluster::ClusterClient;
use redis::AsyncCommands;

async fn fetch_a_string() -> String {
    // Any reachable node can serve as a seed; the rest of the cluster is discovered from it.
    let nodes = vec!["redis://127.0.0.1/"];
    let client = ClusterClient::new(nodes).unwrap();
    let mut connection = client.get_async_connection().await.unwrap();
    let _: () = connection.set("test", "test_data").await.unwrap();
    let rv: String = connection.get("test").await.unwrap();
    rv
}

§Pipelining

use redis::cluster::ClusterClient;

async fn push_and_trim() -> redis::RedisResult<()> {
    let nodes = vec!["redis://127.0.0.1/"];
    let client = ClusterClient::new(nodes)?;
    let mut connection = client.get_async_connection().await?;
    let key = "test";

    redis::pipe()
        .rpush(key, "123").ignore()
        .ltrim(key, -10, -1).ignore()
        .expire(key, 60).ignore()
        .exec_async(&mut connection).await
}

§Pubsub

Pubsub, and more generally receiving push messages from the cluster nodes, is supported when the connection is defined with crate::ProtocolVersion::RESP3 and a crate::aio::AsyncPushSender on which to receive the messages.

use redis::cluster::ClusterClientBuilder;
use redis::AsyncCommands;

async fn listen_for_messages() -> redis::RedisResult<()> {
    let nodes = vec!["redis://127.0.0.1/?protocol=3"];
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    let client = ClusterClientBuilder::new(nodes)
        .use_protocol(redis::ProtocolVersion::RESP3)
        .push_sender(tx).build()?;
    let mut connection = client.get_async_connection().await?;
    connection.subscribe("channel").await?;
    while let Some(msg) = rx.recv().await {
        println!("Got: {:?}", msg);
    }
    Ok(())
}

§Sending requests to a specific node

In some cases you may want to send a request to a specific node in the cluster instead of letting the cluster connection choose the target node. This is useful, for example, when you want to send SCAN commands to each node in the cluster.

use redis::cluster::ClusterClient;
use redis::Value;
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};

async fn ping_specific_node() -> redis::RedisResult<Value> {
    let nodes = vec!["redis://127.0.0.1/"];
    let client = ClusterClient::new(nodes)?;
    let mut connection = client.get_async_connection().await?;
    // Target one node by address; `host` is a bare host name or IP, without a URL scheme.
    let routing_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
        host: "127.0.0.1".to_string(),
        port: 6378,
    });
    connection.route_command(&redis::cmd("PING"), routing_info).await
}
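
To address every node individually (for instance, to run SCAN on each one), you first need each node's host and port. One possible source is the text reply of the `CLUSTER NODES` command, where each line has the form `<id> <ip:port@cport[,hostname]> <flags> ...`. The sketch below parses one such line using only the standard library. `NodeAddr` and `parse_cluster_nodes_line` are hypothetical helpers for illustration, not part of this crate.

```rust
/// Address and role of one node, as read from a `CLUSTER NODES` line.
/// Hypothetical helper type; not provided by the redis crate.
#[derive(Debug, PartialEq)]
pub struct NodeAddr {
    pub host: String,
    pub port: u16,
    pub is_primary: bool,
}

/// Parses a single line of `CLUSTER NODES` output.
/// Returns `None` if the line has too few fields or a malformed address.
pub fn parse_cluster_nodes_line(line: &str) -> Option<NodeAddr> {
    let mut fields = line.split_whitespace();
    let _id = fields.next()?;
    let addr = fields.next()?;
    let flags = fields.next()?;

    // Drop the cluster bus port and optional hostname: "ip:port@cport,hostname".
    let client_addr = addr.split('@').next()?;
    // Split on the last ':' so that hosts containing ':' (IPv6) stay intact.
    let (host, port) = client_addr.rsplit_once(':')?;

    Some(NodeAddr {
        host: host.to_string(),
        port: port.parse().ok()?,
        // The flags field is a comma-separated list such as "myself,master".
        is_primary: flags.split(',').any(|flag| flag == "master"),
    })
}
```

The resulting `host` and `port` values have the shape expected by the `host` and `port` fields of `SingleNodeRoutingInfo::ByAddress`, so each parsed node can be targeted in turn with `route_command`.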

Structs§

ClusterConnection
This represents an async Redis Cluster connection.

Traits§

Connect
Implements the process of connecting to a Redis server and obtaining a connection handle.