1use std::{future::Future, time::Duration};
22
23use futures_sink::Sink;
24use futures_util::{
25 stream::{self, BoxStream},
26 Stream,
27};
28
29use crate::{
30 error::BoxDynError,
31 task::{status::Status, task_id::TaskId, Task},
32 worker::context::WorkerContext,
33};
34
// Public submodules of the backend module.
pub mod codec;
pub mod custom;
pub mod pipe;
pub mod shared;

// Private module: only the items re-exported from this file are reachable from outside.
mod impls;

// Re-export `guide` from the private `impls` module.
pub use impls::guide;
43
/// Public facade that re-exports every public item of the private `impls::memory` module.
pub mod memory {
    pub use crate::backend::impls::memory::*;
}
48
/// Public facade that re-exports every public item of the private `impls::json` module.
///
/// Only compiled when the `json` feature is enabled.
#[cfg(feature = "json")]
pub mod json {
    pub use crate::backend::impls::json::*;
}
54
/// A source of tasks whose arguments are of type `Args`.
///
/// A backend can be consumed into a stream of tasks with [`Backend::poll`], exposes a
/// heartbeat stream through [`Backend::heartbeat`], and supplies a layer value through
/// [`Backend::middleware`].
pub trait Backend<Args> {
    /// Type parameter used for task identifiers (`TaskId<Self::IdType>`).
    type IdType: Clone;
    /// Context type carried by each [`Task`]; must implement `Default`.
    type Context: Default;
    /// Error type produced by the task stream and the heartbeat stream.
    type Error;
    /// Codec type associated with this backend. No method of this trait uses it directly.
    type Codec;

    /// Stream returned by [`Backend::poll`]; each item is `Ok(Some(task))`, `Ok(None)` or `Err(_)`.
    ///
    /// NOTE(review): `Ok(None)` presumably signals "no task available right now" — confirm.
    type Stream: Stream<Item = Result<Option<Task<Args, Self::Context, Self::IdType>>, Self::Error>>;
    /// Stream returned by [`Backend::heartbeat`]; each item is `Ok(())` or `Err(_)`.
    type Beat: Stream<Item = Result<(), Self::Error>>;
    /// Value returned by [`Backend::middleware`].
    type Layer;

    /// Builds the heartbeat stream for the given worker.
    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat;
    /// Returns this backend's layer value.
    fn middleware(&self) -> Self::Layer;
    /// Consumes the backend and returns the stream of tasks for the given worker.
    fn poll(self, worker: &WorkerContext) -> Self::Stream;
}
82
/// A boxed, `'static` stream yielding `Result<Option<T>, E>` items.
///
/// The error type `E` defaults to [`BoxDynError`].
pub type TaskStream<T, E = BoxDynError> = BoxStream<'static, Result<Option<T>, E>>;
85
/// A [`Backend`] that tasks can be pushed into.
///
/// Every method returns a `Send` future resolving to `Ok(())` on success or the
/// backend's error type on failure.
pub trait TaskSink<Args>: Backend<Args> {
    /// Pushes a single set of task arguments.
    fn push(&mut self, task: Args) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Pushes every element of `tasks`.
    fn push_bulk(
        &mut self,
        tasks: Vec<Args>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Pushes every item produced by the `tasks` stream.
    fn push_stream(
        &mut self,
        tasks: impl Stream<Item = Args> + Unpin + Send,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Pushes an already constructed [`Task`], including its context and id type.
    fn push_task(
        &mut self,
        task: Task<Args, Self::Context, Self::IdType>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
109
110impl<Args, S, E> TaskSink<Args> for S
111where
112 S: Sink<Task<Args, Self::Context, Self::IdType>, Error = E>
113 + Unpin
114 + Backend<Args, Error = E>
115 + Send,
116 Args: Send,
117 S::Context: Send + Default,
118 S::IdType: Send + 'static,
119 E: Send,
120{
121 async fn push(&mut self, task: Args) -> Result<(), Self::Error> {
122 use futures_util::SinkExt;
123 self.send(Task::new(task)).await
124 }
125
126 async fn push_bulk(&mut self, tasks: Vec<Args>) -> Result<(), Self::Error> {
127 use futures_util::SinkExt;
128 self.send_all(&mut stream::iter(
129 tasks
130 .into_iter()
131 .map(Task::new)
132 .map(Result::Ok)
133 .collect::<Vec<_>>(),
134 ))
135 .await
136 }
137
138 async fn push_stream(
139 &mut self,
140 tasks: impl Stream<Item = Args> + Unpin + Send,
141 ) -> Result<(), Self::Error> {
142 use futures_util::SinkExt;
143 use futures_util::StreamExt;
144 self.send_all(&mut tasks.map(Task::new).map(Result::Ok))
145 .await
146 }
147
148 async fn push_task(
149 &mut self,
150 task: Task<Args, Self::Context, Self::IdType>,
151 ) -> Result<(), Self::Error> {
152 use futures_util::SinkExt;
153 self.send(task).await
154 }
155}
156
/// A [`Backend`] that can look up a task by its identifier.
pub trait FetchById<Args>: Backend<Args> {
    /// Looks up the task identified by `task_id`.
    ///
    /// The future resolves to `Ok(Some(task))`, `Ok(None)` or `Err(_)`;
    /// `Ok(None)` presumably means no task with that id exists — confirm with implementors.
    ///
    /// NOTE(review): the returned `Task` is written without the `Self::IdType` parameter
    /// that the other traits in this file pass, so it appears to rely on `Task`'s default
    /// id type — confirm this is intended.
    fn fetch_by_id(
        &mut self,
        task_id: &TaskId<Self::IdType>,
    ) -> impl Future<Output = Result<Option<Task<Args, Self::Context>>, Self::Error>> + Send;
}
165
/// A [`Backend`] that accepts an updated version of a task.
pub trait Update<Args>: Backend<Args> {
    /// Submits `task` to the backend as an update.
    ///
    /// Which parts of the task are persisted is up to the implementor.
    fn update(
        &mut self,
        task: Task<Args, Self::Context, Self::IdType>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
174
/// A [`Backend`] that can reschedule a task.
pub trait Reschedule<Args>: Backend<Args> {
    /// Reschedules `task` with the given `wait` duration.
    ///
    /// NOTE(review): `wait` is presumably the delay before the task becomes
    /// runnable again — confirm with implementors.
    fn reschedule(
        &mut self,
        task: Task<Args, Self::Context, Self::IdType>,
        wait: Duration,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
184
/// A store that supports a vacuum operation.
///
/// Unlike most traits in this module it does not require [`Backend`] and
/// declares its own error type.
pub trait Vacuum {
    /// Error returned when vacuuming fails.
    type Error;
    /// Runs the vacuum operation and resolves to a count.
    ///
    /// NOTE(review): the `usize` is presumably the number of entries removed — confirm.
    fn vacuum(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
}
192
/// A [`Backend`] that can resume a single task selected by id.
pub trait ResumeById<Args>: Backend<Args> {
    /// Resumes the task identified by `id`.
    ///
    /// NOTE(review): the `bool` presumably reports whether a task was actually
    /// resumed — confirm with implementors.
    fn resume_by_id(
        &mut self,
        id: TaskId<Self::IdType>,
    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
}
201
/// A [`Backend`] that can resume abandoned tasks.
pub trait ResumeAbandoned<Args>: Backend<Args> {
    /// Resumes abandoned tasks and resolves to a count.
    ///
    /// NOTE(review): the `usize` is presumably the number of tasks resumed, and what
    /// counts as "abandoned" is defined by the implementor — confirm.
    fn resume_abandoned(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
}
207
/// A [`Backend`] that workers can be registered with.
pub trait RegisterWorker<Args>: Backend<Args> {
    /// Registers the worker identified by `worker_id`, which is taken by value.
    fn register_worker(
        &mut self,
        worker_id: String,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
216
/// A source of a metric value of type `Output`.
///
/// The trait is generic over `Output`, so one type may implement it once per metric type.
/// It does not require [`Backend`] and declares its own error type.
pub trait Metric<Output> {
    /// Error returned when the metric cannot be produced.
    type Error;
    /// Produces the metric value.
    fn metric(&mut self) -> impl Future<Output = Result<Output, Self::Error>> + Send;
}
224
/// A [`Backend`] that can list workers.
pub trait ListWorkers<Args>: Backend<Args> {
    /// Type describing a single worker in the listing.
    type Worker;
    /// Resolves to the list of workers; takes `&self`, so no exclusive access is needed.
    fn list_workers(&self) -> impl Future<Output = Result<Vec<Self::Worker>, Self::Error>> + Send;
}
232
/// A [`Backend`] that can list tasks according to a filter.
pub trait ListTasks<Args>: Backend<Args> {
    /// Filter type accepted by [`ListTasks::list_tasks`]; its meaning is defined by the implementor.
    type Filter;

    /// Resolves to the tasks selected by `filter`.
    fn list_tasks(
        &self,
        filter: &Self::Filter,
    ) -> impl Future<Output = Result<Vec<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
}
244
245#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
247#[derive(Debug, Clone)]
248pub struct TaskResult<T> {
249 task_id: TaskId,
250 status: Status,
251 result: Result<T, String>,
252}
253
254impl<T> TaskResult<T> {
255 pub fn new(task_id: TaskId, status: Status, result: Result<T, String>) -> Self {
257 Self {
258 task_id,
259 status,
260 result,
261 }
262 }
263 pub fn task_id(&self) -> &TaskId {
265 &self.task_id
266 }
267
268 pub fn status(&self) -> &Status {
270 &self.status
271 }
272
273 pub fn result(&self) -> &Result<T, String> {
275 &self.result
276 }
277
278 pub fn take(self) -> Result<T, String> {
280 self.result
281 }
282}
283
284pub trait WaitForCompletion<T, Args>: Backend<Args> {
286 type ResultStream: Stream<Item = Result<TaskResult<T>, Self::Error>> + Send + 'static;
288
289 fn wait_for(
291 &self,
292 task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
293 ) -> Self::ResultStream;
294
295 fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream {
297 self.wait_for(std::iter::once(task_id))
298 }
299
300 fn check_status(
302 &self,
303 task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
304 ) -> impl Future<Output = Result<Vec<TaskResult<T>>, Self::Error>> + Send;
305}