solana_perf/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2#[cfg(feature = "dev-context-only-utils")]
3use bytes::{BufMut, BytesMut};
4use {
5    crate::{cuda_runtime::PinnedVec, recycler::Recycler},
6    bincode::config::Options,
7    bytes::Bytes,
8    rayon::{
9        iter::{IndexedParallelIterator, ParallelIterator},
10        prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
11    },
12    serde::{de::DeserializeOwned, Deserialize, Serialize},
13    std::{
14        borrow::Borrow,
15        io::Read,
16        net::SocketAddr,
17        ops::{Deref, DerefMut, Index, IndexMut},
18        slice::{Iter, SliceIndex},
19    },
20};
21pub use {
22    bytes,
23    solana_packet::{self, Meta, Packet, PacketFlags, PACKET_DATA_SIZE},
24};
25
26pub const NUM_PACKETS: usize = 1024 * 8;
27
28pub const PACKETS_PER_BATCH: usize = 64;
29pub const NUM_RCVMMSGS: usize = 64;
30
31/// Representation of a packet used in TPU.
32#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
33#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
34pub struct BytesPacket {
35    buffer: Bytes,
36    meta: Meta,
37}
38
39impl BytesPacket {
40    pub fn new(buffer: Bytes, meta: Meta) -> Self {
41        Self { buffer, meta }
42    }
43
44    #[cfg(feature = "dev-context-only-utils")]
45    pub fn empty() -> Self {
46        Self {
47            buffer: Bytes::new(),
48            meta: Meta::default(),
49        }
50    }
51
52    #[cfg(feature = "dev-context-only-utils")]
53    pub fn from_bytes(dest: Option<&SocketAddr>, buffer: impl Into<Bytes>) -> Self {
54        let buffer = buffer.into();
55        let mut meta = Meta {
56            size: buffer.len(),
57            ..Default::default()
58        };
59        if let Some(dest) = dest {
60            meta.set_socket_addr(dest);
61        }
62
63        Self { buffer, meta }
64    }
65
66    #[cfg(feature = "dev-context-only-utils")]
67    pub fn from_data<T>(dest: Option<&SocketAddr>, data: T) -> bincode::Result<Self>
68    where
69        T: solana_packet::Encode,
70    {
71        let buffer = BytesMut::with_capacity(PACKET_DATA_SIZE);
72        let mut writer = buffer.writer();
73        data.encode(&mut writer)?;
74        let buffer = writer.into_inner();
75        let buffer = buffer.freeze();
76
77        let mut meta = Meta {
78            size: buffer.len(),
79            ..Default::default()
80        };
81        if let Some(dest) = dest {
82            meta.set_socket_addr(dest);
83        }
84
85        Ok(Self { buffer, meta })
86    }
87
88    #[inline]
89    pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
90    where
91        I: SliceIndex<[u8]>,
92    {
93        if self.meta.discard() {
94            None
95        } else {
96            self.buffer.get(index)
97        }
98    }
99
100    #[inline]
101    pub fn meta(&self) -> &Meta {
102        &self.meta
103    }
104
105    #[inline]
106    pub fn meta_mut(&mut self) -> &mut Meta {
107        &mut self.meta
108    }
109
110    pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
111    where
112        T: serde::de::DeserializeOwned,
113        I: SliceIndex<[u8], Output = [u8]>,
114    {
115        let bytes = self.data(index).ok_or(bincode::ErrorKind::SizeLimit)?;
116        bincode::options()
117            .with_limit(self.meta().size as u64)
118            .with_fixint_encoding()
119            .reject_trailing_bytes()
120            .deserialize(bytes)
121    }
122
123    #[cfg(feature = "dev-context-only-utils")]
124    pub fn copy_from_slice(&mut self, slice: &[u8]) {
125        self.buffer = Bytes::from(slice.to_vec());
126    }
127
128    #[inline]
129    pub fn as_ref(&self) -> PacketRef<'_> {
130        PacketRef::Bytes(self)
131    }
132
133    #[inline]
134    pub fn as_mut(&mut self) -> PacketRefMut<'_> {
135        PacketRefMut::Bytes(self)
136    }
137
138    #[inline]
139    pub fn buffer(&self) -> &Bytes {
140        &self.buffer
141    }
142
143    #[inline]
144    pub fn set_buffer(&mut self, buffer: impl Into<Bytes>) {
145        let buffer = buffer.into();
146        self.meta.size = buffer.len();
147        self.buffer = buffer;
148    }
149}
150
151#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
152#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
153pub enum PacketBatch {
154    Pinned(PinnedPacketBatch),
155    Bytes(BytesPacketBatch),
156}
157
158impl PacketBatch {
159    #[cfg(feature = "dev-context-only-utils")]
160    pub fn first(&self) -> Option<PacketRef<'_>> {
161        match self {
162            Self::Pinned(batch) => batch.first().map(PacketRef::from),
163            Self::Bytes(batch) => batch.first().map(PacketRef::from),
164        }
165    }
166
167    #[cfg(feature = "dev-context-only-utils")]
168    pub fn first_mut(&mut self) -> Option<PacketRefMut<'_>> {
169        match self {
170            Self::Pinned(batch) => batch.first_mut().map(PacketRefMut::from),
171            Self::Bytes(batch) => batch.first_mut().map(PacketRefMut::from),
172        }
173    }
174
175    /// Returns `true` if the batch contains no elements.
176    pub fn is_empty(&self) -> bool {
177        match self {
178            Self::Pinned(batch) => batch.is_empty(),
179            Self::Bytes(batch) => batch.is_empty(),
180        }
181    }
182
183    /// Returns a reference to an element.
184    pub fn get(&self, index: usize) -> Option<PacketRef<'_>> {
185        match self {
186            Self::Pinned(batch) => batch.get(index).map(PacketRef::from),
187            Self::Bytes(batch) => batch.get(index).map(PacketRef::from),
188        }
189    }
190
191    pub fn get_mut(&mut self, index: usize) -> Option<PacketRefMut<'_>> {
192        match self {
193            Self::Pinned(batch) => batch.get_mut(index).map(PacketRefMut::from),
194            Self::Bytes(batch) => batch.get_mut(index).map(PacketRefMut::from),
195        }
196    }
197
198    pub fn iter(&self) -> PacketBatchIter<'_> {
199        match self {
200            Self::Pinned(batch) => PacketBatchIter::Pinned(batch.iter()),
201            Self::Bytes(batch) => PacketBatchIter::Bytes(batch.iter()),
202        }
203    }
204
205    pub fn iter_mut(&mut self) -> PacketBatchIterMut<'_> {
206        match self {
207            Self::Pinned(batch) => PacketBatchIterMut::Pinned(batch.iter_mut()),
208            Self::Bytes(batch) => PacketBatchIterMut::Bytes(batch.iter_mut()),
209        }
210    }
211
212    pub fn par_iter(&self) -> PacketBatchParIter<'_> {
213        match self {
214            Self::Pinned(batch) => {
215                PacketBatchParIter::Pinned(batch.par_iter().map(PacketRef::from))
216            }
217            Self::Bytes(batch) => PacketBatchParIter::Bytes(batch.par_iter().map(PacketRef::from)),
218        }
219    }
220
221    pub fn par_iter_mut(&mut self) -> PacketBatchParIterMut<'_> {
222        match self {
223            Self::Pinned(batch) => {
224                PacketBatchParIterMut::Pinned(batch.par_iter_mut().map(PacketRefMut::from))
225            }
226            Self::Bytes(batch) => {
227                PacketBatchParIterMut::Bytes(batch.par_iter_mut().map(PacketRefMut::from))
228            }
229        }
230    }
231
232    pub fn len(&self) -> usize {
233        match self {
234            Self::Pinned(batch) => batch.len(),
235            Self::Bytes(batch) => batch.len(),
236        }
237    }
238}
239
240impl From<PinnedPacketBatch> for PacketBatch {
241    fn from(batch: PinnedPacketBatch) -> Self {
242        Self::Pinned(batch)
243    }
244}
245
246impl From<BytesPacketBatch> for PacketBatch {
247    fn from(batch: BytesPacketBatch) -> Self {
248        Self::Bytes(batch)
249    }
250}
251
252impl From<Vec<BytesPacket>> for PacketBatch {
253    fn from(batch: Vec<BytesPacket>) -> Self {
254        Self::Bytes(BytesPacketBatch::from(batch))
255    }
256}
257
258impl<'a> IntoIterator for &'a PacketBatch {
259    type Item = PacketRef<'a>;
260    type IntoIter = PacketBatchIter<'a>;
261    fn into_iter(self) -> Self::IntoIter {
262        self.iter()
263    }
264}
265
266impl<'a> IntoIterator for &'a mut PacketBatch {
267    type Item = PacketRefMut<'a>;
268    type IntoIter = PacketBatchIterMut<'a>;
269    fn into_iter(self) -> Self::IntoIter {
270        self.iter_mut()
271    }
272}
273
274impl<'a> IntoParallelIterator for &'a PacketBatch {
275    type Iter = PacketBatchParIter<'a>;
276    type Item = PacketRef<'a>;
277    fn into_par_iter(self) -> Self::Iter {
278        self.par_iter()
279    }
280}
281
282impl<'a> IntoParallelIterator for &'a mut PacketBatch {
283    type Iter = PacketBatchParIterMut<'a>;
284    type Item = PacketRefMut<'a>;
285    fn into_par_iter(self) -> Self::Iter {
286        self.par_iter_mut()
287    }
288}
289
290#[derive(Clone, Copy, Debug, Eq)]
291pub enum PacketRef<'a> {
292    Packet(&'a Packet),
293    Bytes(&'a BytesPacket),
294}
295
296impl PartialEq for PacketRef<'_> {
297    fn eq(&self, other: &PacketRef<'_>) -> bool {
298        self.meta().eq(other.meta()) && self.data(..).eq(&other.data(..))
299    }
300}
301
302impl<'a> From<&'a Packet> for PacketRef<'a> {
303    fn from(packet: &'a Packet) -> Self {
304        Self::Packet(packet)
305    }
306}
307
308impl<'a> From<&'a mut Packet> for PacketRef<'a> {
309    fn from(packet: &'a mut Packet) -> Self {
310        Self::Packet(packet)
311    }
312}
313
314impl<'a> From<&'a BytesPacket> for PacketRef<'a> {
315    fn from(packet: &'a BytesPacket) -> Self {
316        Self::Bytes(packet)
317    }
318}
319
320impl<'a> From<&'a mut BytesPacket> for PacketRef<'a> {
321    fn from(packet: &'a mut BytesPacket) -> Self {
322        Self::Bytes(packet)
323    }
324}
325
326impl<'a> PacketRef<'a> {
327    pub fn data<I>(&self, index: I) -> Option<&'a <I as SliceIndex<[u8]>>::Output>
328    where
329        I: SliceIndex<[u8]>,
330    {
331        match self {
332            Self::Packet(packet) => packet.data(index),
333            Self::Bytes(packet) => packet.data(index),
334        }
335    }
336
337    #[inline]
338    pub fn meta(&self) -> &Meta {
339        match self {
340            Self::Packet(packet) => packet.meta(),
341            Self::Bytes(packet) => packet.meta(),
342        }
343    }
344
345    pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
346    where
347        T: serde::de::DeserializeOwned,
348        I: SliceIndex<[u8], Output = [u8]>,
349    {
350        match self {
351            Self::Packet(packet) => packet.deserialize_slice(index),
352            Self::Bytes(packet) => packet.deserialize_slice(index),
353        }
354    }
355
356    pub fn to_bytes_packet(&self) -> BytesPacket {
357        match self {
358            // In case of the legacy `Packet` variant, we unfortunately need to
359            // make a copy.
360            Self::Packet(packet) => {
361                let buffer = packet
362                    .data(..)
363                    .map(|data| Bytes::from(data.to_vec()))
364                    .unwrap_or_else(Bytes::new);
365                BytesPacket::new(buffer, self.meta().clone())
366            }
367            // Cheap clone of `Bytes`.
368            // We call `to_owned()` twice, because `packet` is `&&BytesPacket`
369            // at this point. This will become less annoying once we switch to
370            // `BytesPacket` entirely and deal just with `Vec<BytesPacket>`
371            // everywhere.
372            Self::Bytes(packet) => packet.to_owned().to_owned(),
373        }
374    }
375}
376
377#[derive(Debug, Eq)]
378pub enum PacketRefMut<'a> {
379    Packet(&'a mut Packet),
380    Bytes(&'a mut BytesPacket),
381}
382
383impl<'a> PartialEq for PacketRefMut<'a> {
384    fn eq(&self, other: &PacketRefMut<'a>) -> bool {
385        self.data(..).eq(&other.data(..)) && self.meta().eq(other.meta())
386    }
387}
388
389impl<'a> From<&'a mut Packet> for PacketRefMut<'a> {
390    fn from(packet: &'a mut Packet) -> Self {
391        Self::Packet(packet)
392    }
393}
394
395impl<'a> From<&'a mut BytesPacket> for PacketRefMut<'a> {
396    fn from(packet: &'a mut BytesPacket) -> Self {
397        Self::Bytes(packet)
398    }
399}
400
401impl PacketRefMut<'_> {
402    pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
403    where
404        I: SliceIndex<[u8]>,
405    {
406        match self {
407            Self::Packet(packet) => packet.data(index),
408            Self::Bytes(packet) => packet.data(index),
409        }
410    }
411
412    #[inline]
413    pub fn meta(&self) -> &Meta {
414        match self {
415            Self::Packet(packet) => packet.meta(),
416            Self::Bytes(packet) => packet.meta(),
417        }
418    }
419
420    #[inline]
421    pub fn meta_mut(&mut self) -> &mut Meta {
422        match self {
423            Self::Packet(packet) => packet.meta_mut(),
424            Self::Bytes(packet) => packet.meta_mut(),
425        }
426    }
427
428    pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
429    where
430        T: serde::de::DeserializeOwned,
431        I: SliceIndex<[u8], Output = [u8]>,
432    {
433        match self {
434            Self::Packet(packet) => packet.deserialize_slice(index),
435            Self::Bytes(packet) => packet.deserialize_slice(index),
436        }
437    }
438
439    #[cfg(feature = "dev-context-only-utils")]
440    #[inline]
441    pub fn copy_from_slice(&mut self, src: &[u8]) {
442        match self {
443            Self::Packet(packet) => {
444                let size = src.len();
445                packet.buffer_mut()[..size].copy_from_slice(src);
446            }
447            Self::Bytes(packet) => packet.copy_from_slice(src),
448        }
449    }
450
451    #[inline]
452    pub fn as_ref(&self) -> PacketRef<'_> {
453        match self {
454            Self::Packet(packet) => PacketRef::Packet(packet),
455            Self::Bytes(packet) => PacketRef::Bytes(packet),
456        }
457    }
458}
459
460pub enum PacketBatchIter<'a> {
461    Pinned(std::slice::Iter<'a, Packet>),
462    Bytes(std::slice::Iter<'a, BytesPacket>),
463}
464
465impl DoubleEndedIterator for PacketBatchIter<'_> {
466    fn next_back(&mut self) -> Option<Self::Item> {
467        match self {
468            Self::Pinned(iter) => iter.next_back().map(PacketRef::Packet),
469            Self::Bytes(iter) => iter.next_back().map(PacketRef::Bytes),
470        }
471    }
472}
473
474impl<'a> Iterator for PacketBatchIter<'a> {
475    type Item = PacketRef<'a>;
476
477    fn next(&mut self) -> Option<Self::Item> {
478        match self {
479            Self::Pinned(iter) => iter.next().map(PacketRef::Packet),
480            Self::Bytes(iter) => iter.next().map(PacketRef::Bytes),
481        }
482    }
483}
484
485pub enum PacketBatchIterMut<'a> {
486    Pinned(std::slice::IterMut<'a, Packet>),
487    Bytes(std::slice::IterMut<'a, BytesPacket>),
488}
489
490impl DoubleEndedIterator for PacketBatchIterMut<'_> {
491    fn next_back(&mut self) -> Option<Self::Item> {
492        match self {
493            Self::Pinned(iter) => iter.next_back().map(PacketRefMut::Packet),
494            Self::Bytes(iter) => iter.next_back().map(PacketRefMut::Bytes),
495        }
496    }
497}
498
499impl<'a> Iterator for PacketBatchIterMut<'a> {
500    type Item = PacketRefMut<'a>;
501
502    fn next(&mut self) -> Option<Self::Item> {
503        match self {
504            Self::Pinned(iter) => iter.next().map(PacketRefMut::Packet),
505            Self::Bytes(iter) => iter.next().map(PacketRefMut::Bytes),
506        }
507    }
508}
509
510type PacketParIter<'a> = rayon::slice::Iter<'a, Packet>;
511type BytesPacketParIter<'a> = rayon::slice::Iter<'a, BytesPacket>;
512
513pub enum PacketBatchParIter<'a> {
514    Pinned(
515        rayon::iter::Map<
516            PacketParIter<'a>,
517            fn(<PacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
518        >,
519    ),
520    Bytes(
521        rayon::iter::Map<
522            BytesPacketParIter<'a>,
523            fn(<BytesPacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
524        >,
525    ),
526}
527
528impl<'a> ParallelIterator for PacketBatchParIter<'a> {
529    type Item = PacketRef<'a>;
530    fn drive_unindexed<C>(self, consumer: C) -> C::Result
531    where
532        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
533    {
534        match self {
535            Self::Pinned(iter) => iter.drive_unindexed(consumer),
536            Self::Bytes(iter) => iter.drive_unindexed(consumer),
537        }
538    }
539}
540
541impl IndexedParallelIterator for PacketBatchParIter<'_> {
542    fn len(&self) -> usize {
543        match self {
544            Self::Pinned(iter) => iter.len(),
545            Self::Bytes(iter) => iter.len(),
546        }
547    }
548
549    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
550        match self {
551            Self::Pinned(iter) => iter.drive(consumer),
552            Self::Bytes(iter) => iter.drive(consumer),
553        }
554    }
555
556    fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
557        self,
558        callback: CB,
559    ) -> CB::Output {
560        match self {
561            Self::Pinned(iter) => iter.with_producer(callback),
562            Self::Bytes(iter) => iter.with_producer(callback),
563        }
564    }
565}
566
567type PacketParIterMut<'a> = rayon::slice::IterMut<'a, Packet>;
568type BytesPacketParIterMut<'a> = rayon::slice::IterMut<'a, BytesPacket>;
569
570pub enum PacketBatchParIterMut<'a> {
571    Pinned(
572        rayon::iter::Map<
573            PacketParIterMut<'a>,
574            fn(<PacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
575        >,
576    ),
577    Bytes(
578        rayon::iter::Map<
579            BytesPacketParIterMut<'a>,
580            fn(<BytesPacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
581        >,
582    ),
583}
584
585impl<'a> ParallelIterator for PacketBatchParIterMut<'a> {
586    type Item = PacketRefMut<'a>;
587    fn drive_unindexed<C>(self, consumer: C) -> C::Result
588    where
589        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
590    {
591        match self {
592            Self::Pinned(iter) => iter.drive_unindexed(consumer),
593            Self::Bytes(iter) => iter.drive_unindexed(consumer),
594        }
595    }
596}
597
598impl IndexedParallelIterator for PacketBatchParIterMut<'_> {
599    fn len(&self) -> usize {
600        match self {
601            Self::Pinned(iter) => iter.len(),
602            Self::Bytes(iter) => iter.len(),
603        }
604    }
605
606    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
607        match self {
608            Self::Pinned(iter) => iter.drive(consumer),
609            Self::Bytes(iter) => iter.drive(consumer),
610        }
611    }
612
613    fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
614        self,
615        callback: CB,
616    ) -> CB::Output {
617        match self {
618            Self::Pinned(iter) => iter.with_producer(callback),
619            Self::Bytes(iter) => iter.with_producer(callback),
620        }
621    }
622}
623
624#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
625#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
626pub struct PinnedPacketBatch {
627    packets: PinnedVec<Packet>,
628}
629
630pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
631
632impl PinnedPacketBatch {
633    pub fn new(packets: Vec<Packet>) -> Self {
634        let packets = PinnedVec::from_vec(packets);
635        Self { packets }
636    }
637
638    pub fn with_capacity(capacity: usize) -> Self {
639        let packets = PinnedVec::with_capacity(capacity);
640        Self { packets }
641    }
642
643    pub fn new_pinned_with_capacity(capacity: usize) -> Self {
644        let mut batch = Self::with_capacity(capacity);
645        batch.packets.reserve_and_pin(capacity);
646        batch
647    }
648
649    pub fn new_unpinned_with_recycler(
650        recycler: &PacketBatchRecycler,
651        capacity: usize,
652        name: &'static str,
653    ) -> Self {
654        let mut packets = recycler.allocate(name);
655        packets.reserve(capacity);
656        Self { packets }
657    }
658
659    pub fn new_with_recycler(
660        recycler: &PacketBatchRecycler,
661        capacity: usize,
662        name: &'static str,
663    ) -> Self {
664        let mut packets = recycler.allocate(name);
665        packets.reserve_and_pin(capacity);
666        Self { packets }
667    }
668
669    pub fn new_with_recycler_data(
670        recycler: &PacketBatchRecycler,
671        name: &'static str,
672        mut packets: Vec<Packet>,
673    ) -> Self {
674        let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
675        batch.packets.append(&mut packets);
676        batch
677    }
678
679    pub fn new_unpinned_with_recycler_data_and_dests<S, T>(
680        recycler: &PacketBatchRecycler,
681        name: &'static str,
682        dests_and_data: impl IntoIterator<Item = (S, T), IntoIter: ExactSizeIterator>,
683    ) -> Self
684    where
685        S: Borrow<SocketAddr>,
686        T: solana_packet::Encode,
687    {
688        let dests_and_data = dests_and_data.into_iter();
689        let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
690        batch
691            .packets
692            .resize(dests_and_data.len(), Packet::default());
693
694        for ((addr, data), packet) in dests_and_data.zip(batch.packets.iter_mut()) {
695            let addr = addr.borrow();
696            if !addr.ip().is_unspecified() && addr.port() != 0 {
697                if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
698                    // TODO: This should never happen. Instead the caller should
699                    // break the payload into smaller messages, and here any errors
700                    // should be propagated.
701                    error!("Couldn't write to packet {e:?}. Data skipped.");
702                    packet.meta_mut().set_discard(true);
703                }
704            } else {
705                trace!("Dropping packet, as destination is unknown");
706                packet.meta_mut().set_discard(true);
707            }
708        }
709        batch
710    }
711
712    pub fn new_unpinned_with_recycler_data(
713        recycler: &PacketBatchRecycler,
714        name: &'static str,
715        mut packets: Vec<Packet>,
716    ) -> Self {
717        let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
718        batch.packets.append(&mut packets);
719        batch
720    }
721
722    pub fn set_addr(&mut self, addr: &SocketAddr) {
723        for p in self.iter_mut() {
724            p.meta_mut().set_socket_addr(addr);
725        }
726    }
727}
728
729impl Deref for PinnedPacketBatch {
730    type Target = PinnedVec<Packet>;
731
732    fn deref(&self) -> &Self::Target {
733        &self.packets
734    }
735}
736
737impl DerefMut for PinnedPacketBatch {
738    fn deref_mut(&mut self) -> &mut Self::Target {
739        &mut self.packets
740    }
741}
742
743impl<I: SliceIndex<[Packet]>> Index<I> for PinnedPacketBatch {
744    type Output = I::Output;
745
746    #[inline]
747    fn index(&self, index: I) -> &Self::Output {
748        &self.packets[index]
749    }
750}
751
752impl<I: SliceIndex<[Packet]>> IndexMut<I> for PinnedPacketBatch {
753    #[inline]
754    fn index_mut(&mut self, index: I) -> &mut Self::Output {
755        &mut self.packets[index]
756    }
757}
758
759impl<'a> IntoIterator for &'a PinnedPacketBatch {
760    type Item = &'a Packet;
761    type IntoIter = Iter<'a, Packet>;
762
763    fn into_iter(self) -> Self::IntoIter {
764        self.packets.iter()
765    }
766}
767
768impl<'a> IntoParallelIterator for &'a PinnedPacketBatch {
769    type Iter = rayon::slice::Iter<'a, Packet>;
770    type Item = &'a Packet;
771    fn into_par_iter(self) -> Self::Iter {
772        self.packets.par_iter()
773    }
774}
775
776impl<'a> IntoParallelIterator for &'a mut PinnedPacketBatch {
777    type Iter = rayon::slice::IterMut<'a, Packet>;
778    type Item = &'a mut Packet;
779    fn into_par_iter(self) -> Self::Iter {
780        self.packets.par_iter_mut()
781    }
782}
783
784impl From<PinnedPacketBatch> for Vec<Packet> {
785    fn from(batch: PinnedPacketBatch) -> Self {
786        batch.packets.into()
787    }
788}
789
790pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
791    items
792        .chunks(chunk_size)
793        .map(|batch_items| {
794            let mut batch = PinnedPacketBatch::with_capacity(batch_items.len());
795            batch.resize(batch_items.len(), Packet::default());
796            for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
797                Packet::populate_packet(packet, None, item).expect("serialize request");
798            }
799            batch.into()
800        })
801        .collect()
802}
803
804#[cfg(test)]
805fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
806    to_packet_batches(items, NUM_PACKETS)
807}
808
809#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
810#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
811pub struct BytesPacketBatch {
812    packets: Vec<BytesPacket>,
813}
814
815impl BytesPacketBatch {
816    pub fn new() -> Self {
817        Self::default()
818    }
819
820    pub fn with_capacity(capacity: usize) -> Self {
821        let packets = Vec::with_capacity(capacity);
822        Self { packets }
823    }
824
825    pub fn to_pinned_packet_batch(&self) -> PinnedPacketBatch {
826        let mut batch = PinnedPacketBatch::new_pinned_with_capacity(self.len());
827        for bytes_packet in self.iter() {
828            let mut packet = Packet::default();
829            let size = bytes_packet.meta().size;
830            *packet.meta_mut() = bytes_packet.meta().clone();
831            packet.meta_mut().size = size;
832            packet.buffer_mut()[..size].copy_from_slice(&bytes_packet.buffer);
833
834            batch.push(packet);
835        }
836
837        batch
838    }
839}
840
841impl Deref for BytesPacketBatch {
842    type Target = Vec<BytesPacket>;
843
844    fn deref(&self) -> &Self::Target {
845        &self.packets
846    }
847}
848
849impl DerefMut for BytesPacketBatch {
850    fn deref_mut(&mut self) -> &mut Self::Target {
851        &mut self.packets
852    }
853}
854
855impl From<Vec<BytesPacket>> for BytesPacketBatch {
856    fn from(packets: Vec<BytesPacket>) -> Self {
857        Self { packets }
858    }
859}
860
861impl FromIterator<BytesPacket> for BytesPacketBatch {
862    fn from_iter<T: IntoIterator<Item = BytesPacket>>(iter: T) -> Self {
863        let packets = Vec::from_iter(iter);
864        Self { packets }
865    }
866}
867
868impl<'a> IntoIterator for &'a BytesPacketBatch {
869    type Item = &'a BytesPacket;
870    type IntoIter = Iter<'a, BytesPacket>;
871
872    fn into_iter(self) -> Self::IntoIter {
873        self.packets.iter()
874    }
875}
876
877impl<'a> IntoParallelIterator for &'a BytesPacketBatch {
878    type Iter = rayon::slice::Iter<'a, BytesPacket>;
879    type Item = &'a BytesPacket;
880    fn into_par_iter(self) -> Self::Iter {
881        self.packets.par_iter()
882    }
883}
884
885impl<'a> IntoParallelIterator for &'a mut BytesPacketBatch {
886    type Iter = rayon::slice::IterMut<'a, BytesPacket>;
887    type Item = &'a mut BytesPacket;
888    fn into_par_iter(self) -> Self::Iter {
889        self.packets.par_iter_mut()
890    }
891}
892
893pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
894where
895    R: Read,
896    T: DeserializeOwned,
897{
898    // with_limit causes pre-allocation size to be limited
899    // to prevent against memory exhaustion attacks.
900    bincode::options()
901        .with_limit(PACKET_DATA_SIZE as u64)
902        .with_fixint_encoding()
903        .allow_trailing_bytes()
904        .deserialize_from(reader)
905}
906
907#[cfg(test)]
908mod tests {
909    use {
910        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
911        solana_system_transaction::transfer,
912    };
913
914    #[test]
915    fn test_to_packet_batches() {
916        let keypair = Keypair::new();
917        let hash = Hash::new_from_array([1; 32]);
918        let tx = transfer(&keypair, &keypair.pubkey(), 1, hash);
919        let rv = to_packet_batches_for_tests(&[tx.clone(); 1]);
920        assert_eq!(rv.len(), 1);
921        assert_eq!(rv[0].len(), 1);
922
923        #[allow(clippy::useless_vec)]
924        let rv = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]);
925        assert_eq!(rv.len(), 1);
926        assert_eq!(rv[0].len(), NUM_PACKETS);
927
928        #[allow(clippy::useless_vec)]
929        let rv = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]);
930        assert_eq!(rv.len(), 2);
931        assert_eq!(rv[0].len(), NUM_PACKETS);
932        assert_eq!(rv[1].len(), 1);
933    }
934
935    #[test]
936    fn test_to_packets_pinning() {
937        let recycler = PacketBatchRecycler::default();
938        for i in 0..2 {
939            let _first_packets =
940                PinnedPacketBatch::new_with_recycler(&recycler, i + 1, "first one");
941        }
942    }
943}