1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use std::{future::Future, rc::Rc, task::Poll, time::Instant};
use futures::pin_mut;
pub mod oneshot;
pub mod timeout;
pub mod watch;
/// Error value whose `Display` output is `"sender dropped"`.
///
/// NOTE(review): presumably returned by the receiving halves of the channels
/// in the `oneshot` / `watch` submodules once their sender is gone — confirm
/// against those modules.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
#[error("sender dropped")]
pub struct RecvError;
/// A [`std::task::Waker`] implementation backed by a reference-counted
/// [`FiberWaker`], i.e. by a `fiber::Cond`.
mod waker {
    use crate::fiber;
    use std::rc::Rc;
    use std::task::{RawWaker, RawWakerVTable, Waker};

    /// Wake-up primitive: waking it broadcasts on the wrapped `fiber::Cond`.
    #[derive(Default)]
    pub struct FiberWaker {
        cond: fiber::Cond,
    }

    impl FiberWaker {
        /// Returns the condition variable that `wake` signals.
        pub fn cond(&self) -> &fiber::Cond {
            &self.cond
        }

        /// Broadcasts on the condition variable.
        pub fn wake(&self) {
            self.cond.broadcast()
        }
    }

    // NOTE(review): these impls are asserted, not proven, by this module. The
    // waker built below shares a non-atomic `Rc`, so it is presumably only
    // sound while every clone stays on the thread that created it — confirm.
    unsafe impl Send for FiberWaker {}
    unsafe impl Sync for FiberWaker {}

    /// Builds a [`Waker`] that owns `rcw` and calls [`FiberWaker::wake`] when
    /// woken.
    pub fn with_rcw(rcw: Rc<FiberWaker>) -> Waker {
        let raw_waker = raw_waker(rcw);
        // SAFETY: the vtable functions below uphold the `RawWaker` contract
        // for a data pointer produced by `Rc::<FiberWaker>::into_raw`.
        unsafe { Waker::from_raw(raw_waker) }
    }

    /// Turns an owned `Rc<FiberWaker>` into a `RawWaker`; ownership of one
    /// strong reference moves into the returned data pointer.
    fn raw_waker(rcw: Rc<FiberWaker>) -> RawWaker {
        const RC_WAKER_VT: RawWakerVTable = RawWakerVTable::new(
            rc_waker_clone,
            rc_waker_wake,
            rc_waker_wake_by_ref,
            rc_waker_drop,
        );
        let ptr: *const () = Rc::into_raw(rcw).cast();
        RawWaker::new(ptr, &RC_WAKER_VT)
    }

    /// Vtable `clone`: produces a new `RawWaker` holding its own strong
    /// reference, leaving the caller's reference untouched.
    ///
    /// `data` must come from `Rc::<FiberWaker>::into_raw` and still be alive.
    unsafe fn rc_waker_clone(data: *const ()) -> RawWaker {
        // Cast *before* touching the refcount: `increment_strong_count` infers
        // its `T` from the pointer type, and it must be the type the `Rc` was
        // created with, not `()`.
        let ptr: *const FiberWaker = data.cast();
        // SAFETY: `ptr` was produced by `Rc::<FiberWaker>::into_raw` in
        // `raw_waker` and the caller still holds a strong reference.
        Rc::increment_strong_count(ptr);
        // SAFETY: the increment above created the strong reference that this
        // `Rc` now owns.
        raw_waker(Rc::from_raw(ptr))
    }

    /// Vtable `wake`: wakes and consumes the caller's strong reference.
    ///
    /// `data` must come from `Rc::<FiberWaker>::into_raw` and still be alive.
    unsafe fn rc_waker_wake(data: *const ()) {
        // SAFETY: see the function contract; ownership of the reference is
        // transferred to `rcw`, which is dropped at the end of the scope.
        let rcw: Rc<FiberWaker> = Rc::from_raw(data.cast());
        rcw.wake();
    }

    /// Vtable `wake_by_ref`: wakes without consuming the caller's reference.
    ///
    /// `data` must come from `Rc::<FiberWaker>::into_raw` and still be alive.
    unsafe fn rc_waker_wake_by_ref(data: *const ()) {
        // Borrow the pointee instead of rebuilding an owning `Rc`, so the
        // refcount cannot be decremented here even if `wake` unwinds.
        // SAFETY: the caller's strong reference keeps the `FiberWaker` alive
        // for the duration of this call.
        let fiber_waker: &FiberWaker = &*data.cast::<FiberWaker>();
        fiber_waker.wake();
    }

    /// Vtable `drop`: releases the strong reference owned by the `RawWaker`.
    ///
    /// `data` must come from `Rc::<FiberWaker>::into_raw` and still be alive.
    unsafe fn rc_waker_drop(data: *const ()) {
        // SAFETY: see the function contract; the reference is released here.
        drop(Rc::<FiberWaker>::from_raw(data.cast()))
    }
}
/// An extended polling context that carries an optional wake-up deadline
/// next to the standard [`std::task::Context`].
mod context {
    use std::task::Context;
    use std::task::Waker;
    use std::time::Instant;

    /// A `Context` followed by an optional deadline.
    ///
    /// `#[repr(C)]` with `cx` as the first field is what lets
    /// [`ContextExt::set_deadline`] reinterpret a pointer to the inner
    /// `Context` as a pointer to the whole struct.
    #[repr(C)]
    pub struct ContextExt<'a> {
        cx: Context<'a>,
        deadline: Option<Instant>,
    }

    impl<'a> ContextExt<'a> {
        /// Creates an extended context around `waker` with no deadline set.
        #[must_use]
        pub fn from_waker(waker: &'a Waker) -> Self {
            let cx = Context::from_waker(waker);
            Self { cx, deadline: None }
        }

        /// Gives mutable access to the wrapped standard `Context`.
        pub fn cx(&mut self) -> &mut Context<'a> {
            &mut self.cx
        }

        /// Returns the currently recorded deadline, if any.
        pub fn deadline(&self) -> Option<Instant> {
            self.deadline
        }

        /// Records `new` as the deadline unless an earlier one is already
        /// stored; the earliest requested deadline always wins.
        ///
        /// # Safety
        ///
        /// `cx` must be the `cx` field of a live `ContextExt` (for example the
        /// reference returned by [`ContextExt::cx`]); it is cast back to the
        /// enclosing struct without any check.
        pub unsafe fn set_deadline(cx: &mut Context<'_>, new: Instant) {
            let ext: &mut ContextExt<'_> = &mut *(cx as *mut Context<'_>).cast();
            // Keep whichever of the stored and the requested instant is sooner.
            let earliest = ext.deadline.map_or(new, |old| old.min(new));
            ext.deadline = Some(earliest);
        }
    }
}
/// Drives the future `f` to completion and returns its output.
///
/// The future is polled in a loop with a waker backed by a `fiber::Cond`.
/// Whenever a poll returns `Pending`, the caller waits on that condition
/// variable: for at most the time remaining until the deadline recorded in
/// the polling context during that poll, or without a timeout if none was
/// recorded. After the wait returns the future is polled again.
pub fn block_on<F: Future>(f: F) -> F::Output {
    let fiber_waker = Rc::new(waker::FiberWaker::default());
    let waker = waker::with_rcw(Rc::clone(&fiber_waker));
    pin_mut!(f);

    loop {
        // A fresh context per iteration, so a deadline from a previous poll
        // never leaks into the next one.
        let mut cx = context::ContextExt::from_waker(&waker);
        match f.as_mut().poll(cx.cx()) {
            Poll::Ready(output) => return output,
            Poll::Pending => {}
        }

        if let Some(deadline) = cx.deadline() {
            // A deadline already in the past saturates to a zero timeout.
            let remaining = deadline.saturating_duration_since(Instant::now());
            fiber_waker.cond().wait_timeout(remaining);
        } else {
            fiber_waker.cond().wait();
        }
    }
}