use tokio::sync::oneshot::{self, error::TryRecvError, Receiver, Sender};
use tracing::error;
pub fn new() -> (ProducerChannel, ConsumerChannel) {
let (tx_init, rx_init) = oneshot::channel::<()>();
let (tx_close, rx_close) = oneshot::channel::<()>();
(
ProducerChannel::new(tx_init, rx_close),
ConsumerChannel::new(tx_close, rx_init),
)
}
/// Producer-side endpoint of the channel pair returned by [`new`].
///
/// It can emit a single "initialized" event (`send_init`) and poll for a
/// close request (`should_close`).
#[derive(Debug)]
pub struct ProducerChannel {
/// Sending half used for the "initialized" event. It is `Some` until
/// `send_init` takes it, and `None` afterwards.
sender: Option<Sender<()>>,
/// Receiving half polled by `should_close`.
receiver: Receiver<()>,
}
impl ProducerChannel {
    /// Wraps the two channel halves owned by the producer side.
    fn new(sender: Sender<()>, receiver: Receiver<()>) -> Self {
        let sender = Some(sender);
        Self { sender, receiver }
    }

    /// Emits the "initialized" event.
    ///
    /// Only the first call does any work: the sender is moved out of `self`,
    /// so later calls find `None` and return silently. A failed send is
    /// logged through `tracing::error!` and otherwise ignored.
    pub fn send_init(&mut self) {
        // `None` when the sender was already consumed by an earlier call.
        let outcome = self.sender.take().map(|init_tx| init_tx.send(()));
        if let Some(Err(err)) = outcome {
            error!(
                "Unexpected error during sending initialized event: {:?}",
                err
            );
        }
    }

    /// Non-blocking check for a close request.
    ///
    /// Returns `false` only while the channel is still empty. Both a received
    /// value and a closed channel yield `true`.
    pub fn should_close(&mut self) -> bool {
        match self.receiver.try_recv() {
            Err(TryRecvError::Empty) => false,
            _ => true,
        }
    }
}
/// Consumer-side endpoint of the channel pair returned by [`new`].
///
/// It can emit a single close request (`close`) and poll for the
/// "initialized" event (`initialized`).
#[derive(Debug)]
pub struct ConsumerChannel {
/// Sending half used for the close request. It is `Some` until `close`
/// takes it, and `None` afterwards.
sender: Option<Sender<()>>,
/// Receiving half polled by `initialized`.
receiver: Receiver<()>,
}
impl ConsumerChannel {
    /// Wraps the two channel halves owned by the consumer side.
    fn new(sender: Sender<()>, receiver: Receiver<()>) -> Self {
        Self {
            sender: Some(sender),
            receiver,
        }
    }

    /// Sends the close request to the producer side.
    ///
    /// Only the first call does any work: the sender is moved out of `self`,
    /// so later calls are no-ops and never invoke `f`.
    ///
    /// `f` is called exactly when the send itself fails, i.e. the close
    /// signal could not be delivered. It is not called on success.
    pub fn close(&mut self, f: impl FnOnce()) {
        if let Some(tx) = self.sender.take() {
            // Explicit branch instead of `map_err` used purely for its side effect.
            if tx.send(()).is_err() {
                f();
            }
        }
    }

    /// Non-blocking check for the "initialized" event.
    ///
    /// Returns `false` only while the channel is still empty. It returns
    /// `true` on the poll that receives the event and on every later poll.
    /// As before, it also returns `true` when the channel is closed without
    /// a value (the sending half was dropped without signalling).
    pub fn initialized(&mut self) -> bool {
        // `try_recv` yields `Ok(())` on the poll that consumes the event and
        // `Err(Closed)` afterwards. Matching only `Closed` would report
        // `false` on the very poll that observes the event.
        !matches!(self.receiver.try_recv(), Err(TryRecvError::Empty))
    }
}