alloy_pubsub/
handle.rs

1use alloy_json_rpc::PubSubItem;
2use serde_json::value::RawValue;
3use tokio::{
4    sync::{
5        mpsc,
6        oneshot::{self, error::TryRecvError},
7    },
8    time::Duration,
9};
10
11/// A handle to a backend. Communicates to a `ConnectionInterface` on the
12/// backend.
13///
14/// The backend SHOULD shut down when the handle is dropped (as indicated by
15/// the shutdown channel).
16#[derive(Debug)]
17pub struct ConnectionHandle {
18    /// Outbound channel to server.
19    pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
20
21    /// Inbound channel from remote server via WS.
22    pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
23
24    /// Notification from the backend of a terminal error.
25    pub(crate) error: oneshot::Receiver<()>,
26
27    /// Notify the backend of intentional shutdown.
28    pub(crate) shutdown: oneshot::Sender<()>,
29
30    /// Max number of retries before failing and exiting the connection.
31    /// Default is 10.
32    pub(crate) max_retries: u32,
33    /// The interval between retries.
34    /// Default is 3 seconds.
35    pub(crate) retry_interval: Duration,
36}
37
38impl ConnectionHandle {
39    /// Create a new connection handle.
40    pub fn new() -> (Self, ConnectionInterface) {
41        let (to_socket, from_frontend) = mpsc::unbounded_channel();
42        let (to_frontend, from_socket) = mpsc::unbounded_channel();
43        let (error_tx, error_rx) = oneshot::channel();
44        let (shutdown_tx, shutdown_rx) = oneshot::channel();
45
46        let handle = Self {
47            to_socket,
48            from_socket,
49            error: error_rx,
50            shutdown: shutdown_tx,
51            max_retries: 10,
52            retry_interval: Duration::from_secs(3),
53        };
54        let interface = ConnectionInterface {
55            from_frontend,
56            to_frontend,
57            error: error_tx,
58            shutdown: shutdown_rx,
59        };
60        (handle, interface)
61    }
62
63    /// Set the max number of retries before failing and exiting the connection.
64    /// Default is 10.
65    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
66        self.max_retries = max_retries;
67        self
68    }
69
70    /// Set the interval between retries.
71    pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
72        self.retry_interval = retry_interval;
73        self
74    }
75
76    /// Shutdown the backend.
77    pub fn shutdown(self) {
78        let _ = self.shutdown.send(());
79    }
80}
81
82/// The reciprocal of [`ConnectionHandle`].
83#[derive(Debug)]
84pub struct ConnectionInterface {
85    /// Inbound channel from frontend.
86    pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
87
88    /// Channel of responses to the frontend
89    pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
90
91    /// Notifies the frontend of a terminal error.
92    pub(crate) error: oneshot::Sender<()>,
93
94    /// Causes local shutdown when sender is triggered or dropped.
95    pub(crate) shutdown: oneshot::Receiver<()>,
96}
97
98impl ConnectionInterface {
99    /// Send a pubsub item to the frontend.
100    pub fn send_to_frontend(
101        &self,
102        item: PubSubItem,
103    ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
104        self.to_frontend.send(item)
105    }
106
107    /// Receive a request from the frontend. Ensures that if the frontend has
108    /// dropped or issued a shutdown instruction, the backend sees no more
109    /// requests.
110    pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
111        match self.shutdown.try_recv() {
112            Ok(_) | Err(TryRecvError::Closed) => return None,
113            Err(TryRecvError::Empty) => {}
114        }
115
116        self.from_frontend.recv().await
117    }
118
119    /// Close the interface, sending an error to the frontend.
120    pub fn close_with_error(self) {
121        let _ = self.error.send(());
122    }
123}