1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#![no_std]

use async_task::Runnable;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use core::time::Duration;
use fixed_queue::spsc::{Receiver, Sender, Spsc};
use futures_core::stream::Stream;
use futures_intrusive::timer::{Clock, MockClock};
use futures_intrusive::timer::{GenericTimerService, Timer, TimerFuture};
use spin::{Lazy, Mutex};

const SPSC_LEN: usize = 30;
static SPSC: Spsc<Runnable, SPSC_LEN> = Spsc::new();
static SENDER: Lazy<Sender<'static, Runnable, SPSC_LEN>> =
    Lazy::new(|| SPSC.take_sender().unwrap());
static RECVER: Lazy<Receiver<'static, Runnable, SPSC_LEN>> =
    Lazy::new(|| SPSC.take_recver().unwrap());
static WAKER: Mutex<Option<Waker>> = Mutex::new(None);
static CLOCK: MockClock = MockClock::new();
static TIMER: Lazy<GenericTimerService<Mutex<()>>> = Lazy::new(|| GenericTimerService::new(&CLOCK));

fn schedule(runnable: Runnable) {
    SENDER.send(runnable).unwrap();
    if let Some(waker) = WAKER.lock().take() {
        waker.wake();
    }
}
pub fn spawn<F>(future: F)
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let (runnable, task) = async_task::spawn(future, schedule);
    runnable.schedule();
    task.detach();
}

pub fn tick(tick: u64) {
    CLOCK.set_time(CLOCK.now() + tick);
    TIMER.check_expirations();
}
pub fn now() -> u64 {
    CLOCK.now()
}
pub fn sleep(delay: Duration) -> TimerFuture<'static> {
    TIMER.delay(delay)
}

pub struct TaskStream {
    _inner: (),
}
impl TaskStream {
    pub fn stream() -> Self {
        TaskStream { _inner: () }
    }
    pub fn get_task(&self) -> Option<Runnable> {
        if let Ok(task) = RECVER.try_recv() {
            Some(task)
        } else {
            None
        }
    }
}
impl Stream for TaskStream {
    type Item = Runnable;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Ok(task) = RECVER.try_recv() {
            cx.waker().wake_by_ref();
            Poll::Ready(Some(task))
        } else {
            *WAKER.lock() = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}