mfio/futures_compat.rs
1//! Provides compatibility with `futures` traits.
2
3use crate::io::*;
4use crate::stdeq::{self, AsyncIoFut};
5use crate::util::PosShift;
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9#[cfg(not(mfio_assume_linear_types))]
10use futures::io::AsyncRead;
11use futures::io::{AsyncSeek, AsyncWrite};
12use std::io::{Result, SeekFrom};
13
/// Adapter state that lets an mfio I/O object be driven through the `futures` I/O traits.
///
/// A `Compat` borrows the underlying I/O object and keeps at most one in-flight read and one
/// in-flight write operation between polls.
///
/// # Cancellation
///
/// Currently, reading and writing are not cancel safe. Cancelling an I/O operation and issuing a
/// new one continues the previous operation and syncs its results to the currently provided
/// buffer. Reads and writes are tracked separately, so they do not mix, and it is okay to cancel
/// a read in order to issue a write.
///
/// If you wish to cancel an operation, drop the entire `Compat` object. However, be warned that
/// `mfio` may panic, since it does not yet support cancellation at all.
///
/// Note that at the time of writing, `AsyncRead` is not supported when the
/// `mfio_assume_linear_types` config is set.
pub struct Compat<'a, Io: ?Sized> {
    // Borrowed I/O object that every operation is issued against.
    io: &'a Io,
    // In-flight read, if any. Populated by `poll_read` when empty and cleared once the future
    // completes.
    // NOTE(review): the `Write` marker presumably refers to the caller's buffer being written
    // into by the read -- confirm against `stdeq::AsyncIoFut`.
    #[cfg(not(mfio_assume_linear_types))]
    read: Option<AsyncIoFut<'a, Io, Write, u64, &'a mut [u8]>>,
    // In-flight write, if any. Populated by `poll_write` when empty and cleared once the future
    // completes.
    // NOTE(review): the `Read` marker presumably refers to the caller's buffer only being read
    // from -- confirm against `stdeq::AsyncIoFut`.
    write: Option<AsyncIoFut<'a, Io, Read, u64, &'a [u8]>>,
}
32
33/// Bridges mfio with futures.
34///
35/// # Examples
36///
37/// Read from mfio object through futures traits.
38///
39/// ```rust
40/// # mod sample {
41/// # include!("sample.rs");
42/// # }
43/// # use sample::SampleIo;
44/// # fn work() -> mfio::error::Result<()> {
45/// # mfio::linear_types_switch!(
46/// # Linear => { Ok(()) }
47/// # Standard => {{
48/// use futures::io::{AsyncReadExt, Cursor};
49/// use mfio::backend::*;
50/// use mfio::futures_compat::FuturesCompat;
51/// use mfio::stdeq::SeekableRef;
52///
53/// let mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144];
54/// let handle = SampleIo::new(mem.clone());
55///
56/// handle.block_on(async {
57/// let mut buf = Cursor::new(vec![0; mem.len()]);
58///
59/// let handle = SeekableRef::from(&handle);
60/// futures::io::copy(handle.compat(), &mut buf).await?;
61/// assert_eq!(mem, buf.into_inner());
62///
63/// Ok(())
64/// })
65/// # }}
66/// # )
67/// # }
68/// # work().unwrap();
69/// ```
70///
71/// Write using futures traits.
72///
73/// ```rust
74/// # mod sample {
75/// # include!("sample.rs");
76/// # }
77/// # use sample::SampleIo;
78/// # fn work() -> mfio::error::Result<()> {
79/// use futures::io::AsyncWriteExt;
80/// use mfio::backend::*;
81/// use mfio::futures_compat::FuturesCompat;
82/// use mfio::stdeq::SeekableRef;
83/// use mfio::traits::IoRead;
84///
85/// let mut mem = vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144];
86/// let handle = SampleIo::new(mem.clone());
87///
88/// handle.block_on(async {
89/// let handle = SeekableRef::from(&handle);
90/// handle.compat().write_all(&[9, 9, 9]).await?;
91///
92/// handle.read_all(0, &mut mem[..5]).await.unwrap();
93/// assert_eq!(&mem[..5], &[9, 9, 9, 2, 3]);
94///
95/// Ok(())
96/// })
97/// # }
98/// # work().unwrap();
99/// ```
100pub trait FuturesCompat {
101 fn compat(&self) -> Compat<Self> {
102 Compat {
103 io: self,
104 #[cfg(not(mfio_assume_linear_types))]
105 read: None,
106 write: None,
107 }
108 }
109}
110
111// StreamPos is needed for all I/O traits, so we use it to make sure rust gives better diagnostics.
112impl<Io: ?Sized + stdeq::StreamPos<u64>> FuturesCompat for Io {}
113
114// Currently we cannot guarantee that the user won't swap the buffer when using linear types.
115// FIXME: always allocate an intermediary and sync in `Compat`. This way we could also retain the
116// buffer, so that's nice.
117#[cfg(not(mfio_assume_linear_types))]
118#[cfg_attr(docsrs, doc(cfg(not(mfio_assume_linear_types))))]
119impl<'a, Io: ?Sized + stdeq::AsyncRead<u64>> AsyncRead for Compat<'a, Io>
120where
121 u64: PosShift<Io>,
122{
123 fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
124 let this = unsafe { self.get_unchecked_mut() };
125
126 loop {
127 if let Some(read) = this.read.as_mut() {
128 // Update the sync handle. This is how we hack around the lifetimes of input buffer.
129 // SAFETY: AsyncIoFut will only use the sync object if, and only if the buffer is
130 // to be written in this poll.
131 read.sync = Some(unsafe { &mut *(buf as *mut _) });
132
133 let read = unsafe { Pin::new_unchecked(read) };
134
135 break read.poll(cx).map(|v| {
136 this.read = None;
137 v.map_err(|_| std::io::ErrorKind::Other.into())
138 });
139 } else {
140 // SAFETY: on mfio_assume_linear_types, this is unsafe. Without the switch this is
141 // safe, because the buffer is stored in a sync variable that is only used whenever
142 // the I/O completes. That is processed in this poll function, and we update the
143 // sync at every iteration of the loop.
144 let buf = unsafe { &mut *(buf as *mut _) };
145 this.read = Some(stdeq::AsyncRead::read(this.io, buf));
146 }
147 }
148 }
149}
150
151impl<'a, Io: ?Sized + stdeq::AsyncWrite<u64>> AsyncWrite for Compat<'a, Io>
152where
153 u64: PosShift<Io>,
154{
155 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
156 let this = unsafe { self.get_unchecked_mut() };
157
158 loop {
159 if let Some(write) = this.write.as_mut() {
160 let write = unsafe { Pin::new_unchecked(write) };
161
162 break write.poll(cx).map(|v| {
163 this.write = None;
164 v.map_err(|_| std::io::ErrorKind::Other.into())
165 });
166 } else {
167 // SAFETY: on mfio_assume_linear_types, this is unsafe. Without the switch this is
168 // safe, because the buffer is transferred to an intermediate one before this
169 // function returns..
170 let buf = unsafe { &*(buf as *const _) };
171 this.write = Some(stdeq::AsyncWrite::write(this.io, buf));
172 }
173 }
174 }
175
176 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
177 // Completion of every request currently implies we've flushed.
178 // TODO: improve semantics
179 Poll::Ready(Ok(()))
180 }
181
182 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
183 // We currently imply that we can just close on drop.
184 // TODO: improve semantics
185 Poll::Ready(Ok(()))
186 }
187}
188
189impl<'a, Io: ?Sized + stdeq::StreamPos<u64>> AsyncSeek for Compat<'a, Io> {
190 fn poll_seek(self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
191 let this = unsafe { self.get_unchecked_mut() };
192 Poll::Ready(stdeq::std_seek(this.io, pos))
193 }
194}