futuresdr 0.0.27

An Experimental Async SDR Runtime for Heterogeneous Architectures.
use async_io::block_on;
use async_lock::Barrier;
use async_task::Runnable;
use async_task::Task;
use concurrent_queue::ConcurrentQueue;
use core_affinity;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures_lite::future::{self, Future, FutureExt};
use slab::Slab;
use std::cmp;
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::task::{Poll, Waker};
use std::thread;

use crate::runtime::config;
use crate::runtime::run_block;
use crate::runtime::scheduler::Scheduler;
use crate::runtime::BlockMessage;
use crate::runtime::FlowgraphMessage;
use crate::runtime::Topology;

/// Scheduler that distributes the blocks of a flowgraph over a fixed pool of
/// executor threads, one per core id reported by `core_affinity::get_core_ids()`
/// (see `FlowScheduler::new`).
///
/// Cloning is cheap: all clones share the same `FlowSchedulerInner` through an `Arc`.
#[derive(Clone, Debug)]
pub struct FlowScheduler {
    /// Shared executor and worker threads.
    inner: Arc<FlowSchedulerInner>,

/// State shared by all clones of a `FlowScheduler`.
struct FlowSchedulerInner {
    /// Executor run by every worker thread (each thread calls `FlowExecutor::run`).
    executor: Arc<FlowExecutor>,
    /// One entry per worker thread: its join handle plus the sending half of a
    /// oneshot channel created alongside it in `FlowScheduler::new`.
    /// NOTE(review): the sender presumably tells the worker to stop — confirm.
    workers: Vec<(thread::JoinHandle<()>, oneshot::Sender<()>)>,

/// Hand-written `Debug` for the scheduler state.
/// NOTE(review): presumably needed because `FlowExecutor` does not derive `Debug`;
/// the body is not visible in this excerpt.
impl fmt::Debug for FlowSchedulerInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

/// Tears down the worker threads once the last `FlowScheduler` clone (and thus
/// the last `Arc<FlowSchedulerInner>`) is dropped.
impl Drop for FlowSchedulerInner {
    fn drop(&mut self) {
        // Take ownership of every `(JoinHandle, oneshot::Sender)` pair.
        // NOTE(review): the loop body is not visible here; presumably it signals
        // each worker through the sender and joins its thread — confirm.
        for i in self.workers.drain(..) {

impl FlowScheduler {
    /// Creates a scheduler with one executor thread per core id returned by
    /// `core_affinity::get_core_ids()`.
    ///
    /// Every thread is named `flow-<core id>` and runs the shared `FlowExecutor`
    /// with `async_io::block_on(e.run(...))`. A `Barrier` sized for all workers
    /// plus one extra party (presumably the calling thread) is handed to each
    /// worker.
    ///
    /// # Panics
    /// Panics if `core_affinity::get_core_ids()` returns `None`, or if a worker
    /// thread cannot be spawned.
    pub fn new() -> FlowScheduler {
        let executor = Arc::new(FlowExecutor::new());
        let mut workers = Vec::new();

        let core_ids = core_affinity::get_core_ids().unwrap();
        debug!("flowsched: core ids {}", core_ids.len());

        // One party per worker thread, plus one more.
        let barrier = Arc::new(Barrier::new(core_ids.len() + 1));

        for id in core_ids {
            let b = barrier.clone();
            let e = executor.clone();
            // `sender` is kept in `workers`; `receiver` is presumably consumed by the
            // worker thread (its use is not visible in this excerpt).
            let (sender, receiver) = oneshot::channel::<()>();

            let handle = thread::Builder::new()
                .name(format!("flow-{}", id.id))
                .spawn(move || {
                    debug!("starting executor thread on core id {}", id.id);
                    // NOTE(review): pinning is disabled, so the thread is NOT bound to core `id`.
                    // core_affinity::set_for_current(id);
                    async_io::block_on(e.run(async {
                .expect("cannot spawn executor thread");

            workers.push((handle, sender));


        FlowScheduler {
            inner: Arc::new(FlowSchedulerInner { executor, workers }),

    /// Maps block `block` to the index of the executor (core) it should run on.
    ///
    /// The `n_blocks` blocks are split into `n_cores` contiguous runs whose
    /// lengths differ by at most one. The first `n_blocks % n_cores` cores get
    /// `n_blocks / n_cores + 1` blocks each, and the remaining cores get
    /// `n_blocks / n_cores`. For example, 11 blocks on 3 cores map to
    /// `[0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]`.
    ///
    /// Any `block >= n_blocks` is mapped to the last core, `n_cores - 1`.
    ///
    /// # Panics
    /// Panics if `n_cores` is zero, since there is no core to map to.
    fn map_block(block: usize, n_blocks: usize, n_cores: usize) -> usize {
        // Without this, `n_blocks / n_cores` panics with an opaque divide-by-zero.
        assert!(n_cores > 0, "map_block: n_cores must be non-zero");

        let per_core = n_blocks / n_cores;
        let extra = n_blocks % n_cores;

        // Core `c` owns the half-open range ending at
        // `(c + 1) * per_core + min(c + 1, extra)`; pick the first core whose
        // range ends beyond `block`, falling back to the last core.
        (1..n_cores)
            .find(|&end_core| block < end_core * per_core + cmp::min(end_core, extra))
            .map_or(n_cores - 1, |end_core| end_core - 1)
    }

impl Scheduler for FlowScheduler {
    /// Spawns every block of `topology` on the flow executor and builds the
    /// per-block inboxes (`Sender<BlockMessage>`), indexed by block id.
    ///
    /// Each block gets a `futures` mpsc channel created with buffer size
    /// `config::config().queue_size`. Each block is assigned to an executor
    /// thread with `FlowScheduler::map_block`. Blocks whose `is_blocking()` is
    /// true are wrapped in `blocking::unblock`, so they run on the `blocking`
    /// thread pool.
    ///
    /// # Panics
    /// Panics if a block has already been taken out of `topology.blocks`.
    ///
    /// NOTE(review): `map_block` receives the slab key `id`, while `n_blocks` is
    /// the number of blocks. If the keys are not dense (`0..n_blocks`), every id
    /// past `n_blocks` lands on the last core — confirm keys are dense.
    fn run_topology(
        topology: &mut Topology,
        main_channel: &Sender<FlowgraphMessage>,
    ) -> Slab<Option<Sender<BlockMessage>>> {
        let mut inboxes = Slab::new();
        // Highest block id; the following loop presumably pre-fills the slab so that
        // `inboxes[id]` is valid for every id in `0..=max` (its body is not visible).
        let max = topology.blocks.iter().map(|(i, _)| i).max().unwrap_or(0);
        for _ in 0..=max {
        let queue_size = config::config().queue_size;

        let n_blocks = topology.blocks.len();
        let n_cores = self.inner.workers.len();

        // spawn block executors
        for (id, block_o) in topology.blocks.iter_mut() {
            let block = block_o.take().unwrap();

            let (sender, receiver) = channel::<BlockMessage>(queue_size);
            inboxes[id] = Some(sender.clone());

            if block.is_blocking() {
                let main = main_channel.clone();
                debug!("spawing block on executor");
                        blocking::unblock(move || block_on(run_block(block, id, main, receiver))),
                        FlowScheduler::map_block(id, n_blocks, n_cores),
            } else {
                        run_block(block, id, main_channel.clone(), receiver),
                        FlowScheduler::map_block(id, n_blocks, n_cores),


    /// Spawns `future` on the scheduler's executor.
    /// NOTE(review): the body is not visible in this excerpt; presumably it
    /// delegates to `FlowExecutor::spawn`.
    fn spawn<T: Send + 'static>(
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {

    /// Spawns a task that drives `future` to completion with
    /// `async_io::block_on` inside `blocking::unblock`. The future therefore
    /// runs on a thread of the `blocking` pool rather than on an executor
    /// thread, so it may block.
    fn spawn_blocking<T: Send + 'static>(
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
            .spawn(blocking::unblock(|| async_io::block_on(future)))

/// NOTE(review): the body is not visible here; presumably returns `FlowScheduler::new()`.
impl Default for FlowScheduler {
    fn default() -> Self {

/// An async executor with per-runner local queues.
///
/// In addition to a shared global queue, every call to `FlowExecutor::run`
/// registers a runner with its own local queue. Tasks can be bound to a
/// specific runner's queue with `FlowExecutor::spawn_executor`, which takes
/// the runner's queue index.
///
/// # Examples
/// NOTE: the example below uses `async_executor::Executor`, the executor this
/// type appears to be derived from; it does not exercise `FlowExecutor` itself.
///
/// A multi-threaded executor:
/// ```
/// use async_channel::unbounded;
/// use async_executor::Executor;
/// use easy_parallel::Parallel;
/// use futures_lite::future;
/// let ex = Executor::new();
/// let (signal, shutdown) = unbounded::<()>();
/// Parallel::new()
///     // Run four executor threads.
///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
///     // Run the main future on the current thread.
///     .finish(|| future::block_on(async {
///         println!("Hello world!");
///         drop(signal);
///     }));
/// ```
pub struct FlowExecutor {
    /// The executor state, created lazily on first use (see `state()`). This
    /// is what lets `FlowExecutor::new` be a `const fn`.
    state: once_cell::sync::OnceCell<Arc<State>>,

// SAFETY: NOTE(review) — no justification is given in the code. All shared data
// lives in `State`, whose fields are synchronized containers (`ConcurrentQueue`,
// `RwLock`, `Mutex`, `spin::Mutex`, `AtomicBool`) holding `Runnable`/`Waker`
// values. If `State` is already `Send + Sync`, the compiler derives these impls
// automatically and the `unsafe` versions are redundant — confirm before relying
// on them.
unsafe impl Send for FlowExecutor {}
unsafe impl Sync for FlowExecutor {}

// NOTE(review): unwind safety is asserted without visible analysis; confirm that
// a panic while a `State` lock is held cannot expose broken invariants (poisoned
// `std::sync::Mutex`es make later `.lock().unwrap()` calls panic).
impl UnwindSafe for FlowExecutor {}
impl RefUnwindSafe for FlowExecutor {}

impl FlowExecutor {
    /// Creates a new executor.
    ///
    /// No state is allocated here; it is created on first use. That is why this
    /// can be a `const fn`.
    ///
    /// # Examples
    /// NOTE: this example constructs `async_executor::Executor`, not `FlowExecutor`.
    /// ```
    /// use async_executor::Executor;
    /// let ex = Executor::new();
    /// ```
    pub const fn new() -> FlowExecutor {
        FlowExecutor {
            state: once_cell::sync::OnceCell::new(),

    /// Spawns a task onto the executor, without binding it to a particular runner.
    ///
    /// The task gets a key in `State::active`. A `CallOnDrop` guard, created
    /// when the wrapped future first runs, removes that key again when the
    /// future completes or is dropped. The task is scheduled through
    /// `FlowExecutor::schedule`, whose body is not visible here; presumably it
    /// uses the global queue.
    ///
    /// # Panics
    /// Panics if the `active` mutex is poisoned.
    ///
    /// # Examples
    /// NOTE: this example uses `async_executor::Executor`, not `FlowExecutor`.
    /// ```
    /// use async_executor::Executor;
    /// let ex = Executor::new();
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: Send + 'static>(
        future: impl Future<Output = T> + Send + 'static,
    ) -> Task<T> {
        let mut active = self.state().active.lock().unwrap();

        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let key = entry.key();
        let state = self.state().clone();
        let future = async move {
            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(key)));

        // Create the task and register it in the set of active tasks.
        // SAFETY: the user future is bounded `Send + 'static` and the wrapper adds
        // only an `Arc<State>` and a `usize`; the schedule function is
        // `Send + Sync + 'static`. That covers the extra guarantees `spawn_unchecked`
        // leaves to the caller. NOTE(review): under these bounds the safe
        // `async_task::spawn` should also work.
        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };


    /// Spawns a task that is pinned to the local queue of runner `executor`.
    ///
    /// A slot for the task is created at index `n` (the current length) of that
    /// runner's local queue. The task's schedule function
    /// (`FlowExecutor::schedule_executor`) always stores the woken runnable in
    /// that slot. Like `FlowExecutor::spawn`, the task is also registered in
    /// `State::active`.
    ///
    /// # Panics
    /// Panics if `executor` is not the index of a registered runner (no thread
    /// has called `FlowExecutor::run` for that index yet), or if a lock is
    /// poisoned.
    ///
    /// NOTE(review): `queues` holds the `local_queues` *write* lock, and
    /// `schedule_executor` takes a *read* lock on the same `std::sync::RwLock`.
    /// Unless that guard (and the `inner` spin lock) is dropped in the lines not
    /// visible here, this deadlocks or panics. A read lock also looks sufficient
    /// for indexing `queues` — confirm.
    pub fn spawn_executor<T: Send + 'static>(
        future: impl Future<Output = T> + Send + 'static,
        executor: usize,
    ) -> Task<T> {
        let mut active = self.state().active.lock().unwrap();

        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let key = entry.key();
        let state = self.state().clone();
        let future = async move {
            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(key)));

        // Creates a slot for the task in the local queue of the executor
        let queues = self.state().local_queues.write().unwrap();
        let mut inner = queues[executor].lock();
        let n = inner.1.len();

        // Create the task and register it in the set of active tasks.
        // SAFETY: same reasoning as in `spawn`. The future is `Send + 'static`, and
        // the schedule function returned by `schedule_executor` is
        // `Send + Sync + 'static`.
        let (runnable, task) =
            unsafe { async_task::spawn_unchecked(future, self.schedule_executor(executor, n)) };


    /// Runs the executor on the current thread until `future` completes.
    ///
    /// Each call registers a new `Runner`, and thus a new local queue. It then
    /// polls `future` concurrently with a loop that executes runnables found by
    /// `Runner::runnable` (this runner's local queue first, then the global
    /// queue).
    ///
    /// NOTE(review): the debug log in the loop calls
    /// `thread::current().name().unwrap()`. When that debug statement is
    /// enabled, it panics on a thread without a name. Threads spawned by
    /// `FlowScheduler::new` are named, but arbitrary callers may not be.
    ///
    /// NOTE(review): `Runner`'s `Drop` no longer removes its local queue, so each
    /// call leaves an entry behind in `State::local_queues`.
    ///
    /// # Examples
    /// NOTE: this example uses `async_executor::Executor`, not `FlowExecutor`.
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    /// let ex = Executor::new();
    /// let task = ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(ex.run(async { task.await * 2 }));
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let runner = Runner::new(self.state());

        // A future that runs tasks forever.
        let run_forever = async {
            loop {
                let runnable = runner.runnable().await;
                debug!("running runnable {}", thread::current().name().unwrap());

        // Run `future` and `run_forever` concurrently until `future` completes.

    /// Returns a function that schedules a runnable task when it gets woken up.
    ///
    /// The returned closure captures the shared state (no runner index), so
    /// tasks spawned with it are not tied to a specific runner.
    /// NOTE(review): the closure body is not visible here; presumably it pushes
    /// onto `State::queue` and notifies a sleeping ticker.
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.state().clone();

        // TODO(stjepang): If possible, push into the current local queue and notify the ticker.
        move |runnable| {

    /// Returns a function that schedules a runnable task of runner `executor`
    /// when it gets woken up.
    ///
    /// The runnable is stored in slot `n_task` of that runner's local queue
    /// (`local.lock().1[n_task]`). The queue's `Arc` is looked up once, when
    /// this function is called, under a read lock on `local_queues`.
    ///
    /// # Panics
    /// Panics if `executor` is out of range for `local_queues`. The returned
    /// closure panics if slot `n_task` does not exist in that queue.
    fn schedule_executor(
        executor: usize,
        n_task: usize,
    ) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.state().clone();
        let local = state.local_queues.read().unwrap()[executor].clone();

        move |runnable| {
                local.lock().1[n_task] = Some(runnable);

    /// Returns a reference to the inner state, creating it on the first call.
    fn state(&self) -> &Arc<State> {
        self.state.get_or_init(|| Arc::new(State::new()))

/// Cancels outstanding work when the executor is dropped.
///
/// If the state was ever initialized, every entry in `active` is visited (the
/// loop body is not visible). Then all runnables in the global queue and in
/// every local queue are dropped, which cancels the corresponding tasks.
///
/// NOTE(review): dropping a runnable whose future has already been polled also
/// drops that future's `CallOnDrop` guard, which locks `state.active`. If the
/// `active` guard taken below is still held at that point, this deadlocks or
/// panics. Confirm that it is released in the lines not visible here.
impl Drop for FlowExecutor {
    fn drop(&mut self) {
        debug!("dropping flow executor");
        if let Some(state) = self.state.get() {
            let active = state.active.lock().unwrap();

            for (_, w) in active.iter() {


            while state.queue.pop().is_ok() {}

            for q in state.local_queues.write().unwrap().iter() {
                let runnables = &mut q.lock().1;
                // `pop` yields `Some(slot)` for empty (`None`) slots too, so this
                // empties the whole vector, like `clear()`.
                while runnables.pop().is_some() {}

/// The state shared by a `FlowExecutor` and all of its runners and tickers.
struct State {
    /// The global queue: tasks not pinned to a runner. Any runner may take from
    /// it once its local queue is empty (see `Runner::runnable`).
    queue: ConcurrentQueue<Runnable>,

    /// Local queues created by runners, indexed by runner (queue) index.
    /// Each queue is `(cursor, slots)`:
    /// - `cursor` is where the next round-robin scan in `Runner::runnable` starts.
    /// - each slot belongs to one task pinned with `spawn_executor` and holds its
    ///   runnable while that task is scheduled (`None` otherwise).
    local_queues: RwLock<Vec<Arc<spin::Mutex<(usize, Vec<Option<Runnable>>)>>>>,

    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
    notified: AtomicBool,

    /// A list of sleeping tickers.
    sleepers: spin::Mutex<Sleepers>,

    /// Currently active tasks. Each key is removed by the task's `CallOnDrop`
    /// guard when its future finishes or is dropped.
    active: Mutex<Slab<Waker>>,

impl State {
    /// Creates state for a new executor.
    ///
    /// `notified` starts as `true` because no tickers are sleeping yet.
    fn new() -> State {
        State {
            queue: ConcurrentQueue::unbounded(),
            local_queues: RwLock::new(Vec::new()),
            notified: AtomicBool::new(true),
            sleepers: spin::Mutex::new(Sleepers {
                count: 0,
                wakers: Vec::new(),
                free_ids: Vec::new(),
            active: Mutex::new(Slab::new()),

    /// Notifies a sleeping ticker.
    ///
    /// Only the caller that flips `notified` from `false` to `true` goes on to
    /// take a waker from `Sleepers::notify`. Concurrent callers therefore do
    /// not wake several tickers for the same event.
    fn notify(&self) {
        if self
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            let waker = self.sleepers.lock().notify();
            if let Some(w) = waker {

    /// Wakes the sleeping ticker registered for local queue `queue_index`, if
    /// there is one (see `Sleepers::notify_executor`).
    ///
    /// The `sleepers` lock is taken inside a block expression, so it is released
    /// before the waker is used. Whether a waker was found is logged.
    fn notify_executor(&self, queue_index: usize) {
        let waker = { self.sleepers.lock().notify_executor(queue_index) };
        if let Some(w) = waker {
                "{} scheduled task on executor {} -- waker found",
        } else {
                "{} scheduled task on executor {} -- no waker found",

/// A list of sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    /// A sleeping ticker is notified when its waker is missing from this list.
    /// Entries are `(sleeper id, waker, local queue index of the ticker)`.
    wakers: Vec<(usize, Waker, usize)>,

    /// Reclaimed IDs, reused by `insert` before new ones are minted.
    free_ids: Vec<usize>,

impl Sleepers {
    /// Inserts a new sleeping ticker for local queue `queue_index` and returns
    /// its sleeper id.
    ///
    /// Reclaimed ids are reused first. Otherwise the id is `count + 1`, so fresh
    /// ids are never `0`, the value `Ticker::sleeping` uses for "not sleeping".
    fn insert(&mut self, waker: &Waker, queue_index: usize) -> usize {
        let id = match self.free_ids.pop() {
            Some(id) => id,
            None => self.count + 1,
        self.count += 1;
        self.wakers.push((id, waker.clone(), queue_index));

    /// Re-inserts a sleeping ticker's waker if it was notified.
    /// Returns `true` if the ticker was notified.
    ///
    /// If `id` still has an entry, the ticker was not notified, and `false` is
    /// returned. In that case the stored waker is replaced only if it would not
    /// wake the same task as `waker`, and the stored queue index is left
    /// unchanged.
    fn update(&mut self, id: usize, waker: &Waker, queue_index: usize) -> bool {
        for item in &mut self.wakers {
            if item.0 == id {
                if !item.1.will_wake(waker) {
                    item.1 = waker.clone();
                return false;

        self.wakers.push((id, waker.clone(), queue_index));

    /// Removes a previously inserted sleeping ticker.
    /// Returns `true` if the ticker was notified.
    ///
    /// A ticker whose entry is still in `wakers` was not notified, so `false` is
    /// returned for it. NOTE(review): the lines that remove that entry and
    /// recycle `id` into `free_ids` are not visible in this excerpt.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;

        for i in (0..self.wakers.len()).rev() {
            if self.wakers[i].0 == id {
                return false;

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    ///
    /// A notified ticker is still counted in `count` but no longer has an entry
    /// in `wakers`, hence the `count > wakers.len()` test.
    fn is_notified(&self) -> bool {
        self.count == 0 || self.count > self.wakers.len()

    /// Returns notification waker for a sleeping ticker.
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
    ///
    /// Takes the most recently added waker (`Vec::pop`), regardless of which
    /// local queue that ticker serves. Both outcomes are logged at debug level.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() == self.count {
            debug!("sleeper notified");
            self.wakers.pop().map(|item| item.1)
        } else {
            debug!("no sleeper notified");

    /// Removes and returns the waker of the first sleeping, unnotified ticker
    /// whose local queue index is `queue_index`, or `None` if there is none.
    ///
    /// Unlike `Sleepers::notify`, the visible code does not first compare
    /// `count` with `wakers.len()`. A ticker for this queue is therefore woken
    /// even if another ticker is already notified.
    fn notify_executor(&mut self, queue_index: usize) -> Option<Waker> {
        if let Some((index, _)) = self
            .find(|item| item.1 .2 == queue_index)
            return Some(self.wakers.remove(index).1);

/// Runs tasks one by one.
struct Ticker<'a> {
    /// The executor state.
    state: &'a State,

    /// Index of the local queue this ticker serves. It is stored with the
    /// ticker's sleeper entry so that `State::notify_executor` can wake it.
    queue_index: usize,

    /// Set to a non-zero sleeper ID when in sleeping state.
    /// States a ticker can be in:
    /// 1) Woken.
    /// 2a) Sleeping and unnotified.
    /// 2b) Sleeping and notified.
    sleeping: AtomicUsize,

impl Ticker<'_> {
    /// Creates a ticker for local queue `queue_index`. It starts awake
    /// (`sleeping == 0`).
    fn new(state: &State, queue_index: usize) -> Ticker<'_> {
        debug!("ticker created {}", queue_index);
        Ticker {
            sleeping: AtomicUsize::new(0),

    /// Moves the ticker into sleeping and unnotified state.
    /// Returns `false` if the ticker was already sleeping and unnotified.
    ///
    /// The first call registers `waker` with `Sleepers::insert` and stores the
    /// returned sleeper id in `sleeping`. Later calls refresh the waker with
    /// `Sleepers::update`. Before returning `true`, `State::notified` is set
    /// from `Sleepers::is_notified` while the `sleepers` lock is still held.
    /// Both outcomes are logged.
    fn sleep(&self, waker: &Waker) -> bool {
        let mut sleepers = self.state.sleepers.lock();

        match self.sleeping.load(Ordering::SeqCst) {
            // Move to sleeping state.
            0 => self
                .store(sleepers.insert(waker, self.queue_index), Ordering::SeqCst),

            // Already sleeping, check if notified.
            id => {
                if !sleepers.update(id, waker, self.queue_index) {
                        "{} putting ticker to sleep {} -- false",
                    return false;

            .swap(sleepers.is_notified(), Ordering::SeqCst);

            "{} putting ticker to sleep {} -- true",

    /// Moves the ticker into woken state.
    ///
    /// Resets `sleeping` to `0`. If the ticker had a sleeper id, it is
    /// deregistered from `Sleepers` (that line is not visible here), and
    /// `State::notified` is refreshed under the `sleepers` lock.
    fn wake(&self) {
        debug!("ticker waking {}", self.queue_index);
        let id = self.sleeping.swap(0, Ordering::SeqCst);
        if id != 0 {
            let mut sleepers = self.state.sleepers.lock();

                .swap(sleepers.is_notified(), Ordering::SeqCst);

    /// Waits for the next runnable task to run, given a function that searches for a task.
    ///
    /// On every poll, `search` is retried until it yields a task or the ticker
    /// goes to sleep:
    /// - When `search` finds nothing, `sleep(cx.waker())` registers the current
    ///   waker. `Poll::Pending` is returned only if the ticker was already
    ///   sleeping and unnotified.
    /// - When a task is found, the ticker is woken and the task is returned.
    ///
    /// NOTE(review): the "notify another ticker" step below is commented out.
    /// Finding a task therefore does not hand pending work off to another
    /// sleeping ticker.
    async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
        future::poll_fn(|cx| {
            loop {
                match search() {
                    None => {
                            "{} runnable_with {} -- None",
                        // Move to sleeping and unnotified state.
                        if !self.sleep(cx.waker()) {
                            // If already sleeping and unnotified, return.
                            return Poll::Pending;
                    Some(r) => {
                            "{} runnable_with {} -- Some",
                        // Wake up.

                        // Notify another ticker now to pick up where this ticker left off, just in
                        // case running the task takes a long time.
                        // self.state.notify_executor(self.queue_index);

                        return Poll::Ready(r);

/// Deregisters the ticker if it is still sleeping.
///
/// If the ticker had already been notified, that notification would otherwise
/// be lost, so it is passed on to another ticker. The call that does this is
/// not visible in this excerpt.
impl Drop for Ticker<'_> {
    fn drop(&mut self) {
        // If this ticker is in sleeping state, it must be removed from the sleepers list.
        let id = self.sleeping.swap(0, Ordering::SeqCst);
        if id != 0 {
            let mut sleepers = self.state.sleepers.lock();
            let notified = sleepers.remove(id);

                .swap(sleepers.is_notified(), Ordering::SeqCst);

            // If this ticker was notified, then notify another ticker.
            if notified {

/// A worker in a work-stealing executor.
/// This is just a ticker that also has an associated local queue for improved cache locality.
///
/// NOTE(review): the visible search in `Runner::runnable` only checks this
/// runner's own local queue and the global queue; it does not steal from other
/// runners' local queues.
struct Runner<'a> {
    /// The executor state.
    state: &'a State,
    /// Inner ticker.
    ticker: Ticker<'a>,
    /// The local queue as `(cursor, slots)`. `cursor` is where the next
    /// round-robin scan starts; each slot holds a pinned task's runnable while
    /// that task is scheduled.
    local: Arc<spin::Mutex<(usize, Vec<Option<Runnable>>)>>,

impl Runner<'_> {
    /// Creates a runner and registers it in the executor state.
    ///
    /// The runner's queue index is the length of `state.local_queues` at
    /// registration time, read under the write lock. Indices therefore follow
    /// the order in which runners are created. This index is what
    /// `FlowExecutor::spawn_executor` takes as `executor`.
    fn new(state: &State) -> Runner<'_> {
        let local = Arc::new(spin::Mutex::new((0, Vec::new())));

        let mut s = state.local_queues.write().unwrap();

        let queue_index = s.len();

        Runner {
            ticker: Ticker::new(state, queue_index),

    /// Waits for the next runnable task to run.
    ///
    /// Search order:
    /// 1. The local queue, scanned round-robin. Starting at the cursor
    ///    `item.0`, the first occupied slot is taken and the cursor moves just
    ///    past it, so pinned tasks are served fairly.
    /// 2. One task from the global queue.
    ///
    /// Each search scans all `l` local slots, so its cost grows with the size
    /// of the local queue.
    async fn runnable(&self) -> Runnable {
        let runnable = self
            .runnable_with(|| {
                // Try the local queue.
                let mut item = self.local.lock();
                let mut offset = item.0;
                let q = &mut item.1;
                let l = q.len();
                for (n, runnable) in q.iter().cycle().skip(offset).take(l).enumerate() {
                    if runnable.is_some() {
                        // `n` counts from the cursor; map it back to an index into `q`.
                        offset = (offset + n) % l;
                        let ret = q[offset].take();
                        item.0 = (offset + 1) % l;
                        return ret;

                // Try stealing one task from global queue
                if let Ok(r) = self.state.queue.pop() {
                    return Some(r);


        debug!("ticker found runnable {}", self.ticker.queue_index);


/// NOTE(review): all cleanup below is commented out. Dropping a runner neither
/// removes its local queue from `State::local_queues` nor re-schedules the
/// runnables left in it. This may be intentional, since removing entries would
/// shift the queue indices that `FlowExecutor::spawn_executor` uses. Confirm,
/// then delete the dead code or document the decision.
impl Drop for Runner<'_> {
    fn drop(&mut self) {
        // Remove the local queue.
        // self.state
        //     .local_queues
        //     .write()
        //     .unwrap()
        //     .retain(|local| !Arc::ptr_eq(local, &self.local));

        // // Re-schedule remaining tasks in the local queue.
        // while let Some(i) = self.local.lock().1.pop() {
        //     if let Some(r) = i {
        //         r.schedule();
        //     }
        // }

/// Drop guard: holds a closure and calls it when the guard is dropped.
///
/// `FlowExecutor` uses it to remove a task's key from `State::active` once the
/// task's future completes or is cancelled.
struct CallOnDrop<F>(F)
where
    F: Fn();

/// Invokes the wrapped closure when the guard goes out of scope.
/// NOTE(review): the body (presumably `(self.0)()`) is not visible in this excerpt.
impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {

// Unit tests for `FlowScheduler::map_block`'s block-to-core distribution.
// NOTE(review): no `#[cfg(test)]` / `#[test]` attributes are visible in this
// excerpt; confirm they are present so the test is compiled and run.
mod tests {
    use super::*;
    // Covers an even split, an uneven split (the remainder goes to the lowest
    // cores), and more cores than blocks.
    fn map_blocks() {
        let a: Vec<usize> = (0..3_usize)
            .map(|b| FlowScheduler::map_block(b, 3, 3))
        assert_eq!(a, vec![0, 1, 2]);

        let a: Vec<usize> = (0..6_usize)
            .map(|b| FlowScheduler::map_block(b, 6, 3))
        assert_eq!(a, vec![0, 0, 1, 1, 2, 2]);

        let a: Vec<usize> = (0..5_usize)
            .map(|b| FlowScheduler::map_block(b, 5, 10))
        assert_eq!(a, vec![0, 1, 2, 3, 4]);

        let a: Vec<usize> = (0..11_usize)
            .map(|b| FlowScheduler::map_block(b, 11, 3))
        assert_eq!(a, vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]);