1use super::{
82 ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
83 with_filters, with_pagination, with_sort,
84};
85use hammerwork::queue::DatabaseQueue;
86use serde::{Deserialize, Serialize};
87use std::sync::Arc;
88use warp::{Filter, Reply};
89
90#[derive(Debug, Serialize)]
92pub struct JobInfo {
93 pub id: String,
94 pub queue_name: String,
95 pub status: String,
96 pub priority: String,
97 pub attempts: i32,
98 pub max_attempts: i32,
99 pub payload: serde_json::Value,
100 pub created_at: chrono::DateTime<chrono::Utc>,
101 pub scheduled_at: chrono::DateTime<chrono::Utc>,
102 pub started_at: Option<chrono::DateTime<chrono::Utc>>,
103 pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
104 pub failed_at: Option<chrono::DateTime<chrono::Utc>>,
105 pub error_message: Option<String>,
106 pub processing_time_ms: Option<i64>,
107 pub cron_schedule: Option<String>,
108 pub is_recurring: bool,
109 pub trace_id: Option<String>,
110 pub correlation_id: Option<String>,
111}
112
113#[derive(Debug, Deserialize, Serialize)]
115pub struct CreateJobRequest {
116 pub queue_name: String,
117 pub payload: serde_json::Value,
118 pub priority: Option<String>,
119 pub scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
120 pub max_attempts: Option<i32>,
121 pub cron_schedule: Option<String>,
122 pub trace_id: Option<String>,
123 pub correlation_id: Option<String>,
124}
125
126#[derive(Debug, Deserialize)]
128pub struct JobActionRequest {
129 pub action: String, pub reason: Option<String>,
131}
132
133#[derive(Debug, Deserialize)]
135pub struct BulkJobActionRequest {
136 pub job_ids: Vec<String>,
137 pub action: String,
138 pub reason: Option<String>,
139}
140
141#[derive(Debug, Deserialize)]
143pub struct JobSearchRequest {
144 pub query: String,
145 pub queues: Option<Vec<String>>,
146 pub statuses: Option<Vec<String>>,
147 pub priorities: Option<Vec<String>>,
148}
149
150pub fn routes<T>(
152 queue: Arc<T>,
153) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
154where
155 T: DatabaseQueue + Send + Sync + 'static,
156{
157 let queue_filter = warp::any().map(move || queue.clone());
158
159 let list_jobs = warp::path("jobs")
160 .and(warp::path::end())
161 .and(warp::get())
162 .and(queue_filter.clone())
163 .and(with_pagination())
164 .and(with_filters())
165 .and(with_sort())
166 .and_then(list_jobs_handler);
167
168 let create_job = warp::path("jobs")
169 .and(warp::path::end())
170 .and(warp::post())
171 .and(queue_filter.clone())
172 .and(warp::body::json())
173 .and_then(create_job_handler);
174
175 let get_job = warp::path("jobs")
176 .and(warp::path::param::<String>())
177 .and(warp::path::end())
178 .and(warp::get())
179 .and(queue_filter.clone())
180 .and_then(get_job_handler);
181
182 let job_action = warp::path("jobs")
183 .and(warp::path::param::<String>())
184 .and(warp::path("actions"))
185 .and(warp::path::end())
186 .and(warp::post())
187 .and(queue_filter.clone())
188 .and(warp::body::json())
189 .and_then(job_action_handler);
190
191 let bulk_action = warp::path("jobs")
192 .and(warp::path("bulk"))
193 .and(warp::path::end())
194 .and(warp::post())
195 .and(queue_filter.clone())
196 .and(warp::body::json())
197 .and_then(bulk_job_action_handler);
198
199 let search_jobs = warp::path("jobs")
200 .and(warp::path("search"))
201 .and(warp::path::end())
202 .and(warp::post())
203 .and(queue_filter)
204 .and(warp::body::json())
205 .and(with_pagination())
206 .and_then(search_jobs_handler);
207
208 list_jobs
209 .or(create_job)
210 .or(get_job)
211 .or(job_action)
212 .or(bulk_action)
213 .or(search_jobs)
214}
215
216async fn list_jobs_handler<T>(
218 queue: Arc<T>,
219 pagination: PaginationParams,
220 filters: FilterParams,
221 sort: SortParams,
222) -> Result<impl Reply, warp::Rejection>
223where
224 T: DatabaseQueue + Send + Sync,
225{
226 let _ = (queue, filters, sort);
229
230 let mock_jobs = vec![JobInfo {
231 id: "job-1".to_string(),
232 queue_name: "default".to_string(),
233 status: "pending".to_string(),
234 priority: "normal".to_string(),
235 attempts: 0,
236 max_attempts: 3,
237 payload: serde_json::json!({"task": "send_email", "to": "user@example.com"}),
238 created_at: chrono::Utc::now(),
239 scheduled_at: chrono::Utc::now(),
240 started_at: None,
241 completed_at: None,
242 failed_at: None,
243 error_message: None,
244 processing_time_ms: None,
245 cron_schedule: None,
246 is_recurring: false,
247 trace_id: None,
248 correlation_id: None,
249 }];
250
251 let response = PaginatedResponse {
252 items: mock_jobs,
253 pagination: PaginationMeta::new(&pagination, 1),
254 };
255
256 Ok(warp::reply::json(&ApiResponse::success(response)))
257}
258
259async fn create_job_handler<T>(
261 queue: Arc<T>,
262 request: CreateJobRequest,
263) -> Result<impl Reply, warp::Rejection>
264where
265 T: DatabaseQueue + Send + Sync,
266{
267 use hammerwork::{Job, JobPriority};
268
269 let priority = match request.priority.as_deref() {
270 Some("background") => JobPriority::Background,
271 Some("low") => JobPriority::Low,
272 Some("normal") => JobPriority::Normal,
273 Some("high") => JobPriority::High,
274 Some("critical") => JobPriority::Critical,
275 _ => JobPriority::Normal,
276 };
277
278 let mut job = Job::new(request.queue_name, request.payload).with_priority(priority);
279
280 if let Some(scheduled_at) = request.scheduled_at {
281 job.scheduled_at = scheduled_at;
282 }
283
284 if let Some(max_attempts) = request.max_attempts {
285 job = job.with_max_attempts(max_attempts);
286 }
287
288 if let Some(trace_id) = request.trace_id {
289 job.trace_id = Some(trace_id);
290 }
291
292 if let Some(correlation_id) = request.correlation_id {
293 job.correlation_id = Some(correlation_id);
294 }
295
296 match queue.enqueue(job).await {
297 Ok(job_id) => {
298 let response = ApiResponse::success(serde_json::json!({
299 "message": "Job created successfully",
300 "job_id": job_id.to_string()
301 }));
302 Ok(warp::reply::json(&response))
303 }
304 Err(e) => {
305 let response = ApiResponse::<()>::error(format!("Failed to create job: {}", e));
306 Ok(warp::reply::json(&response))
307 }
308 }
309}
310
311async fn get_job_handler<T>(job_id: String, queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
313where
314 T: DatabaseQueue + Send + Sync,
315{
316 let job_uuid = match uuid::Uuid::parse_str(&job_id) {
317 Ok(uuid) => uuid,
318 Err(_) => {
319 let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
320 return Ok(warp::reply::json(&response));
321 }
322 };
323
324 match queue.get_job(job_uuid).await {
325 Ok(Some(job)) => {
326 let job_info = JobInfo {
327 id: job.id.to_string(),
328 queue_name: job.queue_name.clone(),
329 status: format!("{:?}", job.status),
330 priority: format!("{:?}", job.priority),
331 attempts: job.attempts,
332 max_attempts: job.max_attempts,
333 payload: job.payload.clone(),
334 created_at: job.created_at,
335 scheduled_at: job.scheduled_at,
336 started_at: job.started_at,
337 completed_at: job.completed_at,
338 failed_at: job.failed_at,
339 error_message: job.error_message.clone(),
340 processing_time_ms: job.started_at.and_then(|start| {
341 job.completed_at
342 .or(job.failed_at)
343 .or(job.timed_out_at)
344 .map(|end| (end - start).num_milliseconds())
345 }),
346 cron_schedule: job.cron_schedule.clone(),
347 is_recurring: job.is_recurring(),
348 trace_id: job.trace_id.clone(),
349 correlation_id: job.correlation_id.clone(),
350 };
351
352 Ok(warp::reply::json(&ApiResponse::success(job_info)))
353 }
354 Ok(None) => {
355 let response = ApiResponse::<()>::error(format!("Job '{}' not found", job_id));
356 Ok(warp::reply::json(&response))
357 }
358 Err(e) => {
359 let response = ApiResponse::<()>::error(format!("Failed to get job: {}", e));
360 Ok(warp::reply::json(&response))
361 }
362 }
363}
364
365async fn job_action_handler<T>(
367 job_id: String,
368 queue: Arc<T>,
369 action_request: JobActionRequest,
370) -> Result<impl Reply, warp::Rejection>
371where
372 T: DatabaseQueue + Send + Sync,
373{
374 let job_uuid = match uuid::Uuid::parse_str(&job_id) {
375 Ok(uuid) => uuid,
376 Err(_) => {
377 let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
378 return Ok(warp::reply::json(&response));
379 }
380 };
381
382 match action_request.action.as_str() {
383 "retry" => match queue.retry_job(job_uuid, chrono::Utc::now()).await {
384 Ok(()) => {
385 let response = ApiResponse::success(serde_json::json!({
386 "message": format!("Job '{}' scheduled for retry", job_id)
387 }));
388 Ok(warp::reply::json(&response))
389 }
390 Err(e) => {
391 let response = ApiResponse::<()>::error(format!("Failed to retry job: {}", e));
392 Ok(warp::reply::json(&response))
393 }
394 },
395 "cancel" | "delete" => match queue.delete_job(job_uuid).await {
396 Ok(()) => {
397 let response = ApiResponse::success(serde_json::json!({
398 "message": format!("Job '{}' deleted", job_id)
399 }));
400 Ok(warp::reply::json(&response))
401 }
402 Err(e) => {
403 let response = ApiResponse::<()>::error(format!("Failed to delete job: {}", e));
404 Ok(warp::reply::json(&response))
405 }
406 },
407 _ => {
408 let response =
409 ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
410 Ok(warp::reply::json(&response))
411 }
412 }
413}
414
415async fn bulk_job_action_handler<T>(
417 queue: Arc<T>,
418 request: BulkJobActionRequest,
419) -> Result<impl Reply, warp::Rejection>
420where
421 T: DatabaseQueue + Send + Sync,
422{
423 let mut successful = 0;
424 let mut failed = 0;
425 let mut errors = Vec::new();
426
427 for job_id_str in &request.job_ids {
428 let job_uuid = match uuid::Uuid::parse_str(job_id_str) {
429 Ok(uuid) => uuid,
430 Err(_) => {
431 failed += 1;
432 errors.push(format!("Invalid job ID: {}", job_id_str));
433 continue;
434 }
435 };
436
437 let result = match request.action.as_str() {
438 "retry" => queue.retry_job(job_uuid, chrono::Utc::now()).await,
439 "delete" => queue.delete_job(job_uuid).await,
440 _ => {
441 failed += 1;
442 errors.push(format!("Unknown action: {}", request.action));
443 continue;
444 }
445 };
446
447 match result {
448 Ok(()) => successful += 1,
449 Err(e) => {
450 failed += 1;
451 errors.push(format!("Job {}: {}", job_id_str, e));
452 }
453 }
454 }
455
456 let response = ApiResponse::success(serde_json::json!({
457 "successful": successful,
458 "failed": failed,
459 "errors": errors,
460 "message": format!("Bulk {} completed: {} successful, {} failed", request.action, successful, failed)
461 }));
462
463 Ok(warp::reply::json(&response))
464}
465
466async fn search_jobs_handler<T>(
468 queue: Arc<T>,
469 search_request: JobSearchRequest,
470 pagination: PaginationParams,
471) -> Result<impl Reply, warp::Rejection>
472where
473 T: DatabaseQueue + Send + Sync,
474{
475 let _ = (queue, search_request);
478
479 let response = PaginatedResponse {
480 items: Vec::<JobInfo>::new(),
481 pagination: PaginationMeta::new(&pagination, 0),
482 };
483
484 Ok(warp::reply::json(&ApiResponse::success(response)))
485}
486
#[cfg(test)]
mod tests {
    use super::*;

    /// A create request with only some optional fields set deserializes, and
    /// the supplied fields come through unchanged.
    #[test]
    fn test_create_job_request_deserialization() {
        let body = r#"{
            "queue_name": "email",
            "payload": {"to": "user@example.com", "subject": "Hello"},
            "priority": "high",
            "max_attempts": 5
        }"#;

        let parsed = serde_json::from_str::<CreateJobRequest>(body).unwrap();

        assert_eq!(parsed.queue_name, "email");
        assert_eq!(parsed.priority.as_deref(), Some("high"));
        assert_eq!(parsed.max_attempts, Some(5));
    }

    /// A single-job action request keeps both its action and its reason.
    #[test]
    fn test_job_action_request_deserialization() {
        let body = r#"{"action": "retry", "reason": "Network error resolved"}"#;

        let parsed = serde_json::from_str::<JobActionRequest>(body).unwrap();

        assert_eq!(parsed.action, "retry");
        assert_eq!(parsed.reason.as_deref(), Some("Network error resolved"));
    }

    /// A bulk action request keeps every listed job ID and the action name.
    #[test]
    fn test_bulk_job_action_request() {
        let body = r#"{
            "job_ids": ["job-1", "job-2", "job-3"],
            "action": "delete",
            "reason": "Cleanup old jobs"
        }"#;

        let parsed = serde_json::from_str::<BulkJobActionRequest>(body).unwrap();

        assert_eq!(parsed.job_ids.len(), 3);
        assert_eq!(parsed.action, "delete");
    }
}