Skip to main content

mfio/io/packet/
mod.rs

1use crate::std_prelude::*;
2
3use super::OpaqueStore;
4use crate::error::Error;
5pub use cglue::task::{CWaker, FastCWaker};
6use core::cell::UnsafeCell;
7use core::future::Future;
8use core::marker::{PhantomData, PhantomPinned};
9use core::mem::ManuallyDrop;
10use core::mem::MaybeUninit;
11use core::num::NonZeroI32;
12use core::pin::Pin;
13use core::sync::atomic::*;
14use core::task::{Context, Poll};
15use rangemap::RangeSet;
16use tarc::BaseArc;
17
18mod output;
19pub use output::*;
20mod view;
21pub use view::*;
22
/// Bit 63: spin-lock flag, held while the stored waker is being read or replaced.
const LOCK_BIT: u64 = 1u64 << 63;
/// Bit 62: set after a waker has been written, cleared again when it is taken.
const HAS_WAKER_BIT: u64 = 1u64 << 62;
/// Bit 61: set by `finalize` and polled by `wait_finalize`.
const FINALIZED_BIT: u64 = 1u64 << 61;
/// Mask of every flag bit; the remaining low bits hold the reference count.
const ALL_BITS: u64 = FINALIZED_BIT | HAS_WAKER_BIT | LOCK_BIT;
27
/// Reference counter and optional waker, coordinated through one atomic state word.
///
/// The top bits of `rc_and_flags` are the `LOCK_BIT`, `HAS_WAKER_BIT` and `FINALIZED_BIT` flags;
/// the remaining bits hold the reference count.
struct RcAndWaker {
    /// Reference count combined with the flag bits.
    rc_and_flags: AtomicU64,
    /// Waker storage. It is treated as initialized only when `HAS_WAKER_BIT` is set, and is
    /// accessed only while `LOCK_BIT` is held.
    waker: UnsafeCell<MaybeUninit<CWaker>>,
}
32
33impl Default for RcAndWaker {
34    fn default() -> Self {
35        Self {
36            rc_and_flags: 0.into(),
37            waker: UnsafeCell::new(MaybeUninit::uninit()),
38        }
39    }
40}
41
42impl core::fmt::Debug for RcAndWaker {
43    fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
44        write!(
45            fmt,
46            "{}",
47            (self.rc_and_flags.load(Ordering::Relaxed) & HAS_WAKER_BIT) != 0
48        )
49    }
50}
51
impl RcAndWaker {
    /// Spins until `LOCK_BIT` has been acquired.
    ///
    /// Returns `true` if `HAS_WAKER_BIT` was set at the moment the lock was taken. The caller is
    /// responsible for clearing `LOCK_BIT` again.
    fn acquire(&self) -> bool {
        (loop {
            let flags = self.rc_and_flags.fetch_or(LOCK_BIT, Ordering::AcqRel);
            // The lock bit was clear before our `fetch_or`, so we now own the lock.
            if (flags & LOCK_BIT) == 0 {
                break flags;
            }
            // Contended: spin on plain loads until the lock looks free, then retry the
            // `fetch_or`.
            while self.rc_and_flags.load(Ordering::Relaxed) & LOCK_BIT != 0 {
                core::hint::spin_loop();
            }
        } & HAS_WAKER_BIT)
            != 0
    }

    /// Removes and returns the stored waker, if there is one.
    ///
    /// Takes the lock, reads the waker out when `HAS_WAKER_BIT` was set, then clears both
    /// `LOCK_BIT` and `HAS_WAKER_BIT` in a single operation.
    pub fn take(&self) -> Option<CWaker> {
        let ret = if self.acquire() {
            // The has-waker bit was observed under the lock, so the slot is initialized.
            Some(unsafe { (*self.waker.get()).assume_init_read() })
        } else {
            None
        };
        self.rc_and_flags
            .fetch_and(!(LOCK_BIT | HAS_WAKER_BIT), Ordering::Release);
        ret
    }

    /// Stores `waker`, dropping any previously stored waker first.
    ///
    /// Returns the reference count observed while releasing the lock (flag bits masked out).
    pub fn write(&self, waker: CWaker) -> u64 {
        if self.acquire() {
            // A waker is already stored; drop it in place before overwriting the slot.
            unsafe { core::ptr::drop_in_place((*self.waker.get()).as_mut_ptr()) }
        }

        // Assigning a `MaybeUninit` does not run a destructor on the old slot contents.
        unsafe { *self.waker.get() = MaybeUninit::new(waker) };

        // Still under the lock: publish that the slot now holds a waker.
        self.rc_and_flags.fetch_or(HAS_WAKER_BIT, Ordering::Relaxed);

        // Release the lock; the previous value yields the current reference count.
        self.rc_and_flags.fetch_and(!LOCK_BIT, Ordering::AcqRel) & !ALL_BITS
    }

    /// Loads the current reference count (flag bits masked out) with acquire ordering.
    pub fn acquire_rc(&self) -> u64 {
        self.rc_and_flags.load(Ordering::Acquire) & !ALL_BITS
    }

    /// Decrements the reference count.
    ///
    /// Returns the count *before* the decrement, and whether `HAS_WAKER_BIT` was set at that
    /// time.
    pub fn dec_rc(&self) -> (u64, bool) {
        let ret = self.rc_and_flags.fetch_sub(1, Ordering::AcqRel);
        (ret & !ALL_BITS, (ret & HAS_WAKER_BIT) != 0)
    }

    /// Increments the reference count, returning the count *before* the increment.
    pub fn inc_rc(&self) -> u64 {
        self.rc_and_flags.fetch_add(1, Ordering::AcqRel) & !ALL_BITS
    }

    /// Sets `FINALIZED_BIT`, releasing any thread spinning in [`wait_finalize`](Self::wait_finalize).
    pub fn finalize(&self) {
        self.rc_and_flags.fetch_or(FINALIZED_BIT, Ordering::Release);
    }

    /// Spins until `FINALIZED_BIT` has been set.
    ///
    /// This function never returns if [`finalize`](Self::finalize) is never called.
    pub fn wait_finalize(&self) {
        while (self.rc_and_flags.load(Ordering::Acquire) & FINALIZED_BIT) == 0 {
            core::hint::spin_loop();
        }
    }
}
112
/// Describes a full packet.
///
/// This packet is considered simple.
///
/// This packet can be stored and modified on the stack. If `#[cfg(mfio_assume_linear_types)]` is
/// enabled, then the packet can also be sent to the I/O backend as a reference. Otherwise, it
/// first needs to be converted to an arc.
#[repr(C)]
pub struct FullPacket<T, Perms: PacketPerms> {
    /// Packet header; it must come first so the packet can be viewed as a [`Packet`].
    header: Packet<Perms>,
    /// Length of the payload, followed directly by the payload itself.
    data: PackedLenData<T>,
}
125
126impl<T, Perms: PacketPerms> FullPacket<T, Perms> {
127    /// Creates a new [`FullPacket`] with POD data.
128    pub fn new(val: T) -> Self
129    where
130        T: bytemuck::Pod,
131    {
132        unsafe { Self::new_unchecked(val) }
133    }
134
135    /// Creates a new `FullPacket` with `T` as backing byte storage.
136    ///
137    /// # Safety
138    ///
139    /// It must be okay to interpret `T` as raw bytes. If `T` is a complex type, undefined behavior
140    /// may be reached.
141    pub unsafe fn new_unchecked(data: T) -> Self {
142        FullPacket {
143            header: Packet::new_hdr(PacketVtblRef {
144                tag: PacketVtblTag::SimpleDirect as _,
145            }),
146            data: PackedLenData {
147                len: core::mem::size_of::<T>(),
148                data,
149            },
150        }
151    }
152}
153
154impl<T: bytemuck::Pod> FullPacket<MaybeUninit<T>, Write> {
155    /// Builds a new packet with uninitialized data of `T`.
156    pub fn new_uninit() -> Self {
157        unsafe { Self::new_unchecked(MaybeUninit::uninit()) }
158    }
159}
160
161impl<T, Perms: PacketPerms> AsRef<Packet<Perms>> for FullPacket<T, Perms> {
162    fn as_ref(&self) -> &Packet<Perms> {
163        &self.header
164    }
165}
166
167impl<T, Perms: PacketPerms> core::ops::Deref for FullPacket<T, Perms> {
168    type Target = Packet<Perms>;
169
170    fn deref(&self) -> &Self::Target {
171        self.as_ref()
172    }
173}
174
/// Describes a vector packet.
///
/// This packet is considered simple.
///
/// This is a convenience structure that allows passing existing vectors to the I/O system, and
/// retrieving them afterwards. Only [`Vec::len`](Vec::len) bytes of data are used for the packet.
#[repr(C)]
pub struct VecPacket<Perms: PacketPerms> {
    /// Packet header; it must come first so the packet can be viewed as a [`Packet`].
    header: Packet<Perms>,
    /// Length of the vector, followed by the pointer to its buffer.
    data: PackedLenData<Perms::DataType>,
    /// Capacity of the original vector, needed to rebuild it.
    capacity: usize,
    /// Callback that frees the buffer; invoked with `(data pointer, len, capacity)`.
    drop: unsafe extern "C" fn(Perms::DataType, usize, usize),
}
188
// NOTE(review): `Send`/`Sync` are asserted manually; presumably this is needed because
// `Perms::DataType` is a raw pointer that solely owns the vector's buffer - confirm that this
// holds for every `PacketPerms` implementation.
unsafe impl<Perms: PacketPerms> Send for VecPacket<Perms> {}
unsafe impl<Perms: PacketPerms> Sync for VecPacket<Perms> {}
191
192impl<Perms: PacketPerms> AsRef<Packet<Perms>> for VecPacket<Perms> {
193    fn as_ref(&self) -> &Packet<Perms> {
194        &self.header
195    }
196}
197
198impl<Perms: PacketPerms> core::ops::Deref for VecPacket<Perms> {
199    type Target = Packet<Perms>;
200
201    fn deref(&self) -> &Self::Target {
202        self.as_ref()
203    }
204}
205
206impl VecPacket<Read> {
207    /// Convert this `VecPacket` into a `Vec`.
208    pub fn take(self) -> Vec<u8> {
209        let vec = unsafe {
210            Vec::from_raw_parts(
211                self.data.data.cast_mut().cast(),
212                self.data.len,
213                self.capacity,
214            )
215        };
216        core::mem::forget(self);
217        vec
218    }
219}
220
221impl VecPacket<Write> {
222    /// Convert this `VecPacket` into a `Vec`.
223    pub fn take(self) -> Vec<MaybeUninit<u8>> {
224        let vec =
225            unsafe { Vec::from_raw_parts(self.data.data.cast(), self.data.len, self.capacity) };
226        core::mem::forget(self);
227        vec
228    }
229}
230
231impl VecPacket<ReadWrite> {
232    /// Convert this `VecPacket` into a `Vec`.
233    pub fn take(self) -> Vec<u8> {
234        let vec =
235            unsafe { Vec::from_raw_parts(self.data.data.cast(), self.data.len, self.capacity) };
236        core::mem::forget(self);
237        vec
238    }
239}
240
241impl<Perms: PacketPerms> Drop for VecPacket<Perms> {
242    fn drop(&mut self) {
243        unsafe {
244            (self.drop)(self.data.data, self.data.len, self.capacity);
245        }
246    }
247}
248
249impl From<Vec<u8>> for VecPacket<Read> {
250    fn from(mut vec: Vec<u8>) -> Self {
251        unsafe extern "C" fn drop(
252            data: <Read as PacketPerms>::DataType,
253            len: usize,
254            capacity: usize,
255        ) {
256            let _ = Vec::from_raw_parts(data.cast_mut().cast::<u8>(), len, capacity);
257        }
258
259        let data = vec.as_mut_ptr();
260        let len = vec.len();
261        let capacity = vec.capacity();
262        core::mem::forget(vec);
263
264        Self {
265            header: unsafe {
266                Packet::new_hdr(PacketVtblRef {
267                    tag: PacketVtblTag::SimpleIndirect as _,
268                })
269            },
270            data: PackedLenData {
271                len,
272                data: data.cast(),
273            },
274            capacity,
275            drop,
276        }
277    }
278}
279
280impl<T: AnyBytes> From<Vec<T>> for VecPacket<Write> {
281    fn from(mut vec: Vec<T>) -> Self {
282        unsafe extern "C" fn drop(
283            data: <Write as PacketPerms>::DataType,
284            len: usize,
285            capacity: usize,
286        ) {
287            let _ = Vec::from_raw_parts(data.cast::<u8>(), len, capacity);
288        }
289
290        let data = vec.as_mut_ptr();
291        let len = vec.len();
292        let capacity = vec.capacity();
293        core::mem::forget(vec);
294
295        Self {
296            header: unsafe {
297                Packet::new_hdr(PacketVtblRef {
298                    tag: PacketVtblTag::SimpleIndirect as _,
299                })
300            },
301            data: PackedLenData {
302                len,
303                data: data.cast(),
304            },
305            capacity,
306            drop,
307        }
308    }
309}
310
/// Represents a packet that owns a `Box<[u8]>`.
///
/// This packet is considered simple.
///
/// Once the packet is dropped, the underlying box is dropped as well.
#[repr(C)]
pub struct OwnedPacket<Perms: PacketPerms> {
    /// Packet header; it must come first so the packet can be viewed as a [`Packet`].
    header: Packet<Perms>,
    /// Length of the boxed slice, followed by the pointer to its first element.
    data: PackedLenData<Perms::DataType>,
    /// Callback that frees the boxed slice; invoked with `(data pointer, len)`.
    drop: unsafe extern "C" fn(Perms::DataType, usize),
}
322
// NOTE(review): `Send`/`Sync` are asserted manually; presumably this is needed because
// `Perms::DataType` is a raw pointer that solely owns the boxed slice - confirm that this holds
// for every `PacketPerms` implementation.
unsafe impl<Perms: PacketPerms> Send for OwnedPacket<Perms> {}
unsafe impl<Perms: PacketPerms> Sync for OwnedPacket<Perms> {}
325
326impl<Perms: PacketPerms> Drop for OwnedPacket<Perms> {
327    fn drop(&mut self) {
328        unsafe {
329            (self.drop)(self.data.data, self.data.len);
330        }
331    }
332}
333
334impl From<Box<[u8]>> for OwnedPacket<Read> {
335    fn from(slc: Box<[u8]>) -> Self {
336        unsafe extern "C" fn drop(data: <Read as PacketPerms>::DataType, len: usize) {
337            let _ = Box::from_raw(core::slice::from_raw_parts_mut(
338                data.cast_mut().cast::<u8>(),
339                len,
340            ));
341        }
342
343        let data = Box::leak(slc);
344
345        Self {
346            header: unsafe {
347                Packet::new_hdr(PacketVtblRef {
348                    tag: PacketVtblTag::SimpleIndirect as _,
349                })
350            },
351            data: PackedLenData {
352                len: data.len(),
353                data: data as *const [u8] as *const (),
354            },
355            drop,
356        }
357    }
358}
359
360impl From<Box<[u8]>> for OwnedPacket<ReadWrite> {
361    fn from(slc: Box<[u8]>) -> Self {
362        unsafe extern "C" fn drop(data: <ReadWrite as PacketPerms>::DataType, len: usize) {
363            let _ = Box::from_raw(core::slice::from_raw_parts_mut(data.cast::<u8>(), len));
364        }
365
366        let data = Box::leak(slc);
367
368        Self {
369            header: unsafe {
370                Packet::new_hdr(PacketVtblRef {
371                    tag: PacketVtblTag::SimpleIndirect as _,
372                })
373            },
374            data: PackedLenData {
375                len: data.len(),
376                data: data as *mut [u8] as *mut (),
377            },
378            drop,
379        }
380    }
381}
382
383impl<T: AnyBytes> From<Box<[T]>> for OwnedPacket<Write> {
384    fn from(slc: Box<[T]>) -> Self {
385        unsafe extern "C" fn drop(data: <Write as PacketPerms>::DataType, len: usize) {
386            let _ = Box::from_raw(core::slice::from_raw_parts_mut(
387                data.cast::<MaybeUninit<u8>>(),
388                len,
389            ));
390        }
391
392        let data = Box::leak(slc);
393
394        Self {
395            header: unsafe {
396                Packet::new_hdr(PacketVtblRef {
397                    tag: PacketVtblTag::SimpleIndirect as _,
398                })
399            },
400            data: PackedLenData {
401                len: data.len(),
402                data: data as *mut [T] as *mut (),
403            },
404            drop,
405        }
406    }
407}
408
409impl<Perms: PacketPerms> AsRef<Packet<Perms>> for OwnedPacket<Perms> {
410    fn as_ref(&self) -> &Packet<Perms> {
411        &self.header
412    }
413}
414
415impl<Perms: PacketPerms> core::ops::Deref for OwnedPacket<Perms> {
416    type Target = Packet<Perms>;
417
418    fn deref(&self) -> &Self::Target {
419        self.as_ref()
420    }
421}
422
/// Represents a borrowed byte buffer packet.
///
/// This packet is considered simple.
///
/// A non-static `RefPacket` may be sent to the I/O backend if the
/// `#[cfg(mfio_assume_linear_types)]` config switch is enabled. Otherwise, only static
/// `RefPacket`s may be sent, and only when they are wrapped in an arc.
#[repr(C)]
pub struct RefPacket<'a, Perms: PacketPerms> {
    /// Packet header; it must come first so the packet can be viewed as a [`Packet`].
    header: Packet<Perms>,
    /// Length of the borrowed buffer, followed by the pointer to it.
    data: PackedLenData<Perms::DataType>,
    // mut because it is more constrained than const.
    _phantom: PhantomData<&'a mut u8>,
}
437
#[cfg(mfio_assume_linear_types)]
impl<'a> From<&'a [u8]> for RefPacket<'a, Read> {
    /// Creates a read packet that borrows `slc` for `'a`.
    fn from(slc: &'a [u8]) -> Self {
        let len = slc.len();
        let header = unsafe {
            Packet::new_hdr(PacketVtblRef {
                tag: PacketVtblTag::SimpleIndirect as _,
            })
        };

        Self {
            header,
            data: PackedLenData {
                len,
                data: slc.as_ptr().cast(),
            },
            _phantom: PhantomData,
        }
    }
}
455
#[cfg(mfio_assume_linear_types)]
impl<'a, T: AnyBytes> From<&'a mut [T]> for RefPacket<'a, Write> {
    /// Creates a write packet that mutably borrows `slc` for `'a`.
    fn from(slc: &'a mut [T]) -> Self {
        let len = slc.len();
        let header = unsafe {
            Packet::new_hdr(PacketVtblRef {
                tag: PacketVtblTag::SimpleIndirect as _,
            })
        };

        Self {
            header,
            data: PackedLenData {
                len,
                data: slc.as_mut_ptr().cast(),
            },
            _phantom: PhantomData,
        }
    }
}
473
#[cfg(mfio_assume_linear_types)]
impl<'a> From<&'a mut [u8]> for RefPacket<'a, ReadWrite> {
    /// Creates a read-write packet that mutably borrows `slc` for `'a`.
    fn from(slc: &'a mut [u8]) -> Self {
        let len = slc.len();
        let header = unsafe {
            Packet::new_hdr(PacketVtblRef {
                tag: PacketVtblTag::SimpleIndirect as _,
            })
        };

        Self {
            header,
            data: PackedLenData {
                len,
                data: slc.as_mut_ptr().cast(),
            },
            _phantom: PhantomData,
        }
    }
}
491
492impl<Perms: PacketPerms> AsRef<Packet<Perms>> for RefPacket<'_, Perms> {
493    fn as_ref(&self) -> &Packet<Perms> {
494        &self.header
495    }
496}
497
498impl<Perms: PacketPerms> core::ops::Deref for RefPacket<'_, Perms> {
499    type Target = Packet<Perms>;
500
501    fn deref(&self) -> &Self::Target {
502        self.as_ref()
503    }
504}
505
/// Packed length + data pair for simple packets.
///
/// `#[repr(C, packed)]` removes padding, so `data` starts immediately after `len`.
#[repr(C, packed)]
struct PackedLenData<T> {
    /// Number of bytes in the packet's buffer.
    len: usize,
    /// Either the buffer itself (direct packets) or a pointer to it (indirect packets).
    data: T,
}
512
/// Represents different packet modes.
#[repr(usize)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum PacketVtblTag {
    /// Simple packet: the header is followed by the length and the buffer itself, with no
    /// indirection.
    SimpleDirect = 0,
    /// Simple packet: the header is followed by the length and a pointer to a byte buffer of
    /// that length.
    SimpleIndirect = 1,
    /// Complex packet.
    ///
    /// This is a placeholder value - inside the packet the true value in this case would be a
    /// valid vtable pointer.
    Complex = 2,
}
527
/// Describes packet type.
///
/// If the value within this union is less than [`PacketVtblTag::Complex`], then the packet is
/// simple, and `tag` should be accessed. Meanwhile, any other value implies the packet is a
/// complex one, and `vtbl` should be accessed instead.
#[derive(Clone, Copy)]
pub union PacketVtblRef<Perms: PacketPerms> {
    /// Raw tag value; meaningful for simple packets.
    pub tag: usize,
    /// Vtable reference; meaningful for complex packets.
    pub vtbl: &'static Perms,
}
538
539impl<Perms: PacketPerms> core::fmt::Debug for PacketVtblRef<Perms> {
540    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
541        match self.tag() {
542            PacketVtblTag::Complex => core::fmt::Debug::fmt(unsafe { self.vtbl }, f),
543            v => core::fmt::Debug::fmt(&v, f),
544        }
545    }
546}
547
548impl<Perms: PacketPerms> PacketVtblRef<Perms> {
549    pub fn tag(&self) -> PacketVtblTag {
550        match unsafe { self.tag } {
551            0 => PacketVtblTag::SimpleDirect,
552            1 => PacketVtblTag::SimpleIndirect,
553            _ => PacketVtblTag::Complex,
554        }
555    }
556
557    pub fn vtbl(&self) -> Option<&'static Perms> {
558        if self.tag() == PacketVtblTag::Complex {
559            unsafe { Some(self.vtbl) }
560        } else {
561            None
562        }
563    }
564}
565
/// Main packet header.
///
/// This header is at the start of every packet. In practice, all valid packet references can be
/// turned into opaque references to [`Packet`].
///
/// This header primarily describes the current state of the packet (how many references are in
/// flight, presence of a waker, etc.), but it also describes the functionality of the packet.
///
/// ## Simple packets
///
/// A packet may be considered "simple" if what follows the header is either a data buffer, or a
/// reference to a data buffer. If that is not the case, then the packet is considered "complex".
///
/// ## Complex packets
///
/// A packet is considered "complex" if the behavior of various low level operations (such as
/// allocation, data transfer, etc.) is governed by a vtable. This extra level of indirection may
/// cost some performance overhead, but it allows building more complex structures, such as lazily
/// allocated outputs.
#[repr(C)]
#[derive(Debug)]
pub struct Packet<Perms: PacketPerms> {
    /// Packet kind: a tag for simple packets (a len + byte buffer or pointer follows the
    /// header), or a vtable reference for complex packets.
    vtbl: PacketVtblRef<Perms>,
    /// Number of packet fragments currently in-flight (-1), and waker flags.
    ///
    /// NOTE(review): the pseudo-code below describes a two-flag protocol ("start writing" and
    /// "end writing" bits). The `RcAndWaker` implementation in this file uses three flag bits
    /// (`LOCK_BIT`, `HAS_WAKER_BIT`, `FINALIZED_BIT`), so treat the snippets as a schematic
    /// description rather than the exact implementation.
    ///
    /// The waker flags are encoded in the 2 highest bits. The left-most bit is a "start writing"
    /// bit. The second left-most bit is an "end writing" bit.
    ///
    /// Setting of waker should look as follows:
    ///
    /// ```ignore
    /// // Clear the flag bits, because we want the end writing bit be properly set
    /// rc_and_flags.fetch_and(!(0b11 << 62), Ordering::AcqRel);
    /// // Load in the start writing bit
    /// let loaded = rc_and_flags.fetch_or(0b1 << 63, Ordering::AcqRel);
    ///
    /// if !(loaded | (0b11 << 62)) == 0 {
    ///     // no more packets left, we don't need to write anything
    ///     return false
    /// }
    ///
    /// unsafe {
    ///     *waker.get() = in_waker;
    /// }
    ///
    /// // Load in the end writing bit.
    /// let loaded = rc_and_flags.fetch_or(0b1 << 62, Ordering::AcqRel);
    ///
    /// if !(loaded | (0b11 << 62)) == 0 {
    ///     // no more packets left, we wrote uselessly
    ///     return false
    /// }
    ///
    /// // true indicates the waker was installed and we can go to sleep.
    /// return true
    /// ```
    ///
    /// Acquiring the waker is done as follows:
    ///
    /// ```ignore
    /// let loaded = rc_and_flags.fetch_sub(1, Ordering::AcqRel);
    ///
    /// // we are not the last packet here, do nothing.
    /// if (loaded & !(0b11 << 62)) != 0 {
    ///     return false
    /// }
    ///
    /// // if the packet was not fully written yet, then we will do nothing, because the polling
    /// // thread will catch this and handle it appropriately.
    /// if loaded >> 62 != 0b11 {
    ///     return false
    /// }
    ///
    /// unsafe {
    ///     *waker.get()
    /// }.wake();
    ///
    /// return true
    /// ```
    rc_and_waker: RcAndWaker,
    /// The smallest position that resulted in an error.
    ///
    /// This value is initialized to !0, and upon each errored packet segment, is minned
    /// atomically. Once I/O is complete, this allows the caller to check the size of the
    /// contiguous memory region that was successfully processed without gaps.
    error_clamp: AtomicU64,
    /// Error code recorded alongside `error_clamp` (0 means no error was recorded).
    ///
    /// Note that this may be raced against so it should not be relied as "the minimum error".
    min_error: AtomicI32,
    // We need miri to treat packets magically. Without marking this type as !Unpin, miri would
    // detect UB in situations where a packet is shared across threads, even though we are not
    // performing any mutable aliasing.
    //
    // See:
    // https://github.com/RalfJung/rfcs/blob/9881c94c5a9b7b24b12aaa07541c8b25e64ad5ae/text/0000-unsafe-aliased.md
    _phantom: PhantomPinned,
    // data afterwards
}
664
// Identity conversion, so a bare `Packet` header can be used wherever an
// `AsRef<Packet<Perms>>` is expected.
impl<Perms: PacketPerms> AsRef<Self> for Packet<Perms> {
    /// Returns `self` unchanged.
    fn as_ref(&self) -> &Self {
        self
    }
}
670
// NOTE(review): `Send`/`Sync` are asserted manually; the header contains an
// `UnsafeCell<MaybeUninit<CWaker>>` (inside `RcAndWaker`), whose accesses in this file are
// guarded by the lock bit - confirm no other unsynchronized access exists.
unsafe impl<Perms: PacketPerms> Send for Packet<Perms> {}
unsafe impl<Perms: PacketPerms> Sync for Packet<Perms> {}
673
674impl<Perms: PacketPerms> Drop for Packet<Perms> {
675    fn drop(&mut self) {
676        let loaded = self.rc_and_waker.acquire_rc();
677        assert_eq!(loaded, 0, "The packet has in-flight segments.");
678    }
679}
680
// Awaiting a packet reference completes once the packet's reference count is observed as
// zero.
impl<'a, Perms: PacketPerms> Future for &'a Packet<Perms> {
    type Output = ();

    /// Stores the task's waker, then completes if the reference count is already zero.
    ///
    /// Before returning `Poll::Ready`, this spins in `wait_finalize` until the finalized flag
    /// has been set.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = Pin::into_inner(self);

        // Install (or replace) the waker first; `write` reports the reference count it
        // observed while releasing its lock.
        let rc = this.rc_and_waker.write(cx.waker().clone().into());

        if rc == 0 {
            // Synchronize the thread that last decremented the refcount.
            // If we don't, we risk a race condition where we drop the packet, while the packet
            // reference is still being used to take the waker.
            this.rc_and_waker.wait_finalize();

            // no more packets left, we don't need to write anything
            return Poll::Ready(());
        }

        Poll::Pending
    }
}
702
703impl<Perms: PacketPerms> Packet<Perms> {
704    /// Current reference count of the packet.
705    pub fn rc(&self) -> usize {
706        (self.rc_and_waker.acquire_rc()) as usize
707    }
708
709    unsafe fn on_output(&self, error: Option<(u64, NonZeroI32)>) -> Option<CWaker> {
710        if let Some((start, error)) = error {
711            if self.error_clamp.fetch_min(start, Ordering::AcqRel) > start {
712                self.min_error.store(error.into(), Ordering::Relaxed);
713            }
714        }
715
716        let (prev, has_waker) = self.rc_and_waker.dec_rc();
717
718        // Do nothing, because we are not the last packet
719        if prev != 1 {
720            return None;
721        }
722
723        let ret = if has_waker {
724            self.rc_and_waker.take()
725        } else {
726            None
727        };
728
729        self.rc_and_waker.finalize();
730
731        ret
732    }
733
734    unsafe fn on_add_to_view(&self) {
735        let rc = self.rc_and_waker.inc_rc();
736        if rc != 0 {
737            self.rc_and_waker.dec_rc();
738            assert_eq!(rc, 0);
739        }
740    }
741
742    /// Resets error state of the packet.
743    ///
744    /// # Safety
745    ///
746    /// This function is safe to call only when all packet operations have concluded.
747    pub unsafe fn reset_err(&self) {
748        self.error_clamp.store(!0u64, Ordering::Release);
749        self.min_error.store(0, Ordering::Release);
750    }
751
752    /// Creates a new `Packet` header.
753    ///
754    /// # Safety
755    ///
756    /// The caller must ensure the resulting header is placed into an appropriate packet structure
757    /// that the provided vtable is meant to use. Failure to do so leads to undefined behavior.
758    pub unsafe fn new_hdr(vtbl: PacketVtblRef<Perms>) -> Self {
759        Packet {
760            vtbl,
761            rc_and_waker: Default::default(),
762            error_clamp: (!0u64).into(),
763            min_error: 0.into(),
764            _phantom: PhantomPinned,
765        }
766    }
767
768    /// Gets pointer to length of a simple packet.
769    ///
770    /// # Safety
771    ///
772    /// `this` must point to a valid packet header for a simple packet. I.e. after the header what
773    /// must proceed is a single usize field representing the length of the following byte data.
774    pub unsafe fn simple_len(this: *const Self) -> *const usize {
775        this.add(1).cast()
776    }
777
778    /// Gets pointer to data of a simple packet.
779    ///
780    /// # Safety
781    ///
782    /// `this` must point to a valid packet header for a simple packet. I.e. after the header what
783    /// must proceed is a single usize field representing the length of the following byte data.
784    /// This function will return a `*const u8`, which may be cast to `*const *const u8` in
785    /// indirect packet scenarios.
786    pub unsafe fn simple_data(this: *const Self) -> *const u8 {
787        this.add(1).cast::<usize>().add(1).cast()
788    }
789
790    /// Gets pointer to data of a simple packet.
791    ///
792    /// # Safety
793    ///
794    /// `this` must point to a valid packet header for a simple packet. I.e. after the header what
795    /// must proceed is a single usize field representing the length of the following byte data.
796    /// This function will return a `*mut u8`, which may be cast to `*const *mut u8` in
797    /// indirect packet scenarios.
798    pub unsafe fn simple_data_mut(this: *const Self) -> *mut u8 {
799        this.add(1).cast::<usize>().add(1).cast_mut().cast()
800    }
801
802    /// Gets slice to the data of the simple packet.
803    ///
804    /// This will return all packet bytes as `MaybeUninit<u8>`. If you want to get only initialized
805    /// bytes (in read scenario), then use
806    /// [`simple_contiguous_slice`](Self::simple_contiguous_slice) function.
807    pub fn simple_slice(&self) -> Option<&[MaybeUninit<u8>]> {
808        if self.vtbl.tag() == PacketVtblTag::Complex {
809            None
810        } else {
811            Some(unsafe {
812                core::slice::from_raw_parts(self.simple_data_ptr().cast(), *Self::simple_len(self))
813            })
814        }
815    }
816
817    /// Gets a mutable slice to the data of the simple packet.
818    ///
819    /// # Safety
820    ///
821    /// Only a single instance of the mutable data must be active at a given time, with no constant
822    /// references.
823    pub unsafe fn simple_slice_mut(&self) -> Option<&mut [MaybeUninit<u8>]> {
824        if self.vtbl.tag() == PacketVtblTag::Complex {
825            None
826        } else {
827            Some(unsafe {
828                core::slice::from_raw_parts_mut(
829                    self.simple_data_ptr().cast_mut().cast(),
830                    *Self::simple_len(self),
831                )
832            })
833        }
834    }
835
836    /// Gets slice to the first segment of contiguously processed bytes.
837    pub fn simple_contiguous_slice(&self) -> Option<&[u8]> {
838        if self.vtbl.tag() == PacketVtblTag::Complex {
839            None
840        } else {
841            Some(unsafe {
842                core::slice::from_raw_parts(
843                    self.simple_data_ptr(),
844                    core::cmp::min(
845                        self.error_clamp.load(Ordering::Acquire) as usize,
846                        *Self::simple_len(self),
847                    ),
848                )
849            })
850        }
851    }
852
853    /// Gets pointer to data of a simple packet.
854    ///
855    /// # Panics
856    ///
857    /// This function will panic if called on a complex packet.
858    pub fn simple_data_ptr(&self) -> *const u8 {
859        match self.vtbl.tag() {
860            PacketVtblTag::SimpleDirect => unsafe { Self::simple_data(self) },
861            PacketVtblTag::SimpleIndirect => unsafe {
862                *Self::simple_data(self).cast::<*const u8>()
863            },
864            PacketVtblTag::Complex => panic!("simple_data_ptr called on complex Packet"),
865        }
866    }
867
868    /// Gets mutable pointer to data of a simple packet.
869    ///
870    /// # Panics
871    ///
872    /// This function will panic if called on a complex packet.
873    pub fn simple_data_ptr_mut(&mut self) -> *mut u8 {
874        match self.vtbl.tag() {
875            PacketVtblTag::SimpleDirect => unsafe { Self::simple_data_mut(self) },
876            PacketVtblTag::SimpleIndirect => unsafe { *Self::simple_data(self).cast::<*mut u8>() },
877            PacketVtblTag::Complex => panic!("simple_data_ptr called on complex Packet"),
878        }
879    }
880
881    /// Gets the error value at the lowest packet offset that was erroneous.
882    pub fn min_error(&self) -> Option<Error> {
883        NonZeroI32::new(self.min_error.load(Ordering::Relaxed)).map(Error::from_int_err)
884    }
885
886    /// Gets the length of the leading contiguous segment, or `!0`.
887    ///
888    /// If there was an error case, this function will return the length of the first contiguous
889    /// segment. However, if the packet encountered no error cases, `!0` will be returned.
890    pub fn error_clamp(&self) -> u64 {
891        self.error_clamp.load(Ordering::Relaxed)
892    }
893
894    /// Returns [`min_error`](Self::min_error) if 0 leading bytes were processed.
895    pub fn err_on_zero(&self) -> Result<(), Error> {
896        if self.error_clamp() > 0 {
897            Ok(())
898        } else {
899            Err(self.min_error().expect("No error when error_clamp is 0"))
900        }
901    }
902
903    /// Returns [`min_erorr`](Self::min_error) if any parts of the packet had errors.
904    pub fn err_any(&self) -> Result<(), Error> {
905        if let Some(err) = self.min_error() {
906            Err(err)
907        } else {
908            Ok(())
909        }
910    }
911}
912
913impl<Perms: PacketPerms> Packet<Perms> {
914    /// Builds a new packet with dynamically sized buffer.
915    ///
916    /// The returned packet will be simple, and wrapped in an arc.
917    pub fn new_buf(len: usize) -> BaseArc<Packet<Perms>> {
918        // TODO: feature gate this
919        use std::alloc::Layout;
920
921        let size = core::mem::size_of::<FullPacket<PhantomData<()>, Perms>>() + len;
922        let align = core::mem::align_of::<FullPacket<PhantomData<()>, Perms>>();
923
924        unsafe extern "C" fn drop_pkt<Perms: PacketPerms>(data: *mut ()) {
925            core::ptr::drop_in_place(data.cast::<Packet<Perms>>())
926        }
927
928        // SAFETY: we are creating a packet that is sufficient for our needed amount of data.
929        // In addition, we do not need a cleanup routine, because, because the only object that
930        // would need freeing is the `CWaker`. The polling implementation ensures that the
931        // `CWaker` is not left without any packets left to process it, and any outputted packets
932        // will trigger the `CWaker`. Any other fields are POD.
933        let packet = unsafe {
934            BaseArc::custom(
935                Layout::from_size_align_unchecked(size, align),
936                Some(drop_pkt::<Perms>),
937            )
938        };
939
940        // SAFETY: we are initializing the packet here, hence the transmute is valid
941        unsafe {
942            // first we need to write the packet data to make it valid
943            (packet.exclusive_ptr().unwrap().as_ptr() as *mut FullPacket<PhantomData<()>, Perms>)
944                .write(FullPacket {
945                    header: Self::new_hdr(PacketVtblRef {
946                        tag: PacketVtblTag::SimpleDirect as _,
947                    }),
948                    data: PackedLenData {
949                        len,
950                        data: PhantomData,
951                    },
952                });
953
954            core::mem::transmute(packet)
955        }
956    }
957}
958
959impl Packet<Read> {
960    /// Creates a new packet filled with data from the given slice.
961    pub fn copy_from_slice(buf: &[u8]) -> BaseArc<Packet<Read>> {
962        let pkt = Self::new_buf(buf.len());
963        unsafe {
964            core::ptr::copy_nonoverlapping(
965                buf.as_ptr(),
966                pkt.simple_data_ptr().cast_mut(),
967                buf.len(),
968            )
969        };
970        pkt
971    }
972}
973
974impl Packet<ReadWrite> {
975    /// Creates a new packet filled with data from the given slice.
976    pub fn copy_from_slice(buf: &[u8]) -> BaseArc<Packet<ReadWrite>> {
977        let pkt = Self::new_buf(buf.len());
978        unsafe {
979            core::ptr::copy_nonoverlapping(
980                buf.as_ptr(),
981                pkt.simple_data_ptr().cast_mut(),
982                buf.len(),
983            )
984        };
985        pkt
986    }
987}
988
/// [`OpaqueStore`] for an arc-wrapped packet.
///
/// The arc itself serves as both the stack and the heap request, so `stack` and `heap` hand
/// `self` back unchanged.
unsafe impl<Perms: PacketPerms> OpaqueStore for BaseArc<Packet<Perms>> {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;
    type StackReq<'a> = Self where Self: 'a;
    type HeapReq = Self where Self: 'static;

    /// Returns the arc unchanged.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        self
    }

    /// Returns the arc unchanged.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self
    }

    /// Borrows the packet header through the arc.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr {
        stack
    }

    /// Builds a [`PacketView`] from a reference to the arc, passing `0` as the second argument
    /// (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        PacketView::from_arc_ref(stack, 0)
    }
}
1018
/// [`OpaqueStore`] for a borrowed arc-wrapped packet.
///
/// The reference is used directly as the stack request, while the heap request is an owned
/// clone of the arc.
unsafe impl<'c, Perms: PacketPerms> OpaqueStore for &'c BaseArc<Packet<Perms>> {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;
    type StackReq<'a> = Self where Self: 'a;
    type HeapReq = BaseArc<Packet<Perms>> where Self: 'static;

    /// Returns the reference unchanged.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        self
    }

    /// Clones the referenced arc to obtain an owned heap request.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self.clone()
    }

    /// Borrows the packet header through the arc reference.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr
    where
        Self: 'a,
    {
        stack
    }

    /// Builds a [`PacketView`] from the arc reference, passing `0` as the second argument
    /// (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        PacketView::from_arc_ref(*stack, 0)
    }
}
1051
/// [`OpaqueStore`] for an owned [`FullPacket`].
///
/// The stack request type is selected by `linear_types_switch!`: an arc-wrapped packet in the
/// `Standard` arm, or the packet itself in the `Linear` arm. The heap request is always an
/// arc-wrapped packet.
unsafe impl<T: 'static, Perms: PacketPerms> OpaqueStore for FullPacket<T, Perms> {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;

    crate::linear_types_switch! {
        Standard => {
            type StackReq<'a> = BaseArc<Self> where Self: 'a;
        }
        Linear => {
            type StackReq<'a> = Self where Self: 'a;
        }
    }

    type HeapReq = BaseArc<Self> where Self: 'static;

    /// Converts the packet into the selected stack request type.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        // The conversion is an identity in the `Linear` arm, hence the lint allowance.
        #[allow(clippy::useless_conversion)]
        self.into()
    }

    /// Converts the packet into an arc.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self.into()
    }

    /// Borrows the packet header out of the stack request.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr {
        stack
    }

    /// Builds a [`PacketView`] over the stack request, passing `0` as the second argument
    /// (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        crate::linear_types_switch! {
            Standard => {
                PacketView::from_arc_ref(
                    // SAFETY: we know the header is at the start of the fullpacket struct, therefore
                    // reinterpreting the reference is okay.
                    unsafe { &*(stack as *const BaseArc<FullPacket<T, Perms>>).cast() },
                    0,
                )
            }
            Linear => {
                PacketView::from_ref(
                    // SAFETY: we know the header is at the start of the fullpacket struct, therefore
                    // reinterpreting the reference is okay.
                    unsafe { &*(stack as *const FullPacket<T, Perms>).cast() },
                    0,
                )
            }
        }
    }
}
1108
/// [`OpaqueStore`] for an arc-wrapped [`FullPacket`].
///
/// The arc itself serves as both the stack and the heap request.
unsafe impl<T: 'static, Perms: PacketPerms> OpaqueStore for BaseArc<FullPacket<T, Perms>> {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;
    type StackReq<'a> = Self where Self: 'a;
    type HeapReq = Self where Self: 'static;

    /// Returns the arc unchanged.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        self
    }

    /// Returns the arc unchanged.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self
    }

    /// Borrows the packet header through the arc.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr {
        stack
    }

    /// Builds a [`PacketView`] by reinterpreting the arc reference, passing `0` as the second
    /// argument (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        PacketView::from_arc_ref(
            // SAFETY: we know the header is at the start of the fullpacket struct, therefore
            // reinterpreting the reference is okay.
            unsafe { &*(stack as *const BaseArc<FullPacket<T, Perms>>).cast() },
            0,
        )
    }
}
1143
/// [`OpaqueStore`] for [`RefPacket`].
///
/// The implementing type is selected by `linear_types_switch!`: `RefPacket<'static, _>` in the
/// `Standard` arm, and `RefPacket<'_, _>` in the `Linear` arm. The packet is its own stack
/// request, and gets wrapped in an arc for the heap request.
unsafe impl<Perms: PacketPerms> OpaqueStore for crate::linear_types_switch! { Standard => { RefPacket::<'static, Perms> } Linear => { RefPacket::<'_, Perms> } } {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;
    type StackReq<'a> = Self where Self: 'a;
    // TODO: do we need an arc here?
    type HeapReq = BaseArc<Self> where Self: 'static;

    /// Returns the packet unchanged.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        self
    }

    /// Converts the packet into an arc.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self.into()
    }

    /// Borrows the packet header out of the stack request.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr
    where
        Self: 'a,
    {
        stack
    }

    /// Builds a [`PacketView`] by reinterpreting the packet reference, passing `0` as the second
    /// argument (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        PacketView::from_ref(
            // SAFETY: we know the header is at the start of the fullpacket struct, therefore
            // reinterpreting the reference is okay.
            // NOTE(review): `Self` is `RefPacket` here, not `FullPacket`; presumably the header
            // is also the first field of `RefPacket` - confirm against its definition.
            unsafe { &*(stack as *const Self).cast() },
            0,
        )
    }
}
1182
/// [`OpaqueStore`] for [`VecPacket`].
///
/// The packet is its own stack request, and gets wrapped in an arc for the heap request.
unsafe impl<Perms: PacketPerms> OpaqueStore for VecPacket<Perms> {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;
    type StackReq<'a> = Self where Self: 'a;
    type HeapReq = BaseArc<Self> where Self: 'static;

    /// Returns the packet unchanged.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        self
    }

    /// Converts the packet into an arc.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self.into()
    }

    /// Borrows the packet header out of the stack request.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr
    where
        Self: 'a,
    {
        stack
    }

    /// Builds a [`PacketView`] by reinterpreting the packet reference, passing `0` as the second
    /// argument (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        PacketView::from_ref(
            // SAFETY: we know the header is at the start of the fullpacket struct, therefore
            // reinterpreting the reference is okay.
            // NOTE(review): `Self` is `VecPacket` here, not `FullPacket`; presumably the header
            // is also the first field of `VecPacket` - confirm against its definition.
            unsafe { &*(stack as *const Self).cast() },
            0,
        )
    }
}
1220
/// [`OpaqueStore`] for [`OwnedPacket`].
///
/// The packet is its own stack request, and gets wrapped in an arc for the heap request.
unsafe impl<Perms: PacketPerms> OpaqueStore for OwnedPacket<Perms> {
    type ConstHdr = Packet<Perms>;
    type Opaque<'a> = PacketView<'a, Perms> where Self: 'a;
    type StackReq<'a> = Self where Self: 'a;
    type HeapReq = BaseArc<Self> where Self: 'static;

    /// Returns the packet unchanged.
    fn stack<'a>(self) -> Self::StackReq<'a>
    where
        Self: 'a,
    {
        self
    }

    /// Converts the packet into an arc.
    fn heap(self) -> Self::HeapReq
    where
        Self: 'static,
    {
        self.into()
    }

    /// Borrows the packet header out of the stack request.
    fn stack_hdr<'a: 'b, 'b>(stack: &'b Self::StackReq<'a>) -> &'b Self::ConstHdr
    where
        Self: 'a,
    {
        stack
    }

    /// Builds a [`PacketView`] by reinterpreting the packet reference, passing `0` as the second
    /// argument (the tag, going by the TODO below).
    fn stack_opaque<'a>(stack: &'a Self::StackReq<'a>) -> Self::Opaque<'a> {
        // TODO: deal with tags
        PacketView::from_ref(
            // SAFETY: we know the header is at the start of the fullpacket struct, therefore
            // reinterpreting the reference is okay.
            // NOTE(review): `Self` is `OwnedPacket` here, not `FullPacket`; presumably the
            // header is also the first field of `OwnedPacket` - confirm against its definition.
            unsafe { &*(stack as *const Self).cast() },
            0,
        )
    }
}
1258
1259impl<T: AsRef<Packet<Perms>>, Perms: PacketPerms> From<BaseArc<T>> for PacketView<'static, Perms> {
1260    fn from(pkt: BaseArc<T>) -> Self {
1261        // TODO: deal with tags
1262        Self::from_arc(pkt.transpose().into_base().unwrap(), 0)
1263    }
1264}
1265
/// Shorthand for an [`OpaqueStore`] whose opaque type is [`PacketView`] and whose constant
/// header is [`Packet`], both with the given permissions.
///
/// The trait has no items of its own; it only bundles the bounds.
pub trait PacketStore<'a, Perms: PacketPerms>:
    'a + OpaqueStore<Opaque<'a> = PacketView<'a, Perms>, ConstHdr = Packet<Perms>>
{
}
1270
// Blanket implementation: every type satisfying the `PacketStore` supertrait bounds is a
// `PacketStore`.
impl<'a, Perms: PacketPerms, T> PacketStore<'a, Perms> for T where
    T: 'a + OpaqueStore<Opaque<'a> = PacketView<'a, Perms>, ConstHdr = Packet<Perms>>
{
}
1275
/// Private marker for byte-like element types.
///
/// Implemented for `u8` and `MaybeUninit<u8>` only; used as a bound on slice element types.
trait AnyBytes {}

impl AnyBytes for u8 {}
impl AnyBytes for MaybeUninit<u8> {}
1280
/// Object convertible into a packet.
///
/// This is an extension of [`OpaqueStore`], where an object may have additional synchronization
/// steps taken upon the end of I/O processing.
pub trait IntoPacket<'a, Perms: PacketPerms> {
    /// Packet store type produced by [`into_packet`](Self::into_packet).
    type Target: PacketStore<'a, Perms>;
    /// Value returned alongside the packet, later passed to [`sync_back`](Self::sync_back).
    type SyncHandle;

    /// Converts `Self` into a packet.
    fn into_packet(self) -> (Self::Target, Self::SyncHandle);

    /// Sync data back from the packet.
    ///
    /// The blanket implementation of this method is a no-op, however, some types may need to have
    /// the I/O results be synchronized back from the packets that were sent out (this is due to
    /// necessary heap indirections needed to appease the borrow checker). Therefore, any I/O
    /// abstraction should call this function before returning data back to the user.
    fn sync_back(_hdr: &<Self::Target as OpaqueStore>::ConstHdr, _handle: Self::SyncHandle) {}
}
1300
/// Every [`PacketStore`] is trivially its own packet, with a unit sync handle.
impl<'a, T: PacketStore<'a, Perms>, Perms: PacketPerms> IntoPacket<'a, Perms> for T {
    type Target = Self;
    type SyncHandle = ();

    /// Returns `self` unchanged, paired with `()`.
    fn into_packet(self) -> (Self, ()) {
        (self, ())
    }
}
1309
/// Byte slices become read packets by copying their contents into a newly built packet.
impl<'a, 'b: 'a> IntoPacket<'a, Read> for &'b [u8] {
    type Target = BaseArc<Packet<Read>>;
    type SyncHandle = ();

    /// Copies the slice into a new packet; there is nothing to sync back afterwards.
    fn into_packet(self) -> (Self::Target, ()) {
        (Packet::<Read>::copy_from_slice(self), ())
    }
}
1318
/// Mutable byte slices become write packets.
///
/// The implementation is selected by `linear_types_switch!`. In the `Standard` arm a separate
/// packet buffer of the same length is built, and the slice is kept as the sync handle so
/// that results can be copied back in `sync_back`. In the `Linear` arm the slice is converted
/// into a [`RefPacket`] and no sync handle is needed.
impl<'a, 'b: 'a, T: AnyBytes> IntoPacket<'a, Write> for &'b mut [T] {
    crate::linear_types_switch! {
        Standard => {
            type SyncHandle = Self;
            type Target = BaseArc<Packet<Write>>;

            fn into_packet(self) -> (Self::Target, Self) {
                (Packet::<Write>::new_buf(self.len()), self)
            }

            fn sync_back(hdr: &<Self::Target as OpaqueStore>::ConstHdr, handle: Self::SyncHandle) {
                // Copy back at most `handle.len()` bytes, further limited by the packet's
                // `error_clamp()` value.
                unsafe {
                    core::ptr::copy_nonoverlapping(
                        hdr.simple_data_ptr(),
                        handle.as_mut_ptr().cast(),
                        core::cmp::min(handle.len(), hdr.error_clamp() as usize),
                    );
                }
            }
        }
        Linear => {
            type SyncHandle = ();
            type Target = RefPacket<'a, Write>;

            fn into_packet(self) -> (Self::Target, Self::SyncHandle) {
                (self.into(), ())
            }
        }
    }
}
1349
1350impl<'a, 'b: 'a> IntoPacket<'a, ReadWrite> for &'b mut [u8] {
1351    type Target = BaseArc<Packet<ReadWrite>>;
1352    type SyncHandle = Self;
1353
1354    fn into_packet(self) -> (Self::Target, Self) {
1355        (Packet::<ReadWrite>::copy_from_slice(&*self), self)
1356    }
1357
1358    fn sync_back(hdr: &<Self::Target as OpaqueStore>::ConstHdr, handle: Self::SyncHandle) {
1359        unsafe {
1360            core::ptr::copy_nonoverlapping(
1361                hdr.simple_data_ptr(),
1362                handle.as_mut_ptr(),
1363                core::cmp::min(handle.len(), hdr.error_clamp() as usize),
1364            );
1365        }
1366    }
1367}
1368
1369use cglue::prelude::v1::*;
1370
/// Consumes a `BoundPacketView` and returns allocated version of it.
///
/// If the `alignment` constraint is met, this function consumes `packet`, fills `out_alloced`,
/// and returns `true`. If the constraint cannot be satisfied, `false` is returned.
///
/// If this function returns `false`, the caller should consider allocating an intermediate buffer,
/// and invoking [`TransferDataFn`](TransferDataFn) instead.
///
/// # Remarks
///
/// The caller should prefer [`TransferDataFn`](TransferDataFn), as opposed to this function, if
/// they can access their internal buffer with zero allocations.
pub type AllocFn<T> = for<'a> unsafe extern "C" fn(
    packet: &'a mut ManuallyDrop<BoundPacketView<T>>,
    alignment: usize,
    out_alloced: &'a mut MaybeUninit<<T as PacketPerms>::Alloced>,
) -> bool;
1388
/// Transfers data between a packet view and the reverse data type.
///
/// This function consumes `packet`, and transfers data between it and `input`. The caller must
/// ensure that `input` bounds are sufficient for the size of the packet.
///
/// TODO: decide on whether this function can fail.
pub type TransferDataFn<T> = for<'a> unsafe extern "C" fn(
    packet: &'a mut PacketView<T>,
    input: <T as PacketPerms>::ReverseDataType,
);
1399
/// Retrieves total length of the packet.
///
/// Takes the packet header by reference and returns the length as a `u64`.
pub type LenFn<T> = unsafe extern "C" fn(packet: &Packet<T>) -> u64;
1402
/// Packet that may be alloced.
///
/// `Ok` represents alloced, `Err` represents unalloced (bound packet view).
///
/// This is the return type of [`PacketPerms::try_alloc`].
pub type MaybeAlloced<T> = Result<<T as PacketPerms>::Alloced, BoundPacketView<T>>;
1407
/// Represents the state where a packet is either alloced or transferred.
///
/// `Ok` represents alloced, `Err` represents transferred (see [`TransferredPacket`]).
pub type AllocedOrTransferred<T> = Result<<T as PacketPerms>::Alloced, TransferredPacket<T>>;
1412
/// Describes type constraints on packet operations.
///
/// Implementors supply the vtable functions (`len_fn`, `alloc_fn`, `transfer_data_fn`) and the
/// "simple" fallbacks used when a packet exposes no vtable. The provided methods (`len`,
/// `try_alloc`, `transfer_data`) dispatch between the two.
pub trait PacketPerms: 'static + core::fmt::Debug + Clone + Copy {
    type DataType: Clone + Copy + core::fmt::Debug;
    /// Type of the `input` argument accepted by the data transfer functions.
    type ReverseDataType: Clone + Copy + core::fmt::Debug;
    /// Allocated packet type produced by [`try_alloc`](Self::try_alloc).
    type Alloced: AllocatedPacket<Perms = Self>;

    /// Returns vtable function for getting packet length.
    fn len_fn(&self) -> LenFn<Self>;

    /// Returns packet length.
    ///
    /// Uses the vtable's length function when the packet has a vtable, and the simple packet
    /// length otherwise.
    fn len(packet: &Packet<Self>) -> u64 {
        if let Some(vtbl) = packet.vtbl.vtbl() {
            unsafe { (vtbl.len_fn())(packet) }
        } else {
            unsafe { *Packet::simple_len(packet) as u64 }
        }
    }

    /// Returns vtable function for allocating the packet buffer.
    fn alloc_fn(&self) -> AllocFn<Self>;

    /// A buffer for simple packet.
    ///
    /// # Safety
    ///
    /// The given packet view must be simple.
    unsafe fn alloced_simple(packet: BoundPacketView<Self>) -> Self::Alloced;

    /// Tries allocating packet buffer with given alignment.
    ///
    /// Returns `Ok` with the allocated packet on success, and hands the original view back in
    /// `Err` when the alignment requirement could not be met.
    fn try_alloc(packet: BoundPacketView<Self>, alignment: usize) -> MaybeAlloced<Self> {
        if let Some(vtbl) = packet.view.pkt().vtbl.vtbl() {
            // The alloc function takes over the view only when it reports success, so the view
            // is kept in `ManuallyDrop` and recovered on failure.
            let mut view = ManuallyDrop::new(packet);
            let mut out = MaybeUninit::uninit();
            let ret = unsafe { (vtbl.alloc_fn())(&mut view, alignment, &mut out) };
            if ret {
                Ok(unsafe { out.assume_init() })
            } else {
                Err(ManuallyDrop::into_inner(view))
            }
        } else {
            // Simple packet: the buffer already exists, so only the alignment of the view's
            // first byte needs checking.
            let data = unsafe {
                packet
                    .view
                    .pkt()
                    .simple_data_ptr()
                    .add(packet.view.start as usize)
            };
            if data.align_offset(alignment) == 0 {
                Ok(unsafe { Self::alloced_simple(packet) })
            } else {
                Err(packet)
            }
        }
    }

    /// Returns vtable function for transferring data.
    fn transfer_data_fn(&self) -> TransferDataFn<Self>;

    /// Transfers data between a simple packet and a byte buffer.
    ///
    /// # Safety
    ///
    /// The provided input buffer must be valid and cover the length of the packet view.
    ///
    /// In addition, the provided packet must be simple.
    unsafe fn transfer_data_simple(packet: &mut PacketView<Self>, input: Self::ReverseDataType);

    /// Transfers data between a packet and a byte buffer.
    ///
    /// Dispatches to the vtable's transfer function when the packet has a vtable, and to
    /// [`transfer_data_simple`](Self::transfer_data_simple) otherwise.
    ///
    /// # Safety
    ///
    /// The provided input buffer must be valid and cover the length of the packet view.
    unsafe fn transfer_data(packet: &mut PacketView<Self>, input: Self::ReverseDataType) {
        if let Some(vtbl) = packet.pkt().vtbl.vtbl() {
            (vtbl.transfer_data_fn())(packet, input)
        } else {
            Self::transfer_data_simple(packet, input)
        }
    }
}
1493
/// ReadWrite permissions.
///
/// The behavior of ReadWrite is not well defined, but for simple buffers, it implies the swapping
/// of data between the 2 buffers.
///
/// The struct is a `#[repr(C)]` table of function pointers.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct ReadWrite {
    /// Function returned by `PacketPerms::len_fn`.
    pub len: unsafe extern "C" fn(&Packet<Self>) -> u64,
    /// Function returned by `PacketPerms::alloc_fn`.
    pub get_mut: for<'a> unsafe extern "C" fn(
        &mut ManuallyDrop<BoundPacketView<Self>>,
        usize,
        &mut MaybeUninit<ReadWritePacketObj>,
    ) -> bool,
    /// Function returned by `PacketPerms::transfer_data_fn`.
    pub transfer_data: for<'a, 'b> unsafe extern "C" fn(&'a mut PacketView<Self>, *mut ()),
}
1509
1510impl core::fmt::Debug for ReadWrite {
1511    fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
1512        write!(fmt, "{:?}", self.get_mut as *const ())
1513    }
1514}
1515
/// Read-write permissions: both data pointer types are mutable, and simple transfers swap data
/// between the packet and the supplied buffer.
impl PacketPerms for ReadWrite {
    type DataType = *mut ();
    type ReverseDataType = *mut ();
    type Alloced = ReadWritePacketObj;

    fn len_fn(&self) -> LenFn<Self> {
        self.len
    }

    fn alloc_fn(&self) -> AllocFn<Self> {
        self.get_mut
    }

    fn transfer_data_fn(&self) -> TransferDataFn<Self> {
        self.transfer_data
    }

    /// Wraps a simple packet view together with a pointer to the view's first byte.
    unsafe fn alloced_simple(packet: BoundPacketView<Self>) -> Self::Alloced {
        let data = packet.view.pkt().simple_data_ptr().cast_mut();
        ReadWritePacketObj {
            // Offset the packet's data pointer by the start of the view.
            alloced_packet: unsafe { data.add(packet.view.start as usize) },
            buffer: packet,
        }
    }

    /// Swaps `view.len()` bytes between `data` and the viewed part of the packet.
    unsafe fn transfer_data_simple(view: &mut PacketView<Self>, data: *mut ()) {
        let dst = Packet::simple_data_ptr_mut(view.pkt_mut());
        // TODO: does this operation even make sense?
        core::ptr::swap_nonoverlapping(
            data.cast(),
            dst.add(view.start as usize),
            view.len() as usize,
        );
    }
}
1551
/// Write permissions.
///
/// This implies the packet is writeable and may not have valid data beforehand.
///
/// The struct is a `#[repr(C)]` table of function pointers.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct Write {
    /// Function returned by `PacketPerms::len_fn`.
    pub len: unsafe extern "C" fn(&Packet<Self>) -> u64,
    /// Function returned by `PacketPerms::alloc_fn`.
    pub get_mut: for<'a> unsafe extern "C" fn(
        &mut ManuallyDrop<BoundPacketView<Self>>,
        usize,
        &mut MaybeUninit<WritePacketObj>,
    ) -> bool,
    /// Function returned by `PacketPerms::transfer_data_fn`.
    pub transfer_data: for<'a, 'b> unsafe extern "C" fn(&'a mut PacketView<Self>, *const ()),
}
1566
1567impl core::fmt::Debug for Write {
1568    fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
1569        write!(fmt, "{:?}", self.get_mut as *const ())
1570    }
1571}
1572
/// Write permissions: the packet is the destination, and simple transfers copy from the
/// supplied (const) buffer into the packet.
impl PacketPerms for Write {
    type DataType = *mut ();
    type ReverseDataType = *const ();
    type Alloced = WritePacketObj;

    fn len_fn(&self) -> LenFn<Self> {
        self.len
    }

    fn alloc_fn(&self) -> AllocFn<Self> {
        self.get_mut
    }

    fn transfer_data_fn(&self) -> TransferDataFn<Self> {
        self.transfer_data
    }

    /// Wraps a simple packet view together with a pointer to the view's first byte.
    ///
    /// The pointer is typed as `MaybeUninit<u8>`, matching `WritePacketObj`.
    unsafe fn alloced_simple(packet: BoundPacketView<Self>) -> Self::Alloced {
        let data = packet
            .view
            .pkt()
            .simple_data_ptr()
            .cast_mut()
            .cast::<MaybeUninit<u8>>();
        WritePacketObj {
            // Offset the packet's data pointer by the start of the view.
            alloced_packet: unsafe { data.add(packet.view.start as usize) },
            buffer: packet,
        }
    }

    /// Copies `view.len()` bytes from `data` into the viewed part of the packet.
    unsafe fn transfer_data_simple(view: &mut PacketView<Self>, data: *const ()) {
        let dst = Packet::simple_data_ptr_mut(view.pkt_mut());
        core::ptr::copy(
            data.cast(),
            dst.add(view.start as usize),
            view.len() as usize,
        );
    }
}
1612
/// Read permissions.
///
/// This implies this packet contains valid data and it can be read.
///
/// The struct is a `#[repr(C)]` table of function pointers.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct Read {
    /// Function returned by `PacketPerms::len_fn`.
    pub len: unsafe extern "C" fn(&Packet<Self>) -> u64,
    /// Function returned by `PacketPerms::alloc_fn`.
    pub get: unsafe extern "C" fn(
        &mut ManuallyDrop<BoundPacketView<Self>>,
        usize,
        &mut MaybeUninit<ReadPacketObj>,
    ) -> bool,
    /// Function returned by `PacketPerms::transfer_data_fn`.
    pub transfer_data: for<'a> unsafe extern "C" fn(&'a mut PacketView<Self>, *mut ()),
}
1627
1628impl core::fmt::Debug for Read {
1629    fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
1630        write!(fmt, "{:?}", self.get as *const ())
1631    }
1632}
1633
1634impl PacketPerms for Read {
1635    type DataType = *const ();
1636    type ReverseDataType = *mut ();
1637    type Alloced = ReadPacketObj;
1638
1639    fn len_fn(&self) -> LenFn<Self> {
1640        self.len
1641    }
1642
1643    fn alloc_fn(&self) -> AllocFn<Self> {
1644        self.get
1645    }
1646
1647    fn transfer_data_fn(&self) -> TransferDataFn<Self> {
1648        self.transfer_data
1649    }
1650
1651    unsafe fn alloced_simple(packet: BoundPacketView<Self>) -> Self::Alloced {
1652        let data = packet.view.pkt().simple_data_ptr().cast::<u8>();
1653        ReadPacketObj {
1654            alloced_packet: unsafe { data.add(packet.view.start as usize) },
1655            buffer: packet,
1656        }
1657    }
1658
1659    unsafe fn transfer_data_simple(view: &mut PacketView<Self>, data: *mut ()) {
1660        let src = view.pkt().simple_data_ptr();
1661        core::ptr::copy(
1662            src,
1663            data.cast::<u8>().add(view.start as usize),
1664            view.len() as usize,
1665        );
1666    }
1667}
1668
/// Objects that can be split.
///
/// This trait enables splitting objects into non-overlapping parts.
pub trait Splittable<T: Default + PartialEq>: Sized {
    /// Splits an object at given position.
    ///
    /// # Panics
    ///
    /// This function may panic if len is outside the bounds of the given object.
    fn split_at(self, len: T) -> (Self, Self);

    /// Returns the length of the object.
    fn len(&self) -> T;

    /// Returns `true` when the length equals the default value of `T`.
    fn is_empty(&self) -> bool {
        let current = self.len();
        current == T::default()
    }
}
1684
1685impl<T: Default + PartialEq, A: Splittable<T>, B: Splittable<T>> Splittable<T> for Result<A, B> {
1686    fn split_at(self, len: T) -> (Self, Self) {
1687        match self {
1688            Ok(v) => {
1689                let (a, b) = v.split_at(len);
1690                (Ok(a), Ok(b))
1691            }
1692            Err(v) => {
1693                let (a, b) = v.split_at(len);
1694                (Err(a), Err(b))
1695            }
1696        }
1697    }
1698
1699    fn len(&self) -> T {
1700        match self {
1701            Ok(v) => v.len(),
1702            Err(v) => v.len(),
1703        }
1704    }
1705}
1706
/// Object that can be returned with an error.
///
/// This is meant for all types of packets - allow backend to easily return them to the user with
/// an appropriate error condition attached.
pub trait Errorable: Sized {
    /// Consumes the object, attaching `err` to it.
    fn error(self, err: Error);
}
1714
1715impl<A: Errorable, B: Errorable> Errorable for Result<A, B> {
1716    fn error(self, err: Error) {
1717        match self {
1718            Ok(v) => v.error(err),
1719            Err(v) => v.error(err),
1720        }
1721    }
1722}
1723
/// Packet which has been allocated.
///
/// Allocated packets expose direct access to the underlying buffer.
pub trait AllocatedPacket: Splittable<u64> + Errorable {
    /// Permission type this allocated packet belongs to.
    type Perms: PacketPerms;
    /// Pointer type returned by [`as_ptr`](Self::as_ptr).
    type Pointer: Copy;

    /// Returns a pointer to the underlying buffer.
    fn as_ptr(&self) -> Self::Pointer;
}
1733
/// Represents a simple allocated packet with read-write permissions.
#[repr(C)]
pub struct ReadWritePacketObj {
    // Pointer to the first byte covered by `buffer`'s view.
    alloced_packet: *mut u8,
    // The bound packet view this object was allocated from.
    buffer: BoundPacketView<ReadWrite>,
}
1740
1741impl Splittable<u64> for ReadWritePacketObj {
1742    fn split_at(self, len: u64) -> (Self, Self) {
1743        let (b1, b2) = self.buffer.split_at(len);
1744
1745        (
1746            Self {
1747                alloced_packet: self.alloced_packet,
1748                buffer: b1,
1749            },
1750            Self {
1751                alloced_packet: unsafe { self.alloced_packet.add(len as usize) },
1752                buffer: b2,
1753            },
1754        )
1755    }
1756
1757    fn len(&self) -> u64 {
1758        self.buffer.view.len()
1759    }
1760}
1761
1762impl Errorable for ReadWritePacketObj {
1763    fn error(self, err: Error) {
1764        self.buffer.error(err)
1765    }
1766}
1767
impl AllocatedPacket for ReadWritePacketObj {
    type Perms = ReadWrite;
    type Pointer = *mut u8;

    /// Returns the stored pointer to the start of the buffer.
    fn as_ptr(&self) -> Self::Pointer {
        self.alloced_packet
    }
}
1776
// The raw pointer field prevents these from being derived automatically.
// NOTE(review): soundness presumably relies on `buffer` keeping the pointed-to data alive and
// exclusively owned by this object - confirm.
unsafe impl Send for ReadWritePacketObj {}
unsafe impl Sync for ReadWritePacketObj {}
1779
1780impl core::ops::Deref for ReadWritePacketObj {
1781    type Target = [u8];
1782
1783    fn deref(&self) -> &Self::Target {
1784        unsafe { core::slice::from_raw_parts(self.alloced_packet, self.buffer.view.len() as usize) }
1785    }
1786}
1787
1788impl core::ops::DerefMut for ReadWritePacketObj {
1789    fn deref_mut(&mut self) -> &mut Self::Target {
1790        unsafe {
1791            core::slice::from_raw_parts_mut(self.alloced_packet, self.buffer.view.len() as usize)
1792        }
1793    }
1794}
1795
/// Represents a simple allocated packet with write permissions.
///
/// The data inside may not be initialized, therefore, this packet should only be written to.
#[repr(C)]
pub struct WritePacketObj {
    // Pointer to the first (possibly uninitialized) byte covered by `buffer`'s view.
    alloced_packet: *mut MaybeUninit<u8>,
    // The bound packet view this object was allocated from.
    buffer: BoundPacketView<Write>,
}
1804
1805impl Splittable<u64> for WritePacketObj {
1806    fn split_at(self, len: u64) -> (Self, Self) {
1807        let (b1, b2) = self.buffer.split_at(len);
1808
1809        (
1810            Self {
1811                alloced_packet: self.alloced_packet,
1812                buffer: b1,
1813            },
1814            Self {
1815                alloced_packet: unsafe { self.alloced_packet.add(len as usize) },
1816                buffer: b2,
1817            },
1818        )
1819    }
1820
1821    fn len(&self) -> u64 {
1822        self.buffer.view.len()
1823    }
1824}
1825
1826impl Errorable for WritePacketObj {
1827    fn error(self, err: Error) {
1828        self.buffer.error(err)
1829    }
1830}
1831
impl AllocatedPacket for WritePacketObj {
    type Perms = Write;
    type Pointer = *mut MaybeUninit<u8>;

    /// Returns the stored pointer to the start of the buffer.
    fn as_ptr(&self) -> Self::Pointer {
        self.alloced_packet
    }
}
1840
// The raw pointer field prevents these from being derived automatically.
// NOTE(review): soundness presumably relies on `buffer` keeping the pointed-to data alive and
// exclusively owned by this object - confirm.
unsafe impl Send for WritePacketObj {}
unsafe impl Sync for WritePacketObj {}
1843
1844impl core::ops::Deref for WritePacketObj {
1845    type Target = [MaybeUninit<u8>];
1846
1847    fn deref(&self) -> &Self::Target {
1848        unsafe { core::slice::from_raw_parts(self.alloced_packet, self.buffer.view.len() as usize) }
1849    }
1850}
1851
1852impl core::ops::DerefMut for WritePacketObj {
1853    fn deref_mut(&mut self) -> &mut Self::Target {
1854        unsafe {
1855            core::slice::from_raw_parts_mut(self.alloced_packet, self.buffer.view.len() as usize)
1856        }
1857    }
1858}
1859
/// Represents a simple allocated packet with read permissions.
#[repr(C)]
pub struct ReadPacketObj {
    // Pointer to the first byte covered by `buffer`'s view.
    alloced_packet: *const u8,
    // The bound packet view this object was allocated from.
    buffer: BoundPacketView<Read>,
}
1866
1867impl Splittable<u64> for ReadPacketObj {
1868    fn split_at(self, len: u64) -> (Self, Self) {
1869        let (b1, b2) = self.buffer.split_at(len);
1870
1871        (
1872            Self {
1873                alloced_packet: self.alloced_packet,
1874                buffer: b1,
1875            },
1876            Self {
1877                alloced_packet: unsafe { self.alloced_packet.add(len as usize) },
1878                buffer: b2,
1879            },
1880        )
1881    }
1882
1883    fn len(&self) -> u64 {
1884        self.buffer.view.len()
1885    }
1886}
1887
1888impl Errorable for ReadPacketObj {
1889    fn error(self, err: Error) {
1890        self.buffer.error(err)
1891    }
1892}
1893
impl AllocatedPacket for ReadPacketObj {
    type Perms = Read;
    type Pointer = *const u8;

    /// Returns the stored pointer to the start of the buffer.
    fn as_ptr(&self) -> Self::Pointer {
        self.alloced_packet
    }
}
1902
// The raw pointer field prevents these from being derived automatically.
// NOTE(review): soundness presumably relies on `buffer` keeping the pointed-to data alive for
// as long as this object exists - confirm.
unsafe impl Send for ReadPacketObj {}
unsafe impl Sync for ReadPacketObj {}
1905
1906impl core::ops::Deref for ReadPacketObj {
1907    type Target = [u8];
1908
1909    fn deref(&self) -> &Self::Target {
1910        unsafe { core::slice::from_raw_parts(self.alloced_packet, self.buffer.view.len() as usize) }
1911    }
1912}
1913
/// Represents the state of the packet that has already had data transferred to.
///
/// This struct is marked as `must_use`, because the intention is for the backend to be explicit
/// about when the packet is dropped and released to the user. Since bound packets may call a
/// function that generates more I/O requests, it is important to drop the packet outside critical
/// sections.
///
/// The wrapper is `#[repr(transparent)]` over the underlying [`BoundPacketView`].
#[repr(transparent)]
#[must_use = "please handle point of drop intentionally"]
pub struct TransferredPacket<T: PacketPerms>(BoundPacketView<T>);
1923
1924impl<T: PacketPerms> Splittable<u64> for TransferredPacket<T> {
1925    fn split_at(self, len: u64) -> (Self, Self) {
1926        let (b1, b2) = self.0.split_at(len);
1927
1928        (Self(b1), Self(b2))
1929    }
1930
1931    fn len(&self) -> u64 {
1932        self.0.view.len()
1933    }
1934}
1935
1936impl<T: PacketPerms> Errorable for TransferredPacket<T> {
1937    fn error(self, err: Error) {
1938        self.0.error(err)
1939    }
1940}
1941
1942/// Standard packet variations.
1943///
1944/// This is a small helper used to enable easy storage of different packet types in one place.
1945pub enum StandardPktVariations<Perms: PacketPerms> {
1946    BoundPacketView(BoundPacketView<Perms>),
1947    Alloced(<Perms as PacketPerms>::Alloced),
1948    TransferredPacket(TransferredPacket<Perms>),
1949}
1950
1951impl<P: AllocatedPacket<Perms = PP>, PP: PacketPerms<Alloced = P>> From<P>
1952    for StandardPktVariations<P::Perms>
1953{
1954    fn from(p: P) -> Self {
1955        Self::Alloced(p)
1956    }
1957}
1958
1959impl<Perms: PacketPerms> From<BoundPacketView<Perms>> for StandardPktVariations<Perms> {
1960    fn from(p: BoundPacketView<Perms>) -> Self {
1961        Self::BoundPacketView(p)
1962    }
1963}
1964
1965impl<Perms: PacketPerms> From<MaybeAlloced<Perms>> for StandardPktVariations<Perms> {
1966    fn from(p: MaybeAlloced<Perms>) -> Self {
1967        match p {
1968            Ok(p) => Self::Alloced(p),
1969            Err(p) => Self::BoundPacketView(p),
1970        }
1971    }
1972}
1973
1974impl<Perms: PacketPerms> From<AllocedOrTransferred<Perms>> for StandardPktVariations<Perms> {
1975    fn from(p: AllocedOrTransferred<Perms>) -> Self {
1976        match p {
1977            Ok(p) => Self::Alloced(p),
1978            Err(p) => Self::TransferredPacket(p),
1979        }
1980    }
1981}
1982
1983impl<Perms: PacketPerms> From<TransferredPacket<Perms>> for StandardPktVariations<Perms> {
1984    fn from(p: TransferredPacket<Perms>) -> Self {
1985        Self::TransferredPacket(p)
1986    }
1987}
1988
1989impl<Perms: PacketPerms> Errorable for StandardPktVariations<Perms> {
1990    fn error(self, err: Error) {
1991        match self {
1992            Self::BoundPacketView(p) => p.error(err),
1993            Self::Alloced(p) => p.error(err),
1994            Self::TransferredPacket(p) => p.error(err),
1995        }
1996    }
1997}
1998
1999macro_rules! packet_combos {
2000    ($name:ident, $($perms:ident),*) => {
2001        /// Packet combinations.
2002        ///
2003        /// This is a small helper used to enable easy storage of different packet types in one place.
2004        pub enum $name {
2005            $($perms(StandardPktVariations<$perms>)),*
2006        }
2007
2008        impl<P: AllocatedPacket<Perms = PP>, PP: PacketPerms<Alloced = P>> From<P> for $name where StandardPktVariations<PP>: Into<AnyPacket> {
2009            fn from(p: P) -> Self {
2010                StandardPktVariations::<PP>::from(p).into()
2011            }
2012        }
2013
2014        $(
2015            impl From<StandardPktVariations<$perms>> for $name {
2016                fn from(p: StandardPktVariations<$perms>) -> Self {
2017                    Self::$perms(p)
2018                }
2019            }
2020
2021            impl From<BoundPacketView<$perms>> for $name {
2022                fn from(p: BoundPacketView<$perms>) -> Self {
2023                    Self::$perms(p.into())
2024                }
2025            }
2026
2027            impl From<MaybeAlloced<$perms>> for $name {
2028                fn from(p: MaybeAlloced<$perms>) -> Self {
2029                    Self::$perms(p.into())
2030                }
2031            }
2032
2033            impl From<AllocedOrTransferred<$perms>> for $name {
2034                fn from(p: AllocedOrTransferred<$perms>) -> Self {
2035                    Self::$perms(p.into())
2036                }
2037            }
2038
2039            impl From<TransferredPacket<$perms>> for $name {
2040                fn from(p: TransferredPacket<$perms>) -> Self {
2041                    Self::$perms(p.into())
2042                }
2043            }
2044        )*
2045
2046        impl Errorable for $name {
2047            fn error(self, err: Error) {
2048                match self {
2049                    $(Self::$perms(p) => p.error(err),)*
2050                }
2051            }
2052        }
2053    }
2054}
2055
2056packet_combos!(AnyPacket, Read, Write, ReadWrite);
2057
2058/*macro_rules! downgrade_packet {
2059    ($from:expr, $to:expr) => {
2060        impl<'a> From<Packet<'a, { $from }>> for Packet<'a, { $to }> {
2061            fn from(
2062                Packet {
2063                    data,
2064                    start,
2065                    end,
2066                    _phantom,
2067                }: Packet<'a, { $from }>,
2068            ) -> Self {
2069                Self {
2070                    data,
2071                    start,
2072                    end,
2073                    _phantom,
2074                }
2075            }
2076        }
2077
2078        impl<'a> AsRef<Packet<'a, { $to }>> for Packet<'a, { $from }> {
2079            fn as_ref(&self) -> &Packet<'a, { $to }> {
2080                unsafe { &*(self as *const Self as *const _) }
2081            }
2082        }
2083
2084        impl<'a> AsMut<Packet<'a, { $to }>> for Packet<'a, { $from }> {
2085            fn as_mut(&mut self) -> &mut Packet<'a, { $to }> {
2086                unsafe { &mut *(self as *mut Self as *mut _) }
2087            }
2088        }
2089    };
2090}*/
2091
2092/*downgrade_packet!(perms::READ, perms::NONE);
2093downgrade_packet!(perms::WRITE, perms::NONE);
2094downgrade_packet!(perms::READ_WRITE, perms::NONE);
2095downgrade_packet!(perms::READ_WRITE, perms::READ);
2096downgrade_packet!(perms::READ_WRITE, perms::WRITE);
2097*/
2098/*impl<'a> Packet<'a, {perms::READ}> {
2099
2100}*/
2101
2102/// Allows to intercept a packet between its origin packet ID.
2103///
2104/// `ReboundPacket` allows a packet to be read or written, before returning its result to the
2105/// origin. The packet segments which have finished their core I/O operation can be re-split into
2106/// smaller parts and returned to the origin with partial successes or failures.
2107///
2108/// ## Background
2109///
2110/// All parts of a bound packet must end up in their original starting position - the first place
2111/// they were bound. Failure to uphold this invariant may lead to deadlocks, or other unexpected
2112/// behavior. In addition, packet usually reaches an I/O backend, where it gets processed, before
2113/// being returned back to the caller. However, it is often desired to be able to do some
2114/// intermediary processing, or apply custom routing/splitting rules on the packet.
2115///
2116/// ### Client-server example
2117///
2118/// Let's take networked I/O backend as an example. To perform a read request, the following flow
2119/// happens:
2120///
2121/// 1. Caller sends an I/O packet to the client, acting as a I/O backend. The packet gets accepted,
2122///    read request is sent to the server.
2123///
2124/// 2. The server performs the read request, and for every segment of the packet writes the
2125///    response header with the packet bytes itself.
2126///
2127/// 3. The client receives individual packet segments as `(header, bytes)` pairs. For each pair the
2128///    client outputs the segment back to the original caller.
2129///
2130/// This is a very simple flow - client sends a request, server sends a response, client processes
2131/// the response, but how do you make this efficient?
2132///
2133/// For starters, the client needs to hold on to the original packet until all responses arrive.
2134/// Upon each response, the client needs to split a segment out, read data into it, and then return
2135/// it to the sender. This can actually be relatively easily solved with a simple sharded wrapper
2136/// around packet. Something equivalent to `BTreeMap<usize, BoundPacketObj>`. However, the
2137/// difficulty appears when working on the send side.
2138///
2139/// A naive way to transfer the packet over the wire is to use regular
2140/// [`IoWrite`](crate::traits::IoWrite) interface and await for each write to complete. However,
2141/// the process of awaiting for each request to finish makes the client extremely bottlenecked by
2142/// the time it takes to write each individual request. As far as client is concerned, completion
2143/// of each write operation is not important, what matters is order of operations.
2144///
2145/// [`NoPos`](crate::io::NoPos) I/O is by convention processed in the packet send order. Therefore,
2146/// all a client needs to do is send packets to the same packet ID, while awaiting for
2147/// confirmations in a separate async task, polled concurrently.
2148///
2149/// For a client write request, the one missing piece is awaiting for actual response from the
2150/// other end - which segment of the packet was successful, which was not. The problem is that
2151/// the regular [`IoWrite`](crate::traits::IoWrite) interface consumes the packet and does not let
2152/// you fail parts of it after the fact. A client needs to intercept the packet, and fail parts of
2153/// it after the fact. This is where [`ReboundPacket`] comes into play.
2154///
2155/// Upon arrival at a client, the write request would be rebound to the client's session packet ID,
2156/// and sent for processing through the I/O stream. At some point, the server returns back the
2157/// result of individual segments, which the client would then translate to actual response.
2158///
2159/// `ReboundPacket` helps in this situation, because it facilitates this rebind process safely.
2160pub struct ReboundPacket<T: PacketPerms> {
2161    ranges: RangeSet<u64>,
2162    orig: ManuallyDrop<BoundPacketView<T>>,
2163    unbound: AtomicBool,
2164}
2165
2166impl<T: PacketPerms> ReboundPacket<T> {
2167    pub fn packets_in_flight(&self) -> usize {
2168        self.orig.view.pkt().rc() - if self.ranges.is_empty() { 0 } else { 1 }
2169    }
2170
2171    pub fn unbound(&self) -> PacketView<'static, T> {
2172        assert!(!self.unbound.swap(true, Ordering::Acquire));
2173        // SAFETY: we are ensuring only a single unbound packet instance is created. In addition,
2174        // the rest of this struct ensures that all required invariants of the packet system are
2175        // upheld.
2176        unsafe { self.orig.unbound() }
2177    }
2178
2179    /// Inform that this packet view was fully sent out.
2180    ///
2181    /// Call whenever a packet view is about to be dropped, usually in on_output handler. This will
2182    /// mark the packet's range as intercepted, allowing the true result of the operation to be
2183    /// later propagated.
2184    pub fn on_processed(&mut self, pkt: PacketView<'static, T>, err: Option<Error>) {
2185        match err {
2186            Some(err) => {
2187                let pkt = unsafe {
2188                    self.orig
2189                        .extract_packet(pkt.start - self.orig.view.start, pkt.len())
2190                };
2191                pkt.error(err);
2192            }
2193            None => {
2194                let start = pkt.start - self.orig.view.start;
2195                let end = pkt.end - self.orig.view.start;
2196                self.ranges.insert(start..end);
2197            }
2198        }
2199    }
2200
2201    /// Finalize result of intercepted operation.
2202    ///
2203    /// Whenever a packet view's range is inserted in the intercepted range, it is possible to call
2204    /// this function to finalize the range's results.
2205    ///
2206    /// # Panics
2207    ///
2208    /// Whenever an invalid range is provided.
2209    pub fn range_result(&mut self, start: u64, len: u64, err: Option<Error>) {
2210        let range = start..(start + len);
2211        let mut o = self.ranges.overlapping(&range);
2212        let o = o.next().unwrap();
2213        assert!(o.contains(&start));
2214        assert!(o.contains(&(start + len.saturating_sub(1))));
2215        self.ranges.remove(range);
2216
2217        // SAFETY: we verified uniqueness of the range.
2218        let pkt = unsafe { self.orig.extract_packet(start, len) };
2219
2220        // We just extracted the last packet from the range, we are not going to do it again, as
2221        // per the API contract, we must forget this packet right now.
2222        if self.ranges.is_empty() {
2223            let orig = unsafe { ManuallyDrop::take(&mut self.orig) };
2224            unsafe { orig.forget() };
2225        }
2226
2227        if let Some(err) = err {
2228            pkt.error(err)
2229        }
2230    }
2231
2232    pub fn ranges(&self) -> &RangeSet<u64> {
2233        &self.ranges
2234    }
2235}
2236
2237impl<T: PacketPerms> From<BoundPacketView<T>> for ReboundPacket<T> {
2238    fn from(orig: BoundPacketView<T>) -> Self {
2239        Self {
2240            ranges: Default::default(),
2241            orig: ManuallyDrop::new(orig),
2242            unbound: false.into(),
2243        }
2244    }
2245}
2246
2247impl<T: PacketPerms> Drop for ReboundPacket<T> {
2248    fn drop(&mut self) {
2249        if *self.unbound.get_mut() {
2250            let mut prev = None;
2251
2252            for range in self.ranges.iter() {
2253                prev = Some(unsafe {
2254                    self.orig
2255                        .extract_packet(range.start, range.end - range.start)
2256                });
2257            }
2258
2259            // Only forget if we haven't been taken before
2260            if prev.is_some() {
2261                unsafe { ManuallyDrop::take(&mut self.orig).forget() };
2262            }
2263
2264            core::mem::drop(prev);
2265        } else {
2266            unsafe { ManuallyDrop::drop(&mut self.orig) };
2267        }
2268    }
2269}