apalis_board_api/
lib.rs

1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2use std::{str::FromStr, sync::Arc};
3
4use apalis_board_types::ApiError;
5use apalis_core::{
6    backend::{
7        Backend, BackendExt, FetchById, Filter, ListAllTasks, ListQueues, ListTasks, ListWorkers,
8        Metrics, QueueInfo, RunningWorker, Statistic, TaskSink, codec::Codec,
9    },
10    task::{Task, builder::TaskBuilder, task_id::TaskId},
11};
12use serde::{Serialize, de::DeserializeOwned};
13use tokio::sync::RwLock;
14
15/// Contains different web framework routes.
16pub mod framework;
17/// Expose Server-Sent Events (SSE) functionality.
18#[cfg(feature = "sse")]
19pub mod sse;
20/// Expose UI components and functionality.
21#[cfg(feature = "ui")]
22pub mod ui;
23
24/// Push a new task to the specified queue.
25pub async fn push_task<Args, B, Compact>(
26    _queue: String,
27    task: Args,
28    storage: Arc<RwLock<B>>,
29) -> Result<(), ApiError>
30where
31    Args: Serialize + DeserializeOwned + 'static,
32    B: TaskSink<Args> + Send + BackendExt,
33    B::Error: std::error::Error,
34    B::Codec: Codec<Args, Compact = Compact>,
35    <<B as BackendExt>::Codec as Codec<Args>>::Error: std::error::Error,
36{
37    let task = TaskBuilder::new(task).build();
38    let res = storage.write().await.push_task(task).await;
39    match res {
40        Ok(_) => Ok(()),
41        Err(e) => Err(ApiError::BackendError(e.to_string())),
42    }
43}
44/// Get statistics for a specific queue.
45pub async fn stats_by_queue<S>(
46    storage: Arc<RwLock<S>>,
47    queue: String,
48) -> Result<Vec<Statistic>, ApiError>
49where
50    S::Error: std::error::Error,
51    S: Metrics,
52{
53    let stats = storage.read().await.fetch_by_queue(queue.as_ref()).await;
54    match stats {
55        Ok(stats) => Ok(stats),
56        Err(e) => Err(ApiError::BackendError(e.to_string())),
57    }
58}
59
60/// Get a list of tasks from the specified queue with filtering options.
61pub async fn get_tasks<S, T, Compact>(
62    queue: String,
63    storage: Arc<RwLock<S>>,
64    filter: Filter,
65) -> Result<Vec<Task<T, S::Context, S::IdType>>, ApiError>
66where
67    T: Serialize + DeserializeOwned + 'static,
68    S: ListTasks<T> + Send + BackendExt,
69    S::Context: Serialize,
70    S::IdType: Serialize,
71    <S as Backend>::Error: std::error::Error,
72    S::Codec: Codec<T, Compact = Compact>,
73{
74    storage
75        .read()
76        .await
77        .list_tasks(queue.as_ref(), &filter)
78        .await
79        .map_err(|e| ApiError::BackendError(e.to_string()))
80}
81
82/// Get workers for a specific queue.
83pub async fn get_workers<S>(
84    storage: Arc<RwLock<S>>,
85    queue: String,
86) -> Result<Vec<RunningWorker>, ApiError>
87where
88    S: ListWorkers,
89    S::Error: std::error::Error,
90{
91    storage
92        .read()
93        .await
94        .list_workers(queue.as_ref())
95        .await
96        .map_err(|e| ApiError::BackendError(e.to_string()))
97}
98
99/// Get a task by its ID.
100pub async fn get_task_by_id<B, T>(
101    task_id: String,
102    storage: Arc<RwLock<B>>,
103) -> Result<Option<Task<T, B::Context, B::IdType>>, ApiError>
104where
105    T: Serialize + DeserializeOwned + 'static,
106    B: FetchById<T> + 'static,
107    B::Context: Serialize,
108    B::IdType: Serialize,
109    B::Context: Serialize,
110    B::Error: std::error::Error,
111    B::IdType: FromStr,
112    <<B as Backend>::IdType as FromStr>::Err: std::error::Error,
113{
114    let task_id = TaskId::<B::IdType>::from_str(&task_id)
115        .map_err(|e| ApiError::BackendError(e.to_string()))?;
116
117    storage
118        .write()
119        .await
120        .fetch_by_id(&task_id)
121        .await
122        .map_err(|e| ApiError::BackendError(e.to_string()))
123}
124
125/// Get all tasks across all queues.
126pub async fn get_all_tasks<S>(
127    storage: Arc<RwLock<S>>,
128    filter: Filter,
129) -> Result<Vec<Task<S::Compact, S::Context, S::IdType>>, ApiError>
130where
131    S: ListAllTasks + Send,
132    S::Context: Serialize,
133    S::IdType: Serialize,
134    S::Compact: Serialize,
135    <S as Backend>::Error: std::error::Error,
136    <<S as BackendExt>::Codec as Codec<<S as Backend>::Args>>::Error: std::error::Error,
137{
138    storage
139        .read()
140        .await
141        .list_all_tasks(&filter)
142        .await
143        .map_err(|e| ApiError::BackendError(e.to_string()))
144}
145
146/// Get all workers across all queues.
147pub async fn get_all_workers<S>(storage: Arc<RwLock<S>>) -> Result<Vec<RunningWorker>, ApiError>
148where
149    S: ListWorkers,
150    S::Error: std::error::Error,
151{
152    storage
153        .read()
154        .await
155        .list_all_workers()
156        .await
157        .map_err(|e| ApiError::BackendError(e.to_string()))
158}
159
160/// Fetch all queues.
161pub async fn fetch_queues<S>(storage: Arc<RwLock<S>>) -> Result<Vec<QueueInfo>, ApiError>
162where
163    S::Error: std::error::Error,
164    S: ListQueues,
165{
166    storage
167        .read()
168        .await
169        .list_queues()
170        .await
171        .map_err(|e| ApiError::BackendError(e.to_string()))
172}
173
174/// Get an overview of statistics.
175pub async fn overview<S>(storage: Arc<RwLock<S>>) -> Result<Vec<Statistic>, ApiError>
176where
177    S::Error: std::error::Error,
178    S: Metrics,
179{
180    let overview = storage.read().await.global().await;
181    match overview {
182        Ok(overview) => Ok(overview),
183        Err(e) => Err(ApiError::BackendError(e.to_string())),
184    }
185}