1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::core::JobState;
7use crate::error::QmlError;
8use crate::storage::Storage;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct JobStatistics {
12 pub total_jobs: u64,
13 pub succeeded: u64,
14 pub failed: u64,
15 pub processing: u64,
16 pub enqueued: u64,
17 pub scheduled: u64,
18 pub awaiting_retry: u64,
19 pub deleted: u64,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct QueueStatistics {
24 pub queue_name: String,
25 pub enqueued_count: u64,
26 pub processing_count: u64,
27 pub scheduled_count: u64,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct JobDetails {
32 pub id: String,
33 pub method_name: String,
34 pub queue: String,
35 pub state: JobState,
36 pub created_at: DateTime<Utc>,
37 pub updated_at: DateTime<Utc>,
38 pub attempts: u32,
39 pub max_attempts: u32,
40 pub error_message: Option<String>,
41 pub scheduled_at: Option<DateTime<Utc>>,
42 pub duration_ms: Option<u64>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ServerStatistics {
47 pub server_count: u32,
48 pub worker_count: u32,
49 pub queues: Vec<QueueStatistics>,
50 pub jobs: JobStatistics,
51 pub recent_jobs: Vec<JobDetails>,
52}
53
54pub struct DashboardService {
55 storage: Arc<dyn Storage>,
56}
57
58impl DashboardService {
59 pub fn new(storage: Arc<dyn Storage>) -> Self {
60 Self { storage }
61 }
62
63 pub async fn get_job_statistics(&self) -> Result<JobStatistics, QmlError> {
65 match self.storage.get_job_counts().await {
67 Ok(counts) => {
68 let mut stats = JobStatistics {
69 total_jobs: 0,
70 succeeded: 0,
71 failed: 0,
72 processing: 0,
73 enqueued: 0,
74 scheduled: 0,
75 awaiting_retry: 0,
76 deleted: 0,
77 };
78
79 for (state, count) in counts {
80 stats.total_jobs += count as u64;
81 match state {
82 JobState::Enqueued { .. } => stats.enqueued += count as u64,
83 JobState::Processing { .. } => stats.processing += count as u64,
84 JobState::Succeeded { .. } => stats.succeeded += count as u64,
85 JobState::Failed { .. } => stats.failed += count as u64,
86 JobState::Scheduled { .. } => stats.scheduled += count as u64,
87 JobState::AwaitingRetry { .. } => stats.awaiting_retry += count as u64,
88 JobState::Deleted { .. } => stats.deleted += count as u64,
89 }
90 }
91
92 Ok(stats)
93 }
94 Err(_) => {
95 let sample_jobs = self
97 .storage
98 .list(None, Some(100), None)
99 .await
100 .map_err(|e| QmlError::StorageError {
101 message: e.to_string(),
102 })?;
103
104 let mut stats = JobStatistics {
105 total_jobs: sample_jobs.len() as u64,
106 succeeded: 0,
107 failed: 0,
108 processing: 0,
109 enqueued: 0,
110 scheduled: 0,
111 awaiting_retry: 0,
112 deleted: 0,
113 };
114
115 for job in sample_jobs {
116 match job.state {
117 JobState::Enqueued { .. } => stats.enqueued += 1,
118 JobState::Processing { .. } => stats.processing += 1,
119 JobState::Succeeded { .. } => stats.succeeded += 1,
120 JobState::Failed { .. } => stats.failed += 1,
121 JobState::Scheduled { .. } => stats.scheduled += 1,
122 JobState::AwaitingRetry { .. } => stats.awaiting_retry += 1,
123 JobState::Deleted { .. } => stats.deleted += 1,
124 }
125 }
126
127 Ok(stats)
128 }
129 }
130 }
131
132 pub async fn get_queue_statistics(&self) -> Result<Vec<QueueStatistics>, QmlError> {
134 let jobs = self
135 .storage
136 .list(None, Some(200), None)
137 .await
138 .map_err(|e| QmlError::StorageError {
139 message: e.to_string(),
140 })?;
141 let mut queue_map: HashMap<String, QueueStatistics> = HashMap::new();
142
143 for job in jobs {
144 let queue_name = job.queue.clone();
145 let stats = queue_map
146 .entry(queue_name.clone())
147 .or_insert(QueueStatistics {
148 queue_name,
149 enqueued_count: 0,
150 processing_count: 0,
151 scheduled_count: 0,
152 });
153
154 match job.state {
155 JobState::Enqueued { .. } => stats.enqueued_count += 1,
156 JobState::Processing { .. } => stats.processing_count += 1,
157 JobState::Scheduled { .. } => stats.scheduled_count += 1,
158 _ => {} }
160 }
161
162 Ok(queue_map.into_values().collect())
163 }
164
165 pub async fn get_recent_jobs(&self, limit: Option<usize>) -> Result<Vec<JobDetails>, QmlError> {
167 let limit = limit.unwrap_or(50);
168 let jobs = self
169 .storage
170 .list(None, Some(limit), None)
171 .await
172 .map_err(|e| QmlError::StorageError {
173 message: e.to_string(),
174 })?;
175
176 let job_details: Vec<JobDetails> = jobs
177 .into_iter()
178 .map(|job| {
179 let (error_message, scheduled_at, duration_ms) =
180 Self::extract_state_info(&job.state);
181
182 JobDetails {
183 id: job.id,
184 method_name: job.method,
185 queue: job.queue,
186 state: job.state,
187 created_at: job.created_at,
188 updated_at: job.created_at, attempts: 0, max_attempts: job.max_retries,
191 error_message,
192 scheduled_at,
193 duration_ms,
194 }
195 })
196 .collect();
197
198 Ok(job_details)
199 }
200
201 pub async fn get_jobs_by_state(&self, state: JobState) -> Result<Vec<JobDetails>, QmlError> {
203 let jobs = self
204 .storage
205 .list(Some(&state), Some(100), None)
206 .await
207 .map_err(|e| QmlError::StorageError {
208 message: e.to_string(),
209 })?;
210
211 let job_details: Vec<JobDetails> = jobs
212 .into_iter()
213 .map(|job| {
214 let (error_message, scheduled_at, duration_ms) =
215 Self::extract_state_info(&job.state);
216
217 JobDetails {
218 id: job.id,
219 method_name: job.method,
220 queue: job.queue,
221 state: job.state,
222 created_at: job.created_at,
223 updated_at: job.created_at,
224 attempts: 0,
225 max_attempts: job.max_retries,
226 error_message,
227 scheduled_at,
228 duration_ms,
229 }
230 })
231 .collect();
232
233 Ok(job_details)
234 }
235
236 pub async fn get_job_details(&self, job_id: &str) -> Result<Option<JobDetails>, QmlError> {
238 let job = self
239 .storage
240 .get(job_id)
241 .await
242 .map_err(|e| QmlError::StorageError {
243 message: e.to_string(),
244 })?;
245
246 Ok(job.map(|job| {
247 let (error_message, scheduled_at, duration_ms) = Self::extract_state_info(&job.state);
248
249 JobDetails {
250 id: job.id,
251 method_name: job.method,
252 queue: job.queue,
253 state: job.state,
254 created_at: job.created_at,
255 updated_at: job.created_at,
256 attempts: 0,
257 max_attempts: job.max_retries,
258 error_message,
259 scheduled_at,
260 duration_ms,
261 }
262 }))
263 }
264
265 pub async fn retry_job(&self, job_id: &str) -> Result<bool, QmlError> {
267 let mut job = match self
268 .storage
269 .get(job_id)
270 .await
271 .map_err(|e| QmlError::StorageError {
272 message: e.to_string(),
273 })? {
274 Some(job) => job,
275 None => return Ok(false),
276 };
277
278 if !matches!(job.state, JobState::Failed { .. }) {
280 return Ok(false);
281 }
282
283 job.state = JobState::enqueued(&job.queue);
285
286 self.storage
287 .update(&job)
288 .await
289 .map_err(|e| QmlError::StorageError {
290 message: e.to_string(),
291 })?;
292 Ok(true)
293 }
294
295 pub async fn delete_job(&self, job_id: &str) -> Result<bool, QmlError> {
297 match self.storage.delete(job_id).await {
298 Ok(deleted) => Ok(deleted),
299 Err(e) => Err(QmlError::StorageError {
300 message: e.to_string(),
301 }),
302 }
303 }
304
305 pub async fn get_server_statistics(&self) -> Result<ServerStatistics, QmlError> {
307 let job_stats = self.get_job_statistics().await?;
308 let queue_stats = self.get_queue_statistics().await?;
309 let recent_jobs = self.get_recent_jobs(Some(20)).await?;
310
311 Ok(ServerStatistics {
312 server_count: 1, worker_count: 1, queues: queue_stats,
315 jobs: job_stats,
316 recent_jobs,
317 })
318 }
319
320 fn extract_state_info(
322 state: &JobState,
323 ) -> (Option<String>, Option<DateTime<Utc>>, Option<u64>) {
324 match state {
325 JobState::Failed { exception, .. } => (Some(exception.clone()), None, None),
326 JobState::Succeeded { total_duration, .. } => (None, None, Some(*total_duration)),
327 JobState::Scheduled { enqueue_at, .. } => (None, Some(*enqueue_at), None),
328 JobState::AwaitingRetry {
329 last_exception,
330 retry_at,
331 ..
332 } => (Some(last_exception.clone()), Some(*retry_at), None),
333 _ => (None, None, None),
334 }
335 }
336}