1use crate::{
2 handle::ConnectionHandle,
3 ix::PubSubInstruction,
4 managers::{InFlight, RequestManager, SubscriptionManager},
5 PubSubConnect, PubSubFrontend, RawSubscription,
6};
7use alloy_json_rpc::{Id, PubSubItem, Request, Response, ResponsePayload, SubId};
8use alloy_primitives::B256;
9use alloy_transport::{
10 utils::{to_json_raw_value, Spawnable},
11 TransportErrorKind, TransportResult,
12};
13use serde_json::value::RawValue;
14use tokio::sync::{mpsc, oneshot};
15
16#[derive(Debug)]
19pub(crate) struct PubSubService<T> {
20 pub(crate) handle: ConnectionHandle,
22
23 pub(crate) connector: T,
25
26 pub(crate) reqs: mpsc::UnboundedReceiver<PubSubInstruction>,
28
29 pub(crate) subs: SubscriptionManager,
31
32 pub(crate) in_flights: RequestManager,
34}
35
36impl<T: PubSubConnect> PubSubService<T> {
37 pub(crate) async fn connect(connector: T) -> TransportResult<PubSubFrontend> {
39 let handle = connector.connect().await?;
40
41 let (tx, reqs) = mpsc::unbounded_channel();
42 let this = Self {
43 handle,
44 connector,
45 reqs,
46 subs: SubscriptionManager::default(),
47 in_flights: Default::default(),
48 };
49 this.spawn();
50 Ok(PubSubFrontend::new(tx))
51 }
52
53 async fn get_new_backend(&mut self) -> TransportResult<ConnectionHandle> {
55 let mut handle = self.connector.try_reconnect().await?;
56 std::mem::swap(&mut self.handle, &mut handle);
57 Ok(handle)
58 }
59
60 async fn reconnect(&mut self) -> TransportResult<()> {
63 info!("Reconnecting pubsub service backend.");
64
65 let mut old_handle = self.get_new_backend().await?;
66
67 debug!("Draining old backend to_handle");
68
69 while let Ok(item) = old_handle.from_socket.try_recv() {
71 self.handle_item(item)?;
72 }
73
74 old_handle.shutdown();
75
76 debug!(count = self.in_flights.len(), "Reissuing pending requests");
78 for (_, in_flight) in self.in_flights.iter() {
79 let msg = in_flight.request.serialized().to_owned();
80 self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
82 }
83
84 debug!(count = self.subs.len(), "Re-starting active subscriptions");
86
87 self.subs.drop_server_ids();
89
90 for (_, sub) in self.subs.iter() {
92 let req = sub.request().to_owned();
93 let (in_flight, _) = InFlight::new(req.clone(), 0);
96 self.in_flights.insert(in_flight);
97
98 let msg = req.into_serialized();
99 self.handle.to_socket.send(msg).map_err(|_| TransportErrorKind::backend_gone())?;
100 }
101
102 Ok(())
103 }
104
105 fn dispatch_request(&self, brv: Box<RawValue>) -> TransportResult<()> {
107 self.handle.to_socket.send(brv).map(drop).map_err(|_| TransportErrorKind::backend_gone())
108 }
109
110 fn service_request(&mut self, in_flight: InFlight) -> TransportResult<()> {
112 let brv = in_flight.request();
113
114 self.dispatch_request(brv.serialized().to_owned())?;
115 self.in_flights.insert(in_flight);
116
117 Ok(())
118 }
119
120 fn service_get_sub(&self, local_id: B256, tx: oneshot::Sender<RawSubscription>) {
127 if let Some(rx) = self.subs.get_subscription(local_id) {
128 let _ = tx.send(rx);
129 }
130 }
131
132 fn service_unsubscribe(&mut self, local_id: B256) -> TransportResult<()> {
134 if let Some(server_id) = self.subs.server_id_for(&local_id) {
135 let req = Request::new("eth_unsubscribe", Id::None, [server_id]);
136 let brv = req.serialize().expect("no ser error").take_request();
137
138 self.dispatch_request(brv)?;
139 }
140 self.subs.remove_sub(local_id);
141 Ok(())
142 }
143
144 fn service_ix(&mut self, ix: PubSubInstruction) -> TransportResult<()> {
146 trace!(?ix, "servicing instruction");
147 match ix {
148 PubSubInstruction::Request(in_flight) => self.service_request(in_flight),
149 PubSubInstruction::GetSub(alias, tx) => {
150 self.service_get_sub(alias, tx);
151 Ok(())
152 }
153 PubSubInstruction::Unsubscribe(alias) => self.service_unsubscribe(alias),
154 }
155 }
156
157 fn handle_item(&mut self, item: PubSubItem) -> TransportResult<()> {
159 match item {
160 PubSubItem::Response(resp) => match self.in_flights.handle_response(resp) {
161 Some((server_id, in_flight)) => self.handle_sub_response(in_flight, server_id),
162 None => Ok(()),
163 },
164 PubSubItem::Notification(notification) => {
165 self.subs.notify(notification);
166 Ok(())
167 }
168 }
169 }
170
171 fn handle_sub_response(
173 &mut self,
174 in_flight: InFlight,
175 server_id: SubId,
176 ) -> TransportResult<()> {
177 let request = in_flight.request;
178 let id = request.id().clone();
179
180 let sub = self.subs.upsert(request, server_id, in_flight.channel_size);
181
182 let ser_alias = to_json_raw_value(sub.local_id())?;
184
185 let _ =
188 in_flight.tx.send(Ok(Response { id, payload: ResponsePayload::Success(ser_alias) }));
189
190 Ok(())
191 }
192
193 pub(crate) fn spawn(mut self) {
195 let fut = async move {
196 let result: TransportResult<()> = loop {
197 tokio::select! {
201 biased;
202
203 item_opt = self.handle.from_socket.recv() => {
204 if let Some(item) = item_opt {
205 if let Err(e) = self.handle_item(item) {
206 break Err(e)
207 }
208 } else if let Err(e) = self.reconnect().await {
209 break Err(e)
210 }
211 }
212
213 _ = &mut self.handle.error => {
214 error!("Pubsub service backend error.");
215 if let Err(e) = self.reconnect().await {
216 break Err(e)
217 }
218 }
219
220 req_opt = self.reqs.recv() => {
221 if let Some(req) = req_opt {
222 if let Err(e) = self.service_ix(req) {
223 break Err(e)
224 }
225 } else {
226 info!("Pubsub service request channel closed. Shutting down.");
227 break Ok(())
228 }
229 }
230 }
231 };
232
233 if let Err(err) = result {
234 error!(%err, "pubsub service reconnection error");
235 }
236 };
237 fut.spawn_task();
238 }
239}