use crate::runtime::execution::Execution;
use crate::runtime::task::{Task, TaskId};
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
use crate::scheduler::metrics::MetricsScheduler;
use crate::scheduler::{Schedule, Scheduler};
use crate::Config;
use std::cell::RefCell;
use std::fmt;
use std::panic;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use tracing::{span, Level};
#[must_use]
struct ResetSpanOnDrop {
span: tracing::Span,
}
impl ResetSpanOnDrop {
fn new() -> Self {
Self {
span: tracing::Span::current().clone(),
}
}
}
impl Drop for ResetSpanOnDrop {
fn drop(&mut self) {
tracing::dispatcher::get_default(|subscriber| {
while let Some(span_id) = tracing::Span::current().id().as_ref() {
subscriber.exit(span_id);
}
if let Some(span_id) = self.span.id().as_ref() {
subscriber.enter(span_id);
}
});
}
}
#[derive(Debug)]
pub struct Runner<S: ?Sized + Scheduler> {
scheduler: Rc<RefCell<MetricsScheduler<S>>>,
config: Config,
}
impl<S: Scheduler + 'static> Runner<S> {
pub fn new(scheduler: S, config: Config) -> Self {
let metrics_scheduler = MetricsScheduler::new(scheduler);
Self {
scheduler: Rc::new(RefCell::new(metrics_scheduler)),
config,
}
}
pub fn run<F>(self, f: F) -> usize
where
F: Fn() + Send + Sync + 'static,
{
let _span_drop_guard = ResetSpanOnDrop::new();
CONTINUATION_POOL.set(&ContinuationPool::new(), || {
let f = Arc::new(f);
let start = Instant::now();
let mut i = 0;
loop {
if self.config.max_time.map(|t| start.elapsed() > t).unwrap_or(false) {
break;
}
let schedule = match self.scheduler.borrow_mut().new_execution() {
None => break,
Some(s) => s,
};
let execution = Execution::new(self.scheduler.clone(), schedule);
let f = Arc::clone(&f);
let _span_drop_guard2 = ResetSpanOnDrop::new();
span!(Level::ERROR, "execution", i).in_scope(|| execution.run(&self.config, move || f()));
i += 1;
}
i
})
}
}
pub struct PortfolioRunner {
schedulers: Vec<Box<dyn Scheduler + Send + 'static>>,
stop_on_first_failure: bool,
config: Config,
}
impl PortfolioRunner {
pub fn new(stop_on_first_failure: bool, config: Config) -> Self {
Self {
schedulers: Vec::new(),
stop_on_first_failure,
config,
}
}
pub fn add(&mut self, scheduler: impl Scheduler + Send + 'static) {
self.schedulers.push(Box::new(scheduler));
}
pub fn run<F>(self, f: F)
where
F: Fn() + Send + Sync + 'static,
{
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThreadResult {
Passed,
Failed,
}
let (tx, rx) = mpsc::sync_channel::<ThreadResult>(0);
let stop_signal = Arc::new(AtomicBool::new(false));
let config = self.config;
let f = Arc::new(f);
let threads = self
.schedulers
.into_iter()
.enumerate()
.map(|(i, scheduler)| {
let f = Arc::clone(&f);
let tx = tx.clone();
let stop_signal = stop_signal.clone();
let config = config.clone();
thread::spawn(move || {
let scheduler = PortfolioStoppableScheduler { scheduler, stop_signal };
let runner = Runner::new(scheduler, config);
span!(Level::ERROR, "job", i).in_scope(|| {
let ret = panic::catch_unwind(panic::AssertUnwindSafe(|| runner.run(move || f())));
match ret {
Ok(_) => tx.send(ThreadResult::Passed),
Err(e) => {
tx.send(ThreadResult::Failed).unwrap();
panic::resume_unwind(e);
}
}
})
})
})
.collect::<Vec<_>>();
for _ in 0..threads.len() {
if rx.recv().unwrap() == ThreadResult::Failed && self.stop_on_first_failure {
stop_signal.store(true, Ordering::SeqCst);
}
}
let mut panic = None;
for thread in threads {
if let Err(e) = thread.join() {
panic = Some(e);
}
}
assert!(stop_signal.load(Ordering::SeqCst) == panic.is_some());
if let Some(e) = panic {
std::panic::resume_unwind(e);
}
}
}
impl fmt::Debug for PortfolioRunner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PortfolioRunner")
.field("schedulers", &self.schedulers.len())
.field("stop_on_first_failure", &self.stop_on_first_failure)
.field("config", &self.config)
.finish()
}
}
#[derive(Debug)]
struct PortfolioStoppableScheduler<S> {
scheduler: S,
stop_signal: Arc<AtomicBool>,
}
impl<S: Scheduler> Scheduler for PortfolioStoppableScheduler<S> {
fn new_execution(&mut self) -> Option<Schedule> {
if self.stop_signal.load(Ordering::SeqCst) {
None
} else {
self.scheduler.new_execution()
}
}
fn next_task(
&mut self,
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
if self.stop_signal.load(Ordering::SeqCst) {
None
} else {
self.scheduler.next_task(runnable_tasks, current_task, is_yielding)
}
}
fn next_u64(&mut self) -> u64 {
self.scheduler.next_u64()
}
}