hammerwork_web/api/
archive.rs

1//! Archive management API endpoints.
2//!
3//! This module provides REST API endpoints for managing job archiving operations,
4//! including archiving jobs, restoring archived jobs, listing archived jobs,
5//! and configuring archival policies.
6//!
7//! # Endpoints
8//!
9//! - `POST /api/archive/jobs` - Archive jobs based on policy
10//! - `GET /api/archive/jobs` - List archived jobs with pagination and filtering
11//! - `POST /api/archive/jobs/{id}/restore` - Restore an archived job
12//! - `DELETE /api/archive/jobs` - Purge old archived jobs
13//! - `GET /api/archive/stats` - Get archival statistics
14//! - `GET /api/archive/policies` - List archival policies
15//! - `POST /api/archive/policies` - Create or update archival policy
16//! - `DELETE /api/archive/policies/{id}` - Delete archival policy
17//!
18//! # Examples
19//!
20//! ## Archive Jobs
21//!
22//! ```rust
23//! use hammerwork_web::api::archive::{ArchiveRequest, ArchiveResponse};
24//! use hammerwork::archive::{ArchivalReason, ArchivalPolicy};
25//! use chrono::Duration;
26//!
27//! let request = ArchiveRequest {
28//!     queue_name: Some("completed_jobs".to_string()),
29//!     reason: ArchivalReason::Automatic,
30//!     archived_by: Some("scheduler".to_string()),
31//!     dry_run: false,
32//!     policy: Some(ArchivalPolicy::new()
33//!         .archive_completed_after(Duration::days(7))),
34//! };
35//!
36//! // This would be sent to POST /api/archive/jobs
37//! assert_eq!(request.queue_name, Some("completed_jobs".to_string()));
38//! assert!(!request.dry_run);
39//! ```
40//!
41//! ## List Archived Jobs
42//!
43//! ```rust
44//! use hammerwork_web::api::archive::ArchivedJobInfo;
45//! use hammerwork::archive::{ArchivalReason, ArchivedJob};
46//! use hammerwork::{JobId, JobStatus};
47//! use chrono::Utc;
48//! use uuid::Uuid;
49//!
50//! let archived_job = ArchivedJobInfo {
51//!     id: Uuid::new_v4(),
52//!     queue_name: "email_queue".to_string(),
53//!     status: JobStatus::Completed,
54//!     created_at: Utc::now(),
55//!     archived_at: Utc::now(),
56//!     archival_reason: ArchivalReason::Automatic,
57//!     original_payload_size: Some(1024),
58//!     payload_compressed: true,
59//!     archived_by: Some("system".to_string()),
60//! };
61//!
62//! assert_eq!(archived_job.queue_name, "email_queue");
63//! assert!(archived_job.payload_compressed);
64//! ```
65
66use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68    archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
69    queue::DatabaseQueue,
70    JobId, JobStatus,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76/// Request to archive jobs
77#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79    /// Optional queue name to limit archival to specific queue
80    pub queue_name: Option<String>,
81    /// Reason for archival
82    pub reason: ArchivalReason,
83    /// Who initiated the archival
84    pub archived_by: Option<String>,
85    /// Whether this is a dry run (don't actually archive)
86    pub dry_run: bool,
87    /// Archival policy to use (optional, uses default if not provided)
88    pub policy: Option<ArchivalPolicy>,
89    /// Archival configuration (optional, uses default if not provided)
90    pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94    fn default() -> Self {
95        Self {
96            queue_name: None,
97            reason: ArchivalReason::Manual,
98            archived_by: None,
99            dry_run: true,
100            policy: None,
101            config: None,
102        }
103    }
104}
105
106/// Response from archiving jobs
107#[derive(Debug, Serialize)]
108pub struct ArchiveResponse {
109    /// Statistics from the archival operation
110    pub stats: ArchivalStats,
111    /// Whether this was a dry run
112    pub dry_run: bool,
113    /// Policy used for archival
114    pub policy_used: ArchivalPolicy,
115    /// Configuration used for archival
116    pub config_used: ArchivalConfig,
117}
118
119/// Request to restore an archived job
120#[derive(Debug, Deserialize)]
121pub struct RestoreRequest {
122    /// Optional reason for restoration
123    pub reason: Option<String>,
124    /// Who initiated the restoration
125    pub restored_by: Option<String>,
126}
127
128/// Response from restoring a job
129#[derive(Debug, Serialize)]
130pub struct RestoreResponse {
131    /// The restored job
132    pub job: hammerwork::Job,
133    /// When the job was restored
134    pub restored_at: chrono::DateTime<chrono::Utc>,
135    /// Who restored the job
136    pub restored_by: Option<String>,
137}
138
139/// Request to purge archived jobs
140#[derive(Debug, Deserialize)]
141pub struct PurgeRequest {
142    /// Delete archived jobs older than this date
143    pub older_than: chrono::DateTime<chrono::Utc>,
144    /// Whether this is a dry run
145    pub dry_run: bool,
146    /// Who initiated the purge
147    pub purged_by: Option<String>,
148}
149
150/// Response from purging archived jobs
151#[derive(Debug, Serialize)]
152pub struct PurgeResponse {
153    /// Number of jobs that would be (or were) purged
154    pub jobs_purged: u64,
155    /// Whether this was a dry run
156    pub dry_run: bool,
157    /// When the purge was executed
158    pub executed_at: chrono::DateTime<chrono::Utc>,
159}
160
161/// Archived job information for API responses
162#[derive(Debug, Serialize)]
163pub struct ArchivedJobInfo {
164    /// Job ID
165    pub id: JobId,
166    /// Queue name
167    pub queue_name: String,
168    /// Original job status
169    pub status: JobStatus,
170    /// When the job was created
171    pub created_at: chrono::DateTime<chrono::Utc>,
172    /// When the job was archived
173    pub archived_at: chrono::DateTime<chrono::Utc>,
174    /// Reason for archival
175    pub archival_reason: ArchivalReason,
176    /// Original payload size in bytes
177    pub original_payload_size: Option<usize>,
178    /// Whether the payload was compressed
179    pub payload_compressed: bool,
180    /// Who archived the job
181    pub archived_by: Option<String>,
182}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185    fn from(archived_job: ArchivedJob) -> Self {
186        Self {
187            id: archived_job.id,
188            queue_name: archived_job.queue_name,
189            status: archived_job.status,
190            created_at: archived_job.created_at,
191            archived_at: archived_job.archived_at,
192            archival_reason: archived_job.archival_reason,
193            original_payload_size: archived_job.original_payload_size,
194            payload_compressed: archived_job.payload_compressed,
195            archived_by: archived_job.archived_by,
196        }
197    }
198}
199
200/// Archival policy configuration request
201#[derive(Debug, Deserialize)]
202pub struct PolicyRequest {
203    /// Policy name/identifier
204    pub name: String,
205    /// Archival policy configuration
206    pub policy: ArchivalPolicy,
207    /// Whether this policy is active
208    pub active: bool,
209}
210
211/// Archival policy response
212#[derive(Debug, Serialize)]
213pub struct PolicyResponse {
214    /// Policy name/identifier
215    pub name: String,
216    /// Archival policy configuration
217    pub policy: ArchivalPolicy,
218    /// Whether this policy is active
219    pub active: bool,
220    /// When the policy was created
221    pub created_at: chrono::DateTime<chrono::Utc>,
222    /// When the policy was last modified
223    pub modified_at: chrono::DateTime<chrono::Utc>,
224}
225
226/// Archive statistics response
227#[derive(Debug, Serialize)]
228pub struct StatsResponse {
229    /// Overall archival statistics
230    pub stats: ArchivalStats,
231    /// Statistics by queue
232    pub by_queue: std::collections::HashMap<String, ArchivalStats>,
233    /// Recent archival operations
234    pub recent_operations: Vec<RecentOperation>,
235}
236
237/// Information about recent archival operations
238#[derive(Debug, Serialize)]
239pub struct RecentOperation {
240    /// Type of operation (archive, restore, purge)
241    pub operation_type: String,
242    /// Queue affected (if applicable)
243    pub queue_name: Option<String>,
244    /// Number of jobs affected
245    pub jobs_affected: u64,
246    /// When the operation occurred
247    pub executed_at: chrono::DateTime<chrono::Utc>,
248    /// Who executed the operation
249    pub executed_by: Option<String>,
250    /// Operation reason
251    pub reason: Option<String>,
252}
253
254/// Archive filter parameters
255#[derive(Debug, Deserialize, Default)]
256pub struct ArchiveFilterParams {
257    /// Filter by queue name
258    pub queue: Option<String>,
259    /// Filter by archival reason
260    pub reason: Option<String>,
261    /// Filter by archived after date
262    pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
263    /// Filter by archived before date
264    pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
265    /// Filter by who archived
266    pub archived_by: Option<String>,
267    /// Filter by compression status
268    pub compressed: Option<bool>,
269    /// Filter by original job status
270    pub original_status: Option<String>,
271}
272
273/// Create archive API routes
274pub fn archive_routes<Q>(
275    queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278    Q: DatabaseQueue + Send + Sync + 'static,
279{
280    let archive_jobs = warp::path!("api" / "archive" / "jobs")
281        .and(warp::post())
282        .and(warp::body::json())
283        .and(with_queue(queue.clone()))
284        .and_then(handle_archive_jobs);
285
286    let list_archived = warp::path!("api" / "archive" / "jobs")
287        .and(warp::get())
288        .and(super::with_pagination())
289        .and(with_archive_filters())
290        .and(with_queue(queue.clone()))
291        .and_then(handle_list_archived_jobs);
292
293    let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294        .and(warp::post())
295        .and(warp::body::json())
296        .and(with_queue(queue.clone()))
297        .and_then(handle_restore_job);
298
299    let purge_jobs = warp::path!("api" / "archive" / "purge")
300        .and(warp::delete())
301        .and(warp::body::json())
302        .and(with_queue(queue.clone()))
303        .and_then(handle_purge_jobs);
304
305    let archive_stats = warp::path!("api" / "archive" / "stats")
306        .and(warp::get())
307        .and(warp::query::<FilterParams>())
308        .and(with_queue(queue.clone()))
309        .and_then(handle_archive_stats);
310
311    archive_jobs
312        .or(list_archived)
313        .or(restore_job)
314        .or(purge_jobs)
315        .or(archive_stats)
316}
317
318/// Helper to inject queue into handlers
319fn with_queue<Q>(
320    queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323    Q: DatabaseQueue + Send + Sync + 'static,
324{
325    warp::any().map(move || queue.clone())
326}
327
328/// Extract archive filter parameters from query string
329fn with_archive_filters() -> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
330    warp::query::<ArchiveFilterParams>()
331}
332
333/// Handle archive jobs request
334async fn handle_archive_jobs<Q>(
335    request: ArchiveRequest,
336    queue: Arc<Q>,
337) -> Result<impl Reply, warp::Rejection>
338where
339    Q: DatabaseQueue + Send + Sync + 'static,
340{
341    let policy = request.policy.unwrap_or_default();
342    let config = request.config.unwrap_or_default();
343
344    if request.dry_run {
345        // For dry run, return what would happen without actually archiving
346        let response = ArchiveResponse {
347            stats: ArchivalStats::default(), // In a real dry run, we'd calculate this
348            dry_run: true,
349            policy_used: policy,
350            config_used: config,
351        };
352        return Ok(warp::reply::json(&ApiResponse::success(response)));
353    }
354
355    match queue
356        .archive_jobs(
357            request.queue_name.as_deref(),
358            &policy,
359            &config,
360            request.reason,
361            request.archived_by.as_deref(),
362        )
363        .await
364    {
365        Ok(stats) => {
366            let response = ArchiveResponse {
367                stats,
368                dry_run: false,
369                policy_used: policy,
370                config_used: config,
371            };
372            Ok(warp::reply::json(&ApiResponse::success(response)))
373        }
374        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
375            format!("Failed to archive jobs: {}", e),
376        ))),
377    }
378}
379
380/// Handle list archived jobs request
381async fn handle_list_archived_jobs<Q>(
382    pagination: PaginationParams,
383    filters: ArchiveFilterParams,
384    queue: Arc<Q>,
385) -> Result<impl Reply, warp::Rejection>
386where
387    Q: DatabaseQueue + Send + Sync + 'static,
388{
389    match queue
390        .list_archived_jobs(
391            filters.queue.as_deref(),
392            Some(pagination.get_limit()),
393            Some(pagination.get_offset()),
394        )
395        .await
396    {
397        Ok(archived_jobs) => {
398            // Convert to API format
399            let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
400            
401            // For simplicity, we'll use the returned count as total
402            // In a real implementation, you'd want a separate count query
403            let total = jobs.len() as u64;
404            
405            let pagination_meta = PaginationMeta::new(&pagination, total);
406            let response = PaginatedResponse {
407                items: jobs,
408                pagination: pagination_meta,
409            };
410            
411            Ok(warp::reply::json(&ApiResponse::success(response)))
412        }
413        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
414            format!("Failed to list archived jobs: {}", e),
415        ))),
416    }
417}
418
419/// Handle restore job request
420async fn handle_restore_job<Q>(
421    job_id_str: String,
422    request: RestoreRequest,
423    queue: Arc<Q>,
424) -> Result<impl Reply, warp::Rejection>
425where
426    Q: DatabaseQueue + Send + Sync + 'static,
427{
428    let job_id = match uuid::Uuid::parse_str(&job_id_str) {
429        Ok(id) => id,
430        Err(_) => {
431            return Ok(warp::reply::json(&ApiResponse::<()>::error(
432                "Invalid job ID format".to_string(),
433            )));
434        }
435    };
436
437    match queue.restore_archived_job(job_id).await {
438        Ok(job) => {
439            let response = RestoreResponse {
440                job,
441                restored_at: chrono::Utc::now(),
442                restored_by: request.restored_by,
443            };
444            Ok(warp::reply::json(&ApiResponse::success(response)))
445        }
446        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
447            format!("Failed to restore job: {}", e),
448        ))),
449    }
450}
451
452/// Handle purge jobs request
453async fn handle_purge_jobs<Q>(
454    request: PurgeRequest,
455    queue: Arc<Q>,
456) -> Result<impl Reply, warp::Rejection>
457where
458    Q: DatabaseQueue + Send + Sync + 'static,
459{
460    if request.dry_run {
461        // For dry run, we'd need to implement a count query
462        // For now, return a placeholder
463        let response = PurgeResponse {
464            jobs_purged: 0, // Would calculate this in a real implementation
465            dry_run: true,
466            executed_at: chrono::Utc::now(),
467        };
468        return Ok(warp::reply::json(&ApiResponse::success(response)));
469    }
470
471    match queue.purge_archived_jobs(request.older_than).await {
472        Ok(jobs_purged) => {
473            let response = PurgeResponse {
474                jobs_purged,
475                dry_run: false,
476                executed_at: chrono::Utc::now(),
477            };
478            Ok(warp::reply::json(&ApiResponse::success(response)))
479        }
480        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
481            format!("Failed to purge archived jobs: {}", e),
482        ))),
483    }
484}
485
486/// Handle archive stats request
487async fn handle_archive_stats<Q>(
488    filters: FilterParams,
489    queue: Arc<Q>,
490) -> Result<impl Reply, warp::Rejection>
491where
492    Q: DatabaseQueue + Send + Sync + 'static,
493{
494    match queue.get_archival_stats(filters.queue.as_deref()).await {
495        Ok(stats) => {
496            // For now, return simple stats. In a real implementation,
497            // you'd collect per-queue stats and recent operations
498            let response = StatsResponse {
499                stats,
500                by_queue: std::collections::HashMap::new(),
501                recent_operations: vec![],
502            };
503            Ok(warp::reply::json(&ApiResponse::success(response)))
504        }
505        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
506            format!("Failed to get archive stats: {}", e),
507        ))),
508    }
509}
510
511#[cfg(test)]
512mod tests {
513    use super::*;
514    use chrono::Duration;
515
516    #[test]
517    fn test_archive_request_default() {
518        let request = ArchiveRequest::default();
519        assert!(request.dry_run);
520        assert_eq!(request.reason, ArchivalReason::Manual);
521        assert!(request.queue_name.is_none());
522    }
523
524    #[test]
525    fn test_archived_job_info_conversion() {
526        let archived_job = ArchivedJob {
527            id: uuid::Uuid::new_v4(),
528            queue_name: "test_queue".to_string(),
529            status: JobStatus::Completed,
530            created_at: chrono::Utc::now(),
531            archived_at: chrono::Utc::now(),
532            archival_reason: ArchivalReason::Automatic,
533            original_payload_size: Some(1024),
534            payload_compressed: true,
535            archived_by: Some("system".to_string()),
536        };
537
538        let info: ArchivedJobInfo = archived_job.into();
539        assert_eq!(info.queue_name, "test_queue");
540        assert!(info.payload_compressed);
541        assert_eq!(info.archival_reason, ArchivalReason::Automatic);
542    }
543
544    #[test]
545    fn test_purge_request_validation() {
546        let request = PurgeRequest {
547            older_than: chrono::Utc::now() - Duration::days(365),
548            dry_run: true,
549            purged_by: Some("admin".to_string()),
550        };
551
552        assert!(request.dry_run);
553        assert_eq!(request.purged_by, Some("admin".to_string()));
554    }
555}