acton_htmx/jobs/agent/
messages.rs

1//! Messages for the job agent.
2
3use crate::jobs::{JobId, JobStatus};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{oneshot, Mutex};
8
9/// Response channel type for web handler pattern.
10///
11/// Wraps `oneshot::Sender` in `Arc<Mutex<Option<T>>>` to satisfy
12/// `Clone + Debug` requirements for acton-reactive messages.
13pub type ResponseChannel<T> = Arc<Mutex<Option<oneshot::Sender<T>>>>;
14
15/// Enqueue a new job for processing.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct EnqueueJob {
18    /// Unique job identifier.
19    pub id: JobId,
20    /// Job type name.
21    pub job_type: String,
22    /// Serialized job payload.
23    pub payload: Vec<u8>,
24    /// Job priority (higher = more important).
25    pub priority: i32,
26    /// Maximum number of retry attempts.
27    pub max_retries: u32,
28    /// Job execution timeout.
29    pub timeout: Duration,
30}
31
32/// Response to job enqueue request.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct JobEnqueued {
35    /// The enqueued job ID.
36    pub id: JobId,
37}
38
39/// Get the status of a job (agent-to-agent pattern).
40///
41/// **Deprecated**: Use [`GetJobStatusRequest`] for web handlers.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct GetJobStatus {
44    /// Job ID to query.
45    pub id: JobId,
46}
47
48/// Response containing job status (agent-to-agent pattern).
49///
50/// **Deprecated**: Use [`GetJobStatusRequest`] for web handlers.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct JobStatusResponse {
53    /// Job ID.
54    pub id: JobId,
55    /// Current status (None if job not found).
56    pub status: Option<JobStatus>,
57}
58
59/// Request job metrics (agent-to-agent pattern).
60///
61/// **Deprecated**: Use [`GetMetricsRequest`] for web handlers.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct GetMetrics;
64
65/// Job processing metrics.
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
67pub struct JobMetrics {
68    /// Total jobs enqueued.
69    pub jobs_enqueued: u64,
70    /// Total jobs dequeued.
71    pub jobs_dequeued: u64,
72    /// Total jobs completed successfully.
73    pub jobs_completed: u64,
74    /// Total jobs failed.
75    pub jobs_failed: u64,
76    /// Total jobs rejected (queue full).
77    pub jobs_rejected: u64,
78    /// Total jobs in dead letter queue.
79    pub jobs_in_dlq: u64,
80    /// Current queue size.
81    pub current_queue_size: usize,
82    /// Current number of running jobs.
83    pub current_running: usize,
84    /// Total execution time in milliseconds.
85    pub total_execution_time_ms: u64,
86    /// Average execution time in milliseconds.
87    pub avg_execution_time_ms: u64,
88    /// Minimum execution time in milliseconds.
89    pub min_execution_time_ms: u64,
90    /// Maximum execution time in milliseconds.
91    pub max_execution_time_ms: u64,
92    /// P50 (median) execution time in milliseconds.
93    pub p50_execution_time_ms: u64,
94    /// P95 execution time in milliseconds.
95    pub p95_execution_time_ms: u64,
96    /// P99 execution time in milliseconds.
97    pub p99_execution_time_ms: u64,
98}
99
100impl JobMetrics {
101    /// Update metrics with a completed job execution time.
102    ///
103    /// This updates percentile calculations using a simple streaming algorithm.
104    /// For production use, consider using a histogram library like `hdrhistogram`.
105    pub const fn record_execution_time(&mut self, execution_time_ms: u64) {
106        self.total_execution_time_ms = self.total_execution_time_ms.saturating_add(execution_time_ms);
107
108        // Update min/max
109        if self.min_execution_time_ms == 0 || execution_time_ms < self.min_execution_time_ms {
110            self.min_execution_time_ms = execution_time_ms;
111        }
112        if execution_time_ms > self.max_execution_time_ms {
113            self.max_execution_time_ms = execution_time_ms;
114        }
115
116        // Update average
117        if self.jobs_completed > 0 {
118            self.avg_execution_time_ms = self.total_execution_time_ms / self.jobs_completed;
119        }
120
121        // Simple percentile estimation (will be replaced with histogram in production)
122        // For now, use max as p99, avg as p50, and interpolate p95
123        self.p50_execution_time_ms = self.avg_execution_time_ms;
124        self.p95_execution_time_ms = self.avg_execution_time_ms +
125            ((self.max_execution_time_ms.saturating_sub(self.avg_execution_time_ms)) * 75 / 100);
126        self.p99_execution_time_ms = self.max_execution_time_ms;
127    }
128
129    /// Calculate failure rate as percentage (0-100).
130    #[must_use]
131    #[allow(clippy::cast_precision_loss)] // Acceptable for metrics
132    pub fn failure_rate(&self) -> f64 {
133        let total = self.jobs_completed + self.jobs_failed;
134        if total == 0 {
135            0.0
136        } else {
137            (self.jobs_failed as f64 / total as f64) * 100.0
138        }
139    }
140}
141
142/// Internal message to trigger job processing.
143#[derive(Debug, Clone, Serialize, Deserialize)]
144#[allow(dead_code)] // Will be used in Week 5 for job processing loop
145pub(super) struct ProcessJobs;
146
147/// Internal message to cleanup expired jobs.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149#[allow(dead_code)] // Will be used in Week 5 for cleanup scheduling
150pub(super) struct CleanupExpiredJobs;
151
152// ============================================================================
153// Web Handler Pattern Messages (HTTP handler to agent communication)
154// ============================================================================
155
156/// Request job metrics (web handler pattern).
157///
158/// Used by HTTP handlers to query job statistics. Uses oneshot channel
159/// for response to avoid blocking the handler.
160///
161/// # Example
162///
163/// ```rust,ignore
164/// use acton_htmx::jobs::agent::messages::GetMetricsRequest;
165/// use std::time::Duration;
166///
167/// async fn handler(State(state): State<ActonHtmxState>) -> Result<Response> {
168///     let (request, rx) = GetMetricsRequest::new();
169///     state.job_agent().send(request).await;
170///
171///     let timeout = Duration::from_millis(100);
172///     let metrics = tokio::time::timeout(timeout, rx).await??;
173///
174///     Ok(Json(metrics).into_response())
175/// }
176/// ```
177#[derive(Clone, Debug)]
178pub struct GetMetricsRequest {
179    /// Response channel for metrics.
180    pub response_tx: ResponseChannel<JobMetrics>,
181}
182
183impl GetMetricsRequest {
184    /// Create a new metrics request with response channel.
185    ///
186    /// Returns a tuple of (request, receiver) where the request should be
187    /// sent to the agent and the receiver awaited for the response.
188    #[must_use]
189    pub fn new() -> (Self, oneshot::Receiver<JobMetrics>) {
190        let (tx, rx) = oneshot::channel();
191        let request = Self {
192            response_tx: Arc::new(Mutex::new(Some(tx))),
193        };
194        (request, rx)
195    }
196}
197
198/// Request job status (web handler pattern).
199///
200/// Used by HTTP handlers to query the status of a specific job.
201/// Uses oneshot channel for response to avoid blocking the handler.
202///
203/// # Example
204///
205/// ```rust,ignore
206/// use acton_htmx::jobs::agent::messages::GetJobStatusRequest;
207/// use std::time::Duration;
208///
209/// async fn handler(
210///     State(state): State<ActonHtmxState>,
211///     Path(job_id): Path<JobId>,
212/// ) -> Result<Response> {
213///     let (request, rx) = GetJobStatusRequest::new(job_id);
214///     state.job_agent().send(request).await;
215///
216///     let timeout = Duration::from_millis(100);
217///     let status = tokio::time::timeout(timeout, rx).await??;
218///
219///     Ok(Json(status).into_response())
220/// }
221/// ```
222#[derive(Clone, Debug)]
223pub struct GetJobStatusRequest {
224    /// Job ID to query.
225    pub id: JobId,
226    /// Response channel for status.
227    pub response_tx: ResponseChannel<Option<JobStatus>>,
228}
229
230impl GetJobStatusRequest {
231    /// Create a new job status request with response channel.
232    ///
233    /// Returns a tuple of (request, receiver) where the request should be
234    /// sent to the agent and the receiver awaited for the response.
235    #[must_use]
236    pub fn new(id: JobId) -> (Self, oneshot::Receiver<Option<JobStatus>>) {
237        let (tx, rx) = oneshot::channel();
238        let request = Self {
239            id,
240            response_tx: Arc::new(Mutex::new(Some(tx))),
241        };
242        (request, rx)
243    }
244}
245
246/// Retry a failed job (web handler pattern).
247///
248/// Re-queues a job from the dead letter queue back into the main queue
249/// for another execution attempt.
250///
251/// # Example
252///
253/// ```rust,ignore
254/// use acton_htmx::jobs::agent::messages::RetryJobRequest;
255///
256/// async fn handler(
257///     State(state): State<ActonHtmxState>,
258///     Path(job_id): Path<JobId>,
259/// ) -> Result<Response> {
260///     let (request, rx) = RetryJobRequest::new(job_id);
261///     state.job_agent().send(request).await;
262///
263///     let success = tokio::time::timeout(Duration::from_millis(100), rx).await??;
264///     Ok(if success {
265///         StatusCode::OK
266///     } else {
267///         StatusCode::NOT_FOUND
268///     }.into_response())
269/// }
270/// ```
271#[derive(Clone, Debug)]
272pub struct RetryJobRequest {
273    /// Job ID to retry.
274    pub id: JobId,
275    /// Response channel indicating success.
276    pub response_tx: ResponseChannel<bool>,
277}
278
279impl RetryJobRequest {
280    /// Create a new retry job request with response channel.
281    ///
282    /// Returns a tuple of (request, receiver) where the request should be
283    /// sent to the agent and the receiver awaited for the response.
284    #[must_use]
285    pub fn new(id: JobId) -> (Self, oneshot::Receiver<bool>) {
286        let (tx, rx) = oneshot::channel();
287        let request = Self {
288            id,
289            response_tx: Arc::new(Mutex::new(Some(tx))),
290        };
291        (request, rx)
292    }
293}
294
295/// Retry all failed jobs (web handler pattern).
296///
297/// Re-queues all jobs from the dead letter queue back into the main queue.
298/// Returns the number of jobs successfully retried.
299///
300/// # Example
301///
302/// ```rust,ignore
303/// use acton_htmx::jobs::agent::messages::RetryAllFailedRequest;
304///
305/// async fn handler(State(state): State<ActonHtmxState>) -> Result<Response> {
306///     let (request, rx) = RetryAllFailedRequest::new();
307///     state.job_agent().send(request).await;
308///
309///     let count = tokio::time::timeout(Duration::from_millis(500), rx).await??;
310///     Ok(Json(json!({ "retried": count })).into_response())
311/// }
312/// ```
313#[derive(Clone, Debug)]
314pub struct RetryAllFailedRequest {
315    /// Response channel with count of retried jobs.
316    pub response_tx: ResponseChannel<usize>,
317}
318
319impl RetryAllFailedRequest {
320    /// Create a new retry all failed request with response channel.
321    ///
322    /// Returns a tuple of (request, receiver) where the request should be
323    /// sent to the agent and the receiver awaited for the response.
324    #[must_use]
325    pub fn new() -> (Self, oneshot::Receiver<usize>) {
326        let (tx, rx) = oneshot::channel();
327        let request = Self {
328            response_tx: Arc::new(Mutex::new(Some(tx))),
329        };
330        (request, rx)
331    }
332}
333
334/// Cancel a running or pending job (web handler pattern).
335///
336/// Attempts to cancel a job. If the job is pending, it's removed from the queue.
337/// If it's currently running, a cancellation signal is sent.
338///
339/// # Example
340///
341/// ```rust,ignore
342/// use acton_htmx::jobs::agent::messages::CancelJobRequest;
343///
344/// async fn handler(
345///     State(state): State<ActonHtmxState>,
346///     Path(job_id): Path<JobId>,
347/// ) -> Result<Response> {
348///     let (request, rx) = CancelJobRequest::new(job_id);
349///     state.job_agent().send(request).await;
350///
351///     let success = tokio::time::timeout(Duration::from_millis(100), rx).await??;
352///     Ok(if success {
353///         StatusCode::OK
354///     } else {
355///         StatusCode::NOT_FOUND
356///     }.into_response())
357/// }
358/// ```
359#[derive(Clone, Debug)]
360pub struct CancelJobRequest {
361    /// Job ID to cancel.
362    pub id: JobId,
363    /// Response channel indicating success.
364    pub response_tx: ResponseChannel<bool>,
365}
366
367impl CancelJobRequest {
368    /// Create a new cancel job request with response channel.
369    ///
370    /// Returns a tuple of (request, receiver) where the request should be
371    /// sent to the agent and the receiver awaited for the response.
372    #[must_use]
373    pub fn new(id: JobId) -> (Self, oneshot::Receiver<bool>) {
374        let (tx, rx) = oneshot::channel();
375        let request = Self {
376            id,
377            response_tx: Arc::new(Mutex::new(Some(tx))),
378        };
379        (request, rx)
380    }
381}
382
383/// Clear the dead letter queue (web handler pattern).
384///
385/// Permanently removes all jobs from the dead letter queue.
386/// This operation cannot be undone.
387///
388/// # Example
389///
390/// ```rust,ignore
391/// use acton_htmx::jobs::agent::messages::ClearDeadLetterQueueRequest;
392///
393/// async fn handler(State(state): State<ActonHtmxState>) -> Result<Response> {
394///     let (request, rx) = ClearDeadLetterQueueRequest::new();
395///     state.job_agent().send(request).await;
396///
397///     let count = tokio::time::timeout(Duration::from_millis(100), rx).await??;
398///     Ok(Json(json!({ "cleared": count })).into_response())
399/// }
400/// ```
401#[derive(Clone, Debug)]
402pub struct ClearDeadLetterQueueRequest {
403    /// Response channel with count of cleared jobs.
404    pub response_tx: ResponseChannel<usize>,
405}
406
407impl ClearDeadLetterQueueRequest {
408    /// Create a new clear dead letter queue request with response channel.
409    ///
410    /// Returns a tuple of (request, receiver) where the request should be
411    /// sent to the agent and the receiver awaited for the response.
412    #[must_use]
413    pub fn new() -> (Self, oneshot::Receiver<usize>) {
414        let (tx, rx) = oneshot::channel();
415        let request = Self {
416            response_tx: Arc::new(Mutex::new(Some(tx))),
417        };
418        (request, rx)
419    }
420}
421
422/// Job history page response containing records and pagination info.
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct JobHistoryPage {
425    /// Job history records for this page.
426    pub jobs: Vec<super::history::JobHistoryRecord>,
427    /// Current page number (1-indexed).
428    pub page: usize,
429    /// Number of records per page.
430    pub page_size: usize,
431    /// Total number of matching records across all pages.
432    pub total_count: usize,
433    /// Whether there is a previous page.
434    pub has_prev: bool,
435    /// Whether there is a next page.
436    pub has_next: bool,
437}
438
439impl JobHistoryPage {
440    /// Create a new job history page from records and pagination info.
441    #[must_use]
442    pub const fn new(
443        jobs: Vec<super::history::JobHistoryRecord>,
444        page: usize,
445        page_size: usize,
446        total_count: usize,
447    ) -> Self {
448        let has_prev = page > 1;
449        let total_pages = total_count.div_ceil(page_size);
450        let has_next = page < total_pages;
451
452        Self {
453            jobs,
454            page,
455            page_size,
456            total_count,
457            has_prev,
458            has_next,
459        }
460    }
461
462    /// Get the previous page number.
463    #[must_use]
464    pub fn prev_page(&self) -> usize {
465        self.page.saturating_sub(1).max(1)
466    }
467
468    /// Get the next page number.
469    #[must_use]
470    pub const fn next_page(&self) -> usize {
471        self.page + 1
472    }
473
474    /// Get the starting record number for this page (1-indexed).
475    #[must_use]
476    pub fn page_start(&self) -> usize {
477        if self.jobs.is_empty() {
478            0
479        } else {
480            (self.page - 1) * self.page_size + 1
481        }
482    }
483
484    /// Get the ending record number for this page (1-indexed).
485    #[must_use]
486    pub fn page_end(&self) -> usize {
487        if self.jobs.is_empty() {
488            0
489        } else {
490            self.page_start() + self.jobs.len() - 1
491        }
492    }
493}
494
495/// Request job history with pagination and search (web handler pattern).
496///
497/// Retrieves completed job history with optional search filtering
498/// and pagination support.
499///
500/// # Example
501///
502/// ```rust,ignore
503/// use acton_htmx::jobs::agent::messages::GetJobHistoryRequest;
504///
505/// async fn handler(
506///     State(state): State<ActonHtmxState>,
507///     Query(params): Query<HistoryParams>,
508/// ) -> Result<Response> {
509///     let (request, rx) = GetJobHistoryRequest::new(
510///         params.page.unwrap_or(1),
511///         params.page_size.unwrap_or(20),
512///         params.search,
513///     );
514///     state.job_agent().send(request).await;
515///
516///     let timeout = Duration::from_millis(200);
517///     let history = tokio::time::timeout(timeout, rx).await??;
518///
519///     Ok(Json(history).into_response())
520/// }
521/// ```
522#[derive(Clone, Debug)]
523pub struct GetJobHistoryRequest {
524    /// Page number (1-indexed).
525    pub page: usize,
526    /// Number of records per page.
527    pub page_size: usize,
528    /// Optional search query to filter results.
529    pub search_query: Option<String>,
530    /// Response channel for history page.
531    pub response_tx: ResponseChannel<JobHistoryPage>,
532}
533
534impl GetJobHistoryRequest {
535    /// Create a new job history request with response channel.
536    ///
537    /// # Arguments
538    ///
539    /// * `page` - Page number (1-indexed)
540    /// * `page_size` - Number of records per page
541    /// * `search_query` - Optional search string to filter records
542    ///
543    /// Returns a tuple of (request, receiver) where the request should be
544    /// sent to the agent and the receiver awaited for the response.
545    #[must_use]
546    pub fn new(
547        page: usize,
548        page_size: usize,
549        search_query: Option<String>,
550    ) -> (Self, oneshot::Receiver<JobHistoryPage>) {
551        let (tx, rx) = oneshot::channel();
552        let request = Self {
553            page: page.max(1), // Ensure page is at least 1
554            page_size: page_size.clamp(1, 100), // Clamp between 1-100
555            search_query,
556            response_tx: Arc::new(Mutex::new(Some(tx))),
557        };
558        (request, rx)
559    }
560}