mod beats;
pub mod builder;
mod error;
use std::time::Duration;
use crate::{
job::{Job, JobId, JobStreamResult},
layers::ack::{Ack, AckError},
request::JobRequest,
worker::WorkerId,
Timestamp,
};
#[cfg(feature = "storage")]
pub use self::error::StorageError;
/// Convenience alias for the result of a storage operation: `Ok` carries the
/// value `I`, `Err` always carries a [`StorageError`].
pub type StorageResult<I> = Result<I, StorageError>;
/// Interface for a cloneable job store that accepts jobs of type
/// [`Storage::Output`] and hands them out to workers.
///
/// Every fallible operation reports failures through [`StorageResult`].
#[async_trait::async_trait]
pub trait Storage: Clone {
    /// The job type held by this storage.
    type Output: Job;

    /// Adds `job` to the storage and returns the [`JobId`] assigned to it.
    async fn push(&mut self, job: Self::Output) -> StorageResult<JobId>;

    /// Adds `job` to the storage together with the timestamp `on` and returns
    /// the [`JobId`] assigned to it.
    ///
    /// NOTE(review): `on` is presumably the earliest time the job should
    /// run; confirm against the backend implementations.
    async fn schedule(&mut self, job: Self::Output, on: Timestamp) -> StorageResult<JobId>;

    /// Returns the number of jobs reported by the storage.
    ///
    /// NOTE(review): whether this counts only pending jobs or all jobs is
    /// decided by the implementation; confirm before relying on it.
    async fn len(&self) -> StorageResult<i64>;

    /// Looks up the job request stored under `job_id`, yielding `None` when
    /// the storage has no such job.
    async fn fetch_by_id(&self, job_id: &JobId) -> StorageResult<Option<JobRequest<Self::Output>>>;

    /// Produces the stream of jobs for the worker identified by `worker_id`.
    ///
    /// NOTE(review): `interval` is presumably the polling period and
    /// `buffer_size` the maximum number of jobs fetched per poll; confirm
    /// against the backend implementations.
    fn consume(
        &mut self,
        worker_id: &WorkerId,
        interval: Duration,
        buffer_size: usize,
    ) -> JobStreamResult<Self::Output>;

    /// Acknowledges the job `job_id` on behalf of worker `worker_id`.
    async fn ack(&mut self, worker_id: &WorkerId, job_id: &JobId) -> StorageResult<()>;

    /// Requests a retry of the job `job_id` on behalf of worker `worker_id`.
    async fn retry(&mut self, worker_id: &WorkerId, job_id: &JobId) -> StorageResult<()>;

    /// Signals to the storage that worker `worker_id` is still alive.
    ///
    /// The `Service` type parameter does not appear in the argument list, so
    /// it cannot be inferred and callers must name it explicitly
    /// (`storage.keep_alive::<MyService>(&id)`).
    async fn keep_alive<Service>(&mut self, worker_id: &WorkerId) -> StorageResult<()>;

    /// Kills the job `job_id` on behalf of worker `worker_id`.
    async fn kill(&mut self, worker_id: &WorkerId, job_id: &JobId) -> StorageResult<()>;

    /// Replaces the stored details of the job `job_id` with `job`.
    async fn update_by_id(
        &self,
        job_id: &JobId,
        job: &JobRequest<Self::Output>,
    ) -> StorageResult<()>;

    /// Handles a periodic [`StorageWorkerPulse`] signal.
    ///
    /// NOTE(review): the meaning of the returned `bool` is defined by the
    /// implementation; confirm before branching on it.
    async fn heartbeat(&mut self, pulse: StorageWorkerPulse) -> StorageResult<bool>;

    /// Reschedules `job`, using `wait` as the delay.
    async fn reschedule(
        &mut self,
        job: &JobRequest<Self::Output>,
        wait: Duration,
    ) -> StorageResult<()>;

    /// Hook for putting the given active jobs back into the queue.
    ///
    /// The default implementation ignores `_job_ids` and returns `Ok(())`;
    /// backends that can re-enqueue jobs should override it.
    async fn reenqueue_active(&mut self, _job_ids: Vec<&JobId>) -> StorageResult<()> {
        Ok(())
    }

    /// Returns `true` when [`Storage::len`] reports zero jobs.
    ///
    /// The default is derived from `len` so that calling it never panics;
    /// backends with a cheaper emptiness check may override it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Storage::len`].
    #[doc(hidden)]
    async fn is_empty(&self) -> StorageResult<bool> {
        Ok(self.len().await? == 0)
    }
}
/// Blanket [`Ack`] implementation: any thread-safe [`Storage`] can be used as
/// an acknowledger by delegating to its own [`Storage::ack`].
#[async_trait::async_trait]
impl<J, S> Ack<J> for S
where
    S: Storage<Output = J> + Send + Sync,
    J: Send + Sync,
{
    /// Jobs are acknowledged by their [`JobId`].
    type Acknowledger = JobId;

    /// Acknowledges `job_id` for `worker_id` through the underlying storage.
    ///
    /// # Errors
    ///
    /// A storage failure is converted and wrapped in [`AckError::NoAck`].
    async fn ack(&self, worker_id: &WorkerId, job_id: &JobId) -> Result<(), AckError> {
        // `Storage::ack` needs `&mut self` while `Ack::ack` only has `&self`,
        // so the call is made on a clone of the storage.
        let mut handle = self.clone();
        match <S as Storage>::ack(&mut handle, worker_id, job_id).await {
            Ok(()) => Ok(()),
            Err(err) => Err(AckError::NoAck(err.into())),
        }
    }
}
/// Periodic signals that can be delivered to a storage.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageWorkerPulse {
    /// Signal concerning scheduled jobs, carrying a job `count`.
    // NOTE(review): `count` is presumably the maximum number of scheduled jobs
    // to enqueue per pulse; confirm against the backends.
    EnqueueScheduled { count: i32 },
    /// Signal concerning orphaned jobs, carrying a job `count`.
    // NOTE(review): `count` is presumably the maximum number of orphaned jobs
    // to re-enqueue per pulse; confirm against the backends.
    ReenqueueOrphaned { count: i32 },
}