1use {
5 crate::{
6 packet::{
7 self, PacketBatch, PacketBatchRecycler, PacketRef, PinnedPacketBatch, PACKETS_PER_BATCH,
8 },
9 sendmmsg::{batch_send, SendPktsError},
10 socket::SocketAddrSpace,
11 },
12 crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
13 histogram::Histogram,
14 itertools::Itertools,
15 solana_pubkey::Pubkey,
16 solana_time_utils::timestamp,
17 std::{
18 cmp::Reverse,
19 collections::HashMap,
20 net::{IpAddr, UdpSocket},
21 sync::{
22 atomic::{AtomicBool, AtomicUsize, Ordering},
23 Arc,
24 },
25 thread::{sleep, Builder, JoinHandle},
26 time::{Duration, Instant},
27 },
28 thiserror::Error,
29};
30
/// Abstraction over the sending half of a channel carrying `T`.
///
/// Lets streamer code accept either a plain crossbeam `Sender` or any other
/// channel-like type, as long as it supports blocking send, non-blocking
/// send, and length introspection.
pub trait ChannelSend<T>: Send + 'static {
    /// Blocking send. Fails only if the receiving side has disconnected.
    fn send(&self, msg: T) -> std::result::Result<(), SendError<T>>;

    /// Non-blocking send. Fails if the channel is full or disconnected.
    fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>>;

    /// Returns true if the channel currently holds no messages.
    fn is_empty(&self) -> bool;

    /// Number of messages currently queued in the channel.
    fn len(&self) -> usize;
}
40
41impl<T> ChannelSend<T> for Sender<T>
42where
43 T: Send + 'static,
44{
45 #[inline]
46 fn send(&self, msg: T) -> std::result::Result<(), SendError<T>> {
47 self.send(msg)
48 }
49
50 #[inline]
51 fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>> {
52 self.try_send(msg)
53 }
54
55 #[inline]
56 fn is_empty(&self) -> bool {
57 self.is_empty()
58 }
59
60 #[inline]
61 fn len(&self) -> usize {
62 self.len()
63 }
64}
65
/// Snapshot of stake weights per node pubkey, with optional overrides and
/// cached aggregate statistics.
///
/// Entries in `overrides` shadow entries in `stakes` for the same pubkey;
/// the cached aggregates are computed over the merged view, ignoring
/// zero-valued stakes (see `calculate_stake_stats`).
#[derive(Default)]
pub struct StakedNodes {
    // Base stake map, shared via `Arc` so updates swap the whole map.
    stakes: Arc<HashMap<Pubkey, u64>>,
    // Per-pubkey stake overrides that take precedence over `stakes`.
    overrides: HashMap<Pubkey, u64>,
    // Cached aggregates over the merged, positive stakes.
    total_stake: u64,
    max_stake: u64,
    min_stake: u64,
}
75
/// Receiving half of a channel carrying `PacketBatch`es.
pub type PacketBatchReceiver = Receiver<PacketBatch>;
/// Sending half of a channel carrying `PacketBatch`es.
pub type PacketBatchSender = Sender<PacketBatch>;
78
/// Errors produced by the streamer send/receive helpers in this module.
#[derive(Error, Debug)]
pub enum StreamerError {
    /// Underlying socket or other I/O failure.
    #[error("I/O error")]
    Io(#[from] std::io::Error),

    /// Channel receive timed out or the sending side disconnected.
    #[error("receive timeout error")]
    RecvTimeout(#[from] RecvTimeoutError),

    /// The receiving side of a packet-batch channel disconnected.
    #[error("send packets error")]
    Send(#[from] SendError<PacketBatch>),

    /// Error from the batched sendmmsg path; message forwarded verbatim.
    #[error(transparent)]
    SendPktsError(#[from] SendPktsError),
}
93
/// Counters accumulated by `recv_loop` and reported (and reset) by
/// `report()`. All fields use relaxed atomics; they are statistics, not
/// synchronization.
pub struct StreamerReceiveStats {
    // Datapoint name used when reporting; also tags recycled batches.
    pub name: &'static str,
    // Total packets received since the last report.
    pub packets_count: AtomicUsize,
    // Total batches forwarded since the last report.
    pub packet_batches_count: AtomicUsize,
    // Batches that filled up to PACKETS_PER_BATCH.
    pub full_packet_batches_count: AtomicUsize,
    // High-water mark of the outbound channel length since the last report.
    pub max_channel_len: AtomicUsize,
    // Packets dropped because the outbound channel was full.
    pub num_packets_dropped: AtomicUsize,
}
102
impl StreamerReceiveStats {
    /// Creates a zeroed stats block tagged with `name`, which doubles as
    /// the datapoint name used by `report()`.
    pub fn new(name: &'static str) -> Self {
        Self {
            name,
            packets_count: AtomicUsize::default(),
            packet_batches_count: AtomicUsize::default(),
            full_packet_batches_count: AtomicUsize::default(),
            max_channel_len: AtomicUsize::default(),
            num_packets_dropped: AtomicUsize::default(),
        }
    }

    /// Emits all counters as a single datapoint. Each counter is read with
    /// `swap(0, ..)`, so every report covers only the interval since the
    /// previous report.
    pub fn report(&self) {
        datapoint_info!(
            self.name,
            (
                "packets_count",
                self.packets_count.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "packet_batches_count",
                self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "full_packet_batches_count",
                self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "channel_len",
                self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "num_packets_dropped",
                self.num_packets_dropped.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
        );
    }
}
146
/// Module-wide result type carrying `StreamerError`.
pub type Result<T> = std::result::Result<T, StreamerError>;
148
/// Core UDP receive loop: reads packets from `socket` into batches and
/// forwards them over `packet_batch_sender` until `exit` is set.
///
/// Each outer iteration allocates a fresh batch sized for
/// `PACKETS_PER_BATCH` packets (pinned, recycler-backed memory when
/// `use_pinned_memory` is set). The inner loop retries `packet::recv_from`
/// until at least one packet arrives, then stamps each packet's
/// staked-node flag and hands the batch off without blocking. If the
/// channel is full the whole batch is dropped and counted in
/// `num_packets_dropped`.
///
/// Returns `Ok(())` when `exit` is observed, or `StreamerError::Send` if
/// the receiving side of the channel disconnected.
fn recv_loop(
    socket: &UdpSocket,
    exit: &AtomicBool,
    packet_batch_sender: &impl ChannelSend<PacketBatch>,
    recycler: &PacketBatchRecycler,
    stats: &StreamerReceiveStats,
    coalesce: Option<Duration>,
    use_pinned_memory: bool,
    in_vote_only_mode: Option<Arc<AtomicBool>>,
    is_staked_service: bool,
) -> Result<()> {
    loop {
        // Fresh batch per outer iteration; the recycler path reuses pinned
        // allocations tagged with `stats.name`.
        let mut packet_batch = if use_pinned_memory {
            PinnedPacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
        } else {
            PinnedPacketBatch::with_capacity(PACKETS_PER_BATCH)
        };
        loop {
            // Re-checked every inner iteration so the thread shuts down
            // promptly even while the socket is idle.
            if exit.load(Ordering::Relaxed) {
                return Ok(());
            }

            // While vote-only mode is active, skip reading the socket and
            // back off briefly before re-checking (incoming traffic is not
            // consumed in this state).
            if let Some(ref in_vote_only_mode) = in_vote_only_mode {
                if in_vote_only_mode.load(Ordering::Relaxed) {
                    sleep(Duration::from_millis(1));
                    continue;
                }
            }

            // An Err here (presumably a read timeout — the caller sets one
            // on the socket; TODO confirm) just re-runs the inner loop.
            if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) {
                if len > 0 {
                    let StreamerReceiveStats {
                        packets_count,
                        packet_batches_count,
                        full_packet_batches_count,
                        max_channel_len,
                        ..
                    } = stats;

                    packets_count.fetch_add(len, Ordering::Relaxed);
                    packet_batches_count.fetch_add(1, Ordering::Relaxed);
                    // Track the deepest the outbound channel has been.
                    max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
                    if len == PACKETS_PER_BATCH {
                        full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
                    }
                    // Mark every packet with whether it arrived via the
                    // staked-node service.
                    packet_batch
                        .iter_mut()
                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
                    // Non-blocking hand-off: a full channel drops the whole
                    // batch (accounted above via `len`); a disconnected
                    // channel ends the loop with an error.
                    match packet_batch_sender.try_send(packet_batch.into()) {
                        Ok(_) => {}
                        Err(TrySendError::Full(_)) => {
                            stats.num_packets_dropped.fetch_add(len, Ordering::Relaxed);
                        }
                        Err(TrySendError::Disconnected(err)) => {
                            return Err(StreamerError::Send(SendError(err)))
                        }
                    }
                }
                // Break to the outer loop to start a fresh batch.
                break;
            }
        }
    }
}
214
215#[allow(clippy::too_many_arguments)]
216pub fn receiver(
217 thread_name: String,
218 socket: Arc<UdpSocket>,
219 exit: Arc<AtomicBool>,
220 packet_batch_sender: impl ChannelSend<PacketBatch>,
221 recycler: PacketBatchRecycler,
222 stats: Arc<StreamerReceiveStats>,
223 coalesce: Option<Duration>,
224 use_pinned_memory: bool,
225 in_vote_only_mode: Option<Arc<AtomicBool>>,
226 is_staked_service: bool,
227) -> JoinHandle<()> {
228 let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
229 assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
230 Builder::new()
231 .name(thread_name)
232 .spawn(move || {
233 let _ = recv_loop(
234 &socket,
235 &exit,
236 &packet_batch_sender,
237 &recycler,
238 &stats,
239 coalesce,
240 use_pinned_memory,
241 in_vote_only_mode,
242 is_staked_service,
243 );
244 })
245 .unwrap()
246}
247
/// Per-destination-host send totals accumulated by `StreamerSendStats`.
#[derive(Debug, Default)]
struct SendStats {
    // Total payload bytes sent to this host.
    bytes: u64,
    // Number of packets sent to this host.
    count: u64,
}
253
/// Aggregates outgoing-packet statistics per destination IP and tracks
/// when the current sample window started.
#[derive(Default)]
struct StreamerSendStats {
    // Running totals keyed by destination IP for the current window.
    host_map: HashMap<IpAddr, SendStats>,
    // Start of the current sample window; None before the first submit.
    since: Option<Instant>,
}
259
impl StreamerSendStats {
    /// Aggregates `host_map` into a histogram datapoint and logs (up to)
    /// the `MAX_REPORT_ENTRIES` hosts with the most bytes sent.
    /// `sample_duration` is how long the map was accumulating (None on the
    /// very first submission).
    fn report_stats(
        name: &'static str,
        host_map: HashMap<IpAddr, SendStats>,
        sample_duration: Option<Duration>,
    ) {
        const MAX_REPORT_ENTRIES: usize = 5;
        let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
        let mut hist = Histogram::default();
        let mut byte_sum = 0;
        let mut pkt_count = 0;
        host_map.iter().for_each(|(_addr, host_stats)| {
            // NOTE(review): unwrap assumes the histogram accepts every
            // byte-count value — confirm increment() cannot fail here.
            hist.increment(host_stats.bytes).unwrap();
            byte_sum += host_stats.bytes;
            pkt_count += host_stats.count;
        });

        datapoint_info!(
            name,
            ("streamer-send-sample_duration_ms", sample_ms, i64),
            ("streamer-send-host_count", host_map.len(), i64),
            ("streamer-send-bytes_total", byte_sum, i64),
            ("streamer-send-pkt_count_total", pkt_count, i64),
            (
                "streamer-send-host_bytes_min",
                hist.minimum().unwrap_or_default(),
                i64
            ),
            (
                "streamer-send-host_bytes_max",
                hist.maximum().unwrap_or_default(),
                i64
            ),
            (
                "streamer-send-host_bytes_mean",
                hist.mean().unwrap_or_default(),
                i64
            ),
            (
                "streamer-send-host_bytes_90pct",
                hist.percentile(90.0).unwrap_or_default(),
                i64
            ),
            (
                "streamer-send-host_bytes_50pct",
                hist.percentile(50.0).unwrap_or_default(),
                i64
            ),
            (
                "streamer-send-host_bytes_10pct",
                hist.percentile(10.0).unwrap_or_default(),
                i64
            ),
        );

        let num_entries = host_map.len();
        let mut entries: Vec<_> = host_map.into_iter().collect();
        if entries.len() > MAX_REPORT_ENTRIES {
            // Partition so the MAX_REPORT_ENTRIES entries with the largest
            // byte counts come first (O(n), order within the top group is
            // unspecified), then drop the rest.
            entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
                Reverse(stats.bytes)
            });
            entries.truncate(MAX_REPORT_ENTRIES);
        }
        info!(
            "streamer send {} hosts: count:{} {:?}",
            name, num_entries, entries,
        );
    }

    /// Hands the accumulated host map to the reporting thread via `sender`
    /// when the 10s cadence has elapsed or the map has grown past the size
    /// threshold; otherwise returns without side effects.
    ///
    /// On the very first call `since` is None, so submission happens
    /// immediately and starts the sampling clock. After submitting, the
    /// map is reset and the window restarted.
    fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
        const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
        const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
        let elapsed = self.since.as_ref().map(Instant::elapsed);
        if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
            && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
        {
            return;
        }

        // Move the map out (leaving an empty one) and report it off-thread;
        // a closed reporting channel is deliberately ignored.
        let host_map = std::mem::take(&mut self.host_map);
        let _ = sender.send(Box::new(move || {
            Self::report_stats(name, host_map, elapsed);
        }));

        *self = Self {
            since: Some(Instant::now()),
            ..Self::default()
        };
    }

    /// Accounts one outgoing packet against its destination address.
    /// Packets with no readable data contribute zero bytes but still count.
    fn record(&mut self, pkt: PacketRef) {
        let ent = self.host_map.entry(pkt.meta().addr).or_default();
        ent.count += 1;
        ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
    }
}
356
357impl StakedNodes {
358 fn calculate_stake_stats(
360 stakes: &Arc<HashMap<Pubkey, u64>>,
361 overrides: &HashMap<Pubkey, u64>,
362 ) -> (u64, u64, u64) {
363 let values = stakes
364 .iter()
365 .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
366 .map(|(_, &stake)| stake)
367 .chain(overrides.values().copied())
368 .filter(|&stake| stake > 0);
369 let total_stake = values.clone().sum();
370 let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
371 (total_stake, min_stake, max_stake)
372 }
373
374 pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
375 let (total_stake, min_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides);
376 Self {
377 stakes,
378 overrides,
379 total_stake,
380 max_stake,
381 min_stake,
382 }
383 }
384
385 pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
386 self.overrides
387 .get(pubkey)
388 .or_else(|| self.stakes.get(pubkey))
389 .filter(|&&stake| stake > 0)
390 .copied()
391 }
392
393 #[inline]
394 pub fn total_stake(&self) -> u64 {
395 self.total_stake
396 }
397
398 #[inline]
399 pub(super) fn min_stake(&self) -> u64 {
400 self.min_stake
401 }
402
403 #[inline]
404 pub(super) fn max_stake(&self) -> u64 {
405 self.max_stake
406 }
407
408 pub fn update_stake_map(&mut self, stakes: Arc<HashMap<Pubkey, u64>>) {
410 let (total_stake, min_stake, max_stake) =
411 Self::calculate_stake_stats(&stakes, &self.overrides);
412
413 self.total_stake = total_stake;
414 self.min_stake = min_stake;
415 self.max_stake = max_stake;
416 self.stakes = stakes;
417 }
418}
419
420fn recv_send(
421 sock: &UdpSocket,
422 r: &PacketBatchReceiver,
423 socket_addr_space: &SocketAddrSpace,
424 stats: &mut Option<StreamerSendStats>,
425) -> Result<()> {
426 let timer = Duration::new(1, 0);
427 let packet_batch = r.recv_timeout(timer)?;
428 if let Some(stats) = stats {
429 packet_batch.iter().for_each(|p| stats.record(p));
430 }
431 let packets = packet_batch.iter().filter_map(|pkt| {
432 let addr = pkt.meta().socket_addr();
433 let data = pkt.data(..)?;
434 socket_addr_space.check(&addr).then_some((data, addr))
435 });
436 batch_send(sock, packets.collect::<Vec<_>>())?;
437 Ok(())
438}
439
440pub fn recv_packet_batches(
441 recvr: &PacketBatchReceiver,
442) -> Result<(Vec<PacketBatch>, usize, Duration)> {
443 let recv_start = Instant::now();
444 let timer = Duration::new(1, 0);
445 let packet_batch = recvr.recv_timeout(timer)?;
446 trace!("got packets");
447 let mut num_packets = packet_batch.len();
448 let mut packet_batches = vec![packet_batch];
449 while let Ok(packet_batch) = recvr.try_recv() {
450 trace!("got more packets");
451 num_packets += packet_batch.len();
452 packet_batches.push(packet_batch);
453 }
454 let recv_duration = recv_start.elapsed();
455 trace!(
456 "packet batches len: {}, num packets: {}",
457 packet_batches.len(),
458 num_packets
459 );
460 Ok((packet_batches, num_packets, recv_duration))
461}
462
463pub fn responder(
464 name: &'static str,
465 sock: Arc<UdpSocket>,
466 r: PacketBatchReceiver,
467 socket_addr_space: SocketAddrSpace,
468 stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
469) -> JoinHandle<()> {
470 Builder::new()
471 .name(format!("solRspndr{name}"))
472 .spawn(move || {
473 let mut errors = 0;
474 let mut last_error = None;
475 let mut last_print = 0;
476 let mut stats = None;
477
478 if stats_reporter_sender.is_some() {
479 stats = Some(StreamerSendStats::default());
480 }
481
482 loop {
483 if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
484 match e {
485 StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
486 StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
487 _ => {
488 errors += 1;
489 last_error = Some(e);
490 }
491 }
492 }
493 let now = timestamp();
494 if now - last_print > 1000 && errors != 0 {
495 datapoint_info!(name, ("errors", errors, i64),);
496 info!("{} last-error: {:?} count: {}", name, last_error, errors);
497 last_print = now;
498 errors = 0;
499 }
500 if let Some(ref stats_reporter_sender) = stats_reporter_sender {
501 if let Some(ref mut stats) = stats {
502 stats.maybe_submit(name, stats_reporter_sender);
503 }
504 }
505 }
506 })
507 .unwrap()
508}
509
510#[cfg(test)]
511mod test {
512 use {
513 super::*,
514 crate::{
515 packet::{Packet, PinnedPacketBatch, PACKET_DATA_SIZE},
516 streamer::{receiver, responder},
517 },
518 crossbeam_channel::unbounded,
519 solana_net_utils::bind_to_localhost,
520 solana_perf::recycler::Recycler,
521 std::{
522 io,
523 io::Write,
524 sync::{
525 atomic::{AtomicBool, Ordering},
526 Arc,
527 },
528 time::Duration,
529 },
530 };
531
532 fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
533 for _ in 0..10 {
534 let packet_batch_res = r.recv_timeout(Duration::new(1, 0));
535 if packet_batch_res.is_err() {
536 continue;
537 }
538
539 *num_packets -= packet_batch_res.unwrap().len();
540
541 if *num_packets == 0 {
542 break;
543 }
544 }
545 }
546
    /// Smoke test: the packet types implement Debug without panicking.
    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PinnedPacketBatch::default()).unwrap();
    }
    /// End-to-end loopback test: a responder thread sends NUM_PACKETS
    /// packets to a receiver thread's socket, and the receiver forwards
    /// them over a channel where this test counts them.
    #[test]
    fn streamer_send_test() {
        let read = bind_to_localhost().expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = bind_to_localhost().expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        // Receiver under test: unstaked, pinned memory, 1ms coalesce.
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Some(Duration::from_millis(1)),
            true,
            None,
            false,
        );
        const NUM_PACKETS: usize = 5;
        let t_responder = {
            let (s_responder, r_responder) = unbounded();
            let t_responder = responder(
                "SendTest",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
                None,
            );
            // Build NUM_PACKETS distinguishable packets all addressed to
            // the receiver's socket.
            let mut packet_batch = PinnedPacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut p = Packet::default();
                {
                    p.buffer_mut()[0] = i as u8;
                    p.meta_mut().size = PACKET_DATA_SIZE;
                    p.meta_mut().set_socket_addr(&addr);
                }
                packet_batch.push(p);
            }
            let packet_batch = PacketBatch::from(packet_batch);
            s_responder.send(packet_batch).expect("send");
            t_responder
        };

        // Count down as batches arrive; all packets must be accounted for.
        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        exit.store(true, Ordering::Relaxed);
        // Packets may be split across batches, so only a lower bound on
        // batch count is asserted; 5 < PACKETS_PER_BATCH, so no batch is
        // ever "full".
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
609}