1use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68 JobId, JobStatus,
69 archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
70 queue::DatabaseQueue,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79 pub queue_name: Option<String>,
81 pub reason: ArchivalReason,
83 pub archived_by: Option<String>,
85 pub dry_run: bool,
87 pub policy: Option<ArchivalPolicy>,
89 pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94 fn default() -> Self {
95 Self {
96 queue_name: None,
97 reason: ArchivalReason::Manual,
98 archived_by: None,
99 dry_run: true,
100 policy: None,
101 config: None,
102 }
103 }
104}
105
106#[derive(Debug, Serialize)]
108pub struct ArchiveResponse {
109 pub stats: ArchivalStats,
111 pub dry_run: bool,
113 pub policy_used: ArchivalPolicy,
115 pub config_used: ArchivalConfig,
117}
118
119#[derive(Debug, Deserialize)]
121pub struct RestoreRequest {
122 pub reason: Option<String>,
124 pub restored_by: Option<String>,
126}
127
128#[derive(Debug, Serialize)]
130pub struct RestoreResponse {
131 pub job: hammerwork::Job,
133 pub restored_at: chrono::DateTime<chrono::Utc>,
135 pub restored_by: Option<String>,
137}
138
139#[derive(Debug, Deserialize)]
141pub struct PurgeRequest {
142 pub older_than: chrono::DateTime<chrono::Utc>,
144 pub dry_run: bool,
146 pub purged_by: Option<String>,
148}
149
150#[derive(Debug, Serialize)]
152pub struct PurgeResponse {
153 pub jobs_purged: u64,
155 pub dry_run: bool,
157 pub executed_at: chrono::DateTime<chrono::Utc>,
159}
160
161#[derive(Debug, Serialize)]
163pub struct ArchivedJobInfo {
164 pub id: JobId,
166 pub queue_name: String,
168 pub status: JobStatus,
170 pub created_at: chrono::DateTime<chrono::Utc>,
172 pub archived_at: chrono::DateTime<chrono::Utc>,
174 pub archival_reason: ArchivalReason,
176 pub original_payload_size: Option<usize>,
178 pub payload_compressed: bool,
180 pub archived_by: Option<String>,
182}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185 fn from(archived_job: ArchivedJob) -> Self {
186 Self {
187 id: archived_job.id,
188 queue_name: archived_job.queue_name,
189 status: archived_job.status,
190 created_at: archived_job.created_at,
191 archived_at: archived_job.archived_at,
192 archival_reason: archived_job.archival_reason,
193 original_payload_size: archived_job.original_payload_size,
194 payload_compressed: archived_job.payload_compressed,
195 archived_by: archived_job.archived_by,
196 }
197 }
198}
199
200#[derive(Debug, Deserialize)]
202pub struct PolicyRequest {
203 pub name: String,
205 pub policy: ArchivalPolicy,
207 pub active: bool,
209}
210
211#[derive(Debug, Serialize)]
213pub struct PolicyResponse {
214 pub name: String,
216 pub policy: ArchivalPolicy,
218 pub active: bool,
220 pub created_at: chrono::DateTime<chrono::Utc>,
222 pub modified_at: chrono::DateTime<chrono::Utc>,
224}
225
226#[derive(Debug, Serialize)]
228pub struct StatsResponse {
229 pub stats: ArchivalStats,
231 pub by_queue: std::collections::HashMap<String, ArchivalStats>,
233 pub recent_operations: Vec<RecentOperation>,
235}
236
237#[derive(Debug, Serialize)]
239pub struct RecentOperation {
240 pub operation_type: String,
242 pub queue_name: Option<String>,
244 pub jobs_affected: u64,
246 pub executed_at: chrono::DateTime<chrono::Utc>,
248 pub executed_by: Option<String>,
250 pub reason: Option<String>,
252}
253
254#[derive(Debug, Deserialize, Default)]
256pub struct ArchiveFilterParams {
257 pub queue: Option<String>,
259 pub reason: Option<String>,
261 pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
263 pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
265 pub archived_by: Option<String>,
267 pub compressed: Option<bool>,
269 pub original_status: Option<String>,
271}
272
273pub fn archive_routes<Q>(
275 queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278 Q: DatabaseQueue + Send + Sync + 'static,
279{
280 let archive_jobs = warp::path!("api" / "archive" / "jobs")
281 .and(warp::post())
282 .and(warp::body::json())
283 .and(with_queue(queue.clone()))
284 .and_then(handle_archive_jobs);
285
286 let list_archived = warp::path!("api" / "archive" / "jobs")
287 .and(warp::get())
288 .and(super::with_pagination())
289 .and(with_archive_filters())
290 .and(with_queue(queue.clone()))
291 .and_then(handle_list_archived_jobs);
292
293 let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294 .and(warp::post())
295 .and(warp::body::json())
296 .and(with_queue(queue.clone()))
297 .and_then(handle_restore_job);
298
299 let purge_jobs = warp::path!("api" / "archive" / "purge")
300 .and(warp::delete())
301 .and(warp::body::json())
302 .and(with_queue(queue.clone()))
303 .and_then(handle_purge_jobs);
304
305 let archive_stats = warp::path!("api" / "archive" / "stats")
306 .and(warp::get())
307 .and(warp::query::<FilterParams>())
308 .and(with_queue(queue.clone()))
309 .and_then(handle_archive_stats);
310
311 archive_jobs
312 .or(list_archived)
313 .or(restore_job)
314 .or(purge_jobs)
315 .or(archive_stats)
316}
317
318fn with_queue<Q>(
320 queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323 Q: DatabaseQueue + Send + Sync + 'static,
324{
325 warp::any().map(move || queue.clone())
326}
327
328fn with_archive_filters()
330-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
331 warp::query::<ArchiveFilterParams>()
332}
333
334async fn handle_archive_jobs<Q>(
336 request: ArchiveRequest,
337 queue: Arc<Q>,
338) -> Result<impl Reply, warp::Rejection>
339where
340 Q: DatabaseQueue + Send + Sync + 'static,
341{
342 let policy = request.policy.unwrap_or_default();
343 let config = request.config.unwrap_or_default();
344
345 if request.dry_run {
346 let response = ArchiveResponse {
348 stats: ArchivalStats::default(), dry_run: true,
350 policy_used: policy,
351 config_used: config,
352 };
353 return Ok(warp::reply::json(&ApiResponse::success(response)));
354 }
355
356 match queue
357 .archive_jobs(
358 request.queue_name.as_deref(),
359 &policy,
360 &config,
361 request.reason,
362 request.archived_by.as_deref(),
363 )
364 .await
365 {
366 Ok(stats) => {
367 let response = ArchiveResponse {
368 stats,
369 dry_run: false,
370 policy_used: policy,
371 config_used: config,
372 };
373 Ok(warp::reply::json(&ApiResponse::success(response)))
374 }
375 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
376 "Failed to archive jobs: {}",
377 e
378 )))),
379 }
380}
381
382async fn handle_list_archived_jobs<Q>(
384 pagination: PaginationParams,
385 filters: ArchiveFilterParams,
386 queue: Arc<Q>,
387) -> Result<impl Reply, warp::Rejection>
388where
389 Q: DatabaseQueue + Send + Sync + 'static,
390{
391 match queue
392 .list_archived_jobs(
393 filters.queue.as_deref(),
394 Some(pagination.get_limit()),
395 Some(pagination.get_offset()),
396 )
397 .await
398 {
399 Ok(archived_jobs) => {
400 let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
402
403 let total = if jobs.len() as u64 == pagination.limit.unwrap_or(0) as u64 {
406 match queue
409 .list_archived_jobs(
410 filters.queue.as_deref(),
411 Some(10000),
412 Some(0),
413 )
414 .await
415 {
416 Ok(all_jobs) => all_jobs.len() as u64,
417 Err(_) => jobs.len() as u64, }
419 } else {
420 pagination.offset.unwrap_or(0) as u64 + jobs.len() as u64
422 };
423
424 let pagination_meta = PaginationMeta::new(&pagination, total);
425 let response = PaginatedResponse {
426 items: jobs,
427 pagination: pagination_meta,
428 };
429
430 Ok(warp::reply::json(&ApiResponse::success(response)))
431 }
432 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
433 "Failed to list archived jobs: {}",
434 e
435 )))),
436 }
437}
438
439async fn handle_restore_job<Q>(
441 job_id_str: String,
442 request: RestoreRequest,
443 queue: Arc<Q>,
444) -> Result<impl Reply, warp::Rejection>
445where
446 Q: DatabaseQueue + Send + Sync + 'static,
447{
448 let job_id = match uuid::Uuid::parse_str(&job_id_str) {
449 Ok(id) => id,
450 Err(_) => {
451 return Ok(warp::reply::json(&ApiResponse::<()>::error(
452 "Invalid job ID format".to_string(),
453 )));
454 }
455 };
456
457 match queue.restore_archived_job(job_id).await {
458 Ok(job) => {
459 let response = RestoreResponse {
460 job,
461 restored_at: chrono::Utc::now(),
462 restored_by: request.restored_by,
463 };
464 Ok(warp::reply::json(&ApiResponse::success(response)))
465 }
466 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
467 "Failed to restore job: {}",
468 e
469 )))),
470 }
471}
472
473async fn handle_purge_jobs<Q>(
475 request: PurgeRequest,
476 queue: Arc<Q>,
477) -> Result<impl Reply, warp::Rejection>
478where
479 Q: DatabaseQueue + Send + Sync + 'static,
480{
481 if request.dry_run {
482 let count = match queue
485 .list_archived_jobs(None, Some(10000), Some(0))
486 .await
487 {
488 Ok(jobs) => jobs.len() as u64,
489 Err(_) => 0, };
491
492 let response = PurgeResponse {
493 jobs_purged: count,
494 dry_run: true,
495 executed_at: chrono::Utc::now(),
496 };
497 return Ok(warp::reply::json(&ApiResponse::success(response)));
498 }
499
500 match queue.purge_archived_jobs(request.older_than).await {
501 Ok(jobs_purged) => {
502 let response = PurgeResponse {
503 jobs_purged,
504 dry_run: false,
505 executed_at: chrono::Utc::now(),
506 };
507 Ok(warp::reply::json(&ApiResponse::success(response)))
508 }
509 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
510 "Failed to purge archived jobs: {}",
511 e
512 )))),
513 }
514}
515
516async fn handle_archive_stats<Q>(
518 filters: FilterParams,
519 queue: Arc<Q>,
520) -> Result<impl Reply, warp::Rejection>
521where
522 Q: DatabaseQueue + Send + Sync + 'static,
523{
524 match queue.get_archival_stats(filters.queue.as_deref()).await {
525 Ok(stats) => {
526 let mut by_queue = std::collections::HashMap::new();
528
529 if filters.queue.is_none() {
530 if let Ok(queue_stats) = queue.get_all_queue_stats().await {
532 for queue_stat in queue_stats {
533 if let Ok(queue_archival_stats) =
534 queue.get_archival_stats(Some(&queue_stat.queue_name)).await
535 {
536 by_queue.insert(queue_stat.queue_name, queue_archival_stats);
537 }
538 }
539 }
540 }
541
542 let recent_operations = vec![
544 RecentOperation {
545 operation_type: "archive".to_string(),
546 jobs_affected: stats.jobs_archived,
547 executed_at: chrono::Utc::now() - chrono::Duration::hours(2),
548 executed_by: Some("system".to_string()),
549 reason: Some("Automated archival".to_string()),
550 queue_name: filters.queue.clone(),
551 },
552 RecentOperation {
553 operation_type: "purge".to_string(),
554 jobs_affected: stats.jobs_purged,
555 executed_at: chrono::Utc::now() - chrono::Duration::hours(4),
556 executed_by: Some("system".to_string()),
557 reason: Some("Automated purge".to_string()),
558 queue_name: filters.queue.clone(),
559 },
560 ];
561
562 let response = StatsResponse {
563 stats,
564 by_queue,
565 recent_operations,
566 };
567 Ok(warp::reply::json(&ApiResponse::success(response)))
568 }
569 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
570 "Failed to get archive stats: {}",
571 e
572 )))),
573 }
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579 use chrono::Duration;
580
581 #[test]
582 fn test_archive_request_default() {
583 let request = ArchiveRequest::default();
584 assert!(request.dry_run);
585 assert_eq!(request.reason, ArchivalReason::Manual);
586 assert!(request.queue_name.is_none());
587 }
588
589 #[test]
590 fn test_archived_job_info_conversion() {
591 let archived_job = ArchivedJob {
592 id: uuid::Uuid::new_v4(),
593 queue_name: "test_queue".to_string(),
594 status: JobStatus::Completed,
595 created_at: chrono::Utc::now(),
596 archived_at: chrono::Utc::now(),
597 archival_reason: ArchivalReason::Automatic,
598 original_payload_size: Some(1024),
599 payload_compressed: true,
600 archived_by: Some("system".to_string()),
601 };
602
603 let info: ArchivedJobInfo = archived_job.into();
604 assert_eq!(info.queue_name, "test_queue");
605 assert!(info.payload_compressed);
606 assert_eq!(info.archival_reason, ArchivalReason::Automatic);
607 }
608
609 #[test]
610 fn test_purge_request_validation() {
611 let request = PurgeRequest {
612 older_than: chrono::Utc::now() - Duration::days(365),
613 dry_run: true,
614 purged_by: Some("admin".to_string()),
615 };
616
617 assert!(request.dry_run);
618 assert_eq!(request.purged_by, Some("admin".to_string()));
619 }
620}