use crate::error::PubSubError;
use async_trait::async_trait;
use serde::Serialize;
use std::future::Future;
use tokio::{sync::oneshot, task::JoinHandle};
mod memory;
#[cfg(feature = "redis-backend")]
mod redis;
pub use memory::InMemoryPubSub;
#[cfg(feature = "redis-backend")]
pub use redis::{BackoffConfig, RedisPubSub};
/// Handle to a running subscription.
///
/// Pairs the [`JoinHandle`] of a spawned task with the sending half of a
/// one-shot channel used to ask that task to stop. How the task reacts to the
/// signal is up to whoever spawned it and is not visible from this type.
pub struct Subscription {
    /// Join handle of the task backing this subscription.
    handle: JoinHandle<()>,
    /// One-shot sender for the stop signal; held in an `Option` so it can be
    /// moved out exactly once.
    stop_tx: Option<oneshot::Sender<()>>,
}
impl Subscription {
    /// Builds a subscription from the task's join handle and the sender half
    /// of its stop channel.
    pub(crate) fn new(handle: JoinHandle<()>, stop_tx: oneshot::Sender<()>) -> Self {
        let stop_tx = Some(stop_tx);
        Self { handle, stop_tx }
    }

    /// Signals the task to stop and waits until it has finished.
    ///
    /// The stop signal is sent only if the sender is still present. Both a
    /// failed send and the task's join result are deliberately ignored.
    pub async fn stop(self) {
        let Self { handle, stop_tx } = self;
        if let Some(signal) = stop_tx {
            // A send error only means the receiving half is already gone.
            let _ = signal.send(());
        }
        // Wait for the task to end; its join result is discarded.
        let _ = handle.await;
    }

    /// Aborts the backing task without sending the stop signal and without
    /// waiting for the task to finish.
    pub fn abort(self) {
        let Self { handle, .. } = self;
        handle.abort();
    }

    /// Returns `true` as long as the backing task has not finished.
    pub fn is_active(&self) -> bool {
        let finished = self.handle.is_finished();
        !finished
    }
}
/// Byte-oriented publish/subscribe backend.
///
/// Implementors must be `Send + Sync`. Payloads are exchanged as raw bytes;
/// any encoding is layered on top by callers.
#[async_trait]
pub trait PubSubBackend: Send + Sync {
    /// Publishes `payload` on `topic`.
    ///
    /// # Errors
    ///
    /// Returns a [`PubSubError`] when the backend reports a failure.
    async fn publish_bytes(&self, topic: &str, payload: Vec<u8>) -> Result<(), PubSubError>;

    /// Subscribes to `topic` with a type-erased asynchronous `handler`.
    ///
    /// `handler` takes an owned byte payload and returns a boxed, pinned
    /// `Send` future; presumably it is called once per delivered message
    /// (depends on the implementation). On success the returned
    /// [`Subscription`] controls the subscription.
    ///
    /// # Errors
    ///
    /// Returns a [`PubSubError`] when the backend reports a failure.
    async fn subscribe_bytes(
        &self,
        topic: &str,
        handler: Box<dyn Fn(Vec<u8>) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send>,
    ) -> Result<Subscription, PubSubError>;
}
/// Convenience layer over [`PubSubBackend`]: JSON-encoding publish and a
/// subscribe that accepts any suitable closure.
#[async_trait]
pub trait PubSubExt: PubSubBackend {
    /// Serializes `msg` to JSON bytes and publishes them on `topic`.
    ///
    /// # Errors
    ///
    /// Returns a [`PubSubError`] if serialization fails (converted via `?`)
    /// or if `publish_bytes` fails.
    async fn publish<T: Serialize + Send + Sync>(
        &self,
        topic: &str,
        msg: &T,
    ) -> Result<(), PubSubError> {
        self.publish_bytes(topic, serde_json::to_vec(msg)?).await
    }

    /// Subscribes to `topic` with a plain closure returning a future.
    ///
    /// The closure is boxed and its future pinned on the heap so it matches
    /// the type-erased handler taken by `subscribe_bytes`. The handler
    /// receives the raw bytes; this method does no deserialization.
    ///
    /// # Errors
    ///
    /// Propagates any [`PubSubError`] from `subscribe_bytes`.
    async fn subscribe<F, Fut>(&self, topic: &str, handler: F) -> Result<Subscription, PubSubError>
    where
        F: Fn(Vec<u8>) -> Fut + Send + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        // Erase the concrete closure and future types before handing off.
        let erased: Box<
            dyn Fn(Vec<u8>) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send,
        > = Box::new(move |payload| Box::pin(handler(payload)));
        self.subscribe_bytes(topic, erased).await
    }
}
/// Blanket implementation: every [`PubSubBackend`], sized or not, gets the
/// [`PubSubExt`] default methods.
impl<T> PubSubExt for T where T: PubSubBackend + ?Sized {}