// dynamo_subscriber/stream/channel.rs
use tokio::sync::oneshot::{self, error::TryRecvError, Receiver, Sender};
use tracing::error;
3
4pub fn new() -> (ProducerChannel, ConsumerChannel) {
6 let (tx_init, rx_init) = oneshot::channel::<()>();
7 let (tx_close, rx_close) = oneshot::channel::<()>();
8 (
9 ProducerChannel::new(tx_init, rx_close),
10 ConsumerChannel::new(tx_close, rx_init),
11 )
12}
13
14#[derive(Debug)]
21pub struct ProducerChannel {
22 sender: Option<Sender<()>>,
24
25 receiver: Receiver<()>,
27}
28
29impl ProducerChannel {
30 fn new(sender: Sender<()>, receiver: Receiver<()>) -> Self {
31 Self {
32 sender: Some(sender),
33 receiver,
34 }
35 }
36
37 pub fn send_init(&mut self) {
39 if let Some(tx) = self.sender.take() {
40 if let Err(err) = tx.send(()) {
41 error!(
42 "Unexpected error during sending initialized event: {:?}",
43 err
44 );
45 }
46 }
47 }
48
49 pub fn should_close(&mut self) -> bool {
51 !matches!(self.receiver.try_recv(), Err(TryRecvError::Empty))
52 }
53}
54
55#[derive(Debug)]
62pub struct ConsumerChannel {
63 sender: Option<Sender<()>>,
65
66 receiver: Receiver<()>,
68}
69
70impl ConsumerChannel {
71 fn new(sender: Sender<()>, receiver: Receiver<()>) -> Self {
72 Self {
73 sender: Some(sender),
74 receiver,
75 }
76 }
77
78 pub fn close(&mut self, f: impl FnOnce()) {
81 if let Some(tx) = self.sender.take() {
82 let _ = tx.send(()).map_err(|_| f());
83 }
84 }
85
86 pub fn initialized(&mut self) -> bool {
88 matches!(self.receiver.try_recv(), Err(TryRecvError::Closed))
89 }
90}