mod injector;
mod mailbox;
mod queue_items;
mod scheduler;
mod sim_init;
pub use injector::{Injector, ModelInjector};
pub use mailbox::{Address, Mailbox};
pub use queue_items::{AutoEventKey, EventId, EventKey, QueryId};
pub use scheduler::{Scheduler, SchedulingError};
pub use sim_init::{
BenchError, DuplicateEventSinkError, DuplicateEventSourceError, DuplicateQuerySourceError,
SimInit,
};
pub(crate) use injector::InjectorQueue;
#[cfg(feature = "server")]
pub(crate) use queue_items::Event;
pub(crate) use queue_items::{
EVENT_KEY_REG, EventIdErased, EventKeyReg, InputSource, QueryIdErased, QueueItem,
SchedulerRegistry,
};
pub(crate) use scheduler::GlobalScheduler;
use std::any::{Any, TypeId};
use std::cell::Cell;
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll;
use std::time::Duration;
use std::{panic, task};
use pin_project::pin_project;
use recycle_box::{RecycleBox, coerce_box};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use scheduler::{SchedulerKey, SchedulerQueue};
use crate::channel::{ChannelObserver, SendError};
#[cfg(feature = "server")]
use crate::endpoints::{EventSourceEntryAny, QuerySourceEntryAny, ReplyReaderAny};
use crate::executor::{Executor, ExecutorError, Signal};
use crate::model::{BuildContext, Context, Model, ProtoModel, RegisteredModel};
use crate::path::Path;
use crate::ports::{ReplierFn, query_replier};
use crate::time::{AtomicTime, Clock, Deadline, MonotonicTime, SyncStatus, Ticker};
use crate::util::seq_futures::SeqFuture;
use crate::util::serialization::serialization_config;
use crate::util::slot;
// Identifier of the model currently being polled on this executor thread.
// Set around every poll of a model's message loop (see `ModelFuture::poll`)
// so panics and send errors can be attributed to the offending model.
thread_local! { pub(crate) static CURRENT_MODEL_ID: Cell<ModelId> = const { Cell::new(ModelId::none()) }; }
// Origin identifier reserved for globally scheduled items (usize::MAX itself
// is the `ModelId::none()` sentinel).
// NOTE(review): no use of this constant is visible in this chunk — presumably
// referenced elsewhere in the crate; confirm before removing.
const GLOBAL_ORIGIN_ID: usize = usize::MAX - 1;
/// A fully assembled discrete-event simulation.
///
/// Owns the executor, the scheduler and injector queues, the simulation
/// clock/ticker and the set of registered models.
pub struct Simulation {
    executor: Executor,
    // Time-ordered queue of scheduled events and queries, shared with
    // `Scheduler` handles.
    scheduler_queue: Arc<Mutex<SchedulerQueue>>,
    scheduler_registry: SchedulerRegistry,
    // Queue of events injected from outside the simulation loop, shared with
    // `Injector` handles.
    injector_queue: Arc<Mutex<InjectorQueue>>,
    // Current simulation time; readable through `Scheduler` handles.
    time: AtomicTime,
    clock: Box<dyn Clock>,
    // Maximum tolerated clock lag before the simulation fails with
    // `OutOfSync`; `None` disables the check.
    clock_tolerance: Option<Duration>,
    ticker: Option<Box<dyn Ticker>>,
    // Maximum wall-clock duration of a single executor run.
    timeout: Duration,
    // Mailbox observers used for deadlock diagnosis, keyed by model path.
    observers: Vec<(Path, Box<dyn ChannelObserver>)>,
    registered_models: Vec<RegisteredModel>,
    // Cooperative stop flag, settable from `Scheduler` handles.
    is_halted: Arc<AtomicBool>,
    // Set after a fatal error; all further operations fail with `Terminated`.
    is_terminated: bool,
}
impl Simulation {
    /// Assembles a simulation from the parts prepared during bench
    /// initialization (`SimInit`); this constructor merely takes ownership.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        executor: Executor,
        scheduler_queue: Arc<Mutex<SchedulerQueue>>,
        scheduler_registry: SchedulerRegistry,
        injector_queue: Arc<Mutex<InjectorQueue>>,
        time: AtomicTime,
        clock: Box<dyn Clock>,
        clock_tolerance: Option<Duration>,
        ticker: Option<Box<dyn Ticker>>,
        timeout: Duration,
        observers: Vec<(Path, Box<dyn ChannelObserver>)>,
        registered_models: Vec<RegisteredModel>,
        is_halted: Arc<AtomicBool>,
    ) -> Self {
        Self {
            executor,
            scheduler_queue,
            scheduler_registry,
            injector_queue,
            time,
            clock,
            clock_tolerance,
            ticker,
            timeout,
            observers,
            registered_models,
            is_halted,
            // Only becomes `true` after a fatal execution error.
            is_terminated: false,
        }
    }

    /// Returns a new handle for injecting events from outside the simulation
    /// loop; the handle shares this simulation's injector queue.
    pub fn injector(&self) -> Injector {
        Injector::new(self.injector_queue.clone())
    }

    /// Returns a new scheduler handle sharing this simulation's scheduler
    /// queue, time reader and halt flag.
    pub fn scheduler(&self) -> Scheduler {
        Scheduler::new(
            self.scheduler_queue.clone(),
            self.time.reader(),
            self.is_halted.clone(),
        )
    }

    /// Replaces the simulation clock and installs the given ticker.
    pub fn with_clock(&mut self, clock: impl Clock, ticker: impl Ticker) {
        self.clock = Box::new(clock);
        self.ticker = Some(Box::new(ticker));
    }

    /// Replaces the simulation clock and removes any installed ticker.
    pub fn with_tickless_clock(&mut self, clock: impl Clock) {
        self.clock = Box::new(clock);
        self.ticker = None;
    }

    /// Sets the maximum wall-clock duration a single executor run may take
    /// before the simulation fails with `ExecutionError::Timeout`.
    #[cfg(not(target_family = "wasm"))]
    pub fn with_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout;
    }

    /// Returns the current simulation time.
    pub fn time(&self) -> MonotonicTime {
        self.time.read()
    }

    /// Advances the simulation to the next scheduled time and processes
    /// everything scheduled there.
    pub fn step(&mut self) -> Result<(), ExecutionError> {
        self.step_to_next(None).map(|_| ())
    }

    /// Steps repeatedly until the given deadline, which must not lie in the
    /// past of the current simulation time.
    pub fn step_until(&mut self, deadline: impl Deadline) -> Result<(), ExecutionError> {
        let now = self.time.read();
        let target_time = deadline.into_time(now);
        if target_time < now {
            return Err(ExecutionError::InvalidDeadline(target_time));
        }
        self.step_until_unchecked(Some(target_time))
    }

    /// Runs the simulation until the scheduler queue is exhausted (or a halt
    /// request or error occurs).
    pub fn run(&mut self) -> Result<(), ExecutionError> {
        self.step_until_unchecked(None)
    }

    /// Spawns `fut` on the executor and drives the executor to completion,
    /// honoring a pending halt request first.
    fn process_future(
        &mut self,
        fut: impl Future<Output = ()> + Send + 'static,
    ) -> Result<(), ExecutionError> {
        self.take_halt_flag()?;
        self.executor.spawn_and_forget(fut);
        self.run_executor()
    }

    /// Immediately processes an event at the current simulation time,
    /// bypassing the scheduler queue.
    pub fn process_event<T>(&mut self, event_id: &EventId<T>, arg: T) -> Result<(), ExecutionError>
    where
        T: Serialize + DeserializeOwned + Send + Clone + 'static,
    {
        let source = self
            .scheduler_registry
            .get_event_source(&(*event_id).into())
            .ok_or(ExecutionError::InvalidEventId(event_id.0))?;
        let fut = source.future_owned(Box::new(arg), None)?;
        self.process_future(fut)
    }

    /// Same as `process_event` but with a type-erased argument, for the
    /// server front-end.
    #[cfg(feature = "server")]
    pub(crate) fn process_event_erased(
        &mut self,
        event_source: &dyn EventSourceEntryAny,
        arg: Box<dyn Any>,
    ) -> Result<(), ExecutionError> {
        let source = self
            .scheduler_registry
            .get_event_source(&event_source.get_event_id())
            .ok_or(ExecutionError::InvalidEventId(
                event_source.get_event_id().0,
            ))?;
        let fut = source.future_owned(arg, None)?;
        self.process_future(fut)
    }

    /// Immediately processes a query at the current simulation time and
    /// returns its reply.
    pub fn process_query<T, R>(
        &mut self,
        query_id: &QueryId<T, R>,
        arg: T,
    ) -> Result<R, ExecutionError>
    where
        T: Send + Clone + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = query_replier();
        let source = self
            .scheduler_registry
            .get_query_source(&(*query_id).into())
            .ok_or(ExecutionError::InvalidQueryId(query_id.0))?;
        let fut = source.future(Box::new(arg), Some(Box::new(tx)))?;
        self.process_future(fut)?;
        // NOTE(review): both `unwrap`s assume the target model produced a
        // reply; a missing reply would panic here rather than surface as
        // `BadQuery` — TODO confirm this is intended.
        Ok(rx.read().unwrap().next().unwrap())
    }

    /// Same as `process_query` but type-erased, for the server front-end;
    /// the caller extracts the reply from the returned reader.
    #[cfg(feature = "server")]
    pub(crate) fn process_query_erased(
        &mut self,
        query_source: &dyn QuerySourceEntryAny,
        arg: Box<dyn Any>,
    ) -> Result<Box<dyn ReplyReaderAny>, ExecutionError> {
        let source = self
            .scheduler_registry
            .get_query_source(&query_source.get_query_id())
            .ok_or(ExecutionError::InvalidQueryId(
                query_source.get_query_id().0,
            ))?;
        let (tx, rx) = query_source.replier();
        let fut = source.future(arg, Some(tx))?;
        self.process_future(fut)?;
        Ok(rx)
    }

    /// Sends a request directly to a model's replier port and processes the
    /// simulation until the reply is available.
    ///
    /// Fails with `BadQuery` when no reply was produced (e.g. the target
    /// mailbox was never added to the simulation).
    pub(crate) fn process_replier_fn<M, F, T, R, S>(
        &mut self,
        func: F,
        arg: T,
        address: impl Into<Address<M>>,
    ) -> Result<R, ExecutionError>
    where
        M: Model,
        F: for<'a> ReplierFn<'a, M, T, R, S>,
        T: Send + Clone + 'static,
        R: Send + 'static,
    {
        let (reply_writer, mut reply_reader) = slot::slot();
        let sender = address.into().0;
        let fut = async move {
            // Send errors are deliberately ignored: a dead mailbox leaves the
            // reply slot empty, which surfaces as `BadQuery` below.
            let _ = sender
                .send(
                    move |model: &mut M,
                          scheduler,
                          env,
                          recycle_box: RecycleBox<()>|
                          -> RecycleBox<dyn Future<Output = ()> + Send + '_> {
                        let fut = async move {
                            let reply = func.call(model, arg, scheduler, env).await;
                            let _ = reply_writer.write(reply);
                        };
                        // Reuse the channel's boxed allocation for the future.
                        coerce_box!(RecycleBox::recycle(recycle_box, fut))
                    },
                )
                .await;
        };
        self.process_future(fut)?;
        reply_reader
            .try_read()
            .map_err(|_| ExecutionError::BadQuery)
    }

    /// Runs the executor until all spawned futures complete, translating
    /// executor failures into `ExecutionError`s.
    ///
    /// Any executor error permanently terminates the simulation.
    fn run_executor(&mut self) -> Result<(), ExecutionError> {
        if self.is_terminated {
            return Err(ExecutionError::Terminated);
        }
        self.executor.run(self.timeout).map_err(|e| {
            self.is_terminated = true;
            match e {
                ExecutorError::UnprocessedMessages(msg_count) => {
                    // Non-empty mailboxes indicate a deadlock between models;
                    // otherwise the unprocessed messages were simply lost.
                    let mut deadlock_info = Vec::new();
                    for (model, observer) in &self.observers {
                        let mailbox_size = observer.len();
                        if mailbox_size != 0 {
                            deadlock_info.push(DeadlockInfo {
                                model: model.clone(),
                                mailbox_size,
                            });
                        }
                    }
                    if deadlock_info.is_empty() {
                        ExecutionError::MessageLoss(msg_count)
                    } else {
                        ExecutionError::Deadlock(deadlock_info)
                    }
                }
                ExecutorError::Timeout => ExecutionError::Timeout,
                ExecutorError::Panic(model_id, payload) => {
                    // Attribute the panic to a model when the executor
                    // captured a model id at panic time.
                    let model = model_id
                        .get()
                        .map(|id| self.registered_models.get(id).unwrap().path.clone());
                    // A `SendError` payload means a message was sent to a
                    // dead mailbox rather than a genuine model panic.
                    if (*payload).type_id() == TypeId::of::<SendError>() {
                        return ExecutionError::NoRecipient { model };
                    }
                    if let Some(model) = model {
                        return ExecutionError::Panic { model, payload };
                    }
                    // Panics that cannot be attributed to any model are
                    // propagated to the caller.
                    panic::resume_unwind(payload);
                }
            }
        })
    }

    /// Synchronizes the clock to the given simulation deadline, failing and
    /// terminating the simulation when the lag exceeds the configured
    /// tolerance.
    fn synchronize(&mut self, deadline: MonotonicTime) -> Result<(), ExecutionError> {
        if let SyncStatus::OutOfSync(lag) = self.clock.synchronize(deadline)
            && let Some(tolerance) = &self.clock_tolerance
            && &lag > tolerance
        {
            self.is_terminated = true;
            return Err(ExecutionError::OutOfSync(lag));
        }
        Ok(())
    }

    /// Advances time to the earliest of the next scheduled item and the next
    /// ticker tick (bounded by `upper_time_bound`), processes everything
    /// scheduled at that time plus all pending injected events, and returns
    /// the new simulation time (`None` when nothing was left to process
    /// within the bound).
    fn step_to_next(
        &mut self,
        upper_time_bound: Option<MonotonicTime>,
    ) -> Result<Option<MonotonicTime>, ExecutionError> {
        self.take_halt_flag()?;
        if self.is_terminated {
            return Err(ExecutionError::Terminated);
        }
        let upper_time_bound = upper_time_bound.unwrap_or(MonotonicTime::MAX);
        // Returns the key of the next queue item within the time bound,
        // lazily discarding cancelled events along the way.
        let peek_next_key = |scheduler_queue: &mut MutexGuard<SchedulerQueue>| {
            loop {
                match scheduler_queue.peek() {
                    Some((&key, item)) if key.0 <= upper_time_bound => {
                        if let QueueItem::Event(event) = item
                            && event.is_cancelled()
                        {
                            scheduler_queue.pull();
                        } else {
                            break Some(key);
                        }
                    }
                    _ => break None,
                }
            }
        };
        // Next ticker tick, if any, capped by the time bound.
        let next_tick = self.ticker.as_mut().and_then(|ticker| {
            let tick = ticker.next_tick(self.time.read());
            (tick <= upper_time_bound).then_some(tick)
        });
        let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
        let mut next_key = peek_next_key(&mut scheduler_queue);
        // The new simulation time is the earliest of the next scheduled item
        // and the next tick; bail out when neither exists within the bound.
        let time = match (next_key, next_tick) {
            (Some(key), Some(tick)) => tick.min(key.0),
            (Some(key), None) => key.0,
            (None, Some(tick)) => tick,
            (None, None) => return Ok(None),
        };
        self.time.write(time);
        let mut has_events = false;
        // Process all items scheduled exactly at `time`. Items sharing the
        // same key — i.e. same (time, origin) — are chained into a single
        // `SeqFuture` so they execute sequentially; batches with distinct
        // keys are spawned as separate (concurrent) futures.
        while next_key.map(|key| key.0 == time).unwrap_or(false) {
            let mut event_seq = SeqFuture::new();
            next_key = loop {
                // Note: this `time` shadows the outer one for re-insertion of
                // periodic events.
                let ((time, origin_id), item) = scheduler_queue.pull().unwrap();
                let fut = match item {
                    QueueItem::Event(event) => {
                        let source = self
                            .scheduler_registry
                            .get_event_source(&event.event_id)
                            .ok_or(ExecutionError::InvalidEventId(event.event_id.0))?;
                        if let Some(period) = event.period {
                            // Periodic event: borrow the argument and
                            // immediately re-schedule the event one period
                            // later with the same origin.
                            let fut = source.future_borrowed(&*event.arg, event.key.as_ref())?;
                            scheduler_queue
                                .insert((time + period, origin_id), QueueItem::Event(event));
                            fut
                        } else {
                            source.future_owned(event.arg, event.key)?
                        }
                    }
                    QueueItem::Query(query) => {
                        let source = self
                            .scheduler_registry
                            .get_query_source(&query.query_id)
                            .ok_or(ExecutionError::InvalidQueryId(query.query_id.0))?;
                        source.future(query.arg, query.replier)?
                    }
                };
                event_seq.push(fut);
                let key = peek_next_key(&mut scheduler_queue);
                // A different key ends the current sequential batch.
                if key != next_key {
                    break key;
                }
            };
            self.executor.spawn_and_forget(event_seq);
            has_events = true;
        }
        drop(scheduler_queue);
        {
            // Drain all pending injected events, grouping consecutive items
            // sharing an origin id into sequential batches as above.
            let mut injector_queue = self.injector_queue.lock().unwrap();
            if let Some(mut origin_id) = injector_queue.peek().map(|item| *item.0) {
                has_events = true;
                let mut event_seq = SeqFuture::new();
                while let Some((id, event)) = injector_queue.pull() {
                    let source = self
                        .scheduler_registry
                        .get_event_source(&event.event_id)
                        .ok_or(ExecutionError::InvalidEventId(event.event_id.0))?;
                    let fut = source.future_owned(event.arg, event.key)?;
                    if id != origin_id {
                        self.executor.spawn_and_forget(event_seq);
                        event_seq = SeqFuture::new();
                        origin_id = id
                    }
                    event_seq.push(fut);
                }
                self.executor.spawn_and_forget(event_seq);
            }
        }
        // Let the clock catch up to the new simulation time before actually
        // executing the spawned futures.
        self.synchronize(time)?;
        if has_events {
            self.run_executor()?;
        }
        Ok(Some(time))
    }

    /// Steps repeatedly until `target_time` is reached, or until the queue
    /// runs dry when no target is given. Does not validate the target.
    fn step_until_unchecked(
        &mut self,
        target_time: Option<MonotonicTime>,
    ) -> Result<(), ExecutionError> {
        loop {
            match self.step_to_next(target_time) {
                Ok(time) if time == target_time => return Ok(()),
                Ok(None) => {
                    // Queue exhausted before the target: jump straight to the
                    // target time and synchronize the clock to it.
                    if let Some(target_time) = target_time {
                        self.time.write(target_time);
                        self.synchronize(target_time)?;
                    }
                    return Ok(());
                }
                Err(e) => return Err(e),
                _ => {}
            }
        }
    }

    /// Consumes a pending halt request, reporting `Halted` when one was set.
    fn take_halt_flag(&mut self) -> Result<(), ExecutionError> {
        // Relaxed ordering is presumably sufficient since the flag is only an
        // advisory stop request — TODO confirm.
        if self.is_halted.load(Ordering::Relaxed) {
            self.is_halted.store(false, Ordering::Relaxed);
            return Err(ExecutionError::Halted);
        }
        Ok(())
    }

    /// Serializes every registered model, returning one byte buffer per
    /// model in registration order.
    fn save_models(&mut self) -> Result<Vec<Vec<u8>>, ExecutionError> {
        // The registry is temporarily moved out so the per-model serializer
        // callbacks can borrow `self` mutably.
        let models = self.registered_models.drain(..).collect::<Vec<_>>();
        let mut values = Vec::new();
        for model in models.iter() {
            values.push((model.serialize)(self)?);
        }
        self.registered_models = models;
        Ok(values)
    }

    /// Restores every registered model from `model_state` (in registration
    /// order), using `event_key_reg` to re-link serialized event keys.
    fn restore_models(
        &mut self,
        model_state: Vec<Vec<u8>>,
        event_key_reg: &EventKeyReg,
    ) -> Result<(), ExecutionError> {
        let models = self.registered_models.drain(..).collect::<Vec<_>>();
        for (model, state) in models.iter().zip(model_state) {
            (model.deserialize)(self, (state, event_key_reg.clone()))?;
        }
        self.registered_models = models;
        Ok(())
    }

    /// Serializes the scheduler queue (keys plus per-item payloads) into a
    /// single byte buffer.
    fn save_queue(&self) -> Result<Vec<u8>, ExecutionError> {
        let scheduler_queue = self.scheduler_queue.lock().unwrap();
        let queue = scheduler_queue
            .iter()
            .map(|(k, v)| match v.serialize(&self.scheduler_registry) {
                Ok(v) => Ok((*k, v)),
                Err(e) => Err(e),
            })
            .collect::<Result<Vec<_>, ExecutionError>>()?;
        bincode::serde::encode_to_vec(&queue, serialization_config())
            .map_err(|e| SaveError::SchedulerQueueSerializationError { cause: Box::new(e) }.into())
    }

    /// Rebuilds the scheduler queue from a buffer produced by `save_queue`,
    /// discarding the current queue contents.
    fn restore_queue(&mut self, state: &[u8]) -> Result<(), ExecutionError> {
        let deserialized: Vec<(SchedulerKey, Vec<u8>)> =
            bincode::serde::decode_from_slice(state, serialization_config())
                .map_err(|e| RestoreError::SchedulerQueueDeserializationError {
                    cause: Box::new(e),
                })?
                .0;
        let mut scheduler_queue = self.scheduler_queue.lock().unwrap();
        scheduler_queue.clear();
        for entry in deserialized {
            scheduler_queue.insert(
                entry.0,
                QueueItem::deserialize(&entry.1, &self.scheduler_registry)?,
            );
        }
        Ok(())
    }

    /// Saves the full simulation state (models, scheduler queue and current
    /// time) to `writer`, returning the number of bytes written.
    pub fn save<W: std::io::Write>(&mut self, writer: &mut W) -> Result<usize, ExecutionError> {
        let state = SimulationState {
            models: self.save_models()?,
            scheduler_queue: self.save_queue()?,
            time: self.time(),
        };
        bincode::serde::encode_into_std_write(state, writer, serialization_config())
            .map_err(|e| SaveError::SimulationStateSerializationError { cause: Box::new(e) }.into())
    }

    /// Restores the full simulation state from a reader previously filled by
    /// `save`. A fresh event-key registry is installed for the duration of
    /// the restore so models and queue items can re-link their event keys.
    pub(crate) fn restore<R: std::io::Read>(&mut self, mut state: R) -> Result<(), ExecutionError> {
        let event_key_reg = Arc::new(Mutex::new(HashMap::new()));
        EVENT_KEY_REG.set(&event_key_reg, || {
            let state: SimulationState =
                bincode::serde::decode_from_std_read(&mut state, serialization_config()).map_err(
                    |e| RestoreError::SimulationStateDeserializationError { cause: Box::new(e) },
                )?;
            self.time.write(state.time);
            self.restore_models(state.models, &event_key_reg)?;
            self.restore_queue(&state.scheduler_queue)?;
            Ok::<_, ExecutionError>(())
        })?;
        Ok(())
    }
}
impl fmt::Debug for Simulation {
    /// Formats a compact debug view exposing only the current simulation
    /// time; all other fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let now = self.time.read();
        let mut state = f.debug_struct("Simulation");
        state.field("time", &now);
        state.finish_non_exhaustive()
    }
}
/// Serializable snapshot of a simulation: per-model state blobs, the encoded
/// scheduler queue, and the simulation time at the moment of the snapshot.
#[derive(Serialize, Deserialize)]
struct SimulationState {
    // One opaque byte buffer per registered model, in registration order.
    models: Vec<Vec<u8>>,
    // Output of `Simulation::save_queue`.
    scheduler_queue: Vec<u8>,
    time: MonotonicTime,
}
/// Information about a model involved in a detected deadlock.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DeadlockInfo {
    /// Path of the model whose mailbox still holds messages.
    pub model: Path,
    /// Number of messages left in that model's mailbox.
    pub mailbox_size: usize,
}
/// Errors that can arise while serializing the simulation state.
#[non_exhaustive]
#[derive(Debug)]
pub enum SaveError {
    /// The simulation configuration could not be serialized.
    ConfigSerializationError {
        cause: Box<dyn Error + Send>,
    },
    /// A model's state could not be serialized.
    ModelSerializationError {
        model: Path,
        type_name: &'static str,
        cause: Box<dyn Error + Send>,
    },
    /// A scheduled event could not be serialized.
    EventSerializationError {
        event_id: usize,
        cause: Box<dyn Error + Send>,
    },
    /// A scheduled query could not be serialized.
    QuerySerializationError {
        query_id: usize,
        cause: Box<dyn Error + Send>,
    },
    /// The scheduler queue as a whole could not be serialized.
    SchedulerQueueSerializationError {
        cause: Box<dyn Error + Send>,
    },
    /// The top-level simulation state could not be serialized.
    SimulationStateSerializationError {
        cause: Box<dyn Error + Send>,
    },
    /// The event with this identifier is not registered.
    EventNotFound {
        event_id: usize,
    },
    /// The query with this identifier is not registered.
    QueryNotFound {
        query_id: usize,
    },
    /// An event argument did not have the expected concrete type.
    ArgumentTypeMismatch {
        type_name: &'static str,
    },
    /// An event argument could not be serialized.
    ArgumentSerializationError {
        type_name: &'static str,
        cause: Box<dyn Error + Send>,
    },
}
impl fmt::Display for SaveError {
    /// Human-readable description of the failure; underlying causes are
    /// exposed through `Error::source` rather than inlined here.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::ConfigSerializationError { .. } => {
                write!(f, "config serialization has failed")
            }
            Self::ModelSerializationError {
                model, type_name, ..
            } => {
                write!(f, "cannot serialize model '{model}': {type_name}")
            }
            Self::EventSerializationError { event_id, .. } => {
                write!(f, "cannot serialize event {event_id}")
            }
            Self::QuerySerializationError { query_id, .. } => {
                write!(f, "cannot serialize query {query_id}")
            }
            Self::SchedulerQueueSerializationError { .. } => {
                write!(f, "cannot serialize scheduler queue")
            }
            Self::SimulationStateSerializationError { .. } => {
                write!(f, "cannot serialize simulation state")
            }
            Self::EventNotFound { event_id } => {
                write!(f, "serialized event (id {event_id}) cannot be found")
            }
            Self::QueryNotFound { query_id } => {
                write!(f, "serialized query (id {query_id}) cannot be found")
            }
            Self::ArgumentTypeMismatch { type_name } => {
                write!(
                    f,
                    "type mismatch while casting event argument, expected: {type_name}"
                )
            }
            Self::ArgumentSerializationError { type_name, .. } => {
                write!(f, "cannot serialize event arg, expected type: {type_name}")
            }
        }
    }
}
impl Error for SaveError {
    /// Returns the wrapped cause for the variants that carry one.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Group the variants by whether they carry a `cause` payload.
        match self {
            Self::ConfigSerializationError { cause }
            | Self::SchedulerQueueSerializationError { cause }
            | Self::SimulationStateSerializationError { cause }
            | Self::ModelSerializationError { cause, .. }
            | Self::EventSerializationError { cause, .. }
            | Self::QuerySerializationError { cause, .. }
            | Self::ArgumentSerializationError { cause, .. } => Some(cause.as_ref()),
            Self::EventNotFound { .. }
            | Self::QueryNotFound { .. }
            | Self::ArgumentTypeMismatch { .. } => None,
        }
    }
}
/// Errors that can arise while restoring a saved simulation state.
#[non_exhaustive]
#[derive(Debug)]
pub enum RestoreError {
    /// The simulation configuration is missing from the saved state.
    ConfigMissing,
    /// A model's state could not be deserialized.
    ModelDeserializationError {
        model: Path,
        type_name: &'static str,
        cause: Box<dyn Error + Send>,
    },
    /// A model's state could not be (re-)serialized during restore.
    ModelSerializationError {
        model: Path,
        type_name: &'static str,
        cause: Box<dyn Error + Send>,
    },
    /// A scheduler queue item could not be deserialized.
    QueueItemDeserializationError {
        cause: Box<dyn Error + Send>,
    },
    /// The scheduler queue as a whole could not be deserialized.
    SchedulerQueueDeserializationError {
        cause: Box<dyn Error + Send>,
    },
    /// The top-level simulation state could not be deserialized.
    SimulationStateDeserializationError {
        cause: Box<dyn Error + Send>,
    },
    /// The event with this identifier is not registered.
    EventNotFound {
        event_id: usize,
    },
    /// The query with this identifier is not registered.
    QueryNotFound {
        query_id: usize,
    },
    /// An event argument could not be deserialized.
    ArgumentDeserializationError {
        type_name: &'static str,
        cause: Box<dyn Error + Send>,
    },
}
impl fmt::Display for RestoreError {
    /// Human-readable description of the failure; underlying causes are
    /// exposed through `Error::source` rather than inlined here.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::ConfigMissing => write!(f, "simulation config is missing"),
            Self::ModelDeserializationError {
                model, type_name, ..
            } => {
                write!(f, "cannot deserialize model {model}: {type_name}")
            }
            Self::ModelSerializationError {
                model, type_name, ..
            } => {
                write!(f, "cannot serialize model {model}: {type_name}")
            }
            Self::QueueItemDeserializationError { .. } => {
                write!(f, "cannot deserialize queue item")
            }
            Self::SchedulerQueueDeserializationError { .. } => {
                write!(f, "cannot deserialize scheduler queue")
            }
            Self::SimulationStateDeserializationError { .. } => {
                write!(f, "cannot deserialize simulation state")
            }
            Self::EventNotFound { event_id } => {
                write!(f, "deserialized event (id {event_id}) cannot be found")
            }
            Self::QueryNotFound { query_id } => {
                write!(f, "deserialized query (id {query_id}) cannot be found")
            }
            Self::ArgumentDeserializationError { type_name, .. } => {
                write!(
                    f,
                    "cannot deserialize event arg, expected type: {type_name}"
                )
            }
        }
    }
}
impl Error for RestoreError {
    /// Returns the wrapped cause for the variants that carry one.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Group the variants by whether they carry a `cause` payload.
        match self {
            Self::ConfigMissing
            | Self::EventNotFound { .. }
            | Self::QueryNotFound { .. } => None,
            Self::SchedulerQueueDeserializationError { cause }
            | Self::SimulationStateDeserializationError { cause }
            | Self::ModelDeserializationError { cause, .. }
            | Self::ModelSerializationError { cause, .. }
            | Self::QueueItemDeserializationError { cause, .. }
            | Self::ArgumentDeserializationError { cause, .. } => Some(cause.as_ref()),
        }
    }
}
/// Errors that can be returned while running or stepping the simulation.
#[non_exhaustive]
#[derive(Debug)]
pub enum ExecutionError {
    /// The simulation was stopped via its halt flag.
    Halted,
    /// The simulation was previously terminated by a fatal error.
    Terminated,
    /// A deadlock between models was detected; lists the models whose
    /// mailboxes still hold messages.
    Deadlock(Vec<DeadlockInfo>),
    /// The executor finished with this many unprocessed messages while no
    /// deadlocked mailbox could be identified.
    MessageLoss(usize),
    /// A message was sent to a mailbox that is no longer alive; `model` is
    /// the sender when it could be identified (`None` for the scheduler).
    NoRecipient {
        model: Option<Path>,
    },
    /// A model panicked; `payload` is the captured panic payload.
    Panic {
        model: Path,
        payload: Box<dyn Any + Send + 'static>,
    },
    /// A simulation step did not complete within the configured timeout.
    Timeout,
    /// The simulation lags behind the clock by more than the configured
    /// tolerance.
    OutOfSync(Duration),
    /// A query produced no response.
    BadQuery,
    /// The requested deadline lies in the past of the current simulation
    /// time.
    InvalidDeadline(MonotonicTime),
    /// No event source is registered under this identifier.
    InvalidEventId(usize),
    /// No query source is registered under this identifier.
    InvalidQueryId(usize),
    /// An event argument had an unexpected type.
    InvalidEventType {
        expected_event_type: &'static str,
    },
    /// A query request/reply pair had unexpected types.
    InvalidQueryType {
        expected_request_type: &'static str,
        expected_reply_type: &'static str,
    },
    /// Saving the simulation state failed.
    SaveError(SaveError),
    /// Restoring the simulation state failed.
    RestoreError(RestoreError),
}
impl fmt::Display for ExecutionError {
    /// Human-readable description of the execution failure.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Halted => f.write_str("the simulation has been intentionally stopped"),
            Self::Terminated => f.write_str("the simulation has been terminated"),
            Self::Deadlock(list) => {
                // Comma-separated list of deadlocked models with their
                // mailbox occupancy.
                f.write_str(
                    "a simulation deadlock has been detected that involves the following models: ",
                )?;
                let mut first_item = true;
                for info in list {
                    if first_item {
                        first_item = false;
                    } else {
                        f.write_str(", ")?;
                    }
                    write!(
                        f,
                        "'{}' ({} item{} in mailbox)",
                        info.model,
                        info.mailbox_size,
                        if info.mailbox_size == 1 { "" } else { "s" }
                    )?;
                }
                Ok(())
            }
            Self::MessageLoss(count) => {
                write!(f, "{count} messages have been lost")
            }
            Self::NoRecipient { model } => {
                match model {
                    Some(model) => write!(f,
                        "an attempt by model '{model}' to send a message failed because the recipient's mailbox is no longer alive"
                    ),
                    None => f.write_str("an attempt by the scheduler to send a message failed because the recipient's mailbox is no longer alive"),
                }
            }
            Self::Panic { model, payload } => {
                // Panic payloads are typically `&str` or `String`; anything
                // else is reported without a message.
                let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
                    s
                } else if let Some(s) = payload.downcast_ref::<String>() {
                    s
                } else {
                    return write!(f, "model '{model}' has panicked");
                };
                write!(f, "model '{model}' has panicked with the message: '{msg}'")
            }
            Self::Timeout => f.write_str("the simulation step has failed to complete within the allocated time"),
            Self::OutOfSync(lag) => {
                write!(
                    f,
                    "the simulation has lost synchronization and lags behind the clock by '{lag:?}'"
                )
            }
            Self::BadQuery => f.write_str("the query did not return any response; was the target mailbox added to the simulation?"),
            Self::InvalidDeadline(time) => {
                write!(
                    f,
                    "the specified deadline ({time}) lies in the past of the current simulation time"
                )
            }
            Self::InvalidEventId(e) => write!(f, "event source with identifier '{e}' was not found"),
            Self::InvalidQueryId(e) => write!(f, "query source with identifier '{e}' was not found"),
            Self::InvalidEventType {
                expected_event_type,
            } => {
                write!(
                    f,
                    "invalid event type, expected: {expected_event_type}"
                )
            }
            Self::InvalidQueryType {
                expected_request_type,
                expected_reply_type,
            } => {
                write!(
                    f,
                    "invalid query request-reply type pair, expected: ('{expected_request_type}', '{expected_reply_type}')"
                )
            }
            Self::SaveError(o) => write!(f, "saving the simulation state has failed: {o}"),
            Self::RestoreError(o) => write!(f, "restoring the simulation state has failed: {o}"),
        }
    }
}
// `ExecutionError` carries its own context in each variant; no underlying
// source is exposed (wrapped save/restore errors are inlined by `Display`).
impl Error for ExecutionError {}
impl From<SaveError> for ExecutionError {
fn from(e: SaveError) -> Self {
Self::SaveError(e)
}
}
impl From<RestoreError> for ExecutionError {
fn from(e: RestoreError) -> Self {
Self::RestoreError(e)
}
}
/// Top-level error type aggregating all failure modes of a simulation bench.
#[non_exhaustive]
#[derive(Debug)]
pub enum SimulationError {
    /// Bench initialization failed.
    BenchError(BenchError),
    /// Running the simulation failed.
    ExecutionError(ExecutionError),
    /// Scheduling an event or query failed.
    SchedulingError(SchedulingError),
}
impl fmt::Display for SimulationError {
    /// Delegates formatting to the wrapped error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let inner: &dyn fmt::Display = match self {
            Self::BenchError(e) => e,
            Self::ExecutionError(e) => e,
            Self::SchedulingError(e) => e,
        };
        inner.fmt(f)
    }
}
impl Error for SimulationError {
    /// Exposes the wrapped error as the source of this one.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let inner: &(dyn Error + 'static) = match self {
            Self::BenchError(e) => e,
            Self::ExecutionError(e) => e,
            Self::SchedulingError(e) => e,
        };
        Some(inner)
    }
}
impl From<BenchError> for SimulationError {
fn from(e: BenchError) -> Self {
Self::BenchError(e)
}
}
impl From<ExecutionError> for SimulationError {
fn from(e: ExecutionError) -> Self {
Self::ExecutionError(e)
}
}
impl From<SchedulingError> for SimulationError {
fn from(e: SchedulingError) -> Self {
Self::SchedulingError(e)
}
}
/// Builds a model from its prototype, registers it and spawns its message
/// loop on the executor.
#[allow(clippy::too_many_arguments)]
pub(crate) fn add_model<P>(
    model: P,
    mailbox: Mailbox<P::Model>,
    path: Path,
    scheduler: GlobalScheduler,
    scheduler_registry: &mut SchedulerRegistry,
    injector: &Arc<Mutex<InjectorQueue>>,
    executor: &Executor,
    abort_signal: &Signal,
    registered_models: &mut Vec<RegisteredModel>,
    is_resumed: Arc<AtomicBool>,
) where
    P: ProtoModel,
{
    #[cfg(feature = "tracing")]
    let span = tracing::span!(target: env!("CARGO_PKG_NAME"), tracing::Level::INFO, "model", path = path.to_string());
    // The model id doubles as the model's index into `registered_models`.
    let model_id = ModelId::new(registered_models.len());
    let mut build_cx = BuildContext::new(
        &mailbox,
        &path,
        &scheduler,
        scheduler_registry,
        injector,
        model_id.0,
        executor,
        abort_signal,
        registered_models,
        is_resumed.clone(),
    );
    // Schedulables must be registered before the prototype is built so the
    // build step can reference them through the context.
    let model_registry = Arc::new(P::Model::register_schedulables(&mut build_cx));
    build_cx.set_model_registry(&model_registry);
    let (model, mut env) = model.build(&mut build_cx);
    let address = mailbox.address();
    let mut receiver = mailbox.0;
    let abort_signal = abort_signal.clone();
    registered_models.push(RegisteredModel::new(path.clone(), address.clone()));
    let cx = Context::new(path, scheduler, address, model_id.0, model_registry);
    let fut = async move {
        // `init` is skipped when resuming from a saved state — presumably the
        // restored state already reflects a past initialization (TODO confirm).
        let mut model = if !is_resumed.load(Ordering::Relaxed) {
            model.init(&cx, &mut env).await.0
        } else {
            model
        };
        // Message loop: run until aborted or the mailbox channel closes.
        while !abort_signal.is_set() && receiver.recv(&mut model, &cx, &mut env).await.is_ok() {}
    };
    // Wrap the loop so each poll is tagged with the model id (and span).
    #[cfg(not(feature = "tracing"))]
    let fut = ModelFuture::new(fut, model_id);
    #[cfg(feature = "tracing")]
    let fut = ModelFuture::new(fut, model_id, span);
    executor.spawn_and_forget(fut);
}
/// Index of a registered model, with `usize::MAX` reserved as the "no model"
/// sentinel (used by `CURRENT_MODEL_ID` when no model is being polled).
#[derive(Copy, Clone, Debug)]
pub(crate) struct ModelId(usize);

impl ModelId {
    /// Sentinel value denoting the absence of a model.
    const fn none() -> Self {
        Self(usize::MAX)
    }
    /// Creates a model id from a registry index.
    ///
    /// # Panics
    ///
    /// Panics if `id` collides with the `usize::MAX` sentinel.
    fn new(id: usize) -> Self {
        assert_ne!(id, usize::MAX);
        Self(id)
    }
    /// Returns the registry index, or `None` for the sentinel.
    fn get(&self) -> Option<usize> {
        // Idiomatic replacement for the if/else sentinel check.
        (self.0 != usize::MAX).then_some(self.0)
    }
}

impl Default for ModelId {
    /// Defaults to "no model", delegating to `none()` so the sentinel value
    /// is defined in exactly one place.
    fn default() -> Self {
        Self::none()
    }
}
/// Wrapper around a model's message-loop future that tags every poll with
/// the model's id (and, when enabled, its tracing span) for diagnostics.
#[pin_project]
struct ModelFuture<F> {
    #[pin]
    fut: F,
    id: ModelId,
    #[cfg(feature = "tracing")]
    span: tracing::Span,
}
impl<F> ModelFuture<F> {
    /// Wraps a model's message-loop future.
    #[cfg(not(feature = "tracing"))]
    fn new(fut: F, id: ModelId) -> Self {
        Self { fut, id }
    }
    /// Wraps a model's message-loop future together with its tracing span.
    #[cfg(feature = "tracing")]
    fn new(fut: F, id: ModelId, span: tracing::Span) -> Self {
        Self { fut, id, span }
    }
}
impl<F: Future> Future for ModelFuture<F> {
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Enter the model's tracing span for the duration of this poll.
        #[cfg(feature = "tracing")]
        let _enter = this.span.enter();
        // Publish the model id for the duration of the poll so panics and
        // send failures raised inside can be attributed to this model; if the
        // inner poll panics, the id intentionally remains set for the
        // executor's panic handler to read.
        CURRENT_MODEL_ID.set(*this.id);
        let poll = this.fut.poll(cx);
        CURRENT_MODEL_ID.set(ModelId::none());
        poll
    }
}