Skip to main content

scheduler/scheduler/
control.rs

1use crate::error::SchedulerError;
2use std::collections::HashSet;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Mutex;
7use tokio::sync::watch;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub(crate) enum SchedulerMode {
11    Running,
12    Paused,
13}
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub(crate) enum StopSignal {
17    Cancel,
18    Shutdown,
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub(crate) enum CommandDisposition {
23    Apply,
24    ObserveOnly { changed: bool },
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub(crate) struct ControlSignal {
29    pub(crate) desired_mode: SchedulerMode,
30    pub(crate) mode_command_seq: u64,
31    pub(crate) command_disposition: CommandDisposition,
32    pub(crate) stop_signal: Option<StopSignal>,
33}
34
35impl ControlSignal {
36    pub(crate) fn running() -> Self {
37        Self {
38            desired_mode: SchedulerMode::Running,
39            mode_command_seq: 0,
40            command_disposition: CommandDisposition::Apply,
41            stop_signal: None,
42        }
43    }
44}
45
46pub(crate) trait PauseController: Send + Sync + 'static {
47    fn pause<'a>(
48        &'a self,
49        job_id: &'a str,
50    ) -> Pin<Box<dyn Future<Output = Result<bool, SchedulerError>> + Send + 'a>>;
51
52    fn resume<'a>(
53        &'a self,
54        job_id: &'a str,
55    ) -> Pin<Box<dyn Future<Output = Result<bool, SchedulerError>> + Send + 'a>>;
56}
57
58#[derive(Clone)]
59pub struct SchedulerHandle {
60    control: watch::Sender<ControlSignal>,
61    pause_controller: Option<Arc<dyn PauseController>>,
62    active_job_ids: Arc<Mutex<HashSet<String>>>,
63}
64
65impl SchedulerHandle {
66    pub(crate) fn new(
67        control: watch::Sender<ControlSignal>,
68        pause_controller: Option<Arc<dyn PauseController>>,
69        active_job_ids: Arc<Mutex<HashSet<String>>>,
70    ) -> Self {
71        Self {
72            control,
73            pause_controller,
74            active_job_ids,
75        }
76    }
77
78    pub fn cancel(&self) {
79        self.send_stop_signal(StopSignal::Cancel);
80    }
81
82    pub fn shutdown(&self) {
83        self.send_stop_signal(StopSignal::Shutdown);
84    }
85
86    pub async fn pause(&self) -> Result<(), SchedulerError> {
87        if let Some(controller) = &self.pause_controller {
88            match self.single_active_job_id()? {
89                Some(job_id) => {
90                    let changed = controller.pause(&job_id).await?;
91                    self.send_mode_command(
92                        SchedulerMode::Paused,
93                        CommandDisposition::ObserveOnly { changed },
94                    );
95                }
96                None => {
97                    self.send_mode_command(SchedulerMode::Paused, CommandDisposition::Apply);
98                }
99            }
100        } else {
101            self.send_mode_command(SchedulerMode::Paused, CommandDisposition::Apply);
102        }
103        Ok(())
104    }
105
106    pub async fn resume(&self) -> Result<(), SchedulerError> {
107        if let Some(controller) = &self.pause_controller {
108            match self.single_active_job_id()? {
109                Some(job_id) => {
110                    let changed = controller.resume(&job_id).await?;
111                    self.send_mode_command(
112                        SchedulerMode::Running,
113                        CommandDisposition::ObserveOnly { changed },
114                    );
115                }
116                None => {
117                    self.send_mode_command(SchedulerMode::Running, CommandDisposition::Apply);
118                }
119            }
120        } else {
121            self.send_mode_command(SchedulerMode::Running, CommandDisposition::Apply);
122        }
123        Ok(())
124    }
125
126    fn send_mode_command(
127        &self,
128        desired_mode: SchedulerMode,
129        command_disposition: CommandDisposition,
130    ) {
131        let mut next = *self.control.borrow();
132        next.desired_mode = desired_mode;
133        next.command_disposition = command_disposition;
134        next.mode_command_seq = next.mode_command_seq.saturating_add(1);
135        let _ = self.control.send(next);
136    }
137
138    fn send_stop_signal(&self, stop_signal: StopSignal) {
139        let mut next = *self.control.borrow();
140        next.stop_signal = Some(stop_signal);
141        let _ = self.control.send(next);
142    }
143
144    fn single_active_job_id(&self) -> Result<Option<String>, SchedulerError> {
145        let active_job_ids = self.active_job_ids.lock().unwrap();
146        match active_job_ids.len() {
147            0 => Ok(None),
148            1 => Ok(active_job_ids.iter().next().cloned()),
149            _ => Err(SchedulerError::invalid_job_with_kind(
150                crate::InvalidJobKind::Other,
151                "pause/resume requires exactly one active job for coordinated schedulers",
152            )),
153        }
154    }
155}