asim/
lib.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::task::Context;
7
8use parking_lot::Mutex;
9
10use futures::task::{waker_ref, ArcWake};
11
12pub mod sync;
13
14pub mod network;
15
16mod timer;
17pub use timer::Timer;
18
19pub mod time;
20
21type TaskQueue = Vec<Rc<Task>>;
22
23type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
24
25struct Task {
26    future: Mutex<Option<BoxFuture<'static, ()>>>,
27    ready_tasks: Rc<RefCell<TaskQueue>>,
28}
29
30struct RcWrapper(Rc<Task>);
31unsafe impl Send for RcWrapper {}
32unsafe impl Sync for RcWrapper {}
33
34impl ArcWake for RcWrapper {
35    fn wake_by_ref(self_ptr: &Arc<Self>) {
36        let inner = &self_ptr.0;
37        inner.ready_tasks.borrow_mut().push(inner.clone());
38    }
39}
40
41// An event queue servers as an executor for the async tasks simulating the timed events
42pub struct TaskRunner {
43    ready_tasks: Rc<RefCell<TaskQueue>>,
44}
45
46impl Default for TaskRunner {
47    fn default() -> Self {
48        let ready_tasks = Default::default();
49        Self { ready_tasks }
50    }
51}
52
53impl TaskRunner {
54    /// Run all ready tasks
55    /// Will return true if any task ran
56    pub fn execute_tasks(&self) -> bool {
57        let mut ready_tasks = {
58            let mut tasks = self.ready_tasks.borrow_mut();
59            std::mem::take(&mut *tasks)
60        };
61
62        if ready_tasks.is_empty() {
63            return false;
64        } else {
65            log::trace!("Found {} tasks that are ready", ready_tasks.len());
66        }
67
68        for task in ready_tasks.drain(..) {
69            let mut fut_lock = task.future.lock();
70
71            if let Some(mut future) = fut_lock.take() {
72                let wrapper = Arc::new(RcWrapper(task.clone()));
73                let waker = waker_ref(&wrapper);
74                let context = &mut Context::from_waker(&waker);
75
76                if future.as_mut().poll(context).is_pending() {
77                    *fut_lock = Some(future);
78                }
79            }
80        }
81
82        true
83    }
84
85    pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
86        let future = Box::pin(future);
87        let task = Rc::new(Task {
88            future: Mutex::new(Some(future)),
89            ready_tasks: self.ready_tasks.clone(),
90        });
91
92        self.ready_tasks.borrow_mut().push(task);
93    }
94
95    /// Drops all queued events
96    pub fn stop(&self) {
97        self.ready_tasks.borrow_mut().clear();
98    }
99}