solana_streamer/
streamer.rs

1//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
2//!
3
4use {
5    crate::{
6        packet::{
7            self, PacketBatch, PacketBatchRecycler, PacketRef, PinnedPacketBatch, PACKETS_PER_BATCH,
8        },
9        sendmmsg::{batch_send, SendPktsError},
10        socket::SocketAddrSpace,
11    },
12    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
13    histogram::Histogram,
14    solana_net_utils::multihomed_sockets::{
15        BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider, SocketProvider,
16    },
17    solana_packet::Packet,
18    solana_pubkey::Pubkey,
19    solana_time_utils::timestamp,
20    std::{
21        cmp::Reverse,
22        collections::HashMap,
23        net::{IpAddr, UdpSocket},
24        sync::{
25            atomic::{AtomicBool, AtomicUsize, Ordering},
26            Arc,
27        },
28        thread::{sleep, Builder, JoinHandle},
29        time::{Duration, Instant},
30    },
31    thiserror::Error,
32};
33#[cfg(unix)]
34use {
35    nix::poll::{PollFd, PollFlags},
36    std::os::fd::AsFd,
37};
38
39pub trait ChannelSend<T>: Send + 'static {
40    fn send(&self, msg: T) -> std::result::Result<(), SendError<T>>;
41
42    fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>>;
43
44    fn is_empty(&self) -> bool;
45
46    fn len(&self) -> usize;
47}
48
49impl<T> ChannelSend<T> for Sender<T>
50where
51    T: Send + 'static,
52{
53    #[inline]
54    fn send(&self, msg: T) -> std::result::Result<(), SendError<T>> {
55        self.send(msg)
56    }
57
58    #[inline]
59    fn try_send(&self, msg: T) -> std::result::Result<(), TrySendError<T>> {
60        self.try_send(msg)
61    }
62
63    #[inline]
64    fn is_empty(&self) -> bool {
65        self.is_empty()
66    }
67
68    #[inline]
69    fn len(&self) -> usize {
70        self.len()
71    }
72}
73
/// Read timeout (one second) installed on receive sockets on non-unix targets, where the
/// receive loop has no polling support and would otherwise block indefinitely.
pub(crate) const SOCKET_READ_TIMEOUT: Duration = Duration::from_millis(1_000);
75
76// Total stake and nodes => stake map
77#[derive(Default)]
78pub struct StakedNodes {
79    stakes: Arc<HashMap<Pubkey, u64>>,
80    overrides: HashMap<Pubkey, u64>,
81    total_stake: u64,
82}
83
84pub type PacketBatchReceiver = Receiver<PacketBatch>;
85pub type PacketBatchSender = Sender<PacketBatch>;
86
87#[derive(Error, Debug)]
88pub enum StreamerError {
89    #[error("I/O error")]
90    Io(#[from] std::io::Error),
91
92    #[error("receive timeout error")]
93    RecvTimeout(#[from] RecvTimeoutError),
94
95    #[error("send packets error")]
96    Send(#[from] SendError<PacketBatch>),
97
98    #[error(transparent)]
99    SendPktsError(#[from] SendPktsError),
100}
101
/// Counters maintained by the receive loop and flushed by [`StreamerReceiveStats::report`].
pub struct StreamerReceiveStats {
    /// Name under which the counters are reported.
    pub name: &'static str,
    /// Number of packets received.
    pub packets_count: AtomicUsize,
    /// Number of non-empty packet batches received.
    pub packet_batches_count: AtomicUsize,
    /// Number of received batches that were filled to `PACKETS_PER_BATCH`.
    pub full_packet_batches_count: AtomicUsize,
    /// Largest outgoing channel length observed when a batch was about to be forwarded.
    pub max_channel_len: AtomicUsize,
    /// Number of packets discarded because the outgoing channel was full.
    pub num_packets_dropped: AtomicUsize,
}
110
111impl StreamerReceiveStats {
112    pub fn new(name: &'static str) -> Self {
113        Self {
114            name,
115            packets_count: AtomicUsize::default(),
116            packet_batches_count: AtomicUsize::default(),
117            full_packet_batches_count: AtomicUsize::default(),
118            max_channel_len: AtomicUsize::default(),
119            num_packets_dropped: AtomicUsize::default(),
120        }
121    }
122
123    pub fn report(&self) {
124        datapoint_info!(
125            self.name,
126            (
127                "packets_count",
128                self.packets_count.swap(0, Ordering::Relaxed) as i64,
129                i64
130            ),
131            (
132                "packet_batches_count",
133                self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
134                i64
135            ),
136            (
137                "full_packet_batches_count",
138                self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
139                i64
140            ),
141            (
142                "channel_len",
143                self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
144                i64
145            ),
146            (
147                "num_packets_dropped",
148                self.num_packets_dropped.swap(0, Ordering::Relaxed) as i64,
149                i64
150            ),
151        );
152    }
153}
154
155pub type Result<T> = std::result::Result<T, StreamerError>;
156
157fn recv_loop<P: SocketProvider>(
158    provider: &mut P,
159    exit: &AtomicBool,
160    packet_batch_sender: &impl ChannelSend<PacketBatch>,
161    recycler: &PacketBatchRecycler,
162    stats: &StreamerReceiveStats,
163    coalesce: Option<Duration>,
164    use_pinned_memory: bool,
165    in_vote_only_mode: Option<Arc<AtomicBool>>,
166    is_staked_service: bool,
167) -> Result<()> {
168    fn setup_socket(socket: &UdpSocket) -> Result<()> {
169        // Non-unix implementation may block indefinitely due to its lack of polling support,
170        // so we set a read timeout to avoid blocking indefinitely.
171        #[cfg(not(unix))]
172        socket.set_read_timeout(Some(SOCKET_READ_TIMEOUT))?;
173
174        #[cfg(unix)]
175        socket.set_nonblocking(true)?;
176
177        Ok(())
178    }
179
180    let mut socket = provider.current_socket_ref();
181    setup_socket(socket)?;
182    #[cfg(unix)]
183    let mut poll_fd = [PollFd::new(socket.as_fd(), PollFlags::POLLIN)];
184
185    loop {
186        let mut packet_batch = if use_pinned_memory {
187            PinnedPacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
188        } else {
189            PinnedPacketBatch::with_capacity(PACKETS_PER_BATCH)
190        };
191        packet_batch.resize(PACKETS_PER_BATCH, Packet::default());
192
193        loop {
194            // Check for exit signal, even if socket is busy
195            // (for instance the leader transaction socket)
196            if exit.load(Ordering::Relaxed) {
197                return Ok(());
198            }
199
200            if let Some(ref in_vote_only_mode) = in_vote_only_mode {
201                if in_vote_only_mode.load(Ordering::Relaxed) {
202                    sleep(Duration::from_millis(1));
203                    continue;
204                }
205            }
206
207            #[cfg(unix)]
208            let result = packet::recv_from(&mut packet_batch, socket, coalesce, &mut poll_fd);
209            #[cfg(not(unix))]
210            let result = packet::recv_from(&mut packet_batch, socket, coalesce);
211
212            if let Ok(len) = result {
213                if len > 0 {
214                    let StreamerReceiveStats {
215                        packets_count,
216                        packet_batches_count,
217                        full_packet_batches_count,
218                        max_channel_len,
219                        ..
220                    } = stats;
221
222                    packets_count.fetch_add(len, Ordering::Relaxed);
223                    packet_batches_count.fetch_add(1, Ordering::Relaxed);
224                    max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
225                    if len == PACKETS_PER_BATCH {
226                        full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
227                    }
228                    packet_batch
229                        .iter_mut()
230                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
231                    match packet_batch_sender.try_send(packet_batch.into()) {
232                        Ok(_) => {}
233                        Err(TrySendError::Full(_)) => {
234                            stats.num_packets_dropped.fetch_add(len, Ordering::Relaxed);
235                        }
236                        Err(TrySendError::Disconnected(err)) => {
237                            return Err(StreamerError::Send(SendError(err)))
238                        }
239                    }
240                }
241                break;
242            }
243        }
244
245        if let CurrentSocket::Changed(s) = provider.current_socket() {
246            socket = s;
247            setup_socket(socket)?;
248
249            #[cfg(unix)]
250            {
251                poll_fd = [PollFd::new(socket.as_fd(), PollFlags::POLLIN)];
252            }
253        }
254    }
255}
256
257#[allow(clippy::too_many_arguments)]
258pub fn receiver(
259    thread_name: String,
260    socket: Arc<UdpSocket>,
261    exit: Arc<AtomicBool>,
262    packet_batch_sender: impl ChannelSend<PacketBatch>,
263    recycler: PacketBatchRecycler,
264    stats: Arc<StreamerReceiveStats>,
265    coalesce: Option<Duration>,
266    use_pinned_memory: bool,
267    in_vote_only_mode: Option<Arc<AtomicBool>>,
268    is_staked_service: bool,
269) -> JoinHandle<()> {
270    Builder::new()
271        .name(thread_name)
272        .spawn(move || {
273            let mut provider = FixedSocketProvider::new(socket);
274            let _ = recv_loop(
275                &mut provider,
276                &exit,
277                &packet_batch_sender,
278                &recycler,
279                &stats,
280                coalesce,
281                use_pinned_memory,
282                in_vote_only_mode,
283                is_staked_service,
284            );
285        })
286        .unwrap()
287}
288
289#[allow(clippy::too_many_arguments)]
290pub fn receiver_atomic(
291    thread_name: String,
292    sockets: Arc<[UdpSocket]>,
293    bind_ip_addrs: Arc<BindIpAddrs>,
294    exit: Arc<AtomicBool>,
295    packet_batch_sender: impl ChannelSend<PacketBatch>,
296    recycler: PacketBatchRecycler,
297    stats: Arc<StreamerReceiveStats>,
298    coalesce: Option<Duration>,
299    use_pinned_memory: bool,
300    in_vote_only_mode: Option<Arc<AtomicBool>>,
301    is_staked_service: bool,
302) -> JoinHandle<()> {
303    Builder::new()
304        .name(thread_name)
305        .spawn(move || {
306            let mut provider = MultihomedSocketProvider::new(sockets, bind_ip_addrs);
307            let _ = recv_loop(
308                &mut provider,
309                &exit,
310                &packet_batch_sender,
311                &recycler,
312                &stats,
313                coalesce,
314                use_pinned_memory,
315                in_vote_only_mode,
316                is_staked_service,
317            );
318        })
319        .unwrap()
320}
321
/// Traffic sent to a single destination host during one sampling window.
#[derive(Default, Debug)]
struct SendStats {
    /// Sum of the payload sizes of the recorded packets, in bytes.
    bytes: u64,
    /// Number of packets recorded.
    count: u64,
}
327
328#[derive(Default)]
329struct StreamerSendStats {
330    host_map: HashMap<IpAddr, SendStats>,
331    since: Option<Instant>,
332}
333
334impl StreamerSendStats {
335    fn report_stats(
336        name: &'static str,
337        host_map: HashMap<IpAddr, SendStats>,
338        sample_duration: Option<Duration>,
339    ) {
340        const MAX_REPORT_ENTRIES: usize = 5;
341        let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
342        let mut hist = Histogram::default();
343        let mut byte_sum = 0;
344        let mut pkt_count = 0;
345        host_map.iter().for_each(|(_addr, host_stats)| {
346            hist.increment(host_stats.bytes).unwrap();
347            byte_sum += host_stats.bytes;
348            pkt_count += host_stats.count;
349        });
350
351        datapoint_info!(
352            name,
353            ("streamer-send-sample_duration_ms", sample_ms, i64),
354            ("streamer-send-host_count", host_map.len(), i64),
355            ("streamer-send-bytes_total", byte_sum, i64),
356            ("streamer-send-pkt_count_total", pkt_count, i64),
357            (
358                "streamer-send-host_bytes_min",
359                hist.minimum().unwrap_or_default(),
360                i64
361            ),
362            (
363                "streamer-send-host_bytes_max",
364                hist.maximum().unwrap_or_default(),
365                i64
366            ),
367            (
368                "streamer-send-host_bytes_mean",
369                hist.mean().unwrap_or_default(),
370                i64
371            ),
372            (
373                "streamer-send-host_bytes_90pct",
374                hist.percentile(90.0).unwrap_or_default(),
375                i64
376            ),
377            (
378                "streamer-send-host_bytes_50pct",
379                hist.percentile(50.0).unwrap_or_default(),
380                i64
381            ),
382            (
383                "streamer-send-host_bytes_10pct",
384                hist.percentile(10.0).unwrap_or_default(),
385                i64
386            ),
387        );
388
389        let num_entries = host_map.len();
390        let mut entries: Vec<_> = host_map.into_iter().collect();
391        if entries.len() > MAX_REPORT_ENTRIES {
392            entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
393                Reverse(stats.bytes)
394            });
395            entries.truncate(MAX_REPORT_ENTRIES);
396        }
397        info!("streamer send {name} hosts: count:{num_entries} {entries:?}");
398    }
399
400    fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
401        const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
402        const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
403        let elapsed = self.since.as_ref().map(Instant::elapsed);
404        if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
405            && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
406        {
407            return;
408        }
409
410        let host_map = std::mem::take(&mut self.host_map);
411        let _ = sender.send(Box::new(move || {
412            Self::report_stats(name, host_map, elapsed);
413        }));
414
415        *self = Self {
416            since: Some(Instant::now()),
417            ..Self::default()
418        };
419    }
420
421    fn record(&mut self, pkt: PacketRef) {
422        let ent = self.host_map.entry(pkt.meta().addr).or_default();
423        ent.count += 1;
424        ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
425    }
426}
427
428impl StakedNodes {
429    fn calculate_total_stake(
430        stakes: &HashMap<Pubkey, u64>,
431        overrides: &HashMap<Pubkey, u64>,
432    ) -> u64 {
433        stakes
434            .iter()
435            .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
436            .map(|(_, &stake)| stake)
437            .chain(overrides.values().copied())
438            .sum()
439    }
440
441    pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
442        let total_stake = Self::calculate_total_stake(&stakes, &overrides);
443        Self {
444            stakes,
445            overrides,
446            total_stake,
447        }
448    }
449
450    pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
451        self.overrides
452            .get(pubkey)
453            .or_else(|| self.stakes.get(pubkey))
454            .filter(|&&stake| stake > 0)
455            .copied()
456    }
457
458    #[inline]
459    pub fn total_stake(&self) -> u64 {
460        self.total_stake
461    }
462
463    // Update the stake map given a new stakes map
464    pub fn update_stake_map(&mut self, stakes: Arc<HashMap<Pubkey, u64>>) {
465        let total_stake = Self::calculate_total_stake(&stakes, &self.overrides);
466        self.total_stake = total_stake;
467        self.stakes = stakes;
468    }
469}
470
471fn recv_send(
472    sock: &UdpSocket,
473    r: &PacketBatchReceiver,
474    socket_addr_space: &SocketAddrSpace,
475    stats: &mut Option<StreamerSendStats>,
476) -> Result<()> {
477    let timer = Duration::new(1, 0);
478    let packet_batch = r.recv_timeout(timer)?;
479    if let Some(stats) = stats {
480        packet_batch.iter().for_each(|p| stats.record(p));
481    }
482    let packets = packet_batch.iter().filter_map(|pkt| {
483        let addr = pkt.meta().socket_addr();
484        let data = pkt.data(..)?;
485        socket_addr_space.check(&addr).then_some((data, addr))
486    });
487    batch_send(sock, packets.collect::<Vec<_>>())?;
488    Ok(())
489}
490
491pub fn recv_packet_batches(
492    recvr: &PacketBatchReceiver,
493) -> Result<(Vec<PacketBatch>, usize, Duration)> {
494    let recv_start = Instant::now();
495    let timer = Duration::new(1, 0);
496    let packet_batch = recvr.recv_timeout(timer)?;
497    trace!("got packets");
498    let mut num_packets = packet_batch.len();
499    let mut packet_batches = vec![packet_batch];
500    while let Ok(packet_batch) = recvr.try_recv() {
501        trace!("got more packets");
502        num_packets += packet_batch.len();
503        packet_batches.push(packet_batch);
504    }
505    let recv_duration = recv_start.elapsed();
506    trace!(
507        "packet batches len: {}, num packets: {}",
508        packet_batches.len(),
509        num_packets
510    );
511    Ok((packet_batches, num_packets, recv_duration))
512}
513
514pub fn responder_atomic(
515    name: &'static str,
516    sockets: Arc<[UdpSocket]>,
517    bind_ip_addrs: Arc<BindIpAddrs>,
518    r: PacketBatchReceiver,
519    socket_addr_space: SocketAddrSpace,
520    stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
521) -> JoinHandle<()> {
522    Builder::new()
523        .name(format!("solRspndr{name}"))
524        .spawn(move || {
525            responder_loop(
526                MultihomedSocketProvider::new(sockets, bind_ip_addrs),
527                name,
528                r,
529                socket_addr_space,
530                stats_reporter_sender,
531            );
532        })
533        .unwrap()
534}
535
536pub fn responder(
537    name: &'static str,
538    sock: Arc<UdpSocket>,
539    r: PacketBatchReceiver,
540    socket_addr_space: SocketAddrSpace,
541    stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
542) -> JoinHandle<()> {
543    Builder::new()
544        .name(format!("solRspndr{name}"))
545        .spawn(move || {
546            responder_loop(
547                FixedSocketProvider::new(sock),
548                name,
549                r,
550                socket_addr_space,
551                stats_reporter_sender,
552            );
553        })
554        .unwrap()
555}
556
557fn responder_loop<P: SocketProvider>(
558    provider: P,
559    name: &'static str,
560    r: PacketBatchReceiver,
561    socket_addr_space: SocketAddrSpace,
562    stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
563) {
564    let mut errors = 0;
565    let mut last_error = None;
566    let mut last_print = 0;
567    let mut stats = None;
568
569    if stats_reporter_sender.is_some() {
570        stats = Some(StreamerSendStats::default());
571    }
572
573    loop {
574        let sock = provider.current_socket_ref();
575        if let Err(e) = recv_send(sock, &r, &socket_addr_space, &mut stats) {
576            match e {
577                StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
578                StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
579                _ => {
580                    errors += 1;
581                    last_error = Some(e);
582                }
583            }
584        }
585        let now = timestamp();
586        if now - last_print > 1000 && errors != 0 {
587            datapoint_info!(name, ("errors", errors, i64),);
588            info!("{name} last-error: {last_error:?} count: {errors}");
589            last_print = now;
590            errors = 0;
591        }
592        if let Some(ref stats_reporter_sender) = stats_reporter_sender {
593            if let Some(ref mut stats) = stats {
594                stats.maybe_submit(name, stats_reporter_sender);
595            }
596        }
597    }
598}
599
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PinnedPacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_net_utils::sockets::bind_to_localhost_unique,
        solana_perf::recycler::Recycler,
        std::{
            io::{self, Write},
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Receives batches from `r`, subtracting their sizes from `num_packets`, until the
    /// count reaches zero or ten receive attempts (one second each) have been made.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        const MAX_ATTEMPTS: usize = 10;
        for _ in 0..MAX_ATTEMPTS {
            match r.recv_timeout(Duration::from_secs(1)) {
                Ok(packet_batch) => *num_packets -= packet_batch.len(),
                Err(_) => continue,
            }

            if *num_packets == 0 {
                break;
            }
        }
    }

    /// The packet types must be printable with `{:?}`.
    #[test]
    fn streamer_debug() {
        let mut sink = io::sink();
        write!(sink, "{:?}", Packet::default()).unwrap();
        write!(sink, "{:?}", PinnedPacketBatch::default()).unwrap();
    }

    /// Round trip: a responder sends a batch to a socket served by a receiver, and every
    /// packet must come out of the receiver's channel and be reflected in its stats.
    #[test]
    fn streamer_send_test() {
        const NUM_PACKETS: usize = 5;

        let read = bind_to_localhost_unique().expect("should bind reader");
        read.set_read_timeout(Some(SOCKET_READ_TIMEOUT)).unwrap();
        let addr = read.local_addr().unwrap();
        let send = bind_to_localhost_unique().expect("should bind sender");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Some(Duration::from_millis(1)), // coalesce
            true,
            None,
            false,
        );

        let (s_responder, r_responder) = unbounded();
        let t_responder = responder(
            "SendTest",
            Arc::new(send),
            r_responder,
            SocketAddrSpace::Unspecified,
            None,
        );

        let mut packet_batch = PinnedPacketBatch::default();
        for i in 0..NUM_PACKETS {
            let mut p = Packet::default();
            p.buffer_mut()[0] = i as u8;
            p.meta_mut().size = PACKET_DATA_SIZE;
            p.meta_mut().set_socket_addr(&addr);
            packet_batch.push(p);
        }
        s_responder
            .send(PacketBatch::from(packet_batch))
            .expect("send");
        // Dropping the only sender disconnects the channel, which lets the responder
        // thread finish once it has handled the batch.
        drop(s_responder);

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}
695        t_responder.join().expect("join");
696    }
697}