use crate::error::BusError;
use crate::ids::{DurableName, JobId};
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::Stream;
use std::collections::HashMap;
use std::pin::Pin;
use std::time::Duration;
/// Message headers: a map from header name to header value, both owned `String`s.
pub type Headers = HashMap<String, String>;
/// A bus message: subject, payload and headers, plus the [`AckHandle`] used to settle it.
///
/// This is the `Ok` item type of [`MsgStream`].
pub struct Msg {
/// Subject the message is associated with.
pub subject: String,
/// Message body as raw bytes.
pub payload: Bytes,
/// Headers carried alongside the payload.
pub headers: Headers,
/// Handle offering `ack`, `nak` and `term` for this message; each of those calls consumes it.
pub ack: AckHandle,
}
// Hand-written `Debug`: reports the payload's length instead of its bytes and
// leaves the `ack` handle out of the output.
impl std::fmt::Debug for Msg {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let payload_len = self.payload.len();
        let mut out = f.debug_struct("Msg");
        out.field("subject", &self.subject);
        out.field("payload_len", &payload_len);
        out.field("headers", &self.headers);
        out.finish()
    }
}
/// Type-erased acknowledgement handle: a newtype around a boxed [`AckHandleImpl`].
pub struct AckHandle(Box<dyn AckHandleImpl>);
impl AckHandle {
pub fn new(inner: Box<dyn AckHandleImpl>) -> Self {
Self(inner)
}
}
/// Implementation side of an [`AckHandle`].
///
/// Every method takes `self: Box<Self>`, so an implementation is consumed by the single
/// operation performed on it.
#[async_trait]
pub trait AckHandleImpl: Send + Sync {
/// Acknowledges the message.
/// NOTE(review): the exact delivery semantics are defined by the implementor.
async fn ack(self: Box<Self>) -> Result<(), BusError>;
/// Negatively acknowledges the message.
/// NOTE(review): `delay` is presumably the wait before redelivery — confirm against implementors.
async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
/// Terminates the message.
/// NOTE(review): presumably stops any further redelivery — confirm against implementors.
async fn term(self: Box<Self>) -> Result<(), BusError>;
}
impl AckHandle {
pub async fn ack(self) -> Result<(), BusError> {
self.0.ack().await
}
pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
self.0.nak(delay).await
}
pub async fn term(self) -> Result<(), BusError> {
self.0.term().await
}
}
/// Pinned, boxed, `Send + 'static` stream whose items are `Result<Msg, BusError>`.
pub type MsgStream = Pin<Box<dyn Stream<Item = Result<Msg, BusError>> + Send + 'static>>;
/// Publish/subscribe messaging abstraction.
#[async_trait]
pub trait Pubsub: Send + Sync {
/// Publishes `payload` with `headers` on `subject`.
///
/// # Errors
/// Returns a [`BusError`] if the implementation reports a failure.
async fn publish(
&self,
subject: &str,
payload: Bytes,
headers: Headers,
) -> Result<(), BusError>;
/// Subscribes to `subject` and returns a [`MsgStream`] of incoming messages.
/// NOTE(review): `durable` presumably names a durable consumer/subscription — confirm
/// against implementors.
///
/// # Errors
/// Returns a [`BusError`] if the implementation reports a failure.
async fn subscribe(&self, subject: &str, durable: DurableName) -> Result<MsgStream, BusError>;
}
/// Request/reply messaging abstraction.
#[async_trait]
pub trait RequestReply: Send + Sync {
/// Sends `payload` on `subject` and returns the reply bytes.
/// NOTE(review): `timeout` is presumably the maximum time to wait for the reply — confirm
/// how implementors report expiry.
///
/// # Errors
/// Returns a [`BusError`] if the implementation reports a failure.
async fn request(
&self,
subject: &str,
payload: Bytes,
timeout: Duration,
) -> Result<Bytes, BusError>;
}
/// Revision number carried by [`KvEntry`] and returned by [`KvStore::put`] / [`KvStore::cas`].
pub type Revision = u64;
/// A value read from a [`KvStore`], paired with its [`Revision`].
#[derive(Debug, Clone)]
pub struct KvEntry {
/// Stored bytes.
pub value: Bytes,
/// Revision associated with `value`.
pub revision: Revision,
}
/// Key-value store abstraction; every operation is addressed by a `bucket` and a `key`.
#[async_trait]
pub trait KvStore: Send + Sync {
/// Reads `key` from `bucket`.
/// NOTE(review): `Ok(None)` presumably means the key is absent — confirm against implementors.
async fn get(&self, bucket: &str, key: &str) -> Result<Option<KvEntry>, BusError>;
/// Writes `value` under `key` and returns a [`Revision`].
/// NOTE(review): presumably the revision of the entry just written — confirm.
async fn put(&self, bucket: &str, key: &str, value: Bytes) -> Result<Revision, BusError>;
/// Conditional write of `value` under `key`, guarded by `expected`.
/// NOTE(review): the name suggests compare-and-set on the revision, with `None` presumably
/// meaning "key must not exist yet" — confirm against implementors.
async fn cas(
&self,
bucket: &str,
key: &str,
value: Bytes,
expected: Option<Revision>,
) -> Result<Revision, BusError>;
/// Deletes `key` from `bucket`.
async fn delete(&self, bucket: &str, key: &str) -> Result<(), BusError>;
/// Obtains a [`Lease`] for `key` in `bucket`.
/// NOTE(review): `ttl` is presumably how long the lease lasts without a heartbeat — confirm.
async fn lease(&self, bucket: &str, key: &str, ttl: Duration) -> Result<Lease, BusError>;
}
/// Type-erased lease: a newtype around a boxed [`LeaseImpl`].
pub struct Lease(Box<dyn LeaseImpl>);
impl Lease {
pub fn new(inner: Box<dyn LeaseImpl>) -> Self {
Self(inner)
}
}
/// Implementation side of a [`Lease`].
#[async_trait]
pub trait LeaseImpl: Send + Sync {
/// Sends a heartbeat for the lease.
/// NOTE(review): presumably extends/renews the lease — confirm against implementors.
async fn heartbeat(&self) -> Result<(), BusError>;
}
impl Lease {
pub async fn heartbeat(&self) -> Result<(), BusError> {
self.0.heartbeat().await
}
}
/// A job to be submitted through [`JobQueue::enqueue`].
#[derive(Debug, Clone)]
pub struct Job {
/// Job body as raw bytes.
pub payload: Bytes,
/// Optional deduplication key.
/// NOTE(review): how duplicates are detected is up to the queue implementation.
pub dedup_key: Option<String>,
/// Optional attempt limit.
/// NOTE(review): presumably the number of delivery attempts before the job is given up on — confirm.
pub max_attempts: Option<u32>,
}
impl Job {
pub fn new(payload: impl Into<Bytes>) -> Self {
Self {
payload: payload.into(),
dedup_key: None,
max_attempts: None,
}
}
}
/// A job returned by [`JobQueue::claim`], together with its lease and claim handle.
pub struct ClaimedJob {
/// Identifier of the claimed job.
pub id: JobId,
/// Job body as raw bytes.
pub payload: Bytes,
/// Lease used by [`ClaimedJob::heartbeat`].
pub lease: Lease,
/// Handle used by [`ClaimedJob::ack`], [`ClaimedJob::nak`] and [`ClaimedJob::dead_letter`].
pub claim: ClaimHandle,
}
// Hand-written `Debug`: prints the job id and the payload's length; the `lease`
// and `claim` handles are left out of the output.
impl std::fmt::Debug for ClaimedJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let payload_len = self.payload.len();
        let mut out = f.debug_struct("ClaimedJob");
        out.field("id", &self.id);
        out.field("payload_len", &payload_len);
        out.finish()
    }
}
/// Type-erased claim handle: a newtype around a boxed [`ClaimHandleImpl`].
pub struct ClaimHandle(Box<dyn ClaimHandleImpl>);
impl ClaimHandle {
pub fn new(inner: Box<dyn ClaimHandleImpl>) -> Self {
Self(inner)
}
}
/// Implementation side of a [`ClaimHandle`].
///
/// Every method takes `self: Box<Self>`, so an implementation is consumed by the single
/// operation performed on it.
#[async_trait]
pub trait ClaimHandleImpl: Send + Sync {
/// Acknowledges the claimed job.
/// NOTE(review): presumably marks the job as completed — confirm against implementors.
async fn ack(self: Box<Self>) -> Result<(), BusError>;
/// Negatively acknowledges the claimed job.
/// NOTE(review): `delay` is presumably the wait before the job can be claimed again — confirm.
async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
/// Dead-letters the claimed job, passing along a caller-supplied `reason`.
/// NOTE(review): where the job and reason end up is defined by the implementor.
async fn dead_letter(self: Box<Self>, reason: &str) -> Result<(), BusError>;
}
impl ClaimedJob {
pub async fn heartbeat(&self) -> Result<(), BusError> {
self.lease.heartbeat().await
}
pub async fn ack(self) -> Result<(), BusError> {
self.claim.0.ack().await
}
pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
self.claim.0.nak(delay).await
}
pub async fn dead_letter(self, reason: &str) -> Result<(), BusError> {
self.claim.0.dead_letter(reason).await
}
}
/// Job queue abstraction: enqueue jobs by queue name and claim them for processing.
#[async_trait]
pub trait JobQueue: Send + Sync {
/// Submits `job` to `queue` and returns the [`JobId`] assigned to it.
///
/// # Errors
/// Returns a [`BusError`] if the implementation reports a failure.
async fn enqueue(&self, queue: &str, job: Job) -> Result<JobId, BusError>;
/// Attempts to claim a job from `queue` on behalf of `worker_id`.
/// NOTE(review): `Ok(None)` presumably means no job is currently available, and `lease_ttl`
/// presumably bounds how long the claim lasts without a heartbeat — confirm against
/// implementors.
///
/// # Errors
/// Returns a [`BusError`] if the implementation reports a failure.
async fn claim(
&self,
queue: &str,
worker_id: &str,
lease_ttl: Duration,
) -> Result<Option<ClaimedJob>, BusError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time checks only: each anonymous constant names `&dyn Trait`, which
    // compiles only while that trait remains usable as a trait object. Nothing
    // here runs at test time.
    const _: Option<&dyn Pubsub> = None;
    const _: Option<&dyn RequestReply> = None;
    const _: Option<&dyn KvStore> = None;
    const _: Option<&dyn JobQueue> = None;
}