1#[cfg(feature = "dev-context-only-utils")]
3use bytes::{BufMut, BytesMut};
4use {
5 crate::{cuda_runtime::PinnedVec, recycler::Recycler},
6 bincode::config::Options,
7 bytes::Bytes,
8 rayon::{
9 iter::{IndexedParallelIterator, ParallelIterator},
10 prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
11 },
12 serde::{de::DeserializeOwned, Deserialize, Serialize},
13 std::{
14 borrow::Borrow,
15 io::Read,
16 net::SocketAddr,
17 ops::{Deref, DerefMut, Index, IndexMut},
18 slice::{Iter, SliceIndex},
19 },
20};
21pub use {
22 bytes,
23 solana_packet::{self, Meta, Packet, PacketFlags, PACKET_DATA_SIZE},
24};
25
26pub const NUM_PACKETS: usize = 1024 * 8;
27
28pub const PACKETS_PER_BATCH: usize = 64;
29pub const NUM_RCVMMSGS: usize = 64;
30
31#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
33#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
34pub struct BytesPacket {
35 buffer: Bytes,
36 meta: Meta,
37}
38
39impl BytesPacket {
40 pub fn new(buffer: Bytes, meta: Meta) -> Self {
41 Self { buffer, meta }
42 }
43
44 #[cfg(feature = "dev-context-only-utils")]
45 pub fn empty() -> Self {
46 Self {
47 buffer: Bytes::new(),
48 meta: Meta::default(),
49 }
50 }
51
52 #[cfg(feature = "dev-context-only-utils")]
53 pub fn from_bytes(dest: Option<&SocketAddr>, buffer: impl Into<Bytes>) -> Self {
54 let buffer = buffer.into();
55 let mut meta = Meta {
56 size: buffer.len(),
57 ..Default::default()
58 };
59 if let Some(dest) = dest {
60 meta.set_socket_addr(dest);
61 }
62
63 Self { buffer, meta }
64 }
65
66 #[cfg(feature = "dev-context-only-utils")]
67 pub fn from_data<T>(dest: Option<&SocketAddr>, data: T) -> bincode::Result<Self>
68 where
69 T: solana_packet::Encode,
70 {
71 let buffer = BytesMut::with_capacity(PACKET_DATA_SIZE);
72 let mut writer = buffer.writer();
73 data.encode(&mut writer)?;
74 let buffer = writer.into_inner();
75 let buffer = buffer.freeze();
76
77 let mut meta = Meta {
78 size: buffer.len(),
79 ..Default::default()
80 };
81 if let Some(dest) = dest {
82 meta.set_socket_addr(dest);
83 }
84
85 Ok(Self { buffer, meta })
86 }
87
88 #[inline]
89 pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
90 where
91 I: SliceIndex<[u8]>,
92 {
93 if self.meta.discard() {
94 None
95 } else {
96 self.buffer.get(index)
97 }
98 }
99
100 #[inline]
101 pub fn meta(&self) -> &Meta {
102 &self.meta
103 }
104
105 #[inline]
106 pub fn meta_mut(&mut self) -> &mut Meta {
107 &mut self.meta
108 }
109
110 pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
111 where
112 T: serde::de::DeserializeOwned,
113 I: SliceIndex<[u8], Output = [u8]>,
114 {
115 let bytes = self.data(index).ok_or(bincode::ErrorKind::SizeLimit)?;
116 bincode::options()
117 .with_limit(self.meta().size as u64)
118 .with_fixint_encoding()
119 .reject_trailing_bytes()
120 .deserialize(bytes)
121 }
122
123 #[cfg(feature = "dev-context-only-utils")]
124 pub fn copy_from_slice(&mut self, slice: &[u8]) {
125 self.buffer = Bytes::from(slice.to_vec());
126 }
127
128 #[inline]
129 pub fn as_ref(&self) -> PacketRef<'_> {
130 PacketRef::Bytes(self)
131 }
132
133 #[inline]
134 pub fn as_mut(&mut self) -> PacketRefMut<'_> {
135 PacketRefMut::Bytes(self)
136 }
137
138 #[inline]
139 pub fn buffer(&self) -> &Bytes {
140 &self.buffer
141 }
142
143 #[inline]
144 pub fn set_buffer(&mut self, buffer: impl Into<Bytes>) {
145 let buffer = buffer.into();
146 self.meta.size = buffer.len();
147 self.buffer = buffer;
148 }
149}
150
151#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
152#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
153pub enum PacketBatch {
154 Pinned(PinnedPacketBatch),
155 Bytes(BytesPacketBatch),
156}
157
158impl PacketBatch {
159 #[cfg(feature = "dev-context-only-utils")]
160 pub fn first(&self) -> Option<PacketRef<'_>> {
161 match self {
162 Self::Pinned(batch) => batch.first().map(PacketRef::from),
163 Self::Bytes(batch) => batch.first().map(PacketRef::from),
164 }
165 }
166
167 #[cfg(feature = "dev-context-only-utils")]
168 pub fn first_mut(&mut self) -> Option<PacketRefMut<'_>> {
169 match self {
170 Self::Pinned(batch) => batch.first_mut().map(PacketRefMut::from),
171 Self::Bytes(batch) => batch.first_mut().map(PacketRefMut::from),
172 }
173 }
174
175 pub fn is_empty(&self) -> bool {
177 match self {
178 Self::Pinned(batch) => batch.is_empty(),
179 Self::Bytes(batch) => batch.is_empty(),
180 }
181 }
182
183 pub fn get(&self, index: usize) -> Option<PacketRef<'_>> {
185 match self {
186 Self::Pinned(batch) => batch.get(index).map(PacketRef::from),
187 Self::Bytes(batch) => batch.get(index).map(PacketRef::from),
188 }
189 }
190
191 pub fn get_mut(&mut self, index: usize) -> Option<PacketRefMut<'_>> {
192 match self {
193 Self::Pinned(batch) => batch.get_mut(index).map(PacketRefMut::from),
194 Self::Bytes(batch) => batch.get_mut(index).map(PacketRefMut::from),
195 }
196 }
197
198 pub fn iter(&self) -> PacketBatchIter<'_> {
199 match self {
200 Self::Pinned(batch) => PacketBatchIter::Pinned(batch.iter()),
201 Self::Bytes(batch) => PacketBatchIter::Bytes(batch.iter()),
202 }
203 }
204
205 pub fn iter_mut(&mut self) -> PacketBatchIterMut<'_> {
206 match self {
207 Self::Pinned(batch) => PacketBatchIterMut::Pinned(batch.iter_mut()),
208 Self::Bytes(batch) => PacketBatchIterMut::Bytes(batch.iter_mut()),
209 }
210 }
211
212 pub fn par_iter(&self) -> PacketBatchParIter<'_> {
213 match self {
214 Self::Pinned(batch) => {
215 PacketBatchParIter::Pinned(batch.par_iter().map(PacketRef::from))
216 }
217 Self::Bytes(batch) => PacketBatchParIter::Bytes(batch.par_iter().map(PacketRef::from)),
218 }
219 }
220
221 pub fn par_iter_mut(&mut self) -> PacketBatchParIterMut<'_> {
222 match self {
223 Self::Pinned(batch) => {
224 PacketBatchParIterMut::Pinned(batch.par_iter_mut().map(PacketRefMut::from))
225 }
226 Self::Bytes(batch) => {
227 PacketBatchParIterMut::Bytes(batch.par_iter_mut().map(PacketRefMut::from))
228 }
229 }
230 }
231
232 pub fn len(&self) -> usize {
233 match self {
234 Self::Pinned(batch) => batch.len(),
235 Self::Bytes(batch) => batch.len(),
236 }
237 }
238}
239
240impl From<PinnedPacketBatch> for PacketBatch {
241 fn from(batch: PinnedPacketBatch) -> Self {
242 Self::Pinned(batch)
243 }
244}
245
246impl From<BytesPacketBatch> for PacketBatch {
247 fn from(batch: BytesPacketBatch) -> Self {
248 Self::Bytes(batch)
249 }
250}
251
252impl From<Vec<BytesPacket>> for PacketBatch {
253 fn from(batch: Vec<BytesPacket>) -> Self {
254 Self::Bytes(BytesPacketBatch::from(batch))
255 }
256}
257
258impl<'a> IntoIterator for &'a PacketBatch {
259 type Item = PacketRef<'a>;
260 type IntoIter = PacketBatchIter<'a>;
261 fn into_iter(self) -> Self::IntoIter {
262 self.iter()
263 }
264}
265
266impl<'a> IntoIterator for &'a mut PacketBatch {
267 type Item = PacketRefMut<'a>;
268 type IntoIter = PacketBatchIterMut<'a>;
269 fn into_iter(self) -> Self::IntoIter {
270 self.iter_mut()
271 }
272}
273
274impl<'a> IntoParallelIterator for &'a PacketBatch {
275 type Iter = PacketBatchParIter<'a>;
276 type Item = PacketRef<'a>;
277 fn into_par_iter(self) -> Self::Iter {
278 self.par_iter()
279 }
280}
281
282impl<'a> IntoParallelIterator for &'a mut PacketBatch {
283 type Iter = PacketBatchParIterMut<'a>;
284 type Item = PacketRefMut<'a>;
285 fn into_par_iter(self) -> Self::Iter {
286 self.par_iter_mut()
287 }
288}
289
290#[derive(Clone, Copy, Debug, Eq)]
291pub enum PacketRef<'a> {
292 Packet(&'a Packet),
293 Bytes(&'a BytesPacket),
294}
295
296impl PartialEq for PacketRef<'_> {
297 fn eq(&self, other: &PacketRef<'_>) -> bool {
298 self.meta().eq(other.meta()) && self.data(..).eq(&other.data(..))
299 }
300}
301
302impl<'a> From<&'a Packet> for PacketRef<'a> {
303 fn from(packet: &'a Packet) -> Self {
304 Self::Packet(packet)
305 }
306}
307
308impl<'a> From<&'a mut Packet> for PacketRef<'a> {
309 fn from(packet: &'a mut Packet) -> Self {
310 Self::Packet(packet)
311 }
312}
313
314impl<'a> From<&'a BytesPacket> for PacketRef<'a> {
315 fn from(packet: &'a BytesPacket) -> Self {
316 Self::Bytes(packet)
317 }
318}
319
320impl<'a> From<&'a mut BytesPacket> for PacketRef<'a> {
321 fn from(packet: &'a mut BytesPacket) -> Self {
322 Self::Bytes(packet)
323 }
324}
325
326impl<'a> PacketRef<'a> {
327 pub fn data<I>(&self, index: I) -> Option<&'a <I as SliceIndex<[u8]>>::Output>
328 where
329 I: SliceIndex<[u8]>,
330 {
331 match self {
332 Self::Packet(packet) => packet.data(index),
333 Self::Bytes(packet) => packet.data(index),
334 }
335 }
336
337 #[inline]
338 pub fn meta(&self) -> &Meta {
339 match self {
340 Self::Packet(packet) => packet.meta(),
341 Self::Bytes(packet) => packet.meta(),
342 }
343 }
344
345 pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
346 where
347 T: serde::de::DeserializeOwned,
348 I: SliceIndex<[u8], Output = [u8]>,
349 {
350 match self {
351 Self::Packet(packet) => packet.deserialize_slice(index),
352 Self::Bytes(packet) => packet.deserialize_slice(index),
353 }
354 }
355
356 pub fn to_bytes_packet(&self) -> BytesPacket {
357 match self {
358 Self::Packet(packet) => {
361 let buffer = packet
362 .data(..)
363 .map(|data| Bytes::from(data.to_vec()))
364 .unwrap_or_else(Bytes::new);
365 BytesPacket::new(buffer, self.meta().clone())
366 }
367 Self::Bytes(packet) => packet.to_owned().to_owned(),
373 }
374 }
375}
376
377#[derive(Debug, Eq)]
378pub enum PacketRefMut<'a> {
379 Packet(&'a mut Packet),
380 Bytes(&'a mut BytesPacket),
381}
382
383impl<'a> PartialEq for PacketRefMut<'a> {
384 fn eq(&self, other: &PacketRefMut<'a>) -> bool {
385 self.data(..).eq(&other.data(..)) && self.meta().eq(other.meta())
386 }
387}
388
389impl<'a> From<&'a mut Packet> for PacketRefMut<'a> {
390 fn from(packet: &'a mut Packet) -> Self {
391 Self::Packet(packet)
392 }
393}
394
395impl<'a> From<&'a mut BytesPacket> for PacketRefMut<'a> {
396 fn from(packet: &'a mut BytesPacket) -> Self {
397 Self::Bytes(packet)
398 }
399}
400
401impl PacketRefMut<'_> {
402 pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
403 where
404 I: SliceIndex<[u8]>,
405 {
406 match self {
407 Self::Packet(packet) => packet.data(index),
408 Self::Bytes(packet) => packet.data(index),
409 }
410 }
411
412 #[inline]
413 pub fn meta(&self) -> &Meta {
414 match self {
415 Self::Packet(packet) => packet.meta(),
416 Self::Bytes(packet) => packet.meta(),
417 }
418 }
419
420 #[inline]
421 pub fn meta_mut(&mut self) -> &mut Meta {
422 match self {
423 Self::Packet(packet) => packet.meta_mut(),
424 Self::Bytes(packet) => packet.meta_mut(),
425 }
426 }
427
428 pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
429 where
430 T: serde::de::DeserializeOwned,
431 I: SliceIndex<[u8], Output = [u8]>,
432 {
433 match self {
434 Self::Packet(packet) => packet.deserialize_slice(index),
435 Self::Bytes(packet) => packet.deserialize_slice(index),
436 }
437 }
438
439 #[cfg(feature = "dev-context-only-utils")]
440 #[inline]
441 pub fn copy_from_slice(&mut self, src: &[u8]) {
442 match self {
443 Self::Packet(packet) => {
444 let size = src.len();
445 packet.buffer_mut()[..size].copy_from_slice(src);
446 }
447 Self::Bytes(packet) => packet.copy_from_slice(src),
448 }
449 }
450
451 #[inline]
452 pub fn as_ref(&self) -> PacketRef<'_> {
453 match self {
454 Self::Packet(packet) => PacketRef::Packet(packet),
455 Self::Bytes(packet) => PacketRef::Bytes(packet),
456 }
457 }
458}
459
460pub enum PacketBatchIter<'a> {
461 Pinned(std::slice::Iter<'a, Packet>),
462 Bytes(std::slice::Iter<'a, BytesPacket>),
463}
464
465impl DoubleEndedIterator for PacketBatchIter<'_> {
466 fn next_back(&mut self) -> Option<Self::Item> {
467 match self {
468 Self::Pinned(iter) => iter.next_back().map(PacketRef::Packet),
469 Self::Bytes(iter) => iter.next_back().map(PacketRef::Bytes),
470 }
471 }
472}
473
474impl<'a> Iterator for PacketBatchIter<'a> {
475 type Item = PacketRef<'a>;
476
477 fn next(&mut self) -> Option<Self::Item> {
478 match self {
479 Self::Pinned(iter) => iter.next().map(PacketRef::Packet),
480 Self::Bytes(iter) => iter.next().map(PacketRef::Bytes),
481 }
482 }
483}
484
485pub enum PacketBatchIterMut<'a> {
486 Pinned(std::slice::IterMut<'a, Packet>),
487 Bytes(std::slice::IterMut<'a, BytesPacket>),
488}
489
490impl DoubleEndedIterator for PacketBatchIterMut<'_> {
491 fn next_back(&mut self) -> Option<Self::Item> {
492 match self {
493 Self::Pinned(iter) => iter.next_back().map(PacketRefMut::Packet),
494 Self::Bytes(iter) => iter.next_back().map(PacketRefMut::Bytes),
495 }
496 }
497}
498
499impl<'a> Iterator for PacketBatchIterMut<'a> {
500 type Item = PacketRefMut<'a>;
501
502 fn next(&mut self) -> Option<Self::Item> {
503 match self {
504 Self::Pinned(iter) => iter.next().map(PacketRefMut::Packet),
505 Self::Bytes(iter) => iter.next().map(PacketRefMut::Bytes),
506 }
507 }
508}
509
510type PacketParIter<'a> = rayon::slice::Iter<'a, Packet>;
511type BytesPacketParIter<'a> = rayon::slice::Iter<'a, BytesPacket>;
512
513pub enum PacketBatchParIter<'a> {
514 Pinned(
515 rayon::iter::Map<
516 PacketParIter<'a>,
517 fn(<PacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
518 >,
519 ),
520 Bytes(
521 rayon::iter::Map<
522 BytesPacketParIter<'a>,
523 fn(<BytesPacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
524 >,
525 ),
526}
527
528impl<'a> ParallelIterator for PacketBatchParIter<'a> {
529 type Item = PacketRef<'a>;
530 fn drive_unindexed<C>(self, consumer: C) -> C::Result
531 where
532 C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
533 {
534 match self {
535 Self::Pinned(iter) => iter.drive_unindexed(consumer),
536 Self::Bytes(iter) => iter.drive_unindexed(consumer),
537 }
538 }
539}
540
541impl IndexedParallelIterator for PacketBatchParIter<'_> {
542 fn len(&self) -> usize {
543 match self {
544 Self::Pinned(iter) => iter.len(),
545 Self::Bytes(iter) => iter.len(),
546 }
547 }
548
549 fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
550 match self {
551 Self::Pinned(iter) => iter.drive(consumer),
552 Self::Bytes(iter) => iter.drive(consumer),
553 }
554 }
555
556 fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
557 self,
558 callback: CB,
559 ) -> CB::Output {
560 match self {
561 Self::Pinned(iter) => iter.with_producer(callback),
562 Self::Bytes(iter) => iter.with_producer(callback),
563 }
564 }
565}
566
567type PacketParIterMut<'a> = rayon::slice::IterMut<'a, Packet>;
568type BytesPacketParIterMut<'a> = rayon::slice::IterMut<'a, BytesPacket>;
569
570pub enum PacketBatchParIterMut<'a> {
571 Pinned(
572 rayon::iter::Map<
573 PacketParIterMut<'a>,
574 fn(<PacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
575 >,
576 ),
577 Bytes(
578 rayon::iter::Map<
579 BytesPacketParIterMut<'a>,
580 fn(<BytesPacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
581 >,
582 ),
583}
584
585impl<'a> ParallelIterator for PacketBatchParIterMut<'a> {
586 type Item = PacketRefMut<'a>;
587 fn drive_unindexed<C>(self, consumer: C) -> C::Result
588 where
589 C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
590 {
591 match self {
592 Self::Pinned(iter) => iter.drive_unindexed(consumer),
593 Self::Bytes(iter) => iter.drive_unindexed(consumer),
594 }
595 }
596}
597
598impl IndexedParallelIterator for PacketBatchParIterMut<'_> {
599 fn len(&self) -> usize {
600 match self {
601 Self::Pinned(iter) => iter.len(),
602 Self::Bytes(iter) => iter.len(),
603 }
604 }
605
606 fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
607 match self {
608 Self::Pinned(iter) => iter.drive(consumer),
609 Self::Bytes(iter) => iter.drive(consumer),
610 }
611 }
612
613 fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
614 self,
615 callback: CB,
616 ) -> CB::Output {
617 match self {
618 Self::Pinned(iter) => iter.with_producer(callback),
619 Self::Bytes(iter) => iter.with_producer(callback),
620 }
621 }
622}
623
624#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
625#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
626pub struct PinnedPacketBatch {
627 packets: PinnedVec<Packet>,
628}
629
630pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
631
632impl PinnedPacketBatch {
633 pub fn new(packets: Vec<Packet>) -> Self {
634 let packets = PinnedVec::from_vec(packets);
635 Self { packets }
636 }
637
638 pub fn with_capacity(capacity: usize) -> Self {
639 let packets = PinnedVec::with_capacity(capacity);
640 Self { packets }
641 }
642
643 pub fn new_pinned_with_capacity(capacity: usize) -> Self {
644 let mut batch = Self::with_capacity(capacity);
645 batch.packets.reserve_and_pin(capacity);
646 batch
647 }
648
649 pub fn new_unpinned_with_recycler(
650 recycler: &PacketBatchRecycler,
651 capacity: usize,
652 name: &'static str,
653 ) -> Self {
654 let mut packets = recycler.allocate(name);
655 packets.reserve(capacity);
656 Self { packets }
657 }
658
659 pub fn new_with_recycler(
660 recycler: &PacketBatchRecycler,
661 capacity: usize,
662 name: &'static str,
663 ) -> Self {
664 let mut packets = recycler.allocate(name);
665 packets.reserve_and_pin(capacity);
666 Self { packets }
667 }
668
669 pub fn new_with_recycler_data(
670 recycler: &PacketBatchRecycler,
671 name: &'static str,
672 mut packets: Vec<Packet>,
673 ) -> Self {
674 let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
675 batch.packets.append(&mut packets);
676 batch
677 }
678
679 pub fn new_unpinned_with_recycler_data_and_dests<S, T>(
680 recycler: &PacketBatchRecycler,
681 name: &'static str,
682 dests_and_data: impl IntoIterator<Item = (S, T), IntoIter: ExactSizeIterator>,
683 ) -> Self
684 where
685 S: Borrow<SocketAddr>,
686 T: solana_packet::Encode,
687 {
688 let dests_and_data = dests_and_data.into_iter();
689 let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
690 batch
691 .packets
692 .resize(dests_and_data.len(), Packet::default());
693
694 for ((addr, data), packet) in dests_and_data.zip(batch.packets.iter_mut()) {
695 let addr = addr.borrow();
696 if !addr.ip().is_unspecified() && addr.port() != 0 {
697 if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
698 error!("Couldn't write to packet {e:?}. Data skipped.");
702 packet.meta_mut().set_discard(true);
703 }
704 } else {
705 trace!("Dropping packet, as destination is unknown");
706 packet.meta_mut().set_discard(true);
707 }
708 }
709 batch
710 }
711
712 pub fn new_unpinned_with_recycler_data(
713 recycler: &PacketBatchRecycler,
714 name: &'static str,
715 mut packets: Vec<Packet>,
716 ) -> Self {
717 let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
718 batch.packets.append(&mut packets);
719 batch
720 }
721
722 pub fn set_addr(&mut self, addr: &SocketAddr) {
723 for p in self.iter_mut() {
724 p.meta_mut().set_socket_addr(addr);
725 }
726 }
727}
728
729impl Deref for PinnedPacketBatch {
730 type Target = PinnedVec<Packet>;
731
732 fn deref(&self) -> &Self::Target {
733 &self.packets
734 }
735}
736
737impl DerefMut for PinnedPacketBatch {
738 fn deref_mut(&mut self) -> &mut Self::Target {
739 &mut self.packets
740 }
741}
742
743impl<I: SliceIndex<[Packet]>> Index<I> for PinnedPacketBatch {
744 type Output = I::Output;
745
746 #[inline]
747 fn index(&self, index: I) -> &Self::Output {
748 &self.packets[index]
749 }
750}
751
752impl<I: SliceIndex<[Packet]>> IndexMut<I> for PinnedPacketBatch {
753 #[inline]
754 fn index_mut(&mut self, index: I) -> &mut Self::Output {
755 &mut self.packets[index]
756 }
757}
758
759impl<'a> IntoIterator for &'a PinnedPacketBatch {
760 type Item = &'a Packet;
761 type IntoIter = Iter<'a, Packet>;
762
763 fn into_iter(self) -> Self::IntoIter {
764 self.packets.iter()
765 }
766}
767
768impl<'a> IntoParallelIterator for &'a PinnedPacketBatch {
769 type Iter = rayon::slice::Iter<'a, Packet>;
770 type Item = &'a Packet;
771 fn into_par_iter(self) -> Self::Iter {
772 self.packets.par_iter()
773 }
774}
775
776impl<'a> IntoParallelIterator for &'a mut PinnedPacketBatch {
777 type Iter = rayon::slice::IterMut<'a, Packet>;
778 type Item = &'a mut Packet;
779 fn into_par_iter(self) -> Self::Iter {
780 self.packets.par_iter_mut()
781 }
782}
783
784impl From<PinnedPacketBatch> for Vec<Packet> {
785 fn from(batch: PinnedPacketBatch) -> Self {
786 batch.packets.into()
787 }
788}
789
790pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
791 items
792 .chunks(chunk_size)
793 .map(|batch_items| {
794 let mut batch = PinnedPacketBatch::with_capacity(batch_items.len());
795 batch.resize(batch_items.len(), Packet::default());
796 for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
797 Packet::populate_packet(packet, None, item).expect("serialize request");
798 }
799 batch.into()
800 })
801 .collect()
802}
803
804#[cfg(test)]
805fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
806 to_packet_batches(items, NUM_PACKETS)
807}
808
809#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
810#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
811pub struct BytesPacketBatch {
812 packets: Vec<BytesPacket>,
813}
814
815impl BytesPacketBatch {
816 pub fn new() -> Self {
817 Self::default()
818 }
819
820 pub fn with_capacity(capacity: usize) -> Self {
821 let packets = Vec::with_capacity(capacity);
822 Self { packets }
823 }
824
825 pub fn to_pinned_packet_batch(&self) -> PinnedPacketBatch {
826 let mut batch = PinnedPacketBatch::new_pinned_with_capacity(self.len());
827 for bytes_packet in self.iter() {
828 let mut packet = Packet::default();
829 let size = bytes_packet.meta().size;
830 *packet.meta_mut() = bytes_packet.meta().clone();
831 packet.meta_mut().size = size;
832 packet.buffer_mut()[..size].copy_from_slice(&bytes_packet.buffer);
833
834 batch.push(packet);
835 }
836
837 batch
838 }
839}
840
841impl Deref for BytesPacketBatch {
842 type Target = Vec<BytesPacket>;
843
844 fn deref(&self) -> &Self::Target {
845 &self.packets
846 }
847}
848
849impl DerefMut for BytesPacketBatch {
850 fn deref_mut(&mut self) -> &mut Self::Target {
851 &mut self.packets
852 }
853}
854
855impl From<Vec<BytesPacket>> for BytesPacketBatch {
856 fn from(packets: Vec<BytesPacket>) -> Self {
857 Self { packets }
858 }
859}
860
861impl FromIterator<BytesPacket> for BytesPacketBatch {
862 fn from_iter<T: IntoIterator<Item = BytesPacket>>(iter: T) -> Self {
863 let packets = Vec::from_iter(iter);
864 Self { packets }
865 }
866}
867
868impl<'a> IntoIterator for &'a BytesPacketBatch {
869 type Item = &'a BytesPacket;
870 type IntoIter = Iter<'a, BytesPacket>;
871
872 fn into_iter(self) -> Self::IntoIter {
873 self.packets.iter()
874 }
875}
876
877impl<'a> IntoParallelIterator for &'a BytesPacketBatch {
878 type Iter = rayon::slice::Iter<'a, BytesPacket>;
879 type Item = &'a BytesPacket;
880 fn into_par_iter(self) -> Self::Iter {
881 self.packets.par_iter()
882 }
883}
884
885impl<'a> IntoParallelIterator for &'a mut BytesPacketBatch {
886 type Iter = rayon::slice::IterMut<'a, BytesPacket>;
887 type Item = &'a mut BytesPacket;
888 fn into_par_iter(self) -> Self::Iter {
889 self.packets.par_iter_mut()
890 }
891}
892
893pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
894where
895 R: Read,
896 T: DeserializeOwned,
897{
898 bincode::options()
901 .with_limit(PACKET_DATA_SIZE as u64)
902 .with_fixint_encoding()
903 .allow_trailing_bytes()
904 .deserialize_from(reader)
905}
906
907#[cfg(test)]
908mod tests {
909 use {
910 super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
911 solana_system_transaction::transfer,
912 };
913
914 #[test]
915 fn test_to_packet_batches() {
916 let keypair = Keypair::new();
917 let hash = Hash::new_from_array([1; 32]);
918 let tx = transfer(&keypair, &keypair.pubkey(), 1, hash);
919 let rv = to_packet_batches_for_tests(&[tx.clone(); 1]);
920 assert_eq!(rv.len(), 1);
921 assert_eq!(rv[0].len(), 1);
922
923 #[allow(clippy::useless_vec)]
924 let rv = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]);
925 assert_eq!(rv.len(), 1);
926 assert_eq!(rv[0].len(), NUM_PACKETS);
927
928 #[allow(clippy::useless_vec)]
929 let rv = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]);
930 assert_eq!(rv.len(), 2);
931 assert_eq!(rv[0].len(), NUM_PACKETS);
932 assert_eq!(rv[1].len(), 1);
933 }
934
935 #[test]
936 fn test_to_packets_pinning() {
937 let recycler = PacketBatchRecycler::default();
938 for i in 0..2 {
939 let _first_packets =
940 PinnedPacketBatch::new_with_recycler(&recycler, i + 1, "first one");
941 }
942 }
943}