scheduler/scheduler/
control.rs1use crate::error::SchedulerError;
2use std::collections::HashSet;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Mutex;
7use tokio::sync::watch;
8
/// Run state that a control command asks the scheduler to be in.
///
/// Carried in [`ControlSignal::desired_mode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum SchedulerMode {
    /// The scheduler should be running (also the initial mode, see
    /// [`ControlSignal::running`]).
    Running,
    /// The scheduler should be paused.
    Paused,
}
14
/// Kind of stop request published through [`ControlSignal::stop_signal`].
///
/// How the two variants differ on the receiving side is decided by the code
/// that watches the control channel, not by this module.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum StopSignal {
    /// Published by `SchedulerHandle::cancel`.
    Cancel,
    /// Published by `SchedulerHandle::shutdown`.
    Shutdown,
}
20
/// Tells the watcher of the control channel how a mode command was issued.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum CommandDisposition {
    /// Sent when no pause controller handled the command (none is configured,
    /// or no job is active).
    Apply,
    /// Sent after a pause controller has already handled the command.
    ObserveOnly {
        /// The boolean returned by the controller's `pause`/`resume` call;
        /// presumably whether that call changed anything — confirm against
        /// the `PauseController` implementations.
        changed: bool,
    },
}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub(crate) struct ControlSignal {
29 pub(crate) desired_mode: SchedulerMode,
30 pub(crate) mode_command_seq: u64,
31 pub(crate) command_disposition: CommandDisposition,
32 pub(crate) stop_signal: Option<StopSignal>,
33}
34
35impl ControlSignal {
36 pub(crate) fn running() -> Self {
37 Self {
38 desired_mode: SchedulerMode::Running,
39 mode_command_seq: 0,
40 command_disposition: CommandDisposition::Apply,
41 stop_signal: None,
42 }
43 }
44}
45
46pub(crate) trait PauseController: Send + Sync + 'static {
47 fn pause<'a>(
48 &'a self,
49 job_id: &'a str,
50 ) -> Pin<Box<dyn Future<Output = Result<bool, SchedulerError>> + Send + 'a>>;
51
52 fn resume<'a>(
53 &'a self,
54 job_id: &'a str,
55 ) -> Pin<Box<dyn Future<Output = Result<bool, SchedulerError>> + Send + 'a>>;
56}
57
58#[derive(Clone)]
59pub struct SchedulerHandle {
60 control: watch::Sender<ControlSignal>,
61 pause_controller: Option<Arc<dyn PauseController>>,
62 active_job_ids: Arc<Mutex<HashSet<String>>>,
63}
64
65impl SchedulerHandle {
66 pub(crate) fn new(
67 control: watch::Sender<ControlSignal>,
68 pause_controller: Option<Arc<dyn PauseController>>,
69 active_job_ids: Arc<Mutex<HashSet<String>>>,
70 ) -> Self {
71 Self {
72 control,
73 pause_controller,
74 active_job_ids,
75 }
76 }
77
78 pub fn cancel(&self) {
79 self.send_stop_signal(StopSignal::Cancel);
80 }
81
82 pub fn shutdown(&self) {
83 self.send_stop_signal(StopSignal::Shutdown);
84 }
85
86 pub async fn pause(&self) -> Result<(), SchedulerError> {
87 if let Some(controller) = &self.pause_controller {
88 match self.single_active_job_id()? {
89 Some(job_id) => {
90 let changed = controller.pause(&job_id).await?;
91 self.send_mode_command(
92 SchedulerMode::Paused,
93 CommandDisposition::ObserveOnly { changed },
94 );
95 }
96 None => {
97 self.send_mode_command(SchedulerMode::Paused, CommandDisposition::Apply);
98 }
99 }
100 } else {
101 self.send_mode_command(SchedulerMode::Paused, CommandDisposition::Apply);
102 }
103 Ok(())
104 }
105
106 pub async fn resume(&self) -> Result<(), SchedulerError> {
107 if let Some(controller) = &self.pause_controller {
108 match self.single_active_job_id()? {
109 Some(job_id) => {
110 let changed = controller.resume(&job_id).await?;
111 self.send_mode_command(
112 SchedulerMode::Running,
113 CommandDisposition::ObserveOnly { changed },
114 );
115 }
116 None => {
117 self.send_mode_command(SchedulerMode::Running, CommandDisposition::Apply);
118 }
119 }
120 } else {
121 self.send_mode_command(SchedulerMode::Running, CommandDisposition::Apply);
122 }
123 Ok(())
124 }
125
126 fn send_mode_command(
127 &self,
128 desired_mode: SchedulerMode,
129 command_disposition: CommandDisposition,
130 ) {
131 let mut next = *self.control.borrow();
132 next.desired_mode = desired_mode;
133 next.command_disposition = command_disposition;
134 next.mode_command_seq = next.mode_command_seq.saturating_add(1);
135 let _ = self.control.send(next);
136 }
137
138 fn send_stop_signal(&self, stop_signal: StopSignal) {
139 let mut next = *self.control.borrow();
140 next.stop_signal = Some(stop_signal);
141 let _ = self.control.send(next);
142 }
143
144 fn single_active_job_id(&self) -> Result<Option<String>, SchedulerError> {
145 let active_job_ids = self.active_job_ids.lock().unwrap();
146 match active_job_ids.len() {
147 0 => Ok(None),
148 1 => Ok(active_job_ids.iter().next().cloned()),
149 _ => Err(SchedulerError::invalid_job_with_kind(
150 crate::InvalidJobKind::Other,
151 "pause/resume requires exactly one active job for coordinated schedulers",
152 )),
153 }
154 }
155}