rusher_pubsub/
lib.rs

1#![allow(async_fn_in_trait)]
2
3use std::{collections::HashSet, error::Error};
4
5use fred::prelude::RedisClient;
6use futures::stream::BoxStream;
7use serde::{de::DeserializeOwned, Serialize};
8
9pub mod memory;
10pub mod redis;
11
/// Boxed `Send + Sync` error trait object, used as the error type in the
/// `Result`s returned by this file's broker and connection methods.
pub(crate) type BoxError = Box<dyn Error + Send + Sync>;
13
14pub trait Broker: Clone {
15    type Conn: Connection;
16    async fn connect(&self) -> Result<Self::Conn, BoxError>;
17    async fn subscribers_count(&self, channel: &str) -> usize;
18    async fn subscriptions(&self) -> HashSet<(String, usize)>;
19    async fn publish(&self, channel: &str, msg: impl Serialize) -> Result<(), BoxError>;
20    fn all_messages<T: DeserializeOwned + Send + 'static>(&self) -> BoxStream<'static, T>;
21}
22
23pub trait Connection {
24    async fn authenticate(&mut self, user_id: &str, data: impl Serialize) -> Result<(), BoxError>;
25    async fn publish(&mut self, channel: &str, msg: impl Serialize) -> Result<(), BoxError>;
26    async fn subscribe(&mut self, channel: &str) -> Result<(), BoxError>;
27    async fn unsubscribe(&mut self, channel: &str) -> Result<(), BoxError>;
28    async fn recv<T: DeserializeOwned>(&mut self) -> Result<T, BoxError>;
29    async fn try_recv<T: DeserializeOwned>(&mut self) -> Result<Option<T>, BoxError>;
30}
31
/// A broker that is one of the concrete backends declared in this crate.
///
/// Lets callers hold either backend behind a single concrete type; the
/// [`Broker`] implementation in this file forwards each call to the wrapped
/// value.
#[derive(Debug, Clone)]
pub enum AnyBroker {
    /// Wraps a [`memory::MemoryBroker`].
    Memory(memory::MemoryBroker),
    /// Wraps a [`redis::RedisBroker`].
    Redis(redis::RedisBroker),
}
37
38impl AnyBroker {
39    pub fn memory() -> Self {
40        Self::Memory(memory::MemoryBroker::default())
41    }
42
43    pub async fn redis(url: &str, namespace: &str) -> Result<Self, BoxError> {
44        Ok(Self::Redis(
45            redis::RedisBroker::from_url(url, namespace).await?,
46        ))
47    }
48
49    pub async fn redis_single(
50        publisher: RedisClient,
51        subscriber: RedisClient,
52        namespace: &str,
53    ) -> Result<Self, BoxError> {
54        Ok(Self::Redis(
55            redis::RedisBroker::from_connection_pair(publisher, subscriber, namespace).await?,
56        ))
57    }
58}
59
60impl Broker for AnyBroker {
61    type Conn = AnyConnection;
62
63    async fn connect(&self) -> Result<Self::Conn, BoxError> {
64        match self {
65            Self::Memory(broker) => Ok(AnyConnection::Memory(broker.connect().await?)),
66            Self::Redis(broker) => Ok(AnyConnection::Redis(broker.connect().await?)),
67        }
68    }
69
70    async fn subscribers_count(&self, channel: &str) -> usize {
71        match self {
72            Self::Memory(broker) => broker.subscribers_count(channel).await,
73            Self::Redis(broker) => broker.subscribers_count(channel).await,
74        }
75    }
76
77    async fn subscriptions(&self) -> HashSet<(String, usize)> {
78        match self {
79            Self::Memory(broker) => broker.subscriptions().await,
80            Self::Redis(broker) => broker.subscriptions().await,
81        }
82    }
83
84    async fn publish(&self, channel: &str, msg: impl Serialize) -> Result<(), BoxError> {
85        match self {
86            Self::Memory(broker) => broker.publish(channel, msg).await,
87            Self::Redis(broker) => broker.publish(channel, msg).await,
88        }
89    }
90
91    fn all_messages<T: DeserializeOwned + Send + 'static>(&self) -> BoxStream<'static, T> {
92        match self {
93            Self::Memory(broker) => broker.all_messages(),
94            Self::Redis(broker) => broker.all_messages(),
95        }
96    }
97}
98
/// A connection that is one of the concrete backend connection types
/// declared in this crate.
///
/// The [`Connection`] implementation in this file forwards each call to the
/// wrapped value.
pub enum AnyConnection {
    /// Wraps a [`memory::MemoryConnection`].
    Memory(memory::MemoryConnection),
    /// Wraps a [`redis::RedisConnection`].
    Redis(redis::RedisConnection),
}
103
104impl Connection for AnyConnection {
105    async fn authenticate(&mut self, user_id: &str, data: impl Serialize) -> Result<(), BoxError> {
106        match self {
107            Self::Memory(broker) => broker.authenticate(user_id, data).await,
108            Self::Redis(broker) => broker.authenticate(user_id, data).await,
109        }
110    }
111
112    async fn publish(&mut self, channel: &str, msg: impl Serialize) -> Result<(), BoxError> {
113        match self {
114            Self::Memory(broker) => broker.publish(channel, msg).await,
115            Self::Redis(broker) => broker.publish(channel, msg).await,
116        }
117    }
118
119    async fn subscribe(&mut self, channel: &str) -> Result<(), BoxError> {
120        match self {
121            Self::Memory(broker) => broker.subscribe(channel).await,
122            Self::Redis(broker) => broker.subscribe(channel).await,
123        }
124    }
125
126    async fn unsubscribe(&mut self, channel: &str) -> Result<(), BoxError> {
127        match self {
128            Self::Memory(broker) => broker.unsubscribe(channel).await,
129            Self::Redis(broker) => broker.unsubscribe(channel).await,
130        }
131    }
132
133    async fn recv<T: DeserializeOwned>(&mut self) -> Result<T, BoxError> {
134        match self {
135            Self::Memory(broker) => broker.recv().await,
136            Self::Redis(broker) => broker.recv().await,
137        }
138    }
139
140    async fn try_recv<T: DeserializeOwned>(&mut self) -> Result<Option<T>, BoxError> {
141        match self {
142            Self::Memory(broker) => broker.try_recv().await,
143            Self::Redis(broker) => broker.try_recv().await,
144        }
145    }
146}