Skip to main content

actionqueue_engine/scheduler/
promotion.rs

1//! Scheduled-to-ready promotion logic.
2//!
3//! This module provides functionality for promoting runs from the Scheduled
4//! state to the Ready state when their scheduled_at time has passed.
5
6use actionqueue_core::mutation::{
7    DurabilityPolicy, MutationAuthority, MutationCommand, MutationOutcome,
8    RunStateTransitionCommand,
9};
10use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
11use actionqueue_core::run::state::RunState;
12
13use crate::index::scheduled::ScheduledIndex;
14
15/// Result of promoting scheduled runs to ready.
16///
17/// This structure contains the runs that were promoted from Scheduled to Ready
18/// and the remaining runs that are still in Scheduled state ( waiting for their
19/// scheduled_at time to pass).
20#[derive(Debug, Clone, PartialEq, Eq)]
21#[must_use]
22pub struct PromotionResult {
23    /// Runs that were promoted from Scheduled to Ready.
24    promoted: Vec<RunInstance>,
25    /// Runs that remain in Scheduled state.
26    remaining_scheduled: Vec<RunInstance>,
27}
28
29impl PromotionResult {
30    /// Returns the runs that were promoted from Scheduled to Ready.
31    pub fn promoted(&self) -> &[RunInstance] {
32        &self.promoted
33    }
34
35    /// Returns the runs that remain in Scheduled state.
36    pub fn remaining_scheduled(&self) -> &[RunInstance] {
37        &self.remaining_scheduled
38    }
39}
40
41/// Result of authority-mediated scheduled-to-ready promotion.
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[must_use]
44pub struct AuthorityPromotionResult {
45    /// Mutation outcomes returned by the storage-owned authority.
46    outcomes: Vec<MutationOutcome>,
47    /// Runs that remain in Scheduled state.
48    remaining_scheduled: Vec<RunInstance>,
49}
50
51impl AuthorityPromotionResult {
52    /// Returns the mutation outcomes from promotion.
53    pub fn outcomes(&self) -> &[MutationOutcome] {
54        &self.outcomes
55    }
56
57    /// Returns the runs that remain in Scheduled state.
58    pub fn remaining_scheduled(&self) -> &[RunInstance] {
59        &self.remaining_scheduled
60    }
61}
62
63/// Error returned when authority-mediated promotion fails.
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub enum AuthorityPromotionError<AuthorityError> {
66    /// Promotion command sequencing overflowed `u64` while preparing commands.
67    SequenceOverflow,
68    /// Authority rejected or failed processing a specific run promotion command.
69    Authority {
70        /// Run whose promotion command failed.
71        run_id: actionqueue_core::ids::RunId,
72        /// Underlying authority error.
73        source: AuthorityError,
74    },
75}
76
77impl<E: std::fmt::Display> std::fmt::Display for AuthorityPromotionError<E> {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            AuthorityPromotionError::SequenceOverflow => {
81                write!(f, "promotion command sequencing overflowed u64")
82            }
83            AuthorityPromotionError::Authority { run_id, source } => {
84                write!(f, "authority error for run {run_id}: {source}")
85            }
86        }
87    }
88}
89
90impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for AuthorityPromotionError<E> {}
91
92/// Parameters for authority-mediated promotion that group sequencing and timing context.
93pub struct PromotionParams {
94    /// Current time used to determine which runs are eligible for promotion.
95    current_time: u64,
96    /// First WAL sequence to assign to promotion commands.
97    first_sequence: u64,
98    /// Timestamp carried into durable event payloads.
99    event_timestamp: u64,
100    /// Durability behavior requested for promotion commands.
101    durability: DurabilityPolicy,
102}
103
104impl PromotionParams {
105    /// Creates new promotion parameters.
106    pub fn new(
107        current_time: u64,
108        first_sequence: u64,
109        event_timestamp: u64,
110        durability: DurabilityPolicy,
111    ) -> Self {
112        Self { current_time, first_sequence, event_timestamp, durability }
113    }
114
115    /// Returns the current time used for promotion eligibility.
116    pub fn current_time(&self) -> u64 {
117        self.current_time
118    }
119
120    /// Returns the first WAL sequence to assign.
121    pub fn first_sequence(&self) -> u64 {
122        self.first_sequence
123    }
124
125    /// Returns the event timestamp.
126    pub fn event_timestamp(&self) -> u64 {
127        self.event_timestamp
128    }
129
130    /// Returns the durability policy.
131    pub fn durability(&self) -> DurabilityPolicy {
132        self.durability
133    }
134}
135
136/// Promotes scheduled runs through the engine-storage mutation authority boundary.
137///
138/// Unlike [`promote_scheduled_to_ready`], this function is durability-aware and
139/// emits semantic commands through a [`MutationAuthority`] implementation.
140pub fn promote_scheduled_to_ready_via_authority<A: MutationAuthority>(
141    scheduled: &ScheduledIndex,
142    params: PromotionParams,
143    authority: &mut A,
144) -> Result<AuthorityPromotionResult, AuthorityPromotionError<A::Error>> {
145    let PromotionParams { current_time, first_sequence, event_timestamp, durability } = params;
146    let runs = scheduled.runs();
147    let (ready_for_promotion, still_waiting): (Vec<&RunInstance>, Vec<&RunInstance>) =
148        runs.iter().partition(|run| run.scheduled_at() <= current_time);
149
150    let mut outcomes = Vec::with_capacity(ready_for_promotion.len());
151    for (index, run) in ready_for_promotion.into_iter().enumerate() {
152        let offset = u64::try_from(index).map_err(|_| AuthorityPromotionError::SequenceOverflow)?;
153        let sequence =
154            first_sequence.checked_add(offset).ok_or(AuthorityPromotionError::SequenceOverflow)?;
155
156        let command = MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
157            sequence,
158            run.id(),
159            RunState::Scheduled,
160            RunState::Ready,
161            event_timestamp,
162        ));
163
164        let outcome = authority
165            .submit_command(command, durability)
166            .map_err(|source| AuthorityPromotionError::Authority { run_id: run.id(), source })?;
167        outcomes.push(outcome);
168    }
169
170    let remaining_scheduled: Vec<RunInstance> = still_waiting.into_iter().cloned().collect();
171    Ok(AuthorityPromotionResult { outcomes, remaining_scheduled })
172}
173
174/// Promotes runs from Scheduled to Ready based on the current time.
175///
176/// This function takes the scheduled index and the current time, then moves
177/// runs that are ready for promotion (scheduled_at <= current_time) to the
178/// Ready state.
179///
180/// # Arguments
181///
182/// * `scheduled` - The scheduled index containing runs in Scheduled state
183/// * `current_time` - The current time according to the scheduler clock
184///
185/// # Returns
186///
187/// A PromotionResult containing:
188/// - `promoted`: Runs that transitioned from Scheduled to Ready
189/// - `remaining_scheduled`: Runs that are still waiting for their scheduled_at time
190pub fn promote_scheduled_to_ready(
191    scheduled: &ScheduledIndex,
192    current_time: u64,
193) -> Result<PromotionResult, RunInstanceError> {
194    let runs = scheduled.runs();
195
196    // Partition runs into those ready for promotion and those still waiting
197    let (ready_for_promotion, still_waiting): (Vec<&RunInstance>, Vec<&RunInstance>) =
198        runs.iter().partition(|run| run.scheduled_at() <= current_time);
199
200    // Convert ready runs to Ready state
201    let mut promoted = Vec::with_capacity(ready_for_promotion.len());
202    for run in ready_for_promotion {
203        let mut ready_run = run.clone();
204        ready_run.promote_to_ready()?;
205        promoted.push(ready_run);
206    }
207
208    // Convert still waiting runs back to Scheduled state (they should already be Scheduled)
209    let remaining_scheduled: Vec<RunInstance> = still_waiting.into_iter().cloned().collect();
210
211    Ok(PromotionResult { promoted, remaining_scheduled })
212}
213
214#[cfg(test)]
215mod tests {
216    use actionqueue_core::ids::TaskId;
217    use actionqueue_core::mutation::AppliedMutation;
218    use actionqueue_core::run::state::RunState;
219
220    use super::*;
221
222    #[derive(Debug, Default)]
223    struct MockAuthority {
224        submitted: Vec<MutationCommand>,
225    }
226
227    impl MutationAuthority for MockAuthority {
228        type Error = &'static str;
229
230        fn submit_command(
231            &mut self,
232            command: MutationCommand,
233            _durability: DurabilityPolicy,
234        ) -> Result<MutationOutcome, Self::Error> {
235            let (sequence, run_id) = match &command {
236                MutationCommand::RunStateTransition(details) => {
237                    (details.sequence(), details.run_id())
238                }
239                MutationCommand::TaskCreate(_)
240                | MutationCommand::RunCreate(_)
241                | MutationCommand::AttemptStart(_)
242                | MutationCommand::AttemptFinish(_)
243                | MutationCommand::LeaseAcquire(_)
244                | MutationCommand::LeaseHeartbeat(_)
245                | MutationCommand::LeaseExpire(_)
246                | MutationCommand::LeaseRelease(_)
247                | MutationCommand::EnginePause(_)
248                | MutationCommand::EngineResume(_)
249                | MutationCommand::TaskCancel(_)
250                | MutationCommand::DependencyDeclare(_)
251                | MutationCommand::RunSuspend(_)
252                | MutationCommand::RunResume(_)
253                | MutationCommand::BudgetAllocate(_)
254                | MutationCommand::BudgetConsume(_)
255                | MutationCommand::BudgetReplenish(_)
256                | MutationCommand::SubscriptionCreate(_)
257                | MutationCommand::SubscriptionCancel(_)
258                | MutationCommand::SubscriptionTrigger(_)
259                | MutationCommand::ActorRegister(_)
260                | MutationCommand::ActorDeregister(_)
261                | MutationCommand::ActorHeartbeat(_)
262                | MutationCommand::TenantCreate(_)
263                | MutationCommand::RoleAssign(_)
264                | MutationCommand::CapabilityGrant(_)
265                | MutationCommand::CapabilityRevoke(_)
266                | MutationCommand::LedgerAppend(_) => {
267                    return Err("unexpected command in promotion authority test");
268                }
269            };
270            self.submitted.push(command.clone());
271            Ok(MutationOutcome::new(
272                sequence,
273                AppliedMutation::RunStateTransition {
274                    run_id,
275                    previous_state: RunState::Scheduled,
276                    new_state: RunState::Ready,
277                },
278            ))
279        }
280    }
281
282    #[test]
283    fn promotes_runs_with_past_scheduled_at() {
284        let now = 1000;
285        let task_id = TaskId::new();
286
287        let scheduled_runs = vec![
288            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"), /* past, should be promoted */
289            RunInstance::new_scheduled(task_id, 950, now).expect("valid scheduled run"), /* past, should be promoted */
290        ];
291
292        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
293
294        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
295            .expect("promotion should succeed for valid scheduled runs");
296
297        assert_eq!(result.promoted().len(), 2);
298        assert!(result.promoted().iter().all(|run| run.state() == RunState::Ready));
299        assert!(result.remaining_scheduled().is_empty());
300    }
301
302    #[test]
303    fn does_not_promote_runs_with_future_scheduled_at() {
304        let now = 1000;
305        let task_id = TaskId::new();
306
307        let scheduled_runs = vec![
308            RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run"), /* future, should not be promoted */
309            RunInstance::new_scheduled(task_id, 1200, now).expect("valid scheduled run"), /* future, should not be promoted */
310        ];
311
312        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
313
314        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
315            .expect("promotion should succeed for valid scheduled runs");
316
317        assert!(result.promoted().is_empty());
318        assert_eq!(result.remaining_scheduled().len(), 2);
319        assert!(result.remaining_scheduled().iter().all(|run| run.state() == RunState::Scheduled));
320    }
321
322    #[test]
323    fn promotes_runs_with_equal_scheduled_at() {
324        let now = 1000;
325        let task_id = TaskId::new();
326
327        let scheduled_runs = vec![
328            RunInstance::new_scheduled(task_id, 1000, now).expect("valid scheduled run"), /* equal, should be promoted */
329        ];
330
331        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
332
333        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
334            .expect("promotion should succeed for valid scheduled runs");
335
336        assert_eq!(result.promoted().len(), 1);
337        assert_eq!(result.promoted()[0].state(), RunState::Ready);
338    }
339
340    #[test]
341    fn mixed_promotion_of_past_and_future_scheduled_runs() {
342        let now = 1000;
343        let task_id = TaskId::new();
344
345        let scheduled_runs = vec![
346            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"), /* past, should be promoted */
347            RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run"), /* future, should not be promoted */
348            RunInstance::new_scheduled(task_id, 950, now).expect("valid scheduled run"), /* past, should be promoted */
349            RunInstance::new_scheduled(task_id, 1050, now).expect("valid scheduled run"), /* future, should not be promoted */
350        ];
351
352        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
353
354        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
355            .expect("promotion should succeed for valid scheduled runs");
356
357        assert_eq!(result.promoted().len(), 2);
358        assert_eq!(result.remaining_scheduled().len(), 2);
359
360        // Check that promoted runs have past scheduled_at times
361        assert!(result.promoted().iter().all(|run| run.scheduled_at() <= 1000));
362
363        // Check that remaining scheduled runs have future scheduled_at times
364        assert!(result.remaining_scheduled().iter().all(|run| run.scheduled_at() > 1000));
365    }
366
367    #[test]
368    fn preserves_run_data_during_promotion() {
369        let now = 1000;
370        let task_id = TaskId::new();
371
372        let scheduled_runs =
373            vec![RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run")];
374
375        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs.clone());
376
377        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
378            .expect("promotion should succeed for valid scheduled runs");
379
380        assert_eq!(result.promoted().len(), 1);
381
382        let promoted_run = &result.promoted()[0];
383        let original_run = &scheduled_runs[0];
384
385        // Verify all fields except state are preserved
386        assert_eq!(promoted_run.id(), original_run.id());
387        assert_eq!(promoted_run.task_id(), original_run.task_id());
388        assert_eq!(promoted_run.current_attempt_id(), original_run.current_attempt_id());
389        assert_eq!(promoted_run.attempt_count(), original_run.attempt_count());
390        assert_eq!(promoted_run.created_at(), original_run.created_at());
391        assert_eq!(promoted_run.scheduled_at(), original_run.scheduled_at());
392        assert_eq!(promoted_run.state(), RunState::Ready);
393    }
394
395    #[test]
396    fn empty_index_returns_empty_results() {
397        let scheduled_index = ScheduledIndex::new();
398
399        let result = promote_scheduled_to_ready(&scheduled_index, 1000)
400            .expect("promotion should succeed for valid scheduled runs");
401
402        assert!(result.promoted().is_empty());
403        assert!(result.remaining_scheduled().is_empty());
404    }
405
406    #[test]
407    fn authority_promotion_emits_transition_commands_for_ready_runs() {
408        let now = 1_000;
409        let task_id = TaskId::new();
410        let scheduled_runs = vec![
411            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"),
412            RunInstance::new_scheduled(task_id, 1_100, now).expect("valid scheduled run"),
413        ];
414        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
415        let mut authority = MockAuthority::default();
416
417        let result = promote_scheduled_to_ready_via_authority(
418            &scheduled_index,
419            PromotionParams::new(now, 7, now, DurabilityPolicy::Immediate),
420            &mut authority,
421        )
422        .expect("authority promotion should succeed");
423
424        assert_eq!(result.outcomes().len(), 1);
425        assert_eq!(result.outcomes()[0].sequence(), 7);
426        assert_eq!(result.remaining_scheduled().len(), 1);
427        assert_eq!(authority.submitted.len(), 1);
428        assert!(matches!(
429            &authority.submitted[0],
430            MutationCommand::RunStateTransition(cmd)
431                if cmd.sequence() == 7
432                && cmd.previous_state() == RunState::Scheduled
433                && cmd.new_state() == RunState::Ready
434        ));
435    }
436
437    #[test]
438    fn authority_promotion_empty_scheduled_index_produces_no_outcomes() {
439        let scheduled_index = ScheduledIndex::from_runs(Vec::new());
440        let mut authority = MockAuthority::default();
441
442        let result = promote_scheduled_to_ready_via_authority(
443            &scheduled_index,
444            PromotionParams::new(1000, 1, 1000, DurabilityPolicy::Immediate),
445            &mut authority,
446        )
447        .expect("authority promotion of empty index should succeed");
448
449        assert_eq!(result.outcomes().len(), 0);
450        assert_eq!(result.remaining_scheduled().len(), 0);
451        assert!(authority.submitted.is_empty());
452    }
453
454    #[test]
455    fn authority_promotion_preserves_future_runs_in_remaining() {
456        let now = 100;
457        let task_id = TaskId::new();
458
459        let scheduled_runs = vec![
460            RunInstance::new_scheduled(task_id, 100, now).expect("valid scheduled run"), /* past/equal, should be promoted */
461            RunInstance::new_scheduled(task_id, u64::MAX, now).expect("valid scheduled run"), /* far future, should remain */
462        ];
463        let scheduled_index = ScheduledIndex::from_runs(scheduled_runs);
464        let mut authority = MockAuthority::default();
465
466        let result = promote_scheduled_to_ready_via_authority(
467            &scheduled_index,
468            PromotionParams::new(200, 1, 200, DurabilityPolicy::Immediate),
469            &mut authority,
470        )
471        .expect("authority promotion should succeed");
472
473        assert_eq!(result.outcomes().len(), 1);
474        assert_eq!(result.remaining_scheduled().len(), 1);
475        assert_eq!(result.remaining_scheduled()[0].scheduled_at(), u64::MAX);
476        assert_eq!(authority.submitted.len(), 1);
477    }
478}