use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::broadcast;
use crate::error::Error;
/// Heap-allocated, pinned, `Send` future that yields `T` and may borrow data
/// for the lifetime `'a`.
///
/// Used as the return type of the trait methods in this file so they can hand
/// back a future without being declared `async fn`.
pub type RelayFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Backend that accepts payloads for named topics and hands out per-topic
/// receivers.
///
/// Implementors must be `Send + Sync + 'static`. In both methods the returned
/// future borrows only `self`; the `topic` slice is not tied to the future's
/// lifetime, so an implementation has to copy whatever it needs from `topic`
/// before returning.
pub trait RelayBackend: Send + Sync + 'static {
/// Delivers `json` to `topic`.
///
/// The payload is an `Arc<String>`, so passing it around is a reference-count
/// bump rather than a copy of the text. The future resolves to `Ok(())` or to
/// an [`Error`]; which conditions count as an error is left to the
/// implementation.
fn push(&self, topic: &str, json: Arc<String>) -> RelayFuture<'_, Result<(), Error>>;
/// Returns a future resolving to a boxed [`TopicReceiver`] for `topic`.
///
/// The signature has no error channel, so implementations must always
/// produce a receiver.
fn subscribe(&self, topic: &str) -> RelayFuture<'_, Box<dyn TopicReceiver>>;
}
/// Receiving half handed out by [`RelayBackend::subscribe`].
///
/// Implementors must be `Send + 'static`.
pub trait TopicReceiver: Send + 'static {
/// Waits for the next payload.
///
/// Resolves to `Some(payload)` when a message is available and to `None`
/// when no further messages will arrive (for the in-file
/// `BroadcastReceiver`, that is when the underlying channel is closed).
/// The future mutably borrows the receiver until it completes or is dropped.
fn recv(&mut self) -> RelayFuture<'_, Option<Arc<String>>>;
}
/// In-process [`RelayBackend`] built on one `tokio::sync::broadcast` channel
/// per topic.
pub struct InMemoryBackend {
// Topic name -> sending half of that topic's broadcast channel. Wrapped in an
// `Arc` because every receiver created by `subscribe` keeps its own handle to
// the map and uses it when the receiver is dropped.
topics: Arc<DashMap<String, broadcast::Sender<Arc<String>>>>,
// Capacity passed to `broadcast::channel` when `subscribe` creates a channel
// for a topic that has no entry yet.
topic_capacity: usize,
}
impl InMemoryBackend {
    /// Creates a backend with no topics.
    ///
    /// `topic_capacity` is the buffer size used for each topic's broadcast
    /// channel, which is created lazily on the first `subscribe` for that
    /// topic.
    ///
    /// `tokio::sync::broadcast::channel` panics when asked for a capacity of
    /// zero. Because channels are created lazily, that panic would surface
    /// far away from the misconfiguration, inside the first `subscribe` call.
    /// A requested capacity of `0` is therefore raised to `1` here; every
    /// non-zero value is stored unchanged.
    pub fn new(topic_capacity: usize) -> Self {
        Self {
            topics: Arc::new(DashMap::new()),
            topic_capacity: topic_capacity.max(1),
        }
    }
}
impl RelayBackend for InMemoryBackend {
    /// Broadcasts `json` to every current subscriber of `topic`.
    ///
    /// The send happens synchronously inside this call, before the returned
    /// future is polled; the future is already complete and always yields
    /// `Ok(())`. A topic with no map entry is a no-op.
    fn push(&self, topic: &str, json: Arc<String>) -> RelayFuture<'_, Result<(), Error>> {
        if let Some(tx) = self.topics.get(topic) {
            // Delivery is best-effort: a broadcast `send` fails only when no
            // receiver is currently subscribed, which is not an error here.
            let _ = tx.send(json);
        }
        Box::pin(std::future::ready(Ok(())))
    }

    /// Registers a new receiver on `topic`, creating the topic's channel
    /// (with `topic_capacity` slots) if it does not exist yet.
    ///
    /// Registration happens synchronously inside this call; the returned
    /// future is already complete. The receiver only observes messages pushed
    /// after this call.
    fn subscribe(&self, topic: &str) -> RelayFuture<'_, Box<dyn TopicReceiver>> {
        // Subscribe while the map entry guard is still held. If the sender
        // were cloned out and subscribed to after releasing the guard, a
        // concurrent removal of the entry in between would leave this receiver
        // attached to a channel that `push` can no longer find.
        let rx = self
            .topics
            .entry(topic.to_owned())
            .or_insert_with(|| broadcast::channel(self.topic_capacity).0)
            .subscribe();

        let receiver = BroadcastReceiver {
            rx,
            topic: topic.to_owned(),
            topics: Arc::clone(&self.topics),
        };
        Box::pin(std::future::ready(
            Box::new(receiver) as Box<dyn TopicReceiver>
        ))
    }
}
/// [`TopicReceiver`] returned by `InMemoryBackend::subscribe`.
struct BroadcastReceiver {
// Receiving half of the topic's broadcast channel.
rx: broadcast::Receiver<Arc<String>>,
// Name of the subscribed topic; used as the map key by the `Drop` impl.
topic: String,
// Handle to the owning backend's topic map, consulted by the `Drop` impl.
topics: Arc<DashMap<String, broadcast::Sender<Arc<String>>>>,
}
impl TopicReceiver for BroadcastReceiver {
fn recv(&mut self) -> RelayFuture<'_, Option<Arc<String>>> {
Box::pin(async {
loop {
match self.rx.recv().await {
Ok(msg) => return Some(msg),
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
}
}
})
}
}
impl Drop for BroadcastReceiver {
fn drop(&mut self) {
self.topics
.remove_if(&self.topic, |_, tx| tx.receiver_count() == 0);
}
}