use std::{
error::Error as StdError,
fmt::{Display, Formatter},
marker::PhantomData,
time::Instant,
};
use tokio::sync::broadcast::Receiver as BroadcastReceiver;
use crate::{AwaitableTask, PublishActivationStrategy, TaskDefinition};
#[cfg(feature = "in_memory")]
pub mod in_memory;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "sqlite")]
pub mod sqlite;
pub trait Backend: Clone + Send + Sync {
fn subscribe<T>(
&self,
) -> impl Future<Output = Result<BackendSignalSubscription<T>, SubscribeError>> + Send
where
T: TaskDefinition;
fn sweep<T>(&self) -> impl Future<Output = Result<Vec<SweptTask>, SweepTasksError>> + Send
where
T: TaskDefinition;
fn publish<T>(
&self,
payload: <<T as TaskDefinition>::Trigger as PublishActivationStrategy>::Payload,
) -> impl Future<Output = Result<PublishedTask, PublishTaskError>> + Send
where
T: TaskDefinition,
T::Trigger: PublishActivationStrategy;
fn publish_awaitable<T>(
&self,
payload: <<T as TaskDefinition>::Trigger as PublishActivationStrategy>::Payload,
) -> impl Future<Output = Result<AwaitableTask<T::Callback>, PublishTaskError>> + Send
where
T: TaskDefinition,
T::Trigger: PublishActivationStrategy;
fn claim_published<T>(
&self,
worker_id: u64,
task_id: u64,
lease_expiration: Instant,
) -> impl Future<
Output = Result<
ClaimedTask<<<T as TaskDefinition>::Trigger as PublishActivationStrategy>::Payload>,
ClaimTaskError,
>,
> + Send
where
T: TaskDefinition,
T::Trigger: PublishActivationStrategy;
fn claim_singleton<T>(
&self,
worker_id: u64,
lease_expiration: Instant,
) -> impl Future<Output = Result<ClaimedTask<()>, ClaimTaskError>> + Send
where
T: TaskDefinition;
fn renew(
&self,
worker_id: u64,
task_id: u64,
lease_expiration: Instant,
) -> impl Future<Output = Result<RenewedTaskLease, RenewTaskError>> + Send;
fn finish<T>(
&self,
worker_id: u64,
task_id: u64,
callback_payload: T::Callback,
) -> impl Future<Output = Result<FinishedTask, FinishTaskError>> + Send
where
T: TaskDefinition;
}
pub type BoxBackendError = Box<dyn StdError + Send + Sync + 'static>;
#[derive(Debug)]
pub enum SubscribeError {
Backend(BoxBackendError),
}
impl Display for SubscribeError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Backend(error) => write!(f, "backend subscribe failed: {error}"),
}
}
}
impl StdError for SubscribeError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Backend(error) => Some(error.as_ref()),
}
}
}
#[derive(Debug)]
pub enum SweepTasksError {
Backend(BoxBackendError),
}
impl Display for SweepTasksError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Backend(error) => write!(f, "backend sweep failed: {error}"),
}
}
}
impl StdError for SweepTasksError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Backend(error) => Some(error.as_ref()),
}
}
}
pub struct BackendSignalSubscription<T> {
rx: BroadcastReceiver<BackendSignal>,
_task: PhantomData<fn() -> T>,
}
impl<T> BackendSignalSubscription<T>
where
T: TaskDefinition,
{
pub async fn recv(
&mut self,
) -> Result<NewTaskAvailableSignalPayload, tokio::sync::broadcast::error::RecvError> {
match self.rx.recv().await? {
BackendSignal::NewTaskAvailable(signal) => Ok(signal),
}
}
}
impl<T> BackendSignalSubscription<T> {
pub(crate) fn new(rx: BroadcastReceiver<BackendSignal>) -> Self {
Self {
rx,
_task: PhantomData,
}
}
}
#[derive(Debug, Clone)]
pub enum BackendSignal {
NewTaskAvailable(NewTaskAvailableSignalPayload),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NewTaskAvailableSignalPayload {
pub task_id: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SweptTask {
pub task_id: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PublishedTask {
pub task_id: u64,
}
#[derive(Debug)]
pub enum PublishTaskError {
Backend(BoxBackendError),
}
impl Display for PublishTaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Backend(error) => write!(f, "backend publish failed: {error}"),
}
}
}
impl StdError for PublishTaskError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Backend(error) => Some(error.as_ref()),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClaimedTask<T> {
pub task_id: u64,
pub task_payload: T,
pub lease_expiration: Instant,
}
#[derive(Debug)]
pub enum ClaimTaskError {
Backend(BoxBackendError),
TaskLeased { expiration: Instant },
TaskNotFound,
}
impl Display for ClaimTaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Backend(error) => write!(f, "backend claim failed: {error}"),
Self::TaskLeased { expiration } => {
write!(f, "task is currently leased until {expiration:?}")
}
Self::TaskNotFound => f.write_str("task not found"),
}
}
}
impl StdError for ClaimTaskError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Backend(error) => Some(error.as_ref()),
Self::TaskLeased { .. } | Self::TaskNotFound => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RenewedTaskLease {
pub new_expiration: Instant,
}
#[derive(Debug)]
pub enum RenewTaskError {
Backend(BoxBackendError),
LeaseLost,
}
impl Display for RenewTaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Backend(error) => write!(f, "backend lease renewal failed: {error}"),
Self::LeaseLost => f.write_str("task lease lost"),
}
}
}
impl StdError for RenewTaskError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Backend(error) => Some(error.as_ref()),
Self::LeaseLost => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FinishedTask {
pub task_id: u64,
}
#[derive(Debug)]
pub enum FinishTaskError {
Backend(BoxBackendError),
LeaseLost,
}
impl Display for FinishTaskError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Backend(error) => write!(f, "backend finish failed: {error}"),
Self::LeaseLost => f.write_str("task lease lost"),
}
}
}
impl StdError for FinishTaskError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Backend(error) => Some(error.as_ref()),
Self::LeaseLost => None,
}
}
}