1use super::{
97 ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
98 with_filters, with_pagination, with_sort,
99};
100use hammerwork::queue::DatabaseQueue;
101use serde::{Deserialize, Serialize};
102use std::sync::Arc;
103use warp::{Filter, Reply};
104
105#[derive(Debug, Serialize, Clone)]
107pub struct QueueInfo {
108 pub name: String,
109 pub pending_count: u64,
110 pub running_count: u64,
111 pub completed_count: u64,
112 pub failed_count: u64,
113 pub dead_count: u64,
114 pub avg_processing_time_ms: f64,
115 pub throughput_per_minute: f64,
116 pub error_rate: f64,
117 pub last_job_at: Option<chrono::DateTime<chrono::Utc>>,
118 pub oldest_pending_job: Option<chrono::DateTime<chrono::Utc>>,
119}
120
121#[derive(Debug, Serialize)]
123pub struct DetailedQueueStats {
124 pub queue_info: QueueInfo,
125 pub priority_breakdown: std::collections::HashMap<String, u64>,
126 pub status_breakdown: std::collections::HashMap<String, u64>,
127 pub hourly_throughput: Vec<HourlyThroughput>,
128 pub recent_errors: Vec<RecentError>,
129}
130
131#[derive(Debug, Serialize)]
133pub struct HourlyThroughput {
134 pub hour: chrono::DateTime<chrono::Utc>,
135 pub completed: u64,
136 pub failed: u64,
137}
138
139#[derive(Debug, Serialize)]
141pub struct RecentError {
142 pub job_id: String,
143 pub error_message: String,
144 pub occurred_at: chrono::DateTime<chrono::Utc>,
145 pub attempts: i32,
146}
147
148#[derive(Debug, Deserialize)]
150pub struct QueueActionRequest {
151 pub action: String, pub confirm: Option<bool>,
153}
154
155pub fn routes<T>(
157 queue: Arc<T>,
158) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
159where
160 T: DatabaseQueue + Send + Sync + 'static,
161{
162 let queue_filter = warp::any().map(move || queue.clone());
163
164 let list_queues = warp::path("queues")
165 .and(warp::path::end())
166 .and(warp::get())
167 .and(queue_filter.clone())
168 .and(with_pagination())
169 .and(with_filters())
170 .and(with_sort())
171 .and_then(list_queues_handler);
172
173 let get_queue = warp::path("queues")
174 .and(warp::path::param::<String>())
175 .and(warp::path::end())
176 .and(warp::get())
177 .and(queue_filter.clone())
178 .and_then(get_queue_handler);
179
180 let queue_action = warp::path("queues")
181 .and(warp::path::param::<String>())
182 .and(warp::path("actions"))
183 .and(warp::path::end())
184 .and(warp::post())
185 .and(queue_filter.clone())
186 .and(warp::body::json())
187 .and_then(queue_action_handler);
188
189 let queue_jobs = warp::path("queues")
190 .and(warp::path::param::<String>())
191 .and(warp::path("jobs"))
192 .and(warp::path::end())
193 .and(warp::get())
194 .and(queue_filter)
195 .and(with_pagination())
196 .and(with_filters())
197 .and(with_sort())
198 .and_then(queue_jobs_handler);
199
200 list_queues.or(get_queue).or(queue_action).or(queue_jobs)
201}
202
203async fn list_queues_handler<T>(
205 queue: Arc<T>,
206 pagination: PaginationParams,
207 _filters: FilterParams,
208 _sort: SortParams,
209) -> Result<impl Reply, warp::Rejection>
210where
211 T: DatabaseQueue + Send + Sync,
212{
213 match queue.get_all_queue_stats().await {
215 Ok(all_stats) => {
216 let mut queue_infos: Vec<QueueInfo> = Vec::new();
217
218 for stats in all_stats {
219 let queue_info = QueueInfo {
220 name: stats.queue_name.clone(),
221 pending_count: stats.pending_count,
222 running_count: stats.running_count,
223 completed_count: stats.completed_count,
224 failed_count: stats.dead_count + stats.timed_out_count,
225 dead_count: stats.dead_count,
226 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
227 throughput_per_minute: stats.statistics.throughput_per_minute,
228 error_rate: stats.statistics.error_rate,
229 last_job_at: None, oldest_pending_job: None, };
232 queue_infos.push(queue_info);
233 }
234
235 let total = queue_infos.len() as u64;
237 let offset = pagination.get_offset() as usize;
238 let limit = pagination.get_limit() as usize;
239
240 let items = if offset < queue_infos.len() {
241 let end = (offset + limit).min(queue_infos.len());
242 queue_infos[offset..end].to_vec()
243 } else {
244 Vec::new()
245 };
246
247 let response = PaginatedResponse {
248 items,
249 pagination: PaginationMeta::new(&pagination, total),
250 };
251
252 Ok(warp::reply::json(&ApiResponse::success(response)))
253 }
254 Err(e) => {
255 let response =
256 ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
257 Ok(warp::reply::json(&response))
258 }
259 }
260}
261
262async fn get_queue_handler<T>(
264 queue_name: String,
265 queue: Arc<T>,
266) -> Result<impl Reply, warp::Rejection>
267where
268 T: DatabaseQueue + Send + Sync,
269{
270 match queue.get_all_queue_stats().await {
271 Ok(all_stats) => {
272 if let Some(stats) = all_stats.into_iter().find(|s| s.queue_name == queue_name) {
273 let priority_breakdown = std::collections::HashMap::new(); let status_breakdown = std::collections::HashMap::new(); let hourly_throughput = Vec::new(); let recent_errors = Vec::new(); let queue_info = QueueInfo {
280 name: stats.queue_name.clone(),
281 pending_count: stats.pending_count,
282 running_count: stats.running_count,
283 completed_count: stats.completed_count,
284 failed_count: stats.dead_count + stats.timed_out_count,
285 dead_count: stats.dead_count,
286 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
287 throughput_per_minute: stats.statistics.throughput_per_minute,
288 error_rate: stats.statistics.error_rate,
289 last_job_at: None,
290 oldest_pending_job: None,
291 };
292
293 let detailed_stats = DetailedQueueStats {
294 queue_info,
295 priority_breakdown,
296 status_breakdown,
297 hourly_throughput,
298 recent_errors,
299 };
300
301 Ok(warp::reply::json(&ApiResponse::success(detailed_stats)))
302 } else {
303 let response =
304 ApiResponse::<()>::error(format!("Queue '{}' not found", queue_name));
305 Ok(warp::reply::json(&response))
306 }
307 }
308 Err(e) => {
309 let response =
310 ApiResponse::<()>::error(format!("Failed to get queue statistics: {}", e));
311 Ok(warp::reply::json(&response))
312 }
313 }
314}
315
316async fn queue_action_handler<T>(
318 queue_name: String,
319 queue: Arc<T>,
320 action_request: QueueActionRequest,
321) -> Result<impl Reply, warp::Rejection>
322where
323 T: DatabaseQueue + Send + Sync,
324{
325 match action_request.action.as_str() {
326 "clear_dead" => {
327 let older_than = chrono::Utc::now() - chrono::Duration::days(7); match queue.purge_dead_jobs(older_than).await {
329 Ok(count) => {
330 let response = ApiResponse::success(serde_json::json!({
331 "message": format!("Cleared {} dead jobs from queue '{}'", count, queue_name),
332 "count": count
333 }));
334 Ok(warp::reply::json(&response))
335 }
336 Err(e) => {
337 let response =
338 ApiResponse::<()>::error(format!("Failed to clear dead jobs: {}", e));
339 Ok(warp::reply::json(&response))
340 }
341 }
342 }
343 "clear_completed" => {
344 let response =
346 ApiResponse::<()>::error("Clear completed jobs not yet implemented".to_string());
347 Ok(warp::reply::json(&response))
348 }
349 "pause" => {
350 let response = ApiResponse::<()>::error("Queue pause not yet implemented".to_string());
352 Ok(warp::reply::json(&response))
353 }
354 "resume" => {
355 let response = ApiResponse::<()>::error("Queue resume not yet implemented".to_string());
357 Ok(warp::reply::json(&response))
358 }
359 _ => {
360 let response =
361 ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
362 Ok(warp::reply::json(&response))
363 }
364 }
365}
366
367async fn queue_jobs_handler<T>(
369 queue_name: String,
370 queue: Arc<T>,
371 pagination: PaginationParams,
372 filters: FilterParams,
373 sort: SortParams,
374) -> Result<impl Reply, warp::Rejection>
375where
376 T: DatabaseQueue + Send + Sync,
377{
378 let _ = (queue, pagination, filters, sort);
381
382 let response = ApiResponse::success(serde_json::json!({
383 "message": format!("Jobs for queue '{}' - implementation pending", queue_name),
384 "queue": queue_name
385 }));
386
387 Ok(warp::reply::json(&response))
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393
394 #[test]
395 fn test_queue_action_request_deserialization() {
396 let json = r#"{"action": "clear_dead", "confirm": true}"#;
397 let request: QueueActionRequest = serde_json::from_str(json).unwrap();
398 assert_eq!(request.action, "clear_dead");
399 assert_eq!(request.confirm, Some(true));
400 }
401
402 #[test]
403 fn test_queue_info_serialization() {
404 let queue_info = QueueInfo {
405 name: "test_queue".to_string(),
406 pending_count: 42,
407 running_count: 3,
408 completed_count: 1000,
409 failed_count: 5,
410 dead_count: 2,
411 avg_processing_time_ms: 150.5,
412 throughput_per_minute: 25.0,
413 error_rate: 0.05,
414 last_job_at: None,
415 oldest_pending_job: None,
416 };
417
418 let json = serde_json::to_string(&queue_info).unwrap();
419 assert!(json.contains("test_queue"));
420 assert!(json.contains("42"));
421 }
422}