Skip to main content

dynamo_subscriber/stream/
channel.rs

1use tokio::sync::oneshot::{self, error::TryRecvError, Receiver, Sender};
2use tracing::error;
3
4/// Create a pair of channel.
5pub fn new() -> (ProducerChannel, ConsumerChannel) {
6    let (tx_init, rx_init) = oneshot::channel::<()>();
7    let (tx_close, rx_close) = oneshot::channel::<()>();
8    (
9        ProducerChannel::new(tx_init, rx_close),
10        ConsumerChannel::new(tx_close, rx_init),
11    )
12}
13
14/// Communication channel half in the stream.
15///
16/// Using this channel, the stream does the following.
17///
18/// - Send `Initialized` event to the channel half.
19/// - Receive `Stop polling` event from the channel half.
20#[derive(Debug)]
21pub struct ProducerChannel {
22    /// Sender half to send `Initialized` event.
23    sender: Option<Sender<()>>,
24
25    /// Receiver half to receive `Close` event.
26    receiver: Receiver<()>,
27}
28
29impl ProducerChannel {
30    fn new(sender: Sender<()>, receiver: Receiver<()>) -> Self {
31        Self {
32            sender: Some(sender),
33            receiver,
34        }
35    }
36
37    /// Send `Initialized` event to the channel half.
38    pub fn send_init(&mut self) {
39        if let Some(tx) = self.sender.take() {
40            if let Err(err) = tx.send(()) {
41                error!(
42                    "Unexpected error during sending initialized event: {:?}",
43                    err
44                );
45            }
46        }
47    }
48
49    /// Return true if the `Stop polling` event is received.
50    pub fn should_close(&mut self) -> bool {
51        !matches!(self.receiver.try_recv(), Err(TryRecvError::Empty))
52    }
53}
54
55/// Communication channel half to the stream.
56///
57/// Using this channel, you can do the following.
58///
59/// - Confirm that the stream is ready to polling or not.
60/// - Send `Stop polling` event to the stream.
61#[derive(Debug)]
62pub struct ConsumerChannel {
63    /// Sender half to send `Close` event.
64    sender: Option<Sender<()>>,
65
66    /// Receiver half to receive `Initialized` event.
67    receiver: Receiver<()>,
68}
69
70impl ConsumerChannel {
71    fn new(sender: Sender<()>, receiver: Receiver<()>) -> Self {
72        Self {
73            sender: Some(sender),
74            receiver,
75        }
76    }
77
78    /// Send `Stop polling` event to the stream. The passed closure is executed only when
79    /// sending event fails.
80    pub fn close(&mut self, f: impl FnOnce()) {
81        if let Some(tx) = self.sender.take() {
82            let _ = tx.send(()).map_err(|_| f());
83        }
84    }
85
86    /// Return true if the stream is ready to polling.
87    pub fn initialized(&mut self) -> bool {
88        matches!(self.receiver.try_recv(), Err(TryRecvError::Closed))
89    }
90}