1#[cfg(feature = "dev-context-only-utils")]
3use bytes::{BufMut, BytesMut};
4use {
5 crate::{cuda_runtime::PinnedVec, recycler::Recycler},
6 bincode::config::Options,
7 bytes::Bytes,
8 rayon::{
9 iter::{IndexedParallelIterator, ParallelIterator},
10 prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
11 },
12 serde::{de::DeserializeOwned, Deserialize, Serialize},
13 std::{
14 borrow::Borrow,
15 io::Read,
16 net::SocketAddr,
17 ops::{Deref, DerefMut, Index, IndexMut},
18 slice::{Iter, SliceIndex},
19 },
20};
21pub use {
22 bytes,
23 solana_packet::{self, Meta, Packet, PacketFlags, PACKET_DATA_SIZE},
24};
25
/// Equals 8192. In the visible part of this file it is only used by the
/// test-only helper `to_packet_batches_for_tests` as the chunk size.
pub const NUM_PACKETS: usize = 1024 * 8;

/// NOTE(review): presumably the intended number of packets per batch; no use
/// is visible in this file — confirm against callers.
pub const PACKETS_PER_BATCH: usize = 64;
/// NOTE(review): presumably the number of messages requested per `recvmmsg`
/// call; no use is visible in this file — confirm against callers.
pub const NUM_RCVMMSGS: usize = 64;
30
31#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
33#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
34pub struct BytesPacket {
35 buffer: Bytes,
36 meta: Meta,
37}
38
39impl BytesPacket {
40 pub fn new(buffer: Bytes, meta: Meta) -> Self {
41 Self { buffer, meta }
42 }
43
44 #[cfg(feature = "dev-context-only-utils")]
45 pub fn empty() -> Self {
46 Self {
47 buffer: Bytes::new(),
48 meta: Meta::default(),
49 }
50 }
51
52 #[cfg(feature = "dev-context-only-utils")]
53 pub fn from_bytes(dest: Option<&SocketAddr>, buffer: Bytes) -> Self {
54 let mut meta = Meta {
55 size: buffer.len(),
56 ..Default::default()
57 };
58 if let Some(dest) = dest {
59 meta.set_socket_addr(dest);
60 }
61
62 Self { buffer, meta }
63 }
64
65 #[cfg(feature = "dev-context-only-utils")]
66 pub fn from_data<T>(dest: Option<&SocketAddr>, data: T) -> bincode::Result<Self>
67 where
68 T: solana_packet::Encode,
69 {
70 let buffer = BytesMut::with_capacity(PACKET_DATA_SIZE);
71 let mut writer = buffer.writer();
72 data.encode(&mut writer)?;
73 let buffer = writer.into_inner();
74 let buffer = buffer.freeze();
75
76 let mut meta = Meta {
77 size: buffer.len(),
78 ..Default::default()
79 };
80 if let Some(dest) = dest {
81 meta.set_socket_addr(dest);
82 }
83
84 Ok(Self { buffer, meta })
85 }
86
87 #[inline]
88 pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
89 where
90 I: SliceIndex<[u8]>,
91 {
92 if self.meta.discard() {
93 None
94 } else {
95 self.buffer.get(index)
96 }
97 }
98
99 #[inline]
100 pub fn meta(&self) -> &Meta {
101 &self.meta
102 }
103
104 #[inline]
105 pub fn meta_mut(&mut self) -> &mut Meta {
106 &mut self.meta
107 }
108
109 pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
110 where
111 T: serde::de::DeserializeOwned,
112 I: SliceIndex<[u8], Output = [u8]>,
113 {
114 let bytes = self.data(index).ok_or(bincode::ErrorKind::SizeLimit)?;
115 bincode::options()
116 .with_limit(self.meta().size as u64)
117 .with_fixint_encoding()
118 .reject_trailing_bytes()
119 .deserialize(bytes)
120 }
121
122 #[cfg(feature = "dev-context-only-utils")]
123 pub fn copy_from_slice(&mut self, slice: &[u8]) {
124 self.buffer = Bytes::from(slice.to_vec());
125 }
126
127 #[inline]
128 pub fn as_ref(&self) -> PacketRef<'_> {
129 PacketRef::Bytes(self)
130 }
131
132 #[inline]
133 pub fn as_mut(&mut self) -> PacketRefMut<'_> {
134 PacketRefMut::Bytes(self)
135 }
136}
137
138#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
139#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
140pub enum PacketBatch {
141 Pinned(PinnedPacketBatch),
142 Bytes(BytesPacketBatch),
143}
144
145impl PacketBatch {
146 #[cfg(feature = "dev-context-only-utils")]
147 pub fn first(&self) -> Option<PacketRef<'_>> {
148 match self {
149 Self::Pinned(batch) => batch.first().map(PacketRef::from),
150 Self::Bytes(batch) => batch.first().map(PacketRef::from),
151 }
152 }
153
154 #[cfg(feature = "dev-context-only-utils")]
155 pub fn first_mut(&mut self) -> Option<PacketRefMut<'_>> {
156 match self {
157 Self::Pinned(batch) => batch.first_mut().map(PacketRefMut::from),
158 Self::Bytes(batch) => batch.first_mut().map(PacketRefMut::from),
159 }
160 }
161
162 pub fn is_empty(&self) -> bool {
164 match self {
165 Self::Pinned(batch) => batch.is_empty(),
166 Self::Bytes(batch) => batch.is_empty(),
167 }
168 }
169
170 pub fn get(&self, index: usize) -> Option<PacketRef<'_>> {
172 match self {
173 Self::Pinned(batch) => batch.get(index).map(PacketRef::from),
174 Self::Bytes(batch) => batch.get(index).map(PacketRef::from),
175 }
176 }
177
178 pub fn get_mut(&mut self, index: usize) -> Option<PacketRefMut<'_>> {
179 match self {
180 Self::Pinned(batch) => batch.get_mut(index).map(PacketRefMut::from),
181 Self::Bytes(batch) => batch.get_mut(index).map(PacketRefMut::from),
182 }
183 }
184
185 pub fn iter(&self) -> PacketBatchIter<'_> {
186 match self {
187 Self::Pinned(batch) => PacketBatchIter::Pinned(batch.iter()),
188 Self::Bytes(batch) => PacketBatchIter::Bytes(batch.iter()),
189 }
190 }
191
192 pub fn iter_mut(&mut self) -> PacketBatchIterMut<'_> {
193 match self {
194 Self::Pinned(batch) => PacketBatchIterMut::Pinned(batch.iter_mut()),
195 Self::Bytes(batch) => PacketBatchIterMut::Bytes(batch.iter_mut()),
196 }
197 }
198
199 pub fn par_iter(&self) -> PacketBatchParIter {
200 match self {
201 Self::Pinned(batch) => {
202 PacketBatchParIter::Pinned(batch.par_iter().map(PacketRef::from))
203 }
204 Self::Bytes(batch) => PacketBatchParIter::Bytes(batch.par_iter().map(PacketRef::from)),
205 }
206 }
207
208 pub fn par_iter_mut(&mut self) -> PacketBatchParIterMut {
209 match self {
210 Self::Pinned(batch) => {
211 PacketBatchParIterMut::Pinned(batch.par_iter_mut().map(PacketRefMut::from))
212 }
213 Self::Bytes(batch) => {
214 PacketBatchParIterMut::Bytes(batch.par_iter_mut().map(PacketRefMut::from))
215 }
216 }
217 }
218
219 pub fn len(&self) -> usize {
220 match self {
221 Self::Pinned(batch) => batch.len(),
222 Self::Bytes(batch) => batch.len(),
223 }
224 }
225}
226
227impl From<PinnedPacketBatch> for PacketBatch {
228 fn from(batch: PinnedPacketBatch) -> Self {
229 Self::Pinned(batch)
230 }
231}
232
233impl From<BytesPacketBatch> for PacketBatch {
234 fn from(batch: BytesPacketBatch) -> Self {
235 Self::Bytes(batch)
236 }
237}
238
239impl From<Vec<BytesPacket>> for PacketBatch {
240 fn from(batch: Vec<BytesPacket>) -> Self {
241 Self::Bytes(BytesPacketBatch::from(batch))
242 }
243}
244
245impl<'a> IntoIterator for &'a PacketBatch {
246 type Item = PacketRef<'a>;
247 type IntoIter = PacketBatchIter<'a>;
248 fn into_iter(self) -> Self::IntoIter {
249 self.iter()
250 }
251}
252
253impl<'a> IntoIterator for &'a mut PacketBatch {
254 type Item = PacketRefMut<'a>;
255 type IntoIter = PacketBatchIterMut<'a>;
256 fn into_iter(self) -> Self::IntoIter {
257 self.iter_mut()
258 }
259}
260
261impl<'a> IntoParallelIterator for &'a PacketBatch {
262 type Iter = PacketBatchParIter<'a>;
263 type Item = PacketRef<'a>;
264 fn into_par_iter(self) -> Self::Iter {
265 self.par_iter()
266 }
267}
268
269impl<'a> IntoParallelIterator for &'a mut PacketBatch {
270 type Iter = PacketBatchParIterMut<'a>;
271 type Item = PacketRefMut<'a>;
272 fn into_par_iter(self) -> Self::Iter {
273 self.par_iter_mut()
274 }
275}
276
277#[derive(Clone, Copy, Debug, Eq)]
278pub enum PacketRef<'a> {
279 Packet(&'a Packet),
280 Bytes(&'a BytesPacket),
281}
282
283impl PartialEq for PacketRef<'_> {
284 fn eq(&self, other: &PacketRef<'_>) -> bool {
285 self.meta().eq(other.meta()) && self.data(..).eq(&other.data(..))
286 }
287}
288
289impl<'a> From<&'a Packet> for PacketRef<'a> {
290 fn from(packet: &'a Packet) -> Self {
291 Self::Packet(packet)
292 }
293}
294
295impl<'a> From<&'a mut Packet> for PacketRef<'a> {
296 fn from(packet: &'a mut Packet) -> Self {
297 Self::Packet(packet)
298 }
299}
300
301impl<'a> From<&'a BytesPacket> for PacketRef<'a> {
302 fn from(packet: &'a BytesPacket) -> Self {
303 Self::Bytes(packet)
304 }
305}
306
307impl<'a> From<&'a mut BytesPacket> for PacketRef<'a> {
308 fn from(packet: &'a mut BytesPacket) -> Self {
309 Self::Bytes(packet)
310 }
311}
312
313impl<'a> PacketRef<'a> {
314 pub fn data<I>(&self, index: I) -> Option<&'a <I as SliceIndex<[u8]>>::Output>
315 where
316 I: SliceIndex<[u8]>,
317 {
318 match self {
319 Self::Packet(packet) => packet.data(index),
320 Self::Bytes(packet) => packet.data(index),
321 }
322 }
323
324 #[inline]
325 pub fn meta(&self) -> &Meta {
326 match self {
327 Self::Packet(packet) => packet.meta(),
328 Self::Bytes(packet) => packet.meta(),
329 }
330 }
331
332 pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
333 where
334 T: serde::de::DeserializeOwned,
335 I: SliceIndex<[u8], Output = [u8]>,
336 {
337 match self {
338 Self::Packet(packet) => packet.deserialize_slice(index),
339 Self::Bytes(packet) => packet.deserialize_slice(index),
340 }
341 }
342
343 pub fn to_bytes_packet(&self) -> BytesPacket {
344 match self {
345 Self::Packet(packet) => {
348 let buffer = packet
349 .data(..)
350 .map(|data| Bytes::from(data.to_vec()))
351 .unwrap_or_else(Bytes::new);
352 BytesPacket::new(buffer, self.meta().clone())
353 }
354 Self::Bytes(packet) => packet.to_owned().to_owned(),
360 }
361 }
362}
363
364#[derive(Debug, Eq)]
365pub enum PacketRefMut<'a> {
366 Packet(&'a mut Packet),
367 Bytes(&'a mut BytesPacket),
368}
369
370impl<'a> PartialEq for PacketRefMut<'a> {
371 fn eq(&self, other: &PacketRefMut<'a>) -> bool {
372 self.data(..).eq(&other.data(..)) && self.meta().eq(other.meta())
373 }
374}
375
376impl<'a> From<&'a mut Packet> for PacketRefMut<'a> {
377 fn from(packet: &'a mut Packet) -> Self {
378 Self::Packet(packet)
379 }
380}
381
382impl<'a> From<&'a mut BytesPacket> for PacketRefMut<'a> {
383 fn from(packet: &'a mut BytesPacket) -> Self {
384 Self::Bytes(packet)
385 }
386}
387
388impl PacketRefMut<'_> {
389 pub fn data<I>(&self, index: I) -> Option<&<I as SliceIndex<[u8]>>::Output>
390 where
391 I: SliceIndex<[u8]>,
392 {
393 match self {
394 Self::Packet(packet) => packet.data(index),
395 Self::Bytes(packet) => packet.data(index),
396 }
397 }
398
399 #[inline]
400 pub fn meta(&self) -> &Meta {
401 match self {
402 Self::Packet(packet) => packet.meta(),
403 Self::Bytes(packet) => packet.meta(),
404 }
405 }
406
407 #[inline]
408 pub fn meta_mut(&mut self) -> &mut Meta {
409 match self {
410 Self::Packet(packet) => packet.meta_mut(),
411 Self::Bytes(packet) => packet.meta_mut(),
412 }
413 }
414
415 pub fn deserialize_slice<T, I>(&self, index: I) -> bincode::Result<T>
416 where
417 T: serde::de::DeserializeOwned,
418 I: SliceIndex<[u8], Output = [u8]>,
419 {
420 match self {
421 Self::Packet(packet) => packet.deserialize_slice(index),
422 Self::Bytes(packet) => packet.deserialize_slice(index),
423 }
424 }
425
426 #[cfg(feature = "dev-context-only-utils")]
427 #[inline]
428 pub fn copy_from_slice(&mut self, src: &[u8]) {
429 match self {
430 Self::Packet(packet) => {
431 let size = src.len();
432 packet.buffer_mut()[..size].copy_from_slice(src);
433 }
434 Self::Bytes(packet) => packet.copy_from_slice(src),
435 }
436 }
437
438 #[inline]
439 pub fn as_ref(&self) -> PacketRef<'_> {
440 match self {
441 Self::Packet(packet) => PacketRef::Packet(packet),
442 Self::Bytes(packet) => PacketRef::Bytes(packet),
443 }
444 }
445}
446
447pub enum PacketBatchIter<'a> {
448 Pinned(std::slice::Iter<'a, Packet>),
449 Bytes(std::slice::Iter<'a, BytesPacket>),
450}
451
452impl DoubleEndedIterator for PacketBatchIter<'_> {
453 fn next_back(&mut self) -> Option<Self::Item> {
454 match self {
455 Self::Pinned(iter) => iter.next_back().map(PacketRef::Packet),
456 Self::Bytes(iter) => iter.next_back().map(PacketRef::Bytes),
457 }
458 }
459}
460
461impl<'a> Iterator for PacketBatchIter<'a> {
462 type Item = PacketRef<'a>;
463
464 fn next(&mut self) -> Option<Self::Item> {
465 match self {
466 Self::Pinned(iter) => iter.next().map(PacketRef::Packet),
467 Self::Bytes(iter) => iter.next().map(PacketRef::Bytes),
468 }
469 }
470}
471
472pub enum PacketBatchIterMut<'a> {
473 Pinned(std::slice::IterMut<'a, Packet>),
474 Bytes(std::slice::IterMut<'a, BytesPacket>),
475}
476
477impl DoubleEndedIterator for PacketBatchIterMut<'_> {
478 fn next_back(&mut self) -> Option<Self::Item> {
479 match self {
480 Self::Pinned(iter) => iter.next_back().map(PacketRefMut::Packet),
481 Self::Bytes(iter) => iter.next_back().map(PacketRefMut::Bytes),
482 }
483 }
484}
485
486impl<'a> Iterator for PacketBatchIterMut<'a> {
487 type Item = PacketRefMut<'a>;
488
489 fn next(&mut self) -> Option<Self::Item> {
490 match self {
491 Self::Pinned(iter) => iter.next().map(PacketRefMut::Packet),
492 Self::Bytes(iter) => iter.next().map(PacketRefMut::Bytes),
493 }
494 }
495}
496
497type PacketParIter<'a> = rayon::slice::Iter<'a, Packet>;
498type BytesPacketParIter<'a> = rayon::slice::Iter<'a, BytesPacket>;
499
500pub enum PacketBatchParIter<'a> {
501 Pinned(
502 rayon::iter::Map<
503 PacketParIter<'a>,
504 fn(<PacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
505 >,
506 ),
507 Bytes(
508 rayon::iter::Map<
509 BytesPacketParIter<'a>,
510 fn(<BytesPacketParIter<'a> as ParallelIterator>::Item) -> PacketRef<'a>,
511 >,
512 ),
513}
514
515impl<'a> ParallelIterator for PacketBatchParIter<'a> {
516 type Item = PacketRef<'a>;
517 fn drive_unindexed<C>(self, consumer: C) -> C::Result
518 where
519 C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
520 {
521 match self {
522 Self::Pinned(iter) => iter.drive_unindexed(consumer),
523 Self::Bytes(iter) => iter.drive_unindexed(consumer),
524 }
525 }
526}
527
528impl IndexedParallelIterator for PacketBatchParIter<'_> {
529 fn len(&self) -> usize {
530 match self {
531 Self::Pinned(iter) => iter.len(),
532 Self::Bytes(iter) => iter.len(),
533 }
534 }
535
536 fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
537 match self {
538 Self::Pinned(iter) => iter.drive(consumer),
539 Self::Bytes(iter) => iter.drive(consumer),
540 }
541 }
542
543 fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
544 self,
545 callback: CB,
546 ) -> CB::Output {
547 match self {
548 Self::Pinned(iter) => iter.with_producer(callback),
549 Self::Bytes(iter) => iter.with_producer(callback),
550 }
551 }
552}
553
554type PacketParIterMut<'a> = rayon::slice::IterMut<'a, Packet>;
555type BytesPacketParIterMut<'a> = rayon::slice::IterMut<'a, BytesPacket>;
556
557pub enum PacketBatchParIterMut<'a> {
558 Pinned(
559 rayon::iter::Map<
560 PacketParIterMut<'a>,
561 fn(<PacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
562 >,
563 ),
564 Bytes(
565 rayon::iter::Map<
566 BytesPacketParIterMut<'a>,
567 fn(<BytesPacketParIterMut<'a> as ParallelIterator>::Item) -> PacketRefMut<'a>,
568 >,
569 ),
570}
571
572impl<'a> ParallelIterator for PacketBatchParIterMut<'a> {
573 type Item = PacketRefMut<'a>;
574 fn drive_unindexed<C>(self, consumer: C) -> C::Result
575 where
576 C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
577 {
578 match self {
579 Self::Pinned(iter) => iter.drive_unindexed(consumer),
580 Self::Bytes(iter) => iter.drive_unindexed(consumer),
581 }
582 }
583}
584
585impl IndexedParallelIterator for PacketBatchParIterMut<'_> {
586 fn len(&self) -> usize {
587 match self {
588 Self::Pinned(iter) => iter.len(),
589 Self::Bytes(iter) => iter.len(),
590 }
591 }
592
593 fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(self, consumer: C) -> C::Result {
594 match self {
595 Self::Pinned(iter) => iter.drive(consumer),
596 Self::Bytes(iter) => iter.drive(consumer),
597 }
598 }
599
600 fn with_producer<CB: rayon::iter::plumbing::ProducerCallback<Self::Item>>(
601 self,
602 callback: CB,
603 ) -> CB::Output {
604 match self {
605 Self::Pinned(iter) => iter.with_producer(callback),
606 Self::Bytes(iter) => iter.with_producer(callback),
607 }
608 }
609}
610
611#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
612#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
613pub struct PinnedPacketBatch {
614 packets: PinnedVec<Packet>,
615}
616
617pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
618
619impl PinnedPacketBatch {
620 pub fn new(packets: Vec<Packet>) -> Self {
621 let packets = PinnedVec::from_vec(packets);
622 Self { packets }
623 }
624
625 pub fn with_capacity(capacity: usize) -> Self {
626 let packets = PinnedVec::with_capacity(capacity);
627 Self { packets }
628 }
629
630 pub fn new_pinned_with_capacity(capacity: usize) -> Self {
631 let mut batch = Self::with_capacity(capacity);
632 batch.packets.reserve_and_pin(capacity);
633 batch
634 }
635
636 pub fn new_unpinned_with_recycler(
637 recycler: &PacketBatchRecycler,
638 capacity: usize,
639 name: &'static str,
640 ) -> Self {
641 let mut packets = recycler.allocate(name);
642 packets.reserve(capacity);
643 Self { packets }
644 }
645
646 pub fn new_with_recycler(
647 recycler: &PacketBatchRecycler,
648 capacity: usize,
649 name: &'static str,
650 ) -> Self {
651 let mut packets = recycler.allocate(name);
652 packets.reserve_and_pin(capacity);
653 Self { packets }
654 }
655
656 pub fn new_with_recycler_data(
657 recycler: &PacketBatchRecycler,
658 name: &'static str,
659 mut packets: Vec<Packet>,
660 ) -> Self {
661 let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
662 batch.packets.append(&mut packets);
663 batch
664 }
665
666 pub fn new_unpinned_with_recycler_data_and_dests<S, T>(
667 recycler: &PacketBatchRecycler,
668 name: &'static str,
669 dests_and_data: impl IntoIterator<Item = (S, T), IntoIter: ExactSizeIterator>,
670 ) -> Self
671 where
672 S: Borrow<SocketAddr>,
673 T: solana_packet::Encode,
674 {
675 let dests_and_data = dests_and_data.into_iter();
676 let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
677 batch
678 .packets
679 .resize(dests_and_data.len(), Packet::default());
680
681 for ((addr, data), packet) in dests_and_data.zip(batch.packets.iter_mut()) {
682 let addr = addr.borrow();
683 if !addr.ip().is_unspecified() && addr.port() != 0 {
684 if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
685 error!("Couldn't write to packet {:?}. Data skipped.", e);
689 packet.meta_mut().set_discard(true);
690 }
691 } else {
692 trace!("Dropping packet, as destination is unknown");
693 packet.meta_mut().set_discard(true);
694 }
695 }
696 batch
697 }
698
699 pub fn new_unpinned_with_recycler_data(
700 recycler: &PacketBatchRecycler,
701 name: &'static str,
702 mut packets: Vec<Packet>,
703 ) -> Self {
704 let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
705 batch.packets.append(&mut packets);
706 batch
707 }
708
709 pub fn set_addr(&mut self, addr: &SocketAddr) {
710 for p in self.iter_mut() {
711 p.meta_mut().set_socket_addr(addr);
712 }
713 }
714}
715
716impl Deref for PinnedPacketBatch {
717 type Target = PinnedVec<Packet>;
718
719 fn deref(&self) -> &Self::Target {
720 &self.packets
721 }
722}
723
724impl DerefMut for PinnedPacketBatch {
725 fn deref_mut(&mut self) -> &mut Self::Target {
726 &mut self.packets
727 }
728}
729
730impl<I: SliceIndex<[Packet]>> Index<I> for PinnedPacketBatch {
731 type Output = I::Output;
732
733 #[inline]
734 fn index(&self, index: I) -> &Self::Output {
735 &self.packets[index]
736 }
737}
738
739impl<I: SliceIndex<[Packet]>> IndexMut<I> for PinnedPacketBatch {
740 #[inline]
741 fn index_mut(&mut self, index: I) -> &mut Self::Output {
742 &mut self.packets[index]
743 }
744}
745
746impl<'a> IntoIterator for &'a PinnedPacketBatch {
747 type Item = &'a Packet;
748 type IntoIter = Iter<'a, Packet>;
749
750 fn into_iter(self) -> Self::IntoIter {
751 self.packets.iter()
752 }
753}
754
755impl<'a> IntoParallelIterator for &'a PinnedPacketBatch {
756 type Iter = rayon::slice::Iter<'a, Packet>;
757 type Item = &'a Packet;
758 fn into_par_iter(self) -> Self::Iter {
759 self.packets.par_iter()
760 }
761}
762
763impl<'a> IntoParallelIterator for &'a mut PinnedPacketBatch {
764 type Iter = rayon::slice::IterMut<'a, Packet>;
765 type Item = &'a mut Packet;
766 fn into_par_iter(self) -> Self::Iter {
767 self.packets.par_iter_mut()
768 }
769}
770
771impl From<PinnedPacketBatch> for Vec<Packet> {
772 fn from(batch: PinnedPacketBatch) -> Self {
773 batch.packets.into()
774 }
775}
776
777pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
778 items
779 .chunks(chunk_size)
780 .map(|batch_items| {
781 let mut batch = PinnedPacketBatch::with_capacity(batch_items.len());
782 batch.resize(batch_items.len(), Packet::default());
783 for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
784 Packet::populate_packet(packet, None, item).expect("serialize request");
785 }
786 batch.into()
787 })
788 .collect()
789}
790
/// Test helper: [`to_packet_batches`] with `NUM_PACKETS` as the chunk size.
#[cfg(test)]
fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
    let chunk_size = NUM_PACKETS;
    to_packet_batches(items, chunk_size)
}
795
796#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
797#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
798pub struct BytesPacketBatch {
799 packets: Vec<BytesPacket>,
800}
801
802impl BytesPacketBatch {
803 pub fn new() -> Self {
804 Self::default()
805 }
806
807 pub fn with_capacity(capacity: usize) -> Self {
808 let packets = Vec::with_capacity(capacity);
809 Self { packets }
810 }
811
812 pub fn to_pinned_packet_batch(&self) -> PinnedPacketBatch {
813 let mut batch = PinnedPacketBatch::new_pinned_with_capacity(self.len());
814 for bytes_packet in self.iter() {
815 let mut packet = Packet::default();
816 let size = bytes_packet.meta().size;
817 *packet.meta_mut() = bytes_packet.meta().clone();
818 packet.meta_mut().size = size;
819 packet.buffer_mut()[..size].copy_from_slice(&bytes_packet.buffer);
820
821 batch.push(packet);
822 }
823
824 batch
825 }
826}
827
828impl Deref for BytesPacketBatch {
829 type Target = Vec<BytesPacket>;
830
831 fn deref(&self) -> &Self::Target {
832 &self.packets
833 }
834}
835
836impl DerefMut for BytesPacketBatch {
837 fn deref_mut(&mut self) -> &mut Self::Target {
838 &mut self.packets
839 }
840}
841
842impl From<Vec<BytesPacket>> for BytesPacketBatch {
843 fn from(packets: Vec<BytesPacket>) -> Self {
844 Self { packets }
845 }
846}
847
848impl FromIterator<BytesPacket> for BytesPacketBatch {
849 fn from_iter<T: IntoIterator<Item = BytesPacket>>(iter: T) -> Self {
850 let packets = Vec::from_iter(iter);
851 Self { packets }
852 }
853}
854
855impl<'a> IntoIterator for &'a BytesPacketBatch {
856 type Item = &'a BytesPacket;
857 type IntoIter = Iter<'a, BytesPacket>;
858
859 fn into_iter(self) -> Self::IntoIter {
860 self.packets.iter()
861 }
862}
863
864impl<'a> IntoParallelIterator for &'a BytesPacketBatch {
865 type Iter = rayon::slice::Iter<'a, BytesPacket>;
866 type Item = &'a BytesPacket;
867 fn into_par_iter(self) -> Self::Iter {
868 self.packets.par_iter()
869 }
870}
871
872impl<'a> IntoParallelIterator for &'a mut BytesPacketBatch {
873 type Iter = rayon::slice::IterMut<'a, BytesPacket>;
874 type Item = &'a mut BytesPacket;
875 fn into_par_iter(self) -> Self::Iter {
876 self.packets.par_iter_mut()
877 }
878}
879
880pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
881where
882 R: Read,
883 T: DeserializeOwned,
884{
885 bincode::options()
888 .with_limit(PACKET_DATA_SIZE as u64)
889 .with_fixint_encoding()
890 .allow_trailing_bytes()
891 .deserialize_from(reader)
892}
893
#[cfg(test)]
mod tests {
    use {
        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
        solana_system_transaction::transfer,
    };

    /// Items are split into batches of at most `NUM_PACKETS` packets: one
    /// item and exactly `NUM_PACKETS` items each give a single batch, and one
    /// extra item spills into a second batch.
    #[test]
    fn test_to_packet_batches() {
        let keypair = Keypair::new();
        let hash = Hash::new_from_array([1; 32]);
        let tx = transfer(&keypair, &keypair.pubkey(), 1, hash);

        let single = to_packet_batches_for_tests(&[tx.clone()]);
        assert_eq!(single.len(), 1);
        assert_eq!(single[0].len(), 1);

        #[allow(clippy::useless_vec)]
        let full = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]);
        assert_eq!(full.len(), 1);
        assert_eq!(full[0].len(), NUM_PACKETS);

        #[allow(clippy::useless_vec)]
        let overflow = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]);
        assert_eq!(overflow.len(), 2);
        assert_eq!(overflow[0].len(), NUM_PACKETS);
        assert_eq!(overflow[1].len(), 1);
    }

    /// Allocates from the same recycler twice, with capacities 1 and 2,
    /// dropping the first batch before the second is allocated.
    #[test]
    fn test_to_packets_pinning() {
        let recycler = PacketBatchRecycler::default();
        for capacity in 1..=2 {
            let _first_packets =
                PinnedPacketBatch::new_with_recycler(&recycler, capacity, "first one");
        }
    }
}