mfio/io/mod.rs
1//! Desribes abstract I/O operations.
2//!
3//! # Introduction
4//!
5//! I/O is performed on packets - fungible units of operation. A packet is paired with an address
6//! and then is transferred throughout the I/O chain, until it reaches the final I/O backend,
7//! completing the operation. Along the way, the packet may be split into smaller ones, parts of it
8//! may be rejected, while other parts may get forwarded, with potentially diverging addresses. In
9//! the end, all parts of the packet are collected back into one place.
10//!
11//! A packet represents an abstract source or destination for I/O operations. It is always
12//! parameterized with [`PacketPerms`], describing how the underlying data of the packet can be
13//! accessed. Accessing the data means that the packet will no longer be forwarded, but before that
14//! point, the packet may be split up into smaller chunks, and sent to different I/O subsystems.
15//!
16//! # Lifecycle
17//!
18//! Most packet interactions can be traced back to [`PacketIo`] - a trait enabling the user to send
19//! packets to the I/O system. [`PacketIo::send_io`] is used to pass a bound packet to the
20//! front-facing I/O backend. It is the entrypoint for packets. Upon completion of each segment,
21//! output function is notified (if it has been assigned), and once all segments have completed
22//! operation, the original packet's waker is signaled. Complete flow is as follows:
23//!
24//! 1. A packet is bound to a stack or heap location through [`PacketStore`] trait.
25//!
26//! 2. Caller stores reference to the packet's header locally.
27//!
28//! 3. Bound packet is passed to [`PacketIo::send_io`].
29//!
30//! 4. The I/O backend processes the packet.
31//!
32//! 5. Result is fed back to [`OutputRef`], if it has been assigned during binding process.
33//!
34//! 6. Caller may choose to process returning packets as they come, based on the epecific
35//! `OutputRef` being attached, or await for total completion by calling
36//! [`Future::poll`] on the [`Packet`] reference.
37//!
38//! Steps 1-3 may be abstracted using [`PacketIoExt`] trait. Entire flow may be abstracted using
39//! [`IoRead`](crate::traits::IoRead), and [`IoWrite`](crate::traits::IoWrite) traits. However, if
40//! custom packet permissions are needed, the standard traits may not be sufficient.
41//!
42//! # Copy constraint negotiation
43//!
44//! I/O systems have various constraints on kinds of I/O operations possible. Some systems work by
45//! exposing a publicly accessible byte buffer, while in other systems those buffers are opaque and
46//! hidden behind hardware mechanisms or OS APIs. The simplest way to tackle varying requirements
47//! is to allocate intermediary buffers on the endpoints and expose those for I/O. However, that
48//! can be highly inefficient, because multiple copies may be involved, before data ends up at the
49//! final destination. Ideal scenario, for any I/O system, is to have only one copy per operation.
50//! And in I/O system where data is generated on-the-fly, ideal scenario would be to write output
51//! directly to the destination.
52//!
53//! To achieve this in mfio, we attach constraints to various parts of the I/O chain, and allocate
54//! temporary buffers only when needed. For any I/O end, we have the following constraint options:
55//!
56//! 1. Publicly exposed aligned byte-addressable buffer - this is the lower constraint tier, as
57//! individual bytes can be modified at neglibible cost.
58//! 2. Accepts byte-addressable input - this is more constarined, because the caller must provide a
59//! byte buffer, and cannot generate data on the fly. The callee takes this buffer and processes
60//! it internally using opaque mechanisms.
61//!
62//! I/O has 2 ends - input and output. These constraint levels are similar on both ends. See how
63//! these levels are described in the context of input (caller):
64//!
65//! 1. Sends byte-addressable buffer - this is the lower constraint tier, because the callee can
66//! process the input in any way possible.
67//! 2. Fills a byte-addressable buffer - this is more constrained, because the callee needs to
68//! provide a buffer to write to. However, this may also mean that the caller generates data on
69//! the fly, thus memory usage is lower.
70//!
71//! This is not exhaustive, but generally sufficient for most I/O cases. In practice, a backend
72//! that is able to access byte-addressable buffer directly will simply provide it to the packet,
73//! which will then process it. If the backend instead needs a buffer from the packet, it will
74//! call [`BoundPacketView::try_alloc`] with desired alignment parameters. If the allocation is not
75//! successful, it will then fall back to allocating an intermediary buffer.
76
77use crate::error::Error;
78#[cfg(feature = "cglue-trait")]
79use cglue::prelude::v1::*;
80use core::cell::UnsafeCell;
81use core::future::Future;
82use core::marker::PhantomData;
83use core::pin::Pin;
84use core::task::{Context, Poll};
85
86mod packet;
87pub use packet::*;
88mod opaque;
89pub use opaque::*;
90
91/// The primary trait enabling I/O.
92///
93/// This trait is generic in as many aspects as possible. The typical input is a parameter,
94/// typically representing a location, while view contains a slice of a packet. These 2 are
95/// disjoint, because the parameter may change on multiple hops of I/O chain, without data on the
96/// view changing.
97///
98/// `Perms` represents access permissions the packets sent through this trait have. This typically
99/// means reversed meaning when it comes to desired I/O operation. For instance, `Write` packets
100/// allow read operations to be performced, because data is read into the packet. Meanwhile, `Read`
101/// permission implies the packet holds the data, and that data can be transferred to the internal
102/// data store of the I/O backend.
103///
104/// You may want to check [`PacketIoExt`] trait and the [`traits`](crate::traits) module for easier
105/// to use abstractions.
106///
107/// # Example
108///
109/// Request-response handler:
110///
111/// ```
112/// use mfio::io::*;
113/// use mfio::mferr;
114/// use mfio::traits::*;
115///
116/// enum Request {
117/// Hi,
118/// Ho,
119/// }
120///
121/// impl Request {
122/// fn response(self) -> &'static [u8] {
123/// match self {
124/// Self::Hi => b"Hi",
125/// Self::Ho => b"Hoooooo",
126/// }
127/// }
128/// }
129///
130/// struct Backend;
131///
132/// impl PacketIo<Write, Request> for Backend {
133/// fn send_io(&self, param: Request, view: BoundPacketView<Write>) {
134/// let resp = param.response();
135///
136/// let view = if view.len() > resp.len() as u64 {
137/// let (a, b) = view.split_at(resp.len() as u64);
138/// b.error(mferr!(Memory, Outside, Backend));
139/// a
140/// } else {
141/// view
142/// };
143///
144/// // SAFETY: we have verified the packet view is at most the size of the response.
145/// unsafe {
146/// let _ = view.transfer_data(resp.as_ptr().cast());
147/// }
148/// }
149/// }
150///
151/// # pollster::block_on(async {
152/// let backend = Backend;
153///
154/// let mut buf = [0u8; 64];
155/// let _ = backend.read_all(Request::Ho, &mut buf[..]).await;
156/// assert_eq!(&buf[..7], b"Hoooooo");
157/// let _ = backend.read_all(Request::Hi, &mut buf[..]).await;
158/// assert_eq!(&buf[..2], b"Hi");
159/// assert_eq!(&buf[..10], b"Hiooooo\0\0\0");
160/// # });
161/// ```
162#[cfg_attr(feature = "cglue-trait", cglue_trait)]
163pub trait PacketIo<Perms: PacketPerms, Param>: Sized {
164 /// Send I/O request to the backend.
165 ///
166 /// This is a low level function for sending I/O to the backend. Typically, you'd use
167 /// [`PacketIoExt`], and [`StreamIoExt`] traits as second level abstractions. Third level
168 /// abstractions include [`IoRead`](crate::traits::IoRead), [`IoWrite`](crate::traits::IoWrite),
169 /// [`AsyncRead`](crate::stdeq::AsyncRead), and [`AsyncWrite`](crate::stdeq::AsyncWrite). Use
170 /// of these traits is highly encouraged.
171 ///
172 /// TODO: make this a sink (<https://github.com/memflow/mfio/issues/3>)
173 ///
174 /// # Example
175 ///
176 /// Manually sending I/O and awaiting it:
177 ///
178 /// ```
179 /// # mod sample {
180 /// # include!("../sample.rs");
181 /// # }
182 /// # use sample::SampleIo;
183 /// use mfio::backend::*;
184 /// use mfio::io::*;
185 /// use mfio::mferr;
186 ///
187 /// let handle = SampleIo::new(vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]);
188 ///
189 /// handle.block_on(async {
190 /// // Create a "simple" packet on the stack. Note that use of this must be careful - such
191 /// // usage can be considered to fall within the description of `mfio_assume_linear_types`
192 /// let packet = FullPacket::<_, Write>::new([0u8; 5]);
193 ///
194 /// let view = PacketView::from_ref(&packet, 0);
195 /// // SAFETY: through block_on, we ensure the packet will be waited until completion, and
196 /// // not be moved from the original location.
197 /// let view = unsafe { view.bind(None) };
198 ///
199 /// handle.send_io(3, view);
200 ///
201 /// // Packet is awaitable, and blocks until there are no more in-flight segments.
202 /// let _ = (&*packet).await;
203 ///
204 /// assert_eq!(packet.simple_contiguous_slice().unwrap(), &[2, 3, 5, 8, 13]);
205 /// })
206 /// ```
207 fn send_io(&self, param: Param, view: BoundPacketView<Perms>);
208}
209
210/// I/O helpers.
211///
212/// Use these helpers to simplify the usage of [`PacketIo`].
213pub trait PacketIoExt<Perms: PacketPerms, Param>: PacketIo<Perms, Param> {
214 fn io<'a, T: PacketStore<'a, Perms>>(
215 &'a self,
216 param: Param,
217 packet: T,
218 ) -> IoFut<'a, Self, Perms, Param, T> {
219 IoFut {
220 pkt: UnsafeCell::new(Some(packet.stack())),
221 initial_state: UnsafeCell::new(Some((self, param))),
222 _phantom: PhantomData,
223 }
224 }
225
226 fn io_to<'a, T: PacketStore<'a, Perms>, O: OutputStore<'a, Perms>>(
227 &'a self,
228 param: Param,
229 packet: T,
230 output: O,
231 ) -> IoToFut<'a, Self, Perms, Param, T, O> {
232 IoToFut {
233 pkt_out: UnsafeCell::new(Some((packet.stack(), output.stack()))),
234 initial_state: UnsafeCell::new(Some((self, param))),
235 _phantom: PhantomData,
236 }
237 }
238
239 fn io_to_stream<'a, T: PacketStore<'a, Perms> + 'a, O: PushPop<Output<'a, Perms>> + 'a>(
240 &'a self,
241 param: Param,
242 packet: T,
243 container: O,
244 ) -> IoToFut<'a, Self, Perms, Param, T, PacketStream<O, Perms>> {
245 self.io_to(param, packet, PacketStream::new(container))
246 }
247
248 fn io_to_fn<
249 'a,
250 T: PacketStore<'a, Perms>,
251 F: Fn(PacketView<'a, Perms>, Option<Error>) + Send + Sync + 'a,
252 >(
253 &'a self,
254 param: Param,
255 packet: T,
256 func: F,
257 ) -> IoToFut<'a, Self, Perms, Param, T, OutputFunction<F, Perms>> {
258 self.io_to(param, packet, OutputFunction::new(func))
259 }
260}
261
262impl<T: PacketIo<Perms, Param>, Perms: PacketPerms, Param> PacketIoExt<Perms, Param> for T {}
263
264/// Helpers for Stream I/O.
265///
266/// This is mainly meant for cases where I/O does not have a position parameter, such as TCP
267/// streams.
268pub trait StreamIoExt<Perms: PacketPerms>: PacketIo<Perms, NoPos> {
269 fn stream_io<'a, T: PacketStore<'a, Perms>>(
270 &'a self,
271 packet: T,
272 ) -> IoFut<'a, Self, Perms, NoPos, T> {
273 self.io(NoPos::new(), packet)
274 }
275
276 fn stream_io_to<'a, T: PacketStore<'a, Perms>, O: OutputStore<'a, Perms>>(
277 &'a self,
278 packet: T,
279 output: O,
280 ) -> IoToFut<'a, Self, Perms, NoPos, T, O> {
281 self.io_to(NoPos::new(), packet, output)
282 }
283}
284
285impl<T: PacketIo<Perms, NoPos>, Perms: PacketPerms> StreamIoExt<Perms> for T {}
286
287/// Describes lack of position.
288///
289/// This type is used in streams to signify that I/O is sequential. The convention is that I/O is
290/// processed on first-come, first-served basis
291#[repr(transparent)]
292#[derive(Clone)]
293pub struct NoPos(core::marker::PhantomData<()>);
294
295impl NoPos {
296 pub const fn new() -> Self {
297 Self(core::marker::PhantomData)
298 }
299}
300
301/// The simplest I/O future.
302///
303/// This future will drive an operation on the packet to completion, and then return `Poll::Ready`.
304///
305/// To perform more complex actions on partial results, please look at [`IoToFut`].
306pub struct IoFut<'a, T: ?Sized, Perms: PacketPerms, Param, Packet: PacketStore<'a, Perms>> {
307 pkt: UnsafeCell<Option<Packet::StackReq<'a>>>,
308 initial_state: UnsafeCell<Option<(&'a T, Param)>>,
309 _phantom: PhantomData<Perms>,
310}
311
312impl<
313 'a,
314 T: PacketIo<Perms, Param> + ?Sized,
315 Perms: PacketPerms,
316 Param,
317 Pkt: PacketStore<'a, Perms>,
318 > Future for IoFut<'a, T, Perms, Param, Pkt>
319{
320 type Output = Pkt::StackReq<'a>;
321
322 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
323 let state = self.into_ref().get_ref();
324
325 loop {
326 match unsafe { (*state.initial_state.get()).take() } {
327 Some((io, param)) => {
328 // SAFETY: this packet's existence is tied to 'a lifetime, meaning it will be valid
329 // throughout 'a.
330 let pkt: &'a Pkt::StackReq<'a> =
331 unsafe { (*state.pkt.get()).as_ref().unwrap() };
332
333 let view: PacketView<'a, Perms> = Pkt::stack_opaque(pkt);
334
335 // SAFETY: PacketView's lifetime is a marker, and we are using the marker lifetime to guide
336 // assumptions about type's validity. A sound implementation would put a 'static object
337 // here regardless, making the object 'static, while non-'static implementations are out of
338 // our hand, therefore we assume the caller is giving us correct info.
339 let bound = unsafe { view.bind(None) };
340 io.send_io(param, bound)
341 }
342 None => {
343 let pkt: &'a Pkt::StackReq<'a> =
344 unsafe { (*state.pkt.get()).as_ref().unwrap() };
345
346 let mut pkt: &'a Packet<Perms> = Pkt::stack_hdr(pkt);
347 let pkt = Pin::new(&mut pkt);
348 break pkt
349 .poll(cx)
350 .map(|_| unsafe { (*state.pkt.get()).take().unwrap() });
351 }
352 }
353 }
354 }
355}
356
357/// I/O future with custom actions per returned packet segment.
358///
359/// This future allows customizing behavior upon each completed packet segment. This may include
360/// logging, storing segments in a collection, or processing them in a stream. Please see
361/// appropriate output modules for more details.
362pub struct IoToFut<
363 'a,
364 T: ?Sized,
365 Perms: PacketPerms,
366 Param,
367 Packet: PacketStore<'a, Perms>,
368 Output: OutputStore<'a, Perms>,
369> {
370 pkt_out: UnsafeCell<Option<(Packet::StackReq<'a>, Output::StackReq<'a>)>>,
371 initial_state: UnsafeCell<Option<(&'a T, Param)>>,
372 _phantom: PhantomData<Perms>,
373}
374
375impl<
376 'a,
377 T: PacketIo<Perms, Param> + ?Sized,
378 Perms: PacketPerms,
379 Param,
380 Pkt: PacketStore<'a, Perms>,
381 Out: OutputStore<'a, Perms>,
382 > IoToFut<'a, T, Perms, Param, Pkt, Out>
383{
384 pub fn submit(self: Pin<&mut Self>) -> &Out::StackReq<'a> {
385 let this = self.into_ref();
386 if let Some((io, param)) = unsafe { (*this.initial_state.get()).take() } {
387 // SAFETY: this packet's existence is tied to 'a lifetime, meaning it will be valid
388 // throughout 'a.
389 let (pkt, out): &'a mut (Pkt::StackReq<'a>, Out::StackReq<'a>) =
390 unsafe { (*this.pkt_out.get()).as_mut().unwrap() };
391 let view: PacketView<'a, Perms> = Pkt::stack_opaque(pkt);
392 // SAFETY: PacketView's lifetime is a marker, and we are using the marker lifetime to guide
393 // assumptions about type's validity. A sound implementation would put a 'static object
394 // here regardless, making the object 'static, while non-'static implementations are out of
395 // our hand, therefore we assume the caller is giving us correct info.
396 let bound = unsafe { view.bind(Some(Out::stack_opaque(out))) };
397 io.send_io(param, bound)
398 }
399
400 unsafe { (*this.pkt_out.get()).as_ref().map(|(_, out)| out).unwrap() }
401 }
402}
403
404impl<
405 'a,
406 T: PacketIo<Perms, Param>,
407 Perms: PacketPerms,
408 Param,
409 Pkt: PacketStore<'a, Perms>,
410 Out: OutputStore<'a, Perms>,
411 > Future for IoToFut<'a, T, Perms, Param, Pkt, Out>
412{
413 type Output = (Pkt::StackReq<'a>, Out::StackReq<'a>);
414
415 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
416 self.as_mut().submit();
417
418 let state = self.into_ref();
419
420 let pkt: &'a Pkt::StackReq<'a> = unsafe { &(*state.pkt_out.get()).as_ref().unwrap().0 };
421 let mut pkt: &'a Packet<Perms> = Pkt::stack_hdr(pkt);
422 let pkt = Pin::new(&mut pkt);
423 pkt.poll(cx)
424 .map(|_| unsafe { (*state.pkt_out.get()).take().unwrap() })
425 }
426}