use std::{
collections::HashMap,
pin::Pin,
task::{Context as TaskContext, Poll},
time::Duration,
};
use async_trait::async_trait;
use futures_core::Stream;
use crate::Error;
pub type MessageId = String;
#[async_trait]
pub trait EventBus: Send + Sync + 'static {
async fn publish(&self, subject: &str, payload: &[u8]) -> Result<MessageId, Error>;
async fn publish_with_headers(
&self,
subject: &str,
payload: &[u8],
headers: HashMap<String, String>,
) -> Result<MessageId, Error>;
async fn subscribe(
&self,
subject_pattern: &str,
group: &str,
opts: SubscribeOptions,
) -> Result<Subscription, Error>;
fn system(&self) -> &'static str;
}
#[derive(Clone, Debug)]
pub struct SubscribeOptions {
pub start: StartPosition,
pub visibility_timeout: Duration,
pub max_in_flight: usize,
}
impl Default for SubscribeOptions {
fn default() -> Self {
Self {
start: StartPosition::Now,
visibility_timeout: Duration::from_secs(30),
max_in_flight: 32,
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum StartPosition {
Now,
Earliest,
}
pub struct Subscription {
inner: Pin<Box<dyn Stream<Item = DeliveredMessage> + Send>>,
}
impl Subscription {
pub fn new(s: impl Stream<Item = DeliveredMessage> + Send + 'static) -> Self {
Self { inner: Box::pin(s) }
}
}
impl Stream for Subscription {
type Item = DeliveredMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
self.inner.as_mut().poll_next(cx)
}
}
pub struct DeliveredMessage {
pub id: MessageId,
pub subject: String,
pub payload: Vec<u8>,
pub headers: HashMap<String, String>,
pub delivery_attempt: u32,
acker: Box<dyn Acker>,
}
impl DeliveredMessage {
pub fn new(
id: MessageId,
subject: String,
payload: Vec<u8>,
headers: HashMap<String, String>,
delivery_attempt: u32,
acker: impl Acker + 'static,
) -> Self {
Self {
id,
subject,
payload,
headers,
delivery_attempt,
acker: Box::new(acker),
}
}
pub async fn ack(self) -> Result<(), Error> {
self.acker.ack().await
}
pub async fn nack(self) -> Result<(), Error> {
self.acker.nack(None).await
}
pub async fn nack_with_delay(self, delay: Duration) -> Result<(), Error> {
self.acker.nack(Some(delay)).await
}
}
#[async_trait]
pub trait Acker: Send + Sync {
async fn ack(self: Box<Self>) -> Result<(), Error>;
async fn nack(self: Box<Self>, delay: Option<Duration>) -> Result<(), Error>;
}