solana_transaction_utils/
sync.rs

1use crate::Error;
2use futures::TryFutureExt;
3use tokio::sync::{mpsc, oneshot};
4use tracing::warn;
5
6#[derive(Debug)]
7pub struct MessageSender<T>(pub(crate) mpsc::Sender<T>);
8#[derive(Debug)]
9pub struct MessageReceiver<T>(mpsc::Receiver<T>);
10pub struct MessageChannel<T> {
11    pub(crate) tx: MessageSender<T>,
12    pub(crate) rx: MessageReceiver<T>,
13}
14
15impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for MessageChannel<T> {
16    fn from(value: (mpsc::Sender<T>, mpsc::Receiver<T>)) -> Self {
17        Self {
18            tx: MessageSender(value.0),
19            rx: MessageReceiver(value.1),
20        }
21    }
22}
23
24impl<T> MessageChannel<T> {
25    pub fn new(size: usize) -> Self {
26        mpsc::channel(size).into()
27    }
28
29    pub async fn recv(&mut self) -> Option<T> {
30        self.rx.recv().await
31    }
32
33    pub fn sender(&self) -> MessageSender<T> {
34        self.tx.clone()
35    }
36}
37
38pub fn message_channel<T>(size: usize) -> (MessageSender<T>, MessageReceiver<T>) {
39    let (tx, rx) = mpsc::channel(size);
40    (MessageSender(tx), MessageReceiver(rx))
41}
42
43impl<T> MessageReceiver<T> {
44    pub async fn recv(&mut self) -> Option<T> {
45        self.0.recv().await
46    }
47}
48
49impl<T> Clone for MessageSender<T> {
50    fn clone(&self) -> Self {
51        Self(self.0.clone())
52    }
53}
54
55impl<T> MessageSender<T> {
56    pub async fn send(&self, msg: T) {
57        _ = self.0.send(msg).await
58    }
59
60    pub async fn request<R, F>(&self, req: F) -> Result<R, Error>
61    where
62        F: FnOnce(ResponseSender<R>) -> T,
63    {
64        let (tx, rx) = response_channel();
65        self.0
66            .send(req(tx))
67            .map_err(|_| Error::channel_closed())
68            .await?;
69        rx.recv().await
70    }
71}
72
73#[derive(Debug)]
74pub struct ResponseSender<T>(oneshot::Sender<T>);
75pub struct ResponseReceiver<T>(oneshot::Receiver<T>);
76
77pub fn response_channel<T>() -> (ResponseSender<T>, ResponseReceiver<T>) {
78    let (tx, rx) = oneshot::channel();
79    (ResponseSender(tx), ResponseReceiver(rx))
80}
81
82impl<T: std::fmt::Debug> ResponseSender<T> {
83    pub fn send(self, msg: T) {
84        match self.0.send(msg) {
85            Ok(()) => (),
86            Err(err) => warn!(?err, "ignoring channel error"),
87        }
88    }
89}
90
91impl<T> ResponseReceiver<T> {
92    pub async fn recv(self) -> Result<T, Error> {
93        self.0.map_err(|_| Error::channel_closed()).await
94    }
95}