1pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91 crossbeam_channel::{unbounded, Receiver, Sender},
92 log::*,
93 serde::de::DeserializeOwned,
94 serde_json::{
95 json,
96 value::Value::{Number, Object},
97 Map, Value,
98 },
99 solana_account_decoder_client_types::UiAccount,
100 solana_clock::Slot,
101 solana_pubkey::Pubkey,
102 solana_rpc_client_types::{
103 config::{
104 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106 RpcTransactionLogsFilter,
107 },
108 response::{
109 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111 },
112 },
113 solana_signature::Signature,
114 std::{
115 marker::PhantomData,
116 net::TcpStream,
117 sync::{
118 atomic::{AtomicBool, Ordering},
119 Arc, RwLock,
120 },
121 thread::{sleep, JoinHandle},
122 time::Duration,
123 },
124 tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
125 url::Url,
126};
127
128pub struct PubsubClientSubscription<T>
134where
135 T: DeserializeOwned,
136{
137 message_type: PhantomData<T>,
138 operation: &'static str,
139 socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
140 subscription_id: u64,
141 t_cleanup: Option<JoinHandle<()>>,
142 exit: Arc<AtomicBool>,
143}
144
145impl<T> Drop for PubsubClientSubscription<T>
146where
147 T: DeserializeOwned,
148{
149 fn drop(&mut self) {
150 self.send_unsubscribe()
151 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
152 self.socket
153 .write()
154 .unwrap()
155 .close(None)
156 .unwrap_or_else(|_| warn!("unable to close websocket"));
157 }
158}
159
160impl<T> PubsubClientSubscription<T>
161where
162 T: DeserializeOwned,
163{
164 fn send_subscribe(
165 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
166 body: String,
167 ) -> Result<u64, PubsubClientError> {
168 writable_socket
169 .write()
170 .unwrap()
171 .send(Message::Text(body))
172 .map_err(Box::new)?;
173 let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
174 Self::extract_subscription_id(message)
175 }
176
177 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
178 let message_text = &message.into_text().map_err(Box::new)?;
179
180 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
181 if let Some(Number(x)) = json_msg.get("result") {
182 if let Some(x) = x.as_u64() {
183 return Ok(x);
184 }
185 }
186 }
187
188 Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
189 "msg={message_text}"
190 )))
191 }
192
193 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
202 let method = format!("{}Unsubscribe", self.operation);
203 self.socket
204 .write()
205 .unwrap()
206 .send(Message::Text(
207 json!({
208 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
209 })
210 .to_string(),
211 ))
212 .map_err(Box::new)
213 .map_err(|err| err.into())
214 }
215
216 fn read_message(
217 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
218 ) -> Result<Option<T>, PubsubClientError> {
219 let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
220 if message.is_ping() {
221 return Ok(None);
222 }
223 let message_text = &message.into_text().map_err(Box::new)?;
224 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
225 if let Some(Object(params)) = json_msg.get("params") {
226 if let Some(result) = params.get("result") {
227 if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
228 return Ok(Some(x));
229 }
230 }
231 }
232 }
233
234 Err(PubsubClientError::UnexpectedMessageError(format!(
235 "msg={message_text}"
236 )))
237 }
238
239 pub fn shutdown(&mut self) -> std::thread::Result<()> {
248 if self.t_cleanup.is_some() {
249 info!("websocket thread - shutting down");
250 self.exit.store(true, Ordering::Relaxed);
251 let x = self.t_cleanup.take().unwrap().join();
252 info!("websocket thread - shut down.");
253 x
254 } else {
255 warn!("websocket thread - already shut down.");
256 Ok(())
257 }
258 }
259}
260
/// Subscription handle whose notifications are `RpcResponse<RpcLogsResponse>`.
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
/// Handle plus receiving channel, as returned by `PubsubClient::logs_subscribe`.
pub type LogsSubscription = (
    PubsubLogsClientSubscription,
    Receiver<RpcResponse<RpcLogsResponse>>,
);

/// Subscription handle whose notifications are `SlotInfo`.
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
/// Handle plus receiving channel, as returned by `PubsubClient::slot_subscribe`.
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);

/// Subscription handle whose notifications are `RpcResponse<RpcSignatureResult>`.
pub type PubsubSignatureClientSubscription =
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
/// Handle plus receiving channel, as returned by `PubsubClient::signature_subscribe`.
pub type SignatureSubscription = (
    PubsubSignatureClientSubscription,
    Receiver<RpcResponse<RpcSignatureResult>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcBlockUpdate>`.
pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
/// Handle plus receiving channel, as returned by `PubsubClient::block_subscribe`.
pub type BlockSubscription = (
    PubsubBlockClientSubscription,
    Receiver<RpcResponse<RpcBlockUpdate>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcKeyedAccount>`.
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
/// Handle plus receiving channel, as returned by `PubsubClient::program_subscribe`.
pub type ProgramSubscription = (
    PubsubProgramClientSubscription,
    Receiver<RpcResponse<RpcKeyedAccount>>,
);

/// Subscription handle whose notifications are `RpcResponse<UiAccount>`.
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
/// Handle plus receiving channel, as returned by `PubsubClient::account_subscribe`.
pub type AccountSubscription = (
    PubsubAccountClientSubscription,
    Receiver<RpcResponse<UiAccount>>,
);

/// Subscription handle whose notifications are `RpcVote`.
pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
/// Handle plus receiving channel, as returned by `PubsubClient::vote_subscribe`.
pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);

/// Subscription handle whose notifications are `Slot`.
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
/// Handle plus receiving channel, as returned by `PubsubClient::root_subscribe`.
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
300
/// Field-less type whose associated functions open blocking websocket
/// subscriptions (`account_subscribe`, `slot_subscribe`, ...).
pub struct PubsubClient {}
305
306fn connect_with_retry(
307 url: Url,
308) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
309 let mut connection_retries = 5;
310 loop {
311 let result = connect(url.clone()).map(|(socket, _)| socket);
312 if let Err(tungstenite::Error::Http(response)) = &result {
313 if response.status() == http::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
314 let mut duration = Duration::from_millis(500);
315 if let Some(retry_after) = response.headers().get(http::header::RETRY_AFTER) {
316 if let Ok(retry_after) = retry_after.to_str() {
317 if let Ok(retry_after) = retry_after.parse::<u64>() {
318 if retry_after < 120 {
319 duration = Duration::from_secs(retry_after);
320 }
321 }
322 }
323 }
324
325 connection_retries -= 1;
326 debug!(
327 "Too many requests: server responded with {response:?}, {connection_retries} \
328 retries left, pausing for {duration:?}"
329 );
330
331 sleep(duration);
332 continue;
333 }
334 }
335 return result.map_err(Box::new);
336 }
337}
338
339impl PubsubClient {
340 pub fn account_subscribe(
350 url: &str,
351 pubkey: &Pubkey,
352 config: Option<RpcAccountInfoConfig>,
353 ) -> Result<AccountSubscription, PubsubClientError> {
354 let url = Url::parse(url)?;
355 let socket = connect_with_retry(url)?;
356 let (sender, receiver) = unbounded();
357
358 let socket = Arc::new(RwLock::new(socket));
359 let socket_clone = socket.clone();
360 let exit = Arc::new(AtomicBool::new(false));
361 let exit_clone = exit.clone();
362 let body = json!({
363 "jsonrpc":"2.0",
364 "id":1,
365 "method":"accountSubscribe",
366 "params":[
367 pubkey.to_string(),
368 config
369 ]
370 })
371 .to_string();
372 let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
373
374 let t_cleanup = std::thread::spawn(move || {
375 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
376 });
377
378 let result = PubsubClientSubscription {
379 message_type: PhantomData,
380 operation: "account",
381 socket,
382 subscription_id,
383 t_cleanup: Some(t_cleanup),
384 exit,
385 };
386
387 Ok((result, receiver))
388 }
389
390 pub fn block_subscribe(
403 url: &str,
404 filter: RpcBlockSubscribeFilter,
405 config: Option<RpcBlockSubscribeConfig>,
406 ) -> Result<BlockSubscription, PubsubClientError> {
407 let url = Url::parse(url)?;
408 let socket = connect_with_retry(url)?;
409 let (sender, receiver) = unbounded();
410
411 let socket = Arc::new(RwLock::new(socket));
412 let socket_clone = socket.clone();
413 let exit = Arc::new(AtomicBool::new(false));
414 let exit_clone = exit.clone();
415 let body = json!({
416 "jsonrpc":"2.0",
417 "id":1,
418 "method":"blockSubscribe",
419 "params":[filter, config]
420 })
421 .to_string();
422
423 let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
424
425 let t_cleanup = std::thread::spawn(move || {
426 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
427 });
428
429 let result = PubsubClientSubscription {
430 message_type: PhantomData,
431 operation: "block",
432 socket,
433 subscription_id,
434 t_cleanup: Some(t_cleanup),
435 exit,
436 };
437
438 Ok((result, receiver))
439 }
440
441 pub fn logs_subscribe(
451 url: &str,
452 filter: RpcTransactionLogsFilter,
453 config: RpcTransactionLogsConfig,
454 ) -> Result<LogsSubscription, PubsubClientError> {
455 let url = Url::parse(url)?;
456 let socket = connect_with_retry(url)?;
457 let (sender, receiver) = unbounded();
458
459 let socket = Arc::new(RwLock::new(socket));
460 let socket_clone = socket.clone();
461 let exit = Arc::new(AtomicBool::new(false));
462 let exit_clone = exit.clone();
463 let body = json!({
464 "jsonrpc":"2.0",
465 "id":1,
466 "method":"logsSubscribe",
467 "params":[filter, config]
468 })
469 .to_string();
470
471 let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
472
473 let t_cleanup = std::thread::spawn(move || {
474 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
475 });
476
477 let result = PubsubClientSubscription {
478 message_type: PhantomData,
479 operation: "logs",
480 socket,
481 subscription_id,
482 t_cleanup: Some(t_cleanup),
483 exit,
484 };
485
486 Ok((result, receiver))
487 }
488
489 pub fn program_subscribe(
500 url: &str,
501 pubkey: &Pubkey,
502 config: Option<RpcProgramAccountsConfig>,
503 ) -> Result<ProgramSubscription, PubsubClientError> {
504 let url = Url::parse(url)?;
505 let socket = connect_with_retry(url)?;
506 let (sender, receiver) = unbounded();
507
508 let socket = Arc::new(RwLock::new(socket));
509 let socket_clone = socket.clone();
510 let exit = Arc::new(AtomicBool::new(false));
511 let exit_clone = exit.clone();
512
513 let body = json!({
514 "jsonrpc":"2.0",
515 "id":1,
516 "method":"programSubscribe",
517 "params":[
518 pubkey.to_string(),
519 config
520 ]
521 })
522 .to_string();
523 let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
524
525 let t_cleanup = std::thread::spawn(move || {
526 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
527 });
528
529 let result = PubsubClientSubscription {
530 message_type: PhantomData,
531 operation: "program",
532 socket,
533 subscription_id,
534 t_cleanup: Some(t_cleanup),
535 exit,
536 };
537
538 Ok((result, receiver))
539 }
540
541 pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
555 let url = Url::parse(url)?;
556 let socket = connect_with_retry(url)?;
557 let (sender, receiver) = unbounded();
558
559 let socket = Arc::new(RwLock::new(socket));
560 let socket_clone = socket.clone();
561 let exit = Arc::new(AtomicBool::new(false));
562 let exit_clone = exit.clone();
563 let body = json!({
564 "jsonrpc":"2.0",
565 "id":1,
566 "method":"voteSubscribe",
567 })
568 .to_string();
569 let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
570
571 let t_cleanup = std::thread::spawn(move || {
572 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
573 });
574
575 let result = PubsubClientSubscription {
576 message_type: PhantomData,
577 operation: "vote",
578 socket,
579 subscription_id,
580 t_cleanup: Some(t_cleanup),
581 exit,
582 };
583
584 Ok((result, receiver))
585 }
586
587 pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
600 let url = Url::parse(url)?;
601 let socket = connect_with_retry(url)?;
602 let (sender, receiver) = unbounded();
603
604 let socket = Arc::new(RwLock::new(socket));
605 let socket_clone = socket.clone();
606 let exit = Arc::new(AtomicBool::new(false));
607 let exit_clone = exit.clone();
608 let body = json!({
609 "jsonrpc":"2.0",
610 "id":1,
611 "method":"rootSubscribe",
612 })
613 .to_string();
614 let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
615
616 let t_cleanup = std::thread::spawn(move || {
617 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
618 });
619
620 let result = PubsubClientSubscription {
621 message_type: PhantomData,
622 operation: "root",
623 socket,
624 subscription_id,
625 t_cleanup: Some(t_cleanup),
626 exit,
627 };
628
629 Ok((result, receiver))
630 }
631
632 pub fn signature_subscribe(
646 url: &str,
647 signature: &Signature,
648 config: Option<RpcSignatureSubscribeConfig>,
649 ) -> Result<SignatureSubscription, PubsubClientError> {
650 let url = Url::parse(url)?;
651 let socket = connect_with_retry(url)?;
652 let (sender, receiver) = unbounded();
653
654 let socket = Arc::new(RwLock::new(socket));
655 let socket_clone = socket.clone();
656 let exit = Arc::new(AtomicBool::new(false));
657 let exit_clone = exit.clone();
658 let body = json!({
659 "jsonrpc":"2.0",
660 "id":1,
661 "method":"signatureSubscribe",
662 "params":[
663 signature.to_string(),
664 config
665 ]
666 })
667 .to_string();
668 let subscription_id =
669 PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
670
671 let t_cleanup = std::thread::spawn(move || {
672 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
673 });
674
675 let result = PubsubClientSubscription {
676 message_type: PhantomData,
677 operation: "signature",
678 socket,
679 subscription_id,
680 t_cleanup: Some(t_cleanup),
681 exit,
682 };
683
684 Ok((result, receiver))
685 }
686
687 pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
697 let url = Url::parse(url)?;
698 let socket = connect_with_retry(url)?;
699 let (sender, receiver) = unbounded::<SlotInfo>();
700
701 let socket = Arc::new(RwLock::new(socket));
702 let socket_clone = socket.clone();
703 let exit = Arc::new(AtomicBool::new(false));
704 let exit_clone = exit.clone();
705 let body = json!({
706 "jsonrpc":"2.0",
707 "id":1,
708 "method":"slotSubscribe",
709 "params":[]
710 })
711 .to_string();
712 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
713
714 let t_cleanup = std::thread::spawn(move || {
715 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
716 });
717
718 let result = PubsubClientSubscription {
719 message_type: PhantomData,
720 operation: "slot",
721 socket,
722 subscription_id,
723 t_cleanup: Some(t_cleanup),
724 exit,
725 };
726
727 Ok((result, receiver))
728 }
729
730 pub fn slot_updates_subscribe(
745 url: &str,
746 handler: impl Fn(SlotUpdate) + Send + 'static,
747 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
748 let url = Url::parse(url)?;
749 let socket = connect_with_retry(url)?;
750
751 let socket = Arc::new(RwLock::new(socket));
752 let socket_clone = socket.clone();
753 let exit = Arc::new(AtomicBool::new(false));
754 let exit_clone = exit.clone();
755 let body = json!({
756 "jsonrpc":"2.0",
757 "id":1,
758 "method":"slotsUpdatesSubscribe",
759 "params":[]
760 })
761 .to_string();
762 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
763
764 let t_cleanup = std::thread::spawn(move || {
765 Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
766 });
767
768 Ok(PubsubClientSubscription {
769 message_type: PhantomData,
770 operation: "slotsUpdates",
771 socket,
772 subscription_id,
773 t_cleanup: Some(t_cleanup),
774 exit,
775 })
776 }
777
778 fn cleanup_with_sender<T>(
779 exit: Arc<AtomicBool>,
780 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
781 sender: Sender<T>,
782 ) where
783 T: DeserializeOwned + Send + 'static,
784 {
785 let handler = move |message| match sender.send(message) {
786 Ok(_) => (),
787 Err(err) => {
788 info!("receive error: {err:?}");
789 }
790 };
791 Self::cleanup_with_handler(exit, socket, handler);
792 }
793
794 fn cleanup_with_handler<T, F>(
795 exit: Arc<AtomicBool>,
796 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
797 handler: F,
798 ) where
799 T: DeserializeOwned,
800 F: Fn(T) + Send + 'static,
801 {
802 loop {
803 if exit.load(Ordering::Relaxed) {
804 break;
805 }
806
807 match PubsubClientSubscription::read_message(socket) {
808 Ok(Some(message)) => handler(message),
809 Ok(None) => {
810 }
812 Err(err) => {
813 info!("receive error: {err:?}");
814 break;
815 }
816 }
817 }
818
819 info!("websocket - exited receive loop");
820 }
821}
822
#[cfg(test)]
mod tests {
    // NOTE(review): this module is empty here; presumably the client is
    // exercised by tests elsewhere — confirm before adding cases.
}