solana_streamer/
streamer.rs

1//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
2//!
3
4use {
5    crate::{
6        packet::{
7            self, PacketBatch, PacketBatchRecycler, PacketRef, PinnedPacketBatch, PACKETS_PER_BATCH,
8        },
9        sendmmsg::{batch_send, SendPktsError},
10        socket::SocketAddrSpace,
11    },
12    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
13    histogram::Histogram,
14    itertools::Itertools,
15    solana_pubkey::Pubkey,
16    solana_time_utils::timestamp,
17    std::{
18        cmp::Reverse,
19        collections::HashMap,
20        net::{IpAddr, UdpSocket},
21        sync::{
22            atomic::{AtomicBool, AtomicUsize, Ordering},
23            Arc,
24        },
25        thread::{sleep, Builder, JoinHandle},
26        time::{Duration, Instant},
27    },
28    thiserror::Error,
29};
30
31pub trait ChannelSend<T>: Send + 'static {
32    fn send(&self, msg: T) -> std::result::Result<(), SendError<T>>;
33
34    fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>>;
35
36    fn is_empty(&self) -> bool;
37
38    fn len(&self) -> usize;
39}
40
41impl<T> ChannelSend<T> for Sender<T>
42where
43    T: Send + 'static,
44{
45    #[inline]
46    fn send(&self, msg: T) -> std::result::Result<(), SendError<T>> {
47        self.send(msg)
48    }
49
50    #[inline]
51    fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>> {
52        self.try_send(msg)
53    }
54
55    #[inline]
56    fn is_empty(&self) -> bool {
57        self.is_empty()
58    }
59
60    #[inline]
61    fn len(&self) -> usize {
62        self.len()
63    }
64}
65
66// Total stake and nodes => stake map
67#[derive(Default)]
68pub struct StakedNodes {
69    stakes: Arc<HashMap<Pubkey, u64>>,
70    overrides: HashMap<Pubkey, u64>,
71    total_stake: u64,
72    max_stake: u64,
73    min_stake: u64,
74}
75
76pub type PacketBatchReceiver = Receiver<PacketBatch>;
77pub type PacketBatchSender = Sender<PacketBatch>;
78
79#[derive(Error, Debug)]
80pub enum StreamerError {
81    #[error("I/O error")]
82    Io(#[from] std::io::Error),
83
84    #[error("receive timeout error")]
85    RecvTimeout(#[from] RecvTimeoutError),
86
87    #[error("send packets error")]
88    Send(#[from] SendError<PacketBatch>),
89
90    #[error(transparent)]
91    SendPktsError(#[from] SendPktsError),
92}
93
/// Counters maintained by a receive loop and submitted by
/// [`StreamerReceiveStats::report`].
pub struct StreamerReceiveStats {
    /// Name under which the counters are reported.
    pub name: &'static str,
    /// Number of packets received.
    pub packets_count: AtomicUsize,
    /// Number of non-empty packet batches received.
    pub packet_batches_count: AtomicUsize,
    /// Number of received batches that held exactly `PACKETS_PER_BATCH` packets.
    pub full_packet_batches_count: AtomicUsize,
    /// Largest output-channel length observed at the time a batch was received.
    pub max_channel_len: AtomicUsize,
    /// Number of packets discarded because the output channel was full.
    pub num_packets_dropped: AtomicUsize,
}
102
103impl StreamerReceiveStats {
104    pub fn new(name: &'static str) -> Self {
105        Self {
106            name,
107            packets_count: AtomicUsize::default(),
108            packet_batches_count: AtomicUsize::default(),
109            full_packet_batches_count: AtomicUsize::default(),
110            max_channel_len: AtomicUsize::default(),
111            num_packets_dropped: AtomicUsize::default(),
112        }
113    }
114
115    pub fn report(&self) {
116        datapoint_info!(
117            self.name,
118            (
119                "packets_count",
120                self.packets_count.swap(0, Ordering::Relaxed) as i64,
121                i64
122            ),
123            (
124                "packet_batches_count",
125                self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
126                i64
127            ),
128            (
129                "full_packet_batches_count",
130                self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
131                i64
132            ),
133            (
134                "channel_len",
135                self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
136                i64
137            ),
138            (
139                "num_packets_dropped",
140                self.num_packets_dropped.swap(0, Ordering::Relaxed) as i64,
141                i64
142            ),
143        );
144    }
145}
146
147pub type Result<T> = std::result::Result<T, StreamerError>;
148
149fn recv_loop(
150    socket: &UdpSocket,
151    exit: &AtomicBool,
152    packet_batch_sender: &impl ChannelSend<PacketBatch>,
153    recycler: &PacketBatchRecycler,
154    stats: &StreamerReceiveStats,
155    coalesce: Option<Duration>,
156    use_pinned_memory: bool,
157    in_vote_only_mode: Option<Arc<AtomicBool>>,
158    is_staked_service: bool,
159) -> Result<()> {
160    loop {
161        let mut packet_batch = if use_pinned_memory {
162            PinnedPacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
163        } else {
164            PinnedPacketBatch::with_capacity(PACKETS_PER_BATCH)
165        };
166        loop {
167            // Check for exit signal, even if socket is busy
168            // (for instance the leader transaction socket)
169            if exit.load(Ordering::Relaxed) {
170                return Ok(());
171            }
172
173            if let Some(ref in_vote_only_mode) = in_vote_only_mode {
174                if in_vote_only_mode.load(Ordering::Relaxed) {
175                    sleep(Duration::from_millis(1));
176                    continue;
177                }
178            }
179
180            if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) {
181                if len > 0 {
182                    let StreamerReceiveStats {
183                        packets_count,
184                        packet_batches_count,
185                        full_packet_batches_count,
186                        max_channel_len,
187                        ..
188                    } = stats;
189
190                    packets_count.fetch_add(len, Ordering::Relaxed);
191                    packet_batches_count.fetch_add(1, Ordering::Relaxed);
192                    max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
193                    if len == PACKETS_PER_BATCH {
194                        full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
195                    }
196                    packet_batch
197                        .iter_mut()
198                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
199                    match packet_batch_sender.try_send(packet_batch.into()) {
200                        Ok(_) => {}
201                        Err(TrySendError::Full(_)) => {
202                            stats.num_packets_dropped.fetch_add(len, Ordering::Relaxed);
203                        }
204                        Err(TrySendError::Disconnected(err)) => {
205                            return Err(StreamerError::Send(SendError(err)))
206                        }
207                    }
208                }
209                break;
210            }
211        }
212    }
213}
214
215#[allow(clippy::too_many_arguments)]
216pub fn receiver(
217    thread_name: String,
218    socket: Arc<UdpSocket>,
219    exit: Arc<AtomicBool>,
220    packet_batch_sender: impl ChannelSend<PacketBatch>,
221    recycler: PacketBatchRecycler,
222    stats: Arc<StreamerReceiveStats>,
223    coalesce: Option<Duration>,
224    use_pinned_memory: bool,
225    in_vote_only_mode: Option<Arc<AtomicBool>>,
226    is_staked_service: bool,
227) -> JoinHandle<()> {
228    let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
229    assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
230    Builder::new()
231        .name(thread_name)
232        .spawn(move || {
233            let _ = recv_loop(
234                &socket,
235                &exit,
236                &packet_batch_sender,
237                &recycler,
238                &stats,
239                coalesce,
240                use_pinned_memory,
241                in_vote_only_mode,
242                is_staked_service,
243            );
244        })
245        .unwrap()
246}
247
/// Send totals accumulated for one destination host.
#[derive(Default, Debug)]
struct SendStats {
    /// Total payload bytes recorded for the host.
    bytes: u64,
    /// Number of packets recorded for the host.
    count: u64,
}
253
254#[derive(Default)]
255struct StreamerSendStats {
256    host_map: HashMap<IpAddr, SendStats>,
257    since: Option<Instant>,
258}
259
260impl StreamerSendStats {
261    fn report_stats(
262        name: &'static str,
263        host_map: HashMap<IpAddr, SendStats>,
264        sample_duration: Option<Duration>,
265    ) {
266        const MAX_REPORT_ENTRIES: usize = 5;
267        let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
268        let mut hist = Histogram::default();
269        let mut byte_sum = 0;
270        let mut pkt_count = 0;
271        host_map.iter().for_each(|(_addr, host_stats)| {
272            hist.increment(host_stats.bytes).unwrap();
273            byte_sum += host_stats.bytes;
274            pkt_count += host_stats.count;
275        });
276
277        datapoint_info!(
278            name,
279            ("streamer-send-sample_duration_ms", sample_ms, i64),
280            ("streamer-send-host_count", host_map.len(), i64),
281            ("streamer-send-bytes_total", byte_sum, i64),
282            ("streamer-send-pkt_count_total", pkt_count, i64),
283            (
284                "streamer-send-host_bytes_min",
285                hist.minimum().unwrap_or_default(),
286                i64
287            ),
288            (
289                "streamer-send-host_bytes_max",
290                hist.maximum().unwrap_or_default(),
291                i64
292            ),
293            (
294                "streamer-send-host_bytes_mean",
295                hist.mean().unwrap_or_default(),
296                i64
297            ),
298            (
299                "streamer-send-host_bytes_90pct",
300                hist.percentile(90.0).unwrap_or_default(),
301                i64
302            ),
303            (
304                "streamer-send-host_bytes_50pct",
305                hist.percentile(50.0).unwrap_or_default(),
306                i64
307            ),
308            (
309                "streamer-send-host_bytes_10pct",
310                hist.percentile(10.0).unwrap_or_default(),
311                i64
312            ),
313        );
314
315        let num_entries = host_map.len();
316        let mut entries: Vec<_> = host_map.into_iter().collect();
317        if entries.len() > MAX_REPORT_ENTRIES {
318            entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
319                Reverse(stats.bytes)
320            });
321            entries.truncate(MAX_REPORT_ENTRIES);
322        }
323        info!(
324            "streamer send {} hosts: count:{} {:?}",
325            name, num_entries, entries,
326        );
327    }
328
329    fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
330        const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
331        const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
332        let elapsed = self.since.as_ref().map(Instant::elapsed);
333        if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
334            && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
335        {
336            return;
337        }
338
339        let host_map = std::mem::take(&mut self.host_map);
340        let _ = sender.send(Box::new(move || {
341            Self::report_stats(name, host_map, elapsed);
342        }));
343
344        *self = Self {
345            since: Some(Instant::now()),
346            ..Self::default()
347        };
348    }
349
350    fn record(&mut self, pkt: PacketRef) {
351        let ent = self.host_map.entry(pkt.meta().addr).or_default();
352        ent.count += 1;
353        ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
354    }
355}
356
357impl StakedNodes {
358    /// Calculate the stake stats: return the new (total_stake, min_stake and max_stake) tuple
359    fn calculate_stake_stats(
360        stakes: &Arc<HashMap<Pubkey, u64>>,
361        overrides: &HashMap<Pubkey, u64>,
362    ) -> (u64, u64, u64) {
363        let values = stakes
364            .iter()
365            .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
366            .map(|(_, &stake)| stake)
367            .chain(overrides.values().copied())
368            .filter(|&stake| stake > 0);
369        let total_stake = values.clone().sum();
370        let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
371        (total_stake, min_stake, max_stake)
372    }
373
374    pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
375        let (total_stake, min_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides);
376        Self {
377            stakes,
378            overrides,
379            total_stake,
380            max_stake,
381            min_stake,
382        }
383    }
384
385    pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
386        self.overrides
387            .get(pubkey)
388            .or_else(|| self.stakes.get(pubkey))
389            .filter(|&&stake| stake > 0)
390            .copied()
391    }
392
393    #[inline]
394    pub fn total_stake(&self) -> u64 {
395        self.total_stake
396    }
397
398    #[inline]
399    pub(super) fn min_stake(&self) -> u64 {
400        self.min_stake
401    }
402
403    #[inline]
404    pub(super) fn max_stake(&self) -> u64 {
405        self.max_stake
406    }
407
408    // Update the stake map given a new stakes map
409    pub fn update_stake_map(&mut self, stakes: Arc<HashMap<Pubkey, u64>>) {
410        let (total_stake, min_stake, max_stake) =
411            Self::calculate_stake_stats(&stakes, &self.overrides);
412
413        self.total_stake = total_stake;
414        self.min_stake = min_stake;
415        self.max_stake = max_stake;
416        self.stakes = stakes;
417    }
418}
419
420fn recv_send(
421    sock: &UdpSocket,
422    r: &PacketBatchReceiver,
423    socket_addr_space: &SocketAddrSpace,
424    stats: &mut Option<StreamerSendStats>,
425) -> Result<()> {
426    let timer = Duration::new(1, 0);
427    let packet_batch = r.recv_timeout(timer)?;
428    if let Some(stats) = stats {
429        packet_batch.iter().for_each(|p| stats.record(p));
430    }
431    let packets = packet_batch.iter().filter_map(|pkt| {
432        let addr = pkt.meta().socket_addr();
433        let data = pkt.data(..)?;
434        socket_addr_space.check(&addr).then_some((data, addr))
435    });
436    batch_send(sock, packets.collect::<Vec<_>>())?;
437    Ok(())
438}
439
440pub fn recv_packet_batches(
441    recvr: &PacketBatchReceiver,
442) -> Result<(Vec<PacketBatch>, usize, Duration)> {
443    let recv_start = Instant::now();
444    let timer = Duration::new(1, 0);
445    let packet_batch = recvr.recv_timeout(timer)?;
446    trace!("got packets");
447    let mut num_packets = packet_batch.len();
448    let mut packet_batches = vec![packet_batch];
449    while let Ok(packet_batch) = recvr.try_recv() {
450        trace!("got more packets");
451        num_packets += packet_batch.len();
452        packet_batches.push(packet_batch);
453    }
454    let recv_duration = recv_start.elapsed();
455    trace!(
456        "packet batches len: {}, num packets: {}",
457        packet_batches.len(),
458        num_packets
459    );
460    Ok((packet_batches, num_packets, recv_duration))
461}
462
463pub fn responder(
464    name: &'static str,
465    sock: Arc<UdpSocket>,
466    r: PacketBatchReceiver,
467    socket_addr_space: SocketAddrSpace,
468    stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
469) -> JoinHandle<()> {
470    Builder::new()
471        .name(format!("solRspndr{name}"))
472        .spawn(move || {
473            let mut errors = 0;
474            let mut last_error = None;
475            let mut last_print = 0;
476            let mut stats = None;
477
478            if stats_reporter_sender.is_some() {
479                stats = Some(StreamerSendStats::default());
480            }
481
482            loop {
483                if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
484                    match e {
485                        StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
486                        StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
487                        _ => {
488                            errors += 1;
489                            last_error = Some(e);
490                        }
491                    }
492                }
493                let now = timestamp();
494                if now - last_print > 1000 && errors != 0 {
495                    datapoint_info!(name, ("errors", errors, i64),);
496                    info!("{} last-error: {:?} count: {}", name, last_error, errors);
497                    last_print = now;
498                    errors = 0;
499                }
500                if let Some(ref stats_reporter_sender) = stats_reporter_sender {
501                    if let Some(ref mut stats) = stats {
502                        stats.maybe_submit(name, stats_reporter_sender);
503                    }
504                }
505            }
506        })
507        .unwrap()
508}
509
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PinnedPacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_net_utils::bind_to_localhost,
        solana_perf::recycler::Recycler,
        std::{
            io,
            io::Write,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Makes up to ten attempts (one second each) to receive a batch from `r`,
    /// subtracting each received batch's length from `num_packets` and stopping
    /// early once it reaches zero.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _ in 0..10 {
            if let Ok(packet_batch) = r.recv_timeout(Duration::from_secs(1)) {
                *num_packets -= packet_batch.len();

                if *num_packets == 0 {
                    break;
                }
            }
        }
    }

    /// Debug-formatting the packet types must not fail.
    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PinnedPacketBatch::default()).unwrap();
    }

    /// Sends a batch through a responder to a receiver over localhost and checks
    /// the received packet count and the receive statistics.
    #[test]
    fn streamer_send_test() {
        const NUM_PACKETS: usize = 5;

        let read = bind_to_localhost().expect("bind");
        read.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
        let addr = read.local_addr().unwrap();
        let send = bind_to_localhost().expect("bind");

        let exit = Arc::new(AtomicBool::new(false));
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let (s_reader, r_reader) = unbounded();
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Some(Duration::from_millis(1)), // coalesce
            true,
            None,
            false,
        );

        let t_responder = {
            // `s_responder` is dropped at the end of this scope, which
            // disconnects the channel and lets the responder thread finish.
            let (s_responder, r_responder) = unbounded();
            let handle = responder(
                "SendTest",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
                None,
            );

            let mut outgoing = PinnedPacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut p = Packet::default();
                p.buffer_mut()[0] = i as u8;
                p.meta_mut().size = PACKET_DATA_SIZE;
                p.meta_mut().set_socket_addr(&addr);
                outgoing.push(p);
            }
            s_responder
                .send(PacketBatch::from(outgoing))
                .expect("send");
            handle
        };

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}