// alloy_pubsub/service.rs

1use crate::{
2    handle::ConnectionHandle,
3    ix::PubSubInstruction,
4    managers::{InFlight, RequestManager, SubscriptionManager},
5    PubSubConnect, PubSubFrontend, RawSubscription,
6};
7use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload, SubId};
8use alloy_primitives::B256;
9use alloy_transport::{
10    utils::{to_json_raw_value, Spawnable},
11    TransportErrorKind, TransportResult,
12};
13use serde_json::value::RawValue;
14use tokio::sync::{mpsc, oneshot};
15
16#[cfg(target_family = "wasm")]
17use wasmtimer::tokio::sleep;
18
19#[cfg(not(target_family = "wasm"))]
20use tokio::time::sleep;
21
22/// The service contains the backend handle, a subscription manager, and the
23/// configuration details required to reconnect.
24#[derive(Debug)]
25pub(crate) struct PubSubService<T> {
26    /// The backend handle.
27    pub(crate) handle: ConnectionHandle,
28
29    /// The configuration details required to reconnect.
30    pub(crate) connector: T,
31
32    /// The inbound requests.
33    pub(crate) reqs: mpsc::UnboundedReceiver<PubSubInstruction>,
34
35    /// The subscription manager.
36    pub(crate) subs: SubscriptionManager,
37
38    /// The request manager.
39    pub(crate) in_flights: RequestManager,
40}
41
42impl<T: PubSubConnect> PubSubService<T> {
43    /// Create a new service from a connector.
44    pub(crate) async fn connect(connector: T) -> TransportResult<PubSubFrontend> {
45        let handle = connector.connect().await?;
46
47        let (tx, reqs) = mpsc::unbounded_channel();
48        let this = Self {
49            handle,
50            connector,
51            reqs,
52            subs: SubscriptionManager::default(),
53            in_flights: Default::default(),
54        };
55        this.spawn();
56        Ok(PubSubFrontend::new(tx))
57    }
58
59    /// Reconnect by dropping the backend and creating a new one.
60    async fn get_new_backend(&mut self) -> TransportResult<ConnectionHandle> {
61        let mut handle = self.connector.try_reconnect().await?;
62        std::mem::swap(&mut self.handle, &mut handle);
63        Ok(handle)
64    }
65
66    /// Reconnect the backend, re-issue pending requests, and re-start active
67    /// subscriptions.
68    async fn reconnect(&mut self) -> TransportResult<()> {
69        debug!("Reconnecting pubsub service backend");
70
71        let mut old_handle = self.get_new_backend().await?;
72
73        debug!("Draining old backend to_handle");
74
75        // Drain the old backend
76        while let Ok(item) = old_handle.from_socket.try_recv() {
77            self.handle_item(item)?;
78        }
79
80        old_handle.shutdown();
81
82        // Re-issue pending requests.
83        debug!(count = self.in_flights.len(), "Reissuing pending requests");
84        for (_, in_flight) in self.in_flights.iter() {
85            let msg = in_flight.request.serialized().to_owned();
86            // Same as `dispatch_request`, but inlined to avoid double-borrowing `self`.
87            self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
88        }
89
90        // Re-subscribe to all active subscriptions
91        debug!(count = self.subs.len(), "Re-starting active subscriptions");
92
93        // Drop all server IDs. We'll re-insert them as we get responses.
94        self.subs.drop_server_ids();
95
96        // Dispatch all subscription requests.
97        for (_, sub) in self.subs.iter() {
98            let req = sub.request().to_owned();
99            let (in_flight, _) = InFlight::new(req.clone(), sub.tx.receiver_count());
100            self.in_flights.insert(in_flight);
101
102            let msg = req.into_serialized();
103            self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
104        }
105
106        Ok(())
107    }
108
109    /// Dispatch a request to the socket.
110    fn dispatch_request(&self, brv: Box<RawValue>) -> TransportResult<()> {
111        self.handle.to_socket.send(brv).map(drop).map_err(|_| TransportErrorKind::backend_gone())
112    }
113
114    /// Service a request.
115    fn service_request(&mut self, in_flight: InFlight) -> TransportResult<()> {
116        let brv = in_flight.request();
117
118        self.dispatch_request(brv.serialized().to_owned())?;
119        self.in_flights.insert(in_flight);
120
121        Ok(())
122    }
123
124    /// Service a GetSub instruction.
125    ///
126    /// If the subscription exists, the waiter is sent a broadcast receiver. If
127    /// the subscription does not exist, the waiter is sent nothing, and the
128    /// `tx` is dropped. This notifies the waiter that the subscription does
129    /// not exist.
130    fn service_get_sub(&self, local_id: B256, tx: oneshot::Sender<RawSubscription>) {
131        if let Some(rx) = self.subs.get_subscription(local_id) {
132            let _ = tx.send(rx);
133        }
134    }
135
136    /// Service an unsubscribe instruction.
137    fn service_unsubscribe(&mut self, local_id: B256) -> TransportResult<()> {
138        if let Some(server_id) = self.subs.server_id_for(&local_id) {
139            // TODO: ideally we can send this with an unused id
140            let req = Request::new("eth_unsubscribe", Id::Number(1), [server_id]);
141            let brv = req.serialize().expect("no ser error").take_request();
142
143            self.dispatch_request(brv)?;
144        }
145        self.subs.remove_sub(local_id);
146        Ok(())
147    }
148
149    /// Service an instruction
150    fn service_ix(&mut self, ix: PubSubInstruction) -> TransportResult<()> {
151        trace!(?ix, "servicing instruction");
152        match ix {
153            PubSubInstruction::Request(in_flight) => self.service_request(in_flight),
154            PubSubInstruction::GetSub(alias, tx) => {
155                self.service_get_sub(alias, tx);
156                Ok(())
157            }
158            PubSubInstruction::Unsubscribe(alias) => self.service_unsubscribe(alias),
159        }
160    }
161
162    /// Handle an item from the backend.
163    fn handle_item(&mut self, item: PubSubItem) -> TransportResult<()> {
164        match item {
165            PubSubItem::Response(resp) => match self.in_flights.handle_response(resp) {
166                Some((server_id, in_flight)) => self.handle_sub_response(in_flight, server_id),
167                None => Ok(()),
168            },
169            PubSubItem::Notification(notification) => {
170                self.subs.notify(notification);
171                Ok(())
172            }
173        }
174    }
175
176    /// Rewrite the subscription id and insert into the subscriptions manager
177    fn handle_sub_response(
178        &mut self,
179        in_flight: InFlight,
180        server_id: SubId,
181    ) -> TransportResult<()> {
182        let request = in_flight.request;
183        let id = request.id().clone();
184
185        let sub = self.subs.upsert(request, server_id, in_flight.channel_size);
186
187        // Serialized B256 is always a valid serialized U256 too.
188        let ser_alias = to_json_raw_value(sub.local_id())?;
189
190        // We send back a success response with the new subscription ID.
191        // We don't care if the channel is dead.
192        let _ =
193            in_flight.tx.send(Ok(Response { id, payload: ResponsePayload::Success(ser_alias) }));
194
195        Ok(())
196    }
197
198    /// Attempt to reconnect with retries
199    async fn reconnect_with_retries(&mut self) -> TransportResult<()> {
200        let mut retry_count = 0;
201        let max_retries = self.handle.max_retries;
202        let interval = self.handle.retry_interval;
203        loop {
204            match self.reconnect().await {
205                Ok(()) => break Ok(()),
206                Err(e) => {
207                    retry_count += 1;
208                    if retry_count >= max_retries {
209                        error!("Reconnect failed after {max_retries} attempts, shutting down: {e}");
210                        break Err(e);
211                    }
212                    warn!(
213                        "Reconnection attempt {retry_count}/{max_retries} failed: {e}. \
214                         Retrying in {:?}s...",
215                        interval.as_secs_f64(),
216                    );
217                    sleep(interval).await;
218                }
219            }
220        }
221    }
222
223    /// Spawn the service.
224    pub(crate) fn spawn(mut self) {
225        let fut = async move {
226            let result: TransportResult<()> = loop {
227                // We bias the loop so that we always handle new messages before
228                // reconnecting, and always reconnect before dispatching new
229                // requests.
230                tokio::select! {
231                    biased;
232
233                    item_opt = self.handle.from_socket.recv() => {
234                        if let Some(item) = item_opt {
235                            if let Err(e) = self.handle_item(item) {
236                                break Err(e)
237                            }
238                        } else if let Err(e) = self.reconnect_with_retries().await {
239                            break Err(e)
240                        }
241                    }
242
243                    _ = &mut self.handle.error => {
244                        error!("Pubsub service backend error.");
245                        if let Err(e) = self.reconnect_with_retries().await {
246                            break Err(e)
247                        }
248                    }
249
250                    req_opt = self.reqs.recv() => {
251                        if let Some(req) = req_opt {
252                            if let Err(e) = self.service_ix(req) {
253                                break Err(e)
254                            }
255                        } else {
256                            info!("Pubsub service request channel closed. Shutting down.");
257                           break Ok(())
258                        }
259                    }
260                }
261            };
262
263            if let Err(err) = result {
264                error!(%err, "pubsub service reconnection error");
265            }
266        };
267        fut.spawn_task();
268    }
269}