#![warn(missing_docs, missing_debug_implementations)]
use std::cell::RefCell;
use std::collections::{BTreeMap, BinaryHeap};
use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use futures_lite::pin;
use scoped_tls::scoped_thread_local;
use crate::multitask;
use crate::parking;
use crate::task::{self, waker_fn::waker_fn};
use crate::{IoRequirements, Latency};
use crate::{Local, Reactor, Shares};
// Process-wide counter; each executor that gets built takes the next value as its id.
static EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
/// Error reported when a [`TaskQueueHandle`] does not match any task queue known to the
/// executor.
#[derive(Debug, Clone)]
pub struct QueueNotFoundError {
    index: usize,
}

impl QueueNotFoundError {
    /// Builds the error from the handle whose lookup failed.
    fn new(h: TaskQueueHandle) -> Self {
        Self { index: h.index }
    }
}

impl fmt::Display for QueueNotFoundError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("invalid queue index: {}", self.index))
    }
}

impl std::error::Error for QueueNotFoundError {}
/// Error reported when removal of a task queue is attempted while the queue is still active.
#[derive(Debug, Clone)]
pub struct QueueStillActiveError {
    index: usize,
}

impl QueueStillActiveError {
    /// Builds the error from the handle of the queue that could not be removed.
    fn new(h: TaskQueueHandle) -> Self {
        Self { index: h.index }
    }
}

impl fmt::Display for QueueStillActiveError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "queue with index {} is still active, but tried to remove",
            self.index
        ))
    }
}

impl std::error::Error for QueueStillActiveError {}
// Scoped thread-local slot holding the `LocalExecutor` that is currently inside `run()`;
// the static `Task` helpers reach the executor through it.
scoped_thread_local!(static LOCAL_EX: LocalExecutor);
/// Opaque handle that identifies a task queue inside an executor.
///
/// The default handle carries index 0.
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub struct TaskQueueHandle {
    index: usize,
}
/// A group of tasks scheduled as one unit, together with its scheduling bookkeeping.
#[derive(Debug)]
struct TaskQueue {
    /// Executor that owns the tasks spawned into this queue.
    ex: Rc<multitask::LocalExecutor>,
    /// Whether the queue is currently flagged as having work to run.
    active: bool,
    shares: Shares,
    /// Share-weighted runtime accumulated so far; this is the only key used for ordering.
    vruntime: u64,
    io_requirements: IoRequirements,
    name: String,
    /// Last instant at which dynamic shares were re-read.
    last_adjustment: Instant,
    /// Yield flag; cleared every time the queue is about to run.
    yielded: bool,
    stats: TaskQueueStats,
}

impl PartialEq for TaskQueue {
    fn eq(&self, other: &Self) -> bool {
        other.vruntime == self.vruntime
    }
}

impl Eq for TaskQueue {}

impl PartialOrd for TaskQueue {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TaskQueue {
    /// Ordering is inverted on purpose: the queue with the smallest `vruntime` compares as
    /// the greatest, so a max-heap hands it out first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.vruntime.cmp(&other.vruntime).reverse()
    }
}

impl TaskQueue {
    /// Creates an inactive queue with zero `vruntime`.
    ///
    /// `notify` is handed to the underlying `multitask::LocalExecutor`.
    fn new<F, S>(
        index: usize,
        name: S,
        shares: Shares,
        ioreq: IoRequirements,
        notify: F,
    ) -> Rc<RefCell<Self>>
    where
        F: Fn() + 'static,
        S: Into<String>,
    {
        let ex = Rc::new(multitask::LocalExecutor::new(notify));
        // Must be computed before `shares` is moved into the struct below.
        let stats = TaskQueueStats::new(index, shares.reciprocal_shares());
        let queue = TaskQueue {
            ex,
            active: false,
            shares,
            vruntime: 0,
            io_requirements: ioreq,
            name: name.into(),
            last_adjustment: Instant::now(),
            yielded: false,
            stats,
        };
        Rc::new(RefCell::new(queue))
    }

    /// Returns the current value of the `active` flag.
    fn is_active(&self) -> bool {
        self.active
    }

    /// Fetches the next runnable task from the underlying executor, if there is one.
    fn get_task(&mut self) -> Option<multitask::Runnable> {
        self.ex.get_task()
    }

    /// Returns the current value of the `yielded` flag.
    fn yielded(&self) -> bool {
        self.yielded
    }

    /// Resets the yield flag and, for dynamic shares, refreshes the cached reciprocal shares
    /// once more than one adjustment period has elapsed since the last refresh.
    fn prepare_to_run(&mut self, now: Instant) {
        self.yielded = false;
        let period = match &self.shares {
            Shares::Dynamic(bm) => bm.adjustment_period(),
            _ => return,
        };
        if now.saturating_duration_since(self.last_adjustment) > period {
            self.last_adjustment = now;
            self.stats.reciprocal_shares = self.shares.reciprocal_shares();
        }
    }

    /// Charges `delta` of runtime to this queue and refreshes the `active` flag.
    ///
    /// Returns the new `vruntime`, or `None` when adding the scaled delta would overflow;
    /// in that case the stored `vruntime` is left untouched.
    fn account_vruntime(&mut self, delta: Duration) -> Option<u64> {
        let nanos = delta.as_nanos() as u64;
        let scaled = (self.stats.reciprocal_shares * nanos) >> 12;
        self.stats.runtime += delta;
        self.stats.queue_selected += 1;
        self.active = self.ex.is_active();
        let updated = self.vruntime.checked_add(scaled);
        self.vruntime = updated.unwrap_or(self.vruntime);
        updated
    }
}
macro_rules! to_io_error {
($error:expr) => {{
match $error {
Ok(x) => Ok(x),
Err(nix::Error::Sys(_)) => Err(io::Error::last_os_error()),
Err(nix::Error::InvalidUtf8) => Err(io::Error::new(io::ErrorKind::InvalidInput, "")),
Err(nix::Error::InvalidPath) => Err(io::Error::new(io::ErrorKind::InvalidInput, "")),
Err(nix::Error::UnsupportedOperation) => Err(io::Error::new(io::ErrorKind::Other, "")),
}
}};
}
fn bind_to_cpu(cpu: usize) -> io::Result<()> {
let mut cpuset = nix::sched::CpuSet::new();
to_io_error!(&cpuset.set(cpu as usize))?;
let pid = nix::unistd::Pid::from_raw(0);
to_io_error!(nix::sched::sched_setaffinity(pid, &cpuset))
}
/// Counters describing the work done by one executor as a whole.
#[derive(Debug, Copy, Clone)]
pub struct ExecutorStats {
    total_runtime: Duration,
    scheduler_runs: u64,
    tasks_executed: u64,
}

impl ExecutorStats {
    /// Creates a statistics record with every counter at zero.
    fn new() -> Self {
        ExecutorStats {
            total_runtime: Duration::default(),
            scheduler_runs: 0,
            tasks_executed: 0,
        }
    }

    /// Total time the executor has spent running task queues.
    pub fn total_runtime(&self) -> Duration {
        self.total_runtime
    }

    /// Number of times the scheduler tried to pick a task queue to run.
    pub fn scheduler_runs(&self) -> u64 {
        self.scheduler_runs
    }

    /// Number of tasks that have been run.
    pub fn tasks_executed(&self) -> u64 {
        self.tasks_executed
    }
}
/// Counters describing the work done by a single task queue.
#[derive(Debug, Copy, Clone)]
pub struct TaskQueueStats {
    index: usize,
    reciprocal_shares: u64,
    queue_selected: u64,
    runtime: Duration,
}

impl TaskQueueStats {
    /// Creates a record for queue `index` with zeroed counters and the given reciprocal
    /// shares.
    fn new(index: usize, reciprocal_shares: u64) -> Self {
        TaskQueueStats {
            index,
            reciprocal_shares,
            queue_selected: 0,
            runtime: Duration::default(),
        }
    }

    /// Index of the task queue these statistics belong to.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Shares currently in effect, recovered by dividing `1 << 22` by the stored reciprocal.
    pub fn current_shares(&self) -> usize {
        const SCALE: u64 = 1 << 22;
        (SCALE / self.reciprocal_shares) as usize
    }

    /// Total runtime charged to this queue.
    pub fn runtime(&self) -> Duration {
        self.runtime
    }

    /// Number of times this queue was selected to run.
    pub fn queue_selected(&self) -> u64 {
        self.queue_selected
    }
}
#[derive(Debug)]
struct ExecutorQueues {
active_executors: BinaryHeap<Rc<RefCell<TaskQueue>>>,
available_executors: BTreeMap<usize, Rc<RefCell<TaskQueue>>>,
active_executing: Option<Rc<RefCell<TaskQueue>>>,
default_executor: TaskQueueHandle,
executor_index: usize,
last_vruntime: u64,
preempt_timer_duration: Duration,
spin_before_park: Option<Duration>,
stats: ExecutorStats,
}
impl ExecutorQueues {
fn new() -> Rc<RefCell<Self>> {
Rc::new(RefCell::new(ExecutorQueues {
active_executors: BinaryHeap::new(),
available_executors: BTreeMap::new(),
active_executing: None,
default_executor: TaskQueueHandle::default(),
executor_index: 1,
last_vruntime: 0,
preempt_timer_duration: Duration::from_millis(100),
spin_before_park: None,
stats: ExecutorStats::new(),
}))
}
fn reevaluate_preempt_timer(&mut self) {
self.preempt_timer_duration = self
.active_executors
.iter()
.map(|tq| match tq.borrow().io_requirements.latency_req {
Latency::NotImportant => Duration::from_millis(100),
Latency::Matters(d) => d,
})
.min()
.unwrap_or_else(|| Duration::from_secs(1))
}
fn maybe_activate(&mut self, index: usize) {
let queue = self
.available_executors
.get(&index)
.expect("Trying to activate invalid queue! Index")
.clone();
let mut state = queue.borrow_mut();
if !state.is_active() {
state.vruntime = self.last_vruntime;
state.active = true;
drop(state);
self.active_executors.push(queue);
self.reevaluate_preempt_timer();
}
}
}
/// Builder used to configure and create a [`LocalExecutor`].
#[derive(Debug)]
pub struct LocalExecutorBuilder {
    /// CPU to bind to, if any.
    binding: Option<usize>,
    /// How long the executor keeps polling with nothing to run before it parks.
    spin_before_park: Option<Duration>,
    /// Prefix of the thread name used by [`LocalExecutorBuilder::spawn`].
    name: String,
}

impl LocalExecutorBuilder {
    /// Creates a builder with no CPU binding, no spinning, and the name `"unnamed"`.
    pub fn new() -> LocalExecutorBuilder {
        LocalExecutorBuilder {
            binding: None,
            spin_before_park: None,
            name: String::from("unnamed"),
        }
    }

    /// Requests that the executor be bound to CPU `cpu`.
    pub fn pin_to_cpu(mut self, cpu: usize) -> LocalExecutorBuilder {
        self.binding = Some(cpu);
        self
    }

    /// Sets how long the executor keeps polling, with nothing to run, before parking.
    pub fn spin_before_park(mut self, spin: Duration) -> LocalExecutorBuilder {
        self.spin_before_park = Some(spin);
        self
    }

    /// Sets the name prefix given to the thread created by [`LocalExecutorBuilder::spawn`].
    pub fn name(mut self, name: &str) -> LocalExecutorBuilder {
        self.name = String::from(name);
        self
    }

    /// Builds a [`LocalExecutor`] on the calling thread.
    ///
    /// # Errors
    ///
    /// Returns an error when binding to the requested CPU or initializing the executor
    /// fails.
    pub fn make(self) -> io::Result<LocalExecutor> {
        let mut le = LocalExecutor::new(EXECUTOR_ID.fetch_add(1, Ordering::Relaxed));
        if let Some(cpu) = self.binding {
            le.bind_to_cpu(cpu)?;
        }
        // Spinning is independent from CPU pinning, so it is applied unconditionally.
        le.queues.borrow_mut().spin_before_park = self.spin_before_park;
        le.init()?;
        Ok(le)
    }

    /// Spawns a new thread named `"<name>-<id>"`, builds an executor on it, and runs the
    /// future produced by `fut_gen` to completion there.
    ///
    /// # Errors
    ///
    /// Returns an error when the thread cannot be created.
    ///
    /// # Panics
    ///
    /// The spawned thread panics when binding to the requested CPU or initializing the
    /// executor fails.
    #[must_use = "This spawns an executor on a thread, so you must acquire its handle and then join() to keep it alive"]
    pub fn spawn<G, F, T>(self, fut_gen: G) -> io::Result<JoinHandle<()>>
    where
        G: FnOnce() -> F + std::marker::Send + 'static,
        F: Future<Output = T> + 'static,
    {
        let id = EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
        let name = format!("{}-{}", self.name, id);
        Builder::new().name(name).spawn(move || {
            let mut le = LocalExecutor::new(id);
            if let Some(cpu) = self.binding {
                le.bind_to_cpu(cpu).unwrap();
            }
            // Honor the spin setting here too, exactly like `make()`.
            le.queues.borrow_mut().spin_before_park = self.spin_before_park;
            le.init().unwrap();
            le.run(async move {
                let task = Task::local(async move {
                    fut_gen().await;
                });
                task.await;
            })
        })
    }
}

impl Default for LocalExecutorBuilder {
    fn default() -> Self {
        Self::new()
    }
}
/// Single-threaded executor that runs tasks grouped into task queues.
#[derive(Debug)]
pub struct LocalExecutor {
    queues: Rc<RefCell<ExecutorQueues>>,
    parker: parking::Parker,
    id: usize,
}

impl LocalExecutor {
    /// Binds the caller to CPU `cpu` by delegating to the free function of the same name.
    fn bind_to_cpu(&self, cpu: usize) -> io::Result<()> {
        bind_to_cpu(cpu)
    }

    /// Registers the default task queue (index 0, named "default", 1000 static shares).
    fn init(&mut self) -> io::Result<()> {
        const DEFAULT_INDEX: usize = 0;
        let queues_weak = Rc::downgrade(&self.queues);
        let io_requirements = IoRequirements::new(Latency::NotImportant, DEFAULT_INDEX);
        let default_queue = TaskQueue::new(
            DEFAULT_INDEX,
            "default",
            Shares::Static(1000),
            io_requirements,
            move || {
                let queues = queues_weak.upgrade().unwrap();
                queues.borrow_mut().maybe_activate(DEFAULT_INDEX);
            },
        );
        self.queues
            .borrow_mut()
            .available_executors
            .insert(DEFAULT_INDEX, default_queue);
        Ok(())
    }

    /// Creates an executor with the default builder settings.
    ///
    /// # Panics
    ///
    /// Panics if the builder fails to create the executor.
    pub fn make_default() -> LocalExecutor {
        LocalExecutorBuilder::new().make().unwrap()
    }

    /// Creates an executor with id `id`, fresh scheduler state and a new parker.
    fn new(id: usize) -> LocalExecutor {
        LocalExecutor {
            queues: ExecutorQueues::new(),
            parker: parking::Parker::new(),
            id,
        }
    }

    /// Returns the id of this executor.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Creates a task queue with the given shares, latency requirement and name, and
    /// returns a handle to it.
    pub fn create_task_queue<S>(&self, shares: Shares, latency: Latency, name: S) -> TaskQueueHandle
    where
        S: Into<String>,
    {
        let index = {
            let mut queues = self.queues.borrow_mut();
            let assigned = queues.executor_index;
            queues.executor_index = assigned + 1;
            assigned
        };
        let queues_weak = Rc::downgrade(&self.queues);
        let io_requirements = IoRequirements::new(latency, index);
        let notify = move || {
            let queues = queues_weak.upgrade().unwrap();
            queues.borrow_mut().maybe_activate(index);
        };
        let tq = TaskQueue::new(index, name, shares, io_requirements, notify);
        self.queues
            .borrow_mut()
            .available_executors
            .insert(index, tq);
        TaskQueueHandle { index }
    }

    /// Removes the task queue designated by `handle`.
    ///
    /// # Errors
    ///
    /// Returns `QueueStillActiveError` if the queue is flagged active and
    /// `QueueNotFoundError` if the handle matches no queue.
    pub fn remove_task_queue(
        &self,
        handle: TaskQueueHandle,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut queues = self.queues.borrow_mut();
        let still_active = match queues.available_executors.get(&handle.index) {
            Some(tq) => {
                let active = tq.borrow().is_active();
                active
            }
            None => return Err(Box::new(QueueNotFoundError::new(handle))),
        };
        if still_active {
            return Err(Box::new(QueueStillActiveError::new(handle)));
        }
        queues
            .available_executors
            .remove(&handle.index)
            .expect("test already done");
        Ok(())
    }

    /// Looks up the queue registered under `handle`.
    fn get_queue(&self, handle: &TaskQueueHandle) -> Option<Rc<RefCell<TaskQueue>>> {
        let queues = self.queues.borrow();
        queues.available_executors.get(&handle.index).map(Rc::clone)
    }

    /// Looks up the inner executor of the queue registered under `handle`.
    fn get_executor(&self, handle: &TaskQueueHandle) -> Option<Rc<multitask::LocalExecutor>> {
        let queue = self.get_queue(handle)?;
        let ex = Rc::clone(&queue.borrow().ex);
        Some(ex)
    }

    /// Returns the handle of the queue whose tasks are running right now.
    ///
    /// # Panics
    ///
    /// Panics if no queue is currently executing.
    fn current_task_queue(&self) -> TaskQueueHandle {
        let queues = self.queues.borrow();
        let running = queues.active_executing.as_ref().unwrap();
        let index = running.borrow().stats.index;
        TaskQueueHandle { index }
    }

    /// Sets the yield flag on the queue that is running right now.
    ///
    /// # Panics
    ///
    /// Panics if no queue is currently executing.
    fn mark_me_for_yield(&self) {
        let queues = self.queues.borrow();
        queues
            .active_executing
            .as_ref()
            .unwrap()
            .borrow_mut()
            .yielded = true;
    }

    /// Spawns `future` into the queue that is running right now, or into queue 0 when no
    /// queue is running.
    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
        let running = self
            .queues
            .borrow()
            .active_executing
            .as_ref()
            .map(|queue| Rc::clone(&queue.borrow().ex));
        // No borrow of `self.queues` may be alive when `spawn` is called below.
        let ex = match running {
            Some(ex) => ex,
            None => self.get_executor(&TaskQueueHandle { index: 0 }).unwrap(),
        };
        Task(ex.spawn(future))
    }

    /// Spawns `future` into the queue designated by `handle`.
    ///
    /// # Errors
    ///
    /// Returns `QueueNotFoundError` if the handle matches no queue.
    pub fn spawn_into<T, F>(
        &self,
        future: F,
        handle: TaskQueueHandle,
    ) -> Result<Task<T>, QueueNotFoundError>
    where
        T: 'static,
        F: Future<Output = T> + 'static,
    {
        match self.get_executor(&handle) {
            Some(ex) => Ok(Task(ex.spawn(future))),
            None => Err(QueueNotFoundError::new(handle)),
        }
    }

    /// Current value of the preempt timer.
    fn preempt_timer_duration(&self) -> Duration {
        let queues = self.queues.borrow();
        queues.preempt_timer_duration
    }

    /// Configured spin time before parking, if any.
    fn spin_before_park(&self) -> Option<Duration> {
        let queues = self.queues.borrow();
        queues.spin_before_park
    }

    /// Runs task queues until preemption is requested or no active queue is left.
    ///
    /// Returns `false` as soon as no active queue is found, even if queues ran before
    /// that. Otherwise returns whether at least one queue ran.
    fn run_task_queues(&self) -> bool {
        let mut ran = false;
        while !Reactor::need_preempt() {
            if !self.run_one_task_queue() {
                return false;
            }
            ran = true;
        }
        ran
    }

    /// Pops the next active queue and runs its tasks until the queue has no more tasks,
    /// a task asks to yield, or preemption is requested.
    ///
    /// Returns `false` when there was no active queue to run.
    fn run_one_task_queue(&self) -> bool {
        let queue = {
            let mut tq = self.queues.borrow_mut();
            let candidate = tq.active_executors.pop();
            tq.stats.scheduler_runs += 1;
            match candidate {
                Some(queue) => {
                    tq.active_executing = Some(queue.clone());
                    queue
                }
                None => return false,
            }
        };

        let time = Instant::now();
        queue.borrow_mut().prepare_to_run(time);

        let mut tasks_executed_this_loop = 0;
        loop {
            let mut queue_ref = queue.borrow_mut();
            if Reactor::need_preempt() || queue_ref.yielded() {
                break;
            }
            let runnable = match queue_ref.get_task() {
                Some(r) => r,
                None => break,
            };
            Reactor::get().inform_io_requirements(queue_ref.io_requirements);
            // The task may touch its own queue, so the borrow must end before it runs.
            drop(queue_ref);
            runnable.run();
            tasks_executed_this_loop += 1;
        }

        let runtime = time.elapsed();
        let (need_repush, last_vruntime) = {
            let mut state = queue.borrow_mut();
            let accounted = state.account_vruntime(runtime);
            (state.is_active(), accounted)
        };

        let mut tq = self.queues.borrow_mut();
        tq.active_executing = None;
        tq.stats.total_runtime += runtime;
        tq.stats.tasks_executed += tasks_executed_this_loop;
        let new_last_vruntime = match last_vruntime {
            Some(x) => x,
            None => {
                // The vruntime would overflow: restart every queue from zero.
                for other in tq.available_executors.values() {
                    other.borrow_mut().vruntime = 0;
                }
                0
            }
        };
        tq.last_vruntime = new_last_vruntime;
        if need_repush {
            tq.active_executors.push(queue);
        } else {
            tq.reevaluate_preempt_timer();
        }
        true
    }

    /// Runs the executor until `future` completes and returns its output.
    ///
    /// Each iteration polls `future`, polls I/O, then runs the task queues. When there
    /// was nothing to run, the executor first spins for the configured time (if any) and
    /// then parks.
    pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
        pin!(future);
        let waker = waker_fn(|| {});
        let cx = &mut Context::from_waker(&waker);
        let spin_before_park = self.spin_before_park().unwrap_or_default();
        let spin = spin_before_park.as_nanos() > 0;
        let mut spin_since: Option<Instant> = None;
        LOCAL_EX.set(self, || loop {
            if let Poll::Ready(output) = future.as_mut().poll(cx) {
                break output;
            }
            let duration = self.preempt_timer_duration();
            self.parker.poll_io(duration);
            if self.run_task_queues() {
                spin_since = None;
                continue;
            }
            if spin {
                match spin_since {
                    None => {
                        spin_since = Some(Instant::now());
                        continue;
                    }
                    Some(started) if started.elapsed() < spin_before_park => continue,
                    Some(_) => spin_since = None,
                }
            }
            self.parker.park();
        })
    }
}
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T>
where
T: 'static,
{
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.spawn(future))
} else {
panic!("`Task::local()` must be called from a `LocalExecutor`")
}
}
pub async fn later() {
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.mark_me_for_yield())
} else {
panic!("`Task::local()` must be called from a `LocalExecutor`")
}
struct Yield {
done: Option<()>,
}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.done.take() {
Some(_) => {
cx.waker().clone().wake();
Poll::Pending
}
None => Poll::Ready(()),
}
}
}
let y = Yield { done: Some(()) };
y.await;
}
#[inline]
pub fn need_preempt() -> bool {
Reactor::need_preempt()
}
#[inline]
pub async fn yield_if_needed() {
if Reactor::need_preempt() {
Local::later().await;
}
}
pub fn local_into(
future: impl Future<Output = T> + 'static,
handle: TaskQueueHandle,
) -> Result<Task<T>, QueueNotFoundError>
where
T: 'static,
{
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.spawn_into(future, handle))
} else {
panic!("`Task::local()` must be called from a `LocalExecutor`")
}
}
pub fn id() -> usize
where
T: 'static,
{
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.id())
} else {
panic!("`Task::id()` must be called from a `LocalExecutor`")
}
}
pub fn detach(self) -> task::JoinHandle<T, ()> {
self.0.detach()
}
pub fn create_task_queue(shares: Shares, latency: Latency, name: &str) -> TaskQueueHandle {
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.create_task_queue(shares, latency, name))
} else {
panic!("`Task::create_task_queue()` must be called from a `LocalExecutor`")
}
}
pub fn current_task_queue() -> TaskQueueHandle {
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.current_task_queue())
} else {
panic!("`Task::current_task_queue()` must be called from a `LocalExecutor`")
}
}
pub fn task_queue_stats(handle: TaskQueueHandle) -> Result<TaskQueueStats, QueueNotFoundError> {
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| match local_ex.get_queue(&handle) {
Some(x) => Ok(x.borrow().stats),
None => Err(QueueNotFoundError::new(handle)),
})
} else {
panic!("`Task::current_task_queue()` must be called from a `LocalExecutor`")
}
}
pub fn all_task_queue_stats<V>(mut output: V) -> V
where
V: Extend<TaskQueueStats>,
{
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| {
let tq = local_ex.queues.borrow();
output.extend(tq.available_executors.values().map(|x| x.borrow().stats));
output
})
} else {
panic!("`Task::current_task_queue()` must be called from a `LocalExecutor`")
}
}
pub fn executor_stats() -> ExecutorStats {
if LOCAL_EX.is_set() {
LOCAL_EX.with(|local_ex| local_ex.queues.borrow().stats)
} else {
panic!("`Task::current_task_queue()` must be called from a `LocalExecutor`")
}
}
pub async fn cancel(self) -> Option<T> {
self.0.cancel().await
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::timer::{self, Timer};
use crate::Semaphore;
use crate::{enclose, SharesManager};
use core::mem::MaybeUninit;
use futures::join;
use std::cell::Cell;
/// Runs a future that adds 10 to a shared counter and checks the result once `run` returns.
#[test]
fn create_and_destroy_executor() {
    let mut counter = Rc::new(RefCell::new(0));
    let executor = LocalExecutor::make_default();
    let shared = Rc::clone(&counter);
    executor.run(async move {
        *shared.borrow_mut() += 10;
    });
    // `get_mut` only succeeds if the clone moved into the future has been dropped.
    let observed = Rc::get_mut(&mut counter).unwrap().replace(0);
    assert_eq!(observed, 10);
}
/// Building an executor pinned to CPU `usize::MAX` must fail.
#[test]
fn create_fail_to_bind() {
    let outcome = LocalExecutorBuilder::new().pin_to_cpu(usize::MAX).make();
    if outcome.is_ok() {
        panic!("Should have failed");
    }
}
/// Building an executor pinned to CPU 0 must succeed.
#[test]
fn create_and_bind() {
    match LocalExecutorBuilder::new().pin_to_cpu(0).make() {
        Ok(_) => {}
        Err(x) => panic!("got error {:?}", x),
    }
}
/// `Task::local` must panic when no executor is inside `run()` on this thread.
#[test]
#[should_panic]
fn spawn_without_executor() {
    // The executor is created but dropped right away without ever running.
    let _ = LocalExecutor::make_default();
    let _ = Task::local(async move {});
}
/// Spawning into a handle that was never created must return an error.
#[test]
fn invalid_task_queue() {
    let executor = LocalExecutor::make_default();
    executor.run(async {
        let bogus = TaskQueueHandle { index: 1 };
        let spawned = Task::local_into(
            async move {
                panic!("Should not have executed this");
            },
            bogus,
        );
        if spawned.is_ok() {
            panic!("Should have failed");
        }
    });
}
/// Ten tasks yield in a loop; no task may ever observe itself as the last one that ran.
#[test]
fn ten_yielding_queues() {
    let local_ex = LocalExecutor::make_default();
    let executed_last = Rc::new(RefCell::new(0));
    local_ex.run(async {
        let joins: Vec<_> = (1..11)
            .map(|id| {
                let exec = executed_last.clone();
                Task::local(async move {
                    for _ in 0..10_000 {
                        {
                            // Scope the borrow so it is released before yielding.
                            let mut last = exec.borrow_mut();
                            assert_ne!(id, *last);
                            *last = id;
                        }
                        Local::later().await;
                    }
                })
            })
            .collect();
        futures::future::join_all(joins).await;
    });
}
/// A busy task in a latency-insensitive queue must be preempted so that a task in a queue
/// with a 2ms latency requirement gets to run within a second.
#[test]
fn task_with_latency_requirements() {
    let local_ex = LocalExecutor::make_default();
    local_ex.run(async {
        let not_latency =
            Local::create_task_queue(Shares::default(), Latency::NotImportant, "test");
        let latency = Local::create_task_queue(
            Shares::default(),
            Latency::Matters(Duration::from_millis(2)),
            "testlat",
        );
        let nolat_started = Rc::new(RefCell::new(false));
        let lat_status = Rc::new(RefCell::new(false));

        // Spins until the latency-sensitive task reports that it ran.
        let nolat = local_ex
            .spawn_into(
                crate::enclose! { (nolat_started, lat_status)
                    async move {
                        *nolat_started.borrow_mut() = true;
                        let start = Instant::now();
                        while !*lat_status.borrow() {
                            if start.elapsed().as_secs() > 1 {
                                panic!("Never received preempt signal");
                            }
                            Local::yield_if_needed().await;
                        }
                    }
                },
                not_latency,
            )
            .unwrap();

        // Waits for the other task to start, then flips the flag it is spinning on.
        let lat = local_ex
            .spawn_into(
                crate::enclose! { (nolat_started, lat_status)
                    async move {
                        while !*nolat_started.borrow() {
                            Local::later().await;
                        }
                        *lat_status.borrow_mut() = true;
                    }
                },
                latency,
            )
            .unwrap();

        futures::join!(nolat, lat);
    });
}
/// `current_task_queue` must report the queue each task was spawned into.
#[test]
fn current_task_queue_matches() {
    let local_ex = LocalExecutor::make_default();
    local_ex.run(async {
        let tq1 = Local::create_task_queue(Shares::default(), Latency::NotImportant, "test1");
        let tq2 = Local::create_task_queue(Shares::default(), Latency::NotImportant, "test2");
        let (id1, id2) = (tq1.index, tq2.index);

        let in_default = Local::local(async {
            assert_eq!(Local::current_task_queue().index, 0);
        });
        let in_first = Local::local_into(
            async move {
                assert_eq!(Local::current_task_queue().index, id1);
            },
            tq1,
        )
        .unwrap();
        let in_second = Local::local_into(
            async move {
                assert_eq!(Local::current_task_queue().index, id2);
            },
            tq2,
        )
        .unwrap();

        futures::join!(in_default, in_first, in_second);
    })
}
/// With two latency-insensitive queues, a task that keeps busy for 99ms must not be
/// preempted in favor of the other queue.
#[test]
fn task_optimized_for_throughput() {
    let local_ex = LocalExecutor::make_default();
    local_ex.run(async {
        let tq1 = Local::create_task_queue(Shares::default(), Latency::NotImportant, "test");
        let tq2 = Local::create_task_queue(Shares::default(), Latency::NotImportant, "testlat");
        let first_started = Rc::new(RefCell::new(false));
        let second_status = Rc::new(RefCell::new(false));

        // Stays busy for 99ms and fails if the second task ran in the meantime.
        let first = local_ex
            .spawn_into(
                crate::enclose! { (first_started, second_status)
                    async move {
                        *first_started.borrow_mut() = true;
                        let start = Instant::now();
                        while start.elapsed().as_millis() < 99 {
                            if *second_status.borrow() {
                                panic!("I was preempted but should not have been");
                            }
                            Local::yield_if_needed().await;
                        }
                    }
                },
                tq1,
            )
            .unwrap();

        // Waits for the first task to start, then records that it ran.
        let second = local_ex
            .spawn_into(
                crate::enclose! { (first_started, second_status)
                    async move {
                        while !*first_started.borrow() {
                            Local::later().await;
                        }
                        *second_status.borrow_mut() = true;
                    }
                },
                tq2,
            )
            .unwrap();

        futures::join!(first, second);
    });
}
/// A detached task that yields forever must not keep `run` from returning.
#[test]
fn test_detach() {
    let executor = LocalExecutor::make_default();
    let forever = executor.spawn(async {
        loop {
            Local::later().await;
        }
    });
    forever.detach();
    executor.run(async {
        Timer::new(Duration::from_micros(100)).await;
    });
}
/// Converts a `libc::timeval` (seconds plus microseconds) into a `Duration`.
fn from_timeval(v: libc::timeval) -> Duration {
    let whole_seconds = Duration::from_secs(v.tv_sec as u64);
    let fraction = Duration::from_micros(v.tv_usec as u64);
    whole_seconds + fraction
}
/// Returns the resource usage of the calling thread (`RUSAGE_THREAD`).
///
/// # Panics
///
/// Panics if `libc::getrusage` returns a non-zero value.
fn getrusage() -> libc::rusage {
    let mut usage = MaybeUninit::<libc::rusage>::uninit();
    // SAFETY: the pointer refers to writable storage large enough for one `rusage`.
    let err = unsafe { libc::getrusage(libc::RUSAGE_THREAD, usage.as_mut_ptr()) };
    assert!(err == 0, "getrusage error = {}", err);
    // SAFETY: the call returned 0, so the struct has been filled in.
    unsafe { usage.assume_init() }
}
/// Returns the user CPU time consumed so far by the calling thread.
fn getrusage_utime() -> Duration {
    let usage = getrusage();
    from_timeval(usage.ru_utime)
}
/// Without spinning configured, sleeping for one second must use almost no user CPU time.
#[test]
fn test_no_spin() {
    let executor = LocalExecutor::make_default();
    let task_queue = executor.create_task_queue(
        Shares::default(),
        Latency::Matters(Duration::from_millis(10)),
        "my_tq",
    );
    let utime_before = getrusage_utime();
    let sleeper = executor
        .spawn_into(
            async { timer::sleep(Duration::from_secs(1)).await },
            task_queue,
        )
        .expect("failed to spawn task");
    executor.run(async { sleeper.await });
    let consumed = getrusage_utime() - utime_before;
    assert!(
        consumed < Duration::from_millis(2),
        "expected user time on LE is less than 2 millisecond"
    );
}
/// Compares user CPU time of a one-second sleep on an executor without spinning against
/// one configured to spin for 100ms before parking.
#[test]
fn test_spin() {
    let dur = Duration::from_secs(1);

    // Baseline: no spinning configured.
    let plain = LocalExecutorBuilder::new().make().unwrap();
    let plain_start = getrusage_utime();
    plain.run(async { timer::sleep(dur).await });
    let plain_used = getrusage_utime() - plain_start;

    // Same workload with spinning enabled.
    let spinning = LocalExecutorBuilder::new()
        .pin_to_cpu(0)
        .spin_before_park(Duration::from_millis(100))
        .make()
        .unwrap();
    let spinning_start = getrusage_utime();
    let task = spinning.spawn(async move { timer::sleep(dur).await });
    spinning.run(async { task.await });
    let spinning_used = getrusage_utime() - spinning_start;

    assert!(
        plain_used < Duration::from_millis(10),
        "expected user time on LE0 is less than 10 millisecond"
    );
    assert!(
        spinning_used >= Duration::from_millis(50),
        "expected user time on LE is much greater than 50 millisecond"
    );
}
/// Busy-loops for two milliseconds and then yields.
async fn work_quanta() {
    let started = Instant::now();
    loop {
        if started.elapsed().as_millis() >= 2 {
            break;
        }
    }
    Local::later().await;
}
/// Records that worker `me` ran and asserts the same worker has run fewer than ten times
/// in a row, then performs one quantum of work.
async fn do_ping_pong_work(last: Rc<Cell<usize>>, counter: Rc<Cell<usize>>, me: usize) {
    let streak = if last.get() == me {
        counter.get() + 1
    } else {
        1
    };
    counter.set(streak);
    assert!(counter.get() < 10);
    last.set(me);
    work_quanta().await;
}
/// Two queues with equal shares each spawn 100 yielding workers; the workers assert that
/// neither queue runs ten times in a row.
#[test]
fn test_ping_pong_yield() {
    test_executor!(async move {
        let tq1 = Local::create_task_queue(Shares::default(), Latency::NotImportant, "test1");
        let tq2 = Local::create_task_queue(Shares::default(), Latency::NotImportant, "test2");
        let counter = Rc::new(Cell::new(0));
        let last = Rc::new(Cell::new(0));
        // Starts with no units, so both spawners wait until `signal` is called below.
        let sem = Rc::new(Semaphore::new(0));
        let j1 = Local::local_into(
            enclose! {(last, counter, sem) async move {
                let mut v = Vec::new();
                for _ in 0..100 {
                    sem.acquire(1).await.unwrap();
                    v.push(Local::local(enclose!{(last, counter) async move {
                        do_ping_pong_work(last, counter, 0).await;
                    }}).detach());
                }
                // Fully qualified: no `use` in this module brings `join_all` into scope.
                futures::future::join_all(v).await;
            }},
            tq1,
        )
        .unwrap();
        let j2 = Local::local_into(
            enclose! {(last, counter, sem) async move {
                let mut v = Vec::new();
                for _ in 0..100 {
                    sem.acquire(1).await.unwrap();
                    v.push(Local::local(enclose!{(last, counter) async move {
                        do_ping_pong_work(last, counter, 1).await;
                    }}).detach());
                }
                futures::future::join_all(v).await;
            }},
            tq2,
        )
        .unwrap();
        sem.signal(200);
        futures::join!(j1, j2);
    });
}
/// Runs `$work` in a loop for five seconds in two queues with static shares `$s1` and
/// `$s2`, then checks that the second queue's fraction of all iterations is within 0.1 of
/// `$s2 / ($s1 + $s2)`.
macro_rules! test_static_shares {
    ( $s1:expr, $s2:expr, $work:block ) => {
        let local_ex = LocalExecutor::make_default();
        local_ex.run(async {
            let tq1 = Local::create_task_queue(
                Shares::Static($s1),
                Latency::Matters(Duration::from_millis(1)),
                "test_1",
            );
            let tq2 = Local::create_task_queue(
                Shares::Static($s2),
                Latency::Matters(Duration::from_millis(1)),
                "test_2",
            );
            let tq1_count = Rc::new(Cell::new(0));
            let tq2_count = Rc::new(Cell::new(0));
            let now = Instant::now();

            let t1 = Local::local_into(
                enclose! { (tq1_count, now) async move {
                    while now.elapsed().as_secs() < 5 {
                        $work;
                        tq1_count.set(tq1_count.get() + 1);
                    }
                }},
                tq1,
            )
            .unwrap();
            let t2 = Local::local_into(
                enclose! { (tq2_count, now ) async move {
                    while now.elapsed().as_secs() < 5 {
                        $work;
                        tq2_count.set(tq2_count.get() + 1);
                    }
                }},
                tq2,
            )
            .unwrap();
            join!(t1, t2);

            let expected_ratio = $s2 as f64 / (($s2 + $s1) as f64);
            let total = tq1_count.get() + tq2_count.get();
            let actual_ratio = tq2_count.get() as f64 / (total as f64);
            assert!((expected_ratio - actual_ratio).abs() < 0.1);
        });
    };
}
/// Static shares of 1000 versus 10, with each iteration doing one work quantum.
#[test]
fn test_shares_high_disparity_fat_task() {
    test_static_shares!(1000, 10, {
        work_quanta().await
    });
}
/// Static shares of 1000 versus 1000, with each iteration doing one work quantum.
#[test]
fn test_shares_low_disparity_fat_task() {
    test_static_shares!(1000, 1000, {
        work_quanta().await
    });
}
/// `SharesManager` used by the tests; its share count is changed through `tick`.
struct DynamicSharesTest {
    shares: Cell<usize>,
}

impl DynamicSharesTest {
    /// Creates a manager that starts out reporting 0 shares.
    fn new() -> Rc<Self> {
        let manager = DynamicSharesTest {
            shares: Cell::new(0),
        };
        Rc::new(manager)
    }

    /// Sets the shares to 1 while `millis` is below 1000 and to 1000 from then on.
    fn tick(&self, millis: u64) {
        let shares = if millis < 1000 { 1 } else { 1000 };
        self.shares.set(shares);
    }
}

impl SharesManager for DynamicSharesTest {
    fn shares(&self) -> usize {
        self.shares.get()
    }

    fn adjustment_period(&self) -> Duration {
        Duration::from_millis(1)
    }
}
/// Runs a static-shares queue against a dynamic-shares queue for two seconds. The dynamic
/// queue's shares go from 1 to 1000 after the first second, so its iteration count relative
/// to the static queue must be higher in the second second than in the first.
#[test]
fn test_dynamic_shares() {
    let local_ex = LocalExecutor::make_default();
    local_ex.run(async {
        let bm = DynamicSharesTest::new();
        let tq1 = Local::create_task_queue(
            Shares::Static(1000),
            Latency::Matters(Duration::from_millis(1)),
            "test_1",
        );
        let tq2 = Local::create_task_queue(
            Shares::Dynamic(bm.clone()),
            Latency::Matters(Duration::from_millis(1)),
            "test_2",
        );
        // One counter per elapsed second: slot 0 for the first second, slot 1 for the second.
        let tq1_count = Rc::new(RefCell::new(vec![0, 0]));
        let tq2_count = Rc::new(RefCell::new(vec![0, 0]));
        let now = Instant::now();

        let t1 = Local::local_into(
            enclose! { (tq1_count, now) async move {
                loop {
                    let secs = now.elapsed().as_secs();
                    if secs >= 2 {
                        break;
                    }
                    tq1_count.borrow_mut()[secs as usize] += 1;
                    Local::later().await;
                }
            }},
            tq1,
        )
        .unwrap();
        let t2 = Local::local_into(
            enclose! { (tq2_count, now, bm) async move {
                loop {
                    let elapsed = now.elapsed();
                    let secs = elapsed.as_secs();
                    if secs >= 2 {
                        break;
                    }
                    bm.tick(elapsed.as_millis() as u64);
                    tq2_count.borrow_mut()[secs as usize] += 1;
                    Local::later().await;
                }
            }},
            tq2,
        )
        .unwrap();
        join!(t1, t2);

        // Per-second ratio of dynamic-queue iterations to static-queue iterations.
        let static_counts = tq1_count.borrow();
        let dynamic_counts = tq2_count.borrow();
        let ratios: Vec<f64> = static_counts
            .iter()
            .zip(dynamic_counts.iter())
            .map(|(x, y)| *y as f64 / *x as f64)
            .collect();
        assert!(ratios[1] > ratios[0]);
        assert!(ratios[0] < 0.25);
        assert!(ratios[1] > 0.50);
    });
}
}