1use alloy_json_rpc::PubSubItem;
2use serde_json::value::RawValue;
3use tokio::sync::{
4 mpsc,
5 oneshot::{self, error::TryRecvError},
6};
7
8#[derive(Debug)]
14pub struct ConnectionHandle {
15 pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
17
18 pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
20
21 pub(crate) error: oneshot::Receiver<()>,
23
24 pub(crate) shutdown: oneshot::Sender<()>,
26}
27
28impl ConnectionHandle {
29 pub fn new() -> (Self, ConnectionInterface) {
31 let (to_socket, from_frontend) = mpsc::unbounded_channel();
32 let (to_frontend, from_socket) = mpsc::unbounded_channel();
33 let (error_tx, error_rx) = oneshot::channel();
34 let (shutdown_tx, shutdown_rx) = oneshot::channel();
35
36 let handle = Self { to_socket, from_socket, error: error_rx, shutdown: shutdown_tx };
37 let interface = ConnectionInterface {
38 from_frontend,
39 to_frontend,
40 error: error_tx,
41 shutdown: shutdown_rx,
42 };
43 (handle, interface)
44 }
45
46 pub fn shutdown(self) {
48 let _ = self.shutdown.send(());
49 }
50}
51
52#[derive(Debug)]
54pub struct ConnectionInterface {
55 pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
57
58 pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
60
61 pub(crate) error: oneshot::Sender<()>,
63
64 pub(crate) shutdown: oneshot::Receiver<()>,
66}
67
68impl ConnectionInterface {
69 pub fn send_to_frontend(
71 &self,
72 item: PubSubItem,
73 ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
74 self.to_frontend.send(item)
75 }
76
77 pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
81 match self.shutdown.try_recv() {
82 Ok(_) | Err(TryRecvError::Closed) => return None,
83 Err(TryRecvError::Empty) => {}
84 }
85
86 if self.shutdown.try_recv().is_ok() {
87 return None;
88 }
89
90 self.from_frontend.recv().await
91 }
92
93 pub fn close_with_error(self) {
95 let _ = self.error.send(());
96 }
97}