1use super::{
97 ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
98 with_filters, with_pagination, with_sort,
99};
100use hammerwork::{JobPriority, queue::DatabaseQueue};
101use serde::{Deserialize, Serialize};
102use std::sync::Arc;
103use warp::{Filter, Reply};
104
105#[derive(Debug, Serialize, Clone)]
107pub struct QueueInfo {
108 pub name: String,
109 pub pending_count: u64,
110 pub running_count: u64,
111 pub completed_count: u64,
112 pub failed_count: u64,
113 pub dead_count: u64,
114 pub avg_processing_time_ms: f64,
115 pub throughput_per_minute: f64,
116 pub error_rate: f64,
117 pub last_job_at: Option<chrono::DateTime<chrono::Utc>>,
118 pub oldest_pending_job: Option<chrono::DateTime<chrono::Utc>>,
119 pub is_paused: bool,
120 pub paused_at: Option<chrono::DateTime<chrono::Utc>>,
121 pub paused_by: Option<String>,
122}
123
124#[derive(Debug, Serialize)]
126pub struct DetailedQueueStats {
127 pub queue_info: QueueInfo,
128 pub priority_breakdown: std::collections::HashMap<String, u64>,
129 pub status_breakdown: std::collections::HashMap<String, u64>,
130 pub hourly_throughput: Vec<HourlyThroughput>,
131 pub recent_errors: Vec<RecentError>,
132}
133
134#[derive(Debug, Serialize)]
136pub struct HourlyThroughput {
137 pub hour: chrono::DateTime<chrono::Utc>,
138 pub completed: u64,
139 pub failed: u64,
140}
141
142#[derive(Debug, Serialize)]
144pub struct RecentError {
145 pub job_id: String,
146 pub error_message: String,
147 pub occurred_at: chrono::DateTime<chrono::Utc>,
148 pub attempts: i32,
149}
150
151#[derive(Debug, Deserialize)]
153pub struct QueueActionRequest {
154 pub action: String, pub confirm: Option<bool>,
156}
157
158pub fn routes<T>(
160 queue: Arc<T>,
161) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
162where
163 T: DatabaseQueue + Send + Sync + 'static,
164{
165 let queue_filter = warp::any().map(move || queue.clone());
166
167 let list_queues = warp::path("queues")
168 .and(warp::path::end())
169 .and(warp::get())
170 .and(queue_filter.clone())
171 .and(with_pagination())
172 .and(with_filters())
173 .and(with_sort())
174 .and_then(list_queues_handler);
175
176 let get_queue = warp::path("queues")
177 .and(warp::path::param::<String>())
178 .and(warp::path::end())
179 .and(warp::get())
180 .and(queue_filter.clone())
181 .and_then(get_queue_handler);
182
183 let queue_action = warp::path("queues")
184 .and(warp::path::param::<String>())
185 .and(warp::path("actions"))
186 .and(warp::path::end())
187 .and(warp::post())
188 .and(queue_filter.clone())
189 .and(warp::body::json())
190 .and_then(queue_action_handler);
191
192 let queue_jobs = warp::path("queues")
193 .and(warp::path::param::<String>())
194 .and(warp::path("jobs"))
195 .and(warp::path::end())
196 .and(warp::get())
197 .and(queue_filter)
198 .and(with_pagination())
199 .and(with_filters())
200 .and(with_sort())
201 .and_then(queue_jobs_handler);
202
203 list_queues.or(get_queue).or(queue_action).or(queue_jobs)
204}
205
206async fn list_queues_handler<T>(
208 queue: Arc<T>,
209 pagination: PaginationParams,
210 _filters: FilterParams,
211 _sort: SortParams,
212) -> Result<impl Reply, warp::Rejection>
213where
214 T: DatabaseQueue + Send + Sync,
215{
216 match queue.get_all_queue_stats().await {
218 Ok(all_stats) => {
219 let mut queue_infos: Vec<QueueInfo> = Vec::new();
220
221 for stats in all_stats {
222 let pause_info = queue
224 .get_queue_pause_info(&stats.queue_name)
225 .await
226 .unwrap_or(None);
227
228 let queue_info = QueueInfo {
229 name: stats.queue_name.clone(),
230 pending_count: stats.pending_count,
231 running_count: stats.running_count,
232 completed_count: stats.completed_count,
233 failed_count: stats.dead_count + stats.timed_out_count,
234 dead_count: stats.dead_count,
235 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
236 throughput_per_minute: stats.statistics.throughput_per_minute,
237 error_rate: stats.statistics.error_rate,
238 last_job_at: get_last_job_time(&queue, &stats.queue_name).await,
239 oldest_pending_job: get_oldest_pending_job(&queue, &stats.queue_name).await,
240 is_paused: pause_info.is_some(),
241 paused_at: pause_info.as_ref().map(|p| p.paused_at),
242 paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
243 };
244 queue_infos.push(queue_info);
245 }
246
247 let total = queue_infos.len() as u64;
249 let offset = pagination.get_offset() as usize;
250 let limit = pagination.get_limit() as usize;
251
252 let items = if offset < queue_infos.len() {
253 let end = (offset + limit).min(queue_infos.len());
254 queue_infos[offset..end].to_vec()
255 } else {
256 Vec::new()
257 };
258
259 let response = PaginatedResponse {
260 items,
261 pagination: PaginationMeta::new(&pagination, total),
262 };
263
264 Ok(warp::reply::json(&ApiResponse::success(response)))
265 }
266 Err(e) => {
267 let response =
268 ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
269 Ok(warp::reply::json(&response))
270 }
271 }
272}
273
274async fn get_queue_handler<T>(
276 queue_name: String,
277 queue: Arc<T>,
278) -> Result<impl Reply, warp::Rejection>
279where
280 T: DatabaseQueue + Send + Sync,
281{
282 match queue.get_all_queue_stats().await {
283 Ok(all_stats) => {
284 if let Some(stats) = all_stats.into_iter().find(|s| s.queue_name == queue_name) {
285 let priority_breakdown = get_priority_breakdown(&queue, &queue_name).await;
287 let status_breakdown = get_status_breakdown(&queue, &queue_name).await;
288 let hourly_throughput = get_hourly_throughput(&queue, &queue_name).await;
289 let recent_errors = get_recent_errors(&queue, &queue_name).await;
290
291 let pause_info = queue
293 .get_queue_pause_info(&queue_name)
294 .await
295 .unwrap_or(None);
296
297 let queue_info = QueueInfo {
298 name: stats.queue_name.clone(),
299 pending_count: stats.pending_count,
300 running_count: stats.running_count,
301 completed_count: stats.completed_count,
302 failed_count: stats.dead_count + stats.timed_out_count,
303 dead_count: stats.dead_count,
304 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
305 throughput_per_minute: stats.statistics.throughput_per_minute,
306 error_rate: stats.statistics.error_rate,
307 last_job_at: None,
308 oldest_pending_job: None,
309 is_paused: pause_info.is_some(),
310 paused_at: pause_info.as_ref().map(|p| p.paused_at),
311 paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
312 };
313
314 let detailed_stats = DetailedQueueStats {
315 queue_info,
316 priority_breakdown,
317 status_breakdown,
318 hourly_throughput,
319 recent_errors,
320 };
321
322 Ok(warp::reply::json(&ApiResponse::success(detailed_stats)))
323 } else {
324 let response =
325 ApiResponse::<()>::error(format!("Queue '{}' not found", queue_name));
326 Ok(warp::reply::json(&response))
327 }
328 }
329 Err(e) => {
330 let response =
331 ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
332 Ok(warp::reply::json(&response))
333 }
334 }
335}
336
337async fn queue_action_handler<T>(
339 queue_name: String,
340 queue: Arc<T>,
341 action_request: QueueActionRequest,
342) -> Result<impl Reply, warp::Rejection>
343where
344 T: DatabaseQueue + Send + Sync,
345{
346 match action_request.action.as_str() {
347 "clear_dead" => {
348 let older_than = chrono::Utc::now() - chrono::Duration::days(7); match queue.purge_dead_jobs(older_than).await {
350 Ok(count) => {
351 let response = ApiResponse::success(serde_json::json!({
352 "message": format!("Cleared {} dead jobs from queue '{}'", count, queue_name),
353 "count": count
354 }));
355 Ok(warp::reply::json(&response))
356 }
357 Err(e) => {
358 let response =
359 ApiResponse::<()>::error(format!("Failed to clear dead jobs: {}", e));
360 Ok(warp::reply::json(&response))
361 }
362 }
363 }
364 "clear_completed" => match clear_completed_jobs(&queue, &queue_name).await {
365 Ok(count) => {
366 let response = ApiResponse::success(serde_json::json!({
367 "message": format!("Cleared {} completed jobs from queue '{}'", count, queue_name),
368 "queue": queue_name,
369 "cleared_count": count
370 }));
371 Ok(warp::reply::json(&response))
372 }
373 Err(e) => {
374 let response =
375 ApiResponse::<()>::error(format!("Failed to clear completed jobs: {}", e));
376 Ok(warp::reply::json(&response))
377 }
378 },
379 "pause" => match queue.pause_queue(&queue_name, Some("web-ui")).await {
380 Ok(()) => {
381 let response = ApiResponse::success(serde_json::json!({
382 "message": format!("Queue '{}' has been paused", queue_name),
383 "queue": queue_name,
384 "action": "pause"
385 }));
386 Ok(warp::reply::json(&response))
387 }
388 Err(e) => {
389 let response = ApiResponse::<()>::error(format!("Failed to pause queue: {}", e));
390 Ok(warp::reply::json(&response))
391 }
392 },
393 "resume" => match queue.resume_queue(&queue_name, Some("web-ui")).await {
394 Ok(()) => {
395 let response = ApiResponse::success(serde_json::json!({
396 "message": format!("Queue '{}' has been resumed", queue_name),
397 "queue": queue_name,
398 "action": "resume"
399 }));
400 Ok(warp::reply::json(&response))
401 }
402 Err(e) => {
403 let response = ApiResponse::<()>::error(format!("Failed to resume queue: {}", e));
404 Ok(warp::reply::json(&response))
405 }
406 },
407 _ => {
408 let response =
409 ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
410 Ok(warp::reply::json(&response))
411 }
412 }
413}
414
415async fn queue_jobs_handler<T>(
417 queue_name: String,
418 queue: Arc<T>,
419 pagination: PaginationParams,
420 filters: FilterParams,
421 sort: SortParams,
422) -> Result<impl Reply, warp::Rejection>
423where
424 T: DatabaseQueue + Send + Sync,
425{
426 let _ = (queue, pagination, filters, sort);
429
430 let response = ApiResponse::success(serde_json::json!({
431 "message": format!("Jobs for queue '{}' - implementation pending", queue_name),
432 "queue": queue_name
433 }));
434
435 Ok(warp::reply::json(&response))
436}
437
438async fn get_last_job_time<T>(
440 queue: &Arc<T>,
441 queue_name: &str,
442) -> Option<chrono::DateTime<chrono::Utc>>
443where
444 T: DatabaseQueue + Send + Sync,
445{
446 let mut latest_time: Option<chrono::DateTime<chrono::Utc>> = None;
448
449 if let Ok(ready_jobs) = queue.get_ready_jobs(queue_name, 10).await {
451 for job in ready_jobs {
452 if let Some(time) = job.completed_at.or(job.started_at).or(Some(job.created_at)) {
453 latest_time = match latest_time {
454 Some(current) if time > current => Some(time),
455 None => Some(time),
456 _ => latest_time,
457 };
458 }
459 }
460 }
461
462 if let Ok(dead_jobs) = queue
464 .get_dead_jobs_by_queue(queue_name, Some(10), Some(0))
465 .await
466 {
467 for job in dead_jobs {
468 if let Some(time) = job
469 .failed_at
470 .or(job.completed_at)
471 .or(job.started_at)
472 .or(Some(job.created_at))
473 {
474 latest_time = match latest_time {
475 Some(current) if time > current => Some(time),
476 None => Some(time),
477 _ => latest_time,
478 };
479 }
480 }
481 }
482
483 latest_time
484}
485
486async fn get_oldest_pending_job<T>(
488 queue: &Arc<T>,
489 queue_name: &str,
490) -> Option<chrono::DateTime<chrono::Utc>>
491where
492 T: DatabaseQueue + Send + Sync,
493{
494 if let Ok(ready_jobs) = queue.get_ready_jobs(queue_name, 100).await {
496 ready_jobs
497 .iter()
498 .filter(|job| matches!(job.status, hammerwork::job::JobStatus::Pending))
499 .map(|job| job.created_at)
500 .min()
501 } else {
502 None
503 }
504}
505
506async fn get_priority_breakdown<T>(
508 queue: &Arc<T>,
509 queue_name: &str,
510) -> std::collections::HashMap<String, u64>
511where
512 T: DatabaseQueue + Send + Sync,
513{
514 if let Ok(priority_stats) = queue.get_priority_stats(queue_name).await {
516 let mut breakdown = std::collections::HashMap::new();
517 for (priority, count) in priority_stats.job_counts {
518 let priority_name = match priority {
519 JobPriority::Background => "background",
520 JobPriority::Low => "low",
521 JobPriority::Normal => "normal",
522 JobPriority::High => "high",
523 JobPriority::Critical => "critical",
524 };
525 breakdown.insert(priority_name.to_string(), count);
526 }
527 breakdown
528 } else {
529 std::collections::HashMap::new()
530 }
531}
532
533async fn get_status_breakdown<T>(
535 queue: &Arc<T>,
536 queue_name: &str,
537) -> std::collections::HashMap<String, u64>
538where
539 T: DatabaseQueue + Send + Sync,
540{
541 if let Ok(counts) = queue.get_job_counts_by_status(queue_name).await {
543 counts.into_iter().collect()
544 } else {
545 std::collections::HashMap::new()
546 }
547}
548
549async fn get_hourly_throughput<T>(_queue: &Arc<T>, _queue_name: &str) -> Vec<HourlyThroughput>
551where
552 T: DatabaseQueue + Send + Sync,
553{
554 Vec::new()
557}
558
559async fn get_recent_errors<T>(queue: &Arc<T>, queue_name: &str) -> Vec<RecentError>
561where
562 T: DatabaseQueue + Send + Sync,
563{
564 if let Ok(dead_jobs) = queue
566 .get_dead_jobs_by_queue(queue_name, Some(20), Some(0))
567 .await
568 {
569 dead_jobs
570 .into_iter()
571 .filter_map(|job| {
572 job.error_message.map(|error_msg| RecentError {
573 job_id: job.id.to_string(),
574 error_message: error_msg,
575 occurred_at: job.failed_at.unwrap_or(job.created_at),
576 attempts: job.attempts as i32,
577 })
578 })
579 .collect()
580 } else {
581 Vec::new()
582 }
583}
584
585async fn clear_completed_jobs<T>(_queue: &Arc<T>, _queue_name: &str) -> Result<u64, String>
587where
588 T: DatabaseQueue + Send + Sync,
589{
590 Err(
598 "Clear completed jobs requires additional DatabaseQueue methods not yet available"
599 .to_string(),
600 )
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606
607 #[test]
608 fn test_queue_action_request_deserialization() {
609 let json = r#"{"action": "clear_dead", "confirm": true}"#;
610 let request: QueueActionRequest = serde_json::from_str(json).unwrap();
611 assert_eq!(request.action, "clear_dead");
612 assert_eq!(request.confirm, Some(true));
613 }
614
615 #[test]
616 fn test_queue_info_serialization() {
617 let queue_info = QueueInfo {
618 name: "test_queue".to_string(),
619 pending_count: 42,
620 running_count: 3,
621 completed_count: 1000,
622 failed_count: 5,
623 dead_count: 2,
624 avg_processing_time_ms: 150.5,
625 throughput_per_minute: 25.0,
626 error_rate: 0.05,
627 last_job_at: None,
628 oldest_pending_job: None,
629 is_paused: false,
630 paused_at: None,
631 paused_by: None,
632 };
633
634 let json = serde_json::to_string(&queue_info).unwrap();
635 assert!(json.contains("test_queue"));
636 assert!(json.contains("42"));
637 assert!(json.contains("is_paused"));
638 }
639}