1use crate::ids::{AttemptId, RunId, TaskId};
4use crate::run::state::RunState;
5use crate::run::transitions::is_valid_transition;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum RunInstanceConstructionError {
10 InvalidRunId {
12 run_id: RunId,
14 },
15 InvalidTaskId {
17 task_id: TaskId,
19 },
20 ReadyScheduledAtAfterCreatedAt {
22 run_id: RunId,
24 scheduled_at: u64,
26 created_at: u64,
28 },
29}
30
31impl std::fmt::Display for RunInstanceConstructionError {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 match self {
34 Self::InvalidRunId { run_id } => {
35 write!(f, "invalid run_id for run construction: {run_id}")
36 }
37 Self::InvalidTaskId { task_id } => {
38 write!(f, "invalid task_id for run construction: {task_id}")
39 }
40 Self::ReadyScheduledAtAfterCreatedAt { run_id, scheduled_at, created_at } => {
41 write!(
42 f,
43 "run {run_id} cannot be created in Ready with scheduled_at ({scheduled_at}) > \
44 created_at ({created_at})"
45 )
46 }
47 }
48 }
49}
50
51impl std::error::Error for RunInstanceConstructionError {}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize))]
59#[must_use]
60pub struct RunInstance {
61 id: RunId,
63
64 task_id: TaskId,
66
67 state: RunState,
69
70 current_attempt_id: Option<AttemptId>,
73
74 attempt_count: u32,
76
77 created_at: u64,
79
80 scheduled_at: u64,
84
85 effective_priority: i32,
88
89 #[cfg_attr(feature = "serde", serde(default))]
92 last_state_change_at: u64,
93}
94
95impl RunInstance {
96 pub fn new_scheduled(
98 task_id: TaskId,
99 scheduled_at: u64,
100 created_at: u64,
101 ) -> Result<Self, RunInstanceConstructionError> {
102 Self::new_scheduled_with_id(RunId::new(), task_id, scheduled_at, created_at)
103 }
104
105 pub fn new_scheduled_with_id(
110 id: RunId,
111 task_id: TaskId,
112 scheduled_at: u64,
113 created_at: u64,
114 ) -> Result<Self, RunInstanceConstructionError> {
115 Self::validate_identifiers(id, task_id)?;
116
117 Ok(Self {
118 id,
119 task_id,
120 state: RunState::Scheduled,
121 current_attempt_id: None,
122 attempt_count: 0,
123 created_at,
124 scheduled_at,
125 effective_priority: 0,
126 last_state_change_at: created_at,
127 })
128 }
129
130 pub fn new_ready(
132 task_id: TaskId,
133 scheduled_at: u64,
134 created_at: u64,
135 effective_priority: i32,
136 ) -> Result<Self, RunInstanceConstructionError> {
137 Self::new_ready_with_id(RunId::new(), task_id, scheduled_at, created_at, effective_priority)
138 }
139
140 pub fn new_ready_with_id(
142 id: RunId,
143 task_id: TaskId,
144 scheduled_at: u64,
145 created_at: u64,
146 effective_priority: i32,
147 ) -> Result<Self, RunInstanceConstructionError> {
148 Self::validate_identifiers(id, task_id)?;
149
150 if scheduled_at > created_at {
151 return Err(RunInstanceConstructionError::ReadyScheduledAtAfterCreatedAt {
152 run_id: id,
153 scheduled_at,
154 created_at,
155 });
156 }
157
158 Ok(Self {
159 id,
160 task_id,
161 state: RunState::Ready,
162 current_attempt_id: None,
163 attempt_count: 0,
164 created_at,
165 scheduled_at,
166 effective_priority,
167 last_state_change_at: created_at,
168 })
169 }
170
171 fn validate_identifiers(
173 id: RunId,
174 task_id: TaskId,
175 ) -> Result<(), RunInstanceConstructionError> {
176 if id.as_uuid().is_nil() {
177 return Err(RunInstanceConstructionError::InvalidRunId { run_id: id });
178 }
179
180 if task_id.as_uuid().is_nil() {
181 return Err(RunInstanceConstructionError::InvalidTaskId { task_id });
182 }
183
184 Ok(())
185 }
186
187 pub fn is_terminal(&self) -> bool {
189 self.state.is_terminal()
190 }
191
192 pub fn id(&self) -> RunId {
194 self.id
195 }
196
197 pub fn task_id(&self) -> TaskId {
199 self.task_id
200 }
201
202 pub fn state(&self) -> RunState {
204 self.state
205 }
206
207 pub fn current_attempt_id(&self) -> Option<AttemptId> {
209 self.current_attempt_id
210 }
211
212 pub fn attempt_count(&self) -> u32 {
214 self.attempt_count
215 }
216
217 pub fn created_at(&self) -> u64 {
219 self.created_at
220 }
221
222 pub fn scheduled_at(&self) -> u64 {
224 self.scheduled_at
225 }
226
227 pub fn effective_priority(&self) -> i32 {
229 self.effective_priority
230 }
231
232 pub fn last_state_change_at(&self) -> u64 {
234 self.last_state_change_at
235 }
236
237 pub fn record_state_change_at(&mut self, timestamp: u64) {
248 self.last_state_change_at = timestamp;
249 }
250
251 pub fn transition_to(&mut self, new_state: RunState) -> Result<(), RunInstanceError> {
256 if !is_valid_transition(self.state, new_state) {
257 return Err(RunInstanceError::InvalidTransition {
258 run_id: self.id,
259 from: self.state,
260 to: new_state,
261 });
262 }
263
264 if self.state == RunState::Running
265 && new_state != RunState::Running
266 && new_state != RunState::Canceled
267 && self.current_attempt_id.is_some()
268 {
269 return Err(RunInstanceError::AttemptInProgress {
270 run_id: self.id,
271 active_attempt_id: self.current_attempt_id.expect("checked is_some above"),
272 });
273 }
274
275 if new_state != RunState::Running {
276 self.current_attempt_id = None;
277 }
278 self.state = new_state;
279 Ok(())
280 }
281
282 pub fn promote_to_ready(&mut self) -> Result<(), RunInstanceError> {
284 self.transition_to(RunState::Ready)
285 }
286
287 pub fn promote_to_ready_with_priority(
289 &mut self,
290 effective_priority: i32,
291 ) -> Result<(), RunInstanceError> {
292 self.transition_to(RunState::Ready)?;
293 self.effective_priority = effective_priority;
294 Ok(())
295 }
296
297 pub fn set_effective_priority(
299 &mut self,
300 effective_priority: i32,
301 ) -> Result<(), RunInstanceError> {
302 if self.state != RunState::Ready {
303 return Err(RunInstanceError::PriorityMutationRequiresReady {
304 run_id: self.id,
305 current_state: self.state,
306 });
307 }
308
309 self.effective_priority = effective_priority;
310 Ok(())
311 }
312
313 pub fn start_attempt(&mut self, attempt_id: AttemptId) -> Result<(), RunInstanceError> {
315 if self.state != RunState::Running {
316 return Err(RunInstanceError::AttemptStartRequiresRunning {
317 run_id: self.id,
318 current_state: self.state,
319 });
320 }
321
322 if self.current_attempt_id.is_some() {
323 return Err(RunInstanceError::AttemptAlreadyActive {
324 run_id: self.id,
325 active_attempt_id: self.current_attempt_id.expect("checked is_some above"),
326 });
327 }
328
329 self.attempt_count = self
330 .attempt_count
331 .checked_add(1)
332 .ok_or(RunInstanceError::AttemptCountOverflow { run_id: self.id })?;
333 self.current_attempt_id = Some(attempt_id);
334 Ok(())
335 }
336
337 pub fn finish_attempt(&mut self, attempt_id: AttemptId) -> Result<(), RunInstanceError> {
339 if self.state != RunState::Running {
340 return Err(RunInstanceError::AttemptFinishRequiresRunning {
341 run_id: self.id,
342 current_state: self.state,
343 });
344 }
345
346 match self.current_attempt_id {
347 Some(active_attempt_id) if active_attempt_id == attempt_id => {
348 self.current_attempt_id = None;
349 Ok(())
350 }
351 Some(active_attempt_id) => Err(RunInstanceError::AttemptOwnershipMismatch {
352 run_id: self.id,
353 expected_attempt_id: active_attempt_id,
354 actual_attempt_id: attempt_id,
355 }),
356 None => Err(RunInstanceError::NoActiveAttempt { run_id: self.id }),
357 }
358 }
359}
360
361#[derive(Debug, Clone, Copy, PartialEq, Eq)]
363pub enum RunInstanceError {
364 InvalidTransition {
366 run_id: RunId,
368 from: RunState,
370 to: RunState,
372 },
373 AttemptInProgress {
375 run_id: RunId,
377 active_attempt_id: AttemptId,
379 },
380 AttemptStartRequiresRunning {
382 run_id: RunId,
384 current_state: RunState,
386 },
387 AttemptFinishRequiresRunning {
389 run_id: RunId,
391 current_state: RunState,
393 },
394 AttemptAlreadyActive {
396 run_id: RunId,
398 active_attempt_id: AttemptId,
400 },
401 AttemptOwnershipMismatch {
403 run_id: RunId,
405 expected_attempt_id: AttemptId,
407 actual_attempt_id: AttemptId,
409 },
410 NoActiveAttempt {
412 run_id: RunId,
414 },
415 AttemptCountOverflow {
417 run_id: RunId,
419 },
420 PriorityMutationRequiresReady {
422 run_id: RunId,
424 current_state: RunState,
426 },
427}
428
429impl std::fmt::Display for RunInstanceError {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 match self {
432 RunInstanceError::InvalidTransition { run_id, from, to } => {
433 write!(f, "invalid run transition for {run_id}: {from:?} -> {to:?}")
434 }
435 RunInstanceError::AttemptInProgress { run_id, active_attempt_id } => {
436 write!(
437 f,
438 "run {run_id} cannot leave Running while attempt {active_attempt_id} is active"
439 )
440 }
441 RunInstanceError::AttemptStartRequiresRunning { run_id, current_state } => {
442 write!(f, "run {run_id} cannot start attempt in state {current_state:?}")
443 }
444 RunInstanceError::AttemptFinishRequiresRunning { run_id, current_state } => {
445 write!(f, "run {run_id} cannot finish attempt in state {current_state:?}")
446 }
447 RunInstanceError::AttemptAlreadyActive { run_id, active_attempt_id } => {
448 write!(f, "run {run_id} already has active attempt {active_attempt_id}")
449 }
450 RunInstanceError::AttemptOwnershipMismatch {
451 run_id,
452 expected_attempt_id,
453 actual_attempt_id,
454 } => {
455 write!(
456 f,
457 "run {run_id} attempt mismatch: expected {expected_attempt_id}, got \
458 {actual_attempt_id}"
459 )
460 }
461 RunInstanceError::NoActiveAttempt { run_id } => {
462 write!(f, "run {run_id} has no active attempt")
463 }
464 RunInstanceError::AttemptCountOverflow { run_id } => {
465 write!(f, "run {run_id} attempt counter overflow")
466 }
467 RunInstanceError::PriorityMutationRequiresReady { run_id, current_state } => {
468 write!(
469 f,
470 "run {run_id} priority can only be updated in Ready state (current: \
471 {current_state:?})"
472 )
473 }
474 }
475 }
476}
477
478impl std::error::Error for RunInstanceError {}
479
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for RunInstance {
    /// Deserializes a run and validates it before handing it out.
    ///
    /// The input is first read into a plain mirror struct and then checked, in this order:
    ///
    /// 1. `id` must not be a nil UUID.
    /// 2. `task_id` must not be a nil UUID.
    /// 3. An active attempt id is only accepted when the state is `Running`.
    /// 4. A terminal state must not carry an active attempt id.
    /// 5. A `Ready` run must satisfy `scheduled_at <= created_at`.
    ///
    /// A missing `last_state_change_at` field falls back to `u64::default()`.
    ///
    /// # Errors
    ///
    /// Returns the deserializer's own error if the input does not match the expected
    /// fields, or a custom error describing the first failed check above.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        /// Unvalidated, field-for-field mirror of [`RunInstance`].
        #[derive(serde::Deserialize)]
        struct RunInstanceWire {
            id: RunId,
            task_id: TaskId,
            state: RunState,
            current_attempt_id: Option<AttemptId>,
            attempt_count: u32,
            created_at: u64,
            scheduled_at: u64,
            effective_priority: i32,
            #[serde(default)]
            last_state_change_at: u64,
        }

        // Fully qualified call: resolves without requiring `serde::Deserialize` to be
        // imported into the surrounding module.
        let wire = <RunInstanceWire as serde::Deserialize<'de>>::deserialize(deserializer)?;

        // Identifier checks mirror the constructors: nil UUIDs are never valid.
        if wire.id.as_uuid().is_nil() {
            return Err(serde::de::Error::custom("run_id must not be nil"));
        }
        if wire.task_id.as_uuid().is_nil() {
            return Err(serde::de::Error::custom("task_id must not be nil"));
        }

        // State/attempt consistency.
        let has_active_attempt = wire.current_attempt_id.is_some();
        if wire.state != RunState::Running && has_active_attempt {
            return Err(serde::de::Error::custom(
                "active attempt_id is only valid in Running state",
            ));
        }
        if wire.state.is_terminal() && has_active_attempt {
            return Err(serde::de::Error::custom("terminal state cannot have an active attempt"));
        }

        // Same timestamp rule that `new_ready_with_id` enforces.
        if wire.state == RunState::Ready && wire.scheduled_at > wire.created_at {
            return Err(serde::de::Error::custom(format!(
                "Ready state requires scheduled_at ({}) <= created_at ({})",
                wire.scheduled_at, wire.created_at,
            )));
        }

        Ok(RunInstance {
            id: wire.id,
            task_id: wire.task_id,
            state: wire.state,
            current_attempt_id: wire.current_attempt_id,
            attempt_count: wire.attempt_count,
            created_at: wire.created_at,
            scheduled_at: wire.scheduled_at,
            effective_priority: wire.effective_priority,
            last_state_change_at: wire.last_state_change_at,
        })
    }
}
541
542impl RunInstance {
543 pub fn restore_attempt_state_for_bootstrap(
554 &mut self,
555 count: u32,
556 active_attempt: Option<AttemptId>,
557 ) {
558 self.attempt_count = count;
559 self.current_attempt_id = active_attempt;
560 }
561}
562
#[cfg(test)]
impl RunInstance {
    /// Test-only hook that forces the attempt counter to `count`, bypassing `start_attempt`.
    pub(crate) fn set_attempt_count_for_testing(&mut self, count: u32) {
        self.attempt_count = count;
    }
}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{AttemptId, TaskId};

    /// With the counter already at `u32::MAX`, `start_attempt` must report an overflow error
    /// rather than wrap or panic.
    #[test]
    fn start_attempt_at_u32_max_returns_overflow_error() {
        let mut run =
            RunInstance::new_scheduled(TaskId::new(), 1_000, 1_000).expect("valid scheduled run");
        // Walk Scheduled -> Ready -> Leased -> Running so that attempts may be started.
        for next_state in [RunState::Ready, RunState::Leased, RunState::Running] {
            run.transition_to(next_state).unwrap();
        }
        run.set_attempt_count_for_testing(u32::MAX);

        let outcome = run.start_attempt(AttemptId::new());

        assert!(outcome.is_err(), "start_attempt at u32::MAX should fail");
        let expected = RunInstanceError::AttemptCountOverflow { run_id: run.id() };
        assert_eq!(outcome.unwrap_err(), expected);
    }
}