solana_perf/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2#[cfg(feature = "dev-context-only-utils")]
3use bytes::{BufMut, BytesMut};
4use {
5    crate::{cuda_runtime::PinnedVec, recycler::Recycler},
6    bincode::config::Options,
7    bytes::Bytes,
8    rayon::{
9        iter::{IndexedParallelIterator, ParallelIterator},
10        prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
11    },
12    serde::{de::DeserializeOwned, Deserialize, Serialize},
13    std::{
14        borrow::Borrow,
15        io::Read,
16        net::SocketAddr,
17        ops::{Deref, DerefMut, Index, IndexMut},
18        slice::{Iter, SliceIndex},
19    },
20};
21pub use {
22    bytes,
23    solana_packet::{self, Meta, Packet, PacketFlags, PACKET_DATA_SIZE},
24};
25
/// Packet count of `8 * 1024` (8192). Within this file it is the chunk size
/// that the test helper `to_packet_batches_for_tests` hands to
/// `to_packet_batches`.
pub const NUM_PACKETS: usize = 8 * 1024;

/// NOTE(review): presumably the number of packets callers group into a single
/// batch; it is not referenced in this file — confirm against its users.
pub const PACKETS_PER_BATCH: usize = 64;
/// NOTE(review): presumably the number of messages requested per `recvmmsg`
/// call; it is not referenced in this file — confirm against its users.
pub const NUM_RCVMMSGS: usize = 64;
30
31/// Representation of a packet used in TPU.
32#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
33#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
34pub struct BytesPacket {
35    buffer: Bytes,
36    meta: Meta,
37}
38
39impl BytesPacket {
40    pub fn new(buffer: Bytes, meta: Meta) -> Self {
41        Self { buffer, meta }
42    }
43
44    #[cfg(feature = "dev-context-only-utils")]
45    pub fn empty() -> Self {
46        Self {
47            buffer: Bytes::new(),
48            meta: Meta::default(),
49        }
50    }
51
52    #[cfg(feature = "dev-context-only-utils")]
53    pub fn from_bytes(dest: Option<&SocketAddr>, buffer: Bytes) -> Self {
54        let mut meta = Meta {
55            size: buffer.len(),
56            ..Default::default()
57        };
58        if let Some(dest) = dest {
59            meta.set_socket_addr(dest);
60        }
61
62        Self { buffer, meta }
63    }
64
65    #[cfg(feature = "dev-context-only-utils")]
66    pub fn from_data<T>(dest: Option<&SocketAddr>, data: T) -> bincode::Result<Self>
67    where
68        T: solana_packet::Encode,
69    {
70        let buffer = BytesMut::with_capacity(PACKET_DATA_SIZE);
71        let mut writer = buffer.writer();
72        data.encode(&mut writer)?;
73        let buffer = writer.into_inner();
74        let buffer = buffer.freeze();
75
76        let mut meta = Meta {
77            size: buffer.len(),
78            ..Default::default()
79        };
80        if let Some(dest) = dest {
81            meta.set_socket_addr(dest);
82        }
83
84        Ok(Self { buffer, meta })
85    }
86
87    #[inline]
88    pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
89    where
90        I: SliceIndex<[u8]>,
91    {
92        if self.meta.discard() {
93            None
94        } else {
95            self.buffer.get(index)
96        }
97    }
98
99    #[inline]
100    pub fn meta(&self) -> &Meta {
101        &self.meta
102    }
103
104    #[inline]
105    pub fn meta_mut(&mut self) -> &mut Meta {
106        &mut self.meta
107    }
108
109    pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
110    where
111        T: serde::de::DeserializeOwned,
112        I: SliceIndex<[u8], Output = [u8]>,
113    {
114        let bytes = self.data(index).ok_or(bincode::ErrorKind::SizeLimit)?;
115        bincode::options()
116            .with_limit(self.meta().size as u64)
117            .with_fixint_encoding()
118            .reject_trailing_bytes()
119            .deserialize(bytes)
120    }
121
122    #[cfg(feature = "dev-context-only-utils")]
123    pub fn copy_from_slice(&mut self, slice: &[u8]) {
124        self.buffer = Bytes::from(slice.to_vec());
125    }
126
127    #[inline]
128    pub fn as_ref(&self) -> PacketRef<'_> {
129        PacketRef::Bytes(self)
130    }
131
132    #[inline]
133    pub fn as_mut(&mut self) -> PacketRefMut<'_> {
134        PacketRefMut::Bytes(self)
135    }
136}
137
138#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
139#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
140pub enum PacketBatch {
141    Pinned(PinnedPacketBatch),
142    Bytes(BytesPacketBatch),
143}
144
145impl PacketBatch {
146    #[cfg(feature = "dev-context-only-utils")]
147    pub fn first(&self) -> Option<PacketRef<'_>> {
148        match self {
149            Self::Pinned(batch) => batch.first().map(PacketRef::from),
150            Self::Bytes(batch) => batch.first().map(PacketRef::from),
151        }
152    }
153
154    #[cfg(feature = "dev-context-only-utils")]
155    pub fn first_mut(&mut self) -> Option<PacketRefMut<'_>> {
156        match self {
157            Self::Pinned(batch) => batch.first_mut().map(PacketRefMut::from),
158            Self::Bytes(batch) => batch.first_mut().map(PacketRefMut::from),
159        }
160    }
161
162    /// Returns `true` if the batch contains no elements.
163    pub fn is_empty(&self) -> bool {
164        match self {
165            Self::Pinned(batch) => batch.is_empty(),
166            Self::Bytes(batch) => batch.is_empty(),
167        }
168    }
169
170    /// Returns a reference to an element.
171    pub fn get(&self, index: usize) -> Option<PacketRef<'_>> {
172        match self {
173            Self::Pinned(batch) => batch.get(index).map(PacketRef::from),
174            Self::Bytes(batch) => batch.get(index).map(PacketRef::from),
175        }
176    }
177
178    pub fn get_mut(&mut self, index: usize) -> Option<PacketRefMut<'_>> {
179        match self {
180            Self::Pinned(batch) => batch.get_mut(index).map(PacketRefMut::from),
181            Self::Bytes(batch) => batch.get_mut(index).map(PacketRefMut::from),
182        }
183    }
184
185    pub fn iter(&self) -> PacketBatchIter<'_> {
186        match self {
187            Self::Pinned(batch) => PacketBatchIter::Pinned(batch.iter()),
188            Self::Bytes(batch) => PacketBatchIter::Bytes(batch.iter()),
189        }
190    }
191
192    pub fn iter_mut(&mut self) -> PacketBatchIterMut<'_> {
193        match self {
194            Self::Pinned(batch) => PacketBatchIterMut::Pinned(batch.iter_mut()),
195            Self::Bytes(batch) => PacketBatchIterMut::Bytes(batch.iter_mut()),
196        }
197    }
198
199    pub fn par_iter(&self) -> PacketBatchParIter {
200        match self {
201            Self::Pinned(batch) => {
202                PacketBatchParIter::Pinned(batch.par_iter().map(PacketRef::from))
203            }
204            Self::Bytes(batch) => PacketBatchParIter::Bytes(batch.par_iter().map(PacketRef::from)),
205        }
206    }
207
208    pub fn par_iter_mut(&mut self) -> PacketBatchParIterMut {
209        match self {
210            Self::Pinned(batch) => {
211                PacketBatchParIterMut::Pinned(batch.par_iter_mut().map(PacketRefMut::from))
212            }
213            Self::Bytes(batch) => {
214                PacketBatchParIterMut::Bytes(batch.par_iter_mut().map(PacketRefMut::from))
215            }
216        }
217    }
218
219    pub fn len(&self) -> usize {
220        match self {
221            Self::Pinned(batch) => batch.len(),
222            Self::Bytes(batch) => batch.len(),
223        }
224    }
225}
226
227impl From<PinnedPacketBatch> for PacketBatch {
228    fn from(batch: PinnedPacketBatch) -> Self {
229        Self::Pinned(batch)
230    }
231}
232
233impl From<BytesPacketBatch> for PacketBatch {
234    fn from(batch: BytesPacketBatch) -> Self {
235        Self::Bytes(batch)
236    }
237}
238
239impl From<Vec<BytesPacket>> for PacketBatch {
240    fn from(batch: Vec<BytesPacket>) -> Self {
241        Self::Bytes(BytesPacketBatch::from(batch))
242    }
243}
244
245impl<'a> IntoIterator for &'a PacketBatch {
246    type Item = PacketRef<'a>;
247    type IntoIter = PacketBatchIter<'a>;
248    fn into_iter(self) -> Self::IntoIter {
249        self.iter()
250    }
251}
252
253impl<'a> IntoIterator for &'a mut PacketBatch {
254    type Item = PacketRefMut<'a>;
255    type IntoIter = PacketBatchIterMut<'a>;
256    fn into_iter(self) -> Self::IntoIter {
257        self.iter_mut()
258    }
259}
260
261impl<'a> IntoParallelIterator for &'a PacketBatch {
262    type Iter = PacketBatchParIter<'a>;
263    type Item = PacketRef<'a>;
264    fn into_par_iter(self) -> Self::Iter {
265        self.par_iter()
266    }
267}
268
269impl<'a> IntoParallelIterator for &'a mut PacketBatch {
270    type Iter = PacketBatchParIterMut<'a>;
271    type Item = PacketRefMut<'a>;
272    fn into_par_iter(self) -> Self::Iter {
273        self.par_iter_mut()
274    }
275}
276
277#[derive(Clone, Copy, Debug, Eq)]
278pub enum PacketRef<'a> {
279    Packet(&'a Packet),
280    Bytes(&'a BytesPacket),
281}
282
283impl PartialEq for PacketRef<'_> {
284    fn eq(&self, other: &PacketRef<'_>) -> bool {
285        self.meta().eq(other.meta()) && self.data(..).eq(&other.data(..))
286    }
287}
288
289impl<'a> From<&'a Packet> for PacketRef<'a> {
290    fn from(packet: &'a Packet) -> Self {
291        Self::Packet(packet)
292    }
293}
294
295impl<'a> From<&'a mut Packet> for PacketRef<'a> {
296    fn from(packet: &'a mut Packet) -> Self {
297        Self::Packet(packet)
298    }
299}
300
301impl<'a> From<&'a BytesPacket> for PacketRef<'a> {
302    fn from(packet: &'a BytesPacket) -> Self {
303        Self::Bytes(packet)
304    }
305}
306
307impl<'a> From<&'a mut BytesPacket> for PacketRef<'a> {
308    fn from(packet: &'a mut BytesPacket) -> Self {
309        Self::Bytes(packet)
310    }
311}
312
313impl<'a> PacketRef<'a> {
314    pub fn data<I>(&self, index: I) -> Option<&'a <I as SliceIndex<[u8]>>::Output>
315    where
316        I: SliceIndex<[u8]>,
317    {
318        match self {
319            Self::Packet(packet) => packet.data(index),
320            Self::Bytes(packet) => packet.data(index),
321        }
322    }
323
324    #[inline]
325    pub fn meta(&self) -> &Meta {
326        match self {
327            Self::Packet(packet) => packet.meta(),
328            Self::Bytes(packet) => packet.meta(),
329        }
330    }
331
332    pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
333    where
334        T: serde::de::DeserializeOwned,
335        I: SliceIndex<[u8], Output = [u8]>,
336    {
337        match self {
338            Self::Packet(packet) => packet.deserialize_slice(index),
339            Self::Bytes(packet) => packet.deserialize_slice(index),
340        }
341    }
342
343    pub fn to_bytes_packet(&self) -> BytesPacket {
344        match self {
345            // In case of the legacy `Packet` variant, we unfortunately need to
346            // make a copy.
347            Self::Packet(packet) => {
348                let buffer = packet
349                    .data(..)
350                    .map(|data| Bytes::from(data.to_vec()))
351                    .unwrap_or_else(Bytes::new);
352                BytesPacket::new(buffer, self.meta().clone())
353            }
354            // Cheap clone of `Bytes`.
355            // We call `to_owned()` twice, because `packet` is `&&BytesPacket`
356            // at this point. This will become less annoying once we switch to
357            // `BytesPacket` entirely and deal just with `Vec<BytesPacket>`
358            // everywhere.
359            Self::Bytes(packet) => packet.to_owned().to_owned(),
360        }
361    }
362}
363
364#[derive(Debug, Eq)]
365pub enum PacketRefMut<'a> {
366    Packet(&'a mut Packet),
367    Bytes(&'a mut BytesPacket),
368}
369
370impl<'a> PartialEq for PacketRefMut<'a> {
371    fn eq(&self, other: &PacketRefMut<'a>) -> bool {
372        self.data(..).eq(&other.data(..)) && self.meta().eq(other.meta())
373    }
374}
375
376impl<'a> From<&'a mut Packet> for PacketRefMut<'a> {
377    fn from(packet: &'a mut Packet) -> Self {
378        Self::Packet(packet)
379    }
380}
381
382impl<'a> From<&'a mut BytesPacket> for PacketRefMut<'a> {
383    fn from(packet: &'a mut BytesPacket) -> Self {
384        Self::Bytes(packet)
385    }
386}
387
388impl PacketRefMut<'_> {
389    pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
390    where
391        I: SliceIndex<[u8]>,
392    {
393        match self {
394            Self::Packet(packet) => packet.data(index),
395            Self::Bytes(packet) => packet.data(index),
396        }
397    }
398
399    #[inline]
400    pub fn meta(&self) -> &Meta {
401        match self {
402            Self::Packet(packet) => packet.meta(),
403            Self::Bytes(packet) => packet.meta(),
404        }
405    }
406
407    #[inline]
408    pub fn meta_mut(&mut self) -> &mut Meta {
409        match self {
410            Self::Packet(packet) => packet.meta_mut(),
411            Self::Bytes(packet) => packet.meta_mut(),
412        }
413    }
414
415    pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
416    where
417        T: serde::de::DeserializeOwned,
418        I: SliceIndex<[u8], Output = [u8]>,
419    {
420        match self {
421            Self::Packet(packet) => packet.deserialize_slice(index),
422            Self::Bytes(packet) => packet.deserialize_slice(index),
423        }
424    }
425
426    #[cfg(feature = "dev-context-only-utils")]
427    #[inline]
428    pub fn copy_from_slice(&mut self, src: &[u8]) {
429        match self {
430            Self::Packet(packet) => {
431                let size = src.len();
432                packet.buffer_mut()[..size].copy_from_slice(src);
433            }
434            Self::Bytes(packet) => packet.copy_from_slice(src),
435        }
436    }
437
438    #[inline]
439    pub fn as_ref(&self) -> PacketRef<'_> {
440        match self {
441            Self::Packet(packet) => PacketRef::Packet(packet),
442            Self::Bytes(packet) => PacketRef::Bytes(packet),
443        }
444    }
445}
446
447pub enum PacketBatchIter<'a> {
448    Pinned(std::slice::Iter<'a, Packet>),
449    Bytes(std::slice::Iter<'a, BytesPacket>),
450}
451
452impl DoubleEndedIterator for PacketBatchIter<'_> {
453    fn next_back(&mut self) -> Option<Self::Item> {
454        match self {
455            Self::Pinned(iter) => iter.next_back().map(PacketRef::Packet),
456            Self::Bytes(iter) => iter.next_back().map(PacketRef::Bytes),
457        }
458    }
459}
460
461impl<'a> Iterator for PacketBatchIter<'a> {
462    type Item = PacketRef<'a>;
463
464    fn next(&mut self) -> Option<Self::Item> {
465        match self {
466            Self::Pinned(iter) => iter.next().map(PacketRef::Packet),
467            Self::Bytes(iter) => iter.next().map(PacketRef::Bytes),
468        }
469    }
470}
471
472pub enum PacketBatchIterMut<'a> {
473    Pinned(std::slice::IterMut<'a, Packet>),
474    Bytes(std::slice::IterMut<'a, BytesPacket>),
475}
476
477impl DoubleEndedIterator for PacketBatchIterMut<'_> {
478    fn next_back(&mut self) -> Option<Self::Item> {
479        match self {
480            Self::Pinned(iter) => iter.next_back().map(PacketRefMut::Packet),
481            Self::Bytes(iter) => iter.next_back().map(PacketRefMut::Bytes),
482        }
483    }
484}
485
486impl<'a> Iterator for PacketBatchIterMut<'a> {
487    type Item = PacketRefMut<'a>;
488
489    fn next(&mut self) -> Option<Self::Item> {
490        match self {
491            Self::Pinned(iter) => iter.next().map(PacketRefMut::Packet),
492            Self::Bytes(iter) => iter.next().map(PacketRefMut::Bytes),
493        }
494    }
495}
496
497type PacketParIter<'a> = rayon::slice::Iter<'a, Packet>;
498type BytesPacketParIter<'a> = rayon::slice::Iter<'a, BytesPacket>;
499
500pub enum PacketBatchParIter<'a> {
501    Pinned(
502        rayon::iter::Map<
503            PacketParIter<'a>,
504            fn(<PacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
505        >,
506    ),
507    Bytes(
508        rayon::iter::Map<
509            BytesPacketParIter<'a>,
510            fn(<BytesPacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
511        >,
512    ),
513}
514
515impl<'a> ParallelIterator for PacketBatchParIter<'a> {
516    type Item = PacketRef<'a>;
517    fn drive_unindexed<C>(self, consumer: C) -> C::Result
518    where
519        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
520    {
521        match self {
522            Self::Pinned(iter) => iter.drive_unindexed(consumer),
523            Self::Bytes(iter) => iter.drive_unindexed(consumer),
524        }
525    }
526}
527
528impl IndexedParallelIterator for PacketBatchParIter<'_> {
529    fn len(&self) -> usize {
530        match self {
531            Self::Pinned(iter) => iter.len(),
532            Self::Bytes(iter) => iter.len(),
533        }
534    }
535
536    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
537        match self {
538            Self::Pinned(iter) => iter.drive(consumer),
539            Self::Bytes(iter) => iter.drive(consumer),
540        }
541    }
542
543    fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
544        self,
545        callback: CB,
546    ) -> CB::Output {
547        match self {
548            Self::Pinned(iter) => iter.with_producer(callback),
549            Self::Bytes(iter) => iter.with_producer(callback),
550        }
551    }
552}
553
554type PacketParIterMut<'a> = rayon::slice::IterMut<'a, Packet>;
555type BytesPacketParIterMut<'a> = rayon::slice::IterMut<'a, BytesPacket>;
556
557pub enum PacketBatchParIterMut<'a> {
558    Pinned(
559        rayon::iter::Map<
560            PacketParIterMut<'a>,
561            fn(<PacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
562        >,
563    ),
564    Bytes(
565        rayon::iter::Map<
566            BytesPacketParIterMut<'a>,
567            fn(<BytesPacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
568        >,
569    ),
570}
571
572impl<'a> ParallelIterator for PacketBatchParIterMut<'a> {
573    type Item = PacketRefMut<'a>;
574    fn drive_unindexed<C>(self, consumer: C) -> C::Result
575    where
576        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
577    {
578        match self {
579            Self::Pinned(iter) => iter.drive_unindexed(consumer),
580            Self::Bytes(iter) => iter.drive_unindexed(consumer),
581        }
582    }
583}
584
585impl IndexedParallelIterator for PacketBatchParIterMut<'_> {
586    fn len(&self) -> usize {
587        match self {
588            Self::Pinned(iter) => iter.len(),
589            Self::Bytes(iter) => iter.len(),
590        }
591    }
592
593    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
594        match self {
595            Self::Pinned(iter) => iter.drive(consumer),
596            Self::Bytes(iter) => iter.drive(consumer),
597        }
598    }
599
600    fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
601        self,
602        callback: CB,
603    ) -> CB::Output {
604        match self {
605            Self::Pinned(iter) => iter.with_producer(callback),
606            Self::Bytes(iter) => iter.with_producer(callback),
607        }
608    }
609}
610
611#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
612#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
613pub struct PinnedPacketBatch {
614    packets: PinnedVec<Packet>,
615}
616
617pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
618
619impl PinnedPacketBatch {
620    pub fn new(packets: Vec<Packet>) -> Self {
621        let packets = PinnedVec::from_vec(packets);
622        Self { packets }
623    }
624
625    pub fn with_capacity(capacity: usize) -> Self {
626        let packets = PinnedVec::with_capacity(capacity);
627        Self { packets }
628    }
629
630    pub fn new_pinned_with_capacity(capacity: usize) -> Self {
631        let mut batch = Self::with_capacity(capacity);
632        batch.packets.reserve_and_pin(capacity);
633        batch
634    }
635
636    pub fn new_unpinned_with_recycler(
637        recycler: &PacketBatchRecycler,
638        capacity: usize,
639        name: &'static str,
640    ) -> Self {
641        let mut packets = recycler.allocate(name);
642        packets.reserve(capacity);
643        Self { packets }
644    }
645
646    pub fn new_with_recycler(
647        recycler: &PacketBatchRecycler,
648        capacity: usize,
649        name: &'static str,
650    ) -> Self {
651        let mut packets = recycler.allocate(name);
652        packets.reserve_and_pin(capacity);
653        Self { packets }
654    }
655
656    pub fn new_with_recycler_data(
657        recycler: &PacketBatchRecycler,
658        name: &'static str,
659        mut packets: Vec<Packet>,
660    ) -> Self {
661        let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
662        batch.packets.append(&mut packets);
663        batch
664    }
665
666    pub fn new_unpinned_with_recycler_data_and_dests<S, T>(
667        recycler: &PacketBatchRecycler,
668        name: &'static str,
669        dests_and_data: impl IntoIterator<Item = (S, T), IntoIter: ExactSizeIterator>,
670    ) -> Self
671    where
672        S: Borrow<SocketAddr>,
673        T: solana_packet::Encode,
674    {
675        let dests_and_data = dests_and_data.into_iter();
676        let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
677        batch
678            .packets
679            .resize(dests_and_data.len(), Packet::default());
680
681        for ((addr, data), packet) in dests_and_data.zip(batch.packets.iter_mut()) {
682            let addr = addr.borrow();
683            if !addr.ip().is_unspecified() && addr.port() != 0 {
684                if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
685                    // TODO: This should never happen. Instead the caller should
686                    // break the payload into smaller messages, and here any errors
687                    // should be propagated.
688                    error!("Couldn't write to packet {:?}. Data skipped.", e);
689                    packet.meta_mut().set_discard(true);
690                }
691            } else {
692                trace!("Dropping packet, as destination is unknown");
693                packet.meta_mut().set_discard(true);
694            }
695        }
696        batch
697    }
698
699    pub fn new_unpinned_with_recycler_data(
700        recycler: &PacketBatchRecycler,
701        name: &'static str,
702        mut packets: Vec<Packet>,
703    ) -> Self {
704        let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
705        batch.packets.append(&mut packets);
706        batch
707    }
708
709    pub fn set_addr(&mut self, addr: &SocketAddr) {
710        for p in self.iter_mut() {
711            p.meta_mut().set_socket_addr(addr);
712        }
713    }
714}
715
716impl Deref for PinnedPacketBatch {
717    type Target = PinnedVec<Packet>;
718
719    fn deref(&self) -> &Self::Target {
720        &self.packets
721    }
722}
723
724impl DerefMut for PinnedPacketBatch {
725    fn deref_mut(&mut self) -> &mut Self::Target {
726        &mut self.packets
727    }
728}
729
730impl<I: SliceIndex<[Packet]>> Index<I> for PinnedPacketBatch {
731    type Output = I::Output;
732
733    #[inline]
734    fn index(&self, index: I) -> &Self::Output {
735        &self.packets[index]
736    }
737}
738
739impl<I: SliceIndex<[Packet]>> IndexMut<I> for PinnedPacketBatch {
740    #[inline]
741    fn index_mut(&mut self, index: I) -> &mut Self::Output {
742        &mut self.packets[index]
743    }
744}
745
746impl<'a> IntoIterator for &'a PinnedPacketBatch {
747    type Item = &'a Packet;
748    type IntoIter = Iter<'a, Packet>;
749
750    fn into_iter(self) -> Self::IntoIter {
751        self.packets.iter()
752    }
753}
754
755impl<'a> IntoParallelIterator for &'a PinnedPacketBatch {
756    type Iter = rayon::slice::Iter<'a, Packet>;
757    type Item = &'a Packet;
758    fn into_par_iter(self) -> Self::Iter {
759        self.packets.par_iter()
760    }
761}
762
763impl<'a> IntoParallelIterator for &'a mut PinnedPacketBatch {
764    type Iter = rayon::slice::IterMut<'a, Packet>;
765    type Item = &'a mut Packet;
766    fn into_par_iter(self) -> Self::Iter {
767        self.packets.par_iter_mut()
768    }
769}
770
771impl From<PinnedPacketBatch> for Vec<Packet> {
772    fn from(batch: PinnedPacketBatch) -> Self {
773        batch.packets.into()
774    }
775}
776
777pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
778    items
779        .chunks(chunk_size)
780        .map(|batch_items| {
781            let mut batch = PinnedPacketBatch::with_capacity(batch_items.len());
782            batch.resize(batch_items.len(), Packet::default());
783            for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
784                Packet::populate_packet(packet, None, item).expect("serialize request");
785            }
786            batch.into()
787        })
788        .collect()
789}
790
/// Test-only shorthand for [`to_packet_batches`] with `NUM_PACKETS` as the
/// chunk size.
#[cfg(test)]
fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
    let chunk_size = NUM_PACKETS;
    to_packet_batches(items, chunk_size)
}
795
796#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
797#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
798pub struct BytesPacketBatch {
799    packets: Vec<BytesPacket>,
800}
801
802impl BytesPacketBatch {
803    pub fn new() -> Self {
804        Self::default()
805    }
806
807    pub fn with_capacity(capacity: usize) -> Self {
808        let packets = Vec::with_capacity(capacity);
809        Self { packets }
810    }
811
812    pub fn to_pinned_packet_batch(&self) -> PinnedPacketBatch {
813        let mut batch = PinnedPacketBatch::new_pinned_with_capacity(self.len());
814        for bytes_packet in self.iter() {
815            let mut packet = Packet::default();
816            let size = bytes_packet.meta().size;
817            *packet.meta_mut() = bytes_packet.meta().clone();
818            packet.meta_mut().size = size;
819            packet.buffer_mut()[..size].copy_from_slice(&bytes_packet.buffer);
820
821            batch.push(packet);
822        }
823
824        batch
825    }
826}
827
828impl Deref for BytesPacketBatch {
829    type Target = Vec<BytesPacket>;
830
831    fn deref(&self) -> &Self::Target {
832        &self.packets
833    }
834}
835
836impl DerefMut for BytesPacketBatch {
837    fn deref_mut(&mut self) -> &mut Self::Target {
838        &mut self.packets
839    }
840}
841
842impl From<Vec<BytesPacket>> for BytesPacketBatch {
843    fn from(packets: Vec<BytesPacket>) -> Self {
844        Self { packets }
845    }
846}
847
848impl FromIterator<BytesPacket> for BytesPacketBatch {
849    fn from_iter<T: IntoIterator<Item = BytesPacket>>(iter: T) -> Self {
850        let packets = Vec::from_iter(iter);
851        Self { packets }
852    }
853}
854
855impl<'a> IntoIterator for &'a BytesPacketBatch {
856    type Item = &'a BytesPacket;
857    type IntoIter = Iter<'a, BytesPacket>;
858
859    fn into_iter(self) -> Self::IntoIter {
860        self.packets.iter()
861    }
862}
863
864impl<'a> IntoParallelIterator for &'a BytesPacketBatch {
865    type Iter = rayon::slice::Iter<'a, BytesPacket>;
866    type Item = &'a BytesPacket;
867    fn into_par_iter(self) -> Self::Iter {
868        self.packets.par_iter()
869    }
870}
871
872impl<'a> IntoParallelIterator for &'a mut BytesPacketBatch {
873    type Iter = rayon::slice::IterMut<'a, BytesPacket>;
874    type Item = &'a mut BytesPacket;
875    fn into_par_iter(self) -> Self::Iter {
876        self.packets.par_iter_mut()
877    }
878}
879
880pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
881where
882    R: Read,
883    T: DeserializeOwned,
884{
885    // with_limit causes pre-allocation size to be limited
886    // to prevent against memory exhaustion attacks.
887    bincode::options()
888        .with_limit(PACKET_DATA_SIZE as u64)
889        .with_fixint_encoding()
890        .allow_trailing_bytes()
891        .deserialize_from(reader)
892}
893
#[cfg(test)]
mod tests {
    use {
        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
        solana_system_transaction::transfer,
    };

    /// Items are split into batches of at most `NUM_PACKETS` packets.
    #[test]
    fn test_to_packet_batches() {
        let keypair = Keypair::new();
        let blockhash = Hash::new_from_array([1; 32]);
        let tx = transfer(&keypair, &keypair.pubkey(), 1, blockhash);

        // A single item yields a single batch with one packet.
        let single = to_packet_batches_for_tests(&[tx.clone()]);
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].len(), 1);

        // Exactly `NUM_PACKETS` items still fit into one batch.
        #[allow(clippy::useless_vec)]
        let full = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]);
        assert_eq!(full.len(), 1);
        assert_eq!(full[0].len(), NUM_PACKETS);

        // One extra item spills over into a second batch.
        #[allow(clippy::useless_vec)]
        let overflow = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]);
        assert_eq!(overflow.len(), 2);
        assert_eq!(overflow[0].len(), NUM_PACKETS);
        assert_eq!(overflow[1].len(), 1);
    }

    /// Allocating pinned batches from one recycler repeatedly must not panic.
    #[test]
    fn test_to_packets_pinning() {
        let recycler = PacketBatchRecycler::default();
        for capacity in 1..=2 {
            let _first_packets =
                PinnedPacketBatch::new_with_recycler(&recycler, capacity, "first one");
        }
    }
}