1use {
2 crate::{
3 nonblocking::{
4 qos::{ConnectionContext, QosController},
5 quic::{ALPN_TPU_PROTOCOL_ID, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
6 simple_qos::{SimpleQos, SimpleQosConfig},
7 swqos::{SwQos, SwQosConfig},
8 },
9 streamer::StakedNodes,
10 },
11 crossbeam_channel::Sender,
12 pem::Pem,
13 quinn::{
14 crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
15 Endpoint, IdleTimeout, ServerConfig, VarInt,
16 },
17 rustls::KeyLogFile,
18 solana_keypair::Keypair,
19 solana_packet::PACKET_DATA_SIZE,
20 solana_perf::packet::PacketBatch,
21 solana_quic_definitions::{NotifyKeyUpdate, QUIC_MAX_TIMEOUT},
22 solana_tls_utils::{new_dummy_x509_certificate, tls_server_config_builder},
23 std::{
24 net::UdpSocket,
25 num::NonZeroUsize,
26 sync::{
27 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
28 Arc, Mutex, RwLock,
29 },
30 thread::{self},
31 time::Duration,
32 },
33 tokio::runtime::Runtime,
34 tokio_util::sync::CancellationToken,
35};
36
/// Default limit on the number of QUIC connections accepted from a single peer.
///
/// NOTE(review): not referenced in the visible part of this file; the `Default`
/// impls for `QuicServerParams` / `QuicStreamerConfig` hard-code
/// `max_connections_per_peer: 1` instead — confirm which value is intended.
pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;

/// Default for `max_staked_connections` in the server configuration structs.
pub const DEFAULT_MAX_STAKED_CONNECTIONS: usize = 2000;

/// Default for `max_unstaked_connections` in the server configuration structs.
pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 2000;

/// Default for `QuicServerParams::max_streams_per_ms`, which `spawn_server`
/// forwards into `SwQosConfig::max_streams_per_ms`.
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500;

/// Default for `max_connections_per_ipaddr_per_min` in the server configuration structs.
pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;

/// Default number of QUIC endpoints.
///
/// NOTE(review): not referenced in the visible part of this file.
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;

/// Connection-level receive window (8 MiB) that `configure_server` installs on
/// the transport config via `receive_window`.
const CONNECTION_RECEIVE_WINDOW_BYTES: VarInt = VarInt::from_u32(8 * 1024 * 1024);
62
63pub fn default_num_tpu_transaction_forward_receive_threads() -> usize {
64 num_cpus::get().min(16)
65}
66
67pub fn default_num_tpu_transaction_receive_threads() -> usize {
68 num_cpus::get().min(8)
69}
70
71pub fn default_num_tpu_vote_transaction_receive_threads() -> usize {
72 num_cpus::get().min(8)
73}
74
/// Handles returned to the caller after a QUIC server has been spawned.
pub struct SpawnServerResult {
    /// The QUIC endpoints the server is running on.
    pub endpoints: Vec<Endpoint>,
    /// OS thread that blocks on the server's async task; join it to wait for
    /// the server to finish.
    pub thread: thread::JoinHandle<()>,
    /// Updater holding clones of `endpoints`; used to install a server
    /// configuration derived from a new identity keypair.
    pub key_updater: Arc<EndpointKeyUpdater>,
}
80
/// Default for `accumulator_channel_size` in `QuicServerParams` and
/// `QuicStreamerConfig`.
pub(crate) const DEFAULT_ACCUMULATOR_CHANNEL_SIZE: usize = 250_000;
83
84#[allow(clippy::field_reassign_with_default)] pub(crate) fn configure_server(
87 identity_keypair: &Keypair,
88) -> Result<(ServerConfig, String), QuicServerError> {
89 let (cert, priv_key) = new_dummy_x509_certificate(identity_keypair);
90 let cert_chain_pem_parts = vec![Pem {
91 tag: "CERTIFICATE".to_string(),
92 contents: cert.as_ref().to_vec(),
93 }];
94 let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
95
96 let mut server_tls_config =
97 tls_server_config_builder().with_single_cert(vec![cert], priv_key)?;
98 server_tls_config.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
99 server_tls_config.key_log = Arc::new(KeyLogFile::new());
100 let quic_server_config = QuicServerConfig::try_from(server_tls_config)?;
101
102 let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
103
104 server_config.migration(false);
106
107 let config = Arc::get_mut(&mut server_config.transport).unwrap();
108
109 config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
112 config.receive_window((PACKET_DATA_SIZE as u32).into());
115 config.max_concurrent_uni_streams(0u32.into());
117 config.receive_window(CONNECTION_RECEIVE_WINDOW_BYTES);
118 let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
119 config.max_idle_timeout(Some(timeout));
120
121 config.max_concurrent_bidi_streams(0u32.into());
123 config.datagram_receive_buffer_size(None);
124
125 config.enable_segmentation_offload(false);
130
131 Ok((server_config, cert_chain_pem))
132}
133
134fn rt(name: String, num_threads: NonZeroUsize) -> Runtime {
135 tokio::runtime::Builder::new_multi_thread()
136 .thread_name(name)
137 .worker_threads(num_threads.get())
138 .enable_all()
139 .build()
140 .unwrap()
141}
142
/// Errors that can occur while configuring or starting the QUIC server.
#[derive(thiserror::Error, Debug)]
pub enum QuicServerError {
    /// Creating an endpoint failed with the wrapped I/O error.
    /// There is no `#[from]` on this variant, so callers must wrap the error explicitly.
    #[error("Endpoint creation failed: {0}")]
    EndpointFailed(std::io::Error),
    /// rustls reported an error; converted automatically through `?`.
    #[error("TLS error: {0}")]
    TlsError(#[from] rustls::Error),
    /// Converting the TLS config into a QUIC server config failed because no
    /// initial cipher suite was available; converted automatically through `?`.
    #[error("No initial cipher suite")]
    NoInitialCipherSuite(#[from] NoInitialCipherSuite),
}
152
/// Holds the server's endpoints so that a new server configuration can be
/// installed on all of them when the identity key changes
/// (see its `NotifyKeyUpdate` impl).
pub struct EndpointKeyUpdater {
    /// Endpoints that receive the regenerated server configuration.
    endpoints: Vec<Endpoint>,
}
156
157impl NotifyKeyUpdate for EndpointKeyUpdater {
158 fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
159 let (config, _) = configure_server(key)?;
160 for endpoint in &self.endpoints {
161 endpoint.set_server_config(Some(config.clone()));
162 }
163 Ok(())
164 }
165}
166
/// Atomic counters and gauges describing QUIC streamer activity.
///
/// `StreamerStats::report` publishes these through `datapoint_info!`. Fields it
/// reads with `swap(0, ..)` are reset on every report (per-interval counters);
/// fields it reads with `load(..)` keep their value across reports.
#[derive(Default)]
pub struct StreamerStats {
    // Reported under the metric name "active_connections" (read with `load`).
    pub(crate) total_connections: AtomicUsize,
    pub(crate) total_new_connections: AtomicUsize,
    // Read with `load` by `report`, i.e. not reset.
    pub(crate) active_streams: AtomicUsize,
    pub(crate) total_new_streams: AtomicUsize,
    pub(crate) invalid_stream_size: AtomicUsize,

    // Packet / chunk pipeline counters (all reset by `report`).
    pub(crate) total_packets_allocated: AtomicUsize,
    pub(crate) total_packet_batches_allocated: AtomicUsize,
    pub(crate) total_staked_chunks_received: AtomicUsize,
    pub(crate) total_unstaked_chunks_received: AtomicUsize,
    pub(crate) total_packet_batch_send_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_full_err: AtomicUsize,
    pub(crate) total_handle_chunk_to_packet_batcher_send_disconnected_err: AtomicUsize,
    pub(crate) total_packet_batches_sent: AtomicUsize,
    // Reported under the metric name "packet_batch_empty".
    pub(crate) total_packet_batches_none: AtomicUsize,
    pub(crate) total_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_bytes_sent_for_batching: AtomicUsize,
    pub(crate) total_chunks_sent_for_batching: AtomicUsize,
    pub(crate) total_packets_sent_to_consumer: AtomicUsize,
    pub(crate) total_bytes_sent_to_consumer: AtomicUsize,
    pub(crate) total_chunks_processed_by_batcher: AtomicUsize,
    pub(crate) total_stream_read_errors: AtomicUsize,
    pub(crate) total_stream_read_timeouts: AtomicUsize,

    // Connection table bookkeeping (all reset by `report`).
    pub(crate) num_evictions_staked: AtomicUsize,
    pub(crate) num_evictions_unstaked: AtomicUsize,
    pub(crate) connection_added_from_staked_peer: AtomicUsize,
    pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
    pub(crate) connection_add_failed: AtomicUsize,
    pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
    pub(crate) connection_add_failed_staked_node: AtomicUsize,
    pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
    pub(crate) connection_add_failed_on_pruning: AtomicUsize,

    // Connection setup outcomes (all reset by `report`).
    pub(crate) connection_setup_timeout: AtomicUsize,
    pub(crate) connection_setup_error: AtomicUsize,
    pub(crate) connection_setup_error_closed: AtomicUsize,
    pub(crate) connection_setup_error_timed_out: AtomicUsize,
    pub(crate) connection_setup_error_transport: AtomicUsize,
    pub(crate) connection_setup_error_app_closed: AtomicUsize,
    pub(crate) connection_setup_error_reset: AtomicUsize,
    pub(crate) connection_setup_error_locally_closed: AtomicUsize,
    pub(crate) connection_removed: AtomicUsize,
    pub(crate) connection_remove_failed: AtomicUsize,

    // Connection rate limiting (both reset by `report`).
    pub(crate) connection_rate_limited_across_all: AtomicUsize,
    pub(crate) connection_rate_limited_per_ipaddr: AtomicUsize,

    // Stream throttling. `throttled_streams` is reset by `report`; the three
    // `stream_load_*` values are read with `load` and therefore not reset.
    pub(crate) throttled_streams: AtomicUsize,
    pub(crate) stream_load_ema: AtomicUsize,
    pub(crate) stream_load_ema_overflow: AtomicUsize,
    pub(crate) stream_load_capacity_overflow: AtomicUsize,

    // Histogram of sampled packet processing times (microseconds, going by the
    // `_us` suffix). `report` clones it and then clears it under the mutex.
    pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
    pub(crate) perf_track_overhead_us: AtomicU64,
    pub(crate) total_staked_packets_sent_for_batching: AtomicUsize,
    pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
    pub(crate) throttled_staked_streams: AtomicUsize,
    pub(crate) throttled_unstaked_streams: AtomicUsize,

    // Values below are read with `load` by `report` (not reset), except
    // `refused_connections_too_many_open_connections`, which is reset.
    pub(crate) connection_rate_limiter_length: AtomicUsize,
    pub(crate) open_connections: AtomicUsize,
    pub(crate) open_staked_connections: AtomicUsize,
    pub(crate) open_unstaked_connections: AtomicUsize,
    pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
    pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
    pub(crate) total_incoming_connection_attempts: AtomicUsize,
    pub(crate) quic_endpoints_count: AtomicUsize,
}
237
impl StreamerStats {
    /// Emits every statistic as one `datapoint_info!` data point called `name`.
    ///
    /// Counters read with `swap(0, Ordering::Relaxed)` are reset as part of the
    /// report, so each report carries the amount accumulated since the previous
    /// one. Values read with `load(Ordering::Relaxed)` are left unchanged. The
    /// sampled-packet histogram is snapshotted and cleared before reporting.
    ///
    /// # Panics
    ///
    /// Panics if the histogram mutex is poisoned.
    pub fn report(&self, name: &'static str) {
        // Snapshot the histogram and clear the shared one inside a short scope
        // so the lock is released before the data point is built.
        let process_sampled_packets_us_hist = {
            let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
            let process_sampled_packets_us_hist = metrics.clone();
            metrics.clear();
            process_sampled_packets_us_hist
        };

        datapoint_info!(
            name,
            // --- connection / stream totals ---
            (
                // Note: the metric is named "active_connections" but reads `total_connections`.
                "active_connections",
                self.total_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "active_streams",
                self.active_streams.load(Ordering::Relaxed),
                i64
            ),
            (
                "new_connections",
                self.total_new_connections.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "new_streams",
                self.total_new_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            // --- connection table: evictions, additions, failures ---
            (
                "evictions_staked",
                self.num_evictions_staked.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "evictions_unstaked",
                self.num_evictions_unstaked.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_added_from_staked_peer",
                self.connection_added_from_staked_peer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_added_from_unstaked_peer",
                self.connection_added_from_unstaked_peer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed",
                self.connection_add_failed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_invalid_stream_count",
                self.connection_add_failed_invalid_stream_count
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_staked_node",
                self.connection_add_failed_staked_node
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_unstaked_node",
                self.connection_add_failed_unstaked_node
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_add_failed_on_pruning",
                self.connection_add_failed_on_pruning
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_removed",
                self.connection_removed.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_remove_failed",
                self.connection_remove_failed.swap(0, Ordering::Relaxed),
                i64
            ),
            // --- connection setup outcomes ---
            (
                "connection_setup_timeout",
                self.connection_setup_timeout.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error",
                self.connection_setup_error.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_timed_out",
                self.connection_setup_error_timed_out
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_closed",
                self.connection_setup_error_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_transport",
                self.connection_setup_error_transport
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_app_closed",
                self.connection_setup_error_app_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_reset",
                self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_setup_error_locally_closed",
                self.connection_setup_error_locally_closed
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            // --- connection rate limiting ---
            (
                "connection_rate_limited_across_all",
                self.connection_rate_limited_across_all
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_rate_limited_per_ipaddr",
                self.connection_rate_limited_per_ipaddr
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            // --- packet / chunk pipeline ---
            (
                "invalid_stream_size",
                self.invalid_stream_size.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_allocated",
                self.total_packets_allocated.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batches_allocated",
                self.total_packet_batches_allocated
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_sent_for_batching",
                self.total_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "staked_packets_sent_for_batching",
                self.total_staked_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "unstaked_packets_sent_for_batching",
                self.total_unstaked_packets_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "bytes_sent_for_batching",
                self.total_bytes_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_sent_for_batching",
                self.total_chunks_sent_for_batching
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packets_sent_to_consumer",
                self.total_packets_sent_to_consumer
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "bytes_sent_to_consumer",
                self.total_bytes_sent_to_consumer.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "chunks_processed_by_batcher",
                self.total_chunks_processed_by_batcher
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "staked_chunks_received",
                self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "unstaked_chunks_received",
                self.total_unstaked_chunks_received
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batch_send_error",
                self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_error",
                self.total_handle_chunk_to_packet_batcher_send_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_full_err",
                self.total_handle_chunk_to_packet_batcher_send_full_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "handle_chunk_to_packet_batcher_send_disconnected_err",
                self.total_handle_chunk_to_packet_batcher_send_disconnected_err
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "packet_batches_sent",
                self.total_packet_batches_sent.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                // Reported from `total_packet_batches_none`.
                "packet_batch_empty",
                self.total_packet_batches_none.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_read_errors",
                self.total_stream_read_errors.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_read_timeouts",
                self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
                i64
            ),
            // --- stream throttling / load ---
            (
                "throttled_streams",
                self.throttled_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_ema",
                self.stream_load_ema.load(Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_ema_overflow",
                self.stream_load_ema_overflow.load(Ordering::Relaxed),
                i64
            ),
            (
                "stream_load_capacity_overflow",
                self.stream_load_capacity_overflow.load(Ordering::Relaxed),
                i64
            ),
            (
                "throttled_unstaked_streams",
                self.throttled_unstaked_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "throttled_staked_streams",
                self.throttled_staked_streams.swap(0, Ordering::Relaxed),
                i64
            ),
            // --- sampled packet processing time, from the histogram snapshot ---
            // Each statistic falls back to 0 when the histogram cannot provide it.
            (
                "process_sampled_packets_us_90pct",
                process_sampled_packets_us_hist
                    .percentile(90.0)
                    .unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_min",
                process_sampled_packets_us_hist.minimum().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_max",
                process_sampled_packets_us_hist.maximum().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_us_mean",
                process_sampled_packets_us_hist.mean().unwrap_or(0),
                i64
            ),
            (
                "process_sampled_packets_count",
                process_sampled_packets_us_hist.entries(),
                i64
            ),
            (
                "perf_track_overhead_us",
                self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
                i64
            ),
            // --- values read without resetting ---
            (
                "connection_rate_limiter_length",
                self.connection_rate_limiter_length.load(Ordering::Relaxed),
                i64
            ),
            (
                "outstanding_incoming_connection_attempts",
                self.outstanding_incoming_connection_attempts
                    .load(Ordering::Relaxed),
                i64
            ),
            (
                "total_incoming_connection_attempts",
                self.total_incoming_connection_attempts
                    .load(Ordering::Relaxed),
                i64
            ),
            (
                "quic_endpoints_count",
                self.quic_endpoints_count.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_connections",
                self.open_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_staked_connections",
                self.open_staked_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "open_unstaked_connections",
                self.open_unstaked_connections.load(Ordering::Relaxed),
                i64
            ),
            (
                "refused_connections_too_many_open_connections",
                self.refused_connections_too_many_open_connections
                    .swap(0, Ordering::Relaxed),
                i64
            ),
        );
    }
}
612
613#[deprecated(since = "3.0.0", note = "Use spawn_server_with_cancel instead")]
614#[allow(deprecated)]
615pub fn spawn_server_multi(
616 thread_name: &'static str,
617 metrics_name: &'static str,
618 sockets: Vec<UdpSocket>,
619 keypair: &Keypair,
620 packet_sender: Sender<PacketBatch>,
621 exit: Arc<AtomicBool>,
622 staked_nodes: Arc<RwLock<StakedNodes>>,
623 quic_server_params: QuicServerParams,
624) -> Result<SpawnServerResult, QuicServerError> {
625 #[allow(deprecated)]
626 spawn_server(
627 thread_name,
628 metrics_name,
629 sockets,
630 keypair,
631 packet_sender,
632 exit,
633 staked_nodes,
634 quic_server_params,
635 )
636}
637
/// Legacy server parameters accepted by the deprecated `spawn_server` /
/// `spawn_server_multi` entry points.
///
/// Every field except `max_streams_per_ms` maps one-to-one onto
/// [`QuicStreamerConfig`] (see the `From<&QuicServerParams>` impl);
/// `spawn_server` moves `max_streams_per_ms` into `SwQosConfig`.
#[derive(Clone)]
#[deprecated(since = "3.1.0", note = "Use QuicStreamerConfig instead")]
pub struct QuicServerParams {
    pub max_connections_per_peer: usize,
    pub max_staked_connections: usize,
    pub max_unstaked_connections: usize,
    pub max_connections_per_ipaddr_per_min: u64,
    pub wait_for_chunk_timeout: Duration,
    pub accumulator_channel_size: usize,
    // Number of worker threads of the tokio runtime created for the server.
    pub num_threads: NonZeroUsize,
    // Forwarded to `SwQosConfig::max_streams_per_ms` by `spawn_server`.
    pub max_streams_per_ms: u64,
}
650
/// QoS-independent configuration of the QUIC streamer server.
#[derive(Clone)]
pub struct QuicStreamerConfig {
    // The three connection limits below are handed to the QoS controller
    // (`SwQos::new` / `SimpleQos::new`) when a server is spawned.
    pub max_connections_per_peer: usize,
    pub max_staked_connections: usize,
    pub max_unstaked_connections: usize,
    pub max_connections_per_ipaddr_per_min: u64,
    pub wait_for_chunk_timeout: Duration,
    pub accumulator_channel_size: usize,
    // Number of worker threads of the tokio runtime created for the server.
    pub num_threads: NonZeroUsize,
}
661
/// Bundles the streamer configuration with the `SwQos` configuration.
#[derive(Clone)]
pub struct SwQosQuicStreamerConfig {
    pub quic_streamer_config: QuicStreamerConfig,
    pub qos_config: SwQosConfig,
}
667
/// Bundles the streamer configuration with the `SimpleQos` configuration.
#[derive(Clone)]
pub struct SimpleQosQuicStreamerConfig {
    pub quic_streamer_config: QuicStreamerConfig,
    pub qos_config: SimpleQosConfig,
}
673
674#[allow(deprecated)]
675impl Default for QuicServerParams {
676 fn default() -> Self {
677 QuicServerParams {
678 max_connections_per_peer: 1,
679 max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
680 max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
681 max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
682 wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
683 accumulator_channel_size: DEFAULT_ACCUMULATOR_CHANNEL_SIZE,
684 num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
685 max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
686 }
687 }
688}
689
690#[allow(deprecated)]
691impl QuicServerParams {
692 #[cfg(feature = "dev-context-only-utils")]
693 pub const DEFAULT_NUM_SERVER_THREADS_FOR_TEST: NonZeroUsize = NonZeroUsize::new(8).unwrap();
694
695 #[cfg(feature = "dev-context-only-utils")]
696 pub fn default_for_tests() -> Self {
697 Self {
699 num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
700 ..Self::default()
701 }
702 }
703}
704
705impl Default for QuicStreamerConfig {
706 fn default() -> Self {
707 Self {
708 max_connections_per_peer: 1,
709 max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
710 max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
711 max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
712 wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
713 accumulator_channel_size: DEFAULT_ACCUMULATOR_CHANNEL_SIZE,
714 num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"),
715 }
716 }
717}
718
719impl QuicStreamerConfig {
720 #[cfg(feature = "dev-context-only-utils")]
721 pub const DEFAULT_NUM_SERVER_THREADS_FOR_TEST: NonZeroUsize = NonZeroUsize::new(8).unwrap();
722
723 #[cfg(feature = "dev-context-only-utils")]
724 pub fn default_for_tests() -> Self {
725 Self {
727 accumulator_channel_size: 100_000,
728 num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
729 ..Self::default()
730 }
731 }
732
733 pub(crate) fn max_concurrent_connections(&self) -> usize {
734 let conns = self.max_staked_connections + self.max_unstaked_connections;
735 conns + conns / 4
736 }
737}
738
739#[allow(deprecated)]
740impl From<&QuicServerParams> for QuicStreamerConfig {
741 fn from(params: &QuicServerParams) -> Self {
742 Self {
743 max_connections_per_peer: params.max_connections_per_peer,
744 max_staked_connections: params.max_staked_connections,
745 max_unstaked_connections: params.max_unstaked_connections,
746 max_connections_per_ipaddr_per_min: params.max_connections_per_ipaddr_per_min,
747 wait_for_chunk_timeout: params.wait_for_chunk_timeout,
748 accumulator_channel_size: params.accumulator_channel_size,
749 num_threads: params.num_threads,
750 }
751 }
752}
753
754#[deprecated(since = "3.1.0", note = "Use spawn_server_with_cancel instead")]
755#[allow(deprecated)]
756pub fn spawn_server(
757 thread_name: &'static str,
758 metrics_name: &'static str,
759 sockets: impl IntoIterator<Item = UdpSocket>,
760 keypair: &Keypair,
761 packet_sender: Sender<PacketBatch>,
762 exit: Arc<AtomicBool>,
763 staked_nodes: Arc<RwLock<StakedNodes>>,
764 quic_server_params: QuicServerParams,
765) -> Result<SpawnServerResult, QuicServerError> {
766 let cancel = CancellationToken::new();
767 thread::spawn({
768 let cancel = cancel.clone();
769 move || loop {
770 if exit.load(Ordering::Relaxed) {
771 cancel.cancel();
772 break;
773 }
774 thread::sleep(Duration::from_millis(100));
775 }
776 });
777 let quic_server_config: QuicStreamerConfig = (&quic_server_params).into();
778 let qos_config = SwQosConfig {
779 max_streams_per_ms: quic_server_params.max_streams_per_ms,
780 };
781 spawn_server_with_cancel(
782 thread_name,
783 metrics_name,
784 sockets,
785 keypair,
786 packet_sender,
787 staked_nodes,
788 quic_server_config,
789 qos_config,
790 cancel,
791 )
792}
793
794fn spawn_server_with_cancel_generic<Q, C>(
796 thread_name: &'static str,
797 metrics_name: &'static str,
798 stats: Arc<StreamerStats>,
799 sockets: impl IntoIterator<Item = UdpSocket>,
800 keypair: &Keypair,
801 packet_sender: Sender<PacketBatch>,
802 quic_server_params: QuicStreamerConfig,
803 qos: Arc<Q>,
804 cancel: CancellationToken,
805) -> Result<SpawnServerResult, QuicServerError>
806where
807 Q: QosController<C> + Send + Sync + 'static,
808 C: ConnectionContext + Send + Sync + 'static,
809{
810 let runtime = rt(format!("{thread_name}Rt"), quic_server_params.num_threads);
811 let result = {
812 let _guard = runtime.enter();
813 crate::nonblocking::quic::spawn_server_with_cancel_and_qos(
814 metrics_name,
815 stats,
816 sockets,
817 keypair,
818 packet_sender,
819 quic_server_params,
820 qos,
821 cancel,
822 )
823 }?;
824 let handle = thread::Builder::new()
825 .name(thread_name.into())
826 .spawn(move || {
827 if let Err(e) = runtime.block_on(result.thread) {
828 warn!("error from runtime.block_on: {e:?}");
829 }
830 })
831 .unwrap();
832 let updater = EndpointKeyUpdater {
833 endpoints: result.endpoints.clone(),
834 };
835 Ok(SpawnServerResult {
836 endpoints: result.endpoints,
837 thread: handle,
838 key_updater: Arc::new(updater),
839 })
840}
841
842pub fn spawn_server_with_cancel(
844 thread_name: &'static str,
845 metrics_name: &'static str,
846 sockets: impl IntoIterator<Item = UdpSocket>,
847 keypair: &Keypair,
848 packet_sender: Sender<PacketBatch>,
849 staked_nodes: Arc<RwLock<StakedNodes>>,
850 quic_server_params: QuicStreamerConfig,
851 qos_config: SwQosConfig,
852 cancel: CancellationToken,
853) -> Result<SpawnServerResult, QuicServerError> {
854 let stats = Arc::<StreamerStats>::default();
855 let swqos = Arc::new(SwQos::new(
856 qos_config,
857 quic_server_params.max_staked_connections,
858 quic_server_params.max_unstaked_connections,
859 quic_server_params.max_connections_per_peer,
860 stats.clone(),
861 staked_nodes,
862 cancel.clone(),
863 ));
864 spawn_server_with_cancel_generic(
865 thread_name,
866 metrics_name,
867 stats,
868 sockets,
869 keypair,
870 packet_sender,
871 quic_server_params,
872 swqos,
873 cancel,
874 )
875}
876
877pub fn spawn_simple_qos_server_with_cancel(
879 thread_name: &'static str,
880 metrics_name: &'static str,
881 sockets: impl IntoIterator<Item = UdpSocket>,
882 keypair: &Keypair,
883 packet_sender: Sender<PacketBatch>,
884 staked_nodes: Arc<RwLock<StakedNodes>>,
885 quic_server_params: QuicStreamerConfig,
886 qos_config: SimpleQosConfig,
887 cancel: CancellationToken,
888) -> Result<SpawnServerResult, QuicServerError> {
889 let stats = Arc::<StreamerStats>::default();
890
891 let simple_qos = Arc::new(SimpleQos::new(
892 qos_config,
893 quic_server_params.max_connections_per_peer,
894 quic_server_params.max_staked_connections,
895 stats.clone(),
896 staked_nodes,
897 cancel.clone(),
898 ));
899
900 spawn_server_with_cancel_generic(
901 thread_name,
902 metrics_name,
903 stats,
904 sockets,
905 keypair,
906 packet_sender,
907 quic_server_params,
908 simple_qos,
909 cancel,
910 )
911}
912
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
        crossbeam_channel::unbounded,
        solana_net_utils::sockets::bind_to_localhost_unique,
        solana_pubkey::Pubkey,
        solana_signer::Signer,
        std::{collections::HashMap, net::SocketAddr},
    };

    /// What the set-up helpers hand back: the server thread, the receiving end
    /// of the packet channel, the server's address and the token that stops it.
    type ServerHandles = (
        std::thread::JoinHandle<()>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
        CancellationToken,
    );

    /// Runtime used by the tests to drive the async client-side checks.
    fn rt_for_test() -> Runtime {
        let worker_threads = QuicStreamerConfig::DEFAULT_NUM_SERVER_THREADS_FOR_TEST;
        rt("solQuicTestRt".to_string(), worker_threads)
    }

    /// Cancels the server and waits for its thread to finish.
    fn stop_server(cancel: &CancellationToken, server_thread: std::thread::JoinHandle<()>) {
        cancel.cancel();
        server_thread.join().unwrap();
    }

    /// Starts a `SwQos` server on a unique localhost socket using
    /// `server_params` and the default `SwQosConfig`.
    fn setup_quic_server_with_params(
        server_params: QuicStreamerConfig,
        staked_nodes: Arc<RwLock<StakedNodes>>,
    ) -> ServerHandles {
        let socket = bind_to_localhost_unique().expect("should bind");
        let server_address = socket.local_addr().unwrap();
        let (sender, receiver) = unbounded();
        let keypair = Keypair::new();
        let cancel = CancellationToken::new();
        let SpawnServerResult {
            thread: server_thread,
            ..
        } = spawn_server_with_cancel(
            "solQuicTest",
            "quic_streamer_test",
            [socket],
            &keypair,
            sender,
            staked_nodes,
            server_params,
            SwQosConfig::default(),
            cancel.clone(),
        )
        .unwrap();
        (server_thread, receiver, server_address, cancel)
    }

    /// Starts a `SimpleQos` server on a unique localhost socket using the
    /// streamer and QoS configuration bundled in `server_params`.
    fn setup_simple_qos_quic_server_with_params(
        server_params: SimpleQosQuicStreamerConfig,
        staked_nodes: Arc<RwLock<StakedNodes>>,
    ) -> ServerHandles {
        let socket = bind_to_localhost_unique().expect("should bind");
        let server_address = socket.local_addr().unwrap();
        let (sender, receiver) = unbounded();
        let keypair = Keypair::new();
        let cancel = CancellationToken::new();
        let SpawnServerResult {
            thread: server_thread,
            ..
        } = spawn_simple_qos_server_with_cancel(
            "solQuicTest",
            "quic_streamer_test",
            [socket],
            &keypair,
            sender,
            staked_nodes,
            server_params.quic_streamer_config,
            server_params.qos_config,
            cancel.clone(),
        )
        .unwrap();
        (server_thread, receiver, server_address, cancel)
    }

    /// Starts a `SwQos` server with the test defaults and no staked nodes.
    fn setup_quic_server() -> ServerHandles {
        setup_quic_server_with_params(
            QuicStreamerConfig::default_for_tests(),
            Arc::new(RwLock::new(StakedNodes::default())),
        )
    }

    #[test]
    fn test_quic_server_exit() {
        let (server_thread, _receiver, _server_address, cancel) = setup_quic_server();
        stop_server(&cancel, server_thread);
    }

    #[test]
    fn test_quic_timeout() {
        agave_logger::setup();
        let (server_thread, receiver, server_address, cancel) = setup_quic_server();
        let runtime = rt_for_test();
        runtime.block_on(check_timeout(receiver, server_address));
        stop_server(&cancel, server_thread);
    }

    #[test]
    fn test_quic_server_block_multiple_connections() {
        agave_logger::setup();
        let (server_thread, _receiver, server_address, cancel) = setup_quic_server();

        let runtime = rt_for_test();
        runtime.block_on(check_block_multiple_connections(server_address));
        stop_server(&cancel, server_thread);
    }

    #[test]
    fn test_quic_server_multiple_streams() {
        agave_logger::setup();
        // Same set-up as the other SwQos tests, but with two connections per peer.
        let server_params = QuicStreamerConfig {
            max_connections_per_peer: 2,
            ..QuicStreamerConfig::default_for_tests()
        };
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let (server_thread, receiver, server_address, cancel) =
            setup_quic_server_with_params(server_params, staked_nodes);

        let runtime = rt_for_test();
        runtime.block_on(check_multiple_streams(receiver, server_address, None));
        stop_server(&cancel, server_thread);
    }

    #[test]
    fn test_quic_server_multiple_writes() {
        agave_logger::setup();
        let (server_thread, receiver, server_address, cancel) = setup_quic_server();

        let runtime = rt_for_test();
        runtime.block_on(check_multiple_writes(receiver, server_address, None));
        stop_server(&cancel, server_thread);
    }

    #[test]
    fn test_quic_server_multiple_packets_with_simple_qos() {
        agave_logger::setup();
        let client_keypair = Keypair::new();
        let rich_node_keypair = Keypair::new();

        // The client holds a small stake next to a much larger staked node.
        let stakes = HashMap::from([
            (client_keypair.pubkey(), 1_000),
            (rich_node_keypair.pubkey(), 1_000_000_000),
        ]);
        let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());

        let server_params = SimpleQosQuicStreamerConfig {
            quic_streamer_config: QuicStreamerConfig {
                max_unstaked_connections: 0,
                ..QuicStreamerConfig::default_for_tests()
            },
            qos_config: SimpleQosConfig {
                max_streams_per_second: 20,
            },
        };
        let (server_thread, receiver, server_address, cancel) =
            setup_simple_qos_quic_server_with_params(
                server_params,
                Arc::new(RwLock::new(staked_nodes)),
            );

        let runtime = rt_for_test();
        let num_expected_packets = 20;

        runtime.block_on(check_multiple_packets(
            receiver,
            server_address,
            Some(&client_keypair),
            num_expected_packets,
        ));
        stop_server(&cancel, server_thread);
    }

    #[test]
    fn test_quic_server_unstaked_node_connect_failure() {
        agave_logger::setup();
        let socket = bind_to_localhost_unique().expect("should bind");
        let server_address = socket.local_addr().unwrap();
        // The receiving half is dropped right away; this test reads no packets.
        let (sender, _) = unbounded();
        let keypair = Keypair::new();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let cancel = CancellationToken::new();
        let SpawnServerResult {
            thread: server_thread,
            ..
        } = spawn_server_with_cancel(
            "solQuicTest",
            "quic_streamer_test",
            [socket],
            &keypair,
            sender,
            staked_nodes,
            QuicStreamerConfig {
                max_unstaked_connections: 0,
                ..QuicStreamerConfig::default_for_tests()
            },
            SwQosConfig::default(),
            cancel.clone(),
        )
        .unwrap();

        let runtime = rt_for_test();
        runtime.block_on(check_unstaked_node_connect_failure(server_address));
        stop_server(&cancel, server_thread);
    }
}