solana_pubsub_client/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a blocking API. For a non-blocking API use the asynchronous client
9//! in [`crate::nonblocking::pubsub_client`].
10//!
11//! `PubsubClient` contains static methods to subscribe to events, like
12//! [`PubsubClient::account_subscribe`]. These methods each return their own
13//! subscription type, like [`AccountSubscription`], that are typedefs of
14//! tuples, the first element being a handle to the subscription, like
15//! [`AccountSubscription`], the second a [`Receiver`] of [`RpcResponse`] of
16//! whichever type is appropriate for the subscription. The subscription handle
17//! is a typedef of [`PubsubClientSubscription`], and it must remain live for
18//! the receiver to continue receiving messages.
19//!
20//! Because this is a blocking API, with blocking receivers, a reasonable
21//! pattern for using this API is to move each event receiver to its own thread
22//! to block on messages, while holding all subscription handles on a single
23//! primary thread.
24//!
25//! While `PubsubClientSubscription` contains methods for shutting down,
26//! [`PubsubClientSubscription::send_unsubscribe`], and
27//! [`PubsubClientSubscription::shutdown`], because its internal receivers block
28//! on events from the server, these subscriptions cannot actually be shutdown
29//! reliably. For a non-blocking, cancelable API, use the asynchronous client
30//! in [`crate::nonblocking::pubsub_client`].
31//!
32//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
33//! disabled on RPC nodes. They can be enabled by passing
34//! `--rpc-pubsub-enable-block-subscription` and
35//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
36//! methods are disabled, the RPC server will return a "Method not found" error
37//! message.
38//!
39//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
40//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
41//!
42//! # Examples
43//!
44//! This example subscribes to account events and then loops forever receiving
45//! them.
46//!
47//! ```
48//! use anyhow::Result;
49//! use solana_commitment_config::CommitmentConfig;
50//! use solana_pubkey::Pubkey;
51//! use solana_pubsub_client::pubsub_client::PubsubClient;
52//! use solana_rpc_client_types::config::RpcAccountInfoConfig;
53//! use std::thread;
54//!
55//! fn get_account_updates(account_pubkey: Pubkey) -> Result<()> {
56//!     let url = "wss://api.devnet.solana.com/";
57//!
58//!     let (mut account_subscription_client, account_subscription_receiver) =
59//!         PubsubClient::account_subscribe(
60//!             url,
61//!             &account_pubkey,
62//!             Some(RpcAccountInfoConfig {
63//!                 encoding: None,
64//!                 data_slice: None,
65//!                 commitment: Some(CommitmentConfig::confirmed()),
66//!                 min_context_slot: None,
67//!             }),
68//!         )?;
69//!
70//!     loop {
71//!         match account_subscription_receiver.recv() {
72//!             Ok(response) => {
73//!                 println!("account subscription response: {:?}", response);
74//!             }
75//!             Err(e) => {
76//!                 println!("account subscription error: {:?}", e);
77//!                 break;
78//!             }
79//!         }
80//!     }
81//!
82//!     Ok(())
83//! }
84//! #
85//! # get_account_updates(solana_pubkey::new_rand());
86//! # Ok::<(), anyhow::Error>(())
87//! ```
88
89pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91    crossbeam_channel::{unbounded, Receiver, Sender},
92    log::*,
93    serde::de::DeserializeOwned,
94    serde_json::{
95        json,
96        value::Value::{Number, Object},
97        Map, Value,
98    },
99    solana_account_decoder_client_types::UiAccount,
100    solana_clock::Slot,
101    solana_pubkey::Pubkey,
102    solana_rpc_client_types::{
103        config::{
104            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106            RpcTransactionLogsFilter,
107        },
108        response::{
109            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111        },
112    },
113    solana_signature::Signature,
114    std::{
115        marker::PhantomData,
116        net::TcpStream,
117        sync::{
118            atomic::{AtomicBool, Ordering},
119            Arc, RwLock,
120        },
121        thread::{sleep, JoinHandle},
122        time::Duration,
123    },
124    tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
125    url::Url,
126};
127
128/// A subscription.
129///
130/// The subscription is unsubscribed on drop, and note that unsubscription (and
131/// thus drop) time is unbounded. See
132/// [`PubsubClientSubscription::send_unsubscribe`].
133pub struct PubsubClientSubscription<T>
134where
135    T: DeserializeOwned,
136{
137    message_type: PhantomData<T>,
138    operation: &'static str,
139    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
140    subscription_id: u64,
141    t_cleanup: Option<JoinHandle<()>>,
142    exit: Arc<AtomicBool>,
143}
144
145impl<T> Drop for PubsubClientSubscription<T>
146where
147    T: DeserializeOwned,
148{
149    fn drop(&mut self) {
150        self.send_unsubscribe()
151            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
152        self.socket
153            .write()
154            .unwrap()
155            .close(None)
156            .unwrap_or_else(|_| warn!("unable to close websocket"));
157    }
158}
159
160impl<T> PubsubClientSubscription<T>
161where
162    T: DeserializeOwned,
163{
164    fn send_subscribe(
165        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
166        body: String,
167    ) -> Result<u64, PubsubClientError> {
168        writable_socket.write().unwrap().send(Message::Text(body))?;
169        let message = writable_socket.write().unwrap().read()?;
170        Self::extract_subscription_id(message)
171    }
172
173    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
174        let message_text = &message.into_text()?;
175
176        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
177            if let Some(Number(x)) = json_msg.get("result") {
178                if let Some(x) = x.as_u64() {
179                    return Ok(x);
180                }
181            }
182        }
183
184        Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
185            "msg={message_text}"
186        )))
187    }
188
189    /// Send an unsubscribe message to the server.
190    ///
191    /// Note that this will block as long as the internal subscription receiver
192    /// is waiting on messages from the server, and this can take an unbounded
193    /// amount of time if the server does not send any messages.
194    ///
195    /// If a pubsub client needs to shutdown reliably it should use
196    /// the async client in [`crate::nonblocking::pubsub_client`].
197    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
198        let method = format!("{}Unsubscribe", self.operation);
199        self.socket
200            .write()
201            .unwrap()
202            .send(Message::Text(
203                json!({
204                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
205                })
206                .to_string(),
207            ))
208            .map_err(|err| err.into())
209    }
210
211    fn read_message(
212        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
213    ) -> Result<Option<T>, PubsubClientError> {
214        let message = writable_socket.write().unwrap().read()?;
215        if message.is_ping() {
216            return Ok(None);
217        }
218        let message_text = &message.into_text()?;
219        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
220            if let Some(Object(params)) = json_msg.get("params") {
221                if let Some(result) = params.get("result") {
222                    if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
223                        return Ok(Some(x));
224                    }
225                }
226            }
227        }
228
229        Err(PubsubClientError::UnexpectedMessageError(format!(
230            "msg={message_text}"
231        )))
232    }
233
234    /// Shutdown the internel message receiver and wait for its thread to exit.
235    ///
236    /// Note that this will block as long as the subscription receiver is
237    /// waiting on messages from the server, and this can take an unbounded
238    /// amount of time if the server does not send any messages.
239    ///
240    /// If a pubsub client needs to shutdown reliably it should use
241    /// the async client in [`crate::nonblocking::pubsub_client`].
242    pub fn shutdown(&mut self) -> std::thread::Result<()> {
243        if self.t_cleanup.is_some() {
244            info!("websocket thread - shutting down");
245            self.exit.store(true, Ordering::Relaxed);
246            let x = self.t_cleanup.take().unwrap().join();
247            info!("websocket thread - shut down.");
248            x
249        } else {
250            warn!("websocket thread - already shut down.");
251            Ok(())
252        }
253    }
254}
255
256pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
257pub type LogsSubscription = (
258    PubsubLogsClientSubscription,
259    Receiver<RpcResponse<RpcLogsResponse>>,
260);
261
262pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
263pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
264
265pub type PubsubSignatureClientSubscription =
266    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
267pub type SignatureSubscription = (
268    PubsubSignatureClientSubscription,
269    Receiver<RpcResponse<RpcSignatureResult>>,
270);
271
272pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
273pub type BlockSubscription = (
274    PubsubBlockClientSubscription,
275    Receiver<RpcResponse<RpcBlockUpdate>>,
276);
277
278pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
279pub type ProgramSubscription = (
280    PubsubProgramClientSubscription,
281    Receiver<RpcResponse<RpcKeyedAccount>>,
282);
283
284pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
285pub type AccountSubscription = (
286    PubsubAccountClientSubscription,
287    Receiver<RpcResponse<UiAccount>>,
288);
289
290pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
291pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);
292
293pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
294pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
295
296/// A client for subscribing to messages from the RPC server.
297///
298/// See the [module documentation][self].
299pub struct PubsubClient {}
300
301fn connect_with_retry(
302    url: Url,
303) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
304    let mut connection_retries = 5;
305    loop {
306        let result = connect(url.clone()).map(|(socket, _)| socket);
307        if let Err(tungstenite::Error::Http(response)) = &result {
308            if response.status() == http::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
309                let mut duration = Duration::from_millis(500);
310                if let Some(retry_after) = response.headers().get(http::header::RETRY_AFTER) {
311                    if let Ok(retry_after) = retry_after.to_str() {
312                        if let Ok(retry_after) = retry_after.parse::<u64>() {
313                            if retry_after < 120 {
314                                duration = Duration::from_secs(retry_after);
315                            }
316                        }
317                    }
318                }
319
320                connection_retries -= 1;
321                debug!(
322                    "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
323                    response, connection_retries, duration
324                );
325
326                sleep(duration);
327                continue;
328            }
329        }
330        return result;
331    }
332}
333
334impl PubsubClient {
335    /// Subscribe to account events.
336    ///
337    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
338    ///
339    /// # RPC Reference
340    ///
341    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
342    ///
343    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket/accountsubscribe
344    pub fn account_subscribe(
345        url: &str,
346        pubkey: &Pubkey,
347        config: Option<RpcAccountInfoConfig>,
348    ) -> Result<AccountSubscription, PubsubClientError> {
349        let url = Url::parse(url)?;
350        let socket = connect_with_retry(url)?;
351        let (sender, receiver) = unbounded();
352
353        let socket = Arc::new(RwLock::new(socket));
354        let socket_clone = socket.clone();
355        let exit = Arc::new(AtomicBool::new(false));
356        let exit_clone = exit.clone();
357        let body = json!({
358            "jsonrpc":"2.0",
359            "id":1,
360            "method":"accountSubscribe",
361            "params":[
362                pubkey.to_string(),
363                config
364            ]
365        })
366        .to_string();
367        let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
368
369        let t_cleanup = std::thread::spawn(move || {
370            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
371        });
372
373        let result = PubsubClientSubscription {
374            message_type: PhantomData,
375            operation: "account",
376            socket,
377            subscription_id,
378            t_cleanup: Some(t_cleanup),
379            exit,
380        };
381
382        Ok((result, receiver))
383    }
384
385    /// Subscribe to block events.
386    ///
387    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
388    ///
389    /// This method is disabled by default. It can be enabled by passing
390    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
391    ///
392    /// # RPC Reference
393    ///
394    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
395    ///
396    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket/blocksubscribe
397    pub fn block_subscribe(
398        url: &str,
399        filter: RpcBlockSubscribeFilter,
400        config: Option<RpcBlockSubscribeConfig>,
401    ) -> Result<BlockSubscription, PubsubClientError> {
402        let url = Url::parse(url)?;
403        let socket = connect_with_retry(url)?;
404        let (sender, receiver) = unbounded();
405
406        let socket = Arc::new(RwLock::new(socket));
407        let socket_clone = socket.clone();
408        let exit = Arc::new(AtomicBool::new(false));
409        let exit_clone = exit.clone();
410        let body = json!({
411            "jsonrpc":"2.0",
412            "id":1,
413            "method":"blockSubscribe",
414            "params":[filter, config]
415        })
416        .to_string();
417
418        let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
419
420        let t_cleanup = std::thread::spawn(move || {
421            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
422        });
423
424        let result = PubsubClientSubscription {
425            message_type: PhantomData,
426            operation: "block",
427            socket,
428            subscription_id,
429            t_cleanup: Some(t_cleanup),
430            exit,
431        };
432
433        Ok((result, receiver))
434    }
435
436    /// Subscribe to transaction log events.
437    ///
438    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
439    ///
440    /// # RPC Reference
441    ///
442    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
443    ///
444    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket/logssubscribe
445    pub fn logs_subscribe(
446        url: &str,
447        filter: RpcTransactionLogsFilter,
448        config: RpcTransactionLogsConfig,
449    ) -> Result<LogsSubscription, PubsubClientError> {
450        let url = Url::parse(url)?;
451        let socket = connect_with_retry(url)?;
452        let (sender, receiver) = unbounded();
453
454        let socket = Arc::new(RwLock::new(socket));
455        let socket_clone = socket.clone();
456        let exit = Arc::new(AtomicBool::new(false));
457        let exit_clone = exit.clone();
458        let body = json!({
459            "jsonrpc":"2.0",
460            "id":1,
461            "method":"logsSubscribe",
462            "params":[filter, config]
463        })
464        .to_string();
465
466        let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
467
468        let t_cleanup = std::thread::spawn(move || {
469            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
470        });
471
472        let result = PubsubClientSubscription {
473            message_type: PhantomData,
474            operation: "logs",
475            socket,
476            subscription_id,
477            t_cleanup: Some(t_cleanup),
478            exit,
479        };
480
481        Ok((result, receiver))
482    }
483
484    /// Subscribe to program account events.
485    ///
486    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
487    /// by the given program changes.
488    ///
489    /// # RPC Reference
490    ///
491    /// This method corresponds directly to the [`programSubscribe`] RPC method.
492    ///
493    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket/programsubscribe
494    pub fn program_subscribe(
495        url: &str,
496        pubkey: &Pubkey,
497        config: Option<RpcProgramAccountsConfig>,
498    ) -> Result<ProgramSubscription, PubsubClientError> {
499        let url = Url::parse(url)?;
500        let socket = connect_with_retry(url)?;
501        let (sender, receiver) = unbounded();
502
503        let socket = Arc::new(RwLock::new(socket));
504        let socket_clone = socket.clone();
505        let exit = Arc::new(AtomicBool::new(false));
506        let exit_clone = exit.clone();
507
508        let body = json!({
509            "jsonrpc":"2.0",
510            "id":1,
511            "method":"programSubscribe",
512            "params":[
513                pubkey.to_string(),
514                config
515            ]
516        })
517        .to_string();
518        let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
519
520        let t_cleanup = std::thread::spawn(move || {
521            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
522        });
523
524        let result = PubsubClientSubscription {
525            message_type: PhantomData,
526            operation: "program",
527            socket,
528            subscription_id,
529            t_cleanup: Some(t_cleanup),
530            exit,
531        };
532
533        Ok((result, receiver))
534    }
535
536    /// Subscribe to vote events.
537    ///
538    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
539    /// votes are observed prior to confirmation and may never be confirmed.
540    ///
541    /// This method is disabled by default. It can be enabled by passing
542    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
543    ///
544    /// # RPC Reference
545    ///
546    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
547    ///
548    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket/votesubscribe
549    pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
550        let url = Url::parse(url)?;
551        let socket = connect_with_retry(url)?;
552        let (sender, receiver) = unbounded();
553
554        let socket = Arc::new(RwLock::new(socket));
555        let socket_clone = socket.clone();
556        let exit = Arc::new(AtomicBool::new(false));
557        let exit_clone = exit.clone();
558        let body = json!({
559            "jsonrpc":"2.0",
560            "id":1,
561            "method":"voteSubscribe",
562        })
563        .to_string();
564        let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
565
566        let t_cleanup = std::thread::spawn(move || {
567            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
568        });
569
570        let result = PubsubClientSubscription {
571            message_type: PhantomData,
572            operation: "vote",
573            socket,
574            subscription_id,
575            t_cleanup: Some(t_cleanup),
576            exit,
577        };
578
579        Ok((result, receiver))
580    }
581
582    /// Subscribe to root events.
583    ///
584    /// Receives messages of type [`Slot`] when a new [root] is set by the
585    /// validator.
586    ///
587    /// [root]: https://solana.com/docs/terminology#root
588    ///
589    /// # RPC Reference
590    ///
591    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
592    ///
593    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket/rootsubscribe
594    pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
595        let url = Url::parse(url)?;
596        let socket = connect_with_retry(url)?;
597        let (sender, receiver) = unbounded();
598
599        let socket = Arc::new(RwLock::new(socket));
600        let socket_clone = socket.clone();
601        let exit = Arc::new(AtomicBool::new(false));
602        let exit_clone = exit.clone();
603        let body = json!({
604            "jsonrpc":"2.0",
605            "id":1,
606            "method":"rootSubscribe",
607        })
608        .to_string();
609        let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
610
611        let t_cleanup = std::thread::spawn(move || {
612            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
613        });
614
615        let result = PubsubClientSubscription {
616            message_type: PhantomData,
617            operation: "root",
618            socket,
619            subscription_id,
620            t_cleanup: Some(t_cleanup),
621            exit,
622        };
623
624        Ok((result, receiver))
625    }
626
627    /// Subscribe to transaction confirmation events.
628    ///
629    /// Receives messages of type [`RpcSignatureResult`] when a transaction
630    /// with the given signature is committed.
631    ///
632    /// This is a subscription to a single notification. It is automatically
633    /// cancelled by the server once the notification is sent.
634    ///
635    /// # RPC Reference
636    ///
637    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
638    ///
639    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket/signaturesubscribe
640    pub fn signature_subscribe(
641        url: &str,
642        signature: &Signature,
643        config: Option<RpcSignatureSubscribeConfig>,
644    ) -> Result<SignatureSubscription, PubsubClientError> {
645        let url = Url::parse(url)?;
646        let socket = connect_with_retry(url)?;
647        let (sender, receiver) = unbounded();
648
649        let socket = Arc::new(RwLock::new(socket));
650        let socket_clone = socket.clone();
651        let exit = Arc::new(AtomicBool::new(false));
652        let exit_clone = exit.clone();
653        let body = json!({
654            "jsonrpc":"2.0",
655            "id":1,
656            "method":"signatureSubscribe",
657            "params":[
658                signature.to_string(),
659                config
660            ]
661        })
662        .to_string();
663        let subscription_id =
664            PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
665
666        let t_cleanup = std::thread::spawn(move || {
667            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
668        });
669
670        let result = PubsubClientSubscription {
671            message_type: PhantomData,
672            operation: "signature",
673            socket,
674            subscription_id,
675            t_cleanup: Some(t_cleanup),
676            exit,
677        };
678
679        Ok((result, receiver))
680    }
681
682    /// Subscribe to slot events.
683    ///
684    /// Receives messages of type [`SlotInfo`] when a slot is processed.
685    ///
686    /// # RPC Reference
687    ///
688    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
689    ///
690    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket/slotsubscribe
691    pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
692        let url = Url::parse(url)?;
693        let socket = connect_with_retry(url)?;
694        let (sender, receiver) = unbounded::<SlotInfo>();
695
696        let socket = Arc::new(RwLock::new(socket));
697        let socket_clone = socket.clone();
698        let exit = Arc::new(AtomicBool::new(false));
699        let exit_clone = exit.clone();
700        let body = json!({
701            "jsonrpc":"2.0",
702            "id":1,
703            "method":"slotSubscribe",
704            "params":[]
705        })
706        .to_string();
707        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
708
709        let t_cleanup = std::thread::spawn(move || {
710            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
711        });
712
713        let result = PubsubClientSubscription {
714            message_type: PhantomData,
715            operation: "slot",
716            socket,
717            subscription_id,
718            t_cleanup: Some(t_cleanup),
719            exit,
720        };
721
722        Ok((result, receiver))
723    }
724
725    /// Subscribe to slot update events.
726    ///
727    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
728    ///
729    /// Note that this method operates differently than other subscriptions:
730    /// instead of sending the message to a receiver on a channel, it accepts a
731    /// `handler` callback that processes the message directly. This processing
732    /// occurs on another thread.
733    ///
734    /// # RPC Reference
735    ///
736    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
737    ///
738    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket/slotsupdatessubscribe
739    pub fn slot_updates_subscribe(
740        url: &str,
741        handler: impl Fn(SlotUpdate) + Send + 'static,
742    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
743        let url = Url::parse(url)?;
744        let socket = connect_with_retry(url)?;
745
746        let socket = Arc::new(RwLock::new(socket));
747        let socket_clone = socket.clone();
748        let exit = Arc::new(AtomicBool::new(false));
749        let exit_clone = exit.clone();
750        let body = json!({
751            "jsonrpc":"2.0",
752            "id":1,
753            "method":"slotsUpdatesSubscribe",
754            "params":[]
755        })
756        .to_string();
757        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
758
759        let t_cleanup = std::thread::spawn(move || {
760            Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
761        });
762
763        Ok(PubsubClientSubscription {
764            message_type: PhantomData,
765            operation: "slotsUpdates",
766            socket,
767            subscription_id,
768            t_cleanup: Some(t_cleanup),
769            exit,
770        })
771    }
772
773    fn cleanup_with_sender<T>(
774        exit: Arc<AtomicBool>,
775        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
776        sender: Sender<T>,
777    ) where
778        T: DeserializeOwned + Send + 'static,
779    {
780        let handler = move |message| match sender.send(message) {
781            Ok(_) => (),
782            Err(err) => {
783                info!("receive error: {:?}", err);
784            }
785        };
786        Self::cleanup_with_handler(exit, socket, handler);
787    }
788
789    fn cleanup_with_handler<T, F>(
790        exit: Arc<AtomicBool>,
791        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
792        handler: F,
793    ) where
794        T: DeserializeOwned,
795        F: Fn(T) + Send + 'static,
796    {
797        loop {
798            if exit.load(Ordering::Relaxed) {
799                break;
800            }
801
802            match PubsubClientSubscription::read_message(socket) {
803                Ok(Some(message)) => handler(message),
804                Ok(None) => {
805                    // Nothing useful, means we received a ping message
806                }
807                Err(err) => {
808                    info!("receive error: {:?}", err);
809                    break;
810                }
811            }
812        }
813
814        info!("websocket - exited receive loop");
815    }
816}
817
818#[cfg(test)]
819mod tests {
820    // see client-test/test/client.rs
821}