bililive_core/stream/
waker.rs

1//! Helper waker proxy type.
2
3use std::sync::Arc;
4use std::task::{Wake, Waker};
5
6use futures::task::AtomicWaker;
7
8/// When reading the stream, a `poll_ready` is executed to ensure that all pending write op including
9/// heartbeat is completed.
10/// Therefore, we need to wake the task on which stream is polled in `poll_ready` (and `poll_flush`).
11/// `WakerProxy` is a waker dispatcher. It will dispatch a wake op to both wakers (rx & tx), such that
12/// both stream task and sink task can be waken and no starvation will occur.
13#[derive(Debug, Default)]
14pub struct WakerProxy {
15    tx_waker: AtomicWaker,
16    rx_waker: AtomicWaker,
17}
18
19impl WakerProxy {
20    /// Register the read waker.
21    pub fn rx(&self, waker: &Waker) {
22        self.rx_waker.register(waker);
23    }
24    /// Register the write waker.
25    pub fn tx(&self, waker: &Waker) {
26        self.tx_waker.register(waker);
27    }
28}
29
30impl Wake for WakerProxy {
31    fn wake(self: Arc<Self>) {
32        self.rx_waker.wake();
33        self.tx_waker.wake();
34    }
35
36    fn wake_by_ref(self: &Arc<Self>) {
37        self.rx_waker.wake();
38        self.tx_waker.wake();
39    }
40}