qml_rs/dashboard/
service.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::core::JobState;
7use crate::error::QmlError;
8use crate::storage::Storage;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct JobStatistics {
12    pub total_jobs: u64,
13    pub succeeded: u64,
14    pub failed: u64,
15    pub processing: u64,
16    pub enqueued: u64,
17    pub scheduled: u64,
18    pub awaiting_retry: u64,
19    pub deleted: u64,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct QueueStatistics {
24    pub queue_name: String,
25    pub enqueued_count: u64,
26    pub processing_count: u64,
27    pub scheduled_count: u64,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct JobDetails {
32    pub id: String,
33    pub method_name: String,
34    pub queue: String,
35    pub state: JobState,
36    pub created_at: DateTime<Utc>,
37    pub updated_at: DateTime<Utc>,
38    pub attempts: u32,
39    pub max_attempts: u32,
40    pub error_message: Option<String>,
41    pub scheduled_at: Option<DateTime<Utc>>,
42    pub duration_ms: Option<u64>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ServerStatistics {
47    pub server_count: u32,
48    pub worker_count: u32,
49    pub queues: Vec<QueueStatistics>,
50    pub jobs: JobStatistics,
51    pub recent_jobs: Vec<JobDetails>,
52}
53
54pub struct DashboardService {
55    storage: Arc<dyn Storage>,
56}
57
58impl DashboardService {
59    pub fn new(storage: Arc<dyn Storage>) -> Self {
60        Self { storage }
61    }
62
63    /// Get overall job statistics across all queues
64    pub async fn get_job_statistics(&self) -> Result<JobStatistics, QmlError> {
65        // Use the storage's get_job_counts method if available, otherwise fall back to listing
66        match self.storage.get_job_counts().await {
67            Ok(counts) => {
68                let mut stats = JobStatistics {
69                    total_jobs: 0,
70                    succeeded: 0,
71                    failed: 0,
72                    processing: 0,
73                    enqueued: 0,
74                    scheduled: 0,
75                    awaiting_retry: 0,
76                    deleted: 0,
77                };
78
79                for (state, count) in counts {
80                    stats.total_jobs += count as u64;
81                    match state {
82                        JobState::Enqueued { .. } => stats.enqueued += count as u64,
83                        JobState::Processing { .. } => stats.processing += count as u64,
84                        JobState::Succeeded { .. } => stats.succeeded += count as u64,
85                        JobState::Failed { .. } => stats.failed += count as u64,
86                        JobState::Scheduled { .. } => stats.scheduled += count as u64,
87                        JobState::AwaitingRetry { .. } => stats.awaiting_retry += count as u64,
88                        JobState::Deleted { .. } => stats.deleted += count as u64,
89                    }
90                }
91
92                Ok(stats)
93            }
94            Err(_) => {
95                // Fallback: get a small sample and estimate
96                let sample_jobs = self
97                    .storage
98                    .list(None, Some(100), None)
99                    .await
100                    .map_err(|e| QmlError::StorageError {
101                        message: e.to_string(),
102                    })?;
103
104                let mut stats = JobStatistics {
105                    total_jobs: sample_jobs.len() as u64,
106                    succeeded: 0,
107                    failed: 0,
108                    processing: 0,
109                    enqueued: 0,
110                    scheduled: 0,
111                    awaiting_retry: 0,
112                    deleted: 0,
113                };
114
115                for job in sample_jobs {
116                    match job.state {
117                        JobState::Enqueued { .. } => stats.enqueued += 1,
118                        JobState::Processing { .. } => stats.processing += 1,
119                        JobState::Succeeded { .. } => stats.succeeded += 1,
120                        JobState::Failed { .. } => stats.failed += 1,
121                        JobState::Scheduled { .. } => stats.scheduled += 1,
122                        JobState::AwaitingRetry { .. } => stats.awaiting_retry += 1,
123                        JobState::Deleted { .. } => stats.deleted += 1,
124                    }
125                }
126
127                Ok(stats)
128            }
129        }
130    }
131
132    /// Get statistics for each queue
133    pub async fn get_queue_statistics(&self) -> Result<Vec<QueueStatistics>, QmlError> {
134        let jobs = self
135            .storage
136            .list(None, Some(200), None)
137            .await
138            .map_err(|e| QmlError::StorageError {
139                message: e.to_string(),
140            })?;
141        let mut queue_map: HashMap<String, QueueStatistics> = HashMap::new();
142
143        for job in jobs {
144            let queue_name = job.queue.clone();
145            let stats = queue_map
146                .entry(queue_name.clone())
147                .or_insert(QueueStatistics {
148                    queue_name,
149                    enqueued_count: 0,
150                    processing_count: 0,
151                    scheduled_count: 0,
152                });
153
154            match job.state {
155                JobState::Enqueued { .. } => stats.enqueued_count += 1,
156                JobState::Processing { .. } => stats.processing_count += 1,
157                JobState::Scheduled { .. } => stats.scheduled_count += 1,
158                _ => {} // Other states don't affect queue stats
159            }
160        }
161
162        Ok(queue_map.into_values().collect())
163    }
164
165    /// Get recent jobs (limited sample)
166    pub async fn get_recent_jobs(&self, limit: Option<usize>) -> Result<Vec<JobDetails>, QmlError> {
167        let limit = limit.unwrap_or(50);
168        let jobs = self
169            .storage
170            .list(None, Some(limit), None)
171            .await
172            .map_err(|e| QmlError::StorageError {
173                message: e.to_string(),
174            })?;
175
176        let job_details: Vec<JobDetails> = jobs
177            .into_iter()
178            .map(|job| {
179                let (error_message, scheduled_at, duration_ms) =
180                    Self::extract_state_info(&job.state);
181
182                JobDetails {
183                    id: job.id,
184                    method_name: job.method,
185                    queue: job.queue,
186                    state: job.state,
187                    created_at: job.created_at,
188                    updated_at: job.created_at, // Use created_at as approximation
189                    attempts: 0,                // Not tracked in current Job struct
190                    max_attempts: job.max_retries,
191                    error_message,
192                    scheduled_at,
193                    duration_ms,
194                }
195            })
196            .collect();
197
198        Ok(job_details)
199    }
200
201    /// Get jobs by state (limited sample)
202    pub async fn get_jobs_by_state(&self, state: JobState) -> Result<Vec<JobDetails>, QmlError> {
203        let jobs = self
204            .storage
205            .list(Some(&state), Some(100), None)
206            .await
207            .map_err(|e| QmlError::StorageError {
208                message: e.to_string(),
209            })?;
210
211        let job_details: Vec<JobDetails> = jobs
212            .into_iter()
213            .map(|job| {
214                let (error_message, scheduled_at, duration_ms) =
215                    Self::extract_state_info(&job.state);
216
217                JobDetails {
218                    id: job.id,
219                    method_name: job.method,
220                    queue: job.queue,
221                    state: job.state,
222                    created_at: job.created_at,
223                    updated_at: job.created_at,
224                    attempts: 0,
225                    max_attempts: job.max_retries,
226                    error_message,
227                    scheduled_at,
228                    duration_ms,
229                }
230            })
231            .collect();
232
233        Ok(job_details)
234    }
235
236    /// Get detailed job information by ID
237    pub async fn get_job_details(&self, job_id: &str) -> Result<Option<JobDetails>, QmlError> {
238        let job = self
239            .storage
240            .get(job_id)
241            .await
242            .map_err(|e| QmlError::StorageError {
243                message: e.to_string(),
244            })?;
245
246        Ok(job.map(|job| {
247            let (error_message, scheduled_at, duration_ms) = Self::extract_state_info(&job.state);
248
249            JobDetails {
250                id: job.id,
251                method_name: job.method,
252                queue: job.queue,
253                state: job.state,
254                created_at: job.created_at,
255                updated_at: job.created_at,
256                attempts: 0,
257                max_attempts: job.max_retries,
258                error_message,
259                scheduled_at,
260                duration_ms,
261            }
262        }))
263    }
264
265    /// Retry a failed job (simplified)
266    pub async fn retry_job(&self, job_id: &str) -> Result<bool, QmlError> {
267        let mut job = match self
268            .storage
269            .get(job_id)
270            .await
271            .map_err(|e| QmlError::StorageError {
272                message: e.to_string(),
273            })? {
274            Some(job) => job,
275            None => return Ok(false),
276        };
277
278        // Only retry failed jobs
279        if !matches!(job.state, JobState::Failed { .. }) {
280            return Ok(false);
281        }
282
283        // Reset job state to enqueued for retry
284        job.state = JobState::enqueued(&job.queue);
285
286        self.storage
287            .update(&job)
288            .await
289            .map_err(|e| QmlError::StorageError {
290                message: e.to_string(),
291            })?;
292        Ok(true)
293    }
294
295    /// Delete a job (simplified)
296    pub async fn delete_job(&self, job_id: &str) -> Result<bool, QmlError> {
297        match self.storage.delete(job_id).await {
298            Ok(deleted) => Ok(deleted),
299            Err(e) => Err(QmlError::StorageError {
300                message: e.to_string(),
301            }),
302        }
303    }
304
305    /// Get comprehensive server statistics
306    pub async fn get_server_statistics(&self) -> Result<ServerStatistics, QmlError> {
307        let job_stats = self.get_job_statistics().await?;
308        let queue_stats = self.get_queue_statistics().await?;
309        let recent_jobs = self.get_recent_jobs(Some(20)).await?;
310
311        Ok(ServerStatistics {
312            server_count: 1, // TODO: Implement multi-server support
313            worker_count: 1, // TODO: Get from server configuration
314            queues: queue_stats,
315            jobs: job_stats,
316            recent_jobs,
317        })
318    }
319
320    /// Extract information from job state
321    fn extract_state_info(
322        state: &JobState,
323    ) -> (Option<String>, Option<DateTime<Utc>>, Option<u64>) {
324        match state {
325            JobState::Failed { exception, .. } => (Some(exception.clone()), None, None),
326            JobState::Succeeded { total_duration, .. } => (None, None, Some(*total_duration)),
327            JobState::Scheduled { enqueue_at, .. } => (None, Some(*enqueue_at), None),
328            JobState::AwaitingRetry {
329                last_exception,
330                retry_at,
331                ..
332            } => (Some(last_exception.clone()), Some(*retry_at), None),
333            _ => (None, None, None),
334        }
335    }
336}