essential_node_types/
block_notify.rs

1use tokio::sync::watch::{channel, error::RecvError, Receiver, Sender};
2
3/// Wrapper around `watch::Sender` to notify of new blocks.
4///
5/// This is used by `essential-builder` to notify `essential-relayer`
6/// and by `essential-relayer` to notify [`validation`] stream.
7#[derive(Clone, Default)]
8pub struct BlockTx(Sender<()>);
9
10/// Wrapper around `watch::Receiver` to listen to new blocks.
11///
12/// This is used by [`db::subscribe_blocks`] stream.
13#[derive(Clone)]
14pub struct BlockRx(Receiver<()>);
15
16impl BlockTx {
17    /// Create a new [`BlockTx`] to notify listeners of new blocks.
18    pub fn new() -> Self {
19        let (block_tx, _block_rx) = channel(());
20        Self(block_tx)
21    }
22
23    /// Notify listeners that a new block has been received.
24    ///
25    /// Note this is best effort and will still send even if there are currently no listeners.
26    pub fn notify(&self) {
27        let _ = self.0.send(());
28    }
29
30    /// Create a new [`BlockRx`] to listen for new blocks.
31    pub fn new_listener(&self) -> BlockRx {
32        BlockRx(self.0.subscribe())
33    }
34
35    /// Get the number of receivers listening for new blocks.
36    pub fn receiver_count(&self) -> usize {
37        self.0.receiver_count()
38    }
39}
40
41impl BlockRx {
42    /// Waits for a change notification.
43    pub async fn changed(&mut self) -> Result<(), RecvError> {
44        self.0.changed().await
45    }
46}