Skip to main content

apalis_diesel_postgres/
admin.rs

//! Admin-facing trait implementations for `PostgresStorage`.
//!
//! These extend the storage with `BackendExpose`-style traits used by
//! dashboards and tooling: lookup, listing, metrics, worker registration, and
//! completion-waiting. SQL bodies live in `crate::queries::admin`; this file
//! holds only the apalis trait wiring.
7
8use apalis_core::{
9    backend::{
10        BackendExt, FetchById, Filter, ListAllTasks, ListQueues, ListTasks, ListWorkers, Metrics,
11        QueueInfo, RegisterWorker, RunningWorker, Statistic, TaskResult, WaitForCompletion,
12        codec::Codec,
13    },
14    task::{Task, task_id::TaskId},
15};
16use futures::stream::BoxStream;
17use serde::de::DeserializeOwned;
18use ulid::Ulid;
19
20use crate::{CompactType, Error, PgContext, PgTask, PgTaskId, PostgresStorage, queries};
21
22impl<Args, D, F> FetchById<Args> for PostgresStorage<Args, D, F>
23where
24    PostgresStorage<Args, D, F>:
25        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
26    D: Codec<Args, Compact = CompactType>,
27    D::Error: std::error::Error + Send + Sync + 'static,
28    Args: 'static,
29{
30    fn fetch_by_id(
31        &mut self,
32        task_id: &PgTaskId,
33    ) -> impl Future<Output = Result<Option<PgTask<Args>>, Self::Error>> + Send {
34        queries::fetch_by_id::<Args, D>(
35            self.pool.clone(),
36            task_id.to_string(),
37            self.config.queue().to_string(),
38        )
39    }
40}
41
42impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, D, F>
43where
44    PostgresStorage<Args, D, F>:
45        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
46    D: Codec<Args, Compact = CompactType>,
47    D::Error: std::error::Error + Send + Sync + 'static,
48    Args: 'static,
49{
50    fn list_tasks(
51        &self,
52        filter: &Filter,
53    ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
54        queries::list_tasks::<Args, D>(self.pool.clone(), self.config.queue().to_string(), filter)
55    }
56}
57
58impl<Args, D, F> ListAllTasks for PostgresStorage<Args, D, F>
59where
60    PostgresStorage<Args, D, F>:
61        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
62{
63    fn list_all_tasks(
64        &self,
65        filter: &Filter,
66    ) -> impl Future<
67        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
68    > + Send {
69        queries::list_all_tasks(self.pool.clone(), filter)
70    }
71}
72
73impl<Args, D, F> ListWorkers for PostgresStorage<Args, D, F>
74where
75    Args: Sync,
76    PostgresStorage<Args, D, F>:
77        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
78{
79    fn list_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
80        queries::list_workers(self.pool.clone(), Some(self.config.queue().to_string()))
81    }
82
83    fn list_all_workers(
84        &self,
85    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
86        queries::list_workers(self.pool.clone(), None)
87    }
88}
89
90impl<Args, D, F> ListQueues for PostgresStorage<Args, D, F>
91where
92    PostgresStorage<Args, D, F>:
93        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
94{
95    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
96        queries::list_queues(self.pool.clone())
97    }
98}
99
100impl<Args, D, F> Metrics for PostgresStorage<Args, D, F>
101where
102    PostgresStorage<Args, D, F>:
103        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
104{
105    /// Scans `apalis.jobs` to compute global statistics. Each call evaluates
106    /// 20+ FILTER aggregates over every row; cost grows linearly with the
107    /// table size. Treat as a slow admin call.
108    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
109        queries::metrics_global(self.pool.clone())
110    }
111
112    /// Same shape as [`Self::global`], scoped to the configured queue. Cost
113    /// still depends on the number of jobs in that `job_type`.
114    fn fetch_by_queue(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
115        queries::metrics_for_queue(self.pool.clone(), self.config.queue().to_string())
116    }
117}
118
119impl<Args, D, F> RegisterWorker for PostgresStorage<Args, D, F>
120where
121    PostgresStorage<Args, D, F>:
122        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
123{
124    fn register_worker(
125        &mut self,
126        worker_id: String,
127    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
128        queries::register_worker(
129            self.pool.clone(),
130            worker_id,
131            self.config.queue().to_string(),
132        )
133    }
134}
135
136impl<O, Args, F, Decode> WaitForCompletion<O> for PostgresStorage<Args, Decode, F>
137where
138    O: 'static + Send,
139    PostgresStorage<Args, Decode, F>:
140        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
141    Result<O, String>: DeserializeOwned,
142{
143    type ResultStream = BoxStream<'static, Result<TaskResult<O, Ulid>, Error>>;
144
145    /// Wait for the given tasks to complete, yielding each result as it lands.
146    ///
147    /// # Error handling
148    ///
149    /// A transient database error during polling does **not** abandon the
150    /// batch: the poll is retried with backoff. The stream yields an `Err` and
151    /// ends only once the failures *persist* across several consecutive polls.
152    /// Because completed results are durable in `apalis.jobs`, a surfaced error
153    /// is always safe to recover from by calling `wait_for` again with the ids
154    /// that have not yet yielded a result.
155    fn wait_for(
156        &self,
157        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
158    ) -> Self::ResultStream {
159        queries::admin::wait_for_completion(self.pool.clone(), task_ids)
160    }
161
162    fn check_status(
163        &self,
164        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
165    ) -> impl Future<Output = Result<Vec<TaskResult<O, Ulid>>, Self::Error>> + Send {
166        queries::admin::check_status(self.pool.clone(), task_ids)
167    }
168}