alloy_pubsub/
handle.rs

1use alloy_json_rpc::PubSubItem;
2use serde_json::value::RawValue;
3use tokio::sync::{
4    mpsc,
5    oneshot::{self, error::TryRecvError},
6};
7
8/// A handle to a backend. Communicates to a `ConnectionInterface` on the
9/// backend.
10///
11/// The backend SHOULD shut down when the handle is dropped (as indicated by
12/// the shutdown channel).
13#[derive(Debug)]
14pub struct ConnectionHandle {
15    /// Outbound channel to server.
16    pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
17
18    /// Inbound channel from remote server via WS.
19    pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
20
21    /// Notification from the backend of a terminal error.
22    pub(crate) error: oneshot::Receiver<()>,
23
24    /// Notify the backend of intentional shutdown.
25    pub(crate) shutdown: oneshot::Sender<()>,
26}
27
28impl ConnectionHandle {
29    /// Create a new connection handle.
30    pub fn new() -> (Self, ConnectionInterface) {
31        let (to_socket, from_frontend) = mpsc::unbounded_channel();
32        let (to_frontend, from_socket) = mpsc::unbounded_channel();
33        let (error_tx, error_rx) = oneshot::channel();
34        let (shutdown_tx, shutdown_rx) = oneshot::channel();
35
36        let handle = Self { to_socket, from_socket, error: error_rx, shutdown: shutdown_tx };
37        let interface = ConnectionInterface {
38            from_frontend,
39            to_frontend,
40            error: error_tx,
41            shutdown: shutdown_rx,
42        };
43        (handle, interface)
44    }
45
46    /// Shutdown the backend.
47    pub fn shutdown(self) {
48        let _ = self.shutdown.send(());
49    }
50}
51
52/// The reciprocal of [`ConnectionHandle`].
53#[derive(Debug)]
54pub struct ConnectionInterface {
55    /// Inbound channel from frontend.
56    pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
57
58    /// Channel of responses to the frontend
59    pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
60
61    /// Notifies the frontend of a terminal error.
62    pub(crate) error: oneshot::Sender<()>,
63
64    /// Causes local shutdown when sender is triggered or dropped.
65    pub(crate) shutdown: oneshot::Receiver<()>,
66}
67
68impl ConnectionInterface {
69    /// Send a pubsub item to the frontend.
70    pub fn send_to_frontend(
71        &self,
72        item: PubSubItem,
73    ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
74        self.to_frontend.send(item)
75    }
76
77    /// Receive a request from the frontend. Ensures that if the frontend has
78    /// dropped or issued a shutdown instruction, the backend sees no more
79    /// requests.
80    pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
81        match self.shutdown.try_recv() {
82            Ok(_) | Err(TryRecvError::Closed) => return None,
83            Err(TryRecvError::Empty) => {}
84        }
85
86        if self.shutdown.try_recv().is_ok() {
87            return None;
88        }
89
90        self.from_frontend.recv().await
91    }
92
93    /// Close the interface, sending an error to the frontend.
94    pub fn close_with_error(self) {
95        let _ = self.error.send(());
96    }
97}