Skip to main content

hammerwork_web/api/
archive.rs

1//! Archive management API endpoints.
2//!
3//! This module provides REST API endpoints for managing job archiving operations,
4//! including archiving jobs, restoring archived jobs, listing archived jobs,
5//! and configuring archival policies.
6//!
7//! # Endpoints
8//!
9//! - `POST /api/archive/jobs` - Archive jobs based on policy
10//! - `GET /api/archive/jobs` - List archived jobs with pagination and filtering
11//! - `POST /api/archive/jobs/{id}/restore` - Restore an archived job
//! - `DELETE /api/archive/purge` - Purge old archived jobs
13//! - `GET /api/archive/stats` - Get archival statistics
//! - `GET /api/archive/policies` - List archival policies (not yet wired into `archive_routes`)
//! - `POST /api/archive/policies` - Create or update archival policy (not yet wired into `archive_routes`)
//! - `DELETE /api/archive/policies/{id}` - Delete archival policy (not yet wired into `archive_routes`)
17//!
18//! # Examples
19//!
20//! ## Archive Jobs
21//!
22//! ```rust
23//! use hammerwork_web::api::archive::{ArchiveRequest, ArchiveResponse};
24//! use hammerwork::archive::{ArchivalReason, ArchivalPolicy};
25//! use chrono::Duration;
26//!
27//! let request = ArchiveRequest {
28//!     queue_name: Some("completed_jobs".to_string()),
29//!     reason: ArchivalReason::Automatic,
30//!     archived_by: Some("scheduler".to_string()),
31//!     dry_run: false,
32//!     policy: Some(ArchivalPolicy::new()
33//!         .archive_completed_after(Duration::days(7))),
//!     config: None,
//! };
35//!
36//! // This would be sent to POST /api/archive/jobs
37//! assert_eq!(request.queue_name, Some("completed_jobs".to_string()));
38//! assert!(!request.dry_run);
39//! ```
40//!
41//! ## List Archived Jobs
42//!
43//! ```rust
44//! use hammerwork_web::api::archive::ArchivedJobInfo;
45//! use hammerwork::archive::{ArchivalReason, ArchivedJob};
46//! use hammerwork::{JobId, JobStatus};
47//! use chrono::Utc;
48//! use uuid::Uuid;
49//!
50//! let archived_job = ArchivedJobInfo {
51//!     id: Uuid::new_v4(),
52//!     queue_name: "email_queue".to_string(),
53//!     status: JobStatus::Completed,
54//!     created_at: Utc::now(),
55//!     archived_at: Utc::now(),
56//!     archival_reason: ArchivalReason::Automatic,
57//!     original_payload_size: Some(1024),
58//!     payload_compressed: true,
59//!     archived_by: Some("system".to_string()),
60//! };
61//!
62//! assert_eq!(archived_job.queue_name, "email_queue");
63//! assert!(archived_job.payload_compressed);
64//! ```
65
66use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68    JobId, JobStatus,
69    archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
70    queue::DatabaseQueue,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76/// Request to archive jobs
77#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79    /// Optional queue name to limit archival to specific queue
80    pub queue_name: Option<String>,
81    /// Reason for archival
82    pub reason: ArchivalReason,
83    /// Who initiated the archival
84    pub archived_by: Option<String>,
85    /// Whether this is a dry run (don't actually archive)
86    pub dry_run: bool,
87    /// Archival policy to use (optional, uses default if not provided)
88    pub policy: Option<ArchivalPolicy>,
89    /// Archival configuration (optional, uses default if not provided)
90    pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94    fn default() -> Self {
95        Self {
96            queue_name: None,
97            reason: ArchivalReason::Manual,
98            archived_by: None,
99            dry_run: true,
100            policy: None,
101            config: None,
102        }
103    }
104}
105
/// Response from archiving jobs.
///
/// Echoes back the effective policy/config so callers can see which
/// defaults were applied when the request omitted them.
#[derive(Debug, Serialize)]
pub struct ArchiveResponse {
    /// Statistics from the archival operation
    pub stats: ArchivalStats,
    /// Whether this was a dry run
    pub dry_run: bool,
    /// Policy used for archival (the request's policy, or the default)
    pub policy_used: ArchivalPolicy,
    /// Configuration used for archival (the request's config, or the default)
    pub config_used: ArchivalConfig,
}
118
/// Request to restore an archived job.
#[derive(Debug, Deserialize)]
pub struct RestoreRequest {
    /// Optional reason for restoration
    // NOTE(review): not currently consumed by `handle_restore_job`; only
    // `restored_by` is echoed back in the response.
    pub reason: Option<String>,
    /// Who initiated the restoration (echoed back in the response)
    pub restored_by: Option<String>,
}
127
/// Response from restoring a job.
#[derive(Debug, Serialize)]
pub struct RestoreResponse {
    /// The restored job, as returned by the queue backend
    pub job: hammerwork::Job,
    /// When the job was restored (timestamped server-side at response time)
    pub restored_at: chrono::DateTime<chrono::Utc>,
    /// Who restored the job (echoed from the request, if provided)
    pub restored_by: Option<String>,
}
138
/// Request to purge (permanently delete) archived jobs.
#[derive(Debug, Deserialize)]
pub struct PurgeRequest {
    /// Delete archived jobs older than this date
    pub older_than: chrono::DateTime<chrono::Utc>,
    /// Whether this is a dry run (estimate only, delete nothing)
    pub dry_run: bool,
    /// Who initiated the purge
    // NOTE(review): not currently consumed by `handle_purge_jobs`.
    pub purged_by: Option<String>,
}
149
/// Response from purging archived jobs.
#[derive(Debug, Serialize)]
pub struct PurgeResponse {
    /// Number of jobs that would be (dry run) or were (real run) purged
    pub jobs_purged: u64,
    /// Whether this was a dry run
    pub dry_run: bool,
    /// When the purge was executed (server-side timestamp)
    pub executed_at: chrono::DateTime<chrono::Utc>,
}
160
/// Archived job information for API responses.
///
/// A serializable projection of [`hammerwork::archive::ArchivedJob`]; see the
/// `From<ArchivedJob>` impl below for the field mapping.
#[derive(Debug, Serialize)]
pub struct ArchivedJobInfo {
    /// Job ID
    pub id: JobId,
    /// Queue name
    pub queue_name: String,
    /// Original job status at the time of archival
    pub status: JobStatus,
    /// When the job was created
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the job was archived
    pub archived_at: chrono::DateTime<chrono::Utc>,
    /// Reason for archival
    pub archival_reason: ArchivalReason,
    /// Original payload size in bytes (if recorded)
    pub original_payload_size: Option<usize>,
    /// Whether the payload was compressed in the archive
    pub payload_compressed: bool,
    /// Who archived the job (if recorded)
    pub archived_by: Option<String>,
}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185    fn from(archived_job: ArchivedJob) -> Self {
186        Self {
187            id: archived_job.id,
188            queue_name: archived_job.queue_name,
189            status: archived_job.status,
190            created_at: archived_job.created_at,
191            archived_at: archived_job.archived_at,
192            archival_reason: archived_job.archival_reason,
193            original_payload_size: archived_job.original_payload_size,
194            payload_compressed: archived_job.payload_compressed,
195            archived_by: archived_job.archived_by,
196        }
197    }
198}
199
/// Archival policy configuration request.
// NOTE(review): the `/api/archive/policies` endpoints this type is meant for
// are described in the module docs but not wired into `archive_routes` yet.
#[derive(Debug, Deserialize)]
pub struct PolicyRequest {
    /// Policy name/identifier
    pub name: String,
    /// Archival policy configuration
    pub policy: ArchivalPolicy,
    /// Whether this policy is active
    pub active: bool,
}
210
/// Archival policy response.
// NOTE(review): no route in `archive_routes` currently produces this type.
#[derive(Debug, Serialize)]
pub struct PolicyResponse {
    /// Policy name/identifier
    pub name: String,
    /// Archival policy configuration
    pub policy: ArchivalPolicy,
    /// Whether this policy is active
    pub active: bool,
    /// When the policy was created
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the policy was last modified
    pub modified_at: chrono::DateTime<chrono::Utc>,
}
225
/// Archive statistics response for `GET /api/archive/stats`.
#[derive(Debug, Serialize)]
pub struct StatsResponse {
    /// Overall archival statistics (for the filtered queue, or all queues)
    pub stats: ArchivalStats,
    /// Statistics broken down by queue (populated only when no queue filter
    /// is supplied; see `handle_archive_stats`)
    pub by_queue: std::collections::HashMap<String, ArchivalStats>,
    /// Recent archival operations
    pub recent_operations: Vec<RecentOperation>,
}
236
/// Information about recent archival operations.
// NOTE(review): currently only synthesized as mock data by
// `handle_archive_stats`; no real operation log is consulted.
#[derive(Debug, Serialize)]
pub struct RecentOperation {
    /// Type of operation (archive, restore, purge)
    pub operation_type: String,
    /// Queue affected (if applicable)
    pub queue_name: Option<String>,
    /// Number of jobs affected
    pub jobs_affected: u64,
    /// When the operation occurred
    pub executed_at: chrono::DateTime<chrono::Utc>,
    /// Who executed the operation
    pub executed_by: Option<String>,
    /// Operation reason
    pub reason: Option<String>,
}
253
/// Archive filter parameters, deserialized from the query string.
// NOTE(review): only `queue` is currently pushed down to the backend by
// `handle_list_archived_jobs`; the remaining filters are accepted but not
// yet applied.
#[derive(Debug, Deserialize, Default)]
pub struct ArchiveFilterParams {
    /// Filter by queue name
    pub queue: Option<String>,
    /// Filter by archival reason
    pub reason: Option<String>,
    /// Filter by archived after date
    pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
    /// Filter by archived before date
    pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
    /// Filter by who archived
    pub archived_by: Option<String>,
    /// Filter by compression status
    pub compressed: Option<bool>,
    /// Filter by original job status
    pub original_status: Option<String>,
}
272
273/// Create archive API routes
274pub fn archive_routes<Q>(
275    queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278    Q: DatabaseQueue + Send + Sync + 'static,
279{
280    let archive_jobs = warp::path!("api" / "archive" / "jobs")
281        .and(warp::post())
282        .and(warp::body::json())
283        .and(with_queue(queue.clone()))
284        .and_then(handle_archive_jobs);
285
286    let list_archived = warp::path!("api" / "archive" / "jobs")
287        .and(warp::get())
288        .and(super::with_pagination())
289        .and(with_archive_filters())
290        .and(with_queue(queue.clone()))
291        .and_then(handle_list_archived_jobs);
292
293    let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294        .and(warp::post())
295        .and(warp::body::json())
296        .and(with_queue(queue.clone()))
297        .and_then(handle_restore_job);
298
299    let purge_jobs = warp::path!("api" / "archive" / "purge")
300        .and(warp::delete())
301        .and(warp::body::json())
302        .and(with_queue(queue.clone()))
303        .and_then(handle_purge_jobs);
304
305    let archive_stats = warp::path!("api" / "archive" / "stats")
306        .and(warp::get())
307        .and(warp::query::<FilterParams>())
308        .and(with_queue(queue.clone()))
309        .and_then(handle_archive_stats);
310
311    archive_jobs
312        .or(list_archived)
313        .or(restore_job)
314        .or(purge_jobs)
315        .or(archive_stats)
316}
317
318/// Helper to inject queue into handlers
319fn with_queue<Q>(
320    queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323    Q: DatabaseQueue + Send + Sync + 'static,
324{
325    warp::any().map(move || queue.clone())
326}
327
/// Extract archive filter parameters from the request's query string.
///
/// Missing parameters fall back via `ArchiveFilterParams`'s
/// `#[derive(Default)]` / `Option` fields.
fn with_archive_filters()
-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
    warp::query::<ArchiveFilterParams>()
}
333
334/// Handle archive jobs request
335async fn handle_archive_jobs<Q>(
336    request: ArchiveRequest,
337    queue: Arc<Q>,
338) -> Result<impl Reply, warp::Rejection>
339where
340    Q: DatabaseQueue + Send + Sync + 'static,
341{
342    let policy = request.policy.unwrap_or_default();
343    let config = request.config.unwrap_or_default();
344
345    if request.dry_run {
346        // For dry run, return what would happen without actually archiving
347        let response = ArchiveResponse {
348            stats: ArchivalStats::default(), // In a real dry run, we'd calculate this
349            dry_run: true,
350            policy_used: policy,
351            config_used: config,
352        };
353        return Ok(warp::reply::json(&ApiResponse::success(response)));
354    }
355
356    match queue
357        .archive_jobs(
358            request.queue_name.as_deref(),
359            &policy,
360            &config,
361            request.reason,
362            request.archived_by.as_deref(),
363        )
364        .await
365    {
366        Ok(stats) => {
367            let response = ArchiveResponse {
368                stats,
369                dry_run: false,
370                policy_used: policy,
371                config_used: config,
372            };
373            Ok(warp::reply::json(&ApiResponse::success(response)))
374        }
375        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
376            "Failed to archive jobs: {}",
377            e
378        )))),
379    }
380}
381
382/// Handle list archived jobs request
383async fn handle_list_archived_jobs<Q>(
384    pagination: PaginationParams,
385    filters: ArchiveFilterParams,
386    queue: Arc<Q>,
387) -> Result<impl Reply, warp::Rejection>
388where
389    Q: DatabaseQueue + Send + Sync + 'static,
390{
391    match queue
392        .list_archived_jobs(
393            filters.queue.as_deref(),
394            Some(pagination.get_limit()),
395            Some(pagination.get_offset()),
396        )
397        .await
398    {
399        Ok(archived_jobs) => {
400            // Convert to API format
401            let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
402
403            // Get a better estimate of total count by running a query with a large limit
404            // This is not perfect but gives a more accurate total than just the current page
405            let total = if jobs.len() as u64 == pagination.limit.unwrap_or(0) as u64 {
406                // If we got exactly the limit, there might be more records
407                // Run another query to get a better count estimate
408                match queue
409                    .list_archived_jobs(filters.queue.as_deref(), Some(10000), Some(0))
410                    .await
411                {
412                    Ok(all_jobs) => all_jobs.len() as u64,
413                    Err(_) => jobs.len() as u64, // Fallback to current page count
414                }
415            } else {
416                // If we got less than the limit, we have all records
417                pagination.offset.unwrap_or(0) as u64 + jobs.len() as u64
418            };
419
420            let pagination_meta = PaginationMeta::new(&pagination, total);
421            let response = PaginatedResponse {
422                items: jobs,
423                pagination: pagination_meta,
424            };
425
426            Ok(warp::reply::json(&ApiResponse::success(response)))
427        }
428        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
429            "Failed to list archived jobs: {}",
430            e
431        )))),
432    }
433}
434
435/// Handle restore job request
436async fn handle_restore_job<Q>(
437    job_id_str: String,
438    request: RestoreRequest,
439    queue: Arc<Q>,
440) -> Result<impl Reply, warp::Rejection>
441where
442    Q: DatabaseQueue + Send + Sync + 'static,
443{
444    let job_id = match uuid::Uuid::parse_str(&job_id_str) {
445        Ok(id) => id,
446        Err(_) => {
447            return Ok(warp::reply::json(&ApiResponse::<()>::error(
448                "Invalid job ID format".to_string(),
449            )));
450        }
451    };
452
453    match queue.restore_archived_job(job_id).await {
454        Ok(job) => {
455            let response = RestoreResponse {
456                job,
457                restored_at: chrono::Utc::now(),
458                restored_by: request.restored_by,
459            };
460            Ok(warp::reply::json(&ApiResponse::success(response)))
461        }
462        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
463            "Failed to restore job: {}",
464            e
465        )))),
466    }
467}
468
469/// Handle purge jobs request
470async fn handle_purge_jobs<Q>(
471    request: PurgeRequest,
472    queue: Arc<Q>,
473) -> Result<impl Reply, warp::Rejection>
474where
475    Q: DatabaseQueue + Send + Sync + 'static,
476{
477    if request.dry_run {
478        // For dry run, estimate how many jobs would be purged by using list_archived_jobs
479        // with a large limit to get an accurate count
480        let count = match queue.list_archived_jobs(None, Some(10000), Some(0)).await {
481            Ok(jobs) => jobs.len() as u64,
482            Err(_) => 0, // If we can't get the count, return 0 for safety
483        };
484
485        let response = PurgeResponse {
486            jobs_purged: count,
487            dry_run: true,
488            executed_at: chrono::Utc::now(),
489        };
490        return Ok(warp::reply::json(&ApiResponse::success(response)));
491    }
492
493    match queue.purge_archived_jobs(request.older_than).await {
494        Ok(jobs_purged) => {
495            let response = PurgeResponse {
496                jobs_purged,
497                dry_run: false,
498                executed_at: chrono::Utc::now(),
499            };
500            Ok(warp::reply::json(&ApiResponse::success(response)))
501        }
502        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
503            "Failed to purge archived jobs: {}",
504            e
505        )))),
506    }
507}
508
509/// Handle archive stats request
510async fn handle_archive_stats<Q>(
511    filters: FilterParams,
512    queue: Arc<Q>,
513) -> Result<impl Reply, warp::Rejection>
514where
515    Q: DatabaseQueue + Send + Sync + 'static,
516{
517    match queue.get_archival_stats(filters.queue.as_deref()).await {
518        Ok(stats) => {
519            // Collect per-queue stats if no specific queue is filtered
520            let mut by_queue = std::collections::HashMap::new();
521
522            if filters.queue.is_none() {
523                // Get queue list and collect stats for each
524                if let Ok(queue_stats) = queue.get_all_queue_stats().await {
525                    for queue_stat in queue_stats {
526                        if let Ok(queue_archival_stats) =
527                            queue.get_archival_stats(Some(&queue_stat.queue_name)).await
528                        {
529                            by_queue.insert(queue_stat.queue_name, queue_archival_stats);
530                        }
531                    }
532                }
533            }
534
535            // Generate some mock recent operations (in a real implementation, these would be tracked)
536            let recent_operations = vec![
537                RecentOperation {
538                    operation_type: "archive".to_string(),
539                    jobs_affected: stats.jobs_archived,
540                    executed_at: chrono::Utc::now() - chrono::Duration::hours(2),
541                    executed_by: Some("system".to_string()),
542                    reason: Some("Automated archival".to_string()),
543                    queue_name: filters.queue.clone(),
544                },
545                RecentOperation {
546                    operation_type: "purge".to_string(),
547                    jobs_affected: stats.jobs_purged,
548                    executed_at: chrono::Utc::now() - chrono::Duration::hours(4),
549                    executed_by: Some("system".to_string()),
550                    reason: Some("Automated purge".to_string()),
551                    queue_name: filters.queue.clone(),
552                },
553            ];
554
555            let response = StatsResponse {
556                stats,
557                by_queue,
558                recent_operations,
559            };
560            Ok(warp::reply::json(&ApiResponse::success(response)))
561        }
562        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
563            "Failed to get archive stats: {}",
564            e
565        )))),
566    }
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// `ArchiveRequest::default()` must be a safe no-op request: a manual,
    /// dry-run archival with no queue filter.
    #[test]
    fn test_archive_request_default() {
        let request = ArchiveRequest::default();
        assert!(request.dry_run);
        assert_eq!(request.reason, ArchivalReason::Manual);
        assert!(request.queue_name.is_none());
    }

    /// `From<ArchivedJob>` must carry each field through unchanged into the
    /// API-facing `ArchivedJobInfo`.
    #[test]
    fn test_archived_job_info_conversion() {
        let archived_job = ArchivedJob {
            id: uuid::Uuid::new_v4(),
            queue_name: "test_queue".to_string(),
            status: JobStatus::Completed,
            created_at: chrono::Utc::now(),
            archived_at: chrono::Utc::now(),
            archival_reason: ArchivalReason::Automatic,
            original_payload_size: Some(1024),
            payload_compressed: true,
            archived_by: Some("system".to_string()),
        };

        let info: ArchivedJobInfo = archived_job.into();
        assert_eq!(info.queue_name, "test_queue");
        assert!(info.payload_compressed);
        assert_eq!(info.archival_reason, ArchivalReason::Automatic);
    }

    /// Sanity-check that a purge request round-trips its own fields.
    #[test]
    fn test_purge_request_validation() {
        let request = PurgeRequest {
            older_than: chrono::Utc::now() - Duration::days(365),
            dry_run: true,
            purged_by: Some("admin".to_string()),
        };

        assert!(request.dry_run);
        assert_eq!(request.purged_by, Some("admin".to_string()));
    }
}