use acton_reactive::prelude::AgentHandleInterface;
use axum::{
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use crate::auth::{user::User, Authenticated};
use crate::jobs::{
agent::{
CancelJobRequest, ClearDeadLetterQueueRequest, GetMetricsRequest, RetryAllFailedRequest,
RetryJobRequest,
},
JobId,
};
use crate::state::ActonHtmxState;
#[derive(Debug, Serialize, Deserialize)]
pub struct JobListResponse {
pub jobs: Vec<JobInfo>,
pub total: usize,
pub message: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct JobInfo {
pub id: String,
pub job_type: String,
pub status: String,
pub created_at: String,
pub priority: i32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct JobStatsResponse {
pub total_enqueued: u64,
pub running: usize,
pub pending: usize,
pub completed: u64,
pub failed: u64,
pub dead_letter: u64,
pub avg_execution_ms: f64,
pub p95_execution_ms: f64,
pub p99_execution_ms: f64,
pub success_rate: f64,
pub message: String,
}
pub async fn list_jobs(
State(_state): State<ActonHtmxState>,
Authenticated(admin): Authenticated<User>,
) -> Result<Response, StatusCode> {
if !admin.roles.contains(&"admin".to_string()) {
tracing::warn!(
admin_id = admin.id,
"Non-admin attempted to list jobs"
);
return Err(StatusCode::FORBIDDEN);
}
let response = JobListResponse {
jobs: vec![],
total: 0,
message: "Job listing functionality will be enhanced in Phase 3".to_string(),
};
tracing::info!(
admin_id = admin.id,
"Admin retrieved job list"
);
Ok((StatusCode::OK, Json(response)).into_response())
}
#[allow(clippy::cast_precision_loss)] pub async fn job_stats(
State(state): State<ActonHtmxState>,
Authenticated(admin): Authenticated<User>,
) -> Result<Response, StatusCode> {
if !admin.roles.contains(&"admin".to_string()) {
tracing::warn!(
admin_id = admin.id,
"Non-admin attempted to view job statistics"
);
return Err(StatusCode::FORBIDDEN);
}
let (request, rx) = GetMetricsRequest::new();
state.job_agent().send(request).await;
let timeout = Duration::from_millis(100);
let metrics = tokio::time::timeout(timeout, rx)
.await
.map_err(|_| {
tracing::error!("Job metrics retrieval timeout");
StatusCode::REQUEST_TIMEOUT
})?
.map_err(|_| {
tracing::error!("Job metrics channel error");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let total_processed = metrics.jobs_completed + metrics.jobs_failed;
let success_rate = if total_processed > 0 {
(metrics.jobs_completed as f64 / total_processed as f64) * 100.0
} else {
100.0
};
let response = JobStatsResponse {
total_enqueued: metrics.jobs_enqueued,
running: metrics.current_running,
pending: metrics.current_queue_size,
completed: metrics.jobs_completed,
failed: metrics.jobs_failed,
dead_letter: metrics.jobs_in_dlq,
avg_execution_ms: metrics.avg_execution_time_ms as f64,
p95_execution_ms: metrics.p95_execution_time_ms as f64,
p99_execution_ms: metrics.p99_execution_time_ms as f64,
success_rate,
message: "Statistics retrieved successfully".to_string(),
};
tracing::info!(
admin_id = admin.id,
jobs_enqueued = metrics.jobs_enqueued,
jobs_completed = metrics.jobs_completed,
jobs_failed = metrics.jobs_failed,
"Admin retrieved job statistics"
);
Ok((StatusCode::OK, Json(response)).into_response())
}
pub async fn retry_job(
State(state): State<ActonHtmxState>,
Authenticated(admin): Authenticated<User>,
Path(job_id): Path<JobId>,
) -> Result<Response, StatusCode> {
if !admin.roles.contains(&"admin".to_string()) {
tracing::warn!(
admin_id = admin.id,
%job_id,
"Non-admin attempted to retry job"
);
return Err(StatusCode::FORBIDDEN);
}
let (request, rx) = RetryJobRequest::new(job_id);
state.job_agent().send(request).await;
let timeout = Duration::from_millis(100);
let success = tokio::time::timeout(timeout, rx)
.await
.map_err(|_| {
tracing::error!(%job_id, "Job retry timeout");
StatusCode::REQUEST_TIMEOUT
})?
.map_err(|_| {
tracing::error!(%job_id, "Job retry channel error");
StatusCode::INTERNAL_SERVER_ERROR
})?;
if success {
tracing::info!(
admin_id = admin.id,
%job_id,
"Job queued for retry"
);
Ok((
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"message": "Job queued for retry"
})),
)
.into_response())
} else {
tracing::warn!(
admin_id = admin.id,
%job_id,
"Job not found in dead letter queue"
);
Err(StatusCode::NOT_FOUND)
}
}
pub async fn retry_all_jobs(
State(state): State<ActonHtmxState>,
Authenticated(admin): Authenticated<User>,
) -> Result<Response, StatusCode> {
if !admin.roles.contains(&"admin".to_string()) {
tracing::warn!(
admin_id = admin.id,
"Non-admin attempted to retry all jobs"
);
return Err(StatusCode::FORBIDDEN);
}
let (request, rx) = RetryAllFailedRequest::new();
state.job_agent().send(request).await;
let timeout = Duration::from_millis(500);
let retried = tokio::time::timeout(timeout, rx)
.await
.map_err(|_| {
tracing::error!("Retry all jobs timeout");
StatusCode::REQUEST_TIMEOUT
})?
.map_err(|_| {
tracing::error!("Retry all jobs channel error");
StatusCode::INTERNAL_SERVER_ERROR
})?;
tracing::info!(
admin_id = admin.id,
retried,
"All failed jobs queued for retry"
);
Ok((
StatusCode::OK,
Json(serde_json::json!({
"retried": retried,
"message": format!("{retried} jobs queued for retry")
})),
)
.into_response())
}
pub async fn cancel_job(
State(state): State<ActonHtmxState>,
Authenticated(admin): Authenticated<User>,
Path(job_id): Path<JobId>,
) -> Result<Response, StatusCode> {
if !admin.roles.contains(&"admin".to_string()) {
tracing::warn!(
admin_id = admin.id,
%job_id,
"Non-admin attempted to cancel job"
);
return Err(StatusCode::FORBIDDEN);
}
let (request, rx) = CancelJobRequest::new(job_id);
state.job_agent().send(request).await;
let timeout = Duration::from_millis(100);
let success = tokio::time::timeout(timeout, rx)
.await
.map_err(|_| {
tracing::error!(%job_id, "Job cancel timeout");
StatusCode::REQUEST_TIMEOUT
})?
.map_err(|_| {
tracing::error!(%job_id, "Job cancel channel error");
StatusCode::INTERNAL_SERVER_ERROR
})?;
if success {
tracing::info!(
admin_id = admin.id,
%job_id,
"Job cancellation requested"
);
Ok((
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"message": "Job cancellation requested"
})),
)
.into_response())
} else {
tracing::warn!(
admin_id = admin.id,
%job_id,
"Job not found"
);
Err(StatusCode::NOT_FOUND)
}
}
pub async fn clear_dead_letter_queue(
State(state): State<ActonHtmxState>,
Authenticated(admin): Authenticated<User>,
) -> Result<Response, StatusCode> {
if !admin.roles.contains(&"admin".to_string()) {
tracing::warn!(
admin_id = admin.id,
"Non-admin attempted to clear dead letter queue"
);
return Err(StatusCode::FORBIDDEN);
}
let (request, rx) = ClearDeadLetterQueueRequest::new();
state.job_agent().send(request).await;
let timeout = Duration::from_millis(100);
let cleared = tokio::time::timeout(timeout, rx)
.await
.map_err(|_| {
tracing::error!("Clear dead letter queue timeout");
StatusCode::REQUEST_TIMEOUT
})?
.map_err(|_| {
tracing::error!("Clear dead letter queue channel error");
StatusCode::INTERNAL_SERVER_ERROR
})?;
tracing::info!(
admin_id = admin.id,
cleared,
"Dead letter queue cleared"
);
Ok((
StatusCode::OK,
Json(serde_json::json!({
"cleared": cleared,
"message": format!("{cleared} jobs removed from dead letter queue")
})),
)
.into_response())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_job_info_serialization() {
let job = JobInfo {
id: "job-123".to_string(),
job_type: "WelcomeEmail".to_string(),
status: "pending".to_string(),
created_at: "2025-11-22T10:00:00Z".to_string(),
priority: 10,
};
let json = serde_json::to_string(&job).unwrap();
assert!(json.contains("job-123"));
assert!(json.contains("WelcomeEmail"));
}
#[test]
fn test_job_stats_response_serialization() {
let stats = JobStatsResponse {
total_enqueued: 100,
running: 2,
pending: 5,
completed: 90,
failed: 3,
dead_letter: 0,
avg_execution_ms: 125.5,
p95_execution_ms: 450.0,
p99_execution_ms: 890.0,
success_rate: 96.8,
message: "Success".to_string(),
};
let json = serde_json::to_string(&stats).unwrap();
assert!(json.contains("\"total_enqueued\":100"));
assert!(json.contains("\"running\":2"));
assert!(json.contains("\"success_rate\":96.8"));
}
}