use std::{marker::PhantomData, time::Instant};
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::oneshot;
pub mod backends;
pub use backends::Backend;
use backends::ClaimedTask;
pub mod dispatcher;
pub(crate) mod runtime;
/// Compile-time description of a task type.
///
/// Ties together the task's name, the callback payload it produces on success, and the
/// [`ActivationStrategy`] that triggers it.
pub trait TaskDefinition: Send {
    /// Name of the task.
    ///
    /// NOTE(review): presumably used by backends to identify the task kind — confirm.
    const NAME: &'static str;

    /// Callback payload type; this is the `T` of the [`TaskResult`] that a [`Worker`] for this
    /// task returns from [`Worker::process`].
    type Callback: Serialize + DeserializeOwned + Send + Sync + 'static;

    /// Activation strategy of the task; its `EffectivePayload` is the payload type handed to
    /// [`Worker::process`].
    type Trigger: ActivationStrategy;
}
/// Describes how tasks of a given definition are claimed from a [`Backend`].
///
/// The trait has `private::Sealed` as a supertrait, so only types that implement that trait
/// can implement this one. All members are hidden from the rendered documentation.
pub trait ActivationStrategy: private::Sealed {
    /// Tag identifying which strategy this is.
    #[doc(hidden)]
    const KIND: ActivationStrategyKind;

    /// Extra value passed to [`claim_task`](Self::claim_task) alongside the worker id and the
    /// lease expiration.
    #[doc(hidden)]
    type DispatchToken: Send;

    /// Payload type carried by the [`ClaimedTask`] that `claim_task` resolves to.
    #[doc(hidden)]
    type EffectivePayload: Send + Sync;

    /// Claims a task of definition `T` from `backend` on behalf of worker `worker_id`.
    ///
    /// Returns a `Send` future resolving to the claimed task or a
    /// [`backends::ClaimTaskError`].
    ///
    /// NOTE(review): `lease_expiration` is presumably the instant at which the claim lapses —
    /// confirm against the backend contract.
    #[doc(hidden)]
    fn claim_task<B, T>(
        backend: &B,
        worker_id: u64,
        dispatch_token: Self::DispatchToken,
        lease_expiration: Instant,
    ) -> impl Future<Output = Result<ClaimedTask<Self::EffectivePayload>, backends::ClaimTaskError>>
    + Send
    where
        B: Backend,
        T: TaskDefinition<Trigger = Self>;
}
/// Tag distinguishing the activation strategies defined in this module.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActivationStrategyKind {
    /// The `KIND` reported by `PublishTrigger`.
    Publish,
    /// The `KIND` reported by `SingletonTrigger`.
    Singleton,
}
/// An [`ActivationStrategy`] that additionally names a serializable payload type.
pub trait PublishActivationStrategy: ActivationStrategy {
    /// Payload type associated with the strategy.
    ///
    /// NOTE(review): presumably the value supplied when a task is published — confirm against
    /// the dispatcher.
    type Payload: Serialize + DeserializeOwned + Send + Sync;
}
/// Successful outcome of processing a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaskSuccess<T> {
    /// Value produced by the task.
    pub callback_payload: T,
    /// `None` when built with [`TaskSuccess::done`]; `Some(instant)` when built with
    /// [`TaskSuccess::schedule_next_run`].
    pub available_from: Option<Instant>,
}

impl<T> TaskSuccess<T> {
    /// Builds a success carrying `callback_payload` with no `available_from` instant.
    pub fn done(callback_payload: T) -> Self {
        Self::with_availability(callback_payload, None)
    }

    /// Builds a success carrying `callback_payload` and records `available_from`.
    ///
    /// NOTE(review): presumably the earliest instant at which the task may run again —
    /// confirm against the runtime.
    pub fn schedule_next_run(callback_payload: T, available_from: Instant) -> Self {
        Self::with_availability(callback_payload, Some(available_from))
    }

    /// Shared constructor behind the two public entry points.
    fn with_availability(callback_payload: T, available_from: Option<Instant>) -> Self {
        Self {
            callback_payload,
            available_from,
        }
    }
}
/// Failed outcome of processing a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaskFailure {
    /// `None` when built with [`TaskFailure::retry_immediately`]; `Some(instant)` when built
    /// with [`TaskFailure::retry_at`].
    pub available_from: Option<Instant>,
}

impl TaskFailure {
    /// Builds a failure with no `available_from` instant.
    pub fn retry_immediately() -> Self {
        let available_from = None;
        Self { available_from }
    }

    /// Builds a failure that records `available_from`.
    ///
    /// NOTE(review): presumably the earliest instant at which the retry may start — confirm
    /// against the runtime.
    pub fn retry_at(available_from: Instant) -> Self {
        let available_from = Some(available_from);
        Self { available_from }
    }
}
/// Outcome of processing a task: a [`TaskSuccess`] carrying a callback payload of type `T`, or
/// a [`TaskFailure`].
pub type TaskResult<T> = Result<TaskSuccess<T>, TaskFailure>;
/// Processes a single task of the associated [`TaskDefinition`].
pub trait Worker: Send {
    /// Task definition this worker handles.
    type Task: TaskDefinition;

    /// Processes the task identified by `task_id`, consuming the worker.
    ///
    /// `task_payload` has the `EffectivePayload` type of the task's trigger. The returned
    /// `Send` future resolves to a [`TaskResult`] over the task's `Callback` type.
    fn process(
        self,
        task_id: u64,
        task_payload: <<Self::Task as TaskDefinition>::Trigger as ActivationStrategy>::EffectivePayload,
    ) -> impl Future<Output = TaskResult<<Self::Task as TaskDefinition>::Callback>> + Send;
}
/// Builds [`Worker`] values.
pub trait WorkerFactory: Send + Sync {
    /// Worker type produced by this factory.
    type Worker: Worker;

    /// Builds a worker for the given `worker_id`.
    ///
    /// NOTE(review): since `Worker::process` consumes the worker, this is presumably called
    /// once per processed task — confirm against the runtime.
    fn build(&self, worker_id: u64) -> Self::Worker;
}
/// Handle pairing a task id with the oneshot receiver on which a value of type `T` is
/// delivered.
#[derive(Debug)]
pub struct AwaitableTask<T> {
    /// Identifier of the task this handle refers to.
    task_id: u64,
    /// Receiving half of the oneshot channel awaited by `wait`.
    callback_rx: oneshot::Receiver<T>,
}
impl<T> AwaitableTask<T> {
    /// Wraps a task id together with the receiver for its callback value.
    pub(crate) fn new(task_id: u64, callback_rx: oneshot::Receiver<T>) -> Self {
        Self {
            callback_rx,
            task_id,
        }
    }

    /// Returns the id of the task this handle refers to.
    pub fn task_id(&self) -> u64 {
        self.task_id
    }

    /// Waits for the callback value, consuming the handle.
    ///
    /// # Errors
    ///
    /// Returns [`AwaitTaskError::CallbackLost`] when awaiting the receiver fails, i.e. no
    /// value was delivered on the channel.
    pub async fn wait(self) -> Result<T, AwaitTaskError> {
        match self.callback_rx.await {
            Ok(callback) => Ok(callback),
            Err(_) => Err(AwaitTaskError::CallbackLost),
        }
    }
}
/// Error returned by `AwaitableTask::wait`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AwaitTaskError {
    /// Awaiting the callback receiver failed, so no callback value was observed.
    CallbackLost,
}

impl std::fmt::Display for AwaitTaskError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::CallbackLost => "task finished without an observable callback notification",
        };
        f.write_str(message)
    }
}

impl std::error::Error for AwaitTaskError {}
/// Activation strategy marker parameterised by a serializable payload type.
///
/// The only field is a private `PhantomData`, so the struct stores no `Payload` value; the
/// type parameter exists to select the payload type at compile time.
pub struct PublishTrigger<Payload>
where
    Payload: Serialize + DeserializeOwned + Send + Sync,
{
    /// Zero-sized marker tying the struct to `Payload`.
    payload_type: PhantomData<Payload>,
}
/// Dispatch token used by `PublishTrigger` to choose which task to claim.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishDispatchToken {
    /// Claim the task with the given id.
    Task(u64),
    /// Claim via the backend's "earliest published" entry point rather than by id.
    EarliestAvailable,
}
// Marks `PublishTrigger` as sealed, which the `ActivationStrategy` supertrait bound requires
// before the strategy itself can be implemented for it.
impl<Payload> private::Sealed for PublishTrigger<Payload> where
Payload: Serialize + DeserializeOwned + Send + Sync
{
}
/// Publish-style activation: the claimed task carries a `Payload` value, and the dispatch
/// token selects which backend claim call is made.
impl<Payload> ActivationStrategy for PublishTrigger<Payload>
where
    Payload: Serialize + DeserializeOwned + Send + Sync,
{
    const KIND: ActivationStrategyKind = ActivationStrategyKind::Publish;

    type DispatchToken = PublishDispatchToken;

    type EffectivePayload = Payload;

    /// Claims a published task of definition `T` for worker `worker_id`.
    ///
    /// * `PublishDispatchToken::Task(id)` forwards to `Backend::claim_published` with that id.
    /// * `PublishDispatchToken::EarliestAvailable` forwards to
    ///   `Backend::claim_earliest_published`.
    ///
    /// The backend's result is returned unchanged.
    async fn claim_task<B, T>(
        backend: &B,
        worker_id: u64,
        dispatch_token: Self::DispatchToken,
        lease_expiration: Instant,
    ) -> Result<ClaimedTask<Self::EffectivePayload>, backends::ClaimTaskError>
    where
        B: Backend,
        T: TaskDefinition<Trigger = Self>,
    {
        match dispatch_token {
            PublishDispatchToken::EarliestAvailable => {
                backend
                    .claim_earliest_published::<T>(worker_id, lease_expiration)
                    .await
            }
            PublishDispatchToken::Task(task_id) => {
                backend
                    .claim_published::<T>(worker_id, task_id, lease_expiration)
                    .await
            }
        }
    }
}
/// Exposes the trigger's type parameter as the strategy's `Payload`.
impl<Payload> PublishActivationStrategy for PublishTrigger<Payload>
where
    Payload: Serialize + DeserializeOwned + Send + Sync,
{
    type Payload = Payload;
}
/// Activation strategy marker for tasks claimed through `Backend::claim_singleton`; its
/// dispatch token and effective payload are both `()`.
pub struct SingletonTrigger;
// Marks `SingletonTrigger` as sealed, as required by the `ActivationStrategy` supertrait bound.
impl private::Sealed for SingletonTrigger {}
/// Singleton-style activation: no dispatch token data and no payload.
impl ActivationStrategy for SingletonTrigger {
    const KIND: ActivationStrategyKind = ActivationStrategyKind::Singleton;

    type DispatchToken = ();

    type EffectivePayload = ();

    /// Claims the task of definition `T` for worker `worker_id` by returning the future
    /// produced by `Backend::claim_singleton`.
    ///
    /// The dispatch token is `()` and is ignored.
    fn claim_task<B, T>(
        backend: &B,
        worker_id: u64,
        _dispatch_token: Self::DispatchToken,
        lease_expiration: Instant,
    ) -> impl Future<Output = Result<ClaimedTask<Self::EffectivePayload>, backends::ClaimTaskError>>
    + Send
    where
        B: Backend,
        T: TaskDefinition<Trigger = Self>,
    {
        backend.claim_singleton::<T>(worker_id, lease_expiration)
    }
}
/// Private module holding the sealing trait.
///
/// The module itself is not `pub`, so code outside this module's parent cannot name
/// `Sealed` through this path.
mod private {
    /// Marker supertrait used to seal `ActivationStrategy`.
    pub trait Sealed {}
}