1use apalis_core::{
9 backend::{
10 BackendExt, FetchById, Filter, ListAllTasks, ListQueues, ListTasks, ListWorkers, Metrics,
11 QueueInfo, RegisterWorker, RunningWorker, Statistic, TaskResult, WaitForCompletion,
12 codec::Codec,
13 },
14 task::{Task, task_id::TaskId},
15};
16use futures::stream::BoxStream;
17use serde::de::DeserializeOwned;
18use ulid::Ulid;
19
20use crate::{CompactType, Error, PgContext, PgTask, PgTaskId, PostgresStorage, queries};
21
22impl<Args, D, F> FetchById<Args> for PostgresStorage<Args, D, F>
23where
24 PostgresStorage<Args, D, F>:
25 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
26 D: Codec<Args, Compact = CompactType>,
27 D::Error: std::error::Error + Send + Sync + 'static,
28 Args: 'static,
29{
30 fn fetch_by_id(
31 &mut self,
32 task_id: &PgTaskId,
33 ) -> impl Future<Output = Result<Option<PgTask<Args>>, Self::Error>> + Send {
34 queries::fetch_by_id::<Args, D>(
35 self.pool.clone(),
36 task_id.to_string(),
37 self.config.queue().to_string(),
38 )
39 }
40}
41
42impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, D, F>
43where
44 PostgresStorage<Args, D, F>:
45 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
46 D: Codec<Args, Compact = CompactType>,
47 D::Error: std::error::Error + Send + Sync + 'static,
48 Args: 'static,
49{
50 fn list_tasks(
51 &self,
52 filter: &Filter,
53 ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
54 queries::list_tasks::<Args, D>(self.pool.clone(), self.config.queue().to_string(), filter)
55 }
56}
57
58impl<Args, D, F> ListAllTasks for PostgresStorage<Args, D, F>
59where
60 PostgresStorage<Args, D, F>:
61 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
62{
63 fn list_all_tasks(
64 &self,
65 filter: &Filter,
66 ) -> impl Future<
67 Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
68 > + Send {
69 queries::list_all_tasks(self.pool.clone(), filter)
70 }
71}
72
73impl<Args, D, F> ListWorkers for PostgresStorage<Args, D, F>
74where
75 Args: Sync,
76 PostgresStorage<Args, D, F>:
77 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
78{
79 fn list_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
80 queries::list_workers(self.pool.clone(), Some(self.config.queue().to_string()))
81 }
82
83 fn list_all_workers(
84 &self,
85 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
86 queries::list_workers(self.pool.clone(), None)
87 }
88}
89
90impl<Args, D, F> ListQueues for PostgresStorage<Args, D, F>
91where
92 PostgresStorage<Args, D, F>:
93 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
94{
95 fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
96 queries::list_queues(self.pool.clone())
97 }
98}
99
100impl<Args, D, F> Metrics for PostgresStorage<Args, D, F>
101where
102 PostgresStorage<Args, D, F>:
103 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
104{
105 fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
109 queries::metrics_global(self.pool.clone())
110 }
111
112 fn fetch_by_queue(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
115 queries::metrics_for_queue(self.pool.clone(), self.config.queue().to_string())
116 }
117}
118
119impl<Args, D, F> RegisterWorker for PostgresStorage<Args, D, F>
120where
121 PostgresStorage<Args, D, F>:
122 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
123{
124 fn register_worker(
125 &mut self,
126 worker_id: String,
127 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
128 queries::register_worker(
129 self.pool.clone(),
130 worker_id,
131 self.config.queue().to_string(),
132 )
133 }
134}
135
136impl<O, Args, F, Decode> WaitForCompletion<O> for PostgresStorage<Args, Decode, F>
137where
138 O: 'static + Send,
139 PostgresStorage<Args, Decode, F>:
140 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
141 Result<O, String>: DeserializeOwned,
142{
143 type ResultStream = BoxStream<'static, Result<TaskResult<O, Ulid>, Error>>;
144
145 fn wait_for(
156 &self,
157 task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
158 ) -> Self::ResultStream {
159 queries::admin::wait_for_completion(self.pool.clone(), task_ids)
160 }
161
162 fn check_status(
163 &self,
164 task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
165 ) -> impl Future<Output = Result<Vec<TaskResult<O, Ulid>>, Self::Error>> + Send {
166 queries::admin::check_status(self.pool.clone(), task_ids)
167 }
168}