1use super::{
97 ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
98 with_filters, with_pagination, with_sort,
99};
100use hammerwork::queue::DatabaseQueue;
101use serde::{Deserialize, Serialize};
102use std::sync::Arc;
103use warp::{Filter, Reply};
104
105#[derive(Debug, Serialize, Clone)]
107pub struct QueueInfo {
108 pub name: String,
109 pub pending_count: u64,
110 pub running_count: u64,
111 pub completed_count: u64,
112 pub failed_count: u64,
113 pub dead_count: u64,
114 pub avg_processing_time_ms: f64,
115 pub throughput_per_minute: f64,
116 pub error_rate: f64,
117 pub last_job_at: Option<chrono::DateTime<chrono::Utc>>,
118 pub oldest_pending_job: Option<chrono::DateTime<chrono::Utc>>,
119 pub is_paused: bool,
120 pub paused_at: Option<chrono::DateTime<chrono::Utc>>,
121 pub paused_by: Option<String>,
122}
123
124#[derive(Debug, Serialize)]
126pub struct DetailedQueueStats {
127 pub queue_info: QueueInfo,
128 pub priority_breakdown: std::collections::HashMap<String, u64>,
129 pub status_breakdown: std::collections::HashMap<String, u64>,
130 pub hourly_throughput: Vec<HourlyThroughput>,
131 pub recent_errors: Vec<RecentError>,
132}
133
134#[derive(Debug, Serialize)]
136pub struct HourlyThroughput {
137 pub hour: chrono::DateTime<chrono::Utc>,
138 pub completed: u64,
139 pub failed: u64,
140}
141
142#[derive(Debug, Serialize)]
144pub struct RecentError {
145 pub job_id: String,
146 pub error_message: String,
147 pub occurred_at: chrono::DateTime<chrono::Utc>,
148 pub attempts: i32,
149}
150
151#[derive(Debug, Deserialize)]
153pub struct QueueActionRequest {
154 pub action: String, pub confirm: Option<bool>,
156}
157
158pub fn routes<T>(
160 queue: Arc<T>,
161) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
162where
163 T: DatabaseQueue + Send + Sync + 'static,
164{
165 let queue_filter = warp::any().map(move || queue.clone());
166
167 let list_queues = warp::path("queues")
168 .and(warp::path::end())
169 .and(warp::get())
170 .and(queue_filter.clone())
171 .and(with_pagination())
172 .and(with_filters())
173 .and(with_sort())
174 .and_then(list_queues_handler);
175
176 let get_queue = warp::path("queues")
177 .and(warp::path::param::<String>())
178 .and(warp::path::end())
179 .and(warp::get())
180 .and(queue_filter.clone())
181 .and_then(get_queue_handler);
182
183 let queue_action = warp::path("queues")
184 .and(warp::path::param::<String>())
185 .and(warp::path("actions"))
186 .and(warp::path::end())
187 .and(warp::post())
188 .and(queue_filter.clone())
189 .and(warp::body::json())
190 .and_then(queue_action_handler);
191
192 let queue_jobs = warp::path("queues")
193 .and(warp::path::param::<String>())
194 .and(warp::path("jobs"))
195 .and(warp::path::end())
196 .and(warp::get())
197 .and(queue_filter)
198 .and(with_pagination())
199 .and(with_filters())
200 .and(with_sort())
201 .and_then(queue_jobs_handler);
202
203 list_queues.or(get_queue).or(queue_action).or(queue_jobs)
204}
205
206async fn list_queues_handler<T>(
208 queue: Arc<T>,
209 pagination: PaginationParams,
210 _filters: FilterParams,
211 _sort: SortParams,
212) -> Result<impl Reply, warp::Rejection>
213where
214 T: DatabaseQueue + Send + Sync,
215{
216 match queue.get_all_queue_stats().await {
218 Ok(all_stats) => {
219 let mut queue_infos: Vec<QueueInfo> = Vec::new();
220
221 for stats in all_stats {
222 let pause_info = queue.get_queue_pause_info(&stats.queue_name).await.unwrap_or(None);
224
225 let queue_info = QueueInfo {
226 name: stats.queue_name.clone(),
227 pending_count: stats.pending_count,
228 running_count: stats.running_count,
229 completed_count: stats.completed_count,
230 failed_count: stats.dead_count + stats.timed_out_count,
231 dead_count: stats.dead_count,
232 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
233 throughput_per_minute: stats.statistics.throughput_per_minute,
234 error_rate: stats.statistics.error_rate,
235 last_job_at: None, oldest_pending_job: None, is_paused: pause_info.is_some(),
238 paused_at: pause_info.as_ref().map(|p| p.paused_at),
239 paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
240 };
241 queue_infos.push(queue_info);
242 }
243
244 let total = queue_infos.len() as u64;
246 let offset = pagination.get_offset() as usize;
247 let limit = pagination.get_limit() as usize;
248
249 let items = if offset < queue_infos.len() {
250 let end = (offset + limit).min(queue_infos.len());
251 queue_infos[offset..end].to_vec()
252 } else {
253 Vec::new()
254 };
255
256 let response = PaginatedResponse {
257 items,
258 pagination: PaginationMeta::new(&pagination, total),
259 };
260
261 Ok(warp::reply::json(&ApiResponse::success(response)))
262 }
263 Err(e) => {
264 let response =
265 ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
266 Ok(warp::reply::json(&response))
267 }
268 }
269}
270
271async fn get_queue_handler<T>(
273 queue_name: String,
274 queue: Arc<T>,
275) -> Result<impl Reply, warp::Rejection>
276where
277 T: DatabaseQueue + Send + Sync,
278{
279 match queue.get_all_queue_stats().await {
280 Ok(all_stats) => {
281 if let Some(stats) = all_stats.into_iter().find(|s| s.queue_name == queue_name) {
282 let priority_breakdown = std::collections::HashMap::new(); let status_breakdown = std::collections::HashMap::new(); let hourly_throughput = Vec::new(); let recent_errors = Vec::new(); let pause_info = queue.get_queue_pause_info(&queue_name).await.unwrap_or(None);
290
291 let queue_info = QueueInfo {
292 name: stats.queue_name.clone(),
293 pending_count: stats.pending_count,
294 running_count: stats.running_count,
295 completed_count: stats.completed_count,
296 failed_count: stats.dead_count + stats.timed_out_count,
297 dead_count: stats.dead_count,
298 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
299 throughput_per_minute: stats.statistics.throughput_per_minute,
300 error_rate: stats.statistics.error_rate,
301 last_job_at: None,
302 oldest_pending_job: None,
303 is_paused: pause_info.is_some(),
304 paused_at: pause_info.as_ref().map(|p| p.paused_at),
305 paused_by: pause_info.as_ref().and_then(|p| p.paused_by.clone()),
306 };
307
308 let detailed_stats = DetailedQueueStats {
309 queue_info,
310 priority_breakdown,
311 status_breakdown,
312 hourly_throughput,
313 recent_errors,
314 };
315
316 Ok(warp::reply::json(&ApiResponse::success(detailed_stats)))
317 } else {
318 let response =
319 ApiResponse::<()>::error(format!("Queue '{}' not found", queue_name));
320 Ok(warp::reply::json(&response))
321 }
322 }
323 Err(e) => {
324 let response =
325 ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
326 Ok(warp::reply::json(&response))
327 }
328 }
329}
330
331async fn queue_action_handler<T>(
333 queue_name: String,
334 queue: Arc<T>,
335 action_request: QueueActionRequest,
336) -> Result<impl Reply, warp::Rejection>
337where
338 T: DatabaseQueue + Send + Sync,
339{
340 match action_request.action.as_str() {
341 "clear_dead" => {
342 let older_than = chrono::Utc::now() - chrono::Duration::days(7); match queue.purge_dead_jobs(older_than).await {
344 Ok(count) => {
345 let response = ApiResponse::success(serde_json::json!({
346 "message": format!("Cleared {} dead jobs from queue '{}'", count, queue_name),
347 "count": count
348 }));
349 Ok(warp::reply::json(&response))
350 }
351 Err(e) => {
352 let response =
353 ApiResponse::<()>::error(format!("Failed to clear dead jobs: {}", e));
354 Ok(warp::reply::json(&response))
355 }
356 }
357 }
358 "clear_completed" => {
359 let response =
361 ApiResponse::<()>::error("Clear completed jobs not yet implemented".to_string());
362 Ok(warp::reply::json(&response))
363 }
364 "pause" => {
365 match queue.pause_queue(&queue_name, Some("web-ui")).await {
366 Ok(()) => {
367 let response = ApiResponse::success(serde_json::json!({
368 "message": format!("Queue '{}' has been paused", queue_name),
369 "queue": queue_name,
370 "action": "pause"
371 }));
372 Ok(warp::reply::json(&response))
373 }
374 Err(e) => {
375 let response =
376 ApiResponse::<()>::error(format!("Failed to pause queue: {}", e));
377 Ok(warp::reply::json(&response))
378 }
379 }
380 }
381 "resume" => {
382 match queue.resume_queue(&queue_name, Some("web-ui")).await {
383 Ok(()) => {
384 let response = ApiResponse::success(serde_json::json!({
385 "message": format!("Queue '{}' has been resumed", queue_name),
386 "queue": queue_name,
387 "action": "resume"
388 }));
389 Ok(warp::reply::json(&response))
390 }
391 Err(e) => {
392 let response =
393 ApiResponse::<()>::error(format!("Failed to resume queue: {}", e));
394 Ok(warp::reply::json(&response))
395 }
396 }
397 }
398 _ => {
399 let response =
400 ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
401 Ok(warp::reply::json(&response))
402 }
403 }
404}
405
406async fn queue_jobs_handler<T>(
408 queue_name: String,
409 queue: Arc<T>,
410 pagination: PaginationParams,
411 filters: FilterParams,
412 sort: SortParams,
413) -> Result<impl Reply, warp::Rejection>
414where
415 T: DatabaseQueue + Send + Sync,
416{
417 let _ = (queue, pagination, filters, sort);
420
421 let response = ApiResponse::success(serde_json::json!({
422 "message": format!("Jobs for queue '{}' - implementation pending", queue_name),
423 "queue": queue_name
424 }));
425
426 Ok(warp::reply::json(&response))
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432
433 #[test]
434 fn test_queue_action_request_deserialization() {
435 let json = r#"{"action": "clear_dead", "confirm": true}"#;
436 let request: QueueActionRequest = serde_json::from_str(json).unwrap();
437 assert_eq!(request.action, "clear_dead");
438 assert_eq!(request.confirm, Some(true));
439 }
440
441 #[test]
442 fn test_queue_info_serialization() {
443 let queue_info = QueueInfo {
444 name: "test_queue".to_string(),
445 pending_count: 42,
446 running_count: 3,
447 completed_count: 1000,
448 failed_count: 5,
449 dead_count: 2,
450 avg_processing_time_ms: 150.5,
451 throughput_per_minute: 25.0,
452 error_rate: 0.05,
453 last_job_at: None,
454 oldest_pending_job: None,
455 is_paused: false,
456 paused_at: None,
457 paused_by: None,
458 };
459
460 let json = serde_json::to_string(&queue_info).unwrap();
461 assert!(json.contains("test_queue"));
462 assert!(json.contains("42"));
463 assert!(json.contains("is_paused"));
464 }
465}