essential_node_types/block_notify.rs
1use tokio::sync::watch::{channel, error::RecvError, Receiver, Sender};
2
3/// Wrapper around `watch::Sender` to notify of new blocks.
4///
5/// This is used by `essential-builder` to notify `essential-relayer`
6/// and by `essential-relayer` to notify [`validation`] stream.
7#[derive(Clone, Default)]
8pub struct BlockTx(Sender<()>);
9
10/// Wrapper around `watch::Receiver` to listen to new blocks.
11///
12/// This is used by [`db::subscribe_blocks`] stream.
13#[derive(Clone)]
14pub struct BlockRx(Receiver<()>);
15
16impl BlockTx {
17 /// Create a new [`BlockTx`] to notify listeners of new blocks.
18 pub fn new() -> Self {
19 let (block_tx, _block_rx) = channel(());
20 Self(block_tx)
21 }
22
23 /// Notify listeners that a new block has been received.
24 ///
25 /// Note this is best effort and will still send even if there are currently no listeners.
26 pub fn notify(&self) {
27 let _ = self.0.send(());
28 }
29
30 /// Create a new [`BlockRx`] to listen for new blocks.
31 pub fn new_listener(&self) -> BlockRx {
32 BlockRx(self.0.subscribe())
33 }
34
35 /// Get the number of receivers listening for new blocks.
36 pub fn receiver_count(&self) -> usize {
37 self.0.receiver_count()
38 }
39}
40
41impl BlockRx {
42 /// Waits for a change notification.
43 pub async fn changed(&mut self) -> Result<(), RecvError> {
44 self.0.changed().await
45 }
46}