1use actix_web::{web, HttpResponse, Responder};
3use tera::Context;
4use redis::AsyncCommands;
5use chrono::{Duration, Utc};
6use serde_json::json;
7use serde::{Deserialize, Serialize};
8use csv::WriterBuilder;
9use std::collections::HashSet;
10use crate::utils::rdconfig::{get_redis_connection};
11use crate::services::template_service::render_template;
12use crate::utils::pagination::{Pagination, PaginationQuery};
13use crate::utils::jconfig::{deserialize_job, to_job_info, JobInfo, fetch_job_info};
14use crate::utils::renderer::paginate_jobs;
15use crate::utils::constants::{
16 DEFAULT_PAGE,
17 DEFAULT_LIMIT,
18 DELAYED_JOBS_KEY,
19 PREFIX_QUEUE,
20};
21
22#[derive(Deserialize)]
23pub struct MetricsQuery {
24 pub search: Option<String>,
25 pub page: Option<usize>,
26 pub limit: Option<usize>,
27}
28
29
30pub async fn render_metrics(query: web::Query<MetricsQuery>) -> impl Responder {
31 let mut conn = match get_redis_connection().await {
32 Ok(c) => c,
33 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
34 };
35
36 let active_queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();
38
39 let config_keys: Vec<String> = conn.keys("xsm:queue:config:*").await.unwrap_or_default();
41 let configured_queues: Vec<String> = config_keys
42 .into_iter()
43 .filter_map(|key| key.strip_prefix("xsm:queue:config:").map(String::from))
44 .collect();
45
46 let mut all_queues = configured_queues;
48 for queue in active_queues {
49 if !all_queues.contains(&queue) {
50 all_queues.push(queue);
51 }
52 }
53
54 if let Some(search) = &query.search {
56 let search_lower = search.to_lowercase();
57 all_queues = all_queues
58 .into_iter()
59 .filter(|q| q.to_lowercase().contains(&search_lower))
60 .collect();
61 }
62
63 let pagination_query = PaginationQuery {
65 page: query.page,
66 limit: query.limit,
67 };
68 let pagination = pagination_query.into_pagination(all_queues.len());
69
70 let (paginated_queues, pagination) =
72 paginate_jobs(all_queues, pagination.page, pagination.limit).await;
73
74 let mut queue_infos = vec![];
76 let mut total_success = 0;
77 let mut total_failed = 0;
78 let mut total_retry = 0;
79 let mut total_pending = 0;
80
81 for queue in &paginated_queues {
82 let success_key = format!("xsm:success:{}", queue);
83 let failed_key = format!("xsm:failed:{}", queue);
84 let retry_key = format!("xsm:retry:{}", queue);
85 let pending_key = format!("xsm:queue:{}", queue);
86
87 let success: usize = conn.llen(&success_key).await.unwrap_or(0);
88 let failed: usize = conn.llen(&failed_key).await.unwrap_or(0);
89 let retry: usize = conn.llen(&retry_key).await.unwrap_or(0);
90 let pending: usize = conn.llen(&pending_key).await.unwrap_or(0);
91
92 total_success += success;
93 total_failed += failed;
94 total_retry += retry;
95 total_pending += pending;
96
97 queue_infos.push(json!({
98 "name": queue,
99 "success": success,
100 "failed": failed,
101 "retry": retry,
102 "pending": pending
103 }));
104 }
105
106 let mut ctx = Context::new();
108 ctx.insert("title", "All Queues");
109 ctx.insert("queues", &queue_infos);
110
111 ctx.insert("stats", &json!({
112 "success_jobs": total_success,
113 "failed_jobs": total_failed,
114 "retry_jobs": total_retry,
115 "pending_jobs": total_pending,
116 }));
117
118 ctx.insert("query", &json!({
120 "search": query.search.clone().unwrap_or_default()
121 }));
122
123 ctx.insert("page", &json!({
124 "current": pagination.page,
125 "start": pagination.offset() + 1,
126 "end": (pagination.offset() + pagination.limit).min(pagination.total),
127 "total": pagination.total,
128 "has_prev": pagination.has_prev,
129 "has_next": pagination.has_next,
130 "query": format!("search={}", query.search.clone().unwrap_or_default())
131 }));
132
133 render_template("metrics.html.tera", ctx).await
134}
135
136
137
138pub async fn render_metrics_for_queue(
139 path: web::Path<String>,
140 query: web::Query<PaginationQuery>,
141) -> impl Responder {
142 let queue = path.into_inner();
143
144 let mut conn = match get_redis_connection().await {
145 Ok(c) => c,
146 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
147 };
148
149 let key = format!("xsm:queue:{}", queue);
150
151 let all_jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
152 Ok(jobs) => jobs,
153 Err(_) => vec![], };
155
156 let page = query.page.unwrap_or(DEFAULT_PAGE);
157 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
158 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
159
160 let mut job_infos: Vec<JobInfo> = Vec::new();
162 for job_id in job_ids {
163 match fetch_job_info(&job_id).await {
164 Ok(Some(info)) => job_infos.push(info),
165 Ok(None) => {
166 tracing::warn!("Job info not found for ID: {}", job_id);
167 }
168 Err(e) => {
169 tracing::error!("Failed to fetch job info for ID {}: {:?}", job_id, e);
170 }
171 }
172 }
173
174 let mut ctx = Context::new();
175 ctx.insert("title", &format!("Queue: {}", queue));
176 ctx.insert("queue", &queue);
177 ctx.insert("jobs", &job_infos);
178 ctx.insert("page", &serde_json::json!({
179 "current": pagination.page,
180 "start": pagination.offset() + 1,
181 "end": (pagination.offset() + pagination.limit).min(pagination.total),
182 "total": pagination.total,
183 "has_prev": pagination.has_prev,
184 "has_next": pagination.has_next,
185 "query": "" }));
187
188 render_template("queue_metrics.html.tera", ctx).await
189}
190
191
192
193
194pub async fn render_scheduled_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
195 let mut conn = match get_redis_connection().await {
196 Ok(c) => c,
197 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
198 };
199
200 let now = Utc::now().timestamp();
201 let job_ids: Vec<String> = conn
202 .zrangebyscore("xsm:delayed", 0, now)
203 .await
204 .unwrap_or_default();
205
206 let mut job_infos = Vec::new();
207
208 for jid in job_ids {
209 if let Ok(data) = conn.get::<_, String>(format!("xsm:job:{}", jid)).await {
210 if let Some(job) = deserialize_job(data).await {
211 job_infos.push(to_job_info(&job, &jid)); }
213 }
214 }
215
216 let page = query.page.unwrap_or(DEFAULT_PAGE);
217 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
218 let total = job_infos.len();
219 let pagination = Pagination::new(page, limit, total);
220
221 let start = pagination.offset();
222 let end = (start + limit).min(total);
223 let paginated_job_infos = &job_infos[start..end];
224
225 let mut ctx = Context::new();
226 ctx.insert("title", "Scheduled Jobs");
227 ctx.insert("jobs", &paginated_job_infos); ctx.insert("pagination", &pagination);
229
230 render_template("scheduled_jobs.html.tera", ctx).await
231}
232
233
234
235
236pub async fn render_delayed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
237 let mut conn = match get_redis_connection().await {
238 Ok(c) => c,
239 Err(_) => return HttpResponse::InternalServerError().body("Redis connection failed"),
240 };
241
242 let job_ids: Vec<String> = match conn.zrange(DELAYED_JOBS_KEY, 0, -1).await {
244 Ok(ids) => ids,
245 Err(_) => vec![],
246 };
247
248 let mut job_infos = Vec::new();
250 for jid in job_ids {
251 match fetch_job_info(&jid).await {
252 Ok(Some(info)) => job_infos.push(info),
253 Ok(None) => {
254 tracing::warn!("❌ Delayed job not found in Redis for ID: {}", jid);
255 }
256 Err(err) => {
257 tracing::error!("❌ Failed to fetch delayed job ID {}: {:?}", jid, err);
258 }
259 }
260 }
261
262 let page = query.page.unwrap_or(DEFAULT_PAGE);
264 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
265 let total = job_infos.len();
266 let pagination = Pagination::new(page, limit, total);
267
268 let start = pagination.offset();
269 let end = (start + limit).min(total);
270 let paginated = &job_infos[start..end];
271
272 let mut ctx = Context::new();
274 ctx.insert("title", "All Delayed Jobs");
275 ctx.insert("jobs", &paginated);
276 ctx.insert("page", &serde_json::json!({
277 "current": pagination.page,
278 "start": start + 1,
279 "end": end,
280 "total": total,
281 "has_prev": pagination.has_prev,
282 "has_next": pagination.has_next,
283 "query": ""
284 }));
285
286 render_template("delayed_jobs.html.tera", ctx).await
287}
288
289
290
291pub async fn render_dead_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
292 let mut conn = match get_redis_connection().await {
294 Ok(c) => c,
295 Err(e) => {
296 tracing::error!("Redis connection failed: {:?}", e);
297 return HttpResponse::InternalServerError().body("Redis connection error");
298 }
299 };
300
301 let all_jobs: Vec<String> = match conn.lrange("xsm:failed_jobs", 0, -1).await {
303 Ok(jobs) => jobs,
304 Err(e) => {
305 tracing::error!("Failed to read failed_jobs list: {:?}", e);
306 vec![]
307 }
308 };
309
310 let page = query.page.unwrap_or(DEFAULT_PAGE);
312 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
313 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
314
315 let mut job_infos: Vec<JobInfo> = Vec::new();
317 for job_id in job_ids {
318 match fetch_job_info(&job_id).await {
319 Ok(Some(info)) => job_infos.push(info),
320 Ok(None) => {
321 tracing::warn!("Job info not found for ID: {}", job_id);
322 }
323 Err(e) => {
324 tracing::error!("Error fetching job info for ID {}: {:?}", job_id, e);
325 }
326 }
327 }
328
329 let mut ctx = Context::new();
331 ctx.insert("title", "Dead Jobs");
332 ctx.insert("jobs", &job_infos);
333 ctx.insert("page", &json!({
334 "current": pagination.page,
335 "start": pagination.offset() + 1,
336 "end": (pagination.offset() + pagination.limit).min(pagination.total),
337 "total": pagination.total,
338 "has_prev": pagination.has_prev,
339 "has_next": pagination.has_next,
340 "query": ""
341 }));
342
343 render_template("dead_jobs.html.tera", ctx).await
344}
345
346
347
348#[derive(Debug, Serialize, Deserialize)]
349struct WorkerStatus {
350 id: String,
351 queues: Vec<String>,
352 last_seen: String,
353 hostname: String,
354 pid: u32,
355}
356
357pub async fn render_worker_status() -> impl Responder {
358 let mut conn = match get_redis_connection().await {
359 Ok(c) => c,
360 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
361 };
362
363 let keys: Vec<String> = conn.keys("xsm:worker:*").await.unwrap_or_default();
364 let mut workers = Vec::new();
365
366 for key in keys {
367 if let Ok(status_json) = conn.get::<_, String>(&key).await {
368 if let Ok(mut status) = serde_json::from_str::<WorkerStatus>(&status_json) {
369 status.id = key.replace("xsm:worker:", "");
370 workers.push(status);
371 }
372 }
373 }
374
375 let mut ctx = Context::new();
376 ctx.insert("title", "Worker Status");
377 ctx.insert("workers", &workers);
378
379 render_template("workers.html.tera", ctx).await
380}
381
382
383
384pub async fn job_action(payload: web::Json<serde_json::Value>) -> impl Responder {
385 let mut conn = match get_redis_connection().await {
386 Ok(c) => c,
387 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
388 };
389
390 let action = payload.get("action").and_then(|a| a.as_str()).unwrap_or("");
391 let job_id = payload.get("job_id").and_then(|j| j.as_str()).unwrap_or("");
392 let job_key = format!("xsm:job:{}", job_id);
393
394 match action {
395 "delete" => {
396 if conn.exists(&job_key).await.unwrap_or(false) {
397 let _: () = conn.del(&job_key).await.unwrap_or_default();
398 let _: () = conn.lpush("xsm:logs:default", format!("[{}] 🗑️ Job {} deleted", Utc::now(), job_id)).await.unwrap_or_default();
399 let _: () = conn.ltrim("xsm:logs:default", 0, 99).await.unwrap_or_default();
400 HttpResponse::Ok().json(json!({"status": "deleted"}))
401 } else {
402 HttpResponse::NotFound().json(json!({"error": "job not found"}))
403 }
404 },
405 "retry" | "queue" => {
406 let exists: bool = conn.exists(&job_key).await.unwrap_or(false);
407 if !exists {
408 return HttpResponse::NotFound().json(json!({ "error": "job not found" }));
409 }
410
411 let queue: String = conn.hget(&job_key, "queue").await.unwrap_or_else(|_| "default".to_string());
413 let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
414
415 let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_id).await.unwrap_or_default();
417
418 let _: () = conn.lrem(&queue_key, 0, &job_id).await.unwrap_or_default();
420
421 let _: () = conn.rpush(&queue_key, &job_id).await.unwrap_or_default();
423
424 let _: () = conn.hset_multiple(&job_key, &[
426 ("status", "queued"),
427 ("retry_at", &Utc::now().to_rfc3339()),
428 ]).await.unwrap_or_default();
429
430 let _: () = conn.hdel(&job_key, &["failed_at", "completed_at", "run_at"]).await.unwrap_or_default();
432
433 HttpResponse::Ok().json(json!({ "status": "retried", "queue": queue }))
434 }
435
436 _ => HttpResponse::BadRequest().json(json!({"error": "invalid action"})),
437 }
438}
439
440
441
442pub async fn get_metrics_summary() -> impl Responder {
443 let mut conn = match get_redis_connection().await {
444 Ok(c) => c,
445 Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
446 };
447
448 let total_jobs: usize = conn.get("xsm:qrush:total_jobs").await.unwrap_or(0);
449 let success_jobs: usize = conn.get("xsm:qrush:success").await.unwrap_or(0);
450 let failed_jobs: usize = conn.get("xsm:qrush:failed").await.unwrap_or(0);
451
452 let queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();
453 let mut scheduled_jobs = 0;
454 for queue in &queues {
455 let len: usize = conn.llen(format!("xsm:queue:{}", queue)).await.unwrap_or(0);
456 scheduled_jobs += len;
457 }
458
459 let worker_keys: Vec<String> = conn.keys("xsm:worker:*").await.unwrap_or_default();
460 let active_workers = worker_keys.len();
461
462 let mut chart_labels = Vec::new();
464 let mut chart_success = Vec::new();
465 let mut chart_failed = Vec::new();
466
467 for i in (0..7).rev() {
468 let day = Utc::now().date_naive() - Duration::days(i);
469 let date_str = day.format("%Y-%m-%d").to_string();
470
471 let total_key = format!("xsm:stats:jobs:{}", date_str);
472 let failed_key = format!("xsm:stats:jobs:{}:failed", date_str);
473
474 let total: usize = conn.get(&total_key).await.unwrap_or(0);
475 let failed: usize = conn.get(&failed_key).await.unwrap_or(0);
476 let success = total.saturating_sub(failed);
477
478 chart_labels.push(day.format("%a").to_string()); chart_success.push(success);
480 chart_failed.push(failed);
481 }
482
483 let mut ctx = Context::new();
484 ctx.insert("title", "Metrics Summary");
485 ctx.insert("stats", &json!({
486 "total_jobs": total_jobs,
487 "success_jobs": success_jobs,
488 "failed_jobs": failed_jobs,
489 "scheduled_jobs": scheduled_jobs,
490 "active_workers": active_workers
491 }));
492 ctx.insert("chart", &json!({
493 "labels": chart_labels,
494 "success": chart_success,
495 "failed": chart_failed
496 }));
497
498 render_template("summary.html.tera", ctx).await
499}
500
501
502
503pub async fn export_queue_csv(path: web::Path<String>) -> impl Responder {
504 let queue = path.into_inner();
505 let mut conn = match get_redis_connection().await {
506 Ok(c) => c,
507 Err(e) => {
508 eprintln!("Redis connection error: {:?}", e);
509 return HttpResponse::InternalServerError().body("Failed to connect to Redis");
510 }
511 };
512
513 let key = format!("xsm:queue:{}", queue);
514 let jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
515 Ok(j) => j,
516 Err(e) => {
517 eprintln!("Failed to fetch jobs from Redis: {:?}", e);
518 return HttpResponse::InternalServerError().body("Failed to fetch jobs");
519 }
520 };
521
522 let mut job_infos: Vec<JobInfo> = vec![];
523
524 for (i, payload) in jobs.into_iter().enumerate() {
525 match deserialize_job(payload).await {
526 Some(job) => {
527 let id = format!("{}_{}", queue, i); job_infos.push(to_job_info(&job, &id));
529 }
530 None => {
531 eprintln!("Failed to deserialize job at index {}", i);
532 continue;
533 }
534 }
535 }
536
537 let mut wtr = WriterBuilder::new()
538 .has_headers(true)
539 .from_writer(vec![]);
540
541 for job_info in &job_infos {
542 if let Err(e) = wtr.serialize(job_info) {
543 eprintln!("CSV serialization failed: {:?}", e);
544 }
545 }
546
547 let data = match wtr.into_inner() {
548 Ok(bytes) => bytes,
549 Err(e) => {
550 eprintln!("Failed to build CSV output: {:?}", e);
551 return HttpResponse::InternalServerError().body("Failed to generate CSV");
552 }
553 };
554
555 HttpResponse::Ok()
556 .content_type("text/csv")
557 .append_header(("Content-Disposition", format!("attachment; filename=queue_{}.csv", queue)))
558 .body(data)
559}
560
561
562
563
564
565
566
567
568
569pub async fn render_failed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
570 let mut conn = match get_redis_connection().await {
571 Ok(c) => c,
572 Err(e) => {
573 tracing::error!("Redis connection failed (failed jobs): {:?}", e);
574 return HttpResponse::InternalServerError().body("Redis connection error");
575 }
576 };
577
578 let queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();
580
581 let mut all_jobs: Vec<String> = Vec::new();
583 for queue in &queues {
584 let key = format!("xsm:failed:{}", queue);
585 let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
586 all_jobs.extend(ids);
587 }
588
589 let all_jobs: Vec<String> = {
591 let mut set = HashSet::new();
592 all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
593 };
594
595 let page = query.page.unwrap_or(DEFAULT_PAGE);
596 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
597 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
598
599 let mut job_infos: Vec<JobInfo> = Vec::new();
600 for job_id in job_ids {
601 match fetch_job_info(&job_id).await {
602 Ok(Some(info)) => job_infos.push(info),
603 Ok(None) => {
604 tracing::warn!("Failed jobs: job info not found for ID: {}", job_id);
605 }
606 Err(e) => {
607 tracing::error!("Failed jobs: error fetching job info for {}: {:?}", job_id, e);
608 }
609 }
610 }
611
612 let mut ctx = Context::new();
613 ctx.insert("title", "Failed Jobs");
614 ctx.insert("jobs", &job_infos);
615 ctx.insert("page", &json!({
616 "current": pagination.page,
617 "start": pagination.offset() + 1,
618 "end": (pagination.offset() + pagination.limit).min(pagination.total),
619 "total": pagination.total,
620 "has_prev": pagination.has_prev,
621 "has_next": pagination.has_next,
622 "query": ""
623 }));
624
625 render_template("failed_jobs.html.tera", ctx).await
626}
627
628
629
630
631pub async fn render_retry_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
632 let mut conn = match get_redis_connection().await {
633 Ok(c) => c,
634 Err(e) => {
635 tracing::error!("Redis connection failed (retry jobs): {:?}", e);
636 return HttpResponse::InternalServerError().body("Redis connection error");
637 }
638 };
639
640 let queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();
642
643 let mut all_jobs: Vec<String> = Vec::new();
645 for queue in &queues {
646 let key = format!("xsm:retry:{}", queue);
647 let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
648 all_jobs.extend(ids);
649 }
650
651 let all_jobs: Vec<String> = {
653 let mut set = HashSet::new();
654 all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
655 };
656
657 let page = query.page.unwrap_or(DEFAULT_PAGE);
658 let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
659 let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
660
661 let mut job_infos: Vec<JobInfo> = Vec::new();
662 for job_id in job_ids {
663 match fetch_job_info(&job_id).await {
664 Ok(Some(info)) => job_infos.push(info),
665 Ok(None) => {
666 tracing::warn!("Retry jobs: job info not found for ID: {}", job_id);
667 }
668 Err(e) => {
669 tracing::error!("Retry jobs: error fetching job info for {}: {:?}", job_id, e);
670 }
671 }
672 }
673
674 let mut ctx = Context::new();
675 ctx.insert("title", "Retry Jobs");
676 ctx.insert("jobs", &job_infos);
677 ctx.insert("page", &json!({
678 "current": pagination.page,
679 "start": pagination.offset() + 1,
680 "end": (pagination.offset() + pagination.limit).min(pagination.total),
681 "total": pagination.total,
682 "has_prev": pagination.has_prev,
683 "has_next": pagination.has_next,
684 "query": ""
685 }));
686
687 render_template("retry_jobs.html.tera", ctx).await
688}
689
690
691