// qrush-engine/src/services/metrics_service.rs
2use actix_web::{web, HttpResponse, Responder};
3use tera::Context;
4use redis::AsyncCommands;
5use chrono::{Duration, Utc};
6use serde_json::json;
7use serde::{Deserialize, Serialize};
8use csv::WriterBuilder;
9use std::collections::HashSet;
10use crate::utils::rdconfig::{get_redis_connection};
11use crate::services::template_service::render_template;
12use crate::utils::pagination::{Pagination, PaginationQuery};
13use crate::utils::jconfig::{deserialize_job, to_job_info, JobInfo, fetch_job_info};
14use crate::utils::renderer::paginate_jobs;
15use crate::utils::constants::{
16    DEFAULT_PAGE,
17    DEFAULT_LIMIT,
18    DELAYED_JOBS_KEY,
19    PREFIX_QUEUE,
20};
21
/// Query-string parameters accepted by the metrics overview page
/// (`render_metrics`).
#[derive(Deserialize)]
pub struct MetricsQuery {
    // Optional case-insensitive substring filter applied to queue names.
    pub search: Option<String>,
    // Page number; the handler applies DEFAULT_PAGE when absent.
    pub page: Option<usize>,
    // Page size; the handler applies DEFAULT_LIMIT when absent.
    pub limit: Option<usize>,
}
28
29
/// GET handler: renders the "All Queues" metrics overview page.
///
/// Queue discovery merges two sources: the `xsm:queues` set (queues that have
/// seen jobs) and `xsm:queue:config:*` keys (explicitly configured queues,
/// listed first). The merged list is filtered by the optional `search` query
/// parameter, paginated, and each visible queue's success/failed/retry/pending
/// list lengths are collected for the template along with page-wide totals.
pub async fn render_metrics(query: web::Query<MetricsQuery>) -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    // Fetch queues with jobs
    let active_queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();

    // Get configured queues from Redis config keys.
    // NOTE(review): KEYS is O(N) over the whole keyspace; acceptable for a
    // dashboard, but consider SCAN if the instance grows large.
    let config_keys: Vec<String> = conn.keys("xsm:queue:config:*").await.unwrap_or_default();
    let configured_queues: Vec<String> = config_keys
        .into_iter()
        .filter_map(|key| key.strip_prefix("xsm:queue:config:").map(String::from))
        .collect();

    // Merge and deduplicate queues (configured queues take priority)
    let mut all_queues = configured_queues;
    for queue in active_queues {
        if !all_queues.contains(&queue) {
            all_queues.push(queue);
        }
    }

    // Apply search filter (case-insensitive substring match on the queue name)
    if let Some(search) = &query.search {
        let search_lower = search.to_lowercase();
        all_queues = all_queues
            .into_iter()
            .filter(|q| q.to_lowercase().contains(&search_lower))
            .collect();
    }

    // Convert query into Pagination
    let pagination_query = PaginationQuery {
        page: query.page,
        limit: query.limit,
    };
    let pagination = pagination_query.into_pagination(all_queues.len());

    // Paginate queues.
    // NOTE(review): pagination is computed twice — once by `into_pagination`
    // above and again inside `paginate_jobs`, whose result shadows the first.
    // Presumably both clamp page/limit identically — verify in the helpers.
    let (paginated_queues, pagination) =
        paginate_jobs(all_queues, pagination.page, pagination.limit).await;

    // Per-queue detailed stats for the current page only.
    let mut queue_infos = vec![];
    let mut total_success = 0;
    let mut total_failed = 0;
    let mut total_retry = 0;
    let mut total_pending = 0;

    for queue in &paginated_queues {
        let success_key = format!("xsm:success:{}", queue);
        let failed_key = format!("xsm:failed:{}", queue);
        let retry_key = format!("xsm:retry:{}", queue);
        let pending_key = format!("xsm:queue:{}", queue);

        // List lengths double as counters; missing keys read as 0.
        let success: usize = conn.llen(&success_key).await.unwrap_or(0);
        let failed: usize = conn.llen(&failed_key).await.unwrap_or(0);
        let retry: usize = conn.llen(&retry_key).await.unwrap_or(0);
        let pending: usize = conn.llen(&pending_key).await.unwrap_or(0);

        total_success += success;
        total_failed += failed;
        total_retry += retry;
        total_pending += pending;

        queue_infos.push(json!({
            "name": queue,
            "success": success,
            "failed": failed,
            "retry": retry,
            "pending": pending
        }));
    }

    // Prepare context
    let mut ctx = Context::new();
    ctx.insert("title", "All Queues");
    ctx.insert("queues", &queue_infos);

    // Totals cover only the queues on the current page, not all queues.
    ctx.insert("stats", &json!({
        "success_jobs": total_success,
        "failed_jobs": total_failed,
        "retry_jobs": total_retry,
        "pending_jobs": total_pending,
    }));

    // Fix query object structure for template
    ctx.insert("query", &json!({
        "search": query.search.clone().unwrap_or_default()
    }));

    ctx.insert("page", &json!({
        "current": pagination.page,
        "start": pagination.offset() + 1,
        "end": (pagination.offset() + pagination.limit).min(pagination.total),
        "total": pagination.total,
        "has_prev": pagination.has_prev,
        "has_next": pagination.has_next,
        // NOTE(review): the search term is interpolated un-encoded into the
        // query string — confirm the template escapes/encodes it.
        "query": format!("search={}", query.search.clone().unwrap_or_default())
    }));

    render_template("metrics.html.tera", ctx).await
}
135
136
137
138pub async fn render_metrics_for_queue(
139    path: web::Path<String>,
140    query: web::Query<PaginationQuery>,
141) -> impl Responder {
142    let queue = path.into_inner();
143
144    let mut conn = match get_redis_connection().await {
145        Ok(c) => c,
146        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
147    };
148
149    let key = format!("xsm:queue:{}", queue);
150
151    let all_jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
152        Ok(jobs) => jobs,
153        Err(_) => vec![], // fallback to empty if queue not found
154    };
155
156    let page = query.page.unwrap_or(DEFAULT_PAGE);
157    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
158    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
159
160    // 🟫 5. Collect job info
161    let mut job_infos: Vec<JobInfo> = Vec::new();
162    for job_id in job_ids {
163        match fetch_job_info(&job_id).await {
164            Ok(Some(info)) => job_infos.push(info),
165            Ok(None) => {
166                tracing::warn!("Job info not found for ID: {}", job_id);
167            }
168            Err(e) => {
169                tracing::error!("Failed to fetch job info for ID {}: {:?}", job_id, e);
170            }
171        }
172    }
173
174    let mut ctx = Context::new();
175    ctx.insert("title", &format!("Queue: {}", queue));
176    ctx.insert("queue", &queue);
177    ctx.insert("jobs", &job_infos);
178    ctx.insert("page", &serde_json::json!({
179        "current": pagination.page,
180        "start": pagination.offset() + 1,
181        "end": (pagination.offset() + pagination.limit).min(pagination.total),
182        "total": pagination.total,
183        "has_prev": pagination.has_prev,
184        "has_next": pagination.has_next,
185        "query": "" // Add filters if any, like queue name or status
186    }));
187
188    render_template("queue_metrics.html.tera", ctx).await
189}
190
191
192
193
194pub async fn render_scheduled_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
195    let mut conn = match get_redis_connection().await {
196        Ok(c) => c,
197        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
198    };
199
200    let now = Utc::now().timestamp();
201    let job_ids: Vec<String> = conn
202        .zrangebyscore("xsm:delayed", 0, now)
203        .await
204        .unwrap_or_default();
205
206    let mut job_infos = Vec::new();
207
208    for jid in job_ids {
209        if let Ok(data) = conn.get::<_, String>(format!("xsm:job:{}", jid)).await {
210            if let Some(job) = deserialize_job(data).await {
211                job_infos.push(to_job_info(&job, &jid)); // ✅ Fixed: provide both arguments
212            }
213        }
214    }
215
216    let page = query.page.unwrap_or(DEFAULT_PAGE);
217    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
218    let total = job_infos.len();
219    let pagination = Pagination::new(page, limit, total);
220
221    let start = pagination.offset();
222    let end = (start + limit).min(total);
223    let paginated_job_infos = &job_infos[start..end];
224
225    let mut ctx = Context::new();
226    ctx.insert("title", "Scheduled Jobs");
227    ctx.insert("jobs", &paginated_job_infos); // ✅ Safe for Tera (JobInfo implements Serialize)
228    ctx.insert("pagination", &pagination);
229
230    render_template("scheduled_jobs.html.tera", ctx).await
231}
232
233
234
235
236pub async fn render_delayed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
237    let mut conn = match get_redis_connection().await {
238        Ok(c) => c,
239        Err(_) => return HttpResponse::InternalServerError().body("Redis connection failed"),
240    };
241
242    // 1. Get all delayed job IDs
243    let job_ids: Vec<String> = match conn.zrange(DELAYED_JOBS_KEY, 0, -1).await {
244        Ok(ids) => ids,
245        Err(_) => vec![],
246    };
247
248    // 2. Fetch job info for each valid ID
249    let mut job_infos = Vec::new();
250    for jid in job_ids {
251        match fetch_job_info(&jid).await {
252            Ok(Some(info)) => job_infos.push(info),
253            Ok(None) => {
254                tracing::warn!("❌ Delayed job not found in Redis for ID: {}", jid);
255            }
256            Err(err) => {
257                tracing::error!("❌ Failed to fetch delayed job ID {}: {:?}", jid, err);
258            }
259        }
260    }
261
262    // 3. Pagination logic
263    let page = query.page.unwrap_or(DEFAULT_PAGE);
264    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
265    let total = job_infos.len();
266    let pagination = Pagination::new(page, limit, total);
267
268    let start = pagination.offset();
269    let end = (start + limit).min(total);
270    let paginated = &job_infos[start..end];
271
272    // 4. Render Tera template
273    let mut ctx = Context::new();
274    ctx.insert("title", "All Delayed Jobs");
275    ctx.insert("jobs", &paginated);
276    ctx.insert("page", &serde_json::json!({
277        "current": pagination.page,
278        "start": start + 1,
279        "end": end,
280        "total": total,
281        "has_prev": pagination.has_prev,
282        "has_next": pagination.has_next,
283        "query": ""
284    }));
285
286    render_template("delayed_jobs.html.tera", ctx).await
287}
288
289
290
/// GET handler: renders jobs in the global dead/failed list.
///
/// Reads ids from the `xsm:failed_jobs` list, paginates them, resolves each
/// paginated id to a `JobInfo` (logging and skipping missing or unreadable
/// ones), and renders the dead-jobs template.
///
/// NOTE(review): this reads the single global `xsm:failed_jobs` list, while
/// `render_failed_jobs` aggregates per-queue `xsm:failed:{queue}` lists —
/// confirm both keys are actually populated by the workers.
pub async fn render_dead_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
    // 1. Connect to Redis
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(e) => {
            tracing::error!("Redis connection failed: {:?}", e);
            return HttpResponse::InternalServerError().body("Redis connection error");
        }
    };

    // 2. Fetch all failed job IDs (read errors degrade to an empty page)
    let all_jobs: Vec<String> = match conn.lrange("xsm:failed_jobs", 0, -1).await {
        Ok(jobs) => jobs,
        Err(e) => {
            tracing::error!("Failed to read failed_jobs list: {:?}", e);
            vec![]
        }
    };

    // 3. Apply pagination
    let page = query.page.unwrap_or(DEFAULT_PAGE);
    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;

    // 4. Fetch job details for the current page only
    let mut job_infos: Vec<JobInfo> = Vec::new();
    for job_id in job_ids {
        match fetch_job_info(&job_id).await {
            Ok(Some(info)) => job_infos.push(info),
            Ok(None) => {
                tracing::warn!("Job info not found for ID: {}", job_id);
            }
            Err(e) => {
                tracing::error!("Error fetching job info for ID {}: {:?}", job_id, e);
            }
        }
    }

    // 5. Render using Tera template
    let mut ctx = Context::new();
    ctx.insert("title", "Dead Jobs");
    ctx.insert("jobs", &job_infos);
    ctx.insert("page", &json!({
        "current": pagination.page,
        "start": pagination.offset() + 1,
        "end": (pagination.offset() + pagination.limit).min(pagination.total),
        "total": pagination.total,
        "has_prev": pagination.has_prev,
        "has_next": pagination.has_next,
        "query": ""
    }));

    render_template("dead_jobs.html.tera", ctx).await
}
345
346
347
/// A worker heartbeat record, stored JSON-encoded under `xsm:worker:{id}`.
#[derive(Debug, Serialize, Deserialize)]
struct WorkerStatus {
    // Worker identifier; overwritten from the Redis key suffix on load.
    id: String,
    // Names of the queues this worker consumes.
    queues: Vec<String>,
    // Last heartbeat timestamp as a string — format set by the producer.
    last_seen: String,
    // Host the worker process runs on.
    hostname: String,
    // OS process id of the worker.
    pid: u32,
}
356
357pub async fn render_worker_status() -> impl Responder {
358    let mut conn = match get_redis_connection().await {
359        Ok(c) => c,
360        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
361    };
362
363    let keys: Vec<String> = conn.keys("xsm:worker:*").await.unwrap_or_default();
364    let mut workers = Vec::new();
365
366    for key in keys {
367        if let Ok(status_json) = conn.get::<_, String>(&key).await {
368            if let Ok(mut status) = serde_json::from_str::<WorkerStatus>(&status_json) {
369                status.id = key.replace("xsm:worker:", "");
370                workers.push(status);
371            }
372        }
373    }
374
375    let mut ctx = Context::new();
376    ctx.insert("title", "Worker Status");
377    ctx.insert("workers", &workers);
378
379    render_template("workers.html.tera", ctx).await
380}
381
382
383
/// POST handler: performs a lifecycle action on a single job.
///
/// Expects a JSON body with `action` ("delete", "retry", or "queue") and
/// `job_id`. Unknown actions yield 400; a missing job yields 404.
pub async fn job_action(payload: web::Json<serde_json::Value>) -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    // Missing fields fall back to "" which fails the exists() checks below.
    let action = payload.get("action").and_then(|a| a.as_str()).unwrap_or("");
    let job_id = payload.get("job_id").and_then(|j| j.as_str()).unwrap_or("");
    let job_key = format!("xsm:job:{}", job_id);

    match action {
        "delete" => {
            if conn.exists(&job_key).await.unwrap_or(false) {
                let _: () = conn.del(&job_key).await.unwrap_or_default();
                // Best-effort audit log, trimmed to the 100 most recent entries.
                let _: () = conn.lpush("xsm:logs:default", format!("[{}] 🗑️ Job {} deleted", Utc::now(), job_id)).await.unwrap_or_default();
                let _: () = conn.ltrim("xsm:logs:default", 0, 99).await.unwrap_or_default();
                HttpResponse::Ok().json(json!({"status": "deleted"}))
            } else {
                HttpResponse::NotFound().json(json!({"error": "job not found"}))
            }
        },
        // "retry" and "queue" are treated identically: re-enqueue the job.
        "retry" | "queue" => {
            let exists: bool = conn.exists(&job_key).await.unwrap_or(false);
            if !exists {
                return HttpResponse::NotFound().json(json!({ "error": "job not found" }));
            }

            // Fetch queue name from the job.
            // NOTE(review): this treats `xsm:job:{id}` as a hash (HGET/HSET),
            // while `render_scheduled_jobs` reads the same key with GET as a
            // plain string — confirm which encoding the writers actually use.
            let queue: String = conn.hget(&job_key, "queue").await.unwrap_or_else(|_| "default".to_string());
            let queue_key = format!("{PREFIX_QUEUE}:{}", queue);

            // Always remove job from delayed_jobs if present
            let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_id).await.unwrap_or_default();

            // Also remove it from the Redis queue (avoid duplicates)
            let _: () = conn.lrem(&queue_key, 0, &job_id).await.unwrap_or_default();

            // Push job back to the correct queue
            let _: () = conn.rpush(&queue_key, &job_id).await.unwrap_or_default();

            // Update job metadata
            let _: () = conn.hset_multiple(&job_key, &[
                ("status", "queued"),
                ("retry_at", &Utc::now().to_rfc3339()),
            ]).await.unwrap_or_default();

            // Remove any stale timestamps
            let _: () = conn.hdel(&job_key, &["failed_at", "completed_at", "run_at"]).await.unwrap_or_default();

            HttpResponse::Ok().json(json!({ "status": "retried", "queue": queue }))
        }

        _ => HttpResponse::BadRequest().json(json!({"error": "invalid action"})),
    }
}
439
440
441
/// GET handler: renders the dashboard summary page.
///
/// Pulls global counters (`xsm:qrush:total_jobs` / `success` / `failed`),
/// sums the length of every queue list, counts live worker keys, and builds
/// a 7-day success/failure chart series from the per-day
/// `xsm:stats:jobs:{date}` counters.
pub async fn get_metrics_summary() -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(_) => return HttpResponse::InternalServerError().body("Redis error"),
    };

    // Missing counters read as 0.
    let total_jobs: usize = conn.get("xsm:qrush:total_jobs").await.unwrap_or(0);
    let success_jobs: usize = conn.get("xsm:qrush:success").await.unwrap_or(0);
    let failed_jobs: usize = conn.get("xsm:qrush:failed").await.unwrap_or(0);

    // NOTE(review): named "scheduled_jobs" but this sums the lengths of the
    // pending queue lists — confirm the intended meaning for the template.
    let queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();
    let mut scheduled_jobs = 0;
    for queue in &queues {
        let len: usize = conn.llen(format!("xsm:queue:{}", queue)).await.unwrap_or(0);
        scheduled_jobs += len;
    }

    // One `xsm:worker:*` key per live worker.
    let worker_keys: Vec<String> = conn.keys("xsm:worker:*").await.unwrap_or_default();
    let active_workers = worker_keys.len();

    // Collect last 7 days stats, oldest day first (6 days ago → today).
    let mut chart_labels = Vec::new();
    let mut chart_success = Vec::new();
    let mut chart_failed = Vec::new();

    for i in (0..7).rev() {
        let day = Utc::now().date_naive() - Duration::days(i);
        let date_str = day.format("%Y-%m-%d").to_string();

        let total_key = format!("xsm:stats:jobs:{}", date_str);
        let failed_key = format!("xsm:stats:jobs:{}:failed", date_str);

        let total: usize = conn.get(&total_key).await.unwrap_or(0);
        let failed: usize = conn.get(&failed_key).await.unwrap_or(0);
        // saturating_sub guards against failed > total (counter skew).
        let success = total.saturating_sub(failed);

        chart_labels.push(day.format("%a").to_string()); // "Mon", "Tue", etc.
        chart_success.push(success);
        chart_failed.push(failed);
    }

    let mut ctx = Context::new();
    ctx.insert("title", "Metrics Summary");
    ctx.insert("stats", &json!({
        "total_jobs": total_jobs,
        "success_jobs": success_jobs,
        "failed_jobs": failed_jobs,
        "scheduled_jobs": scheduled_jobs,
        "active_workers": active_workers
    }));
    ctx.insert("chart", &json!({
        "labels": chart_labels,
        "success": chart_success,
        "failed": chart_failed
    }));

    render_template("summary.html.tera", ctx).await
}
500
501
502
503pub async fn export_queue_csv(path: web::Path<String>) -> impl Responder {
504    let queue = path.into_inner();
505    let mut conn = match get_redis_connection().await {
506        Ok(c) => c,
507        Err(e) => {
508            eprintln!("Redis connection error: {:?}", e);
509            return HttpResponse::InternalServerError().body("Failed to connect to Redis");
510        }
511    };
512
513    let key = format!("xsm:queue:{}", queue);
514    let jobs: Vec<String> = match conn.lrange(&key, 0, -1).await {
515        Ok(j) => j,
516        Err(e) => {
517            eprintln!("Failed to fetch jobs from Redis: {:?}", e);
518            return HttpResponse::InternalServerError().body("Failed to fetch jobs");
519        }
520    };
521
522    let mut job_infos: Vec<JobInfo> = vec![];
523
524    for (i, payload) in jobs.into_iter().enumerate() {
525        match deserialize_job(payload).await {
526            Some(job) => {
527                let id = format!("{}_{}", queue, i); // fallback ID
528                job_infos.push(to_job_info(&job, &id));
529            }
530            None => {
531                eprintln!("Failed to deserialize job at index {}", i);
532                continue;
533            }
534        }
535    }
536
537    let mut wtr = WriterBuilder::new()
538        .has_headers(true)
539        .from_writer(vec![]);
540
541    for job_info in &job_infos {
542        if let Err(e) = wtr.serialize(job_info) {
543            eprintln!("CSV serialization failed: {:?}", e);
544        }
545    }
546
547    let data = match wtr.into_inner() {
548        Ok(bytes) => bytes,
549        Err(e) => {
550            eprintln!("Failed to build CSV output: {:?}", e);
551            return HttpResponse::InternalServerError().body("Failed to generate CSV");
552        }
553    };
554
555    HttpResponse::Ok()
556        .content_type("text/csv")
557        .append_header(("Content-Disposition", format!("attachment; filename=queue_{}.csv", queue)))
558        .body(data)
559}
560
561
562
563
564
565
566
567
568
/// GET handler: renders failed jobs aggregated across every known queue.
///
/// Walks each queue's `xsm:failed:{queue}` list, deduplicates the collected
/// job ids (first occurrence wins), paginates, resolves each paginated id to
/// a `JobInfo` (logging and skipping misses), and renders the template.
pub async fn render_failed_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
    let mut conn = match get_redis_connection().await {
        Ok(c) => c,
        Err(e) => {
            tracing::error!("Redis connection failed (failed jobs): {:?}", e);
            return HttpResponse::InternalServerError().body("Redis connection error");
        }
    };

    // Collect all queues
    let queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();

    // Aggregate failed job IDs from all queues
    let mut all_jobs: Vec<String> = Vec::new();
    for queue in &queues {
        let key = format!("xsm:failed:{}", queue);
        let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
        all_jobs.extend(ids);
    }

    // Dedupe IDs while preserving first-seen order.
    let all_jobs: Vec<String> = {
        let mut set = HashSet::new();
        all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
    };

    let page = query.page.unwrap_or(DEFAULT_PAGE);
    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;

    // Resolve job info only for the ids on the current page.
    let mut job_infos: Vec<JobInfo> = Vec::new();
    for job_id in job_ids {
        match fetch_job_info(&job_id).await {
            Ok(Some(info)) => job_infos.push(info),
            Ok(None) => {
                tracing::warn!("Failed jobs: job info not found for ID: {}", job_id);
            }
            Err(e) => {
                tracing::error!("Failed jobs: error fetching job info for {}: {:?}", job_id, e);
            }
        }
    }

    let mut ctx = Context::new();
    ctx.insert("title", "Failed Jobs");
    ctx.insert("jobs", &job_infos);
    ctx.insert("page", &json!({
        "current": pagination.page,
        "start": pagination.offset() + 1,
        "end": (pagination.offset() + pagination.limit).min(pagination.total),
        "total": pagination.total,
        "has_prev": pagination.has_prev,
        "has_next": pagination.has_next,
        "query": ""
    }));

    render_template("failed_jobs.html.tera", ctx).await
}
627
628
629
630
631pub async fn render_retry_jobs(query: web::Query<PaginationQuery>) -> impl Responder {
632    let mut conn = match get_redis_connection().await {
633        Ok(c) => c,
634        Err(e) => {
635            tracing::error!("Redis connection failed (retry jobs): {:?}", e);
636            return HttpResponse::InternalServerError().body("Redis connection error");
637        }
638    };
639
640    // Collect all queues
641    let queues: Vec<String> = conn.smembers("xsm:queues").await.unwrap_or_default();
642
643    // Aggregate retry job IDs from all queues
644    let mut all_jobs: Vec<String> = Vec::new();
645    for queue in &queues {
646        let key = format!("xsm:retry:{}", queue);
647        let ids: Vec<String> = conn.lrange(&key, 0, -1).await.unwrap_or_default();
648        all_jobs.extend(ids);
649    }
650
651    // Optional: dedupe IDs
652    let all_jobs: Vec<String> = {
653        let mut set = HashSet::new();
654        all_jobs.into_iter().filter(|id| set.insert(id.clone())).collect()
655    };
656
657    let page = query.page.unwrap_or(DEFAULT_PAGE);
658    let limit = query.limit.unwrap_or(DEFAULT_LIMIT);
659    let (job_ids, pagination) = paginate_jobs(all_jobs, page, limit).await;
660
661    let mut job_infos: Vec<JobInfo> = Vec::new();
662    for job_id in job_ids {
663        match fetch_job_info(&job_id).await {
664            Ok(Some(info)) => job_infos.push(info),
665            Ok(None) => {
666                tracing::warn!("Retry jobs: job info not found for ID: {}", job_id);
667            }
668            Err(e) => {
669                tracing::error!("Retry jobs: error fetching job info for {}: {:?}", job_id, e);
670            }
671        }
672    }
673
674    let mut ctx = Context::new();
675    ctx.insert("title", "Retry Jobs");
676    ctx.insert("jobs", &job_infos);
677    ctx.insert("page", &json!({
678        "current": pagination.page,
679        "start": pagination.offset() + 1,
680        "end": (pagination.offset() + pagination.limit).min(pagination.total),
681        "total": pagination.total,
682        "has_prev": pagination.has_prev,
683        "has_next": pagination.has_next,
684        "query": ""
685    }));
686
687    render_template("retry_jobs.html.tera", ctx).await
688}
689
690
691