Skip to main content

hammerwork_web/api/
queues.rs

1//! Queue management API endpoints.
2//!
3//! This module provides REST API endpoints for managing and monitoring Hammerwork job queues,
4//! including queue statistics, actions, and job management within specific queues.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/queues` - List all queues with statistics
9//! - `GET /api/queues/{name}` - Get detailed statistics for a specific queue
10//! - `POST /api/queues/{name}/actions` - Perform actions on a queue (pause, resume, clear)
11//! - `GET /api/queues/{name}/jobs` - List jobs in a specific queue
12//!
13//! # Examples
14//!
15//! ## Queue Information Structure
16//!
17//! ```rust
18//! use hammerwork_web::api::queues::QueueInfo;
19//! use chrono::Utc;
20//!
21//! let queue_info = QueueInfo {
22//!     name: "email_queue".to_string(),
23//!     pending_count: 25,
24//!     running_count: 3,
25//!     completed_count: 1500,
26//!     failed_count: 12,
27//!     dead_count: 2,
28//!     avg_processing_time_ms: 250.5,
29//!     throughput_per_minute: 45.0,
30//!     error_rate: 0.008,
31//!     last_job_at: Some(Utc::now()),
32//!     oldest_pending_job: Some(Utc::now()),
33//! };
34//!
35//! assert_eq!(queue_info.name, "email_queue");
36//! assert_eq!(queue_info.pending_count, 25);
37//! assert_eq!(queue_info.running_count, 3);
38//! ```
39//!
40//! ## Queue Actions
41//!
42//! ```rust
43//! use hammerwork_web::api::queues::QueueActionRequest;
44//!
45//! let clear_dead_request = QueueActionRequest {
46//!     action: "clear_dead".to_string(),
47//!     confirm: Some(true),
48//! };
49//!
50//! let pause_request = QueueActionRequest {
51//!     action: "pause".to_string(),
52//!     confirm: None,
53//! };
54//!
55//! assert_eq!(clear_dead_request.action, "clear_dead");
56//! assert_eq!(pause_request.action, "pause");
57//! ```
58//!
59//! ## Detailed Queue Statistics
60//!
61//! ```rust
62//! use hammerwork_web::api::queues::{DetailedQueueStats, QueueInfo, HourlyThroughput, RecentError};
63//! use std::collections::HashMap;
64//! use chrono::Utc;
65//!
66//! let queue_info = QueueInfo {
67//!     name: "default".to_string(),
68//!     pending_count: 10,
69//!     running_count: 2,
70//!     completed_count: 500,
71//!     failed_count: 5,
72//!     dead_count: 1,
73//!     avg_processing_time_ms: 180.0,
74//!     throughput_per_minute: 30.0,
75//!     error_rate: 0.01,
76//!     last_job_at: None,
77//!     oldest_pending_job: None,
78//! };
79//!
80//! let mut priority_breakdown = HashMap::new();
81//! priority_breakdown.insert("high".to_string(), 5);
82//! priority_breakdown.insert("normal".to_string(), 15);
83//!
84//! let detailed_stats = DetailedQueueStats {
85//!     queue_info,
86//!     priority_breakdown,
87//!     status_breakdown: HashMap::new(),
88//!     hourly_throughput: vec![],
89//!     recent_errors: vec![],
90//! };
91//!
92//! assert_eq!(detailed_stats.queue_info.name, "default");
93//! assert_eq!(detailed_stats.priority_breakdown.get("high"), Some(&5));
94//! ```
95
96use super::{
97    ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
98    with_filters, with_pagination, with_sort,
99};
100use hammerwork::{JobPriority, queue::DatabaseQueue};
101use serde::{Deserialize, Serialize};
102use std::sync::Arc;
103use warp::{Filter, Reply};
104
105/// Queue information for API responses
106#[derive(Debug, Serialize, Clone)]
107pub struct QueueInfo {
108    pub name: String,
109    pub pending_count: u64,
110    pub running_count: u64,
111    pub completed_count: u64,
112    pub failed_count: u64,
113    pub dead_count: u64,
114    pub avg_processing_time_ms: f64,
115    pub throughput_per_minute: f64,
116    pub error_rate: f64,
117    pub last_job_at: Option<chrono::DateTime<chrono::Utc>>,
118    pub oldest_pending_job: Option<chrono::DateTime<chrono::Utc>>,
119    pub is_paused: bool,
120    pub paused_at: Option<chrono::DateTime<chrono::Utc>>,
121    pub paused_by: Option<String>,
122}
123
124/// Detailed queue statistics
125#[derive(Debug, Serialize)]
126pub struct DetailedQueueStats {
127    pub queue_info: QueueInfo,
128    pub priority_breakdown: std::collections::HashMap<String, u64>,
129    pub status_breakdown: std::collections::HashMap<String, u64>,
130    pub hourly_throughput: Vec<HourlyThroughput>,
131    pub recent_errors: Vec<RecentError>,
132}
133
134/// Hourly throughput data point
135#[derive(Debug, Serialize)]
136pub struct HourlyThroughput {
137    pub hour: chrono::DateTime<chrono::Utc>,
138    pub completed: u64,
139    pub failed: u64,
140}
141
142/// Recent error information
143#[derive(Debug, Serialize)]
144pub struct RecentError {
145    pub job_id: String,
146    pub error_message: String,
147    pub occurred_at: chrono::DateTime<chrono::Utc>,
148    pub attempts: i32,
149}
150
151/// Queue action request
152#[derive(Debug, Deserialize)]
153pub struct QueueActionRequest {
154    pub action: String, // "pause", "resume", "clear_dead", "clear_completed"
155    pub confirm: Option<bool>,
156}
157
158/// Create queue routes
159pub fn routes<T>(
160    queue: Arc<T>,
161) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
162where
163    T: DatabaseQueue + Send + Sync + 'static,
164{
165    let queue_filter = warp::any().map(move || queue.clone());
166
167    let list_queues = warp::path("queues")
168        .and(warp::path::end())
169        .and(warp::get())
170        .and(queue_filter.clone())
171        .and(with_pagination())
172        .and(with_filters())
173        .and(with_sort())
174        .and_then(list_queues_handler);
175
176    let get_queue = warp::path("queues")
177        .and(warp::path::param::<String>())
178        .and(warp::path::end())
179        .and(warp::get())
180        .and(queue_filter.clone())
181        .and_then(get_queue_handler);
182
183    let queue_action = warp::path("queues")
184        .and(warp::path::param::<String>())
185        .and(warp::path("actions"))
186        .and(warp::path::end())
187        .and(warp::post())
188        .and(queue_filter.clone())
189        .and(warp::body::json())
190        .and_then(queue_action_handler);
191
192    let queue_jobs = warp::path("queues")
193        .and(warp::path::param::<String>())
194        .and(warp::path("jobs"))
195        .and(warp::path::end())
196        .and(warp::get())
197        .and(queue_filter)
198        .and(with_pagination())
199        .and(with_filters())
200        .and(with_sort())
201        .and_then(queue_jobs_handler);
202
203    list_queues.or(get_queue).or(queue_action).or(queue_jobs)
204}
205
206/// Handler for listing all queues
207async fn list_queues_handler<T>(
208    queue: Arc<T>,
209    pagination: PaginationParams,
210    _filters: FilterParams,
211    _sort: SortParams,
212) -> Result<impl Reply, warp::Rejection>
213where
214    T: DatabaseQueue + Send + Sync,
215{
216    // Get all queue statistics
217    match queue.get_all_queue_stats().await {
218        Ok(all_stats) => {
219            let mut queue_infos: Vec<QueueInfo> = Vec::new();
220
221            for stats in all_stats {
222                // Get pause information for this queue
223                let pause_info = queue
224                    .get_queue_pause_info(&stats.queue_name)
225                    .await
226                    .unwrap_or(None);
227
228                let queue_info = QueueInfo {
229                    name: stats.queue_name.clone(),
230                    pending_count: stats.pending_count,
231                    running_count: stats.running_count,
232                    completed_count: stats.completed_count,
233                    failed_count: stats.dead_count + stats.timed_out_count,
234                    dead_count: stats.dead_count,
235                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
236                    throughput_per_minute: stats.statistics.throughput_per_minute,
237                    error_rate: stats.statistics.error_rate,
238                    last_job_at: get_last_job_time(&queue, &stats.queue_name).await,
239                    oldest_pending_job: get_oldest_pending_job(&queue, &stats.queue_name).await,
240                    is_paused: pause_info.is_some(),
241                    paused_at: pause_info.as_ref().map(|p| p.paused_at),
242                    paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
243                };
244                queue_infos.push(queue_info);
245            }
246
247            // Apply pagination
248            let total = queue_infos.len() as u64;
249            let offset = pagination.get_offset() as usize;
250            let limit = pagination.get_limit() as usize;
251
252            let items = if offset < queue_infos.len() {
253                let end = (offset + limit).min(queue_infos.len());
254                queue_infos[offset..end].to_vec()
255            } else {
256                Vec::new()
257            };
258
259            let response = PaginatedResponse {
260                items,
261                pagination: PaginationMeta::new(&pagination, total),
262            };
263
264            Ok(warp::reply::json(&ApiResponse::success(response)))
265        }
266        Err(e) => {
267            let response =
268                ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
269            Ok(warp::reply::json(&response))
270        }
271    }
272}
273
274/// Handler for getting a specific queue
275async fn get_queue_handler<T>(
276    queue_name: String,
277    queue: Arc<T>,
278) -> Result<impl Reply, warp::Rejection>
279where
280    T: DatabaseQueue + Send + Sync,
281{
282    match queue.get_all_queue_stats().await {
283        Ok(all_stats) => {
284            if let Some(stats) = all_stats.into_iter().find(|s| s.queue_name == queue_name) {
285                // Get additional details for this specific queue
286                let priority_breakdown = get_priority_breakdown(&queue, &queue_name).await;
287                let status_breakdown = get_status_breakdown(&queue, &queue_name).await;
288                let hourly_throughput = get_hourly_throughput(&queue, &queue_name).await;
289                let recent_errors = get_recent_errors(&queue, &queue_name).await;
290
291                // Get pause information for this queue
292                let pause_info = queue
293                    .get_queue_pause_info(&queue_name)
294                    .await
295                    .unwrap_or(None);
296
297                let queue_info = QueueInfo {
298                    name: stats.queue_name.clone(),
299                    pending_count: stats.pending_count,
300                    running_count: stats.running_count,
301                    completed_count: stats.completed_count,
302                    failed_count: stats.dead_count + stats.timed_out_count,
303                    dead_count: stats.dead_count,
304                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
305                    throughput_per_minute: stats.statistics.throughput_per_minute,
306                    error_rate: stats.statistics.error_rate,
307                    last_job_at: None,
308                    oldest_pending_job: None,
309                    is_paused: pause_info.is_some(),
310                    paused_at: pause_info.as_ref().map(|p| p.paused_at),
311                    paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
312                };
313
314                let detailed_stats = DetailedQueueStats {
315                    queue_info,
316                    priority_breakdown,
317                    status_breakdown,
318                    hourly_throughput,
319                    recent_errors,
320                };
321
322                Ok(warp::reply::json(&ApiResponse::success(detailed_stats)))
323            } else {
324                let response =
325                    ApiResponse::<()>::error(format!("Queue '{}' not found", queue_name));
326                Ok(warp::reply::json(&response))
327            }
328        }
329        Err(e) => {
330            let response =
331                ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
332            Ok(warp::reply::json(&response))
333        }
334    }
335}
336
337/// Handler for queue actions (pause, resume, clear, etc.)
338async fn queue_action_handler<T>(
339    queue_name: String,
340    queue: Arc<T>,
341    action_request: QueueActionRequest,
342) -> Result<impl Reply, warp::Rejection>
343where
344    T: DatabaseQueue + Send + Sync,
345{
346    match action_request.action.as_str() {
347        "clear_dead" => {
348            let older_than = chrono::Utc::now() - chrono::Duration::days(7); // Remove jobs older than 7 days
349            match queue.purge_dead_jobs(older_than).await {
350                Ok(count) => {
351                    let response = ApiResponse::success(serde_json::json!({
352                        "message": format!("Cleared {} dead jobs from queue '{}'", count, queue_name),
353                        "count": count
354                    }));
355                    Ok(warp::reply::json(&response))
356                }
357                Err(e) => {
358                    let response =
359                        ApiResponse::<()>::error(format!("Failed to clear dead jobs: {}", e));
360                    Ok(warp::reply::json(&response))
361                }
362            }
363        }
364        "clear_completed" => match clear_completed_jobs(&queue, &queue_name).await {
365            Ok(count) => {
366                let response = ApiResponse::success(serde_json::json!({
367                    "message": format!("Cleared {} completed jobs from queue '{}'", count, queue_name),
368                    "queue": queue_name,
369                    "cleared_count": count
370                }));
371                Ok(warp::reply::json(&response))
372            }
373            Err(e) => {
374                let response =
375                    ApiResponse::<()>::error(format!("Failed to clear completed jobs: {}", e));
376                Ok(warp::reply::json(&response))
377            }
378        },
379        "pause" => match queue.pause_queue(&queue_name, Some("web-ui")).await {
380            Ok(()) => {
381                let response = ApiResponse::success(serde_json::json!({
382                    "message": format!("Queue '{}' has been paused", queue_name),
383                    "queue": queue_name,
384                    "action": "pause"
385                }));
386                Ok(warp::reply::json(&response))
387            }
388            Err(e) => {
389                let response = ApiResponse::<()>::error(format!("Failed to pause queue: {}", e));
390                Ok(warp::reply::json(&response))
391            }
392        },
393        "resume" => match queue.resume_queue(&queue_name, Some("web-ui")).await {
394            Ok(()) => {
395                let response = ApiResponse::success(serde_json::json!({
396                    "message": format!("Queue '{}' has been resumed", queue_name),
397                    "queue": queue_name,
398                    "action": "resume"
399                }));
400                Ok(warp::reply::json(&response))
401            }
402            Err(e) => {
403                let response = ApiResponse::<()>::error(format!("Failed to resume queue: {}", e));
404                Ok(warp::reply::json(&response))
405            }
406        },
407        _ => {
408            let response =
409                ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
410            Ok(warp::reply::json(&response))
411        }
412    }
413}
414
415/// Handler for getting jobs in a specific queue
416async fn queue_jobs_handler<T>(
417    queue_name: String,
418    queue: Arc<T>,
419    pagination: PaginationParams,
420    filters: FilterParams,
421    sort: SortParams,
422) -> Result<impl Reply, warp::Rejection>
423where
424    T: DatabaseQueue + Send + Sync,
425{
426    // This would delegate to the jobs API with queue filter
427    // For now, return a simple response
428    let _ = (queue, pagination, filters, sort);
429
430    let response = ApiResponse::success(serde_json::json!({
431        "message": format!("Jobs for queue '{}' - implementation pending", queue_name),
432        "queue": queue_name
433    }));
434
435    Ok(warp::reply::json(&response))
436}
437
438/// Helper function to get the last job time for a queue
439async fn get_last_job_time<T>(
440    queue: &Arc<T>,
441    queue_name: &str,
442) -> Option<chrono::DateTime<chrono::Utc>>
443where
444    T: DatabaseQueue + Send + Sync,
445{
446    // Get recent jobs from multiple sources and find the most recent timestamp
447    let mut latest_time: Option<chrono::DateTime<chrono::Utc>> = None;
448
449    // Check ready jobs
450    if let Ok(ready_jobs) = queue.get_ready_jobs(queue_name, 10).await {
451        for job in ready_jobs {
452            if let Some(time) = job.completed_at.or(job.started_at).or(Some(job.created_at)) {
453                latest_time = match latest_time {
454                    Some(current) if time > current => Some(time),
455                    None => Some(time),
456                    _ => latest_time,
457                };
458            }
459        }
460    }
461
462    // Check dead jobs
463    if let Ok(dead_jobs) = queue
464        .get_dead_jobs_by_queue(queue_name, Some(10), Some(0))
465        .await
466    {
467        for job in dead_jobs {
468            if let Some(time) = job
469                .failed_at
470                .or(job.completed_at)
471                .or(job.started_at)
472                .or(Some(job.created_at))
473            {
474                latest_time = match latest_time {
475                    Some(current) if time > current => Some(time),
476                    None => Some(time),
477                    _ => latest_time,
478                };
479            }
480        }
481    }
482
483    latest_time
484}
485
486/// Helper function to get the oldest pending job time for a queue
487async fn get_oldest_pending_job<T>(
488    queue: &Arc<T>,
489    queue_name: &str,
490) -> Option<chrono::DateTime<chrono::Utc>>
491where
492    T: DatabaseQueue + Send + Sync,
493{
494    // Get ready jobs (these are pending jobs) and find the oldest
495    if let Ok(ready_jobs) = queue.get_ready_jobs(queue_name, 100).await {
496        ready_jobs
497            .iter()
498            .filter(|job| matches!(job.status, hammerwork::job::JobStatus::Pending))
499            .map(|job| job.created_at)
500            .min()
501    } else {
502        None
503    }
504}
505
506/// Helper function to get priority breakdown for a queue
507async fn get_priority_breakdown<T>(
508    queue: &Arc<T>,
509    queue_name: &str,
510) -> std::collections::HashMap<String, u64>
511where
512    T: DatabaseQueue + Send + Sync,
513{
514    // Use the new get_priority_stats method
515    if let Ok(priority_stats) = queue.get_priority_stats(queue_name).await {
516        let mut breakdown = std::collections::HashMap::new();
517        for (priority, count) in priority_stats.job_counts {
518            let priority_name = match priority {
519                JobPriority::Background => "background",
520                JobPriority::Low => "low",
521                JobPriority::Normal => "normal",
522                JobPriority::High => "high",
523                JobPriority::Critical => "critical",
524            };
525            breakdown.insert(priority_name.to_string(), count);
526        }
527        breakdown
528    } else {
529        std::collections::HashMap::new()
530    }
531}
532
533/// Helper function to get status breakdown for a queue
534async fn get_status_breakdown<T>(
535    queue: &Arc<T>,
536    queue_name: &str,
537) -> std::collections::HashMap<String, u64>
538where
539    T: DatabaseQueue + Send + Sync,
540{
541    // Use existing job counts method
542    if let Ok(counts) = queue.get_job_counts_by_status(queue_name).await {
543        counts.into_iter().collect()
544    } else {
545        std::collections::HashMap::new()
546    }
547}
548
549/// Helper function to get hourly throughput data for a queue
550async fn get_hourly_throughput<T>(_queue: &Arc<T>, _queue_name: &str) -> Vec<HourlyThroughput>
551where
552    T: DatabaseQueue + Send + Sync,
553{
554    // This would require tracking hourly statistics
555    // For now, return empty as it's not implemented in the DatabaseQueue trait
556    Vec::new()
557}
558
559/// Helper function to get recent errors for a queue
560async fn get_recent_errors<T>(queue: &Arc<T>, queue_name: &str) -> Vec<RecentError>
561where
562    T: DatabaseQueue + Send + Sync,
563{
564    // Get dead jobs which contain failed jobs with error messages
565    if let Ok(dead_jobs) = queue
566        .get_dead_jobs_by_queue(queue_name, Some(20), Some(0))
567        .await
568    {
569        dead_jobs
570            .into_iter()
571            .filter_map(|job| {
572                job.error_message.map(|error_msg| RecentError {
573                    job_id: job.id.to_string(),
574                    error_message: error_msg,
575                    occurred_at: job.failed_at.unwrap_or(job.created_at),
576                    attempts: job.attempts as i32,
577                })
578            })
579            .collect()
580    } else {
581        Vec::new()
582    }
583}
584
585/// Helper function to clear completed jobs from a queue
586async fn clear_completed_jobs<T>(_queue: &Arc<T>, _queue_name: &str) -> Result<u64, String>
587where
588    T: DatabaseQueue + Send + Sync,
589{
590    // To implement this properly, we would need:
591    // 1. A method to query jobs by status (get_jobs_by_status)
592    // 2. Filter for completed jobs
593    // 3. Delete them using the existing delete_job method
594    //
595    // Since DatabaseQueue trait doesn't provide a way to query jobs by status,
596    // we cannot implement this functionality without extending the trait.
597    Err(
598        "Clear completed jobs requires additional DatabaseQueue methods not yet available"
599            .to_string(),
600    )
601}
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606
607    #[test]
608    fn test_queue_action_request_deserialization() {
609        let json = r#"{"action": "clear_dead", "confirm": true}"#;
610        let request: QueueActionRequest = serde_json::from_str(json).unwrap();
611        assert_eq!(request.action, "clear_dead");
612        assert_eq!(request.confirm, Some(true));
613    }
614
615    #[test]
616    fn test_queue_info_serialization() {
617        let queue_info = QueueInfo {
618            name: "test_queue".to_string(),
619            pending_count: 42,
620            running_count: 3,
621            completed_count: 1000,
622            failed_count: 5,
623            dead_count: 2,
624            avg_processing_time_ms: 150.5,
625            throughput_per_minute: 25.0,
626            error_rate: 0.05,
627            last_job_at: None,
628            oldest_pending_job: None,
629            is_paused: false,
630            paused_at: None,
631            paused_by: None,
632        };
633
634        let json = serde_json::to_string(&queue_info).unwrap();
635        assert!(json.contains("test_queue"));
636        assert!(json.contains("42"));
637        assert!(json.contains("is_paused"));
638    }
639}