solana_transaction_utils/
sync.rs1use crate::Error;
2use futures::TryFutureExt;
3use tokio::sync::{mpsc, oneshot};
4use tracing::warn;
5
6#[derive(Debug)]
7pub struct MessageSender<T>(pub(crate) mpsc::Sender<T>);
8#[derive(Debug)]
9pub struct MessageReceiver<T>(mpsc::Receiver<T>);
10pub struct MessageChannel<T> {
11 pub(crate) tx: MessageSender<T>,
12 pub(crate) rx: MessageReceiver<T>,
13}
14
15impl<T> From<(mpsc::Sender<T>, mpsc::Receiver<T>)> for MessageChannel<T> {
16 fn from(value: (mpsc::Sender<T>, mpsc::Receiver<T>)) -> Self {
17 Self {
18 tx: MessageSender(value.0),
19 rx: MessageReceiver(value.1),
20 }
21 }
22}
23
24impl<T> MessageChannel<T> {
25 pub fn new(size: usize) -> Self {
26 mpsc::channel(size).into()
27 }
28
29 pub async fn recv(&mut self) -> Option<T> {
30 self.rx.recv().await
31 }
32
33 pub fn sender(&self) -> MessageSender<T> {
34 self.tx.clone()
35 }
36}
37
38pub fn message_channel<T>(size: usize) -> (MessageSender<T>, MessageReceiver<T>) {
39 let (tx, rx) = mpsc::channel(size);
40 (MessageSender(tx), MessageReceiver(rx))
41}
42
43impl<T> MessageReceiver<T> {
44 pub async fn recv(&mut self) -> Option<T> {
45 self.0.recv().await
46 }
47}
48
49impl<T> Clone for MessageSender<T> {
50 fn clone(&self) -> Self {
51 Self(self.0.clone())
52 }
53}
54
55impl<T> MessageSender<T> {
56 pub async fn send(&self, msg: T) {
57 _ = self.0.send(msg).await
58 }
59
60 pub async fn request<R, F>(&self, req: F) -> Result<R, Error>
61 where
62 F: FnOnce(ResponseSender<R>) -> T,
63 {
64 let (tx, rx) = response_channel();
65 self.0
66 .send(req(tx))
67 .map_err(|_| Error::channel_closed())
68 .await?;
69 rx.recv().await
70 }
71}
72
73#[derive(Debug)]
74pub struct ResponseSender<T>(oneshot::Sender<T>);
75pub struct ResponseReceiver<T>(oneshot::Receiver<T>);
76
77pub fn response_channel<T>() -> (ResponseSender<T>, ResponseReceiver<T>) {
78 let (tx, rx) = oneshot::channel();
79 (ResponseSender(tx), ResponseReceiver(rx))
80}
81
82impl<T: std::fmt::Debug> ResponseSender<T> {
83 pub fn send(self, msg: T) {
84 match self.0.send(msg) {
85 Ok(()) => (),
86 Err(err) => warn!(?err, "ignoring channel error"),
87 }
88 }
89}
90
91impl<T> ResponseReceiver<T> {
92 pub async fn recv(self) -> Result<T, Error> {
93 self.0.map_err(|_| Error::channel_closed()).await
94 }
95}