Crate socketioxide_redis

Source
Expand description

§A redis/valkey adapter implementation for the socketioxide crate.

The adapter is used to communicate with other nodes of the same application. This allows to broadcast messages to sockets connected on other servers, to get the list of rooms, to add or remove sockets from rooms, etc.

To achieve this, the adapter uses a pub/sub system through Redis to communicate with other servers.

The Driver abstraction allows the use of any pub/sub client. Three implementations are provided:

When using redis clusters, the drivers employ sharded pub/sub to distribute the load across Redis nodes.

You can also implement your own driver by implementing the Driver trait.

The provided driver implementations are using RESP3 for efficiency purposes. Make sure your redis server supports it (redis v7 and above). If not, you can implement your own driver using the RESP2 protocol.
Socketioxide-Redis is not compatible with @socketio/redis-adapter and @socketio/redis-emitter. They use completely different protocols and cannot be used together. Do not mix socket.io JS servers with socketioxide rust servers.

§Example with the redis driver

async fn on_connect<A: Adapter>(socket: SocketRef<A>) {
    socket.join("room1");
    socket.on("event", on_event);
    let _ = socket.broadcast().emit("hello", "world").await.ok();
}
async fn on_event<A: Adapter>(socket: SocketRef<A>, Data(data): Data<String>) {}

let client = redis::Client::open("redis://127.0.0.1:6379?protocol=RESP3")?;
let adapter = RedisAdapterCtr::new_with_redis(&client).await?;
let (layer, io) = SocketIo::builder()
    .with_adapter::<RedisAdapter<_>>(adapter)
    .build_layer();
Ok(())

§Example with the fred driver

async fn on_connect<A: Adapter>(socket: SocketRef<A>) {
    socket.join("room1");
    socket.on("event", on_event);
    let _ = socket.broadcast().emit("hello", "world").await.ok();
}
async fn on_event<A: Adapter>(socket: SocketRef<A>, Data(data): Data<String>) {}

let mut config = fred::prelude::Config::from_url("redis://127.0.0.1:6379?protocol=resp3")?;
// We need to manually set the RESP3 version because
// the fred crate does not parse the protocol query parameter.
config.version = RespVersion::RESP3;
let client = fred::prelude::Builder::from_config(config).build_subscriber_client()?;
let adapter = RedisAdapterCtr::new_with_fred(client).await?;
let (layer, io) = SocketIo::builder()
    .with_adapter::<FredAdapter<_>>(adapter)
    .build_layer();
Ok(())

§Example with the redis cluster driver

async fn on_connect<A: Adapter>(socket: SocketRef<A>) {
    socket.join("room1");
    socket.on("event", on_event);
    let _ = socket.broadcast().emit("hello", "world").await.ok();
}
async fn on_event<A: Adapter>(socket: SocketRef<A>, Data(data): Data<String>) {}

// single node cluster
let client = redis::cluster::ClusterClient::new(["redis://127.0.0.1:6379?protocol=resp3"])?;
let adapter = RedisAdapterCtr::new_with_cluster(&client).await?;

let (layer, io) = SocketIo::builder()
    .with_adapter::<ClusterAdapter<_>>(adapter)
    .build_layer();
Ok(())

Check the chat example for more complete examples.

§How does it work?

An adapter is created for each created namespace and it takes a corresponding CoreLocalAdapter. The CoreLocalAdapter allows to manage the local rooms and local sockets. The default LocalAdapter is simply a wrapper around this CoreLocalAdapter.

The adapter is then initialized with the RedisAdapter::init method. This will subscribe to 3 channels:

  • "{prefix}-request#{namespace}#": A global channel to receive broadcasted requests.
  • "{prefix}-request#{namespace}#{uid}#": A specific channel to receive requests only for this server.
  • "{prefix}-response#{namespace}#{uid}#": A specific channel to receive responses only for this server. Messages sent to this channel will be always in the form [req_id, data]. This will allow the adapter to extract the request id and route the response to the approriate stream before deserializing the data.

All messages are encoded with msgpack.

There are 7 types of requests:

  • Broadcast a packet to all the matching sockets.
  • Broadcast a packet to all the matching sockets and wait for a stream of acks.
  • Disconnect matching sockets.
  • Get all the rooms.
  • Add matching sockets to rooms.
  • Remove matching sockets to rooms.
  • Fetch all the remote sockets matching the options.

For ack streams, the adapter will first send a BroadcastAckCount response to the server that sent the request, and then send the acks as they are received (more details in RedisAdapter::broadcast_with_ack fn).

On the other side, each time an action has to be performed on the local server, the adapter will first broadcast a request to all the servers and then perform the action locally.

Modules§

drivers
Drivers are an abstraction over the pub/sub backend used by the adapter. You can use the provided implementation or implement your own.

Structs§

CustomRedisAdapter
The redis adapter implementation. It is generic over the Driver used to communicate with the redis server. And over the SocketEmitter used to communicate with the local server. This allows to avoid cyclic dependencies between the adapter, socketioxide-core and socketioxide crates.
InitRes
The result of the init future.
RedisAdapterConfig
The configuration of the RedisAdapter.
RedisAdapterCtr
The adapter constructor. For each namespace you define, a new adapter instance is created from this constructor.

Enums§

Error
Represent any error that might happen when using this adapter.
InitError
Error that can happen when initializing the adapter.

Type Aliases§

ClusterAdapterredis-cluster
The redis adapter with the redis cluster driver.
FredAdapterfred
The redis adapter with the fred driver.
RedisAdapterredis
The redis adapter with the redis driver.