apalis_core/backend.rs
1use std::{any::type_name, future::Future};
2
3use futures::Stream;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 codec::Codec,
8 poller::Poller,
9 request::State,
10 worker::{Context, Worker},
11};
12
13/// A backend represents a task source
14/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
15///
16/// [`Storage`]: crate::storage::Storage
17/// [`MessageQueue`]: crate::mq::MessageQueue
18pub trait Backend<Req> {
19 /// The stream to be produced by the backend
20 type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;
21
22 /// Returns the final decoration of layers
23 type Layer;
24
25 /// Specifies the codec type used by the backend
26 type Codec: Codec;
27
28 /// Returns a poller that is ready for streaming
29 fn poll(self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer>;
30}
31
32/// Represents functionality that allows reading of jobs and stats from a backend
33/// Some backends esp MessageQueues may not currently implement this
34pub trait BackendExpose<T>
35where
36 Self: Sized,
37{
38 /// The request type being handled by the backend
39 type Request;
40 /// The error returned during reading jobs and stats
41 type Error;
42 /// List all Workers that are working on a backend
43 fn list_workers(
44 &self,
45 ) -> impl Future<Output = Result<Vec<Worker<WorkerState>>, Self::Error>> + Send;
46
47 /// Returns the counts of jobs in different states
48 fn stats(&self) -> impl Future<Output = Result<Stat, Self::Error>> + Send;
49
50 /// Fetch jobs persisted in a backend
51 fn list_jobs(
52 &self,
53 status: &State,
54 page: i32,
55 ) -> impl Future<Output = Result<Vec<Self::Request>, Self::Error>> + Send;
56}
57
58/// Represents the current statistics of a backend
59#[derive(Debug, Deserialize, Serialize, Default)]
60pub struct Stat {
61 /// Represents pending tasks
62 pub pending: usize,
63 /// Represents running tasks
64 pub running: usize,
65 /// Represents dead tasks
66 pub dead: usize,
67 /// Represents failed tasks
68 pub failed: usize,
69 /// Represents successful tasks
70 pub success: usize,
71}
72
73/// A serializable version of a worker's state.
74#[derive(Debug, Serialize, Deserialize)]
75pub struct WorkerState {
76 /// Type of task being consumed by the worker, useful for display and filtering
77 pub r#type: String,
78 /// The type of job stream
79 pub source: String,
80 // TODO: // The layers that were loaded for worker.
81 // TODO: // pub layers: Vec<Layer>,
82 // TODO: // last_seen: Timestamp,
83}
84impl WorkerState {
85 /// Build a new state
86 pub fn new<S>(r#type: String) -> Self {
87 Self {
88 r#type,
89 source: type_name::<S>().to_string(),
90 }
91 }
92}