1use std::fmt::{self, Debug, Formatter};
2use std::future::{self, Future};
3use std::io::{Cursor, Empty, Repeat, Result};
4use std::marker::PhantomData;
5use std::mem::MaybeUninit;
6use std::pin::Pin;
7use std::ptr::NonNull;
8use std::slice;
9use std::task::{Context, Poll};
10
11use completion_core::CompletionFuture;
12
13pub trait AsyncRead: for<'a> AsyncReadWith<'a> {}
19impl<T: for<'a> AsyncReadWith<'a> + ?Sized> AsyncRead for T {}
20
/// Read operation tied to a single borrow lifetime `'a`.
///
/// Both the reader and the destination buffer are borrowed for `'a`, so the returned future is
/// able to keep hold of them.
pub trait AsyncReadWith<'a> {
    /// Future returned by [`read`](Self::read); it resolves to `Result<()>`.
    type ReadFuture: CompletionFuture<Output = Result<()>>;

    /// Starts a read from `self` into `buf`.
    ///
    /// The implementations in this file report how much was read by advancing the filled
    /// region of `buf`.
    fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture;

}
34
35impl<'a, R: AsyncReadWith<'a> + ?Sized> AsyncReadWith<'a> for &mut R {
36 type ReadFuture = R::ReadFuture;
37
38 fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
39 (**self).read(buf)
40 }
41}
42
43impl<'a, R: AsyncReadWith<'a> + ?Sized> AsyncReadWith<'a> for Box<R> {
44 type ReadFuture = R::ReadFuture;
45
46 fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
47 (**self).read(buf)
48 }
49}
50
51impl<'a> AsyncReadWith<'a> for Empty {
52 type ReadFuture = future::Ready<Result<()>>;
53
54 fn read(&'a mut self, _buf: ReadBufMut<'a>) -> Self::ReadFuture {
55 future::ready(Ok(()))
56 }
57}
58
59impl<'a> AsyncReadWith<'a> for Repeat {
60 type ReadFuture = ReadRepeat<'a>;
61
62 fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
63 let mut byte = 0_u8;
64 std::io::Read::read(self, std::slice::from_mut(&mut byte)).unwrap();
65 ReadRepeat { byte, buf }
66 }
67}
68
/// Future returned by the `AsyncReadWith` implementation for `Repeat`.
///
/// When polled it writes `byte` into every unfilled position of `buf`.
#[derive(Debug)]
pub struct ReadRepeat<'a> {
    // Value written into the buffer.
    byte: u8,
    // Destination buffer.
    buf: ReadBufMut<'a>,
}
75impl CompletionFuture for ReadRepeat<'_> {
76 type Output = Result<()>;
77
78 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 Future::poll(self, cx)
80 }
81 unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
82 Poll::Ready(())
83 }
84}
85impl Future for ReadRepeat<'_> {
86 type Output = Result<()>;
87
88 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let remaining = self.buf.remaining();
90 unsafe {
91 self.buf
92 .unfilled_mut()
93 .as_mut_ptr()
94 .write_bytes(self.byte, remaining);
95 self.buf.assume_init(remaining);
96 };
97 self.buf.add_filled(remaining);
98 Poll::Ready(Ok(()))
99 }
100}
101
/// Reading from `repeat(185)` must fill an entirely uninitialized 13-byte buffer with 185.
#[test]
fn test_read_repeat() {
    let mut storage = [MaybeUninit::uninit(); 13];
    let mut dest = ReadBuf::uninit(&mut storage);

    let outcome = futures_lite::future::block_on(std::io::repeat(185).read(dest.as_mut()));
    outcome.unwrap();

    assert_eq!(dest.into_filled(), &[185; 13]);
}
111
112impl<'a, 's> AsyncReadWith<'a> for &'s [u8] {
113 type ReadFuture = ReadSlice<'a, 's>;
114
115 fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
116 ReadSlice {
117 slice: unsafe { &mut *(self as *mut _) },
120 buf,
121 }
122 }
123}
124
/// Future returned by the `AsyncReadWith` implementation for `&[u8]`.
#[derive(Debug)]
pub struct ReadSlice<'a, 's> {
    // The reader itself. `read` builds this from a `&'a mut &'s [u8]` through a pointer cast,
    // so the outer borrow is really only valid for `'a` even though it is typed as `'s`.
    slice: &'s mut &'s [u8],
    // Destination buffer.
    buf: ReadBufMut<'a>,
}
133impl Future for ReadSlice<'_, '_> {
134 type Output = Result<()>;
135
136 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
137 let amount = std::cmp::min(self.buf.remaining(), self.slice.len());
138 let (write, rest) = self.slice.split_at(amount);
139 self.buf.append(write);
140 *self.slice = rest;
141
142 Poll::Ready(Ok(()))
143 }
144}
145impl CompletionFuture for ReadSlice<'_, '_> {
146 type Output = Result<()>;
147
148 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149 Future::poll(self, cx)
150 }
151 unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
152 Poll::Ready(())
153 }
154}
155
/// Two successive slice reads into a 7-byte buffer: the first fits completely, the second is
/// truncated to the two bytes of space that are left.
#[test]
fn test_read_slice() {
    futures_lite::future::block_on(async {
        let mut storage = [MaybeUninit::uninit(); 7];
        let mut dest = ReadBuf::uninit(&mut storage);

        // Five bytes into seven free slots: the source is drained completely.
        let mut first: &[u8] = &[1, 2, 3, 4, 5];
        first.read(dest.as_mut()).await.unwrap();

        assert_eq!(first, &[]);
        assert_eq!(dest.as_mut().filled(), &[1, 2, 3, 4, 5]);

        // Only two slots remain, so three bytes stay behind in the source.
        let mut second: &[u8] = &[6, 7, 8, 9, 10];
        second.read(dest.as_mut()).await.unwrap();

        assert_eq!(second, &[8, 9, 10]);
        assert_eq!(dest.as_mut().filled(), &[1, 2, 3, 4, 5, 6, 7]);
    });
}
175
176impl<'a, T: AsRef<[u8]>> AsyncReadWith<'a> for Cursor<T> {
177 type ReadFuture = ReadCursor<'a, T>;
178
179 fn read(&'a mut self, buf: ReadBufMut<'a>) -> Self::ReadFuture {
180 ReadCursor { cursor: self, buf }
181 }
182}
183
/// Future returned by the `AsyncReadWith` implementation for `Cursor<T>`.
#[derive(Debug)]
pub struct ReadCursor<'a, T> {
    // Raw pointer created from the `&'a mut Cursor<T>` handed to `read`; it is dereferenced
    // when the future is polled.
    cursor: *mut Cursor<T>,
    // Destination buffer.
    buf: ReadBufMut<'a>,
}
// The raw pointer suppresses the automatic `Send`/`Sync` implementations, so they are restored
// by hand with the bounds a `&mut Cursor<T>` would have.
// NOTE(review): soundness rests on the pointer being used exactly like that exclusive borrow —
// confirm.
unsafe impl<T: Send> Send for ReadCursor<'_, T> {}
unsafe impl<T: Sync> Sync for ReadCursor<'_, T> {}
195
196impl<T: AsRef<[u8]>> Future for ReadCursor<'_, T> {
197 type Output = Result<()>;
198
199 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
200 let cursor = unsafe { &mut *self.cursor };
201
202 let slice = std::io::BufRead::fill_buf(cursor)?;
203 let amount = std::cmp::min(self.buf.remaining(), slice.len());
204 self.buf.append(&slice[..amount]);
205 cursor.set_position(cursor.position() + amount as u64);
206
207 Poll::Ready(Ok(()))
208 }
209}
210impl<T: AsRef<[u8]>> CompletionFuture for ReadCursor<'_, T> {
211 type Output = Result<()>;
212
213 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214 Future::poll(self, cx)
215 }
216 unsafe fn poll_cancel(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
217 Poll::Ready(())
218 }
219}
220
/// Two successive cursor reads into a 7-byte buffer: the first fits completely, the second is
/// truncated to the two bytes of space that are left.
#[test]
fn test_read_cursor() {
    futures_lite::future::block_on(async {
        let mut storage = [MaybeUninit::uninit(); 7];
        let mut dest = ReadBuf::uninit(&mut storage);

        // Five bytes into seven free slots: the cursor ends up at its end.
        let mut first = Cursor::new(vec![1, 2, 3, 4, 5]);
        first.read(dest.as_mut()).await.unwrap();
        assert_eq!(first.position(), 5);
        assert_eq!(dest.as_mut().filled(), &[1, 2, 3, 4, 5]);

        // Only two slots remain, so the second cursor advances by two.
        let mut second = Cursor::new(vec![6, 7, 8, 9, 10]);
        second.read(dest.as_mut()).await.unwrap();
        assert_eq!(second.position(), 2);
        assert_eq!(dest.as_mut().filled(), &[1, 2, 3, 4, 5, 6, 7]);
    });
}
238
// Compile-time check only: this function is never called. It fails to build if any of the
// listed types stops implementing `AsyncRead`.
#[cfg(test)]
#[allow(dead_code, clippy::extra_unused_lifetimes)]
fn test_impls_traits<'a>() {
    fn requires_async_read<R: AsyncRead>() {}

    // `Empty`, directly and behind references and boxes.
    requires_async_read::<Empty>();
    requires_async_read::<&'a mut Empty>();
    requires_async_read::<Box<Empty>>();
    requires_async_read::<&'a mut Box<&'a mut Empty>>();

    // Byte slices.
    requires_async_read::<&'a [u8]>();
    requires_async_read::<&'a mut &'a [u8]>();

    // Cursors over owned and borrowed bytes.
    requires_async_read::<Cursor<Vec<u8>>>();
    requires_async_read::<Cursor<&'a [u8]>>();
    requires_async_read::<&'a mut Cursor<&'a [u8]>>();
}
256
/// Generates the inspection and mutation methods shared by `ReadBuf` and `ReadBufMut`.
///
/// * `$get` is called as `$get(&self)` and must return a shared reference to the buffer state
///   (a value with `data`, `filled` and `initialized` fields).
/// * `$get_mut` is called as `$get_mut(&mut self)` and must return a mutable reference to the
///   same state. Every call happens in an unsafe context so that it may be an `unsafe fn`.
///
/// The generated code relies on `filled <= initialized <= data.len()` and on the first
/// `initialized` bytes of `data` being initialized.
macro_rules! common_read_buf_methods {
    ($get:expr, $get_mut:expr $(,)?) => {
        /// Returns the total capacity of the buffer.
        #[inline]
        #[must_use]
        pub fn capacity(&self) -> usize {
            $get(self).data.len()
        }

        /// Returns a shared view of the filled portion of the buffer.
        #[inline]
        #[must_use]
        pub fn filled(&self) -> &[u8] {
            let buf = $get(self);
            // SAFETY: `filled <= initialized <= data.len()` and the first `initialized` bytes
            // are initialized, so the prefix is in bounds and valid to read as `u8`.
            unsafe {
                &*(buf.data.get_unchecked(..buf.filled) as *const [MaybeUninit<u8>]
                    as *const [u8])
            }
        }

        /// Returns a mutable view of the filled portion of the buffer.
        #[inline]
        #[must_use]
        pub fn filled_mut(&mut self) -> &mut [u8] {
            let buf = unsafe { $get_mut(self) };
            let filled = buf.filled;
            // SAFETY: same reasoning as in `filled`.
            unsafe {
                &mut *(buf.data.get_unchecked_mut(..filled) as *mut [MaybeUninit<u8>]
                    as *mut [u8])
            }
        }

        /// Returns a shared view of the initialized portion of the buffer.
        ///
        /// This includes the filled portion.
        #[inline]
        #[must_use]
        pub fn initialized(&self) -> &[u8] {
            let buf = $get(self);
            // SAFETY: `initialized <= data.len()` and that prefix is initialized.
            unsafe {
                &*(buf.data.get_unchecked(..buf.initialized) as *const [MaybeUninit<u8>]
                    as *const [u8])
            }
        }

        /// Returns a mutable view of the initialized portion of the buffer.
        ///
        /// This includes the filled portion.
        #[inline]
        pub fn initialized_mut(&mut self) -> &mut [u8] {
            let buf = unsafe { $get_mut(self) };
            let initialized = buf.initialized;
            // SAFETY: same reasoning as in `initialized`.
            unsafe {
                &mut *(buf.data.get_unchecked_mut(..initialized) as *mut [MaybeUninit<u8>]
                    as *mut [u8])
            }
        }

        /// Returns a mutable view of the unfilled portion of the buffer, which may contain
        /// uninitialized bytes.
        ///
        /// # Safety
        ///
        /// The returned slice overlaps the initialized-but-unfilled region. The caller must not
        /// write uninitialized bytes into positions that are already initialized, because
        /// `initialized` exposes those positions as `u8`.
        #[inline]
        #[must_use]
        pub unsafe fn unfilled_mut(&mut self) -> &mut [MaybeUninit<u8>] {
            let buf = $get_mut(self);
            buf.data.get_unchecked_mut(buf.filled..)
        }

        /// Returns a shared view of the whole buffer, initialized or not.
        #[inline]
        #[must_use]
        pub fn all(&self) -> &[MaybeUninit<u8>] {
            &*$get(self).data
        }

        /// Returns a mutable view of the whole buffer, initialized or not.
        ///
        /// # Safety
        ///
        /// The caller must not write uninitialized bytes into positions that are already
        /// initialized.
        #[inline]
        #[must_use]
        pub unsafe fn all_mut(&mut self) -> &mut [MaybeUninit<u8>] {
            &mut *$get_mut(self).data
        }

        /// Returns a mutable view of the entire unfilled portion of the buffer, zeroing any
        /// part of it that is not yet initialized.
        #[inline]
        pub fn initialize_unfilled(&mut self) -> &mut [u8] {
            self.initialize_unfilled_to(self.remaining())
        }

        /// Returns a mutable view of the first `n` unfilled bytes of the buffer, zeroing any
        /// part of them that is not yet initialized.
        ///
        /// # Panics
        ///
        /// Panics if fewer than `n` bytes remain unfilled.
        #[inline]
        pub fn initialize_unfilled_to(&mut self, n: usize) -> &mut [u8] {
            assert!(
                self.remaining() >= n,
                "attempted to obtain more bytes than the buffer's capacity"
            );

            let buf = unsafe { $get_mut(self) };
            let end = buf.filled + n;

            if buf.initialized < end {
                let start = buf.initialized;
                // SAFETY: `start < end <= data.len()` by the check above. The pointer is taken
                // from the whole `start..end` sub-slice so that it is valid for every byte
                // written, not just the first one.
                unsafe {
                    buf.data
                        .get_unchecked_mut(start..end)
                        .as_mut_ptr()
                        .write_bytes(0, end - start);
                }
                buf.initialized = end;
            }

            let start = buf.filled;
            // SAFETY: `start..end` is in bounds and, after the zeroing above, initialized.
            unsafe {
                &mut *(buf.data.get_unchecked_mut(start..end) as *mut [MaybeUninit<u8>]
                    as *mut [u8])
            }
        }

        /// Returns the number of bytes between the end of the filled portion and the end of
        /// the buffer.
        #[inline]
        #[must_use]
        pub fn remaining(&self) -> usize {
            self.capacity() - $get(self).filled
        }

        /// Marks the buffer as empty by resetting the filled length to zero.
        ///
        /// The number of initialized bytes is left unchanged.
        #[inline]
        pub fn clear(&mut self) {
            unsafe { $get_mut(self) }.filled = 0;
        }

        /// Grows the filled portion by `n` bytes.
        ///
        /// # Panics
        ///
        /// Panics if the new filled length overflows `usize` or exceeds the initialized
        /// portion of the buffer.
        #[inline]
        pub fn add_filled(&mut self, n: usize) {
            let filled = $get(&*self).filled.checked_add(n).expect(
                "attempted to increase the filled region of the buffer beyond the integer limit",
            );
            self.set_filled(filled);
        }

        /// Sets the length of the filled portion to `n`.
        ///
        /// # Panics
        ///
        /// Panics if `n` exceeds the initialized portion of the buffer.
        #[inline]
        pub fn set_filled(&mut self, n: usize) {
            let buf = unsafe { $get_mut(self) };
            assert!(
                n <= buf.initialized,
                "attempted to increase the filled region of the buffer beyond initialized region"
            );

            buf.filled = n;
        }

        /// Declares that the first `n` unfilled bytes of the buffer have been initialized.
        ///
        /// The initialized length becomes `filled + n` if that is larger than its current
        /// value; it is never reduced.
        ///
        /// # Safety
        ///
        /// The caller must guarantee that the `n` bytes following the filled portion really
        /// are initialized.
        #[inline]
        pub unsafe fn assume_init(&mut self, n: usize) {
            let buf = $get_mut(self);
            let new = buf.filled + n;
            if new > buf.initialized {
                // `n` counts from the end of the filled region, so the new initialized length
                // is `filled + n`, not `n`.
                buf.initialized = new;
            }
        }

        /// Copies `other` to the end of the filled portion and advances both the filled and,
        /// where necessary, the initialized length.
        ///
        /// # Panics
        ///
        /// Panics if `other` is longer than the remaining capacity.
        #[inline]
        pub fn append(&mut self, other: &[u8]) {
            assert!(
                self.remaining() >= other.len(),
                "attempted to append more bytes to the buffer than it has capacity for",
            );

            let buf = unsafe { $get_mut(self) };

            let start = buf.filled;
            let end = start + other.len();

            // SAFETY: `start..end` is in bounds by the check above, and `other` cannot overlap
            // the buffer because the buffer is exclusively borrowed.
            unsafe {
                buf.data
                    .get_unchecked_mut(start..end)
                    .as_mut_ptr()
                    .cast::<u8>()
                    .copy_from_nonoverlapping(other.as_ptr(), other.len())
            }

            if buf.initialized < end {
                buf.initialized = end;
            }
            buf.filled = end;
        }
    };
}
477
/// Byte buffer that tracks how much of itself has been filled and how much is initialized.
///
/// The accessors in this file treat the first `initialized` bytes of `data` as initialized and
/// expect `filled <= initialized <= data.len()`.
pub struct ReadBuf<'a> {
    // Backing storage; bytes at or beyond `initialized` may be uninitialized.
    data: &'a mut [MaybeUninit<u8>],
    // Number of leading bytes that hold data.
    filled: usize,
    // Number of leading bytes known to be initialized.
    initialized: usize,
}
486
487impl<'a> ReadBuf<'a> {
488 #[inline]
490 pub fn new(buf: &'a mut [u8]) -> Self {
491 let initialized = buf.len();
492 Self {
493 data: unsafe { &mut *(buf as *mut _ as *mut [MaybeUninit<u8>]) },
494 filled: 0,
495 initialized,
496 }
497 }
498
499 #[inline]
504 pub fn uninit(data: &'a mut [MaybeUninit<u8>]) -> ReadBuf<'a> {
505 Self {
506 data,
507 filled: 0,
508 initialized: 0,
509 }
510 }
511
512 #[inline]
514 pub fn as_mut(&mut self) -> ReadBufMut<'_> {
515 ReadBufMut {
516 buf: NonNull::from(self),
517 _covariant: PhantomData,
518 }
519 }
520
521 #[inline]
523 #[must_use]
524 pub fn into_all(self) -> &'a mut [MaybeUninit<u8>] {
525 self.data
526 }
527
528 #[inline]
531 #[must_use]
532 pub fn into_parts(self) -> (&'a mut [u8], &'a mut [u8], &'a mut [MaybeUninit<u8>]) {
533 let len = self.data.len();
534 let ptr = self.data.as_mut_ptr();
535
536 unsafe {
537 (
538 slice::from_raw_parts_mut(ptr as *mut u8, self.filled),
539 slice::from_raw_parts_mut(
540 ptr.add(self.filled) as *mut u8,
541 self.initialized - self.filled,
542 ),
543 slice::from_raw_parts_mut(ptr.add(self.initialized), len - self.initialized),
544 )
545 }
546 }
547
548 #[inline]
550 #[must_use]
551 pub fn into_filled(self) -> &'a mut [u8] {
552 unsafe { &mut *(self.data.get_unchecked_mut(..self.filled) as *mut _ as *mut _) }
553 }
554}
555
// `ReadBuf` is itself the buffer state the shared methods operate on, so both accessors are
// the identity function. The macro wraps its `$get_mut` calls in `unsafe` blocks, which are
// redundant for a safe function such as `identity`; hence the lint allowance.
#[allow(unused_unsafe)]
impl ReadBuf<'_> {
    common_read_buf_methods!(std::convert::identity, std::convert::identity);
}
561
562impl Debug for ReadBuf<'_> {
563 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
564 struct InitializedByte(u8);
565 impl Debug for InitializedByte {
566 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
567 write!(f, "({})", self.0)
568 }
569 }
570 struct Uninit;
571 impl Debug for Uninit {
572 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
573 f.write_str("-")
574 }
575 }
576
577 let mut list = f.debug_list();
578
579 for (i, byte) in self.data.iter().enumerate() {
580 if i < self.filled {
581 list.entry(&unsafe { byte.assume_init() });
582 } else if i < self.initialized {
583 list.entry(&InitializedByte(unsafe { byte.assume_init() }));
584 } else {
585 list.entry(&Uninit);
586 }
587 }
588
589 list.finish()
590 }
591}
592
/// Handle to a `ReadBuf`, created by `ReadBuf::as_mut` or `ReadBufMut::as_mut`.
///
/// A `&mut ReadBuf` can only be obtained from it through the `unsafe` methods `buf_mut` and
/// `into_mut`.
pub struct ReadBufMut<'a> {
    // Pointer to the `ReadBuf` this handle refers to.
    buf: NonNull<ReadBuf<'a>>,
    // Ties the handle to `'a` without storing a reference.
    _covariant: PhantomData<&'a ()>,
}

// `NonNull` suppresses the automatic `Send`/`Sync` implementations, so they are restored by
// hand.
// NOTE(review): presumably sound because the handle is only used like an exclusive borrow of a
// `ReadBuf` — confirm.
unsafe impl Send for ReadBufMut<'_> {}
unsafe impl Sync for ReadBufMut<'_> {}
609
610impl<'a> ReadBufMut<'a> {
611 #[inline]
613 #[must_use]
614 pub fn buf(&self) -> &ReadBuf<'a> {
615 unsafe {
616 self.buf.as_ref()
618 }
619 }
620
621 #[inline]
627 pub unsafe fn buf_mut(&mut self) -> &mut ReadBuf<'a> {
628 self.buf.as_mut()
629 }
630
631 #[inline]
637 #[must_use]
638 pub unsafe fn into_mut(self) -> &'a mut ReadBuf<'a> {
639 &mut *self.buf.as_ptr()
640 }
641
642 #[inline]
644 #[must_use]
645 pub fn as_mut(&mut self) -> ReadBufMut<'_> {
646 ReadBufMut {
647 buf: self.buf,
648 _covariant: PhantomData,
649 }
650 }
651}
652
// The shared methods reach the underlying `ReadBuf` through `buf` (shared access) and the
// unsafe `buf_mut` (mutable access).
impl ReadBufMut<'_> {
    common_read_buf_methods!(Self::buf, Self::buf_mut,);
}
657
658impl Debug for ReadBufMut<'_> {
659 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
660 self.buf().fmt(f)
661 }
662}