hammerwork_web/api/
jobs.rs

1//! Job management API endpoints.
2//!
3//! This module provides comprehensive REST API endpoints for managing Hammerwork jobs,
4//! including creating, listing, searching, and performing actions on jobs.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/jobs` - List jobs with filtering, pagination, and sorting
9//! - `POST /api/jobs` - Create a new job
10//! - `GET /api/jobs/{id}` - Get details of a specific job
11//! - `POST /api/jobs/{id}/actions` - Perform actions on a job (retry, cancel, delete)
12//! - `POST /api/jobs/bulk` - Perform bulk actions on multiple jobs
13//! - `POST /api/jobs/search` - Search jobs with full-text queries
14//!
15//! # Examples
16//!
17//! ## Creating a Job
18//!
19//! ```rust
20//! use hammerwork_web::api::jobs::CreateJobRequest;
21//! use serde_json::json;
22//!
23//! let create_request = CreateJobRequest {
24//!     queue_name: "email_queue".to_string(),
25//!     payload: json!({
26//!         "to": "user@example.com",
27//!         "subject": "Welcome!",
28//!         "template": "welcome_email"
29//!     }),
30//!     priority: Some("high".to_string()),
31//!     scheduled_at: None,
32//!     max_attempts: Some(3),
33//!     cron_schedule: None,
34//!     trace_id: Some("trace-123".to_string()),
35//!     correlation_id: Some("corr-456".to_string()),
36//! };
37//!
38//! // This would be sent as JSON in a POST request to /api/jobs
39//! let json_payload = serde_json::to_string(&create_request).unwrap();
40//! assert!(json_payload.contains("email_queue"));
41//! assert!(json_payload.contains("high"));
42//! ```
43//!
44//! ## Job Actions
45//!
46//! ```rust
47//! use hammerwork_web::api::jobs::JobActionRequest;
48//!
49//! let retry_request = JobActionRequest {
50//!     action: "retry".to_string(),
51//!     reason: Some("Network issue resolved".to_string()),
52//! };
53//!
54//! let cancel_request = JobActionRequest {
55//!     action: "cancel".to_string(),
56//!     reason: Some("No longer needed".to_string()),
57//! };
58//!
59//! assert_eq!(retry_request.action, "retry");
60//! assert_eq!(cancel_request.action, "cancel");
61//! ```
62//!
63//! ## Bulk Operations
64//!
65//! ```rust
66//! use hammerwork_web::api::jobs::BulkJobActionRequest;
67//!
68//! let bulk_delete = BulkJobActionRequest {
69//!     job_ids: vec![
70//!         "550e8400-e29b-41d4-a716-446655440000".to_string(),
71//!         "550e8400-e29b-41d4-a716-446655440001".to_string(),
72//!     ],
73//!     action: "delete".to_string(),
74//!     reason: Some("Cleanup old failed jobs".to_string()),
75//! };
76//!
77//! assert_eq!(bulk_delete.job_ids.len(), 2);
78//! assert_eq!(bulk_delete.action, "delete");
79//! ```
80
81use super::{
82    ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams, SortParams,
83    with_filters, with_pagination, with_sort,
84};
85use hammerwork::queue::DatabaseQueue;
86use serde::{Deserialize, Serialize};
87use std::sync::Arc;
88use warp::{Filter, Reply};
89
90/// Job information for API responses
91#[derive(Debug, Serialize)]
92pub struct JobInfo {
93    pub id: String,
94    pub queue_name: String,
95    pub status: String,
96    pub priority: String,
97    pub attempts: i32,
98    pub max_attempts: i32,
99    pub payload: serde_json::Value,
100    pub created_at: chrono::DateTime<chrono::Utc>,
101    pub scheduled_at: chrono::DateTime<chrono::Utc>,
102    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
103    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
104    pub failed_at: Option<chrono::DateTime<chrono::Utc>>,
105    pub error_message: Option<String>,
106    pub processing_time_ms: Option<i64>,
107    pub cron_schedule: Option<String>,
108    pub is_recurring: bool,
109    pub trace_id: Option<String>,
110    pub correlation_id: Option<String>,
111}
112
113/// Job creation request
114#[derive(Debug, Deserialize, Serialize)]
115pub struct CreateJobRequest {
116    pub queue_name: String,
117    pub payload: serde_json::Value,
118    pub priority: Option<String>,
119    pub scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
120    pub max_attempts: Option<i32>,
121    pub cron_schedule: Option<String>,
122    pub trace_id: Option<String>,
123    pub correlation_id: Option<String>,
124}
125
126/// Job action request
127#[derive(Debug, Deserialize)]
128pub struct JobActionRequest {
129    pub action: String, // "retry", "cancel", "delete"
130    pub reason: Option<String>,
131}
132
133/// Bulk job action request
134#[derive(Debug, Deserialize)]
135pub struct BulkJobActionRequest {
136    pub job_ids: Vec<String>,
137    pub action: String,
138    pub reason: Option<String>,
139}
140
141/// Job search request
142#[derive(Debug, Deserialize)]
143pub struct JobSearchRequest {
144    pub query: String,
145    pub queues: Option<Vec<String>>,
146    pub statuses: Option<Vec<String>>,
147    pub priorities: Option<Vec<String>>,
148}
149
150/// Create job routes
151pub fn routes<T>(
152    queue: Arc<T>,
153) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
154where
155    T: DatabaseQueue + Send + Sync + 'static,
156{
157    let queue_filter = warp::any().map(move || queue.clone());
158
159    let list_jobs = warp::path("jobs")
160        .and(warp::path::end())
161        .and(warp::get())
162        .and(queue_filter.clone())
163        .and(with_pagination())
164        .and(with_filters())
165        .and(with_sort())
166        .and_then(list_jobs_handler);
167
168    let create_job = warp::path("jobs")
169        .and(warp::path::end())
170        .and(warp::post())
171        .and(queue_filter.clone())
172        .and(warp::body::json())
173        .and_then(create_job_handler);
174
175    let get_job = warp::path("jobs")
176        .and(warp::path::param::<String>())
177        .and(warp::path::end())
178        .and(warp::get())
179        .and(queue_filter.clone())
180        .and_then(get_job_handler);
181
182    let job_action = warp::path("jobs")
183        .and(warp::path::param::<String>())
184        .and(warp::path("actions"))
185        .and(warp::path::end())
186        .and(warp::post())
187        .and(queue_filter.clone())
188        .and(warp::body::json())
189        .and_then(job_action_handler);
190
191    let bulk_action = warp::path("jobs")
192        .and(warp::path("bulk"))
193        .and(warp::path::end())
194        .and(warp::post())
195        .and(queue_filter.clone())
196        .and(warp::body::json())
197        .and_then(bulk_job_action_handler);
198
199    let search_jobs = warp::path("jobs")
200        .and(warp::path("search"))
201        .and(warp::path::end())
202        .and(warp::post())
203        .and(queue_filter)
204        .and(warp::body::json())
205        .and(with_pagination())
206        .and_then(search_jobs_handler);
207
208    list_jobs
209        .or(create_job)
210        .or(get_job)
211        .or(job_action)
212        .or(bulk_action)
213        .or(search_jobs)
214}
215
216/// Handler for listing jobs
217async fn list_jobs_handler<T>(
218    queue: Arc<T>,
219    pagination: PaginationParams,
220    filters: FilterParams,
221    sort: SortParams,
222) -> Result<impl Reply, warp::Rejection>
223where
224    T: DatabaseQueue + Send + Sync,
225{
226    // For now, return a placeholder response
227    // In a real implementation, this would query the database with filters and pagination
228    let _ = (queue, filters, sort);
229
230    let mock_jobs = vec![JobInfo {
231        id: "job-1".to_string(),
232        queue_name: "default".to_string(),
233        status: "pending".to_string(),
234        priority: "normal".to_string(),
235        attempts: 0,
236        max_attempts: 3,
237        payload: serde_json::json!({"task": "send_email", "to": "user@example.com"}),
238        created_at: chrono::Utc::now(),
239        scheduled_at: chrono::Utc::now(),
240        started_at: None,
241        completed_at: None,
242        failed_at: None,
243        error_message: None,
244        processing_time_ms: None,
245        cron_schedule: None,
246        is_recurring: false,
247        trace_id: None,
248        correlation_id: None,
249    }];
250
251    let response = PaginatedResponse {
252        items: mock_jobs,
253        pagination: PaginationMeta::new(&pagination, 1),
254    };
255
256    Ok(warp::reply::json(&ApiResponse::success(response)))
257}
258
259/// Handler for creating a new job
260async fn create_job_handler<T>(
261    queue: Arc<T>,
262    request: CreateJobRequest,
263) -> Result<impl Reply, warp::Rejection>
264where
265    T: DatabaseQueue + Send + Sync,
266{
267    use hammerwork::{Job, JobPriority};
268
269    let priority = match request.priority.as_deref() {
270        Some("background") => JobPriority::Background,
271        Some("low") => JobPriority::Low,
272        Some("normal") => JobPriority::Normal,
273        Some("high") => JobPriority::High,
274        Some("critical") => JobPriority::Critical,
275        _ => JobPriority::Normal,
276    };
277
278    let mut job = Job::new(request.queue_name, request.payload).with_priority(priority);
279
280    if let Some(scheduled_at) = request.scheduled_at {
281        job.scheduled_at = scheduled_at;
282    }
283
284    if let Some(max_attempts) = request.max_attempts {
285        job = job.with_max_attempts(max_attempts);
286    }
287
288    if let Some(trace_id) = request.trace_id {
289        job.trace_id = Some(trace_id);
290    }
291
292    if let Some(correlation_id) = request.correlation_id {
293        job.correlation_id = Some(correlation_id);
294    }
295
296    match queue.enqueue(job).await {
297        Ok(job_id) => {
298            let response = ApiResponse::success(serde_json::json!({
299                "message": "Job created successfully",
300                "job_id": job_id.to_string()
301            }));
302            Ok(warp::reply::json(&response))
303        }
304        Err(e) => {
305            let response = ApiResponse::<()>::error(format!("Failed to create job: {}", e));
306            Ok(warp::reply::json(&response))
307        }
308    }
309}
310
311/// Handler for getting a specific job
312async fn get_job_handler<T>(job_id: String, queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
313where
314    T: DatabaseQueue + Send + Sync,
315{
316    let job_uuid = match uuid::Uuid::parse_str(&job_id) {
317        Ok(uuid) => uuid,
318        Err(_) => {
319            let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
320            return Ok(warp::reply::json(&response));
321        }
322    };
323
324    match queue.get_job(job_uuid).await {
325        Ok(Some(job)) => {
326            let job_info = JobInfo {
327                id: job.id.to_string(),
328                queue_name: job.queue_name.clone(),
329                status: format!("{:?}", job.status),
330                priority: format!("{:?}", job.priority),
331                attempts: job.attempts,
332                max_attempts: job.max_attempts,
333                payload: job.payload.clone(),
334                created_at: job.created_at,
335                scheduled_at: job.scheduled_at,
336                started_at: job.started_at,
337                completed_at: job.completed_at,
338                failed_at: job.failed_at,
339                error_message: job.error_message.clone(),
340                processing_time_ms: job.started_at.and_then(|start| {
341                    job.completed_at
342                        .or(job.failed_at)
343                        .or(job.timed_out_at)
344                        .map(|end| (end - start).num_milliseconds())
345                }),
346                cron_schedule: job.cron_schedule.clone(),
347                is_recurring: job.is_recurring(),
348                trace_id: job.trace_id.clone(),
349                correlation_id: job.correlation_id.clone(),
350            };
351
352            Ok(warp::reply::json(&ApiResponse::success(job_info)))
353        }
354        Ok(None) => {
355            let response = ApiResponse::<()>::error(format!("Job '{}' not found", job_id));
356            Ok(warp::reply::json(&response))
357        }
358        Err(e) => {
359            let response = ApiResponse::<()>::error(format!("Failed to get job: {}", e));
360            Ok(warp::reply::json(&response))
361        }
362    }
363}
364
365/// Handler for job actions
366async fn job_action_handler<T>(
367    job_id: String,
368    queue: Arc<T>,
369    action_request: JobActionRequest,
370) -> Result<impl Reply, warp::Rejection>
371where
372    T: DatabaseQueue + Send + Sync,
373{
374    let job_uuid = match uuid::Uuid::parse_str(&job_id) {
375        Ok(uuid) => uuid,
376        Err(_) => {
377            let response = ApiResponse::<()>::error("Invalid job ID format".to_string());
378            return Ok(warp::reply::json(&response));
379        }
380    };
381
382    match action_request.action.as_str() {
383        "retry" => match queue.retry_job(job_uuid, chrono::Utc::now()).await {
384            Ok(()) => {
385                let response = ApiResponse::success(serde_json::json!({
386                    "message": format!("Job '{}' scheduled for retry", job_id)
387                }));
388                Ok(warp::reply::json(&response))
389            }
390            Err(e) => {
391                let response = ApiResponse::<()>::error(format!("Failed to retry job: {}", e));
392                Ok(warp::reply::json(&response))
393            }
394        },
395        "cancel" | "delete" => match queue.delete_job(job_uuid).await {
396            Ok(()) => {
397                let response = ApiResponse::success(serde_json::json!({
398                    "message": format!("Job '{}' deleted", job_id)
399                }));
400                Ok(warp::reply::json(&response))
401            }
402            Err(e) => {
403                let response = ApiResponse::<()>::error(format!("Failed to delete job: {}", e));
404                Ok(warp::reply::json(&response))
405            }
406        },
407        _ => {
408            let response =
409                ApiResponse::<()>::error(format!("Unknown action: {}", action_request.action));
410            Ok(warp::reply::json(&response))
411        }
412    }
413}
414
415/// Handler for bulk job actions
416async fn bulk_job_action_handler<T>(
417    queue: Arc<T>,
418    request: BulkJobActionRequest,
419) -> Result<impl Reply, warp::Rejection>
420where
421    T: DatabaseQueue + Send + Sync,
422{
423    let mut successful = 0;
424    let mut failed = 0;
425    let mut errors = Vec::new();
426
427    for job_id_str in &request.job_ids {
428        let job_uuid = match uuid::Uuid::parse_str(job_id_str) {
429            Ok(uuid) => uuid,
430            Err(_) => {
431                failed += 1;
432                errors.push(format!("Invalid job ID: {}", job_id_str));
433                continue;
434            }
435        };
436
437        let result = match request.action.as_str() {
438            "retry" => queue.retry_job(job_uuid, chrono::Utc::now()).await,
439            "delete" => queue.delete_job(job_uuid).await,
440            _ => {
441                failed += 1;
442                errors.push(format!("Unknown action: {}", request.action));
443                continue;
444            }
445        };
446
447        match result {
448            Ok(()) => successful += 1,
449            Err(e) => {
450                failed += 1;
451                errors.push(format!("Job {}: {}", job_id_str, e));
452            }
453        }
454    }
455
456    let response = ApiResponse::success(serde_json::json!({
457        "successful": successful,
458        "failed": failed,
459        "errors": errors,
460        "message": format!("Bulk {} completed: {} successful, {} failed", request.action, successful, failed)
461    }));
462
463    Ok(warp::reply::json(&response))
464}
465
466/// Handler for searching jobs
467async fn search_jobs_handler<T>(
468    queue: Arc<T>,
469    search_request: JobSearchRequest,
470    pagination: PaginationParams,
471) -> Result<impl Reply, warp::Rejection>
472where
473    T: DatabaseQueue + Send + Sync,
474{
475    // For now, return a placeholder response
476    // In a real implementation, this would perform full-text search on job payloads
477    let _ = (queue, search_request);
478
479    let response = PaginatedResponse {
480        items: Vec::<JobInfo>::new(),
481        pagination: PaginationMeta::new(&pagination, 0),
482    };
483
484    Ok(warp::reply::json(&ApiResponse::success(response)))
485}
486
487#[cfg(test)]
488mod tests {
489    use super::*;
490
491    #[test]
492    fn test_create_job_request_deserialization() {
493        let json = r#"{
494            "queue_name": "email",
495            "payload": {"to": "user@example.com", "subject": "Hello"},
496            "priority": "high",
497            "max_attempts": 5
498        }"#;
499
500        let request: CreateJobRequest = serde_json::from_str(json).unwrap();
501        assert_eq!(request.queue_name, "email");
502        assert_eq!(request.priority, Some("high".to_string()));
503        assert_eq!(request.max_attempts, Some(5));
504    }
505
506    #[test]
507    fn test_job_action_request_deserialization() {
508        let json = r#"{"action": "retry", "reason": "Network error resolved"}"#;
509        let request: JobActionRequest = serde_json::from_str(json).unwrap();
510        assert_eq!(request.action, "retry");
511        assert_eq!(request.reason, Some("Network error resolved".to_string()));
512    }
513
514    #[test]
515    fn test_bulk_job_action_request() {
516        let json = r#"{
517            "job_ids": ["job-1", "job-2", "job-3"],
518            "action": "delete",
519            "reason": "Cleanup old jobs"
520        }"#;
521
522        let request: BulkJobActionRequest = serde_json::from_str(json).unwrap();
523        assert_eq!(request.job_ids.len(), 3);
524        assert_eq!(request.action, "delete");
525    }
526}