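//! Pluggable backend traits for `workflow_graph_queue` (`traits.rs`): `JobQueue`,
//! `ArtifactStore`, `LogSink`, and `WorkerRegistry`, together with the queue, log, and
//! worker data types they exchange.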

1use std::collections::HashMap;
2use std::future::Future;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::broadcast;
7
8use crate::error::{ArtifactError, LogError, QueueError, RegistryError};
9
10// ─── Queue Types ─────────────────────────────────────────────────────────────
11
12/// A lease proving a worker has exclusively claimed a job.
13#[derive(Clone, Debug, Serialize, Deserialize)]
14pub struct Lease {
15    pub lease_id: String,
16    pub job_id: String,
17    pub workflow_id: String,
18    pub worker_id: String,
19    pub ttl_secs: u64,
20    /// Epoch milliseconds when the lease was granted/last renewed.
21    pub granted_at_ms: u64,
22}
23
24/// A job sitting in the queue, ready to be claimed by a worker.
25#[derive(Clone, Debug, Serialize, Deserialize)]
26pub struct QueuedJob {
27    pub job_id: String,
28    pub workflow_id: String,
29    pub command: String,
30    pub required_labels: Vec<String>,
31    pub retry_policy: RetryPolicy,
32    pub attempt: u32,
33    /// Outputs from upstream jobs, keyed by job_id then output key.
34    pub upstream_outputs: HashMap<String, HashMap<String, String>>,
35    pub enqueued_at_ms: u64,
36    /// Epoch milliseconds before which this job should not be claimed (backoff delay).
37    #[serde(default)]
38    pub delayed_until_ms: u64,
39}
40
41/// Configurable retry behavior per job.
42#[derive(Clone, Debug, Serialize, Deserialize)]
43pub struct RetryPolicy {
44    pub max_retries: u32,
45    pub backoff: BackoffStrategy,
46}
47
48impl Default for RetryPolicy {
49    fn default() -> Self {
50        Self {
51            max_retries: 0,
52            backoff: BackoffStrategy::None,
53        }
54    }
55}
56
57#[derive(Clone, Debug, Serialize, Deserialize)]
58pub enum BackoffStrategy {
59    None,
60    Fixed { delay_secs: u64 },
61    Exponential { base_secs: u64, max_secs: u64 },
62}
63
64impl BackoffStrategy {
65    /// Calculate delay in milliseconds for the given attempt number.
66    pub fn delay_ms(&self, attempt: u32) -> u64 {
67        match self {
68            BackoffStrategy::None => 0,
69            BackoffStrategy::Fixed { delay_secs } => delay_secs * 1000,
70            BackoffStrategy::Exponential {
71                base_secs,
72                max_secs,
73            } => {
74                let delay = base_secs.saturating_mul(2u64.saturating_pow(attempt));
75                delay.min(*max_secs) * 1000
76            }
77        }
78    }
79}
80
/// Job lifecycle notifications published by the queue (see `JobQueue::subscribe`)
/// so the scheduler can react to them.
#[derive(Debug, Clone)]
pub enum JobEvent {
    /// The job became ready.
    Ready { workflow_id: String, job_id: String },
    /// Worker `worker_id` started the job.
    Started {
        workflow_id: String,
        job_id: String,
        worker_id: String,
    },
    /// The job finished successfully and produced `outputs`.
    Completed {
        workflow_id: String,
        job_id: String,
        outputs: HashMap<String, String>,
    },
    /// The job failed with `error`; `retryable` flags whether it may be retried.
    Failed {
        workflow_id: String,
        job_id: String,
        error: String,
        retryable: bool,
    },
    /// The job was cancelled.
    Cancelled { workflow_id: String, job_id: String },
    /// The lease held by `worker_id` on this job expired
    /// (reported by `JobQueue::reap_expired_leases`).
    LeaseExpired {
        workflow_id: String,
        job_id: String,
        worker_id: String,
    },
}

115// ─── Log Types ───────────────────────────────────────────────────────────────
116
117/// A chunk of log output from a running job.
118#[derive(Clone, Debug, Serialize, Deserialize)]
119pub struct LogChunk {
120    pub workflow_id: String,
121    pub job_id: String,
122    pub sequence: u64,
123    pub data: String,
124    pub timestamp_ms: u64,
125    pub stream: LogStream,
126}
127
128#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
129#[serde(rename_all = "snake_case")]
130pub enum LogStream {
131    Stdout,
132    Stderr,
133}
134
135// ─── Worker Types ────────────────────────────────────────────────────────────
136
137#[derive(Clone, Debug, Serialize, Deserialize)]
138pub struct WorkerInfo {
139    pub worker_id: String,
140    pub labels: Vec<String>,
141    pub registered_at_ms: u64,
142    pub last_heartbeat_ms: u64,
143    pub current_job: Option<String>,
144    pub status: WorkerStatus,
145}
146
147#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum WorkerStatus {
150    Idle,
151    Busy,
152    Offline,
153}
154
155// ─── Trait: JobQueue ─────────────────────────────────────────────────────────
156
157/// Pluggable job queue backend.
158///
159/// Implementations: InMemoryJobQueue, PgBossJobQueue (Postgres), RedisJobQueue.
160///
161/// pg-boss mapping:
162/// - `enqueue()` → `boss.send(queue, data, options)`
163/// - `claim()`   → `boss.fetch(queue)` (SELECT ... FOR UPDATE SKIP LOCKED)
164/// - `complete()` → `boss.complete(jobId)`
165/// - `fail()`    → `boss.fail(jobId)`
166/// - `cancel()`  → `boss.cancel(jobId)`
167pub trait JobQueue: Send + Sync + 'static {
168    /// Enqueue a job for execution. Called by the scheduler when dependencies are met.
169    fn enqueue(&self, job: QueuedJob) -> impl Future<Output = Result<(), QueueError>> + Send;
170
171    /// Atomically claim the next available job matching the worker's labels.
172    /// Returns `None` if no matching job is available.
173    fn claim(
174        &self,
175        worker_id: &str,
176        worker_labels: &[String],
177        lease_ttl: Duration,
178    ) -> impl Future<Output = Result<Option<(QueuedJob, Lease)>, QueueError>> + Send;
179
180    /// Renew a lease (heartbeat). Returns error if the lease has already expired.
181    fn renew_lease(
182        &self,
183        lease_id: &str,
184        extend_by: Duration,
185    ) -> impl Future<Output = Result<(), QueueError>> + Send;
186
187    /// Complete a job successfully. Releases the lease and stores outputs.
188    fn complete(
189        &self,
190        lease_id: &str,
191        outputs: HashMap<String, String>,
192    ) -> impl Future<Output = Result<(), QueueError>> + Send;
193
194    /// Fail a job. The queue decides whether to re-enqueue based on RetryPolicy.
195    fn fail(
196        &self,
197        lease_id: &str,
198        error: String,
199        retryable: bool,
200    ) -> impl Future<Output = Result<(), QueueError>> + Send;
201
202    /// Cancel a specific job. If currently claimed, marks it for cancellation.
203    fn cancel(
204        &self,
205        workflow_id: &str,
206        job_id: &str,
207    ) -> impl Future<Output = Result<(), QueueError>> + Send;
208
209    /// Cancel all jobs for a workflow.
210    fn cancel_workflow(
211        &self,
212        workflow_id: &str,
213    ) -> impl Future<Output = Result<(), QueueError>> + Send;
214
215    /// Check if a job has been marked for cancellation (workers poll this).
216    fn is_cancelled(
217        &self,
218        workflow_id: &str,
219        job_id: &str,
220    ) -> impl Future<Output = Result<bool, QueueError>> + Send;
221
222    /// Collect expired leases and emit LeaseExpired events.
223    /// Called periodically by the server's monitor task.
224    fn reap_expired_leases(&self)
225    -> impl Future<Output = Result<Vec<JobEvent>, QueueError>> + Send;
226
227    /// Subscribe to job events for event-driven processing.
228    fn subscribe(&self) -> broadcast::Receiver<JobEvent>;
229}
230
231// ─── Trait: ArtifactStore ────────────────────────────────────────────────────
232
233/// Pluggable artifact/output storage.
234///
235/// Jobs publish key-value outputs; downstream jobs read upstream outputs.
236/// Implementations: InMemoryArtifactStore, S3ArtifactStore, FsArtifactStore.
237pub trait ArtifactStore: Send + Sync + 'static {
238    /// Store key-value outputs for a completed job.
239    fn put_outputs(
240        &self,
241        workflow_id: &str,
242        job_id: &str,
243        outputs: HashMap<String, String>,
244    ) -> impl Future<Output = Result<(), ArtifactError>> + Send;
245
246    /// Retrieve outputs for a specific job.
247    fn get_outputs(
248        &self,
249        workflow_id: &str,
250        job_id: &str,
251    ) -> impl Future<Output = Result<HashMap<String, String>, ArtifactError>> + Send;
252
253    /// Retrieve outputs for multiple upstream jobs at once.
254    fn get_upstream_outputs(
255        &self,
256        workflow_id: &str,
257        job_ids: &[String],
258    ) -> impl Future<Output = Result<HashMap<String, HashMap<String, String>>, ArtifactError>> + Send;
259}
260
261// ─── Trait: LogSink ──────────────────────────────────────────────────────────
262
263/// Pluggable log storage and streaming backend.
264///
265/// Workers push log chunks; the server streams them to clients via SSE.
266/// Implementations: InMemoryLogSink, FileLogSink, S3LogSink.
267pub trait LogSink: Send + Sync + 'static {
268    /// Append a log chunk from a worker.
269    fn append(&self, chunk: LogChunk) -> impl Future<Output = Result<(), LogError>> + Send;
270
271    /// Get all log chunks for a job (for catch-up on SSE connect).
272    fn get_all(
273        &self,
274        workflow_id: &str,
275        job_id: &str,
276    ) -> impl Future<Output = Result<Vec<LogChunk>, LogError>> + Send;
277
278    /// Subscribe to live log chunks for a specific job.
279    fn subscribe(&self, workflow_id: &str, job_id: &str) -> broadcast::Receiver<LogChunk>;
280}
281
282// ─── Trait: WorkerRegistry ───────────────────────────────────────────────────
283
284/// Registry of connected workers and their capabilities.
285///
286/// Used for monitoring and matching jobs to capable workers.
287pub trait WorkerRegistry: Send + Sync + 'static {
288    /// Register a worker with its capability labels.
289    fn register(
290        &self,
291        worker_id: &str,
292        labels: &[String],
293    ) -> impl Future<Output = Result<(), RegistryError>> + Send;
294
295    /// Record a heartbeat from a worker.
296    fn heartbeat(&self, worker_id: &str) -> impl Future<Output = Result<(), RegistryError>> + Send;
297
298    /// Remove a worker from the registry.
299    fn deregister(&self, worker_id: &str)
300    -> impl Future<Output = Result<(), RegistryError>> + Send;
301
302    /// List all registered workers.
303    fn list_workers(&self) -> impl Future<Output = Result<Vec<WorkerInfo>, RegistryError>> + Send;
304
305    /// Mark a worker as busy with a specific job.
306    fn mark_busy(
307        &self,
308        worker_id: &str,
309        job_id: &str,
310    ) -> impl Future<Output = Result<(), RegistryError>> + Send;
311
312    /// Mark a worker as idle (finished or released a job).
313    fn mark_idle(&self, worker_id: &str) -> impl Future<Output = Result<(), RegistryError>> + Send;
314}