1use alloy_json_rpc::PubSubItem;
2use serde_json::value::RawValue;
3use tokio::{
4 sync::{
5 mpsc,
6 oneshot::{self, error::TryRecvError},
7 },
8 time::Duration,
9};
10
11#[derive(Debug)]
17pub struct ConnectionHandle {
18 pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
20
21 pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
23
24 pub(crate) error: oneshot::Receiver<()>,
26
27 pub(crate) shutdown: oneshot::Sender<()>,
29
30 pub(crate) max_retries: u32,
33 pub(crate) retry_interval: Duration,
36}
37
38impl ConnectionHandle {
39 pub fn new() -> (Self, ConnectionInterface) {
41 let (to_socket, from_frontend) = mpsc::unbounded_channel();
42 let (to_frontend, from_socket) = mpsc::unbounded_channel();
43 let (error_tx, error_rx) = oneshot::channel();
44 let (shutdown_tx, shutdown_rx) = oneshot::channel();
45
46 let handle = Self {
47 to_socket,
48 from_socket,
49 error: error_rx,
50 shutdown: shutdown_tx,
51 max_retries: 10,
52 retry_interval: Duration::from_secs(3),
53 };
54 let interface = ConnectionInterface {
55 from_frontend,
56 to_frontend,
57 error: error_tx,
58 shutdown: shutdown_rx,
59 };
60 (handle, interface)
61 }
62
63 pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
66 self.max_retries = max_retries;
67 self
68 }
69
70 pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
72 self.retry_interval = retry_interval;
73 self
74 }
75
76 pub fn shutdown(self) {
78 let _ = self.shutdown.send(());
79 }
80}
81
82#[derive(Debug)]
84pub struct ConnectionInterface {
85 pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
87
88 pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
90
91 pub(crate) error: oneshot::Sender<()>,
93
94 pub(crate) shutdown: oneshot::Receiver<()>,
96}
97
98impl ConnectionInterface {
99 pub fn send_to_frontend(
101 &self,
102 item: PubSubItem,
103 ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
104 self.to_frontend.send(item)
105 }
106
107 pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
111 match self.shutdown.try_recv() {
112 Ok(_) | Err(TryRecvError::Closed) => return None,
113 Err(TryRecvError::Empty) => {}
114 }
115
116 if self.shutdown.try_recv().is_ok() {
117 return None;
118 }
119
120 self.from_frontend.recv().await
121 }
122
123 pub fn close_with_error(self) {
125 let _ = self.error.send(());
126 }
127}