mod mt_executor;
mod st_executor;
mod task;
use std::any::Any;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use crossbeam_utils::CachePadded;
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::simulation::ModelId;
#[cfg(feature = "tracing")]
use crate::time::AtomicTimeReader;
use task::Promise;
/// Process-wide counter, initialised to zero.
///
/// NOTE(review): nothing in this part of the file reads or updates it;
/// presumably the executor submodules draw unique executor identifiers from
/// it -- confirm against `st_executor` / `mt_executor`.
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
/// Error type returned by [`Executor::run`].
///
/// The variants are produced by the concrete executors in the submodules, so
/// the exact conditions below are hedged where this file does not show them.
#[derive(Debug)]
#[non_exhaustive]
pub(crate) enum ExecutorError {
/// Carries a count.
/// NOTE(review): presumably the number of messages left unprocessed when the
/// run ended -- confirm against the executor implementations.
UnprocessedMessages(usize),
/// NOTE(review): presumably reported when the `timeout` passed to `run`
/// elapsed before completion -- confirm.
Timeout,
/// Carries a `ModelId` together with a boxed `Any + Send` payload.
/// NOTE(review): presumably the model whose task panicked and the panic
/// payload -- confirm.
Panic(ModelId, Box<dyn Any + Send + 'static>),
}
/// Context value handed to an executor when it is constructed and stored in
/// the `SIMULATION_CONTEXT` scoped thread-local.
///
/// When the `tracing` feature is disabled the struct has no fields.
#[derive(Clone)]
pub(crate) struct SimulationContext {
/// Only present when the `tracing` feature is enabled.
/// NOTE(review): presumably gives read access to the current simulation time
/// for tracing purposes -- confirm against `crate::time`.
#[cfg(feature = "tracing")]
pub(crate) time_reader: AtomicTimeReader,
}
// Declares the crate-visible scoped thread-local `SIMULATION_CONTEXT` of type
// `SimulationContext` through the crate's own `scoped_thread_local!` macro.
// NOTE(review): how and when the value is installed is not visible here;
// presumably the executors set it around task execution -- confirm.
scoped_thread_local!(pub(crate) static SIMULATION_CONTEXT: SimulationContext);
/// Facade over the two executor implementations of this module.
///
/// Every method of the `impl` block dispatches to whichever variant is held.
#[derive(Debug)]
pub(crate) enum Executor {
/// Executor from the `st_executor` submodule (single-threaded constructor).
StExecutor(st_executor::Executor),
/// Executor from the `mt_executor` submodule (multi-threaded constructor).
MtExecutor(mt_executor::Executor),
}
impl Executor {
pub(crate) fn new_single_threaded(
simulation_context: SimulationContext,
abort_signal: Signal,
) -> Self {
Self::StExecutor(st_executor::Executor::new(simulation_context, abort_signal))
}
pub(crate) fn new_multi_threaded(
num_threads: usize,
simulation_context: SimulationContext,
abort_signal: Signal,
) -> Self {
Self::MtExecutor(mt_executor::Executor::new(
num_threads,
simulation_context,
abort_signal,
))
}
#[allow(unused)]
pub(crate) fn spawn<T>(&self, future: T) -> Promise<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
match self {
Self::StExecutor(executor) => executor.spawn(future),
Self::MtExecutor(executor) => executor.spawn(future),
}
}
pub(crate) fn spawn_and_forget<T>(&self, future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
match self {
Self::StExecutor(executor) => executor.spawn_and_forget(future),
Self::MtExecutor(executor) => executor.spawn_and_forget(future),
}
}
pub(crate) fn run(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
match self {
Self::StExecutor(executor) => executor.run(timeout),
Self::MtExecutor(executor) => executor.run(timeout),
}
}
}
/// A boolean flag shared through an `Arc`.
///
/// Cloning a `Signal` clones the `Arc`, so all clones observe the same flag.
/// The flag is wrapped in `CachePadded` from `crossbeam_utils`.
#[derive(Clone, Debug)]
pub(crate) struct Signal(Arc<CachePadded<AtomicBool>>);
impl Signal {
pub(crate) fn new() -> Self {
Self(Arc::new(CachePadded::new(AtomicBool::new(false))))
}
pub(crate) fn set(&self) {
self.0.store(true, Ordering::Relaxed);
}
pub(crate) fn is_set(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
}
#[cfg(all(test, not(nexosim_loom)))]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::Ordering;

    use futures_channel::mpsc;
    use futures_util::StreamExt;

    use super::*;

    /// Builds a `SimulationContext` suitable for tests.
    ///
    /// With the `tracing` feature the time reader is taken from a fresh cell
    /// initialised to `MonotonicTime::EPOCH`; otherwise the context is empty.
    fn dummy_simulation_context() -> SimulationContext {
        #[cfg(feature = "tracing")]
        let time_reader = {
            let epoch = crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH);
            crate::util::sync_cell::SyncCell::new(epoch).reader()
        };

        SimulationContext {
            #[cfg(feature = "tracing")]
            time_reader,
        }
    }

    /// Guard that invokes the stored closure once, when the guard is dropped.
    struct RunOnDrop<F: FnOnce()> {
        drop_fn: Option<F>,
    }

    impl<F: FnOnce()> RunOnDrop<F> {
        /// Wraps `drop_fn` so that it runs when the returned guard is dropped.
        fn new(drop_fn: F) -> Self {
            let drop_fn = Some(drop_fn);

            Self { drop_fn }
        }
    }

    impl<F: FnOnce()> Drop for RunOnDrop<F> {
        fn drop(&mut self) {
            // `take` leaves `None` behind, so the closure can run at most once.
            if let Some(callback) = self.drop_fn.take() {
                callback();
            }
        }
    }

    /// Spawns one task of the drop cycle.
    ///
    /// The task installs a guard and then waits on `inbox`. When the guard is
    /// dropped it tries to send a message to `first_peer`, then to
    /// `second_peer`, and finally increments `drop_count`.
    fn spawn_cycle_task(
        executor: &Executor,
        mut inbox: mpsc::Receiver<()>,
        mut first_peer: mpsc::Sender<()>,
        mut second_peer: mpsc::Sender<()>,
        drop_count: Arc<AtomicUsize>,
    ) {
        executor.spawn_and_forget(async move {
            let _guard = RunOnDrop::new(move || {
                // Send failures are deliberately ignored: only the attempt
                // matters for this test.
                let _ = first_peer.try_send(());
                let _ = second_peer.try_send(());
                drop_count.fetch_add(1, Ordering::Relaxed);
            });

            let _ = inbox.next().await;
        });
    }

    /// Spawns three tasks, each waiting on its own channel and each messaging
    /// the two other channels from its drop guard, then checks that dropping
    /// the executor runs all three guards.
    fn executor_drop_cycle(mut executor: Executor) {
        let (sender1, receiver1) = mpsc::channel(2);
        let (sender2, receiver2) = mpsc::channel(2);
        let (sender3, receiver3) = mpsc::channel(2);
        let drop_count = Arc::new(AtomicUsize::new(0));

        // The original senders stay alive until the end of this function;
        // the tasks only ever receive clones.
        spawn_cycle_task(
            &executor,
            receiver1,
            sender2.clone(),
            sender3.clone(),
            drop_count.clone(),
        );
        spawn_cycle_task(
            &executor,
            receiver2,
            sender1.clone(),
            sender3.clone(),
            drop_count.clone(),
        );
        spawn_cycle_task(
            &executor,
            receiver3,
            sender1.clone(),
            sender2.clone(),
            drop_count.clone(),
        );

        executor.run(Duration::ZERO).unwrap();

        // Dropping the executor must drop every task, hence every guard.
        drop(executor);

        assert_eq!(drop_count.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn executor_drop_cycle_st() {
        let executor = Executor::new_single_threaded(dummy_simulation_context(), Signal::new());

        executor_drop_cycle(executor);
    }

    #[test]
    fn executor_drop_cycle_mt() {
        let executor = Executor::new_multi_threaded(3, dummy_simulation_context(), Signal::new());

        executor_drop_cycle(executor);
    }
}