use std::{ops::ControlFlow, sync::Arc};
use tokio::sync::{mpsc, watch};
use super::messages::{BlockMessage, ConfirmTransactionMessage, SendTransactionMessage};
pub struct Channels {
pub blockdata_tx: watch::Sender<BlockMessage>,
pub blockdata_rx: watch::Receiver<BlockMessage>,
pub transaction_confirmer_tx: mpsc::UnboundedSender<ConfirmTransactionMessage>,
pub transaction_confirmer_rx: mpsc::UnboundedReceiver<ConfirmTransactionMessage>,
pub transaction_sender_tx: Arc<mpsc::UnboundedSender<SendTransactionMessage>>,
pub transaction_sender_rx: mpsc::UnboundedReceiver<SendTransactionMessage>,
}
impl Channels {
pub fn new() -> Self {
let (blockdata_tx, mut blockdata_rx) = watch::channel(BlockMessage::default());
blockdata_rx.mark_unchanged();
let (transaction_confirmer_tx, transaction_confirmer_rx) = mpsc::unbounded_channel();
let (transaction_sender_tx, transaction_sender_rx) = mpsc::unbounded_channel();
let transaction_sender_tx = Arc::new(transaction_sender_tx);
Self {
blockdata_tx,
blockdata_rx,
transaction_confirmer_tx,
transaction_confirmer_rx,
transaction_sender_tx,
transaction_sender_rx,
}
}
}
pub fn upgrade_and_send<T>(
sender: &mpsc::WeakUnboundedSender<T>,
messages: impl IntoIterator<Item = T>,
) -> ControlFlow<()> {
let Some(sender) = sender.upgrade() else {
return ControlFlow::Break(());
};
for message in messages {
let res = sender.send(message);
if res.is_err() {
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
}