use async_io::block_on;
use async_lock::Barrier;
use async_task::Runnable;
use async_task::Task;
use concurrent_queue::ConcurrentQueue;
use core_affinity;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures_lite::future::{self, Future, FutureExt};
use slab::Slab;
use std::cmp;
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::task::{Poll, Waker};
use std::thread;
use crate::runtime::config;
use crate::runtime::run_block;
use crate::runtime::scheduler::Scheduler;
use crate::runtime::BlockMessage;
use crate::runtime::FlowgraphMessage;
use crate::runtime::Topology;
/// Scheduler that runs one executor worker thread per available CPU core and
/// statically maps flowgraph blocks onto those workers.
#[derive(Clone, Debug)]
pub struct FlowScheduler {
    // Shared so that clones of the scheduler drive the same executor/workers.
    inner: Arc<FlowSchedulerInner>,
}
/// Interior of [`FlowScheduler`]: the executor plus one worker thread (and its
/// shutdown channel) per CPU core.
struct FlowSchedulerInner {
    // Executor whose run loop the worker threads below drive.
    executor: Arc<FlowExecutor>,
    // One (join handle, shutdown sender) pair per spawned worker thread.
    workers: Vec<(thread::JoinHandle<()>, oneshot::Sender<()>)>,
}
impl fmt::Debug for FlowSchedulerInner {
    /// Opaque debug representation: prints only the type name, no fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("FlowSchedulerInner");
        builder.finish()
    }
}
impl Drop for FlowSchedulerInner {
fn drop(&mut self) {
for i in self.workers.drain(..) {
i.1.send(()).unwrap();
i.0.join().unwrap();
}
}
}
impl FlowScheduler {
    /// Creates a scheduler with one executor worker thread pinned-by-name to
    /// each available CPU core. Blocks until every worker has started and
    /// reached the startup barrier.
    pub fn new() -> FlowScheduler {
        let executor = Arc::new(FlowExecutor::new());
        let mut workers = Vec::new();
        // `get_core_ids` returns None on platforms without affinity support;
        // fail loudly with context instead of a bare unwrap panic.
        let core_ids = core_affinity::get_core_ids()
            .expect("flowsched: failed to determine CPU core ids");
        debug!("flowsched: core ids {}", core_ids.len());
        // +1 participant so this constructor can wait for all workers.
        let barrier = Arc::new(Barrier::new(core_ids.len() + 1));
        for id in core_ids {
            let b = barrier.clone();
            let e = executor.clone();
            // Sending on (or dropping) `sender` resolves `receiver` and
            // thereby terminates the worker's run loop.
            let (sender, receiver) = oneshot::channel::<()>();
            let handle = thread::Builder::new()
                .name(format!("flow-{}", id.id))
                .spawn(move || {
                    debug!("starting executor thread on core id {}", id.id);
                    async_io::block_on(e.run(async {
                        b.wait().await;
                        receiver.await
                    }))
                    .unwrap();
                })
                .expect("cannot spawn executor thread");
            workers.push((handle, sender));
        }
        // Wait until every worker has registered its run loop.
        async_io::block_on(barrier.wait());
        FlowScheduler {
            inner: Arc::new(FlowSchedulerInner { executor, workers }),
        }
    }

    /// Maps a 0-based block index to a worker/core index, distributing
    /// `n_blocks` blocks as evenly as possible over `n_cores` cores; the
    /// first `n_blocks % n_cores` cores receive one extra block.
    fn map_block(block: usize, n_blocks: usize, n_cores: usize) -> usize {
        let n = n_blocks / n_cores;
        let r = n_blocks % n_cores;
        // Find the first core boundary the block index falls below.
        for x in 1..n_cores {
            if block < ((x) * n) + cmp::min(x, r) {
                return x - 1;
            }
        }
        n_cores - 1
    }
}
impl Scheduler for FlowScheduler {
    /// Spawns one task per block in `topology` and returns each block's
    /// message inbox (sender side), indexed by block id.
    fn run_topology(
        &self,
        topology: &mut Topology,
        main_channel: &Sender<FlowgraphMessage>,
    ) -> Slab<Option<Sender<BlockMessage>>> {
        let mut inboxes = Slab::new();
        // Pre-fill the slab with `None` up to the highest block id so that
        // `inboxes[id]` below is always a valid index.
        let max = topology.blocks.iter().map(|(i, _)| i).max().unwrap_or(0);
        for _ in 0..=max {
            inboxes.insert(None);
        }
        let queue_size = config::config().queue_size;
        let n_blocks = topology.blocks.len();
        let n_cores = self.inner.workers.len();
        for (id, block_o) in topology.blocks.iter_mut() {
            // Take ownership of the block; its slot in the topology is left empty.
            let block = block_o.take().unwrap();
            let (sender, receiver) = channel::<BlockMessage>(queue_size);
            inboxes[id] = Some(sender.clone());
            if block.is_blocking() {
                // Blocking blocks run on a blocking-pool thread driving their
                // own event loop; the unblock future is still pinned to the
                // worker chosen by `map_block`.
                let main = main_channel.clone();
                debug!("spawing block on executor");
                self.inner
                    .executor
                    .spawn_executor(
                        blocking::unblock(move || block_on(run_block(block, id, main, receiver))),
                        FlowScheduler::map_block(id, n_blocks, n_cores),
                    )
                    .detach();
            } else {
                // Async blocks run directly on the mapped worker.
                self.inner
                    .executor
                    .spawn_executor(
                        run_block(block, id, main_channel.clone(), receiver),
                        FlowScheduler::map_block(id, n_blocks, n_cores),
                    )
                    .detach();
            }
        }
        inboxes
    }

    /// Spawns `future` on the executor's global queue; any worker may run it.
    fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        self.inner.executor.spawn(future)
    }

    /// Drives `future` to completion on a blocking-pool thread, wrapped in a
    /// task on this executor.
    fn spawn_blocking<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        self.inner
            .executor
            .spawn(blocking::unblock(|| async_io::block_on(future)))
    }
}
impl Default for FlowScheduler {
fn default() -> Self {
Self::new()
}
}
/// Multi-queue executor: a global injector queue plus one local slot-queue per
/// worker, allowing tasks to be pinned to a worker via `spawn_executor`.
pub struct FlowExecutor {
    // Lazily initialized shared state; `const fn new` keeps construction cheap.
    state: once_cell::sync::OnceCell<Arc<State>>,
}
// SAFETY: presumably sound because `State` only contains thread-safe
// primitives (ConcurrentQueue, RwLock, Mutex, spin::Mutex, atomics).
// NOTE(review): these manual impls may be unnecessary if all fields are
// already Send + Sync -- TODO confirm and let the compiler derive them.
unsafe impl Send for FlowExecutor {}
unsafe impl Sync for FlowExecutor {}
impl UnwindSafe for FlowExecutor {}
impl RefUnwindSafe for FlowExecutor {}
impl FlowExecutor {
    /// Creates an executor; shared state is allocated lazily on first use.
    pub const fn new() -> FlowExecutor {
        FlowExecutor {
            state: once_cell::sync::OnceCell::new(),
        }
    }

    /// Spawns a task onto the global queue; any worker may pick it up.
    pub fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        let mut active = self.state().active.lock().unwrap();
        // Register the task's waker in `active` so Drop can wake all
        // outstanding tasks.
        let entry = active.vacant_entry();
        let key = entry.key();
        let state = self.state().clone();
        // Deregister the task from `active` when it completes or is dropped.
        let future = async move {
            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(key)));
            future.await
        };
        // SAFETY: `future` is Send + 'static and the schedule closure is
        // Send + Sync + 'static -- the same bounds the safe `spawn` checks.
        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
        entry.insert(runnable.waker());
        runnable.schedule();
        task
    }

    /// Spawns a task pinned to worker `executor`: it is stored in a dedicated
    /// slot of that worker's local queue and only that worker runs it.
    pub fn spawn_executor<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
        executor: usize,
    ) -> Task<T> {
        let mut active = self.state().active.lock().unwrap();
        let entry = active.vacant_entry();
        let key = entry.key();
        let state = self.state().clone();
        let future = async move {
            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(key)));
            future.await
        };
        // Reserve slot `n` in the target worker's local queue; the schedule
        // closure created below always writes into exactly this slot.
        let queues = self.state().local_queues.write().unwrap();
        let mut inner = queues[executor].lock();
        let n = inner.1.len();
        inner.1.push(None);
        drop(inner);
        drop(queues);
        // SAFETY: see `spawn` -- future and schedule closure satisfy the
        // Send + 'static requirements of `spawn_unchecked`.
        let (runnable, task) =
            unsafe { async_task::spawn_unchecked(future, self.schedule_executor(executor, n)) };
        entry.insert(runnable.waker());
        runnable.schedule();
        task
    }

    /// Runs `future` while concurrently executing ready tasks; returns when
    /// `future` completes. Each call registers one worker (one local queue).
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let runner = Runner::new(self.state());
        let run_forever = async {
            loop {
                let runnable = runner.runnable().await;
                debug!("running runnable {}", thread::current().name().unwrap());
                runnable.run();
            }
        };
        // Race the user future against the never-ending task loop.
        future.or(run_forever).await
    }

    /// Schedule callback for unpinned tasks: push onto the global queue and
    /// wake one sleeping worker.
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.state().clone();
        move |runnable| {
            state.queue.push(runnable).unwrap();
            state.notify();
        }
    }

    /// Schedule callback for pinned tasks: store the runnable in slot
    /// `n_task` of worker `executor`'s local queue and wake that worker.
    fn schedule_executor(
        &self,
        executor: usize,
        n_task: usize,
    ) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.state().clone();
        let local = state.local_queues.read().unwrap()[executor].clone();
        move |runnable| {
            // Scope the spin-lock so it is released before notifying.
            {
                local.lock().1[n_task] = Some(runnable);
            }
            state.notify_executor(executor);
        }
    }

    /// Returns the shared state, initializing it on first access.
    fn state(&self) -> &Arc<State> {
        self.state.get_or_init(|| Arc::new(State::new()))
    }
}
impl Drop for FlowExecutor {
    /// Wakes all outstanding tasks (so they observe cancellation) and drains
    /// every queue; dropping a pending `Runnable` cancels its task.
    #[allow(clippy::significant_drop_in_scrutinee)]
    fn drop(&mut self) {
        debug!("dropping flow executor");
        if let Some(state) = self.state.get() {
            let active = state.active.lock().unwrap();
            for (_, w) in active.iter() {
                w.clone().wake();
            }
            drop(active);
            // Drain the global queue, dropping (= canceling) each runnable.
            while state.queue.pop().is_ok() {}
            // Same for every worker's local slot queue.
            for q in state.local_queues.write().unwrap().iter() {
                let runnables = &mut q.lock().1;
                while runnables.pop().is_some() {}
            }
        }
    }
}
/// State shared between [`FlowExecutor`] and its workers.
struct State {
    // Global injector queue for unpinned tasks.
    queue: ConcurrentQueue<Runnable>,
    // One slot-based local queue per worker: (next-scan offset, slots).
    // Slot i permanently belongs to the i-th task pinned to that worker.
    #[allow(clippy::type_complexity)]
    local_queues: RwLock<Vec<Arc<spin::Mutex<(usize, Vec<Option<Runnable>>)>>>>,
    // True while a notification is pending; throttles `notify`.
    notified: AtomicBool,
    // Workers currently waiting for work.
    sleepers: spin::Mutex<Sleepers>,
    // Wakers of all spawned, not-yet-finished tasks, keyed by slab id.
    active: Mutex<Slab<Waker>>,
}
impl State {
    /// Creates fresh executor state with no workers registered.
    fn new() -> State {
        let sleepers = Sleepers {
            count: 0,
            wakers: Vec::new(),
            free_ids: Vec::new(),
        };
        State {
            queue: ConcurrentQueue::unbounded(),
            local_queues: RwLock::new(Vec::new()),
            notified: AtomicBool::new(true),
            sleepers: spin::Mutex::new(sleepers),
            active: Mutex::new(Slab::new()),
        }
    }

    /// Wakes at most one sleeping worker; no-op when a notification is
    /// already pending.
    #[inline]
    fn notify(&self) {
        let newly_notified = self
            .notified
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
        if newly_notified {
            // Take the waker in its own statement so the sleepers spin-lock
            // is released before `wake()` runs.
            let waker = self.sleepers.lock().notify();
            if let Some(w) = waker {
                w.wake();
            }
        }
    }

    /// Wakes the worker owning local queue `queue_index`, if it is asleep.
    #[inline]
    fn notify_executor(&self, queue_index: usize) {
        // Extract the waker first so the spin-lock guard is dropped here.
        let waker = self.sleepers.lock().notify_executor(queue_index);
        match waker {
            Some(w) => {
                debug!(
                    "{} scheduled task on executor {} -- waker found",
                    thread::current().name().unwrap(),
                    queue_index
                );
                w.wake();
            }
            None => {
                debug!(
                    "{} scheduled task on executor {} -- no waker found",
                    thread::current().name().unwrap(),
                    queue_index
                );
            }
        }
    }
}
/// Bookkeeping for workers that are sleeping (waiting for runnables).
#[derive(Debug)]
struct Sleepers {
    // Number of sleeping workers, including ones whose waker has already
    // been taken by `notify`.
    count: usize,
    // (sleep id, waker, local queue index) per registered sleeping worker.
    wakers: Vec<(usize, Waker, usize)>,
    // Reusable sleep ids; ids start at 1 because 0 means "awake" in `Ticker`.
    free_ids: Vec<usize>,
}
impl Sleepers {
    /// Registers a newly sleeping worker and returns its (non-zero) sleep id.
    fn insert(&mut self, waker: &Waker, queue_index: usize) -> usize {
        // Reuse a freed id if available; otherwise mint the next one.
        let id = self.free_ids.pop().unwrap_or(self.count + 1);
        self.count += 1;
        self.wakers.push((id, waker.clone(), queue_index));
        id
    }

    /// Refreshes the waker of an already-sleeping worker. Returns `false`
    /// when the worker is still registered (not yet notified); `true` when
    /// its waker had been taken and was re-registered here.
    fn update(&mut self, id: usize, waker: &Waker, queue_index: usize) -> bool {
        if let Some(entry) = self.wakers.iter_mut().find(|entry| entry.0 == id) {
            if !entry.1.will_wake(waker) {
                entry.1 = waker.clone();
            }
            return false;
        }
        self.wakers.push((id, waker.clone(), queue_index));
        true
    }

    /// Deregisters a sleeping worker. Returns `true` if it had already been
    /// notified (its waker was taken), i.e. the notification must be re-sent.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);
        // Search from the back, matching the original reverse scan.
        match self.wakers.iter().rposition(|entry| entry.0 == id) {
            Some(i) => {
                self.wakers.remove(i);
                false
            }
            None => true,
        }
    }

    /// Whether a notification is logically pending: either nobody is asleep,
    /// or some sleeper's waker has already been taken.
    fn is_notified(&self) -> bool {
        self.count == 0 || self.count > self.wakers.len()
    }

    /// Takes one sleeper's waker, unless some sleeper was already notified.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() == self.count {
            debug!("sleeper notified");
            self.wakers.pop().map(|entry| entry.1)
        } else {
            debug!("no sleeper notified");
            None
        }
    }

    /// Takes the waker of the sleeper owning `queue_index`, if registered.
    fn notify_executor(&mut self, queue_index: usize) -> Option<Waker> {
        self.wakers
            .iter()
            .position(|entry| entry.2 == queue_index)
            .map(|i| self.wakers.remove(i).1)
    }
}
/// Per-worker helper that manages the sleep/wake protocol.
struct Ticker<'a> {
    state: &'a State,
    // Index of this worker's local queue in `state.local_queues`.
    queue_index: usize,
    // 0 while awake; otherwise the sleep id assigned by `Sleepers::insert`.
    sleeping: AtomicUsize,
}
impl Ticker<'_> {
    /// Creates a ticker for the worker owning local queue `queue_index`.
    fn new(state: &State, queue_index: usize) -> Ticker<'_> {
        debug!("ticker created {}", queue_index);
        Ticker {
            state,
            queue_index,
            sleeping: AtomicUsize::new(0),
        }
    }

    /// Registers this worker as sleeping (or refreshes its waker). Returns
    /// `false` when the worker was already registered with a fresh waker,
    /// in which case the caller may yield `Poll::Pending`.
    fn sleep(&self, waker: &Waker) -> bool {
        let mut sleepers = self.state.sleepers.lock();
        match self.sleeping.load(Ordering::SeqCst) {
            // Not sleeping yet: insert and remember the assigned sleep id.
            0 => self
                .sleeping
                .store(sleepers.insert(waker, self.queue_index), Ordering::SeqCst),
            // Already sleeping: refresh the stored waker.
            id => {
                if !sleepers.update(id, waker, self.queue_index) {
                    debug!(
                        "{} putting ticker to sleep {} -- false",
                        thread::current().name().unwrap(),
                        self.queue_index
                    );
                    return false;
                }
            }
        }
        // Re-derive the pending-notification flag from the sleeper set.
        self.state
            .notified
            .swap(sleepers.is_notified(), Ordering::SeqCst);
        debug!(
            "{} putting ticker to sleep {} -- true",
            thread::current().name().unwrap(),
            self.queue_index
        );
        true
    }

    /// Moves this worker out of the sleeping state, if it was sleeping.
    fn wake(&self) {
        debug!("ticker waking {}", self.queue_index);
        let id = self.sleeping.swap(0, Ordering::SeqCst);
        if id != 0 {
            let mut sleepers = self.state.sleepers.lock();
            sleepers.remove(id);
            self.state
                .notified
                .swap(sleepers.is_notified(), Ordering::SeqCst);
        }
    }

    /// Waits until `search` yields a runnable, sleeping between attempts.
    async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
        future::poll_fn(|cx| {
            loop {
                match search() {
                    None => {
                        debug!(
                            "{} runnable_with {} -- None",
                            thread::current().name().unwrap(),
                            self.queue_index
                        );
                        // Only yield Pending once fully registered as a
                        // sleeper; otherwise loop and search again to avoid
                        // missing a wakeup that raced with registration.
                        if !self.sleep(cx.waker()) {
                            return Poll::Pending;
                        }
                    }
                    Some(r) => {
                        debug!(
                            "{} runnable_with {} -- Some",
                            thread::current().name().unwrap(),
                            self.queue_index
                        );
                        // Found work: leave the sleeping state before running.
                        self.wake();
                        return Poll::Ready(r);
                    }
                }
            }
        })
        .await
    }
}
impl Drop for Ticker<'_> {
    /// Deregisters a still-sleeping ticker; if it had already been notified,
    /// the notification is forwarded so no wakeup is lost.
    fn drop(&mut self) {
        let id = self.sleeping.swap(0, Ordering::SeqCst);
        if id != 0 {
            let mut sleepers = self.state.sleepers.lock();
            let notified = sleepers.remove(id);
            self.state
                .notified
                .swap(sleepers.is_notified(), Ordering::SeqCst);
            // Pass the consumed notification on to another sleeping worker;
            // drop the lock first since `notify` re-acquires it.
            if notified {
                drop(sleepers);
                self.state.notify();
            }
        }
    }
}
/// A worker's run-loop state: its ticker plus a handle to its local queue.
struct Runner<'a> {
    state: &'a State,
    ticker: Ticker<'a>,
    // This worker's slot-based local queue, shared with schedule callbacks.
    local: Arc<spin::Mutex<(usize, Vec<Option<Runnable>>)>>,
}
impl Runner<'_> {
    /// Registers a new worker: appends its local queue to the shared list
    /// and creates the matching ticker.
    fn new(state: &State) -> Runner<'_> {
        let local = Arc::new(spin::Mutex::new((0, Vec::new())));
        let mut s = state.local_queues.write().unwrap();
        // The queue's position in the list is this worker's index.
        let queue_index = s.len();
        s.push(local.clone());
        Runner {
            state,
            ticker: Ticker::new(state, queue_index),
            local,
        }
    }

    /// Waits for the next runnable: round-robins over this worker's local
    /// slots first, then falls back to the global queue.
    async fn runnable(&self) -> Runnable {
        let runnable = self
            .ticker
            .runnable_with(|| {
                let mut item = self.local.lock();
                // Resume scanning where the previous call left off so all
                // pinned tasks get a fair share.
                let mut offset = item.0;
                let q = &mut item.1;
                let l = q.len();
                for (n, runnable) in q.iter().cycle().skip(offset).take(l).enumerate() {
                    if runnable.is_some() {
                        // Translate the cycled position back to a slot index.
                        offset = (offset + n) % l;
                        let ret = q[offset].take();
                        // Next scan starts just after the slot taken here.
                        item.0 = (offset + 1) % l;
                        return ret;
                    }
                }
                // No pinned work ready; try the global queue.
                if let Ok(r) = self.state.queue.pop() {
                    return Some(r);
                }
                None
            })
            .await;
        debug!("ticker found runnable {}", self.ticker.queue_index);
        runnable
    }
}
impl Drop for Runner<'_> {
    // Intentionally empty: the entry pushed into `state.local_queues` by
    // `Runner::new` is never removed -- presumably because workers live as
    // long as the executor. NOTE(review): queue slots are never reclaimed;
    // confirm runners are not created and dropped repeatedly.
    fn drop(&mut self) {
    }
}
/// Guard that invokes the wrapped closure when dropped; used by the executor
/// to deregister tasks from the `active` slab on completion or cancellation.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Computes the core assignment for every block index in `0..n_blocks`.
    fn assignments(n_blocks: usize, n_cores: usize) -> Vec<usize> {
        (0..n_blocks)
            .map(|b| FlowScheduler::map_block(b, n_blocks, n_cores))
            .collect()
    }

    #[test]
    fn map_blocks() {
        // Even split, one block per core.
        assert_eq!(assignments(3, 3), vec![0, 1, 2]);
        // Even split, two blocks per core.
        assert_eq!(assignments(6, 3), vec![0, 0, 1, 1, 2, 2]);
        // Fewer blocks than cores: one block each on the first cores.
        assert_eq!(assignments(5, 10), vec![0, 1, 2, 3, 4]);
        // Remainder blocks go to the first cores.
        assert_eq!(assignments(11, 3), vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]);
    }
}