mod endpoints;
mod mailbox;
mod sim_init;
pub use endpoints::{EventSlot, EventStream};
pub use mailbox::{Address, Mailbox};
pub use sim_init::SimInit;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use recycle_box::{coerce_box, RecycleBox};
use crate::executor::Executor;
use crate::model::{InputFn, Model, ReplierFn};
use crate::time::{
self, Clock, Deadline, EventKey, MonotonicTime, NoClock, ScheduledEvent, SchedulerQueue,
SchedulingError, TearableAtomicTime,
};
use crate::util::futures::SeqFuture;
use crate::util::slot;
use crate::util::sync_cell::SyncCell;
pub struct Simulation {
executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCell<TearableAtomicTime>,
clock: Box<dyn Clock>,
}
impl Simulation {
pub(crate) fn new(
executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCell<TearableAtomicTime>,
) -> Self {
Self {
executor,
scheduler_queue,
time,
clock: Box::new(NoClock::new()),
}
}
pub(crate) fn with_clock(
executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCell<TearableAtomicTime>,
clock: impl Clock + 'static,
) -> Self {
Self {
executor,
scheduler_queue,
time,
clock: Box::new(clock),
}
}
pub fn time(&self) -> MonotonicTime {
self.time.read()
}
pub fn step(&mut self) {
self.step_to_next_bounded(MonotonicTime::MAX);
}
pub fn step_by(&mut self, duration: Duration) {
let target_time = self.time.read() + duration;
self.step_until_unchecked(target_time);
}
pub fn step_until(&mut self, target_time: MonotonicTime) -> Result<(), SchedulingError> {
if self.time.read() >= target_time {
return Err(SchedulingError::InvalidScheduledTime);
}
self.step_until_unchecked(target_time);
Ok(())
}
pub fn schedule_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
time::schedule_event_at_unchecked(time, func, arg, address.into().0, &self.scheduler_queue);
Ok(())
}
pub fn schedule_keyed_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
let event_key = time::schedule_keyed_event_at_unchecked(
time,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(event_key)
}
pub fn schedule_periodic_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<(), SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
time::schedule_periodic_event_at_unchecked(
time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(())
}
pub fn schedule_keyed_periodic_event<M, F, T, S>(
&mut self,
deadline: impl Deadline,
period: Duration,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<EventKey, SchedulingError>
where
M: Model,
F: for<'a> InputFn<'a, M, T, S> + Clone,
T: Send + Clone + 'static,
S: Send + 'static,
{
let now = self.time();
let time = deadline.into_time(now);
if now >= time {
return Err(SchedulingError::InvalidScheduledTime);
}
if period.is_zero() {
return Err(SchedulingError::NullRepetitionPeriod);
}
let event_key = time::schedule_periodic_keyed_event_at_unchecked(
time,
period,
func,
arg,
address.into().0,
&self.scheduler_queue,
);
Ok(event_key)
}
pub fn send_event<M, F, T, S>(&mut self, func: F, arg: T, address: impl Into<Address<M>>)
where
M: Model,
F: for<'a> InputFn<'a, M, T, S>,
T: Send + Clone + 'static,
{
let sender = address.into().0;
let fut = async move {
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = func.call(model, arg, scheduler);
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
};
self.executor.spawn_and_forget(fut);
self.executor.run();
}
pub fn send_query<M, F, T, R, S>(
&mut self,
func: F,
arg: T,
address: impl Into<Address<M>>,
) -> Result<R, QueryError>
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S>,
T: Send + Clone + 'static,
R: Send + 'static,
{
let (reply_writer, mut reply_reader) = slot::slot();
let sender = address.into().0;
let fut = async move {
let _ = sender
.send(
move |model: &mut M,
scheduler,
recycle_box: RecycleBox<()>|
-> RecycleBox<dyn Future<Output = ()> + Send + '_> {
let fut = async move {
let reply = func.call(model, arg, scheduler).await;
let _ = reply_writer.write(reply);
};
coerce_box!(RecycleBox::recycle(recycle_box, fut))
},
)
.await;
};
self.executor.spawn_and_forget(fut);
self.executor.run();
reply_reader.try_read().map_err(|_| QueryError {})
}
fn step_to_next_bounded(&mut self, upper_time_bound: MonotonicTime) -> Option<MonotonicTime> {
fn pull_next_event(
scheduler_queue: &mut MutexGuard<SchedulerQueue>,
) -> Box<dyn ScheduledEvent> {
let ((time, channel_id), event) = scheduler_queue.pull().unwrap();
if let Some((event_clone, period)) = event.next() {
scheduler_queue.insert((time + period, channel_id), event_clone);
}
event
}
let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
loop {
match scheduler_queue.peek() {
Some((&k, t)) if k.0 <= upper_time_bound => {
if !t.is_cancelled() {
break Some(k);
}
scheduler_queue.pull();
}
_ => break None,
}
}
};
let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
let mut current_key = peek_next_key(&mut scheduler_queue)?;
self.time.write(current_key.0);
loop {
let event = pull_next_event(&mut scheduler_queue);
let mut next_key = peek_next_key(&mut scheduler_queue);
if next_key != Some(current_key) {
event.spawn_and_forget(&self.executor);
} else {
let mut event_sequence = SeqFuture::new();
event_sequence.push(event.into_future());
loop {
let event = pull_next_event(&mut scheduler_queue);
event_sequence.push(event.into_future());
next_key = peek_next_key(&mut scheduler_queue);
if next_key != Some(current_key) {
break;
}
}
self.executor.spawn_and_forget(event_sequence);
}
current_key = match next_key {
Some(k) if k.0 == current_key.0 => k,
_ => {
drop(scheduler_queue); let current_time = current_key.0;
self.clock.synchronize(current_time);
self.executor.run();
return Some(current_time);
}
};
}
}
fn step_until_unchecked(&mut self, target_time: MonotonicTime) {
loop {
match self.step_to_next_bounded(target_time) {
Some(t) if t == target_time => return,
None => {
self.time.write(target_time);
return;
}
_ => {}
}
}
}
}
impl fmt::Debug for Simulation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Simulation")
.field("time", &self.time.read())
.finish_non_exhaustive()
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct QueryError {}
impl fmt::Display for QueryError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "the query did not receive a response")
}
}
impl Error for QueryError {}