Skip to main content

apalis_core/backend/
expose.rs

1use crate::{
2    backend::{Backend, BackendExt, TaskSink},
3    task::{Task, status::Status},
4};
5
6const DEFAULT_PAGE_SIZE: u32 = 10;
7/// Allows exposing additional functionality from the backend
8pub trait Expose<Args> {}
9
10impl<B, Args> Expose<Args> for B where
11    B: Backend<Args = Args>
12        + Metrics
13        + ListWorkers
14        + ListQueues
15        + ListAllTasks
16        + ListTasks<Args>
17        + TaskSink<Args>
18{
19}
20
21/// Allows listing all queues available in the backend
22pub trait ListQueues: Backend {
23    /// List all available queues in the backend
24    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send;
25}
26
27/// Allows listing all workers registered with the backend
28pub trait ListWorkers: Backend {
29    /// List all registered workers in the current queue
30    fn list_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send;
31
32    /// List all registered workers in all queues
33    fn list_all_workers(
34        &self,
35    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send;
36}
37/// Allows listing tasks with optional filtering
38pub trait ListTasks<Args>: Backend {
39    /// List tasks matching the given filter in the current queue
40    #[allow(clippy::type_complexity)]
41    fn list_tasks(
42        &self,
43        filter: &Filter,
44    ) -> impl Future<Output = Result<Vec<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
45}
46
47/// Allows listing tasks across all queues with optional filtering
48pub trait ListAllTasks: BackendExt {
49    /// List tasks matching the given filter in all queues
50    #[allow(clippy::type_complexity)]
51    fn list_all_tasks(
52        &self,
53        filter: &Filter,
54    ) -> impl Future<
55        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
56    > + Send;
57}
58
59/// Allows collecting metrics from the backend
60pub trait Metrics: Backend {
61    /// Collects and returns global statistics from the backend
62    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send;
63
64    /// Collects and returns statistics for a specific queue
65    fn fetch_by_queue(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send;
66}
67
68/// Represents information about a specific queue in the backend
69#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
70#[derive(Debug, Clone)]
71pub struct QueueInfo {
72    /// Name of the queue
73    pub name: String,
74    /// Statistics related to the queue
75    pub stats: Vec<Statistic>,
76    /// List of workers associated with the queue
77    pub workers: Vec<String>,
78    /// Last 7 days of activity in the queue
79    pub activity: Vec<usize>,
80}
81
82/// Represents a worker currently registered with the backend
83#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
84#[derive(Debug, Clone)]
85pub struct RunningWorker {
86    /// Unique identifier for the worker
87    pub id: String,
88    /// Queue the worker is processing tasks from
89    pub queue: String,
90    /// Backend of the worker
91    pub backend: String,
92    /// Timestamp when the worker was started
93    pub started_at: u64,
94    /// Timestamp of the last heartbeat received from the worker
95    pub last_heartbeat: u64,
96    /// Layers the worker is associated with
97    pub layers: String,
98}
99
100#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
101#[derive(Debug, Clone)]
102/// Filter criteria for listing tasks
103pub struct Filter {
104    /// Optional status to filter tasks by
105    #[cfg_attr(feature = "serde", serde(default))]
106    pub status: Option<Status>,
107    #[cfg_attr(feature = "serde", serde(default = "default_page"))]
108    /// Page number for pagination (default is 1)
109    pub page: u32,
110    /// Optional page size for pagination (default is 10)
111    #[cfg_attr(feature = "serde", serde(default))]
112    pub page_size: Option<u32>,
113}
114
115impl Filter {
116    /// Calculate the offset based on the current page and page size
117    #[must_use]
118    pub fn offset(&self) -> u32 {
119        (self.page - 1) * self.page_size.unwrap_or(DEFAULT_PAGE_SIZE)
120    }
121
122    /// Get the limit (page size) for the query
123    #[must_use]
124    pub fn limit(&self) -> u32 {
125        self.page_size.unwrap_or(DEFAULT_PAGE_SIZE)
126    }
127}
128
129#[cfg(feature = "serde")]
130fn default_page() -> u32 {
131    1
132}
133/// Represents an overview of the backend including queues, workers, and statistics
134#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
135#[derive(Debug, Clone)]
136pub struct Statistic {
137    /// Overall statistics of the backend
138    pub title: String,
139    /// The statistics type
140    pub stat_type: StatType,
141    /// The value of the statistic
142    pub value: String,
143    /// The priority of the statistic (lower number means higher priority)
144    pub priority: Option<u64>,
145}
146/// Statistics type
147#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
148#[derive(Debug, Clone)]
149pub enum StatType {
150    /// Timestamp statistic
151    Timestamp,
152    /// Numeric statistic
153    Number,
154    /// Decimal statistic
155    Decimal,
156    /// Percentage statistic
157    Percentage,
158}