solana_pubsub_client/nonblocking/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a nonblocking (async) API. For a blocking API use the synchronous
9//! client in [`crate::pubsub_client`].
10//!
11//! A single `PubsubClient` client may be used to subscribe to many events via
12//! subscription methods like [`PubsubClient::account_subscribe`]. These methods
13//! return a [`PubsubClientResult`] of a pair, the first element being a
14//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an
15//! unsubscribe closure, an asynchronous function that can be called and
16//! `await`ed to unsubscribe.
17//!
//! Note that the lifetime of a returned `BoxStream` is tied to the borrow of
//! the `PubsubClient` that created it. The stream is therefore not `'static`:
//! it cannot be handed to a separately spawned task on its own, and has to be
//! created and consumed in a task that itself holds the `PubsubClient`.
//! `PubsubClient` though is `Send` and `Sync`, and can be shared between tasks
//! by putting it in an `Arc`. Thus one viable pattern for creating multiple
//! subscriptions is:
23//!
24//! - create an `Arc<PubsubClient>`
25//! - spawn one task for each subscription, sharing the `PubsubClient`.
26//! - in each task:
27//!   - create a subscription
28//!   - send the `UnsubscribeFn` to another task to handle shutdown
29//!   - loop while receiving messages from the subscription
30//!
31//! This pattern is illustrated in the example below.
32//!
33//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
34//! disabled on RPC nodes. They can be enabled by passing
35//! `--rpc-pubsub-enable-block-subscription` and
36//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
37//! methods are disabled, the RPC server will return a "Method not found" error
38//! message.
39//!
40//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
41//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
42//!
43//! # Examples
44//!
45//! Demo two async `PubsubClient` subscriptions with clean shutdown.
46//!
47//! This spawns a task for each subscription type, each of which subscribes and
48//! sends back a ready message and an unsubscribe channel (closure), then loops
49//! on printing messages. The main task then waits for user input before
50//! unsubscribing and waiting on the tasks.
51//!
52//! ```
53//! use anyhow::Result;
54//! use futures_util::StreamExt;
55//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
56//! use std::sync::Arc;
57//! use tokio::io::AsyncReadExt;
58//! use tokio::sync::mpsc::unbounded_channel;
59//!
60//! pub async fn watch_subscriptions(
61//!     websocket_url: &str,
62//! ) -> Result<()> {
63//!
64//!     // Subscription tasks will send a ready signal when they have subscribed.
65//!     let (ready_sender, mut ready_receiver) = unbounded_channel::<()>();
66//!
67//!     // Channel to receive unsubscribe channels (actually closures).
//!     // These receive a pair of `(Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>, &'static str)`,
69//!     // where the first is a closure to call to unsubscribe, the second is the subscription name.
70//!     let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>();
71//!
72//!     // The `PubsubClient` must be `Arc`ed to share it across tasks.
73//!     let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?);
74//!
75//!     let mut join_handles = vec![];
76//!
77//!     join_handles.push(("slot", tokio::spawn({
78//!         // Clone things we need before moving their clones into the `async move` block.
79//!         //
80//!         // The subscriptions have to be made from the tasks that will receive the subscription messages,
81//!         // because the subscription streams hold a reference to the `PubsubClient`.
82//!         // Otherwise we would just subscribe on the main task and send the receivers out to other tasks.
83//!
84//!         let ready_sender = ready_sender.clone();
85//!         let unsubscribe_sender = unsubscribe_sender.clone();
86//!         let pubsub_client = Arc::clone(&pubsub_client);
87//!         async move {
88//!             let (mut slot_notifications, slot_unsubscribe) =
89//!                 pubsub_client.slot_subscribe().await?;
90//!
91//!             // With the subscription started,
92//!             // send a signal back to the main task for synchronization.
93//!             ready_sender.send(()).expect("channel");
94//!
95//!             // Send the unsubscribe closure back to the main task.
96//!             unsubscribe_sender.send((slot_unsubscribe, "slot"))
97//!                 .map_err(|e| format!("{}", e)).expect("channel");
98//!
99//!             // Drop senders so that the channels can close.
100//!             // The main task will receive until channels are closed.
101//!             drop((ready_sender, unsubscribe_sender));
102//!
103//!             // Do something with the subscribed messages.
104//!             // This loop will end once the main task unsubscribes.
105//!             while let Some(slot_info) = slot_notifications.next().await {
106//!                 println!("------------------------------------------------------------");
107//!                 println!("slot pubsub result: {:?}", slot_info);
108//!             }
109//!
110//!             // This type hint is necessary to allow the `async move` block to use `?`.
111//!             Ok::<_, anyhow::Error>(())
112//!         }
113//!     })));
114//!
115//!     join_handles.push(("root", tokio::spawn({
116//!         let ready_sender = ready_sender.clone();
117//!         let unsubscribe_sender = unsubscribe_sender.clone();
118//!         let pubsub_client = Arc::clone(&pubsub_client);
119//!         async move {
120//!             let (mut root_notifications, root_unsubscribe) =
121//!                 pubsub_client.root_subscribe().await?;
122//!
123//!             ready_sender.send(()).expect("channel");
124//!             unsubscribe_sender.send((root_unsubscribe, "root"))
125//!                 .map_err(|e| format!("{}", e)).expect("channel");
126//!             drop((ready_sender, unsubscribe_sender));
127//!
128//!             while let Some(root) = root_notifications.next().await {
129//!                 println!("------------------------------------------------------------");
130//!                 println!("root pubsub result: {:?}", root);
131//!             }
132//!
133//!             Ok::<_, anyhow::Error>(())
134//!         }
135//!     })));
136//!
137//!     // Drop these senders so that the channels can close
138//!     // and their receivers return `None` below.
139//!     drop(ready_sender);
140//!     drop(unsubscribe_sender);
141//!
142//!     // Wait until all subscribers are ready before proceeding with application logic.
143//!     while let Some(_) = ready_receiver.recv().await { }
144//!
145//!     // Do application logic here.
146//!
147//!     // Wait for input or some application-specific shutdown condition.
148//!     tokio::io::stdin().read_u8().await?;
149//!
150//!     // Unsubscribe from everything, which will shutdown all the tasks.
151//!     while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await {
152//!         println!("unsubscribing from {}", name);
153//!         unsubscribe().await
154//!     }
155//!
156//!     // Wait for the tasks.
157//!     for (name, handle) in join_handles {
158//!         println!("waiting on task {}", name);
159//!         if let Ok(Err(e)) = handle.await {
160//!             println!("task {} failed: {}", name, e);
161//!         }
162//!     }
163//!
164//!     Ok(())
165//! }
166//! # Ok::<(), anyhow::Error>(())
167//! ```
168
169use {
170    futures_util::{
171        future::{ready, BoxFuture, FutureExt},
172        sink::SinkExt,
173        stream::{BoxStream, StreamExt},
174    },
175    log::*,
176    serde::de::DeserializeOwned,
177    serde_json::{json, Map, Value},
178    solana_account_decoder_client_types::UiAccount,
179    solana_clock::Slot,
180    solana_pubkey::Pubkey,
181    solana_rpc_client_types::{
182        config::{
183            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
184            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
185            RpcTransactionLogsFilter,
186        },
187        error_object::RpcErrorObject,
188        response::{
189            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
190            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
191        },
192    },
193    solana_signature::Signature,
194    std::collections::BTreeMap,
195    thiserror::Error,
196    tokio::{
197        net::TcpStream,
198        sync::{mpsc, oneshot},
199        task::JoinHandle,
200        time::{sleep, Duration},
201    },
202    tokio_stream::wrappers::UnboundedReceiverStream,
203    tokio_tungstenite::{
204        connect_async,
205        tungstenite::{
206            protocol::frame::{coding::CloseCode, CloseFrame},
207            Message,
208        },
209        MaybeTlsStream, WebSocketStream,
210    },
211    url::Url,
212};
213
214pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;
215
216#[derive(Debug, Error)]
217pub enum PubsubClientError {
218    #[error("url parse error")]
219    UrlParseError(#[from] url::ParseError),
220
221    #[error("unable to connect to server")]
222    ConnectionError(Box<tokio_tungstenite::tungstenite::Error>),
223
224    #[error("websocket error")]
225    WsError(#[from] Box<tokio_tungstenite::tungstenite::Error>),
226
227    #[error("connection closed (({0})")]
228    ConnectionClosed(String),
229
230    #[error("json parse error")]
231    JsonParseError(#[from] serde_json::error::Error),
232
233    #[error("subscribe failed: {reason}")]
234    SubscribeFailed { reason: String, message: String },
235
236    #[error("unexpected message format: {0}")]
237    UnexpectedMessageError(String),
238
239    #[error("request failed: {reason}")]
240    RequestFailed { reason: String, message: String },
241
242    #[error("request error: {0}")]
243    RequestError(String),
244
245    #[error("could not find subscription id: {0}")]
246    UnexpectedSubscriptionResponse(String),
247
248    #[error("could not find node version: {0}")]
249    UnexpectedGetVersionResponse(String),
250}
251
252type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
253type SubscribeResponseMsg =
254    Result<(mpsc::UnboundedReceiver<Value>, UnsubscribeFn), PubsubClientError>;
255type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>);
256type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>;
257type RequestMsg = (
258    String,
259    Value,
260    oneshot::Sender<Result<Value, PubsubClientError>>,
261);
262
/// A client for subscribing to messages from the RPC server.
///
/// See the [module documentation][self].
#[derive(Debug)]
pub struct PubsubClient {
    /// Queues subscribe requests for the background connection task.
    subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
    /// Sending half of the generic request channel. No method here sends on
    /// it; holding it keeps the channel read by the connection task open.
    _request_sender: mpsc::UnboundedSender<RequestMsg>,
    /// Signals the background connection task to close the WebSocket and exit.
    shutdown_sender: oneshot::Sender<()>,
    /// Handle of the background task running `run_ws`.
    ws: JoinHandle<PubsubClientResult>,
}
273
274impl PubsubClient {
275    pub async fn new(url: &str) -> PubsubClientResult<Self> {
276        let url = Url::parse(url)?;
277        let (ws, _response) = connect_async(url)
278            .await
279            .map_err(Box::new)
280            .map_err(PubsubClientError::ConnectionError)?;
281
282        let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
283        let (_request_sender, request_receiver) = mpsc::unbounded_channel();
284        let (shutdown_sender, shutdown_receiver) = oneshot::channel();
285
286        #[allow(clippy::used_underscore_binding)]
287        Ok(Self {
288            subscribe_sender,
289            _request_sender,
290            shutdown_sender,
291            ws: tokio::spawn(PubsubClient::run_ws(
292                ws,
293                subscribe_receiver,
294                request_receiver,
295                shutdown_receiver,
296            )),
297        })
298    }
299
300    pub async fn shutdown(self) -> PubsubClientResult {
301        let _ = self.shutdown_sender.send(());
302        self.ws.await.unwrap() // WS future should not be cancelled or panicked
303    }
304
305    async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
306    where
307        T: DeserializeOwned + Send + 'a,
308    {
309        let (response_sender, response_receiver) = oneshot::channel();
310        self.subscribe_sender
311            .send((operation.to_string(), params, response_sender))
312            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
313
314        let (notifications, unsubscribe) = response_receiver
315            .await
316            .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
317        Ok((
318            UnboundedReceiverStream::new(notifications)
319                .filter_map(|value| ready(serde_json::from_value::<T>(value).ok()))
320                .boxed(),
321            unsubscribe,
322        ))
323    }
324
325    /// Subscribe to account events.
326    ///
327    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
328    ///
329    /// # RPC Reference
330    ///
331    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
332    ///
333    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket#accountsubscribe
334    pub async fn account_subscribe(
335        &self,
336        pubkey: &Pubkey,
337        config: Option<RpcAccountInfoConfig>,
338    ) -> SubscribeResult<'_, RpcResponse<UiAccount>> {
339        let params = json!([pubkey.to_string(), config]);
340        self.subscribe("account", params).await
341    }
342
343    /// Subscribe to block events.
344    ///
345    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
346    ///
347    /// This method is disabled by default. It can be enabled by passing
348    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
349    ///
350    /// # RPC Reference
351    ///
352    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
353    ///
354    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket#blocksubscribe
355    pub async fn block_subscribe(
356        &self,
357        filter: RpcBlockSubscribeFilter,
358        config: Option<RpcBlockSubscribeConfig>,
359    ) -> SubscribeResult<'_, RpcResponse<RpcBlockUpdate>> {
360        self.subscribe("block", json!([filter, config])).await
361    }
362
363    /// Subscribe to transaction log events.
364    ///
365    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
366    ///
367    /// # RPC Reference
368    ///
369    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
370    ///
371    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket#logssubscribe
372    pub async fn logs_subscribe(
373        &self,
374        filter: RpcTransactionLogsFilter,
375        config: RpcTransactionLogsConfig,
376    ) -> SubscribeResult<'_, RpcResponse<RpcLogsResponse>> {
377        self.subscribe("logs", json!([filter, config])).await
378    }
379
380    /// Subscribe to program account events.
381    ///
382    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
383    /// by the given program changes.
384    ///
385    /// # RPC Reference
386    ///
387    /// This method corresponds directly to the [`programSubscribe`] RPC method.
388    ///
389    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket#programsubscribe
390    pub async fn program_subscribe(
391        &self,
392        pubkey: &Pubkey,
393        config: Option<RpcProgramAccountsConfig>,
394    ) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
395        let params = json!([pubkey.to_string(), config]);
396        self.subscribe("program", params).await
397    }
398
399    /// Subscribe to vote events.
400    ///
401    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
402    /// votes are observed prior to confirmation and may never be confirmed.
403    ///
404    /// This method is disabled by default. It can be enabled by passing
405    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
406    ///
407    /// # RPC Reference
408    ///
409    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
410    ///
411    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket#votesubscribe
412    pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> {
413        self.subscribe("vote", json!([])).await
414    }
415
416    /// Subscribe to root events.
417    ///
418    /// Receives messages of type [`Slot`] when a new [root] is set by the
419    /// validator.
420    ///
421    /// [root]: https://solana.com/docs/terminology#root
422    ///
423    /// # RPC Reference
424    ///
425    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
426    ///
427    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket#rootsubscribe
428    pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> {
429        self.subscribe("root", json!([])).await
430    }
431
432    /// Subscribe to transaction confirmation events.
433    ///
434    /// Receives messages of type [`RpcSignatureResult`] when a transaction
435    /// with the given signature is committed.
436    ///
437    /// This is a subscription to a single notification. It is automatically
438    /// cancelled by the server once the notification is sent.
439    ///
440    /// # RPC Reference
441    ///
442    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
443    ///
444    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket#signaturesubscribe
445    pub async fn signature_subscribe(
446        &self,
447        signature: &Signature,
448        config: Option<RpcSignatureSubscribeConfig>,
449    ) -> SubscribeResult<'_, RpcResponse<RpcSignatureResult>> {
450        let params = json!([signature.to_string(), config]);
451        self.subscribe("signature", params).await
452    }
453
454    /// Subscribe to slot events.
455    ///
456    /// Receives messages of type [`SlotInfo`] when a slot is processed.
457    ///
458    /// # RPC Reference
459    ///
460    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
461    ///
462    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket#slotsubscribe
463    pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> {
464        self.subscribe("slot", json!([])).await
465    }
466
467    /// Subscribe to slot update events.
468    ///
469    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
470    ///
471    /// Note that this method operates differently than other subscriptions:
472    /// instead of sending the message to a receiver on a channel, it accepts a
473    /// `handler` callback that processes the message directly. This processing
474    /// occurs on another thread.
475    ///
476    /// # RPC Reference
477    ///
478    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
479    ///
480    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket#slotsupdatessubscribe
481    pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> {
482        self.subscribe("slotsUpdates", json!([])).await
483    }
484
485    async fn run_ws(
486        mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
487        mut subscribe_receiver: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
488        mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
489        mut shutdown_receiver: oneshot::Receiver<()>,
490    ) -> PubsubClientResult {
491        let mut request_id: u64 = 0;
492
493        let mut requests_subscribe = BTreeMap::new();
494        let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
495        let mut other_requests = BTreeMap::new();
496        let mut subscriptions = BTreeMap::new();
497        let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();
498
499        loop {
500            tokio::select! {
501                // Send close on shutdown signal
502                _ = (&mut shutdown_receiver) => {
503                    let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
504                    ws.send(Message::Close(Some(frame))).await.map_err(Box::new)?;
505                    ws.flush().await.map_err(Box::new)?;
506                    break;
507                },
508                // Send `Message::Ping` each 10s if no any other communication
509                () = sleep(Duration::from_secs(10)) => {
510                    ws.send(Message::Ping(Vec::new())).await.map_err(Box::new)?;
511                },
512                // Read message for subscribe
513                Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
514                    request_id += 1;
515                    let method = format!("{operation}Subscribe");
516                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
517                    ws.send(Message::Text(text)).await.map_err(Box::new)?;
518                    requests_subscribe.insert(request_id, (operation, response_sender));
519                },
520                // Read message for unsubscribe
521                Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => {
522                    subscriptions.remove(&sid);
523                    request_id += 1;
524                    let method = format!("{operation}Unsubscribe");
525                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
526                    ws.send(Message::Text(text)).await.map_err(Box::new)?;
527                    requests_unsubscribe.insert(request_id, response_sender);
528                },
529                // Read message for other requests
530                Some((method, params, response_sender)) = request_receiver.recv() => {
531                    request_id += 1;
532                    let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
533                    ws.send(Message::Text(text)).await.map_err(Box::new)?;
534                    other_requests.insert(request_id, response_sender);
535                }
536                // Read incoming WebSocket message
537                next_msg = ws.next() => {
538                    let msg = match next_msg {
539                        Some(msg) => msg.map_err(Box::new)?,
540                        None => break,
541                    };
542                    trace!("ws.next(): {:?}", &msg);
543
544                    // Get text from the message
545                    let text = match msg {
546                        Message::Text(text) => text,
547                        Message::Binary(_data) => continue, // Ignore
548                        Message::Ping(data) => {
549                            ws.send(Message::Pong(data)).await.map_err(Box::new)?;
550                            continue
551                        },
552                        Message::Pong(_data) => continue,
553                        Message::Close(_frame) => break,
554                        Message::Frame(_frame) => continue,
555                    };
556
557
558                    let mut json: Map<String, Value> = serde_json::from_str(&text)?;
559
560                    // Subscribe/Unsubscribe response, example:
561                    // `{"jsonrpc":"2.0","result":5308752,"id":1}`
562                    if let Some(id) = json.get("id") {
563                        let id = id.as_u64().ok_or_else(|| {
564                            PubsubClientError::SubscribeFailed { reason: "invalid `id` field".into(), message: text.clone() }
565                        })?;
566
567                        let err = json.get("error").map(|error_object| {
568                            match serde_json::from_value::<RpcErrorObject>(error_object.clone()) {
569                                Ok(rpc_error_object) => {
570                                    format!("{} ({})",  rpc_error_object.message, rpc_error_object.code)
571                                }
572                                Err(err) => format!(
573                                    "Failed to deserialize RPC error response: {} [{}]",
574                                    serde_json::to_string(error_object).unwrap(),
575                                    err
576                                )
577                            }
578                        });
579
580                        if let Some(response_sender) = other_requests.remove(&id) {
581                            match err {
582                                Some(reason) => {
583                                    let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.clone()}));
584                                },
585                                None => {
586                                    let json_result = json.get("result").ok_or_else(|| {
587                                        PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.clone() }
588                                    })?;
589                                    if response_sender.send(Ok(json_result.clone())).is_err() {
590                                        break;
591                                    }
592                                }
593                            }
594                        } else if let Some(response_sender) = requests_unsubscribe.remove(&id) {
595                            let _ = response_sender.send(()); // do not care if receiver is closed
596                        } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) {
597                            match err {
598                                Some(reason) => {
599                                    let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()}));
600                                },
601                                None => {
602                                    // Subscribe Id
603                                    let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| {
604                                        PubsubClientError::SubscribeFailed { reason: "invalid `result` field".into(), message: text.clone() }
605                                    })?;
606
607                                    // Create notifications channel and unsubscribe function
608                                    let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel();
609                                    let unsubscribe_sender = unsubscribe_sender.clone();
610                                    let unsubscribe = Box::new(move || async move {
611                                        let (response_sender, response_receiver) = oneshot::channel();
612                                        // do nothing if ws already closed
613                                        if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() {
614                                            let _ = response_receiver.await; // channel can be closed only if ws is closed
615                                        }
616                                    }.boxed());
617
618                                    if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() {
619                                        break;
620                                    }
621                                    subscriptions.insert(sid, notifications_sender);
622                                }
623                            }
624                        } else {
625                            error!("Unknown request id: {id}");
626                            break;
627                        }
628                        continue;
629                    }
630
631                    // Notification, example:
632                    // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}`
633                    if let Some(Value::Object(params)) = json.get_mut("params") {
634                        if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
635                            let mut unsubscribe_required = false;
636
637                            if let Some(notifications_sender) = subscriptions.get(&sid) {
638                                if let Some(result) = params.remove("result") {
639                                    if notifications_sender.send(result).is_err() {
640                                        unsubscribe_required = true;
641                                    }
642                                }
643                            } else {
644                                unsubscribe_required = true;
645                            }
646
647                            if unsubscribe_required {
648                                if let Some(Value::String(method)) = json.remove("method") {
649                                    if let Some(operation) = method.strip_suffix("Notification") {
650                                        let (response_sender, _response_receiver) = oneshot::channel();
651                                        let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender));
652                                    }
653                                }
654                            }
655                        }
656                    }
657                }
658            }
659        }
660
661        Ok(())
662    }
663}
664
#[cfg(test)]
mod tests {
    // No unit tests are defined here; see client-test/test/client.rs
}