1use {
4 crate::{
5 rpc_pubsub::{RpcSolPubSubImpl, RpcSolPubSubInternal},
6 rpc_subscription_tracker::{
7 SubscriptionControl, SubscriptionId, SubscriptionParams, SubscriptionToken,
8 },
9 rpc_subscriptions::{RpcNotification, RpcSubscriptions},
10 },
11 dashmap::{mapref::entry::Entry, DashMap},
12 jsonrpc_core::IoHandler,
13 soketto::handshake::{server, Server},
14 solana_metrics::TokenCounter,
15 solana_rayon_threadlimit::get_thread_count,
16 solana_time_utils::AtomicInterval,
17 std::{
18 io,
19 net::SocketAddr,
20 num::NonZeroUsize,
21 str,
22 sync::{
23 atomic::{AtomicU64, AtomicUsize, Ordering},
24 Arc,
25 },
26 thread::{self, Builder, JoinHandle},
27 },
28 stream_cancel::{Trigger, Tripwire},
29 thiserror::Error,
30 tokio::{net::TcpStream, pin, select, sync::broadcast},
31 tokio_util::compat::TokioAsyncReadCompatExt,
32};
33
/// Value used for `PubSubConfig::max_active_subscriptions` by both
/// `PubSubConfig::default()` and `PubSubConfig::default_for_tests()`.
pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000_000;
/// Value used for `PubSubConfig::queue_capacity_items` by `PubSubConfig::default()`.
pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000_000;
/// Value used for `PubSubConfig::queue_capacity_items` by
/// `PubSubConfig::default_for_tests()`.
pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100;
/// Value used for `PubSubConfig::queue_capacity_bytes` (256 * 1024 * 1024).
pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 * 1024 * 1024;
/// Value used for `PubSubConfig::worker_threads`, which sizes the tokio runtime
/// built in `PubSubService::new`.
pub const DEFAULT_WORKER_THREADS: usize = 1;
39
/// Configuration for the pubsub websocket service.
///
/// A copy of this config is handed to every connection's `RpcSolPubSubImpl`.
#[derive(Debug, Clone, PartialEq)]
pub struct PubSubConfig {
    /// NOTE(review): presumably gates block subscriptions; it is enforced
    /// outside this file, so confirm against `RpcSolPubSubImpl`.
    pub enable_block_subscription: bool,
    /// NOTE(review): presumably gates vote subscriptions; it is enforced
    /// outside this file, so confirm against `RpcSolPubSubImpl`.
    pub enable_vote_subscription: bool,
    /// Limit on active subscriptions (not enforced in this file).
    pub max_active_subscriptions: usize,
    /// Notification queue capacity, in items (not consumed in this file).
    pub queue_capacity_items: usize,
    /// Notification queue capacity, in bytes (not consumed in this file).
    pub queue_capacity_bytes: usize,
    /// Number of worker threads for the tokio runtime built in
    /// `PubSubService::new`.
    pub worker_threads: usize,
    /// Optional notification thread count (not consumed in this file).
    pub notification_threads: Option<NonZeroUsize>,
}
50
51impl Default for PubSubConfig {
52 fn default() -> Self {
53 Self {
54 enable_block_subscription: false,
55 enable_vote_subscription: false,
56 max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
57 queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS,
58 queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
59 worker_threads: DEFAULT_WORKER_THREADS,
60 notification_threads: NonZeroUsize::new(get_thread_count()),
61 }
62 }
63}
64
65impl PubSubConfig {
66 pub fn default_for_tests() -> Self {
67 Self {
68 enable_block_subscription: false,
69 enable_vote_subscription: false,
70 max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
71 queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
72 queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
73 worker_threads: DEFAULT_WORKER_THREADS,
74 notification_threads: NonZeroUsize::new(2),
75 }
76 }
77}
78
/// Handle to the pubsub websocket service.
///
/// Owns the join handle of the `solRpcPubSub` thread spawned by
/// `PubSubService::new`.
pub struct PubSubService {
    /// Join handle of the thread that drives the tokio runtime and listener.
    thread_hdl: JoinHandle<()>,
}
82
83impl PubSubService {
84 pub fn new(
85 pubsub_config: PubSubConfig,
86 subscriptions: &RpcSubscriptions,
87 pubsub_addr: SocketAddr,
88 ) -> (Trigger, Self) {
89 let subscription_control = subscriptions.control().clone();
90 info!("rpc_pubsub bound to {pubsub_addr:?}");
91
92 let (trigger, tripwire) = Tripwire::new();
93 let thread_hdl = Builder::new()
94 .name("solRpcPubSub".to_string())
95 .spawn(move || {
96 info!("PubSubService has started");
97 let runtime = tokio::runtime::Builder::new_multi_thread()
98 .thread_name("solRpcPubSubRt")
99 .worker_threads(pubsub_config.worker_threads)
100 .enable_all()
101 .build()
102 .expect("runtime creation failed");
103 if let Err(err) = runtime.block_on(listen(
104 pubsub_addr,
105 pubsub_config,
106 subscription_control,
107 tripwire,
108 )) {
109 error!("PubSubService has stopped due to error: {err}");
110 };
111 info!("PubSubService has stopped");
112 })
113 .expect("thread spawn failed");
114
115 (trigger, Self { thread_hdl })
116 }
117
118 pub fn close(self) -> thread::Result<()> {
119 self.join()
120 }
121
122 pub fn join(self) -> thread::Result<()> {
123 self.thread_hdl.join()
124 }
125}
126
/// Interval, in milliseconds, passed to `AtomicInterval::should_update` to
/// decide when `SentNotificationStats::maybe_report` submits a datapoint.
const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
128
/// Counters of notifications handled by a `BroadcastHandler`, one per
/// `SubscriptionParams` variant, plus accumulated latency.
///
/// All counters are reset to zero whenever `maybe_report` submits them.
#[derive(Default)]
struct SentNotificationStats {
    // One counter per `SubscriptionParams` variant; see
    // `increment_sent_notification_stats` for the mapping.
    num_account: AtomicUsize,
    num_logs: AtomicUsize,
    num_program: AtomicUsize,
    num_signature: AtomicUsize,
    num_slot: AtomicUsize,
    num_slots_updates: AtomicUsize,
    num_root: AtomicUsize,
    num_vote: AtomicUsize,
    num_block: AtomicUsize,
    // Sum of `notification.created_at.elapsed()` in microseconds, sampled when
    // each notification is counted.
    total_creation_to_queue_time_us: AtomicU64,
    // Gates how often `maybe_report` submits a datapoint.
    last_report: AtomicInterval,
}
143
144impl SentNotificationStats {
145 fn maybe_report(&self) {
146 if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
147 datapoint_info!(
148 "rpc_pubsub-sent_notifications",
149 (
150 "num_account",
151 self.num_account.swap(0, Ordering::Relaxed) as i64,
152 i64
153 ),
154 (
155 "num_logs",
156 self.num_logs.swap(0, Ordering::Relaxed) as i64,
157 i64
158 ),
159 (
160 "num_program",
161 self.num_program.swap(0, Ordering::Relaxed) as i64,
162 i64
163 ),
164 (
165 "num_signature",
166 self.num_signature.swap(0, Ordering::Relaxed) as i64,
167 i64
168 ),
169 (
170 "num_slot",
171 self.num_slot.swap(0, Ordering::Relaxed) as i64,
172 i64
173 ),
174 (
175 "num_slots_updates",
176 self.num_slots_updates.swap(0, Ordering::Relaxed) as i64,
177 i64
178 ),
179 (
180 "num_root",
181 self.num_root.swap(0, Ordering::Relaxed) as i64,
182 i64
183 ),
184 (
185 "num_vote",
186 self.num_vote.swap(0, Ordering::Relaxed) as i64,
187 i64
188 ),
189 (
190 "num_block",
191 self.num_block.swap(0, Ordering::Relaxed) as i64,
192 i64
193 ),
194 (
195 "total_creation_to_queue_time_us",
196 self.total_creation_to_queue_time_us
197 .swap(0, Ordering::Relaxed) as i64,
198 i64
199 )
200 );
201 }
202 }
203}
204
/// Per-connection filter that turns broadcast `RpcNotification`s into JSON
/// payloads for the subscriptions registered in `current_subscriptions`.
struct BroadcastHandler {
    /// Subscriptions of this connection, keyed by subscription id. The map is
    /// shared with the connection's `RpcSolPubSubImpl`.
    current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>,
    /// Counters updated for every notification that matches a subscription.
    sent_stats: Arc<SentNotificationStats>,
}
209
210fn increment_sent_notification_stats(
211 params: &SubscriptionParams,
212 notification: &RpcNotification,
213 stats: &Arc<SentNotificationStats>,
214) {
215 match params {
216 SubscriptionParams::Account(_) => {
217 stats.num_account.fetch_add(1, Ordering::Relaxed);
218 }
219 SubscriptionParams::Logs(_) => {
220 stats.num_logs.fetch_add(1, Ordering::Relaxed);
221 }
222 SubscriptionParams::Program(_) => {
223 stats.num_program.fetch_add(1, Ordering::Relaxed);
224 }
225 SubscriptionParams::Signature(_) => {
226 stats.num_signature.fetch_add(1, Ordering::Relaxed);
227 }
228 SubscriptionParams::Slot => {
229 stats.num_slot.fetch_add(1, Ordering::Relaxed);
230 }
231 SubscriptionParams::SlotsUpdates => {
232 stats.num_slots_updates.fetch_add(1, Ordering::Relaxed);
233 }
234 SubscriptionParams::Root => {
235 stats.num_root.fetch_add(1, Ordering::Relaxed);
236 }
237 SubscriptionParams::Vote => {
238 stats.num_vote.fetch_add(1, Ordering::Relaxed);
239 }
240 SubscriptionParams::Block(_) => {
241 stats.num_block.fetch_add(1, Ordering::Relaxed);
242 }
243 }
244 stats.total_creation_to_queue_time_us.fetch_add(
245 notification.created_at.elapsed().as_micros() as u64,
246 Ordering::Relaxed,
247 );
248
249 stats.maybe_report();
250}
251
252impl BroadcastHandler {
253 fn new(current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>) -> Self {
254 let sent_stats = Arc::new(SentNotificationStats::default());
255 Self {
256 current_subscriptions,
257 sent_stats,
258 }
259 }
260
261 fn handle(&self, notification: RpcNotification) -> Result<Option<Arc<String>>, Error> {
262 if let Entry::Occupied(entry) = self
263 .current_subscriptions
264 .entry(notification.subscription_id)
265 {
266 increment_sent_notification_stats(
267 entry.get().params(),
268 ¬ification,
269 &self.sent_stats,
270 );
271
272 if notification.is_final {
273 entry.remove();
274 }
275 notification
276 .json
277 .upgrade()
278 .ok_or(Error::NotificationIsGone)
279 .map(Some)
280 } else {
281 Ok(None)
282 }
283 }
284}
285
/// Test-only receiver that pulls notifications from the broadcast channel and
/// runs them through a `BroadcastHandler`.
#[cfg(test)]
pub struct TestBroadcastReceiver {
    /// Filters notifications down to the subscriptions registered in the test.
    handler: BroadcastHandler,
    /// Receiving end of the subscription broadcast channel.
    inner: tokio::sync::broadcast::Receiver<RpcNotification>,
}
291
#[cfg(test)]
impl TestBroadcastReceiver {
    /// Receives the next matching notification as a JSON string, waiting up to
    /// ten seconds.
    ///
    /// # Panics
    ///
    /// Panics if [`TestBroadcastReceiver::recv_timeout`] returns an error.
    pub fn recv(&mut self) -> String {
        self.recv_timeout(std::time::Duration::from_secs(10))
            .unwrap_or_else(|err| panic!("broadcast receiver error: {err}"))
    }

    /// Polls the broadcast channel until a notification matching one of the
    /// handler's subscriptions arrives, and returns its JSON payload.
    ///
    /// Non-matching notifications are skipped. While the channel is empty the
    /// call sleeps in 50 ms steps; once more than `timeout` has elapsed on an
    /// empty channel it returns an error string. Any other receive error is
    /// returned as its string form.
    ///
    /// # Panics
    ///
    /// Panics if the handler itself returns an error.
    pub fn recv_timeout(&mut self, timeout: std::time::Duration) -> Result<String, String> {
        use {
            std::{
                thread::sleep,
                time::{Duration, Instant},
            },
            tokio::sync::broadcast::error::TryRecvError,
        };

        let started = Instant::now();

        loop {
            let notification = match self.inner.try_recv() {
                Ok(notification) => notification,
                Err(TryRecvError::Empty) => {
                    if started.elapsed() > timeout {
                        return Err("TestBroadcastReceiver: no data, timeout reached".into());
                    }
                    sleep(Duration::from_millis(50));
                    continue;
                }
                Err(e) => return Err(e.to_string()),
            };

            debug!(
                "TestBroadcastReceiver: {:?}ms elapsed",
                started.elapsed().as_millis()
            );
            let payload = self.handler.handle(notification).expect("handler failed");
            if let Some(json) = payload {
                return Ok(json.to_string());
            }
        }
    }
}
328
/// Builds an in-process pubsub "connection" for tests.
///
/// Returns an `RpcSolPubSubImpl` (block and vote subscriptions enabled, small
/// item queue) and a `TestBroadcastReceiver` that share one subscription map,
/// so notifications for subscriptions made through the former can be read
/// through the latter.
#[cfg(test)]
pub fn test_connection(
    subscriptions: &Arc<RpcSubscriptions>,
) -> (RpcSolPubSubImpl, TestBroadcastReceiver) {
    let current_subscriptions = Arc::new(DashMap::new());

    let rpc_impl = RpcSolPubSubImpl::new(
        PubSubConfig {
            enable_block_subscription: true,
            enable_vote_subscription: true,
            queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
            ..PubSubConfig::default()
        },
        subscriptions.control().clone(),
        Arc::clone(&current_subscriptions),
    );
    let broadcast_handler = BroadcastHandler::new(current_subscriptions);
    let receiver = TestBroadcastReceiver {
        inner: subscriptions.control().broadcast_receiver(),
        handler: broadcast_handler,
    };
    (rpc_impl, receiver)
}
352
/// Errors that end the handling of a single websocket connection.
#[derive(Debug, Error)]
enum Error {
    /// The soketto websocket handshake failed.
    #[error("handshake error: {0}")]
    Handshake(#[from] soketto::handshake::Error),
    /// Sending or receiving on the established websocket failed.
    #[error("connection error: {0}")]
    Connection(#[from] soketto::connection::Error),
    /// Receiving from the notification broadcast channel failed.
    #[error("broadcast queue error: {0}")]
    Broadcast(#[from] broadcast::error::RecvError),
    /// A matching notification's JSON payload could no longer be upgraded
    /// (see `BroadcastHandler::handle`).
    #[error("client has lagged behind (notification is gone)")]
    NotificationIsGone,
}
364
365async fn handle_connection(
366 socket: TcpStream,
367 subscription_control: SubscriptionControl,
368 config: PubSubConfig,
369 mut tripwire: Tripwire,
370) -> Result<(), Error> {
371 let mut server = Server::new(socket.compat());
372 let request = server.receive_request().await?;
373 let accept = server::Response::Accept {
374 key: request.key(),
375 protocol: None,
376 };
377 server.send_response(&accept).await?;
378 let mut builder = server.into_builder();
379 builder.set_max_message_size(4_096);
380 builder.set_max_frame_size(4_096);
381 let (mut sender, mut receiver) = builder.finish();
382
383 let mut broadcast_receiver = subscription_control.broadcast_receiver();
384 let mut data = Vec::new();
385 let current_subscriptions = Arc::new(DashMap::new());
386
387 let mut json_rpc_handler = IoHandler::new();
388 let rpc_impl = RpcSolPubSubImpl::new(
389 config,
390 subscription_control,
391 Arc::clone(¤t_subscriptions),
392 );
393 json_rpc_handler.extend_with(rpc_impl.to_delegate());
394 let broadcast_handler = BroadcastHandler::new(current_subscriptions);
395 loop {
396 {
398 let receive_future = receiver.receive_data(&mut data);
401 pin!(receive_future);
402 loop {
403 select! {
404 biased; result = &mut receive_future => match result {
411 Ok(_) => break,
412 Err(soketto::connection::Error::Closed) => return Ok(()),
413 Err(err) => return Err(err.into()),
414 },
415 result = broadcast_receiver.recv() => {
416
417 if let Some(json) = broadcast_handler.handle(result?)? {
419 sender.send_text(&*json).await?;
420 }
421 },
422 _ = &mut tripwire => {
423 warn!("disconnecting websocket client: shutting down");
424 return Ok(())
425 },
426
427 }
428 }
429 }
430 let Ok(data_str) = str::from_utf8(&data) else {
431 break;
434 };
435
436 if let Some(response) = json_rpc_handler.handle_request(data_str).await {
437 sender.send_text(&response).await?;
438 }
439 data.clear();
440 }
441
442 Ok(())
443}
444
445async fn listen(
446 listen_address: SocketAddr,
447 config: PubSubConfig,
448 subscription_control: SubscriptionControl,
449 mut tripwire: Tripwire,
450) -> io::Result<()> {
451 let listener = tokio::net::TcpListener::bind(&listen_address).await?;
452 let counter = TokenCounter::new("rpc_pubsub_connections");
453 loop {
454 select! {
455 result = listener.accept() => match result {
456 Ok((socket, addr)) => {
457 debug!("new client ({addr:?})");
458 let subscription_control = subscription_control.clone();
459 let config = config.clone();
460 let tripwire = tripwire.clone();
461 let counter_token = counter.create_token();
462 tokio::spawn(async move {
463 let handle = handle_connection(
464 socket, subscription_control, config, tripwire
465 );
466 match handle.await {
467 Ok(()) => debug!("connection closed ({addr:?})"),
468 Err(err) => warn!("connection handler error ({addr:?}): {err}"),
469 }
470 drop(counter_token); });
472 }
473 Err(e) => error!("couldn't accept connection: {e:?}"),
474 },
475 _ = &mut tripwire => return Ok(()),
476 }
477 }
478}
479
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
        solana_runtime::{
            bank::Bank,
            bank_forks::BankForks,
            commitment::BlockCommitmentCache,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        std::{
            net::{IpAddr, Ipv4Addr},
            sync::{
                atomic::{AtomicBool, AtomicU64},
                RwLock,
            },
        },
    };

    /// Starting the service spawns a thread carrying the expected name.
    #[test]
    fn test_pubsub_new() {
        // Bank state backing the subscriptions.
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));

        let exit = Arc::new(AtomicBool::new(false));
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
            exit,
            max_complete_transaction_status_slot,
            bank_forks,
            block_commitment_cache,
            optimistically_confirmed_bank,
        ));

        // Port 0 on the unspecified address: any free port will do.
        let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
        let (_trigger, pubsub_service) =
            PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);

        let service_thread = pubsub_service.thread_hdl.thread();
        assert_eq!(service_thread.name(), Some("solRpcPubSub"));
    }
}
522}