1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#![no_std]
extern crate alloc;
pub use executor_macros::*;
use lazy_static::*;
use {
    alloc::{boxed::Box, collections::vec_deque::VecDeque, sync::Arc},
    core::{
        future::Future,
        pin::Pin,
        task::{Context, Poll},
    },
    spin::Mutex,
    woke::{waker_ref, Woke},
};

/// A queue of type-erased tasks that are polled in FIFO order.
///
/// Each entry is stored behind the `Pendable` trait object so that tasks with
/// different output types can share one queue.
pub struct Executor {
    tasks: VecDeque<Box<dyn Pendable + Send + Sync>>,
}

/// Type-erased view of a task, letting the executor keep tasks with different
/// output types in a single queue.
trait Pendable {
    /// Reports whether the task is still unfinished.
    ///
    /// The implementation for `Arc<Task<T>>` in this file polls the task's
    /// future once as a side effect and returns `true` when that poll yields
    /// `Poll::Pending`.
    fn is_pending(&self) -> bool;
}

impl Default for Executor {
    fn default() -> Self {
        Executor {
            tasks: VecDeque::new(),
        }
    }
}

/// Task is our unit of execution and holds the future we are waiting on.
struct Task<T> {
    /// The pinned, boxed future, kept behind a spin `Mutex` so it can be polled
    /// through a shared `Arc<Task<T>>`.
    pub future: Mutex<Pin<Box<dyn Future<Output = T> + Send + 'static>>>,
}

/// Defines what happens when a task's waker fires.
impl<T> Woke for Task<T> {
    /// Re-polls every task queued on the global executor.
    ///
    /// The woken task itself is ignored: a wake-up means some future made
    /// progress, which may have unblocked any of the queued tasks.
    fn wake_by_ref(_: &Arc<Self>) {
        // NOTE(review): the free functions `block_on` and `run` hold this same
        // lock while polling. If a future invokes its waker synchronously from
        // inside `poll`, this `lock()` presumably never returns (spin mutexes
        // are not reentrant) - confirm and consider deferring the wake.
        let mut executor = DEFAULT_EXECUTOR.lock();
        executor.poll_tasks()
    }
}

impl<T> Pendable for Arc<Task<T>> {
    /// Polls the task's future once and reports whether it is still pending.
    ///
    /// A ready value is dropped here; callers only learn that the task finished.
    fn is_pending(&self) -> bool {
        // Keep the future locked for the duration of the poll.
        let mut guard = self.future.lock();
        // Build a waker backed by this task's `Arc` and hand it to the future.
        let waker = waker_ref(self);
        let mut cx = Context::from_waker(&*waker);
        let outcome = guard.as_mut().poll(&mut cx);
        match outcome {
            Poll::Pending => true,
            Poll::Ready(_) => false,
        }
    }
}

impl Executor {
    /// Drives `future` to completion on the calling thread and returns its output.
    ///
    /// The future is wrapped in a task that is local to this call and is never
    /// added to `self.tasks`. It is re-polled in a tight loop until it yields
    /// `Poll::Ready`, so this call busy-waits between polls.
    fn block_on<T>(&mut self, future: Box<dyn Future<Output = T> + 'static + Send + Unpin>) -> T
    where
        T: Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
        });
        // Nothing else in this file locks this task's future, so the lock, the
        // waker and the context are loop-invariant and are set up only once.
        let mut guard = task.future.lock();
        let waker = waker_ref(&task);
        let mut context = Context::from_waker(&*waker);
        loop {
            if let Poll::Ready(value) = guard.as_mut().poll(&mut context) {
                return value;
            }
        }
    }

    /// Queues `future` as a new task, then polls every queued task once.
    ///
    /// Returns without waiting for the future to finish. When the future later
    /// completes, its output is dropped.
    pub fn run<T>(&mut self, future: Box<dyn Future<Output = T> + 'static + Send + Unpin>)
    where
        T: Send + 'static,
    {
        self.add_task(future);
        self.poll_tasks();
    }

    /// Wraps `future` in a task, appends it to the back of the queue and
    /// returns a handle to that task.
    fn add_task<T>(
        &mut self,
        future: Box<dyn Future<Output = T> + 'static + Send + Unpin>,
    ) -> Arc<Task<T>>
    where
        T: Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
        });
        // The queue keeps its own reference; the caller gets the original.
        self.tasks.push_back(Box::new(Arc::clone(&task)));
        task
    }

    /// Polls each task that is queued at the time of the call exactly once.
    ///
    /// Tasks that are still pending go to the back of the queue; finished
    /// tasks are dropped.
    fn poll_tasks(&mut self) {
        // Snapshot the length: pending tasks are re-queued during the loop and
        // must not be polled a second time in the same pass.
        let queued = self.tasks.len();
        for _ in 0..queued {
            let task = match self.tasks.pop_front() {
                Some(task) => task,
                None => break,
            };
            if task.is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

// Global executor shared by the free functions `block_on` and `run` and by
// task wakers. `lazy_static!` builds it on first access; all use goes through
// the `Mutex`.
lazy_static! {
    static ref DEFAULT_EXECUTOR: Mutex<Box<Executor>> =
        Mutex::new(Box::new(Executor::default()));
}

/// Runs `future` to completion on the global executor and returns its output.
///
/// The global executor's lock is held for the whole call, and the future is
/// re-polled in a loop until it is ready.
pub fn block_on<T>(future: impl Future<Output = T> + 'static + Send) -> T
where
    T: Send + 'static,
{
    let mut executor = DEFAULT_EXECUTOR.lock();
    // Pinning on the heap makes the future `Unpin`, which the executor requires.
    let boxed = Box::new(Box::pin(future));
    executor.block_on(boxed)
}

/// Hands `future` to the global executor as a queued task without blocking.
///
/// The task is added to the queue and every queued task is polled once before
/// this function returns. The future's eventual output is discarded.
pub fn run<T>(future: impl Future<Output = T> + 'static + Send)
where
    T: Send + 'static,
{
    let mut executor = DEFAULT_EXECUTOR.lock();
    // Pinning on the heap makes the future `Unpin`, which the executor requires.
    let boxed = Box::new(Box::pin(future));
    executor.run(boxed)
}