datum-core 0.6.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use super::*;

struct ThreadWaker {
    thread: thread::Thread,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.thread.unpark();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.thread.unpark();
    }
}

pub(super) struct BlockingPoller {
    thread_id: Option<thread::ThreadId>,
    waker: Option<Waker>,
}

impl BlockingPoller {
    pub(super) fn new() -> Self {
        Self {
            thread_id: None,
            waker: None,
        }
    }

    fn refresh_waker(&mut self) {
        let current = thread::current();
        let current_id = current.id();
        if self.thread_id == Some(current_id) {
            return;
        }
        self.thread_id = Some(current_id);
        self.waker = Some(Waker::from(Arc::new(ThreadWaker { thread: current })));
    }

    pub(super) fn poll_next<S>(&mut self, stream: &mut S) -> Option<S::Item>
    where
        S: Stream + Unpin,
    {
        self.refresh_waker();
        let waker = self.waker.as_ref().expect("blocking poller has waker");
        let mut cx = Context::from_waker(waker);
        let mut idle_spins = 0;
        loop {
            match Pin::new(&mut *stream).poll_next(&mut cx) {
                Poll::Ready(item) => {
                    return item;
                }
                Poll::Pending if idle_spins < STREAM_READY_SPINS => {
                    idle_spins += STREAM_SPIN_BACKOFF;
                    for _ in 0..STREAM_SPIN_BACKOFF {
                        std::hint::spin_loop();
                    }
                }
                Poll::Pending => {
                    idle_spins = 0;
                    thread::park_timeout(STREAM_MAX_PARK);
                }
            }
        }
    }
}