1#![allow(async_fn_in_trait)]
2
3use std::{collections::HashSet, error::Error};
4
5use fred::prelude::RedisClient;
6use futures::stream::BoxStream;
7use serde::{de::DeserializeOwned, Serialize};
8
9pub mod memory;
10pub mod redis;
11
12pub(crate) type BoxError = Box<dyn Error + Send + Sync>;
13
14pub trait Broker: Clone {
15 type Conn: Connection;
16 async fn connect(&self) -> Result<Self::Conn, BoxError>;
17 async fn subscribers_count(&self, channel: &str) -> usize;
18 async fn subscriptions(&self) -> HashSet<(String, usize)>;
19 async fn publish(&self, channel: &str, msg: impl Serialize) -> Result<(), BoxError>;
20 fn all_messages<T: DeserializeOwned + Send + 'static>(&self) -> BoxStream<'static, T>;
21}
22
23pub trait Connection {
24 async fn authenticate(&mut self, user_id: &str, data: impl Serialize) -> Result<(), BoxError>;
25 async fn publish(&mut self, channel: &str, msg: impl Serialize) -> Result<(), BoxError>;
26 async fn subscribe(&mut self, channel: &str) -> Result<(), BoxError>;
27 async fn unsubscribe(&mut self, channel: &str) -> Result<(), BoxError>;
28 async fn recv<T: DeserializeOwned>(&mut self) -> Result<T, BoxError>;
29 async fn try_recv<T: DeserializeOwned>(&mut self) -> Result<Option<T>, BoxError>;
30}
31
32#[derive(Debug, Clone)]
33pub enum AnyBroker {
34 Memory(memory::MemoryBroker),
35 Redis(redis::RedisBroker),
36}
37
38impl AnyBroker {
39 pub fn memory() -> Self {
40 Self::Memory(memory::MemoryBroker::default())
41 }
42
43 pub async fn redis(url: &str, namespace: &str) -> Result<Self, BoxError> {
44 Ok(Self::Redis(
45 redis::RedisBroker::from_url(url, namespace).await?,
46 ))
47 }
48
49 pub async fn redis_single(
50 publisher: RedisClient,
51 subscriber: RedisClient,
52 namespace: &str,
53 ) -> Result<Self, BoxError> {
54 Ok(Self::Redis(
55 redis::RedisBroker::from_connection_pair(publisher, subscriber, namespace).await?,
56 ))
57 }
58}
59
60impl Broker for AnyBroker {
61 type Conn = AnyConnection;
62
63 async fn connect(&self) -> Result<Self::Conn, BoxError> {
64 match self {
65 Self::Memory(broker) => Ok(AnyConnection::Memory(broker.connect().await?)),
66 Self::Redis(broker) => Ok(AnyConnection::Redis(broker.connect().await?)),
67 }
68 }
69
70 async fn subscribers_count(&self, channel: &str) -> usize {
71 match self {
72 Self::Memory(broker) => broker.subscribers_count(channel).await,
73 Self::Redis(broker) => broker.subscribers_count(channel).await,
74 }
75 }
76
77 async fn subscriptions(&self) -> HashSet<(String, usize)> {
78 match self {
79 Self::Memory(broker) => broker.subscriptions().await,
80 Self::Redis(broker) => broker.subscriptions().await,
81 }
82 }
83
84 async fn publish(&self, channel: &str, msg: impl Serialize) -> Result<(), BoxError> {
85 match self {
86 Self::Memory(broker) => broker.publish(channel, msg).await,
87 Self::Redis(broker) => broker.publish(channel, msg).await,
88 }
89 }
90
91 fn all_messages<T: DeserializeOwned + Send + 'static>(&self) -> BoxStream<'static, T> {
92 match self {
93 Self::Memory(broker) => broker.all_messages(),
94 Self::Redis(broker) => broker.all_messages(),
95 }
96 }
97}
98
99pub enum AnyConnection {
100 Memory(memory::MemoryConnection),
101 Redis(redis::RedisConnection),
102}
103
104impl Connection for AnyConnection {
105 async fn authenticate(&mut self, user_id: &str, data: impl Serialize) -> Result<(), BoxError> {
106 match self {
107 Self::Memory(broker) => broker.authenticate(user_id, data).await,
108 Self::Redis(broker) => broker.authenticate(user_id, data).await,
109 }
110 }
111
112 async fn publish(&mut self, channel: &str, msg: impl Serialize) -> Result<(), BoxError> {
113 match self {
114 Self::Memory(broker) => broker.publish(channel, msg).await,
115 Self::Redis(broker) => broker.publish(channel, msg).await,
116 }
117 }
118
119 async fn subscribe(&mut self, channel: &str) -> Result<(), BoxError> {
120 match self {
121 Self::Memory(broker) => broker.subscribe(channel).await,
122 Self::Redis(broker) => broker.subscribe(channel).await,
123 }
124 }
125
126 async fn unsubscribe(&mut self, channel: &str) -> Result<(), BoxError> {
127 match self {
128 Self::Memory(broker) => broker.unsubscribe(channel).await,
129 Self::Redis(broker) => broker.unsubscribe(channel).await,
130 }
131 }
132
133 async fn recv<T: DeserializeOwned>(&mut self) -> Result<T, BoxError> {
134 match self {
135 Self::Memory(broker) => broker.recv().await,
136 Self::Redis(broker) => broker.recv().await,
137 }
138 }
139
140 async fn try_recv<T: DeserializeOwned>(&mut self) -> Result<Option<T>, BoxError> {
141 match self {
142 Self::Memory(broker) => broker.try_recv().await,
143 Self::Redis(broker) => broker.try_recv().await,
144 }
145 }
146}