use crate::error::SchedulerError;
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::watch;
/// Mode a scheduler is asked to be in; carried in `ControlSignal::desired_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SchedulerMode {
/// Requested by `SchedulerHandle::resume`; also the mode of the initial
/// signal built by `ControlSignal::running`.
Running,
/// Requested by `SchedulerHandle::pause`.
Paused,
}
/// Kind of stop request carried in `ControlSignal::stop_signal`.
///
/// How the two variants differ on the receiving side is not defined here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StopSignal {
/// Published by `SchedulerHandle::cancel`.
Cancel,
/// Published by `SchedulerHandle::shutdown`.
Shutdown,
}
/// Tag attached to every mode command describing how the command was issued.
///
/// NOTE(review): the receiver-side interpretation lives outside this file;
/// the variant docs below only describe when `SchedulerHandle` emits each one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CommandDisposition {
/// Emitted when no `PauseController` was involved: either none is configured
/// or there was no active job to hand to it.
Apply,
/// Emitted after a `PauseController` has already been called for the single
/// active job. `changed` is the `bool` that call returned (presumably whether
/// the job's state actually changed — confirm against the implementor).
ObserveOnly { changed: bool },
}
/// Value published on the scheduler's `watch` control channel.
///
/// The whole struct is `Copy`; `SchedulerHandle` updates individual fields and
/// republishes the complete value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct ControlSignal {
/// Mode most recently requested through `pause`/`resume`.
pub(crate) desired_mode: SchedulerMode,
/// Counter bumped (saturating) once per mode command, so two consecutive
/// commands produce distinct signal values even when they request the same mode.
pub(crate) mode_command_seq: u64,
/// How the most recent mode command was issued; see `CommandDisposition`.
pub(crate) command_disposition: CommandDisposition,
/// `None` until `cancel`/`shutdown` is called; nothing in this file resets it.
pub(crate) stop_signal: Option<StopSignal>,
}
impl ControlSignal {
    /// Returns the initial control value: running mode, sequence number zero,
    /// `Apply` disposition and no stop request pending.
    pub(crate) fn running() -> Self {
        let desired_mode = SchedulerMode::Running;
        let command_disposition = CommandDisposition::Apply;
        Self {
            stop_signal: None,
            mode_command_seq: 0,
            command_disposition,
            desired_mode,
        }
    }
}
/// Hook that pauses or resumes a single job by id on behalf of
/// `SchedulerHandle`.
///
/// Both methods return a boxed, `Send` future borrowing `self` and `job_id`
/// for `'a`. The resolved `bool` is forwarded verbatim by `SchedulerHandle` as
/// `CommandDisposition::ObserveOnly { changed }` (presumably "the job's state
/// actually changed" — confirm against the implementor). An `Err` aborts the
/// corresponding `SchedulerHandle` call before any mode command is published.
pub(crate) trait PauseController: Send + Sync + 'static {
/// Pauses the job identified by `job_id`.
fn pause<'a>(
&'a self,
job_id: &'a str,
) -> Pin<Box<dyn Future<Output = Result<bool, SchedulerError>> + Send + 'a>>;
/// Resumes the job identified by `job_id`.
fn resume<'a>(
&'a self,
job_id: &'a str,
) -> Pin<Box<dyn Future<Output = Result<bool, SchedulerError>> + Send + 'a>>;
}
/// Cloneable handle used to cancel, shut down, pause or resume a scheduler.
///
/// Clones publish to the same control channel and share the same
/// `active_job_ids` set (it is behind an `Arc`).
#[derive(Clone)]
pub struct SchedulerHandle {
// Sending side of the control channel; every command republishes a full `ControlSignal`.
control: watch::Sender<ControlSignal>,
// Optional hook called by `pause`/`resume` when exactly one job is active.
pause_controller: Option<Arc<dyn PauseController>>,
// Ids of currently active jobs; only read by this handle, populated elsewhere.
active_job_ids: Arc<Mutex<HashSet<String>>>,
}
impl SchedulerHandle {
    /// Assembles a handle from the control-channel sender, an optional pause
    /// controller and the shared set of active job ids.
    pub(crate) fn new(
        control: watch::Sender<ControlSignal>,
        pause_controller: Option<Arc<dyn PauseController>>,
        active_job_ids: Arc<Mutex<HashSet<String>>>,
    ) -> Self {
        Self {
            control,
            pause_controller,
            active_job_ids,
        }
    }

    /// Publishes [`StopSignal::Cancel`] on the control channel.
    pub fn cancel(&self) {
        self.send_stop_signal(StopSignal::Cancel);
    }

    /// Publishes [`StopSignal::Shutdown`] on the control channel.
    pub fn shutdown(&self) {
        self.send_stop_signal(StopSignal::Shutdown);
    }

    /// Requests [`SchedulerMode::Paused`].
    ///
    /// With a pause controller configured and exactly one active job, the
    /// controller's `pause` is awaited first and the command is published as
    /// `ObserveOnly { changed }`. Otherwise the command is published as `Apply`.
    ///
    /// # Errors
    ///
    /// Returns the controller's error unchanged, or an invalid-job error when a
    /// controller is configured and more than one job is active. In both cases
    /// no mode command is published.
    ///
    /// # Panics
    ///
    /// Panics if the active-job mutex is poisoned (only reachable when a
    /// controller is configured).
    pub async fn pause(&self) -> Result<(), SchedulerError> {
        self.request_mode(SchedulerMode::Paused).await
    }

    /// Requests [`SchedulerMode::Running`].
    ///
    /// Mirrors [`SchedulerHandle::pause`], calling the controller's `resume`
    /// instead.
    ///
    /// # Errors
    ///
    /// Returns the controller's error unchanged, or an invalid-job error when a
    /// controller is configured and more than one job is active. In both cases
    /// no mode command is published.
    ///
    /// # Panics
    ///
    /// Panics if the active-job mutex is poisoned (only reachable when a
    /// controller is configured).
    pub async fn resume(&self) -> Result<(), SchedulerError> {
        self.request_mode(SchedulerMode::Running).await
    }

    /// Shared implementation of `pause`/`resume`: optionally drives the pause
    /// controller for the single active job, then publishes the mode command.
    async fn request_mode(&self, desired_mode: SchedulerMode) -> Result<(), SchedulerError> {
        let command_disposition = match &self.pause_controller {
            // The active-job check is only performed when a controller exists;
            // without one, any number of active jobs is accepted.
            Some(controller) => match self.single_active_job_id()? {
                Some(job_id) => {
                    let changed = match desired_mode {
                        SchedulerMode::Paused => controller.pause(&job_id).await?,
                        SchedulerMode::Running => controller.resume(&job_id).await?,
                    };
                    CommandDisposition::ObserveOnly { changed }
                }
                None => CommandDisposition::Apply,
            },
            None => CommandDisposition::Apply,
        };
        self.send_mode_command(desired_mode, command_disposition);
        Ok(())
    }

    /// Publishes a mode command, bumping `mode_command_seq` (saturating).
    ///
    /// The update is applied in place with `send_modify` so that it is atomic
    /// with respect to other clones of this handle: a separate borrow-then-send
    /// could overwrite a concurrently published stop signal or mode command.
    /// `send_modify` also stores the value when no receiver is currently
    /// subscribed, instead of discarding it.
    fn send_mode_command(
        &self,
        desired_mode: SchedulerMode,
        command_disposition: CommandDisposition,
    ) {
        self.control.send_modify(|signal| {
            signal.desired_mode = desired_mode;
            signal.command_disposition = command_disposition;
            signal.mode_command_seq = signal.mode_command_seq.saturating_add(1);
        });
    }

    /// Publishes a stop request, leaving the mode fields untouched.
    ///
    /// Uses `send_modify` for the same atomicity reason as
    /// `send_mode_command`; a later call replaces an earlier stop signal.
    fn send_stop_signal(&self, stop_signal: StopSignal) {
        self.control.send_modify(|signal| {
            signal.stop_signal = Some(stop_signal);
        });
    }

    /// Returns the id of the only active job, `None` when no job is active, or
    /// an invalid-job error when several are.
    ///
    /// The id is cloned so the mutex guard is released before the caller
    /// awaits anything.
    fn single_active_job_id(&self) -> Result<Option<String>, SchedulerError> {
        let active_job_ids = self
            .active_job_ids
            .lock()
            .expect("active job id set mutex poisoned");
        match active_job_ids.len() {
            0 => Ok(None),
            1 => Ok(active_job_ids.iter().next().cloned()),
            _ => Err(SchedulerError::invalid_job_with_kind(
                crate::InvalidJobKind::Other,
                "pause/resume requires exactly one active job for coordinated schedulers",
            )),
        }
    }
}