Skip to main content

cometbft_rpc/client/
sync.rs

1//! Synchronization primitives specific to the CometBFT RPC client.
2//!
3//! At present, this wraps Tokio's synchronization primitives and provides some
4//! convenience methods. We also only implement unbounded channels at present.
5//! In future, if RPC consumers need it, we will implement bounded channels.
6
7use core::pin::Pin;
8
9use futures::{
10    task::{Context, Poll},
11    Stream,
12};
13use pin_project::pin_project;
14use tokio::sync::mpsc;
15
16use crate::Error;
17
18/// Constructor for an unbounded channel.
19pub fn unbounded<T>() -> (ChannelTx<T>, ChannelRx<T>) {
20    let (tx, rx) = mpsc::unbounded_channel();
21    (ChannelTx(tx), ChannelRx(rx))
22}
23
24/// Sender interface for a channel.
25///
26/// Can be cloned because the underlying channel used is
27/// [`mpsc`](https://docs.rs/tokio/*/tokio/sync/mpsc/index.html).
28#[derive(Debug, Clone)]
29pub struct ChannelTx<T>(mpsc::UnboundedSender<T>);
30
31impl<T> ChannelTx<T> {
32    pub fn send(&self, value: T) -> Result<(), Error> {
33        self.0.send(value).map_err(Error::send)
34    }
35}
36
37/// Receiver interface for a channel.
38#[pin_project]
39#[derive(Debug)]
40pub struct ChannelRx<T>(#[pin] mpsc::UnboundedReceiver<T>);
41
42impl<T> ChannelRx<T> {
43    /// Wait indefinitely until we receive a value from the channel (or the
44    /// channel is closed).
45    #[allow(dead_code)]
46    pub async fn recv(&mut self) -> Option<T> {
47        self.0.recv().await
48    }
49}
50
51impl<T> Stream for ChannelRx<T> {
52    type Item = T;
53
54    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55        self.project().0.poll_recv(cx)
56    }
57}