acton_htmx/jobs/agent/messages.rs
1//! Messages for the job agent.
2
3use crate::jobs::{JobId, JobStatus};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{oneshot, Mutex};
8
9/// Response channel type for web handler pattern.
10///
11/// Wraps `oneshot::Sender` in `Arc<Mutex<Option<T>>>` to satisfy
12/// `Clone + Debug` requirements for acton-reactive messages.
13pub type ResponseChannel<T> = Arc<Mutex<Option<oneshot::Sender<T>>>>;
14
15/// Enqueue a new job for processing.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct EnqueueJob {
18 /// Unique job identifier.
19 pub id: JobId,
20 /// Job type name.
21 pub job_type: String,
22 /// Serialized job payload.
23 pub payload: Vec<u8>,
24 /// Job priority (higher = more important).
25 pub priority: i32,
26 /// Maximum number of retry attempts.
27 pub max_retries: u32,
28 /// Job execution timeout.
29 pub timeout: Duration,
30}
31
32/// Response to job enqueue request.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct JobEnqueued {
35 /// The enqueued job ID.
36 pub id: JobId,
37}
38
39/// Get the status of a job (agent-to-agent pattern).
40///
41/// **Deprecated**: Use [`GetJobStatusRequest`] for web handlers.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct GetJobStatus {
44 /// Job ID to query.
45 pub id: JobId,
46}
47
48/// Response containing job status (agent-to-agent pattern).
49///
50/// **Deprecated**: Use [`GetJobStatusRequest`] for web handlers.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct JobStatusResponse {
53 /// Job ID.
54 pub id: JobId,
55 /// Current status (None if job not found).
56 pub status: Option<JobStatus>,
57}
58
59/// Request job metrics (agent-to-agent pattern).
60///
61/// **Deprecated**: Use [`GetMetricsRequest`] for web handlers.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct GetMetrics;
64
65/// Job processing metrics.
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
67pub struct JobMetrics {
68 /// Total jobs enqueued.
69 pub jobs_enqueued: u64,
70 /// Total jobs dequeued.
71 pub jobs_dequeued: u64,
72 /// Total jobs completed successfully.
73 pub jobs_completed: u64,
74 /// Total jobs failed.
75 pub jobs_failed: u64,
76 /// Total jobs rejected (queue full).
77 pub jobs_rejected: u64,
78 /// Total jobs in dead letter queue.
79 pub jobs_in_dlq: u64,
80 /// Current queue size.
81 pub current_queue_size: usize,
82 /// Current number of running jobs.
83 pub current_running: usize,
84 /// Total execution time in milliseconds.
85 pub total_execution_time_ms: u64,
86 /// Average execution time in milliseconds.
87 pub avg_execution_time_ms: u64,
88 /// Minimum execution time in milliseconds.
89 pub min_execution_time_ms: u64,
90 /// Maximum execution time in milliseconds.
91 pub max_execution_time_ms: u64,
92 /// P50 (median) execution time in milliseconds.
93 pub p50_execution_time_ms: u64,
94 /// P95 execution time in milliseconds.
95 pub p95_execution_time_ms: u64,
96 /// P99 execution time in milliseconds.
97 pub p99_execution_time_ms: u64,
98}
99
100impl JobMetrics {
101 /// Update metrics with a completed job execution time.
102 ///
103 /// This updates percentile calculations using a simple streaming algorithm.
104 /// For production use, consider using a histogram library like `hdrhistogram`.
105 pub const fn record_execution_time(&mut self, execution_time_ms: u64) {
106 self.total_execution_time_ms = self.total_execution_time_ms.saturating_add(execution_time_ms);
107
108 // Update min/max
109 if self.min_execution_time_ms == 0 || execution_time_ms < self.min_execution_time_ms {
110 self.min_execution_time_ms = execution_time_ms;
111 }
112 if execution_time_ms > self.max_execution_time_ms {
113 self.max_execution_time_ms = execution_time_ms;
114 }
115
116 // Update average
117 if self.jobs_completed > 0 {
118 self.avg_execution_time_ms = self.total_execution_time_ms / self.jobs_completed;
119 }
120
121 // Simple percentile estimation (will be replaced with histogram in production)
122 // For now, use max as p99, avg as p50, and interpolate p95
123 self.p50_execution_time_ms = self.avg_execution_time_ms;
124 self.p95_execution_time_ms = self.avg_execution_time_ms +
125 ((self.max_execution_time_ms.saturating_sub(self.avg_execution_time_ms)) * 75 / 100);
126 self.p99_execution_time_ms = self.max_execution_time_ms;
127 }
128
129 /// Calculate failure rate as percentage (0-100).
130 #[must_use]
131 #[allow(clippy::cast_precision_loss)] // Acceptable for metrics
132 pub fn failure_rate(&self) -> f64 {
133 let total = self.jobs_completed + self.jobs_failed;
134 if total == 0 {
135 0.0
136 } else {
137 (self.jobs_failed as f64 / total as f64) * 100.0
138 }
139 }
140}
141
142/// Internal message to trigger job processing.
143#[derive(Debug, Clone, Serialize, Deserialize)]
144#[allow(dead_code)] // Will be used in Week 5 for job processing loop
145pub(super) struct ProcessJobs;
146
147/// Internal message to cleanup expired jobs.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149#[allow(dead_code)] // Will be used in Week 5 for cleanup scheduling
150pub(super) struct CleanupExpiredJobs;
151
152// ============================================================================
153// Web Handler Pattern Messages (HTTP handler to agent communication)
154// ============================================================================
155
156/// Request job metrics (web handler pattern).
157///
158/// Used by HTTP handlers to query job statistics. Uses oneshot channel
159/// for response to avoid blocking the handler.
160///
161/// # Example
162///
163/// ```rust,ignore
164/// use acton_htmx::jobs::agent::messages::GetMetricsRequest;
165/// use std::time::Duration;
166///
167/// async fn handler(State(state): State<ActonHtmxState>) -> Result<Response> {
168/// let (request, rx) = GetMetricsRequest::new();
169/// state.job_agent().send(request).await;
170///
171/// let timeout = Duration::from_millis(100);
172/// let metrics = tokio::time::timeout(timeout, rx).await??;
173///
174/// Ok(Json(metrics).into_response())
175/// }
176/// ```
177#[derive(Clone, Debug)]
178pub struct GetMetricsRequest {
179 /// Response channel for metrics.
180 pub response_tx: ResponseChannel<JobMetrics>,
181}
182
183impl GetMetricsRequest {
184 /// Create a new metrics request with response channel.
185 ///
186 /// Returns a tuple of (request, receiver) where the request should be
187 /// sent to the agent and the receiver awaited for the response.
188 #[must_use]
189 pub fn new() -> (Self, oneshot::Receiver<JobMetrics>) {
190 let (tx, rx) = oneshot::channel();
191 let request = Self {
192 response_tx: Arc::new(Mutex::new(Some(tx))),
193 };
194 (request, rx)
195 }
196}
197
198/// Request job status (web handler pattern).
199///
200/// Used by HTTP handlers to query the status of a specific job.
201/// Uses oneshot channel for response to avoid blocking the handler.
202///
203/// # Example
204///
205/// ```rust,ignore
206/// use acton_htmx::jobs::agent::messages::GetJobStatusRequest;
207/// use std::time::Duration;
208///
209/// async fn handler(
210/// State(state): State<ActonHtmxState>,
211/// Path(job_id): Path<JobId>,
212/// ) -> Result<Response> {
213/// let (request, rx) = GetJobStatusRequest::new(job_id);
214/// state.job_agent().send(request).await;
215///
216/// let timeout = Duration::from_millis(100);
217/// let status = tokio::time::timeout(timeout, rx).await??;
218///
219/// Ok(Json(status).into_response())
220/// }
221/// ```
222#[derive(Clone, Debug)]
223pub struct GetJobStatusRequest {
224 /// Job ID to query.
225 pub id: JobId,
226 /// Response channel for status.
227 pub response_tx: ResponseChannel<Option<JobStatus>>,
228}
229
230impl GetJobStatusRequest {
231 /// Create a new job status request with response channel.
232 ///
233 /// Returns a tuple of (request, receiver) where the request should be
234 /// sent to the agent and the receiver awaited for the response.
235 #[must_use]
236 pub fn new(id: JobId) -> (Self, oneshot::Receiver<Option<JobStatus>>) {
237 let (tx, rx) = oneshot::channel();
238 let request = Self {
239 id,
240 response_tx: Arc::new(Mutex::new(Some(tx))),
241 };
242 (request, rx)
243 }
244}
245
246/// Retry a failed job (web handler pattern).
247///
248/// Re-queues a job from the dead letter queue back into the main queue
249/// for another execution attempt.
250///
251/// # Example
252///
253/// ```rust,ignore
254/// use acton_htmx::jobs::agent::messages::RetryJobRequest;
255///
256/// async fn handler(
257/// State(state): State<ActonHtmxState>,
258/// Path(job_id): Path<JobId>,
259/// ) -> Result<Response> {
260/// let (request, rx) = RetryJobRequest::new(job_id);
261/// state.job_agent().send(request).await;
262///
263/// let success = tokio::time::timeout(Duration::from_millis(100), rx).await??;
264/// Ok(if success {
265/// StatusCode::OK
266/// } else {
267/// StatusCode::NOT_FOUND
268/// }.into_response())
269/// }
270/// ```
271#[derive(Clone, Debug)]
272pub struct RetryJobRequest {
273 /// Job ID to retry.
274 pub id: JobId,
275 /// Response channel indicating success.
276 pub response_tx: ResponseChannel<bool>,
277}
278
279impl RetryJobRequest {
280 /// Create a new retry job request with response channel.
281 ///
282 /// Returns a tuple of (request, receiver) where the request should be
283 /// sent to the agent and the receiver awaited for the response.
284 #[must_use]
285 pub fn new(id: JobId) -> (Self, oneshot::Receiver<bool>) {
286 let (tx, rx) = oneshot::channel();
287 let request = Self {
288 id,
289 response_tx: Arc::new(Mutex::new(Some(tx))),
290 };
291 (request, rx)
292 }
293}
294
295/// Retry all failed jobs (web handler pattern).
296///
297/// Re-queues all jobs from the dead letter queue back into the main queue.
298/// Returns the number of jobs successfully retried.
299///
300/// # Example
301///
302/// ```rust,ignore
303/// use acton_htmx::jobs::agent::messages::RetryAllFailedRequest;
304///
305/// async fn handler(State(state): State<ActonHtmxState>) -> Result<Response> {
306/// let (request, rx) = RetryAllFailedRequest::new();
307/// state.job_agent().send(request).await;
308///
309/// let count = tokio::time::timeout(Duration::from_millis(500), rx).await??;
310/// Ok(Json(json!({ "retried": count })).into_response())
311/// }
312/// ```
313#[derive(Clone, Debug)]
314pub struct RetryAllFailedRequest {
315 /// Response channel with count of retried jobs.
316 pub response_tx: ResponseChannel<usize>,
317}
318
319impl RetryAllFailedRequest {
320 /// Create a new retry all failed request with response channel.
321 ///
322 /// Returns a tuple of (request, receiver) where the request should be
323 /// sent to the agent and the receiver awaited for the response.
324 #[must_use]
325 pub fn new() -> (Self, oneshot::Receiver<usize>) {
326 let (tx, rx) = oneshot::channel();
327 let request = Self {
328 response_tx: Arc::new(Mutex::new(Some(tx))),
329 };
330 (request, rx)
331 }
332}
333
334/// Cancel a running or pending job (web handler pattern).
335///
336/// Attempts to cancel a job. If the job is pending, it's removed from the queue.
337/// If it's currently running, a cancellation signal is sent.
338///
339/// # Example
340///
341/// ```rust,ignore
342/// use acton_htmx::jobs::agent::messages::CancelJobRequest;
343///
344/// async fn handler(
345/// State(state): State<ActonHtmxState>,
346/// Path(job_id): Path<JobId>,
347/// ) -> Result<Response> {
348/// let (request, rx) = CancelJobRequest::new(job_id);
349/// state.job_agent().send(request).await;
350///
351/// let success = tokio::time::timeout(Duration::from_millis(100), rx).await??;
352/// Ok(if success {
353/// StatusCode::OK
354/// } else {
355/// StatusCode::NOT_FOUND
356/// }.into_response())
357/// }
358/// ```
359#[derive(Clone, Debug)]
360pub struct CancelJobRequest {
361 /// Job ID to cancel.
362 pub id: JobId,
363 /// Response channel indicating success.
364 pub response_tx: ResponseChannel<bool>,
365}
366
367impl CancelJobRequest {
368 /// Create a new cancel job request with response channel.
369 ///
370 /// Returns a tuple of (request, receiver) where the request should be
371 /// sent to the agent and the receiver awaited for the response.
372 #[must_use]
373 pub fn new(id: JobId) -> (Self, oneshot::Receiver<bool>) {
374 let (tx, rx) = oneshot::channel();
375 let request = Self {
376 id,
377 response_tx: Arc::new(Mutex::new(Some(tx))),
378 };
379 (request, rx)
380 }
381}
382
383/// Clear the dead letter queue (web handler pattern).
384///
385/// Permanently removes all jobs from the dead letter queue.
386/// This operation cannot be undone.
387///
388/// # Example
389///
390/// ```rust,ignore
391/// use acton_htmx::jobs::agent::messages::ClearDeadLetterQueueRequest;
392///
393/// async fn handler(State(state): State<ActonHtmxState>) -> Result<Response> {
394/// let (request, rx) = ClearDeadLetterQueueRequest::new();
395/// state.job_agent().send(request).await;
396///
397/// let count = tokio::time::timeout(Duration::from_millis(100), rx).await??;
398/// Ok(Json(json!({ "cleared": count })).into_response())
399/// }
400/// ```
401#[derive(Clone, Debug)]
402pub struct ClearDeadLetterQueueRequest {
403 /// Response channel with count of cleared jobs.
404 pub response_tx: ResponseChannel<usize>,
405}
406
407impl ClearDeadLetterQueueRequest {
408 /// Create a new clear dead letter queue request with response channel.
409 ///
410 /// Returns a tuple of (request, receiver) where the request should be
411 /// sent to the agent and the receiver awaited for the response.
412 #[must_use]
413 pub fn new() -> (Self, oneshot::Receiver<usize>) {
414 let (tx, rx) = oneshot::channel();
415 let request = Self {
416 response_tx: Arc::new(Mutex::new(Some(tx))),
417 };
418 (request, rx)
419 }
420}
421
422/// Job history page response containing records and pagination info.
423#[derive(Debug, Clone, Serialize, Deserialize)]
424pub struct JobHistoryPage {
425 /// Job history records for this page.
426 pub jobs: Vec<super::history::JobHistoryRecord>,
427 /// Current page number (1-indexed).
428 pub page: usize,
429 /// Number of records per page.
430 pub page_size: usize,
431 /// Total number of matching records across all pages.
432 pub total_count: usize,
433 /// Whether there is a previous page.
434 pub has_prev: bool,
435 /// Whether there is a next page.
436 pub has_next: bool,
437}
438
439impl JobHistoryPage {
440 /// Create a new job history page from records and pagination info.
441 #[must_use]
442 pub const fn new(
443 jobs: Vec<super::history::JobHistoryRecord>,
444 page: usize,
445 page_size: usize,
446 total_count: usize,
447 ) -> Self {
448 let has_prev = page > 1;
449 let total_pages = total_count.div_ceil(page_size);
450 let has_next = page < total_pages;
451
452 Self {
453 jobs,
454 page,
455 page_size,
456 total_count,
457 has_prev,
458 has_next,
459 }
460 }
461
462 /// Get the previous page number.
463 #[must_use]
464 pub fn prev_page(&self) -> usize {
465 self.page.saturating_sub(1).max(1)
466 }
467
468 /// Get the next page number.
469 #[must_use]
470 pub const fn next_page(&self) -> usize {
471 self.page + 1
472 }
473
474 /// Get the starting record number for this page (1-indexed).
475 #[must_use]
476 pub fn page_start(&self) -> usize {
477 if self.jobs.is_empty() {
478 0
479 } else {
480 (self.page - 1) * self.page_size + 1
481 }
482 }
483
484 /// Get the ending record number for this page (1-indexed).
485 #[must_use]
486 pub fn page_end(&self) -> usize {
487 if self.jobs.is_empty() {
488 0
489 } else {
490 self.page_start() + self.jobs.len() - 1
491 }
492 }
493}
494
495/// Request job history with pagination and search (web handler pattern).
496///
497/// Retrieves completed job history with optional search filtering
498/// and pagination support.
499///
500/// # Example
501///
502/// ```rust,ignore
503/// use acton_htmx::jobs::agent::messages::GetJobHistoryRequest;
504///
505/// async fn handler(
506/// State(state): State<ActonHtmxState>,
507/// Query(params): Query<HistoryParams>,
508/// ) -> Result<Response> {
509/// let (request, rx) = GetJobHistoryRequest::new(
510/// params.page.unwrap_or(1),
511/// params.page_size.unwrap_or(20),
512/// params.search,
513/// );
514/// state.job_agent().send(request).await;
515///
516/// let timeout = Duration::from_millis(200);
517/// let history = tokio::time::timeout(timeout, rx).await??;
518///
519/// Ok(Json(history).into_response())
520/// }
521/// ```
522#[derive(Clone, Debug)]
523pub struct GetJobHistoryRequest {
524 /// Page number (1-indexed).
525 pub page: usize,
526 /// Number of records per page.
527 pub page_size: usize,
528 /// Optional search query to filter results.
529 pub search_query: Option<String>,
530 /// Response channel for history page.
531 pub response_tx: ResponseChannel<JobHistoryPage>,
532}
533
534impl GetJobHistoryRequest {
535 /// Create a new job history request with response channel.
536 ///
537 /// # Arguments
538 ///
539 /// * `page` - Page number (1-indexed)
540 /// * `page_size` - Number of records per page
541 /// * `search_query` - Optional search string to filter records
542 ///
543 /// Returns a tuple of (request, receiver) where the request should be
544 /// sent to the agent and the receiver awaited for the response.
545 #[must_use]
546 pub fn new(
547 page: usize,
548 page_size: usize,
549 search_query: Option<String>,
550 ) -> (Self, oneshot::Receiver<JobHistoryPage>) {
551 let (tx, rx) = oneshot::channel();
552 let request = Self {
553 page: page.max(1), // Ensure page is at least 1
554 page_size: page_size.clamp(1, 100), // Clamp between 1-100
555 search_query,
556 response_tx: Arc::new(Mutex::new(Some(tx))),
557 };
558 (request, rx)
559 }
560}