1use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68 JobId, JobStatus,
69 archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
70 queue::DatabaseQueue,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79 pub queue_name: Option<String>,
81 pub reason: ArchivalReason,
83 pub archived_by: Option<String>,
85 pub dry_run: bool,
87 pub policy: Option<ArchivalPolicy>,
89 pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94 fn default() -> Self {
95 Self {
96 queue_name: None,
97 reason: ArchivalReason::Manual,
98 archived_by: None,
99 dry_run: true,
100 policy: None,
101 config: None,
102 }
103 }
104}
105
106#[derive(Debug, Serialize)]
108pub struct ArchiveResponse {
109 pub stats: ArchivalStats,
111 pub dry_run: bool,
113 pub policy_used: ArchivalPolicy,
115 pub config_used: ArchivalConfig,
117}
118
119#[derive(Debug, Deserialize)]
121pub struct RestoreRequest {
122 pub reason: Option<String>,
124 pub restored_by: Option<String>,
126}
127
128#[derive(Debug, Serialize)]
130pub struct RestoreResponse {
131 pub job: hammerwork::Job,
133 pub restored_at: chrono::DateTime<chrono::Utc>,
135 pub restored_by: Option<String>,
137}
138
139#[derive(Debug, Deserialize)]
141pub struct PurgeRequest {
142 pub older_than: chrono::DateTime<chrono::Utc>,
144 pub dry_run: bool,
146 pub purged_by: Option<String>,
148}
149
150#[derive(Debug, Serialize)]
152pub struct PurgeResponse {
153 pub jobs_purged: u64,
155 pub dry_run: bool,
157 pub executed_at: chrono::DateTime<chrono::Utc>,
159}
160
161#[derive(Debug, Serialize)]
163pub struct ArchivedJobInfo {
164 pub id: JobId,
166 pub queue_name: String,
168 pub status: JobStatus,
170 pub created_at: chrono::DateTime<chrono::Utc>,
172 pub archived_at: chrono::DateTime<chrono::Utc>,
174 pub archival_reason: ArchivalReason,
176 pub original_payload_size: Option<usize>,
178 pub payload_compressed: bool,
180 pub archived_by: Option<String>,
182}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185 fn from(archived_job: ArchivedJob) -> Self {
186 Self {
187 id: archived_job.id,
188 queue_name: archived_job.queue_name,
189 status: archived_job.status,
190 created_at: archived_job.created_at,
191 archived_at: archived_job.archived_at,
192 archival_reason: archived_job.archival_reason,
193 original_payload_size: archived_job.original_payload_size,
194 payload_compressed: archived_job.payload_compressed,
195 archived_by: archived_job.archived_by,
196 }
197 }
198}
199
200#[derive(Debug, Deserialize)]
202pub struct PolicyRequest {
203 pub name: String,
205 pub policy: ArchivalPolicy,
207 pub active: bool,
209}
210
211#[derive(Debug, Serialize)]
213pub struct PolicyResponse {
214 pub name: String,
216 pub policy: ArchivalPolicy,
218 pub active: bool,
220 pub created_at: chrono::DateTime<chrono::Utc>,
222 pub modified_at: chrono::DateTime<chrono::Utc>,
224}
225
226#[derive(Debug, Serialize)]
228pub struct StatsResponse {
229 pub stats: ArchivalStats,
231 pub by_queue: std::collections::HashMap<String, ArchivalStats>,
233 pub recent_operations: Vec<RecentOperation>,
235}
236
237#[derive(Debug, Serialize)]
239pub struct RecentOperation {
240 pub operation_type: String,
242 pub queue_name: Option<String>,
244 pub jobs_affected: u64,
246 pub executed_at: chrono::DateTime<chrono::Utc>,
248 pub executed_by: Option<String>,
250 pub reason: Option<String>,
252}
253
254#[derive(Debug, Deserialize, Default)]
256pub struct ArchiveFilterParams {
257 pub queue: Option<String>,
259 pub reason: Option<String>,
261 pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
263 pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
265 pub archived_by: Option<String>,
267 pub compressed: Option<bool>,
269 pub original_status: Option<String>,
271}
272
273pub fn archive_routes<Q>(
275 queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278 Q: DatabaseQueue + Send + Sync + 'static,
279{
280 let archive_jobs = warp::path!("api" / "archive" / "jobs")
281 .and(warp::post())
282 .and(warp::body::json())
283 .and(with_queue(queue.clone()))
284 .and_then(handle_archive_jobs);
285
286 let list_archived = warp::path!("api" / "archive" / "jobs")
287 .and(warp::get())
288 .and(super::with_pagination())
289 .and(with_archive_filters())
290 .and(with_queue(queue.clone()))
291 .and_then(handle_list_archived_jobs);
292
293 let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294 .and(warp::post())
295 .and(warp::body::json())
296 .and(with_queue(queue.clone()))
297 .and_then(handle_restore_job);
298
299 let purge_jobs = warp::path!("api" / "archive" / "purge")
300 .and(warp::delete())
301 .and(warp::body::json())
302 .and(with_queue(queue.clone()))
303 .and_then(handle_purge_jobs);
304
305 let archive_stats = warp::path!("api" / "archive" / "stats")
306 .and(warp::get())
307 .and(warp::query::<FilterParams>())
308 .and(with_queue(queue.clone()))
309 .and_then(handle_archive_stats);
310
311 archive_jobs
312 .or(list_archived)
313 .or(restore_job)
314 .or(purge_jobs)
315 .or(archive_stats)
316}
317
318fn with_queue<Q>(
320 queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323 Q: DatabaseQueue + Send + Sync + 'static,
324{
325 warp::any().map(move || queue.clone())
326}
327
328fn with_archive_filters()
330-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
331 warp::query::<ArchiveFilterParams>()
332}
333
334async fn handle_archive_jobs<Q>(
336 request: ArchiveRequest,
337 queue: Arc<Q>,
338) -> Result<impl Reply, warp::Rejection>
339where
340 Q: DatabaseQueue + Send + Sync + 'static,
341{
342 let policy = request.policy.unwrap_or_default();
343 let config = request.config.unwrap_or_default();
344
345 if request.dry_run {
346 let response = ArchiveResponse {
348 stats: ArchivalStats::default(), dry_run: true,
350 policy_used: policy,
351 config_used: config,
352 };
353 return Ok(warp::reply::json(&ApiResponse::success(response)));
354 }
355
356 match queue
357 .archive_jobs(
358 request.queue_name.as_deref(),
359 &policy,
360 &config,
361 request.reason,
362 request.archived_by.as_deref(),
363 )
364 .await
365 {
366 Ok(stats) => {
367 let response = ArchiveResponse {
368 stats,
369 dry_run: false,
370 policy_used: policy,
371 config_used: config,
372 };
373 Ok(warp::reply::json(&ApiResponse::success(response)))
374 }
375 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
376 "Failed to archive jobs: {}",
377 e
378 )))),
379 }
380}
381
382async fn handle_list_archived_jobs<Q>(
384 pagination: PaginationParams,
385 filters: ArchiveFilterParams,
386 queue: Arc<Q>,
387) -> Result<impl Reply, warp::Rejection>
388where
389 Q: DatabaseQueue + Send + Sync + 'static,
390{
391 match queue
392 .list_archived_jobs(
393 filters.queue.as_deref(),
394 Some(pagination.get_limit()),
395 Some(pagination.get_offset()),
396 )
397 .await
398 {
399 Ok(archived_jobs) => {
400 let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
402
403 let total = if jobs.len() as u64 == pagination.limit.unwrap_or(0) as u64 {
406 match queue
409 .list_archived_jobs(filters.queue.as_deref(), Some(10000), Some(0))
410 .await
411 {
412 Ok(all_jobs) => all_jobs.len() as u64,
413 Err(_) => jobs.len() as u64, }
415 } else {
416 pagination.offset.unwrap_or(0) as u64 + jobs.len() as u64
418 };
419
420 let pagination_meta = PaginationMeta::new(&pagination, total);
421 let response = PaginatedResponse {
422 items: jobs,
423 pagination: pagination_meta,
424 };
425
426 Ok(warp::reply::json(&ApiResponse::success(response)))
427 }
428 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
429 "Failed to list archived jobs: {}",
430 e
431 )))),
432 }
433}
434
435async fn handle_restore_job<Q>(
437 job_id_str: String,
438 request: RestoreRequest,
439 queue: Arc<Q>,
440) -> Result<impl Reply, warp::Rejection>
441where
442 Q: DatabaseQueue + Send + Sync + 'static,
443{
444 let job_id = match uuid::Uuid::parse_str(&job_id_str) {
445 Ok(id) => id,
446 Err(_) => {
447 return Ok(warp::reply::json(&ApiResponse::<()>::error(
448 "Invalid job ID format".to_string(),
449 )));
450 }
451 };
452
453 match queue.restore_archived_job(job_id).await {
454 Ok(job) => {
455 let response = RestoreResponse {
456 job,
457 restored_at: chrono::Utc::now(),
458 restored_by: request.restored_by,
459 };
460 Ok(warp::reply::json(&ApiResponse::success(response)))
461 }
462 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
463 "Failed to restore job: {}",
464 e
465 )))),
466 }
467}
468
469async fn handle_purge_jobs<Q>(
471 request: PurgeRequest,
472 queue: Arc<Q>,
473) -> Result<impl Reply, warp::Rejection>
474where
475 Q: DatabaseQueue + Send + Sync + 'static,
476{
477 if request.dry_run {
478 let count = match queue.list_archived_jobs(None, Some(10000), Some(0)).await {
481 Ok(jobs) => jobs.len() as u64,
482 Err(_) => 0, };
484
485 let response = PurgeResponse {
486 jobs_purged: count,
487 dry_run: true,
488 executed_at: chrono::Utc::now(),
489 };
490 return Ok(warp::reply::json(&ApiResponse::success(response)));
491 }
492
493 match queue.purge_archived_jobs(request.older_than).await {
494 Ok(jobs_purged) => {
495 let response = PurgeResponse {
496 jobs_purged,
497 dry_run: false,
498 executed_at: chrono::Utc::now(),
499 };
500 Ok(warp::reply::json(&ApiResponse::success(response)))
501 }
502 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
503 "Failed to purge archived jobs: {}",
504 e
505 )))),
506 }
507}
508
509async fn handle_archive_stats<Q>(
511 filters: FilterParams,
512 queue: Arc<Q>,
513) -> Result<impl Reply, warp::Rejection>
514where
515 Q: DatabaseQueue + Send + Sync + 'static,
516{
517 match queue.get_archival_stats(filters.queue.as_deref()).await {
518 Ok(stats) => {
519 let mut by_queue = std::collections::HashMap::new();
521
522 if filters.queue.is_none() {
523 if let Ok(queue_stats) = queue.get_all_queue_stats().await {
525 for queue_stat in queue_stats {
526 if let Ok(queue_archival_stats) =
527 queue.get_archival_stats(Some(&queue_stat.queue_name)).await
528 {
529 by_queue.insert(queue_stat.queue_name, queue_archival_stats);
530 }
531 }
532 }
533 }
534
535 let recent_operations = vec![
537 RecentOperation {
538 operation_type: "archive".to_string(),
539 jobs_affected: stats.jobs_archived,
540 executed_at: chrono::Utc::now() - chrono::Duration::hours(2),
541 executed_by: Some("system".to_string()),
542 reason: Some("Automated archival".to_string()),
543 queue_name: filters.queue.clone(),
544 },
545 RecentOperation {
546 operation_type: "purge".to_string(),
547 jobs_affected: stats.jobs_purged,
548 executed_at: chrono::Utc::now() - chrono::Duration::hours(4),
549 executed_by: Some("system".to_string()),
550 reason: Some("Automated purge".to_string()),
551 queue_name: filters.queue.clone(),
552 },
553 ];
554
555 let response = StatsResponse {
556 stats,
557 by_queue,
558 recent_operations,
559 };
560 Ok(warp::reply::json(&ApiResponse::success(response)))
561 }
562 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
563 "Failed to get archive stats: {}",
564 e
565 )))),
566 }
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// The default archive request must be a manual dry run with no queue filter.
    #[test]
    fn test_archive_request_default() {
        let ArchiveRequest {
            dry_run,
            reason,
            queue_name,
            ..
        } = ArchiveRequest::default();

        assert!(dry_run);
        assert_eq!(reason, ArchivalReason::Manual);
        assert!(queue_name.is_none());
    }

    /// Converting an `ArchivedJob` must carry its fields over unchanged.
    #[test]
    fn test_archived_job_info_conversion() {
        let source = ArchivedJob {
            id: uuid::Uuid::new_v4(),
            queue_name: "test_queue".to_string(),
            status: JobStatus::Completed,
            created_at: chrono::Utc::now(),
            archived_at: chrono::Utc::now(),
            archival_reason: ArchivalReason::Automatic,
            original_payload_size: Some(1024),
            payload_compressed: true,
            archived_by: Some("system".to_string()),
        };

        let info = ArchivedJobInfo::from(source);

        assert_eq!(info.queue_name, "test_queue");
        assert!(info.payload_compressed);
        assert_eq!(info.archival_reason, ArchivalReason::Automatic);
    }

    /// A purge request keeps the values it was constructed with.
    #[test]
    fn test_purge_request_validation() {
        let one_year_ago = chrono::Utc::now() - Duration::days(365);
        let request = PurgeRequest {
            older_than: one_year_ago,
            dry_run: true,
            purged_by: Some("admin".to_string()),
        };

        assert!(request.dry_run);
        assert_eq!(request.purged_by, Some("admin".to_string()));
    }
}