mfio/io/
mod.rs

1//! Desribes abstract I/O operations.
2//!
3//! # Introduction
4//!
5//! I/O is performed on packets - fungible units of operation. A packet is paired with an address
6//! and then is transferred throughout the I/O chain, until it reaches the final I/O backend,
7//! completing the operation. Along the way, the packet may be split into smaller ones, parts of it
8//! may be rejected, while other parts may get forwarded, with potentially diverging addresses. In
9//! the end, all parts of the packet are collected back into one place.
10//!
11//! A packet represents an abstract source or destination for I/O operations. It is always
12//! parameterized with [`PacketPerms`], describing how the underlying data of the packet can be
13//! accessed. Accessing the data means that the packet will no longer be forwarded, but before that
14//! point, the packet may be split up into smaller chunks, and sent to different I/O subsystems.
15//!
16//! # Lifecycle
17//!
18//! Most packet interactions can be traced back to [`PacketIo`] - a trait enabling the user to send
19//! packets to the I/O system. [`PacketIo::send_io`] is used to pass a bound packet to the
20//! front-facing I/O backend. It is the entrypoint for packets. Upon completion of each segment,
21//! output function is notified (if it has been assigned), and once all segments have completed
22//! operation, the original packet's waker is signaled. Complete flow is as follows:
23//!
24//! 1. A packet is bound to a stack or heap location through [`PacketStore`] trait.
25//!
26//! 2. Caller stores reference to the packet's header locally.
27//!
28//! 3. Bound packet is passed to [`PacketIo::send_io`].
29//!
30//! 4. The I/O backend processes the packet.
31//!
32//! 5. Result is fed back to [`OutputRef`], if it has been assigned during binding process.
33//!
34//! 6. Caller may choose to process returning packets as they come, based on the epecific
35//!    `OutputRef` being attached, or await for total completion by calling
36//!    [`Future::poll`] on the [`Packet`] reference.
37//!
38//! Steps 1-3 may be abstracted using [`PacketIoExt`] trait. Entire flow may be abstracted using
39//! [`IoRead`](crate::traits::IoRead), and [`IoWrite`](crate::traits::IoWrite) traits. However, if
40//! custom packet permissions are needed, the standard traits may not be sufficient.
41//!
42//! # Copy constraint negotiation
43//!
44//! I/O systems have various constraints on kinds of I/O operations possible. Some systems work by
45//! exposing a publicly accessible byte buffer, while in other systems those buffers are opaque and
46//! hidden behind hardware mechanisms or OS APIs. The simplest way to tackle varying requirements
47//! is to allocate intermediary buffers on the endpoints and expose those for I/O. However, that
48//! can be highly inefficient, because multiple copies may be involved, before data ends up at the
49//! final destination. Ideal scenario, for any I/O system, is to have only one copy per operation.
50//! And in I/O system where data is generated on-the-fly, ideal scenario would be to write output
51//! directly to the destination.
52//!
53//! To achieve this in mfio, we attach constraints to various parts of the I/O chain, and allocate
54//! temporary buffers only when needed. For any I/O end, we have the following constraint options:
55//!
56//! 1. Publicly exposed aligned byte-addressable buffer - this is the lower constraint tier, as
57//!    individual bytes can be modified at neglibible cost.
58//! 2. Accepts byte-addressable input - this is more constarined, because the caller must provide a
59//!    byte buffer, and cannot generate data on the fly. The callee takes this buffer and processes
60//!    it internally using opaque mechanisms.
61//!
62//! I/O has 2 ends - input and output. These constraint levels are similar on both ends. See how
63//! these levels are described in the context of input (caller):
64//!
65//! 1. Sends byte-addressable buffer - this is the lower constraint tier, because the callee can
66//!    process the input in any way possible.
67//! 2. Fills a byte-addressable buffer - this is more constrained, because the callee needs to
68//!    provide a buffer to write to. However, this may also mean that the caller generates data on
69//!    the fly, thus memory usage is lower.
70//!
71//! This is not exhaustive, but generally sufficient for most I/O cases. In practice, a backend
72//! that is able to access byte-addressable buffer directly will simply provide it to the packet,
73//! which will then process it. If the backend instead needs a buffer from the packet, it will
74//! call [`BoundPacketView::try_alloc`] with desired alignment parameters. If the allocation is not
75//! successful, it will then fall back to allocating an intermediary buffer.
76
77use crate::error::Error;
78#[cfg(feature = "cglue-trait")]
79use cglue::prelude::v1::*;
80use core::cell::UnsafeCell;
81use core::future::Future;
82use core::marker::PhantomData;
83use core::pin::Pin;
84use core::task::{Context, Poll};
85
86mod packet;
87pub use packet::*;
88mod opaque;
89pub use opaque::*;
90
91/// The primary trait enabling I/O.
92///
93/// This trait is generic in as many aspects as possible. The typical input is a parameter,
94/// typically representing a location, while view contains a slice of a packet. These 2 are
95/// disjoint, because the parameter may change on multiple hops of I/O chain, without data on the
96/// view changing.
97///
98/// `Perms` represents access permissions the packets sent through this trait have. This typically
99/// means reversed meaning when it comes to desired I/O operation. For instance, `Write` packets
100/// allow read operations to be performced, because data is read into the packet. Meanwhile, `Read`
101/// permission implies the packet holds the data, and that data can be transferred to the internal
102/// data store of the I/O backend.
103///
104/// You may want to check [`PacketIoExt`] trait and the [`traits`](crate::traits) module for easier
105/// to use abstractions.
106///
107/// # Example
108///
109/// Request-response handler:
110///
111/// ```
112/// use mfio::io::*;
113/// use mfio::mferr;
114/// use mfio::traits::*;
115///
116/// enum Request {
117///     Hi,
118///     Ho,
119/// }
120///
121/// impl Request {
122///     fn response(self) -> &'static [u8] {
123///         match self {
124///             Self::Hi => b"Hi",
125///             Self::Ho => b"Hoooooo",
126///         }
127///     }
128/// }
129///
130/// struct Backend;
131///
132/// impl PacketIo<Write, Request> for Backend {
133///     fn send_io(&self, param: Request, view: BoundPacketView<Write>) {
134///         let resp = param.response();
135///
136///         let view = if view.len() > resp.len() as u64 {
137///             let (a, b) = view.split_at(resp.len() as u64);
138///             b.error(mferr!(Memory, Outside, Backend));
139///             a
140///         } else {
141///             view
142///         };
143///
144///         // SAFETY: we have verified the packet view is at most the size of the response.
145///         unsafe {
146///             let _ = view.transfer_data(resp.as_ptr().cast());
147///         }
148///     }
149/// }
150///
151/// # pollster::block_on(async {
152/// let backend = Backend;
153///
154/// let mut buf = [0u8; 64];
155/// let _ = backend.read_all(Request::Ho, &mut buf[..]).await;
156/// assert_eq!(&buf[..7], b"Hoooooo");
157/// let _ = backend.read_all(Request::Hi, &mut buf[..]).await;
158/// assert_eq!(&buf[..2], b"Hi");
159/// assert_eq!(&buf[..10], b"Hiooooo\0\0\0");
160/// # });
161/// ```
162#[cfg_attr(feature = "cglue-trait", cglue_trait)]
163pub trait PacketIo<Perms: PacketPerms, Param>: Sized {
164    /// Send I/O request to the backend.
165    ///
166    /// This is a low level function for sending I/O to the backend. Typically, you'd use
167    /// [`PacketIoExt`], and [`StreamIoExt`] traits as second level abstractions. Third level
168    /// abstractions include [`IoRead`](crate::traits::IoRead), [`IoWrite`](crate::traits::IoWrite),
169    /// [`AsyncRead`](crate::stdeq::AsyncRead), and [`AsyncWrite`](crate::stdeq::AsyncWrite). Use
170    /// of these traits is highly encouraged.
171    ///
172    /// TODO: make this a sink (<https://github.com/memflow/mfio/issues/3>)
173    ///
174    /// # Example
175    ///
176    /// Manually sending I/O and awaiting it:
177    ///
178    /// ```
179    /// # mod sample {
180    /// #     include!("../sample.rs");
181    /// # }
182    /// # use sample::SampleIo;
183    /// use mfio::backend::*;
184    /// use mfio::io::*;
185    /// use mfio::mferr;
186    ///
187    /// let handle = SampleIo::new(vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]);
188    ///
189    /// handle.block_on(async {
190    ///     // Create a "simple" packet on the stack. Note that use of this must be careful - such
191    ///     // usage can be considered to fall within the description of `mfio_assume_linear_types`
192    ///     let packet = FullPacket::<_, Write>::new([0u8; 5]);
193    ///
194    ///     let view = PacketView::from_ref(&packet, 0);
195    ///     // SAFETY: through block_on, we ensure the packet will be waited until completion, and
196    ///     // not be moved from the original location.
197    ///     let view = unsafe { view.bind(None) };
198    ///
199    ///     handle.send_io(3, view);
200    ///
201    ///     // Packet is awaitable, and blocks until there are no more in-flight segments.
202    ///     let _ = (&*packet).await;
203    ///
204    ///     assert_eq!(packet.simple_contiguous_slice().unwrap(), &[2, 3, 5, 8, 13]);
205    /// })
206    /// ```
207    fn send_io(&self, param: Param, view: BoundPacketView<Perms>);
208}
209
210/// I/O helpers.
211///
212/// Use these helpers to simplify the usage of [`PacketIo`].
213pub trait PacketIoExt<Perms: PacketPerms, Param>: PacketIo<Perms, Param> {
214    fn io<'a, T: PacketStore<'a, Perms>>(
215        &'a self,
216        param: Param,
217        packet: T,
218    ) -> IoFut<'a, Self, Perms, Param, T> {
219        IoFut {
220            pkt: UnsafeCell::new(Some(packet.stack())),
221            initial_state: UnsafeCell::new(Some((self, param))),
222            _phantom: PhantomData,
223        }
224    }
225
226    fn io_to<'a, T: PacketStore<'a, Perms>, O: OutputStore<'a, Perms>>(
227        &'a self,
228        param: Param,
229        packet: T,
230        output: O,
231    ) -> IoToFut<'a, Self, Perms, Param, T, O> {
232        IoToFut {
233            pkt_out: UnsafeCell::new(Some((packet.stack(), output.stack()))),
234            initial_state: UnsafeCell::new(Some((self, param))),
235            _phantom: PhantomData,
236        }
237    }
238
239    fn io_to_stream<'a, T: PacketStore<'a, Perms> + 'a, O: PushPop<Output<'a, Perms>> + 'a>(
240        &'a self,
241        param: Param,
242        packet: T,
243        container: O,
244    ) -> IoToFut<'a, Self, Perms, Param, T, PacketStream<O, Perms>> {
245        self.io_to(param, packet, PacketStream::new(container))
246    }
247
248    fn io_to_fn<
249        'a,
250        T: PacketStore<'a, Perms>,
251        F: Fn(PacketView<'a, Perms>, Option<Error>) + Send + Sync + 'a,
252    >(
253        &'a self,
254        param: Param,
255        packet: T,
256        func: F,
257    ) -> IoToFut<'a, Self, Perms, Param, T, OutputFunction<F, Perms>> {
258        self.io_to(param, packet, OutputFunction::new(func))
259    }
260}
261
262impl<T: PacketIo<Perms, Param>, Perms: PacketPerms, Param> PacketIoExt<Perms, Param> for T {}
263
264/// Helpers for Stream I/O.
265///
266/// This is mainly meant for cases where I/O does not have a position parameter, such as TCP
267/// streams.
268pub trait StreamIoExt<Perms: PacketPerms>: PacketIo<Perms, NoPos> {
269    fn stream_io<'a, T: PacketStore<'a, Perms>>(
270        &'a self,
271        packet: T,
272    ) -> IoFut<'a, Self, Perms, NoPos, T> {
273        self.io(NoPos::new(), packet)
274    }
275
276    fn stream_io_to<'a, T: PacketStore<'a, Perms>, O: OutputStore<'a, Perms>>(
277        &'a self,
278        packet: T,
279        output: O,
280    ) -> IoToFut<'a, Self, Perms, NoPos, T, O> {
281        self.io_to(NoPos::new(), packet, output)
282    }
283}
284
285impl<T: PacketIo<Perms, NoPos>, Perms: PacketPerms> StreamIoExt<Perms> for T {}
286
287/// Describes lack of position.
288///
289/// This type is used in streams to signify that I/O is sequential. The convention is that I/O is
290/// processed on first-come, first-served basis
291#[repr(transparent)]
292#[derive(Clone)]
293pub struct NoPos(core::marker::PhantomData<()>);
294
295impl NoPos {
296    pub const fn new() -> Self {
297        Self(core::marker::PhantomData)
298    }
299}
300
301/// The simplest I/O future.
302///
303/// This future will drive an operation on the packet to completion, and then return `Poll::Ready`.
304///
305/// To perform more complex actions on partial results, please look at [`IoToFut`].
306pub struct IoFut<'a, T: ?Sized, Perms: PacketPerms, Param, Packet: PacketStore<'a, Perms>> {
307    pkt: UnsafeCell<Option<Packet::StackReq<'a>>>,
308    initial_state: UnsafeCell<Option<(&'a T, Param)>>,
309    _phantom: PhantomData<Perms>,
310}
311
312impl<
313        'a,
314        T: PacketIo<Perms, Param> + ?Sized,
315        Perms: PacketPerms,
316        Param,
317        Pkt: PacketStore<'a, Perms>,
318    > Future for IoFut<'a, T, Perms, Param, Pkt>
319{
320    type Output = Pkt::StackReq<'a>;
321
322    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
323        let state = self.into_ref().get_ref();
324
325        loop {
326            match unsafe { (*state.initial_state.get()).take() } {
327                Some((io, param)) => {
328                    // SAFETY: this packet's existence is tied to 'a lifetime, meaning it will be valid
329                    // throughout 'a.
330                    let pkt: &'a Pkt::StackReq<'a> =
331                        unsafe { (*state.pkt.get()).as_ref().unwrap() };
332
333                    let view: PacketView<'a, Perms> = Pkt::stack_opaque(pkt);
334
335                    // SAFETY: PacketView's lifetime is a marker, and we are using the marker lifetime to guide
336                    // assumptions about type's validity. A sound implementation would put a 'static object
337                    // here regardless, making the object 'static, while non-'static implementations are out of
338                    // our hand, therefore we assume the caller is giving us correct info.
339                    let bound = unsafe { view.bind(None) };
340                    io.send_io(param, bound)
341                }
342                None => {
343                    let pkt: &'a Pkt::StackReq<'a> =
344                        unsafe { (*state.pkt.get()).as_ref().unwrap() };
345
346                    let mut pkt: &'a Packet<Perms> = Pkt::stack_hdr(pkt);
347                    let pkt = Pin::new(&mut pkt);
348                    break pkt
349                        .poll(cx)
350                        .map(|_| unsafe { (*state.pkt.get()).take().unwrap() });
351                }
352            }
353        }
354    }
355}
356
357/// I/O future with custom actions per returned packet segment.
358///
359/// This future allows customizing behavior upon each completed packet segment. This may include
360/// logging, storing segments in a collection, or processing them in a stream. Please see
361/// appropriate output modules for more details.
362pub struct IoToFut<
363    'a,
364    T: ?Sized,
365    Perms: PacketPerms,
366    Param,
367    Packet: PacketStore<'a, Perms>,
368    Output: OutputStore<'a, Perms>,
369> {
370    pkt_out: UnsafeCell<Option<(Packet::StackReq<'a>, Output::StackReq<'a>)>>,
371    initial_state: UnsafeCell<Option<(&'a T, Param)>>,
372    _phantom: PhantomData<Perms>,
373}
374
375impl<
376        'a,
377        T: PacketIo<Perms, Param> + ?Sized,
378        Perms: PacketPerms,
379        Param,
380        Pkt: PacketStore<'a, Perms>,
381        Out: OutputStore<'a, Perms>,
382    > IoToFut<'a, T, Perms, Param, Pkt, Out>
383{
384    pub fn submit(self: Pin<&mut Self>) -> &Out::StackReq<'a> {
385        let this = self.into_ref();
386        if let Some((io, param)) = unsafe { (*this.initial_state.get()).take() } {
387            // SAFETY: this packet's existence is tied to 'a lifetime, meaning it will be valid
388            // throughout 'a.
389            let (pkt, out): &'a mut (Pkt::StackReq<'a>, Out::StackReq<'a>) =
390                unsafe { (*this.pkt_out.get()).as_mut().unwrap() };
391            let view: PacketView<'a, Perms> = Pkt::stack_opaque(pkt);
392            // SAFETY: PacketView's lifetime is a marker, and we are using the marker lifetime to guide
393            // assumptions about type's validity. A sound implementation would put a 'static object
394            // here regardless, making the object 'static, while non-'static implementations are out of
395            // our hand, therefore we assume the caller is giving us correct info.
396            let bound = unsafe { view.bind(Some(Out::stack_opaque(out))) };
397            io.send_io(param, bound)
398        }
399
400        unsafe { (*this.pkt_out.get()).as_ref().map(|(_, out)| out).unwrap() }
401    }
402}
403
404impl<
405        'a,
406        T: PacketIo<Perms, Param>,
407        Perms: PacketPerms,
408        Param,
409        Pkt: PacketStore<'a, Perms>,
410        Out: OutputStore<'a, Perms>,
411    > Future for IoToFut<'a, T, Perms, Param, Pkt, Out>
412{
413    type Output = (Pkt::StackReq<'a>, Out::StackReq<'a>);
414
415    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
416        self.as_mut().submit();
417
418        let state = self.into_ref();
419
420        let pkt: &'a Pkt::StackReq<'a> = unsafe { &(*state.pkt_out.get()).as_ref().unwrap().0 };
421        let mut pkt: &'a Packet<Perms> = Pkt::stack_hdr(pkt);
422        let pkt = Pin::new(&mut pkt);
423        pkt.poll(cx)
424            .map(|_| unsafe { (*state.pkt_out.get()).take().unwrap() })
425    }
426}