1mod error;
2mod filters;
3mod handlers;
4mod templates;
5
6use axum::{
7 Router,
8 extract::Extension,
9 routing::{get, post},
10};
11
/// Number of jobs shown per page (50).
///
/// NOTE(review): not referenced in this file; presumably consumed by the
/// `handlers` module for paginated job listings — confirm.
const JOBS_PER_PAGE: usize = 50;
13
14#[derive(Clone)]
15pub struct OxanaWebState {
16 pub storage: oxana::Storage,
17 pub catalog: oxana::Catalog,
18 pub base_path: String,
19}
20
21impl OxanaWebState {
22 pub fn new(storage: oxana::Storage, catalog: oxana::Catalog, base_path: String) -> Self {
23 Self {
24 storage,
25 catalog,
26 base_path,
27 }
28 }
29}
30
31pub fn router(state: OxanaWebState) -> Router {
32 Router::new()
33 .route("/", get(handlers::dashboard))
34 .route("/busy", get(handlers::busy))
35 .route("/queues", get(handlers::queues_list))
36 .route("/metrics", get(handlers::metrics))
37 .route("/metrics/job", get(handlers::metric_detail))
38 .route("/cron", get(handlers::cron_jobs))
39 .route("/cron/enqueue", post(handlers::enqueue_cron_job))
40 .route("/on-demand", get(handlers::on_demand_jobs))
41 .route("/on-demand/enqueue", post(handlers::enqueue_on_demand_job))
42 .route("/scheduled", get(handlers::scheduled_jobs))
43 .route("/dead", get(handlers::dead_jobs))
44 .route("/dead/revive_all", post(handlers::revive_all_dead))
45 .route("/dead/wipe", post(handlers::wipe_dead))
46 .route("/retries", get(handlers::retry_jobs))
47 .route("/retries/retry_all_now", post(handlers::retry_all_now))
48 .route("/jobs/{*job_id}", get(handlers::job_detail))
49 .route("/queues/{queue_key}", get(handlers::queue_detail))
50 .route("/enqueue", post(handlers::enqueue_job))
51 .route("/queues/{queue_key}/pause", post(handlers::pause_queue))
52 .route("/queues/{queue_key}/unpause", post(handlers::unpause_queue))
53 .route(
54 "/queues/{queue_key}/concurrency",
55 post(handlers::set_queue_concurrency),
56 )
57 .route("/queues/{queue_key}/wipe", post(handlers::wipe_queue))
58 .route(
59 "/queues/{queue_key}/jobs/{job_id}/delete",
60 post(handlers::delete_job),
61 )
62 .layer(Extension(state))
63}
64
#[cfg(test)]
mod tests {
    use axum::{
        Router,
        body::{Body, to_bytes},
        extract::Path,
        http::{Request, StatusCode},
        routing::get,
    };
    use tower::ServiceExt;

    /// Echoes the captured `job_id` path parameter back as the response body.
    async fn echo_job_id(Path(job_id): Path<String>) -> String {
        job_id
    }

    /// Echoes both captured path parameters as `queue_key:job_id`.
    async fn echo_queue_and_job(Path((queue_key, job_id)): Path<(String, String)>) -> String {
        format!("{queue_key}:{job_id}")
    }

    /// Sends a body-less `GET` request for `uri` through `app` and returns
    /// the response status together with the fully collected body bytes.
    async fn get_status_and_body(app: Router, uri: &str) -> (StatusCode, Vec<u8>) {
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
        let response = app.oneshot(request).await.unwrap();
        let status = response.status();
        let bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap();
        (status, bytes.to_vec())
    }

    // These tests mirror the path patterns used by the real router but swap
    // in echo handlers, so only the path capturing behaviour is exercised.

    #[tokio::test]
    async fn job_detail_route_captures_slash_job_ids() {
        let app = Router::new().route("/jobs/{*job_id}", get(echo_job_id));

        let (status, body) =
            get_status_and_body(app, "/jobs/crate%3A%3AWorker%2Ftype-123").await;

        assert_eq!(status, StatusCode::OK);
        assert_eq!(&body[..], b"crate::Worker/type-123");
    }

    #[tokio::test]
    async fn delete_job_route_captures_strictly_encoded_slash_job_ids() {
        let app = Router::new().route(
            "/queues/{queue_key}/jobs/{job_id}/delete",
            get(echo_queue_and_job),
        );

        let (status, body) = get_status_and_body(
            app,
            "/queues/default/jobs/crate%3A%3AWorker%2Ftype-123/delete",
        )
        .await;

        assert_eq!(status, StatusCode::OK);
        assert_eq!(&body[..], b"default:crate::Worker/type-123");
    }
}