1use actionqueue_core::mutation::{
7 DurabilityPolicy, MutationAuthority, MutationCommand, MutationOutcome,
8 RunStateTransitionCommand,
9};
10use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
11use actionqueue_core::run::state::RunState;
12
13use crate::index::scheduled::ScheduledIndex;
14
15#[derive(Debug, Clone, PartialEq, Eq)]
21#[must_use]
22pub struct PromotionResult {
23 promoted: Vec<RunInstance>,
25 remaining_scheduled: Vec<RunInstance>,
27}
28
29impl PromotionResult {
30 pub fn promoted(&self) -> &[RunInstance] {
32 &self.promoted
33 }
34
35 pub fn remaining_scheduled(&self) -> &[RunInstance] {
37 &self.remaining_scheduled
38 }
39}
40
41#[derive(Debug, Clone, PartialEq, Eq)]
43#[must_use]
44pub struct AuthorityPromotionResult {
45 outcomes: Vec<MutationOutcome>,
47 remaining_scheduled: Vec<RunInstance>,
49}
50
51impl AuthorityPromotionResult {
52 pub fn outcomes(&self) -> &[MutationOutcome] {
54 &self.outcomes
55 }
56
57 pub fn remaining_scheduled(&self) -> &[RunInstance] {
59 &self.remaining_scheduled
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
65pub enum AuthorityPromotionError<AuthorityError> {
66 SequenceOverflow,
68 Authority {
70 run_id: actionqueue_core::ids::RunId,
72 source: AuthorityError,
74 },
75}
76
77impl<E: std::fmt::Display> std::fmt::Display for AuthorityPromotionError<E> {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 AuthorityPromotionError::SequenceOverflow => {
81 write!(f, "promotion command sequencing overflowed u64")
82 }
83 AuthorityPromotionError::Authority { run_id, source } => {
84 write!(f, "authority error for run {run_id}: {source}")
85 }
86 }
87 }
88}
89
90impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for AuthorityPromotionError<E> {}
91
92pub struct PromotionParams {
94 current_time: u64,
96 first_sequence: u64,
98 event_timestamp: u64,
100 durability: DurabilityPolicy,
102}
103
104impl PromotionParams {
105 pub fn new(
107 current_time: u64,
108 first_sequence: u64,
109 event_timestamp: u64,
110 durability: DurabilityPolicy,
111 ) -> Self {
112 Self { current_time, first_sequence, event_timestamp, durability }
113 }
114
115 pub fn current_time(&self) -> u64 {
117 self.current_time
118 }
119
120 pub fn first_sequence(&self) -> u64 {
122 self.first_sequence
123 }
124
125 pub fn event_timestamp(&self) -> u64 {
127 self.event_timestamp
128 }
129
130 pub fn durability(&self) -> DurabilityPolicy {
132 self.durability
133 }
134}
135
136pub fn promote_scheduled_to_ready_via_authority<A: MutationAuthority>(
141 scheduled: &ScheduledIndex,
142 params: PromotionParams,
143 authority: &mut A,
144) -> Result<AuthorityPromotionResult, AuthorityPromotionError<A::Error>> {
145 let PromotionParams { current_time, first_sequence, event_timestamp, durability } = params;
146 let runs = scheduled.runs();
147 let (ready_for_promotion, still_waiting): (Vec<&RunInstance>, Vec<&RunInstance>) =
148 runs.iter().partition(|run| run.scheduled_at() <= current_time);
149
150 let mut outcomes = Vec::with_capacity(ready_for_promotion.len());
151 for (index, run) in ready_for_promotion.into_iter().enumerate() {
152 let offset = u64::try_from(index).map_err(|_| AuthorityPromotionError::SequenceOverflow)?;
153 let sequence =
154 first_sequence.checked_add(offset).ok_or(AuthorityPromotionError::SequenceOverflow)?;
155
156 let command = MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
157 sequence,
158 run.id(),
159 RunState::Scheduled,
160 RunState::Ready,
161 event_timestamp,
162 ));
163
164 let outcome = authority
165 .submit_command(command, durability)
166 .map_err(|source| AuthorityPromotionError::Authority { run_id: run.id(), source })?;
167 outcomes.push(outcome);
168 }
169
170 let remaining_scheduled: Vec<RunInstance> = still_waiting.into_iter().cloned().collect();
171 Ok(AuthorityPromotionResult { outcomes, remaining_scheduled })
172}
173
174pub fn promote_scheduled_to_ready(
191 scheduled: &ScheduledIndex,
192 current_time: u64,
193) -> Result<PromotionResult, RunInstanceError> {
194 let runs = scheduled.runs();
195
196 let (ready_for_promotion, still_waiting): (Vec<&RunInstance>, Vec<&RunInstance>) =
198 runs.iter().partition(|run| run.scheduled_at() <= current_time);
199
200 let mut promoted = Vec::with_capacity(ready_for_promotion.len());
202 for run in ready_for_promotion {
203 let mut ready_run = run.clone();
204 ready_run.promote_to_ready()?;
205 promoted.push(ready_run);
206 }
207
208 let remaining_scheduled: Vec<RunInstance> = still_waiting.into_iter().cloned().collect();
210
211 Ok(PromotionResult { promoted, remaining_scheduled })
212}
213
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::TaskId;
    use actionqueue_core::mutation::AppliedMutation;
    use actionqueue_core::run::state::RunState;

    use super::*;

    /// Test double that records every accepted command and answers it with a
    /// `Scheduled -> Ready` transition outcome.
    #[derive(Debug, Default)]
    struct MockAuthority {
        submitted: Vec<MutationCommand>,
    }

    impl MutationAuthority for MockAuthority {
        type Error = &'static str;

        fn submit_command(
            &mut self,
            command: MutationCommand,
            _durability: DurabilityPolicy,
        ) -> Result<MutationOutcome, Self::Error> {
            // Every variant is listed (no `_` arm), so this match stops compiling when a new
            // command variant is added and the mock has to be revisited.
            let (sequence, run_id) = match &command {
                MutationCommand::RunStateTransition(details) => {
                    (details.sequence(), details.run_id())
                }
                MutationCommand::TaskCreate(_)
                | MutationCommand::RunCreate(_)
                | MutationCommand::AttemptStart(_)
                | MutationCommand::AttemptFinish(_)
                | MutationCommand::LeaseAcquire(_)
                | MutationCommand::LeaseHeartbeat(_)
                | MutationCommand::LeaseExpire(_)
                | MutationCommand::LeaseRelease(_)
                | MutationCommand::EnginePause(_)
                | MutationCommand::EngineResume(_)
                | MutationCommand::TaskCancel(_)
                | MutationCommand::DependencyDeclare(_)
                | MutationCommand::RunSuspend(_)
                | MutationCommand::RunResume(_)
                | MutationCommand::BudgetAllocate(_)
                | MutationCommand::BudgetConsume(_)
                | MutationCommand::BudgetReplenish(_)
                | MutationCommand::SubscriptionCreate(_)
                | MutationCommand::SubscriptionCancel(_)
                | MutationCommand::SubscriptionTrigger(_)
                | MutationCommand::ActorRegister(_)
                | MutationCommand::ActorDeregister(_)
                | MutationCommand::ActorHeartbeat(_)
                | MutationCommand::TenantCreate(_)
                | MutationCommand::RoleAssign(_)
                | MutationCommand::CapabilityGrant(_)
                | MutationCommand::CapabilityRevoke(_)
                | MutationCommand::LedgerAppend(_) => {
                    return Err("unexpected command in promotion authority test");
                }
            };
            // `command` is owned and not used again, so it is moved into the log.
            self.submitted.push(command);
            Ok(MutationOutcome::new(
                sequence,
                AppliedMutation::RunStateTransition {
                    run_id,
                    previous_state: RunState::Scheduled,
                    new_state: RunState::Ready,
                },
            ))
        }
    }

    #[test]
    fn promotes_runs_with_past_scheduled_at() {
        let now = 1000;
        let task_id = TaskId::new();

        // Both runs are scheduled before the promotion time.
        let scheduled_runs = vec![
            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 950, now).expect("valid scheduled run"),
        ];

        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);

        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
            .expect("promotion should succeed for valid scheduled runs");

        assert_eq!(result.promoted().len(), 2);
        assert!(result.promoted().iter().all(|run| run.state() == RunState::Ready));
        assert!(result.remaining_scheduled().is_empty());
    }

    #[test]
    fn does_not_promote_runs_with_future_scheduled_at() {
        let now = 1000;
        let task_id = TaskId::new();

        // Both runs are scheduled after the promotion time.
        let scheduled_runs = vec![
            RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 1200, now).expect("valid scheduled run"),
        ];

        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);

        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
            .expect("promotion should succeed for valid scheduled runs");

        assert!(result.promoted().is_empty());
        assert_eq!(result.remaining_scheduled().len(), 2);
        assert!(result.remaining_scheduled().iter().all(|run| run.state() == RunState::Scheduled));
    }

    #[test]
    fn promotes_runs_with_equal_scheduled_at() {
        let now = 1000;
        let task_id = TaskId::new();

        // Boundary case: scheduled exactly at the promotion time.
        let scheduled_runs =
            vec![RunInstance::new_scheduled(task_id, 1000, now).expect("valid scheduled run")];

        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);

        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
            .expect("promotion should succeed for valid scheduled runs");

        assert_eq!(result.promoted().len(), 1);
        assert_eq!(result.promoted()[0].state(), RunState::Ready);
    }

    #[test]
    fn mixed_promotion_of_past_and_future_scheduled_runs() {
        let now = 1000;
        let task_id = TaskId::new();

        // Due and not-yet-due runs are interleaved.
        let scheduled_runs = vec![
            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 950, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 1050, now).expect("valid scheduled run"),
        ];

        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);

        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
            .expect("promotion should succeed for valid scheduled runs");

        assert_eq!(result.promoted().len(), 2);
        assert_eq!(result.remaining_scheduled().len(), 2);

        assert!(result.promoted().iter().all(|run| run.scheduled_at() <= 1000));

        assert!(result.remaining_scheduled().iter().all(|run| run.scheduled_at() > 1000));
    }

    #[test]
    fn preserves_run_data_during_promotion() {
        let now = 1000;
        let task_id = TaskId::new();

        let scheduled_runs =
            vec![RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run")];

        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs.clone());

        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
            .expect("promotion should succeed for valid scheduled runs");

        assert_eq!(result.promoted().len(), 1);

        let promoted_run = &result.promoted()[0];
        let original_run = &scheduled_runs[0];

        // Only the state may differ between the original and the promoted copy.
        assert_eq!(promoted_run.id(), original_run.id());
        assert_eq!(promoted_run.task_id(), original_run.task_id());
        assert_eq!(promoted_run.current_attempt_id(), original_run.current_attempt_id());
        assert_eq!(promoted_run.attempt_count(), original_run.attempt_count());
        assert_eq!(promoted_run.created_at(), original_run.created_at());
        assert_eq!(promoted_run.scheduled_at(), original_run.scheduled_at());
        assert_eq!(promoted_run.state(), RunState::Ready);
    }

    #[test]
    fn empty_index_returns_empty_results() {
        let scheduled_index = ScheduledIndex::new();

        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
            .expect("promotion should succeed for valid scheduled runs");

        assert!(result.promoted().is_empty());
        assert!(result.remaining_scheduled().is_empty());
    }

    #[test]
    fn authority_promotion_emits_transition_commands_for_ready_runs() {
        let now = 1_000;
        let task_id = TaskId::new();
        let scheduled_runs = vec![
            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 1_100, now).expect("valid scheduled run"),
        ];
        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
        let mut authority = MockAuthority::default();

        let result = promote_scheduled_to_ready_via_authority(
            &scheduled_index,
            PromotionParams::new(now, 7, now, DurabilityPolicy::Immediate),
            &mut authority,
        )
        .expect("authority promotion should succeed");

        assert_eq!(result.outcomes().len(), 1);
        assert_eq!(result.outcomes()[0].sequence(), 7);
        assert_eq!(result.remaining_scheduled().len(), 1);
        assert_eq!(authority.submitted.len(), 1);
        assert!(matches!(
            &authority.submitted[0],
            MutationCommand::RunStateTransition(cmd)
                if cmd.sequence() == 7
                    && cmd.previous_state() == RunState::Scheduled
                    && cmd.new_state() == RunState::Ready
        ));
    }

    #[test]
    fn authority_promotion_empty_scheduled_index_produces_no_outcomes() {
        let scheduled_index = ScheduledIndex::from_runs(Vec::new());
        let mut authority = MockAuthority::default();

        let result = promote_scheduled_to_ready_via_authority(
            &scheduled_index,
            PromotionParams::new(1000, 1, 1000, DurabilityPolicy::Immediate),
            &mut authority,
        )
        .expect("authority promotion of empty index should succeed");

        assert_eq!(result.outcomes().len(), 0);
        assert_eq!(result.remaining_scheduled().len(), 0);
        assert!(authority.submitted.is_empty());
    }

    #[test]
    fn authority_promotion_preserves_future_runs_in_remaining() {
        let now = 100;
        let task_id = TaskId::new();

        // One run is due at promotion time 200; the other is scheduled at `u64::MAX`.
        let scheduled_runs = vec![
            RunInstance::new_scheduled(task_id, 100, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, u64::MAX, now).expect("valid scheduled run"),
        ];
        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
        let mut authority = MockAuthority::default();

        let result = promote_scheduled_to_ready_via_authority(
            &scheduled_index,
            PromotionParams::new(200, 1, 200, DurabilityPolicy::Immediate),
            &mut authority,
        )
        .expect("authority promotion should succeed");

        assert_eq!(result.outcomes().len(), 1);
        assert_eq!(result.remaining_scheduled().len(), 1);
        assert_eq!(result.remaining_scheduled()[0].scheduled_at(), u64::MAX);
        assert_eq!(authority.submitted.len(), 1);
    }

    #[test]
    fn authority_promotion_reports_sequence_overflow() {
        let now = 1_000;
        let task_id = TaskId::new();
        let scheduled_runs = vec![
            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"),
            RunInstance::new_scheduled(task_id, 950, now).expect("valid scheduled run"),
        ];
        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
        let mut authority = MockAuthority::default();

        // Two due runs starting at `u64::MAX` would need a second sequence past `u64::MAX`.
        let result = promote_scheduled_to_ready_via_authority(
            &scheduled_index,
            PromotionParams::new(now, u64::MAX, now, DurabilityPolicy::Immediate),
            &mut authority,
        );

        assert!(matches!(result, Err(AuthorityPromotionError::SequenceOverflow)));
    }
}