apalis_core/backend/
expose.rs

1use crate::{
2    backend::{Backend, BackendExt, TaskSink},
3    task::{Task, status::Status},
4};
5
/// Page size used by [`Filter`] when its `page_size` field is `None`.
const DEFAULT_PAGE_SIZE: u32 = 10;
/// Marker trait for backends that expose additional functionality.
///
/// The trait declares no methods of its own; it only names the combined
/// capability so it can be used as a single bound.
pub trait Expose<Args> {}
9
10impl<B, Args> Expose<Args> for B where
11    B: Backend<Args = Args>
12        + Metrics
13        + ListWorkers
14        + ListQueues
15        + ListAllTasks
16        + ListTasks<Args>
17        + TaskSink<Args>
18{
19}
20
21/// Allows listing all queues available in the backend
22pub trait ListQueues: Backend {
23    /// List all available queues in the backend
24    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send;
25}
26
27/// Allows listing all workers registered with the backend
28pub trait ListWorkers: Backend {
29    /// List all registered workers in the current queue
30    fn list_workers(
31        &self,
32        queue: &str,
33    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send;
34
35    /// List all registered workers in all queues
36    fn list_all_workers(
37        &self,
38    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send;
39}
40/// Allows listing tasks with optional filtering
41pub trait ListTasks<Args>: Backend {
42    /// List tasks matching the given filter in the current queue
43    #[allow(clippy::type_complexity)]
44    fn list_tasks(
45        &self,
46        queue: &str,
47        filter: &Filter,
48    ) -> impl Future<Output = Result<Vec<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
49}
50
51/// Allows listing tasks across all queues with optional filtering
52pub trait ListAllTasks: BackendExt {
53    /// List tasks matching the given filter in all queues
54    #[allow(clippy::type_complexity)]
55    fn list_all_tasks(
56        &self,
57        filter: &Filter,
58    ) -> impl Future<
59        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
60    > + Send;
61}
62
63/// Allows collecting metrics from the backend
64pub trait Metrics: Backend {
65    /// Collects and returns global statistics from the backend
66    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send;
67
68    /// Collects and returns statistics for a specific queue
69    fn fetch_by_queue(
70        &self,
71        queue: &str,
72    ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send;
73}
74
75/// Represents information about a specific queue in the backend
76#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
77#[derive(Debug, Clone)]
78pub struct QueueInfo {
79    /// Name of the queue
80    pub name: String,
81    /// Statistics related to the queue
82    pub stats: Vec<Statistic>,
83    /// List of workers associated with the queue
84    pub workers: Vec<String>,
85    /// Last 7 days of activity in the queue
86    pub activity: Vec<usize>,
87}
88
/// A worker that is currently registered with the backend.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RunningWorker {
    /// Unique identifier of the worker.
    pub id: String,
    /// Name of the queue this worker processes tasks from.
    pub queue: String,
    /// Backend the worker belongs to.
    pub backend: String,
    /// Timestamp at which the worker was started.
    pub started_at: u64,
    /// Timestamp of the most recent heartbeat received from the worker.
    pub last_heartbeat: u64,
    /// Layers the worker is associated with.
    pub layers: String,
}
106
107#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
108#[derive(Debug, Clone)]
109/// Filter criteria for listing tasks
110pub struct Filter {
111    /// Optional status to filter tasks by
112    #[cfg_attr(feature = "serde", serde(default))]
113    pub status: Option<Status>,
114    #[cfg_attr(feature = "serde", serde(default = "default_page"))]
115    /// Page number for pagination (default is 1)
116    pub page: u32,
117    /// Optional page size for pagination (default is 10)
118    #[cfg_attr(feature = "serde", serde(default))]
119    pub page_size: Option<u32>,
120}
121
122impl Filter {
123    /// Calculate the offset based on the current page and page size
124    #[must_use]
125    pub fn offset(&self) -> u32 {
126        (self.page - 1) * self.page_size.unwrap_or(DEFAULT_PAGE_SIZE)
127    }
128
129    /// Get the limit (page size) for the query
130    #[must_use]
131    pub fn limit(&self) -> u32 {
132        self.page_size.unwrap_or(DEFAULT_PAGE_SIZE)
133    }
134}
135
/// Serde default for the `page` field of [`Filter`]: the first page.
#[cfg(feature = "serde")]
fn default_page() -> u32 {
    const FIRST_PAGE: u32 = 1;
    FIRST_PAGE
}
140/// Represents an overview of the backend including queues, workers, and statistics
141#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
142#[derive(Debug, Clone)]
143pub struct Statistic {
144    /// Overall statistics of the backend
145    pub title: String,
146    /// The statistics type
147    pub stat_type: StatType,
148    /// The value of the statistic
149    pub value: String,
150    /// The priority of the statistic (lower number means higher priority)
151    pub priority: Option<u64>,
152}
/// The kind of value carried by a [`Statistic`].
///
/// Derives `PartialEq`/`Eq` so variants can be compared directly with `==`
/// and used in `assert_eq!`.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StatType {
    /// Timestamp statistic
    Timestamp,
    /// Numeric statistic
    Number,
    /// Decimal statistic
    Decimal,
    /// Percentage statistic
    Percentage,
}