1use super::{
82 ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
83 with_filters, with_pagination, with_sort,
84};
85use hammerwork::queue::DatabaseQueue;
86use serde::{Deserialize, Serialize};
87use std::sync::Arc;
88use warp::{Filter, Reply};
89
/// JSON-serializable view of a single job, as returned by the handlers in this module.
///
/// Handlers build this from a queue job record, converting non-string values
/// (id, status, priority) to text.
#[derive(Debug, Serialize)]
pub struct JobInfo {
    /// Job identifier rendered as a string.
    pub id: String,
    /// Name of the queue the job belongs to.
    pub queue_name: String,
    /// Job status rendered as text.
    pub status: String,
    /// Job priority, rendered with the priority type's `Debug` formatting.
    pub priority: String,
    /// Attempt count copied from the job record.
    pub attempts: i32,
    /// Maximum attempt count copied from the job record.
    pub max_attempts: i32,
    /// The job's JSON payload.
    pub payload: serde_json::Value,
    /// Creation timestamp (UTC) copied from the job record.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Scheduled timestamp (UTC) copied from the job record.
    pub scheduled_at: chrono::DateTime<chrono::Utc>,
    /// Start timestamp, if the job record has one.
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Completion timestamp, if the job record has one.
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Failure timestamp, if the job record has one.
    pub failed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Error message copied from the job record, if any.
    pub error_message: Option<String>,
    /// Milliseconds between the job's start timestamp and an end timestamp,
    /// or `None` when the handler could not pair the two.
    pub processing_time_ms: Option<i64>,
    /// Cron schedule of the job as text, if it has one.
    pub cron_schedule: Option<String>,
    /// Result of the job record's `is_recurring()` check.
    pub is_recurring: bool,
    /// Trace identifier copied from the job record, if any.
    pub trace_id: Option<String>,
    /// Correlation identifier copied from the job record, if any.
    pub correlation_id: Option<String>,
}
112
/// JSON body accepted by `POST /jobs` to enqueue a new job.
///
/// Only `queue_name` and `payload` are required; every other field is optional.
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateJobRequest {
    /// Queue the new job is created on.
    pub queue_name: String,
    /// Arbitrary JSON payload stored with the job.
    pub payload: serde_json::Value,
    /// Priority name such as `background`, `low`, `normal`, `high` or `critical`;
    /// normal priority is used when omitted.
    pub priority: Option<String>,
    /// When present, overrides the job's scheduled time.
    pub scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
    /// When present, overrides the job's maximum attempt count.
    pub max_attempts: Option<i32>,
    /// Cron schedule text.
    // NOTE(review): the create handler in this file does not appear to read this
    // field — confirm whether it is applied elsewhere.
    pub cron_schedule: Option<String>,
    /// When present, stored as the job's trace identifier.
    pub trace_id: Option<String>,
    /// When present, stored as the job's correlation identifier.
    pub correlation_id: Option<String>,
}
125
/// JSON body accepted by `POST /jobs/{id}/actions`.
#[derive(Debug, Deserialize)]
pub struct JobActionRequest {
    /// Action name; the handler recognizes `retry`, `cancel` and `delete`.
    pub action: String,
    /// Optional free-text reason supplied by the caller.
    // NOTE(review): not read by the handler in this file — confirm intended use.
    pub reason: Option<String>,
}
132
/// JSON body accepted by `POST /jobs/bulk` to apply one action to many jobs.
#[derive(Debug, Deserialize)]
pub struct BulkJobActionRequest {
    /// Job identifiers as strings; the handler parses each one as a UUID.
    pub job_ids: Vec<String>,
    /// Action name applied to every listed job, e.g. `retry` or `delete`.
    pub action: String,
    /// Optional free-text reason supplied by the caller.
    // NOTE(review): not read by the handler in this file — confirm intended use.
    pub reason: Option<String>,
}
140
/// JSON body accepted by `POST /jobs/search`.
#[derive(Debug, Deserialize)]
pub struct JobSearchRequest {
    /// Search text; lowercased by the handler and matched as a substring of the
    /// job id, queue name, serialized payload, error message, trace id and
    /// correlation id.
    pub query: String,
    /// Queues to search; when omitted, the handler searches every queue it
    /// finds in the queue statistics.
    pub queues: Option<Vec<String>>,
    /// When present, only jobs whose status matches one of these names
    /// (ASCII case-insensitive) are returned.
    pub statuses: Option<Vec<String>>,
    /// When present, only jobs whose `Debug`-formatted priority matches one of
    /// these names (ASCII case-insensitive) are returned.
    pub priorities: Option<Vec<String>>,
    /// When present, jobs created before this instant are excluded.
    pub created_after: Option<chrono::DateTime<chrono::Utc>>,
    /// When present, jobs created after this instant are excluded.
    pub created_before: Option<chrono::DateTime<chrono::Utc>>,
}
151
152pub fn routes<T>(
154 queue: Arc<T>,
155) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
156where
157 T: DatabaseQueue + Send + Sync + 'static,
158{
159 let queue_filter = warp::any().map(move || queue.clone());
160
161 let list_jobs = warp::path("jobs")
162 .and(warp::path::end())
163 .and(warp::get())
164 .and(queue_filter.clone())
165 .and(with_pagination())
166 .and(with_filters())
167 .and(with_sort())
168 .and_then(list_jobs_handler);
169
170 let create_job = warp::path("jobs")
171 .and(warp::path::end())
172 .and(warp::post())
173 .and(queue_filter.clone())
174 .and(warp::body::json())
175 .and_then(create_job_handler);
176
177 let get_job = warp::path("jobs")
178 .and(warp::path::param::<String>())
179 .and(warp::path::end())
180 .and(warp::get())
181 .and(queue_filter.clone())
182 .and_then(get_job_handler);
183
184 let job_action = warp::path("jobs")
185 .and(warp::path::param::<String>())
186 .and(warp::path("actions"))
187 .and(warp::path::end())
188 .and(warp::post())
189 .and(queue_filter.clone())
190 .and(warp::body::json())
191 .and_then(job_action_handler);
192
193 let bulk_action = warp::path("jobs")
194 .and(warp::path("bulk"))
195 .and(warp::path::end())
196 .and(warp::post())
197 .and(queue_filter.clone())
198 .and(warp::body::json())
199 .and_then(bulk_job_action_handler);
200
201 let search_jobs = warp::path("jobs")
202 .and(warp::path("search"))
203 .and(warp::path::end())
204 .and(warp::post())
205 .and(queue_filter)
206 .and(warp::body::json())
207 .and(with_pagination())
208 .and_then(search_jobs_handler);
209
210 list_jobs
211 .or(create_job)
212 .or(get_job)
213 .or(job_action)
214 .or(bulk_action)
215 .or(search_jobs)
216}
217
218async fn list_jobs_handler<T>(
220 queue: Arc<T>,
221 pagination: PaginationParams,
222 filters: FilterParams,
223 sort: SortParams,
224) -> Result<impl Reply, warp::Rejection>
225where
226 T: DatabaseQueue + Send + Sync,
227{
228 let mut all_jobs = Vec::new();
231
232 let queue_stats = match queue.get_all_queue_stats().await {
234 Ok(stats) => stats,
235 Err(e) => {
236 let response = ApiResponse::<()>::error(format!("Failed to get queue stats: {}", e));
237 return Ok(warp::reply::json(&response));
238 }
239 };
240
241 let target_queues: Vec<String> = if let Some(ref queue_name) = filters.queue {
243 vec![queue_name.clone()]
244 } else {
245 queue_stats.iter().map(|s| s.queue_name.clone()).collect()
246 };
247
248 for queue_name in &target_queues {
250 let mut queue_jobs = Vec::new();
251
252 if filters.status.is_none() || filters.status.as_ref().unwrap().to_lowercase() == "pending"
254 {
255 if let Ok(ready_jobs) = queue.get_ready_jobs(&queue_name, 100).await {
257 queue_jobs.extend(ready_jobs);
258 }
259 }
260
261 if filters.status.is_none()
262 || filters.status.as_ref().unwrap().to_lowercase() == "failed"
263 || filters.status.as_ref().unwrap().to_lowercase() == "dead"
264 {
265 if let Ok(dead_jobs) = queue
267 .get_dead_jobs_by_queue(&queue_name, Some(100), Some(0))
268 .await
269 {
270 queue_jobs.extend(dead_jobs);
271 }
272 }
273
274 if filters.status.is_none()
275 || filters.status.as_ref().unwrap().to_lowercase() == "recurring"
276 {
277 if let Ok(recurring_jobs) = queue.get_recurring_jobs(&queue_name).await {
279 queue_jobs.extend(recurring_jobs);
280 }
281 }
282
283 for job in queue_jobs {
284 let processing_time_ms = match (job.started_at, job.completed_at) {
285 (Some(started), Some(completed)) => Some((completed - started).num_milliseconds()),
286 _ => None,
287 };
288
289 let job_info = JobInfo {
290 id: job.id.to_string(),
291 queue_name: job.queue_name.clone(),
292 status: job.status.as_str().to_string(),
293 priority: format!("{:?}", job.priority),
294 attempts: job.attempts as i32,
295 max_attempts: job.max_attempts as i32,
296 payload: job.payload.clone(),
297 created_at: job.created_at,
298 scheduled_at: job.scheduled_at,
299 started_at: job.started_at,
300 completed_at: job.completed_at,
301 failed_at: job.failed_at,
302 error_message: job.error_message.clone(),
303 processing_time_ms,
304 cron_schedule: job.cron_schedule.as_ref().map(|c| c.to_string()),
305 is_recurring: job.is_recurring(),
306 trace_id: job.trace_id.clone(),
307 correlation_id: job.correlation_id.clone(),
308 };
309
310 if let Some(ref status) = filters.status {
312 if job_info.status.to_lowercase() != status.to_lowercase() {
313 continue;
314 }
315 }
316
317 if let Some(ref priority) = filters.priority {
319 if job_info.priority.to_lowercase() != priority.to_lowercase() {
320 continue;
321 }
322 }
323
324 all_jobs.push(job_info);
325 }
326 }
327
328 match sort.sort_by.as_deref() {
330 Some("created_at") => {
331 all_jobs.sort_by(|a, b| {
332 if sort.sort_order.as_deref() == Some("asc") {
333 a.created_at.cmp(&b.created_at)
334 } else {
335 b.created_at.cmp(&a.created_at)
336 }
337 });
338 }
339 Some("scheduled_at") => {
340 all_jobs.sort_by(|a, b| {
341 if sort.sort_order.as_deref() == Some("asc") {
342 a.scheduled_at.cmp(&b.scheduled_at)
343 } else {
344 b.scheduled_at.cmp(&a.scheduled_at)
345 }
346 });
347 }
348 Some("priority") => {
349 all_jobs.sort_by(|a, b| {
350 if sort.sort_order.as_deref() == Some("asc") {
351 a.priority.cmp(&b.priority)
352 } else {
353 b.priority.cmp(&a.priority)
354 }
355 });
356 }
357 _ => {
358 all_jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
360 }
361 }
362
363 let total_count = all_jobs.len() as u64;
365 let page_size = pagination.limit.unwrap_or(20).min(100) as usize;
366 let page = pagination.page.unwrap_or(1).max(1) as usize;
367 let offset = (page - 1) * page_size;
368
369 let paginated_jobs: Vec<JobInfo> = all_jobs.into_iter().skip(offset).take(page_size).collect();
370
371 let response = PaginatedResponse {
372 items: paginated_jobs,
373 pagination: PaginationMeta::new(&pagination, total_count),
374 };
375
376 Ok(warp::reply::json(&ApiResponse::success(response)))
377}
378
379async fn create_job_handler<T>(
381 queue: Arc<T>,
382 request: CreateJobRequest,
383) -> Result<impl Reply, warp::Rejection>
384where
385 T: DatabaseQueue + Send + Sync,
386{
387 use hammerwork::{Job, JobPriority};
388
389 let priority = match request.priority.as_deref() {
390 Some("background") => JobPriority::Background,
391 Some("low") => JobPriority::Low,
392 Some("normal") => JobPriority::Normal,
393 Some("high") => JobPriority::High,
394 Some("critical") => JobPriority::Critical,
395 _ => JobPriority::Normal,
396 };
397
398 let mut job = Job::new(request.queue_name, request.payload).with_priority(priority);
399
400 if let Some(scheduled_at) = request.scheduled_at {
401 job.scheduled_at = scheduled_at;
402 }
403
404 if let Some(max_attempts) = request.max_attempts {
405 job = job.with_max_attempts(max_attempts);
406 }
407
408 if let Some(trace_id) = request.trace_id {
409 job.trace_id = Some(trace_id);
410 }
411
412 if let Some(correlation_id) = request.correlation_id {
413 job.correlation_id = Some(correlation_id);
414 }
415
416 match queue.enqueue(job).await {
417 Ok(job_id) => {
418 let response = ApiResponse::success(serde_json::json!({
419 "message": "Job created successfully",
420 "job_id": job_id.to_string()
421 }));
422 Ok(warp::reply::json(&response))
423 }
424 Err(e) => {
425 let response = ApiResponse::<()>::error(format!("Failed to create job: {}", e));
426 Ok(warp::reply::json(&response))
427 }
428 }
429}
430
431async fn get_job_handler<T>(job_id: String, queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
433where
434 T: DatabaseQueue + Send + Sync,
435{
436 let job_uuid = match uuid::Uuid::parse_str(&job_id) {
437 Ok(uuid) => uuid,
438 Err(_) => {
439 let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
440 return Ok(warp::reply::json(&response));
441 }
442 };
443
444 match queue.get_job(job_uuid).await {
445 Ok(Some(job)) => {
446 let job_info = JobInfo {
447 id: job.id.to_string(),
448 queue_name: job.queue_name.clone(),
449 status: format!("{:?}", job.status),
450 priority: format!("{:?}", job.priority),
451 attempts: job.attempts,
452 max_attempts: job.max_attempts,
453 payload: job.payload.clone(),
454 created_at: job.created_at,
455 scheduled_at: job.scheduled_at,
456 started_at: job.started_at,
457 completed_at: job.completed_at,
458 failed_at: job.failed_at,
459 error_message: job.error_message.clone(),
460 processing_time_ms: job.started_at.and_then(|start| {
461 job.completed_at
462 .or(job.failed_at)
463 .or(job.timed_out_at)
464 .map(|end| (end - start).num_milliseconds())
465 }),
466 cron_schedule: job.cron_schedule.clone(),
467 is_recurring: job.is_recurring(),
468 trace_id: job.trace_id.clone(),
469 correlation_id: job.correlation_id.clone(),
470 };
471
472 Ok(warp::reply::json(&ApiResponse::success(job_info)))
473 }
474 Ok(None) => {
475 let response = ApiResponse::<()>::error(format!("Job '{}' not found", job_id));
476 Ok(warp::reply::json(&response))
477 }
478 Err(e) => {
479 let response = ApiResponse::<()>::error(format!("Failed to get job: {}", e));
480 Ok(warp::reply::json(&response))
481 }
482 }
483}
484
485async fn job_action_handler<T>(
487 job_id: String,
488 queue: Arc<T>,
489 action_request: JobActionRequest,
490) -> Result<impl Reply, warp::Rejection>
491where
492 T: DatabaseQueue + Send + Sync,
493{
494 let job_uuid = match uuid::Uuid::parse_str(&job_id) {
495 Ok(uuid) => uuid,
496 Err(_) => {
497 let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
498 return Ok(warp::reply::json(&response));
499 }
500 };
501
502 match action_request.action.as_str() {
503 "retry" => match queue.retry_job(job_uuid, chrono::Utc::now()).await {
504 Ok(()) => {
505 let response = ApiResponse::success(serde_json::json!({
506 "message": format!("Job '{}' scheduled for retry", job_id)
507 }));
508 Ok(warp::reply::json(&response))
509 }
510 Err(e) => {
511 let response = ApiResponse::<()>::error(format!("Failed to retry job: {}", e));
512 Ok(warp::reply::json(&response))
513 }
514 },
515 "cancel" | "delete" => match queue.delete_job(job_uuid).await {
516 Ok(()) => {
517 let response = ApiResponse::success(serde_json::json!({
518 "message": format!("Job '{}' deleted", job_id)
519 }));
520 Ok(warp::reply::json(&response))
521 }
522 Err(e) => {
523 let response = ApiResponse::<()>::error(format!("Failed to delete job: {}", e));
524 Ok(warp::reply::json(&response))
525 }
526 },
527 _ => {
528 let response =
529 ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
530 Ok(warp::reply::json(&response))
531 }
532 }
533}
534
535async fn bulk_job_action_handler<T>(
537 queue: Arc<T>,
538 request: BulkJobActionRequest,
539) -> Result<impl Reply, warp::Rejection>
540where
541 T: DatabaseQueue + Send + Sync,
542{
543 let mut successful = 0;
544 let mut failed = 0;
545 let mut errors = Vec::new();
546
547 for job_id_str in &request.job_ids {
548 let job_uuid = match uuid::Uuid::parse_str(job_id_str) {
549 Ok(uuid) => uuid,
550 Err(_) => {
551 failed += 1;
552 errors.push(format!("Invalid job ID: {}", job_id_str));
553 continue;
554 }
555 };
556
557 let result = match request.action.as_str() {
558 "retry" => queue.retry_job(job_uuid, chrono::Utc::now()).await,
559 "delete" => queue.delete_job(job_uuid).await,
560 _ => {
561 failed += 1;
562 errors.push(format!("Unknown action: {}", request.action));
563 continue;
564 }
565 };
566
567 match result {
568 Ok(()) => successful += 1,
569 Err(e) => {
570 failed += 1;
571 errors.push(format!("Job {}: {}", job_id_str, e));
572 }
573 }
574 }
575
576 let response = ApiResponse::success(serde_json::json!({
577 "successful": successful,
578 "failed": failed,
579 "errors": errors,
580 "message": format!("Bulk {} completed: {} successful, {} failed", request.action, successful, failed)
581 }));
582
583 Ok(warp::reply::json(&response))
584}
585
586async fn search_jobs_handler<T>(
588 queue: Arc<T>,
589 search_request: JobSearchRequest,
590 pagination: PaginationParams,
591) -> Result<impl Reply, warp::Rejection>
592where
593 T: DatabaseQueue + Send + Sync,
594{
595 let mut matching_jobs = Vec::new();
597 let search_term = search_request.query.to_lowercase();
598
599 let queue_stats = match queue.get_all_queue_stats().await {
601 Ok(stats) => stats,
602 Err(e) => {
603 let response = ApiResponse::<()>::error(format!("Failed to get queue stats: {}", e));
604 return Ok(warp::reply::json(&response));
605 }
606 };
607
608 let target_queues: Vec<String> = if let Some(ref queue_names) = search_request.queues {
610 queue_names.clone()
611 } else {
612 queue_stats.iter().map(|s| s.queue_name.clone()).collect()
613 };
614
615 for queue_name in &target_queues {
616 let mut queue_jobs = Vec::new();
617
618 if let Ok(ready_jobs) = queue.get_ready_jobs(&queue_name, 200).await {
620 queue_jobs.extend(ready_jobs);
621 }
622
623 if let Ok(dead_jobs) = queue
624 .get_dead_jobs_by_queue(&queue_name, Some(200), Some(0))
625 .await
626 {
627 queue_jobs.extend(dead_jobs);
628 }
629
630 if let Ok(recurring_jobs) = queue.get_recurring_jobs(&queue_name).await {
631 queue_jobs.extend(recurring_jobs);
632 }
633
634 for job in queue_jobs {
635 let payload_str = serde_json::to_string(&job.payload)
637 .unwrap_or_default()
638 .to_lowercase();
639 let matches_search = job.id.to_string().contains(&search_term)
640 || job.queue_name.to_lowercase().contains(&search_term)
641 || payload_str.contains(&search_term)
642 || job
643 .error_message
644 .as_ref()
645 .map(|e| e.to_lowercase().contains(&search_term))
646 .unwrap_or(false)
647 || job
648 .trace_id
649 .as_ref()
650 .map(|t| t.to_lowercase().contains(&search_term))
651 .unwrap_or(false)
652 || job
653 .correlation_id
654 .as_ref()
655 .map(|c| c.to_lowercase().contains(&search_term))
656 .unwrap_or(false);
657
658 if !matches_search {
659 continue;
660 }
661
662 if let Some(ref statuses) = search_request.statuses {
664 if !statuses
665 .iter()
666 .any(|s| s.eq_ignore_ascii_case(job.status.as_str()))
667 {
668 continue;
669 }
670 }
671
672 if let Some(ref priorities) = search_request.priorities {
674 let job_priority_str = format!("{:?}", job.priority);
675 if !priorities
676 .iter()
677 .any(|p| p.eq_ignore_ascii_case(&job_priority_str))
678 {
679 continue;
680 }
681 }
682
683 if let Some(ref created_after) = search_request.created_after {
685 if job.created_at < *created_after {
686 continue;
687 }
688 }
689
690 if let Some(ref created_before) = search_request.created_before {
691 if job.created_at > *created_before {
692 continue;
693 }
694 }
695
696 let processing_time_ms = match (job.started_at, job.completed_at) {
697 (Some(started), Some(completed)) => Some((completed - started).num_milliseconds()),
698 _ => None,
699 };
700
701 matching_jobs.push(JobInfo {
702 id: job.id.to_string(),
703 queue_name: job.queue_name.clone(),
704 status: job.status.as_str().to_string(),
705 priority: format!("{:?}", job.priority),
706 attempts: job.attempts as i32,
707 max_attempts: job.max_attempts as i32,
708 payload: job.payload.clone(),
709 created_at: job.created_at,
710 scheduled_at: job.scheduled_at,
711 started_at: job.started_at,
712 completed_at: job.completed_at,
713 failed_at: job.failed_at,
714 error_message: job.error_message.clone(),
715 processing_time_ms,
716 cron_schedule: job.cron_schedule.as_ref().map(|c| c.to_string()),
717 is_recurring: job.is_recurring(),
718 trace_id: job.trace_id.clone(),
719 correlation_id: job.correlation_id.clone(),
720 });
721 }
722
723 let recurring_jobs = match queue.get_recurring_jobs(&queue_name).await {
725 Ok(jobs) => jobs,
726 Err(e) => {
727 eprintln!(
728 "Failed to get recurring jobs for queue {}: {}",
729 queue_name, e
730 );
731 continue;
732 }
733 };
734
735 for job in recurring_jobs {
736 let payload_str = serde_json::to_string(&job.payload)
738 .unwrap_or_default()
739 .to_lowercase();
740 let matches_search = job.id.to_string().contains(&search_term)
741 || job.queue_name.to_lowercase().contains(&search_term)
742 || payload_str.contains(&search_term)
743 || job
744 .error_message
745 .as_ref()
746 .map(|e| e.to_lowercase().contains(&search_term))
747 .unwrap_or(false)
748 || job
749 .trace_id
750 .as_ref()
751 .map(|t| t.to_lowercase().contains(&search_term))
752 .unwrap_or(false)
753 || job
754 .correlation_id
755 .as_ref()
756 .map(|c| c.to_lowercase().contains(&search_term))
757 .unwrap_or(false);
758
759 if !matches_search {
760 continue;
761 }
762
763 if let Some(ref statuses) = search_request.statuses {
765 if !statuses
766 .iter()
767 .any(|s| s.eq_ignore_ascii_case(job.status.as_str()))
768 {
769 continue;
770 }
771 }
772
773 if let Some(ref priorities) = search_request.priorities {
774 let job_priority_str = format!("{:?}", job.priority);
775 if !priorities
776 .iter()
777 .any(|p| p.eq_ignore_ascii_case(&job_priority_str))
778 {
779 continue;
780 }
781 }
782
783 if let Some(ref created_after) = search_request.created_after {
784 if job.created_at < *created_after {
785 continue;
786 }
787 }
788
789 if let Some(ref created_before) = search_request.created_before {
790 if job.created_at > *created_before {
791 continue;
792 }
793 }
794
795 let processing_time_ms = match (job.started_at, job.completed_at) {
796 (Some(started), Some(completed)) => Some((completed - started).num_milliseconds()),
797 _ => None,
798 };
799
800 matching_jobs.push(JobInfo {
801 id: job.id.to_string(),
802 queue_name: job.queue_name.clone(),
803 status: job.status.as_str().to_string(),
804 priority: format!("{:?}", job.priority),
805 attempts: job.attempts as i32,
806 max_attempts: job.max_attempts as i32,
807 payload: job.payload.clone(),
808 created_at: job.created_at,
809 scheduled_at: job.scheduled_at,
810 started_at: job.started_at,
811 completed_at: job.completed_at,
812 failed_at: job.failed_at,
813 error_message: job.error_message.clone(),
814 processing_time_ms,
815 cron_schedule: job.cron_schedule.as_ref().map(|c| c.to_string()),
816 is_recurring: job.is_recurring(),
817 trace_id: job.trace_id.clone(),
818 correlation_id: job.correlation_id.clone(),
819 });
820 }
821 }
822
823 matching_jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
825
826 let total_count = matching_jobs.len() as u64;
828 let page_size = pagination.limit.unwrap_or(20).min(100) as usize;
829 let page = pagination.page.unwrap_or(1).max(1) as usize;
830 let offset = (page - 1) * page_size;
831
832 let paginated_jobs: Vec<JobInfo> = matching_jobs
833 .into_iter()
834 .skip(offset)
835 .take(page_size)
836 .collect();
837
838 let response = PaginatedResponse {
839 items: paginated_jobs,
840 pagination: PaginationMeta::new(&pagination, total_count),
841 };
842
843 Ok(warp::reply::json(&ApiResponse::success(response)))
844}
845
#[cfg(test)]
mod tests {
    use super::*;

    /// A creation body with only some optional fields set deserializes, and
    /// the provided fields carry through.
    #[test]
    fn test_create_job_request_deserialization() {
        let body = r#"{
            "queue_name": "email",
            "payload": {"to": "user@example.com", "subject": "Hello"},
            "priority": "high",
            "max_attempts": 5
        }"#;

        let parsed: CreateJobRequest = serde_json::from_str(body).unwrap();

        assert_eq!(parsed.queue_name, "email");
        assert_eq!(parsed.priority.as_deref(), Some("high"));
        assert_eq!(parsed.max_attempts, Some(5));
    }

    /// Both the action name and the optional reason are read from the body.
    #[test]
    fn test_job_action_request_deserialization() {
        let body = r#"{"action": "retry", "reason": "Network error resolved"}"#;

        let parsed: JobActionRequest = serde_json::from_str(body).unwrap();

        assert_eq!(parsed.action, "retry");
        assert_eq!(parsed.reason.as_deref(), Some("Network error resolved"));
    }

    /// A bulk body keeps every listed job id and the action name.
    #[test]
    fn test_bulk_job_action_request() {
        let body = r#"{
            "job_ids": ["job-1", "job-2", "job-3"],
            "action": "delete",
            "reason": "Cleanup old jobs"
        }"#;

        let parsed: BulkJobActionRequest = serde_json::from_str(body).unwrap();

        assert_eq!(parsed.job_ids.len(), 3);
        assert_eq!(parsed.action, "delete");
    }
}