1mod concurrency;
2mod machine;
3#[cfg(test)]
4mod tests;
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::fmt;
9use std::str::FromStr;
10use std::sync::LazyLock;
11
12use crate::identifiers::RunnerId;
13
14pub use concurrency::ConcurrencyControlType;
15pub use machine::{
16 compute_new_owner, status_record_transition, validate_ownership, validate_transition,
17 OwnershipError, StatusMachineError, StatusTransitionError,
18};
19
/// Lifecycle status of an invocation.
///
/// The transition and ownership rules for every variant are assembled in
/// `build_config` and can be inspected through [`get_status_definition`].
/// The `Display`/`FromStr` implementations use SCREAMING_SNAKE_CASE names
/// (for example `PENDING_RECOVERY`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum InvocationStatus {
    /// The only status reachable from the initial definition; available for run.
    Registered,
    /// Non-terminal concurrency-control status; its only successor is `Rerouted`.
    ConcurrencyControlled,
    /// Terminal concurrency-control status (no outgoing transitions).
    ConcurrencyControlledFinal,
    /// Available for run; may move to `Pending` or `ConcurrencyControlled`.
    Rerouted,
    /// Requires and acquires ownership; precedes `Running`.
    Pending,
    /// Ownership-overriding status reachable from `Pending`; leads to `Rerouted`.
    PendingRecovery,
    /// Requires ownership; may pause, be killed, retry, succeed, fail or enter recovery.
    Running,
    /// Ownership-overriding status reachable from `Running`; leads to `Rerouted`.
    RunningRecovery,
    /// Requires ownership; may move to `Resumed` or `Killed`.
    Paused,
    /// Requires ownership; same successors as `Running` except `RunningRecovery`.
    Resumed,
    /// Not terminal: a killed invocation may still move to `Rerouted`.
    Killed,
    /// Terminal status (no outgoing transitions).
    Success,
    /// Terminal status (no outgoing transitions).
    Failed,
    /// Available for run; its only successor is `Pending`.
    Retry,
}
60
/// Every [`InvocationStatus`] variant, listed in the enum's declaration order.
///
/// The list is maintained by hand, so it must be extended whenever a variant
/// is added to the enum.
pub const ALL_STATUSES: &[InvocationStatus] = &[
    InvocationStatus::Registered,
    InvocationStatus::ConcurrencyControlled,
    InvocationStatus::ConcurrencyControlledFinal,
    InvocationStatus::Rerouted,
    InvocationStatus::Pending,
    InvocationStatus::PendingRecovery,
    InvocationStatus::Running,
    InvocationStatus::RunningRecovery,
    InvocationStatus::Paused,
    InvocationStatus::Resumed,
    InvocationStatus::Killed,
    InvocationStatus::Success,
    InvocationStatus::Failed,
    InvocationStatus::Retry,
];
78
79impl InvocationStatus {
80 #[inline]
82 pub fn is_terminal(&self) -> bool {
83 STATUS_CONFIG.definition(*self).is_final
84 }
85
86 #[inline]
88 pub fn is_available_for_run(&self) -> bool {
89 STATUS_CONFIG.definition(*self).available_for_run
90 }
91
92 #[inline]
94 pub fn valid_transitions(&self) -> &[InvocationStatus] {
95 &STATUS_CONFIG.definition(*self).allowed_transitions
96 }
97
98 #[inline]
100 pub fn can_transition_to(&self, next: InvocationStatus) -> bool {
101 self.valid_transitions().contains(&next)
102 }
103
104 pub fn final_statuses() -> &'static [InvocationStatus] {
106 &STATUS_CONFIG.final_statuses
107 }
108
109 pub fn available_for_run_statuses() -> &'static [InvocationStatus] {
111 &STATUS_CONFIG.available_for_run_statuses
112 }
113}
114
115impl fmt::Display for InvocationStatus {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 match self {
118 Self::Registered => write!(f, "REGISTERED"),
119 Self::ConcurrencyControlled => write!(f, "CONCURRENCY_CONTROLLED"),
120 Self::ConcurrencyControlledFinal => write!(f, "CONCURRENCY_CONTROLLED_FINAL"),
121 Self::Rerouted => write!(f, "REROUTED"),
122 Self::Pending => write!(f, "PENDING"),
123 Self::PendingRecovery => write!(f, "PENDING_RECOVERY"),
124 Self::Running => write!(f, "RUNNING"),
125 Self::RunningRecovery => write!(f, "RUNNING_RECOVERY"),
126 Self::Paused => write!(f, "PAUSED"),
127 Self::Resumed => write!(f, "RESUMED"),
128 Self::Killed => write!(f, "KILLED"),
129 Self::Success => write!(f, "SUCCESS"),
130 Self::Failed => write!(f, "FAILED"),
131 Self::Retry => write!(f, "RETRY"),
132 }
133 }
134}
135
136impl FromStr for InvocationStatus {
137 type Err = String;
138
139 fn from_str(s: &str) -> Result<Self, Self::Err> {
140 match s.to_uppercase().as_str() {
141 "REGISTERED" => Ok(Self::Registered),
142 "CONCURRENCY_CONTROLLED" => Ok(Self::ConcurrencyControlled),
143 "CONCURRENCY_CONTROLLED_FINAL" => Ok(Self::ConcurrencyControlledFinal),
144 "REROUTED" => Ok(Self::Rerouted),
145 "PENDING" => Ok(Self::Pending),
146 "PENDING_RECOVERY" => Ok(Self::PendingRecovery),
147 "RUNNING" => Ok(Self::Running),
148 "RUNNING_RECOVERY" => Ok(Self::RunningRecovery),
149 "PAUSED" => Ok(Self::Paused),
150 "RESUMED" => Ok(Self::Resumed),
151 "KILLED" => Ok(Self::Killed),
152 "SUCCESS" => Ok(Self::Success),
153 "FAILED" => Ok(Self::Failed),
154 "RETRY" => Ok(Self::Retry),
155 other => Err(format!("unknown invocation status: {other}")),
156 }
157 }
158}
159
/// An [`InvocationStatus`] together with an optional runner id and a UTC timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct InvocationStatusRecord {
    /// The recorded status.
    pub status: InvocationStatus,
    /// Runner associated with the record, if any.
    /// NOTE(review): presumably the runner that owns the invocation — confirm in `machine`.
    pub runner_id: Option<RunnerId>,
    /// Time attached to the record; `InvocationStatusRecord::new` sets it to `Utc::now()`.
    pub timestamp: DateTime<Utc>,
}
171
172impl InvocationStatusRecord {
173 pub fn new(status: InvocationStatus, runner_id: Option<RunnerId>) -> Self {
174 Self {
175 status,
176 runner_id,
177 timestamp: Utc::now(),
178 }
179 }
180}
181
/// Static rules attached to a single [`InvocationStatus`].
///
/// The ownership flags are only stored in this file; they are interpreted elsewhere.
/// NOTE(review): presumably by the `machine` module — confirm there before relying on
/// the per-field assumptions below.
#[derive(Debug, Clone)]
pub struct StatusDefinition {
    /// Statuses that may directly follow this one (see `InvocationStatus::valid_transitions`).
    pub allowed_transitions: Vec<InvocationStatus>,
    /// Marks the status as terminal; reported by `InvocationStatus::is_terminal`.
    pub is_final: bool,
    /// Reported by `InvocationStatus::is_available_for_run`.
    pub available_for_run: bool,
    /// NOTE(review): presumably the transition must come from the current owner — confirm.
    pub requires_ownership: bool,
    /// NOTE(review): presumably entering this status makes the acting runner the owner — confirm.
    pub acquires_ownership: bool,
    /// NOTE(review): presumably entering this status clears the owner — confirm.
    pub releases_ownership: bool,
    /// NOTE(review): presumably entering this status bypasses the ownership check — confirm.
    pub overrides_ownership: bool,
}
206
207impl StatusDefinition {
208 const fn new() -> Self {
209 Self {
210 allowed_transitions: Vec::new(),
211 is_final: false,
212 available_for_run: false,
213 requires_ownership: false,
214 acquires_ownership: false,
215 releases_ownership: false,
216 overrides_ownership: false,
217 }
218 }
219}
220
/// The complete status table together with indexes derived from it.
///
/// Built once by `build_config` and shared through `STATUS_CONFIG`.
pub(super) struct StatusConfiguration {
    /// Definition exposed by `get_initial_definition`; `build_config` gives it
    /// `Registered` as its only allowed transition.
    pub(super) initial: StatusDefinition,
    /// `(status, definition)` pairs, searched linearly by `definition`.
    definitions: Vec<(InvocationStatus, StatusDefinition)>,
    /// Statuses whose definition has `is_final` set, in table order.
    pub(super) final_statuses: Vec<InvocationStatus>,
    /// Statuses whose definition has `available_for_run` set, in table order.
    pub(super) available_for_run_statuses: Vec<InvocationStatus>,
}
236
237impl StatusConfiguration {
238 pub(super) fn definition(&self, status: InvocationStatus) -> &StatusDefinition {
239 self.definitions
240 .iter()
241 .find(|(s, _)| *s == status)
242 .map_or_else(
243 || panic!("missing StatusDefinition for {status:?}"),
244 |(_, d)| d,
245 )
246 }
247}
248
249fn build_config() -> StatusConfiguration {
251 use InvocationStatus::*;
252
253 let initial = StatusDefinition {
254 allowed_transitions: vec![Registered],
255 ..StatusDefinition::new()
256 };
257
258 let definitions = vec![
259 (
260 Registered,
261 StatusDefinition {
262 allowed_transitions: vec![
263 Pending,
264 ConcurrencyControlled,
265 ConcurrencyControlledFinal,
266 ],
267 available_for_run: true,
268 releases_ownership: true,
269 ..StatusDefinition::new()
270 },
271 ),
272 (
273 ConcurrencyControlled,
274 StatusDefinition {
275 allowed_transitions: vec![Rerouted],
276 releases_ownership: true,
277 ..StatusDefinition::new()
278 },
279 ),
280 (
281 Rerouted,
282 StatusDefinition {
283 allowed_transitions: vec![Pending, ConcurrencyControlled],
284 available_for_run: true,
285 releases_ownership: true,
286 ..StatusDefinition::new()
287 },
288 ),
289 (
290 Pending,
291 StatusDefinition {
292 allowed_transitions: vec![Running, Killed, Rerouted, Failed, PendingRecovery],
296 requires_ownership: true,
297 acquires_ownership: true,
298 ..StatusDefinition::new()
299 },
300 ),
301 (
302 PendingRecovery,
303 StatusDefinition {
304 allowed_transitions: vec![Rerouted],
305 releases_ownership: true,
306 overrides_ownership: true,
307 ..StatusDefinition::new()
308 },
309 ),
310 (
311 Running,
312 StatusDefinition {
313 allowed_transitions: vec![Paused, Killed, Retry, Success, Failed, RunningRecovery],
314 requires_ownership: true,
315 ..StatusDefinition::new()
316 },
317 ),
318 (
319 RunningRecovery,
320 StatusDefinition {
321 allowed_transitions: vec![Rerouted],
322 releases_ownership: true,
323 overrides_ownership: true,
324 ..StatusDefinition::new()
325 },
326 ),
327 (
328 Paused,
329 StatusDefinition {
330 allowed_transitions: vec![Resumed, Killed],
331 requires_ownership: true,
332 ..StatusDefinition::new()
333 },
334 ),
335 (
336 Resumed,
337 StatusDefinition {
338 allowed_transitions: vec![Paused, Killed, Retry, Success, Failed],
339 requires_ownership: true,
340 ..StatusDefinition::new()
341 },
342 ),
343 (
344 Killed,
345 StatusDefinition {
346 allowed_transitions: vec![Rerouted],
347 releases_ownership: true,
348 ..StatusDefinition::new()
349 },
350 ),
351 (
352 Retry,
353 StatusDefinition {
354 allowed_transitions: vec![Pending],
355 available_for_run: true,
356 releases_ownership: true,
357 ..StatusDefinition::new()
358 },
359 ),
360 (
361 Success,
362 StatusDefinition {
363 is_final: true,
364 releases_ownership: true,
365 ..StatusDefinition::new()
366 },
367 ),
368 (
369 Failed,
370 StatusDefinition {
371 is_final: true,
372 releases_ownership: true,
373 ..StatusDefinition::new()
374 },
375 ),
376 (
377 ConcurrencyControlledFinal,
378 StatusDefinition {
379 is_final: true,
380 releases_ownership: true,
381 ..StatusDefinition::new()
382 },
383 ),
384 ];
385
386 let final_statuses: Vec<_> = definitions
387 .iter()
388 .filter(|(_, d)| d.is_final)
389 .map(|(s, _)| *s)
390 .collect();
391 let available_for_run_statuses: Vec<_> = definitions
392 .iter()
393 .filter(|(_, d)| d.available_for_run)
394 .map(|(s, _)| *s)
395 .collect();
396
397 StatusConfiguration {
398 initial,
399 definitions,
400 final_statuses,
401 available_for_run_statuses,
402 }
403}
404
/// Process-wide status configuration, built by `build_config` on first access.
pub(super) static STATUS_CONFIG: LazyLock<StatusConfiguration> = LazyLock::new(build_config);
406
407pub fn get_status_definition(status: InvocationStatus) -> &'static StatusDefinition {
409 STATUS_CONFIG.definition(status)
410}
411
412pub fn get_initial_definition() -> &'static StatusDefinition {
414 &STATUS_CONFIG.initial
415}