1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
mod error;
#[cfg(feature = "worker")]
mod streams;
#[cfg(feature = "worker")]
mod worker;
use std::time::Duration;
use chrono::{DateTime, Utc};
use crate::{
job::JobStream,
job::{Job, JobStreamResult},
request::JobRequest,
};
#[cfg(feature = "storage")]
pub use self::error::StorageError;
#[cfg(feature = "worker")]
pub use worker::StorageWorker;
pub type StorageResult<I> = Result<I, StorageError>;
#[async_trait::async_trait]
pub trait Storage: Clone {
type Output: Job;
async fn push(&mut self, job: Self::Output) -> StorageResult<()>;
async fn schedule(&mut self, job: Self::Output, on: DateTime<Utc>) -> StorageResult<()>;
async fn len(&self) -> StorageResult<i64>;
async fn fetch_by_id(&self, job_id: String) -> StorageResult<Option<JobRequest<Self::Output>>>;
fn consume(&mut self, worker_id: String, interval: Duration) -> JobStreamResult<Self::Output>;
async fn ack(&mut self, worker_id: String, job_id: String) -> StorageResult<()>;
async fn retry(&mut self, worker_id: String, job_id: String) -> StorageResult<()>;
async fn keep_alive<Service>(&mut self, worker_id: String) -> StorageResult<()>;
async fn kill(&mut self, worker_id: String, job_id: String) -> StorageResult<()>;
async fn update_by_id(
&self,
job_id: String,
job: &JobRequest<Self::Output>,
) -> StorageResult<()>;
async fn heartbeat(&mut self, pulse: StorageWorkerPulse) -> StorageResult<bool>;
async fn reschedule(
&mut self,
job: &JobRequest<Self::Output>,
wait: Duration,
) -> StorageResult<()>;
async fn reenqueue_active(&mut self, _job_ids: Vec<String>) -> StorageResult<()> {
Ok(())
}
#[doc(hidden)]
async fn is_empty(&self) -> StorageResult<bool> {
unimplemented!()
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum StorageWorkerPulse {
EnqueueScheduled {
count: i32,
},
RenqueueOrpharned {
count: i32,
},
}
impl<S> JobStream for S
where
S: Storage,
{
type Job = S::Output;
fn stream(&mut self, worker_id: String, interval: Duration) -> JobStreamResult<S::Output> {
self.consume(worker_id, interval)
}
}