use std::error::Error;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::ports::ReplyReader;
use crate::simulation::queue_items::{Event, EventId, EventKey, Query, QueryId, QueueItem};
use crate::time::{AtomicTimeReader, ClockReader, Deadline, MonotonicTime};
use crate::util::priority_queue::PriorityQueue;
#[cfg(all(test, not(nexosim_loom)))]
use crate::{time::AtomicTime, time::TearableAtomicTime, util::sync_cell::SyncCell};
use super::GLOBAL_ORIGIN_ID;
/// A cloneable handle for scheduling events and queries and for halting the
/// simulation.
///
/// This is a thin wrapper around the crate-internal `GlobalScheduler`: every
/// scheduling method forwards to its `*_from` counterpart, passing
/// `GLOBAL_ORIGIN_ID` as the origin identifier.
#[derive(Clone, Debug)]
pub struct Scheduler(GlobalScheduler);
impl Scheduler {
    /// Creates a scheduler handle from the shared scheduler queue, the time
    /// reader and the shared halt flag.
    pub(crate) fn new(
        scheduler_queue: Arc<Mutex<SchedulerQueue>>,
        time: AtomicTimeReader,
        is_halted: Arc<AtomicBool>,
    ) -> Self {
        let inner = GlobalScheduler::new(scheduler_queue, time, is_halted);

        Self(inner)
    }

    /// Creates a stand-alone scheduler for tests.
    ///
    /// The scheduler owns a fresh, empty queue, a time initialized to
    /// `MonotonicTime::EPOCH` and a halt flag initialized to `false`.
    #[cfg(all(test, not(nexosim_loom)))]
    #[allow(dead_code)]
    pub(crate) fn dummy() -> Self {
        let epoch = TearableAtomicTime::new(MonotonicTime::EPOCH);
        let time = AtomicTime::new(epoch).reader();
        let scheduler_queue = Arc::new(Mutex::new(SchedulerQueue::new()));
        let is_halted = Arc::new(AtomicBool::default());

        Self::new(scheduler_queue, time, is_halted)
    }

    /// Returns the current simulation time, as reported by the wrapped
    /// scheduler.
    pub fn time(&self) -> MonotonicTime {
        let Self(inner) = self;

        inner.time()
    }

    /// Schedules an already-built event at the time designated by `deadline`.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current simulation time.
    #[cfg(feature = "server")]
    pub(crate) fn schedule(
        &self,
        deadline: impl Deadline,
        event: Event,
    ) -> Result<(), SchedulingError> {
        let Self(inner) = self;

        inner.schedule_from(deadline, event, GLOBAL_ORIGIN_ID)
    }

    /// Schedules an event built from `event_id` and `arg` at the time
    /// designated by `deadline`.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current simulation time.
    pub fn schedule_event<T>(
        &self,
        deadline: impl Deadline,
        event_id: &EventId<T>,
        arg: T,
    ) -> Result<(), SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        let Self(inner) = self;

        inner.schedule_event_from(deadline, event_id, arg, GLOBAL_ORIGIN_ID)
    }

    /// Schedules an event like [`schedule_event`](Self::schedule_event) and
    /// returns the [`EventKey`] that was attached to it.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current simulation time.
    pub fn schedule_keyed_event<T>(
        &self,
        deadline: impl Deadline,
        event_id: &EventId<T>,
        arg: T,
    ) -> Result<EventKey, SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        let Self(inner) = self;

        inner.schedule_keyed_event_from(deadline, event_id, arg, GLOBAL_ORIGIN_ID)
    }

    /// Schedules an event carrying the repetition period `period`, with its
    /// first occurrence at the time designated by `deadline`.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::NullRepetitionPeriod`] if `period` is zero
    /// (checked first), or [`SchedulingError::InvalidScheduledTime`] if
    /// `deadline` does not resolve to a time strictly later than the current
    /// simulation time.
    pub fn schedule_periodic_event<T>(
        &self,
        deadline: impl Deadline,
        period: Duration,
        event_id: &EventId<T>,
        arg: T,
    ) -> Result<(), SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        let Self(inner) = self;

        inner.schedule_periodic_event_from(deadline, period, event_id, arg, GLOBAL_ORIGIN_ID)
    }

    /// Schedules a periodic event like
    /// [`schedule_periodic_event`](Self::schedule_periodic_event) and returns
    /// the [`EventKey`] that was attached to it.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::NullRepetitionPeriod`] if `period` is zero
    /// (checked first), or [`SchedulingError::InvalidScheduledTime`] if
    /// `deadline` does not resolve to a time strictly later than the current
    /// simulation time.
    pub fn schedule_keyed_periodic_event<T>(
        &self,
        deadline: impl Deadline,
        period: Duration,
        event_id: &EventId<T>,
        arg: T,
    ) -> Result<EventKey, SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        let Self(inner) = self;

        inner.schedule_keyed_periodic_event_from(
            deadline,
            period,
            event_id,
            arg,
            GLOBAL_ORIGIN_ID,
        )
    }

    /// Schedules a query built from `query_id` and `arg` at the time
    /// designated by `deadline`, returning the [`ReplyReader`] created
    /// together with the query.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current simulation time.
    pub fn schedule_query<T, R>(
        &self,
        deadline: impl Deadline,
        query_id: &QueryId<T, R>,
        arg: T,
    ) -> Result<ReplyReader<R>, SchedulingError>
    where
        T: Send + Clone + 'static,
        R: Send + 'static,
    {
        let Self(inner) = self;

        inner.schedule_query_from(deadline, query_id, arg, GLOBAL_ORIGIN_ID)
    }

    /// Raises the shared halt flag.
    ///
    /// This only sets the flag; how and when it is acted upon is decided by
    /// whichever code reads it.
    pub fn halt(&self) {
        let Self(inner) = self;

        inner.halt()
    }
}
/// Error returned when a scheduling request is rejected.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[non_exhaustive]
pub enum SchedulingError {
    /// The requested time is not in the future of the current simulation
    /// time.
    InvalidScheduledTime,
    /// The repetition period of a periodic event is zero.
    NullRepetitionPeriod,
}

impl fmt::Display for SchedulingError {
    /// Writes the human-readable message associated with the error variant.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select the message first, then emit it with a single write.
        let message = match *self {
            Self::InvalidScheduledTime => {
                "the scheduled time should be in the future of the current simulation time"
            }
            Self::NullRepetitionPeriod => "the repetition period cannot be zero",
        };

        fmt.write_str(message)
    }
}

impl Error for SchedulingError {}
/// Priority queue of pending scheduled items (events and queries), keyed by
/// [`SchedulerKey`].
pub(crate) type SchedulerQueue = PriorityQueue<SchedulerKey, QueueItem>;
/// Key of a scheduled item: the time at which it is scheduled, paired with
/// the identifier of the origin that scheduled it.
pub(crate) type SchedulerKey = (MonotonicTime, usize);
/// Crate-internal scheduler that inserts events and queries into the shared
/// scheduler queue on behalf of an explicitly provided origin identifier.
///
/// Cloning yields another handle to the same queue and halt flag, since both
/// are held behind `Arc`.
#[derive(Clone)]
pub(crate) struct GlobalScheduler {
// Shared queue of pending events and queries.
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
// Reader used to obtain the current simulation time.
time: AtomicTimeReader,
// Shared flag that `halt` sets to `true`.
is_halted: Arc<AtomicBool>,
}
impl GlobalScheduler {
    /// Bundles the shared scheduler queue, the time reader and the shared halt
    /// flag into a scheduler.
    pub(crate) fn new(
        scheduler_queue: Arc<Mutex<SchedulerQueue>>,
        time: AtomicTimeReader,
        is_halted: Arc<AtomicBool>,
    ) -> Self {
        Self {
            scheduler_queue,
            time,
            is_halted,
        }
    }

    /// Returns the time currently exposed by the time reader.
    pub(crate) fn time(&self) -> MonotonicTime {
        self.time.read()
    }

    /// Builds a `ClockReader` from this scheduler's time reader.
    pub(crate) fn clock_reader(&self) -> ClockReader {
        ClockReader::from_atomic_time_reader(&self.time)
    }

    /// Locks the scheduler queue and resolves `deadline` against the current
    /// time, returning the still-held guard together with the resolved time.
    ///
    /// This is the single place implementing the admission check shared by all
    /// `schedule_*_from` methods. The lock is acquired *before* the time is
    /// read and is returned to the caller still held, so that the time check
    /// and the subsequent insertion happen within the same critical section.
    /// NOTE(review): presumably the simulation only advances time while
    /// holding this lock, which is what makes the check race-free — confirm.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if the resolved time
    /// is not strictly later than the current time. The lock is released
    /// before returning in that case.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    fn lock_for_deadline(
        &self,
        deadline: impl Deadline,
    ) -> Result<(std::sync::MutexGuard<'_, SchedulerQueue>, MonotonicTime), SchedulingError> {
        let scheduler_queue = self.scheduler_queue.lock().unwrap();
        let now = self.time();
        let time = deadline.into_time(now);
        if now >= time {
            return Err(SchedulingError::InvalidScheduledTime);
        }

        Ok((scheduler_queue, time))
    }

    /// Schedules an already-built event at the time designated by `deadline`,
    /// recording `origin_id` in the queue key.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current time.
    #[cfg(feature = "server")]
    pub(crate) fn schedule_from(
        &self,
        deadline: impl Deadline,
        event: Event,
        origin_id: usize,
    ) -> Result<(), SchedulingError> {
        let (mut scheduler_queue, time) = self.lock_for_deadline(deadline)?;
        scheduler_queue.insert((time, origin_id), QueueItem::Event(event));

        Ok(())
    }

    /// Schedules an event built from `event_id` and `arg` at the time
    /// designated by `deadline`, recording `origin_id` in the queue key.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current time.
    pub(crate) fn schedule_event_from<T>(
        &self,
        deadline: impl Deadline,
        event_id: &EventId<T>,
        arg: T,
        origin_id: usize,
    ) -> Result<(), SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        let (mut scheduler_queue, time) = self.lock_for_deadline(deadline)?;
        let event = Event::new(event_id, arg);
        scheduler_queue.insert((time, origin_id), QueueItem::Event(event));

        Ok(())
    }

    /// Schedules an event like
    /// [`schedule_event_from`](Self::schedule_event_from), attaches a freshly
    /// created [`EventKey`] to it and returns that key.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current time.
    pub(crate) fn schedule_keyed_event_from<T>(
        &self,
        deadline: impl Deadline,
        event_id: &EventId<T>,
        arg: T,
        origin_id: usize,
    ) -> Result<EventKey, SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        // The key is created before the lock is taken, as it does not depend
        // on the queue state.
        let event_key = EventKey::new();
        let (mut scheduler_queue, time) = self.lock_for_deadline(deadline)?;
        let event = Event::new(event_id, arg).with_key(event_key.clone());
        scheduler_queue.insert((time, origin_id), QueueItem::Event(event));

        Ok(event_key)
    }

    /// Schedules an event carrying the repetition period `period`, with its
    /// first occurrence at the time designated by `deadline`, recording
    /// `origin_id` in the queue key.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::NullRepetitionPeriod`] if `period` is zero;
    /// this is checked first, without taking the queue lock. Otherwise returns
    /// [`SchedulingError::InvalidScheduledTime`] if `deadline` does not
    /// resolve to a time strictly later than the current time.
    pub(crate) fn schedule_periodic_event_from<T>(
        &self,
        deadline: impl Deadline,
        period: Duration,
        event_id: &EventId<T>,
        arg: T,
        origin_id: usize,
    ) -> Result<(), SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        if period.is_zero() {
            return Err(SchedulingError::NullRepetitionPeriod);
        }
        let (mut scheduler_queue, time) = self.lock_for_deadline(deadline)?;
        let event = Event::new(event_id, arg).with_period(period);
        scheduler_queue.insert((time, origin_id), QueueItem::Event(event));

        Ok(())
    }

    /// Schedules a periodic event like
    /// [`schedule_periodic_event_from`](Self::schedule_periodic_event_from),
    /// attaches a freshly created [`EventKey`] to it and returns that key.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::NullRepetitionPeriod`] if `period` is zero;
    /// this is checked first, without taking the queue lock. Otherwise returns
    /// [`SchedulingError::InvalidScheduledTime`] if `deadline` does not
    /// resolve to a time strictly later than the current time.
    pub(crate) fn schedule_keyed_periodic_event_from<T>(
        &self,
        deadline: impl Deadline,
        period: Duration,
        event_id: &EventId<T>,
        arg: T,
        origin_id: usize,
    ) -> Result<EventKey, SchedulingError>
    where
        T: Send + Clone + 'static,
    {
        if period.is_zero() {
            return Err(SchedulingError::NullRepetitionPeriod);
        }
        let event_key = EventKey::new();
        let (mut scheduler_queue, time) = self.lock_for_deadline(deadline)?;
        let event = Event::new(event_id, arg)
            .with_period(period)
            .with_key(event_key.clone());
        scheduler_queue.insert((time, origin_id), QueueItem::Event(event));

        Ok(event_key)
    }

    /// Schedules a query built from `query_id` and `arg` at the time
    /// designated by `deadline`, recording `origin_id` in the queue key, and
    /// returns the [`ReplyReader`] created together with the query.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulingError::InvalidScheduledTime`] if `deadline` does
    /// not resolve to a time strictly later than the current time.
    pub(crate) fn schedule_query_from<T, R>(
        &self,
        deadline: impl Deadline,
        query_id: &QueryId<T, R>,
        arg: T,
        origin_id: usize,
    ) -> Result<ReplyReader<R>, SchedulingError>
    where
        T: Send + Clone + 'static,
        R: Send + 'static,
    {
        let (mut scheduler_queue, time) = self.lock_for_deadline(deadline)?;
        let (query, rx) = Query::new(query_id, arg);
        scheduler_queue.insert((time, origin_id), QueueItem::Query(query));

        Ok(rx)
    }

    /// Sets the shared halt flag to `true` using a relaxed atomic store.
    pub(crate) fn halt(&self) {
        self.is_halted.store(true, Ordering::Relaxed);
    }
}
impl fmt::Debug for GlobalScheduler {
    /// Formats the scheduler as a non-exhaustive struct showing the current
    /// time and the state of the halt flag; the queue is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("GlobalScheduler");

        let now = self.time();
        repr.field("time", &now);

        let halted = self.is_halted.load(Ordering::Relaxed);
        repr.field("is_halted", &halted);

        repr.finish_non_exhaustive()
    }
}
#[cfg(all(test, not(nexosim_loom)))]
impl GlobalScheduler {
    /// Creates a stand-alone scheduler for tests, with a fresh empty queue, a
    /// time initialized to `MonotonicTime::EPOCH` and a halt flag initialized
    /// to `false`.
    pub(crate) fn new_dummy() -> Self {
        let queue = Arc::new(Mutex::new(SchedulerQueue::new()));

        let epoch = TearableAtomicTime::new(MonotonicTime::EPOCH);
        let time_reader = SyncCell::new(epoch).reader();

        let halt_flag = Arc::new(AtomicBool::new(false));

        Self::new(queue, time_reader, halt_flag)
    }
}