alloy_pubsub/
service.rs

1use crate::{
2    handle::ConnectionHandle,
3    ix::PubSubInstruction,
4    managers::{InFlight, RequestManager, SubscriptionManager},
5    PubSubConnect, PubSubFrontend, RawSubscription,
6};
7use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload, SubId};
8use alloy_primitives::B256;
9use alloy_transport::{
10    utils::{to_json_raw_value, Spawnable},
11    TransportErrorKind, TransportResult,
12};
13use serde_json::value::RawValue;
14use tokio::sync::{mpsc, oneshot};
15
16/// The service contains the backend handle, a subscription manager, and the
17/// configuration details required to reconnect.
18#[derive(Debug)]
19pub(crate) struct PubSubService<T> {
20    /// The backend handle.
21    pub(crate) handle: ConnectionHandle,
22
23    /// The configuration details required to reconnect.
24    pub(crate) connector: T,
25
26    /// The inbound requests.
27    pub(crate) reqs: mpsc::UnboundedReceiver<PubSubInstruction>,
28
29    /// The subscription manager.
30    pub(crate) subs: SubscriptionManager,
31
32    /// The request manager.
33    pub(crate) in_flights: RequestManager,
34}
35
36impl<T: PubSubConnect> PubSubService<T> {
37    /// Create a new service from a connector.
38    pub(crate) async fn connect(connector: T) -> TransportResult<PubSubFrontend> {
39        let handle = connector.connect().await?;
40
41        let (tx, reqs) = mpsc::unbounded_channel();
42        let this = Self {
43            handle,
44            connector,
45            reqs,
46            subs: SubscriptionManager::default(),
47            in_flights: Default::default(),
48        };
49        this.spawn();
50        Ok(PubSubFrontend::new(tx))
51    }
52
53    /// Reconnect by dropping the backend and creating a new one.
54    async fn get_new_backend(&mut self) -> TransportResult<ConnectionHandle> {
55        let mut handle = self.connector.try_reconnect().await?;
56        std::mem::swap(&mut self.handle, &mut handle);
57        Ok(handle)
58    }
59
60    /// Reconnect the backend, re-issue pending requests, and re-start active
61    /// subscriptions.
62    async fn reconnect(&mut self) -> TransportResult<()> {
63        info!("Reconnecting pubsub service backend.");
64
65        let mut old_handle = self.get_new_backend().await?;
66
67        debug!("Draining old backend to_handle");
68
69        // Drain the old backend
70        while let Ok(item) = old_handle.from_socket.try_recv() {
71            self.handle_item(item)?;
72        }
73
74        old_handle.shutdown();
75
76        // Re-issue pending requests.
77        debug!(count = self.in_flights.len(), "Reissuing pending requests");
78        for (_, in_flight) in self.in_flights.iter() {
79            let msg = in_flight.request.serialized().to_owned();
80            // Same as `dispatch_request`, but inlined to avoid double-borrowing `self`.
81            self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
82        }
83
84        // Re-subscribe to all active subscriptions
85        debug!(count = self.subs.len(), "Re-starting active subscriptions");
86
87        // Drop all server IDs. We'll re-insert them as we get responses.
88        self.subs.drop_server_ids();
89
90        // Dispatch all subscription requests.
91        for (_, sub) in self.subs.iter() {
92            let req = sub.request().to_owned();
93            // 0 is a dummy value, we don't care about the channel size here,
94            // as none of these will result in channel creation.
95            let (in_flight, _) = InFlight::new(req.clone(), 0);
96            self.in_flights.insert(in_flight);
97
98            let msg = req.into_serialized();
99            self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
100        }
101
102        Ok(())
103    }
104
105    /// Dispatch a request to the socket.
106    fn dispatch_request(&self, brv: Box<RawValue>) -> TransportResult<()> {
107        self.handle.to_socket.send(brv).map(drop).map_err(|_| TransportErrorKind::backend_gone())
108    }
109
110    /// Service a request.
111    fn service_request(&mut self, in_flight: InFlight) -> TransportResult<()> {
112        let brv = in_flight.request();
113
114        self.dispatch_request(brv.serialized().to_owned())?;
115        self.in_flights.insert(in_flight);
116
117        Ok(())
118    }
119
120    /// Service a GetSub instruction.
121    ///
122    /// If the subscription exists, the waiter is sent a broadcast receiver. If
123    /// the subscription does not exist, the waiter is sent nothing, and the
124    /// `tx` is dropped. This notifies the waiter that the subscription does
125    /// not exist.
126    fn service_get_sub(&self, local_id: B256, tx: oneshot::Sender<RawSubscription>) {
127        if let Some(rx) = self.subs.get_subscription(local_id) {
128            let _ = tx.send(rx);
129        }
130    }
131
132    /// Service an unsubscribe instruction.
133    fn service_unsubscribe(&mut self, local_id: B256) -> TransportResult<()> {
134        if let Some(server_id) = self.subs.server_id_for(&local_id) {
135            let req = Request::new("eth_unsubscribe", Id::None, [server_id]);
136            let brv = req.serialize().expect("no ser error").take_request();
137
138            self.dispatch_request(brv)?;
139        }
140        self.subs.remove_sub(local_id);
141        Ok(())
142    }
143
144    /// Service an instruction
145    fn service_ix(&mut self, ix: PubSubInstruction) -> TransportResult<()> {
146        trace!(?ix, "servicing instruction");
147        match ix {
148            PubSubInstruction::Request(in_flight) => self.service_request(in_flight),
149            PubSubInstruction::GetSub(alias, tx) => {
150                self.service_get_sub(alias, tx);
151                Ok(())
152            }
153            PubSubInstruction::Unsubscribe(alias) => self.service_unsubscribe(alias),
154        }
155    }
156
157    /// Handle an item from the backend.
158    fn handle_item(&mut self, item: PubSubItem) -> TransportResult<()> {
159        match item {
160            PubSubItem::Response(resp) => match self.in_flights.handle_response(resp) {
161                Some((server_id, in_flight)) => self.handle_sub_response(in_flight, server_id),
162                None => Ok(()),
163            },
164            PubSubItem::Notification(notification) => {
165                self.subs.notify(notification);
166                Ok(())
167            }
168        }
169    }
170
171    /// Rewrite the subscription id and insert into the subscriptions manager
172    fn handle_sub_response(
173        &mut self,
174        in_flight: InFlight,
175        server_id: SubId,
176    ) -> TransportResult<()> {
177        let request = in_flight.request;
178        let id = request.id().clone();
179
180        let sub = self.subs.upsert(request, server_id, in_flight.channel_size);
181
182        // Serialized B256 is always a valid serialized U256 too.
183        let ser_alias = to_json_raw_value(sub.local_id())?;
184
185        // We send back a success response with the new subscription ID.
186        // We don't care if the channel is dead.
187        let _ =
188            in_flight.tx.send(Ok(Response { id, payload: ResponsePayload::Success(ser_alias) }));
189
190        Ok(())
191    }
192
193    /// Spawn the service.
194    pub(crate) fn spawn(mut self) {
195        let fut = async move {
196            let result: TransportResult<()> = loop {
197                // We bias the loop so that we always handle new messages before
198                // reconnecting, and always reconnect before dispatching new
199                // requests.
200                tokio::select! {
201                    biased;
202
203                    item_opt = self.handle.from_socket.recv() => {
204                        if let Some(item) = item_opt {
205                            if let Err(e) = self.handle_item(item) {
206                                break Err(e)
207                            }
208                        } else if let Err(e) = self.reconnect().await {
209                            break Err(e)
210                        }
211                    }
212
213                    _ = &mut self.handle.error => {
214                        error!("Pubsub service backend error.");
215                        if let Err(e) = self.reconnect().await {
216                            break Err(e)
217                        }
218                    }
219
220                    req_opt = self.reqs.recv() => {
221                        if let Some(req) = req_opt {
222                            if let Err(e) = self.service_ix(req) {
223                                break Err(e)
224                            }
225                        } else {
226                            info!("Pubsub service request channel closed. Shutting down.");
227                           break Ok(())
228                        }
229                    }
230                }
231            };
232
233            if let Err(err) = result {
234                error!(%err, "pubsub service reconnection error");
235            }
236        };
237        fut.spawn_task();
238    }
239}