socketioxide_redis/drivers/
mod.rs

1use std::{future::Future, pin::Pin, task};
2
3use futures_core::Stream;
4use pin_project_lite::pin_project;
5use tokio::sync::mpsc;
6
/// A driver implementation for the [redis](https://docs.rs/redis) pub/sub backend.
8#[cfg(feature = "redis")]
9#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
10pub mod redis;
11
/// A driver implementation for the [fred](https://docs.rs/fred) pub/sub backend.
13#[cfg(feature = "fred")]
14#[cfg_attr(docsrs, doc(cfg(feature = "fred")))]
15pub mod fred;
16
17pin_project! {
18    /// A stream of raw messages received from a channel.
19    /// Messages are encoded with msgpack.
20    #[derive(Debug)]
21    pub struct MessageStream<T> {
22        #[pin]
23        rx: mpsc::Receiver<T>,
24    }
25}
26
27impl<T> MessageStream<T> {
28    /// Create a new empty message stream.
29    pub fn new_empty() -> Self {
30        // mpsc bounded channel requires buffer > 0
31        let (_, rx) = mpsc::channel(1);
32        Self { rx }
33    }
34    /// Create a new message stream from a receiver.
35    pub fn new(rx: mpsc::Receiver<T>) -> Self {
36        Self { rx }
37    }
38}
39
40impl<T> Stream for MessageStream<T> {
41    type Item = T;
42
43    fn poll_next(
44        self: Pin<&mut Self>,
45        cx: &mut task::Context<'_>,
46    ) -> task::Poll<Option<Self::Item>> {
47        self.project().rx.poll_recv(cx)
48    }
49}
50
/// A single item that can be received from a channel.
///
/// It is a pair made of a `String` and a raw byte buffer; presumably the
/// channel name and the message payload (to be confirmed against the driver
/// implementations).
pub type ChanItem = (String, Vec<u8>);
53
54/// The driver trait can be used to support different pub/sub backends.
55/// It must share handlers/connection between its clones.
56pub trait Driver: Clone + Send + Sync + 'static {
57    /// The error type for the driver.
58    type Error: std::error::Error + Send + 'static;
59
60    /// Publish a message to a channel.
61    fn publish(
62        &self,
63        chan: String,
64        val: Vec<u8>,
65    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
66
67    /// Subscribe to a channel, it will return a stream of messages.
68    /// The size parameter is the buffer size of the channel.
69    fn subscribe(
70        &self,
71        chan: String,
72        size: usize,
73    ) -> impl Future<Output = Result<MessageStream<ChanItem>, Self::Error>> + Send;
74
75    /// Unsubscribe from a channel.
76    fn unsubscribe(&self, pat: String) -> impl Future<Output = Result<(), Self::Error>> + Send;
77
78    /// Returns the number of socket.io servers.
79    fn num_serv(&self, chan: &str) -> impl Future<Output = Result<u16, Self::Error>> + Send;
80}