socketioxide_redis/drivers/
mod.rs1use std::{future::Future, pin::Pin, task};
2
3use futures_core::Stream;
4use pin_project_lite::pin_project;
5use tokio::sync::mpsc;
6
7#[cfg(feature = "redis")]
9#[cfg_attr(docsrs, doc(cfg(feature = "redis")))]
10pub mod redis;
11
12#[cfg(feature = "fred")]
14#[cfg_attr(docsrs, doc(cfg(feature = "fred")))]
15pub mod fred;
16
17pin_project! {
18 #[derive(Debug)]
21 pub struct MessageStream<T> {
22 #[pin]
23 rx: mpsc::Receiver<T>,
24 }
25}
26
27impl<T> MessageStream<T> {
28 pub fn new_empty() -> Self {
30 let (_, rx) = mpsc::channel(1);
32 Self { rx }
33 }
34 pub fn new(rx: mpsc::Receiver<T>) -> Self {
36 Self { rx }
37 }
38}
39
40impl<T> Stream for MessageStream<T> {
41 type Item = T;
42
43 fn poll_next(
44 self: Pin<&mut Self>,
45 cx: &mut task::Context<'_>,
46 ) -> task::Poll<Option<Self::Item>> {
47 self.project().rx.poll_recv(cx)
48 }
49}
50
/// The item type carried by subscription streams: a `(String, Vec<u8>)` pair.
///
/// NOTE(review): presumably the channel name followed by the raw payload,
/// mirroring the `chan` / `val` arguments of `Driver::publish` — confirm.
pub type ChanItem = (String, Vec<u8>);
53
54pub trait Driver: Clone + Send + Sync + 'static {
57 type Error: std::error::Error + Send + 'static;
59
60 fn publish(
62 &self,
63 chan: String,
64 val: Vec<u8>,
65 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
66
67 fn subscribe(
70 &self,
71 chan: String,
72 size: usize,
73 ) -> impl Future<Output = Result<MessageStream<ChanItem>, Self::Error>> + Send;
74
75 fn unsubscribe(&self, pat: String) -> impl Future<Output = Result<(), Self::Error>> + Send;
77
78 fn num_serv(&self, chan: &str) -> impl Future<Output = Result<u16, Self::Error>> + Send;
80}