nanopore/io/
mod.rs

1#[cfg(target_os = "linux")]
2#[path = "backend/io_uring.rs"]
3mod backend;
4
5#[cfg(windows)]
6#[path = "backend/overlapped_io.rs"]
7mod backend;
8
9pub mod request;
10pub mod response;
11
12use std::future::Future;
13use std::hash::{Hash, Hasher};
14#[doc(no_inline)]
15pub use std::io::{Error, Result};
16use std::pin::Pin;
17use std::rc::{Rc, Weak};
18use std::task::{Context, Poll};
19use std::{fmt, mem, ptr};
20
21use crate::task::Handle;
22
23pub use backend::{Event, Timer};
24#[doc(hidden)]
25pub use backend::{RawAsyncFile, RawAsyncSocket};
26pub use poller::Poller;
27pub use request::Request;
28pub use response::Response;
29
30pub mod poller {
31    pub use super::backend::{Config, Poller};
32}
33
/// Strong identity token for one I/O operation.
///
/// The token carries no payload: the address of its reference-counted
/// allocation is what equality, hashing and `Debug` operate on.
pub struct RawCompletion(Rc<()>);

impl fmt::Debug for RawCompletion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Label with this type's own name; the weak flavour prints
        // `RawCompletionRef(..)`.
        write!(f, "RawCompletion({:?})", Rc::as_ptr(&self.0))
    }
}

impl RawCompletion {
    /// Allocates a fresh token that compares unequal to every other live token.
    pub(crate) fn new() -> RawCompletion {
        RawCompletion(Rc::new(()))
    }

    /// Creates a weak reference that does not keep the token alive.
    pub(crate) fn downgrade(&self) -> RawCompletionRef {
        RawCompletionRef(Rc::downgrade(&self.0))
    }
}

impl PartialEq for RawCompletion {
    /// Two tokens are equal only when they share the same allocation.
    fn eq(&self, other: &Self) -> bool {
        ptr::eq(Rc::as_ptr(&self.0), Rc::as_ptr(&other.0))
    }
}

impl Eq for RawCompletion {}

impl Hash for RawCompletion {
    /// Hashes the allocation address, consistent with [`PartialEq`].
    fn hash<H: Hasher>(&self, state: &mut H) {
        Rc::as_ptr(&self.0).hash(state);
    }
}

/// Weak counterpart of [`RawCompletion`], comparable and hashable by the same
/// allocation address.
#[derive(Clone)]
pub(crate) struct RawCompletionRef(Weak<()>);

impl fmt::Debug for RawCompletionRef {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "RawCompletionRef({:?})", self.0.as_ptr())
    }
}

impl RawCompletionRef {
    /// Returns the strong token if it is still alive, `None` otherwise.
    #[inline]
    pub fn upgrade(&self) -> Option<RawCompletion> {
        self.0.upgrade().map(RawCompletion)
    }

    /// Consumes the weak reference and returns its raw pointer.
    ///
    /// The weak count is not released; hand the pointer back to
    /// [`RawCompletionRef::from_raw`] to do so.
    #[inline]
    pub fn into_raw(self) -> *const () {
        self.0.into_raw()
    }

    /// Rebuilds a weak reference from a pointer produced by
    /// [`RawCompletionRef::into_raw`].
    ///
    /// # Safety
    ///
    /// `ptr` must come from `into_raw` and must still own its weak count,
    /// exactly as required by [`Weak::from_raw`].
    #[inline]
    pub unsafe fn from_raw(ptr: *const ()) -> RawCompletionRef {
        // SAFETY: forwarded verbatim; the caller upholds `Weak::from_raw`'s contract.
        RawCompletionRef(unsafe { Weak::from_raw(ptr) })
    }
}

impl PartialEq for RawCompletionRef {
    /// Two weak references are equal when they point at the same allocation.
    fn eq(&self, other: &Self) -> bool {
        Weak::ptr_eq(&self.0, &other.0)
    }
}

impl Eq for RawCompletionRef {}

impl Hash for RawCompletionRef {
    /// Hashes the allocation address, consistent with [`PartialEq`].
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.as_ptr().hash(state);
    }
}
107
108#[derive(Debug)]
109pub enum CompletionState {
110    NotInitiated(Request),
111    InFlight(RawCompletion),
112    Finished,
113}
114
115#[must_use]
116#[derive(Debug)]
117pub struct Completion {
118    state: CompletionState,
119    handle: Handle,
120}
121
122impl Completion {
123    #[inline]
124    pub(crate) fn new(handle: Handle, request: Request) -> Completion {
125        Completion {
126            state: CompletionState::NotInitiated(request),
127            handle,
128        }
129    }
130
131    #[inline]
132    pub fn associated_runtime(&self) -> Handle {
133        self.handle.clone()
134    }
135
136    #[inline]
137    pub fn state(self) -> CompletionState {
138        self.state
139    }
140}
141
142impl Future for Completion {
143    type Output = Result<Response>;
144
145    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Response>> {
146        let Some(task) = self.handle.wrap_waker(cx.waker().clone()) else {
147            return Poll::Pending;
148        };
149
150        match mem::replace(&mut self.state, CompletionState::Finished) {
151            CompletionState::NotInitiated(request) => {
152                if let Ok(comp) = self
153                    .handle
154                    .with_poller(|mut poller| poller.submit_request(request, Some(task)))
155                {
156                    self.state = CompletionState::InFlight(comp);
157                }
158                Poll::Pending
159            }
160            CompletionState::InFlight(comp) => {
161                if let Ok(r) = self
162                    .handle
163                    .with_poller(|mut poller| poller.poll_response(comp, Some(task)))
164                {
165                    match r {
166                        Ok(r) => return Poll::Ready(r),
167                        Err(comp) => {
168                            self.state = CompletionState::InFlight(comp);
169                        }
170                    }
171                }
172                Poll::Pending
173            }
174            CompletionState::Finished => {
175                // finished or shutdown
176                Poll::Pending
177            }
178        }
179    }
180}
181
182#[cfg(feature = "io")]
183mod traits {
184    use std::future::Future;
185
186    use bytes::{Bytes, BytesMut};
187
188    use super::Result;
189
    /// Number of bytes requested by each individual read that
    /// `read_all_appending` issues.
    pub const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;

    /// Asynchronous read without an explicit position; the buffer is passed by
    /// value.
    ///
    /// The future resolves to the buffer plus a byte count. The helper futures
    /// in this module treat that count as the number of bytes read and a count
    /// of `0` as end of input.
    pub trait AsyncRead {
        /// Future produced by [`AsyncRead::read`].
        type Future: Future<Output = Result<(BytesMut, usize)>>;
        /// Starts a read into `buf`.
        fn read(&self, buf: BytesMut) -> Self::Future;
    }

    /// Asynchronous read at an explicit byte offset.
    ///
    /// Same result shape as [`AsyncRead`]; the helper futures advance `offset`
    /// by the number of bytes already read when they retry.
    pub trait AsyncReadOverlapped {
        /// Future produced by [`AsyncReadOverlapped::read_at`].
        type Future: Future<Output = Result<(BytesMut, usize)>>;
        /// Starts a read into `buf` at `offset`.
        fn read_at(&self, buf: BytesMut, offset: u64) -> Self::Future;
    }

    /// Asynchronous read into several buffers at once.
    // NOTE(review): the `usize` is presumably the total byte count across all
    // buffers; nothing in this module consumes it, so confirm with the backend.
    pub trait AsyncReadVectored {
        /// Future produced by [`AsyncReadVectored::read_vectored`].
        type Future: Future<Output = Result<(Vec<BytesMut>, usize)>>;
        /// Starts a read into `buffers`, which are handed back on completion.
        fn read_vectored(&self, buffers: Vec<BytesMut>) -> Self::Future;
    }

    /// Asynchronous write without an explicit position.
    ///
    /// The helper futures in this module treat the resolved `usize` as the
    /// number of bytes written and `0` as a failed write.
    pub trait AsyncWrite {
        /// Future produced by [`AsyncWrite::write`].
        type Future: Future<Output = Result<usize>>;
        /// Starts writing `buf`.
        fn write(&self, buf: Bytes) -> Self::Future;
    }

    /// Asynchronous write at an explicit byte offset.
    pub trait AsyncWriteOverlapped {
        /// Future produced by [`AsyncWriteOverlapped::write_at`].
        type Future: Future<Output = Result<usize>>;
        /// Starts writing `buf` at `offset`.
        fn write_at(&self, buf: Bytes, offset: u64) -> Self::Future;
    }

    /// Asynchronous write from several buffers at once.
    // NOTE(review): the `usize` is presumably the total number of bytes
    // written; nothing in this module consumes it, so confirm with the backend.
    pub trait AsyncWriteVectored {
        /// Future produced by [`AsyncWriteVectored::write_vectored`].
        type Future: Future<Output = Result<usize>>;
        /// Starts writing `buffers`.
        fn write_vectored(&self, buffers: Vec<Bytes>) -> Self::Future;
    }
221
    /// Convenience reads layered on [`AsyncRead`]; implemented for every
    /// `AsyncRead` by the blanket impl in this module.
    pub trait AsyncReadExt: AsyncRead {
        /// Issues one read of up to `len` bytes and appends what was read to
        /// `bytes`. The future always hands the buffer back, also on error.
        fn read_appending(
            &self,
            bytes: BytesMut,
            len: usize,
        ) -> future::ReadAppending<Self::Future>;

        /// Keeps appending to `bytes` until a read reports zero bytes or fails;
        /// the success value is the total number of bytes appended.
        fn read_all_appending(&self, bytes: BytesMut) -> future::ReadAllAppending<'_, Self>;

        /// Reads until `buf` (its current length) is completely filled; a
        /// zero-byte read before that yields an `UnexpectedEof` error.
        fn read_exact(&self, buf: BytesMut) -> future::ReadExact<'_, Self>;

        /// Issues one read of up to `len` bytes into a fresh buffer and returns
        /// only the bytes that were read.
        fn read_to_bytes(&self, len: usize) -> future::ReadToBytes<Self::Future>;

        /// Like [`AsyncReadExt::read_all_appending`] starting from an empty
        /// buffer, returning the frozen result.
        fn read_all_to_bytes(&self) -> future::ReadAllToBytes<'_, Self>;

        /// Like [`AsyncReadExt::read_exact`] with a fresh buffer of `len`
        /// bytes, returning the frozen result.
        fn read_exact_to_bytes(&self, len: usize) -> future::ReadExactToBytes<'_, Self>;
    }

    /// Convenience reads layered on [`AsyncReadOverlapped`]; implemented for
    /// every `AsyncReadOverlapped` by the blanket impl in this module.
    pub trait AsyncReadOverlappedExt: AsyncReadOverlapped {
        /// Reads until `buf` is completely filled, starting at `offset` and
        /// advancing by the bytes already read on each retry.
        fn read_exact_at(&self, buf: BytesMut, offset: u64) -> future::ReadExactAt<'_, Self>;

        /// Issues one read of up to `len` bytes at `offset` into a fresh buffer
        /// and returns only the bytes that were read.
        fn read_to_bytes_at(&self, len: usize, offset: u64) -> future::ReadToBytesAt<Self::Future>;

        /// Like [`AsyncReadOverlappedExt::read_exact_at`] with a fresh buffer
        /// of `len` bytes, returning the frozen result.
        fn read_exact_to_bytes_at(
            &self,
            len: usize,
            offset: u64,
        ) -> future::ReadExactToBytesAt<'_, Self>;
    }

    /// Convenience writes layered on [`AsyncWrite`].
    pub trait AsyncWriteExt: AsyncWrite {
        /// Writes until every byte of `bytes` has been accepted; a write that
        /// reports zero bytes yields a `WriteZero` error.
        fn write_all(&self, bytes: Bytes) -> future::WriteAll<'_, Self>;
    }

    /// Convenience writes layered on [`AsyncWriteOverlapped`].
    pub trait AsyncWriteOverlappedExt: AsyncWriteOverlapped {
        /// Writes every byte of `bytes` starting at `offset`, advancing by the
        /// bytes already written on each retry.
        fn write_all_at(&self, bytes: Bytes, offset: u64) -> future::WriteAllAt<'_, Self>;
    }
259
260    impl<A: AsyncRead> AsyncReadExt for A {
261        fn read_appending(
262            &self,
263            mut bytes: BytesMut,
264            len: usize,
265        ) -> future::ReadAppending<Self::Future> {
266            let orig_len = bytes.len();
267            let new_len = orig_len + len;
268            bytes.reserve(len);
269            unsafe { bytes.set_len(new_len) };
270            let buf = bytes.split_off(orig_len);
271            let fut = self.read(buf);
272            future::ReadAppending::new(fut, bytes)
273        }
274
275        fn read_all_appending(&self, bytes: BytesMut) -> future::ReadAllAppending<'_, Self> {
276            future::ReadAllAppending::new(self, bytes)
277        }
278
279        fn read_exact(&self, buf: BytesMut) -> future::ReadExact<'_, Self> {
280            future::ReadExact::new(self, buf)
281        }
282
283        fn read_to_bytes(&self, len: usize) -> future::ReadToBytes<Self::Future> {
284            let mut buf = BytesMut::with_capacity(len);
285            unsafe { buf.set_len(len) };
286            future::ReadToBytes::new(self.read(buf))
287        }
288
289        fn read_all_to_bytes(&self) -> future::ReadAllToBytes<'_, Self> {
290            let bytes = BytesMut::new();
291            future::ReadAllToBytes::new(future::ReadAllAppending::new(self, bytes))
292        }
293
294        fn read_exact_to_bytes(&self, len: usize) -> future::ReadExactToBytes<'_, Self> {
295            let mut buf = BytesMut::with_capacity(len);
296            unsafe { buf.set_len(len) };
297            future::ReadExactToBytes::new(future::ReadExact::new(self, buf))
298        }
299    }
300
301    impl<A: AsyncReadOverlapped> AsyncReadOverlappedExt for A {
302        fn read_exact_at(&self, buf: BytesMut, offset: u64) -> future::ReadExactAt<'_, Self> {
303            future::ReadExactAt::new(self, buf, offset)
304        }
305
306        fn read_to_bytes_at(&self, len: usize, offset: u64) -> future::ReadToBytesAt<Self::Future> {
307            let mut buf = BytesMut::with_capacity(len);
308            unsafe { buf.set_len(len) };
309            future::ReadToBytesAt::new(self.read_at(buf, offset))
310        }
311
312        fn read_exact_to_bytes_at(
313            &self,
314            len: usize,
315            offset: u64,
316        ) -> future::ReadExactToBytesAt<'_, Self> {
317            let mut buf = BytesMut::with_capacity(len);
318            unsafe { buf.set_len(len) };
319            future::ReadExactToBytesAt::new(future::ReadExactAt::new(self, buf, offset))
320        }
321    }
322
    /// Blanket impl: every [`AsyncWrite`] gets `write_all`.
    impl<W: AsyncWrite> AsyncWriteExt for W {
        /// Builds the future that keeps writing until `bytes` is exhausted.
        fn write_all(&self, bytes: Bytes) -> future::WriteAll<'_, Self> {
            future::WriteAll::new(self, bytes)
        }
    }

    /// Blanket impl: every [`AsyncWriteOverlapped`] gets `write_all_at`.
    impl<W: AsyncWriteOverlapped> AsyncWriteOverlappedExt for W {
        /// Builds the future that keeps writing from `offset` until `bytes` is
        /// exhausted.
        fn write_all_at(&self, bytes: Bytes, offset: u64) -> future::WriteAllAt<'_, Self> {
            future::WriteAllAt::new(self, bytes, offset)
        }
    }
334
335    #[expect(clippy::multiple_bound_locations)]
336    pub mod future {
337        use std::fmt;
338        use std::future::Future;
339        use std::mem::{replace, take};
340        use std::pin::Pin;
341        use std::task::{ready, Context, Poll};
342
343        use bytes::{Bytes, BytesMut};
344        use pin_project_lite::pin_project;
345
346        use crate::io::{
347            AsyncRead, AsyncReadExt, AsyncReadOverlapped, AsyncWrite, AsyncWriteOverlapped, Error,
348            Result, DEFAULT_BUFFER_SIZE,
349        };
350
351        pin_project! {
352            #[derive(Debug)]
353            pub struct ReadAppending<F> {
354                #[pin]
355                inner: F,
356                buf0: BytesMut,
357            }
358        }
359
360        impl<F: Future<Output = Result<(BytesMut, usize)>>> ReadAppending<F> {
361            pub(super) fn new(inner: F, buf: BytesMut) -> ReadAppending<F> {
362                ReadAppending { inner, buf0: buf }
363            }
364        }
365
366        impl<F: Future<Output = Result<(BytesMut, usize)>>> Future for ReadAppending<F> {
367            type Output = (BytesMut, Result<usize>);
368
369            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(BytesMut, Result<usize>)> {
370                let this = self.project();
371                Poll::Ready(match ready!(this.inner.poll(cx)) {
372                    Ok((buf, bytes_read)) => {
373                        let mut buf0 = take(this.buf0);
374                        let orig_len = buf0.len();
375                        let new_len = orig_len + bytes_read;
376                        buf0.unsplit(buf);
377                        debug_assert!(buf0.len() >= new_len);
378                        unsafe { buf0.set_len(new_len) };
379                        (buf0, Ok(bytes_read))
380                    }
381                    Err(e) => {
382                        let buf0 = take(this.buf0);
383                        (buf0, Err(e))
384                    }
385                })
386            }
387        }
388
389        pin_project! {
390            #[derive(Debug)]
391            pub struct ReadAllAppending<'a, A: ?Sized> where A: AsyncRead {
392                #[pin]
393                inner: ReadAppending<A::Future>,
394                bytes_read: usize,
395                reader: &'a A,
396                buf0: BytesMut,
397                current_finished: bool,
398            }
399        }
400
401        impl<'a, F, A: AsyncRead<Future = F>> ReadAllAppending<'a, A> {
402            pub(super) fn new(reader: &'a A, buf: BytesMut) -> Self {
403                let mut buf0 = buf;
404                Self {
405                    inner: Self::create_read_future(reader, &mut buf0),
406                    bytes_read: 0,
407                    reader,
408                    buf0,
409                    current_finished: false,
410                }
411            }
412
413            fn create_read_future(reader: &'a A, buf0: &mut BytesMut) -> ReadAppending<F> {
414                reader.read_appending(take(buf0), DEFAULT_BUFFER_SIZE)
415            }
416        }
417
418        impl<'a, A: AsyncRead> Future for ReadAllAppending<'a, A> {
419            type Output = (BytesMut, Result<usize>);
420
421            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(BytesMut, Result<usize>)> {
422                let mut this = self.project();
423                if replace(this.current_finished, false) {
424                    this.inner
425                        .set(Self::create_read_future(this.reader, this.buf0));
426                }
427                match ready!(this.inner.poll(cx)) {
428                    (bytes, r @ Err(_)) => Poll::Ready((bytes, r)),
429                    (bytes, Ok(0)) => Poll::Ready((bytes, Ok(*this.bytes_read))),
430                    (bytes, Ok(inc)) => {
431                        *this.bytes_read += inc;
432                        *this.buf0 = bytes;
433                        *this.current_finished = true;
434                        cx.waker().wake_by_ref();
435                        Poll::Pending
436                    }
437                }
438            }
439        }
440
441        pin_project! {
442            #[derive(Debug)]
443            pub struct ReadExact<'a, A: ?Sized> where A: AsyncRead {
444                #[pin]
445                inner: A::Future,
446                bytes_read: usize,
447                reader: &'a A,
448                buf0: BytesMut,
449                current_finished: bool,
450            }
451        }
452
453        impl<'a, F, A: AsyncRead<Future = F>> ReadExact<'a, A> {
454            pub(super) fn new(reader: &'a A, buf: BytesMut) -> Self {
455                let mut buf0 = buf;
456                let bytes_read = 0;
457                Self {
458                    inner: Self::create_read_future(reader, &mut buf0, bytes_read),
459                    bytes_read,
460                    reader,
461                    buf0,
462                    current_finished: false,
463                }
464            }
465
466            fn create_read_future(reader: &'a A, buf0: &mut BytesMut, bytes_read: usize) -> F {
467                let buf = buf0.split_off(bytes_read);
468                reader.read(buf)
469            }
470        }
471
472        impl<'a, A: AsyncRead> Future for ReadExact<'a, A> {
473            type Output = Result<BytesMut>;
474
475            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<BytesMut>> {
476                let mut this = self.project();
477                if replace(this.current_finished, false) {
478                    this.inner.set(Self::create_read_future(
479                        this.reader,
480                        this.buf0,
481                        *this.bytes_read,
482                    ));
483                }
484                match ready!(this.inner.poll(cx)) {
485                    Err(e) => Poll::Ready(Err(e)),
486                    Ok((buf, bytes_read)) => {
487                        this.buf0.unsplit(buf);
488                        *this.bytes_read += bytes_read;
489                        if *this.bytes_read < this.buf0.len() {
490                            if bytes_read == 0 {
491                                Poll::Ready(Err(Error::new(
492                                    std::io::ErrorKind::UnexpectedEof,
493                                    "unexpected eof",
494                                )))
495                            } else {
496                                *this.current_finished = true;
497                                cx.waker().wake_by_ref();
498                                Poll::Pending
499                            }
500                        } else {
501                            Poll::Ready(Ok(take(this.buf0)))
502                        }
503                    }
504                }
505            }
506        }
507
508        pin_project! {
509            #[derive(Debug)]
510            pub struct ReadExactAt<'a, A: ?Sized> where A: AsyncReadOverlapped {
511                #[pin]
512                inner: A::Future,
513                bytes_read: usize,
514                reader: &'a A,
515                buf0: BytesMut,
516                offset: u64,
517                current_finished: bool,
518            }
519        }
520
521        impl<'a, F, A: AsyncReadOverlapped<Future = F>> ReadExactAt<'a, A> {
522            pub(super) fn new(reader: &'a A, buf: BytesMut, offset: u64) -> Self {
523                let mut buf0 = buf;
524                let bytes_read = 0;
525                Self {
526                    inner: Self::create_read_future(reader, &mut buf0, offset, bytes_read).unwrap(),
527                    bytes_read,
528                    reader,
529                    buf0,
530                    offset,
531                    current_finished: false,
532                }
533            }
534
535            fn create_read_future(
536                reader: &'a A,
537                buf0: &mut BytesMut,
538                offset: u64,
539                bytes_read: usize,
540            ) -> Result<F> {
541                let Some(new_offset) = offset.checked_add(bytes_read as _) else {
542                    return Err(Error::new(
543                        std::io::ErrorKind::InvalidInput,
544                        "offset out of bound",
545                    ));
546                };
547                let buf = buf0.split_off(bytes_read);
548                Ok(reader.read_at(buf, new_offset))
549            }
550        }
551
552        impl<'a, A: AsyncReadOverlapped> Future for ReadExactAt<'a, A> {
553            type Output = Result<BytesMut>;
554
555            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<BytesMut>> {
556                let mut this = self.project();
557                if replace(this.current_finished, false) {
558                    this.inner.set(
559                        match Self::create_read_future(
560                            this.reader,
561                            this.buf0,
562                            *this.offset,
563                            *this.bytes_read,
564                        ) {
565                            Ok(fut) => fut,
566                            Err(e) => return Poll::Ready(Err(e)),
567                        },
568                    );
569                }
570                match ready!(this.inner.poll(cx)) {
571                    Err(e) => Poll::Ready(Err(e)),
572                    Ok((buf, bytes_read)) => {
573                        this.buf0.unsplit(buf);
574                        *this.bytes_read += bytes_read;
575                        if *this.bytes_read < this.buf0.len() {
576                            if bytes_read == 0 {
577                                Poll::Ready(Err(Error::new(
578                                    std::io::ErrorKind::UnexpectedEof,
579                                    "unexpected eof",
580                                )))
581                            } else {
582                                *this.current_finished = true;
583                                cx.waker().wake_by_ref();
584                                Poll::Pending
585                            }
586                        } else {
587                            Poll::Ready(Ok(take(this.buf0)))
588                        }
589                    }
590                }
591            }
592        }
593
594        pin_project! {
595            #[derive(Debug)]
596            pub struct ReadToBytesAt<F> {
597                #[pin]
598                inner: F,
599            }
600        }
601
602        pub use ReadToBytesAt as ReadToBytes;
603
604        impl<F: Future<Output = Result<(BytesMut, usize)>>> ReadToBytesAt<F> {
605            pub(super) fn new(inner: F) -> Self {
606                Self { inner }
607            }
608        }
609
610        impl<F: Future<Output = Result<(BytesMut, usize)>>> Future for ReadToBytesAt<F> {
611            type Output = Result<Bytes>;
612
613            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
614                Poll::Ready(match ready!(self.project().inner.poll(cx)) {
615                    Ok((mut buf, bytes_read)) => {
616                        debug_assert!(bytes_read <= buf.len());
617                        unsafe { buf.set_len(bytes_read) };
618                        Ok(buf.freeze())
619                    }
620                    Err(e) => Err(e),
621                })
622            }
623        }
624
625        pin_project! {
626            pub struct ReadAllToBytes<'a, A: ?Sized> where A: AsyncRead {
627                #[pin]
628                inner: ReadAllAppending<'a, A>,
629            }
630        }
631
632        impl<'a, A: AsyncRead<Future = F> + fmt::Debug, F: fmt::Debug> fmt::Debug
633            for ReadAllToBytes<'a, A>
634        {
635            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636                self.inner.fmt(f)
637            }
638        }
639
640        impl<'a, A: AsyncRead> ReadAllToBytes<'a, A> {
641            pub(super) fn new(inner: ReadAllAppending<'a, A>) -> Self {
642                Self { inner }
643            }
644        }
645
646        impl<'a, A: AsyncRead> Future for ReadAllToBytes<'a, A> {
647            type Output = Result<Bytes>;
648
649            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
650                Poll::Ready(match ready!(self.project().inner.poll(cx)) {
651                    (buf, Ok(_)) => Ok(buf.freeze()),
652                    (_, Err(e)) => Err(e),
653                })
654            }
655        }
656
657        pin_project! {
658            pub struct ReadExactToBytes<'a, A: ?Sized> where A: AsyncRead {
659                #[pin]
660                inner: ReadExact<'a, A>,
661            }
662        }
663
664        impl<'a, A: AsyncRead<Future = F> + fmt::Debug, F: fmt::Debug> fmt::Debug
665            for ReadExactToBytes<'a, A>
666        {
667            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668                self.inner.fmt(f)
669            }
670        }
671
672        impl<'a, A: AsyncRead> ReadExactToBytes<'a, A> {
673            pub(super) fn new(inner: ReadExact<'a, A>) -> Self {
674                Self { inner }
675            }
676        }
677
678        impl<'a, A: AsyncRead> Future for ReadExactToBytes<'a, A> {
679            type Output = Result<Bytes>;
680
681            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
682                Poll::Ready(match ready!(self.project().inner.poll(cx)) {
683                    Ok(buf) => Ok(buf.freeze()),
684                    Err(e) => Err(e),
685                })
686            }
687        }
688
689        pin_project! {
690            pub struct ReadExactToBytesAt<'a, A: ?Sized> where A: AsyncReadOverlapped {
691                #[pin]
692                inner: ReadExactAt<'a, A>,
693            }
694        }
695
696        impl<'a, A: AsyncReadOverlapped<Future = F> + fmt::Debug, F: fmt::Debug> fmt::Debug
697            for ReadExactToBytesAt<'a, A>
698        {
699            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
700                self.inner.fmt(f)
701            }
702        }
703
704        impl<'a, A: AsyncReadOverlapped> ReadExactToBytesAt<'a, A> {
705            pub(super) fn new(inner: ReadExactAt<'a, A>) -> Self {
706                Self { inner }
707            }
708        }
709
710        impl<'a, A: AsyncReadOverlapped> Future for ReadExactToBytesAt<'a, A> {
711            type Output = Result<Bytes>;
712
713            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
714                Poll::Ready(match ready!(self.project().inner.poll(cx)) {
715                    Ok(buf) => Ok(buf.freeze()),
716                    Err(e) => Err(e),
717                })
718            }
719        }
720
721        pin_project! {
722            #[derive(Debug)]
723            pub struct WriteAllAt<'a, W: ?Sized> where W: AsyncWriteOverlapped {
724                #[pin]
725                inner: Option<W::Future>,
726                bytes_written: usize,
727                writer: &'a W,
728                buf0: Bytes,
729                offset: u64,
730                current_finished: bool,
731            }
732        }
733
734        impl<'a, W: AsyncWriteOverlapped> WriteAllAt<'a, W> {
735            pub(crate) fn new(writer: &'a W, bytes: Bytes, offset: u64) -> Self {
736                let buf0 = bytes;
737                let bytes_written = 0;
738                WriteAllAt {
739                    inner: Self::create_write_future(writer, &buf0, offset, bytes_written).unwrap(),
740                    bytes_written,
741                    writer,
742                    buf0,
743                    offset,
744                    current_finished: false,
745                }
746            }
747
748            fn create_write_future(
749                writer: &'a W,
750                buf0: &Bytes,
751                offset: u64,
752                bytes_written: usize,
753            ) -> Result<Option<W::Future>> {
754                let buf = buf0.slice(bytes_written..);
755                Ok(if buf.is_empty() {
756                    None
757                } else {
758                    let Some(new_offset) = offset.checked_add(bytes_written as _) else {
759                        return Err(Error::new(
760                            std::io::ErrorKind::InvalidInput,
761                            "offset out of bound",
762                        ));
763                    };
764                    Some(writer.write_at(buf, new_offset))
765                })
766            }
767        }
768
769        impl<'a, W: AsyncWriteOverlapped> Future for WriteAllAt<'a, W> {
770            type Output = Result<()>;
771
772            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
773                let mut this = self.project();
774                if replace(this.current_finished, false) {
775                    this.inner.set(
776                        match Self::create_write_future(
777                            this.writer,
778                            this.buf0,
779                            *this.offset,
780                            *this.bytes_written,
781                        ) {
782                            Ok(fut) => fut,
783                            Err(e) => return Poll::Ready(Err(e)),
784                        },
785                    );
786                }
787                let Some(inner) = this.inner.as_pin_mut() else {
788                    return Poll::Ready(Ok(()));
789                };
790                match ready!(inner.poll(cx)) {
791                    Ok(bytes_written) => {
792                        if bytes_written == 0 {
793                            Poll::Ready(Err(Error::new(
794                                std::io::ErrorKind::WriteZero,
795                                "write zero",
796                            )))
797                        } else {
798                            *this.bytes_written += bytes_written;
799                            if *this.bytes_written < this.buf0.len() {
800                                *this.current_finished = true;
801                                cx.waker().wake_by_ref();
802                                Poll::Pending
803                            } else {
804                                Poll::Ready(Ok(()))
805                            }
806                        }
807                    }
808                    Err(e) => Poll::Ready(Err(e)),
809                }
810            }
811        }
812
813        pin_project! {
814            #[derive(Debug)]
815            pub struct WriteAll<'a, W: ?Sized> where W: AsyncWrite {
816                #[pin]
817                inner: Option<W::Future>,
818                bytes_written: usize,
819                writer: &'a W,
820                buf0: Bytes,
821                current_finished: bool,
822            }
823        }
824
825        impl<'a, W: AsyncWrite> WriteAll<'a, W> {
826            pub(crate) fn new(writer: &'a W, bytes: Bytes) -> Self {
827                let buf0 = bytes;
828                let bytes_written = 0;
829                WriteAll {
830                    inner: Self::create_write_future(writer, &buf0, bytes_written),
831                    bytes_written,
832                    writer,
833                    buf0,
834                    current_finished: false,
835                }
836            }
837
838            fn create_write_future(
839                writer: &'a W,
840                buf0: &Bytes,
841                bytes_written: usize,
842            ) -> Option<W::Future> {
843                let buf = buf0.slice(bytes_written..);
844                if buf.is_empty() {
845                    None
846                } else {
847                    Some(writer.write(buf))
848                }
849            }
850        }
851
852        impl<'a, W: AsyncWrite> Future for WriteAll<'a, W> {
853            type Output = Result<()>;
854
855            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
856                let mut this = self.project();
857                if replace(this.current_finished, false) {
858                    this.inner.set(Self::create_write_future(
859                        this.writer,
860                        this.buf0,
861                        *this.bytes_written,
862                    ));
863                }
864                let Some(inner) = this.inner.as_pin_mut() else {
865                    return Poll::Ready(Ok(()));
866                };
867                match ready!(inner.poll(cx)) {
868                    Ok(bytes_written) => {
869                        if bytes_written == 0 {
870                            Poll::Ready(Err(Error::new(
871                                std::io::ErrorKind::WriteZero,
872                                "write zero",
873                            )))
874                        } else {
875                            *this.bytes_written += bytes_written;
876                            if *this.bytes_written < this.buf0.len() {
877                                *this.current_finished = true;
878                                cx.waker().wake_by_ref();
879                                Poll::Pending
880                            } else {
881                                Poll::Ready(Ok(()))
882                            }
883                        }
884                    }
885                    Err(e) => Poll::Ready(Err(e)),
886                }
887            }
888        }
889    }
890}
891
892#[cfg_attr(feature = "nightly", doc(cfg(feature = "io")))]
893#[cfg(feature = "io")]
894pub use traits::*;
895
/// Submits an I/O request through the current task handle and awaits the response.
///
/// The expansion contains `.await`, so it must be used inside an `async` context.
///
/// Two forms are accepted:
///
/// * `async_io!(result @ Op { .. } => expr)` builds
///   `Request::Op(request::Op { .. })`, passes it to `task::handle().async_io(..)`,
///   awaits it, and `.map`s the awaited value: the payload of `Response::Op` is
///   bound to `result` and `expr` is evaluated to produce the mapped value.
/// * `async_io!(Op { .. })` does the same but drops the payload, so the mapped
///   value is `()`.
///
/// A response of any other variant than `Response::Op` hits `unreachable!()`.
#[doc(hidden)]
#[macro_export]
#[expect(clippy::module_name_repetitions)]
macro_rules! async_io {
    ($result:ident @ $op:ident $options:tt => $postfix:expr) => {
        $crate::task::handle()
            .async_io($crate::io::Request::$op($crate::io::request::$op $options))
            .await
            .map(|response| match response {
                $crate::io::Response::$op($result) => $postfix,
                _ => unreachable!(),
            })
    };

    ($op:ident $options:tt) => {
        {
            // Macro names are resolved at the call site, so the recursive call goes
            // through `$crate` to keep working when this exported macro is invoked
            // by path or where another `async_io!` is in scope.
            #[allow(clippy::drop_non_drop)]
            let r = $crate::async_io!(r @ $op $options => std::mem::drop(r));
            r
        }
    };
}
918
/// Runs a block through `spawn_blocking` on the current task handle and awaits it.
///
/// The block is wrapped in a `move` closure, so everything it uses is captured by
/// value. The expansion contains `.await` and must be used in an `async` context.
///
/// * When `spawn_blocking` returns `Ok`, the returned handle is awaited and its
///   output is `unwrap`ped, so a failed join panics at the call site.
/// * When `spawn_blocking` returns `Err`, the expansion awaits a future that is
///   always pending and therefore never completes.
#[doc(hidden)]
#[macro_export]
#[expect(clippy::module_name_repetitions)]
macro_rules! threading_io {
    ($fn:block) => {
        match $crate::task::handle().spawn_blocking(move || $fn) {
            Err(_) => ::std::future::pending().await,
            Ok(join_handle) => join_handle.await.unwrap(),
        }
    };
}
930
/// Evaluates to the largest of two or more expressions.
///
/// Each argument is evaluated exactly once, left to right, and compared with `<`.
/// A later argument replaces the current result only when the current result is
/// strictly less than it, so on ties the earliest argument wins. A trailing comma
/// is accepted.
#[doc(hidden)]
#[macro_export]
macro_rules! max {
    ($first:expr $(, $rest:expr)+ $(,)?) => {{
        let mut best = $first;
        $(
            let candidate = $rest;
            if best < candidate {
                best = candidate;
            }
        )+
        best
    }};
}
944
#[cfg(all(feature = "io", any(feature = "fs", feature = "net")))]
pub(crate) mod rw {
    //! Typed futures that wrap a [`Completion`] and unpack its [`Response`] into
    //! the values a read or write caller is interested in.

    use std::future::Future;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{ready, Context, Poll};

    use bytes::BytesMut;

    use crate::io::{response, Completion, Response, Result};

    /// Declares a `#[must_use]`, `Debug` newtype around [`Completion`] together with
    /// the accessors every future in this module shares: `Deref`/`DerefMut` and
    /// `AsRef`/`AsMut` to the wrapped [`Completion`], plus `into_inner`.
    ///
    /// The `Future` impl is written by hand per type because each one unpacks a
    /// different set of [`Response`] variants.
    macro_rules! completion_newtype {
        ($(#[$meta:meta])* $name:ident) => {
            $(#[$meta])*
            #[must_use]
            #[derive(Debug)]
            pub struct $name(pub(crate) Completion);

            impl Deref for $name {
                type Target = Completion;

                fn deref(&self) -> &Completion {
                    &self.0
                }
            }

            impl DerefMut for $name {
                fn deref_mut(&mut self) -> &mut Completion {
                    &mut self.0
                }
            }

            impl AsRef<Completion> for $name {
                fn as_ref(&self) -> &Completion {
                    &self.0
                }
            }

            impl AsMut<Completion> for $name {
                fn as_mut(&mut self) -> &mut Completion {
                    &mut self.0
                }
            }

            impl $name {
                /// Consumes the wrapper and returns the wrapped [`Completion`].
                #[inline]
                pub fn into_inner(self) -> Completion {
                    self.0
                }
            }
        };
    }

    completion_newtype! {
        /// Future of a single-buffer read, resolving to the buffer and the number
        /// of bytes read.
        Read
    }

    impl Future for Read {
        type Output = Result<(BytesMut, usize)>;

        /// Polls the wrapped completion and unpacks its response.
        ///
        /// Accepts `Response::Read`, or `Response::RecvMsg` whose first buffer is
        /// returned (debug builds assert there is exactly one). Any other response
        /// variant hits `unreachable!()`.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(BytesMut, usize)>> {
            Poll::Ready(
                ready!(Pin::new(&mut self.get_mut().0).poll(cx)).map(|response| match response {
                    Response::Read(response::Read { buf, bytes_read }) => (buf, bytes_read),
                    Response::RecvMsg(response::RecvMsg {
                        buffers,
                        bytes_read,
                        ..
                    }) => {
                        debug_assert_eq!(buffers.len(), 1);
                        (buffers.into_iter().next().unwrap(), bytes_read)
                    }
                    _ => unreachable!(),
                }),
            )
        }
    }

    completion_newtype! {
        /// Future of a vectored read, resolving to the buffers and the number of
        /// bytes read.
        ReadVectored
    }

    impl Future for ReadVectored {
        type Output = Result<(Vec<BytesMut>, usize)>;

        /// Polls the wrapped completion and unpacks its response.
        ///
        /// Only `Response::RecvMsg` is accepted; any other response variant hits
        /// `unreachable!()`.
        fn poll(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Result<(Vec<BytesMut>, usize)>> {
            Poll::Ready(
                ready!(Pin::new(&mut self.get_mut().0).poll(cx)).map(|response| match response {
                    Response::RecvMsg(response::RecvMsg {
                        buffers,
                        bytes_read,
                        ..
                    }) => (buffers, bytes_read),
                    _ => unreachable!(),
                }),
            )
        }
    }

    completion_newtype! {
        /// Future of a write, resolving to the number of bytes written.
        Write
    }

    impl Future for Write {
        type Output = Result<usize>;

        /// Polls the wrapped completion and unpacks its response.
        ///
        /// Accepts `Response::Write` and `Response::SendMsg`, and on Linux also
        /// `Response::SendMsgZc`. Any other response variant hits `unreachable!()`.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
            Poll::Ready(
                ready!(Pin::new(&mut self.get_mut().0).poll(cx)).map(|response| match response {
                    Response::Write(response::Write { bytes_written })
                    | Response::SendMsg(response::SendMsg { bytes_written }) => bytes_written,
                    #[cfg(target_os = "linux")]
                    Response::SendMsgZc(response::SendMsgZc { bytes_written }) => bytes_written,
                    _ => unreachable!(),
                }),
            )
        }
    }

    /// Vectored writes resolve to the same value as plain writes, so they share
    /// the [`Write`] future.
    pub use Write as WriteVectored;
}