1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2use std::{str::FromStr, sync::Arc};
3
4use apalis_board_types::ApiError;
5use apalis_core::{
6 backend::{
7 Backend, BackendExt, FetchById, Filter, ListAllTasks, ListQueues, ListTasks, ListWorkers,
8 Metrics, QueueInfo, RunningWorker, Statistic, TaskSink, codec::Codec,
9 },
10 task::{Task, builder::TaskBuilder, task_id::TaskId},
11};
12use serde::{Serialize, de::DeserializeOwned};
13use tokio::sync::RwLock;
14
15pub mod framework;
17#[cfg(feature = "sse")]
19pub mod sse;
20#[cfg(feature = "ui")]
22pub mod ui;
23
24pub async fn push_task<Args, B, Compact>(
26 _queue: String,
27 task: Args,
28 storage: Arc<RwLock<B>>,
29) -> Result<(), ApiError>
30where
31 Args: Serialize + DeserializeOwned + 'static,
32 B: TaskSink<Args> + Send + BackendExt,
33 B::Error: std::error::Error,
34 B::Codec: Codec<Args, Compact = Compact>,
35 <<B as BackendExt>::Codec as Codec<Args>>::Error: std::error::Error,
36{
37 let task = TaskBuilder::new(task).build();
38 let res = storage.write().await.push_task(task).await;
39 match res {
40 Ok(_) => Ok(()),
41 Err(e) => Err(ApiError::BackendError(e.to_string())),
42 }
43}
44pub async fn stats_by_queue<S>(
46 storage: Arc<RwLock<S>>,
47 queue: String,
48) -> Result<Vec<Statistic>, ApiError>
49where
50 S::Error: std::error::Error,
51 S: Metrics,
52{
53 let stats = storage.read().await.fetch_by_queue(queue.as_ref()).await;
54 match stats {
55 Ok(stats) => Ok(stats),
56 Err(e) => Err(ApiError::BackendError(e.to_string())),
57 }
58}
59
60pub async fn get_tasks<S, T, Compact>(
62 queue: String,
63 storage: Arc<RwLock<S>>,
64 filter: Filter,
65) -> Result<Vec<Task<T, S::Context, S::IdType>>, ApiError>
66where
67 T: Serialize + DeserializeOwned + 'static,
68 S: ListTasks<T> + Send + BackendExt,
69 S::Context: Serialize,
70 S::IdType: Serialize,
71 <S as Backend>::Error: std::error::Error,
72 S::Codec: Codec<T, Compact = Compact>,
73{
74 storage
75 .read()
76 .await
77 .list_tasks(queue.as_ref(), &filter)
78 .await
79 .map_err(|e| ApiError::BackendError(e.to_string()))
80}
81
82pub async fn get_workers<S>(
84 storage: Arc<RwLock<S>>,
85 queue: String,
86) -> Result<Vec<RunningWorker>, ApiError>
87where
88 S: ListWorkers,
89 S::Error: std::error::Error,
90{
91 storage
92 .read()
93 .await
94 .list_workers(queue.as_ref())
95 .await
96 .map_err(|e| ApiError::BackendError(e.to_string()))
97}
98
99pub async fn get_task_by_id<B, T>(
101 task_id: String,
102 storage: Arc<RwLock<B>>,
103) -> Result<Option<Task<T, B::Context, B::IdType>>, ApiError>
104where
105 T: Serialize + DeserializeOwned + 'static,
106 B: FetchById<T> + 'static,
107 B::Context: Serialize,
108 B::IdType: Serialize,
109 B::Context: Serialize,
110 B::Error: std::error::Error,
111 B::IdType: FromStr,
112 <<B as Backend>::IdType as FromStr>::Err: std::error::Error,
113{
114 let task_id = TaskId::<B::IdType>::from_str(&task_id)
115 .map_err(|e| ApiError::BackendError(e.to_string()))?;
116
117 storage
118 .write()
119 .await
120 .fetch_by_id(&task_id)
121 .await
122 .map_err(|e| ApiError::BackendError(e.to_string()))
123}
124
125pub async fn get_all_tasks<S>(
127 storage: Arc<RwLock<S>>,
128 filter: Filter,
129) -> Result<Vec<Task<S::Compact, S::Context, S::IdType>>, ApiError>
130where
131 S: ListAllTasks + Send,
132 S::Context: Serialize,
133 S::IdType: Serialize,
134 S::Compact: Serialize,
135 <S as Backend>::Error: std::error::Error,
136 <<S as BackendExt>::Codec as Codec<<S as Backend>::Args>>::Error: std::error::Error,
137{
138 storage
139 .read()
140 .await
141 .list_all_tasks(&filter)
142 .await
143 .map_err(|e| ApiError::BackendError(e.to_string()))
144}
145
146pub async fn get_all_workers<S>(storage: Arc<RwLock<S>>) -> Result<Vec<RunningWorker>, ApiError>
148where
149 S: ListWorkers,
150 S::Error: std::error::Error,
151{
152 storage
153 .read()
154 .await
155 .list_all_workers()
156 .await
157 .map_err(|e| ApiError::BackendError(e.to_string()))
158}
159
160pub async fn fetch_queues<S>(storage: Arc<RwLock<S>>) -> Result<Vec<QueueInfo>, ApiError>
162where
163 S::Error: std::error::Error,
164 S: ListQueues,
165{
166 storage
167 .read()
168 .await
169 .list_queues()
170 .await
171 .map_err(|e| ApiError::BackendError(e.to_string()))
172}
173
174pub async fn overview<S>(storage: Arc<RwLock<S>>) -> Result<Vec<Statistic>, ApiError>
176where
177 S::Error: std::error::Error,
178 S: Metrics,
179{
180 let overview = storage.read().await.global().await;
181 match overview {
182 Ok(overview) => Ok(overview),
183 Err(e) => Err(ApiError::BackendError(e.to_string())),
184 }
185}