use super::*;
struct ThreadWaker {
thread: thread::Thread,
}
impl Wake for ThreadWaker {
fn wake(self: Arc<Self>) {
self.thread.unpark();
}
fn wake_by_ref(self: &Arc<Self>) {
self.thread.unpark();
}
}
pub(super) struct BlockingPoller {
thread_id: Option<thread::ThreadId>,
waker: Option<Waker>,
}
impl BlockingPoller {
pub(super) fn new() -> Self {
Self {
thread_id: None,
waker: None,
}
}
fn refresh_waker(&mut self) {
let current = thread::current();
let current_id = current.id();
if self.thread_id == Some(current_id) {
return;
}
self.thread_id = Some(current_id);
self.waker = Some(Waker::from(Arc::new(ThreadWaker { thread: current })));
}
pub(super) fn poll_next<S>(&mut self, stream: &mut S) -> Option<S::Item>
where
S: Stream + Unpin,
{
self.refresh_waker();
let waker = self.waker.as_ref().expect("blocking poller has waker");
let mut cx = Context::from_waker(waker);
let mut idle_spins = 0;
loop {
match Pin::new(&mut *stream).poll_next(&mut cx) {
Poll::Ready(item) => {
return item;
}
Poll::Pending if idle_spins < STREAM_READY_SPINS => {
idle_spins += STREAM_SPIN_BACKOFF;
for _ in 0..STREAM_SPIN_BACKOFF {
std::hint::spin_loop();
}
}
Poll::Pending => {
idle_spins = 0;
thread::park_timeout(STREAM_MAX_PARK);
}
}
}
}
}