bililive_core/stream/waker.rs
1//! Helper waker proxy type.
2
3use std::sync::Arc;
4use std::task::{Wake, Waker};
5
6use futures::task::AtomicWaker;
7
8/// When reading the stream, a `poll_ready` is executed to ensure that all pending write op including
9/// heartbeat is completed.
10/// Therefore, we need to wake the task on which stream is polled in `poll_ready` (and `poll_flush`).
11/// `WakerProxy` is a waker dispatcher. It will dispatch a wake op to both wakers (rx & tx), such that
12/// both stream task and sink task can be waken and no starvation will occur.
13#[derive(Debug, Default)]
14pub struct WakerProxy {
15 tx_waker: AtomicWaker,
16 rx_waker: AtomicWaker,
17}
18
19impl WakerProxy {
20 /// Register the read waker.
21 pub fn rx(&self, waker: &Waker) {
22 self.rx_waker.register(waker);
23 }
24 /// Register the write waker.
25 pub fn tx(&self, waker: &Waker) {
26 self.tx_waker.register(waker);
27 }
28}
29
30impl Wake for WakerProxy {
31 fn wake(self: Arc<Self>) {
32 self.rx_waker.wake();
33 self.tx_waker.wake();
34 }
35
36 fn wake_by_ref(self: &Arc<Self>) {
37 self.rx_waker.wake();
38 self.tx_waker.wake();
39 }
40}