mfio/
futures_compat.rs

1//! Provides compatibility with `futures` traits.
2
3use crate::io::*;
4use crate::stdeq::{self, AsyncIoFut};
5use crate::util::PosShift;
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9#[cfg(not(mfio_assume_linear_types))]
10use futures::io::AsyncRead;
11use futures::io::{AsyncSeek, AsyncWrite};
12use std::io::{Result, SeekFrom};
13
14/// Container for intermediate values.
15///
16/// Currently, reading and writing is not cancel safe. Meaning, cancelling the I/O operation and
17/// issuing a new one would continue the previous operation, and sync the results to the currently
18/// provided buffer. Note that the types of operations are handled separately so they do not mix
19/// and it is okay to cancel a read to issue a write.
20///
21/// If you wish to cancel the operation, do drop the entire `Compat` object. However, be warned
22/// that `mfio` may panic, since it does not yet support cancellation at all.
23///
24/// Note that at the time of writing, `AsyncRead` is not supported when `mfio_assume_linear_types`
25/// config is set.
26pub struct Compat<'a, Io: ?Sized> {
27    io: &'a Io,
28    #[cfg(not(mfio_assume_linear_types))]
29    read: Option<AsyncIoFut<'a, Io, Write, u64, &'a mut [u8]>>,
30    write: Option<AsyncIoFut<'a, Io, Read, u64, &'a [u8]>>,
31}
32
33/// Bridges mfio with futures.
34///
35/// # Examples
36///
37/// Read from mfio object through futures traits.
38///
39/// ```rust
40/// # mod sample {
41/// #     include!("sample.rs");
42/// # }
43/// # use sample::SampleIo;
44/// # fn work() -> mfio::error::Result<()> {
45/// # mfio::linear_types_switch!(
46/// #     Linear => { Ok(()) }
47/// #     Standard => {{
48/// use futures::io::{AsyncReadExt, Cursor};
49/// use mfio::backend::*;
50/// use mfio::futures_compat::FuturesCompat;
51/// use mfio::stdeq::SeekableRef;
52///
53/// let mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144];
54/// let handle = SampleIo::new(mem.clone());
55///
56/// handle.block_on(async {
57///     let mut buf = Cursor::new(vec![0; mem.len()]);
58///
59///     let handle = SeekableRef::from(&handle);
60///     futures::io::copy(handle.compat(), &mut buf).await?;
61///     assert_eq!(mem, buf.into_inner());
62///
63///     Ok(())
64/// })
65/// # }}
66/// # )
67/// # }
68/// # work().unwrap();
69/// ```
70///
71/// Write using futures traits.
72///
73/// ```rust
74/// # mod sample {
75/// #     include!("sample.rs");
76/// # }
77/// # use sample::SampleIo;
78/// # fn work() -> mfio::error::Result<()> {
79/// use futures::io::AsyncWriteExt;
80/// use mfio::backend::*;
81/// use mfio::futures_compat::FuturesCompat;
82/// use mfio::stdeq::SeekableRef;
83/// use mfio::traits::IoRead;
84///
85/// let mut mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144];
86/// let handle = SampleIo::new(mem.clone());
87///
88/// handle.block_on(async {
89///     let handle = SeekableRef::from(&handle);
90///     handle.compat().write_all(&[9, 9, 9]).await?;
91///
92///     handle.read_all(0, &mut mem[..5]).await.unwrap();
93///     assert_eq!(&mem[..5], &[9, 9, 9, 2, 3]);
94///
95///     Ok(())
96/// })
97/// # }
98/// # work().unwrap();
99/// ```
100pub trait FuturesCompat {
101    fn compat(&self) -> Compat<Self> {
102        Compat {
103            io: self,
104            #[cfg(not(mfio_assume_linear_types))]
105            read: None,
106            write: None,
107        }
108    }
109}
110
111// StreamPos is needed for all I/O traits, so we use it to make sure rust gives better diagnostics.
112impl<Io: ?Sized + stdeq::StreamPos<u64>> FuturesCompat for Io {}
113
114// Currently we cannot guarantee that the user won't swap the buffer when using linear types.
115// FIXME: always allocate an intermediary and sync in `Compat`. This way we could also retain the
116// buffer, so that's nice.
117#[cfg(not(mfio_assume_linear_types))]
118#[cfg_attr(docsrs, doc(cfg(not(mfio_assume_linear_types))))]
119impl<'a, Io: ?Sized + stdeq::AsyncRead<u64>> AsyncRead for Compat<'a, Io>
120where
121    u64: PosShift<Io>,
122{
123    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
124        let this = unsafe { self.get_unchecked_mut() };
125
126        loop {
127            if let Some(read) = this.read.as_mut() {
128                // Update the sync handle. This is how we hack around the lifetimes of input buffer.
129                // SAFETY: AsyncIoFut will only use the sync object if, and only if the buffer is
130                // to be written in this poll.
131                read.sync = Some(unsafe { &mut *(buf as *mut _) });
132
133                let read = unsafe { Pin::new_unchecked(read) };
134
135                break read.poll(cx).map(|v| {
136                    this.read = None;
137                    v.map_err(|_| std::io::ErrorKind::Other.into())
138                });
139            } else {
140                // SAFETY: on mfio_assume_linear_types, this is unsafe. Without the switch this is
141                // safe, because the buffer is stored in a sync variable that is only used whenever
142                // the I/O completes. That is processed in this poll function, and we update the
143                // sync at every iteration of the loop.
144                let buf = unsafe { &mut *(buf as *mut _) };
145                this.read = Some(stdeq::AsyncRead::read(this.io, buf));
146            }
147        }
148    }
149}
150
151impl<'a, Io: ?Sized + stdeq::AsyncWrite<u64>> AsyncWrite for Compat<'a, Io>
152where
153    u64: PosShift<Io>,
154{
155    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
156        let this = unsafe { self.get_unchecked_mut() };
157
158        loop {
159            if let Some(write) = this.write.as_mut() {
160                let write = unsafe { Pin::new_unchecked(write) };
161
162                break write.poll(cx).map(|v| {
163                    this.write = None;
164                    v.map_err(|_| std::io::ErrorKind::Other.into())
165                });
166            } else {
167                // SAFETY: on mfio_assume_linear_types, this is unsafe. Without the switch this is
168                // safe, because the buffer is transferred to an intermediate one before this
169                // function returns..
170                let buf = unsafe { &*(buf as *const _) };
171                this.write = Some(stdeq::AsyncWrite::write(this.io, buf));
172            }
173        }
174    }
175
176    fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
177        // Completion of every request currently implies we've flushed.
178        // TODO: improve semantics
179        Poll::Ready(Ok(()))
180    }
181
182    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
183        // We currently imply that we can just close on drop.
184        // TODO: improve semantics
185        Poll::Ready(Ok(()))
186    }
187}
188
189impl<'a, Io: ?Sized + stdeq::StreamPos<u64>> AsyncSeek for Compat<'a, Io> {
190    fn poll_seek(self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
191        let this = unsafe { self.get_unchecked_mut() };
192        Poll::Ready(stdeq::std_seek(this.io, pos))
193    }
194}