solana_streamer/
quic.rs

1use {
2    crate::{
3        nonblocking::{
4            qos::{ConnectionContext, QosController},
5            quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
6            simple_qos::{SimpleQos, SimpleQosConfig},
7            swqos::{SwQos, SwQosConfig},
8        },
9        streamer::StakedNodes,
10    },
11    crossbeam_channel::Sender,
12    pem::Pem,
13    quinn::{
14        crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
15        Endpoint, IdleTimeout, ServerConfig, VarInt,
16    },
17    rustls::KeyLogFile,
18    solana_keypair::Keypair,
19    solana_packet::PACKET_DATA_SIZE,
20    solana_perf::packet::PacketBatch,
21    solana_quic_definitions::{NotifyKeyUpdate, QUIC_MAX_TIMEOUT},
22    solana_tls_utils::{new_dummy_x509_certificate, tls_server_config_builder},
23    std::{
24        net::UdpSocket,
25        num::NonZeroUsize,
26        sync::{
27            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
28            Arc, Mutex, RwLock,
29        },
30        thread::{self},
31        time::Duration,
32    },
33    tokio::runtime::Runtime,
34    tokio_util::sync::CancellationToken,
35};
36
/// Default maximum number of QUIC connections per peer.
// allow multiple connections for NAT and any open/close overlap
pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;

/// Default for the `max_staked_connections` field of the streamer configuration.
pub const DEFAULT_MAX_STAKED_CONNECTIONS: usize = 2000;

/// Default for the `max_unstaked_connections` field of the streamer configuration.
pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 2000;

/// Limit to 500K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500;

/// The new connections per minute from a particular IP address.
/// Heuristically set to the default maximum concurrent connections
/// per IP address. Might be adjusted later.
pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;

/// Default number of QUIC endpoints.
// This will be adjusted and parameterized in follow-on PRs.
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;

/// Allow for 8 MB QUIC connection receive window (MAX_DATA). This is sufficient to
/// support 200 Mbps upload rate at 320 ms RTT. It is unreasonable to expect a single
/// connection to require more bandwidth. This prevents MAX_DATA from affecting
/// the bitrate achieved by a single connection. Actual throttling is achieved based
/// on the number of concurrent streams. This does not affect the memory allocation
/// in Quinn, that is driven primarily by MAX_STREAMS, not MAX_DATA.
const CONNECTION_RECEIVE_WINDOW_BYTES: VarInt = VarInt::from_u32(8 * 1024 * 1024);
62
63pub fn default_num_tpu_transaction_forward_receive_threads() -> usize {
64    num_cpus::get().min(16)
65}
66
67pub fn default_num_tpu_transaction_receive_threads() -> usize {
68    num_cpus::get().min(8)
69}
70
71pub fn default_num_tpu_vote_transaction_receive_threads() -> usize {
72    num_cpus::get().min(8)
73}
74
/// Handles returned to the caller once a QUIC server has been spawned.
pub struct SpawnServerResult {
    /// The QUIC endpoints created for the server.
    pub endpoints: Vec<Endpoint>,
    /// Join handle of the OS thread that blocks on the server's tokio runtime.
    pub thread: thread::JoinHandle<()>,
    /// Updater that installs a new server configuration on the endpoints
    /// when the identity key changes.
    pub key_updater: Arc<EndpointKeyUpdater>,
}
80
/// Controls the channel size for the PacketBatch coalesce
pub(crate) const DEFAULT_ACCUMULATOR_CHANNEL_SIZE: usize = 250_000;
83
84/// Returns default server configuration along with its PEM certificate chain.
85#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
86pub(crate) fn configure_server(
87    identity_keypair: &Keypair,
88) -> Result<(ServerConfig, String), QuicServerError> {
89    let (cert, priv_key) = new_dummy_x509_certificate(identity_keypair);
90    let cert_chain_pem_parts = vec![Pem {
91        tag: "CERTIFICATE".to_string(),
92        contents: cert.as_ref().to_vec(),
93    }];
94    let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
95
96    let mut server_tls_config =
97        tls_server_config_builder().with_single_cert(vec![cert], priv_key)?;
98    server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
99    server_tls_config.key_log = Arc::new(KeyLogFile::new());
100    let quic_server_config = QuicServerConfig::try_from(server_tls_config)?;
101
102    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
103
104    // disable path migration as we do not expect TPU clients to be on a mobile device
105    server_config.migration(false);
106
107    let config = Arc::get_mut(&mut server_config.transport).unwrap();
108
109    // Set STREAM_MAX_DATA to fit at most 1 transaction.
110    // This should match the maximal TX size.
111    config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
112    // set the receive window really small initially to prevent the fresh connections
113    // from slamming us with traffic.
114    config.receive_window((PACKET_DATA_SIZE as u32).into());
115    // disable uni_streams until handshake is complete
116    config.max_concurrent_uni_streams(0u32.into());
117    config.receive_window(CONNECTION_RECEIVE_WINDOW_BYTES);
118    let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
119    config.max_idle_timeout(Some(timeout));
120
121    // disable bidi & datagrams
122    config.max_concurrent_bidi_streams(0u32.into());
123    config.datagram_receive_buffer_size(None);
124
125    // Disable GSO. The server only accepts inbound unidirectional streams initiated by clients,
126    // which means that reply data never exceeds one MTU. By disabling GSO, we make
127    // quinn_proto::Connection::poll_transmit allocate only 1 MTU vs 10 * MTU for _each_ transmit.
128    // See https://github.com/anza-xyz/agave/pull/1647.
129    config.enable_segmentation_offload(false);
130
131    Ok((server_config, cert_chain_pem))
132}
133
134fn rt(name: String, num_threads: NonZeroUsize) -> Runtime {
135    tokio::runtime::Builder::new_multi_thread()
136        .thread_name(name)
137        .worker_threads(num_threads.get())
138        .enable_all()
139        .build()
140        .unwrap()
141}
142
/// Errors returned while configuring or spawning the QUIC server.
#[derive(thiserror::Error, Debug)]
pub enum QuicServerError {
    /// Wraps an I/O error from endpoint creation. There is no `#[from]`
    /// conversion, so this variant has to be constructed explicitly.
    #[error("Endpoint creation failed: {0}")]
    EndpointFailed(std::io::Error),
    /// A `rustls` error; converted automatically via `?`.
    #[error("TLS error: {0}")]
    TlsError(#[from] rustls::Error),
    /// The TLS configuration could not be turned into a QUIC crypto
    /// configuration; converted automatically via `?`.
    #[error("No initial cipher suite")]
    NoInitialCipherSuite(#[from] NoInitialCipherSuite),
}
152
/// Holds the server's endpoints so that a new server configuration can be
/// installed on all of them when the identity key is updated.
pub struct EndpointKeyUpdater {
    // Endpoints that receive the regenerated server configuration.
    endpoints: Vec<Endpoint>,
}
156
157impl NotifyKeyUpdate for EndpointKeyUpdater {
158    fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
159        let (config, _) = configure_server(key)?;
160        for endpoint in &self.endpoints {
161            endpoint.set_server_config(Some(config.clone()));
162        }
163        Ok(())
164    }
165}
166
/// Statistics of the QUIC streamer.
///
/// All fields are atomics (or a mutex-protected histogram) so the struct can be
/// updated through a shared reference. Every field is emitted by
/// [`StreamerStats::report`].
#[derive(Default)]
pub struct StreamerStats {
    pub(crate) total_connections: AtomicUsize,
    pub(crate) total_new_connections: AtomicUsize,
    pub(crate) active_streams: AtomicUsize,
    pub(crate) total_new_streams: AtomicUsize,
    pub(crate) invalid_stream_size: AtomicUsize,
    pub(crate) total_packets_allocated: AtomicUsize,
    pub(crate) total_packet_batches_allocated: AtomicUsize,
    pub(crate) total_staked_chunks_received: AtomicUsize,
    pub(crate) total_unstaked_chunks_received: AtomicUsize,
    pub(crate) total_packet_batch_send_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_full_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_disconnected_err: AtomicUsize,
    pub(crate) total_packet_batches_sent: AtomicUsize,
    pub(crate) total_packet_batches_none: AtomicUsize,
    pub(crate) total_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
    pub(crate) total_chunks_sent_for_batching: AtomicUsize,
    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
    pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
    pub(crate) total_stream_read_errors: AtomicUsize,
    pub(crate) total_stream_read_timeouts: AtomicUsize,
    pub(crate) num_evictions_staked: AtomicUsize,
    pub(crate) num_evictions_unstaked: AtomicUsize,
    pub(crate) connection_added_from_staked_peer: AtomicUsize,
    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
    pub(crate) connection_add_failed: AtomicUsize,
    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
    pub(crate) connection_add_failed_staked_node: AtomicUsize,
    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
    pub(crate) connection_add_failed_on_pruning: AtomicUsize,
    pub(crate) connection_setup_timeout: AtomicUsize,
    pub(crate) connection_setup_error: AtomicUsize,
    pub(crate) connection_setup_error_closed: AtomicUsize,
    pub(crate) connection_setup_error_timed_out: AtomicUsize,
    pub(crate) connection_setup_error_transport: AtomicUsize,
    pub(crate) connection_setup_error_app_closed: AtomicUsize,
    pub(crate) connection_setup_error_reset: AtomicUsize,
    pub(crate) connection_setup_error_locally_closed: AtomicUsize,
    pub(crate) connection_removed: AtomicUsize,
    pub(crate) connection_remove_failed: AtomicUsize,
    // Number of connections to the endpoint exceeding the allowed limit
    // regardless of the source IP address.
    pub(crate) connection_rate_limited_across_all: AtomicUsize,
    // Per IP rate-limiting is triggered each time when there are too many connections
    // opened from a particular IP address.
    pub(crate) connection_rate_limited_per_ipaddr: AtomicUsize,
    pub(crate) throttled_streams: AtomicUsize,
    pub(crate) stream_load_ema: AtomicUsize,
    pub(crate) stream_load_ema_overflow: AtomicUsize,
    pub(crate) stream_load_capacity_overflow: AtomicUsize,
    // Histogram of sampled packet processing times; the `_us` suffix and the
    // reported metric names indicate microseconds.
    pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
    pub(crate) perf_track_overhead_us: AtomicU64,
    pub(crate) total_staked_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
    pub(crate) throttled_staked_streams: AtomicUsize,
    pub(crate) throttled_unstaked_streams: AtomicUsize,
    pub(crate) connection_rate_limiter_length: AtomicUsize,
    // All connections in various states such as Incoming, Connecting, Connection
    pub(crate) open_connections: AtomicUsize,
    pub(crate) open_staked_connections: AtomicUsize,
    pub(crate) open_unstaked_connections: AtomicUsize,
    pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
    pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
    pub(crate) total_incoming_connection_attempts: AtomicUsize,
    pub(crate) quic_endpoints_count: AtomicUsize,
}
237
impl StreamerStats {
    /// Emits all statistics as a single `datapoint_info!` data point called `name`.
    ///
    /// Fields read with `swap(0, ..)` are reset to zero by this call, so their
    /// reported value covers the interval since the previous report. Fields
    /// read with `load(..)` are reported as-is and keep their value. The
    /// `process_sampled_packets_us_hist` histogram is snapshotted and cleared.
    ///
    /// # Panics
    /// Panics if the histogram mutex is poisoned.
    pub fn report(&self, name: &'static str) {
        // Take a copy of the histogram and clear the shared one while holding
        // the lock; the guard is dropped at the end of this block, before the
        // data point is emitted.
        let process_sampled_packets_us_hist = {
            let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
            let process_sampled_packets_us_hist = metrics.clone();
            metrics.clear();
            process_sampled_packets_us_hist
        };

        datapoint_info!(
            name,
            // Note: the "active_connections" metric is backed by `total_connections`.
            (
                "active_connections",
                self.total_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "active_streams",
                self.active_streams.load(Ordering::Relaxed),
                i64
            ),
            (
                "new_connections",
                self.total_new_connections.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "new_streams",
                self.total_new_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "evictions_staked",
                self.num_evictions_staked.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "evictions_unstaked",
                self.num_evictions_unstaked.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_added_from_staked_peer",
                self.connection_added_from_staked_peer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_added_from_unstaked_peer",
                self.connection_added_from_unstaked_peer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed",
                self.connection_add_failed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_invalid_stream_count",
                self.connection_add_failed_invalid_stream_count
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_staked_node",
                self.connection_add_failed_staked_node
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_unstaked_node",
                self.connection_add_failed_unstaked_node
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_on_pruning",
                self.connection_add_failed_on_pruning
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_removed",
                self.connection_removed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_remove_failed",
                self.connection_remove_failed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_timeout",
                self.connection_setup_timeout.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error",
                self.connection_setup_error.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_timed_out",
                self.connection_setup_error_timed_out
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_closed",
                self.connection_setup_error_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_transport",
                self.connection_setup_error_transport
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_app_closed",
                self.connection_setup_error_app_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_reset",
                self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_locally_closed",
                self.connection_setup_error_locally_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_rate_limited_across_all",
                self.connection_rate_limited_across_all
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_rate_limited_per_ipaddr",
                self.connection_rate_limited_per_ipaddr
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "invalid_stream_size",
                self.invalid_stream_size.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_allocated",
                self.total_packets_allocated.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batches_allocated",
                self.total_packet_batches_allocated
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_sent_for_batching",
                self.total_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "staked_packets_sent_for_batching",
                self.total_staked_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "unstaked_packets_sent_for_batching",
                self.total_unstaked_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "bytes_sent_for_batching",
                self.total_bytes_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_sent_for_batching",
                self.total_chunks_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_sent_to_consumer",
                self.total_packets_sent_to_consumer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "bytes_sent_to_consumer",
                self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_processed_by_batcher",
                self.total_chunks_processed_by_batcher
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "staked_chunks_received",
                self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "unstaked_chunks_received",
                self.total_unstaked_chunks_received
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batch_send_error",
                self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_error",
                self.total_handle_chunk_to_packet_batcher_send_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_full_err",
                self.total_handle_chunk_to_packet_batcher_send_full_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_disconnected_err",
                self.total_handle_chunk_to_packet_batcher_send_disconnected_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batches_sent",
                self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
                i64
            ),
            // Note: the "packet_batch_empty" metric is backed by `total_packet_batches_none`.
            (
                "packet_batch_empty",
                self.total_packet_batches_none.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_read_errors",
                self.total_stream_read_errors.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_read_timeouts",
                self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "throttled_streams",
                self.throttled_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_ema",
                self.stream_load_ema.load(Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_ema_overflow",
                self.stream_load_ema_overflow.load(Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_capacity_overflow",
                self.stream_load_capacity_overflow.load(Ordering::Relaxed),
                i64
            ),
            (
                "throttled_unstaked_streams",
                self.throttled_unstaked_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "throttled_staked_streams",
                self.throttled_staked_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            // Histogram statistics fall back to 0 when the histogram cannot
            // provide a value.
            (
                "process_sampled_packets_us_90pct",
                process_sampled_packets_us_hist
                    .percentile(90.0)
                    .unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_min",
                process_sampled_packets_us_hist.minimum().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_max",
                process_sampled_packets_us_hist.maximum().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_mean",
                process_sampled_packets_us_hist.mean().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_count",
                process_sampled_packets_us_hist.entries(),
                i64
            ),
            (
                "perf_track_overhead_us",
                self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_rate_limiter_length",
                self.connection_rate_limiter_length.load(Ordering::Relaxed),
                i64
            ),
            (
                "outstanding_incoming_connection_attempts",
                self.outstanding_incoming_connection_attempts
                    .load(Ordering::Relaxed),
                i64
            ),
            (
                "total_incoming_connection_attempts",
                self.total_incoming_connection_attempts
                    .load(Ordering::Relaxed),
                i64
            ),
            (
                "quic_endpoints_count",
                self.quic_endpoints_count.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_connections",
                self.open_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_staked_connections",
                self.open_staked_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_unstaked_connections",
                self.open_unstaked_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "refused_connections_too_many_open_connections",
                self.refused_connections_too_many_open_connections
                    .swap(0, Ordering::Relaxed),
                i64
            ),
        );
    }
}
612
613#[deprecated(since = "3.0.0", note = "Use spawn_server_with_cancel instead")]
614#[allow(deprecated)]
615pub fn spawn_server_multi(
616    thread_name: &'static str,
617    metrics_name: &'static str,
618    sockets: Vec<UdpSocket>,
619    keypair: &Keypair,
620    packet_sender: Sender<PacketBatch>,
621    exit: Arc<AtomicBool>,
622    staked_nodes: Arc<RwLock<StakedNodes>>,
623    quic_server_params: QuicServerParams,
624) -> Result<SpawnServerResult, QuicServerError> {
625    #[allow(deprecated)]
626    spawn_server(
627        thread_name,
628        metrics_name,
629        sockets,
630        keypair,
631        packet_sender,
632        exit,
633        staked_nodes,
634        quic_server_params,
635    )
636}
637
/// Deprecated server parameters; superseded by [`QuicStreamerConfig`], which
/// carries the same fields except `max_streams_per_ms`.
#[derive(Clone)]
#[deprecated(since = "3.1.0", note = "Use QuicStreamerConfig instead")]
pub struct QuicServerParams {
    pub max_connections_per_peer: usize,
    pub max_staked_connections: usize,
    pub max_unstaked_connections: usize,
    pub max_connections_per_ipaddr_per_min: u64,
    pub wait_for_chunk_timeout: Duration,
    pub accumulator_channel_size: usize,
    /// Number of worker threads of the server's tokio runtime.
    pub num_threads: NonZeroUsize,
    /// Forwarded into `SwQosConfig::max_streams_per_ms` by `spawn_server`.
    pub max_streams_per_ms: u64,
}
650
/// Configuration of the QUIC streamer that is independent of the QoS
/// implementation in use.
#[derive(Clone)]
pub struct QuicStreamerConfig {
    pub max_connections_per_peer: usize,
    pub max_staked_connections: usize,
    pub max_unstaked_connections: usize,
    pub max_connections_per_ipaddr_per_min: u64,
    pub wait_for_chunk_timeout: Duration,
    pub accumulator_channel_size: usize,
    /// Number of worker threads of the server's tokio runtime.
    pub num_threads: NonZeroUsize,
}
661
/// Bundles a [`QuicStreamerConfig`] with the [`SwQosConfig`] QoS settings.
#[derive(Clone)]
pub struct SwQosQuicStreamerConfig {
    pub quic_streamer_config: QuicStreamerConfig,
    pub qos_config: SwQosConfig,
}
667
/// Bundles a [`QuicStreamerConfig`] with the [`SimpleQosConfig`] QoS settings.
#[derive(Clone)]
pub struct SimpleQosQuicStreamerConfig {
    pub quic_streamer_config: QuicStreamerConfig,
    pub qos_config: SimpleQosConfig,
}
673
674#[allow(deprecated)]
675impl Default for QuicServerParams {
676    fn default() -> Self {
677        QuicServerParams {
678            max_connections_per_peer: 1,
679            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
680            max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
681            max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
682            wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
683            accumulator_channel_size: DEFAULT_ACCUMULATOR_CHANNEL_SIZE,
684            num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
685            max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
686        }
687    }
688}
689
690#[allow(deprecated)]
691impl QuicServerParams {
692    #[cfg(feature = "dev-context-only-utils")]
693    pub const DEFAULT_NUM_SERVER_THREADS_FOR_TEST: NonZeroUsize = NonZeroUsize::new(8).unwrap();
694
695    #[cfg(feature = "dev-context-only-utils")]
696    pub fn default_for_tests() -> Self {
697        // Shrink the channel size to avoid a massive allocation for tests
698        Self {
699            num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
700            ..Self::default()
701        }
702    }
703}
704
705impl Default for QuicStreamerConfig {
706    fn default() -> Self {
707        Self {
708            max_connections_per_peer: 1,
709            max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
710            max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
711            max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
712            wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
713            accumulator_channel_size: DEFAULT_ACCUMULATOR_CHANNEL_SIZE,
714            num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
715        }
716    }
717}
718
719impl QuicStreamerConfig {
720    #[cfg(feature = "dev-context-only-utils")]
721    pub const DEFAULT_NUM_SERVER_THREADS_FOR_TEST: NonZeroUsize = NonZeroUsize::new(8).unwrap();
722
723    #[cfg(feature = "dev-context-only-utils")]
724    pub fn default_for_tests() -> Self {
725        // Shrink the channel size to avoid a massive allocation for tests
726        Self {
727            accumulator_channel_size: 100_000,
728            num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
729            ..Self::default()
730        }
731    }
732
733    pub(crate) fn max_concurrent_connections(&self) -> usize {
734        let conns = self.max_staked_connections + self.max_unstaked_connections;
735        conns + conns / 4
736    }
737}
738
739#[allow(deprecated)]
740impl From<&QuicServerParams> for QuicStreamerConfig {
741    fn from(params: &QuicServerParams) -> Self {
742        Self {
743            max_connections_per_peer: params.max_connections_per_peer,
744            max_staked_connections: params.max_staked_connections,
745            max_unstaked_connections: params.max_unstaked_connections,
746            max_connections_per_ipaddr_per_min: params.max_connections_per_ipaddr_per_min,
747            wait_for_chunk_timeout: params.wait_for_chunk_timeout,
748            accumulator_channel_size: params.accumulator_channel_size,
749            num_threads: params.num_threads,
750        }
751    }
752}
753
754#[deprecated(since = "3.1.0", note = "Use spawn_server_with_cancel instead")]
755#[allow(deprecated)]
756pub fn spawn_server(
757    thread_name: &'static str,
758    metrics_name: &'static str,
759    sockets: impl IntoIterator<Item = UdpSocket>,
760    keypair: &Keypair,
761    packet_sender: Sender<PacketBatch>,
762    exit: Arc<AtomicBool>,
763    staked_nodes: Arc<RwLock<StakedNodes>>,
764    quic_server_params: QuicServerParams,
765) -> Result<SpawnServerResult, QuicServerError> {
766    let cancel = CancellationToken::new();
767    thread::spawn({
768        let cancel = cancel.clone();
769        move || loop {
770            if exit.load(Ordering::Relaxed) {
771                cancel.cancel();
772                break;
773            }
774            thread::sleep(Duration::from_millis(100));
775        }
776    });
777    let quic_server_config: QuicStreamerConfig = (&quic_server_params).into();
778    let qos_config = SwQosConfig {
779        max_streams_per_ms: quic_server_params.max_streams_per_ms,
780    };
781    spawn_server_with_cancel(
782        thread_name,
783        metrics_name,
784        sockets,
785        keypair,
786        packet_sender,
787        staked_nodes,
788        quic_server_config,
789        qos_config,
790        cancel,
791    )
792}
793
794/// Generic function to spawn a QUIC server with any QoS implementation
795fn spawn_server_with_cancel_generic<Q, C>(
796    thread_name: &'static str,
797    metrics_name: &'static str,
798    stats: Arc<StreamerStats>,
799    sockets: impl IntoIterator<Item = UdpSocket>,
800    keypair: &Keypair,
801    packet_sender: Sender<PacketBatch>,
802    quic_server_params: QuicStreamerConfig,
803    qos: Arc<Q>,
804    cancel: CancellationToken,
805) -> Result<SpawnServerResult, QuicServerError>
806where
807    Q: QosController<C> + Send + Sync + 'static,
808    C: ConnectionContext + Send + Sync + 'static,
809{
810    let runtime = rt(format!("{thread_name}Rt"), quic_server_params.num_threads);
811    let result = {
812        let _guard = runtime.enter();
813        crate::nonblocking::quic::spawn_server_with_cancel_and_qos(
814            metrics_name,
815            stats,
816            sockets,
817            keypair,
818            packet_sender,
819            quic_server_params,
820            qos,
821            cancel,
822        )
823    }?;
824    let handle = thread::Builder::new()
825        .name(thread_name.into())
826        .spawn(move || {
827            if let Err(e) = runtime.block_on(result.thread) {
828                warn!("error from runtime.block_on: {e:?}");
829            }
830        })
831        .unwrap();
832    let updater = EndpointKeyUpdater {
833        endpoints: result.endpoints.clone(),
834    };
835    Ok(SpawnServerResult {
836        endpoints: result.endpoints,
837        thread: handle,
838        key_updater: Arc::new(updater),
839    })
840}
841
842/// Spawns a tokio runtime and a streamer instance inside it.
843pub fn spawn_server_with_cancel(
844    thread_name: &'static str,
845    metrics_name: &'static str,
846    sockets: impl IntoIterator<Item = UdpSocket>,
847    keypair: &Keypair,
848    packet_sender: Sender<PacketBatch>,
849    staked_nodes: Arc<RwLock<StakedNodes>>,
850    quic_server_params: QuicStreamerConfig,
851    qos_config: SwQosConfig,
852    cancel: CancellationToken,
853) -> Result<SpawnServerResult, QuicServerError> {
854    let stats = Arc::<StreamerStats>::default();
855    let swqos = Arc::new(SwQos::new(
856        qos_config,
857        quic_server_params.max_staked_connections,
858        quic_server_params.max_unstaked_connections,
859        quic_server_params.max_connections_per_peer,
860        stats.clone(),
861        staked_nodes,
862        cancel.clone(),
863    ));
864    spawn_server_with_cancel_generic(
865        thread_name,
866        metrics_name,
867        stats,
868        sockets,
869        keypair,
870        packet_sender,
871        quic_server_params,
872        swqos,
873        cancel,
874    )
875}
876
877/// Spawns a tokio runtime and a streamer instance inside it.
878pub fn spawn_simple_qos_server_with_cancel(
879    thread_name: &'static str,
880    metrics_name: &'static str,
881    sockets: impl IntoIterator<Item = UdpSocket>,
882    keypair: &Keypair,
883    packet_sender: Sender<PacketBatch>,
884    staked_nodes: Arc<RwLock<StakedNodes>>,
885    quic_server_params: QuicStreamerConfig,
886    qos_config: SimpleQosConfig,
887    cancel: CancellationToken,
888) -> Result<SpawnServerResult, QuicServerError> {
889    let stats = Arc::<StreamerStats>::default();
890
891    let simple_qos = Arc::new(SimpleQos::new(
892        qos_config,
893        quic_server_params.max_connections_per_peer,
894        quic_server_params.max_staked_connections,
895        stats.clone(),
896        staked_nodes,
897        cancel.clone(),
898    ));
899
900    spawn_server_with_cancel_generic(
901        thread_name,
902        metrics_name,
903        stats,
904        sockets,
905        keypair,
906        packet_sender,
907        quic_server_params,
908        simple_qos,
909        cancel,
910    )
911}
912
913#[cfg(test)]
914mod test {
915    use {
916        super::*,
917        crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
918        crossbeam_channel::unbounded,
919        solana_net_utils::sockets::bind_to_localhost_unique,
920        solana_pubkey::Pubkey,
921        solana_signer::Signer,
922        std::{collections::HashMap, net::SocketAddr},
923    };
924
925    fn rt_for_test() -> Runtime {
926        rt(
927            "solQuicTestRt".to_string(),
928            QuicStreamerConfig::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
929        )
930    }
931
932    fn setup_quic_server_with_params(
933        server_params: QuicStreamerConfig,
934        staked_nodes: Arc<RwLock<StakedNodes>>,
935    ) -> (
936        std::thread::JoinHandle<()>,
937        crossbeam_channel::Receiver<PacketBatch>,
938        SocketAddr,
939        CancellationToken,
940    ) {
941        let s = bind_to_localhost_unique().expect("should bind");
942        let (sender, receiver) = unbounded();
943        let keypair = Keypair::new();
944        let server_address = s.local_addr().unwrap();
945        let cancel = CancellationToken::new();
946        let SpawnServerResult {
947            endpoints: _,
948            thread: t,
949            key_updater: _,
950        } = spawn_server_with_cancel(
951            "solQuicTest",
952            "quic_streamer_test",
953            [s],
954            &keypair,
955            sender,
956            staked_nodes,
957            server_params,
958            SwQosConfig::default(),
959            cancel.clone(),
960        )
961        .unwrap();
962        (t, receiver, server_address, cancel)
963    }
964
965    fn setup_simple_qos_quic_server_with_params(
966        server_params: SimpleQosQuicStreamerConfig,
967        staked_nodes: Arc<RwLock<StakedNodes>>,
968    ) -> (
969        std::thread::JoinHandle<()>,
970        crossbeam_channel::Receiver<PacketBatch>,
971        SocketAddr,
972        CancellationToken,
973    ) {
974        let s = bind_to_localhost_unique().expect("should bind");
975        let (sender, receiver) = unbounded();
976        let keypair = Keypair::new();
977        let server_address = s.local_addr().unwrap();
978        let cancel = CancellationToken::new();
979        let SpawnServerResult {
980            endpoints: _,
981            thread: t,
982            key_updater: _,
983        } = spawn_simple_qos_server_with_cancel(
984            "solQuicTest",
985            "quic_streamer_test",
986            [s],
987            &keypair,
988            sender,
989            staked_nodes,
990            server_params.quic_streamer_config,
991            server_params.qos_config,
992            cancel.clone(),
993        )
994        .unwrap();
995        (t, receiver, server_address, cancel)
996    }
997
998    fn setup_quic_server() -> (
999        std::thread::JoinHandle<()>,
1000        crossbeam_channel::Receiver<PacketBatch>,
1001        SocketAddr,
1002        CancellationToken,
1003    ) {
1004        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
1005        setup_quic_server_with_params(QuicStreamerConfig::default_for_tests(), staked_nodes)
1006    }
1007
1008    #[test]
1009    fn test_quic_server_exit() {
1010        let (t, _receiver, _server_address, cancel) = setup_quic_server();
1011        cancel.cancel();
1012        t.join().unwrap();
1013    }
1014
1015    #[test]
1016    fn test_quic_timeout() {
1017        agave_logger::setup();
1018        let (t, receiver, server_address, cancel) = setup_quic_server();
1019        let runtime = rt_for_test();
1020        runtime.block_on(check_timeout(receiver, server_address));
1021        cancel.cancel();
1022        t.join().unwrap();
1023    }
1024
1025    #[test]
1026    fn test_quic_server_block_multiple_connections() {
1027        agave_logger::setup();
1028        let (t, _receiver, server_address, cancel) = setup_quic_server();
1029
1030        let runtime = rt_for_test();
1031        runtime.block_on(check_block_multiple_connections(server_address));
1032        cancel.cancel();
1033        t.join().unwrap();
1034    }
1035
1036    #[test]
1037    fn test_quic_server_multiple_streams() {
1038        agave_logger::setup();
1039        let s = bind_to_localhost_unique().expect("should bind");
1040        let (sender, receiver) = unbounded();
1041        let keypair = Keypair::new();
1042        let server_address = s.local_addr().unwrap();
1043        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
1044        let cancel = CancellationToken::new();
1045        let SpawnServerResult {
1046            endpoints: _,
1047            thread: t,
1048            key_updater: _,
1049        } = spawn_server_with_cancel(
1050            "solQuicTest",
1051            "quic_streamer_test",
1052            [s],
1053            &keypair,
1054            sender,
1055            staked_nodes,
1056            QuicStreamerConfig {
1057                max_connections_per_peer: 2,
1058                ..QuicStreamerConfig::default_for_tests()
1059            },
1060            SwQosConfig::default(),
1061            cancel.clone(),
1062        )
1063        .unwrap();
1064
1065        let runtime = rt_for_test();
1066        runtime.block_on(check_multiple_streams(receiver, server_address, None));
1067        cancel.cancel();
1068        t.join().unwrap();
1069    }
1070
1071    #[test]
1072    fn test_quic_server_multiple_writes() {
1073        agave_logger::setup();
1074        let (t, receiver, server_address, cancel) = setup_quic_server();
1075
1076        let runtime = rt_for_test();
1077        runtime.block_on(check_multiple_writes(receiver, server_address, None));
1078        cancel.cancel();
1079        t.join().unwrap();
1080    }
1081
1082    #[test]
1083    fn test_quic_server_multiple_packets_with_simple_qos() {
1084        // Send multiple writes from a staked node with SimpleStreamsPerSecond QoS mode
1085        // with a super low staked client stake to ensure it can send all packets
1086        // within the rate limit.
1087        agave_logger::setup();
1088        let client_keypair = Keypair::new();
1089        let rich_node_keypair = Keypair::new();
1090
1091        let stakes = HashMap::from([
1092            (client_keypair.pubkey(), 1_000), // very small staked node
1093            (rich_node_keypair.pubkey(), 1_000_000_000),
1094        ]);
1095        let staked_nodes = StakedNodes::new(
1096            Arc::new(stakes),
1097            HashMap::<Pubkey, u64>::default(), // overrides
1098        );
1099
1100        let server_params = QuicStreamerConfig {
1101            max_unstaked_connections: 0,
1102            ..QuicStreamerConfig::default_for_tests()
1103        };
1104        let qos_config = SimpleQosConfig {
1105            max_streams_per_second: 20, // low limit to ensure staked node can send all packets
1106        };
1107        let server_params = SimpleQosQuicStreamerConfig {
1108            quic_streamer_config: server_params,
1109            qos_config,
1110        };
1111        let (t, receiver, server_address, cancel) = setup_simple_qos_quic_server_with_params(
1112            server_params,
1113            Arc::new(RwLock::new(staked_nodes)),
1114        );
1115
1116        let runtime = rt_for_test();
1117        let num_expected_packets = 20;
1118
1119        runtime.block_on(check_multiple_packets(
1120            receiver,
1121            server_address,
1122            Some(&client_keypair),
1123            num_expected_packets,
1124        ));
1125        cancel.cancel();
1126        t.join().unwrap();
1127    }
1128
1129    #[test]
1130    fn test_quic_server_unstaked_node_connect_failure() {
1131        agave_logger::setup();
1132        let s = bind_to_localhost_unique().expect("should bind");
1133        let (sender, _) = unbounded();
1134        let keypair = Keypair::new();
1135        let server_address = s.local_addr().unwrap();
1136        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
1137        let cancel = CancellationToken::new();
1138        let SpawnServerResult {
1139            endpoints: _,
1140            thread: t,
1141            key_updater: _,
1142        } = spawn_server_with_cancel(
1143            "solQuicTest",
1144            "quic_streamer_test",
1145            [s],
1146            &keypair,
1147            sender,
1148            staked_nodes,
1149            QuicStreamerConfig {
1150                max_unstaked_connections: 0,
1151                ..QuicStreamerConfig::default_for_tests()
1152            },
1153            SwQosConfig::default(),
1154            cancel.clone(),
1155        )
1156        .unwrap();
1157
1158        let runtime = rt_for_test();
1159        runtime.block_on(check_unstaked_node_connect_failure(server_address));
1160        cancel.cancel();
1161        t.join().unwrap();
1162    }
1163}