1use std::collections::HashMap;
2use std::future::Future;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::broadcast;
7
8use crate::error::{ArtifactError, LogError, QueueError, RegistryError};
9
10#[derive(Clone, Debug, Serialize, Deserialize)]
14pub struct Lease {
15 pub lease_id: String,
16 pub job_id: String,
17 pub workflow_id: String,
18 pub worker_id: String,
19 pub ttl_secs: u64,
20 pub granted_at_ms: u64,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
26pub struct QueuedJob {
27 pub job_id: String,
28 pub workflow_id: String,
29 pub command: String,
30 pub required_labels: Vec<String>,
31 pub retry_policy: RetryPolicy,
32 pub attempt: u32,
33 pub upstream_outputs: HashMap<String, HashMap<String, String>>,
35 pub enqueued_at_ms: u64,
36 #[serde(default)]
38 pub delayed_until_ms: u64,
39}
40
41#[derive(Clone, Debug, Serialize, Deserialize)]
43pub struct RetryPolicy {
44 pub max_retries: u32,
45 pub backoff: BackoffStrategy,
46}
47
48impl Default for RetryPolicy {
49 fn default() -> Self {
50 Self {
51 max_retries: 0,
52 backoff: BackoffStrategy::None,
53 }
54 }
55}
56
57#[derive(Clone, Debug, Serialize, Deserialize)]
58pub enum BackoffStrategy {
59 None,
60 Fixed { delay_secs: u64 },
61 Exponential { base_secs: u64, max_secs: u64 },
62}
63
64impl BackoffStrategy {
65 pub fn delay_ms(&self, attempt: u32) -> u64 {
67 match self {
68 BackoffStrategy::None => 0,
69 BackoffStrategy::Fixed { delay_secs } => delay_secs * 1000,
70 BackoffStrategy::Exponential {
71 base_secs,
72 max_secs,
73 } => {
74 let delay = base_secs.saturating_mul(2u64.saturating_pow(attempt));
75 delay.min(*max_secs) * 1000
76 }
77 }
78 }
79}
80
/// Event describing a change in a job's lifecycle.
///
/// Values of this type are returned by `JobQueue::reap_expired_leases` and
/// delivered through the receiver returned by `JobQueue::subscribe`. Every
/// variant carries the `workflow_id` and `job_id` it refers to.
///
/// NOTE(review): when each variant is emitted is decided by the queue
/// implementation, which is not visible here; the variant notes describe the
/// payload only.
#[derive(Debug, Clone)]
pub enum JobEvent {
    /// Job is ready.
    Ready {
        workflow_id: String,
        job_id: String,
    },
    /// Job was started; `worker_id` names the worker involved.
    Started {
        workflow_id: String,
        job_id: String,
        worker_id: String,
    },
    /// Job completed; `outputs` holds its string key/value outputs.
    Completed {
        workflow_id: String,
        job_id: String,
        outputs: HashMap<String, String>,
    },
    /// Job failed with the message in `error`; `retryable` flags whether the
    /// failure was reported as retryable.
    Failed {
        workflow_id: String,
        job_id: String,
        error: String,
        retryable: bool,
    },
    /// Job was cancelled.
    Cancelled {
        workflow_id: String,
        job_id: String,
    },
    /// The lease on the job expired; `worker_id` names the worker involved.
    LeaseExpired {
        workflow_id: String,
        job_id: String,
        worker_id: String,
    },
}
114
115#[derive(Clone, Debug, Serialize, Deserialize)]
119pub struct LogChunk {
120 pub workflow_id: String,
121 pub job_id: String,
122 pub sequence: u64,
123 pub data: String,
124 pub timestamp_ms: u64,
125 pub stream: LogStream,
126}
127
128#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
129#[serde(rename_all = "snake_case")]
130pub enum LogStream {
131 Stdout,
132 Stderr,
133}
134
135#[derive(Clone, Debug, Serialize, Deserialize)]
138pub struct WorkerInfo {
139 pub worker_id: String,
140 pub labels: Vec<String>,
141 pub registered_at_ms: u64,
142 pub last_heartbeat_ms: u64,
143 pub current_job: Option<String>,
144 pub status: WorkerStatus,
145}
146
147#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum WorkerStatus {
150 Idle,
151 Busy,
152 Offline,
153}
154
155pub trait JobQueue: Send + Sync + 'static {
168 fn enqueue(&self, job: QueuedJob) -> impl Future<Output = Result<(), QueueError>> + Send;
170
171 fn claim(
174 &self,
175 worker_id: &str,
176 worker_labels: &[String],
177 lease_ttl: Duration,
178 ) -> impl Future<Output = Result<Option<(QueuedJob, Lease)>, QueueError>> + Send;
179
180 fn renew_lease(
182 &self,
183 lease_id: &str,
184 extend_by: Duration,
185 ) -> impl Future<Output = Result<(), QueueError>> + Send;
186
187 fn complete(
189 &self,
190 lease_id: &str,
191 outputs: HashMap<String, String>,
192 ) -> impl Future<Output = Result<(), QueueError>> + Send;
193
194 fn fail(
196 &self,
197 lease_id: &str,
198 error: String,
199 retryable: bool,
200 ) -> impl Future<Output = Result<(), QueueError>> + Send;
201
202 fn cancel(
204 &self,
205 workflow_id: &str,
206 job_id: &str,
207 ) -> impl Future<Output = Result<(), QueueError>> + Send;
208
209 fn cancel_workflow(
211 &self,
212 workflow_id: &str,
213 ) -> impl Future<Output = Result<(), QueueError>> + Send;
214
215 fn is_cancelled(
217 &self,
218 workflow_id: &str,
219 job_id: &str,
220 ) -> impl Future<Output = Result<bool, QueueError>> + Send;
221
222 fn reap_expired_leases(&self)
225 -> impl Future<Output = Result<Vec<JobEvent>, QueueError>> + Send;
226
227 fn subscribe(&self) -> broadcast::Receiver<JobEvent>;
229}
230
231pub trait ArtifactStore: Send + Sync + 'static {
238 fn put_outputs(
240 &self,
241 workflow_id: &str,
242 job_id: &str,
243 outputs: HashMap<String, String>,
244 ) -> impl Future<Output = Result<(), ArtifactError>> + Send;
245
246 fn get_outputs(
248 &self,
249 workflow_id: &str,
250 job_id: &str,
251 ) -> impl Future<Output = Result<HashMap<String, String>, ArtifactError>> + Send;
252
253 fn get_upstream_outputs(
255 &self,
256 workflow_id: &str,
257 job_ids: &[String],
258 ) -> impl Future<Output = Result<HashMap<String, HashMap<String, String>>, ArtifactError>> + Send;
259}
260
261pub trait LogSink: Send + Sync + 'static {
268 fn append(&self, chunk: LogChunk) -> impl Future<Output = Result<(), LogError>> + Send;
270
271 fn get_all(
273 &self,
274 workflow_id: &str,
275 job_id: &str,
276 ) -> impl Future<Output = Result<Vec<LogChunk>, LogError>> + Send;
277
278 fn subscribe(&self, workflow_id: &str, job_id: &str) -> broadcast::Receiver<LogChunk>;
280}
281
282pub trait WorkerRegistry: Send + Sync + 'static {
288 fn register(
290 &self,
291 worker_id: &str,
292 labels: &[String],
293 ) -> impl Future<Output = Result<(), RegistryError>> + Send;
294
295 fn heartbeat(&self, worker_id: &str) -> impl Future<Output = Result<(), RegistryError>> + Send;
297
298 fn deregister(&self, worker_id: &str)
300 -> impl Future<Output = Result<(), RegistryError>> + Send;
301
302 fn list_workers(&self) -> impl Future<Output = Result<Vec<WorkerInfo>, RegistryError>> + Send;
304
305 fn mark_busy(
307 &self,
308 worker_id: &str,
309 job_id: &str,
310 ) -> impl Future<Output = Result<(), RegistryError>> + Send;
311
312 fn mark_idle(&self, worker_id: &str) -> impl Future<Output = Result<(), RegistryError>> + Send;
314}