1use {
5 crate::{
6 packet::{
7 self, PacketBatch, PacketBatchRecycler, PacketRef, PinnedPacketBatch, PACKETS_PER_BATCH,
8 },
9 sendmmsg::{batch_send, SendPktsError},
10 socket::SocketAddrSpace,
11 },
12 crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
13 histogram::Histogram,
14 solana_net_utils::multihomed_sockets::{
15 BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider, SocketProvider,
16 },
17 solana_packet::Packet,
18 solana_pubkey::Pubkey,
19 solana_time_utils::timestamp,
20 std::{
21 cmp::Reverse,
22 collections::HashMap,
23 net::{IpAddr, UdpSocket},
24 sync::{
25 atomic::{AtomicBool, AtomicUsize, Ordering},
26 Arc,
27 },
28 thread::{sleep, Builder, JoinHandle},
29 time::{Duration, Instant},
30 },
31 thiserror::Error,
32};
33#[cfg(unix)]
34use {
35 nix::poll::{PollFd, PollFlags},
36 std::os::fd::AsFd,
37};
38
39pub trait ChannelSend<T>: Send + 'static {
40 fn send(&self, msg: T) -> std::result::Result<(), SendError<T>>;
41
42 fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>>;
43
44 fn is_empty(&self) -> bool;
45
46 fn len(&self) -> usize;
47}
48
49impl<T> ChannelSend<T> for Sender<T>
50where
51 T: Send + 'static,
52{
53 #[inline]
54 fn send(&self, msg: T) -> std::result::Result<(), SendError<T>> {
55 self.send(msg)
56 }
57
58 #[inline]
59 fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>> {
60 self.try_send(msg)
61 }
62
63 #[inline]
64 fn is_empty(&self) -> bool {
65 self.is_empty()
66 }
67
68 #[inline]
69 fn len(&self) -> usize {
70 self.len()
71 }
72}
73
/// Read timeout (one second) that the receive loop applies to its socket on non-unix targets,
/// where it does not switch the socket to non-blocking mode.
pub(crate) const SOCKET_READ_TIMEOUT: Duration = Duration::from_millis(1_000);
75
/// Stake view of known nodes: a shared base stake map, per-node overrides that take precedence
/// over it, and the cached sum of the effective stakes.
#[derive(Default)]
pub struct StakedNodes {
    // Base stake per node identity; replaced wholesale by `update_stake_map`.
    stakes: Arc<HashMap<Pubkey, u64>>,
    // Per-node stakes that replace the matching entry of `stakes` when looked up or summed.
    overrides: HashMap<Pubkey, u64>,
    // Sum of effective stakes; recomputed whenever `stakes` is replaced.
    total_stake: u64,
}
83
/// Receiving end of a crossbeam channel carrying packet batches.
pub type PacketBatchReceiver = Receiver<PacketBatch>;
/// Sending end of a crossbeam channel carrying packet batches.
pub type PacketBatchSender = Sender<PacketBatch>;
86
/// Errors produced by the receive and respond loops in this module.
#[derive(Error, Debug)]
pub enum StreamerError {
    /// An I/O failure, e.g. while configuring a socket.
    #[error("I/O error")]
    Io(#[from] std::io::Error),

    /// A channel receive with a timeout failed (timed out or disconnected).
    #[error("receive timeout error")]
    RecvTimeout(#[from] RecvTimeoutError),

    /// A packet batch could not be delivered to its channel; the batch is carried in the error.
    #[error("send packets error")]
    Send(#[from] SendError<PacketBatch>),

    /// Sending packets over a UDP socket via `batch_send` failed.
    #[error(transparent)]
    SendPktsError(#[from] SendPktsError),
}
101
/// Counters updated by the receive loop and drained (reset to zero) by
/// [`StreamerReceiveStats::report`].
pub struct StreamerReceiveStats {
    /// Datapoint name used by `report`; the receive loop also passes it to the packet recycler.
    pub name: &'static str,
    /// Packets received since the last report.
    pub packets_count: AtomicUsize,
    /// Non-empty batches received since the last report.
    pub packet_batches_count: AtomicUsize,
    /// Batches that were filled to `PACKETS_PER_BATCH` packets.
    pub full_packet_batches_count: AtomicUsize,
    /// Largest output-channel length observed (reported under the field name `channel_len`).
    pub max_channel_len: AtomicUsize,
    /// Packets dropped because the output channel was full.
    pub num_packets_dropped: AtomicUsize,
}
110
111impl StreamerReceiveStats {
112 pub fn new(name: &'static str) -> Self {
113 Self {
114 name,
115 packets_count: AtomicUsize::default(),
116 packet_batches_count: AtomicUsize::default(),
117 full_packet_batches_count: AtomicUsize::default(),
118 max_channel_len: AtomicUsize::default(),
119 num_packets_dropped: AtomicUsize::default(),
120 }
121 }
122
123 pub fn report(&self) {
124 datapoint_info!(
125 self.name,
126 (
127 "packets_count",
128 self.packets_count.swap(0, Ordering::Relaxed) as i64,
129 i64
130 ),
131 (
132 "packet_batches_count",
133 self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
134 i64
135 ),
136 (
137 "full_packet_batches_count",
138 self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
139 i64
140 ),
141 (
142 "channel_len",
143 self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
144 i64
145 ),
146 (
147 "num_packets_dropped",
148 self.num_packets_dropped.swap(0, Ordering::Relaxed) as i64,
149 i64
150 ),
151 );
152 }
153}
154
/// Result type used throughout this module, with [`StreamerError`] as the error type.
pub type Result<T> = std::result::Result<T, StreamerError>;
156
/// Receive loop shared by `receiver` and `receiver_atomic`.
///
/// Reads packets from the socket currently supplied by `provider` into a freshly allocated batch
/// and forwards every non-empty batch to `packet_batch_sender` with `try_send`. When the channel
/// is full the batch is dropped and its packets are counted in `stats.num_packets_dropped`.
///
/// Returns `Ok(())` once `exit` is observed set. Returns an error if configuring a socket fails
/// or if `packet_batch_sender` reports that it is disconnected.
fn recv_loop<P: SocketProvider>(
    provider: &mut P,
    exit: &AtomicBool,
    packet_batch_sender: &impl ChannelSend<PacketBatch>,
    recycler: &PacketBatchRecycler,
    stats: &StreamerReceiveStats,
    coalesce: Option<Duration>,
    use_pinned_memory: bool,
    in_vote_only_mode: Option<Arc<AtomicBool>>,
    is_staked_service: bool,
) -> Result<()> {
    /// Prepares `socket` for this loop: non-blocking on unix (where a `PollFd` is also handed to
    /// `packet::recv_from`), otherwise blocking with a `SOCKET_READ_TIMEOUT` read timeout.
    fn setup_socket(socket: &UdpSocket) -> Result<()> {
        // Without the poll-based path, bound each blocking read so the loop regains control
        // (and re-checks `exit`) at least once per timeout.
        #[cfg(not(unix))]
        socket.set_read_timeout(Some(SOCKET_READ_TIMEOUT))?;

        #[cfg(unix)]
        socket.set_nonblocking(true)?;

        Ok(())
    }

    let mut socket = provider.current_socket_ref();
    setup_socket(socket)?;
    #[cfg(unix)]
    let mut poll_fd = [PollFd::new(socket.as_fd(), PollFlags::POLLIN)];

    loop {
        // A new batch per iteration: the previous one (if non-empty) was moved into the channel.
        let mut packet_batch = if use_pinned_memory {
            PinnedPacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
        } else {
            PinnedPacketBatch::with_capacity(PACKETS_PER_BATCH)
        };
        packet_batch.resize(PACKETS_PER_BATCH, Packet::default());

        loop {
            // Checked on every attempt so shutdown is noticed even while the socket is busy.
            if exit.load(Ordering::Relaxed) {
                return Ok(());
            }

            // In vote-only mode this receiver does not read from the socket at all; it only
            // re-checks the flags every millisecond.
            if let Some(ref in_vote_only_mode) = in_vote_only_mode {
                if in_vote_only_mode.load(Ordering::Relaxed) {
                    sleep(Duration::from_millis(1));
                    continue;
                }
            }

            #[cfg(unix)]
            let result = packet::recv_from(&mut packet_batch, socket, coalesce, &mut poll_fd);
            #[cfg(not(unix))]
            let result = packet::recv_from(&mut packet_batch, socket, coalesce);

            // On `Err` the same batch is reused and the read retried.
            if let Ok(len) = result {
                if len > 0 {
                    let StreamerReceiveStats {
                        packets_count,
                        packet_batches_count,
                        full_packet_batches_count,
                        max_channel_len,
                        ..
                    } = stats;

                    packets_count.fetch_add(len, Ordering::Relaxed);
                    packet_batches_count.fetch_add(1, Ordering::Relaxed);
                    max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
                    if len == PACKETS_PER_BATCH {
                        full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
                    }
                    // NOTE(review): assumes `recv_from` shrinks the batch to the `len` packets
                    // actually read, so only those are tagged and forwarded — confirm in packet.rs.
                    packet_batch
                        .iter_mut()
                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
                    // Never block the reader on a slow consumer: a full channel drops the batch.
                    match packet_batch_sender.try_send(packet_batch.into()) {
                        Ok(_) => {}
                        Err(TrySendError::Full(_)) => {
                            stats.num_packets_dropped.fetch_add(len, Ordering::Relaxed);
                        }
                        Err(TrySendError::Disconnected(err)) => {
                            return Err(StreamerError::Send(SendError(err)))
                        }
                    }
                }
                // Any successful read (even an empty one) ends this batch.
                break;
            }
        }

        // The provider is only consulted for a socket switch after a successful read.
        if let CurrentSocket::Changed(s) = provider.current_socket() {
            socket = s;
            setup_socket(socket)?;

            #[cfg(unix)]
            {
                poll_fd = [PollFd::new(socket.as_fd(), PollFlags::POLLIN)];
            }
        }
    }
}
256
257#[allow(clippy::too_many_arguments)]
258pub fn receiver(
259 thread_name: String,
260 socket: Arc<UdpSocket>,
261 exit: Arc<AtomicBool>,
262 packet_batch_sender: impl ChannelSend<PacketBatch>,
263 recycler: PacketBatchRecycler,
264 stats: Arc<StreamerReceiveStats>,
265 coalesce: Option<Duration>,
266 use_pinned_memory: bool,
267 in_vote_only_mode: Option<Arc<AtomicBool>>,
268 is_staked_service: bool,
269) -> JoinHandle<()> {
270 Builder::new()
271 .name(thread_name)
272 .spawn(move || {
273 let mut provider = FixedSocketProvider::new(socket);
274 let _ = recv_loop(
275 &mut provider,
276 &exit,
277 &packet_batch_sender,
278 &recycler,
279 &stats,
280 coalesce,
281 use_pinned_memory,
282 in_vote_only_mode,
283 is_staked_service,
284 );
285 })
286 .unwrap()
287}
288
289#[allow(clippy::too_many_arguments)]
290pub fn receiver_atomic(
291 thread_name: String,
292 sockets: Arc<[UdpSocket]>,
293 bind_ip_addrs: Arc<BindIpAddrs>,
294 exit: Arc<AtomicBool>,
295 packet_batch_sender: impl ChannelSend<PacketBatch>,
296 recycler: PacketBatchRecycler,
297 stats: Arc<StreamerReceiveStats>,
298 coalesce: Option<Duration>,
299 use_pinned_memory: bool,
300 in_vote_only_mode: Option<Arc<AtomicBool>>,
301 is_staked_service: bool,
302) -> JoinHandle<()> {
303 Builder::new()
304 .name(thread_name)
305 .spawn(move || {
306 let mut provider = MultihomedSocketProvider::new(sockets, bind_ip_addrs);
307 let _ = recv_loop(
308 &mut provider,
309 &exit,
310 &packet_batch_sender,
311 &recycler,
312 &stats,
313 coalesce,
314 use_pinned_memory,
315 in_vote_only_mode,
316 is_staked_service,
317 );
318 })
319 .unwrap()
320}
321
/// Per-destination counters accumulated by `StreamerSendStats::record`.
// Field order matters: the derived `Debug` output is what `report_stats` logs.
#[derive(Debug, Default)]
struct SendStats {
    // Total payload bytes recorded for this destination (0 for packets without data).
    bytes: u64,
    // Number of packets recorded for this destination.
    count: u64,
}
327
/// Per-host send statistics collected by a responder and reported in samples.
#[derive(Default)]
struct StreamerSendStats {
    // Counters keyed by destination IP address.
    host_map: HashMap<IpAddr, SendStats>,
    // Start of the current sample window; `None` until the first `maybe_submit` submission.
    since: Option<Instant>,
}
333
334impl StreamerSendStats {
335 fn report_stats(
336 name: &'static str,
337 host_map: HashMap<IpAddr, SendStats>,
338 sample_duration: Option<Duration>,
339 ) {
340 const MAX_REPORT_ENTRIES: usize = 5;
341 let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
342 let mut hist = Histogram::default();
343 let mut byte_sum = 0;
344 let mut pkt_count = 0;
345 host_map.iter().for_each(|(_addr, host_stats)| {
346 hist.increment(host_stats.bytes).unwrap();
347 byte_sum += host_stats.bytes;
348 pkt_count += host_stats.count;
349 });
350
351 datapoint_info!(
352 name,
353 ("streamer-send-sample_duration_ms", sample_ms, i64),
354 ("streamer-send-host_count", host_map.len(), i64),
355 ("streamer-send-bytes_total", byte_sum, i64),
356 ("streamer-send-pkt_count_total", pkt_count, i64),
357 (
358 "streamer-send-host_bytes_min",
359 hist.minimum().unwrap_or_default(),
360 i64
361 ),
362 (
363 "streamer-send-host_bytes_max",
364 hist.maximum().unwrap_or_default(),
365 i64
366 ),
367 (
368 "streamer-send-host_bytes_mean",
369 hist.mean().unwrap_or_default(),
370 i64
371 ),
372 (
373 "streamer-send-host_bytes_90pct",
374 hist.percentile(90.0).unwrap_or_default(),
375 i64
376 ),
377 (
378 "streamer-send-host_bytes_50pct",
379 hist.percentile(50.0).unwrap_or_default(),
380 i64
381 ),
382 (
383 "streamer-send-host_bytes_10pct",
384 hist.percentile(10.0).unwrap_or_default(),
385 i64
386 ),
387 );
388
389 let num_entries = host_map.len();
390 let mut entries: Vec<_> = host_map.into_iter().collect();
391 if entries.len() > MAX_REPORT_ENTRIES {
392 entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
393 Reverse(stats.bytes)
394 });
395 entries.truncate(MAX_REPORT_ENTRIES);
396 }
397 info!("streamer send {name} hosts: count:{num_entries} {entries:?}");
398 }
399
400 fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
401 const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
402 const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
403 let elapsed = self.since.as_ref().map(Instant::elapsed);
404 if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
405 && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
406 {
407 return;
408 }
409
410 let host_map = std::mem::take(&mut self.host_map);
411 let _ = sender.send(Box::new(move || {
412 Self::report_stats(name, host_map, elapsed);
413 }));
414
415 *self = Self {
416 since: Some(Instant::now()),
417 ..Self::default()
418 };
419 }
420
421 fn record(&mut self, pkt: PacketRef) {
422 let ent = self.host_map.entry(pkt.meta().addr).or_default();
423 ent.count += 1;
424 ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
425 }
426}
427
428impl StakedNodes {
429 fn calculate_total_stake(
430 stakes: &HashMap<Pubkey, u64>,
431 overrides: &HashMap<Pubkey, u64>,
432 ) -> u64 {
433 stakes
434 .iter()
435 .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
436 .map(|(_, &stake)| stake)
437 .chain(overrides.values().copied())
438 .sum()
439 }
440
441 pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
442 let total_stake = Self::calculate_total_stake(&stakes, &overrides);
443 Self {
444 stakes,
445 overrides,
446 total_stake,
447 }
448 }
449
450 pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
451 self.overrides
452 .get(pubkey)
453 .or_else(|| self.stakes.get(pubkey))
454 .filter(|&&stake| stake > 0)
455 .copied()
456 }
457
458 #[inline]
459 pub fn total_stake(&self) -> u64 {
460 self.total_stake
461 }
462
463 pub fn update_stake_map(&mut self, stakes: Arc<HashMap<Pubkey, u64>>) {
465 let total_stake = Self::calculate_total_stake(&stakes, &self.overrides);
466 self.total_stake = total_stake;
467 self.stakes = stakes;
468 }
469}
470
471fn recv_send(
472 sock: &UdpSocket,
473 r: &PacketBatchReceiver,
474 socket_addr_space: &SocketAddrSpace,
475 stats: &mut Option<StreamerSendStats>,
476) -> Result<()> {
477 let timer = Duration::new(1, 0);
478 let packet_batch = r.recv_timeout(timer)?;
479 if let Some(stats) = stats {
480 packet_batch.iter().for_each(|p| stats.record(p));
481 }
482 let packets = packet_batch.iter().filter_map(|pkt| {
483 let addr = pkt.meta().socket_addr();
484 let data = pkt.data(..)?;
485 socket_addr_space.check(&addr).then_some((data, addr))
486 });
487 batch_send(sock, packets.collect::<Vec<_>>())?;
488 Ok(())
489}
490
491pub fn recv_packet_batches(
492 recvr: &PacketBatchReceiver,
493) -> Result<(Vec<PacketBatch>, usize, Duration)> {
494 let recv_start = Instant::now();
495 let timer = Duration::new(1, 0);
496 let packet_batch = recvr.recv_timeout(timer)?;
497 trace!("got packets");
498 let mut num_packets = packet_batch.len();
499 let mut packet_batches = vec![packet_batch];
500 while let Ok(packet_batch) = recvr.try_recv() {
501 trace!("got more packets");
502 num_packets += packet_batch.len();
503 packet_batches.push(packet_batch);
504 }
505 let recv_duration = recv_start.elapsed();
506 trace!(
507 "packet batches len: {}, num packets: {}",
508 packet_batches.len(),
509 num_packets
510 );
511 Ok((packet_batches, num_packets, recv_duration))
512}
513
514pub fn responder_atomic(
515 name: &'static str,
516 sockets: Arc<[UdpSocket]>,
517 bind_ip_addrs: Arc<BindIpAddrs>,
518 r: PacketBatchReceiver,
519 socket_addr_space: SocketAddrSpace,
520 stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
521) -> JoinHandle<()> {
522 Builder::new()
523 .name(format!("solRspndr{name}"))
524 .spawn(move || {
525 responder_loop(
526 MultihomedSocketProvider::new(sockets, bind_ip_addrs),
527 name,
528 r,
529 socket_addr_space,
530 stats_reporter_sender,
531 );
532 })
533 .unwrap()
534}
535
536pub fn responder(
537 name: &'static str,
538 sock: Arc<UdpSocket>,
539 r: PacketBatchReceiver,
540 socket_addr_space: SocketAddrSpace,
541 stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
542) -> JoinHandle<()> {
543 Builder::new()
544 .name(format!("solRspndr{name}"))
545 .spawn(move || {
546 responder_loop(
547 FixedSocketProvider::new(sock),
548 name,
549 r,
550 socket_addr_space,
551 stats_reporter_sender,
552 );
553 })
554 .unwrap()
555}
556
557fn responder_loop<P: SocketProvider>(
558 provider: P,
559 name: &'static str,
560 r: PacketBatchReceiver,
561 socket_addr_space: SocketAddrSpace,
562 stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
563) {
564 let mut errors = 0;
565 let mut last_error = None;
566 let mut last_print = 0;
567 let mut stats = None;
568
569 if stats_reporter_sender.is_some() {
570 stats = Some(StreamerSendStats::default());
571 }
572
573 loop {
574 let sock = provider.current_socket_ref();
575 if let Err(e) = recv_send(sock, &r, &socket_addr_space, &mut stats) {
576 match e {
577 StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
578 StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
579 _ => {
580 errors += 1;
581 last_error = Some(e);
582 }
583 }
584 }
585 let now = timestamp();
586 if now - last_print > 1000 && errors != 0 {
587 datapoint_info!(name, ("errors", errors, i64),);
588 info!("{name} last-error: {last_error:?} count: {errors}");
589 last_print = now;
590 errors = 0;
591 }
592 if let Some(ref stats_reporter_sender) = stats_reporter_sender {
593 if let Some(ref mut stats) = stats {
594 stats.maybe_submit(name, stats_reporter_sender);
595 }
596 }
597 }
598}
599
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PinnedPacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_net_utils::sockets::bind_to_localhost_unique,
        solana_perf::recycler::Recycler,
        std::{
            io::{self, Write},
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Pulls batches from `r` (at most ten attempts, each waiting up to one second). Each batch's
    /// length is subtracted from `num_packets`, stopping as soon as it reaches zero.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _attempt in 0..10 {
            match r.recv_timeout(Duration::from_secs(1)) {
                Ok(batch) => {
                    *num_packets -= batch.len();
                    if *num_packets == 0 {
                        break;
                    }
                }
                // Nothing arrived this round; try again.
                Err(_) => {}
            }
        }
    }

    #[test]
    fn streamer_debug() {
        let mut sink = io::sink();
        write!(sink, "{:?}", Packet::default()).unwrap();
        write!(sink, "{:?}", PinnedPacketBatch::default()).unwrap();
    }

    #[test]
    fn streamer_send_test() {
        const NUM_PACKETS: usize = 5;

        let reader_socket = bind_to_localhost_unique().expect("should bind reader");
        reader_socket
            .set_read_timeout(Some(SOCKET_READ_TIMEOUT))
            .unwrap();
        let dest_addr = reader_socket.local_addr().unwrap();
        let sender_socket = bind_to_localhost_unique().expect("should bind sender");
        let exit = Arc::new(AtomicBool::new(false));
        let (received_tx, received_rx) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(reader_socket),
            exit.clone(),
            received_tx,
            Recycler::default(),
            stats.clone(),
            Some(Duration::from_millis(1)),
            true,
            None,
            false,
        );

        let t_responder = {
            let (to_send_tx, to_send_rx) = unbounded();
            let t_responder = responder(
                "SendTest",
                Arc::new(sender_socket),
                to_send_rx,
                SocketAddrSpace::Unspecified,
                None,
            );
            let mut outgoing = PinnedPacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut packet = Packet::default();
                packet.buffer_mut()[0] = i as u8;
                let meta = packet.meta_mut();
                meta.size = PACKET_DATA_SIZE;
                meta.set_socket_addr(&dest_addr);
                outgoing.push(packet);
            }
            to_send_tx.send(PacketBatch::from(outgoing)).expect("send");
            // `to_send_tx` is dropped at the end of this block; once the queued batch has been
            // consumed the responder sees the channel disconnected and exits.
            t_responder
        };

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(received_rx, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}