use std::{
error::Error as StdError,
fmt::Display,
num::NonZeroUsize,
sync::{Arc, OnceLock},
};
use flume::{Receiver, RecvError, Sender};
use super::tasks::message::{Interconnect, MixerMessage};
use crate::{constants::TIMESTEP_LENGTH, Config as DriverConfig};
mod config;
mod idle;
mod live;
mod stats;
mod task;
pub use config::*;
use idle::*;
pub use live::*;
pub use stats::*;
pub use task::*;
/// Nine tenths of the sub-second nanosecond component of [`TIMESTEP_LENGTH`].
///
/// NOTE(review): presumably the per-tick time budget past which work is
/// rescheduled; the consumer is outside this file, so confirm there.
const RESCHEDULE_THRESHOLD: u64 = {
    let timestep_nanos = TIMESTEP_LENGTH.subsec_nanos() as u64;
    // Multiply before dividing so integer rounding only happens once.
    (timestep_nanos * 9) / 10
};
/// Default mixers-per-thread value: 16, stored as a [`NonZeroUsize`].
const DEFAULT_MIXERS_PER_THREAD: NonZeroUsize = if let Some(count) = NonZeroUsize::new(16) {
    count
} else {
    // 16 is non-zero, so `NonZeroUsize::new` cannot return `None` here.
    unreachable!()
};
pub fn get_default_scheduler() -> &'static Scheduler {
static DEFAULT_SCHEDULER: OnceLock<Scheduler> = OnceLock::new();
DEFAULT_SCHEDULER.get_or_init(Scheduler::default)
}
/// Cloneable handle to a mixer scheduler.
///
/// All clones of a handle share one `InnerScheduler` through an [`Arc`].
#[derive(Clone, Debug)]
pub struct Scheduler {
// Shared state: the message sender to the scheduler and its statistics block.
inner: Arc<InnerScheduler>,
}
/// State shared by every clone of a [`Scheduler`] handle.
///
/// Dropping a value of this type sends `SchedulerMessage::Kill` over `tx`
/// (see the `Drop` impl in this file).
// NOTE(review): `Clone` is derived here although `Drop` sends `Kill`. Dropping a
// direct clone of this struct (as opposed to a clone of the surrounding `Arc`)
// would send `Kill` while other handles still exist. Nothing in this file clones
// it directly; confirm no sibling module does, and consider removing the derive.
#[derive(Clone, Debug)]
struct InnerScheduler {
// Sender used for every `SchedulerMessage` issued through the handle.
tx: Sender<SchedulerMessage>,
// Statistics block, cloned from the core created in `Scheduler::new`.
stats: Arc<StatBlock>,
}
impl Scheduler {
    /// Builds a scheduler from `config` and spawns its idle core.
    ///
    /// The returned handle keeps the message sender produced by `Idle::new`
    /// together with a clone of the core's statistics block.
    #[must_use]
    pub fn new(config: Config) -> Self {
        let (idle_core, tx) = Idle::new(config);

        // Grab the stats handle first, then hand the core over to `spawn`.
        let stats = Arc::clone(&idle_core.stats);
        idle_core.spawn();

        Self {
            inner: Arc::new(InnerScheduler { tx, stats }),
        }
    }

    /// Asks the scheduler to register a new mixer.
    ///
    /// Sends `SchedulerMessage::NewMixer` carrying `rx`, `ic`, and a clone of
    /// `config`.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be sent to the scheduler.
    pub(crate) fn new_mixer(
        &self,
        config: &DriverConfig,
        ic: Interconnect,
        rx: Receiver<MixerMessage>,
    ) {
        let request = SchedulerMessage::NewMixer(rx, ic, config.clone());
        self.inner.tx.send(request).unwrap();
    }

    /// Returns the `total_mixers` count from the scheduler's statistics.
    #[must_use]
    pub fn total_tasks(&self) -> u64 {
        let stats = &self.inner.stats;
        stats.total_mixers()
    }

    /// Returns the `live_mixers` count from the scheduler's statistics.
    #[must_use]
    pub fn live_tasks(&self) -> u64 {
        let stats = &self.inner.stats;
        stats.live_mixers()
    }

    /// Returns the `worker_threads` count from the scheduler's statistics.
    #[must_use]
    pub fn worker_threads(&self) -> u64 {
        let stats = &self.inner.stats;
        stats.worker_threads()
    }

    /// Requests the per-worker statistics blocks and awaits the reply.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Disconnected`] if the reply channel closes before a
    /// response is received.
    pub async fn worker_thread_stats(&self) -> Result<Vec<Arc<LiveStatBlock>>, Error> {
        let reply_rx = self.request_worker_stats();
        Ok(reply_rx.recv_async().await?)
    }

    /// Requests the per-worker statistics blocks, blocking the calling thread
    /// until the reply arrives.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Disconnected`] if the reply channel closes before a
    /// response is received.
    pub fn worker_thread_stats_blocking(&self) -> Result<Vec<Arc<LiveStatBlock>>, Error> {
        let reply_rx = self.request_worker_stats();
        Ok(reply_rx.recv()?)
    }

    /// Sends a `GetStats` request and returns the receiving end of its
    /// single-slot reply channel.
    fn request_worker_stats(&self) -> Receiver<Vec<Arc<LiveStatBlock>>> {
        let (reply_tx, reply_rx) = flume::bounded(1);

        // The send result is deliberately ignored: on failure the message, and
        // the reply sender inside it, is dropped, so the caller's receive
        // reports the disconnection instead.
        let _ = self.inner.tx.send(SchedulerMessage::GetStats(reply_tx));

        reply_rx
    }
}
/// Sends `SchedulerMessage::Kill` when the shared scheduler state is dropped.
impl Drop for InnerScheduler {
    fn drop(&mut self) {
        // Best effort: a failed send is ignored.
        let _ = self.tx.send(SchedulerMessage::Kill);
    }
}
/// The default scheduler is one built from `Config::default()`.
impl Default for Scheduler {
    fn default() -> Self {
        let config = Config::default();
        Self::new(config)
    }
}
/// Messages sent over the scheduler's message channel.
pub enum SchedulerMessage {
/// Register a new mixer from its message receiver, interconnect, and driver
/// config. Sent by `Scheduler::new_mixer`.
NewMixer(Receiver<MixerMessage>, Interconnect, DriverConfig),
/// A `MixerMessage` paired with a `TaskId`.
///
/// NOTE(review): presumably forwards the message to that task; it is handled
/// outside this file, so confirm there.
Do(TaskId, MixerMessage),
/// A `ParkedMixer` paired with its `TaskId`.
///
/// NOTE(review): presumably hands a mixer back from a worker to the idle
/// pool; confirm against the handler.
Demote(TaskId, ParkedMixer),
/// A `ParkedMixer` paired with a `WorkerId` and `TaskId`.
///
/// NOTE(review): presumably moves a mixer off the named worker; confirm
/// against the handler.
Overspill(WorkerId, TaskId, ParkedMixer),
/// Request the per-worker statistics blocks; the reply is expected on the
/// enclosed sender. Sent by `Scheduler::worker_thread_stats` and its
/// blocking variant.
GetStats(Sender<Vec<Arc<LiveStatBlock>>>),
/// Sent when the `InnerScheduler` shared by all handles is dropped.
Kill,
}
/// Errors returned by requests made through a scheduler handle.
#[non_exhaustive]
#[derive(Debug)]
pub enum Error {
    /// The reply channel closed before a response was received.
    Disconnected,
}

impl Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Disconnected => "the scheduler terminated mid-request",
        };

        f.write_str(description)
    }
}

// No variant wraps an underlying error, so the trait's default `source`
// (which returns `None`) is used as-is.
impl StdError for Error {}
impl From<RecvError> for Error {
fn from(_: RecvError) -> Self {
Self::Disconnected
}
}