// apalis_core/backend/expose.rs

1use crate::{
2    backend::{Backend, TaskSink},
3    task::{Task, status::Status},
4};
5
/// Page size applied when a `Filter` leaves `page_size` unset.
const DEFAULT_PAGE_SIZE: u32 = 10;
/// Marker trait for backends that expose additional functionality
/// (for example listing and metrics) on top of the core backend API.
///
/// The trait declares no methods of its own.
pub trait Expose<Args> {}
9
10impl<B, Args> Expose<Args> for B where
11    B: Backend<Args = Args>
12        + Metrics
13        + ListWorkers
14        + ListQueues
15        + ListAllTasks
16        + ListTasks<Args>
17        + TaskSink<Args>
18{
19}
20
21/// Allows listing all queues available in the backend
22pub trait ListQueues: Backend {
23    /// List all available queues in the backend
24    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send;
25}
26
27/// Allows listing all workers registered with the backend
28pub trait ListWorkers: Backend {
29    /// List all registered workers in the current queue
30    fn list_workers(
31        &self,
32        queue: &str,
33    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send;
34
35    /// List all registered workers in all queues
36    fn list_all_workers(
37        &self,
38    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send;
39}
40/// Allows listing tasks with optional filtering
41pub trait ListTasks<Args>: Backend {
42    /// List tasks matching the given filter in the current queue
43    fn list_tasks(
44        &self,
45        queue: &str,
46        filter: &Filter,
47    ) -> impl Future<Output = Result<Vec<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
48}
49
50/// Allows listing tasks across all queues with optional filtering
51pub trait ListAllTasks: Backend {
52    /// List tasks matching the given filter in all queues
53    fn list_all_tasks(
54        &self,
55        filter: &Filter,
56    ) -> impl Future<
57        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
58    > + Send;
59}
60
61/// Allows collecting metrics from the backend
62pub trait Metrics: Backend {
63    /// Collects and returns global statistics from the backend
64    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send;
65
66    /// Collects and returns statistics for a specific queue
67    fn fetch_by_queue(
68        &self,
69        queue: &str,
70    ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send;
71}
72
73/// Represents information about a specific queue in the backend
74#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
75#[derive(Debug, Clone)]
76pub struct QueueInfo {
77    /// Name of the queue
78    pub name: String,
79    /// Statistics related to the queue
80    pub stats: Vec<Statistic>,
81    /// List of workers associated with the queue
82    pub workers: Vec<String>,
83    /// Last 7 days of activity in the queue
84    pub activity: Vec<usize>,
85}
86
/// Snapshot of a worker currently registered with the backend.
///
/// All fields are plain owned data, so snapshots can be cloned and compared
/// for equality (for example to detect whether a worker's heartbeat changed).
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunningWorker {
    /// Unique identifier of the worker.
    pub id: String,
    /// Queue the worker is processing tasks from.
    pub queue: String,
    /// Backend of the worker.
    pub backend: String,
    /// Timestamp at which the worker was started.
    pub started_at: u64,
    /// Timestamp of the last heartbeat received from the worker.
    pub last_heartbeat: u64,
    /// Layers the worker is associated with.
    pub layers: String,
}
104
105#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
106#[derive(Debug, Clone)]
107/// Filter criteria for listing tasks
108pub struct Filter {
109    /// Optional status to filter tasks by
110    #[cfg_attr(feature = "serde", serde(default))]
111    pub status: Option<Status>,
112    #[cfg_attr(feature = "serde", serde(default = "default_page"))]
113    /// Page number for pagination (default is 1)
114    pub page: u32,
115    /// Optional page size for pagination (default is 10)
116    #[cfg_attr(feature = "serde", serde(default))]
117    pub page_size: Option<u32>,
118}
119
120impl Filter {
121    /// Calculate the offset based on the current page and page size
122    pub fn offset(&self) -> u32 {
123        (self.page - 1) * self.page_size.unwrap_or(DEFAULT_PAGE_SIZE)
124    }
125
126    /// Get the limit (page size) for the query
127    pub fn limit(&self) -> u32 {
128        self.page_size.unwrap_or(DEFAULT_PAGE_SIZE)
129    }
130}
131
/// Fallback used by `serde` for `Filter::page` when the field is absent.
#[cfg(feature = "serde")]
fn default_page() -> u32 {
    // Pages are counted from one, so the first page is the fallback.
    1
}
136/// Represents an overview of the backend including queues, workers, and statistics
137#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
138#[derive(Debug, Clone)]
139pub struct Statistic {
140    /// Overall statistics of the backend
141    pub title: String,
142    /// The statistics type
143    pub stat_type: StatType,
144    /// The value of the statistic
145    pub value: String,
146    /// The priority of the statistic (lower number means higher priority)
147    pub priority: Option<u64>,
148}
/// Kind of value a statistic carries.
///
/// The enum has no payload, so it is `Copy` and can be compared directly.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StatType {
    /// Timestamp statistic
    Timestamp,
    /// Numeric statistic
    Number,
    /// Decimal statistic
    Decimal,
    /// Percentage statistic
    Percentage,
}