solana_pubsub_client/nonblocking/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a nonblocking (async) API. For a blocking API use the synchronous
9//! client in [`crate::pubsub_client`].
10//!
11//! A single `PubsubClient` client may be used to subscribe to many events via
12//! subscription methods like [`PubsubClient::account_subscribe`]. These methods
13//! return a [`PubsubClientResult`] of a pair, the first element being a
14//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an
15//! unsubscribe closure, an asynchronous function that can be called and
16//! `await`ed to unsubscribe.
17//!
//! Note that the returned `BoxStream` borrows the `PubsubClient` that created
//! it. This means the stream is not `'static`, so it cannot be moved on its own
//! into a separately spawned task; it has to be created and consumed in a task
//! that also holds the `PubsubClient`. `PubsubClient` itself is `Send` and
//! `Sync`, and can be shared between tasks by putting it in an `Arc`. Thus
//! one viable pattern for creating multiple subscriptions is:
23//!
24//! - create an `Arc<PubsubClient>`
25//! - spawn one task for each subscription, sharing the `PubsubClient`.
26//! - in each task:
27//!   - create a subscription
28//!   - send the `UnsubscribeFn` to another task to handle shutdown
29//!   - loop while receiving messages from the subscription
30//!
31//! This pattern is illustrated in the example below.
32//!
33//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
34//! disabled on RPC nodes. They can be enabled by passing
35//! `--rpc-pubsub-enable-block-subscription` and
36//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
37//! methods are disabled, the RPC server will return a "Method not found" error
38//! message.
39//!
40//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
41//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
42//!
43//! # Examples
44//!
45//! Demo two async `PubsubClient` subscriptions with clean shutdown.
46//!
47//! This spawns a task for each subscription type, each of which subscribes and
48//! sends back a ready message and an unsubscribe channel (closure), then loops
49//! on printing messages. The main task then waits for user input before
50//! unsubscribing and waiting on the tasks.
51//!
52//! ```
53//! use anyhow::Result;
54//! use futures_util::StreamExt;
55//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
56//! use std::sync::Arc;
57//! use tokio::io::AsyncReadExt;
58//! use tokio::sync::mpsc::unbounded_channel;
59//!
60//! pub async fn watch_subscriptions(
61//!     websocket_url: &str,
62//! ) -> Result<()> {
63//!
64//!     // Subscription tasks will send a ready signal when they have subscribed.
65//!     let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
66//!
67//!     // Channel to receive unsubscribe channels (actually closures).
//!     // These receive a pair of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>, &'static str)`,
69//!     // where the first is a closure to call to unsubscribe, the second is the subscription name.
70//!     let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
71//!
72//!     // The `PubsubClient` must be `Arc`ed to share it across tasks.
73//!     let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
74//!
75//!     let mut join_handles = vec![];
76//!
77//!     join_handles.push(("slot", tokio::spawn({
78//!         // Clone things we need before moving their clones into the `async move` block.
79//!         //
80//!         // The subscriptions have to be made from the tasks that will receive the subscription messages,
81//!         // because the subscription streams hold a reference to the `PubsubClient`.
82//!         // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
83//!
84//!         let ready_sender = ready_sender.clone();
85//!         let unsubscribe_sender = unsubscribe_sender.clone();
86//!         let pubsub_client = Arc::clone(&pubsub_client);
87//!         async move {
88//!             let (mut slot_notifications, slot_unsubscribe) =
89//!                 pubsub_client.slot_subscribe().await?;
90//!
91//!             // With the subscription started,
92//!             // send a signal back to the main task for synchronization.
93//!             ready_sender.send(()).expect("channel");
94//!
95//!             // Send the unsubscribe closure back to the main task.
96//!             unsubscribe_sender.send((slot_unsubscribe, "slot"))
97//!                 .map_err(|e| format!("{}", e)).expect("channel");
98//!
99//!             // Drop senders so that the channels can close.
100//!             // The main task will receive until channels are closed.
101//!             drop((ready_sender, unsubscribe_sender));
102//!
103//!             // Do something with the subscribed messages.
104//!             // This loop will end once the main task unsubscribes.
105//!             while let Some(slot_info) = slot_notifications.next().await {
106//!                 println!("------------------------------------------------------------");
107//!                 println!("slot pubsub result: {:?}", slot_info);
108//!             }
109//!
110//!             // This type hint is necessary to allow the `async move` block to use `?`.
111//!             Ok::<_, anyhow::Error>(())
112//!         }
113//!     })));
114//!
115//!     join_handles.push(("root", tokio::spawn({
116//!         let ready_sender = ready_sender.clone();
117//!         let unsubscribe_sender = unsubscribe_sender.clone();
118//!         let pubsub_client = Arc::clone(&pubsub_client);
119//!         async move {
120//!             let (mut root_notifications, root_unsubscribe) =
121//!                 pubsub_client.root_subscribe().await?;
122//!
123//!             ready_sender.send(()).expect("channel");
124//!             unsubscribe_sender.send((root_unsubscribe, "root"))
125//!                 .map_err(|e| format!("{}", e)).expect("channel");
126//!             drop((ready_sender, unsubscribe_sender));
127//!
128//!             while let Some(root) = root_notifications.next().await {
129//!                 println!("------------------------------------------------------------");
130//!                 println!("root pubsub result: {:?}", root);
131//!             }
132//!
133//!             Ok::<_, anyhow::Error>(())
134//!         }
135//!     })));
136//!
137//!     // Drop these senders so that the channels can close
138//!     // and their receivers return `None` below.
139//!     drop(ready_sender);
140//!     drop(unsubscribe_sender);
141//!
142//!     // Wait until all subscribers are ready before proceeding with application logic.
143//!     while let Some(_) = ready_receiver.recv().await { }
144//!
145//!     // Do application logic here.
146//!
147//!     // Wait for input or some application-specific shutdown condition.
148//!     tokio::io::stdin().read_u8().await?;
149//!
//!     // Unsubscribe from everything, which will shut down all the tasks.
151//!     while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
152//!         println!("unsubscribing from {}", name);
153//!         unsubscribe().await
154//!     }
155//!
156//!     // Wait for the tasks.
157//!     for (name, handle) in join_handles {
158//!         println!("waiting on task {}", name);
159//!         if let Ok(Err(e)) = handle.await {
160//!             println!("task {} failed: {}", name, e);
161//!         }
162//!     }
163//!
164//!     Ok(())
165//! }
166//! # Ok::<(), anyhow::Error>(())
167//! ```
168
169use {
170    futures_util::{
171        future::{ready, BoxFuture, FutureExt},
172        sink::SinkExt,
173        stream::{BoxStream, StreamExt},
174    },
175    log::*,
176    serde::de::DeserializeOwned,
177    serde_json::{json, Map, Value},
178    solana_account_decoder_client_types::UiAccount,
179    solana_clock::Slot,
180    solana_pubkey::Pubkey,
181    solana_rpc_client_types::{
182        config::{
183            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
184            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
185            RpcTransactionLogsFilter,
186        },
187        error_object::RpcErrorObject,
188        response::{
189            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
190            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
191        },
192    },
193    solana_signature::Signature,
194    std::collections::BTreeMap,
195    thiserror::Error,
196    tokio::{
197        net::TcpStream,
198        sync::{mpsc, oneshot},
199        task::JoinHandle,
200        time::{sleep, Duration},
201    },
202    tokio_stream::wrappers::UnboundedReceiverStream,
203    tokio_tungstenite::{
204        connect_async,
205        tungstenite::{
206            protocol::frame::{coding::CloseCode, CloseFrame},
207            Message,
208        },
209        MaybeTlsStream, WebSocketStream,
210    },
211    tungstenite::{
212        client::IntoClientRequest,
213        http::{header, StatusCode},
214        Bytes,
215    },
216};
217
218pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;
219
220#[derive(Debug, Error)]
221pub enum PubsubClientError {
222    #[error("url parse error")]
223    UrlParseError(#[from] url::ParseError),
224
225    #[error("unable to connect to server")]
226    ConnectionError(Box<tokio_tungstenite::tungstenite::Error>),
227
228    #[error("websocket error")]
229    WsError(#[from] Box<tokio_tungstenite::tungstenite::Error>),
230
231    #[error("connection closed (({0})")]
232    ConnectionClosed(String),
233
234    #[error("json parse error")]
235    JsonParseError(#[from] serde_json::error::Error),
236
237    #[error("subscribe failed: {reason}")]
238    SubscribeFailed { reason: String, message: String },
239
240    #[error("unexpected message format: {0}")]
241    UnexpectedMessageError(String),
242
243    #[error("request failed: {reason}")]
244    RequestFailed { reason: String, message: String },
245
246    #[error("request error: {0}")]
247    RequestError(String),
248
249    #[error("could not find subscription id: {0}")]
250    UnexpectedSubscriptionResponse(String),
251
252    #[error("could not find node version: {0}")]
253    UnexpectedGetVersionResponse(String),
254}
255
256type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
257type SubscribeResponseMsg =
258    Result<(mpsc::UnboundedReceiver<Value>, UnsubscribeFn), PubsubClientError>;
259type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
260type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
261type RequestMsg = (
262    String,
263    Value,
264    oneshot::Sender<Result<Value, PubsubClientError>>,
265);
266
267/// A client for subscribing to messages from the RPC server.
268///
269/// See the [module documentation][self].
270#[derive(Debug)]
271pub struct PubsubClient {
272    subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
273    _request_sender: mpsc::UnboundedSender<RequestMsg>,
274    shutdown_sender: oneshot::Sender<()>,
275    ws: JoinHandle<PubsubClientResult>,
276}
277
278async fn connect_with_retry<R: IntoClientRequest>(
279    request: R,
280) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
281    let mut connection_retries = 5;
282    let client_request = request.into_client_request().map_err(Box::new)?;
283    loop {
284        let result = connect_async(client_request.clone())
285            .await
286            .map(|(socket, _)| socket);
287        if let Err(tungstenite::Error::Http(response)) = &result {
288            if response.status() == StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
289                let mut duration = Duration::from_millis(500);
290                if let Some(retry_after) = response.headers().get(header::RETRY_AFTER) {
291                    if let Ok(retry_after) = retry_after.to_str() {
292                        if let Ok(retry_after) = retry_after.parse::<u64>() {
293                            if retry_after < 120 {
294                                duration = Duration::from_secs(retry_after);
295                            }
296                        }
297                    }
298                }
299
300                connection_retries -= 1;
301                debug!(
302                    "Too many requests: server responded with {response:?}, {connection_retries} \
303                     retries left, pausing for {duration:?}"
304                );
305
306                sleep(duration).await;
307                continue;
308            }
309        }
310        return result.map_err(Box::new);
311    }
312}
313
314impl PubsubClient {
315    pub async fn new<R: IntoClientRequest>(request: R) -> PubsubClientResult<Self> {
316        let client_request = request.into_client_request().map_err(Box::new)?;
317        let ws = connect_with_retry(client_request)
318            .await
319            .map_err(PubsubClientError::ConnectionError)?;
320
321        let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
322        let (_request_sender, request_receiver) = mpsc::unbounded_channel();
323        let (shutdown_sender, shutdown_receiver) = oneshot::channel();
324
325        #[allow(clippy::used_underscore_binding)]
326        Ok(Self {
327            subscribe_sender,
328            _request_sender,
329            shutdown_sender,
330            ws: tokio::spawn(PubsubClient::run_ws(
331                ws,
332                subscribe_receiver,
333                request_receiver,
334                shutdown_receiver,
335            )),
336        })
337    }
338
339    pub async fn shutdown(self) -> PubsubClientResult {
340        let _ = self.shutdown_sender.send(());
341        self.ws.await.unwrap() // WS future should not be cancelled or panicked
342    }
343
344    async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
345    where
346        T: DeserializeOwned + Send + 'a,
347    {
348        let (response_sender, response_receiver) = oneshot::channel();
349        self.subscribe_sender
350            .send((operation.to_string(), params, response_sender))
351            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
352
353        let (notifications, unsubscribe) = response_receiver
354            .await
355            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
356        Ok((
357            UnboundedReceiverStream::new(notifications)
358                .filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
359                .boxed(),
360            unsubscribe,
361        ))
362    }
363
364    /// Subscribe to account events.
365    ///
366    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
367    ///
368    /// # RPC Reference
369    ///
370    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
371    ///
372    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket#accountsubscribe
373    pub async fn account_subscribe(
374        &self,
375        pubkey: &Pubkey,
376        config: Option<RpcAccountInfoConfig>,
377    ) -> SubscribeResult<'_, RpcResponse<UiAccount>> {
378        let params = json!([pubkey.to_string(), config]);
379        self.subscribe("account", params).await
380    }
381
382    /// Subscribe to block events.
383    ///
384    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
385    ///
386    /// This method is disabled by default. It can be enabled by passing
387    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
388    ///
389    /// # RPC Reference
390    ///
391    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
392    ///
393    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket#blocksubscribe
394    pub async fn block_subscribe(
395        &self,
396        filter: RpcBlockSubscribeFilter,
397        config: Option<RpcBlockSubscribeConfig>,
398    ) -> SubscribeResult<'_, RpcResponse<RpcBlockUpdate>> {
399        self.subscribe("block", json!([filter, config])).await
400    }
401
402    /// Subscribe to transaction log events.
403    ///
404    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
405    ///
406    /// # RPC Reference
407    ///
408    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
409    ///
410    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket#logssubscribe
411    pub async fn logs_subscribe(
412        &self,
413        filter: RpcTransactionLogsFilter,
414        config: RpcTransactionLogsConfig,
415    ) -> SubscribeResult<'_, RpcResponse<RpcLogsResponse>> {
416        self.subscribe("logs", json!([filter, config])).await
417    }
418
419    /// Subscribe to program account events.
420    ///
421    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
422    /// by the given program changes.
423    ///
424    /// # RPC Reference
425    ///
426    /// This method corresponds directly to the [`programSubscribe`] RPC method.
427    ///
428    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket#programsubscribe
429    pub async fn program_subscribe(
430        &self,
431        pubkey: &Pubkey,
432        config: Option<RpcProgramAccountsConfig>,
433    ) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
434        let params = json!([pubkey.to_string(), config]);
435        self.subscribe("program", params).await
436    }
437
438    /// Subscribe to vote events.
439    ///
440    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
441    /// votes are observed prior to confirmation and may never be confirmed.
442    ///
443    /// This method is disabled by default. It can be enabled by passing
444    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
445    ///
446    /// # RPC Reference
447    ///
448    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
449    ///
450    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket#votesubscribe
451    pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
452        self.subscribe("vote", json!([])).await
453    }
454
455    /// Subscribe to root events.
456    ///
457    /// Receives messages of type [`Slot`] when a new [root] is set by the
458    /// validator.
459    ///
460    /// [root]: https://solana.com/docs/terminology#root
461    ///
462    /// # RPC Reference
463    ///
464    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
465    ///
466    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket#rootsubscribe
467    pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
468        self.subscribe("root", json!([])).await
469    }
470
471    /// Subscribe to transaction confirmation events.
472    ///
473    /// Receives messages of type [`RpcSignatureResult`] when a transaction
474    /// with the given signature is committed.
475    ///
476    /// This is a subscription to a single notification. It is automatically
477    /// cancelled by the server once the notification is sent.
478    ///
479    /// # RPC Reference
480    ///
481    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
482    ///
483    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket#signaturesubscribe
484    pub async fn signature_subscribe(
485        &self,
486        signature: &Signature,
487        config: Option<RpcSignatureSubscribeConfig>,
488    ) -> SubscribeResult<'_, RpcResponse<RpcSignatureResult>> {
489        let params = json!([signature.to_string(), config]);
490        self.subscribe("signature", params).await
491    }
492
493    /// Subscribe to slot events.
494    ///
495    /// Receives messages of type [`SlotInfo`] when a slot is processed.
496    ///
497    /// # RPC Reference
498    ///
499    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
500    ///
501    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket#slotsubscribe
502    pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
503        self.subscribe("slot", json!([])).await
504    }
505
506    /// Subscribe to slot update events.
507    ///
508    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
509    ///
510    /// Note that this method operates differently than other subscriptions:
511    /// instead of sending the message to a receiver on a channel, it accepts a
512    /// `handler` callback that processes the message directly. This processing
513    /// occurs on another thread.
514    ///
515    /// # RPC Reference
516    ///
517    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
518    ///
519    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket#slotsupdatessubscribe
520    pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
521        self.subscribe("slotsUpdates", json!([])).await
522    }
523
524    async fn run_ws(
525        mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
526        mut subscribe_receiver: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
527        mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
528        mut shutdown_receiver: oneshot::Receiver<()>,
529    ) -> PubsubClientResult {
530        let mut request_id: u64 = 0;
531
532        let mut requests_subscribe = BTreeMap::new();
533        let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
534        let mut other_requests = BTreeMap::new();
535        let mut subscriptions = BTreeMap::new();
536        let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();
537
538        loop {
539            tokio::select! {
540                // Send close on shutdown signal
541                _ = (&mut shutdown_receiver) => {
542                    let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
543                    ws.send(Message::Close(Some(frame))).await.map_err(Box::new)?;
544                    ws.flush().await.map_err(Box::new)?;
545                    break;
546                },
547                // Send `Message::Ping` each 10s if no any other communication
548                () = sleep(Duration::from_secs(10)) => {
549                    ws.send(Message::Ping(Bytes::new())).await.map_err(Box::new)?;
550                },
551                // Read message for subscribe
552                Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
553                    request_id += 1;
554                    let method = format!("{operation}Subscribe");
555                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
556                    ws.send(Message::Text(text.into())).await.map_err(Box::new)?;
557                    requests_subscribe.insert(request_id, (operation, response_sender));
558                },
559                // Read message for unsubscribe
560                Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => {
561                    subscriptions.remove(&sid);
562                    request_id += 1;
563                    let method = format!("{operation}Unsubscribe");
564                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
565                    ws.send(Message::Text(text.into())).await.map_err(Box::new)?;
566                    requests_unsubscribe.insert(request_id, response_sender);
567                },
568                // Read message for other requests
569                Some((method, params, response_sender)) = request_receiver.recv() => {
570                    request_id += 1;
571                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
572                    ws.send(Message::Text(text.into())).await.map_err(Box::new)?;
573                    other_requests.insert(request_id, response_sender);
574                }
575                // Read incoming WebSocket message
576                next_msg = ws.next() => {
577                    let msg = match next_msg {
578                        Some(msg) => msg.map_err(Box::new)?,
579                        None => break,
580                    };
581                    trace!("ws.next(): {:?}", &msg);
582
583                    // Get text from the message
584                    let text = match msg {
585                        Message::Text(text) => text,
586                        Message::Binary(_data) => continue, // Ignore
587                        Message::Ping(data) => {
588                            ws.send(Message::Pong(data)).await.map_err(Box::new)?;
589                            continue
590                        },
591                        Message::Pong(_data) => continue,
592                        Message::Close(_frame) => break,
593                        Message::Frame(_frame) => continue,
594                    };
595
596
597                    let mut json: Map<String, Value> = serde_json::from_str(&text)?;
598
599                    // Subscribe/Unsubscribe response, example:
600                    // `{"jsonrpc":"2.0","result":5308752,"id":1}`
601                    if let Some(id) = json.get("id") {
602                        let id = id.as_u64().ok_or_else(|| {
603                            PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.to_string() }
604                        })?;
605
606                        let err = json.get("error").map(|error_object| {
607                            match serde_json::from_value::<RpcErrorObject>(error_object.clone()) {
608                                Ok(rpc_error_object) => {
609                                    format!("{} ({})",  rpc_error_object.message, rpc_error_object.code)
610                                }
611                                Err(err) => format!(
612                                    "Failed to deserialize RPC error response: {} [{}]",
613                                    serde_json::to_string(error_object).unwrap(),
614                                    err
615                                )
616                            }
617                        });
618
619                        if let Some(response_sender) = other_requests.remove(&id) {
620                            match err {
621                                Some(reason) => {
622                                    let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.to_string()}));
623                                },
624                                None => {
625                                    let json_result = json.get("result").ok_or_else(|| {
626                                        PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.to_string() }
627                                    })?;
628                                    if response_sender.send(Ok(json_result.clone())).is_err() {
629                                        break;
630                                    }
631                                }
632                            }
633                        } else if let Some(response_sender) = requests_unsubscribe.remove(&id) {
634                            let _ = response_sender.send(()); // do not care if receiver is closed
635                        } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) {
636                            match err {
637                                Some(reason) => {
638                                    let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.to_string()}));
639                                },
640                                None => {
641                                    // Subscribe Id
642                                    let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
643                                        PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.to_string() }
644                                    })?;
645
646                                    // Create notifications channel and unsubscribe function
647                                    let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel();
648                                    let unsubscribe_sender = unsubscribe_sender.clone();
649                                    let unsubscribe = Box::new(move || async move {
650                                        let (response_sender, response_receiver) = oneshot::channel();
651                                        // do nothing if ws already closed
652                                        if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() {
653                                            let _ = response_receiver.await; // channel can be closed only if ws is closed
654                                        }
655                                    }.boxed());
656
657                                    if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() {
658                                        break;
659                                    }
660                                    subscriptions.insert(sid, notifications_sender);
661                                }
662                            }
663                        } else {
664                            error!("Unknown request id: {id}");
665                            break;
666                        }
667                        continue;
668                    }
669
670                    // Notification, example:
671                    // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}`
672                    if let Some(Value::Object(params)) = json.get_mut("params") {
673                        if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
674                            let mut unsubscribe_required = false;
675
676                            if let Some(notifications_sender) = subscriptions.get(&sid) {
677                                if let Some(result) = params.remove("result") {
678                                    if notifications_sender.send(result).is_err() {
679                                        unsubscribe_required = true;
680                                    }
681                                }
682                            } else {
683                                unsubscribe_required = true;
684                            }
685
686                            if unsubscribe_required {
687                                if let Some(Value::String(method)) = json.remove("method") {
688                                    if let Some(operation) = method.strip_suffix("Notification") {
689                                        let (response_sender, _response_receiver) = oneshot::channel();
690                                        let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender));
691                                    }
692                                }
693                            }
694                        }
695                    }
696                }
697            }
698        }
699
700        Ok(())
701    }
702}
703
#[cfg(test)]
mod tests {
    // No tests are defined in this module; see client-test/test/client.rs
}