use crate::jobs::{JobId, JobStatus};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, Mutex};
/// Shared, take-once reply slot for request/response messages.
///
/// A `oneshot::Sender` is not `Clone`, but the request messages below are.
/// Wrapping the sender in `Arc<Mutex<Option<..>>>` lets a cloned request keep
/// a handle to the channel while the responder `take()`s the sender exactly
/// once to deliver the reply.
pub type ResponseChannel<T> = Arc<Mutex<Option<oneshot::Sender<T>>>>;
/// Message asking the queue to accept a new job for execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueueJob {
/// Caller-assigned identifier of the job.
pub id: JobId,
/// Job kind; presumably used to pick a handler — TODO confirm against the consumer.
pub job_type: String,
/// Opaque serialized payload handed to the job handler.
pub payload: Vec<u8>,
/// Scheduling priority; ordering semantics (higher vs. lower first) are
/// defined by the queue implementation, not here.
pub priority: i32,
/// Maximum number of retry attempts before the job is given up on.
pub max_retries: u32,
/// Per-attempt execution timeout.
pub timeout: Duration,
}
/// Acknowledgement that a job was accepted into the queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobEnqueued {
/// Identifier of the accepted job.
pub id: JobId,
}
/// Query for the current status of a single job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetJobStatus {
/// Identifier of the job being queried.
pub id: JobId,
}
/// Reply to [`GetJobStatus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStatusResponse {
/// Identifier of the queried job.
pub id: JobId,
/// The job's status, or `None` when the job is unknown to the queue.
pub status: Option<JobStatus>,
}
/// Parameterless query requesting a snapshot of the queue's [`JobMetrics`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMetrics;
/// Aggregate counters and execution-time statistics for the job queue.
///
/// All time values are in milliseconds. The percentile fields are cheap
/// estimates maintained by `record_execution_time`, not true percentiles.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetrics {
/// Total jobs accepted into the queue.
pub jobs_enqueued: u64,
/// Total jobs pulled off the queue for execution.
pub jobs_dequeued: u64,
/// Total jobs that finished successfully.
pub jobs_completed: u64,
/// Total jobs that finished with an error.
pub jobs_failed: u64,
/// Total jobs refused at enqueue time.
pub jobs_rejected: u64,
/// Jobs currently parked in the dead-letter queue.
pub jobs_in_dlq: u64,
/// Jobs currently waiting in the queue.
pub current_queue_size: usize,
/// Jobs currently executing.
pub current_running: usize,
/// Sum of all recorded execution times.
pub total_execution_time_ms: u64,
/// Mean execution time (total / jobs_completed).
pub avg_execution_time_ms: u64,
/// Smallest execution time seen so far (0 = no samples yet).
pub min_execution_time_ms: u64,
/// Largest execution time seen so far.
pub max_execution_time_ms: u64,
/// Estimated median; maintained as equal to the average.
pub p50_execution_time_ms: u64,
/// Estimated 95th percentile, interpolated between average and maximum.
pub p95_execution_time_ms: u64,
/// Estimated 99th percentile; maintained as equal to the maximum.
pub p99_execution_time_ms: u64,
}
impl JobMetrics {
    /// Folds one completed job's execution time (in milliseconds) into the
    /// aggregate statistics.
    ///
    /// `avg_execution_time_ms` is only refreshed when `jobs_completed` is
    /// non-zero, so callers are presumably expected to bump `jobs_completed`
    /// before calling this — TODO confirm against the call site.
    ///
    /// The p50/p95/p99 fields are cheap estimates derived from the running
    /// average and maximum, not true percentiles (no sample history is kept).
    pub const fn record_execution_time(&mut self, execution_time_ms: u64) {
        self.total_execution_time_ms =
            self.total_execution_time_ms.saturating_add(execution_time_ms);
        // A stored minimum of 0 means "no samples yet", so the first sample
        // always replaces it.
        if self.min_execution_time_ms == 0 || execution_time_ms < self.min_execution_time_ms {
            self.min_execution_time_ms = execution_time_ms;
        }
        if execution_time_ms > self.max_execution_time_ms {
            self.max_execution_time_ms = execution_time_ms;
        }
        if self.jobs_completed > 0 {
            self.avg_execution_time_ms = self.total_execution_time_ms / self.jobs_completed;
        }
        self.p50_execution_time_ms = self.avg_execution_time_ms;
        // Estimate p95 as 75% of the way from the average up to the maximum.
        // Widen to u128 so `spread * 75` cannot overflow for very large
        // samples (the previous `u64` multiply could wrap).
        let spread = self
            .max_execution_time_ms
            .saturating_sub(self.avg_execution_time_ms);
        self.p95_execution_time_ms = self
            .avg_execution_time_ms
            .saturating_add((spread as u128 * 75 / 100) as u64);
        self.p99_execution_time_ms = self.max_execution_time_ms;
    }
    /// Percentage of finished jobs (completed + failed) that failed,
    /// in the range `0.0..=100.0`. Returns `0.0` when nothing has finished.
    #[must_use]
    // Counts are far below 2^52, so the u64 -> f64 conversion is lossless in practice.
    #[allow(clippy::cast_precision_loss)] pub fn failure_rate(&self) -> f64 {
        let total = self.jobs_completed + self.jobs_failed;
        if total == 0 {
            0.0
        } else {
            (self.jobs_failed as f64 / total as f64) * 100.0
        }
    }
}
/// Crate-internal marker message; presumably a self-tick that drives job
/// processing — TODO confirm. Currently unused, hence the `dead_code` allow.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] pub(super) struct ProcessJobs;
/// Crate-internal marker message; presumably a periodic trigger for purging
/// expired jobs — TODO confirm. Currently unused, hence the `dead_code` allow.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] pub(super) struct CleanupExpiredJobs;
/// Request for a [`JobMetrics`] snapshot, carrying the reply channel.
#[derive(Clone, Debug)]
pub struct GetMetricsRequest {
/// Take-once channel on which the metrics snapshot is sent back.
pub response_tx: ResponseChannel<JobMetrics>,
}
impl GetMetricsRequest {
    /// Creates the request together with the receiver on which the
    /// [`JobMetrics`] snapshot will be delivered.
    #[must_use]
    pub fn new() -> (Self, oneshot::Receiver<JobMetrics>) {
        let (sender, receiver) = oneshot::channel();
        (
            Self {
                response_tx: Arc::new(Mutex::new(Some(sender))),
            },
            receiver,
        )
    }
}
/// Request for a single job's status, carrying the reply channel.
#[derive(Clone, Debug)]
pub struct GetJobStatusRequest {
/// Identifier of the job being queried.
pub id: JobId,
/// Take-once channel; `None` is sent when the job is unknown.
pub response_tx: ResponseChannel<Option<JobStatus>>,
}
impl GetJobStatusRequest {
    /// Builds a status query for `id` plus the receiver on which the
    /// (optional) [`JobStatus`] is delivered.
    #[must_use]
    pub fn new(id: JobId) -> (Self, oneshot::Receiver<Option<JobStatus>>) {
        let (sender, receiver) = oneshot::channel();
        let response_tx = Arc::new(Mutex::new(Some(sender)));
        (Self { id, response_tx }, receiver)
    }
}
/// Request to retry a single (presumably failed) job.
#[derive(Clone, Debug)]
pub struct RetryJobRequest {
/// Identifier of the job to retry.
pub id: JobId,
/// Take-once channel reporting whether the retry was accepted.
pub response_tx: ResponseChannel<bool>,
}
impl RetryJobRequest {
    /// Builds a retry request for `id` plus the receiver reporting whether
    /// the job was re-queued.
    #[must_use]
    pub fn new(id: JobId) -> (Self, oneshot::Receiver<bool>) {
        let (sender, receiver) = oneshot::channel();
        (
            Self {
                id,
                response_tx: Arc::new(Mutex::new(Some(sender))),
            },
            receiver,
        )
    }
}
/// Request to retry every failed job.
#[derive(Clone, Debug)]
pub struct RetryAllFailedRequest {
/// Take-once channel reporting how many jobs were re-queued.
pub response_tx: ResponseChannel<usize>,
}
impl RetryAllFailedRequest {
    /// Creates the retry-all request plus the receiver carrying the count of
    /// re-queued jobs.
    #[must_use]
    pub fn new() -> (Self, oneshot::Receiver<usize>) {
        let (sender, receiver) = oneshot::channel();
        let response_tx = Arc::new(Mutex::new(Some(sender)));
        (Self { response_tx }, receiver)
    }
}
/// Request to cancel a single job.
#[derive(Clone, Debug)]
pub struct CancelJobRequest {
/// Identifier of the job to cancel.
pub id: JobId,
/// Take-once channel reporting whether the cancellation took effect.
pub response_tx: ResponseChannel<bool>,
}
impl CancelJobRequest {
    /// Builds a cancellation request for `id` plus the receiver reporting
    /// whether the job was actually cancelled.
    #[must_use]
    pub fn new(id: JobId) -> (Self, oneshot::Receiver<bool>) {
        let (sender, receiver) = oneshot::channel();
        let response_tx = Arc::new(Mutex::new(Some(sender)));
        (Self { id, response_tx }, receiver)
    }
}
/// Request to drop every job from the dead-letter queue.
#[derive(Clone, Debug)]
pub struct ClearDeadLetterQueueRequest {
/// Take-once channel reporting how many jobs were removed.
pub response_tx: ResponseChannel<usize>,
}
impl ClearDeadLetterQueueRequest {
    /// Creates the clear-DLQ request plus the receiver carrying the number
    /// of jobs that were purged.
    #[must_use]
    pub fn new() -> (Self, oneshot::Receiver<usize>) {
        let (sender, receiver) = oneshot::channel();
        (
            Self {
                response_tx: Arc::new(Mutex::new(Some(sender))),
            },
            receiver,
        )
    }
}
/// One page of job-history records plus the pagination metadata needed to
/// render next/previous navigation. `page` is 1-based.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobHistoryPage {
/// Records on this page.
pub jobs: Vec<super::history::JobHistoryRecord>,
/// 1-based page number.
pub page: usize,
/// Requested records per page.
pub page_size: usize,
/// Total records across all pages (for this query).
pub total_count: usize,
/// Whether a previous page exists (`page > 1`).
pub has_prev: bool,
/// Whether a further page exists.
pub has_next: bool,
}
impl JobHistoryPage {
    /// Builds a page descriptor, deriving `has_prev`/`has_next` from the
    /// 1-based `page`, the `page_size`, and the query's `total_count`.
    ///
    /// A `page_size` of 0 is treated as "no pages" instead of panicking:
    /// the previous `total_count.div_ceil(page_size)` divided by zero.
    #[must_use]
    pub const fn new(
        jobs: Vec<super::history::JobHistoryRecord>,
        page: usize,
        page_size: usize,
        total_count: usize,
    ) -> Self {
        let has_prev = page > 1;
        // Guard the zero divisor before computing the page count.
        let total_pages = if page_size == 0 {
            0
        } else {
            total_count.div_ceil(page_size)
        };
        let has_next = page < total_pages;
        Self {
            jobs,
            page,
            page_size,
            total_count,
            has_prev,
            has_next,
        }
    }
    /// 1-based number of the previous page, clamped so it never goes below 1.
    #[must_use]
    pub fn prev_page(&self) -> usize {
        self.page.saturating_sub(1).max(1)
    }
    /// 1-based number of the next page. Saturates instead of overflowing
    /// (the previous `self.page + 1` could panic in debug builds at
    /// `usize::MAX`); no upper clamp — callers gate on `has_next`.
    #[must_use]
    pub const fn next_page(&self) -> usize {
        self.page.saturating_add(1)
    }
    /// 1-based index of the first record on this page, or 0 for an empty
    /// page. Saturating math keeps a malformed `page` of 0 from underflowing
    /// (the previous `(self.page - 1) * self.page_size` would panic).
    #[must_use]
    pub fn page_start(&self) -> usize {
        if self.jobs.is_empty() {
            0
        } else {
            self.page
                .saturating_sub(1)
                .saturating_mul(self.page_size)
                .saturating_add(1)
        }
    }
    /// 1-based index of the last record on this page, or 0 for an empty
    /// page. `jobs` is non-empty here, so `len() - 1` cannot underflow.
    #[must_use]
    pub fn page_end(&self) -> usize {
        if self.jobs.is_empty() {
            0
        } else {
            self.page_start() + self.jobs.len() - 1
        }
    }
}
/// Paged (optionally filtered) query over the job history.
#[derive(Clone, Debug)]
pub struct GetJobHistoryRequest {
/// 1-based page number (normalized to at least 1 by `new`).
pub page: usize,
/// Records per page (normalized to `1..=100` by `new`).
pub page_size: usize,
/// Optional free-text filter; matching semantics are defined by the handler.
pub search_query: Option<String>,
/// Take-once channel on which the resulting [`JobHistoryPage`] is sent.
pub response_tx: ResponseChannel<JobHistoryPage>,
}
impl GetJobHistoryRequest {
    /// Builds a history query plus the receiver for its result page.
    ///
    /// Inputs are normalized: `page` is forced to at least 1 and
    /// `page_size` is clamped into `1..=100`.
    #[must_use]
    pub fn new(
        page: usize,
        page_size: usize,
        search_query: Option<String>,
    ) -> (Self, oneshot::Receiver<JobHistoryPage>) {
        let (sender, receiver) = oneshot::channel();
        // `page == 0` would break 1-based pagination downstream.
        let normalized_page = if page == 0 { 1 } else { page };
        let normalized_size = page_size.clamp(1, 100);
        let request = Self {
            page: normalized_page,
            page_size: normalized_size,
            search_query,
            response_tx: Arc::new(Mutex::new(Some(sender))),
        };
        (request, receiver)
    }
}