solana_rpc/
rpc_pubsub_service.rs

//! The `pubsub` module implements a threaded websocket subscription service for client RPC requests.
2
3use {
4    crate::{
5        rpc_pubsub::{RpcSolPubSubImpl, RpcSolPubSubInternal},
6        rpc_subscription_tracker::{
7            SubscriptionControl, SubscriptionId, SubscriptionParams, SubscriptionToken,
8        },
9        rpc_subscriptions::{RpcNotification, RpcSubscriptions},
10    },
11    dashmap::{mapref::entry::Entry, DashMap},
12    jsonrpc_core::IoHandler,
13    soketto::handshake::{server, Server},
14    solana_metrics::TokenCounter,
15    solana_rayon_threadlimit::get_thread_count,
16    solana_time_utils::AtomicInterval,
17    std::{
18        io,
19        net::SocketAddr,
20        num::NonZeroUsize,
21        str,
22        sync::{
23            atomic::{AtomicU64, AtomicUsize, Ordering},
24            Arc,
25        },
26        thread::{self, Builder, JoinHandle},
27    },
28    stream_cancel::{Trigger, Tripwire},
29    thiserror::Error,
30    tokio::{net::TcpStream, pin, select, sync::broadcast},
31    tokio_util::compat::TokioAsyncReadCompatExt,
32};
33
/// Default cap on active subscriptions (`PubSubConfig::max_active_subscriptions`).
pub const MAX_ACTIVE_SUBSCRIPTIONS: usize = 1_000 * 1_000;
/// Default queue capacity, in items, for non-test configurations.
pub const DEFAULT_QUEUE_CAPACITY_ITEMS: usize = 10_000 * 1_000;
/// Queue capacity, in items, used by `PubSubConfig::default_for_tests`.
pub const DEFAULT_TEST_QUEUE_CAPACITY_ITEMS: usize = 100;
/// Default queue capacity in bytes (256 MiB).
pub const DEFAULT_QUEUE_CAPACITY_BYTES: usize = 256 << 20;
/// Default number of worker threads in the pubsub tokio runtime.
pub const DEFAULT_WORKER_THREADS: usize = 1;
39
/// Tunables for the websocket pubsub service.
///
/// The whole config is handed to `RpcSolPubSubImpl` for every connection;
/// `worker_threads` is also read directly when the service runtime is built.
#[derive(Clone, Debug, PartialEq)]
pub struct PubSubConfig {
    /// Enables block subscriptions (off in the provided defaults).
    pub enable_block_subscription: bool,
    /// Enables vote subscriptions (off in the provided defaults).
    pub enable_vote_subscription: bool,
    /// Limit on active subscriptions; defaults to `MAX_ACTIVE_SUBSCRIPTIONS`.
    pub max_active_subscriptions: usize,
    /// Queue capacity in items; defaults to `DEFAULT_QUEUE_CAPACITY_ITEMS`.
    pub queue_capacity_items: usize,
    /// Queue capacity in bytes; defaults to `DEFAULT_QUEUE_CAPACITY_BYTES`.
    pub queue_capacity_bytes: usize,
    /// Number of worker threads in the service's tokio runtime.
    pub worker_threads: usize,
    /// Optional thread count for notification processing (consumed outside this file).
    pub notification_threads: Option<NonZeroUsize>,
}
50
51impl Default for PubSubConfig {
52    fn default() -> Self {
53        Self {
54            enable_block_subscription: false,
55            enable_vote_subscription: false,
56            max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
57            queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS,
58            queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
59            worker_threads: DEFAULT_WORKER_THREADS,
60            notification_threads: NonZeroUsize::new(get_thread_count()),
61        }
62    }
63}
64
65impl PubSubConfig {
66    pub fn default_for_tests() -> Self {
67        Self {
68            enable_block_subscription: false,
69            enable_vote_subscription: false,
70            max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS,
71            queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS,
72            queue_capacity_bytes: DEFAULT_QUEUE_CAPACITY_BYTES,
73            worker_threads: DEFAULT_WORKER_THREADS,
74            notification_threads: NonZeroUsize::new(2),
75        }
76    }
77}
78
/// Handle to the background thread that runs the pubsub websocket service.
pub struct PubSubService {
    /// Join handle of the `solRpcPubSub` thread.
    thread_hdl: JoinHandle<()>,
}
82
83impl PubSubService {
84    pub fn new(
85        pubsub_config: PubSubConfig,
86        subscriptions: &RpcSubscriptions,
87        pubsub_addr: SocketAddr,
88    ) -> (Trigger, Self) {
89        let subscription_control = subscriptions.control().clone();
90        info!("rpc_pubsub bound to {pubsub_addr:?}");
91
92        let (trigger, tripwire) = Tripwire::new();
93        let thread_hdl = Builder::new()
94            .name("solRpcPubSub".to_string())
95            .spawn(move || {
96                info!("PubSubService has started");
97                let runtime = tokio::runtime::Builder::new_multi_thread()
98                    .thread_name("solRpcPubSubRt")
99                    .worker_threads(pubsub_config.worker_threads)
100                    .enable_all()
101                    .build()
102                    .expect("runtime creation failed");
103                if let Err(err) = runtime.block_on(listen(
104                    pubsub_addr,
105                    pubsub_config,
106                    subscription_control,
107                    tripwire,
108                )) {
109                    error!("PubSubService has stopped due to error: {err}");
110                };
111                info!("PubSubService has stopped");
112            })
113            .expect("thread spawn failed");
114
115        (trigger, Self { thread_hdl })
116    }
117
118    pub fn close(self) -> thread::Result<()> {
119        self.join()
120    }
121
122    pub fn join(self) -> thread::Result<()> {
123        self.thread_hdl.join()
124    }
125}
126
/// Minimum spacing, in milliseconds, between `rpc_pubsub-sent_notifications` datapoints.
const METRICS_REPORT_INTERVAL_MS: u64 = 10 * 1_000;
128
129#[derive(Default)]
130struct SentNotificationStats {
131    num_account: AtomicUsize,
132    num_logs: AtomicUsize,
133    num_program: AtomicUsize,
134    num_signature: AtomicUsize,
135    num_slot: AtomicUsize,
136    num_slots_updates: AtomicUsize,
137    num_root: AtomicUsize,
138    num_vote: AtomicUsize,
139    num_block: AtomicUsize,
140    total_creation_to_queue_time_us: AtomicU64,
141    last_report: AtomicInterval,
142}
143
144impl SentNotificationStats {
145    fn maybe_report(&self) {
146        if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
147            datapoint_info!(
148                "rpc_pubsub-sent_notifications",
149                (
150                    "num_account",
151                    self.num_account.swap(0, Ordering::Relaxed) as i64,
152                    i64
153                ),
154                (
155                    "num_logs",
156                    self.num_logs.swap(0, Ordering::Relaxed) as i64,
157                    i64
158                ),
159                (
160                    "num_program",
161                    self.num_program.swap(0, Ordering::Relaxed) as i64,
162                    i64
163                ),
164                (
165                    "num_signature",
166                    self.num_signature.swap(0, Ordering::Relaxed) as i64,
167                    i64
168                ),
169                (
170                    "num_slot",
171                    self.num_slot.swap(0, Ordering::Relaxed) as i64,
172                    i64
173                ),
174                (
175                    "num_slots_updates",
176                    self.num_slots_updates.swap(0, Ordering::Relaxed) as i64,
177                    i64
178                ),
179                (
180                    "num_root",
181                    self.num_root.swap(0, Ordering::Relaxed) as i64,
182                    i64
183                ),
184                (
185                    "num_vote",
186                    self.num_vote.swap(0, Ordering::Relaxed) as i64,
187                    i64
188                ),
189                (
190                    "num_block",
191                    self.num_block.swap(0, Ordering::Relaxed) as i64,
192                    i64
193                ),
194                (
195                    "total_creation_to_queue_time_us",
196                    self.total_creation_to_queue_time_us
197                        .swap(0, Ordering::Relaxed) as i64,
198                    i64
199                )
200            );
201        }
202    }
203}
204
205struct BroadcastHandler {
206    current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>,
207    sent_stats: Arc<SentNotificationStats>,
208}
209
210fn increment_sent_notification_stats(
211    params: &SubscriptionParams,
212    notification: &RpcNotification,
213    stats: &Arc<SentNotificationStats>,
214) {
215    match params {
216        SubscriptionParams::Account(_) => {
217            stats.num_account.fetch_add(1, Ordering::Relaxed);
218        }
219        SubscriptionParams::Logs(_) => {
220            stats.num_logs.fetch_add(1, Ordering::Relaxed);
221        }
222        SubscriptionParams::Program(_) => {
223            stats.num_program.fetch_add(1, Ordering::Relaxed);
224        }
225        SubscriptionParams::Signature(_) => {
226            stats.num_signature.fetch_add(1, Ordering::Relaxed);
227        }
228        SubscriptionParams::Slot => {
229            stats.num_slot.fetch_add(1, Ordering::Relaxed);
230        }
231        SubscriptionParams::SlotsUpdates => {
232            stats.num_slots_updates.fetch_add(1, Ordering::Relaxed);
233        }
234        SubscriptionParams::Root => {
235            stats.num_root.fetch_add(1, Ordering::Relaxed);
236        }
237        SubscriptionParams::Vote => {
238            stats.num_vote.fetch_add(1, Ordering::Relaxed);
239        }
240        SubscriptionParams::Block(_) => {
241            stats.num_block.fetch_add(1, Ordering::Relaxed);
242        }
243    }
244    stats.total_creation_to_queue_time_us.fetch_add(
245        notification.created_at.elapsed().as_micros() as u64,
246        Ordering::Relaxed,
247    );
248
249    stats.maybe_report();
250}
251
252impl BroadcastHandler {
253    fn new(current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionToken>>) -> Self {
254        let sent_stats = Arc::new(SentNotificationStats::default());
255        Self {
256            current_subscriptions,
257            sent_stats,
258        }
259    }
260
261    fn handle(&self, notification: RpcNotification) -> Result<Option<Arc<String>>, Error> {
262        if let Entry::Occupied(entry) = self
263            .current_subscriptions
264            .entry(notification.subscription_id)
265        {
266            increment_sent_notification_stats(
267                entry.get().params(),
268                &notification,
269                &self.sent_stats,
270            );
271
272            if notification.is_final {
273                entry.remove();
274            }
275            notification
276                .json
277                .upgrade()
278                .ok_or(Error::NotificationIsGone)
279                .map(Some)
280        } else {
281            Ok(None)
282        }
283    }
284}
285
/// Synchronous, test-only stand-in for a connection's broadcast loop.
///
/// It pulls notifications from the shared broadcast queue and filters them
/// through the same `BroadcastHandler` logic a websocket connection uses.
#[cfg(test)]
pub struct TestBroadcastReceiver {
    /// Per-connection filter and stats.
    handler: BroadcastHandler,
    /// Receiving end of the subscription broadcast queue.
    inner: tokio::sync::broadcast::Receiver<RpcNotification>,
}
291
#[cfg(test)]
impl TestBroadcastReceiver {
    /// Waits up to 10 seconds for the next notification addressed to this receiver.
    ///
    /// # Panics
    /// Panics on timeout or on any broadcast-queue error.
    pub fn recv(&mut self) -> String {
        self.recv_timeout(std::time::Duration::from_secs(10))
            .unwrap_or_else(|err| panic!("broadcast receiver error: {err}"))
    }

    /// Polls the broadcast queue every 50 ms until a notification for one of this
    /// receiver's subscriptions arrives, returning its JSON text.
    ///
    /// Notifications for other subscriptions are skipped. The timeout is checked
    /// only while the queue is empty.
    ///
    /// # Errors
    /// Returns a message on timeout, or the queue's error text if the queue is
    /// closed or this receiver lagged.
    pub fn recv_timeout(&mut self, timeout: std::time::Duration) -> Result<String, String> {
        use {
            std::{
                thread::sleep,
                time::{Duration, Instant},
            },
            tokio::sync::broadcast::error::TryRecvError,
        };

        let started = Instant::now();

        loop {
            let notification = match self.inner.try_recv() {
                Ok(notification) => notification,
                Err(TryRecvError::Empty) => {
                    if started.elapsed() > timeout {
                        return Err("TestBroadcastReceiver: no data, timeout reached".into());
                    }
                    sleep(Duration::from_millis(50));
                    continue;
                }
                Err(other) => return Err(other.to_string()),
            };

            debug!(
                "TestBroadcastReceiver: {:?}ms elapsed",
                started.elapsed().as_millis()
            );
            if let Some(json) = self.handler.handle(notification).expect("handler failed") {
                return Ok(json.to_string());
            }
        }
    }
}
328
/// Builds an RPC implementation and a matching `TestBroadcastReceiver` that share one
/// subscription map.
///
/// The configuration enables block and vote subscriptions and uses a 100-item queue.
#[cfg(test)]
pub fn test_connection(
    subscriptions: &Arc<RpcSubscriptions>,
) -> (RpcSolPubSubImpl, TestBroadcastReceiver) {
    let control = subscriptions.control();
    let current_subscriptions = Arc::new(DashMap::new());

    let config = PubSubConfig {
        enable_block_subscription: true,
        enable_vote_subscription: true,
        queue_capacity_items: 100,
        ..PubSubConfig::default()
    };
    let rpc_impl = RpcSolPubSubImpl::new(
        config,
        control.clone(),
        Arc::clone(&current_subscriptions),
    );

    let receiver = TestBroadcastReceiver {
        handler: BroadcastHandler::new(current_subscriptions),
        inner: control.broadcast_receiver(),
    };
    (rpc_impl, receiver)
}
352
353#[derive(Debug, Error)]
354enum Error {
355    #[error("handshake error: {0}")]
356    Handshake(#[from] soketto::handshake::Error),
357    #[error("connection error: {0}")]
358    Connection(#[from] soketto::connection::Error),
359    #[error("broadcast queue error: {0}")]
360    Broadcast(#[from] broadcast::error::RecvError),
361    #[error("client has lagged behind (notification is gone)")]
362    NotificationIsGone,
363}
364
365async fn handle_connection(
366    socket: TcpStream,
367    subscription_control: SubscriptionControl,
368    config: PubSubConfig,
369    mut tripwire: Tripwire,
370) -> Result<(), Error> {
371    let mut server = Server::new(socket.compat());
372    let request = server.receive_request().await?;
373    let accept = server::Response::Accept {
374        key: request.key(),
375        protocol: None,
376    };
377    server.send_response(&accept).await?;
378    let mut builder = server.into_builder();
379    builder.set_max_message_size(4_096);
380    builder.set_max_frame_size(4_096);
381    let (mut sender, mut receiver) = builder.finish();
382
383    let mut broadcast_receiver = subscription_control.broadcast_receiver();
384    let mut data = Vec::new();
385    let current_subscriptions = Arc::new(DashMap::new());
386
387    let mut json_rpc_handler = IoHandler::new();
388    let rpc_impl = RpcSolPubSubImpl::new(
389        config,
390        subscription_control,
391        Arc::clone(&current_subscriptions),
392    );
393    json_rpc_handler.extend_with(rpc_impl.to_delegate());
394    let broadcast_handler = BroadcastHandler::new(current_subscriptions);
395    loop {
396        // Extra block for dropping `receive_future`.
397        {
398            // soketto is not cancel safe, so we have to introduce an inner loop to poll
399            // `receive_data` to completion.
400            let receive_future = receiver.receive_data(&mut data);
401            pin!(receive_future);
402            loop {
403                select! {
404                    biased; // See [prioritization] note below.
405
406                    // [prioritization]
407                    // This block must come FIRST in the `select!` macro. This prioritizes
408                    // processing received messages over sending messages. This ensures the timely
409                    // processing of new subscriptions and time-sensitive opcodes like `PING`.
410                    result = &mut receive_future => match result {
411                        Ok(_) => break,
412                        Err(soketto::connection::Error::Closed) => return Ok(()),
413                        Err(err) => return Err(err.into()),
414                    },
415                    result = broadcast_receiver.recv() => {
416
417                        // In both possible error cases (closed or lagged) we disconnect the client.
418                        if let Some(json) = broadcast_handler.handle(result?)? {
419                            sender.send_text(&*json).await?;
420                        }
421                    },
422                    _ = &mut tripwire => {
423                        warn!("disconnecting websocket client: shutting down");
424                        return Ok(())
425                    },
426
427                }
428            }
429        }
430        let Ok(data_str) = str::from_utf8(&data) else {
431            // Old implementation just closes the connection, so we preserve that behavior
432            // for now. It would be more correct to respond with an error.
433            break;
434        };
435
436        if let Some(response) = json_rpc_handler.handle_request(data_str).await {
437            sender.send_text(&response).await?;
438        }
439        data.clear();
440    }
441
442    Ok(())
443}
444
445async fn listen(
446    listen_address: SocketAddr,
447    config: PubSubConfig,
448    subscription_control: SubscriptionControl,
449    mut tripwire: Tripwire,
450) -> io::Result<()> {
451    let listener = tokio::net::TcpListener::bind(&listen_address).await?;
452    let counter = TokenCounter::new("rpc_pubsub_connections");
453    loop {
454        select! {
455            result = listener.accept() => match result {
456                Ok((socket, addr)) => {
457                    debug!("new client ({addr:?})");
458                    let subscription_control = subscription_control.clone();
459                    let config = config.clone();
460                    let tripwire = tripwire.clone();
461                    let counter_token = counter.create_token();
462                    tokio::spawn(async move {
463                        let handle = handle_connection(
464                            socket, subscription_control, config, tripwire
465                        );
466                        match handle.await {
467                            Ok(()) => debug!("connection closed ({addr:?})"),
468                            Err(err) => warn!("connection handler error ({addr:?}): {err}"),
469                        }
470                        drop(counter_token); // Force moving token into the task.
471                    });
472                }
473                Err(e) => error!("couldn't accept connection: {e:?}"),
474            },
475            _ = &mut tripwire => return Ok(()),
476        }
477    }
478}
479
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
        solana_runtime::{
            bank::Bank,
            bank_forks::BankForks,
            commitment::BlockCommitmentCache,
            genesis_utils::{create_genesis_config, GenesisConfigInfo},
        },
        std::{
            net::{IpAddr, Ipv4Addr},
            sync::{
                atomic::{AtomicBool, AtomicU64},
                RwLock,
            },
        },
    };

    /// Starting the service spawns a thread carrying the expected name.
    #[test]
    fn test_pubsub_new() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
            Arc::new(AtomicBool::new(false)),
            Arc::new(AtomicU64::default()),
            bank_forks,
            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
            optimistically_confirmed_bank,
        ));
        // Port 0 lets the OS pick a free port.
        let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);

        let (_trigger, pubsub_service) =
            PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);

        assert_eq!(
            pubsub_service.thread_hdl.thread().name(),
            Some("solRpcPubSub")
        );
    }
}