1use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
67use hammerwork::{
68 JobId, JobStatus,
69 archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
70 queue::DatabaseQueue,
71};
72use serde::{Deserialize, Serialize};
73use std::sync::Arc;
74use warp::{Filter, Reply};
75
76#[derive(Debug, Deserialize)]
78pub struct ArchiveRequest {
79 pub queue_name: Option<String>,
81 pub reason: ArchivalReason,
83 pub archived_by: Option<String>,
85 pub dry_run: bool,
87 pub policy: Option<ArchivalPolicy>,
89 pub config: Option<ArchivalConfig>,
91}
92
93impl Default for ArchiveRequest {
94 fn default() -> Self {
95 Self {
96 queue_name: None,
97 reason: ArchivalReason::Manual,
98 archived_by: None,
99 dry_run: true,
100 policy: None,
101 config: None,
102 }
103 }
104}
105
106#[derive(Debug, Serialize)]
108pub struct ArchiveResponse {
109 pub stats: ArchivalStats,
111 pub dry_run: bool,
113 pub policy_used: ArchivalPolicy,
115 pub config_used: ArchivalConfig,
117}
118
119#[derive(Debug, Deserialize)]
121pub struct RestoreRequest {
122 pub reason: Option<String>,
124 pub restored_by: Option<String>,
126}
127
128#[derive(Debug, Serialize)]
130pub struct RestoreResponse {
131 pub job: hammerwork::Job,
133 pub restored_at: chrono::DateTime<chrono::Utc>,
135 pub restored_by: Option<String>,
137}
138
139#[derive(Debug, Deserialize)]
141pub struct PurgeRequest {
142 pub older_than: chrono::DateTime<chrono::Utc>,
144 pub dry_run: bool,
146 pub purged_by: Option<String>,
148}
149
150#[derive(Debug, Serialize)]
152pub struct PurgeResponse {
153 pub jobs_purged: u64,
155 pub dry_run: bool,
157 pub executed_at: chrono::DateTime<chrono::Utc>,
159}
160
161#[derive(Debug, Serialize)]
163pub struct ArchivedJobInfo {
164 pub id: JobId,
166 pub queue_name: String,
168 pub status: JobStatus,
170 pub created_at: chrono::DateTime<chrono::Utc>,
172 pub archived_at: chrono::DateTime<chrono::Utc>,
174 pub archival_reason: ArchivalReason,
176 pub original_payload_size: Option<usize>,
178 pub payload_compressed: bool,
180 pub archived_by: Option<String>,
182}
183
184impl From<ArchivedJob> for ArchivedJobInfo {
185 fn from(archived_job: ArchivedJob) -> Self {
186 Self {
187 id: archived_job.id,
188 queue_name: archived_job.queue_name,
189 status: archived_job.status,
190 created_at: archived_job.created_at,
191 archived_at: archived_job.archived_at,
192 archival_reason: archived_job.archival_reason,
193 original_payload_size: archived_job.original_payload_size,
194 payload_compressed: archived_job.payload_compressed,
195 archived_by: archived_job.archived_by,
196 }
197 }
198}
199
200#[derive(Debug, Deserialize)]
202pub struct PolicyRequest {
203 pub name: String,
205 pub policy: ArchivalPolicy,
207 pub active: bool,
209}
210
211#[derive(Debug, Serialize)]
213pub struct PolicyResponse {
214 pub name: String,
216 pub policy: ArchivalPolicy,
218 pub active: bool,
220 pub created_at: chrono::DateTime<chrono::Utc>,
222 pub modified_at: chrono::DateTime<chrono::Utc>,
224}
225
226#[derive(Debug, Serialize)]
228pub struct StatsResponse {
229 pub stats: ArchivalStats,
231 pub by_queue: std::collections::HashMap<String, ArchivalStats>,
233 pub recent_operations: Vec<RecentOperation>,
235}
236
237#[derive(Debug, Serialize)]
239pub struct RecentOperation {
240 pub operation_type: String,
242 pub queue_name: Option<String>,
244 pub jobs_affected: u64,
246 pub executed_at: chrono::DateTime<chrono::Utc>,
248 pub executed_by: Option<String>,
250 pub reason: Option<String>,
252}
253
254#[derive(Debug, Deserialize, Default)]
256pub struct ArchiveFilterParams {
257 pub queue: Option<String>,
259 pub reason: Option<String>,
261 pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
263 pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
265 pub archived_by: Option<String>,
267 pub compressed: Option<bool>,
269 pub original_status: Option<String>,
271}
272
273pub fn archive_routes<Q>(
275 queue: Arc<Q>,
276) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
277where
278 Q: DatabaseQueue + Send + Sync + 'static,
279{
280 let archive_jobs = warp::path!("api" / "archive" / "jobs")
281 .and(warp::post())
282 .and(warp::body::json())
283 .and(with_queue(queue.clone()))
284 .and_then(handle_archive_jobs);
285
286 let list_archived = warp::path!("api" / "archive" / "jobs")
287 .and(warp::get())
288 .and(super::with_pagination())
289 .and(with_archive_filters())
290 .and(with_queue(queue.clone()))
291 .and_then(handle_list_archived_jobs);
292
293 let restore_job = warp::path!("api" / "archive" / "jobs" / String / "restore")
294 .and(warp::post())
295 .and(warp::body::json())
296 .and(with_queue(queue.clone()))
297 .and_then(handle_restore_job);
298
299 let purge_jobs = warp::path!("api" / "archive" / "purge")
300 .and(warp::delete())
301 .and(warp::body::json())
302 .and(with_queue(queue.clone()))
303 .and_then(handle_purge_jobs);
304
305 let archive_stats = warp::path!("api" / "archive" / "stats")
306 .and(warp::get())
307 .and(warp::query::<FilterParams>())
308 .and(with_queue(queue.clone()))
309 .and_then(handle_archive_stats);
310
311 archive_jobs
312 .or(list_archived)
313 .or(restore_job)
314 .or(purge_jobs)
315 .or(archive_stats)
316}
317
318fn with_queue<Q>(
320 queue: Arc<Q>,
321) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
322where
323 Q: DatabaseQueue + Send + Sync + 'static,
324{
325 warp::any().map(move || queue.clone())
326}
327
328fn with_archive_filters()
330-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
331 warp::query::<ArchiveFilterParams>()
332}
333
334async fn handle_archive_jobs<Q>(
336 request: ArchiveRequest,
337 queue: Arc<Q>,
338) -> Result<impl Reply, warp::Rejection>
339where
340 Q: DatabaseQueue + Send + Sync + 'static,
341{
342 let policy = request.policy.unwrap_or_default();
343 let config = request.config.unwrap_or_default();
344
345 if request.dry_run {
346 let response = ArchiveResponse {
348 stats: ArchivalStats::default(), dry_run: true,
350 policy_used: policy,
351 config_used: config,
352 };
353 return Ok(warp::reply::json(&ApiResponse::success(response)));
354 }
355
356 match queue
357 .archive_jobs(
358 request.queue_name.as_deref(),
359 &policy,
360 &config,
361 request.reason,
362 request.archived_by.as_deref(),
363 )
364 .await
365 {
366 Ok(stats) => {
367 let response = ArchiveResponse {
368 stats,
369 dry_run: false,
370 policy_used: policy,
371 config_used: config,
372 };
373 Ok(warp::reply::json(&ApiResponse::success(response)))
374 }
375 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
376 "Failed to archive jobs: {}",
377 e
378 )))),
379 }
380}
381
382async fn handle_list_archived_jobs<Q>(
384 pagination: PaginationParams,
385 filters: ArchiveFilterParams,
386 queue: Arc<Q>,
387) -> Result<impl Reply, warp::Rejection>
388where
389 Q: DatabaseQueue + Send + Sync + 'static,
390{
391 match queue
392 .list_archived_jobs(
393 filters.queue.as_deref(),
394 Some(pagination.get_limit()),
395 Some(pagination.get_offset()),
396 )
397 .await
398 {
399 Ok(archived_jobs) => {
400 let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
402
403 let total = jobs.len() as u64;
406
407 let pagination_meta = PaginationMeta::new(&pagination, total);
408 let response = PaginatedResponse {
409 items: jobs,
410 pagination: pagination_meta,
411 };
412
413 Ok(warp::reply::json(&ApiResponse::success(response)))
414 }
415 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
416 "Failed to list archived jobs: {}",
417 e
418 )))),
419 }
420}
421
422async fn handle_restore_job<Q>(
424 job_id_str: String,
425 request: RestoreRequest,
426 queue: Arc<Q>,
427) -> Result<impl Reply, warp::Rejection>
428where
429 Q: DatabaseQueue + Send + Sync + 'static,
430{
431 let job_id = match uuid::Uuid::parse_str(&job_id_str) {
432 Ok(id) => id,
433 Err(_) => {
434 return Ok(warp::reply::json(&ApiResponse::<()>::error(
435 "Invalid job ID format".to_string(),
436 )));
437 }
438 };
439
440 match queue.restore_archived_job(job_id).await {
441 Ok(job) => {
442 let response = RestoreResponse {
443 job,
444 restored_at: chrono::Utc::now(),
445 restored_by: request.restored_by,
446 };
447 Ok(warp::reply::json(&ApiResponse::success(response)))
448 }
449 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
450 "Failed to restore job: {}",
451 e
452 )))),
453 }
454}
455
456async fn handle_purge_jobs<Q>(
458 request: PurgeRequest,
459 queue: Arc<Q>,
460) -> Result<impl Reply, warp::Rejection>
461where
462 Q: DatabaseQueue + Send + Sync + 'static,
463{
464 if request.dry_run {
465 let response = PurgeResponse {
468 jobs_purged: 0, dry_run: true,
470 executed_at: chrono::Utc::now(),
471 };
472 return Ok(warp::reply::json(&ApiResponse::success(response)));
473 }
474
475 match queue.purge_archived_jobs(request.older_than).await {
476 Ok(jobs_purged) => {
477 let response = PurgeResponse {
478 jobs_purged,
479 dry_run: false,
480 executed_at: chrono::Utc::now(),
481 };
482 Ok(warp::reply::json(&ApiResponse::success(response)))
483 }
484 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
485 "Failed to purge archived jobs: {}",
486 e
487 )))),
488 }
489}
490
491async fn handle_archive_stats<Q>(
493 filters: FilterParams,
494 queue: Arc<Q>,
495) -> Result<impl Reply, warp::Rejection>
496where
497 Q: DatabaseQueue + Send + Sync + 'static,
498{
499 match queue.get_archival_stats(filters.queue.as_deref()).await {
500 Ok(stats) => {
501 let response = StatsResponse {
504 stats,
505 by_queue: std::collections::HashMap::new(),
506 recent_operations: vec![],
507 };
508 Ok(warp::reply::json(&ApiResponse::success(response)))
509 }
510 Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
511 "Failed to get archive stats: {}",
512 e
513 )))),
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use chrono::Duration;
521
522 #[test]
523 fn test_archive_request_default() {
524 let request = ArchiveRequest::default();
525 assert!(request.dry_run);
526 assert_eq!(request.reason, ArchivalReason::Manual);
527 assert!(request.queue_name.is_none());
528 }
529
530 #[test]
531 fn test_archived_job_info_conversion() {
532 let archived_job = ArchivedJob {
533 id: uuid::Uuid::new_v4(),
534 queue_name: "test_queue".to_string(),
535 status: JobStatus::Completed,
536 created_at: chrono::Utc::now(),
537 archived_at: chrono::Utc::now(),
538 archival_reason: ArchivalReason::Automatic,
539 original_payload_size: Some(1024),
540 payload_compressed: true,
541 archived_by: Some("system".to_string()),
542 };
543
544 let info: ArchivedJobInfo = archived_job.into();
545 assert_eq!(info.queue_name, "test_queue");
546 assert!(info.payload_compressed);
547 assert_eq!(info.archival_reason, ArchivalReason::Automatic);
548 }
549
550 #[test]
551 fn test_purge_request_validation() {
552 let request = PurgeRequest {
553 older_than: chrono::Utc::now() - Duration::days(365),
554 dry_run: true,
555 purged_by: Some("admin".to_string()),
556 };
557
558 assert!(request.dry_run);
559 assert_eq!(request.purged_by, Some("admin".to_string()));
560 }
561}