qrush_engine/services/
metrics_service.rs

1// /Users/snm/ws/xsnm/ws/crates/qrush-engine/src/services/metrics_service.rs
2use actix_web::{web, HttpResponse, Responder};
3use tera::Context;
4use redis::AsyncCommands;
5use chrono::{Duration, Utc};
6use serde_json::json;
7use serde::{Deserialize, Serialize};
8use csv::WriterBuilder;
9use std::collections::HashSet;
10use crate::utils::rdconfig::{get_redis_connection};
11use crate::services::template_service::render_template;
12use crate::utils::pagination::{Pagination, PaginationQuery};
13use crate::utils::jconfig::{deserialize_job, to_job_info, JobInfo, fetch_job_info};
14use crate::utils::renderer::paginate_jobs;
15use crate::utils::constants::{
16    DEFAULT_PAGE,
17    DEFAULT_LIMIT,
18    DELAYED_JOBS_KEY,
19    PREFIX_QUEUE,
20    PREFIX_JOB,
21    QUEUES_SET,
22    QUEUE_CONFIG_PREFIX,
23    SUCCESS_LIST_PREFIX,
24    FAILED_LIST_PREFIX,
25    RETRY_LIST_PREFIX,
26    COUNTER_SUCCESS,
27    COUNTER_FAILED,
28    COUNTER_TOTAL_JOBS,
29    STATS_JOBS_PREFIX,
30    STATS_JOBS_FAILED_PREFIX,
31    LOGS_PREFIX,
32    FAILED_JOBS_LIST,
33    WORKER_PREFIX,
34};
35
36#[derive(Deserialize)]
37pub struct MetricsQuery {
38    pub search: Option<String>,
39    pub page: Option<usize>,
40    pub limit: Option<usize>,
41}
42
43
44pub async fn render_metrics(query: web::Query<MetricsQuery>) -> impl Responder {
45    let mut conn = match get_redis_connection().await {
46        Ok(c) => c,
47        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
48    };
49
50    // Fetch queues with jobs
51    let active_queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
52    
53    // Get configured queues from Redis config keys
54    let config_pattern = format!("{QUEUE_CONFIG_PREFIX}:*");
55    let config_keys: Vec<String> = conn.keys(&config_pattern).await.unwrap_or_default();
56    let configured_queues: Vec<String> = config_keys
57        .into_iter()
58        .filter_map(|key| key.strip_prefix(&format!("{QUEUE_CONFIG_PREFIX}:")).map(String::from))
59        .collect();
60    
61    // Merge and deduplicate queues (configured queues take priority)
62    let mut all_queues = configured_queues;
63    for queue in active_queues {
64        if !all_queues.contains(&queue) {
65            all_queues.push(queue);
66        }
67    }
68    
69    // Apply search filter
70    if let Some(search) = &query.search {
71        let search_lower = search.to_lowercase();
72        all_queues = all_queues
73            .into_iter()
74            .filter(|q| q.to_lowercase().contains(&search_lower))
75            .collect();
76    }
77
78    // Convert query into Pagination
79    let pagination_query = PaginationQuery {
80        page: query.page,
81        limit: query.limit,
82    };
83    let pagination = pagination_query.into_pagination(all_queues.len());
84
85    // Paginate queues
86    let (paginated_queues, pagination) =
87        paginate_jobs(all_queues, pagination.page, pagination.limit).await;
88
89    // Per-queue detailed stats
90    let mut queue_infos = vec![];
91    let mut total_success = 0;
92    let mut total_failed = 0;
93    let mut total_retry = 0;
94    let mut total_pending = 0;
95
96    for queue in &paginated_queues {
97        let success_key = format!("{SUCCESS_LIST_PREFIX}:{}", queue);
98        let failed_key = format!("{FAILED_LIST_PREFIX}:{}", queue);
99        let retry_key = format!("{RETRY_LIST_PREFIX}:{}", queue);
100        let pending_key = format!("{PREFIX_QUEUE}:{}", queue);
101
102        let success: usize = conn.llen(&success_key).await.unwrap_or(0);
103        let failed: usize = conn.llen(&failed_key).await.unwrap_or(0);
104        let retry: usize = conn.llen(&retry_key).await.unwrap_or(0);
105        let pending: usize = conn.llen(&pending_key).await.unwrap_or(0);
106
107        total_success += success;
108        total_failed += failed;
109        total_retry += retry;
110        total_pending += pending;
111
112        queue_infos.push(json!({
113            "name": queue,
114            "success": success,
115            "failed": failed,
116            "retry": retry,
117            "pending": pending
118        }));
119    }
120
121    // Prepare context
122    let mut ctx = Context::new();
123    ctx.insert("title", "All Queues");
124    ctx.insert("queues", &queue_infos);
125
126    ctx.insert("stats", &json!({
127        "success_jobs": total_success,
128        "failed_jobs": total_failed,
129        "retry_jobs": total_retry,
130        "pending_jobs": total_pending,
131    }));
132
133    // Fix query object structure for template
134    ctx.insert("query", &json!({
135        "search": query.search.clone().unwrap_or_default()
136    }));
137
138    ctx.insert("page", &json!({
139        "current": pagination.page,
140        "start": pagination.offset() + 1,
141        "end": (pagination.offset() + pagination.limit).min(pagination.total),
142        "total": pagination.total,
143        "has_prev": pagination.has_prev,
144        "has_next": pagination.has_next,
145        "query": format!("search={}", query.search.clone().unwrap_or_default())
146    }));
147
148    render_template("metrics.html.tera", ctx).await
149}
150
151
152
153pub async fn render_metrics_for_queue(
154    path: web::Path<String>,
155    query: web::Query<PaginationQuery>,
156) -> impl Responder {
157    let queue = path.into_inner();
158
159    let mut conn = match get_redis_connection().await {
160        Ok(c) => c,
161        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
162    };
163
164    let key = format!("{PREFIX_QUEUE}:{}", queue);
165
166    let all_jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
167        Ok(jobs) => jobs,
168        Err(_) => vec![], // fallback to empty if queue not found
169    };
170
171    let page = query.page.unwrap_or(DEFAULT_PAGE);
172    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
173    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
174
175    // 🟫 5. Collect job info
176    let mut job_infos: Vec<JobInfo> = Vec::new();
177    for job_id in job_ids {
178        match fetch_job_info(&job_id).await {
179            Ok(Some(info)) => job_infos.push(info),
180            Ok(None) => {
181                tracing::warn!("Job info not found for ID: {}", job_id);
182            }
183            Err(e) => {
184                tracing::error!("Failed to fetch job info for ID {}: {:?}", job_id, e);
185            }
186        }
187    }
188
189    let mut ctx = Context::new();
190    ctx.insert("title", &format!("Queue: {}", queue));
191    ctx.insert("queue", &queue);
192    ctx.insert("jobs", &job_infos);
193    ctx.insert("page", &serde_json::json!({
194        "current": pagination.page,
195        "start": pagination.offset() + 1,
196        "end": (pagination.offset() + pagination.limit).min(pagination.total),
197        "total": pagination.total,
198        "has_prev": pagination.has_prev,
199        "has_next": pagination.has_next,
200        "query": "" // Add filters if any, like queue name or status
201    }));
202
203    render_template("queue_metrics.html.tera", ctx).await
204}
205
206
207
208
209pub async fn render_scheduled_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
210    let mut conn = match get_redis_connection().await {
211        Ok(c) => c,
212        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
213    };
214
215    let now = Utc::now().timestamp();
216    let job_ids: Vec<String> = conn
217        .zrangebyscore(DELAYED_JOBS_KEY, 0, now)
218        .await
219        .unwrap_or_default();
220
221    let mut job_infos = Vec::new();
222
223    for jid in job_ids {
224        if let Ok(data) = conn.get::<_, String>(format!("{PREFIX_JOB}:{}", jid)).await {
225            if let Some(job) = deserialize_job(data).await {
226                job_infos.push(to_job_info(&job, &jid)); // ✅ Fixed: provide both arguments
227            }
228        }
229    }
230
231    let page = query.page.unwrap_or(DEFAULT_PAGE);
232    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
233    let total = job_infos.len();
234    let pagination = Pagination::new(page, limit, total);
235
236    let start = pagination.offset();
237    let end = (start + limit).min(total);
238    let paginated_job_infos = &job_infos[start..end];
239
240    let mut ctx = Context::new();
241    ctx.insert("title", "Scheduled Jobs");
242    ctx.insert("jobs", &paginated_job_infos); // ✅ Safe for Tera (JobInfo implements Serialize)
243    ctx.insert("pagination", &pagination);
244
245    render_template("scheduled_jobs.html.tera", ctx).await
246}
247
248
249
250
251pub async fn render_delayed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
252    let mut conn = match get_redis_connection().await {
253        Ok(c) => c,
254        Err(_) => return HttpResponse::InternalServerError().body("Redis connection failed"),
255    };
256
257    // 1. Get all delayed job IDs
258    let job_ids: Vec<String> = match conn.zrange(DELAYED_JOBS_KEY, 0, -1).await {
259        Ok(ids) => ids,
260        Err(_) => vec![],
261    };
262
263    // 2. Fetch job info for each valid ID
264    let mut job_infos = Vec::new();
265    for jid in job_ids {
266        match fetch_job_info(&jid).await {
267            Ok(Some(info)) => job_infos.push(info),
268            Ok(None) => {
269                tracing::warn!("❌ Delayed job not found in Redis for ID: {}", jid);
270            }
271            Err(err) => {
272                tracing::error!("❌ Failed to fetch delayed job ID {}: {:?}", jid, err);
273            }
274        }
275    }
276
277    // 3. Pagination logic
278    let page = query.page.unwrap_or(DEFAULT_PAGE);
279    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
280    let total = job_infos.len();
281    let pagination = Pagination::new(page, limit, total);
282
283    let start = pagination.offset();
284    let end = (start + limit).min(total);
285    let paginated = &job_infos[start..end];
286
287    // 4. Render Tera template
288    let mut ctx = Context::new();
289    ctx.insert("title", "All Delayed Jobs");
290    ctx.insert("jobs", &paginated);
291    ctx.insert("page", &serde_json::json!({
292        "current": pagination.page,
293        "start": start + 1,
294        "end": end,
295        "total": total,
296        "has_prev": pagination.has_prev,
297        "has_next": pagination.has_next,
298        "query": ""
299    }));
300
301    render_template("delayed_jobs.html.tera", ctx).await
302}
303
304
305
306pub async fn render_dead_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
307    // 1. Connect to Redis
308    let mut conn = match get_redis_connection().await {
309        Ok(c) => c,
310        Err(e) => {
311            tracing::error!("Redis connection failed: {:?}", e);
312            return HttpResponse::InternalServerError().body("Redis connection error");
313        }
314    };
315
316    // 2. Fetch all failed job IDs
317    let all_jobs: Vec<String> = match conn.lrange(FAILED_JOBS_LIST, 0, -1).await {
318        Ok(jobs) => jobs,
319        Err(e) => {
320            tracing::error!("Failed to read failed_jobs list: {:?}", e);
321            vec![]
322        }
323    };
324
325    // 3. Apply pagination
326    let page = query.page.unwrap_or(DEFAULT_PAGE);
327    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
328    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
329
330    // 4. Fetch job details
331    let mut job_infos: Vec<JobInfo> = Vec::new();
332    for job_id in job_ids {
333        match fetch_job_info(&job_id).await {
334            Ok(Some(info)) => job_infos.push(info),
335            Ok(None) => {
336                tracing::warn!("Job info not found for ID: {}", job_id);
337            }
338            Err(e) => {
339                tracing::error!("Error fetching job info for ID {}: {:?}", job_id, e);
340            }
341        }
342    }
343
344    // 5. Render using Tera template
345    let mut ctx = Context::new();
346    ctx.insert("title", "Dead Jobs");
347    ctx.insert("jobs", &job_infos);
348    ctx.insert("page", &json!({
349        "current": pagination.page,
350        "start": pagination.offset() + 1,
351        "end": (pagination.offset() + pagination.limit).min(pagination.total),
352        "total": pagination.total,
353        "has_prev": pagination.has_prev,
354        "has_next": pagination.has_next,
355        "query": ""
356    }));
357
358    render_template("dead_jobs.html.tera", ctx).await
359}
360
361
362
363#[derive(Debug, Serialize, Deserialize)]
364struct WorkerStatus {
365    id: String,
366    queues: Vec<String>,
367    last_seen: String,
368    hostname: String,
369    pid: u32,
370}
371
372pub async fn render_worker_status() -> impl Responder {
373    let mut conn = match get_redis_connection().await {
374        Ok(c) => c,
375        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
376    };
377
378    let worker_pattern = format!("{WORKER_PREFIX}:*");
379    let keys: Vec<String> = conn.keys(&worker_pattern).await.unwrap_or_default();
380    let mut workers = Vec::new();
381
382    for key in keys {
383        if let Ok(status_json) = conn.get::<_, String>(&key).await {
384            if let Ok(mut status) = serde_json::from_str::<WorkerStatus>(&status_json) {
385                status.id = key.replace(&format!("{WORKER_PREFIX}:"), "");
386                workers.push(status);
387            }
388        }
389    }
390
391    let mut ctx = Context::new();
392    ctx.insert("title", "Worker Status");
393    ctx.insert("workers", &workers);
394
395    render_template("workers.html.tera", ctx).await
396}
397
398
399
400pub async fn job_action(payload: web::Json<serde_json::Value>) -> impl Responder {
401    let mut conn = match get_redis_connection().await {
402        Ok(c) => c,
403        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
404    };
405
406    let action = payload.get("action").and_then(|a| a.as_str()).unwrap_or("");
407    let job_id = payload.get("job_id").and_then(|j| j.as_str()).unwrap_or("");
408    let job_key = format!("{PREFIX_JOB}:{}", job_id);
409
410    match action {
411        "delete" => {
412            if conn.exists(&job_key).await.unwrap_or(false) {
413                let _: () = conn.del(&job_key).await.unwrap_or_default();
414                let logs_key = format!("{LOGS_PREFIX}:default");
415                let _: () = conn.lpush(&logs_key, format!("[{}] 🗑️ Job {} deleted", Utc::now(), job_id)).await.unwrap_or_default();
416                let _: () = conn.ltrim(&logs_key, 0, 99).await.unwrap_or_default();
417                HttpResponse::Ok().json(json!({"status": "deleted"}))
418            } else {
419                HttpResponse::NotFound().json(json!({"error": "job not found"}))
420            }
421        },
422        "retry" | "queue" => {
423            let exists: bool = conn.exists(&job_key).await.unwrap_or(false);
424            if !exists {
425                return HttpResponse::NotFound().json(json!({ "error": "job not found" }));
426            }
427
428            // Fetch queue name from the job
429            let queue: String = conn.hget(&job_key, "queue").await.unwrap_or_else(|_| "default".to_string());
430            let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
431
432            // Always remove job from delayed_jobs if present
433            let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_id).await.unwrap_or_default();
434
435            // Also remove it from the Redis queue (avoid duplicates)
436            let _: () = conn.lrem(&queue_key, 0, &job_id).await.unwrap_or_default();
437
438            // Push job back to the correct queue
439            let _: () = conn.rpush(&queue_key, &job_id).await.unwrap_or_default();
440
441            // Update job metadata
442            let _: () = conn.hset_multiple(&job_key, &[
443                ("status", "queued"),
444                ("retry_at", &Utc::now().to_rfc3339()),
445            ]).await.unwrap_or_default();
446
447            // Remove any stale timestamps
448            let _: () = conn.hdel(&job_key, &["failed_at", "completed_at", "run_at"]).await.unwrap_or_default();
449
450            HttpResponse::Ok().json(json!({ "status": "retried", "queue": queue }))
451        }
452
453        _ => HttpResponse::BadRequest().json(json!({"error": "invalid action"})),
454    }
455}
456
457
458
459pub async fn get_metrics_summary() -> impl Responder {
460    let mut conn = match get_redis_connection().await {
461        Ok(c) => c,
462        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
463    };
464
465    let total_jobs: usize = conn.get(COUNTER_TOTAL_JOBS).await.unwrap_or(0);
466    let success_jobs: usize = conn.get(COUNTER_SUCCESS).await.unwrap_or(0);
467    let failed_jobs: usize = conn.get(COUNTER_FAILED).await.unwrap_or(0);
468
469    let queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
470    let mut scheduled_jobs = 0;
471    for queue in &queues {
472        let len: usize = conn.llen(format!("{PREFIX_QUEUE}:{}", queue)).await.unwrap_or(0);
473        scheduled_jobs += len;
474    }
475
476    let worker_pattern = format!("{WORKER_PREFIX}:*");
477    let worker_keys: Vec<String> = conn.keys(&worker_pattern).await.unwrap_or_default();
478    let active_workers = worker_keys.len();
479
480    // Collect last 7 days stats
481    let mut chart_labels = Vec::new();
482    let mut chart_success = Vec::new();
483    let mut chart_failed = Vec::new();
484
485    for i in (0..7).rev() {
486        let day = Utc::now().date_naive() - Duration::days(i);
487        let date_str = day.format("%Y-%m-%d").to_string();
488
489        let total_key = format!("{STATS_JOBS_PREFIX}:{}", date_str);
490        let failed_key = format!("{STATS_JOBS_FAILED_PREFIX}:{}:failed", date_str);
491
492        let total: usize = conn.get(&total_key).await.unwrap_or(0);
493        let failed: usize = conn.get(&failed_key).await.unwrap_or(0);
494        let success = total.saturating_sub(failed);
495
496        chart_labels.push(day.format("%a").to_string()); // "Mon", "Tue", etc.
497        chart_success.push(success);
498        chart_failed.push(failed);
499    }
500
501    let mut ctx = Context::new();
502    ctx.insert("title", "Metrics Summary");
503    ctx.insert("stats", &json!({
504        "total_jobs": total_jobs,
505        "success_jobs": success_jobs,
506        "failed_jobs": failed_jobs,
507        "scheduled_jobs": scheduled_jobs,
508        "active_workers": active_workers
509    }));
510    ctx.insert("chart", &json!({
511        "labels": chart_labels,
512        "success": chart_success,
513        "failed": chart_failed
514    }));
515
516    render_template("summary.html.tera", ctx).await
517}
518
519
520
521pub async fn export_queue_csv(path: web::Path<String>) -> impl Responder {
522    let queue = path.into_inner();
523    let mut conn = match get_redis_connection().await {
524        Ok(c) => c,
525        Err(e) => {
526            eprintln!("Redis connection error: {:?}", e);
527            return HttpResponse::InternalServerError().body("Failed to connect to Redis");
528        }
529    };
530
531    let key = format!("{PREFIX_QUEUE}:{}", queue);
532    let jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
533        Ok(j) => j,
534        Err(e) => {
535            eprintln!("Failed to fetch jobs from Redis: {:?}", e);
536            return HttpResponse::InternalServerError().body("Failed to fetch jobs");
537        }
538    };
539
540    let mut job_infos: Vec<JobInfo> = vec![];
541
542    for (i, payload) in jobs.into_iter().enumerate() {
543        match deserialize_job(payload).await {
544            Some(job) => {
545                let id = format!("{}_{}", queue, i); // fallback ID
546                job_infos.push(to_job_info(&job, &id));
547            }
548            None => {
549                eprintln!("Failed to deserialize job at index {}", i);
550                continue;
551            }
552        }
553    }
554
555    let mut wtr = WriterBuilder::new()
556        .has_headers(true)
557        .from_writer(vec![]);
558
559    for job_info in &job_infos {
560        if let Err(e) = wtr.serialize(job_info) {
561            eprintln!("CSV serialization failed: {:?}", e);
562        }
563    }
564
565    let data = match wtr.into_inner() {
566        Ok(bytes) => bytes,
567        Err(e) => {
568            eprintln!("Failed to build CSV output: {:?}", e);
569            return HttpResponse::InternalServerError().body("Failed to generate CSV");
570        }
571    };
572
573    HttpResponse::Ok()
574        .content_type("text/csv")
575        .append_header(("Content-Disposition", format!("attachment; filename=queue_{}.csv", queue)))
576        .body(data)
577}
578
579
580
581
582
583
584
585
586
587pub async fn render_failed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
588    let mut conn = match get_redis_connection().await {
589        Ok(c) => c,
590        Err(e) => {
591            tracing::error!("Redis connection failed (failed jobs): {:?}", e);
592            return HttpResponse::InternalServerError().body("Redis connection error");
593        }
594    };
595
596    // Collect all queues
597    let queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
598
599    // Aggregate failed job IDs from all queues
600    let mut all_jobs: Vec<String> = Vec::new();
601    for queue in &queues {
602        let key = format!("{FAILED_LIST_PREFIX}:{}", queue);
603        let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
604        all_jobs.extend(ids);
605    }
606
607    // Optional: dedupe IDs
608    let all_jobs: Vec<String> = {
609        let mut set = HashSet::new();
610        all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
611    };
612
613    let page = query.page.unwrap_or(DEFAULT_PAGE);
614    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
615    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
616
617    let mut job_infos: Vec<JobInfo> = Vec::new();
618    for job_id in job_ids {
619        match fetch_job_info(&job_id).await {
620            Ok(Some(info)) => job_infos.push(info),
621            Ok(None) => {
622                tracing::warn!("Failed jobs: job info not found for ID: {}", job_id);
623            }
624            Err(e) => {
625                tracing::error!("Failed jobs: error fetching job info for {}: {:?}", job_id, e);
626            }
627        }
628    }
629
630    let mut ctx = Context::new();
631    ctx.insert("title", "Failed Jobs");
632    ctx.insert("jobs", &job_infos);
633    ctx.insert("page", &json!({
634        "current": pagination.page,
635        "start": pagination.offset() + 1,
636        "end": (pagination.offset() + pagination.limit).min(pagination.total),
637        "total": pagination.total,
638        "has_prev": pagination.has_prev,
639        "has_next": pagination.has_next,
640        "query": ""
641    }));
642
643    render_template("failed_jobs.html.tera", ctx).await
644}
645
646
647
648
649pub async fn render_retry_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
650    let mut conn = match get_redis_connection().await {
651        Ok(c) => c,
652        Err(e) => {
653            tracing::error!("Redis connection failed (retry jobs): {:?}", e);
654            return HttpResponse::InternalServerError().body("Redis connection error");
655        }
656    };
657
658    // Collect all queues
659    let queues: Vec<String> = conn.smembers(QUEUES_SET).await.unwrap_or_default();
660
661    // Aggregate retry job IDs from all queues
662    let mut all_jobs: Vec<String> = Vec::new();
663    for queue in &queues {
664        let key = format!("{RETRY_LIST_PREFIX}:{}", queue);
665        let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
666        all_jobs.extend(ids);
667    }
668
669    // Optional: dedupe IDs
670    let all_jobs: Vec<String> = {
671        let mut set = HashSet::new();
672        all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
673    };
674
675    let page = query.page.unwrap_or(DEFAULT_PAGE);
676    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
677    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
678
679    let mut job_infos: Vec<JobInfo> = Vec::new();
680    for job_id in job_ids {
681        match fetch_job_info(&job_id).await {
682            Ok(Some(info)) => job_infos.push(info),
683            Ok(None) => {
684                tracing::warn!("Retry jobs: job info not found for ID: {}", job_id);
685            }
686            Err(e) => {
687                tracing::error!("Retry jobs: error fetching job info for {}: {:?}", job_id, e);
688            }
689        }
690    }
691
692    let mut ctx = Context::new();
693    ctx.insert("title", "Retry Jobs");
694    ctx.insert("jobs", &job_infos);
695    ctx.insert("page", &json!({
696        "current": pagination.page,
697        "start": pagination.offset() + 1,
698        "end": (pagination.offset() + pagination.limit).min(pagination.total),
699        "total": pagination.total,
700        "has_prev": pagination.has_prev,
701        "has_next": pagination.has_next,
702        "query": ""
703    }));
704
705    render_template("retry_jobs.html.tera", ctx).await
706}
707
708
709