1pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91 crossbeam_channel::{unbounded, Receiver, Sender},
92 log::*,
93 serde::de::DeserializeOwned,
94 serde_json::{
95 json,
96 value::Value::{Number, Object},
97 Map, Value,
98 },
99 solana_account_decoder_client_types::UiAccount,
100 solana_clock::Slot,
101 solana_pubkey::Pubkey,
102 solana_rpc_client_types::{
103 config::{
104 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106 RpcTransactionLogsFilter,
107 },
108 response::{
109 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111 },
112 },
113 solana_signature::Signature,
114 std::{
115 marker::PhantomData,
116 net::TcpStream,
117 sync::{
118 atomic::{AtomicBool, Ordering},
119 Arc, RwLock,
120 },
121 thread::{sleep, JoinHandle},
122 time::Duration,
123 },
124 tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
125 url::Url,
126};
127
/// Handle to a single websocket subscription whose notifications
/// deserialize into `T`.
///
/// Dropping the handle sends the matching unsubscribe request and closes the
/// socket. Calling [`PubsubClientSubscription::shutdown`] sets the exit flag
/// and joins the receive thread.
pub struct PubsubClientSubscription<T>
where
    T: DeserializeOwned,
{
    /// Marker tying the handle to the notification type `T`; no `T` is stored.
    message_type: PhantomData<T>,
    /// Operation name (for example `"account"`), used to build the
    /// `<operation>Unsubscribe` method name.
    operation: &'static str,
    /// Websocket shared between this handle and the receive thread.
    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
    /// Id parsed from the subscribe response; sent back as the only
    /// parameter of the unsubscribe request.
    subscription_id: u64,
    /// Join handle of the receive thread; `None` once `shutdown` has taken it.
    t_cleanup: Option<JoinHandle<()>>,
    /// Flag set by `shutdown` and checked by the receive loop before each read.
    exit: Arc<AtomicBool>,
}
144
145impl<T> Drop for PubsubClientSubscription<T>
146where
147 T: DeserializeOwned,
148{
149 fn drop(&mut self) {
150 self.send_unsubscribe()
151 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
152 self.socket
153 .write()
154 .unwrap()
155 .close(None)
156 .unwrap_or_else(|_| warn!("unable to close websocket"));
157 }
158}
159
160impl<T> PubsubClientSubscription<T>
161where
162 T: DeserializeOwned,
163{
164 fn send_subscribe(
165 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
166 body: String,
167 ) -> Result<u64, PubsubClientError> {
168 writable_socket.write().unwrap().send(Message::Text(body))?;
169 let message = writable_socket.write().unwrap().read()?;
170 Self::extract_subscription_id(message)
171 }
172
173 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
174 let message_text = &message.into_text()?;
175
176 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
177 if let Some(Number(x)) = json_msg.get("result") {
178 if let Some(x) = x.as_u64() {
179 return Ok(x);
180 }
181 }
182 }
183
184 Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
185 "msg={message_text}"
186 )))
187 }
188
189 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
198 let method = format!("{}Unsubscribe", self.operation);
199 self.socket
200 .write()
201 .unwrap()
202 .send(Message::Text(
203 json!({
204 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
205 })
206 .to_string(),
207 ))
208 .map_err(|err| err.into())
209 }
210
211 fn read_message(
212 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
213 ) -> Result<Option<T>, PubsubClientError> {
214 let message = writable_socket.write().unwrap().read()?;
215 if message.is_ping() {
216 return Ok(None);
217 }
218 let message_text = &message.into_text()?;
219 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
220 if let Some(Object(params)) = json_msg.get("params") {
221 if let Some(result) = params.get("result") {
222 if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
223 return Ok(Some(x));
224 }
225 }
226 }
227 }
228
229 Err(PubsubClientError::UnexpectedMessageError(format!(
230 "msg={message_text}"
231 )))
232 }
233
234 pub fn shutdown(&mut self) -> std::thread::Result<()> {
243 if self.t_cleanup.is_some() {
244 info!("websocket thread - shutting down");
245 self.exit.store(true, Ordering::Relaxed);
246 let x = self.t_cleanup.take().unwrap().join();
247 info!("websocket thread - shut down.");
248 x
249 } else {
250 warn!("websocket thread - already shut down.");
251 Ok(())
252 }
253 }
254}
255
/// Subscription handle returned by [`PubsubClient::logs_subscribe`].
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
/// Logs subscription handle paired with the receiver of its notifications.
pub type LogsSubscription = (
    PubsubLogsClientSubscription,
    Receiver<RpcResponse<RpcLogsResponse>>,
);

/// Subscription handle returned by [`PubsubClient::slot_subscribe`].
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
/// Slot subscription handle paired with the receiver of its notifications.
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);

/// Subscription handle returned by [`PubsubClient::signature_subscribe`].
pub type PubsubSignatureClientSubscription =
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
/// Signature subscription handle paired with the receiver of its notifications.
pub type SignatureSubscription = (
    PubsubSignatureClientSubscription,
    Receiver<RpcResponse<RpcSignatureResult>>,
);

/// Subscription handle returned by [`PubsubClient::block_subscribe`].
pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
/// Block subscription handle paired with the receiver of its notifications.
pub type BlockSubscription = (
    PubsubBlockClientSubscription,
    Receiver<RpcResponse<RpcBlockUpdate>>,
);

/// Subscription handle returned by [`PubsubClient::program_subscribe`].
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
/// Program subscription handle paired with the receiver of its notifications.
pub type ProgramSubscription = (
    PubsubProgramClientSubscription,
    Receiver<RpcResponse<RpcKeyedAccount>>,
);

/// Subscription handle returned by [`PubsubClient::account_subscribe`].
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
/// Account subscription handle paired with the receiver of its notifications.
pub type AccountSubscription = (
    PubsubAccountClientSubscription,
    Receiver<RpcResponse<UiAccount>>,
);

/// Subscription handle returned by [`PubsubClient::vote_subscribe`].
pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
/// Vote subscription handle paired with the receiver of its notifications.
pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);

/// Subscription handle returned by [`PubsubClient::root_subscribe`].
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
/// Root subscription handle paired with the receiver of its notifications.
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
295
/// A client for subscribing to messages from the RPC server over a websocket.
///
/// The type carries no state; every subscribe function is an associated
/// function that opens its own websocket connection.
pub struct PubsubClient {}
300
301fn connect_with_retry(
302 url: Url,
303) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
304 let mut connection_retries = 5;
305 loop {
306 let result = connect(url.clone()).map(|(socket, _)| socket);
307 if let Err(tungstenite::Error::Http(response)) = &result {
308 if response.status() == http::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
309 let mut duration = Duration::from_millis(500);
310 if let Some(retry_after) = response.headers().get(http::header::RETRY_AFTER) {
311 if let Ok(retry_after) = retry_after.to_str() {
312 if let Ok(retry_after) = retry_after.parse::<u64>() {
313 if retry_after < 120 {
314 duration = Duration::from_secs(retry_after);
315 }
316 }
317 }
318 }
319
320 connection_retries -= 1;
321 debug!(
322 "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
323 response, connection_retries, duration
324 );
325
326 sleep(duration);
327 continue;
328 }
329 }
330 return result;
331 }
332}
333
334impl PubsubClient {
335 pub fn account_subscribe(
345 url: &str,
346 pubkey: &Pubkey,
347 config: Option<RpcAccountInfoConfig>,
348 ) -> Result<AccountSubscription, PubsubClientError> {
349 let url = Url::parse(url)?;
350 let socket = connect_with_retry(url)?;
351 let (sender, receiver) = unbounded();
352
353 let socket = Arc::new(RwLock::new(socket));
354 let socket_clone = socket.clone();
355 let exit = Arc::new(AtomicBool::new(false));
356 let exit_clone = exit.clone();
357 let body = json!({
358 "jsonrpc":"2.0",
359 "id":1,
360 "method":"accountSubscribe",
361 "params":[
362 pubkey.to_string(),
363 config
364 ]
365 })
366 .to_string();
367 let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
368
369 let t_cleanup = std::thread::spawn(move || {
370 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
371 });
372
373 let result = PubsubClientSubscription {
374 message_type: PhantomData,
375 operation: "account",
376 socket,
377 subscription_id,
378 t_cleanup: Some(t_cleanup),
379 exit,
380 };
381
382 Ok((result, receiver))
383 }
384
385 pub fn block_subscribe(
398 url: &str,
399 filter: RpcBlockSubscribeFilter,
400 config: Option<RpcBlockSubscribeConfig>,
401 ) -> Result<BlockSubscription, PubsubClientError> {
402 let url = Url::parse(url)?;
403 let socket = connect_with_retry(url)?;
404 let (sender, receiver) = unbounded();
405
406 let socket = Arc::new(RwLock::new(socket));
407 let socket_clone = socket.clone();
408 let exit = Arc::new(AtomicBool::new(false));
409 let exit_clone = exit.clone();
410 let body = json!({
411 "jsonrpc":"2.0",
412 "id":1,
413 "method":"blockSubscribe",
414 "params":[filter, config]
415 })
416 .to_string();
417
418 let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
419
420 let t_cleanup = std::thread::spawn(move || {
421 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
422 });
423
424 let result = PubsubClientSubscription {
425 message_type: PhantomData,
426 operation: "block",
427 socket,
428 subscription_id,
429 t_cleanup: Some(t_cleanup),
430 exit,
431 };
432
433 Ok((result, receiver))
434 }
435
436 pub fn logs_subscribe(
446 url: &str,
447 filter: RpcTransactionLogsFilter,
448 config: RpcTransactionLogsConfig,
449 ) -> Result<LogsSubscription, PubsubClientError> {
450 let url = Url::parse(url)?;
451 let socket = connect_with_retry(url)?;
452 let (sender, receiver) = unbounded();
453
454 let socket = Arc::new(RwLock::new(socket));
455 let socket_clone = socket.clone();
456 let exit = Arc::new(AtomicBool::new(false));
457 let exit_clone = exit.clone();
458 let body = json!({
459 "jsonrpc":"2.0",
460 "id":1,
461 "method":"logsSubscribe",
462 "params":[filter, config]
463 })
464 .to_string();
465
466 let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
467
468 let t_cleanup = std::thread::spawn(move || {
469 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
470 });
471
472 let result = PubsubClientSubscription {
473 message_type: PhantomData,
474 operation: "logs",
475 socket,
476 subscription_id,
477 t_cleanup: Some(t_cleanup),
478 exit,
479 };
480
481 Ok((result, receiver))
482 }
483
484 pub fn program_subscribe(
495 url: &str,
496 pubkey: &Pubkey,
497 config: Option<RpcProgramAccountsConfig>,
498 ) -> Result<ProgramSubscription, PubsubClientError> {
499 let url = Url::parse(url)?;
500 let socket = connect_with_retry(url)?;
501 let (sender, receiver) = unbounded();
502
503 let socket = Arc::new(RwLock::new(socket));
504 let socket_clone = socket.clone();
505 let exit = Arc::new(AtomicBool::new(false));
506 let exit_clone = exit.clone();
507
508 let body = json!({
509 "jsonrpc":"2.0",
510 "id":1,
511 "method":"programSubscribe",
512 "params":[
513 pubkey.to_string(),
514 config
515 ]
516 })
517 .to_string();
518 let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
519
520 let t_cleanup = std::thread::spawn(move || {
521 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
522 });
523
524 let result = PubsubClientSubscription {
525 message_type: PhantomData,
526 operation: "program",
527 socket,
528 subscription_id,
529 t_cleanup: Some(t_cleanup),
530 exit,
531 };
532
533 Ok((result, receiver))
534 }
535
536 pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
550 let url = Url::parse(url)?;
551 let socket = connect_with_retry(url)?;
552 let (sender, receiver) = unbounded();
553
554 let socket = Arc::new(RwLock::new(socket));
555 let socket_clone = socket.clone();
556 let exit = Arc::new(AtomicBool::new(false));
557 let exit_clone = exit.clone();
558 let body = json!({
559 "jsonrpc":"2.0",
560 "id":1,
561 "method":"voteSubscribe",
562 })
563 .to_string();
564 let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
565
566 let t_cleanup = std::thread::spawn(move || {
567 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
568 });
569
570 let result = PubsubClientSubscription {
571 message_type: PhantomData,
572 operation: "vote",
573 socket,
574 subscription_id,
575 t_cleanup: Some(t_cleanup),
576 exit,
577 };
578
579 Ok((result, receiver))
580 }
581
582 pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
595 let url = Url::parse(url)?;
596 let socket = connect_with_retry(url)?;
597 let (sender, receiver) = unbounded();
598
599 let socket = Arc::new(RwLock::new(socket));
600 let socket_clone = socket.clone();
601 let exit = Arc::new(AtomicBool::new(false));
602 let exit_clone = exit.clone();
603 let body = json!({
604 "jsonrpc":"2.0",
605 "id":1,
606 "method":"rootSubscribe",
607 })
608 .to_string();
609 let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
610
611 let t_cleanup = std::thread::spawn(move || {
612 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
613 });
614
615 let result = PubsubClientSubscription {
616 message_type: PhantomData,
617 operation: "root",
618 socket,
619 subscription_id,
620 t_cleanup: Some(t_cleanup),
621 exit,
622 };
623
624 Ok((result, receiver))
625 }
626
627 pub fn signature_subscribe(
641 url: &str,
642 signature: &Signature,
643 config: Option<RpcSignatureSubscribeConfig>,
644 ) -> Result<SignatureSubscription, PubsubClientError> {
645 let url = Url::parse(url)?;
646 let socket = connect_with_retry(url)?;
647 let (sender, receiver) = unbounded();
648
649 let socket = Arc::new(RwLock::new(socket));
650 let socket_clone = socket.clone();
651 let exit = Arc::new(AtomicBool::new(false));
652 let exit_clone = exit.clone();
653 let body = json!({
654 "jsonrpc":"2.0",
655 "id":1,
656 "method":"signatureSubscribe",
657 "params":[
658 signature.to_string(),
659 config
660 ]
661 })
662 .to_string();
663 let subscription_id =
664 PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
665
666 let t_cleanup = std::thread::spawn(move || {
667 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
668 });
669
670 let result = PubsubClientSubscription {
671 message_type: PhantomData,
672 operation: "signature",
673 socket,
674 subscription_id,
675 t_cleanup: Some(t_cleanup),
676 exit,
677 };
678
679 Ok((result, receiver))
680 }
681
682 pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
692 let url = Url::parse(url)?;
693 let socket = connect_with_retry(url)?;
694 let (sender, receiver) = unbounded::<SlotInfo>();
695
696 let socket = Arc::new(RwLock::new(socket));
697 let socket_clone = socket.clone();
698 let exit = Arc::new(AtomicBool::new(false));
699 let exit_clone = exit.clone();
700 let body = json!({
701 "jsonrpc":"2.0",
702 "id":1,
703 "method":"slotSubscribe",
704 "params":[]
705 })
706 .to_string();
707 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
708
709 let t_cleanup = std::thread::spawn(move || {
710 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
711 });
712
713 let result = PubsubClientSubscription {
714 message_type: PhantomData,
715 operation: "slot",
716 socket,
717 subscription_id,
718 t_cleanup: Some(t_cleanup),
719 exit,
720 };
721
722 Ok((result, receiver))
723 }
724
725 pub fn slot_updates_subscribe(
740 url: &str,
741 handler: impl Fn(SlotUpdate) + Send + 'static,
742 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
743 let url = Url::parse(url)?;
744 let socket = connect_with_retry(url)?;
745
746 let socket = Arc::new(RwLock::new(socket));
747 let socket_clone = socket.clone();
748 let exit = Arc::new(AtomicBool::new(false));
749 let exit_clone = exit.clone();
750 let body = json!({
751 "jsonrpc":"2.0",
752 "id":1,
753 "method":"slotsUpdatesSubscribe",
754 "params":[]
755 })
756 .to_string();
757 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
758
759 let t_cleanup = std::thread::spawn(move || {
760 Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
761 });
762
763 Ok(PubsubClientSubscription {
764 message_type: PhantomData,
765 operation: "slotsUpdates",
766 socket,
767 subscription_id,
768 t_cleanup: Some(t_cleanup),
769 exit,
770 })
771 }
772
773 fn cleanup_with_sender<T>(
774 exit: Arc<AtomicBool>,
775 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
776 sender: Sender<T>,
777 ) where
778 T: DeserializeOwned + Send + 'static,
779 {
780 let handler = move |message| match sender.send(message) {
781 Ok(_) => (),
782 Err(err) => {
783 info!("receive error: {:?}", err);
784 }
785 };
786 Self::cleanup_with_handler(exit, socket, handler);
787 }
788
789 fn cleanup_with_handler<T, F>(
790 exit: Arc<AtomicBool>,
791 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
792 handler: F,
793 ) where
794 T: DeserializeOwned,
795 F: Fn(T) + Send + 'static,
796 {
797 loop {
798 if exit.load(Ordering::Relaxed) {
799 break;
800 }
801
802 match PubsubClientSubscription::read_message(socket) {
803 Ok(Some(message)) => handler(message),
804 Ok(None) => {
805 }
807 Err(err) => {
808 info!("receive error: {:?}", err);
809 break;
810 }
811 }
812 }
813
814 info!("websocket - exited receive loop");
815 }
816}
817
// Test-only module; it contains no items as shown here.
#[cfg(test)]
mod tests {
}