1use actix_web::{web, HttpResponse, Responder};
3use tera::Context;
4use redis::AsyncCommands;
5use chrono::{Duration, Utc};
6use serde_json::json;
7use serde::{Deserialize, Serialize};
8use csv::WriterBuilder;
9use std::collections::HashSet;
10use crate::utils::rdconfig::{get_redis_connection};
11use crate::services::template_service::render_template;
12use crate::utils::pagination::{Pagination, PaginationQuery};
13use crate::utils::jconfig::{deserialize_job, to_job_info, JobInfo, fetch_job_info};
14use crate::utils::renderer::paginate_jobs;
15use crate::utils::constants::{
16 DEFAULT_PAGE,
17 DEFAULT_LIMIT,
18 DELAYED_JOBS_KEY,
19 PREFIX_QUEUE,
20};
21
/// Query-string parameters accepted by the "All Queues" metrics page
/// (`render_metrics`): an optional queue-name filter plus pagination.
#[derive(Deserialize)]
pub struct MetricsQuery {
    /// Case-insensitive substring filter applied to queue names.
    pub search: Option<String>,
    /// Requested page; the handler falls back to the pagination default when absent.
    pub page: Option<usize>,
    /// Requested page size; the handler falls back to the pagination default when absent.
    pub limit: Option<usize>,
}
28
29
30pub async fn render_metrics(query: web::Query<MetricsQuery>) -> impl Responder {
31 let mut conn = match get_redis_connection().await {
32 Ok(c) => c,
33 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
34 };
35
36 let active_queues: Vec<String> = conn.smembers("snm:queues").await.unwrap_or_default();
38
39 let config_keys: Vec<String> = conn.keys("snm:queue:config:*").await.unwrap_or_default();
41 let configured_queues: Vec<String> = config_keys
42 .into_iter()
43 .filter_map(|key| key.strip_prefix("snm:queue:config:").map(String::from))
44 .collect();
45
46 let mut all_queues = configured_queues;
48 for queue in active_queues {
49 if !all_queues.contains(&queue) {
50 all_queues.push(queue);
51 }
52 }
53
54 if let Some(search) = &query.search {
56 let search_lower = search.to_lowercase();
57 all_queues = all_queues
58 .into_iter()
59 .filter(|q| q.to_lowercase().contains(&search_lower))
60 .collect();
61 }
62
63 let pagination_query = PaginationQuery {
65 page: query.page,
66 limit: query.limit,
67 };
68 let pagination = pagination_query.into_pagination(all_queues.len());
69
70 let (paginated_queues, pagination) =
72 paginate_jobs(all_queues, pagination.page, pagination.limit).await;
73
74 let mut queue_infos = vec![];
76 let mut total_success = 0;
77 let mut total_failed = 0;
78 let mut total_retry = 0;
79 let mut total_pending = 0;
80
81 for queue in &paginated_queues {
82 let success_key = format!("snm:success:{}", queue);
83 let failed_key = format!("snm:failed:{}", queue);
84 let retry_key = format!("snm:retry:{}", queue);
85 let pending_key = format!("snm:queue:{}", queue);
86
87 let success: usize = conn.llen(&success_key).await.unwrap_or(0);
88 let failed: usize = conn.llen(&failed_key).await.unwrap_or(0);
89 let retry: usize = conn.llen(&retry_key).await.unwrap_or(0);
90 let pending: usize = conn.llen(&pending_key).await.unwrap_or(0);
91
92 total_success += success;
93 total_failed += failed;
94 total_retry += retry;
95 total_pending += pending;
96
97 queue_infos.push(json!({
98 "name": queue,
99 "success": success,
100 "failed": failed,
101 "retry": retry,
102 "pending": pending
103 }));
104 }
105
106 let mut ctx = Context::new();
108 ctx.insert("title", "All Queues");
109 ctx.insert("queues", &queue_infos);
110
111 ctx.insert("stats", &json!({
112 "success_jobs": total_success,
113 "failed_jobs": total_failed,
114 "retry_jobs": total_retry,
115 "pending_jobs": total_pending,
116 }));
117
118 ctx.insert("query", &json!({
120 "search": query.search.clone().unwrap_or_default()
121 }));
122
123 ctx.insert("page", &json!({
124 "current": pagination.page,
125 "start": pagination.offset() + 1,
126 "end": (pagination.offset() + pagination.limit).min(pagination.total),
127 "total": pagination.total,
128 "has_prev": pagination.has_prev,
129 "has_next": pagination.has_next,
130 "query": format!("search={}", query.search.clone().unwrap_or_default())
131 }));
132
133 render_template("metrics.html.tera", ctx).await
134}
135
136
137
/// Renders the per-queue metrics page: the job ids currently sitting on
/// `snm:queue:{queue}`, paginated and resolved to full `JobInfo` records.
pub async fn render_metrics_for_queue(
    path: web::Path<String>,
    query: web::Query<PaginationQuery>,
) -> impl Responder {
    let queue = path.into_inner();

    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    let key = format!("snm:queue:{}", queue);

    // Full list of job ids for this queue; a read error degrades to an
    // empty page rather than a 500.
    let all_jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
        Ok(jobs) => jobs,
        Err(_) => vec![], };

    let page = query.page.unwrap_or(DEFAULT_PAGE);
    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;

    // Resolve each id to its details; missing or unreadable jobs are logged
    // and skipped so one bad entry doesn't break the page.
    let mut job_infos: Vec<JobInfo> = Vec::new();
    for job_id in job_ids {
        match fetch_job_info(&job_id).await {
            Ok(Some(info)) => job_infos.push(info),
            Ok(None) => {
                tracing::warn!("Job info not found for ID: {}", job_id);
            }
            Err(e) => {
                tracing::error!("Failed to fetch job info for ID {}: {:?}", job_id, e);
            }
        }
    }

    let mut ctx = Context::new();
    ctx.insert("title", &format!("Queue: {}", queue));
    ctx.insert("queue", &queue);
    ctx.insert("jobs", &job_infos);
    // Pager context for the template; "start"/"end" are 1-based display bounds.
    ctx.insert("page", &serde_json::json!({
        "current": pagination.page,
        "start": pagination.offset() + 1,
        "end": (pagination.offset() + pagination.limit).min(pagination.total),
        "total": pagination.total,
        "has_prev": pagination.has_prev,
        "has_next": pagination.has_next,
        "query": "" }));

    render_template("queue_metrics.html.tera", ctx).await
}
190
191
192
193
194pub async fn render_scheduled_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
195 let mut conn = match get_redis_connection().await {
196 Ok(c) => c,
197 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
198 };
199
200 let now = Utc::now().timestamp();
201 let job_ids: Vec<String> = conn
202 .zrangebyscore("snm:delayed", 0, now)
203 .await
204 .unwrap_or_default();
205
206 let mut job_infos = Vec::new();
207
208 for jid in job_ids {
209 if let Ok(data) = conn.get::<_, String>(format!("snm:job:{}", jid)).await {
210 if let Some(job) = deserialize_job(data).await {
211 job_infos.push(to_job_info(&job, &jid)); }
213 }
214 }
215
216 let page = query.page.unwrap_or(DEFAULT_PAGE);
217 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
218 let total = job_infos.len();
219 let pagination = Pagination::new(page, limit, total);
220
221 let start = pagination.offset();
222 let end = (start + limit).min(total);
223 let paginated_job_infos = &job_infos[start..end];
224
225 let mut ctx = Context::new();
226 ctx.insert("title", "Scheduled Jobs");
227 ctx.insert("jobs", &paginated_job_infos); ctx.insert("pagination", &pagination);
229
230 render_template("scheduled_jobs.html.tera", ctx).await
231}
232
233
234
235
236pub async fn render_delayed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
237 let mut conn = match get_redis_connection().await {
238 Ok(c) => c,
239 Err(_) => return HttpResponse::InternalServerError().body("Redis connection failed"),
240 };
241
242 let job_ids: Vec<String> = match conn.zrange(DELAYED_JOBS_KEY, 0, -1).await {
244 Ok(ids) => ids,
245 Err(_) => vec![],
246 };
247
248 let mut job_infos = Vec::new();
250 for jid in job_ids {
251 match fetch_job_info(&jid).await {
252 Ok(Some(info)) => job_infos.push(info),
253 Ok(None) => {
254 tracing::warn!("❌ Delayed job not found in Redis for ID: {}", jid);
255 }
256 Err(err) => {
257 tracing::error!("❌ Failed to fetch delayed job ID {}: {:?}", jid, err);
258 }
259 }
260 }
261
262 let page = query.page.unwrap_or(DEFAULT_PAGE);
264 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
265 let total = job_infos.len();
266 let pagination = Pagination::new(page, limit, total);
267
268 let start = pagination.offset();
269 let end = (start + limit).min(total);
270 let paginated = &job_infos[start..end];
271
272 let mut ctx = Context::new();
274 ctx.insert("title", "All Delayed Jobs");
275 ctx.insert("jobs", &paginated);
276 ctx.insert("page", &serde_json::json!({
277 "current": pagination.page,
278 "start": start + 1,
279 "end": end,
280 "total": total,
281 "has_prev": pagination.has_prev,
282 "has_next": pagination.has_next,
283 "query": ""
284 }));
285
286 render_template("delayed_jobs.html.tera", ctx).await
287}
288
289
290
291pub async fn render_dead_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
292 let mut conn = match get_redis_connection().await {
294 Ok(c) => c,
295 Err(e) => {
296 tracing::error!("Redis connection failed: {:?}", e);
297 return HttpResponse::InternalServerError().body("Redis connection error");
298 }
299 };
300
301 let all_jobs: Vec<String> = match conn.lrange("snm:failed_jobs", 0, -1).await {
303 Ok(jobs) => jobs,
304 Err(e) => {
305 tracing::error!("Failed to read failed_jobs list: {:?}", e);
306 vec![]
307 }
308 };
309
310 let page = query.page.unwrap_or(DEFAULT_PAGE);
312 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
313 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
314
315 let mut job_infos: Vec<JobInfo> = Vec::new();
317 for job_id in job_ids {
318 match fetch_job_info(&job_id).await {
319 Ok(Some(info)) => job_infos.push(info),
320 Ok(None) => {
321 tracing::warn!("Job info not found for ID: {}", job_id);
322 }
323 Err(e) => {
324 tracing::error!("Error fetching job info for ID {}: {:?}", job_id, e);
325 }
326 }
327 }
328
329 let mut ctx = Context::new();
331 ctx.insert("title", "Dead Jobs");
332 ctx.insert("jobs", &job_infos);
333 ctx.insert("page", &json!({
334 "current": pagination.page,
335 "start": pagination.offset() + 1,
336 "end": (pagination.offset() + pagination.limit).min(pagination.total),
337 "total": pagination.total,
338 "has_prev": pagination.has_prev,
339 "has_next": pagination.has_next,
340 "query": ""
341 }));
342
343 render_template("dead_jobs.html.tera", ctx).await
344}
345
346
347
/// Worker heartbeat record, stored as a JSON string under "snm:worker:{id}".
#[derive(Debug, Serialize, Deserialize)]
struct WorkerStatus {
    /// Worker identifier; overwritten from the Redis key suffix when loaded
    /// (see `render_worker_status`).
    id: String,
    /// Queues this worker consumes from.
    queues: Vec<String>,
    /// Last heartbeat time (string format set by the worker — TODO confirm).
    last_seen: String,
    /// Host the worker process runs on.
    hostname: String,
    /// OS process id of the worker.
    pid: u32,
}
356
357pub async fn render_worker_status() -> impl Responder {
358 let mut conn = match get_redis_connection().await {
359 Ok(c) => c,
360 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
361 };
362
363 let keys: Vec<String> = conn.keys("snm:worker:*").await.unwrap_or_default();
364 let mut workers = Vec::new();
365
366 for key in keys {
367 if let Ok(status_json) = conn.get::<_, String>(&key).await {
368 if let Ok(mut status) = serde_json::from_str::<WorkerStatus>(&status_json) {
369 status.id = key.replace("snm:worker:", "");
370 workers.push(status);
371 }
372 }
373 }
374
375 let mut ctx = Context::new();
376 ctx.insert("title", "Worker Status");
377 ctx.insert("workers", &workers);
378
379 render_template("workers.html.tera", ctx).await
380}
381
382
383
384pub async fn job_action(payload: web::Json<serde_json::Value>) -> impl Responder {
385 let mut conn = match get_redis_connection().await {
386 Ok(c) => c,
387 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
388 };
389
390 let action = payload.get("action").and_then(|a| a.as_str()).unwrap_or("");
391 let job_id = payload.get("job_id").and_then(|j| j.as_str()).unwrap_or("");
392 let job_key = format!("snm:job:{}", job_id);
393
394 match action {
395 "delete" => {
396 if conn.exists(&job_key).await.unwrap_or(false) {
397 let _: () = conn.del(&job_key).await.unwrap_or_default();
398 let _: () = conn.lpush("snm:logs:default", format!("[{}] 🗑️ Job {} deleted", Utc::now(), job_id)).await.unwrap_or_default();
399 let _: () = conn.ltrim("snm:logs:default", 0, 99).await.unwrap_or_default();
400 HttpResponse::Ok().json(json!({"status": "deleted"}))
401 } else {
402 HttpResponse::NotFound().json(json!({"error": "job not found"}))
403 }
404 },
405 "retry" | "queue" => {
406 let exists: bool = conn.exists(&job_key).await.unwrap_or(false);
407 if !exists {
408 return HttpResponse::NotFound().json(json!({ "error": "job not found" }));
409 }
410
411 let queue: String = conn.hget(&job_key, "queue").await.unwrap_or_else(|_| "default".to_string());
413 let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
414
415 let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_id).await.unwrap_or_default();
417
418 let _: () = conn.lrem(&queue_key, 0, &job_id).await.unwrap_or_default();
420
421 let _: () = conn.rpush(&queue_key, &job_id).await.unwrap_or_default();
423
424 let _: () = conn.hset_multiple(&job_key, &[
426 ("status", "queued"),
427 ("retry_at", &Utc::now().to_rfc3339()),
428 ]).await.unwrap_or_default();
429
430 let _: () = conn.hdel(&job_key, &["failed_at", "completed_at", "run_at"]).await.unwrap_or_default();
432
433 HttpResponse::Ok().json(json!({ "status": "retried", "queue": queue }))
434 }
435
436 _ => HttpResponse::BadRequest().json(json!({"error": "invalid action"})),
437 }
438}
439
440
441
/// Dashboard summary page: global job counters, pending counts summed across
/// all queues, active worker count, and a 7-day success/failure chart.
pub async fn get_metrics_summary() -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    // Global counters; missing keys count as zero.
    let total_jobs: usize = conn.get("snm:qrush:total_jobs").await.unwrap_or(0);
    let success_jobs: usize = conn.get("snm:qrush:success").await.unwrap_or(0);
    let failed_jobs: usize = conn.get("snm:qrush:failed").await.unwrap_or(0);

    // Sum the pending-list lengths of every registered queue.
    let queues: Vec<String> = conn.smembers("snm:queues").await.unwrap_or_default();
    let mut scheduled_jobs = 0;
    for queue in &queues {
        let len: usize = conn.llen(format!("snm:queue:{}", queue)).await.unwrap_or(0);
        scheduled_jobs += len;
    }

    // One "snm:worker:*" key per worker heartbeat.
    let worker_keys: Vec<String> = conn.keys("snm:worker:*").await.unwrap_or_default();
    let active_workers = worker_keys.len();

    // Build the last-7-days chart series, oldest day first.
    let mut chart_labels = Vec::new();
    let mut chart_success = Vec::new();
    let mut chart_failed = Vec::new();

    for i in (0..7).rev() {
        let day = Utc::now().date_naive() - Duration::days(i);
        let date_str = day.format("%Y-%m-%d").to_string();

        // Per-day counters written elsewhere under "snm:stats:jobs:{date}".
        let total_key = format!("snm:stats:jobs:{}", date_str);
        let failed_key = format!("snm:stats:jobs:{}:failed", date_str);

        let total: usize = conn.get(&total_key).await.unwrap_or(0);
        let failed: usize = conn.get(&failed_key).await.unwrap_or(0);
        // Success is derived; saturating_sub clamps at zero if failed > total.
        let success = total.saturating_sub(failed);

        // Labels are abbreviated weekday names (e.g. "Mon").
        chart_labels.push(day.format("%a").to_string()); chart_success.push(success);
        chart_failed.push(failed);
    }

    let mut ctx = Context::new();
    ctx.insert("title", "Metrics Summary");
    ctx.insert("stats", &json!({
        "total_jobs": total_jobs,
        "success_jobs": success_jobs,
        "failed_jobs": failed_jobs,
        "scheduled_jobs": scheduled_jobs,
        "active_workers": active_workers
    }));
    ctx.insert("chart", &json!({
        "labels": chart_labels,
        "success": chart_success,
        "failed": chart_failed
    }));

    render_template("summary.html.tera", ctx).await
}
500
501
502pub async fn export_queue_csv(path: web::Path<String>) -> impl Responder {
554 let queue = path.into_inner();
555 let mut conn = match get_redis_connection().await {
556 Ok(c) => c,
557 Err(e) => {
558 eprintln!("Redis connection error: {:?}", e);
559 return HttpResponse::InternalServerError().body("Failed to connect to Redis");
560 }
561 };
562
563 let key = format!("snm:queue:{}", queue);
564 let jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
565 Ok(j) => j,
566 Err(e) => {
567 eprintln!("Failed to fetch jobs from Redis: {:?}", e);
568 return HttpResponse::InternalServerError().body("Failed to fetch jobs");
569 }
570 };
571
572 let mut job_infos: Vec<JobInfo> = vec![];
573
574 for (i, payload) in jobs.into_iter().enumerate() {
575 match deserialize_job(payload).await {
576 Some(job) => {
577 let id = format!("{}_{}", queue, i); job_infos.push(to_job_info(&job, &id));
579 }
580 None => {
581 eprintln!("Failed to deserialize job at index {}", i);
582 continue;
583 }
584 }
585 }
586
587 let mut wtr = WriterBuilder::new()
588 .has_headers(true)
589 .from_writer(vec![]);
590
591 for job_info in &job_infos {
592 if let Err(e) = wtr.serialize(job_info) {
593 eprintln!("CSV serialization failed: {:?}", e);
594 }
595 }
596
597 let data = match wtr.into_inner() {
598 Ok(bytes) => bytes,
599 Err(e) => {
600 eprintln!("Failed to build CSV output: {:?}", e);
601 return HttpResponse::InternalServerError().body("Failed to generate CSV");
602 }
603 };
604
605 HttpResponse::Ok()
606 .content_type("text/csv")
607 .append_header(("Content-Disposition", format!("attachment; filename=queue_{}.csv", queue)))
608 .body(data)
609}
610
611
612
613
614
615
616
617
618
619pub async fn render_failed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
620 let mut conn = match get_redis_connection().await {
621 Ok(c) => c,
622 Err(e) => {
623 tracing::error!("Redis connection failed (failed jobs): {:?}", e);
624 return HttpResponse::InternalServerError().body("Redis connection error");
625 }
626 };
627
628 let queues: Vec<String> = conn.smembers("snm:queues").await.unwrap_or_default();
630
631 let mut all_jobs: Vec<String> = Vec::new();
633 for queue in &queues {
634 let key = format!("snm:failed:{}", queue);
635 let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
636 all_jobs.extend(ids);
637 }
638
639 let all_jobs: Vec<String> = {
641 let mut set = HashSet::new();
642 all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
643 };
644
645 let page = query.page.unwrap_or(DEFAULT_PAGE);
646 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
647 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
648
649 let mut job_infos: Vec<JobInfo> = Vec::new();
650 for job_id in job_ids {
651 match fetch_job_info(&job_id).await {
652 Ok(Some(info)) => job_infos.push(info),
653 Ok(None) => {
654 tracing::warn!("Failed jobs: job info not found for ID: {}", job_id);
655 }
656 Err(e) => {
657 tracing::error!("Failed jobs: error fetching job info for {}: {:?}", job_id, e);
658 }
659 }
660 }
661
662 let mut ctx = Context::new();
663 ctx.insert("title", "Failed Jobs");
664 ctx.insert("jobs", &job_infos);
665 ctx.insert("page", &json!({
666 "current": pagination.page,
667 "start": pagination.offset() + 1,
668 "end": (pagination.offset() + pagination.limit).min(pagination.total),
669 "total": pagination.total,
670 "has_prev": pagination.has_prev,
671 "has_next": pagination.has_next,
672 "query": ""
673 }));
674
675 render_template("failed_jobs.html.tera", ctx).await
676}
677
678
679
680
681pub async fn render_retry_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
682 let mut conn = match get_redis_connection().await {
683 Ok(c) => c,
684 Err(e) => {
685 tracing::error!("Redis connection failed (retry jobs): {:?}", e);
686 return HttpResponse::InternalServerError().body("Redis connection error");
687 }
688 };
689
690 let queues: Vec<String> = conn.smembers("snm:queues").await.unwrap_or_default();
692
693 let mut all_jobs: Vec<String> = Vec::new();
695 for queue in &queues {
696 let key = format!("snm:retry:{}", queue);
697 let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
698 all_jobs.extend(ids);
699 }
700
701 let all_jobs: Vec<String> = {
703 let mut set = HashSet::new();
704 all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
705 };
706
707 let page = query.page.unwrap_or(DEFAULT_PAGE);
708 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
709 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
710
711 let mut job_infos: Vec<JobInfo> = Vec::new();
712 for job_id in job_ids {
713 match fetch_job_info(&job_id).await {
714 Ok(Some(info)) => job_infos.push(info),
715 Ok(None) => {
716 tracing::warn!("Retry jobs: job info not found for ID: {}", job_id);
717 }
718 Err(e) => {
719 tracing::error!("Retry jobs: error fetching job info for {}: {:?}", job_id, e);
720 }
721 }
722 }
723
724 let mut ctx = Context::new();
725 ctx.insert("title", "Retry Jobs");
726 ctx.insert("jobs", &job_infos);
727 ctx.insert("page", &json!({
728 "current": pagination.page,
729 "start": pagination.offset() + 1,
730 "end": (pagination.offset() + pagination.limit).min(pagination.total),
731 "total": pagination.total,
732 "has_prev": pagination.has_prev,
733 "has_next": pagination.has_next,
734 "query": ""
735 }));
736
737 render_template("retry_jobs.html.tera", ctx).await
738}
739