completion_io/
read.rs

1use std::fmt::{self, Debug, Formatter};
2use std::future::{self, Future};
3use std::io::{Cursor, Empty, Repeat, Result};
4use std::marker::PhantomData;
5use std::mem::MaybeUninit;
6use std::pin::Pin;
7use std::ptr::NonNull;
8use std::slice;
9use std::task::{Context, Poll};
10
11use completion_core::CompletionFuture;
12
13/// Read bytes from a source asynchronously.
14///
15/// This is an asynchronous version of [`std::io::Read`].
16///
17/// You should not implement this trait manually, instead implement [`AsyncReadWith`].
18pub trait AsyncRead: for<'a> AsyncReadWith<'a> {}
19impl<T: for<'a> AsyncReadWith<'a> + ?Sized> AsyncRead for T {}
20
/// Read bytes from a source asynchronously with a specific lifetime.
///
/// The lifetime `'a` is shared by the mutable borrow of the reader and by the buffer handle passed
/// to [`read`](Self::read). Implement this trait for every `'a` to obtain [`AsyncRead`].
pub trait AsyncReadWith<'a> {
    /// The future that reads from the source.
    ///
    /// Its output carries only success or an I/O error; the bytes read are recorded in the buffer
    /// that was passed to [`read`](Self::read).
    type ReadFuture: CompletionFuture<Output = Result<()>>;

    /// Pull some bytes from this source into the specified buffer.
    ///
    /// If this reads 0 bytes of data, either the buffer was 0 bytes in length or the stream has
    /// reached EOF.
    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture;

    // TODO: support vectored reads
}
34
35impl<'a, R: AsyncReadWith<'a> + ?Sized> AsyncReadWith<'a> for &mut R {
36    type ReadFuture = R::ReadFuture;
37
38    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
39        (**self).read(buf)
40    }
41}
42
43impl<'a, R: AsyncReadWith<'a> + ?Sized> AsyncReadWith<'a> for Box<R> {
44    type ReadFuture = R::ReadFuture;
45
46    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
47        (**self).read(buf)
48    }
49}
50
51impl<'a> AsyncReadWith<'a> for Empty {
52    type ReadFuture = future::Ready<Result<()>>;
53
54    fn read(&'a mut self, _buf: ReadBufMut<'a>) -> Self::ReadFuture {
55        future::ready(Ok(()))
56    }
57}
58
59impl<'a> AsyncReadWith<'a> for Repeat {
60    type ReadFuture = ReadRepeat<'a>;
61
62    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
63        let mut byte = 0_u8;
64        std::io::Read::read(self, std::slice::from_mut(&mut byte)).unwrap();
65        ReadRepeat { byte, buf }
66    }
67}
68
69/// Future for [`read`](AsyncReadWith::read) on a [`Repeat`].
70#[derive(Debug)]
71pub struct ReadRepeat<'a> {
72    byte: u8,
73    buf: ReadBufMut<'a>,
74}
75impl CompletionFuture for ReadRepeat<'_> {
76    type Output = Result<()>;
77
78    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        Future::poll(self, cx)
80    }
81    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
82        Poll::Ready(())
83    }
84}
85impl Future for ReadRepeat<'_> {
86    type Output = Result<()>;
87
88    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
89        let remaining = self.buf.remaining();
90        unsafe {
91            self.buf
92                .unfilled_mut()
93                .as_mut_ptr()
94                .write_bytes(self.byte, remaining);
95            self.buf.assume_init(remaining);
96        };
97        self.buf.add_filled(remaining);
98        Poll::Ready(Ok(()))
99    }
100}
101
/// Reading from `repeat(185)` must fill every byte of a 13-byte uninitialized buffer with 185.
#[test]
fn test_read_repeat() {
    let mut storage = [MaybeUninit::uninit(); 13];
    let mut buf = ReadBuf::uninit(&mut storage);

    let mut source = std::io::repeat(185);
    let outcome = futures_lite::future::block_on(source.read(buf.as_mut()));
    outcome.unwrap();

    assert_eq!(buf.into_filled(), &[185; 13]);
}
111
112impl<'a, 's> AsyncReadWith<'a> for &'s [u8] {
113    type ReadFuture = ReadSlice<'a, 's>;
114
115    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
116        ReadSlice {
117            // Safety: We are extending the lifetime of the reference from 'a to 's. This is safe
118            // because the struct it is in only lives for as long as 'a.
119            slice: unsafe { &mut *(self as *mut _) },
120            buf,
121        }
122    }
123}
124
125/// Future for [`read`](AsyncReadWith::read) on a byte slice (`&[u8]`).
126#[derive(Debug)]
127pub struct ReadSlice<'a, 's> {
128    // This is conceptually an &'a mut &'s [u8]. However, that would add the implicit bound 's: 'a
129    // which is incompatible with AsyncReadWith.
130    slice: &'s mut &'s [u8],
131    buf: ReadBufMut<'a>,
132}
133impl Future for ReadSlice<'_, '_> {
134    type Output = Result<()>;
135
136    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
137        let amount = std::cmp::min(self.buf.remaining(), self.slice.len());
138        let (write, rest) = self.slice.split_at(amount);
139        self.buf.append(write);
140        *self.slice = rest;
141
142        Poll::Ready(Ok(()))
143    }
144}
145impl CompletionFuture for ReadSlice<'_, '_> {
146    type Output = Result<()>;
147
148    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149        Future::poll(self, cx)
150    }
151    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
152        Poll::Ready(())
153    }
154}
155
/// Two consecutive slice reads into a 7-byte buffer: the first fits entirely, the second is
/// truncated once the buffer is full.
#[test]
fn test_read_slice() {
    let scenario = async {
        let mut storage = [MaybeUninit::uninit(); 7];
        let mut buf = ReadBuf::uninit(&mut storage);

        // Five bytes fit completely, leaving the source empty.
        let mut first: &[u8] = &[1, 2, 3, 4, 5];
        first.read(buf.as_mut()).await.unwrap();

        assert_eq!(first, &[]);
        assert_eq!(buf.as_mut().filled(), &[1, 2, 3, 4, 5]);

        // Only two bytes of space remain, so three bytes stay in the source.
        let mut second: &[u8] = &[6, 7, 8, 9, 10];
        second.read(buf.as_mut()).await.unwrap();

        assert_eq!(second, &[8, 9, 10]);
        assert_eq!(buf.as_mut().filled(), &[1, 2, 3, 4, 5, 6, 7]);
    };
    futures_lite::future::block_on(scenario);
}
175
176impl<'a, T: AsRef<[u8]>> AsyncReadWith<'a> for Cursor<T> {
177    type ReadFuture = ReadCursor<'a, T>;
178
179    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
180        ReadCursor { cursor: self, buf }
181    }
182}
183
184/// Future for [`read`](AsyncReadWith::read) on a [`Cursor`].
185#[derive(Debug)]
186pub struct ReadCursor<'a, T> {
187    // This is conceptually an &'a mut Cursor<T>. However, that would add the implicit bound T: 'a
188    // which is incompatible with AsyncReadWith.
189    cursor: *mut Cursor<T>,
190    buf: ReadBufMut<'a>,
191}
192// ReadBufMut is always Send+Sync, and we hold a mutable reference to Cursor.
193unsafe impl<T: Send> Send for ReadCursor<'_, T> {}
194unsafe impl<T: Sync> Sync for ReadCursor<'_, T> {}
195
196impl<T: AsRef<[u8]>> Future for ReadCursor<'_, T> {
197    type Output = Result<()>;
198
199    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
200        let cursor = unsafe { &mut *self.cursor };
201
202        let slice = std::io::BufRead::fill_buf(cursor)?;
203        let amount = std::cmp::min(self.buf.remaining(), slice.len());
204        self.buf.append(&slice[..amount]);
205        cursor.set_position(cursor.position() + amount as u64);
206
207        Poll::Ready(Ok(()))
208    }
209}
210impl<T: AsRef<[u8]>> CompletionFuture for ReadCursor<'_, T> {
211    type Output = Result<()>;
212
213    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214        Future::poll(self, cx)
215    }
216    unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
217        Poll::Ready(())
218    }
219}
220
/// Two consecutive cursor reads into a 7-byte buffer: the first fits entirely, the second stops
/// after two bytes because the buffer is full.
#[test]
fn test_read_cursor() {
    let scenario = async {
        let mut storage = [MaybeUninit::uninit(); 7];
        let mut buf = ReadBuf::uninit(&mut storage);

        // All five bytes fit, so the cursor ends up at the end of its data.
        let mut first = Cursor::new(vec![1, 2, 3, 4, 5]);
        first.read(buf.as_mut()).await.unwrap();
        assert_eq!(first.position(), 5);
        assert_eq!(buf.as_mut().filled(), &[1, 2, 3, 4, 5]);

        // Only two bytes of space remain, so the cursor advances by two.
        let mut second = Cursor::new(vec![6, 7, 8, 9, 10]);
        second.read(buf.as_mut()).await.unwrap();
        assert_eq!(second.position(), 2);
        assert_eq!(buf.as_mut().filled(), &[1, 2, 3, 4, 5, 6, 7]);
    };
    futures_lite::future::block_on(scenario);
}
238
/// Compile-time check that the expected types implement [`AsyncRead`].
///
/// This function is never called; it only has to type-check.
#[cfg(test)]
#[allow(dead_code, clippy::extra_unused_lifetimes)]
fn test_impls_traits<'a>() {
    fn requires_async_read<R>()
    where
        R: AsyncRead,
    {
    }

    // `Empty`, directly and through references and boxes.
    requires_async_read::<Empty>();
    requires_async_read::<&'a mut Empty>();
    requires_async_read::<Box<Empty>>();
    requires_async_read::<&'a mut Box<&'a mut Empty>>();

    // Byte slices.
    requires_async_read::<&'a [u8]>();
    requires_async_read::<&'a mut &'a [u8]>();

    // Cursors over owned and borrowed data.
    requires_async_read::<Cursor<Vec<u8>>>();
    requires_async_read::<Cursor<&'a [u8]>>();
    requires_async_read::<&'a mut Cursor<&'a [u8]>>();
}
256
/// Macro to define the common methods in both `ReadBuf` and `ReadBufMut`.
///
/// `$get` must be callable as `$get(&Self)` and yield a shared reference to the underlying
/// `ReadBuf`; `$get_mut` must be callable as `$get_mut(&mut Self)` and yield a mutable reference
/// to it. `$get_mut` may be an unsafe function, so every call to it happens in an unsafe context.
///
/// All methods rely on the `ReadBuf` invariant `filled <= initialized <= data.len()`.
macro_rules! common_read_buf_methods {
    ($get:expr, $get_mut:expr $(,)?) => {
        /// Get the total capacity of the buffer.
        #[inline]
        #[must_use]
        pub fn capacity(&self) -> usize {
            $get(self).data.len()
        }

        /// Get a shared reference to the filled portion of the buffer.
        #[inline]
        #[must_use]
        pub fn filled(&self) -> &[u8] {
            let buf = $get(self);
            // SAFETY: `filled` never exceeds the buffer length, and filled bytes are initialized.
            unsafe { &*(buf.data.get_unchecked(..buf.filled) as *const _ as *const _) }
        }

        /// Get a mutable reference to the filled portion of the buffer.
        #[inline]
        #[must_use]
        pub fn filled_mut(&mut self) -> &mut [u8] {
            let buf = unsafe { $get_mut(self) };
            // SAFETY: `filled` never exceeds the buffer length, and filled bytes are initialized.
            unsafe { &mut *(buf.data.get_unchecked_mut(..buf.filled) as *mut _ as *mut _) }
        }

        /// Get a shared reference to the initialized portion of the buffer.
        ///
        /// This includes the filled portion.
        #[inline]
        #[must_use]
        pub fn initialized(&self) -> &[u8] {
            let buf = $get(self);
            // SAFETY: `initialized` never exceeds the buffer length, and every byte before it is
            // initialized.
            unsafe { &*(buf.data.get_unchecked(..buf.initialized) as *const _ as *const _) }
        }

        /// Get a mutable reference to the initialized portion of the buffer.
        ///
        /// This includes the filled portion.
        #[inline]
        pub fn initialized_mut(&mut self) -> &mut [u8] {
            let buf = unsafe { $get_mut(self) };
            // SAFETY: `initialized` never exceeds the buffer length, and every byte before it is
            // initialized.
            unsafe { &mut *(buf.data.get_unchecked_mut(..buf.initialized) as *mut _ as *mut _) }
        }

        /// Get a mutable reference to the unfilled part of the buffer without ensuring that it has
        /// been fully initialized.
        ///
        /// # Safety
        ///
        /// The caller must not deinitialize portions of the buffer that have already been
        /// initialized.
        #[inline]
        #[must_use]
        pub unsafe fn unfilled_mut(&mut self) -> &mut [MaybeUninit<u8>] {
            let buf = $get_mut(self);
            buf.data.get_unchecked_mut(buf.filled..)
        }

        /// Get a shared reference to the entire backing buffer.
        #[inline]
        #[must_use]
        pub fn all(&self) -> &[MaybeUninit<u8>] {
            $get(self).data
        }

        /// Get a mutable reference to the entire backing buffer.
        ///
        /// # Safety
        ///
        /// The caller must not deinitialize portions of the buffer that have already been
        /// initialized.
        #[inline]
        #[must_use]
        pub unsafe fn all_mut(&mut self) -> &mut [MaybeUninit<u8>] {
            $get_mut(self).data
        }

        /// Get a mutable reference to the unfilled part of the buffer, ensuring it is fully
        /// initialized.
        ///
        /// Since `ReadBuf` tracks the region of the buffer that has been initialized, this is
        /// effectively "free" after the first use.
        #[inline]
        pub fn initialize_unfilled(&mut self) -> &mut [u8] {
            self.initialize_unfilled_to(self.remaining())
        }

        /// Get a mutable reference to the first `n` bytes of the unfilled part of the buffer, ensuring
        /// it is fully initialized.
        ///
        /// Bytes that were not yet known to be initialized are zeroed.
        ///
        /// # Panics
        ///
        /// Panics if `self.remaining()` is less than `n`.
        #[inline]
        pub fn initialize_unfilled_to(&mut self, n: usize) -> &mut [u8] {
            assert!(
                self.remaining() >= n,
                "attempted to obtain more bytes than the buffer's capacity"
            );

            let buf = unsafe { $get_mut(self) };
            // Cannot overflow: the assertion above guarantees `filled + n <= capacity`.
            let end = buf.filled + n;

            if buf.initialized < end {
                // SAFETY: `initialized < end <= capacity`, so the range is in bounds. The pointer
                // is taken from the whole sub-slice being zeroed, so every byte written lies
                // inside the region the pointer was derived from.
                unsafe {
                    buf.data
                        .get_unchecked_mut(buf.initialized..end)
                        .as_mut_ptr()
                        .write_bytes(0, end - buf.initialized);
                }
                buf.initialized = end;
            }

            // SAFETY: `filled..end` is in bounds and, after the step above, fully initialized.
            unsafe { &mut *(buf.data.get_unchecked_mut(buf.filled..end) as *mut _ as *mut _) }
        }

        /// Get the number of bytes at the end of the slice that have not yet been filled.
        #[inline]
        #[must_use]
        pub fn remaining(&self) -> usize {
            self.capacity() - $get(self).filled
        }

        /// Clear the buffer, resetting the filled region to empty.
        ///
        /// The number of initialized bytes is not changed, and the contents of the buffer is not
        /// modified.
        #[inline]
        pub fn clear(&mut self) {
            unsafe { $get_mut(self) }.filled = 0;
        }

        /// Increase the size of the filled region of the buffer by `n` bytes.
        ///
        /// The number of initialized bytes is not changed.
        ///
        /// # Panics
        ///
        /// Panics if the filled region of the buffer would become larger than the initialized region.
        #[inline]
        pub fn add_filled(&mut self, n: usize) {
            let filled = $get(&*self).filled.checked_add(n).expect(
                "attempted to increase the filled region of the buffer beyond the integer limit",
            );
            self.set_filled(filled);
        }

        /// Set the size of the filled region of the buffer to `n`.
        ///
        /// The number of initialized bytes is not changed.
        ///
        /// Note that this can be used to *shrink* the filled region of the buffer in addition to
        /// growing it (for example, by a `Read` implementation that compresses data in-place).
        ///
        /// # Panics
        ///
        /// Panics if the filled region of the buffer would become larger than the initialized region.
        #[inline]
        pub fn set_filled(&mut self, n: usize) {
            let buf = unsafe { $get_mut(self) };
            assert!(
                n <= buf.initialized,
                "attempted to increase the filled region of the buffer beyond initialized region"
            );

            buf.filled = n;
        }

        /// Asserts that the first `n` unfilled bytes of the buffer are initialized.
        ///
        /// `ReadBuf` assumes that bytes are never deinitialized, so this method does nothing when
        /// called with fewer bytes than are already known to be initialized.
        ///
        /// # Safety
        ///
        /// The caller must ensure that the first `n` unfilled bytes of the buffer have already been
        /// initialized.
        #[inline]
        pub unsafe fn assume_init(&mut self, n: usize) {
            let buf = $get_mut(self);
            // `n` counts from the start of the unfilled region, so the new initialized index is
            // relative to `filled`, not to the start of the buffer.
            let new = buf.filled + n;
            if new > buf.initialized {
                buf.initialized = new;
            }
        }

        /// Appends data to the buffer, advancing the written position and possibly also the initialized
        /// position.
        ///
        /// # Panics
        ///
        /// Panics if `self.remaining()` is less than `other.len()`.
        #[inline]
        pub fn append(&mut self, other: &[u8]) {
            assert!(
                self.remaining() >= other.len(),
                "attempted to append more bytes to the buffer than it has capacity for",
            );

            let buf = unsafe { $get_mut(self) };

            // Cannot overflow: the assertion above guarantees `end <= capacity`.
            let end = buf.filled + other.len();

            // SAFETY: `filled..end` is in bounds, and `other` cannot overlap the buffer because
            // the buffer is exclusively borrowed.
            unsafe {
                buf.data
                    .get_unchecked_mut(buf.filled..end)
                    .as_mut_ptr()
                    .cast::<u8>()
                    .copy_from_nonoverlapping(other.as_ptr(), other.len())
            }

            if buf.initialized < end {
                buf.initialized = end;
            }
            buf.filled = end;
        }
    };
}
477
478/// A wrapper around a byte buffer that is incrementally filled and initialized.
479pub struct ReadBuf<'a> {
480    data: &'a mut [MaybeUninit<u8>],
481    /// The index up to which the buffer has been filled with meaningful data.
482    filled: usize,
483    /// The index up to which the buffer's data is initialized with useless data.
484    initialized: usize,
485}
486
487impl<'a> ReadBuf<'a> {
488    /// Create a new `ReadBuf` from a fully initialized buffer.
489    #[inline]
490    pub fn new(buf: &'a mut [u8]) -> Self {
491        let initialized = buf.len();
492        Self {
493            data: unsafe { &mut *(buf as *mut _ as *mut [MaybeUninit<u8>]) },
494            filled: 0,
495            initialized,
496        }
497    }
498
499    /// Create a new `ReadBuf` from a fully uninitialized buffer.
500    ///
501    /// Use [`assume_init`](ReadBufMut::assume_init) if part of the buffer is known to be already
502    /// initialized.
503    #[inline]
504    pub fn uninit(data: &'a mut [MaybeUninit<u8>]) -> ReadBuf<'a> {
505        Self {
506            data,
507            filled: 0,
508            initialized: 0,
509        }
510    }
511
512    /// Get a mutable reference to this buffer as a [`ReadBufMut`].
513    #[inline]
514    pub fn as_mut(&mut self) -> ReadBufMut<'_> {
515        ReadBufMut {
516            buf: NonNull::from(self),
517            _covariant: PhantomData,
518        }
519    }
520
521    /// Consume the buffer, returning the entire partially initialized backing slice.
522    #[inline]
523    #[must_use]
524    pub fn into_all(self) -> &'a mut [MaybeUninit<u8>] {
525        self.data
526    }
527
528    /// Consume the buffer, returning its three parts, the filled portion, the unfilled portion and
529    /// the uninitialized portion.
530    #[inline]
531    #[must_use]
532    pub fn into_parts(self) -> (&'a mut [u8], &'a mut [u8], &'a mut [MaybeUninit<u8>]) {
533        let len = self.data.len();
534        let ptr = self.data.as_mut_ptr();
535
536        unsafe {
537            (
538                slice::from_raw_parts_mut(ptr as *mut u8, self.filled),
539                slice::from_raw_parts_mut(
540                    ptr.add(self.filled) as *mut u8,
541                    self.initialized - self.filled,
542                ),
543                slice::from_raw_parts_mut(ptr.add(self.initialized), len - self.initialized),
544            )
545        }
546    }
547
548    /// Consume the buffer, returning its filled portion.
549    #[inline]
550    #[must_use]
551    pub fn into_filled(self) -> &'a mut [u8] {
552        unsafe { &mut *(self.data.get_unchecked_mut(..self.filled) as *mut _ as *mut _) }
553    }
554}
555
/// These methods are also present on [`ReadBufMut`].
// The macro wraps calls to its mutable accessor in `unsafe` blocks because that accessor may be an
// unsafe function. Here the accessor is the safe `identity`, so those blocks are redundant and the
// resulting lint is silenced.
#[allow(unused_unsafe)]
impl ReadBuf<'_> {
    // Both accessors are the identity function because `self` already is the `ReadBuf`.
    common_read_buf_methods!(std::convert::identity, std::convert::identity);
}
561
562impl Debug for ReadBuf<'_> {
563    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
564        struct InitializedByte(u8);
565        impl Debug for InitializedByte {
566            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
567                write!(f, "({})", self.0)
568            }
569        }
570        struct Uninit;
571        impl Debug for Uninit {
572            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
573                f.write_str("-")
574            }
575        }
576
577        let mut list = f.debug_list();
578
579        for (i, byte) in self.data.iter().enumerate() {
580            if i < self.filled {
581                list.entry(&unsafe { byte.assume_init() });
582            } else if i < self.initialized {
583                list.entry(&InitializedByte(unsafe { byte.assume_init() }));
584            } else {
585                list.entry(&Uninit);
586            }
587        }
588
589        list.finish()
590    }
591}
592
593/// A type that grants mutable access to a [`ReadBuf`].
594///
595/// You can create this by calling [`ReadBuf::as_mut`].
596pub struct ReadBufMut<'a> {
597    /// This is a `NonNull` to allow `'a` to be covariant. As a safety invariant, this must _never_
598    /// be moved out of and the inner buffer's pointer to the bytes must never be changed (as the
599    /// `'a` lifetime could be shorter than the actual lifetime of the `ReadBuf`/bytes).
600    buf: NonNull<ReadBuf<'a>>,
601    /// Even though we hold a mutable reference to `ReadBuf`, it is safe to be covariant as we
602    /// never reassign the buffer, or change the buffer's pointer to the bytes.
603    _covariant: PhantomData<&'a ()>,
604}
605
606// We effectively hold an `&'a mut ReadBuf<'a>` which is Send and Sync.
607unsafe impl Send for ReadBufMut<'_> {}
608unsafe impl Sync for ReadBufMut<'_> {}
609
610impl<'a> ReadBufMut<'a> {
611    /// Get a shared reference to the internal buffer.
612    #[inline]
613    #[must_use]
614    pub fn buf(&self) -> &ReadBuf<'a> {
615        unsafe {
616            // Safety: You cannot move out of a shared reference.
617            self.buf.as_ref()
618        }
619    }
620
621    /// Get a mutable reference to the internal buffer.
622    ///
623    /// # Safety
624    ///
625    /// This must not be moved out of, and the buffer's pointer to the bytes must not be changed.
626    #[inline]
627    pub unsafe fn buf_mut(&mut self) -> &mut ReadBuf<'a> {
628        self.buf.as_mut()
629    }
630
631    /// Convert this type to a mutable reference to the internal buffer.
632    ///
633    /// # Safety
634    ///
635    /// This must not be moved out of, and the buffer's pointer to the bytes must not be changed.
636    #[inline]
637    #[must_use]
638    pub unsafe fn into_mut(self) -> &'a mut ReadBuf<'a> {
639        &mut *self.buf.as_ptr()
640    }
641
642    /// Borrow the buffer, rather than consuming it.
643    #[inline]
644    #[must_use]
645    pub fn as_mut(&mut self) -> ReadBufMut<'_> {
646        ReadBufMut {
647            buf: self.buf,
648            _covariant: PhantomData,
649        }
650    }
651}
652
/// These methods are also present on [`ReadBuf`].
impl ReadBufMut<'_> {
    // Shared access to the underlying `ReadBuf` goes through `buf`, mutable access through the
    // unsafe `buf_mut`; the macro calls the latter from an unsafe context.
    common_read_buf_methods!(Self::buf, Self::buf_mut,);
}
657
658impl Debug for ReadBufMut<'_> {
659    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
660        self.buf().fmt(f)
661    }
662}