1use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12
13#[derive(Clone, Debug, Default, Deserialize)]
16pub struct StatusCounts {
17 #[serde(default)]
18 pub pending: u64,
19 #[serde(default)]
20 pub scheduled: u64,
21 #[serde(default)]
22 pub in_progress: u64,
23 #[serde(default)]
24 pub done: u64,
25 #[serde(default)]
26 pub failed: u64,
27 #[serde(default)]
28 pub dead: u64,
29}
30
31impl StatusCounts {
32 #[must_use]
33 pub const fn total(&self) -> u64 {
34 self.pending + self.scheduled + self.in_progress + self.done + self.failed + self.dead
35 }
36}
37
38#[derive(Clone, Debug, Deserialize)]
39pub struct QueueProcess {
40 pub process_id: String,
41 pub queue_name: String,
42 pub host_id: String,
43 pub started_at: DateTime<Utc>,
44 pub heartbeat_at: DateTime<Utc>,
45 #[serde(default)]
46 pub current_job_id: Option<String>,
47}
48
49#[derive(Clone, Debug, Deserialize)]
50pub struct QueueOverview {
51 pub name: String,
52 pub max_workers: i32,
53 pub paused: bool,
54 pub retain_done_days: i32,
55 pub retain_dead_days: i32,
56 #[serde(default)]
57 pub backoff_enabled: bool,
58 #[serde(default = "default_backoff_base")]
59 pub backoff_base_seconds: i32,
60 #[serde(default = "default_backoff_max")]
61 pub backoff_max_seconds: i32,
62 #[serde(default)]
65 pub throttled_until: Option<DateTime<Utc>>,
66 #[serde(default)]
69 pub oldest_pending_age_seconds: u64,
70 pub counts: StatusCounts,
71 #[serde(default)]
72 pub processes: Vec<QueueProcess>,
73}
74
75const fn default_backoff_base() -> i32 {
76 60
77}
78
79const fn default_backoff_max() -> i32 {
80 1800
81}
82
83#[derive(Clone, Debug, Deserialize)]
85pub struct WorkerSlot {
86 pub queue_name: String,
87 #[serde(default)]
88 pub slots: i32,
89}
90
91#[derive(Clone, Debug, Deserialize)]
93pub struct Worker {
94 pub host_id: String,
95 #[serde(default)]
96 pub worker_name: Option<String>,
97 #[serde(default)]
98 pub queues: Vec<String>,
99 #[serde(default)]
100 pub slots: Vec<WorkerSlot>,
101 #[serde(default)]
102 pub workers_live: u32,
103 #[serde(default)]
104 pub in_flight: u32,
105 pub heartbeat_at: DateTime<Utc>,
106 #[serde(default)]
107 pub heartbeat_age_seconds: u64,
108}
109
110impl Worker {
111 #[must_use]
113 pub fn display_name(&self) -> &str {
114 self.worker_name.as_deref().unwrap_or(&self.host_id)
115 }
116}
117
118#[derive(Clone, Debug, Default, Deserialize)]
120pub struct WorkersOverview {
121 #[serde(default)]
122 pub workers: Vec<Worker>,
123 #[serde(default)]
124 pub unassigned_queues: Vec<String>,
125}
126
127#[derive(Clone, Debug, Deserialize)]
128pub struct JobRow {
129 pub id: String,
130 pub queue_name: String,
131 pub kind: String,
132 pub status: String,
133 pub priority: i32,
134 pub attempts: i32,
135 pub max_attempts: i32,
136 pub enqueued_at: DateTime<Utc>,
137 pub scheduled_at: DateTime<Utc>,
138 #[serde(default)]
139 pub started_at: Option<DateTime<Utc>>,
140 #[serde(default)]
141 pub completed_at: Option<DateTime<Utc>>,
142 #[serde(default)]
143 pub last_error: Option<String>,
144 #[serde(default)]
145 pub process_id: Option<String>,
146 #[serde(default)]
147 pub dedupe_key: Option<String>,
148 #[serde(default)]
151 pub heartbeat_at: Option<DateTime<Utc>>,
152}
153
154#[derive(Clone, Debug, Deserialize)]
155pub struct JobInspect {
156 pub row: JobRow,
157 #[serde(default)]
158 pub payload: serde_json::Value,
159 #[serde(default)]
160 pub error_history: Vec<serde_json::Value>,
161}
162
163#[derive(Clone, Debug, Default, Serialize)]
164pub struct JobsFilter {
165 #[serde(default)]
166 pub queues: Vec<String>,
167 #[serde(default)]
168 pub kinds: Vec<String>,
169 #[serde(default)]
170 pub statuses: Vec<String>,
171 #[serde(default, skip_serializing_if = "Option::is_none")]
172 pub from: Option<DateTime<Utc>>,
173 #[serde(default, skip_serializing_if = "Option::is_none")]
174 pub to: Option<DateTime<Utc>>,
175 #[serde(default, skip_serializing_if = "Option::is_none")]
176 pub payload_search: Option<String>,
177}
178
179#[derive(Clone, Debug, Deserialize)]
180pub struct JobsPage {
181 pub rows: Vec<JobRow>,
182 pub total: u64,
183 pub limit: u32,
184 pub offset: u32,
185}
186
187#[derive(Clone, Debug, Deserialize)]
188pub struct TimelineBucket {
189 pub at: DateTime<Utc>,
190 pub enqueued: u64,
191 #[serde(default)]
193 pub started: u64,
194 #[serde(default)]
198 pub retried: u64,
199 pub completed: u64,
200 pub failed: u64,
201 #[serde(default)]
206 pub processing_p50_ms: u64,
207 #[serde(default)]
208 pub processing_p95_ms: u64,
209 #[serde(default)]
210 pub processing_p99_ms: u64,
211 #[serde(default)]
212 pub total_p50_ms: u64,
213 #[serde(default)]
214 pub total_p95_ms: u64,
215 #[serde(default)]
216 pub total_p99_ms: u64,
217}
218
219#[derive(Clone, Debug, Default, Deserialize)]
223pub struct MetricSeriesBucket {
224 pub at: DateTime<Utc>,
225 #[serde(default)]
226 pub enqueued: u64,
227 #[serde(default)]
228 pub completed: u64,
229 #[serde(default)]
230 pub failed: u64,
231 #[serde(default)]
232 pub proc_p50_ms: u64,
233 #[serde(default)]
234 pub proc_p95_ms: u64,
235 #[serde(default)]
236 pub proc_p99_ms: u64,
237 #[serde(default)]
238 pub total_p50_ms: u64,
239 #[serde(default)]
240 pub total_p95_ms: u64,
241 #[serde(default)]
242 pub total_p99_ms: u64,
243}
244
245#[derive(Clone, Debug, Default, Deserialize)]
248pub struct ResourceBucket {
249 pub at: DateTime<Utc>,
250 #[serde(default)]
251 pub cpu_pct: f64,
252 #[serde(default)]
253 pub rss_bytes: u64,
254 #[serde(default)]
255 pub disk_read_bytes: u64,
256 #[serde(default)]
257 pub disk_write_bytes: u64,
258 #[serde(default)]
259 pub disk_used_pct: f64,
260}
261
262#[derive(Clone, Debug, Default, Deserialize)]
264pub struct ResourceHostSeries {
265 #[serde(default)]
266 pub host: String,
267 #[serde(default)]
270 pub name: Option<String>,
271 #[serde(default)]
272 pub buckets: Vec<ResourceBucket>,
273}
274
275impl ResourceHostSeries {
276 #[must_use]
278 pub fn display_name(&self) -> &str {
279 self.name.as_deref().unwrap_or(&self.host)
280 }
281}
282
283#[derive(Clone, Debug, Default, Deserialize)]
289pub struct DbHealthBucket {
290 pub at: DateTime<Utc>,
291 #[serde(default)]
292 pub read_p50_ms: u64,
293 #[serde(default)]
294 pub read_p95_ms: u64,
295 #[serde(default)]
296 pub read_p99_ms: u64,
297 #[serde(default)]
298 pub reads_per_min: u64,
299 #[serde(default)]
300 pub write_p50_ms: u64,
301 #[serde(default)]
302 pub write_p95_ms: u64,
303 #[serde(default)]
304 pub write_p99_ms: u64,
305 #[serde(default)]
306 pub writes_per_min: u64,
307 #[serde(default)]
308 pub pool_active: u64,
309 #[serde(default)]
310 pub pool_idle: u64,
311 #[serde(default)]
312 pub pool_max: u64,
313 #[serde(default)]
314 pub pool_used_pct: f64,
315 #[serde(default)]
316 pub db_size_bytes: u64,
317 #[serde(default)]
318 pub wal_bytes: u64,
319}
320
321#[derive(Clone, Debug, Default, Deserialize)]
323pub struct DbHealthHostSeries {
324 #[serde(default)]
325 pub host: String,
326 #[serde(default)]
329 pub name: Option<String>,
330 #[serde(default)]
331 pub buckets: Vec<DbHealthBucket>,
332}
333
334impl DbHealthHostSeries {
335 #[must_use]
337 pub fn display_name(&self) -> &str {
338 self.name.as_deref().unwrap_or(&self.host)
339 }
340}
341
342#[derive(Clone, Debug, Default, Deserialize)]
343pub struct CleanupReport {
344 #[serde(default)]
345 pub done_deleted: u64,
346 #[serde(default)]
347 pub dead_deleted: u64,
348}
349
350pub const JOB_STATUSES: &[&str] = &["pending", "in_progress", "done", "failed", "dead"];
354
355#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
356pub struct CronSchedule {
357 pub name: String,
358 pub kind: String,
359 #[serde(default)]
360 pub payload: serde_json::Value,
361 #[serde(default)]
362 pub queue_name: Option<String>,
363 pub cron_expr: String,
364 pub enabled: bool,
365 #[serde(default)]
366 pub max_attempts: Option<i32>,
367 #[serde(default)]
370 pub dedupe_key: Option<String>,
371 #[serde(default)]
372 pub last_fired_at: Option<DateTime<Utc>>,
373 #[serde(default)]
374 pub next_fire_at: Option<DateTime<Utc>>,
375 #[serde(default)]
376 pub last_error: Option<String>,
377 pub created_at: DateTime<Utc>,
378 pub updated_at: DateTime<Utc>,
379}
380
381#[derive(Clone, Debug, Serialize)]
386pub struct JobsEnqueueReq {
387 pub kind: String,
388 pub payload: serde_json::Value,
389 #[serde(skip_serializing_if = "Option::is_none")]
390 pub queue_name: Option<String>,
391 #[serde(skip_serializing_if = "Option::is_none")]
392 pub dedupe_key: Option<String>,
393 #[serde(skip_serializing_if = "Option::is_none")]
396 pub run_at: Option<DateTime<Utc>>,
397 #[serde(skip_serializing_if = "Option::is_none")]
398 pub priority: Option<i32>,
399 #[serde(skip_serializing_if = "Option::is_none")]
400 pub max_attempts: Option<i32>,
401}
402
403#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
417#[serde(tag = "kind", rename_all = "snake_case")]
418pub enum IpcError {
419 Validation { field: String, msg: String },
420 NotFound { msg: String },
421 Storage { msg: String },
422 RateLimited { retry_after_secs: u32 },
423 Internal { msg: String },
424}
425
426impl IpcError {
427 #[must_use]
433 pub fn message(&self) -> &str {
434 match self {
435 Self::Validation { msg, .. }
436 | Self::NotFound { msg }
437 | Self::Storage { msg }
438 | Self::Internal { msg } => msg,
439 Self::RateLimited { .. } => "rate limited",
440 }
441 }
442
443 #[must_use]
447 pub fn internal(msg: impl Into<String>) -> Self {
448 Self::Internal { msg: msg.into() }
449 }
450}
451
452impl std::fmt::Display for IpcError {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 f.write_str(self.message())
455 }
456}
457
458#[async_trait(?Send)]
468pub trait QueueIpc: Send + Sync + 'static {
469 async fn queue_overview(&self) -> Result<Vec<QueueOverview>, IpcError>;
471 async fn queue_processes(
472 &self,
473 queue_name: Option<&str>,
474 ) -> Result<Vec<QueueProcess>, IpcError>;
475 async fn queue_workers(&self) -> Result<WorkersOverview, IpcError>;
479 async fn queue_timeline_range(
484 &self,
485 from: DateTime<Utc>,
486 to: DateTime<Utc>,
487 bucket_secs: u32,
488 ) -> Result<Vec<TimelineBucket>, IpcError>;
489 async fn queue_metric_series(
494 &self,
495 queue: &str,
496 from: DateTime<Utc>,
497 to: DateTime<Utc>,
498 bucket_secs: u32,
499 ) -> Result<Vec<MetricSeriesBucket>, IpcError>;
500 async fn queue_resource_series(
504 &self,
505 from: DateTime<Utc>,
506 to: DateTime<Utc>,
507 bucket_secs: u32,
508 ) -> Result<Vec<ResourceHostSeries>, IpcError>;
509 async fn queue_db_series(
512 &self,
513 from: DateTime<Utc>,
514 to: DateTime<Utc>,
515 bucket_secs: u32,
516 ) -> Result<Vec<DbHealthHostSeries>, IpcError>;
517 async fn jobs_list(
518 &self,
519 filter: JobsFilter,
520 limit: u32,
521 offset: u32,
522 ) -> Result<JobsPage, IpcError>;
523 async fn jobs_failed(&self, limit: u32) -> Result<Vec<JobRow>, IpcError>;
524 async fn jobs_kinds(&self, queue_name: Option<&str>) -> Result<Vec<String>, IpcError>;
525 async fn job_inspect(&self, id: &str) -> Result<JobInspect, IpcError>;
526
527 async fn queue_set_max_workers(&self, queue_name: &str, n: i32) -> Result<(), IpcError>;
529 async fn queue_set_paused(&self, queue_name: &str, paused: bool) -> Result<(), IpcError>;
530 async fn queue_set_retention(
531 &self,
532 queue_name: &str,
533 done_days: i32,
534 dead_days: i32,
535 ) -> Result<(), IpcError>;
536 async fn queue_set_backoff(
537 &self,
538 queue_name: &str,
539 enabled: bool,
540 base_seconds: i32,
541 max_seconds: i32,
542 ) -> Result<(), IpcError>;
543 async fn queue_cleanup_now(&self) -> Result<CleanupReport, IpcError>;
544 async fn queue_enqueue_demo(&self, payload: serde_json::Value) -> Result<String, IpcError>;
545
546 async fn jobs_enqueue(&self, req: JobsEnqueueReq) -> Result<String, IpcError>;
551
552 async fn jobs_scheduled(&self, queue_name: Option<&str>) -> Result<Vec<JobRow>, IpcError>;
555
556 async fn jobs_run_now(&self, id: &str) -> Result<bool, IpcError>;
561 async fn jobs_retry(&self, ids: &[String]) -> Result<u64, IpcError>;
562 async fn jobs_retry_all_failed(&self) -> Result<u64, IpcError>;
563 async fn jobs_retry_all_by_status(&self, status: &str) -> Result<u64, IpcError>;
566 async fn jobs_delete(&self, ids: &[String]) -> Result<u64, IpcError>;
567 async fn jobs_requeue(&self, ids: &[String]) -> Result<u64, IpcError>;
568 async fn jobs_delete_done_older_than(
569 &self,
570 days: u32,
571 queue_name: Option<&str>,
572 ) -> Result<u64, IpcError>;
573 async fn jobs_delete_by_status(
577 &self,
578 status: &str,
579 queue_name: Option<&str>,
580 ) -> Result<u64, IpcError>;
581
582 async fn cron_list(&self) -> Result<Vec<CronSchedule>, IpcError>;
584 async fn cron_set_enabled(&self, name: &str, enabled: bool) -> Result<(), IpcError>;
585 async fn cron_set_expr(&self, name: &str, expr: &str) -> Result<(), IpcError>;
586 async fn cron_set_dedupe(&self, name: &str, dedupe: bool) -> Result<(), IpcError>;
590 async fn cron_trigger_now(&self, name: &str) -> Result<String, IpcError>;
592}
593
594pub type IpcCtx = std::sync::Arc<dyn QueueIpc>;