alloy_pubsub/handle.rs
1use alloy_json_rpc::PubSubItem;
2use alloy_transport::{TransportError, TransportErrorKind};
3use serde_json::value::RawValue;
4use tokio::{
5 sync::{
6 mpsc,
7 oneshot::{self, error::TryRecvError},
8 },
9 time::Duration,
10};
11
12/// A handle to a backend. Communicates to a `ConnectionInterface` on the
13/// backend.
14///
15/// The backend SHOULD shut down when the handle is dropped (as indicated by
16/// the shutdown channel).
17#[derive(Debug)]
18pub struct ConnectionHandle {
19 /// Outbound channel to server.
20 pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
21
22 /// Inbound channel from remote server via WS.
23 pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
24
25 /// Notification from the backend of a terminal error.
26 ///
27 /// The carried [`TransportError`] is used by the pubsub service to
28 /// decide whether to attempt reconnection. A
29 /// [`TransportErrorKind::NonRetryable`] payload short-circuits the
30 /// reconnect loop; any other error (including the default
31 /// [`TransportErrorKind::BackendGone`] sent by
32 /// [`ConnectionInterface::close_with_error`]) triggers the normal
33 /// reconnect-with-retries path.
34 pub(crate) error: oneshot::Receiver<TransportError>,
35
36 /// Notify the backend of intentional shutdown.
37 pub(crate) shutdown: oneshot::Sender<()>,
38
39 /// Max number of retries before failing and exiting the connection.
40 /// Default is 10.
41 pub(crate) max_retries: u32,
42 /// The base interval between retries.
43 ///
44 /// Reconnect retries use capped exponential backoff from this base interval.
45 /// Default is 3 seconds.
46 pub(crate) retry_interval: Duration,
47}
48
49impl ConnectionHandle {
50 /// Create a new connection handle.
51 pub fn new() -> (Self, ConnectionInterface) {
52 let (to_socket, from_frontend) = mpsc::unbounded_channel();
53 let (to_frontend, from_socket) = mpsc::unbounded_channel();
54 let (error_tx, error_rx) = oneshot::channel();
55 let (shutdown_tx, shutdown_rx) = oneshot::channel();
56
57 let handle = Self {
58 to_socket,
59 from_socket,
60 error: error_rx,
61 shutdown: shutdown_tx,
62 max_retries: 10,
63 retry_interval: Duration::from_secs(3),
64 };
65 let interface = ConnectionInterface {
66 from_frontend,
67 to_frontend,
68 error: error_tx,
69 shutdown: shutdown_rx,
70 };
71 (handle, interface)
72 }
73
74 /// Set the max number of retries before failing and exiting the connection.
75 /// Default is 10.
76 pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
77 self.max_retries = max_retries;
78 self
79 }
80
81 /// Set the base interval between retries.
82 ///
83 /// Reconnect retries use capped exponential backoff from this base interval.
84 pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
85 self.retry_interval = retry_interval;
86 self
87 }
88
89 /// Shutdown the backend.
90 pub fn shutdown(self) {
91 let _ = self.shutdown.send(());
92 }
93}
94
95/// The reciprocal of [`ConnectionHandle`].
96#[derive(Debug)]
97pub struct ConnectionInterface {
98 /// Inbound channel from frontend.
99 pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
100
101 /// Channel of responses to the frontend
102 pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
103
104 /// Notifies the frontend of a terminal error.
105 pub(crate) error: oneshot::Sender<TransportError>,
106
107 /// Causes local shutdown when sender is triggered or dropped.
108 pub(crate) shutdown: oneshot::Receiver<()>,
109}
110
111impl ConnectionInterface {
112 /// Send a pubsub item to the frontend.
113 pub fn send_to_frontend(
114 &self,
115 item: PubSubItem,
116 ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
117 self.to_frontend.send(item)
118 }
119
120 /// Receive a request from the frontend. Ensures that if the frontend has
121 /// dropped or issued a shutdown instruction, the backend sees no more
122 /// requests.
123 pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
124 match self.shutdown.try_recv() {
125 Ok(_) | Err(TryRecvError::Closed) => return None,
126 Err(TryRecvError::Empty) => {}
127 }
128
129 self.from_frontend.recv().await
130 }
131
132 /// Close the interface, signaling a generic backend-gone error to the
133 /// frontend.
134 ///
135 /// The pubsub service will attempt to reconnect using the configured
136 /// retry policy. Use [`Self::close_with_transport_error`] to opt out of
137 /// the reconnect loop on deterministic, non-retryable failures.
138 pub fn close_with_error(self) {
139 let _ = self.error.send(TransportErrorKind::backend_gone());
140 }
141
142 /// Close the interface, signaling a specific transport error to the
143 /// frontend.
144 ///
145 /// Use [`TransportErrorKind::non_retryable_str`] /
146 /// [`TransportErrorKind::non_retryable`] to construct an error that
147 /// short-circuits the pubsub service's reconnect loop. Any other error
148 /// kind triggers the normal reconnect-with-retries path.
149 pub fn close_with_transport_error(self, err: TransportError) {
150 let _ = self.error.send(err);
151 }
152}