1use crate::{
2 handle::ConnectionHandle,
3 ix::PubSubInstruction,
4 managers::{InFlight, RequestManager, SubscriptionManager},
5 PubSubConnect, PubSubFrontend, RawSubscription,
6};
7use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload, SubId};
8use alloy_primitives::B256;
9use alloy_transport::{
10 utils::{to_json_raw_value, Spawnable},
11 TransportErrorKind, TransportResult,
12};
13use serde_json::value::RawValue;
14use tokio::sync::{mpsc, oneshot};
15
16#[cfg(all(target_family = "wasm", target_os = "unknown"))]
17use wasmtimer::tokio::sleep;
18
19#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
20use tokio::time::sleep;
21
22#[derive(Debug)]
25pub(crate) struct PubSubService<T> {
26 pub(crate) handle: ConnectionHandle,
28
29 pub(crate) connector: T,
31
32 pub(crate) reqs: mpsc::UnboundedReceiver<PubSubInstruction>,
34
35 pub(crate) subs: SubscriptionManager,
37
38 pub(crate) in_flights: RequestManager,
40}
41
42impl<T: PubSubConnect> PubSubService<T> {
43 pub(crate) async fn connect(connector: T) -> TransportResult<PubSubFrontend> {
45 let handle = connector.connect().await?;
46
47 let (tx, reqs) = mpsc::unbounded_channel();
48 let this = Self {
49 handle,
50 connector,
51 reqs,
52 subs: SubscriptionManager::default(),
53 in_flights: Default::default(),
54 };
55 this.spawn();
56 Ok(PubSubFrontend::new(tx))
57 }
58
59 async fn get_new_backend(&mut self) -> TransportResult<ConnectionHandle> {
61 let mut handle = self.connector.try_reconnect().await?;
62 std::mem::swap(&mut self.handle, &mut handle);
63 Ok(handle)
64 }
65
66 async fn reconnect(&mut self) -> TransportResult<()> {
69 debug!("Reconnecting pubsub service backend");
70
71 let mut old_handle = self.get_new_backend().await?;
72
73 debug!("Draining old backend to_handle");
74
75 while let Ok(item) = old_handle.from_socket.try_recv() {
77 self.handle_item(item)?;
78 }
79
80 old_handle.shutdown();
81
82 debug!(count = self.in_flights.len(), "Reissuing pending requests");
84 for (_, in_flight) in self.in_flights.iter() {
85 let msg = in_flight.request.serialized().to_owned();
86 self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
88 }
89
90 debug!(count = self.subs.len(), "Re-starting active subscriptions");
92
93 self.subs.drop_server_ids();
95
96 for (_, sub) in self.subs.iter() {
98 let req = sub.request().to_owned();
99 let (in_flight, _) = InFlight::new(req.clone(), sub.tx.receiver_count());
100 self.in_flights.insert(in_flight);
101
102 let msg = req.into_serialized();
103 self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
104 }
105
106 Ok(())
107 }
108
109 fn dispatch_request(&self, brv: Box<RawValue>) -> TransportResult<()> {
111 self.handle.to_socket.send(brv).map(drop).map_err(|_| TransportErrorKind::backend_gone())
112 }
113
114 fn service_request(&mut self, in_flight: InFlight) -> TransportResult<()> {
116 let brv = in_flight.request();
117
118 self.dispatch_request(brv.serialized().to_owned())?;
119 self.in_flights.insert(in_flight);
120
121 Ok(())
122 }
123
124 fn service_get_sub(&self, local_id: B256, tx: oneshot::Sender<Option<RawSubscription>>) {
129 let _ = tx.send(self.subs.get_subscription(local_id));
130 }
131
132 fn service_unsubscribe(&mut self, local_id: B256) -> TransportResult<()> {
134 if let Some(server_id) = self.subs.server_id_for(&local_id) {
135 let req = Request::new("eth_unsubscribe", Id::Number(1), [server_id]);
137 let brv = req.serialize().expect("no ser error").take_request();
138
139 self.dispatch_request(brv)?;
140 }
141 self.subs.remove_sub(local_id);
142 Ok(())
143 }
144
145 fn service_ix(&mut self, ix: PubSubInstruction) -> TransportResult<()> {
147 trace!(?ix, "servicing instruction");
148 match ix {
149 PubSubInstruction::Request(in_flight) => self.service_request(in_flight),
150 PubSubInstruction::GetSub(alias, tx) => {
151 self.service_get_sub(alias, tx);
152 Ok(())
153 }
154 PubSubInstruction::Unsubscribe(alias) => self.service_unsubscribe(alias),
155 }
156 }
157
158 fn handle_item(&mut self, item: PubSubItem) -> TransportResult<()> {
160 match item {
161 PubSubItem::Response(resp) => match self.in_flights.handle_response(resp) {
162 Some((server_id, in_flight)) => self.handle_sub_response(in_flight, server_id),
163 None => Ok(()),
164 },
165 PubSubItem::Notification(notification) => {
166 self.subs.notify(notification);
167 Ok(())
168 }
169 }
170 }
171
172 fn handle_sub_response(
174 &mut self,
175 in_flight: InFlight,
176 server_id: SubId,
177 ) -> TransportResult<()> {
178 let request = in_flight.request;
179 let id = request.id().clone();
180
181 let sub = self.subs.upsert(request, server_id, in_flight.channel_size);
182
183 let ser_alias = to_json_raw_value(sub.local_id())?;
185
186 let _ =
189 in_flight.tx.send(Ok(Response { id, payload: ResponsePayload::Success(ser_alias) }));
190
191 Ok(())
192 }
193
194 async fn reconnect_with_retries(&mut self) -> TransportResult<()> {
196 let mut retry_count = 0;
197 let max_retries = self.handle.max_retries;
198 let interval = self.handle.retry_interval;
199 loop {
200 match self.reconnect().await {
201 Ok(()) => break Ok(()),
202 Err(e) => {
203 retry_count += 1;
204 if retry_count >= max_retries {
205 error!("Reconnect failed after {max_retries} attempts, shutting down: {e}");
206 break Err(e);
207 }
208 warn!(
209 "Reconnection attempt {retry_count}/{max_retries} failed: {e}. \
210 Retrying in {:?}s...",
211 interval.as_secs_f64(),
212 );
213 sleep(interval).await;
214 }
215 }
216 }
217 }
218
219 pub(crate) fn spawn(mut self) {
221 let fut = async move {
222 let result: TransportResult<()> = loop {
223 tokio::select! {
227 biased;
228
229 item_opt = self.handle.from_socket.recv() => {
230 if let Some(item) = item_opt {
231 if let Err(e) = self.handle_item(item) {
232 break Err(e)
233 }
234 } else if let Err(e) = self.reconnect_with_retries().await {
235 break Err(e)
236 }
237 }
238
239 _ = &mut self.handle.error => {
240 error!("Pubsub service backend error.");
241 if let Err(e) = self.reconnect_with_retries().await {
242 break Err(e)
243 }
244 }
245
246 req_opt = self.reqs.recv() => {
247 if let Some(req) = req_opt {
248 if let Err(e) = self.service_ix(req) {
249 break Err(e)
250 }
251 } else {
252 info!("Pubsub service request channel closed. Shutting down.");
253 break Ok(())
254 }
255 }
256 }
257 };
258
259 if let Err(err) = result {
260 error!(%err, "pubsub service reconnection error");
261 }
262 };
263 fut.spawn_task();
264 }
265}