1use actix_web::{web, HttpResponse, Responder};
3use tera::Context;
4use redis::AsyncCommands;
5use chrono::{Duration, Utc};
6use serde_json::json;
7use serde::{Deserialize, Serialize};
8use csv::WriterBuilder;
9use std::collections::HashSet;
10use crate::utils::rdconfig::{get_redis_connection};
11use crate::services::template_service::render_template;
12use crate::utils::pagination::{Pagination, PaginationQuery};
13use crate::utils::jconfig::{deserialize_job, to_job_info, JobInfo, fetch_job_info};
14use crate::utils::renderer::paginate_jobs;
15use crate::utils::constants::{
16 DEFAULT_PAGE,
17 DEFAULT_LIMIT,
18 DELAYED_JOBS_KEY,
19 PREFIX_QUEUE,
20 PREFIX_JOB,
21 QUEUES_SET,
22 QUEUE_CONFIG_PREFIX,
23 SUCCESS_LIST_PREFIX,
24 FAILED_LIST_PREFIX,
25 RETRY_LIST_PREFIX,
26 COUNTER_SUCCESS,
27 COUNTER_FAILED,
28 COUNTER_TOTAL_JOBS,
29 STATS_JOBS_PREFIX,
30 STATS_JOBS_FAILED_PREFIX,
31 LOGS_PREFIX,
32 FAILED_JOBS_LIST,
33 WORKER_PREFIX,
34};
35
/// Query-string parameters for the queue-overview page (`render_metrics`).
#[derive(Deserialize)]
pub struct MetricsQuery {
    // Optional case-insensitive substring filter applied to queue names.
    pub search: Option<String>,
    // Page number; falls back to DEFAULT_PAGE downstream when absent.
    pub page: Option<usize>,
    // Page size; falls back to DEFAULT_LIMIT downstream when absent.
    pub limit: Option<usize>,
}
42
43
/// Overview page: lists every queue (configured ∪ active) with its
/// success/failed/retry/pending list lengths, plus grand totals, filtered
/// by an optional search term and paginated.
pub async fn render_metrics(query: web::Query<MetricsQuery>) -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    // Queues registered in the global queues set (i.e. seen by workers/producers).
    let active_queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();

    // Queues that have a config entry. NOTE(review): KEYS is O(keyspace) and
    // blocks Redis; SCAN would be safer on large instances.
    let config_pattern = format!("{QUEUE_CONFIG_PREFIX}:*");
    let config_keys: Vec<String> = conn.keys(&config_pattern).await.unwrap_or_default();
    let configured_queues: Vec<String> = config_keys
        .into_iter()
        .filter_map(|key| key.strip_prefix(&format!("{QUEUE_CONFIG_PREFIX}:")).map(String::from))
        .collect();

    // Union of configured and active queues, configured order first,
    // duplicates skipped.
    let mut all_queues = configured_queues;
    for queue in active_queues {
        if !all_queues.contains(&queue) {
            all_queues.push(queue);
        }
    }

    // Optional case-insensitive substring filter on queue names.
    if let Some(search) = &query.search {
        let search_lower = search.to_lowercase();
        all_queues = all_queues
            .into_iter()
            .filter(|q| q.to_lowercase().contains(&search_lower))
            .collect();
    }

    // NOTE(review): pagination is computed twice — once here via
    // into_pagination and again inside paginate_jobs below. Presumably
    // into_pagination clamps page/limit to a valid range first; confirm
    // whether the first pass is actually needed.
    let pagination_query = PaginationQuery {
        page: query.page,
        limit: query.limit,
    };
    let pagination = pagination_query.into_pagination(all_queues.len());

    let (paginated_queues, pagination) =
        paginate_jobs(all_queues, pagination.page, pagination.limit).await;

    // Per-queue list lengths for this page, plus running totals for the
    // stats header.
    let mut queue_infos = vec![];
    let mut total_success = 0;
    let mut total_failed = 0;
    let mut total_retry = 0;
    let mut total_pending = 0;

    for queue in &paginated_queues {
        let success_key = format!("{SUCCESS_LIST_PREFIX}:{}", queue);
        let failed_key = format!("{FAILED_LIST_PREFIX}:{}", queue);
        let retry_key = format!("{RETRY_LIST_PREFIX}:{}", queue);
        let pending_key = format!("{PREFIX_QUEUE}:{}", queue);

        // Unreadable lists count as empty rather than failing the page.
        let success: usize = conn.llen(&success_key).await.unwrap_or(0);
        let failed: usize = conn.llen(&failed_key).await.unwrap_or(0);
        let retry: usize = conn.llen(&retry_key).await.unwrap_or(0);
        let pending: usize = conn.llen(&pending_key).await.unwrap_or(0);

        total_success += success;
        total_failed += failed;
        total_retry += retry;
        total_pending += pending;

        queue_infos.push(json!({
            "name": queue,
            "success": success,
            "failed": failed,
            "retry": retry,
            "pending": pending
        }));
    }

    let mut ctx = Context::new();
    ctx.insert("title", "All Queues");
    ctx.insert("queues", &queue_infos);

    // Totals cover only the queues on the current page, not all queues.
    ctx.insert("stats", &json!({
        "success_jobs": total_success,
        "failed_jobs": total_failed,
        "retry_jobs": total_retry,
        "pending_jobs": total_pending,
    }));

    ctx.insert("query", &json!({
        "search": query.search.clone().unwrap_or_default()
    }));

    // NOTE(review): the search value is interpolated into the query string
    // without URL-encoding; a search term containing '&' or '=' will break
    // pagination links in the template.
    ctx.insert("page", &json!({
        "current": pagination.page,
        "start": pagination.offset() + 1,
        "end": (pagination.offset() + pagination.limit).min(pagination.total),
        "total": pagination.total,
        "has_prev": pagination.has_prev,
        "has_next": pagination.has_next,
        "query": format!("search={}", query.search.clone().unwrap_or_default())
    }));

    render_template("metrics.html.tera", ctx).await
}
150
151
152
153pub async fn render_metrics_for_queue(
154 path: web::Path<String>,
155 query: web::Query<PaginationQuery>,
156) -> impl Responder {
157 let queue = path.into_inner();
158
159 let mut conn = match get_redis_connection().await {
160 Ok(c) => c,
161 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
162 };
163
164 let key = format!("{PREFIX_QUEUE}:{}", queue);
165
166 let all_jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
167 Ok(jobs) => jobs,
168 Err(_) => vec![], };
170
171 let page = query.page.unwrap_or(DEFAULT_PAGE);
172 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
173 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
174
175 let mut job_infos: Vec<JobInfo> = Vec::new();
177 for job_id in job_ids {
178 match fetch_job_info(&job_id).await {
179 Ok(Some(info)) => job_infos.push(info),
180 Ok(None) => {
181 tracing::warn!("Job info not found for ID: {}", job_id);
182 }
183 Err(e) => {
184 tracing::error!("Failed to fetch job info for ID {}: {:?}", job_id, e);
185 }
186 }
187 }
188
189 let mut ctx = Context::new();
190 ctx.insert("title", &format!("Queue: {}", queue));
191 ctx.insert("queue", &queue);
192 ctx.insert("jobs", &job_infos);
193 ctx.insert("page", &serde_json::json!({
194 "current": pagination.page,
195 "start": pagination.offset() + 1,
196 "end": (pagination.offset() + pagination.limit).min(pagination.total),
197 "total": pagination.total,
198 "has_prev": pagination.has_prev,
199 "has_next": pagination.has_next,
200 "query": "" }));
202
203 render_template("queue_metrics.html.tera", ctx).await
204}
205
206
207
208
209pub async fn render_scheduled_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
210 let mut conn = match get_redis_connection().await {
211 Ok(c) => c,
212 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
213 };
214
215 let now = Utc::now().timestamp();
216 let job_ids: Vec<String> = conn
217 .zrangebyscore(DELAYED_JOBS_KEY, 0, now)
218 .await
219 .unwrap_or_default();
220
221 let mut job_infos = Vec::new();
222
223 for jid in job_ids {
224 if let Ok(data) = conn.get::<_, String>(format!("{PREFIX_JOB}:{}", jid)).await {
225 if let Some(job) = deserialize_job(data).await {
226 job_infos.push(to_job_info(&job, &jid)); }
228 }
229 }
230
231 let page = query.page.unwrap_or(DEFAULT_PAGE);
232 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
233 let total = job_infos.len();
234 let pagination = Pagination::new(page, limit, total);
235
236 let start = pagination.offset();
237 let end = (start + limit).min(total);
238 let paginated_job_infos = &job_infos[start..end];
239
240 let mut ctx = Context::new();
241 ctx.insert("title", "Scheduled Jobs");
242 ctx.insert("jobs", &paginated_job_infos); ctx.insert("pagination", &pagination);
244
245 render_template("scheduled_jobs.html.tera", ctx).await
246}
247
248
249
250
251pub async fn render_delayed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
252 let mut conn = match get_redis_connection().await {
253 Ok(c) => c,
254 Err(_) => return HttpResponse::InternalServerError().body("Redis connection failed"),
255 };
256
257 let job_ids: Vec<String> = match conn.zrange(DELAYED_JOBS_KEY, 0, -1).await {
259 Ok(ids) => ids,
260 Err(_) => vec![],
261 };
262
263 let mut job_infos = Vec::new();
265 for jid in job_ids {
266 match fetch_job_info(&jid).await {
267 Ok(Some(info)) => job_infos.push(info),
268 Ok(None) => {
269 tracing::warn!("❌ Delayed job not found in Redis for ID: {}", jid);
270 }
271 Err(err) => {
272 tracing::error!("❌ Failed to fetch delayed job ID {}: {:?}", jid, err);
273 }
274 }
275 }
276
277 let page = query.page.unwrap_or(DEFAULT_PAGE);
279 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
280 let total = job_infos.len();
281 let pagination = Pagination::new(page, limit, total);
282
283 let start = pagination.offset();
284 let end = (start + limit).min(total);
285 let paginated = &job_infos[start..end];
286
287 let mut ctx = Context::new();
289 ctx.insert("title", "All Delayed Jobs");
290 ctx.insert("jobs", &paginated);
291 ctx.insert("page", &serde_json::json!({
292 "current": pagination.page,
293 "start": start + 1,
294 "end": end,
295 "total": total,
296 "has_prev": pagination.has_prev,
297 "has_next": pagination.has_next,
298 "query": ""
299 }));
300
301 render_template("delayed_jobs.html.tera", ctx).await
302}
303
304
305
306pub async fn render_dead_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
307 let mut conn = match get_redis_connection().await {
309 Ok(c) => c,
310 Err(e) => {
311 tracing::error!("Redis connection failed: {:?}", e);
312 return HttpResponse::InternalServerError().body("Redis connection error");
313 }
314 };
315
316 let all_jobs: Vec<String> = match conn.lrange(FAILED_JOBS_LIST, 0, -1).await {
318 Ok(jobs) => jobs,
319 Err(e) => {
320 tracing::error!("Failed to read failed_jobs list: {:?}", e);
321 vec![]
322 }
323 };
324
325 let page = query.page.unwrap_or(DEFAULT_PAGE);
327 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
328 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
329
330 let mut job_infos: Vec<JobInfo> = Vec::new();
332 for job_id in job_ids {
333 match fetch_job_info(&job_id).await {
334 Ok(Some(info)) => job_infos.push(info),
335 Ok(None) => {
336 tracing::warn!("Job info not found for ID: {}", job_id);
337 }
338 Err(e) => {
339 tracing::error!("Error fetching job info for ID {}: {:?}", job_id, e);
340 }
341 }
342 }
343
344 let mut ctx = Context::new();
346 ctx.insert("title", "Dead Jobs");
347 ctx.insert("jobs", &job_infos);
348 ctx.insert("page", &json!({
349 "current": pagination.page,
350 "start": pagination.offset() + 1,
351 "end": (pagination.offset() + pagination.limit).min(pagination.total),
352 "total": pagination.total,
353 "has_prev": pagination.has_prev,
354 "has_next": pagination.has_next,
355 "query": ""
356 }));
357
358 render_template("dead_jobs.html.tera", ctx).await
359}
360
361
362
/// Worker heartbeat payload as stored in Redis under `WORKER_PREFIX:<id>`.
/// Parsed from the JSON the worker writes; `id` is subsequently overwritten
/// from the Redis key by `render_worker_status`.
#[derive(Debug, Serialize, Deserialize)]
struct WorkerStatus {
    // Worker identifier; derived from the Redis key, not trusted from the payload.
    id: String,
    // Queue names this worker consumes from.
    queues: Vec<String>,
    // Last-heartbeat timestamp; exact format is whatever the worker wrote — TODO confirm.
    last_seen: String,
    // Host the worker process runs on.
    hostname: String,
    // OS process id of the worker.
    pid: u32,
}
371
372pub async fn render_worker_status() -> impl Responder {
373 let mut conn = match get_redis_connection().await {
374 Ok(c) => c,
375 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
376 };
377
378 let worker_pattern = format!("{WORKER_PREFIX}:*");
379 let keys: Vec<String> = conn.keys(&worker_pattern).await.unwrap_or_default();
380 let mut workers = Vec::new();
381
382 for key in keys {
383 if let Ok(status_json) = conn.get::<_, String>(&key).await {
384 if let Ok(mut status) = serde_json::from_str::<WorkerStatus>(&status_json) {
385 status.id = key.replace(&format!("{WORKER_PREFIX}:"), "");
386 workers.push(status);
387 }
388 }
389 }
390
391 let mut ctx = Context::new();
392 ctx.insert("title", "Worker Status");
393 ctx.insert("workers", &workers);
394
395 render_template("workers.html.tera", ctx).await
396}
397
398
399
/// JSON action endpoint: `{"action": "delete"|"retry"|"queue", "job_id": "..."}`.
/// Deletes a job, or re-enqueues it onto its queue's pending list.
/// NOTE(review): an absent/empty "job_id" still builds the key "PREFIX_JOB:"
/// and proceeds; the exists() checks below are the only guard — consider
/// rejecting empty ids up front.
pub async fn job_action(payload: web::Json<serde_json::Value>) -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    // Missing fields fall back to "" and are handled by the match / exists checks.
    let action = payload.get("action").and_then(|a| a.as_str()).unwrap_or("");
    let job_id = payload.get("job_id").and_then(|j| j.as_str()).unwrap_or("");
    let job_key = format!("{PREFIX_JOB}:{}", job_id);

    match action {
        "delete" => {
            if conn.exists(&job_key).await.unwrap_or(false) {
                // Delete the job record, then append an audit line to the
                // "default" log list, capped at 100 entries.
                // NOTE(review): the id is NOT removed from queue lists or the
                // delayed ZSET here — confirm workers tolerate dangling ids.
                let _: () = conn.del(&job_key).await.unwrap_or_default();
                let logs_key = format!("{LOGS_PREFIX}:default");
                let _: () = conn.lpush(&logs_key, format!("[{}] 🗑️ Job {} deleted", Utc::now(), job_id)).await.unwrap_or_default();
                let _: () = conn.ltrim(&logs_key, 0, 99).await.unwrap_or_default();
                HttpResponse::Ok().json(json!({"status": "deleted"}))
            } else {
                HttpResponse::NotFound().json(json!({"error": "job not found"}))
            }
        },
        "retry" | "queue" => {
            let exists: bool = conn.exists(&job_key).await.unwrap_or(false);
            if !exists {
                return HttpResponse::NotFound().json(json!({ "error": "job not found" }));
            }

            // Target queue comes from the job hash; defaults to "default" on error.
            // NOTE(review): this treats the job record as a HASH (hget/hset/hdel),
            // while render_scheduled_jobs reads the same PREFIX_JOB key with GET
            // as a string — confirm which representation producers actually write.
            let queue: String = conn.hget(&job_key, "queue").await.unwrap_or_else(|_| "default".to_string());
            let queue_key = format!("{PREFIX_QUEUE}:{}", queue);

            // Remove from the delayed set, if scheduled there.
            let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_id).await.unwrap_or_default();

            // Drop any existing occurrences from the pending list (count 0 = all)...
            let _: () = conn.lrem(&queue_key, 0, &job_id).await.unwrap_or_default();

            // ...then push exactly one copy onto the tail.
            let _: () = conn.rpush(&queue_key, &job_id).await.unwrap_or_default();

            // Mark re-queued and record the retry timestamp.
            let _: () = conn.hset_multiple(&job_key, &[
                ("status", "queued"),
                ("retry_at", &Utc::now().to_rfc3339()),
            ]).await.unwrap_or_default();

            // Clear terminal-state timestamps so the job reads as freshly queued.
            let _: () = conn.hdel(&job_key, &["failed_at", "completed_at", "run_at"]).await.unwrap_or_default();

            HttpResponse::Ok().json(json!({ "status": "retried", "queue": queue }))
        }

        _ => HttpResponse::BadRequest().json(json!({"error": "invalid action"})),
    }
}
456
457
458
459pub async fn get_metrics_summary() -> impl Responder {
460 let mut conn = match get_redis_connection().await {
461 Ok(c) => c,
462 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
463 };
464
465 let total_jobs: usize = conn.get(COUNTER_TOTAL_JOBS).await.unwrap_or(0);
466 let success_jobs: usize = conn.get(COUNTER_SUCCESS).await.unwrap_or(0);
467 let failed_jobs: usize = conn.get(COUNTER_FAILED).await.unwrap_or(0);
468
469 let queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
470 let mut scheduled_jobs = 0;
471 for queue in &queues {
472 let len: usize = conn.llen(format!("{PREFIX_QUEUE}:{}", queue)).await.unwrap_or(0);
473 scheduled_jobs += len;
474 }
475
476 let worker_pattern = format!("{WORKER_PREFIX}:*");
477 let worker_keys: Vec<String> = conn.keys(&worker_pattern).await.unwrap_or_default();
478 let active_workers = worker_keys.len();
479
480 let mut chart_labels = Vec::new();
482 let mut chart_success = Vec::new();
483 let mut chart_failed = Vec::new();
484
485 for i in (0..7).rev() {
486 let day = Utc::now().date_naive() - Duration::days(i);
487 let date_str = day.format("%Y-%m-%d").to_string();
488
489 let total_key = format!("{STATS_JOBS_PREFIX}:{}", date_str);
490 let failed_key = format!("{STATS_JOBS_FAILED_PREFIX}:{}:failed", date_str);
491
492 let total: usize = conn.get(&total_key).await.unwrap_or(0);
493 let failed: usize = conn.get(&failed_key).await.unwrap_or(0);
494 let success = total.saturating_sub(failed);
495
496 chart_labels.push(day.format("%a").to_string()); chart_success.push(success);
498 chart_failed.push(failed);
499 }
500
501 let mut ctx = Context::new();
502 ctx.insert("title", "Metrics Summary");
503 ctx.insert("stats", &json!({
504 "total_jobs": total_jobs,
505 "success_jobs": success_jobs,
506 "failed_jobs": failed_jobs,
507 "scheduled_jobs": scheduled_jobs,
508 "active_workers": active_workers
509 }));
510 ctx.insert("chart", &json!({
511 "labels": chart_labels,
512 "success": chart_success,
513 "failed": chart_failed
514 }));
515
516 render_template("summary.html.tera", ctx).await
517}
518
519
520
521pub async fn export_queue_csv(path: web::Path<String>) -> impl Responder {
522 let queue = path.into_inner();
523 let mut conn = match get_redis_connection().await {
524 Ok(c) => c,
525 Err(e) => {
526 eprintln!("Redis connection error: {:?}", e);
527 return HttpResponse::InternalServerError().body("Failed to connect to Redis");
528 }
529 };
530
531 let key = format!("{PREFIX_QUEUE}:{}", queue);
532 let jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
533 Ok(j) => j,
534 Err(e) => {
535 eprintln!("Failed to fetch jobs from Redis: {:?}", e);
536 return HttpResponse::InternalServerError().body("Failed to fetch jobs");
537 }
538 };
539
540 let mut job_infos: Vec<JobInfo> = vec![];
541
542 for (i, payload) in jobs.into_iter().enumerate() {
543 match deserialize_job(payload).await {
544 Some(job) => {
545 let id = format!("{}_{}", queue, i); job_infos.push(to_job_info(&job, &id));
547 }
548 None => {
549 eprintln!("Failed to deserialize job at index {}", i);
550 continue;
551 }
552 }
553 }
554
555 let mut wtr = WriterBuilder::new()
556 .has_headers(true)
557 .from_writer(vec![]);
558
559 for job_info in &job_infos {
560 if let Err(e) = wtr.serialize(job_info) {
561 eprintln!("CSV serialization failed: {:?}", e);
562 }
563 }
564
565 let data = match wtr.into_inner() {
566 Ok(bytes) => bytes,
567 Err(e) => {
568 eprintln!("Failed to build CSV output: {:?}", e);
569 return HttpResponse::InternalServerError().body("Failed to generate CSV");
570 }
571 };
572
573 HttpResponse::Ok()
574 .content_type("text/csv")
575 .append_header(("Content-Disposition", format!("attachment; filename=queue_{}.csv", queue)))
576 .body(data)
577}
578
579
580
581
582
583
584
585
586
587pub async fn render_failed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
588 let mut conn = match get_redis_connection().await {
589 Ok(c) => c,
590 Err(e) => {
591 tracing::error!("Redis connection failed (failed jobs): {:?}", e);
592 return HttpResponse::InternalServerError().body("Redis connection error");
593 }
594 };
595
596 let queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
598
599 let mut all_jobs: Vec<String> = Vec::new();
601 for queue in &queues {
602 let key = format!("{FAILED_LIST_PREFIX}:{}", queue);
603 let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
604 all_jobs.extend(ids);
605 }
606
607 let all_jobs: Vec<String> = {
609 let mut set = HashSet::new();
610 all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
611 };
612
613 let page = query.page.unwrap_or(DEFAULT_PAGE);
614 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
615 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
616
617 let mut job_infos: Vec<JobInfo> = Vec::new();
618 for job_id in job_ids {
619 match fetch_job_info(&job_id).await {
620 Ok(Some(info)) => job_infos.push(info),
621 Ok(None) => {
622 tracing::warn!("Failed jobs: job info not found for ID: {}", job_id);
623 }
624 Err(e) => {
625 tracing::error!("Failed jobs: error fetching job info for {}: {:?}", job_id, e);
626 }
627 }
628 }
629
630 let mut ctx = Context::new();
631 ctx.insert("title", "Failed Jobs");
632 ctx.insert("jobs", &job_infos);
633 ctx.insert("page", &json!({
634 "current": pagination.page,
635 "start": pagination.offset() + 1,
636 "end": (pagination.offset() + pagination.limit).min(pagination.total),
637 "total": pagination.total,
638 "has_prev": pagination.has_prev,
639 "has_next": pagination.has_next,
640 "query": ""
641 }));
642
643 render_template("failed_jobs.html.tera", ctx).await
644}
645
646
647
648
649pub async fn render_retry_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
650 let mut conn = match get_redis_connection().await {
651 Ok(c) => c,
652 Err(e) => {
653 tracing::error!("Redis connection failed (retry jobs): {:?}", e);
654 return HttpResponse::InternalServerError().body("Redis connection error");
655 }
656 };
657
658 let queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
660
661 let mut all_jobs: Vec<String> = Vec::new();
663 for queue in &queues {
664 let key = format!("{RETRY_LIST_PREFIX}:{}", queue);
665 let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
666 all_jobs.extend(ids);
667 }
668
669 let all_jobs: Vec<String> = {
671 let mut set = HashSet::new();
672 all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
673 };
674
675 let page = query.page.unwrap_or(DEFAULT_PAGE);
676 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
677 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
678
679 let mut job_infos: Vec<JobInfo> = Vec::new();
680 for job_id in job_ids {
681 match fetch_job_info(&job_id).await {
682 Ok(Some(info)) => job_infos.push(info),
683 Ok(None) => {
684 tracing::warn!("Retry jobs: job info not found for ID: {}", job_id);
685 }
686 Err(e) => {
687 tracing::error!("Retry jobs: error fetching job info for {}: {:?}", job_id, e);
688 }
689 }
690 }
691
692 let mut ctx = Context::new();
693 ctx.insert("title", "Retry Jobs");
694 ctx.insert("jobs", &job_infos);
695 ctx.insert("page", &json!({
696 "current": pagination.page,
697 "start": pagination.offset() + 1,
698 "end": (pagination.offset() + pagination.limit).min(pagination.total),
699 "total": pagination.total,
700 "has_prev": pagination.has_prev,
701 "has_next": pagination.has_next,
702 "query": ""
703 }));
704
705 render_template("retry_jobs.html.tera", ctx).await
706}
707
708
709