Skip to main content

forge_jobs_ui/
ipc.rs

//! The crate's IPC contract.
//!
//! Components don't call `invoke()` directly — they fetch a `QueueIpc`
//! trait object from Leptos context. The consumer implements the trait
//! around the host's own IPC mechanism (Tauri's `invoke`, a REST
//! client, an in-process mock for tests, etc.), so the panel is
//! reusable across hosts.
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12
// ── DTOs ─────────────────────────────────────────────────────────────
14
15#[derive(Clone, Debug, Default, Deserialize)]
16pub struct StatusCounts {
17    #[serde(default)]
18    pub pending: u64,
19    #[serde(default)]
20    pub scheduled: u64,
21    #[serde(default)]
22    pub in_progress: u64,
23    #[serde(default)]
24    pub done: u64,
25    #[serde(default)]
26    pub failed: u64,
27    #[serde(default)]
28    pub dead: u64,
29}
30
31impl StatusCounts {
32    #[must_use]
33    pub const fn total(&self) -> u64 {
34        self.pending + self.scheduled + self.in_progress + self.done + self.failed + self.dead
35    }
36}
37
38#[derive(Clone, Debug, Deserialize)]
39pub struct QueueProcess {
40    pub process_id: String,
41    pub queue_name: String,
42    pub host_id: String,
43    pub started_at: DateTime<Utc>,
44    pub heartbeat_at: DateTime<Utc>,
45    #[serde(default)]
46    pub current_job_id: Option<String>,
47}
48
49#[derive(Clone, Debug, Deserialize)]
50pub struct QueueOverview {
51    pub name: String,
52    pub max_workers: i32,
53    pub paused: bool,
54    pub retain_done_days: i32,
55    pub retain_dead_days: i32,
56    #[serde(default)]
57    pub backoff_enabled: bool,
58    #[serde(default = "default_backoff_base")]
59    pub backoff_base_seconds: i32,
60    #[serde(default = "default_backoff_max")]
61    pub backoff_max_seconds: i32,
62    /// Cool-down deadline while the queue is throttled; drives the
63    /// "resuming in Ns" countdown on the card. `None` when not throttled.
64    #[serde(default)]
65    pub throttled_until: Option<DateTime<Utc>>,
66    /// Age (seconds) of the oldest ready job — the queue lag. Sampled
67    /// by the live-metrics chart.
68    #[serde(default)]
69    pub oldest_pending_age_seconds: u64,
70    pub counts: StatusCounts,
71    #[serde(default)]
72    pub processes: Vec<QueueProcess>,
73}
74
/// Serde fallback for `QueueOverview::backoff_base_seconds` when the
/// host omits the field: one minute, in seconds.
const fn default_backoff_base() -> i32 {
    const ONE_MINUTE_SECS: i32 = 60;
    ONE_MINUTE_SECS
}
78
/// Serde fallback for `QueueOverview::backoff_max_seconds` when the
/// host omits the field: thirty minutes, in seconds.
const fn default_backoff_max() -> i32 {
    const THIRTY_MINUTES_SECS: i32 = 30 * 60;
    THIRTY_MINUTES_SECS
}
82
83/// One rebalancer-assigned slot count for a worker, by queue.
84#[derive(Clone, Debug, Deserialize)]
85pub struct WorkerSlot {
86    pub queue_name: String,
87    #[serde(default)]
88    pub slots: i32,
89}
90
91/// One worker process (pod) for the Workers health view.
92#[derive(Clone, Debug, Deserialize)]
93pub struct Worker {
94    pub host_id: String,
95    #[serde(default)]
96    pub worker_name: Option<String>,
97    #[serde(default)]
98    pub queues: Vec<String>,
99    #[serde(default)]
100    pub slots: Vec<WorkerSlot>,
101    #[serde(default)]
102    pub workers_live: u32,
103    #[serde(default)]
104    pub in_flight: u32,
105    pub heartbeat_at: DateTime<Utc>,
106    #[serde(default)]
107    pub heartbeat_age_seconds: u64,
108}
109
110impl Worker {
111    /// Display name — the human-friendly label if set, else the host id.
112    #[must_use]
113    pub fn display_name(&self) -> &str {
114        self.worker_name.as_deref().unwrap_or(&self.host_id)
115    }
116}
117
118/// `queue_workers` response — live workers plus any queues none cover.
119#[derive(Clone, Debug, Default, Deserialize)]
120pub struct WorkersOverview {
121    #[serde(default)]
122    pub workers: Vec<Worker>,
123    #[serde(default)]
124    pub unassigned_queues: Vec<String>,
125}
126
127#[derive(Clone, Debug, Deserialize)]
128pub struct JobRow {
129    pub id: String,
130    pub queue_name: String,
131    pub kind: String,
132    pub status: String,
133    pub priority: i32,
134    pub attempts: i32,
135    pub max_attempts: i32,
136    pub enqueued_at: DateTime<Utc>,
137    pub scheduled_at: DateTime<Utc>,
138    #[serde(default)]
139    pub started_at: Option<DateTime<Utc>>,
140    #[serde(default)]
141    pub completed_at: Option<DateTime<Utc>>,
142    #[serde(default)]
143    pub last_error: Option<String>,
144    #[serde(default)]
145    pub process_id: Option<String>,
146    #[serde(default)]
147    pub dedupe_key: Option<String>,
148    /// Last heartbeat the worker wrote on this row. Fresh while a
149    /// handler is actively running; stale → the reaper will revive.
150    #[serde(default)]
151    pub heartbeat_at: Option<DateTime<Utc>>,
152}
153
154#[derive(Clone, Debug, Deserialize)]
155pub struct JobInspect {
156    pub row: JobRow,
157    #[serde(default)]
158    pub payload: serde_json::Value,
159    #[serde(default)]
160    pub error_history: Vec<serde_json::Value>,
161}
162
163#[derive(Clone, Debug, Default, Serialize)]
164pub struct JobsFilter {
165    #[serde(default)]
166    pub queues: Vec<String>,
167    #[serde(default)]
168    pub kinds: Vec<String>,
169    #[serde(default)]
170    pub statuses: Vec<String>,
171    #[serde(default, skip_serializing_if = "Option::is_none")]
172    pub from: Option<DateTime<Utc>>,
173    #[serde(default, skip_serializing_if = "Option::is_none")]
174    pub to: Option<DateTime<Utc>>,
175    #[serde(default, skip_serializing_if = "Option::is_none")]
176    pub payload_search: Option<String>,
177}
178
179#[derive(Clone, Debug, Deserialize)]
180pub struct JobsPage {
181    pub rows: Vec<JobRow>,
182    pub total: u64,
183    pub limit: u32,
184    pub offset: u32,
185}
186
187#[derive(Clone, Debug, Deserialize)]
188pub struct TimelineBucket {
189    pub at: DateTime<Utc>,
190    pub enqueued: u64,
191    /// Count of jobs claimed by a worker in this bucket.
192    #[serde(default)]
193    pub started: u64,
194    /// Count of worker-claimed rows that returned to the schedulable
195    /// pool without finishing (throttle / non-terminal failure
196    /// re-queue) in this bucket. Plotted as the "Retried" series.
197    #[serde(default)]
198    pub retried: u64,
199    pub completed: u64,
200    pub failed: u64,
201    /// Latency percentiles (ms) over jobs that finalized in this bucket.
202    /// `processing_*` = claim→finalize (handler speed); `total_*` =
203    /// enqueue→finalize (end-to-end). Zero when nothing completed in the
204    /// bucket. Plotted as the two latency charts.
205    #[serde(default)]
206    pub processing_p50_ms: u64,
207    #[serde(default)]
208    pub processing_p95_ms: u64,
209    #[serde(default)]
210    pub processing_p99_ms: u64,
211    #[serde(default)]
212    pub total_p50_ms: u64,
213    #[serde(default)]
214    pub total_p95_ms: u64,
215    #[serde(default)]
216    pub total_p99_ms: u64,
217}
218
219/// One bucket of a per-queue metric series (mirror of the plugin's
220/// `MetricSeriesBucket`) — throughput + latency. Resources are
221/// per-process; see [`ResourceHostSeries`].
222#[derive(Clone, Debug, Default, Deserialize)]
223pub struct MetricSeriesBucket {
224    pub at: DateTime<Utc>,
225    #[serde(default)]
226    pub enqueued: u64,
227    #[serde(default)]
228    pub completed: u64,
229    #[serde(default)]
230    pub failed: u64,
231    #[serde(default)]
232    pub proc_p50_ms: u64,
233    #[serde(default)]
234    pub proc_p95_ms: u64,
235    #[serde(default)]
236    pub proc_p99_ms: u64,
237    #[serde(default)]
238    pub total_p50_ms: u64,
239    #[serde(default)]
240    pub total_p95_ms: u64,
241    #[serde(default)]
242    pub total_p99_ms: u64,
243}
244
245/// One bucket of a single pod's resource series (mirror of the plugin's
246/// `ResourceBucket`).
247#[derive(Clone, Debug, Default, Deserialize)]
248pub struct ResourceBucket {
249    pub at: DateTime<Utc>,
250    #[serde(default)]
251    pub cpu_pct: f64,
252    #[serde(default)]
253    pub rss_bytes: u64,
254    #[serde(default)]
255    pub disk_read_bytes: u64,
256    #[serde(default)]
257    pub disk_write_bytes: u64,
258    #[serde(default)]
259    pub disk_used_pct: f64,
260}
261
262/// Resource series for one pod (`host_id`). One per pod; locally, one.
263#[derive(Clone, Debug, Default, Deserialize)]
264pub struct ResourceHostSeries {
265    #[serde(default)]
266    pub host: String,
267    /// Composed worker label, when the pod set a prefix; else `None` and
268    /// [`display_name`](Self::display_name) falls back to `host`.
269    #[serde(default)]
270    pub name: Option<String>,
271    #[serde(default)]
272    pub buckets: Vec<ResourceBucket>,
273}
274
275impl ResourceHostSeries {
276    /// Worker label if set, else the raw host id.
277    #[must_use]
278    pub fn display_name(&self) -> &str {
279        self.name.as_deref().unwrap_or(&self.host)
280    }
281}
282
283/// One bucket of a single pod's DB-health series (mirror of the
284/// plugin's `DbHealthBucket`). All gauges past `ops_per_min` come
285/// from the running database — `SQLite` fills `db_size_bytes` +
286/// `wal_bytes` and leaves `pool_*` at zero; Postgres fills `pool_*`
287/// + `db_size_bytes` and leaves `wal_bytes` at zero.
288#[derive(Clone, Debug, Default, Deserialize)]
289pub struct DbHealthBucket {
290    pub at: DateTime<Utc>,
291    #[serde(default)]
292    pub read_p50_ms: u64,
293    #[serde(default)]
294    pub read_p95_ms: u64,
295    #[serde(default)]
296    pub read_p99_ms: u64,
297    #[serde(default)]
298    pub reads_per_min: u64,
299    #[serde(default)]
300    pub write_p50_ms: u64,
301    #[serde(default)]
302    pub write_p95_ms: u64,
303    #[serde(default)]
304    pub write_p99_ms: u64,
305    #[serde(default)]
306    pub writes_per_min: u64,
307    #[serde(default)]
308    pub pool_active: u64,
309    #[serde(default)]
310    pub pool_idle: u64,
311    #[serde(default)]
312    pub pool_max: u64,
313    #[serde(default)]
314    pub pool_used_pct: f64,
315    #[serde(default)]
316    pub db_size_bytes: u64,
317    #[serde(default)]
318    pub wal_bytes: u64,
319}
320
321/// DB-health series for one pod (`host_id`). One per pod; locally, one.
322#[derive(Clone, Debug, Default, Deserialize)]
323pub struct DbHealthHostSeries {
324    #[serde(default)]
325    pub host: String,
326    /// Composed worker label, when the pod set a prefix; else `None` and
327    /// [`display_name`](Self::display_name) falls back to `host`.
328    #[serde(default)]
329    pub name: Option<String>,
330    #[serde(default)]
331    pub buckets: Vec<DbHealthBucket>,
332}
333
334impl DbHealthHostSeries {
335    /// Worker label if set, else the raw host id.
336    #[must_use]
337    pub fn display_name(&self) -> &str {
338        self.name.as_deref().unwrap_or(&self.host)
339    }
340}
341
342#[derive(Clone, Debug, Default, Deserialize)]
343pub struct CleanupReport {
344    #[serde(default)]
345    pub done_deleted: u64,
346    #[serde(default)]
347    pub dead_deleted: u64,
348}
349
/// All statuses the schema knows about, listed in display order. The
/// panel fills its status filter dropdown from this constant, which
/// saves it a round-trip to the host.
pub const JOB_STATUSES: &[&str] = &[
    "pending",
    "in_progress",
    "done",
    "failed",
    "dead",
];
354
355#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
356pub struct CronSchedule {
357    pub name: String,
358    pub kind: String,
359    #[serde(default)]
360    pub payload: serde_json::Value,
361    #[serde(default)]
362    pub queue_name: Option<String>,
363    pub cron_expr: String,
364    pub enabled: bool,
365    #[serde(default)]
366    pub max_attempts: Option<i32>,
367    /// When set, each firing dedupes against it (skip-if-in-flight). The
368    /// Cron tab renders this as a checkbox.
369    #[serde(default)]
370    pub dedupe_key: Option<String>,
371    #[serde(default)]
372    pub last_fired_at: Option<DateTime<Utc>>,
373    #[serde(default)]
374    pub next_fire_at: Option<DateTime<Utc>>,
375    #[serde(default)]
376    pub last_error: Option<String>,
377    pub created_at: DateTime<Utc>,
378    pub updated_at: DateTime<Utc>,
379}
380
381/// Args for [`QueueIpc::jobs_enqueue`] — the Rails `perform_later`
382/// analog. Every field except `kind` + `payload` is optional;
383/// defaults match the host's enqueue defaults (route by kind prefix,
384/// no dedupe, priority 0, runs immediately).
385#[derive(Clone, Debug, Serialize)]
386pub struct JobsEnqueueReq {
387    pub kind: String,
388    pub payload: serde_json::Value,
389    #[serde(skip_serializing_if = "Option::is_none")]
390    pub queue_name: Option<String>,
391    #[serde(skip_serializing_if = "Option::is_none")]
392    pub dedupe_key: Option<String>,
393    /// Future timestamp; the worker won't claim until `>=` this.
394    /// `None` = enqueue runs as soon as a worker is free.
395    #[serde(skip_serializing_if = "Option::is_none")]
396    pub run_at: Option<DateTime<Utc>>,
397    #[serde(skip_serializing_if = "Option::is_none")]
398    pub priority: Option<i32>,
399    #[serde(skip_serializing_if = "Option::is_none")]
400    pub max_attempts: Option<i32>,
401}
402
// ── trait ────────────────────────────────────────────────────────────
404
405/// Structured IPC error.
406///
407/// Mirrors the host plugin's `Error` enum so the frontend can branch on
408/// `kind` (e.g. show a retry pill on `RateLimited`, jump to the
409/// offending input on `Validation`) instead of regex-matching a string.
410/// Variants are kept identical to the host's tagged-enum shape so
411/// `serde_wasm_bindgen::from_value` round-trips cleanly.
412///
413/// When the host wasn't reachable or returned an unstructured error,
414/// the frontend bridge constructs `Internal { msg }` from whatever
415/// JS-side message it could extract.
416#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
417#[serde(tag = "kind", rename_all = "snake_case")]
418pub enum IpcError {
419    Validation { field: String, msg: String },
420    NotFound { msg: String },
421    Storage { msg: String },
422    RateLimited { retry_after_secs: u32 },
423    Internal { msg: String },
424}
425
426impl IpcError {
427    /// Single source of truth for the user-visible message — every
428    /// variant carries one. The panel's default error pill displays
429    /// this directly; `kind`-specific UI affordances (a retry button
430    /// on `RateLimited`, a focus on `Validation.field`) are layered
431    /// on top in the views that care.
432    #[must_use]
433    pub fn message(&self) -> &str {
434        match self {
435            Self::Validation { msg, .. }
436            | Self::NotFound { msg }
437            | Self::Storage { msg }
438            | Self::Internal { msg } => msg,
439            Self::RateLimited { .. } => "rate limited",
440        }
441    }
442
443    /// Wrap an unstructured string (legacy JS-side message, network
444    /// error, …) as an `Internal`. The bridge calls this as its
445    /// fallback when the JSON the host emitted didn't deserialize.
446    #[must_use]
447    pub fn internal(msg: impl Into<String>) -> Self {
448        Self::Internal { msg: msg.into() }
449    }
450}
451
452impl std::fmt::Display for IpcError {
453    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454        f.write_str(self.message())
455    }
456}
457
458/// Bridge between the panel and the host's queue API. The host
459/// implements this once around its preferred IPC mechanism (Tauri's
460/// `invoke`, REST, in-process mock) and provides the `Arc<dyn QueueIpc>`
461/// via Leptos context.
462///
463/// `?Send` because Leptos CSR is single-threaded — the futures don't
464/// need to cross threads. The trait itself still requires `Send + Sync`
465/// because Leptos' `provide_context` wraps it in a globally-shareable
466/// handle even in CSR mode.
467#[async_trait(?Send)]
468pub trait QueueIpc: Send + Sync + 'static {
469    // ── reads
470    async fn queue_overview(&self) -> Result<Vec<QueueOverview>, IpcError>;
471    async fn queue_processes(
472        &self,
473        queue_name: Option<&str>,
474    ) -> Result<Vec<QueueProcess>, IpcError>;
475    /// Worker-centric health view: one entry per live worker process
476    /// (its declared queues, assigned slots, live/in-flight counts,
477    /// heartbeat age) plus any configured queue no live worker covers.
478    async fn queue_workers(&self) -> Result<WorkersOverview, IpcError>;
479    /// Bucketed enqueue/completion/failure counts across the half-open
480    /// `[from, to)` range at the given granularity. Buckets are aligned
481    /// to `from` (not wall-clock) and walk forward in `bucket_secs`
482    /// steps. The host caps the bucket count to keep payloads bounded.
483    async fn queue_timeline_range(
484        &self,
485        from: DateTime<Utc>,
486        to: DateTime<Utc>,
487        bucket_secs: u32,
488    ) -> Result<Vec<TimelineBucket>, IpcError>;
489    /// Per-queue metric series from the pre-aggregated rollup, bucketed
490    /// to `bucket_secs` (clamped up to the 60s base). Pass `queue = ""`
491    /// for the process-wide CPU/RAM gauges; a queue name for that
492    /// queue's throughput + latency.
493    async fn queue_metric_series(
494        &self,
495        queue: &str,
496        from: DateTime<Utc>,
497        to: DateTime<Utc>,
498        bucket_secs: u32,
499    ) -> Result<Vec<MetricSeriesBucket>, IpcError>;
500    /// Per-pod resource series (CPU/RAM/disk) from the rollup, one entry
501    /// per `host_id`. Locally there's exactly one pod; a cluster returns
502    /// one series per pod.
503    async fn queue_resource_series(
504        &self,
505        from: DateTime<Utc>,
506        to: DateTime<Utc>,
507        bucket_secs: u32,
508    ) -> Result<Vec<ResourceHostSeries>, IpcError>;
509    /// Per-pod DB-health series (op-latency percentiles + pool
510    /// saturation) from the rollup, one entry per `host_id`.
511    async fn queue_db_series(
512        &self,
513        from: DateTime<Utc>,
514        to: DateTime<Utc>,
515        bucket_secs: u32,
516    ) -> Result<Vec<DbHealthHostSeries>, IpcError>;
517    async fn jobs_list(
518        &self,
519        filter: JobsFilter,
520        limit: u32,
521        offset: u32,
522    ) -> Result<JobsPage, IpcError>;
523    async fn jobs_failed(&self, limit: u32) -> Result<Vec<JobRow>, IpcError>;
524    async fn jobs_kinds(&self, queue_name: Option<&str>) -> Result<Vec<String>, IpcError>;
525    async fn job_inspect(&self, id: &str) -> Result<JobInspect, IpcError>;
526
527    // ── mutations
528    async fn queue_set_max_workers(&self, queue_name: &str, n: i32) -> Result<(), IpcError>;
529    async fn queue_set_paused(&self, queue_name: &str, paused: bool) -> Result<(), IpcError>;
530    async fn queue_set_retention(
531        &self,
532        queue_name: &str,
533        done_days: i32,
534        dead_days: i32,
535    ) -> Result<(), IpcError>;
536    async fn queue_set_backoff(
537        &self,
538        queue_name: &str,
539        enabled: bool,
540        base_seconds: i32,
541        max_seconds: i32,
542    ) -> Result<(), IpcError>;
543    async fn queue_cleanup_now(&self) -> Result<CleanupReport, IpcError>;
544    async fn queue_enqueue_demo(&self, payload: serde_json::Value) -> Result<String, IpcError>;
545
546    /// Generic typed-job enqueue — the Rails `perform_later` analog.
547    /// Returns the new job's id. `run_at` schedules a future run;
548    /// pass `None` for "immediately." All other knobs follow the
549    /// queue's defaults when `None`.
550    async fn jobs_enqueue(&self, req: JobsEnqueueReq) -> Result<String, IpcError>;
551
552    /// Pending rows scheduled to run strictly after now. Powers the
553    /// "Scheduled" tab. Ordered ascending by `scheduled_at`.
554    async fn jobs_scheduled(&self, queue_name: Option<&str>) -> Result<Vec<JobRow>, IpcError>;
555
556    /// Advance one pending row's `scheduled_at` to now so the next
557    /// worker claim picks it up. Returns `true` when the row was
558    /// touched; `false` when the row isn't `pending` (already
559    /// running, terminal, or missing).
560    async fn jobs_run_now(&self, id: &str) -> Result<bool, IpcError>;
561    async fn jobs_retry(&self, ids: &[String]) -> Result<u64, IpcError>;
562    async fn jobs_retry_all_failed(&self) -> Result<u64, IpcError>;
563    /// Requeue every job currently in `status` (`failed` or `dead`).
564    /// Backs the per-tab "Retry all" buttons on the Retries / Dead panels.
565    async fn jobs_retry_all_by_status(&self, status: &str) -> Result<u64, IpcError>;
566    async fn jobs_delete(&self, ids: &[String]) -> Result<u64, IpcError>;
567    async fn jobs_requeue(&self, ids: &[String]) -> Result<u64, IpcError>;
568    async fn jobs_delete_done_older_than(
569        &self,
570        days: u32,
571        queue_name: Option<&str>,
572    ) -> Result<u64, IpcError>;
573    /// Delete every job currently in `status` (`done` / `failed` / `dead`),
574    /// optionally scoped to one `queue_name` (`None` = all queues).
575    /// Backs the per-tab Purge buttons. Returns the row count deleted.
576    async fn jobs_delete_by_status(
577        &self,
578        status: &str,
579        queue_name: Option<&str>,
580    ) -> Result<u64, IpcError>;
581
582    // ── cron schedules
583    async fn cron_list(&self) -> Result<Vec<CronSchedule>, IpcError>;
584    async fn cron_set_enabled(&self, name: &str, enabled: bool) -> Result<(), IpcError>;
585    async fn cron_set_expr(&self, name: &str, expr: &str) -> Result<(), IpcError>;
586    /// Toggle skip-if-in-flight: `true` dedupes each firing against the
587    /// schedule name (a tick during an active run is a no-op); `false`
588    /// clears it.
589    async fn cron_set_dedupe(&self, name: &str, dedupe: bool) -> Result<(), IpcError>;
590    /// Force a schedule to fire immediately. Returns the enqueued job id.
591    async fn cron_trigger_now(&self, name: &str) -> Result<String, IpcError>;
592}
593
594/// Type alias for the context value the consumer provides via
595/// `provide_context::<IpcCtx>(Arc::new(my_impl))`. `Arc` (not `Rc`)
596/// because Leptos requires context values to be `Send + Sync` even in
597/// CSR mode.
598pub type IpcCtx = std::sync::Arc<dyn QueueIpc>;