// apalis_core/backend.rs

1use std::{any::type_name, future::Future};
2
3use futures::Stream;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    codec::Codec,
8    poller::Poller,
9    request::State,
10    worker::{Context, Worker},
11};
12
13/// A backend represents a task source
14/// Both [`Storage`] and [`MessageQueue`] need to implement it for workers to be able to consume tasks
15///
16/// [`Storage`]: crate::storage::Storage
17/// [`MessageQueue`]: crate::mq::MessageQueue
18pub trait Backend<Req> {
19    /// The stream to be produced by the backend
20    type Stream: Stream<Item = Result<Option<Req>, crate::error::Error>>;
21
22    /// Returns the final decoration of layers
23    type Layer;
24
25    /// Specifies the codec type used by the backend
26    type Codec: Codec;
27
28    /// Returns a poller that is ready for streaming
29    fn poll(self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer>;
30}
31
32/// Represents functionality that allows reading of jobs and stats from a backend
33/// Some backends esp MessageQueues may not currently implement this
34pub trait BackendExpose<T>
35where
36    Self: Sized,
37{
38    /// The request type being handled by the backend
39    type Request;
40    /// The error returned during reading jobs and stats
41    type Error;
42    /// List all Workers that are working on a backend
43    fn list_workers(
44        &self,
45    ) -> impl Future<Output = Result<Vec<Worker<WorkerState>>, Self::Error>> + Send;
46
47    /// Returns the counts of jobs in different states
48    fn stats(&self) -> impl Future<Output = Result<Stat, Self::Error>> + Send;
49
50    /// Fetch jobs persisted in a backend
51    fn list_jobs(
52        &self,
53        status: &State,
54        page: i32,
55    ) -> impl Future<Output = Result<Vec<Self::Request>, Self::Error>> + Send;
56}
57
58/// Represents the current statistics of a backend
59#[derive(Debug, Deserialize, Serialize, Default)]
60pub struct Stat {
61    /// Represents pending tasks
62    pub pending: usize,
63    /// Represents running tasks
64    pub running: usize,
65    /// Represents dead tasks
66    pub dead: usize,
67    /// Represents failed tasks
68    pub failed: usize,
69    /// Represents successful tasks
70    pub success: usize,
71}
72
73/// A serializable version of a worker's state.
74#[derive(Debug, Serialize, Deserialize)]
75pub struct WorkerState {
76    /// Type of task being consumed by the worker, useful for display and filtering
77    pub r#type: String,
78    /// The type of job stream
79    pub source: String,
80    // TODO: // The layers that were loaded for worker.
81    // TODO: // pub layers: Vec<Layer>,
82    // TODO: // last_seen: Timestamp,
83}
84impl WorkerState {
85    /// Build a new state
86    pub fn new<S>(r#type: String) -> Self {
87        Self {
88            r#type,
89            source: type_name::<S>().to_string(),
90        }
91    }
92}