hammerwork_web/api/
queues.rs

1//! Queue management API endpoints.
2//!
3//! This module provides REST API endpoints for managing and monitoring Hammerwork job queues,
4//! including queue statistics, actions, and job management within specific queues.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/queues` - List all queues with statistics
9//! - `GET /api/queues/{name}` - Get detailed statistics for a specific queue
10//! - `POST /api/queues/{name}/actions` - Perform actions on a queue (pause, resume, clear)
11//! - `GET /api/queues/{name}/jobs` - List jobs in a specific queue
12//!
13//! # Examples
14//!
15//! ## Queue Information Structure
16//!
17//! ```rust
18//! use hammerwork_web::api::queues::QueueInfo;
19//! use chrono::Utc;
20//!
21//! let queue_info = QueueInfo {
22//!     name: "email_queue".to_string(),
23//!     pending_count: 25,
24//!     running_count: 3,
25//!     completed_count: 1500,
26//!     failed_count: 12,
27//!     dead_count: 2,
28//!     avg_processing_time_ms: 250.5,
29//!     throughput_per_minute: 45.0,
30//!     error_rate: 0.008,
31//!     last_job_at: Some(Utc::now()),
32//!     oldest_pending_job: Some(Utc::now()),
33//! };
34//!
35//! assert_eq!(queue_info.name, "email_queue");
36//! assert_eq!(queue_info.pending_count, 25);
37//! assert_eq!(queue_info.running_count, 3);
38//! ```
39//!
40//! ## Queue Actions
41//!
42//! ```rust
43//! use hammerwork_web::api::queues::QueueActionRequest;
44//!
45//! let clear_dead_request = QueueActionRequest {
46//!     action: "clear_dead".to_string(),
47//!     confirm: Some(true),
48//! };
49//!
50//! let pause_request = QueueActionRequest {
51//!     action: "pause".to_string(),
52//!     confirm: None,
53//! };
54//!
55//! assert_eq!(clear_dead_request.action, "clear_dead");
56//! assert_eq!(pause_request.action, "pause");
57//! ```
58//!
59//! ## Detailed Queue Statistics
60//!
61//! ```rust
62//! use hammerwork_web::api::queues::{DetailedQueueStats, QueueInfo, HourlyThroughput, RecentError};
63//! use std::collections::HashMap;
64//! use chrono::Utc;
65//!
66//! let queue_info = QueueInfo {
67//!     name: "default".to_string(),
68//!     pending_count: 10,
69//!     running_count: 2,
70//!     completed_count: 500,
71//!     failed_count: 5,
72//!     dead_count: 1,
73//!     avg_processing_time_ms: 180.0,
74//!     throughput_per_minute: 30.0,
75//!     error_rate: 0.01,
76//!     last_job_at: None,
77//!     oldest_pending_job: None,
78//! };
79//!
80//! let mut priority_breakdown = HashMap::new();
81//! priority_breakdown.insert("high".to_string(), 5);
82//! priority_breakdown.insert("normal".to_string(), 15);
83//!
84//! let detailed_stats = DetailedQueueStats {
85//!     queue_info,
86//!     priority_breakdown,
87//!     status_breakdown: HashMap::new(),
88//!     hourly_throughput: vec![],
89//!     recent_errors: vec![],
90//! };
91//!
92//! assert_eq!(detailed_stats.queue_info.name, "default");
93//! assert_eq!(detailed_stats.priority_breakdown.get("high"), Some(&5));
94//! ```
95
96use super::{
97    ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
98    with_filters, with_pagination, with_sort,
99};
100use hammerwork::queue::DatabaseQueue;
101use serde::{Deserialize, Serialize};
102use std::sync::Arc;
103use warp::{Filter, Reply};
104
105/// Queue information for API responses
106#[derive(Debug, Serialize, Clone)]
107pub struct QueueInfo {
108    pub name: String,
109    pub pending_count: u64,
110    pub running_count: u64,
111    pub completed_count: u64,
112    pub failed_count: u64,
113    pub dead_count: u64,
114    pub avg_processing_time_ms: f64,
115    pub throughput_per_minute: f64,
116    pub error_rate: f64,
117    pub last_job_at: Option<chrono::DateTime<chrono::Utc>>,
118    pub oldest_pending_job: Option<chrono::DateTime<chrono::Utc>>,
119}
120
121/// Detailed queue statistics
122#[derive(Debug, Serialize)]
123pub struct DetailedQueueStats {
124    pub queue_info: QueueInfo,
125    pub priority_breakdown: std::collections::HashMap<String, u64>,
126    pub status_breakdown: std::collections::HashMap<String, u64>,
127    pub hourly_throughput: Vec<HourlyThroughput>,
128    pub recent_errors: Vec<RecentError>,
129}
130
131/// Hourly throughput data point
132#[derive(Debug, Serialize)]
133pub struct HourlyThroughput {
134    pub hour: chrono::DateTime<chrono::Utc>,
135    pub completed: u64,
136    pub failed: u64,
137}
138
139/// Recent error information
140#[derive(Debug, Serialize)]
141pub struct RecentError {
142    pub job_id: String,
143    pub error_message: String,
144    pub occurred_at: chrono::DateTime<chrono::Utc>,
145    pub attempts: i32,
146}
147
148/// Queue action request
149#[derive(Debug, Deserialize)]
150pub struct QueueActionRequest {
151    pub action: String, // "pause", "resume", "clear_dead", "clear_completed"
152    pub confirm: Option<bool>,
153}
154
155/// Create queue routes
156pub fn routes<T>(
157    queue: Arc<T>,
158) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
159where
160    T: DatabaseQueue + Send + Sync + 'static,
161{
162    let queue_filter = warp::any().map(move || queue.clone());
163
164    let list_queues = warp::path("queues")
165        .and(warp::path::end())
166        .and(warp::get())
167        .and(queue_filter.clone())
168        .and(with_pagination())
169        .and(with_filters())
170        .and(with_sort())
171        .and_then(list_queues_handler);
172
173    let get_queue = warp::path("queues")
174        .and(warp::path::param::<String>())
175        .and(warp::path::end())
176        .and(warp::get())
177        .and(queue_filter.clone())
178        .and_then(get_queue_handler);
179
180    let queue_action = warp::path("queues")
181        .and(warp::path::param::<String>())
182        .and(warp::path("actions"))
183        .and(warp::path::end())
184        .and(warp::post())
185        .and(queue_filter.clone())
186        .and(warp::body::json())
187        .and_then(queue_action_handler);
188
189    let queue_jobs = warp::path("queues")
190        .and(warp::path::param::<String>())
191        .and(warp::path("jobs"))
192        .and(warp::path::end())
193        .and(warp::get())
194        .and(queue_filter)
195        .and(with_pagination())
196        .and(with_filters())
197        .and(with_sort())
198        .and_then(queue_jobs_handler);
199
200    list_queues.or(get_queue).or(queue_action).or(queue_jobs)
201}
202
203/// Handler for listing all queues
204async fn list_queues_handler<T>(
205    queue: Arc<T>,
206    pagination: PaginationParams,
207    _filters: FilterParams,
208    _sort: SortParams,
209) -> Result<impl Reply, warp::Rejection>
210where
211    T: DatabaseQueue + Send + Sync,
212{
213    // Get all queue statistics
214    match queue.get_all_queue_stats().await {
215        Ok(all_stats) => {
216            let mut queue_infos: Vec<QueueInfo> = Vec::new();
217
218            for stats in all_stats {
219                let queue_info = QueueInfo {
220                    name: stats.queue_name.clone(),
221                    pending_count: stats.pending_count,
222                    running_count: stats.running_count,
223                    completed_count: stats.completed_count,
224                    failed_count: stats.dead_count + stats.timed_out_count,
225                    dead_count: stats.dead_count,
226                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
227                    throughput_per_minute: stats.statistics.throughput_per_minute,
228                    error_rate: stats.statistics.error_rate,
229                    last_job_at: None,        // TODO: Get from database
230                    oldest_pending_job: None, // TODO: Get from database
231                };
232                queue_infos.push(queue_info);
233            }
234
235            // Apply pagination
236            let total = queue_infos.len() as u64;
237            let offset = pagination.get_offset() as usize;
238            let limit = pagination.get_limit() as usize;
239
240            let items = if offset < queue_infos.len() {
241                let end = (offset + limit).min(queue_infos.len());
242                queue_infos[offset..end].to_vec()
243            } else {
244                Vec::new()
245            };
246
247            let response = PaginatedResponse {
248                items,
249                pagination: PaginationMeta::new(&pagination, total),
250            };
251
252            Ok(warp::reply::json(&ApiResponse::success(response)))
253        }
254        Err(e) => {
255            let response =
256                ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
257            Ok(warp::reply::json(&response))
258        }
259    }
260}
261
262/// Handler for getting a specific queue
263async fn get_queue_handler<T>(
264    queue_name: String,
265    queue: Arc<T>,
266) -> Result<impl Reply, warp::Rejection>
267where
268    T: DatabaseQueue + Send + Sync,
269{
270    match queue.get_all_queue_stats().await {
271        Ok(all_stats) => {
272            if let Some(stats) = all_stats.into_iter().find(|s| s.queue_name == queue_name) {
273                // Get additional details for this specific queue
274                let priority_breakdown = std::collections::HashMap::new(); // TODO: Implement
275                let status_breakdown = std::collections::HashMap::new(); // TODO: Implement
276                let hourly_throughput = Vec::new(); // TODO: Implement
277                let recent_errors = Vec::new(); // TODO: Implement
278
279                let queue_info = QueueInfo {
280                    name: stats.queue_name.clone(),
281                    pending_count: stats.pending_count,
282                    running_count: stats.running_count,
283                    completed_count: stats.completed_count,
284                    failed_count: stats.dead_count + stats.timed_out_count,
285                    dead_count: stats.dead_count,
286                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
287                    throughput_per_minute: stats.statistics.throughput_per_minute,
288                    error_rate: stats.statistics.error_rate,
289                    last_job_at: None,
290                    oldest_pending_job: None,
291                };
292
293                let detailed_stats = DetailedQueueStats {
294                    queue_info,
295                    priority_breakdown,
296                    status_breakdown,
297                    hourly_throughput,
298                    recent_errors,
299                };
300
301                Ok(warp::reply::json(&ApiResponse::success(detailed_stats)))
302            } else {
303                let response =
304                    ApiResponse::<()>::error(format!("Queue '{}' not found", queue_name));
305                Ok(warp::reply::json(&response))
306            }
307        }
308        Err(e) => {
309            let response =
310                ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
311            Ok(warp::reply::json(&response))
312        }
313    }
314}
315
316/// Handler for queue actions (pause, resume, clear, etc.)
317async fn queue_action_handler<T>(
318    queue_name: String,
319    queue: Arc<T>,
320    action_request: QueueActionRequest,
321) -> Result<impl Reply, warp::Rejection>
322where
323    T: DatabaseQueue + Send + Sync,
324{
325    match action_request.action.as_str() {
326        "clear_dead" => {
327            let older_than = chrono::Utc::now() - chrono::Duration::days(7); // Remove jobs older than 7 days
328            match queue.purge_dead_jobs(older_than).await {
329                Ok(count) => {
330                    let response = ApiResponse::success(serde_json::json!({
331                        "message": format!("Cleared {} dead jobs from queue '{}'", count, queue_name),
332                        "count": count
333                    }));
334                    Ok(warp::reply::json(&response))
335                }
336                Err(e) => {
337                    let response =
338                        ApiResponse::<()>::error(format!("Failed to clear dead jobs: {}", e));
339                    Ok(warp::reply::json(&response))
340                }
341            }
342        }
343        "clear_completed" => {
344            // TODO: Implement clear completed jobs
345            let response =
346                ApiResponse::<()>::error("Clear completed jobs not yet implemented".to_string());
347            Ok(warp::reply::json(&response))
348        }
349        "pause" => {
350            // TODO: Implement queue pause
351            let response = ApiResponse::<()>::error("Queue pause not yet implemented".to_string());
352            Ok(warp::reply::json(&response))
353        }
354        "resume" => {
355            // TODO: Implement queue resume
356            let response = ApiResponse::<()>::error("Queue resume not yet implemented".to_string());
357            Ok(warp::reply::json(&response))
358        }
359        _ => {
360            let response =
361                ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
362            Ok(warp::reply::json(&response))
363        }
364    }
365}
366
367/// Handler for getting jobs in a specific queue
368async fn queue_jobs_handler<T>(
369    queue_name: String,
370    queue: Arc<T>,
371    pagination: PaginationParams,
372    filters: FilterParams,
373    sort: SortParams,
374) -> Result<impl Reply, warp::Rejection>
375where
376    T: DatabaseQueue + Send + Sync,
377{
378    // This would delegate to the jobs API with queue filter
379    // For now, return a simple response
380    let _ = (queue, pagination, filters, sort);
381
382    let response = ApiResponse::success(serde_json::json!({
383        "message": format!("Jobs for queue '{}' - implementation pending", queue_name),
384        "queue": queue_name
385    }));
386
387    Ok(warp::reply::json(&response))
388}
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393
394    #[test]
395    fn test_queue_action_request_deserialization() {
396        let json = r#"{"action": "clear_dead", "confirm": true}"#;
397        let request: QueueActionRequest = serde_json::from_str(json).unwrap();
398        assert_eq!(request.action, "clear_dead");
399        assert_eq!(request.confirm, Some(true));
400    }
401
402    #[test]
403    fn test_queue_info_serialization() {
404        let queue_info = QueueInfo {
405            name: "test_queue".to_string(),
406            pending_count: 42,
407            running_count: 3,
408            completed_count: 1000,
409            failed_count: 5,
410            dead_count: 2,
411            avg_processing_time_ms: 150.5,
412            throughput_per_minute: 25.0,
413            error_rate: 0.05,
414            last_job_at: None,
415            oldest_pending_job: None,
416        };
417
418        let json = serde_json::to_string(&queue_info).unwrap();
419        assert!(json.contains("test_queue"));
420        assert!(json.contains("42"));
421    }
422}