cometbft_rpc/client/
sync.rs1use core::pin::Pin;
8
9use futures::{
10 task::{Context, Poll},
11 Stream,
12};
13use pin_project::pin_project;
14use tokio::sync::mpsc;
15
16use crate::Error;
17
18pub fn unbounded<T>() -> (ChannelTx<T>, ChannelRx<T>) {
20 let (tx, rx) = mpsc::unbounded_channel();
21 (ChannelTx(tx), ChannelRx(rx))
22}
23
24#[derive(Debug, Clone)]
29pub struct ChannelTx<T>(mpsc::UnboundedSender<T>);
30
31impl<T> ChannelTx<T> {
32 pub fn send(&self, value: T) -> Result<(), Error> {
33 self.0.send(value).map_err(Error::send)
34 }
35}
36
/// Receiving half of an unbounded channel; a newtype over Tokio's
/// [`mpsc::UnboundedReceiver`].
///
/// The inner receiver is marked `#[pin]` so that it can be reached through
/// `project()` from a `Pin<&mut Self>`.
#[pin_project]
#[derive(Debug)]
pub struct ChannelRx<T>(#[pin] mpsc::UnboundedReceiver<T>);
41
42impl<T> ChannelRx<T> {
43 #[allow(dead_code)]
46 pub async fn recv(&mut self) -> Option<T> {
47 self.0.recv().await
48 }
49}
50
51impl<T> Stream for ChannelRx<T> {
52 type Item = T;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 self.project().0.poll_recv(cx)
56 }
57}