use super::{ApiResponse, FilterParams, PaginatedResponse, PaginationMeta, PaginationParams};
use hammerwork::{
JobId, JobStatus,
archive::{ArchivalConfig, ArchivalPolicy, ArchivalReason, ArchivalStats, ArchivedJob},
queue::DatabaseQueue,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use warp::{Filter, Reply};
#[derive(Debug, Deserialize)]
pub struct ArchiveRequest {
pub queue_name: Option<String>,
pub reason: ArchivalReason,
pub archived_by: Option<String>,
pub dry_run: bool,
pub policy: Option<ArchivalPolicy>,
pub config: Option<ArchivalConfig>,
}
impl Default for ArchiveRequest {
    /// A conservative request: manual reason, no queue filter, and
    /// `dry_run: true` so nothing is archived unless explicitly asked for.
    fn default() -> Self {
        Self {
            dry_run: true,
            reason: ArchivalReason::Manual,
            queue_name: None,
            archived_by: None,
            policy: None,
            config: None,
        }
    }
}
/// Response body for `POST /api/archive/jobs`.
#[derive(Debug, Serialize)]
pub struct ArchiveResponse {
    /// Archival statistics; for a dry run this is `ArchivalStats::default()`
    /// (all zeros) — no simulation is performed.
    pub stats: ArchivalStats,
    /// Echoes whether the request was a dry run.
    pub dry_run: bool,
    /// The effective policy used (request override or its `Default`).
    pub policy_used: ArchivalPolicy,
    /// The effective config used (request override or its `Default`).
    pub config_used: ArchivalConfig,
}
/// Request body for `POST /api/archive/jobs/{id}/restore`.
#[derive(Debug, Deserialize)]
pub struct RestoreRequest {
    /// Free-form justification; accepted but not currently read by the handler.
    pub reason: Option<String>,
    /// Operator identifier, echoed back in [`RestoreResponse::restored_by`].
    pub restored_by: Option<String>,
}
/// Response body for a successful job restore.
#[derive(Debug, Serialize)]
pub struct RestoreResponse {
    /// The restored job as returned by the queue backend.
    pub job: hammerwork::Job,
    /// Server-side timestamp taken when the response was built.
    pub restored_at: chrono::DateTime<chrono::Utc>,
    /// Operator identifier copied from the request, if provided.
    pub restored_by: Option<String>,
}
/// Request body for `DELETE /api/archive/purge`.
#[derive(Debug, Deserialize)]
pub struct PurgeRequest {
    /// Cutoff instant: archived jobs older than this are permanently deleted.
    pub older_than: chrono::DateTime<chrono::Utc>,
    /// When `true`, only a count is reported; nothing is deleted.
    pub dry_run: bool,
    /// Operator identifier; accepted but not currently read by the handler.
    pub purged_by: Option<String>,
}
/// Response body for a purge (or purge dry run).
#[derive(Debug, Serialize)]
pub struct PurgeResponse {
    /// Number of jobs purged (for a dry run, the estimated count).
    pub jobs_purged: u64,
    /// Echoes whether the request was a dry run.
    pub dry_run: bool,
    /// Server-side timestamp taken when the response was built.
    pub executed_at: chrono::DateTime<chrono::Utc>,
}
/// API-facing summary of an archived job (the payload itself is not exposed).
#[derive(Debug, Serialize)]
pub struct ArchivedJobInfo {
    /// Unique job identifier.
    pub id: JobId,
    /// Queue the job belonged to.
    pub queue_name: String,
    /// Job status at the time it was archived.
    pub status: JobStatus,
    /// When the job was originally created.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the job was moved into the archive.
    pub archived_at: chrono::DateTime<chrono::Utc>,
    /// Why the job was archived.
    pub archival_reason: ArchivalReason,
    /// Size in bytes of the original (uncompressed) payload, if recorded.
    pub original_payload_size: Option<usize>,
    /// Whether the archived payload is stored compressed.
    pub payload_compressed: bool,
    /// Operator or system that performed the archival, if recorded.
    pub archived_by: Option<String>,
}
impl From<ArchivedJob> for ArchivedJobInfo {
fn from(archived_job: ArchivedJob) -> Self {
Self {
id: archived_job.id,
queue_name: archived_job.queue_name,
status: archived_job.status,
created_at: archived_job.created_at,
archived_at: archived_job.archived_at,
archival_reason: archived_job.archival_reason,
original_payload_size: archived_job.original_payload_size,
payload_compressed: archived_job.payload_compressed,
archived_by: archived_job.archived_by,
}
}
}
/// Request payload for creating or updating a named archival policy.
///
/// NOTE(review): no route in this module consumes this type — presumably it is
/// wired up elsewhere or reserved for a future endpoint; confirm before removal.
#[derive(Debug, Deserialize)]
pub struct PolicyRequest {
    /// Unique policy name.
    pub name: String,
    /// The archival policy definition.
    pub policy: ArchivalPolicy,
    /// Whether the policy should be active.
    pub active: bool,
}
/// Response payload describing a stored archival policy.
///
/// NOTE(review): no route in this module produces this type — presumably it is
/// wired up elsewhere or reserved for a future endpoint; confirm before removal.
#[derive(Debug, Serialize)]
pub struct PolicyResponse {
    /// Unique policy name.
    pub name: String,
    /// The archival policy definition.
    pub policy: ArchivalPolicy,
    /// Whether the policy is active.
    pub active: bool,
    /// When the policy was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the policy was last modified.
    pub modified_at: chrono::DateTime<chrono::Utc>,
}
/// Response body for `GET /api/archive/stats`.
#[derive(Debug, Serialize)]
pub struct StatsResponse {
    /// Aggregate archival stats for the requested scope.
    pub stats: ArchivalStats,
    /// Per-queue breakdown; left empty when the request is scoped to one queue.
    pub by_queue: std::collections::HashMap<String, ArchivalStats>,
    /// Recent archive/purge operations. NOTE(review): the handler currently
    /// synthesizes these from the aggregate counters — not a real audit log.
    pub recent_operations: Vec<RecentOperation>,
}
/// One entry in the "recent operations" section of the stats response.
#[derive(Debug, Serialize)]
pub struct RecentOperation {
    /// Kind of operation, e.g. "archive" or "purge".
    pub operation_type: String,
    /// Queue the operation applied to; `None` means all queues.
    pub queue_name: Option<String>,
    /// Number of jobs the operation touched.
    pub jobs_affected: u64,
    /// When the operation ran.
    pub executed_at: chrono::DateTime<chrono::Utc>,
    /// Who or what triggered it.
    pub executed_by: Option<String>,
    /// Free-form reason for the operation.
    pub reason: Option<String>,
}
/// Query-string filters for `GET /api/archive/jobs`.
///
/// NOTE(review): only `queue` is currently pushed down to the store in
/// `handle_list_archived_jobs`; the remaining filters are parsed but unused —
/// confirm whether they are awaiting backend support.
#[derive(Debug, Deserialize, Default)]
pub struct ArchiveFilterParams {
    /// Restrict to a single queue name.
    pub queue: Option<String>,
    /// Restrict to a given archival reason (string form).
    pub reason: Option<String>,
    /// Only jobs archived after this instant.
    pub archived_after: Option<chrono::DateTime<chrono::Utc>>,
    /// Only jobs archived before this instant.
    pub archived_before: Option<chrono::DateTime<chrono::Utc>>,
    /// Only jobs archived by this operator/system.
    pub archived_by: Option<String>,
    /// Only jobs whose payload is (not) compressed.
    pub compressed: Option<bool>,
    /// Only jobs that had this status when archived (string form).
    pub original_status: Option<String>,
}
/// Builds the warp filter tree for all `/api/archive/...` endpoints.
///
/// Routes:
/// - `POST   /api/archive/jobs`              — run (or dry-run) an archival pass
/// - `GET    /api/archive/jobs`              — paginated listing of archived jobs
/// - `POST   /api/archive/jobs/{id}/restore` — restore one archived job
/// - `DELETE /api/archive/purge`             — purge archived jobs older than a cutoff
/// - `GET    /api/archive/stats`             — archival statistics
pub fn archive_routes<Q>(
    queue: Arc<Q>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
where
    Q: DatabaseQueue + Send + Sync + 'static,
{
    // POST /api/archive/jobs — trigger an archival run from a JSON body.
    let run_archival = warp::path!("api" / "archive" / "jobs")
        .and(warp::post())
        .and(warp::body::json())
        .and(with_queue(Arc::clone(&queue)))
        .and_then(handle_archive_jobs);

    // GET /api/archive/jobs — list archived jobs with pagination and filters.
    let list_jobs = warp::path!("api" / "archive" / "jobs")
        .and(warp::get())
        .and(super::with_pagination())
        .and(with_archive_filters())
        .and(with_queue(Arc::clone(&queue)))
        .and_then(handle_list_archived_jobs);

    // POST /api/archive/jobs/{id}/restore — bring one job back out of the archive.
    let restore = warp::path!("api" / "archive" / "jobs" / String / "restore")
        .and(warp::post())
        .and(warp::body::json())
        .and(with_queue(Arc::clone(&queue)))
        .and_then(handle_restore_job);

    // DELETE /api/archive/purge — permanently delete old archived jobs.
    let purge = warp::path!("api" / "archive" / "purge")
        .and(warp::delete())
        .and(warp::body::json())
        .and(with_queue(Arc::clone(&queue)))
        .and_then(handle_purge_jobs);

    // GET /api/archive/stats — aggregate archival statistics.
    let stats = warp::path!("api" / "archive" / "stats")
        .and(warp::get())
        .and(warp::query::<FilterParams>())
        .and(with_queue(queue))
        .and_then(handle_archive_stats);

    run_archival.or(list_jobs).or(restore).or(purge).or(stats)
}
/// Warp helper: injects a clone of the shared queue handle into each handler
/// invocation. `Arc::clone` makes the cheap refcount bump explicit.
fn with_queue<Q>(
    queue: Arc<Q>,
) -> impl Filter<Extract = (Arc<Q>,), Error = std::convert::Infallible> + Clone
where
    Q: DatabaseQueue + Send + Sync + 'static,
{
    warp::any().map(move || Arc::clone(&queue))
}
/// Warp helper: extracts [`ArchiveFilterParams`] from the request query string.
fn with_archive_filters()
-> impl Filter<Extract = (ArchiveFilterParams,), Error = warp::Rejection> + Clone {
    warp::query::<ArchiveFilterParams>()
}
/// `POST /api/archive/jobs` — run an archival pass, or preview one.
///
/// Missing policy/config fall back to their `Default`s. A dry run
/// short-circuits before touching the queue; note its stats are
/// `ArchivalStats::default()` (all zeros) — the effective settings are
/// echoed, but no counts are simulated.
async fn handle_archive_jobs<Q>(
    request: ArchiveRequest,
    queue: Arc<Q>,
) -> Result<impl Reply, warp::Rejection>
where
    Q: DatabaseQueue + Send + Sync + 'static,
{
    let policy = request.policy.unwrap_or_default();
    let config = request.config.unwrap_or_default();

    if request.dry_run {
        // Preview only: report the effective policy/config, archive nothing.
        let preview = ArchiveResponse {
            stats: ArchivalStats::default(),
            dry_run: true,
            policy_used: policy,
            config_used: config,
        };
        return Ok(warp::reply::json(&ApiResponse::success(preview)));
    }

    let outcome = queue
        .archive_jobs(
            request.queue_name.as_deref(),
            &policy,
            &config,
            request.reason,
            request.archived_by.as_deref(),
        )
        .await;

    match outcome {
        Ok(stats) => {
            let body = ArchiveResponse {
                stats,
                dry_run: false,
                policy_used: policy,
                config_used: config,
            };
            Ok(warp::reply::json(&ApiResponse::success(body)))
        }
        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
            "Failed to archive jobs: {}",
            e
        )))),
    }
}
/// `GET /api/archive/jobs` — paginated listing of archived jobs.
///
/// Only the `queue` filter is pushed down to the store here. The total count
/// is estimated: when the returned page is full, a second (10 000-row-capped)
/// fetch counts the rows; otherwise `offset + returned` is exact.
async fn handle_list_archived_jobs<Q>(
    pagination: PaginationParams,
    filters: ArchiveFilterParams,
    queue: Arc<Q>,
) -> Result<impl Reply, warp::Rejection>
where
    Q: DatabaseQueue + Send + Sync + 'static,
{
    match queue
        .list_archived_jobs(
            filters.queue.as_deref(),
            Some(pagination.get_limit()),
            Some(pagination.get_offset()),
        )
        .await
    {
        Ok(archived_jobs) => {
            let jobs: Vec<ArchivedJobInfo> = archived_jobs.into_iter().map(Into::into).collect();
            // Compare against the *effective* limit/offset — the values the
            // query above actually used — not the raw optional params.
            // Previously `pagination.limit.unwrap_or(0)` was used here, so a
            // default-sized full page never triggered the full count, and an
            // empty result with no explicit limit always did; the offset base
            // likewise ignored any default applied by `get_offset()`.
            let total = if jobs.len() as u64 == pagination.get_limit() as u64 {
                // Full page: there may be more rows — count them (capped).
                match queue
                    .list_archived_jobs(filters.queue.as_deref(), Some(10000), Some(0))
                    .await
                {
                    Ok(all_jobs) => all_jobs.len() as u64,
                    // Count failed: fall back to what we can prove exists.
                    Err(_) => jobs.len() as u64,
                }
            } else {
                // Short page: offset + returned is the exact total.
                pagination.get_offset() as u64 + jobs.len() as u64
            };
            let pagination_meta = PaginationMeta::new(&pagination, total);
            let response = PaginatedResponse {
                items: jobs,
                pagination: pagination_meta,
            };
            Ok(warp::reply::json(&ApiResponse::success(response)))
        }
        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
            "Failed to list archived jobs: {}",
            e
        )))),
    }
}
/// `POST /api/archive/jobs/{id}/restore` — restore one archived job.
///
/// The path segment must be a UUID; `restored_by` from the request body is
/// echoed back in the response (the body's `reason` field is accepted but
/// not used here).
async fn handle_restore_job<Q>(
    job_id_str: String,
    request: RestoreRequest,
    queue: Arc<Q>,
) -> Result<impl Reply, warp::Rejection>
where
    Q: DatabaseQueue + Send + Sync + 'static,
{
    // Reject malformed IDs before touching the database.
    let Ok(job_id) = uuid::Uuid::parse_str(&job_id_str) else {
        return Ok(warp::reply::json(&ApiResponse::<()>::error(
            "Invalid job ID format".to_string(),
        )));
    };

    match queue.restore_archived_job(job_id).await {
        Ok(job) => Ok(warp::reply::json(&ApiResponse::success(RestoreResponse {
            job,
            restored_at: chrono::Utc::now(),
            restored_by: request.restored_by,
        }))),
        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
            "Failed to restore job: {}",
            e
        )))),
    }
}
/// `DELETE /api/archive/purge` — permanently delete archived jobs older than
/// `request.older_than`.
///
/// For a dry run, archived jobs are listed (capped at 10 000 rows) and counted
/// against the same age cutoff the real purge would use. Previously the
/// dry-run count included *all* archived jobs regardless of age, overstating
/// the impact of the purge.
async fn handle_purge_jobs<Q>(
    request: PurgeRequest,
    queue: Arc<Q>,
) -> Result<impl Reply, warp::Rejection>
where
    Q: DatabaseQueue + Send + Sync + 'static,
{
    if request.dry_run {
        // Estimate by applying the cutoff to a capped listing.
        // NOTE(review): assumes the purge predicate is
        // `archived_at < older_than` — confirm against `purge_archived_jobs`.
        // If the listing fails we report 0 (best effort, same as before).
        let count = match queue.list_archived_jobs(None, Some(10000), Some(0)).await {
            Ok(jobs) => jobs
                .iter()
                .filter(|job| job.archived_at < request.older_than)
                .count() as u64,
            Err(_) => 0,
        };
        let response = PurgeResponse {
            jobs_purged: count,
            dry_run: true,
            executed_at: chrono::Utc::now(),
        };
        return Ok(warp::reply::json(&ApiResponse::success(response)));
    }
    match queue.purge_archived_jobs(request.older_than).await {
        Ok(jobs_purged) => {
            let response = PurgeResponse {
                jobs_purged,
                dry_run: false,
                executed_at: chrono::Utc::now(),
            };
            Ok(warp::reply::json(&ApiResponse::success(response)))
        }
        Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
            "Failed to purge archived jobs: {}",
            e
        )))),
    }
}
async fn handle_archive_stats<Q>(
filters: FilterParams,
queue: Arc<Q>,
) -> Result<impl Reply, warp::Rejection>
where
Q: DatabaseQueue + Send + Sync + 'static,
{
match queue.get_archival_stats(filters.queue.as_deref()).await {
Ok(stats) => {
let mut by_queue = std::collections::HashMap::new();
if filters.queue.is_none() {
if let Ok(queue_stats) = queue.get_all_queue_stats().await {
for queue_stat in queue_stats {
if let Ok(queue_archival_stats) =
queue.get_archival_stats(Some(&queue_stat.queue_name)).await
{
by_queue.insert(queue_stat.queue_name, queue_archival_stats);
}
}
}
}
let recent_operations = vec![
RecentOperation {
operation_type: "archive".to_string(),
jobs_affected: stats.jobs_archived,
executed_at: chrono::Utc::now() - chrono::Duration::hours(2),
executed_by: Some("system".to_string()),
reason: Some("Automated archival".to_string()),
queue_name: filters.queue.clone(),
},
RecentOperation {
operation_type: "purge".to_string(),
jobs_affected: stats.jobs_purged,
executed_at: chrono::Utc::now() - chrono::Duration::hours(4),
executed_by: Some("system".to_string()),
reason: Some("Automated purge".to_string()),
queue_name: filters.queue.clone(),
},
];
let response = StatsResponse {
stats,
by_queue,
recent_operations,
};
Ok(warp::reply::json(&ApiResponse::success(response)))
}
Err(e) => Ok(warp::reply::json(&ApiResponse::<()>::error(format!(
"Failed to get archive stats: {}",
e
)))),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Defaults must be the safe ones: dry-run on, manual reason, no filter.
    #[test]
    fn test_archive_request_default() {
        let req = ArchiveRequest::default();
        assert!(req.dry_run);
        assert_eq!(req.reason, ArchivalReason::Manual);
        assert!(req.queue_name.is_none());
    }

    /// The API projection carries every surfaced field across unchanged.
    #[test]
    fn test_archived_job_info_conversion() {
        let source = ArchivedJob {
            id: uuid::Uuid::new_v4(),
            queue_name: "test_queue".to_string(),
            status: JobStatus::Completed,
            created_at: chrono::Utc::now(),
            archived_at: chrono::Utc::now(),
            archival_reason: ArchivalReason::Automatic,
            original_payload_size: Some(1024),
            payload_compressed: true,
            archived_by: Some("system".to_string()),
        };
        let info = ArchivedJobInfo::from(source);
        assert_eq!(info.queue_name, "test_queue");
        assert!(info.payload_compressed);
        assert_eq!(info.archival_reason, ArchivalReason::Automatic);
    }

    /// Purge requests carry the dry-run flag and operator through unchanged.
    #[test]
    fn test_purge_request_validation() {
        let req = PurgeRequest {
            older_than: chrono::Utc::now() - Duration::days(365),
            dry_run: true,
            purged_by: Some("admin".to_string()),
        };
        assert!(req.dry_run);
        assert_eq!(req.purged_by, Some("admin".to_string()));
    }
}