hammerwork_web/api/
archive.rs

1//! Archive management API endpoints.
2//!
3//! This module provides REST API endpoints for managing job archiving operations,
4//! including archiving jobs, restoring archived jobs, listing archived jobs,
5//! and configuring archival policies.
6//!
7//! # Endpoints
8//!
9//! - `POST /api/archive/jobs` - Archive jobs based on policy
10//! - `GET /api/archive/jobs` - List archived jobs with pagination and filtering
11//! - `POST /api/archive/jobs/{id}/restore` - Restore an archived job
12//! - `DELETE /api/archive/jobs` - Purge old archived jobs
13//! - `GET /api/archive/stats` - Get archival statistics
14//! - `GET /api/archive/policies` - List archival policies
15//! - `POST /api/archive/policies` - Create or update archival policy
16//! - `DELETE /api/archive/policies/{id}` - Delete archival policy
17//!
18//! # Examples
19//!
20//! ## Archive Jobs
21//!
22//! ```rust
23//! use hammerwork_web::api::archive::{ArchiveRequest, ArchiveResponse};
24//! use hammerwork::archive::{ArchivalReason, ArchivalPolicy};
25//! use chrono::Duration;
26//!
27//! let request = ArchiveRequest {
28//!     queue_name: Some("completed_jobs".to_string()),
29//!     reason: ArchivalReason::Automatic,
30//!     archived_by: Some("scheduler".to_string()),
31//!     dry_run: false,
32//!     policy: Some(ArchivalPolicy::new()
33//!         .archive_completed_after(Duration::days(7))),
34//! };
35//!
36//! // This would be sent to POST /api/archive/jobs
37//! assert_eq!(request.queue_name, Some("completed_jobs".to_string()));
38//! assert!(!request.dry_run);
39//! ```
40//!
41//! ## List Archived Jobs
42//!
43//! ```rust
44//! use hammerwork_web::api::archive::ArchivedJobInfo;
45//! use hammerwork::archive::{ArchivalReason, ArchivedJob};
46//! use hammerwork::{JobId, JobStatus};
47//! use chrono::Utc;
48//! use uuid::Uuid;
49//!
50//! let archived_job = ArchivedJobInfo {
51//!     id: Uuid::new_v4(),
52//!     queue_name: "email_queue".to_string(),
53//!     status: JobStatus::Completed,
54//!     created_at: Utc::now(),
55//!     archived_at: Utc::now(),
56//!     archival_reason: ArchivalReason::Automatic,
57//!     original_payload_size: Some(1024),
58//!     payload_compressed: true,
59//!     archived_by: Some("system".to_string()),
60//! };
61//!
62//! assert_eq!(archived_job.queue_name, "email_queue");
63//! assert!(archived_job.payload_compressed);
64//! ```
65
66use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68    JobId, JobStatus,
69    archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
70    queue::DatabaseQueue,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76/// Request to archive jobs
77#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79    /// Optional queue name to limit archival to specific queue
80    pub queue_name: Option<String>,
81    /// Reason for archival
82    pub reason: ArchivalReason,
83    /// Who initiated the archival
84    pub archived_by: Option<String>,
85    /// Whether this is a dry run (don't actually archive)
86    pub dry_run: bool,
87    /// Archival policy to use (optional, uses default if not provided)
88    pub policy: Option<ArchivalPolicy>,
89    /// Archival configuration (optional, uses default if not provided)
90    pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94    fn default() -> Self {
95        Self {
96            queue_name: None,
97            reason: ArchivalReason::Manual,
98            archived_by: None,
99            dry_run: true,
100            policy: None,
101            config: None,
102        }
103    }
104}
105
106/// Response from archiving jobs
107#[derive(Debug, Serialize)]
108pub struct ArchiveResponse {
109    /// Statistics from the archival operation
110    pub stats: ArchivalStats,
111    /// Whether this was a dry run
112    pub dry_run: bool,
113    /// Policy used for archival
114    pub policy_used: ArchivalPolicy,
115    /// Configuration used for archival
116    pub config_used: ArchivalConfig,
117}
118
119/// Request to restore an archived job
120#[derive(Debug, Deserialize)]
121pub struct RestoreRequest {
122    /// Optional reason for restoration
123    pub reason: Option<String>,
124    /// Who initiated the restoration
125    pub restored_by: Option<String>,
126}
127
128/// Response from restoring a job
129#[derive(Debug, Serialize)]
130pub struct RestoreResponse {
131    /// The restored job
132    pub job: hammerwork::Job,
133    /// When the job was restored
134    pub restored_at: chrono::DateTime<chrono::Utc>,
135    /// Who restored the job
136    pub restored_by: Option<String>,
137}
138
139/// Request to purge archived jobs
140#[derive(Debug, Deserialize)]
141pub struct PurgeRequest {
142    /// Delete archived jobs older than this date
143    pub older_than: chrono::DateTime<chrono::Utc>,
144    /// Whether this is a dry run
145    pub dry_run: bool,
146    /// Who initiated the purge
147    pub purged_by: Option<String>,
148}
149
150/// Response from purging archived jobs
151#[derive(Debug, Serialize)]
152pub struct PurgeResponse {
153    /// Number of jobs that would be (or were) purged
154    pub jobs_purged: u64,
155    /// Whether this was a dry run
156    pub dry_run: bool,
157    /// When the purge was executed
158    pub executed_at: chrono::DateTime<chrono::Utc>,
159}
160
161/// Archived job information for API responses
162#[derive(Debug, Serialize)]
163pub struct ArchivedJobInfo {
164    /// Job ID
165    pub id: JobId,
166    /// Queue name
167    pub queue_name: String,
168    /// Original job status
169    pub status: JobStatus,
170    /// When the job was created
171    pub created_at: chrono::DateTime<chrono::Utc>,
172    /// When the job was archived
173    pub archived_at: chrono::DateTime<chrono::Utc>,
174    /// Reason for archival
175    pub archival_reason: ArchivalReason,
176    /// Original payload size in bytes
177    pub original_payload_size: Option<usize>,
178    /// Whether the payload was compressed
179    pub payload_compressed: bool,
180    /// Who archived the job
181    pub archived_by: Option<String>,
182}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185    fn from(archived_job: ArchivedJob) -> Self {
186        Self {
187            id: archived_job.id,
188            queue_name: archived_job.queue_name,
189            status: archived_job.status,
190            created_at: archived_job.created_at,
191            archived_at: archived_job.archived_at,
192            archival_reason: archived_job.archival_reason,
193            original_payload_size: archived_job.original_payload_size,
194            payload_compressed: archived_job.payload_compressed,
195            archived_by: archived_job.archived_by,
196        }
197    }
198}
199
200/// Archival policy configuration request
201#[derive(Debug, Deserialize)]
202pub struct PolicyRequest {
203    /// Policy name/identifier
204    pub name: String,
205    /// Archival policy configuration
206    pub policy: ArchivalPolicy,
207    /// Whether this policy is active
208    pub active: bool,
209}
210
211/// Archival policy response
212#[derive(Debug, Serialize)]
213pub struct PolicyResponse {
214    /// Policy name/identifier
215    pub name: String,
216    /// Archival policy configuration
217    pub policy: ArchivalPolicy,
218    /// Whether this policy is active
219    pub active: bool,
220    /// When the policy was created
221    pub created_at: chrono::DateTime<chrono::Utc>,
222    /// When the policy was last modified
223    pub modified_at: chrono::DateTime<chrono::Utc>,
224}
225
226/// Archive statistics response
227#[derive(Debug, Serialize)]
228pub struct StatsResponse {
229    /// Overall archival statistics
230    pub stats: ArchivalStats,
231    /// Statistics by queue
232    pub by_queue: std::collections::HashMap<String, ArchivalStats>,
233    /// Recent archival operations
234    pub recent_operations: Vec<RecentOperation>,
235}
236
237/// Information about recent archival operations
238#[derive(Debug, Serialize)]
239pub struct RecentOperation {
240    /// Type of operation (archive, restore, purge)
241    pub operation_type: String,
242    /// Queue affected (if applicable)
243    pub queue_name: Option<String>,
244    /// Number of jobs affected
245    pub jobs_affected: u64,
246    /// When the operation occurred
247    pub executed_at: chrono::DateTime<chrono::Utc>,
248    /// Who executed the operation
249    pub executed_by: Option<String>,
250    /// Operation reason
251    pub reason: Option<String>,
252}
253
254/// Archive filter parameters
255#[derive(Debug, Deserialize, Default)]
256pub struct ArchiveFilterParams {
257    /// Filter by queue name
258    pub queue: Option<String>,
259    /// Filter by archival reason
260    pub reason: Option<String>,
261    /// Filter by archived after date
262    pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
263    /// Filter by archived before date
264    pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
265    /// Filter by who archived
266    pub archived_by: Option<String>,
267    /// Filter by compression status
268    pub compressed: Option<bool>,
269    /// Filter by original job status
270    pub original_status: Option<String>,
271}
272
273/// Create archive API routes
274pub fn archive_routes<Q>(
275    queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278    Q: DatabaseQueue + Send + Sync + 'static,
279{
280    let archive_jobs = warp::path!("api" / "archive" / "jobs")
281        .and(warp::post())
282        .and(warp::body::json())
283        .and(with_queue(queue.clone()))
284        .and_then(handle_archive_jobs);
285
286    let list_archived = warp::path!("api" / "archive" / "jobs")
287        .and(warp::get())
288        .and(super::with_pagination())
289        .and(with_archive_filters())
290        .and(with_queue(queue.clone()))
291        .and_then(handle_list_archived_jobs);
292
293    let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294        .and(warp::post())
295        .and(warp::body::json())
296        .and(with_queue(queue.clone()))
297        .and_then(handle_restore_job);
298
299    let purge_jobs = warp::path!("api" / "archive" / "purge")
300        .and(warp::delete())
301        .and(warp::body::json())
302        .and(with_queue(queue.clone()))
303        .and_then(handle_purge_jobs);
304
305    let archive_stats = warp::path!("api" / "archive" / "stats")
306        .and(warp::get())
307        .and(warp::query::<FilterParams>())
308        .and(with_queue(queue.clone()))
309        .and_then(handle_archive_stats);
310
311    archive_jobs
312        .or(list_archived)
313        .or(restore_job)
314        .or(purge_jobs)
315        .or(archive_stats)
316}
317
318/// Helper to inject queue into handlers
319fn with_queue<Q>(
320    queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323    Q: DatabaseQueue + Send + Sync + 'static,
324{
325    warp::any().map(move || queue.clone())
326}
327
328/// Extract archive filter parameters from query string
329fn with_archive_filters()
330-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
331    warp::query::<ArchiveFilterParams>()
332}
333
334/// Handle archive jobs request
335async fn handle_archive_jobs<Q>(
336    request: ArchiveRequest,
337    queue: Arc<Q>,
338) -> Result<impl Reply, warp::Rejection>
339where
340    Q: DatabaseQueue + Send + Sync + 'static,
341{
342    let policy = request.policy.unwrap_or_default();
343    let config = request.config.unwrap_or_default();
344
345    if request.dry_run {
346        // For dry run, return what would happen without actually archiving
347        let response = ArchiveResponse {
348            stats: ArchivalStats::default(), // In a real dry run, we'd calculate this
349            dry_run: true,
350            policy_used: policy,
351            config_used: config,
352        };
353        return Ok(warp::reply::json(&ApiResponse::success(response)));
354    }
355
356    match queue
357        .archive_jobs(
358            request.queue_name.as_deref(),
359            &policy,
360            &config,
361            request.reason,
362            request.archived_by.as_deref(),
363        )
364        .await
365    {
366        Ok(stats) => {
367            let response = ArchiveResponse {
368                stats,
369                dry_run: false,
370                policy_used: policy,
371                config_used: config,
372            };
373            Ok(warp::reply::json(&ApiResponse::success(response)))
374        }
375        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
376            "Failed to archive jobs: {}",
377            e
378        )))),
379    }
380}
381
382/// Handle list archived jobs request
383async fn handle_list_archived_jobs<Q>(
384    pagination: PaginationParams,
385    filters: ArchiveFilterParams,
386    queue: Arc<Q>,
387) -> Result<impl Reply, warp::Rejection>
388where
389    Q: DatabaseQueue + Send + Sync + 'static,
390{
391    match queue
392        .list_archived_jobs(
393            filters.queue.as_deref(),
394            Some(pagination.get_limit()),
395            Some(pagination.get_offset()),
396        )
397        .await
398    {
399        Ok(archived_jobs) => {
400            // Convert to API format
401            let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
402
403            // For simplicity, we'll use the returned count as total
404            // In a real implementation, you'd want a separate count query
405            let total = jobs.len() as u64;
406
407            let pagination_meta = PaginationMeta::new(&pagination, total);
408            let response = PaginatedResponse {
409                items: jobs,
410                pagination: pagination_meta,
411            };
412
413            Ok(warp::reply::json(&ApiResponse::success(response)))
414        }
415        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
416            "Failed to list archived jobs: {}",
417            e
418        )))),
419    }
420}
421
422/// Handle restore job request
423async fn handle_restore_job<Q>(
424    job_id_str: String,
425    request: RestoreRequest,
426    queue: Arc<Q>,
427) -> Result<impl Reply, warp::Rejection>
428where
429    Q: DatabaseQueue + Send + Sync + 'static,
430{
431    let job_id = match uuid::Uuid::parse_str(&job_id_str) {
432        Ok(id) => id,
433        Err(_) => {
434            return Ok(warp::reply::json(&ApiResponse::<()>::error(
435                "Invalid job ID format".to_string(),
436            )));
437        }
438    };
439
440    match queue.restore_archived_job(job_id).await {
441        Ok(job) => {
442            let response = RestoreResponse {
443                job,
444                restored_at: chrono::Utc::now(),
445                restored_by: request.restored_by,
446            };
447            Ok(warp::reply::json(&ApiResponse::success(response)))
448        }
449        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
450            "Failed to restore job: {}",
451            e
452        )))),
453    }
454}
455
456/// Handle purge jobs request
457async fn handle_purge_jobs<Q>(
458    request: PurgeRequest,
459    queue: Arc<Q>,
460) -> Result<impl Reply, warp::Rejection>
461where
462    Q: DatabaseQueue + Send + Sync + 'static,
463{
464    if request.dry_run {
465        // For dry run, we'd need to implement a count query
466        // For now, return a placeholder
467        let response = PurgeResponse {
468            jobs_purged: 0, // Would calculate this in a real implementation
469            dry_run: true,
470            executed_at: chrono::Utc::now(),
471        };
472        return Ok(warp::reply::json(&ApiResponse::success(response)));
473    }
474
475    match queue.purge_archived_jobs(request.older_than).await {
476        Ok(jobs_purged) => {
477            let response = PurgeResponse {
478                jobs_purged,
479                dry_run: false,
480                executed_at: chrono::Utc::now(),
481            };
482            Ok(warp::reply::json(&ApiResponse::success(response)))
483        }
484        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
485            "Failed to purge archived jobs: {}",
486            e
487        )))),
488    }
489}
490
491/// Handle archive stats request
492async fn handle_archive_stats<Q>(
493    filters: FilterParams,
494    queue: Arc<Q>,
495) -> Result<impl Reply, warp::Rejection>
496where
497    Q: DatabaseQueue + Send + Sync + 'static,
498{
499    match queue.get_archival_stats(filters.queue.as_deref()).await {
500        Ok(stats) => {
501            // For now, return simple stats. In a real implementation,
502            // you'd collect per-queue stats and recent operations
503            let response = StatsResponse {
504                stats,
505                by_queue: std::collections::HashMap::new(),
506                recent_operations: vec![],
507            };
508            Ok(warp::reply::json(&ApiResponse::success(response)))
509        }
510        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
511            "Failed to get archive stats: {}",
512            e
513        )))),
514    }
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520    use chrono::Duration;
521
522    #[test]
523    fn test_archive_request_default() {
524        let request = ArchiveRequest::default();
525        assert!(request.dry_run);
526        assert_eq!(request.reason, ArchivalReason::Manual);
527        assert!(request.queue_name.is_none());
528    }
529
530    #[test]
531    fn test_archived_job_info_conversion() {
532        let archived_job = ArchivedJob {
533            id: uuid::Uuid::new_v4(),
534            queue_name: "test_queue".to_string(),
535            status: JobStatus::Completed,
536            created_at: chrono::Utc::now(),
537            archived_at: chrono::Utc::now(),
538            archival_reason: ArchivalReason::Automatic,
539            original_payload_size: Some(1024),
540            payload_compressed: true,
541            archived_by: Some("system".to_string()),
542        };
543
544        let info: ArchivedJobInfo = archived_job.into();
545        assert_eq!(info.queue_name, "test_queue");
546        assert!(info.payload_compressed);
547        assert_eq!(info.archival_reason, ArchivalReason::Automatic);
548    }
549
550    #[test]
551    fn test_purge_request_validation() {
552        let request = PurgeRequest {
553            older_than: chrono::Utc::now() - Duration::days(365),
554            dry_run: true,
555            purged_by: Some("admin".to_string()),
556        };
557
558        assert!(request.dry_run);
559        assert_eq!(request.purged_by, Some("admin".to_string()));
560    }
561}