hammerwork_web/api/
queues.rs

1//! Queue management API endpoints.
2//!
3//! This module provides REST API endpoints for managing and monitoring Hammerwork job queues,
4//! including queue statistics, actions, and job management within specific queues.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/queues` - List all queues with statistics
9//! - `GET /api/queues/{name}` - Get detailed statistics for a specific queue
//! - `POST /api/queues/{name}/actions` - Perform actions on a queue (`pause`, `resume`, `clear_dead`; `clear_completed` is not yet implemented)
11//! - `GET /api/queues/{name}/jobs` - List jobs in a specific queue
12//!
13//! # Examples
14//!
15//! ## Queue Information Structure
16//!
17//! ```rust
18//! use hammerwork_web::api::queues::QueueInfo;
19//! use chrono::Utc;
20//!
//! let queue_info = QueueInfo {
//!     name: "email_queue".to_string(),
//!     pending_count: 25,
//!     running_count: 3,
//!     completed_count: 1500,
//!     failed_count: 12,
//!     dead_count: 2,
//!     avg_processing_time_ms: 250.5,
//!     throughput_per_minute: 45.0,
//!     error_rate: 0.008,
//!     last_job_at: Some(Utc::now()),
//!     oldest_pending_job: Some(Utc::now()),
//!     is_paused: false,
//!     paused_at: None,
//!     paused_by: None,
//! };
34//!
35//! assert_eq!(queue_info.name, "email_queue");
36//! assert_eq!(queue_info.pending_count, 25);
37//! assert_eq!(queue_info.running_count, 3);
38//! ```
39//!
40//! ## Queue Actions
41//!
42//! ```rust
43//! use hammerwork_web::api::queues::QueueActionRequest;
44//!
45//! let clear_dead_request = QueueActionRequest {
46//!     action: "clear_dead".to_string(),
47//!     confirm: Some(true),
48//! };
49//!
50//! let pause_request = QueueActionRequest {
51//!     action: "pause".to_string(),
52//!     confirm: None,
53//! };
54//!
55//! assert_eq!(clear_dead_request.action, "clear_dead");
56//! assert_eq!(pause_request.action, "pause");
57//! ```
58//!
59//! ## Detailed Queue Statistics
60//!
61//! ```rust
62//! use hammerwork_web::api::queues::{DetailedQueueStats, QueueInfo, HourlyThroughput, RecentError};
63//! use std::collections::HashMap;
64//! use chrono::Utc;
65//!
//! let queue_info = QueueInfo {
//!     name: "default".to_string(),
//!     pending_count: 10,
//!     running_count: 2,
//!     completed_count: 500,
//!     failed_count: 5,
//!     dead_count: 1,
//!     avg_processing_time_ms: 180.0,
//!     throughput_per_minute: 30.0,
//!     error_rate: 0.01,
//!     last_job_at: None,
//!     oldest_pending_job: None,
//!     is_paused: false,
//!     paused_at: None,
//!     paused_by: None,
//! };
79//!
80//! let mut priority_breakdown = HashMap::new();
81//! priority_breakdown.insert("high".to_string(), 5);
82//! priority_breakdown.insert("normal".to_string(), 15);
83//!
84//! let detailed_stats = DetailedQueueStats {
85//!     queue_info,
86//!     priority_breakdown,
87//!     status_breakdown: HashMap::new(),
88//!     hourly_throughput: vec![],
89//!     recent_errors: vec![],
90//! };
91//!
92//! assert_eq!(detailed_stats.queue_info.name, "default");
93//! assert_eq!(detailed_stats.priority_breakdown.get("high"), Some(&5));
94//! ```
95
96use super::{
97    ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
98    with_filters, with_pagination, with_sort,
99};
100use hammerwork::queue::DatabaseQueue;
101use serde::{Deserialize, Serialize};
102use std::sync::Arc;
103use warp::{Filter, Reply};
104
105/// Queue information for API responses
106#[derive(Debug, Serialize, Clone)]
107pub struct QueueInfo {
108    pub name: String,
109    pub pending_count: u64,
110    pub running_count: u64,
111    pub completed_count: u64,
112    pub failed_count: u64,
113    pub dead_count: u64,
114    pub avg_processing_time_ms: f64,
115    pub throughput_per_minute: f64,
116    pub error_rate: f64,
117    pub last_job_at: Option<chrono::DateTime<chrono::Utc>>,
118    pub oldest_pending_job: Option<chrono::DateTime<chrono::Utc>>,
119    pub is_paused: bool,
120    pub paused_at: Option<chrono::DateTime<chrono::Utc>>,
121    pub paused_by: Option<String>,
122}
123
124/// Detailed queue statistics
125#[derive(Debug, Serialize)]
126pub struct DetailedQueueStats {
127    pub queue_info: QueueInfo,
128    pub priority_breakdown: std::collections::HashMap<String, u64>,
129    pub status_breakdown: std::collections::HashMap<String, u64>,
130    pub hourly_throughput: Vec<HourlyThroughput>,
131    pub recent_errors: Vec<RecentError>,
132}
133
134/// Hourly throughput data point
135#[derive(Debug, Serialize)]
136pub struct HourlyThroughput {
137    pub hour: chrono::DateTime<chrono::Utc>,
138    pub completed: u64,
139    pub failed: u64,
140}
141
142/// Recent error information
143#[derive(Debug, Serialize)]
144pub struct RecentError {
145    pub job_id: String,
146    pub error_message: String,
147    pub occurred_at: chrono::DateTime<chrono::Utc>,
148    pub attempts: i32,
149}
150
151/// Queue action request
152#[derive(Debug, Deserialize)]
153pub struct QueueActionRequest {
154    pub action: String, // "pause", "resume", "clear_dead", "clear_completed"
155    pub confirm: Option<bool>,
156}
157
158/// Create queue routes
159pub fn routes<T>(
160    queue: Arc<T>,
161) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
162where
163    T: DatabaseQueue + Send + Sync + 'static,
164{
165    let queue_filter = warp::any().map(move || queue.clone());
166
167    let list_queues = warp::path("queues")
168        .and(warp::path::end())
169        .and(warp::get())
170        .and(queue_filter.clone())
171        .and(with_pagination())
172        .and(with_filters())
173        .and(with_sort())
174        .and_then(list_queues_handler);
175
176    let get_queue = warp::path("queues")
177        .and(warp::path::param::<String>())
178        .and(warp::path::end())
179        .and(warp::get())
180        .and(queue_filter.clone())
181        .and_then(get_queue_handler);
182
183    let queue_action = warp::path("queues")
184        .and(warp::path::param::<String>())
185        .and(warp::path("actions"))
186        .and(warp::path::end())
187        .and(warp::post())
188        .and(queue_filter.clone())
189        .and(warp::body::json())
190        .and_then(queue_action_handler);
191
192    let queue_jobs = warp::path("queues")
193        .and(warp::path::param::<String>())
194        .and(warp::path("jobs"))
195        .and(warp::path::end())
196        .and(warp::get())
197        .and(queue_filter)
198        .and(with_pagination())
199        .and(with_filters())
200        .and(with_sort())
201        .and_then(queue_jobs_handler);
202
203    list_queues.or(get_queue).or(queue_action).or(queue_jobs)
204}
205
206/// Handler for listing all queues
207async fn list_queues_handler<T>(
208    queue: Arc<T>,
209    pagination: PaginationParams,
210    _filters: FilterParams,
211    _sort: SortParams,
212) -> Result<impl Reply, warp::Rejection>
213where
214    T: DatabaseQueue + Send + Sync,
215{
216    // Get all queue statistics
217    match queue.get_all_queue_stats().await {
218        Ok(all_stats) => {
219            let mut queue_infos: Vec<QueueInfo> = Vec::new();
220
221            for stats in all_stats {
222                // Get pause information for this queue
223                let pause_info = queue.get_queue_pause_info(&stats.queue_name).await.unwrap_or(None);
224                
225                let queue_info = QueueInfo {
226                    name: stats.queue_name.clone(),
227                    pending_count: stats.pending_count,
228                    running_count: stats.running_count,
229                    completed_count: stats.completed_count,
230                    failed_count: stats.dead_count + stats.timed_out_count,
231                    dead_count: stats.dead_count,
232                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
233                    throughput_per_minute: stats.statistics.throughput_per_minute,
234                    error_rate: stats.statistics.error_rate,
235                    last_job_at: None,        // TODO: Get from database
236                    oldest_pending_job: None, // TODO: Get from database
237                    is_paused: pause_info.is_some(),
238                    paused_at: pause_info.as_ref().map(|p| p.paused_at),
239                    paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
240                };
241                queue_infos.push(queue_info);
242            }
243
244            // Apply pagination
245            let total = queue_infos.len() as u64;
246            let offset = pagination.get_offset() as usize;
247            let limit = pagination.get_limit() as usize;
248
249            let items = if offset < queue_infos.len() {
250                let end = (offset + limit).min(queue_infos.len());
251                queue_infos[offset..end].to_vec()
252            } else {
253                Vec::new()
254            };
255
256            let response = PaginatedResponse {
257                items,
258                pagination: PaginationMeta::new(&pagination, total),
259            };
260
261            Ok(warp::reply::json(&ApiResponse::success(response)))
262        }
263        Err(e) => {
264            let response =
265                ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
266            Ok(warp::reply::json(&response))
267        }
268    }
269}
270
271/// Handler for getting a specific queue
272async fn get_queue_handler<T>(
273    queue_name: String,
274    queue: Arc<T>,
275) -> Result<impl Reply, warp::Rejection>
276where
277    T: DatabaseQueue + Send + Sync,
278{
279    match queue.get_all_queue_stats().await {
280        Ok(all_stats) => {
281            if let Some(stats) = all_stats.into_iter().find(|s| s.queue_name == queue_name) {
282                // Get additional details for this specific queue
283                let priority_breakdown = std::collections::HashMap::new(); // TODO: Implement
284                let status_breakdown = std::collections::HashMap::new(); // TODO: Implement
285                let hourly_throughput = Vec::new(); // TODO: Implement
286                let recent_errors = Vec::new(); // TODO: Implement
287
288                // Get pause information for this queue
289                let pause_info = queue.get_queue_pause_info(&queue_name).await.unwrap_or(None);
290                
291                let queue_info = QueueInfo {
292                    name: stats.queue_name.clone(),
293                    pending_count: stats.pending_count,
294                    running_count: stats.running_count,
295                    completed_count: stats.completed_count,
296                    failed_count: stats.dead_count + stats.timed_out_count,
297                    dead_count: stats.dead_count,
298                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
299                    throughput_per_minute: stats.statistics.throughput_per_minute,
300                    error_rate: stats.statistics.error_rate,
301                    last_job_at: None,
302                    oldest_pending_job: None,
303                    is_paused: pause_info.is_some(),
304                    paused_at: pause_info.as_ref().map(|p| p.paused_at),
305                    paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
306                };
307
308                let detailed_stats = DetailedQueueStats {
309                    queue_info,
310                    priority_breakdown,
311                    status_breakdown,
312                    hourly_throughput,
313                    recent_errors,
314                };
315
316                Ok(warp::reply::json(&ApiResponse::success(detailed_stats)))
317            } else {
318                let response =
319                    ApiResponse::<()>::error(format!("Queue '{}' not found", queue_name));
320                Ok(warp::reply::json(&response))
321            }
322        }
323        Err(e) => {
324            let response =
325                ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
326            Ok(warp::reply::json(&response))
327        }
328    }
329}
330
331/// Handler for queue actions (pause, resume, clear, etc.)
332async fn queue_action_handler<T>(
333    queue_name: String,
334    queue: Arc<T>,
335    action_request: QueueActionRequest,
336) -> Result<impl Reply, warp::Rejection>
337where
338    T: DatabaseQueue + Send + Sync,
339{
340    match action_request.action.as_str() {
341        "clear_dead" => {
342            let older_than = chrono::Utc::now() - chrono::Duration::days(7); // Remove jobs older than 7 days
343            match queue.purge_dead_jobs(older_than).await {
344                Ok(count) => {
345                    let response = ApiResponse::success(serde_json::json!({
346                        "message": format!("Cleared {} dead jobs from queue '{}'", count, queue_name),
347                        "count": count
348                    }));
349                    Ok(warp::reply::json(&response))
350                }
351                Err(e) => {
352                    let response =
353                        ApiResponse::<()>::error(format!("Failed to clear dead jobs: {}", e));
354                    Ok(warp::reply::json(&response))
355                }
356            }
357        }
358        "clear_completed" => {
359            // TODO: Implement clear completed jobs
360            let response =
361                ApiResponse::<()>::error("Clear completed jobs not yet implemented".to_string());
362            Ok(warp::reply::json(&response))
363        }
364        "pause" => {
365            match queue.pause_queue(&queue_name, Some("web-ui")).await {
366                Ok(()) => {
367                    let response = ApiResponse::success(serde_json::json!({
368                        "message": format!("Queue '{}' has been paused", queue_name),
369                        "queue": queue_name,
370                        "action": "pause"
371                    }));
372                    Ok(warp::reply::json(&response))
373                }
374                Err(e) => {
375                    let response =
376                        ApiResponse::<()>::error(format!("Failed to pause queue: {}", e));
377                    Ok(warp::reply::json(&response))
378                }
379            }
380        }
381        "resume" => {
382            match queue.resume_queue(&queue_name, Some("web-ui")).await {
383                Ok(()) => {
384                    let response = ApiResponse::success(serde_json::json!({
385                        "message": format!("Queue '{}' has been resumed", queue_name),
386                        "queue": queue_name,
387                        "action": "resume"
388                    }));
389                    Ok(warp::reply::json(&response))
390                }
391                Err(e) => {
392                    let response =
393                        ApiResponse::<()>::error(format!("Failed to resume queue: {}", e));
394                    Ok(warp::reply::json(&response))
395                }
396            }
397        }
398        _ => {
399            let response =
400                ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
401            Ok(warp::reply::json(&response))
402        }
403    }
404}
405
406/// Handler for getting jobs in a specific queue
407async fn queue_jobs_handler<T>(
408    queue_name: String,
409    queue: Arc<T>,
410    pagination: PaginationParams,
411    filters: FilterParams,
412    sort: SortParams,
413) -> Result<impl Reply, warp::Rejection>
414where
415    T: DatabaseQueue + Send + Sync,
416{
417    // This would delegate to the jobs API with queue filter
418    // For now, return a simple response
419    let _ = (queue, pagination, filters, sort);
420
421    let response = ApiResponse::success(serde_json::json!({
422        "message": format!("Jobs for queue '{}' - implementation pending", queue_name),
423        "queue": queue_name
424    }));
425
426    Ok(warp::reply::json(&response))
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// A JSON action body deserializes into the matching request fields.
    #[test]
    fn test_queue_action_request_deserialization() {
        let body = r#"{"action": "clear_dead", "confirm": true}"#;
        let parsed: QueueActionRequest = serde_json::from_str(body).unwrap();

        assert_eq!(parsed.action, "clear_dead");
        assert_eq!(parsed.confirm, Some(true));
    }

    /// A serialized `QueueInfo` contains its name, a counter value and the
    /// pause flag key.
    #[test]
    fn test_queue_info_serialization() {
        let sample = QueueInfo {
            name: "test_queue".to_string(),
            pending_count: 42,
            running_count: 3,
            completed_count: 1000,
            failed_count: 5,
            dead_count: 2,
            avg_processing_time_ms: 150.5,
            throughput_per_minute: 25.0,
            error_rate: 0.05,
            last_job_at: None,
            oldest_pending_job: None,
            is_paused: false,
            paused_at: None,
            paused_by: None,
        };

        let serialized = serde_json::to_string(&sample).unwrap();
        for needle in ["test_queue", "42", "is_paused"].iter() {
            assert!(serialized.contains(needle));
        }
    }
}