// hammerwork_web/api/jobs.rs
1//! Job management API endpoints.
2//!
3//! This module provides comprehensive REST API endpoints for managing Hammerwork jobs,
4//! including creating, listing, searching, and performing actions on jobs.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/jobs` - List jobs with filtering, pagination, and sorting
9//! - `POST /api/jobs` - Create a new job
10//! - `GET /api/jobs/{id}` - Get details of a specific job
11//! - `POST /api/jobs/{id}/actions` - Perform actions on a job (retry, cancel, delete)
12//! - `POST /api/jobs/bulk` - Perform bulk actions on multiple jobs
13//! - `POST /api/jobs/search` - Search jobs with full-text queries
14//!
15//! # Examples
16//!
17//! ## Creating a Job
18//!
19//! ```rust
20//! use hammerwork_web::api::jobs::CreateJobRequest;
21//! use serde_json::json;
22//!
23//! let create_request = CreateJobRequest {
24//!     queue_name: "email_queue".to_string(),
25//!     payload: json!({
26//!         "to": "user@example.com",
27//!         "subject": "Welcome!",
28//!         "template": "welcome_email"
29//!     }),
30//!     priority: Some("high".to_string()),
31//!     scheduled_at: None,
32//!     max_attempts: Some(3),
33//!     cron_schedule: None,
34//!     trace_id: Some("trace-123".to_string()),
35//!     correlation_id: Some("corr-456".to_string()),
36//! };
37//!
38//! // This would be sent as JSON in a POST request to /api/jobs
39//! let json_payload = serde_json::to_string(&create_request).unwrap();
40//! assert!(json_payload.contains("email_queue"));
41//! assert!(json_payload.contains("high"));
42//! ```
43//!
44//! ## Job Actions
45//!
46//! ```rust
47//! use hammerwork_web::api::jobs::JobActionRequest;
48//!
49//! let retry_request = JobActionRequest {
50//!     action: "retry".to_string(),
51//!     reason: Some("Network issue resolved".to_string()),
52//! };
53//!
54//! let cancel_request = JobActionRequest {
55//!     action: "cancel".to_string(),
56//!     reason: Some("No longer needed".to_string()),
57//! };
58//!
59//! assert_eq!(retry_request.action, "retry");
60//! assert_eq!(cancel_request.action, "cancel");
61//! ```
62//!
63//! ## Bulk Operations
64//!
65//! ```rust
66//! use hammerwork_web::api::jobs::BulkJobActionRequest;
67//!
68//! let bulk_delete = BulkJobActionRequest {
69//!     job_ids: vec![
70//!         "550e8400-e29b-41d4-a716-446655440000".to_string(),
71//!         "550e8400-e29b-41d4-a716-446655440001".to_string(),
72//!     ],
73//!     action: "delete".to_string(),
74//!     reason: Some("Cleanup old failed jobs".to_string()),
75//! };
76//!
77//! assert_eq!(bulk_delete.job_ids.len(), 2);
78//! assert_eq!(bulk_delete.action, "delete");
79//! ```
80
81use super::{
82    ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
83    with_filters, with_pagination, with_sort,
84};
85use hammerwork::queue::DatabaseQueue;
86use serde::{Deserialize, Serialize};
87use std::sync::Arc;
88use warp::{Filter, Reply};
89
/// Job information for API responses.
///
/// A flattened, serialization-friendly view of a queue job; the handlers in
/// this module convert queue jobs into this shape before replying.
#[derive(Debug, Serialize)]
pub struct JobInfo {
    /// Job UUID rendered as a string.
    pub id: String,
    /// Name of the queue the job belongs to.
    pub queue_name: String,
    /// Job status rendered as a string.
    pub status: String,
    /// Priority rendered via its `Debug` representation (e.g. "High").
    pub priority: String,
    /// Attempts made so far.
    pub attempts: i32,
    /// Maximum attempts before the job is considered dead.
    pub max_attempts: i32,
    /// Arbitrary JSON payload attached to the job.
    pub payload: serde_json::Value,
    /// When the job was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the job becomes/became eligible to run.
    pub scheduled_at: chrono::DateTime<chrono::Utc>,
    /// When processing started, if it has.
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    /// When processing completed successfully, if it has.
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// When the job last failed, if it has.
    pub failed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Error message from the most recent failure, if any.
    pub error_message: Option<String>,
    /// Milliseconds from start to the terminal timestamp, when both are known.
    pub processing_time_ms: Option<i64>,
    /// Cron expression for recurring jobs, if any.
    pub cron_schedule: Option<String>,
    /// Whether the job is recurring (as reported by the job itself).
    pub is_recurring: bool,
    /// Distributed-tracing trace id, if set.
    pub trace_id: Option<String>,
    /// Correlation id for request tracking, if set.
    pub correlation_id: Option<String>,
}
112
/// Job creation request.
///
/// Body of `POST /api/jobs`. Only `queue_name` and `payload` are required;
/// the handler falls back to `JobPriority::Normal` when `priority` is absent
/// or unrecognized.
#[derive(Debug, Deserialize, Serialize)]
pub struct CreateJobRequest {
    /// Target queue for the new job.
    pub queue_name: String,
    /// Arbitrary JSON payload handed to the job processor.
    pub payload: serde_json::Value,
    /// One of "background", "low", "normal", "high", "critical".
    pub priority: Option<String>,
    /// When the job should first become eligible to run.
    // If omitted the schedule set by Job::new is kept — presumably
    // "immediately"; confirm against hammerwork's Job::new.
    pub scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Maximum retry attempts before the job is dead-lettered.
    pub max_attempts: Option<i32>,
    /// Cron expression to make the job recurring.
    pub cron_schedule: Option<String>,
    /// Distributed-tracing trace id to attach to the job.
    pub trace_id: Option<String>,
    /// Correlation id to attach to the job.
    pub correlation_id: Option<String>,
}
125
/// Job action request.
///
/// Body of `POST /api/jobs/{id}/actions`.
#[derive(Debug, Deserialize)]
pub struct JobActionRequest {
    /// Action to perform: "retry", "cancel", or "delete".
    pub action: String, // "retry", "cancel", "delete"
    /// Optional human-readable reason for the action (currently informational
    /// only — the handlers do not persist it).
    pub reason: Option<String>,
}
132
/// Bulk job action request.
///
/// Body of `POST /api/jobs/bulk`; applies one action to many jobs and reports
/// per-job successes and failures.
#[derive(Debug, Deserialize)]
pub struct BulkJobActionRequest {
    /// Job UUIDs (as strings) to act on; invalid ids are reported, not fatal.
    pub job_ids: Vec<String>,
    /// Action to apply to every job.
    pub action: String,
    /// Optional human-readable reason (informational only).
    pub reason: Option<String>,
}
140
/// Job search request.
///
/// Body of `POST /api/jobs/search`. The free-text `query` is matched
/// (case-insensitively) against job id, queue name, payload JSON, error
/// message, trace id, and correlation id; the remaining fields narrow the
/// result set.
#[derive(Debug, Deserialize)]
pub struct JobSearchRequest {
    /// Free-text search term.
    pub query: String,
    /// Restrict the search to these queues (default: all known queues).
    pub queues: Option<Vec<String>>,
    /// Restrict to these statuses (case-insensitive).
    pub statuses: Option<Vec<String>>,
    /// Restrict to these priorities (case-insensitive, Debug-rendered names).
    pub priorities: Option<Vec<String>>,
    /// Only jobs created at or after this instant.
    pub created_after: Option<chrono::DateTime<chrono::Utc>>,
    /// Only jobs created at or before this instant.
    pub created_before: Option<chrono::DateTime<chrono::Utc>>,
}
151
152/// Create job routes
153pub fn routes<T>(
154    queue: Arc<T>,
155) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
156where
157    T: DatabaseQueue + Send + Sync + 'static,
158{
159    let queue_filter = warp::any().map(move || queue.clone());
160
161    let list_jobs = warp::path("jobs")
162        .and(warp::path::end())
163        .and(warp::get())
164        .and(queue_filter.clone())
165        .and(with_pagination())
166        .and(with_filters())
167        .and(with_sort())
168        .and_then(list_jobs_handler);
169
170    let create_job = warp::path("jobs")
171        .and(warp::path::end())
172        .and(warp::post())
173        .and(queue_filter.clone())
174        .and(warp::body::json())
175        .and_then(create_job_handler);
176
177    let get_job = warp::path("jobs")
178        .and(warp::path::param::<String>())
179        .and(warp::path::end())
180        .and(warp::get())
181        .and(queue_filter.clone())
182        .and_then(get_job_handler);
183
184    let job_action = warp::path("jobs")
185        .and(warp::path::param::<String>())
186        .and(warp::path("actions"))
187        .and(warp::path::end())
188        .and(warp::post())
189        .and(queue_filter.clone())
190        .and(warp::body::json())
191        .and_then(job_action_handler);
192
193    let bulk_action = warp::path("jobs")
194        .and(warp::path("bulk"))
195        .and(warp::path::end())
196        .and(warp::post())
197        .and(queue_filter.clone())
198        .and(warp::body::json())
199        .and_then(bulk_job_action_handler);
200
201    let search_jobs = warp::path("jobs")
202        .and(warp::path("search"))
203        .and(warp::path::end())
204        .and(warp::post())
205        .and(queue_filter)
206        .and(warp::body::json())
207        .and(with_pagination())
208        .and_then(search_jobs_handler);
209
210    list_jobs
211        .or(create_job)
212        .or(get_job)
213        .or(job_action)
214        .or(bulk_action)
215        .or(search_jobs)
216}
217
218/// Handler for listing jobs
219async fn list_jobs_handler<T>(
220    queue: Arc<T>,
221    pagination: PaginationParams,
222    filters: FilterParams,
223    sort: SortParams,
224) -> Result<impl Reply, warp::Rejection>
225where
226    T: DatabaseQueue + Send + Sync,
227{
228    // Since DatabaseQueue doesn't provide direct list methods with filters,
229    // we'll use the available methods to gather jobs
230    let mut all_jobs = Vec::new();
231
232    // Get queue stats to find available queues
233    let queue_stats = match queue.get_all_queue_stats().await {
234        Ok(stats) => stats,
235        Err(e) => {
236            let response = ApiResponse::<()>::error(format!("Failed to get queue stats: {}", e));
237            return Ok(warp::reply::json(&response));
238        }
239    };
240
241    // Filter by queue if specified
242    let target_queues: Vec<String> = if let Some(ref queue_name) = filters.queue {
243        vec![queue_name.clone()]
244    } else {
245        queue_stats.iter().map(|s| s.queue_name.clone()).collect()
246    };
247
248    // For each queue, get jobs from different sources based on status filter
249    for queue_name in &target_queues {
250        let mut queue_jobs = Vec::new();
251
252        // Collect jobs based on status filter or get all types if no filter
253        if filters.status.is_none() || filters.status.as_ref().unwrap().to_lowercase() == "pending"
254        {
255            // Get ready jobs (pending jobs ready to be processed)
256            if let Ok(ready_jobs) = queue.get_ready_jobs(&queue_name, 100).await {
257                queue_jobs.extend(ready_jobs);
258            }
259        }
260
261        if filters.status.is_none()
262            || filters.status.as_ref().unwrap().to_lowercase() == "failed"
263            || filters.status.as_ref().unwrap().to_lowercase() == "dead"
264        {
265            // Get dead jobs
266            if let Ok(dead_jobs) = queue
267                .get_dead_jobs_by_queue(&queue_name, Some(100), Some(0))
268                .await
269            {
270                queue_jobs.extend(dead_jobs);
271            }
272        }
273
274        if filters.status.is_none()
275            || filters.status.as_ref().unwrap().to_lowercase() == "recurring"
276        {
277            // Get recurring jobs
278            if let Ok(recurring_jobs) = queue.get_recurring_jobs(&queue_name).await {
279                queue_jobs.extend(recurring_jobs);
280            }
281        }
282
283        for job in queue_jobs {
284            let processing_time_ms = match (job.started_at, job.completed_at) {
285                (Some(started), Some(completed)) => Some((completed - started).num_milliseconds()),
286                _ => None,
287            };
288
289            let job_info = JobInfo {
290                id: job.id.to_string(),
291                queue_name: job.queue_name.clone(),
292                status: job.status.as_str().to_string(),
293                priority: format!("{:?}", job.priority),
294                attempts: job.attempts as i32,
295                max_attempts: job.max_attempts as i32,
296                payload: job.payload.clone(),
297                created_at: job.created_at,
298                scheduled_at: job.scheduled_at,
299                started_at: job.started_at,
300                completed_at: job.completed_at,
301                failed_at: job.failed_at,
302                error_message: job.error_message.clone(),
303                processing_time_ms,
304                cron_schedule: job.cron_schedule.as_ref().map(|c| c.to_string()),
305                is_recurring: job.is_recurring(),
306                trace_id: job.trace_id.clone(),
307                correlation_id: job.correlation_id.clone(),
308            };
309
310            // Apply status filter
311            if let Some(ref status) = filters.status {
312                if job_info.status.to_lowercase() != status.to_lowercase() {
313                    continue;
314                }
315            }
316
317            // Apply priority filter
318            if let Some(ref priority) = filters.priority {
319                if job_info.priority.to_lowercase() != priority.to_lowercase() {
320                    continue;
321                }
322            }
323
324            all_jobs.push(job_info);
325        }
326    }
327
328    // Sort jobs
329    match sort.sort_by.as_deref() {
330        Some("created_at") => {
331            all_jobs.sort_by(|a, b| {
332                if sort.sort_order.as_deref() == Some("asc") {
333                    a.created_at.cmp(&b.created_at)
334                } else {
335                    b.created_at.cmp(&a.created_at)
336                }
337            });
338        }
339        Some("scheduled_at") => {
340            all_jobs.sort_by(|a, b| {
341                if sort.sort_order.as_deref() == Some("asc") {
342                    a.scheduled_at.cmp(&b.scheduled_at)
343                } else {
344                    b.scheduled_at.cmp(&a.scheduled_at)
345                }
346            });
347        }
348        Some("priority") => {
349            all_jobs.sort_by(|a, b| {
350                if sort.sort_order.as_deref() == Some("asc") {
351                    a.priority.cmp(&b.priority)
352                } else {
353                    b.priority.cmp(&a.priority)
354                }
355            });
356        }
357        _ => {
358            // Default sort by created_at desc
359            all_jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
360        }
361    }
362
363    // Apply pagination
364    let total_count = all_jobs.len() as u64;
365    let page_size = pagination.limit.unwrap_or(20).min(100) as usize;
366    let page = pagination.page.unwrap_or(1).max(1) as usize;
367    let offset = (page - 1) * page_size;
368
369    let paginated_jobs: Vec<JobInfo> = all_jobs.into_iter().skip(offset).take(page_size).collect();
370
371    let response = PaginatedResponse {
372        items: paginated_jobs,
373        pagination: PaginationMeta::new(&pagination, total_count),
374    };
375
376    Ok(warp::reply::json(&ApiResponse::success(response)))
377}
378
379/// Handler for creating a new job
380async fn create_job_handler<T>(
381    queue: Arc<T>,
382    request: CreateJobRequest,
383) -> Result<impl Reply, warp::Rejection>
384where
385    T: DatabaseQueue + Send + Sync,
386{
387    use hammerwork::{Job, JobPriority};
388
389    let priority = match request.priority.as_deref() {
390        Some("background") => JobPriority::Background,
391        Some("low") => JobPriority::Low,
392        Some("normal") => JobPriority::Normal,
393        Some("high") => JobPriority::High,
394        Some("critical") => JobPriority::Critical,
395        _ => JobPriority::Normal,
396    };
397
398    let mut job = Job::new(request.queue_name, request.payload).with_priority(priority);
399
400    if let Some(scheduled_at) = request.scheduled_at {
401        job.scheduled_at = scheduled_at;
402    }
403
404    if let Some(max_attempts) = request.max_attempts {
405        job = job.with_max_attempts(max_attempts);
406    }
407
408    if let Some(trace_id) = request.trace_id {
409        job.trace_id = Some(trace_id);
410    }
411
412    if let Some(correlation_id) = request.correlation_id {
413        job.correlation_id = Some(correlation_id);
414    }
415
416    match queue.enqueue(job).await {
417        Ok(job_id) => {
418            let response = ApiResponse::success(serde_json::json!({
419                "message": "Job created successfully",
420                "job_id": job_id.to_string()
421            }));
422            Ok(warp::reply::json(&response))
423        }
424        Err(e) => {
425            let response = ApiResponse::<()>::error(format!("Failed to create job: {}", e));
426            Ok(warp::reply::json(&response))
427        }
428    }
429}
430
431/// Handler for getting a specific job
432async fn get_job_handler<T>(job_id: String, queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
433where
434    T: DatabaseQueue + Send + Sync,
435{
436    let job_uuid = match uuid::Uuid::parse_str(&job_id) {
437        Ok(uuid) => uuid,
438        Err(_) => {
439            let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
440            return Ok(warp::reply::json(&response));
441        }
442    };
443
444    match queue.get_job(job_uuid).await {
445        Ok(Some(job)) => {
446            let job_info = JobInfo {
447                id: job.id.to_string(),
448                queue_name: job.queue_name.clone(),
449                status: format!("{:?}", job.status),
450                priority: format!("{:?}", job.priority),
451                attempts: job.attempts,
452                max_attempts: job.max_attempts,
453                payload: job.payload.clone(),
454                created_at: job.created_at,
455                scheduled_at: job.scheduled_at,
456                started_at: job.started_at,
457                completed_at: job.completed_at,
458                failed_at: job.failed_at,
459                error_message: job.error_message.clone(),
460                processing_time_ms: job.started_at.and_then(|start| {
461                    job.completed_at
462                        .or(job.failed_at)
463                        .or(job.timed_out_at)
464                        .map(|end| (end - start).num_milliseconds())
465                }),
466                cron_schedule: job.cron_schedule.clone(),
467                is_recurring: job.is_recurring(),
468                trace_id: job.trace_id.clone(),
469                correlation_id: job.correlation_id.clone(),
470            };
471
472            Ok(warp::reply::json(&ApiResponse::success(job_info)))
473        }
474        Ok(None) => {
475            let response = ApiResponse::<()>::error(format!("Job '{}' not found", job_id));
476            Ok(warp::reply::json(&response))
477        }
478        Err(e) => {
479            let response = ApiResponse::<()>::error(format!("Failed to get job: {}", e));
480            Ok(warp::reply::json(&response))
481        }
482    }
483}
484
485/// Handler for job actions
486async fn job_action_handler<T>(
487    job_id: String,
488    queue: Arc<T>,
489    action_request: JobActionRequest,
490) -> Result<impl Reply, warp::Rejection>
491where
492    T: DatabaseQueue + Send + Sync,
493{
494    let job_uuid = match uuid::Uuid::parse_str(&job_id) {
495        Ok(uuid) => uuid,
496        Err(_) => {
497            let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
498            return Ok(warp::reply::json(&response));
499        }
500    };
501
502    match action_request.action.as_str() {
503        "retry" => match queue.retry_job(job_uuid, chrono::Utc::now()).await {
504            Ok(()) => {
505                let response = ApiResponse::success(serde_json::json!({
506                    "message": format!("Job '{}' scheduled for retry", job_id)
507                }));
508                Ok(warp::reply::json(&response))
509            }
510            Err(e) => {
511                let response = ApiResponse::<()>::error(format!("Failed to retry job: {}", e));
512                Ok(warp::reply::json(&response))
513            }
514        },
515        "cancel" | "delete" => match queue.delete_job(job_uuid).await {
516            Ok(()) => {
517                let response = ApiResponse::success(serde_json::json!({
518                    "message": format!("Job '{}' deleted", job_id)
519                }));
520                Ok(warp::reply::json(&response))
521            }
522            Err(e) => {
523                let response = ApiResponse::<()>::error(format!("Failed to delete job: {}", e));
524                Ok(warp::reply::json(&response))
525            }
526        },
527        _ => {
528            let response =
529                ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
530            Ok(warp::reply::json(&response))
531        }
532    }
533}
534
535/// Handler for bulk job actions
536async fn bulk_job_action_handler<T>(
537    queue: Arc<T>,
538    request: BulkJobActionRequest,
539) -> Result<impl Reply, warp::Rejection>
540where
541    T: DatabaseQueue + Send + Sync,
542{
543    let mut successful = 0;
544    let mut failed = 0;
545    let mut errors = Vec::new();
546
547    for job_id_str in &request.job_ids {
548        let job_uuid = match uuid::Uuid::parse_str(job_id_str) {
549            Ok(uuid) => uuid,
550            Err(_) => {
551                failed += 1;
552                errors.push(format!("Invalid job ID: {}", job_id_str));
553                continue;
554            }
555        };
556
557        let result = match request.action.as_str() {
558            "retry" => queue.retry_job(job_uuid, chrono::Utc::now()).await,
559            "delete" => queue.delete_job(job_uuid).await,
560            _ => {
561                failed += 1;
562                errors.push(format!("Unknown action: {}", request.action));
563                continue;
564            }
565        };
566
567        match result {
568            Ok(()) => successful += 1,
569            Err(e) => {
570                failed += 1;
571                errors.push(format!("Job {}: {}", job_id_str, e));
572            }
573        }
574    }
575
576    let response = ApiResponse::success(serde_json::json!({
577        "successful": successful,
578        "failed": failed,
579        "errors": errors,
580        "message": format!("Bulk {} completed: {} successful, {} failed", request.action, successful, failed)
581    }));
582
583    Ok(warp::reply::json(&response))
584}
585
586/// Handler for searching jobs
587async fn search_jobs_handler<T>(
588    queue: Arc<T>,
589    search_request: JobSearchRequest,
590    pagination: PaginationParams,
591) -> Result<impl Reply, warp::Rejection>
592where
593    T: DatabaseQueue + Send + Sync,
594{
595    // Since we don't have direct search methods, we'll gather jobs and filter in memory
596    let mut matching_jobs = Vec::new();
597    let search_term = search_request.query.to_lowercase();
598
599    // Get queue stats to find available queues
600    let queue_stats = match queue.get_all_queue_stats().await {
601        Ok(stats) => stats,
602        Err(e) => {
603            let response = ApiResponse::<()>::error(format!("Failed to get queue stats: {}", e));
604            return Ok(warp::reply::json(&response));
605        }
606    };
607
608    // Filter by specified queues or use all
609    let target_queues: Vec<String> = if let Some(ref queue_names) = search_request.queues {
610        queue_names.clone()
611    } else {
612        queue_stats.iter().map(|s| s.queue_name.clone()).collect()
613    };
614
615    for queue_name in &target_queues {
616        let mut queue_jobs = Vec::new();
617
618        // Collect jobs from all sources for comprehensive search
619        if let Ok(ready_jobs) = queue.get_ready_jobs(&queue_name, 200).await {
620            queue_jobs.extend(ready_jobs);
621        }
622
623        if let Ok(dead_jobs) = queue
624            .get_dead_jobs_by_queue(&queue_name, Some(200), Some(0))
625            .await
626        {
627            queue_jobs.extend(dead_jobs);
628        }
629
630        if let Ok(recurring_jobs) = queue.get_recurring_jobs(&queue_name).await {
631            queue_jobs.extend(recurring_jobs);
632        }
633
634        for job in queue_jobs {
635            // Check if job matches search criteria
636            let payload_str = serde_json::to_string(&job.payload)
637                .unwrap_or_default()
638                .to_lowercase();
639            let matches_search = job.id.to_string().contains(&search_term)
640                || job.queue_name.to_lowercase().contains(&search_term)
641                || payload_str.contains(&search_term)
642                || job
643                    .error_message
644                    .as_ref()
645                    .map(|e| e.to_lowercase().contains(&search_term))
646                    .unwrap_or(false)
647                || job
648                    .trace_id
649                    .as_ref()
650                    .map(|t| t.to_lowercase().contains(&search_term))
651                    .unwrap_or(false)
652                || job
653                    .correlation_id
654                    .as_ref()
655                    .map(|c| c.to_lowercase().contains(&search_term))
656                    .unwrap_or(false);
657
658            if !matches_search {
659                continue;
660            }
661
662            // Apply status filter
663            if let Some(ref statuses) = search_request.statuses {
664                if !statuses
665                    .iter()
666                    .any(|s| s.eq_ignore_ascii_case(job.status.as_str()))
667                {
668                    continue;
669                }
670            }
671
672            // Apply priority filter
673            if let Some(ref priorities) = search_request.priorities {
674                let job_priority_str = format!("{:?}", job.priority);
675                if !priorities
676                    .iter()
677                    .any(|p| p.eq_ignore_ascii_case(&job_priority_str))
678                {
679                    continue;
680                }
681            }
682
683            // Apply date filters
684            if let Some(ref created_after) = search_request.created_after {
685                if job.created_at < *created_after {
686                    continue;
687                }
688            }
689
690            if let Some(ref created_before) = search_request.created_before {
691                if job.created_at > *created_before {
692                    continue;
693                }
694            }
695
696            let processing_time_ms = match (job.started_at, job.completed_at) {
697                (Some(started), Some(completed)) => Some((completed - started).num_milliseconds()),
698                _ => None,
699            };
700
701            matching_jobs.push(JobInfo {
702                id: job.id.to_string(),
703                queue_name: job.queue_name.clone(),
704                status: job.status.as_str().to_string(),
705                priority: format!("{:?}", job.priority),
706                attempts: job.attempts as i32,
707                max_attempts: job.max_attempts as i32,
708                payload: job.payload.clone(),
709                created_at: job.created_at,
710                scheduled_at: job.scheduled_at,
711                started_at: job.started_at,
712                completed_at: job.completed_at,
713                failed_at: job.failed_at,
714                error_message: job.error_message.clone(),
715                processing_time_ms,
716                cron_schedule: job.cron_schedule.as_ref().map(|c| c.to_string()),
717                is_recurring: job.is_recurring(),
718                trace_id: job.trace_id.clone(),
719                correlation_id: job.correlation_id.clone(),
720            });
721        }
722
723        // Also search recurring jobs
724        let recurring_jobs = match queue.get_recurring_jobs(&queue_name).await {
725            Ok(jobs) => jobs,
726            Err(e) => {
727                eprintln!(
728                    "Failed to get recurring jobs for queue {}: {}",
729                    queue_name, e
730                );
731                continue;
732            }
733        };
734
735        for job in recurring_jobs {
736            // Check if job matches search criteria
737            let payload_str = serde_json::to_string(&job.payload)
738                .unwrap_or_default()
739                .to_lowercase();
740            let matches_search = job.id.to_string().contains(&search_term)
741                || job.queue_name.to_lowercase().contains(&search_term)
742                || payload_str.contains(&search_term)
743                || job
744                    .error_message
745                    .as_ref()
746                    .map(|e| e.to_lowercase().contains(&search_term))
747                    .unwrap_or(false)
748                || job
749                    .trace_id
750                    .as_ref()
751                    .map(|t| t.to_lowercase().contains(&search_term))
752                    .unwrap_or(false)
753                || job
754                    .correlation_id
755                    .as_ref()
756                    .map(|c| c.to_lowercase().contains(&search_term))
757                    .unwrap_or(false);
758
759            if !matches_search {
760                continue;
761            }
762
763            // Apply additional filters
764            if let Some(ref statuses) = search_request.statuses {
765                if !statuses
766                    .iter()
767                    .any(|s| s.eq_ignore_ascii_case(job.status.as_str()))
768                {
769                    continue;
770                }
771            }
772
773            if let Some(ref priorities) = search_request.priorities {
774                let job_priority_str = format!("{:?}", job.priority);
775                if !priorities
776                    .iter()
777                    .any(|p| p.eq_ignore_ascii_case(&job_priority_str))
778                {
779                    continue;
780                }
781            }
782
783            if let Some(ref created_after) = search_request.created_after {
784                if job.created_at < *created_after {
785                    continue;
786                }
787            }
788
789            if let Some(ref created_before) = search_request.created_before {
790                if job.created_at > *created_before {
791                    continue;
792                }
793            }
794
795            let processing_time_ms = match (job.started_at, job.completed_at) {
796                (Some(started), Some(completed)) => Some((completed - started).num_milliseconds()),
797                _ => None,
798            };
799
800            matching_jobs.push(JobInfo {
801                id: job.id.to_string(),
802                queue_name: job.queue_name.clone(),
803                status: job.status.as_str().to_string(),
804                priority: format!("{:?}", job.priority),
805                attempts: job.attempts as i32,
806                max_attempts: job.max_attempts as i32,
807                payload: job.payload.clone(),
808                created_at: job.created_at,
809                scheduled_at: job.scheduled_at,
810                started_at: job.started_at,
811                completed_at: job.completed_at,
812                failed_at: job.failed_at,
813                error_message: job.error_message.clone(),
814                processing_time_ms,
815                cron_schedule: job.cron_schedule.as_ref().map(|c| c.to_string()),
816                is_recurring: job.is_recurring(),
817                trace_id: job.trace_id.clone(),
818                correlation_id: job.correlation_id.clone(),
819            });
820        }
821    }
822
823    // Sort by created_at desc by default
824    matching_jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
825
826    // Apply pagination
827    let total_count = matching_jobs.len() as u64;
828    let page_size = pagination.limit.unwrap_or(20).min(100) as usize;
829    let page = pagination.page.unwrap_or(1).max(1) as usize;
830    let offset = (page - 1) * page_size;
831
832    let paginated_jobs: Vec<JobInfo> = matching_jobs
833        .into_iter()
834        .skip(offset)
835        .take(page_size)
836        .collect();
837
838    let response = PaginatedResponse {
839        items: paginated_jobs,
840        pagination: PaginationMeta::new(&pagination, total_count),
841    };
842
843    Ok(warp::reply::json(&ApiResponse::success(response)))
844}
845
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal JSON body round-trips into `CreateJobRequest`, leaving
    /// omitted optional fields as `None`.
    #[test]
    fn test_create_job_request_deserialization() {
        let body = serde_json::json!({
            "queue_name": "email",
            "payload": {"to": "user@example.com", "subject": "Hello"},
            "priority": "high",
            "max_attempts": 5
        });

        let request: CreateJobRequest = serde_json::from_value(body).unwrap();
        assert_eq!(request.queue_name, "email");
        assert_eq!(request.priority.as_deref(), Some("high"));
        assert_eq!(request.max_attempts, Some(5));
    }

    /// Action and reason both deserialize from a compact JSON object.
    #[test]
    fn test_job_action_request_deserialization() {
        let request: JobActionRequest =
            serde_json::from_str(r#"{"action": "retry", "reason": "Network error resolved"}"#)
                .unwrap();
        assert_eq!(request.action, "retry");
        assert_eq!(request.reason.as_deref(), Some("Network error resolved"));
    }

    /// Bulk requests carry every id plus the shared action.
    #[test]
    fn test_bulk_job_action_request() {
        let body = serde_json::json!({
            "job_ids": ["job-1", "job-2", "job-3"],
            "action": "delete",
            "reason": "Cleanup old jobs"
        });

        let request: BulkJobActionRequest = serde_json::from_value(body).unwrap();
        assert_eq!(request.job_ids.len(), 3);
        assert_eq!(request.action, "delete");
    }
}