1use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68 archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
69 queue::DatabaseQueue,
70 JobId, JobStatus,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79 pub queue_name: Option<String>,
81 pub reason: ArchivalReason,
83 pub archived_by: Option<String>,
85 pub dry_run: bool,
87 pub policy: Option<ArchivalPolicy>,
89 pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94 fn default() -> Self {
95 Self {
96 queue_name: None,
97 reason: ArchivalReason::Manual,
98 archived_by: None,
99 dry_run: true,
100 policy: None,
101 config: None,
102 }
103 }
104}
105
106#[derive(Debug, Serialize)]
108pub struct ArchiveResponse {
109 pub stats: ArchivalStats,
111 pub dry_run: bool,
113 pub policy_used: ArchivalPolicy,
115 pub config_used: ArchivalConfig,
117}
118
119#[derive(Debug, Deserialize)]
121pub struct RestoreRequest {
122 pub reason: Option<String>,
124 pub restored_by: Option<String>,
126}
127
128#[derive(Debug, Serialize)]
130pub struct RestoreResponse {
131 pub job: hammerwork::Job,
133 pub restored_at: chrono::DateTime<chrono::Utc>,
135 pub restored_by: Option<String>,
137}
138
139#[derive(Debug, Deserialize)]
141pub struct PurgeRequest {
142 pub older_than: chrono::DateTime<chrono::Utc>,
144 pub dry_run: bool,
146 pub purged_by: Option<String>,
148}
149
150#[derive(Debug, Serialize)]
152pub struct PurgeResponse {
153 pub jobs_purged: u64,
155 pub dry_run: bool,
157 pub executed_at: chrono::DateTime<chrono::Utc>,
159}
160
161#[derive(Debug, Serialize)]
163pub struct ArchivedJobInfo {
164 pub id: JobId,
166 pub queue_name: String,
168 pub status: JobStatus,
170 pub created_at: chrono::DateTime<chrono::Utc>,
172 pub archived_at: chrono::DateTime<chrono::Utc>,
174 pub archival_reason: ArchivalReason,
176 pub original_payload_size: Option<usize>,
178 pub payload_compressed: bool,
180 pub archived_by: Option<String>,
182}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185 fn from(archived_job: ArchivedJob) -> Self {
186 Self {
187 id: archived_job.id,
188 queue_name: archived_job.queue_name,
189 status: archived_job.status,
190 created_at: archived_job.created_at,
191 archived_at: archived_job.archived_at,
192 archival_reason: archived_job.archival_reason,
193 original_payload_size: archived_job.original_payload_size,
194 payload_compressed: archived_job.payload_compressed,
195 archived_by: archived_job.archived_by,
196 }
197 }
198}
199
200#[derive(Debug, Deserialize)]
202pub struct PolicyRequest {
203 pub name: String,
205 pub policy: ArchivalPolicy,
207 pub active: bool,
209}
210
211#[derive(Debug, Serialize)]
213pub struct PolicyResponse {
214 pub name: String,
216 pub policy: ArchivalPolicy,
218 pub active: bool,
220 pub created_at: chrono::DateTime<chrono::Utc>,
222 pub modified_at: chrono::DateTime<chrono::Utc>,
224}
225
226#[derive(Debug, Serialize)]
228pub struct StatsResponse {
229 pub stats: ArchivalStats,
231 pub by_queue: std::collections::HashMap<String, ArchivalStats>,
233 pub recent_operations: Vec<RecentOperation>,
235}
236
237#[derive(Debug, Serialize)]
239pub struct RecentOperation {
240 pub operation_type: String,
242 pub queue_name: Option<String>,
244 pub jobs_affected: u64,
246 pub executed_at: chrono::DateTime<chrono::Utc>,
248 pub executed_by: Option<String>,
250 pub reason: Option<String>,
252}
253
254#[derive(Debug, Deserialize, Default)]
256pub struct ArchiveFilterParams {
257 pub queue: Option<String>,
259 pub reason: Option<String>,
261 pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
263 pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
265 pub archived_by: Option<String>,
267 pub compressed: Option<bool>,
269 pub original_status: Option<String>,
271}
272
273pub fn archive_routes<Q>(
275 queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278 Q: DatabaseQueue + Send + Sync + 'static,
279{
280 let archive_jobs = warp::path!("api" / "archive" / "jobs")
281 .and(warp::post())
282 .and(warp::body::json())
283 .and(with_queue(queue.clone()))
284 .and_then(handle_archive_jobs);
285
286 let list_archived = warp::path!("api" / "archive" / "jobs")
287 .and(warp::get())
288 .and(super::with_pagination())
289 .and(with_archive_filters())
290 .and(with_queue(queue.clone()))
291 .and_then(handle_list_archived_jobs);
292
293 let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294 .and(warp::post())
295 .and(warp::body::json())
296 .and(with_queue(queue.clone()))
297 .and_then(handle_restore_job);
298
299 let purge_jobs = warp::path!("api" / "archive" / "purge")
300 .and(warp::delete())
301 .and(warp::body::json())
302 .and(with_queue(queue.clone()))
303 .and_then(handle_purge_jobs);
304
305 let archive_stats = warp::path!("api" / "archive" / "stats")
306 .and(warp::get())
307 .and(warp::query::<FilterParams>())
308 .and(with_queue(queue.clone()))
309 .and_then(handle_archive_stats);
310
311 archive_jobs
312 .or(list_archived)
313 .or(restore_job)
314 .or(purge_jobs)
315 .or(archive_stats)
316}
317
318fn with_queue<Q>(
320 queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323 Q: DatabaseQueue + Send + Sync + 'static,
324{
325 warp::any().map(move || queue.clone())
326}
327
328fn with_archive_filters() -> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
330 warp::query::<ArchiveFilterParams>()
331}
332
333async fn handle_archive_jobs<Q>(
335 request: ArchiveRequest,
336 queue: Arc<Q>,
337) -> Result<impl Reply, warp::Rejection>
338where
339 Q: DatabaseQueue + Send + Sync + 'static,
340{
341 let policy = request.policy.unwrap_or_default();
342 let config = request.config.unwrap_or_default();
343
344 if request.dry_run {
345 let response = ArchiveResponse {
347 stats: ArchivalStats::default(), dry_run: true,
349 policy_used: policy,
350 config_used: config,
351 };
352 return Ok(warp::reply::json(&ApiResponse::success(response)));
353 }
354
355 match queue
356 .archive_jobs(
357 request.queue_name.as_deref(),
358 &policy,
359 &config,
360 request.reason,
361 request.archived_by.as_deref(),
362 )
363 .await
364 {
365 Ok(stats) => {
366 let response = ArchiveResponse {
367 stats,
368 dry_run: false,
369 policy_used: policy,
370 config_used: config,
371 };
372 Ok(warp::reply::json(&ApiResponse::success(response)))
373 }
374 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
375 format!("Failed to archive jobs: {}", e),
376 ))),
377 }
378}
379
380async fn handle_list_archived_jobs<Q>(
382 pagination: PaginationParams,
383 filters: ArchiveFilterParams,
384 queue: Arc<Q>,
385) -> Result<impl Reply, warp::Rejection>
386where
387 Q: DatabaseQueue + Send + Sync + 'static,
388{
389 match queue
390 .list_archived_jobs(
391 filters.queue.as_deref(),
392 Some(pagination.get_limit()),
393 Some(pagination.get_offset()),
394 )
395 .await
396 {
397 Ok(archived_jobs) => {
398 let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
400
401 let total = jobs.len() as u64;
404
405 let pagination_meta = PaginationMeta::new(&pagination, total);
406 let response = PaginatedResponse {
407 items: jobs,
408 pagination: pagination_meta,
409 };
410
411 Ok(warp::reply::json(&ApiResponse::success(response)))
412 }
413 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
414 format!("Failed to list archived jobs: {}", e),
415 ))),
416 }
417}
418
419async fn handle_restore_job<Q>(
421 job_id_str: String,
422 request: RestoreRequest,
423 queue: Arc<Q>,
424) -> Result<impl Reply, warp::Rejection>
425where
426 Q: DatabaseQueue + Send + Sync + 'static,
427{
428 let job_id = match uuid::Uuid::parse_str(&job_id_str) {
429 Ok(id) => id,
430 Err(_) => {
431 return Ok(warp::reply::json(&ApiResponse::<()>::error(
432 "Invalid job ID format".to_string(),
433 )));
434 }
435 };
436
437 match queue.restore_archived_job(job_id).await {
438 Ok(job) => {
439 let response = RestoreResponse {
440 job,
441 restored_at: chrono::Utc::now(),
442 restored_by: request.restored_by,
443 };
444 Ok(warp::reply::json(&ApiResponse::success(response)))
445 }
446 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
447 format!("Failed to restore job: {}", e),
448 ))),
449 }
450}
451
452async fn handle_purge_jobs<Q>(
454 request: PurgeRequest,
455 queue: Arc<Q>,
456) -> Result<impl Reply, warp::Rejection>
457where
458 Q: DatabaseQueue + Send + Sync + 'static,
459{
460 if request.dry_run {
461 let response = PurgeResponse {
464 jobs_purged: 0, dry_run: true,
466 executed_at: chrono::Utc::now(),
467 };
468 return Ok(warp::reply::json(&ApiResponse::success(response)));
469 }
470
471 match queue.purge_archived_jobs(request.older_than).await {
472 Ok(jobs_purged) => {
473 let response = PurgeResponse {
474 jobs_purged,
475 dry_run: false,
476 executed_at: chrono::Utc::now(),
477 };
478 Ok(warp::reply::json(&ApiResponse::success(response)))
479 }
480 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
481 format!("Failed to purge archived jobs: {}", e),
482 ))),
483 }
484}
485
486async fn handle_archive_stats<Q>(
488 filters: FilterParams,
489 queue: Arc<Q>,
490) -> Result<impl Reply, warp::Rejection>
491where
492 Q: DatabaseQueue + Send + Sync + 'static,
493{
494 match queue.get_archival_stats(filters.queue.as_deref()).await {
495 Ok(stats) => {
496 let response = StatsResponse {
499 stats,
500 by_queue: std::collections::HashMap::new(),
501 recent_operations: vec![],
502 };
503 Ok(warp::reply::json(&ApiResponse::success(response)))
504 }
505 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(
506 format!("Failed to get archive stats: {}", e),
507 ))),
508 }
509}
510
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// The default request is a manual dry run with no queue restriction.
    #[test]
    fn test_archive_request_default() {
        let ArchiveRequest {
            queue_name,
            reason,
            dry_run,
            ..
        } = ArchiveRequest::default();

        assert!(dry_run);
        assert_eq!(reason, ArchivalReason::Manual);
        assert!(queue_name.is_none());
    }

    /// Converting an `ArchivedJob` keeps the queue name, compression flag and
    /// archival reason.
    #[test]
    fn test_archived_job_info_conversion() {
        let now = chrono::Utc::now;
        let source_job = ArchivedJob {
            id: uuid::Uuid::new_v4(),
            queue_name: "test_queue".to_string(),
            status: JobStatus::Completed,
            created_at: now(),
            archived_at: now(),
            archival_reason: ArchivalReason::Automatic,
            original_payload_size: Some(1024),
            payload_compressed: true,
            archived_by: Some("system".to_string()),
        };

        let info = ArchivedJobInfo::from(source_job);

        assert_eq!(info.queue_name, "test_queue");
        assert!(info.payload_compressed);
        assert_eq!(info.archival_reason, ArchivalReason::Automatic);
    }

    /// A hand-built purge request exposes the values it was built with.
    #[test]
    fn test_purge_request_validation() {
        let one_year_ago = chrono::Utc::now() - Duration::days(365);
        let request = PurgeRequest {
            older_than: one_year_ago,
            dry_run: true,
            purged_by: Some("admin".to_string()),
        };

        assert!(request.dry_run);
        assert_eq!(request.purged_by.as_deref(), Some("admin"));
    }
}
555}