Skip to main content

alloy_pubsub/
handle.rs

1use alloy_json_rpc::PubSubItem;
2use alloy_transport::{TransportError, TransportErrorKind};
3use serde_json::value::RawValue;
4use tokio::{
5    sync::{
6        mpsc,
7        oneshot::{self, error::TryRecvError},
8    },
9    time::Duration,
10};
11
12/// A handle to a backend. Communicates to a `ConnectionInterface` on the
13/// backend.
14///
15/// The backend SHOULD shut down when the handle is dropped (as indicated by
16/// the shutdown channel).
17#[derive(Debug)]
18pub struct ConnectionHandle {
19    /// Outbound channel to server.
20    pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
21
22    /// Inbound channel from remote server via WS.
23    pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
24
25    /// Notification from the backend of a terminal error.
26    ///
27    /// The carried [`TransportError`] is used by the pubsub service to
28    /// decide whether to attempt reconnection. A
29    /// [`TransportErrorKind::NonRetryable`] payload short-circuits the
30    /// reconnect loop; any other error (including the default
31    /// [`TransportErrorKind::BackendGone`] sent by
32    /// [`ConnectionInterface::close_with_error`]) triggers the normal
33    /// reconnect-with-retries path.
34    pub(crate) error: oneshot::Receiver<TransportError>,
35
36    /// Notify the backend of intentional shutdown.
37    pub(crate) shutdown: oneshot::Sender<()>,
38
39    /// Max number of retries before failing and exiting the connection.
40    /// Default is 10.
41    pub(crate) max_retries: u32,
42    /// The base interval between retries.
43    ///
44    /// Reconnect retries use capped exponential backoff from this base interval.
45    /// Default is 3 seconds.
46    pub(crate) retry_interval: Duration,
47}
48
49impl ConnectionHandle {
50    /// Create a new connection handle.
51    pub fn new() -> (Self, ConnectionInterface) {
52        let (to_socket, from_frontend) = mpsc::unbounded_channel();
53        let (to_frontend, from_socket) = mpsc::unbounded_channel();
54        let (error_tx, error_rx) = oneshot::channel();
55        let (shutdown_tx, shutdown_rx) = oneshot::channel();
56
57        let handle = Self {
58            to_socket,
59            from_socket,
60            error: error_rx,
61            shutdown: shutdown_tx,
62            max_retries: 10,
63            retry_interval: Duration::from_secs(3),
64        };
65        let interface = ConnectionInterface {
66            from_frontend,
67            to_frontend,
68            error: error_tx,
69            shutdown: shutdown_rx,
70        };
71        (handle, interface)
72    }
73
74    /// Set the max number of retries before failing and exiting the connection.
75    /// Default is 10.
76    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
77        self.max_retries = max_retries;
78        self
79    }
80
81    /// Set the base interval between retries.
82    ///
83    /// Reconnect retries use capped exponential backoff from this base interval.
84    pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
85        self.retry_interval = retry_interval;
86        self
87    }
88
89    /// Shutdown the backend.
90    pub fn shutdown(self) {
91        let _ = self.shutdown.send(());
92    }
93}
94
95/// The reciprocal of [`ConnectionHandle`].
96#[derive(Debug)]
97pub struct ConnectionInterface {
98    /// Inbound channel from frontend.
99    pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
100
101    /// Channel of responses to the frontend
102    pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
103
104    /// Notifies the frontend of a terminal error.
105    pub(crate) error: oneshot::Sender<TransportError>,
106
107    /// Causes local shutdown when sender is triggered or dropped.
108    pub(crate) shutdown: oneshot::Receiver<()>,
109}
110
111impl ConnectionInterface {
112    /// Send a pubsub item to the frontend.
113    pub fn send_to_frontend(
114        &self,
115        item: PubSubItem,
116    ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
117        self.to_frontend.send(item)
118    }
119
120    /// Receive a request from the frontend. Ensures that if the frontend has
121    /// dropped or issued a shutdown instruction, the backend sees no more
122    /// requests.
123    pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
124        match self.shutdown.try_recv() {
125            Ok(_) | Err(TryRecvError::Closed) => return None,
126            Err(TryRecvError::Empty) => {}
127        }
128
129        self.from_frontend.recv().await
130    }
131
132    /// Close the interface, signaling a generic backend-gone error to the
133    /// frontend.
134    ///
135    /// The pubsub service will attempt to reconnect using the configured
136    /// retry policy. Use [`Self::close_with_transport_error`] to opt out of
137    /// the reconnect loop on deterministic, non-retryable failures.
138    pub fn close_with_error(self) {
139        let _ = self.error.send(TransportErrorKind::backend_gone());
140    }
141
142    /// Close the interface, signaling a specific transport error to the
143    /// frontend.
144    ///
145    /// Use [`TransportErrorKind::non_retryable_str`] /
146    /// [`TransportErrorKind::non_retryable`] to construct an error that
147    /// short-circuits the pubsub service's reconnect loop. Any other error
148    /// kind triggers the normal reconnect-with-retries path.
149    pub fn close_with_transport_error(self, err: TransportError) {
150        let _ = self.error.send(err);
151    }
152}