1pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91 crossbeam_channel::{unbounded, Receiver, Sender},
92 log::*,
93 serde::de::DeserializeOwned,
94 serde_json::{
95 json,
96 value::Value::{Number, Object},
97 Map, Value,
98 },
99 solana_account_decoder_client_types::UiAccount,
100 solana_clock::Slot,
101 solana_pubkey::Pubkey,
102 solana_rpc_client_types::{
103 config::{
104 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106 RpcTransactionLogsFilter,
107 },
108 response::{
109 Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111 },
112 },
113 solana_signature::Signature,
114 std::{
115 marker::PhantomData,
116 net::TcpStream,
117 sync::{
118 atomic::{AtomicBool, Ordering},
119 Arc, RwLock,
120 },
121 thread::{sleep, JoinHandle},
122 time::Duration,
123 },
124 tungstenite::{
125 client::IntoClientRequest,
126 connect,
127 http::{header, StatusCode},
128 stream::MaybeTlsStream,
129 Message, WebSocket,
130 },
131};
132
133pub struct PubsubClientSubscription<T>
139where
140 T: DeserializeOwned,
141{
142 message_type: PhantomData<T>,
143 operation: &'static str,
144 socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
145 subscription_id: u64,
146 t_cleanup: Option<JoinHandle<()>>,
147 exit: Arc<AtomicBool>,
148}
149
150impl<T> Drop for PubsubClientSubscription<T>
151where
152 T: DeserializeOwned,
153{
154 fn drop(&mut self) {
155 self.send_unsubscribe()
156 .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
157 self.socket
158 .write()
159 .unwrap()
160 .close(None)
161 .unwrap_or_else(|_| warn!("unable to close websocket"));
162 }
163}
164
165impl<T> PubsubClientSubscription<T>
166where
167 T: DeserializeOwned,
168{
169 fn send_subscribe(
170 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
171 body: String,
172 ) -> Result<u64, PubsubClientError> {
173 writable_socket
174 .write()
175 .unwrap()
176 .send(Message::Text(body.into()))
177 .map_err(Box::new)?;
178 let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
179 Self::extract_subscription_id(message)
180 }
181
182 fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
183 let message_text = &message.into_text().map_err(Box::new)?;
184
185 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
186 if let Some(Number(x)) = json_msg.get("result") {
187 if let Some(x) = x.as_u64() {
188 return Ok(x);
189 }
190 }
191 }
192
193 Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
194 "msg={message_text}"
195 )))
196 }
197
198 pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
207 let method = format!("{}Unsubscribe", self.operation);
208 self.socket
209 .write()
210 .unwrap()
211 .send(Message::Text(
212 json!({
213 "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
214 })
215 .to_string()
216 .into(),
217 ))
218 .map_err(Box::new)
219 .map_err(|err| err.into())
220 }
221
222 fn read_message(
223 writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
224 ) -> Result<Option<T>, PubsubClientError> {
225 let message = writable_socket.write().unwrap().read().map_err(Box::new)?;
226 if message.is_ping() {
227 return Ok(None);
228 }
229 let message_text = &message.into_text().map_err(Box::new)?;
230 if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
231 if let Some(Object(params)) = json_msg.get("params") {
232 if let Some(result) = params.get("result") {
233 if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
234 return Ok(Some(x));
235 }
236 }
237 }
238 }
239
240 Err(PubsubClientError::UnexpectedMessageError(format!(
241 "msg={message_text}"
242 )))
243 }
244
245 pub fn shutdown(&mut self) -> std::thread::Result<()> {
254 if self.t_cleanup.is_some() {
255 info!("websocket thread - shutting down");
256 self.exit.store(true, Ordering::Relaxed);
257 let x = self.t_cleanup.take().unwrap().join();
258 info!("websocket thread - shut down.");
259 x
260 } else {
261 warn!("websocket thread - already shut down.");
262 Ok(())
263 }
264 }
265}
266
267pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
268pub type LogsSubscription = (
269 PubsubLogsClientSubscription,
270 Receiver<RpcResponse<RpcLogsResponse>>,
271);
272
273pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
274pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
275
276pub type PubsubSignatureClientSubscription =
277 PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
278pub type SignatureSubscription = (
279 PubsubSignatureClientSubscription,
280 Receiver<RpcResponse<RpcSignatureResult>>,
281);
282
283pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
284pub type BlockSubscription = (
285 PubsubBlockClientSubscription,
286 Receiver<RpcResponse<RpcBlockUpdate>>,
287);
288
289pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
290pub type ProgramSubscription = (
291 PubsubProgramClientSubscription,
292 Receiver<RpcResponse<RpcKeyedAccount>>,
293);
294
295pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
296pub type AccountSubscription = (
297 PubsubAccountClientSubscription,
298 Receiver<RpcResponse<UiAccount>>,
299);
300
301pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
302pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);
303
304pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
305pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
306
/// Field-less type that groups the blocking `*_subscribe` functions.
///
/// Every function in its `impl` block is an associated function (none takes
/// `self`), so no instance is needed to open a subscription.
pub struct PubsubClient {}
311
312fn connect_with_retry<R: IntoClientRequest>(
313 request: R,
314) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Box<tungstenite::Error>> {
315 let mut connection_retries = 5;
316 let client_request = request.into_client_request().map_err(Box::new)?;
317 loop {
318 let result = connect(client_request.clone()).map(|(socket, _)| socket);
319 if let Err(tungstenite::Error::Http(response)) = &result {
320 if response.status() == StatusCode::TOO_MANY_REQUESTS && connection_retries > 0 {
321 let mut duration = Duration::from_millis(500);
322 if let Some(retry_after) = response.headers().get(header::RETRY_AFTER) {
323 if let Ok(retry_after) = retry_after.to_str() {
324 if let Ok(retry_after) = retry_after.parse::<u64>() {
325 if retry_after < 120 {
326 duration = Duration::from_secs(retry_after);
327 }
328 }
329 }
330 }
331
332 connection_retries -= 1;
333 debug!(
334 "Too many requests: server responded with {response:?}, {connection_retries} \
335 retries left, pausing for {duration:?}"
336 );
337
338 sleep(duration);
339 continue;
340 }
341 }
342 return result.map_err(Box::new);
343 }
344}
345
346impl PubsubClient {
347 pub fn account_subscribe<R: IntoClientRequest>(
357 request: R,
358 pubkey: &Pubkey,
359 config: Option<RpcAccountInfoConfig>,
360 ) -> Result<AccountSubscription, PubsubClientError> {
361 let client_request = request.into_client_request().map_err(Box::new)?;
362 let socket = connect_with_retry(client_request)?;
363 let (sender, receiver) = unbounded();
364
365 let socket = Arc::new(RwLock::new(socket));
366 let socket_clone = socket.clone();
367 let exit = Arc::new(AtomicBool::new(false));
368 let exit_clone = exit.clone();
369 let body = json!({
370 "jsonrpc":"2.0",
371 "id":1,
372 "method":"accountSubscribe",
373 "params":[
374 pubkey.to_string(),
375 config
376 ]
377 })
378 .to_string();
379 let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
380
381 let t_cleanup = std::thread::spawn(move || {
382 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
383 });
384
385 let result = PubsubClientSubscription {
386 message_type: PhantomData,
387 operation: "account",
388 socket,
389 subscription_id,
390 t_cleanup: Some(t_cleanup),
391 exit,
392 };
393
394 Ok((result, receiver))
395 }
396
397 pub fn block_subscribe<R: IntoClientRequest>(
410 request: R,
411 filter: RpcBlockSubscribeFilter,
412 config: Option<RpcBlockSubscribeConfig>,
413 ) -> Result<BlockSubscription, PubsubClientError> {
414 let client_request = request.into_client_request().map_err(Box::new)?;
415 let socket = connect_with_retry(client_request)?;
416 let (sender, receiver) = unbounded();
417
418 let socket = Arc::new(RwLock::new(socket));
419 let socket_clone = socket.clone();
420 let exit = Arc::new(AtomicBool::new(false));
421 let exit_clone = exit.clone();
422 let body = json!({
423 "jsonrpc":"2.0",
424 "id":1,
425 "method":"blockSubscribe",
426 "params":[filter, config]
427 })
428 .to_string();
429
430 let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
431
432 let t_cleanup = std::thread::spawn(move || {
433 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
434 });
435
436 let result = PubsubClientSubscription {
437 message_type: PhantomData,
438 operation: "block",
439 socket,
440 subscription_id,
441 t_cleanup: Some(t_cleanup),
442 exit,
443 };
444
445 Ok((result, receiver))
446 }
447
448 pub fn logs_subscribe<R: IntoClientRequest>(
458 request: R,
459 filter: RpcTransactionLogsFilter,
460 config: RpcTransactionLogsConfig,
461 ) -> Result<LogsSubscription, PubsubClientError> {
462 let client_request = request.into_client_request().map_err(Box::new)?;
463 let socket = connect_with_retry(client_request)?;
464 let (sender, receiver) = unbounded();
465
466 let socket = Arc::new(RwLock::new(socket));
467 let socket_clone = socket.clone();
468 let exit = Arc::new(AtomicBool::new(false));
469 let exit_clone = exit.clone();
470 let body = json!({
471 "jsonrpc":"2.0",
472 "id":1,
473 "method":"logsSubscribe",
474 "params":[filter, config]
475 })
476 .to_string();
477
478 let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
479
480 let t_cleanup = std::thread::spawn(move || {
481 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
482 });
483
484 let result = PubsubClientSubscription {
485 message_type: PhantomData,
486 operation: "logs",
487 socket,
488 subscription_id,
489 t_cleanup: Some(t_cleanup),
490 exit,
491 };
492
493 Ok((result, receiver))
494 }
495
496 pub fn program_subscribe<R: IntoClientRequest>(
507 request: R,
508 pubkey: &Pubkey,
509 config: Option<RpcProgramAccountsConfig>,
510 ) -> Result<ProgramSubscription, PubsubClientError> {
511 let client_request = request.into_client_request().map_err(Box::new)?;
512 let socket = connect_with_retry(client_request)?;
513 let (sender, receiver) = unbounded();
514
515 let socket = Arc::new(RwLock::new(socket));
516 let socket_clone = socket.clone();
517 let exit = Arc::new(AtomicBool::new(false));
518 let exit_clone = exit.clone();
519
520 let body = json!({
521 "jsonrpc":"2.0",
522 "id":1,
523 "method":"programSubscribe",
524 "params":[
525 pubkey.to_string(),
526 config
527 ]
528 })
529 .to_string();
530 let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
531
532 let t_cleanup = std::thread::spawn(move || {
533 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
534 });
535
536 let result = PubsubClientSubscription {
537 message_type: PhantomData,
538 operation: "program",
539 socket,
540 subscription_id,
541 t_cleanup: Some(t_cleanup),
542 exit,
543 };
544
545 Ok((result, receiver))
546 }
547
548 pub fn vote_subscribe<R: IntoClientRequest>(
562 request: R,
563 ) -> Result<VoteSubscription, PubsubClientError> {
564 let client_request = request.into_client_request().map_err(Box::new)?;
565 let socket = connect_with_retry(client_request)?;
566 let (sender, receiver) = unbounded();
567
568 let socket = Arc::new(RwLock::new(socket));
569 let socket_clone = socket.clone();
570 let exit = Arc::new(AtomicBool::new(false));
571 let exit_clone = exit.clone();
572 let body = json!({
573 "jsonrpc":"2.0",
574 "id":1,
575 "method":"voteSubscribe",
576 })
577 .to_string();
578 let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
579
580 let t_cleanup = std::thread::spawn(move || {
581 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
582 });
583
584 let result = PubsubClientSubscription {
585 message_type: PhantomData,
586 operation: "vote",
587 socket,
588 subscription_id,
589 t_cleanup: Some(t_cleanup),
590 exit,
591 };
592
593 Ok((result, receiver))
594 }
595
596 pub fn root_subscribe<R: IntoClientRequest>(
609 request: R,
610 ) -> Result<RootSubscription, PubsubClientError> {
611 let client_request = request.into_client_request().map_err(Box::new)?;
612 let socket = connect_with_retry(client_request)?;
613 let (sender, receiver) = unbounded();
614
615 let socket = Arc::new(RwLock::new(socket));
616 let socket_clone = socket.clone();
617 let exit = Arc::new(AtomicBool::new(false));
618 let exit_clone = exit.clone();
619 let body = json!({
620 "jsonrpc":"2.0",
621 "id":1,
622 "method":"rootSubscribe",
623 })
624 .to_string();
625 let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
626
627 let t_cleanup = std::thread::spawn(move || {
628 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
629 });
630
631 let result = PubsubClientSubscription {
632 message_type: PhantomData,
633 operation: "root",
634 socket,
635 subscription_id,
636 t_cleanup: Some(t_cleanup),
637 exit,
638 };
639
640 Ok((result, receiver))
641 }
642
643 pub fn signature_subscribe<R: IntoClientRequest>(
657 request: R,
658 signature: &Signature,
659 config: Option<RpcSignatureSubscribeConfig>,
660 ) -> Result<SignatureSubscription, PubsubClientError> {
661 let client_request = request.into_client_request().map_err(Box::new)?;
662 let socket = connect_with_retry(client_request)?;
663 let (sender, receiver) = unbounded();
664
665 let socket = Arc::new(RwLock::new(socket));
666 let socket_clone = socket.clone();
667 let exit = Arc::new(AtomicBool::new(false));
668 let exit_clone = exit.clone();
669 let body = json!({
670 "jsonrpc":"2.0",
671 "id":1,
672 "method":"signatureSubscribe",
673 "params":[
674 signature.to_string(),
675 config
676 ]
677 })
678 .to_string();
679 let subscription_id =
680 PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
681
682 let t_cleanup = std::thread::spawn(move || {
683 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
684 });
685
686 let result = PubsubClientSubscription {
687 message_type: PhantomData,
688 operation: "signature",
689 socket,
690 subscription_id,
691 t_cleanup: Some(t_cleanup),
692 exit,
693 };
694
695 Ok((result, receiver))
696 }
697
698 pub fn slot_subscribe<R: IntoClientRequest>(
708 request: R,
709 ) -> Result<SlotsSubscription, PubsubClientError> {
710 let client_request = request.into_client_request().map_err(Box::new)?;
711 let socket = connect_with_retry(client_request)?;
712 let (sender, receiver) = unbounded::<SlotInfo>();
713
714 let socket = Arc::new(RwLock::new(socket));
715 let socket_clone = socket.clone();
716 let exit = Arc::new(AtomicBool::new(false));
717 let exit_clone = exit.clone();
718 let body = json!({
719 "jsonrpc":"2.0",
720 "id":1,
721 "method":"slotSubscribe",
722 "params":[]
723 })
724 .to_string();
725 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
726
727 let t_cleanup = std::thread::spawn(move || {
728 Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
729 });
730
731 let result = PubsubClientSubscription {
732 message_type: PhantomData,
733 operation: "slot",
734 socket,
735 subscription_id,
736 t_cleanup: Some(t_cleanup),
737 exit,
738 };
739
740 Ok((result, receiver))
741 }
742
743 pub fn slot_updates_subscribe<R: IntoClientRequest>(
758 request: R,
759 handler: impl Fn(SlotUpdate) + Send + 'static,
760 ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
761 let client_request = request.into_client_request().map_err(Box::new)?;
762 let socket = connect_with_retry(client_request)?;
763
764 let socket = Arc::new(RwLock::new(socket));
765 let socket_clone = socket.clone();
766 let exit = Arc::new(AtomicBool::new(false));
767 let exit_clone = exit.clone();
768 let body = json!({
769 "jsonrpc":"2.0",
770 "id":1,
771 "method":"slotsUpdatesSubscribe",
772 "params":[]
773 })
774 .to_string();
775 let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
776
777 let t_cleanup = std::thread::spawn(move || {
778 Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
779 });
780
781 Ok(PubsubClientSubscription {
782 message_type: PhantomData,
783 operation: "slotsUpdates",
784 socket,
785 subscription_id,
786 t_cleanup: Some(t_cleanup),
787 exit,
788 })
789 }
790
791 fn cleanup_with_sender<T>(
792 exit: Arc<AtomicBool>,
793 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
794 sender: Sender<T>,
795 ) where
796 T: DeserializeOwned + Send + 'static,
797 {
798 let handler = move |message| match sender.send(message) {
799 Ok(_) => (),
800 Err(err) => {
801 info!("receive error: {err:?}");
802 }
803 };
804 Self::cleanup_with_handler(exit, socket, handler);
805 }
806
807 fn cleanup_with_handler<T, F>(
808 exit: Arc<AtomicBool>,
809 socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
810 handler: F,
811 ) where
812 T: DeserializeOwned,
813 F: Fn(T) + Send + 'static,
814 {
815 loop {
816 if exit.load(Ordering::Relaxed) {
817 break;
818 }
819
820 match PubsubClientSubscription::read_message(socket) {
821 Ok(Some(message)) => handler(message),
822 Ok(None) => {
823 }
825 Err(err) => {
826 info!("receive error: {err:?}");
827 break;
828 }
829 }
830 }
831
832 info!("websocket - exited receive loop");
833 }
834}
835
836#[cfg(test)]
837mod tests {
838 }