solana_pubsub_client/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a blocking API. For a non-blocking API use the asynchronous client
9//! in [`crate::nonblocking::pubsub_client`].
10//!
//! `PubsubClient` contains static methods to subscribe to events, like
//! [`PubsubClient::account_subscribe`]. These methods each return their own
//! subscription type, like [`AccountSubscription`], that are typedefs of
//! tuples, the first element being a handle to the subscription, like
//! [`PubsubAccountClientSubscription`], the second a [`Receiver`] of whichever
//! message type is appropriate for the subscription, which for most
//! subscriptions is wrapped in an [`RpcResponse`]. The subscription handle
//! is a typedef of [`PubsubClientSubscription`], and it must remain live for
//! the receiver to continue receiving messages.
19//!
20//! Because this is a blocking API, with blocking receivers, a reasonable
21//! pattern for using this API is to move each event receiver to its own thread
22//! to block on messages, while holding all subscription handles on a single
23//! primary thread.
24//!
25//! While `PubsubClientSubscription` contains methods for shutting down,
26//! [`PubsubClientSubscription::send_unsubscribe`], and
27//! [`PubsubClientSubscription::shutdown`], because its internal receivers block
28//! on events from the server, these subscriptions cannot actually be shutdown
29//! reliably. For a non-blocking, cancelable API, use the asynchronous client
30//! in [`crate::nonblocking::pubsub_client`].
31//!
32//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
33//! disabled on RPC nodes. They can be enabled by passing
34//! `--rpc-pubsub-enable-block-subscription` and
35//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
36//! methods are disabled, the RPC server will return a "Method not found" error
37//! message.
38//!
39//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
40//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
41//!
42//! # Examples
43//!
44//! This example subscribes to account events and then loops forever receiving
45//! them.
46//!
47//! ```
48//! use anyhow::Result;
49//! use solana_commitment_config::CommitmentConfig;
50//! use solana_pubkey::Pubkey;
51//! use solana_pubsub_client::pubsub_client::PubsubClient;
52//! use solana_rpc_client_types::config::RpcAccountInfoConfig;
53//! use std::thread;
54//!
55//! fn get_account_updates(account_pubkey: Pubkey) -> Result<()> {
56//!     let url = "wss://api.devnet.solana.com/";
57//!
58//!     let (mut account_subscription_client, account_subscription_receiver) =
59//!         PubsubClient::account_subscribe(
60//!             url,
61//!             &account_pubkey,
62//!             Some(RpcAccountInfoConfig {
63//!                 encoding: None,
64//!                 data_slice: None,
65//!                 commitment: Some(CommitmentConfig::confirmed()),
66//!                 min_context_slot: None,
67//!             }),
68//!         )?;
69//!
70//!     loop {
71//!         match account_subscription_receiver.recv() {
72//!             Ok(response) => {
73//!                 println!("account subscription response: {:?}", response);
74//!             }
75//!             Err(e) => {
76//!                 println!("account subscription error: {:?}", e);
77//!                 break;
78//!             }
79//!         }
80//!     }
81//!
82//!     Ok(())
83//! }
84//! #
85//! # get_account_updates(solana_pubkey::new_rand());
86//! # Ok::<(), anyhow::Error>(())
87//! ```
88
89pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91    crossbeam_channel::{unbounded, Receiver, Sender},
92    log::*,
93    serde::de::DeserializeOwned,
94    serde_json::{
95        json,
96        value::Value::{Number, Object},
97        Map, Value,
98    },
99    solana_account_decoder_client_types::UiAccount,
100    solana_clock::Slot,
101    solana_pubkey::Pubkey,
102    solana_rpc_client_types::{
103        config::{
104            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106            RpcTransactionLogsFilter,
107        },
108        response::{
109            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111        },
112    },
113    solana_signature::Signature,
114    std::{
115        marker::PhantomData,
116        net::TcpStream,
117        sync::{
118            atomic::{AtomicBool, Ordering},
119            Arc, RwLock,
120        },
121        thread::{sleep, JoinHandle},
122        time::Duration,
123    },
124    tungstenite::{
125        client::IntoClientRequest,
126        connect,
127        http::{header, StatusCode},
128        stream::MaybeTlsStream,
129        Message, WebSocket,
130    },
131};
132
133/// A subscription.
134///
135/// The subscription is unsubscribed on drop, and note that unsubscription (and
136/// thus drop) time is unbounded. See
137/// [`PubsubClientSubscription::send_unsubscribe`].
138pub struct PubsubClientSubscription<T>
139where
140    T: DeserializeOwned,
141{
142    message_type: PhantomData<T>,
143    operation: &'static str,
144    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
145    subscription_id: u64,
146    t_cleanup: Option<JoinHandle<()>>,
147    exit: Arc<AtomicBool>,
148}
149
150impl<T> Drop for PubsubClientSubscription<T>
151where
152    T: DeserializeOwned,
153{
154    fn drop(&mut self) {
155        self.send_unsubscribe()
156            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
157        self.socket
158            .write()
159            .unwrap()
160            .close(None)
161            .unwrap_or_else(|_| warn!("unable to close websocket"));
162    }
163}
164
165impl<T> PubsubClientSubscription<T>
166where
167    T: DeserializeOwned,
168{
169    fn send_subscribe(
170        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
171        body: String,
172    ) -> Result<u64, PubsubClientError> {
173        writable_socket
174            .write()
175            .unwrap()
176            .send(Message::Text(body.into()))
177            .map_err(Box::new)?;
178        let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
179        Self::extract_subscription_id(message)
180    }
181
182    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
183        let message_text = &message.into_text().map_err(Box::new)?;
184
185        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
186            if let Some(Number(x)) = json_msg.get("result") {
187                if let Some(x) = x.as_u64() {
188                    return Ok(x);
189                }
190            }
191        }
192
193        Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
194            "msg={message_text}"
195        )))
196    }
197
198    /// Send an unsubscribe message to the server.
199    ///
200    /// Note that this will block as long as the internal subscription receiver
201    /// is waiting on messages from the server, and this can take an unbounded
202    /// amount of time if the server does not send any messages.
203    ///
204    /// If a pubsub client needs to shutdown reliably it should use
205    /// the async client in [`crate::nonblocking::pubsub_client`].
206    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
207        let method = format!("{}Unsubscribe", self.operation);
208        self.socket
209            .write()
210            .unwrap()
211            .send(Message::Text(
212                json!({
213                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
214                })
215                .to_string()
216                .into(),
217            ))
218            .map_err(Box::new)
219            .map_err(|err| err.into())
220    }
221
222    fn read_message(
223        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
224    ) -> Result<Option<T>, PubsubClientError> {
225        let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
226        if message.is_ping() {
227            return Ok(None);
228        }
229        let message_text = &message.into_text().map_err(Box::new)?;
230        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
231            if let Some(Object(params)) = json_msg.get("params") {
232                if let Some(result) = params.get("result") {
233                    if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
234                        return Ok(Some(x));
235                    }
236                }
237            }
238        }
239
240        Err(PubsubClientError::UnexpectedMessageError(format!(
241            "msg={message_text}"
242        )))
243    }
244
245    /// Shutdown the internel message receiver and wait for its thread to exit.
246    ///
247    /// Note that this will block as long as the subscription receiver is
248    /// waiting on messages from the server, and this can take an unbounded
249    /// amount of time if the server does not send any messages.
250    ///
251    /// If a pubsub client needs to shutdown reliably it should use
252    /// the async client in [`crate::nonblocking::pubsub_client`].
253    pub fn shutdown(&mut self) -> std::thread::Result<()> {
254        if self.t_cleanup.is_some() {
255            info!("websocket thread - shutting down");
256            self.exit.store(true, Ordering::Relaxed);
257            let x = self.t_cleanup.take().unwrap().join();
258            info!("websocket thread - shut down.");
259            x
260        } else {
261            warn!("websocket thread - already shut down.");
262            Ok(())
263        }
264    }
265}
266
267pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
268pub type LogsSubscription = (
269    PubsubLogsClientSubscription,
270    Receiver<RpcResponse<RpcLogsResponse>>,
271);
272
273pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
274pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
275
276pub type PubsubSignatureClientSubscription =
277    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
278pub type SignatureSubscription = (
279    PubsubSignatureClientSubscription,
280    Receiver<RpcResponse<RpcSignatureResult>>,
281);
282
283pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
284pub type BlockSubscription = (
285    PubsubBlockClientSubscription,
286    Receiver<RpcResponse<RpcBlockUpdate>>,
287);
288
289pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
290pub type ProgramSubscription = (
291    PubsubProgramClientSubscription,
292    Receiver<RpcResponse<RpcKeyedAccount>>,
293);
294
295pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
296pub type AccountSubscription = (
297    PubsubAccountClientSubscription,
298    Receiver<RpcResponse<UiAccount>>,
299);
300
301pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
302pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);
303
304pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
305pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
306
/// Blocking client for subscribing to messages from the RPC server.
///
/// The type holds no state; every subscription is created through one of its
/// associated functions. See the [module documentation][self].
pub struct PubsubClient {}
311
312fn connect_with_retry<R: IntoClientRequest>(
313    request: R,
314) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
315    let mut connection_retries = 5;
316    let client_request = request.into_client_request().map_err(Box::new)?;
317    loop {
318        let result = connect(client_request.clone()).map(|(socket, _)| socket);
319        if let Err(tungstenite::Error::Http(response)) = &result {
320            if response.status() == StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
321                let mut duration = Duration::from_millis(500);
322                if let Some(retry_after) = response.headers().get(header::RETRY_AFTER) {
323                    if let Ok(retry_after) = retry_after.to_str() {
324                        if let Ok(retry_after) = retry_after.parse::<u64>() {
325                            if retry_after < 120 {
326                                duration = Duration::from_secs(retry_after);
327                            }
328                        }
329                    }
330                }
331
332                connection_retries -= 1;
333                debug!(
334                    "Too many requests: server responded with {response:?}, {connection_retries} \
335                     retries left, pausing for {duration:?}"
336                );
337
338                sleep(duration);
339                continue;
340            }
341        }
342        return result.map_err(Box::new);
343    }
344}
345
346impl PubsubClient {
347    /// Subscribe to account events.
348    ///
349    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
350    ///
351    /// # RPC Reference
352    ///
353    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
354    ///
355    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket/accountsubscribe
356    pub fn account_subscribe<R: IntoClientRequest>(
357        request: R,
358        pubkey: &Pubkey,
359        config: Option<RpcAccountInfoConfig>,
360    ) -> Result<AccountSubscription, PubsubClientError> {
361        let client_request = request.into_client_request().map_err(Box::new)?;
362        let socket = connect_with_retry(client_request)?;
363        let (sender, receiver) = unbounded();
364
365        let socket = Arc::new(RwLock::new(socket));
366        let socket_clone = socket.clone();
367        let exit = Arc::new(AtomicBool::new(false));
368        let exit_clone = exit.clone();
369        let body = json!({
370            "jsonrpc":"2.0",
371            "id":1,
372            "method":"accountSubscribe",
373            "params":[
374                pubkey.to_string(),
375                config
376            ]
377        })
378        .to_string();
379        let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
380
381        let t_cleanup = std::thread::spawn(move || {
382            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
383        });
384
385        let result = PubsubClientSubscription {
386            message_type: PhantomData,
387            operation: "account",
388            socket,
389            subscription_id,
390            t_cleanup: Some(t_cleanup),
391            exit,
392        };
393
394        Ok((result, receiver))
395    }
396
397    /// Subscribe to block events.
398    ///
399    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
400    ///
401    /// This method is disabled by default. It can be enabled by passing
402    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
403    ///
404    /// # RPC Reference
405    ///
406    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
407    ///
408    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket/blocksubscribe
409    pub fn block_subscribe<R: IntoClientRequest>(
410        request: R,
411        filter: RpcBlockSubscribeFilter,
412        config: Option<RpcBlockSubscribeConfig>,
413    ) -> Result<BlockSubscription, PubsubClientError> {
414        let client_request = request.into_client_request().map_err(Box::new)?;
415        let socket = connect_with_retry(client_request)?;
416        let (sender, receiver) = unbounded();
417
418        let socket = Arc::new(RwLock::new(socket));
419        let socket_clone = socket.clone();
420        let exit = Arc::new(AtomicBool::new(false));
421        let exit_clone = exit.clone();
422        let body = json!({
423            "jsonrpc":"2.0",
424            "id":1,
425            "method":"blockSubscribe",
426            "params":[filter, config]
427        })
428        .to_string();
429
430        let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
431
432        let t_cleanup = std::thread::spawn(move || {
433            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
434        });
435
436        let result = PubsubClientSubscription {
437            message_type: PhantomData,
438            operation: "block",
439            socket,
440            subscription_id,
441            t_cleanup: Some(t_cleanup),
442            exit,
443        };
444
445        Ok((result, receiver))
446    }
447
448    /// Subscribe to transaction log events.
449    ///
450    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
451    ///
452    /// # RPC Reference
453    ///
454    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
455    ///
456    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket/logssubscribe
457    pub fn logs_subscribe<R: IntoClientRequest>(
458        request: R,
459        filter: RpcTransactionLogsFilter,
460        config: RpcTransactionLogsConfig,
461    ) -> Result<LogsSubscription, PubsubClientError> {
462        let client_request = request.into_client_request().map_err(Box::new)?;
463        let socket = connect_with_retry(client_request)?;
464        let (sender, receiver) = unbounded();
465
466        let socket = Arc::new(RwLock::new(socket));
467        let socket_clone = socket.clone();
468        let exit = Arc::new(AtomicBool::new(false));
469        let exit_clone = exit.clone();
470        let body = json!({
471            "jsonrpc":"2.0",
472            "id":1,
473            "method":"logsSubscribe",
474            "params":[filter, config]
475        })
476        .to_string();
477
478        let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
479
480        let t_cleanup = std::thread::spawn(move || {
481            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
482        });
483
484        let result = PubsubClientSubscription {
485            message_type: PhantomData,
486            operation: "logs",
487            socket,
488            subscription_id,
489            t_cleanup: Some(t_cleanup),
490            exit,
491        };
492
493        Ok((result, receiver))
494    }
495
496    /// Subscribe to program account events.
497    ///
498    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
499    /// by the given program changes.
500    ///
501    /// # RPC Reference
502    ///
503    /// This method corresponds directly to the [`programSubscribe`] RPC method.
504    ///
505    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket/programsubscribe
506    pub fn program_subscribe<R: IntoClientRequest>(
507        request: R,
508        pubkey: &Pubkey,
509        config: Option<RpcProgramAccountsConfig>,
510    ) -> Result<ProgramSubscription, PubsubClientError> {
511        let client_request = request.into_client_request().map_err(Box::new)?;
512        let socket = connect_with_retry(client_request)?;
513        let (sender, receiver) = unbounded();
514
515        let socket = Arc::new(RwLock::new(socket));
516        let socket_clone = socket.clone();
517        let exit = Arc::new(AtomicBool::new(false));
518        let exit_clone = exit.clone();
519
520        let body = json!({
521            "jsonrpc":"2.0",
522            "id":1,
523            "method":"programSubscribe",
524            "params":[
525                pubkey.to_string(),
526                config
527            ]
528        })
529        .to_string();
530        let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
531
532        let t_cleanup = std::thread::spawn(move || {
533            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
534        });
535
536        let result = PubsubClientSubscription {
537            message_type: PhantomData,
538            operation: "program",
539            socket,
540            subscription_id,
541            t_cleanup: Some(t_cleanup),
542            exit,
543        };
544
545        Ok((result, receiver))
546    }
547
548    /// Subscribe to vote events.
549    ///
550    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
551    /// votes are observed prior to confirmation and may never be confirmed.
552    ///
553    /// This method is disabled by default. It can be enabled by passing
554    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
555    ///
556    /// # RPC Reference
557    ///
558    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
559    ///
560    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket/votesubscribe
561    pub fn vote_subscribe<R: IntoClientRequest>(
562        request: R,
563    ) -> Result<VoteSubscription, PubsubClientError> {
564        let client_request = request.into_client_request().map_err(Box::new)?;
565        let socket = connect_with_retry(client_request)?;
566        let (sender, receiver) = unbounded();
567
568        let socket = Arc::new(RwLock::new(socket));
569        let socket_clone = socket.clone();
570        let exit = Arc::new(AtomicBool::new(false));
571        let exit_clone = exit.clone();
572        let body = json!({
573            "jsonrpc":"2.0",
574            "id":1,
575            "method":"voteSubscribe",
576        })
577        .to_string();
578        let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
579
580        let t_cleanup = std::thread::spawn(move || {
581            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
582        });
583
584        let result = PubsubClientSubscription {
585            message_type: PhantomData,
586            operation: "vote",
587            socket,
588            subscription_id,
589            t_cleanup: Some(t_cleanup),
590            exit,
591        };
592
593        Ok((result, receiver))
594    }
595
596    /// Subscribe to root events.
597    ///
598    /// Receives messages of type [`Slot`] when a new [root] is set by the
599    /// validator.
600    ///
601    /// [root]: https://solana.com/docs/terminology#root
602    ///
603    /// # RPC Reference
604    ///
605    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
606    ///
607    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket/rootsubscribe
608    pub fn root_subscribe<R: IntoClientRequest>(
609        request: R,
610    ) -> Result<RootSubscription, PubsubClientError> {
611        let client_request = request.into_client_request().map_err(Box::new)?;
612        let socket = connect_with_retry(client_request)?;
613        let (sender, receiver) = unbounded();
614
615        let socket = Arc::new(RwLock::new(socket));
616        let socket_clone = socket.clone();
617        let exit = Arc::new(AtomicBool::new(false));
618        let exit_clone = exit.clone();
619        let body = json!({
620            "jsonrpc":"2.0",
621            "id":1,
622            "method":"rootSubscribe",
623        })
624        .to_string();
625        let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
626
627        let t_cleanup = std::thread::spawn(move || {
628            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
629        });
630
631        let result = PubsubClientSubscription {
632            message_type: PhantomData,
633            operation: "root",
634            socket,
635            subscription_id,
636            t_cleanup: Some(t_cleanup),
637            exit,
638        };
639
640        Ok((result, receiver))
641    }
642
643    /// Subscribe to transaction confirmation events.
644    ///
645    /// Receives messages of type [`RpcSignatureResult`] when a transaction
646    /// with the given signature is committed.
647    ///
648    /// This is a subscription to a single notification. It is automatically
649    /// cancelled by the server once the notification is sent.
650    ///
651    /// # RPC Reference
652    ///
653    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
654    ///
655    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket/signaturesubscribe
656    pub fn signature_subscribe<R: IntoClientRequest>(
657        request: R,
658        signature: &Signature,
659        config: Option<RpcSignatureSubscribeConfig>,
660    ) -> Result<SignatureSubscription, PubsubClientError> {
661        let client_request = request.into_client_request().map_err(Box::new)?;
662        let socket = connect_with_retry(client_request)?;
663        let (sender, receiver) = unbounded();
664
665        let socket = Arc::new(RwLock::new(socket));
666        let socket_clone = socket.clone();
667        let exit = Arc::new(AtomicBool::new(false));
668        let exit_clone = exit.clone();
669        let body = json!({
670            "jsonrpc":"2.0",
671            "id":1,
672            "method":"signatureSubscribe",
673            "params":[
674                signature.to_string(),
675                config
676            ]
677        })
678        .to_string();
679        let subscription_id =
680            PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
681
682        let t_cleanup = std::thread::spawn(move || {
683            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
684        });
685
686        let result = PubsubClientSubscription {
687            message_type: PhantomData,
688            operation: "signature",
689            socket,
690            subscription_id,
691            t_cleanup: Some(t_cleanup),
692            exit,
693        };
694
695        Ok((result, receiver))
696    }
697
698    /// Subscribe to slot events.
699    ///
700    /// Receives messages of type [`SlotInfo`] when a slot is processed.
701    ///
702    /// # RPC Reference
703    ///
704    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
705    ///
706    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket/slotsubscribe
707    pub fn slot_subscribe<R: IntoClientRequest>(
708        request: R,
709    ) -> Result<SlotsSubscription, PubsubClientError> {
710        let client_request = request.into_client_request().map_err(Box::new)?;
711        let socket = connect_with_retry(client_request)?;
712        let (sender, receiver) = unbounded::<SlotInfo>();
713
714        let socket = Arc::new(RwLock::new(socket));
715        let socket_clone = socket.clone();
716        let exit = Arc::new(AtomicBool::new(false));
717        let exit_clone = exit.clone();
718        let body = json!({
719            "jsonrpc":"2.0",
720            "id":1,
721            "method":"slotSubscribe",
722            "params":[]
723        })
724        .to_string();
725        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
726
727        let t_cleanup = std::thread::spawn(move || {
728            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
729        });
730
731        let result = PubsubClientSubscription {
732            message_type: PhantomData,
733            operation: "slot",
734            socket,
735            subscription_id,
736            t_cleanup: Some(t_cleanup),
737            exit,
738        };
739
740        Ok((result, receiver))
741    }
742
743    /// Subscribe to slot update events.
744    ///
745    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
746    ///
747    /// Note that this method operates differently than other subscriptions:
748    /// instead of sending the message to a receiver on a channel, it accepts a
749    /// `handler` callback that processes the message directly. This processing
750    /// occurs on another thread.
751    ///
752    /// # RPC Reference
753    ///
754    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
755    ///
756    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket/slotsupdatessubscribe
757    pub fn slot_updates_subscribe<R: IntoClientRequest>(
758        request: R,
759        handler: impl Fn(SlotUpdate) + Send + 'static,
760    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
761        let client_request = request.into_client_request().map_err(Box::new)?;
762        let socket = connect_with_retry(client_request)?;
763
764        let socket = Arc::new(RwLock::new(socket));
765        let socket_clone = socket.clone();
766        let exit = Arc::new(AtomicBool::new(false));
767        let exit_clone = exit.clone();
768        let body = json!({
769            "jsonrpc":"2.0",
770            "id":1,
771            "method":"slotsUpdatesSubscribe",
772            "params":[]
773        })
774        .to_string();
775        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
776
777        let t_cleanup = std::thread::spawn(move || {
778            Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
779        });
780
781        Ok(PubsubClientSubscription {
782            message_type: PhantomData,
783            operation: "slotsUpdates",
784            socket,
785            subscription_id,
786            t_cleanup: Some(t_cleanup),
787            exit,
788        })
789    }
790
791    fn cleanup_with_sender<T>(
792        exit: Arc<AtomicBool>,
793        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
794        sender: Sender<T>,
795    ) where
796        T: DeserializeOwned + Send + 'static,
797    {
798        let handler = move |message| match sender.send(message) {
799            Ok(_) => (),
800            Err(err) => {
801                info!("receive error: {err:?}");
802            }
803        };
804        Self::cleanup_with_handler(exit, socket, handler);
805    }
806
807    fn cleanup_with_handler<T, F>(
808        exit: Arc<AtomicBool>,
809        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
810        handler: F,
811    ) where
812        T: DeserializeOwned,
813        F: Fn(T) + Send + 'static,
814    {
815        loop {
816            if exit.load(Ordering::Relaxed) {
817                break;
818            }
819
820            match PubsubClientSubscription::read_message(socket) {
821                Ok(Some(message)) => handler(message),
822                Ok(None) => {
823                    // Nothing useful, means we received a ping message
824                }
825                Err(err) => {
826                    info!("receive error: {err:?}");
827                    break;
828                }
829            }
830        }
831
832        info!("websocket - exited receive loop");
833    }
834}
835
836#[cfg(test)]
837mod tests {
838    // see client-test/test/client.rs
839}