1use futures_core::stream::BoxStream;
91use futures_sink::Sink;
92use futures_util::SinkExt;
93use futures_util::{Stream, StreamExt};
94use std::pin::Pin;
95use std::sync::Arc;
96use std::task::{Context, Poll};
97use std::{fmt, marker::PhantomData};
98use thiserror::Error;
99use tower_layer::Identity;
100
101use crate::backend::TaskStream;
102use crate::error::BoxDynError;
103use crate::features_table;
104use crate::{backend::Backend, task::Task, worker::context::WorkerContext};
105
/// Shared, type-erased fetch factory stored inside [`CustomBackend`].
///
/// It is called with the database handle, the configuration and the worker context, and
/// returns the `Fetch` value the backend hands out when it is polled.
type Fetcher<DB, Config, Fetch> =
    Arc<Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync>>;
108
/// Shared, type-erased sink factory stored inside [`CustomBackend`].
///
/// It is called with the database handle and the configuration and returns a new `Sink` value.
type Sinker<DB, Config, Sink> = Arc<Box<dyn Fn(&mut DB, &Config) -> Sink + Send + Sync>>;
110
/// A backend assembled from user-supplied closures.
///
/// A fetcher closure produces the value that is polled for tasks and a sink closure produces
/// the sink that pushed tasks are forwarded to. Instances are created by
/// [`BackendBuilder::build`].
///
#[doc = features_table! {
    setup = "{ unreachable!() }",
    TaskSink => supported("Ability to push new tasks", false),
    Serialization => supported("Serialization support for arguments", false),
    FetchById => not_supported("Allow fetching a task by its ID"),
    RegisterWorker => not_implemented("Allow registering a worker with the backend"),
    PipeExt => limited("Allow other backends to pipe to this backend", false),
    MakeShared => not_implemented("Share the same [`CustomBackend`] across multiple workers", false),
    Workflow => not_implemented("Flexible enough to support workflows"),
    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"),
    ResumeById => not_supported("Resume a task by its ID"),
    ResumeAbandoned => not_supported("Resume abandoned tasks"),
    ListWorkers => not_implemented("List all workers registered with the backend"),
    ListTasks => not_implemented("List all tasks in the backend"),
}]
#[pin_project::pin_project]
#[must_use = "Custom backends must be polled or used as a sink"]
pub struct CustomBackend<Args, DB, Fetch, Sink, IdType, Config = ()> {
    /// Ties the otherwise unused `Args` and `IdType` parameters to the type.
    _marker: PhantomData<(Args, IdType)>,
    /// Database handle passed (mutably) to the fetcher and sink factories.
    db: DB,
    /// Factory invoked when the backend is polled for tasks.
    fetcher: Fetcher<DB, Config, Fetch>,
    /// Factory that created `current_sink`; kept so clones can build their own sink.
    sinker: Sinker<DB, Config, Sink>,
    /// Sink that the `Sink` implementation of this backend forwards to.
    #[pin]
    current_sink: Sink,
    /// Configuration passed by reference to both factories.
    config: Config,
}
151
152impl<Args, DB, Fetch, Sink, IdType, Config> Clone
153 for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
154where
155 DB: Clone,
156 Config: Clone,
157{
158 fn clone(&self) -> Self {
159 let mut db = self.db.clone();
160 let current_sink = (self.sinker)(&mut db, &self.config);
161 Self {
162 _marker: PhantomData,
163 db,
164 fetcher: Arc::clone(&self.fetcher),
165 sinker: Arc::clone(&self.sinker),
166 current_sink,
167 config: self.config.clone(),
168 }
169 }
170}
171
172impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
173 for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
174where
175 DB: fmt::Debug,
176 Config: fmt::Debug,
177{
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 f.debug_struct("CustomBackend")
180 .field(
181 "_marker",
182 &format_args!(
183 "PhantomData<({}, {})>",
184 std::any::type_name::<Args>(),
185 std::any::type_name::<IdType>()
186 ),
187 )
188 .field("db", &self.db)
189 .field("fetcher", &"Fn(&mut DB, &Config, &WorkerContext) -> Fetch")
190 .field("sink", &"Fn(&mut DB, &Config) -> Sink")
191 .field("config", &self.config)
192 .finish()
193 }
194}
195
/// Boxed fetch factory as held by [`BackendBuilder`] before it is wrapped in an `Arc` by
/// `build`.
type FetcherBuilder<DB, Config, Fetch> =
    Box<dyn Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>;
198
/// Boxed sink factory as held by [`BackendBuilder`] before it is wrapped in an `Arc` by
/// `build`.
type SinkerBuilder<DB, Config, Sink> =
    Box<dyn Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static>;
201
/// Step-by-step builder for [`CustomBackend`].
///
/// Every part is optional while building; `build` reports the first missing one as a
/// [`BuildError`].
pub struct BackendBuilder<Args, DB, Fetch, Sink, IdType, Config = ()> {
    /// Ties the otherwise unused `Args` and `IdType` parameters to the type.
    _marker: PhantomData<(Args, IdType)>,
    /// Database handle, set through `database`.
    database: Option<DB>,
    /// Fetch factory, set through `fetcher`.
    fetcher: Option<FetcherBuilder<DB, Config, Fetch>>,
    /// Sink factory, set through `sink`.
    sink: Option<SinkerBuilder<DB, Config, Sink>>,
    /// Configuration, set by `new` / `new_with_cfg`.
    config: Option<Config>,
}
212
213impl<Args, DB, Fetch, Sink, IdType, Config> fmt::Debug
214 for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
215where
216 DB: fmt::Debug,
217 Config: fmt::Debug,
218{
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 f.debug_struct("BackendBuilder")
221 .field(
222 "_marker",
223 &format_args!(
224 "PhantomData<({}, {})>",
225 std::any::type_name::<Args>(),
226 std::any::type_name::<IdType>()
227 ),
228 )
229 .field("database", &self.database)
230 .field("fetcher", &self.fetcher.as_ref().map(|_| "Some(fn)"))
231 .field("sink", &self.sink.as_ref().map(|_| "Some(fn)"))
232 .field("config", &self.config)
233 .finish()
234 }
235}
236
237impl<Args, DB, Fetch, Sink, IdType, Config> Default
238 for BackendBuilder<Args, DB, Fetch, Sink, IdType, Config>
239{
240 fn default() -> Self {
241 Self {
242 _marker: PhantomData,
243 database: None,
244 fetcher: None,
245 sink: None,
246 config: None,
247 }
248 }
249}
250
251impl<Args, DB, Fetch, Sink, IdType> BackendBuilder<Args, DB, Fetch, Sink, IdType, ()> {
252 #[must_use]
254 pub fn new() -> Self {
255 Self::new_with_cfg(())
256 }
257
258 pub fn new_with_cfg<Config>(
260 config: Config,
261 ) -> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
262 BackendBuilder {
263 config: Some(config),
264 ..Default::default()
265 }
266 }
267}
268
269impl<Args, DB, Fetch, Sink, IdType, Config> BackendBuilder<Args, DB, Fetch, Sink, IdType, Config> {
270 #[must_use]
272 pub fn database(mut self, db: DB) -> Self {
273 self.database = Some(db);
274 self
275 }
276
277 #[must_use]
279 pub fn fetcher<F: Fn(&mut DB, &Config, &WorkerContext) -> Fetch + Send + Sync + 'static>(
280 mut self,
281 fetcher: F,
282 ) -> Self {
283 self.fetcher = Some(Box::new(fetcher));
284 self
285 }
286
287 #[must_use]
289 pub fn sink<F: Fn(&mut DB, &Config) -> Sink + Send + Sync + 'static>(
290 mut self,
291 sink: F,
292 ) -> Self {
293 self.sink = Some(Box::new(sink));
294 self
295 }
296
297 #[allow(clippy::type_complexity)]
298 pub fn build(self) -> Result<CustomBackend<Args, DB, Fetch, Sink, IdType, Config>, BuildError> {
300 let mut db = self.database.ok_or(BuildError::MissingPool)?;
301 let config = self.config.ok_or(BuildError::MissingConfig)?;
302 let sink_fn = self.sink.ok_or(BuildError::MissingSink)?;
303 let sink = sink_fn(&mut db, &config);
304
305 Ok(CustomBackend {
306 _marker: PhantomData,
307 db,
308 fetcher: self
309 .fetcher
310 .map(Arc::new)
311 .ok_or(BuildError::MissingFetcher)?,
312 current_sink: sink,
313 sinker: Arc::new(sink_fn),
314 config,
315 })
316 }
317}
318
/// Reasons why [`BackendBuilder::build`] can fail: one variant per required part that was
/// never supplied.
#[derive(Debug, Error)]
pub enum BuildError {
    /// No database handle was set on the builder.
    #[error("Database db is required")]
    MissingPool,
    /// No fetch factory was set on the builder.
    #[error("Fetcher is required")]
    MissingFetcher,
    /// No sink factory was set on the builder.
    #[error("Sink is required")]
    MissingSink,
    /// The builder holds no configuration.
    #[error("Config is required")]
    MissingConfig,
}
335
336impl<Args, DB, Fetch, Sink, IdType: Clone, E, Ctx: Default, Config> Backend
337 for CustomBackend<Args, DB, Fetch, Sink, IdType, Config>
338where
339 Fetch: Stream<Item = Result<Option<Task<Args, Ctx, IdType>>, E>> + Send + 'static,
340 E: Into<BoxDynError>,
341{
342 type Args = Args;
343 type IdType = IdType;
344
345 type Context = Ctx;
346
347 type Error = BoxDynError;
348
349 type Stream = TaskStream<Task<Args, Ctx, IdType>, BoxDynError>;
350
351 type Beat = BoxStream<'static, Result<(), Self::Error>>;
352
353 type Layer = Identity;
354
355 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
356 futures_util::stream::once(async { Ok(()) }).boxed()
357 }
358
359 fn middleware(&self) -> Self::Layer {
360 Identity::new()
361 }
362
363 fn poll(mut self, worker: &WorkerContext) -> Self::Stream {
364 (self.fetcher)(&mut self.db, &self.config, worker)
365 .map(|task| match task {
366 Ok(Some(t)) => Ok(Some(t)),
367 Ok(None) => Ok(None),
368 Err(e) => Err(e.into()),
369 })
370 .boxed()
371 }
372}
373
374impl<Args, Ctx, IdType, DB, Fetch, S, Config> Sink<Task<Args, Ctx, IdType>>
375 for CustomBackend<Args, DB, Fetch, S, IdType, Config>
376where
377 S: Sink<Task<Args, Ctx, IdType>>,
378{
379 type Error = S::Error;
380
381 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
382 self.project().current_sink.poll_ready_unpin(cx)
383 }
384
385 fn start_send(self: Pin<&mut Self>, item: Task<Args, Ctx, IdType>) -> Result<(), Self::Error> {
386 self.project().current_sink.start_send_unpin(item)
387 }
388
389 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
390 self.project().current_sink.poll_flush_unpin(cx)
391 }
392
393 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
394 self.project().current_sink.poll_close_unpin(cx)
395 }
396}
397
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures_util::{FutureExt, lock::Mutex, sink, stream};

    use crate::{
        error::BoxDynError,
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    /// Number of tasks pushed into the backend before the worker is started.
    const ITEMS: u32 = 10;

    /// End-to-end check: build a backend over an in-memory queue, push `ITEMS` tasks through
    /// its sink, then run a worker that stops itself when it handles the last task.
    #[tokio::test]
    async fn basic_custom_backend() {
        // The "database" is a shared queue guarded by an async mutex.
        let memory: Arc<Mutex<VecDeque<Task<u32, ()>>>> = Arc::new(Mutex::new(VecDeque::new()));

        let mut backend = BackendBuilder::new()
            .database(memory)
            .fetcher(|db, _, _| {
                // Endless stream: wait 100 ms, then pop the front of the queue.
                stream::unfold(db.clone(), |p| async move {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    let mut db = p.lock().await;
                    let item = db.pop_front();
                    // Release the lock before yielding.
                    drop(db);
                    match item {
                        Some(item) => Some((Ok::<_, BoxDynError>(Some(item)), p)),
                        // An empty queue yields `Ok(None)` instead of ending the stream.
                        None => Some((Ok::<_, BoxDynError>(None), p)),
                    }
                })
                .boxed()
            })
            .sink(|db, _| {
                // Each item sent is appended to the back of the queue.
                sink::unfold(db.clone(), move |p, item| {
                    async move {
                        let mut db = p.lock().await;
                        db.push_back(item);
                        drop(db);
                        Ok::<_, BoxDynError>(p)
                    }
                    .boxed()
                })
            })
            .build()
            .unwrap();

        for i in 0..ITEMS {
            backend.send(Task::new(i)).await.unwrap();
        }

        // Handler: sleeps one second per task; on the last task it asks the worker to stop
        // and returns an error.
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if task == ITEMS - 1 {
                ctx.stop().unwrap();
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|ctx, ev| {
                println!("On Event = {ev:?} from {}", ctx.name());
            })
            .build(task);
        worker.run().await.unwrap();
    }
}