solana_pubsub_client/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a blocking API. For a non-blocking API use the asynchronous client
9//! in [`crate::nonblocking::pubsub_client`].
10//!
11//! `PubsubClient` contains static methods to subscribe to events, like
12//! [`PubsubClient::account_subscribe`]. These methods each return their own
13//! subscription type, like [`AccountSubscription`], that are typedefs of
14//! tuples, the first element being a handle to the subscription, like
15//! [`AccountSubscription`], the second a [`Receiver`] of [`RpcResponse`] of
16//! whichever type is appropriate for the subscription. The subscription handle
17//! is a typedef of [`PubsubClientSubscription`], and it must remain live for
18//! the receiver to continue receiving messages.
19//!
20//! Because this is a blocking API, with blocking receivers, a reasonable
21//! pattern for using this API is to move each event receiver to its own thread
22//! to block on messages, while holding all subscription handles on a single
23//! primary thread.
24//!
25//! While `PubsubClientSubscription` contains methods for shutting down,
26//! [`PubsubClientSubscription::send_unsubscribe`], and
27//! [`PubsubClientSubscription::shutdown`], because its internal receivers block
28//! on events from the server, these subscriptions cannot actually be shutdown
29//! reliably. For a non-blocking, cancelable API, use the asynchronous client
30//! in [`crate::nonblocking::pubsub_client`].
31//!
32//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
33//! disabled on RPC nodes. They can be enabled by passing
34//! `--rpc-pubsub-enable-block-subscription` and
35//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
36//! methods are disabled, the RPC server will return a "Method not found" error
37//! message.
38//!
39//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
40//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
41//!
42//! # Examples
43//!
44//! This example subscribes to account events and then loops forever receiving
45//! them.
46//!
47//! ```
48//! use anyhow::Result;
49//! use solana_commitment_config::CommitmentConfig;
50//! use solana_pubkey::Pubkey;
51//! use solana_pubsub_client::pubsub_client::PubsubClient;
52//! use solana_rpc_client_types::config::RpcAccountInfoConfig;
53//! use std::thread;
54//!
55//! fn get_account_updates(account_pubkey: Pubkey) -> Result<()> {
56//!     let url = "wss://api.devnet.solana.com/";
57//!
58//!     let (mut account_subscription_client, account_subscription_receiver) =
59//!         PubsubClient::account_subscribe(
60//!             url,
61//!             &account_pubkey,
62//!             Some(RpcAccountInfoConfig {
63//!                 encoding: None,
64//!                 data_slice: None,
65//!                 commitment: Some(CommitmentConfig::confirmed()),
66//!                 min_context_slot: None,
67//!             }),
68//!         )?;
69//!
70//!     loop {
71//!         match account_subscription_receiver.recv() {
72//!             Ok(response) => {
73//!                 println!("account subscription response: {:?}", response);
74//!             }
75//!             Err(e) => {
76//!                 println!("account subscription error: {:?}", e);
77//!                 break;
78//!             }
79//!         }
80//!     }
81//!
82//!     Ok(())
83//! }
84//! #
85//! # get_account_updates(solana_pubkey::new_rand());
86//! # Ok::<(), anyhow::Error>(())
87//! ```
88
89pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91    crossbeam_channel::{unbounded, Receiver, Sender},
92    log::*,
93    serde::de::DeserializeOwned,
94    serde_json::{
95        json,
96        value::Value::{Number, Object},
97        Map, Value,
98    },
99    solana_account_decoder_client_types::UiAccount,
100    solana_clock::Slot,
101    solana_pubkey::Pubkey,
102    solana_rpc_client_types::{
103        config::{
104            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106            RpcTransactionLogsFilter,
107        },
108        response::{
109            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111        },
112    },
113    solana_signature::Signature,
114    std::{
115        marker::PhantomData,
116        net::TcpStream,
117        sync::{
118            atomic::{AtomicBool, Ordering},
119            Arc, RwLock,
120        },
121        thread::{sleep, JoinHandle},
122        time::Duration,
123    },
124    tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
125    url::Url,
126};
127
128/// A subscription.
129///
130/// The subscription is unsubscribed on drop, and note that unsubscription (and
131/// thus drop) time is unbounded. See
132/// [`PubsubClientSubscription::send_unsubscribe`].
133pub struct PubsubClientSubscription<T>
134where
135    T: DeserializeOwned,
136{
137    message_type: PhantomData<T>,
138    operation: &'static str,
139    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
140    subscription_id: u64,
141    t_cleanup: Option<JoinHandle<()>>,
142    exit: Arc<AtomicBool>,
143}
144
145impl<T> Drop for PubsubClientSubscription<T>
146where
147    T: DeserializeOwned,
148{
149    fn drop(&mut self) {
150        self.send_unsubscribe()
151            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
152        self.socket
153            .write()
154            .unwrap()
155            .close(None)
156            .unwrap_or_else(|_| warn!("unable to close websocket"));
157    }
158}
159
160impl<T> PubsubClientSubscription<T>
161where
162    T: DeserializeOwned,
163{
164    fn send_subscribe(
165        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
166        body: String,
167    ) -> Result<u64, PubsubClientError> {
168        writable_socket
169            .write()
170            .unwrap()
171            .send(Message::Text(body))
172            .map_err(Box::new)?;
173        let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
174        Self::extract_subscription_id(message)
175    }
176
177    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
178        let message_text = &message.into_text().map_err(Box::new)?;
179
180        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
181            if let Some(Number(x)) = json_msg.get("result") {
182                if let Some(x) = x.as_u64() {
183                    return Ok(x);
184                }
185            }
186        }
187
188        Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
189            "msg={message_text}"
190        )))
191    }
192
193    /// Send an unsubscribe message to the server.
194    ///
195    /// Note that this will block as long as the internal subscription receiver
196    /// is waiting on messages from the server, and this can take an unbounded
197    /// amount of time if the server does not send any messages.
198    ///
199    /// If a pubsub client needs to shutdown reliably it should use
200    /// the async client in [`crate::nonblocking::pubsub_client`].
201    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
202        let method = format!("{}Unsubscribe", self.operation);
203        self.socket
204            .write()
205            .unwrap()
206            .send(Message::Text(
207                json!({
208                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
209                })
210                .to_string(),
211            ))
212            .map_err(Box::new)
213            .map_err(|err| err.into())
214    }
215
216    fn read_message(
217        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
218    ) -> Result<Option<T>, PubsubClientError> {
219        let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
220        if message.is_ping() {
221            return Ok(None);
222        }
223        let message_text = &message.into_text().map_err(Box::new)?;
224        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
225            if let Some(Object(params)) = json_msg.get("params") {
226                if let Some(result) = params.get("result") {
227                    if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
228                        return Ok(Some(x));
229                    }
230                }
231            }
232        }
233
234        Err(PubsubClientError::UnexpectedMessageError(format!(
235            "msg={message_text}"
236        )))
237    }
238
239    /// Shutdown the internel message receiver and wait for its thread to exit.
240    ///
241    /// Note that this will block as long as the subscription receiver is
242    /// waiting on messages from the server, and this can take an unbounded
243    /// amount of time if the server does not send any messages.
244    ///
245    /// If a pubsub client needs to shutdown reliably it should use
246    /// the async client in [`crate::nonblocking::pubsub_client`].
247    pub fn shutdown(&mut self) -> std::thread::Result<()> {
248        if self.t_cleanup.is_some() {
249            info!("websocket thread - shutting down");
250            self.exit.store(true, Ordering::Relaxed);
251            let x = self.t_cleanup.take().unwrap().join();
252            info!("websocket thread - shut down.");
253            x
254        } else {
255            warn!("websocket thread - already shut down.");
256            Ok(())
257        }
258    }
259}
260
261pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
262pub type LogsSubscription = (
263    PubsubLogsClientSubscription,
264    Receiver<RpcResponse<RpcLogsResponse>>,
265);
266
267pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
268pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
269
270pub type PubsubSignatureClientSubscription =
271    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
272pub type SignatureSubscription = (
273    PubsubSignatureClientSubscription,
274    Receiver<RpcResponse<RpcSignatureResult>>,
275);
276
277pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
278pub type BlockSubscription = (
279    PubsubBlockClientSubscription,
280    Receiver<RpcResponse<RpcBlockUpdate>>,
281);
282
283pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
284pub type ProgramSubscription = (
285    PubsubProgramClientSubscription,
286    Receiver<RpcResponse<RpcKeyedAccount>>,
287);
288
289pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
290pub type AccountSubscription = (
291    PubsubAccountClientSubscription,
292    Receiver<RpcResponse<UiAccount>>,
293);
294
295pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
296pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);
297
298pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
299pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
300
301/// A client for subscribing to messages from the RPC server.
302///
303/// See the [module documentation][self].
304pub struct PubsubClient {}
305
306fn connect_with_retry(
307    url: Url,
308) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
309    let mut connection_retries = 5;
310    loop {
311        let result = connect(url.clone()).map(|(socket, _)| socket);
312        if let Err(tungstenite::Error::Http(response)) = &result {
313            if response.status() == http::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
314                let mut duration = Duration::from_millis(500);
315                if let Some(retry_after) = response.headers().get(http::header::RETRY_AFTER) {
316                    if let Ok(retry_after) = retry_after.to_str() {
317                        if let Ok(retry_after) = retry_after.parse::<u64>() {
318                            if retry_after < 120 {
319                                duration = Duration::from_secs(retry_after);
320                            }
321                        }
322                    }
323                }
324
325                connection_retries -= 1;
326                debug!(
327                    "Too many requests: server responded with {response:?}, {connection_retries} \
328                     retries left, pausing for {duration:?}"
329                );
330
331                sleep(duration);
332                continue;
333            }
334        }
335        return result.map_err(Box::new);
336    }
337}
338
339impl PubsubClient {
340    /// Subscribe to account events.
341    ///
342    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
343    ///
344    /// # RPC Reference
345    ///
346    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
347    ///
348    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket/accountsubscribe
349    pub fn account_subscribe(
350        url: &str,
351        pubkey: &Pubkey,
352        config: Option<RpcAccountInfoConfig>,
353    ) -> Result<AccountSubscription, PubsubClientError> {
354        let url = Url::parse(url)?;
355        let socket = connect_with_retry(url)?;
356        let (sender, receiver) = unbounded();
357
358        let socket = Arc::new(RwLock::new(socket));
359        let socket_clone = socket.clone();
360        let exit = Arc::new(AtomicBool::new(false));
361        let exit_clone = exit.clone();
362        let body = json!({
363            "jsonrpc":"2.0",
364            "id":1,
365            "method":"accountSubscribe",
366            "params":[
367                pubkey.to_string(),
368                config
369            ]
370        })
371        .to_string();
372        let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
373
374        let t_cleanup = std::thread::spawn(move || {
375            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
376        });
377
378        let result = PubsubClientSubscription {
379            message_type: PhantomData,
380            operation: "account",
381            socket,
382            subscription_id,
383            t_cleanup: Some(t_cleanup),
384            exit,
385        };
386
387        Ok((result, receiver))
388    }
389
390    /// Subscribe to block events.
391    ///
392    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
393    ///
394    /// This method is disabled by default. It can be enabled by passing
395    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
396    ///
397    /// # RPC Reference
398    ///
399    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
400    ///
401    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket/blocksubscribe
402    pub fn block_subscribe(
403        url: &str,
404        filter: RpcBlockSubscribeFilter,
405        config: Option<RpcBlockSubscribeConfig>,
406    ) -> Result<BlockSubscription, PubsubClientError> {
407        let url = Url::parse(url)?;
408        let socket = connect_with_retry(url)?;
409        let (sender, receiver) = unbounded();
410
411        let socket = Arc::new(RwLock::new(socket));
412        let socket_clone = socket.clone();
413        let exit = Arc::new(AtomicBool::new(false));
414        let exit_clone = exit.clone();
415        let body = json!({
416            "jsonrpc":"2.0",
417            "id":1,
418            "method":"blockSubscribe",
419            "params":[filter, config]
420        })
421        .to_string();
422
423        let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
424
425        let t_cleanup = std::thread::spawn(move || {
426            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
427        });
428
429        let result = PubsubClientSubscription {
430            message_type: PhantomData,
431            operation: "block",
432            socket,
433            subscription_id,
434            t_cleanup: Some(t_cleanup),
435            exit,
436        };
437
438        Ok((result, receiver))
439    }
440
441    /// Subscribe to transaction log events.
442    ///
443    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
444    ///
445    /// # RPC Reference
446    ///
447    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
448    ///
449    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket/logssubscribe
450    pub fn logs_subscribe(
451        url: &str,
452        filter: RpcTransactionLogsFilter,
453        config: RpcTransactionLogsConfig,
454    ) -> Result<LogsSubscription, PubsubClientError> {
455        let url = Url::parse(url)?;
456        let socket = connect_with_retry(url)?;
457        let (sender, receiver) = unbounded();
458
459        let socket = Arc::new(RwLock::new(socket));
460        let socket_clone = socket.clone();
461        let exit = Arc::new(AtomicBool::new(false));
462        let exit_clone = exit.clone();
463        let body = json!({
464            "jsonrpc":"2.0",
465            "id":1,
466            "method":"logsSubscribe",
467            "params":[filter, config]
468        })
469        .to_string();
470
471        let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
472
473        let t_cleanup = std::thread::spawn(move || {
474            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
475        });
476
477        let result = PubsubClientSubscription {
478            message_type: PhantomData,
479            operation: "logs",
480            socket,
481            subscription_id,
482            t_cleanup: Some(t_cleanup),
483            exit,
484        };
485
486        Ok((result, receiver))
487    }
488
489    /// Subscribe to program account events.
490    ///
491    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
492    /// by the given program changes.
493    ///
494    /// # RPC Reference
495    ///
496    /// This method corresponds directly to the [`programSubscribe`] RPC method.
497    ///
498    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket/programsubscribe
499    pub fn program_subscribe(
500        url: &str,
501        pubkey: &Pubkey,
502        config: Option<RpcProgramAccountsConfig>,
503    ) -> Result<ProgramSubscription, PubsubClientError> {
504        let url = Url::parse(url)?;
505        let socket = connect_with_retry(url)?;
506        let (sender, receiver) = unbounded();
507
508        let socket = Arc::new(RwLock::new(socket));
509        let socket_clone = socket.clone();
510        let exit = Arc::new(AtomicBool::new(false));
511        let exit_clone = exit.clone();
512
513        let body = json!({
514            "jsonrpc":"2.0",
515            "id":1,
516            "method":"programSubscribe",
517            "params":[
518                pubkey.to_string(),
519                config
520            ]
521        })
522        .to_string();
523        let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
524
525        let t_cleanup = std::thread::spawn(move || {
526            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
527        });
528
529        let result = PubsubClientSubscription {
530            message_type: PhantomData,
531            operation: "program",
532            socket,
533            subscription_id,
534            t_cleanup: Some(t_cleanup),
535            exit,
536        };
537
538        Ok((result, receiver))
539    }
540
541    /// Subscribe to vote events.
542    ///
543    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
544    /// votes are observed prior to confirmation and may never be confirmed.
545    ///
546    /// This method is disabled by default. It can be enabled by passing
547    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
548    ///
549    /// # RPC Reference
550    ///
551    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
552    ///
553    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket/votesubscribe
554    pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
555        let url = Url::parse(url)?;
556        let socket = connect_with_retry(url)?;
557        let (sender, receiver) = unbounded();
558
559        let socket = Arc::new(RwLock::new(socket));
560        let socket_clone = socket.clone();
561        let exit = Arc::new(AtomicBool::new(false));
562        let exit_clone = exit.clone();
563        let body = json!({
564            "jsonrpc":"2.0",
565            "id":1,
566            "method":"voteSubscribe",
567        })
568        .to_string();
569        let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
570
571        let t_cleanup = std::thread::spawn(move || {
572            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
573        });
574
575        let result = PubsubClientSubscription {
576            message_type: PhantomData,
577            operation: "vote",
578            socket,
579            subscription_id,
580            t_cleanup: Some(t_cleanup),
581            exit,
582        };
583
584        Ok((result, receiver))
585    }
586
587    /// Subscribe to root events.
588    ///
589    /// Receives messages of type [`Slot`] when a new [root] is set by the
590    /// validator.
591    ///
592    /// [root]: https://solana.com/docs/terminology#root
593    ///
594    /// # RPC Reference
595    ///
596    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
597    ///
598    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket/rootsubscribe
599    pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
600        let url = Url::parse(url)?;
601        let socket = connect_with_retry(url)?;
602        let (sender, receiver) = unbounded();
603
604        let socket = Arc::new(RwLock::new(socket));
605        let socket_clone = socket.clone();
606        let exit = Arc::new(AtomicBool::new(false));
607        let exit_clone = exit.clone();
608        let body = json!({
609            "jsonrpc":"2.0",
610            "id":1,
611            "method":"rootSubscribe",
612        })
613        .to_string();
614        let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
615
616        let t_cleanup = std::thread::spawn(move || {
617            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
618        });
619
620        let result = PubsubClientSubscription {
621            message_type: PhantomData,
622            operation: "root",
623            socket,
624            subscription_id,
625            t_cleanup: Some(t_cleanup),
626            exit,
627        };
628
629        Ok((result, receiver))
630    }
631
632    /// Subscribe to transaction confirmation events.
633    ///
634    /// Receives messages of type [`RpcSignatureResult`] when a transaction
635    /// with the given signature is committed.
636    ///
637    /// This is a subscription to a single notification. It is automatically
638    /// cancelled by the server once the notification is sent.
639    ///
640    /// # RPC Reference
641    ///
642    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
643    ///
644    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket/signaturesubscribe
645    pub fn signature_subscribe(
646        url: &str,
647        signature: &Signature,
648        config: Option<RpcSignatureSubscribeConfig>,
649    ) -> Result<SignatureSubscription, PubsubClientError> {
650        let url = Url::parse(url)?;
651        let socket = connect_with_retry(url)?;
652        let (sender, receiver) = unbounded();
653
654        let socket = Arc::new(RwLock::new(socket));
655        let socket_clone = socket.clone();
656        let exit = Arc::new(AtomicBool::new(false));
657        let exit_clone = exit.clone();
658        let body = json!({
659            "jsonrpc":"2.0",
660            "id":1,
661            "method":"signatureSubscribe",
662            "params":[
663                signature.to_string(),
664                config
665            ]
666        })
667        .to_string();
668        let subscription_id =
669            PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
670
671        let t_cleanup = std::thread::spawn(move || {
672            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
673        });
674
675        let result = PubsubClientSubscription {
676            message_type: PhantomData,
677            operation: "signature",
678            socket,
679            subscription_id,
680            t_cleanup: Some(t_cleanup),
681            exit,
682        };
683
684        Ok((result, receiver))
685    }
686
687    /// Subscribe to slot events.
688    ///
689    /// Receives messages of type [`SlotInfo`] when a slot is processed.
690    ///
691    /// # RPC Reference
692    ///
693    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
694    ///
695    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket/slotsubscribe
696    pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
697        let url = Url::parse(url)?;
698        let socket = connect_with_retry(url)?;
699        let (sender, receiver) = unbounded::<SlotInfo>();
700
701        let socket = Arc::new(RwLock::new(socket));
702        let socket_clone = socket.clone();
703        let exit = Arc::new(AtomicBool::new(false));
704        let exit_clone = exit.clone();
705        let body = json!({
706            "jsonrpc":"2.0",
707            "id":1,
708            "method":"slotSubscribe",
709            "params":[]
710        })
711        .to_string();
712        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
713
714        let t_cleanup = std::thread::spawn(move || {
715            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
716        });
717
718        let result = PubsubClientSubscription {
719            message_type: PhantomData,
720            operation: "slot",
721            socket,
722            subscription_id,
723            t_cleanup: Some(t_cleanup),
724            exit,
725        };
726
727        Ok((result, receiver))
728    }
729
730    /// Subscribe to slot update events.
731    ///
732    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
733    ///
734    /// Note that this method operates differently than other subscriptions:
735    /// instead of sending the message to a receiver on a channel, it accepts a
736    /// `handler` callback that processes the message directly. This processing
737    /// occurs on another thread.
738    ///
739    /// # RPC Reference
740    ///
741    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
742    ///
743    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket/slotsupdatessubscribe
744    pub fn slot_updates_subscribe(
745        url: &str,
746        handler: impl Fn(SlotUpdate) + Send + 'static,
747    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
748        let url = Url::parse(url)?;
749        let socket = connect_with_retry(url)?;
750
751        let socket = Arc::new(RwLock::new(socket));
752        let socket_clone = socket.clone();
753        let exit = Arc::new(AtomicBool::new(false));
754        let exit_clone = exit.clone();
755        let body = json!({
756            "jsonrpc":"2.0",
757            "id":1,
758            "method":"slotsUpdatesSubscribe",
759            "params":[]
760        })
761        .to_string();
762        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
763
764        let t_cleanup = std::thread::spawn(move || {
765            Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
766        });
767
768        Ok(PubsubClientSubscription {
769            message_type: PhantomData,
770            operation: "slotsUpdates",
771            socket,
772            subscription_id,
773            t_cleanup: Some(t_cleanup),
774            exit,
775        })
776    }
777
778    fn cleanup_with_sender<T>(
779        exit: Arc<AtomicBool>,
780        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
781        sender: Sender<T>,
782    ) where
783        T: DeserializeOwned + Send + 'static,
784    {
785        let handler = move |message| match sender.send(message) {
786            Ok(_) => (),
787            Err(err) => {
788                info!("receive error: {err:?}");
789            }
790        };
791        Self::cleanup_with_handler(exit, socket, handler);
792    }
793
794    fn cleanup_with_handler<T, F>(
795        exit: Arc<AtomicBool>,
796        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
797        handler: F,
798    ) where
799        T: DeserializeOwned,
800        F: Fn(T) + Send + 'static,
801    {
802        loop {
803            if exit.load(Ordering::Relaxed) {
804                break;
805            }
806
807            match PubsubClientSubscription::read_message(socket) {
808                Ok(Some(message)) => handler(message),
809                Ok(None) => {
810                    // Nothing useful, means we received a ping message
811                }
812                Err(err) => {
813                    info!("receive error: {err:?}");
814                    break;
815                }
816            }
817        }
818
819        info!("websocket - exited receive loop");
820    }
821}
822
823#[cfg(test)]
824mod tests {
825    // see client-test/test/client.rs
826}