// hammerwork_web/api/archive.rs

1//! Archive management API endpoints.
2//!
3//! This module provides REST API endpoints for managing job archiving operations,
4//! including archiving jobs, restoring archived jobs, listing archived jobs,
5//! and configuring archival policies.
6//!
7//! # Endpoints
8//!
9//! - `POST /api/archive/jobs` - Archive jobs based on policy
10//! - `GET /api/archive/jobs` - List archived jobs with pagination and filtering
11//! - `POST /api/archive/jobs/{id}/restore` - Restore an archived job
//! - `DELETE /api/archive/purge` - Purge old archived jobs
13//! - `GET /api/archive/stats` - Get archival statistics
14//! - `GET /api/archive/policies` - List archival policies
15//! - `POST /api/archive/policies` - Create or update archival policy
16//! - `DELETE /api/archive/policies/{id}` - Delete archival policy
17//!
18//! # Examples
19//!
20//! ## Archive Jobs
21//!
22//! ```rust
23//! use hammerwork_web::api::archive::{ArchiveRequest, ArchiveResponse};
24//! use hammerwork::archive::{ArchivalReason, ArchivalPolicy};
25//! use chrono::Duration;
26//!
27//! let request = ArchiveRequest {
28//!     queue_name: Some("completed_jobs".to_string()),
29//!     reason: ArchivalReason::Automatic,
30//!     archived_by: Some("scheduler".to_string()),
31//!     dry_run: false,
//!     policy: Some(ArchivalPolicy::new()
//!         .archive_completed_after(Duration::days(7))),
//!     config: None,
//! };
35//!
36//! // This would be sent to POST /api/archive/jobs
37//! assert_eq!(request.queue_name, Some("completed_jobs".to_string()));
38//! assert!(!request.dry_run);
39//! ```
40//!
41//! ## List Archived Jobs
42//!
43//! ```rust
44//! use hammerwork_web::api::archive::ArchivedJobInfo;
45//! use hammerwork::archive::{ArchivalReason, ArchivedJob};
46//! use hammerwork::{JobId, JobStatus};
47//! use chrono::Utc;
48//! use uuid::Uuid;
49//!
50//! let archived_job = ArchivedJobInfo {
51//!     id: Uuid::new_v4(),
52//!     queue_name: "email_queue".to_string(),
53//!     status: JobStatus::Completed,
54//!     created_at: Utc::now(),
55//!     archived_at: Utc::now(),
56//!     archival_reason: ArchivalReason::Automatic,
57//!     original_payload_size: Some(1024),
58//!     payload_compressed: true,
59//!     archived_by: Some("system".to_string()),
60//! };
61//!
62//! assert_eq!(archived_job.queue_name, "email_queue");
63//! assert!(archived_job.payload_compressed);
64//! ```
65
66use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68    JobId, JobStatus,
69    archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
70    queue::DatabaseQueue,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
/// Request body for `POST /api/archive/jobs`.
///
/// Archives jobs according to the supplied policy/config, falling back to
/// their `Default` values when omitted. The `Default` impl for this type
/// sets `dry_run: true`, so destructive archival is always opt-in.
#[derive(Debug, Deserialize)]
pub struct ArchiveRequest {
    /// Optional queue name to limit archival to specific queue
    pub queue_name: Option<String>,
    /// Reason for archival
    pub reason: ArchivalReason,
    /// Who initiated the archival
    pub archived_by: Option<String>,
    /// Whether this is a dry run (don't actually archive)
    pub dry_run: bool,
    /// Archival policy to use (optional, uses default if not provided)
    pub policy: Option<ArchivalPolicy>,
    /// Archival configuration (optional, uses default if not provided)
    pub config: Option<ArchivalConfig>,
}
92
93impl Default for ArchiveRequest {
94    fn default() -> Self {
95        Self {
96            queue_name: None,
97            reason: ArchivalReason::Manual,
98            archived_by: None,
99            dry_run: true,
100            policy: None,
101            config: None,
102        }
103    }
104}
105
/// Response body for `POST /api/archive/jobs`.
///
/// Echoes back the effective policy and config so clients can see which
/// defaults were applied when the request omitted them. On dry runs the
/// stats are defaulted rather than computed (see `handle_archive_jobs`).
#[derive(Debug, Serialize)]
pub struct ArchiveResponse {
    /// Statistics from the archival operation
    pub stats: ArchivalStats,
    /// Whether this was a dry run
    pub dry_run: bool,
    /// Policy used for archival
    pub policy_used: ArchivalPolicy,
    /// Configuration used for archival
    pub config_used: ArchivalConfig,
}
118
/// Request body for `POST /api/archive/jobs/{id}/restore`.
#[derive(Debug, Deserialize)]
pub struct RestoreRequest {
    /// Optional reason for restoration
    // NOTE(review): `reason` is accepted but not currently used by
    // `handle_restore_job` — confirm whether it should be persisted.
    pub reason: Option<String>,
    /// Who initiated the restoration
    pub restored_by: Option<String>,
}
127
/// Response body for `POST /api/archive/jobs/{id}/restore`.
#[derive(Debug, Serialize)]
pub struct RestoreResponse {
    /// The restored job
    pub job: hammerwork::Job,
    /// When the job was restored (server-side timestamp, UTC)
    pub restored_at: chrono::DateTime<chrono::Utc>,
    /// Who restored the job (copied from the request, if provided)
    pub restored_by: Option<String>,
}
138
/// Request body for `DELETE /api/archive/purge`.
#[derive(Debug, Deserialize)]
pub struct PurgeRequest {
    /// Delete archived jobs older than this date
    pub older_than: chrono::DateTime<chrono::Utc>,
    /// Whether this is a dry run (report the count without deleting)
    pub dry_run: bool,
    /// Who initiated the purge
    // NOTE(review): `purged_by` is accepted but not used by
    // `handle_purge_jobs` — confirm whether it should be audited.
    pub purged_by: Option<String>,
}
149
/// Response body for `DELETE /api/archive/purge`.
#[derive(Debug, Serialize)]
pub struct PurgeResponse {
    /// Number of jobs that would be (or were) purged
    pub jobs_purged: u64,
    /// Whether this was a dry run
    pub dry_run: bool,
    /// When the purge was executed (server-side timestamp, UTC)
    pub executed_at: chrono::DateTime<chrono::Utc>,
}
160
/// Archived job information for API responses.
///
/// A metadata-only projection of [`ArchivedJob`]: payload bytes are not
/// exposed here, only the original payload size and compression flag.
#[derive(Debug, Serialize)]
pub struct ArchivedJobInfo {
    /// Job ID
    pub id: JobId,
    /// Queue name
    pub queue_name: String,
    /// Original job status
    pub status: JobStatus,
    /// When the job was created
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the job was archived
    pub archived_at: chrono::DateTime<chrono::Utc>,
    /// Reason for archival
    pub archival_reason: ArchivalReason,
    /// Original payload size in bytes
    pub original_payload_size: Option<usize>,
    /// Whether the payload was compressed
    pub payload_compressed: bool,
    /// Who archived the job
    pub archived_by: Option<String>,
}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185    fn from(archived_job: ArchivedJob) -> Self {
186        Self {
187            id: archived_job.id,
188            queue_name: archived_job.queue_name,
189            status: archived_job.status,
190            created_at: archived_job.created_at,
191            archived_at: archived_job.archived_at,
192            archival_reason: archived_job.archival_reason,
193            original_payload_size: archived_job.original_payload_size,
194            payload_compressed: archived_job.payload_compressed,
195            archived_by: archived_job.archived_by,
196        }
197    }
198}
199
/// Archival policy configuration request.
// NOTE(review): the policy endpoints listed in the module docs are not
// registered by `archive_routes` in this file — confirm they are wired
// up elsewhere before relying on this type.
#[derive(Debug, Deserialize)]
pub struct PolicyRequest {
    /// Policy name/identifier
    pub name: String,
    /// Archival policy configuration
    pub policy: ArchivalPolicy,
    /// Whether this policy is active
    pub active: bool,
}
210
/// Archival policy response.
// NOTE(review): no handler in this file produces this type — confirm the
// policy endpoints are implemented elsewhere.
#[derive(Debug, Serialize)]
pub struct PolicyResponse {
    /// Policy name/identifier
    pub name: String,
    /// Archival policy configuration
    pub policy: ArchivalPolicy,
    /// Whether this policy is active
    pub active: bool,
    /// When the policy was created
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the policy was last modified
    pub modified_at: chrono::DateTime<chrono::Utc>,
}
225
/// Response body for `GET /api/archive/stats`.
#[derive(Debug, Serialize)]
pub struct StatsResponse {
    /// Overall archival statistics
    pub stats: ArchivalStats,
    /// Statistics by queue (empty when the request filters on one queue)
    pub by_queue: std::collections::HashMap<String, ArchivalStats>,
    /// Recent archival operations
    // NOTE(review): currently populated with mock entries by
    // `handle_archive_stats`; real operation tracking is not implemented.
    pub recent_operations: Vec<RecentOperation>,
}
236
/// Information about a recent archival operation (archive, restore, purge).
#[derive(Debug, Serialize)]
pub struct RecentOperation {
    /// Type of operation ("archive", "restore", "purge")
    pub operation_type: String,
    /// Queue affected (if applicable)
    pub queue_name: Option<String>,
    /// Number of jobs affected
    pub jobs_affected: u64,
    /// When the operation occurred
    pub executed_at: chrono::DateTime<chrono::Utc>,
    /// Who executed the operation
    pub executed_by: Option<String>,
    /// Operation reason
    pub reason: Option<String>,
}
253
/// Query-string filter parameters for `GET /api/archive/jobs`.
// NOTE(review): only `queue` is currently honored by
// `handle_list_archived_jobs`; the remaining filters are deserialized but
// never applied — confirm whether backend filtering is planned.
#[derive(Debug, Deserialize, Default)]
pub struct ArchiveFilterParams {
    /// Filter by queue name
    pub queue: Option<String>,
    /// Filter by archival reason
    pub reason: Option<String>,
    /// Filter by archived after date
    pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
    /// Filter by archived before date
    pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
    /// Filter by who archived
    pub archived_by: Option<String>,
    /// Filter by compression status
    pub compressed: Option<bool>,
    /// Filter by original job status
    pub original_status: Option<String>,
}
272
273/// Create archive API routes
274pub fn archive_routes<Q>(
275    queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278    Q: DatabaseQueue + Send + Sync + 'static,
279{
280    let archive_jobs = warp::path!("api" / "archive" / "jobs")
281        .and(warp::post())
282        .and(warp::body::json())
283        .and(with_queue(queue.clone()))
284        .and_then(handle_archive_jobs);
285
286    let list_archived = warp::path!("api" / "archive" / "jobs")
287        .and(warp::get())
288        .and(super::with_pagination())
289        .and(with_archive_filters())
290        .and(with_queue(queue.clone()))
291        .and_then(handle_list_archived_jobs);
292
293    let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294        .and(warp::post())
295        .and(warp::body::json())
296        .and(with_queue(queue.clone()))
297        .and_then(handle_restore_job);
298
299    let purge_jobs = warp::path!("api" / "archive" / "purge")
300        .and(warp::delete())
301        .and(warp::body::json())
302        .and(with_queue(queue.clone()))
303        .and_then(handle_purge_jobs);
304
305    let archive_stats = warp::path!("api" / "archive" / "stats")
306        .and(warp::get())
307        .and(warp::query::<FilterParams>())
308        .and(with_queue(queue.clone()))
309        .and_then(handle_archive_stats);
310
311    archive_jobs
312        .or(list_archived)
313        .or(restore_job)
314        .or(purge_jobs)
315        .or(archive_stats)
316}
317
318/// Helper to inject queue into handlers
319fn with_queue<Q>(
320    queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323    Q: DatabaseQueue + Send + Sync + 'static,
324{
325    warp::any().map(move || queue.clone())
326}
327
/// Warp filter that deserializes [`ArchiveFilterParams`] from the request's
/// query string. Missing parameters fall back via `#[derive(Default)]`-style
/// `Option` fields, so an empty query string is accepted.
fn with_archive_filters()
-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
    warp::query::<ArchiveFilterParams>()
}
333
334/// Handle archive jobs request
335async fn handle_archive_jobs<Q>(
336    request: ArchiveRequest,
337    queue: Arc<Q>,
338) -> Result<impl Reply, warp::Rejection>
339where
340    Q: DatabaseQueue + Send + Sync + 'static,
341{
342    let policy = request.policy.unwrap_or_default();
343    let config = request.config.unwrap_or_default();
344
345    if request.dry_run {
346        // For dry run, return what would happen without actually archiving
347        let response = ArchiveResponse {
348            stats: ArchivalStats::default(), // In a real dry run, we'd calculate this
349            dry_run: true,
350            policy_used: policy,
351            config_used: config,
352        };
353        return Ok(warp::reply::json(&ApiResponse::success(response)));
354    }
355
356    match queue
357        .archive_jobs(
358            request.queue_name.as_deref(),
359            &policy,
360            &config,
361            request.reason,
362            request.archived_by.as_deref(),
363        )
364        .await
365    {
366        Ok(stats) => {
367            let response = ArchiveResponse {
368                stats,
369                dry_run: false,
370                policy_used: policy,
371                config_used: config,
372            };
373            Ok(warp::reply::json(&ApiResponse::success(response)))
374        }
375        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
376            "Failed to archive jobs: {}",
377            e
378        )))),
379    }
380}
381
382/// Handle list archived jobs request
383async fn handle_list_archived_jobs<Q>(
384    pagination: PaginationParams,
385    filters: ArchiveFilterParams,
386    queue: Arc<Q>,
387) -> Result<impl Reply, warp::Rejection>
388where
389    Q: DatabaseQueue + Send + Sync + 'static,
390{
391    match queue
392        .list_archived_jobs(
393            filters.queue.as_deref(),
394            Some(pagination.get_limit()),
395            Some(pagination.get_offset()),
396        )
397        .await
398    {
399        Ok(archived_jobs) => {
400            // Convert to API format
401            let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
402
403            // Get a better estimate of total count by running a query with a large limit
404            // This is not perfect but gives a more accurate total than just the current page
405            let total = if jobs.len() as u64 == pagination.limit.unwrap_or(0) as u64 {
406                // If we got exactly the limit, there might be more records
407                // Run another query to get a better count estimate
408                match queue
409                    .list_archived_jobs(
410                        filters.queue.as_deref(),
411                        Some(10000),
412                        Some(0),
413                    )
414                    .await
415                {
416                    Ok(all_jobs) => all_jobs.len() as u64,
417                    Err(_) => jobs.len() as u64, // Fallback to current page count
418                }
419            } else {
420                // If we got less than the limit, we have all records
421                pagination.offset.unwrap_or(0) as u64 + jobs.len() as u64
422            };
423
424            let pagination_meta = PaginationMeta::new(&pagination, total);
425            let response = PaginatedResponse {
426                items: jobs,
427                pagination: pagination_meta,
428            };
429
430            Ok(warp::reply::json(&ApiResponse::success(response)))
431        }
432        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
433            "Failed to list archived jobs: {}",
434            e
435        )))),
436    }
437}
438
439/// Handle restore job request
440async fn handle_restore_job<Q>(
441    job_id_str: String,
442    request: RestoreRequest,
443    queue: Arc<Q>,
444) -> Result<impl Reply, warp::Rejection>
445where
446    Q: DatabaseQueue + Send + Sync + 'static,
447{
448    let job_id = match uuid::Uuid::parse_str(&job_id_str) {
449        Ok(id) => id,
450        Err(_) => {
451            return Ok(warp::reply::json(&ApiResponse::<()>::error(
452                "Invalid job ID format".to_string(),
453            )));
454        }
455    };
456
457    match queue.restore_archived_job(job_id).await {
458        Ok(job) => {
459            let response = RestoreResponse {
460                job,
461                restored_at: chrono::Utc::now(),
462                restored_by: request.restored_by,
463            };
464            Ok(warp::reply::json(&ApiResponse::success(response)))
465        }
466        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
467            "Failed to restore job: {}",
468            e
469        )))),
470    }
471}
472
473/// Handle purge jobs request
474async fn handle_purge_jobs<Q>(
475    request: PurgeRequest,
476    queue: Arc<Q>,
477) -> Result<impl Reply, warp::Rejection>
478where
479    Q: DatabaseQueue + Send + Sync + 'static,
480{
481    if request.dry_run {
482        // For dry run, estimate how many jobs would be purged by using list_archived_jobs
483        // with a large limit to get an accurate count
484        let count = match queue
485            .list_archived_jobs(None, Some(10000), Some(0))
486            .await
487        {
488            Ok(jobs) => jobs.len() as u64,
489            Err(_) => 0, // If we can't get the count, return 0 for safety
490        };
491
492        let response = PurgeResponse {
493            jobs_purged: count,
494            dry_run: true,
495            executed_at: chrono::Utc::now(),
496        };
497        return Ok(warp::reply::json(&ApiResponse::success(response)));
498    }
499
500    match queue.purge_archived_jobs(request.older_than).await {
501        Ok(jobs_purged) => {
502            let response = PurgeResponse {
503                jobs_purged,
504                dry_run: false,
505                executed_at: chrono::Utc::now(),
506            };
507            Ok(warp::reply::json(&ApiResponse::success(response)))
508        }
509        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
510            "Failed to purge archived jobs: {}",
511            e
512        )))),
513    }
514}
515
516/// Handle archive stats request
517async fn handle_archive_stats<Q>(
518    filters: FilterParams,
519    queue: Arc<Q>,
520) -> Result<impl Reply, warp::Rejection>
521where
522    Q: DatabaseQueue + Send + Sync + 'static,
523{
524    match queue.get_archival_stats(filters.queue.as_deref()).await {
525        Ok(stats) => {
526            // Collect per-queue stats if no specific queue is filtered
527            let mut by_queue = std::collections::HashMap::new();
528
529            if filters.queue.is_none() {
530                // Get queue list and collect stats for each
531                if let Ok(queue_stats) = queue.get_all_queue_stats().await {
532                    for queue_stat in queue_stats {
533                        if let Ok(queue_archival_stats) =
534                            queue.get_archival_stats(Some(&queue_stat.queue_name)).await
535                        {
536                            by_queue.insert(queue_stat.queue_name, queue_archival_stats);
537                        }
538                    }
539                }
540            }
541
542            // Generate some mock recent operations (in a real implementation, these would be tracked)
543            let recent_operations = vec![
544                RecentOperation {
545                    operation_type: "archive".to_string(),
546                    jobs_affected: stats.jobs_archived,
547                    executed_at: chrono::Utc::now() - chrono::Duration::hours(2),
548                    executed_by: Some("system".to_string()),
549                    reason: Some("Automated archival".to_string()),
550                    queue_name: filters.queue.clone(),
551                },
552                RecentOperation {
553                    operation_type: "purge".to_string(),
554                    jobs_affected: stats.jobs_purged,
555                    executed_at: chrono::Utc::now() - chrono::Duration::hours(4),
556                    executed_by: Some("system".to_string()),
557                    reason: Some("Automated purge".to_string()),
558                    queue_name: filters.queue.clone(),
559                },
560            ];
561
562            let response = StatsResponse {
563                stats,
564                by_queue,
565                recent_operations,
566            };
567            Ok(warp::reply::json(&ApiResponse::success(response)))
568        }
569        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
570            "Failed to get archive stats: {}",
571            e
572        )))),
573    }
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Defaults must be conservative: manual reason, dry-run on, no filter.
    #[test]
    fn test_archive_request_default() {
        let req = ArchiveRequest::default();
        assert!(req.dry_run);
        assert_eq!(req.reason, ArchivalReason::Manual);
        assert!(req.queue_name.is_none());
    }

    /// `ArchivedJob -> ArchivedJobInfo` must carry every field across.
    #[test]
    fn test_archived_job_info_conversion() {
        let source = ArchivedJob {
            id: uuid::Uuid::new_v4(),
            queue_name: "test_queue".to_string(),
            status: JobStatus::Completed,
            created_at: chrono::Utc::now(),
            archived_at: chrono::Utc::now(),
            archival_reason: ArchivalReason::Automatic,
            original_payload_size: Some(1024),
            payload_compressed: true,
            archived_by: Some("system".to_string()),
        };

        let converted: ArchivedJobInfo = source.into();
        assert_eq!(converted.queue_name, "test_queue");
        assert!(converted.payload_compressed);
        assert_eq!(converted.archival_reason, ArchivalReason::Automatic);
    }

    /// A purge request should round-trip the fields it was built with.
    #[test]
    fn test_purge_request_validation() {
        let req = PurgeRequest {
            older_than: chrono::Utc::now() - Duration::days(365),
            dry_run: true,
            purged_by: Some("admin".to_string()),
        };

        assert!(req.dry_run);
        assert_eq!(req.purged_by, Some("admin".to_string()));
    }
}