solana_pubsub_client/nonblocking/pubsub_client.rs
1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a nonblocking (async) API. For a blocking API use the synchronous
9//! client in [`crate::pubsub_client`].
10//!
11//! A single `PubsubClient` client may be used to subscribe to many events via
12//! subscription methods like [`PubsubClient::account_subscribe`]. These methods
13//! return a [`PubsubClientResult`] of a pair, the first element being a
14//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an
15//! unsubscribe closure, an asynchronous function that can be called and
16//! `await`ed to unsubscribe.
17//!
18//! Note that `BoxStream` contains an immutable reference to the `PubsubClient`
19//! that created it. This makes `BoxStream` not `Send`, forcing it to stay in
20//! the same task as its `PubsubClient`. `PubsubClient` though is `Send` and
21//! `Sync`, and can be shared between tasks by putting it in an `Arc`. Thus
22//! one viable pattern to creating multiple subscriptions is:
23//!
24//! - create an `Arc<PubsubClient>`
25//! - spawn one task for each subscription, sharing the `PubsubClient`.
26//! - in each task:
27//! - create a subscription
28//! - send the `UnsubscribeFn` to another task to handle shutdown
29//! - loop while receiving messages from the subscription
30//!
31//! This pattern is illustrated in the example below.
32//!
33//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
34//! disabled on RPC nodes. They can be enabled by passing
35//! `--rpc-pubsub-enable-block-subscription` and
36//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
37//! methods are disabled, the RPC server will return a "Method not found" error
38//! message.
39//!
40//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
41//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
42//!
43//! # Examples
44//!
45//! Demo two async `PubsubClient` subscriptions with clean shutdown.
46//!
47//! This spawns a task for each subscription type, each of which subscribes and
48//! sends back a ready message and an unsubscribe channel (closure), then loops
49//! on printing messages. The main task then waits for user input before
50//! unsubscribing and waiting on the tasks.
51//!
52//! ```
53//! use anyhow::Result;
54//! use futures_util::StreamExt;
55//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
56//! use std::sync::Arc;
57//! use tokio::io::AsyncReadExt;
58//! use tokio::sync::mpsc::unbounded_channel;
59//!
60//! pub async fn watch_subscriptions(
61//! websocket_url: &str,
62//! ) -> Result<()> {
63//!
64//! // Subscription tasks will send a ready signal when they have subscribed.
65//! let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
66//!
67//! // Channel to receive unsubscribe channels (actually closures).
68//! // These receive a pair of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>), &'static str)`,
69//! // where the first is a closure to call to unsubscribe, the second is the subscription name.
70//! let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
71//!
72//! // The `PubsubClient` must be `Arc`ed to share it across tasks.
73//! let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
74//!
75//! let mut join_handles = vec![];
76//!
77//! join_handles.push(("slot", tokio::spawn({
78//! // Clone things we need before moving their clones into the `async move` block.
79//! //
80//! // The subscriptions have to be made from the tasks that will receive the subscription messages,
81//! // because the subscription streams hold a reference to the `PubsubClient`.
82//! // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
83//!
84//! let ready_sender = ready_sender.clone();
85//! let unsubscribe_sender = unsubscribe_sender.clone();
86//! let pubsub_client = Arc::clone(&pubsub_client);
87//! async move {
88//! let (mut slot_notifications, slot_unsubscribe) =
89//! pubsub_client.slot_subscribe().await?;
90//!
91//! // With the subscription started,
92//! // send a signal back to the main task for synchronization.
93//! ready_sender.send(()).expect("channel");
94//!
95//! // Send the unsubscribe closure back to the main task.
96//! unsubscribe_sender.send((slot_unsubscribe, "slot"))
97//! .map_err(|e| format!("{}", e)).expect("channel");
98//!
99//! // Drop senders so that the channels can close.
100//! // The main task will receive until channels are closed.
101//! drop((ready_sender, unsubscribe_sender));
102//!
103//! // Do something with the subscribed messages.
104//! // This loop will end once the main task unsubscribes.
105//! while let Some(slot_info) = slot_notifications.next().await {
106//! println!("------------------------------------------------------------");
107//! println!("slot pubsub result: {:?}", slot_info);
108//! }
109//!
110//! // This type hint is necessary to allow the `async move` block to use `?`.
111//! Ok::<_, anyhow::Error>(())
112//! }
113//! })));
114//!
115//! join_handles.push(("root", tokio::spawn({
116//! let ready_sender = ready_sender.clone();
117//! let unsubscribe_sender = unsubscribe_sender.clone();
118//! let pubsub_client = Arc::clone(&pubsub_client);
119//! async move {
120//! let (mut root_notifications, root_unsubscribe) =
121//! pubsub_client.root_subscribe().await?;
122//!
123//! ready_sender.send(()).expect("channel");
124//! unsubscribe_sender.send((root_unsubscribe, "root"))
125//! .map_err(|e| format!("{}", e)).expect("channel");
126//! drop((ready_sender, unsubscribe_sender));
127//!
128//! while let Some(root) = root_notifications.next().await {
129//! println!("------------------------------------------------------------");
130//! println!("root pubsub result: {:?}", root);
131//! }
132//!
133//! Ok::<_, anyhow::Error>(())
134//! }
135//! })));
136//!
137//! // Drop these senders so that the channels can close
138//! // and their receivers return `None` below.
139//! drop(ready_sender);
140//! drop(unsubscribe_sender);
141//!
142//! // Wait until all subscribers are ready before proceeding with application logic.
143//! while let Some(_) = ready_receiver.recv().await { }
144//!
145//! // Do application logic here.
146//!
147//! // Wait for input or some application-specific shutdown condition.
148//! tokio::io::stdin().read_u8().await?;
149//!
150//! // Unsubscribe from everything, which will shutdown all the tasks.
151//! while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
152//! println!("unsubscribing from {}", name);
153//! unsubscribe().await
154//! }
155//!
156//! // Wait for the tasks.
157//! for (name, handle) in join_handles {
158//! println!("waiting on task {}", name);
159//! if let Ok(Err(e)) = handle.await {
160//! println!("task {} failed: {}", name, e);
161//! }
162//! }
163//!
164//! Ok(())
165//! }
166//! # Ok::<(), anyhow::Error>(())
167//! ```
168
169use {
170 futures_util::{
171 future::{ready, BoxFuture, FutureExt},
172 sink::SinkExt,
173 stream::{BoxStream, StreamExt},
174 },
175 log::*,
176 serde::de::DeserializeOwned,
177 serde_json::{json, Map, Value},
178 solana_account_decoder_client_types::UiAccount,
179 solana_clock::Slot,
180 solana_pubkey::Pubkey,
181 solana_rpc_client_types::{
182 config::{
183 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
184 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
185 RpcTransactionLogsFilter,
186 },
187 error_object::RpcErrorObject,
188 response::{
189 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
190 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
191 },
192 },
193 solana_signature::Signature,
194 std::collections::BTreeMap,
195 thiserror::Error,
196 tokio::{
197 net::TcpStream,
198 sync::{mpsc, oneshot},
199 task::JoinHandle,
200 time::{sleep, Duration},
201 },
202 tokio_stream::wrappers::UnboundedReceiverStream,
203 tokio_tungstenite::{
204 connect_async,
205 tungstenite::{
206 protocol::frame::{coding::CloseCode, CloseFrame},
207 Message,
208 },
209 MaybeTlsStream, WebSocketStream,
210 },
211 tungstenite::{
212 client::IntoClientRequest,
213 http::{header, StatusCode},
214 Bytes,
215 },
216};
217
218pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;
219
220#[derive(Debug, Error)]
221pub enum PubsubClientError {
222 #[error("url parse error")]
223 UrlParseError(#[from] url::ParseError),
224
225 #[error("unable to connect to server")]
226 ConnectionError(Box<tokio_tungstenite::tungstenite::Error>),
227
228 #[error("websocket error")]
229 WsError(#[from] Box<tokio_tungstenite::tungstenite::Error>),
230
231 #[error("connection closed (({0})")]
232 ConnectionClosed(String),
233
234 #[error("json parse error")]
235 JsonParseError(#[from] serde_json::error::Error),
236
237 #[error("subscribe failed: {reason}")]
238 SubscribeFailed { reason: String, message: String },
239
240 #[error("unexpected message format: {0}")]
241 UnexpectedMessageError(String),
242
243 #[error("request failed: {reason}")]
244 RequestFailed { reason: String, message: String },
245
246 #[error("request error: {0}")]
247 RequestError(String),
248
249 #[error("could not find subscription id: {0}")]
250 UnexpectedSubscriptionResponse(String),
251
252 #[error("could not find node version: {0}")]
253 UnexpectedGetVersionResponse(String),
254}
255
256type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
257type SubscribeResponseMsg =
258 Result<(mpsc::UnboundedReceiver<Value>, UnsubscribeFn), PubsubClientError>;
259type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
260type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
261type RequestMsg = (
262 String,
263 Value,
264 oneshot::Sender<Result<Value, PubsubClientError>>,
265);
266
267/// A client for subscribing to messages from the RPC server.
268///
269/// See the [module documentation][self].
270#[derive(Debug)]
271pub struct PubsubClient {
272 subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
273 _request_sender: mpsc::UnboundedSender<RequestMsg>,
274 shutdown_sender: oneshot::Sender<()>,
275 ws: JoinHandle<PubsubClientResult>,
276}
277
278async fn connect_with_retry<R: IntoClientRequest>(
279 request: R,
280) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
281 let mut connection_retries = 5;
282 let client_request = request.into_client_request().map_err(Box::new)?;
283 loop {
284 let result = connect_async(client_request.clone())
285 .await
286 .map(|(socket, _)| socket);
287 if let Err(tungstenite::Error::Http(response)) = &result {
288 if response.status() == StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
289 let mut duration = Duration::from_millis(500);
290 if let Some(retry_after) = response.headers().get(header::RETRY_AFTER) {
291 if let Ok(retry_after) = retry_after.to_str() {
292 if let Ok(retry_after) = retry_after.parse::<u64>() {
293 if retry_after < 120 {
294 duration = Duration::from_secs(retry_after);
295 }
296 }
297 }
298 }
299
300 connection_retries -= 1;
301 debug!(
302 "Too many requests: server responded with {response:?}, {connection_retries} \
303 retries left, pausing for {duration:?}"
304 );
305
306 sleep(duration).await;
307 continue;
308 }
309 }
310 return result.map_err(Box::new);
311 }
312}
313
314impl PubsubClient {
315 pub async fn new<R: IntoClientRequest>(request: R) -> PubsubClientResult<Self> {
316 let client_request = request.into_client_request().map_err(Box::new)?;
317 let ws = connect_with_retry(client_request)
318 .await
319 .map_err(PubsubClientError::ConnectionError)?;
320
321 let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
322 let (_request_sender, request_receiver) = mpsc::unbounded_channel();
323 let (shutdown_sender, shutdown_receiver) = oneshot::channel();
324
325 #[allow(clippy::used_underscore_binding)]
326 Ok(Self {
327 subscribe_sender,
328 _request_sender,
329 shutdown_sender,
330 ws: tokio::spawn(PubsubClient::run_ws(
331 ws,
332 subscribe_receiver,
333 request_receiver,
334 shutdown_receiver,
335 )),
336 })
337 }
338
339 pub async fn shutdown(self) -> PubsubClientResult {
340 let _ = self.shutdown_sender.send(());
341 self.ws.await.unwrap() // WS future should not be cancelled or panicked
342 }
343
344 async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
345 where
346 T: DeserializeOwned + Send + 'a,
347 {
348 let (response_sender, response_receiver) = oneshot::channel();
349 self.subscribe_sender
350 .send((operation.to_string(), params, response_sender))
351 .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
352
353 let (notifications, unsubscribe) = response_receiver
354 .await
355 .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
356 Ok((
357 UnboundedReceiverStream::new(notifications)
358 .filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
359 .boxed(),
360 unsubscribe,
361 ))
362 }
363
364 /// Subscribe to account events.
365 ///
366 /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
367 ///
368 /// # RPC Reference
369 ///
370 /// This method corresponds directly to the [`accountSubscribe`] RPC method.
371 ///
372 /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket#accountsubscribe
373 pub async fn account_subscribe(
374 &self,
375 pubkey: &Pubkey,
376 config: Option<RpcAccountInfoConfig>,
377 ) -> SubscribeResult<'_, RpcResponse<UiAccount>> {
378 let params = json!([pubkey.to_string(), config]);
379 self.subscribe("account", params).await
380 }
381
382 /// Subscribe to block events.
383 ///
384 /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
385 ///
386 /// This method is disabled by default. It can be enabled by passing
387 /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
388 ///
389 /// # RPC Reference
390 ///
391 /// This method corresponds directly to the [`blockSubscribe`] RPC method.
392 ///
393 /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket#blocksubscribe
394 pub async fn block_subscribe(
395 &self,
396 filter: RpcBlockSubscribeFilter,
397 config: Option<RpcBlockSubscribeConfig>,
398 ) -> SubscribeResult<'_, RpcResponse<RpcBlockUpdate>> {
399 self.subscribe("block", json!([filter, config])).await
400 }
401
402 /// Subscribe to transaction log events.
403 ///
404 /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
405 ///
406 /// # RPC Reference
407 ///
408 /// This method corresponds directly to the [`logsSubscribe`] RPC method.
409 ///
410 /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket#logssubscribe
411 pub async fn logs_subscribe(
412 &self,
413 filter: RpcTransactionLogsFilter,
414 config: RpcTransactionLogsConfig,
415 ) -> SubscribeResult<'_, RpcResponse<RpcLogsResponse>> {
416 self.subscribe("logs", json!([filter, config])).await
417 }
418
419 /// Subscribe to program account events.
420 ///
421 /// Receives messages of type [`RpcKeyedAccount`] when an account owned
422 /// by the given program changes.
423 ///
424 /// # RPC Reference
425 ///
426 /// This method corresponds directly to the [`programSubscribe`] RPC method.
427 ///
428 /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket#programsubscribe
429 pub async fn program_subscribe(
430 &self,
431 pubkey: &Pubkey,
432 config: Option<RpcProgramAccountsConfig>,
433 ) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
434 let params = json!([pubkey.to_string(), config]);
435 self.subscribe("program", params).await
436 }
437
438 /// Subscribe to vote events.
439 ///
440 /// Receives messages of type [`RpcVote`] when a new vote is observed. These
441 /// votes are observed prior to confirmation and may never be confirmed.
442 ///
443 /// This method is disabled by default. It can be enabled by passing
444 /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
445 ///
446 /// # RPC Reference
447 ///
448 /// This method corresponds directly to the [`voteSubscribe`] RPC method.
449 ///
450 /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket#votesubscribe
451 pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
452 self.subscribe("vote", json!([])).await
453 }
454
455 /// Subscribe to root events.
456 ///
457 /// Receives messages of type [`Slot`] when a new [root] is set by the
458 /// validator.
459 ///
460 /// [root]: https://solana.com/docs/terminology#root
461 ///
462 /// # RPC Reference
463 ///
464 /// This method corresponds directly to the [`rootSubscribe`] RPC method.
465 ///
466 /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket#rootsubscribe
467 pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
468 self.subscribe("root", json!([])).await
469 }
470
471 /// Subscribe to transaction confirmation events.
472 ///
473 /// Receives messages of type [`RpcSignatureResult`] when a transaction
474 /// with the given signature is committed.
475 ///
476 /// This is a subscription to a single notification. It is automatically
477 /// cancelled by the server once the notification is sent.
478 ///
479 /// # RPC Reference
480 ///
481 /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
482 ///
483 /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket#signaturesubscribe
484 pub async fn signature_subscribe(
485 &self,
486 signature: &Signature,
487 config: Option<RpcSignatureSubscribeConfig>,
488 ) -> SubscribeResult<'_, RpcResponse<RpcSignatureResult>> {
489 let params = json!([signature.to_string(), config]);
490 self.subscribe("signature", params).await
491 }
492
493 /// Subscribe to slot events.
494 ///
495 /// Receives messages of type [`SlotInfo`] when a slot is processed.
496 ///
497 /// # RPC Reference
498 ///
499 /// This method corresponds directly to the [`slotSubscribe`] RPC method.
500 ///
501 /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket#slotsubscribe
502 pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
503 self.subscribe("slot", json!([])).await
504 }
505
506 /// Subscribe to slot update events.
507 ///
508 /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
509 ///
510 /// Note that this method operates differently than other subscriptions:
511 /// instead of sending the message to a receiver on a channel, it accepts a
512 /// `handler` callback that processes the message directly. This processing
513 /// occurs on another thread.
514 ///
515 /// # RPC Reference
516 ///
517 /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
518 ///
519 /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket#slotsupdatessubscribe
520 pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
521 self.subscribe("slotsUpdates", json!([])).await
522 }
523
524 async fn run_ws(
525 mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
526 mut subscribe_receiver: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
527 mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
528 mut shutdown_receiver: oneshot::Receiver<()>,
529 ) -> PubsubClientResult {
530 let mut request_id: u64 = 0;
531
532 let mut requests_subscribe = BTreeMap::new();
533 let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
534 let mut other_requests = BTreeMap::new();
535 let mut subscriptions = BTreeMap::new();
536 let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();
537
538 loop {
539 tokio::select! {
540 // Send close on shutdown signal
541 _ = (&mut shutdown_receiver) => {
542 let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
543 ws.send(Message::Close(Some(frame))).await.map_err(Box::new)?;
544 ws.flush().await.map_err(Box::new)?;
545 break;
546 },
547 // Send `Message::Ping` each 10s if no any other communication
548 () = sleep(Duration::from_secs(10)) => {
549 ws.send(Message::Ping(Bytes::new())).await.map_err(Box::new)?;
550 },
551 // Read message for subscribe
552 Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
553 request_id += 1;
554 let method = format!("{operation}Subscribe");
555 let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
556 ws.send(Message::Text(text.into())).await.map_err(Box::new)?;
557 requests_subscribe.insert(request_id, (operation, response_sender));
558 },
559 // Read message for unsubscribe
560 Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => {
561 subscriptions.remove(&sid);
562 request_id += 1;
563 let method = format!("{operation}Unsubscribe");
564 let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
565 ws.send(Message::Text(text.into())).await.map_err(Box::new)?;
566 requests_unsubscribe.insert(request_id, response_sender);
567 },
568 // Read message for other requests
569 Some((method, params, response_sender)) = request_receiver.recv() => {
570 request_id += 1;
571 let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
572 ws.send(Message::Text(text.into())).await.map_err(Box::new)?;
573 other_requests.insert(request_id, response_sender);
574 }
575 // Read incoming WebSocket message
576 next_msg = ws.next() => {
577 let msg = match next_msg {
578 Some(msg) => msg.map_err(Box::new)?,
579 None => break,
580 };
581 trace!("ws.next(): {:?}", &msg);
582
583 // Get text from the message
584 let text = match msg {
585 Message::Text(text) => text,
586 Message::Binary(_data) => continue, // Ignore
587 Message::Ping(data) => {
588 ws.send(Message::Pong(data)).await.map_err(Box::new)?;
589 continue
590 },
591 Message::Pong(_data) => continue,
592 Message::Close(_frame) => break,
593 Message::Frame(_frame) => continue,
594 };
595
596
597 let mut json: Map<String, Value> = serde_json::from_str(&text)?;
598
599 // Subscribe/Unsubscribe response, example:
600 // `{"jsonrpc":"2.0","result":5308752,"id":1}`
601 if let Some(id) = json.get("id") {
602 let id = id.as_u64().ok_or_else(|| {
603 PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.to_string() }
604 })?;
605
606 let err = json.get("error").map(|error_object| {
607 match serde_json::from_value::<RpcErrorObject>(error_object.clone()) {
608 Ok(rpc_error_object) => {
609 format!("{} ({})", rpc_error_object.message, rpc_error_object.code)
610 }
611 Err(err) => format!(
612 "Failed to deserialize RPC error response: {} [{}]",
613 serde_json::to_string(error_object).unwrap(),
614 err
615 )
616 }
617 });
618
619 if let Some(response_sender) = other_requests.remove(&id) {
620 match err {
621 Some(reason) => {
622 let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.to_string()}));
623 },
624 None => {
625 let json_result = json.get("result").ok_or_else(|| {
626 PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.to_string() }
627 })?;
628 if response_sender.send(Ok(json_result.clone())).is_err() {
629 break;
630 }
631 }
632 }
633 } else if let Some(response_sender) = requests_unsubscribe.remove(&id) {
634 let _ = response_sender.send(()); // do not care if receiver is closed
635 } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) {
636 match err {
637 Some(reason) => {
638 let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.to_string()}));
639 },
640 None => {
641 // Subscribe Id
642 let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
643 PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.to_string() }
644 })?;
645
646 // Create notifications channel and unsubscribe function
647 let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel();
648 let unsubscribe_sender = unsubscribe_sender.clone();
649 let unsubscribe = Box::new(move || async move {
650 let (response_sender, response_receiver) = oneshot::channel();
651 // do nothing if ws already closed
652 if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() {
653 let _ = response_receiver.await; // channel can be closed only if ws is closed
654 }
655 }.boxed());
656
657 if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() {
658 break;
659 }
660 subscriptions.insert(sid, notifications_sender);
661 }
662 }
663 } else {
664 error!("Unknown request id: {id}");
665 break;
666 }
667 continue;
668 }
669
670 // Notification, example:
671 // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}`
672 if let Some(Value::Object(params)) = json.get_mut("params") {
673 if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
674 let mut unsubscribe_required = false;
675
676 if let Some(notifications_sender) = subscriptions.get(&sid) {
677 if let Some(result) = params.remove("result") {
678 if notifications_sender.send(result).is_err() {
679 unsubscribe_required = true;
680 }
681 }
682 } else {
683 unsubscribe_required = true;
684 }
685
686 if unsubscribe_required {
687 if let Some(Value::String(method)) = json.remove("method") {
688 if let Some(operation) = method.strip_suffix("Notification") {
689 let (response_sender, _response_receiver) = oneshot::channel();
690 let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender));
691 }
692 }
693 }
694 }
695 }
696 }
697 }
698 }
699
700 Ok(())
701 }
702}
703
704#[cfg(test)]
705mod tests {
706 // see client-test/test/client.rs
707}