1#[cfg(target_os = "linux")]
2#[path = "backend/io_uring.rs"]
3mod backend;
4
5#[cfg(windows)]
6#[path = "backend/overlapped_io.rs"]
7mod backend;
8
9pub mod request;
10pub mod response;
11
12use std::future::Future;
13use std::hash::{Hash, Hasher};
14#[doc(no_inline)]
15pub use std::io::{Error, Result};
16use std::pin::Pin;
17use std::rc::{Rc, Weak};
18use std::task::{Context, Poll};
19use std::{fmt, mem, ptr};
20
21use crate::task::Handle;
22
23pub use backend::{Event, Timer};
24#[doc(hidden)]
25pub use backend::{RawAsyncFile, RawAsyncSocket};
26pub use poller::Poller;
27pub use request::Request;
28pub use response::Response;
29
30pub mod poller {
31 pub use super::backend::{Config, Poller};
32}
33
34pub struct RawCompletion(Rc<()>);
35
36impl fmt::Debug for RawCompletion {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "RawCompletionRef({:?})", ptr::from_ref(self.0.as_ref()))
39 }
40}
41
42impl RawCompletion {
43 pub(crate) fn new() -> RawCompletion {
44 RawCompletion(Rc::new(()))
45 }
46
47 pub(crate) fn downgrade(&self) -> RawCompletionRef {
48 RawCompletionRef(Rc::downgrade(&self.0))
49 }
50}
51
52impl PartialEq for RawCompletion {
53 fn eq(&self, other: &Self) -> bool {
54 ptr::eq(self.0.as_ref(), other.0.as_ref())
55 }
56}
57
58impl Eq for RawCompletion {}
59
60impl Hash for RawCompletion {
61 fn hash<H: Hasher>(&self, state: &mut H) {
62 ptr::from_ref(self.0.as_ref()).hash(state);
63 }
64}
65
66#[derive(Clone)]
67pub(crate) struct RawCompletionRef(Weak<()>);
68
69impl fmt::Debug for RawCompletionRef {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 write!(f, "RawCompletionRef({:?})", self.0.as_ptr())
72 }
73}
74
75impl RawCompletionRef {
76 #[inline]
77 pub fn upgrade(&self) -> Option<RawCompletion> {
78 self.0.upgrade().map(RawCompletion)
79 }
80}
81
82impl PartialEq for RawCompletionRef {
83 fn eq(&self, other: &Self) -> bool {
84 self.0.ptr_eq(&other.0)
85 }
86}
87
88impl Eq for RawCompletionRef {}
89
90impl Hash for RawCompletionRef {
91 fn hash<H: Hasher>(&self, state: &mut H) {
92 self.0.as_ptr().hash(state);
93 }
94}
95
96impl RawCompletionRef {
97 #[inline]
98 pub fn into_raw(self) -> *const () {
99 self.0.into_raw()
100 }
101
102 #[inline]
103 pub unsafe fn from_raw(ptr: *const ()) -> RawCompletionRef {
104 RawCompletionRef(unsafe { Weak::from_raw(ptr) })
105 }
106}
107
108#[derive(Debug)]
109pub enum CompletionState {
110 NotInitiated(Request),
111 InFlight(RawCompletion),
112 Finished,
113}
114
115#[must_use]
116#[derive(Debug)]
117pub struct Completion {
118 state: CompletionState,
119 handle: Handle,
120}
121
122impl Completion {
123 #[inline]
124 pub(crate) fn new(handle: Handle, request: Request) -> Completion {
125 Completion {
126 state: CompletionState::NotInitiated(request),
127 handle,
128 }
129 }
130
131 #[inline]
132 pub fn associated_runtime(&self) -> Handle {
133 self.handle.clone()
134 }
135
136 #[inline]
137 pub fn state(self) -> CompletionState {
138 self.state
139 }
140}
141
142impl Future for Completion {
143 type Output = Result<Response>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Response>> {
146 let Some(task) = self.handle.wrap_waker(cx.waker().clone()) else {
147 return Poll::Pending;
148 };
149
150 match mem::replace(&mut self.state, CompletionState::Finished) {
151 CompletionState::NotInitiated(request) => {
152 if let Ok(comp) = self
153 .handle
154 .with_poller(|mut poller| poller.submit_request(request, Some(task)))
155 {
156 self.state = CompletionState::InFlight(comp);
157 }
158 Poll::Pending
159 }
160 CompletionState::InFlight(comp) => {
161 if let Ok(r) = self
162 .handle
163 .with_poller(|mut poller| poller.poll_response(comp, Some(task)))
164 {
165 match r {
166 Ok(r) => return Poll::Ready(r),
167 Err(comp) => {
168 self.state = CompletionState::InFlight(comp);
169 }
170 }
171 }
172 Poll::Pending
173 }
174 CompletionState::Finished => {
175 Poll::Pending
177 }
178 }
179 }
180}
181
182#[cfg(feature = "io")]
183mod traits {
184 use std::future::Future;
185
186 use bytes::{Bytes, BytesMut};
187
188 use super::Result;
189
190 pub const DEFAULT_BUFFER_SIZE: usize = 8 * 1024;
191
192 pub trait AsyncRead {
193 type Future: Future<Output = Result<(BytesMut, usize)>>;
194 fn read(&self, buf: BytesMut) -> Self::Future;
195 }
196
197 pub trait AsyncReadOverlapped {
198 type Future: Future<Output = Result<(BytesMut, usize)>>;
199 fn read_at(&self, buf: BytesMut, offset: u64) -> Self::Future;
200 }
201
202 pub trait AsyncReadVectored {
203 type Future: Future<Output = Result<(Vec<BytesMut>, usize)>>;
204 fn read_vectored(&self, buffers: Vec<BytesMut>) -> Self::Future;
205 }
206
207 pub trait AsyncWrite {
208 type Future: Future<Output = Result<usize>>;
209 fn write(&self, buf: Bytes) -> Self::Future;
210 }
211
212 pub trait AsyncWriteOverlapped {
213 type Future: Future<Output = Result<usize>>;
214 fn write_at(&self, buf: Bytes, offset: u64) -> Self::Future;
215 }
216
217 pub trait AsyncWriteVectored {
218 type Future: Future<Output = Result<usize>>;
219 fn write_vectored(&self, buffers: Vec<Bytes>) -> Self::Future;
220 }
221
222 pub trait AsyncReadExt: AsyncRead {
223 fn read_appending(
224 &self,
225 bytes: BytesMut,
226 len: usize,
227 ) -> future::ReadAppending<Self::Future>;
228
229 fn read_all_appending(&self, bytes: BytesMut) -> future::ReadAllAppending<'_, Self>;
230
231 fn read_exact(&self, buf: BytesMut) -> future::ReadExact<'_, Self>;
232
233 fn read_to_bytes(&self, len: usize) -> future::ReadToBytes<Self::Future>;
234
235 fn read_all_to_bytes(&self) -> future::ReadAllToBytes<'_, Self>;
236
237 fn read_exact_to_bytes(&self, len: usize) -> future::ReadExactToBytes<'_, Self>;
238 }
239
240 pub trait AsyncReadOverlappedExt: AsyncReadOverlapped {
241 fn read_exact_at(&self, buf: BytesMut, offset: u64) -> future::ReadExactAt<'_, Self>;
242
243 fn read_to_bytes_at(&self, len: usize, offset: u64) -> future::ReadToBytesAt<Self::Future>;
244
245 fn read_exact_to_bytes_at(
246 &self,
247 len: usize,
248 offset: u64,
249 ) -> future::ReadExactToBytesAt<'_, Self>;
250 }
251
252 pub trait AsyncWriteExt: AsyncWrite {
253 fn write_all(&self, bytes: Bytes) -> future::WriteAll<'_, Self>;
254 }
255
256 pub trait AsyncWriteOverlappedExt: AsyncWriteOverlapped {
257 fn write_all_at(&self, bytes: Bytes, offset: u64) -> future::WriteAllAt<'_, Self>;
258 }
259
260 impl<A: AsyncRead> AsyncReadExt for A {
261 fn read_appending(
262 &self,
263 mut bytes: BytesMut,
264 len: usize,
265 ) -> future::ReadAppending<Self::Future> {
266 let orig_len = bytes.len();
267 let new_len = orig_len + len;
268 bytes.reserve(len);
269 unsafe { bytes.set_len(new_len) };
270 let buf = bytes.split_off(orig_len);
271 let fut = self.read(buf);
272 future::ReadAppending::new(fut, bytes)
273 }
274
275 fn read_all_appending(&self, bytes: BytesMut) -> future::ReadAllAppending<'_, Self> {
276 future::ReadAllAppending::new(self, bytes)
277 }
278
279 fn read_exact(&self, buf: BytesMut) -> future::ReadExact<'_, Self> {
280 future::ReadExact::new(self, buf)
281 }
282
283 fn read_to_bytes(&self, len: usize) -> future::ReadToBytes<Self::Future> {
284 let mut buf = BytesMut::with_capacity(len);
285 unsafe { buf.set_len(len) };
286 future::ReadToBytes::new(self.read(buf))
287 }
288
289 fn read_all_to_bytes(&self) -> future::ReadAllToBytes<'_, Self> {
290 let bytes = BytesMut::new();
291 future::ReadAllToBytes::new(future::ReadAllAppending::new(self, bytes))
292 }
293
294 fn read_exact_to_bytes(&self, len: usize) -> future::ReadExactToBytes<'_, Self> {
295 let mut buf = BytesMut::with_capacity(len);
296 unsafe { buf.set_len(len) };
297 future::ReadExactToBytes::new(future::ReadExact::new(self, buf))
298 }
299 }
300
301 impl<A: AsyncReadOverlapped> AsyncReadOverlappedExt for A {
302 fn read_exact_at(&self, buf: BytesMut, offset: u64) -> future::ReadExactAt<'_, Self> {
303 future::ReadExactAt::new(self, buf, offset)
304 }
305
306 fn read_to_bytes_at(&self, len: usize, offset: u64) -> future::ReadToBytesAt<Self::Future> {
307 let mut buf = BytesMut::with_capacity(len);
308 unsafe { buf.set_len(len) };
309 future::ReadToBytesAt::new(self.read_at(buf, offset))
310 }
311
312 fn read_exact_to_bytes_at(
313 &self,
314 len: usize,
315 offset: u64,
316 ) -> future::ReadExactToBytesAt<'_, Self> {
317 let mut buf = BytesMut::with_capacity(len);
318 unsafe { buf.set_len(len) };
319 future::ReadExactToBytesAt::new(future::ReadExactAt::new(self, buf, offset))
320 }
321 }
322
323 impl<W: AsyncWrite> AsyncWriteExt for W {
324 fn write_all(&self, bytes: Bytes) -> future::WriteAll<'_, Self> {
325 future::WriteAll::new(self, bytes)
326 }
327 }
328
329 impl<W: AsyncWriteOverlapped> AsyncWriteOverlappedExt for W {
330 fn write_all_at(&self, bytes: Bytes, offset: u64) -> future::WriteAllAt<'_, Self> {
331 future::WriteAllAt::new(self, bytes, offset)
332 }
333 }
334
335 #[expect(clippy::multiple_bound_locations)]
336 pub mod future {
337 use std::fmt;
338 use std::future::Future;
339 use std::mem::{replace, take};
340 use std::pin::Pin;
341 use std::task::{ready, Context, Poll};
342
343 use bytes::{Bytes, BytesMut};
344 use pin_project_lite::pin_project;
345
346 use crate::io::{
347 AsyncRead, AsyncReadExt, AsyncReadOverlapped, AsyncWrite, AsyncWriteOverlapped, Error,
348 Result, DEFAULT_BUFFER_SIZE,
349 };
350
351 pin_project! {
352 #[derive(Debug)]
353 pub struct ReadAppending<F> {
354 #[pin]
355 inner: F,
356 buf0: BytesMut,
357 }
358 }
359
360 impl<F: Future<Output = Result<(BytesMut, usize)>>> ReadAppending<F> {
361 pub(super) fn new(inner: F, buf: BytesMut) -> ReadAppending<F> {
362 ReadAppending { inner, buf0: buf }
363 }
364 }
365
366 impl<F: Future<Output = Result<(BytesMut, usize)>>> Future for ReadAppending<F> {
367 type Output = (BytesMut, Result<usize>);
368
369 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(BytesMut, Result<usize>)> {
370 let this = self.project();
371 Poll::Ready(match ready!(this.inner.poll(cx)) {
372 Ok((buf, bytes_read)) => {
373 let mut buf0 = take(this.buf0);
374 let orig_len = buf0.len();
375 let new_len = orig_len + bytes_read;
376 buf0.unsplit(buf);
377 debug_assert!(buf0.len() >= new_len);
378 unsafe { buf0.set_len(new_len) };
379 (buf0, Ok(bytes_read))
380 }
381 Err(e) => {
382 let buf0 = take(this.buf0);
383 (buf0, Err(e))
384 }
385 })
386 }
387 }
388
389 pin_project! {
390 #[derive(Debug)]
391 pub struct ReadAllAppending<'a, A: ?Sized> where A: AsyncRead {
392 #[pin]
393 inner: ReadAppending<A::Future>,
394 bytes_read: usize,
395 reader: &'a A,
396 buf0: BytesMut,
397 current_finished: bool,
398 }
399 }
400
401 impl<'a, F, A: AsyncRead<Future = F>> ReadAllAppending<'a, A> {
402 pub(super) fn new(reader: &'a A, buf: BytesMut) -> Self {
403 let mut buf0 = buf;
404 Self {
405 inner: Self::create_read_future(reader, &mut buf0),
406 bytes_read: 0,
407 reader,
408 buf0,
409 current_finished: false,
410 }
411 }
412
413 fn create_read_future(reader: &'a A, buf0: &mut BytesMut) -> ReadAppending<F> {
414 reader.read_appending(take(buf0), DEFAULT_BUFFER_SIZE)
415 }
416 }
417
418 impl<'a, A: AsyncRead> Future for ReadAllAppending<'a, A> {
419 type Output = (BytesMut, Result<usize>);
420
421 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(BytesMut, Result<usize>)> {
422 let mut this = self.project();
423 if replace(this.current_finished, false) {
424 this.inner
425 .set(Self::create_read_future(this.reader, this.buf0));
426 }
427 match ready!(this.inner.poll(cx)) {
428 (bytes, r @ Err(_)) => Poll::Ready((bytes, r)),
429 (bytes, Ok(0)) => Poll::Ready((bytes, Ok(*this.bytes_read))),
430 (bytes, Ok(inc)) => {
431 *this.bytes_read += inc;
432 *this.buf0 = bytes;
433 *this.current_finished = true;
434 cx.waker().wake_by_ref();
435 Poll::Pending
436 }
437 }
438 }
439 }
440
441 pin_project! {
442 #[derive(Debug)]
443 pub struct ReadExact<'a, A: ?Sized> where A: AsyncRead {
444 #[pin]
445 inner: A::Future,
446 bytes_read: usize,
447 reader: &'a A,
448 buf0: BytesMut,
449 current_finished: bool,
450 }
451 }
452
453 impl<'a, F, A: AsyncRead<Future = F>> ReadExact<'a, A> {
454 pub(super) fn new(reader: &'a A, buf: BytesMut) -> Self {
455 let mut buf0 = buf;
456 let bytes_read = 0;
457 Self {
458 inner: Self::create_read_future(reader, &mut buf0, bytes_read),
459 bytes_read,
460 reader,
461 buf0,
462 current_finished: false,
463 }
464 }
465
466 fn create_read_future(reader: &'a A, buf0: &mut BytesMut, bytes_read: usize) -> F {
467 let buf = buf0.split_off(bytes_read);
468 reader.read(buf)
469 }
470 }
471
472 impl<'a, A: AsyncRead> Future for ReadExact<'a, A> {
473 type Output = Result<BytesMut>;
474
475 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<BytesMut>> {
476 let mut this = self.project();
477 if replace(this.current_finished, false) {
478 this.inner.set(Self::create_read_future(
479 this.reader,
480 this.buf0,
481 *this.bytes_read,
482 ));
483 }
484 match ready!(this.inner.poll(cx)) {
485 Err(e) => Poll::Ready(Err(e)),
486 Ok((buf, bytes_read)) => {
487 this.buf0.unsplit(buf);
488 *this.bytes_read += bytes_read;
489 if *this.bytes_read < this.buf0.len() {
490 if bytes_read == 0 {
491 Poll::Ready(Err(Error::new(
492 std::io::ErrorKind::UnexpectedEof,
493 "unexpected eof",
494 )))
495 } else {
496 *this.current_finished = true;
497 cx.waker().wake_by_ref();
498 Poll::Pending
499 }
500 } else {
501 Poll::Ready(Ok(take(this.buf0)))
502 }
503 }
504 }
505 }
506 }
507
508 pin_project! {
509 #[derive(Debug)]
510 pub struct ReadExactAt<'a, A: ?Sized> where A: AsyncReadOverlapped {
511 #[pin]
512 inner: A::Future,
513 bytes_read: usize,
514 reader: &'a A,
515 buf0: BytesMut,
516 offset: u64,
517 current_finished: bool,
518 }
519 }
520
521 impl<'a, F, A: AsyncReadOverlapped<Future = F>> ReadExactAt<'a, A> {
522 pub(super) fn new(reader: &'a A, buf: BytesMut, offset: u64) -> Self {
523 let mut buf0 = buf;
524 let bytes_read = 0;
525 Self {
526 inner: Self::create_read_future(reader, &mut buf0, offset, bytes_read).unwrap(),
527 bytes_read,
528 reader,
529 buf0,
530 offset,
531 current_finished: false,
532 }
533 }
534
535 fn create_read_future(
536 reader: &'a A,
537 buf0: &mut BytesMut,
538 offset: u64,
539 bytes_read: usize,
540 ) -> Result<F> {
541 let Some(new_offset) = offset.checked_add(bytes_read as _) else {
542 return Err(Error::new(
543 std::io::ErrorKind::InvalidInput,
544 "offset out of bound",
545 ));
546 };
547 let buf = buf0.split_off(bytes_read);
548 Ok(reader.read_at(buf, new_offset))
549 }
550 }
551
552 impl<'a, A: AsyncReadOverlapped> Future for ReadExactAt<'a, A> {
553 type Output = Result<BytesMut>;
554
555 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<BytesMut>> {
556 let mut this = self.project();
557 if replace(this.current_finished, false) {
558 this.inner.set(
559 match Self::create_read_future(
560 this.reader,
561 this.buf0,
562 *this.offset,
563 *this.bytes_read,
564 ) {
565 Ok(fut) => fut,
566 Err(e) => return Poll::Ready(Err(e)),
567 },
568 );
569 }
570 match ready!(this.inner.poll(cx)) {
571 Err(e) => Poll::Ready(Err(e)),
572 Ok((buf, bytes_read)) => {
573 this.buf0.unsplit(buf);
574 *this.bytes_read += bytes_read;
575 if *this.bytes_read < this.buf0.len() {
576 if bytes_read == 0 {
577 Poll::Ready(Err(Error::new(
578 std::io::ErrorKind::UnexpectedEof,
579 "unexpected eof",
580 )))
581 } else {
582 *this.current_finished = true;
583 cx.waker().wake_by_ref();
584 Poll::Pending
585 }
586 } else {
587 Poll::Ready(Ok(take(this.buf0)))
588 }
589 }
590 }
591 }
592 }
593
594 pin_project! {
595 #[derive(Debug)]
596 pub struct ReadToBytesAt<F> {
597 #[pin]
598 inner: F,
599 }
600 }
601
602 pub use ReadToBytesAt as ReadToBytes;
603
604 impl<F: Future<Output = Result<(BytesMut, usize)>>> ReadToBytesAt<F> {
605 pub(super) fn new(inner: F) -> Self {
606 Self { inner }
607 }
608 }
609
610 impl<F: Future<Output = Result<(BytesMut, usize)>>> Future for ReadToBytesAt<F> {
611 type Output = Result<Bytes>;
612
613 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
614 Poll::Ready(match ready!(self.project().inner.poll(cx)) {
615 Ok((mut buf, bytes_read)) => {
616 debug_assert!(bytes_read <= buf.len());
617 unsafe { buf.set_len(bytes_read) };
618 Ok(buf.freeze())
619 }
620 Err(e) => Err(e),
621 })
622 }
623 }
624
625 pin_project! {
626 pub struct ReadAllToBytes<'a, A: ?Sized> where A: AsyncRead {
627 #[pin]
628 inner: ReadAllAppending<'a, A>,
629 }
630 }
631
632 impl<'a, A: AsyncRead<Future = F> + fmt::Debug, F: fmt::Debug> fmt::Debug
633 for ReadAllToBytes<'a, A>
634 {
635 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636 self.inner.fmt(f)
637 }
638 }
639
640 impl<'a, A: AsyncRead> ReadAllToBytes<'a, A> {
641 pub(super) fn new(inner: ReadAllAppending<'a, A>) -> Self {
642 Self { inner }
643 }
644 }
645
646 impl<'a, A: AsyncRead> Future for ReadAllToBytes<'a, A> {
647 type Output = Result<Bytes>;
648
649 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
650 Poll::Ready(match ready!(self.project().inner.poll(cx)) {
651 (buf, Ok(_)) => Ok(buf.freeze()),
652 (_, Err(e)) => Err(e),
653 })
654 }
655 }
656
657 pin_project! {
658 pub struct ReadExactToBytes<'a, A: ?Sized> where A: AsyncRead {
659 #[pin]
660 inner: ReadExact<'a, A>,
661 }
662 }
663
664 impl<'a, A: AsyncRead<Future = F> + fmt::Debug, F: fmt::Debug> fmt::Debug
665 for ReadExactToBytes<'a, A>
666 {
667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668 self.inner.fmt(f)
669 }
670 }
671
672 impl<'a, A: AsyncRead> ReadExactToBytes<'a, A> {
673 pub(super) fn new(inner: ReadExact<'a, A>) -> Self {
674 Self { inner }
675 }
676 }
677
678 impl<'a, A: AsyncRead> Future for ReadExactToBytes<'a, A> {
679 type Output = Result<Bytes>;
680
681 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
682 Poll::Ready(match ready!(self.project().inner.poll(cx)) {
683 Ok(buf) => Ok(buf.freeze()),
684 Err(e) => Err(e),
685 })
686 }
687 }
688
689 pin_project! {
690 pub struct ReadExactToBytesAt<'a, A: ?Sized> where A: AsyncReadOverlapped {
691 #[pin]
692 inner: ReadExactAt<'a, A>,
693 }
694 }
695
696 impl<'a, A: AsyncReadOverlapped<Future = F> + fmt::Debug, F: fmt::Debug> fmt::Debug
697 for ReadExactToBytesAt<'a, A>
698 {
699 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
700 self.inner.fmt(f)
701 }
702 }
703
704 impl<'a, A: AsyncReadOverlapped> ReadExactToBytesAt<'a, A> {
705 pub(super) fn new(inner: ReadExactAt<'a, A>) -> Self {
706 Self { inner }
707 }
708 }
709
710 impl<'a, A: AsyncReadOverlapped> Future for ReadExactToBytesAt<'a, A> {
711 type Output = Result<Bytes>;
712
713 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes>> {
714 Poll::Ready(match ready!(self.project().inner.poll(cx)) {
715 Ok(buf) => Ok(buf.freeze()),
716 Err(e) => Err(e),
717 })
718 }
719 }
720
721 pin_project! {
722 #[derive(Debug)]
723 pub struct WriteAllAt<'a, W: ?Sized> where W: AsyncWriteOverlapped {
724 #[pin]
725 inner: Option<W::Future>,
726 bytes_written: usize,
727 writer: &'a W,
728 buf0: Bytes,
729 offset: u64,
730 current_finished: bool,
731 }
732 }
733
734 impl<'a, W: AsyncWriteOverlapped> WriteAllAt<'a, W> {
735 pub(crate) fn new(writer: &'a W, bytes: Bytes, offset: u64) -> Self {
736 let buf0 = bytes;
737 let bytes_written = 0;
738 WriteAllAt {
739 inner: Self::create_write_future(writer, &buf0, offset, bytes_written).unwrap(),
740 bytes_written,
741 writer,
742 buf0,
743 offset,
744 current_finished: false,
745 }
746 }
747
748 fn create_write_future(
749 writer: &'a W,
750 buf0: &Bytes,
751 offset: u64,
752 bytes_written: usize,
753 ) -> Result<Option<W::Future>> {
754 let buf = buf0.slice(bytes_written..);
755 Ok(if buf.is_empty() {
756 None
757 } else {
758 let Some(new_offset) = offset.checked_add(bytes_written as _) else {
759 return Err(Error::new(
760 std::io::ErrorKind::InvalidInput,
761 "offset out of bound",
762 ));
763 };
764 Some(writer.write_at(buf, new_offset))
765 })
766 }
767 }
768
769 impl<'a, W: AsyncWriteOverlapped> Future for WriteAllAt<'a, W> {
770 type Output = Result<()>;
771
772 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
773 let mut this = self.project();
774 if replace(this.current_finished, false) {
775 this.inner.set(
776 match Self::create_write_future(
777 this.writer,
778 this.buf0,
779 *this.offset,
780 *this.bytes_written,
781 ) {
782 Ok(fut) => fut,
783 Err(e) => return Poll::Ready(Err(e)),
784 },
785 );
786 }
787 let Some(inner) = this.inner.as_pin_mut() else {
788 return Poll::Ready(Ok(()));
789 };
790 match ready!(inner.poll(cx)) {
791 Ok(bytes_written) => {
792 if bytes_written == 0 {
793 Poll::Ready(Err(Error::new(
794 std::io::ErrorKind::WriteZero,
795 "write zero",
796 )))
797 } else {
798 *this.bytes_written += bytes_written;
799 if *this.bytes_written < this.buf0.len() {
800 *this.current_finished = true;
801 cx.waker().wake_by_ref();
802 Poll::Pending
803 } else {
804 Poll::Ready(Ok(()))
805 }
806 }
807 }
808 Err(e) => Poll::Ready(Err(e)),
809 }
810 }
811 }
812
813 pin_project! {
814 #[derive(Debug)]
815 pub struct WriteAll<'a, W: ?Sized> where W: AsyncWrite {
816 #[pin]
817 inner: Option<W::Future>,
818 bytes_written: usize,
819 writer: &'a W,
820 buf0: Bytes,
821 current_finished: bool,
822 }
823 }
824
825 impl<'a, W: AsyncWrite> WriteAll<'a, W> {
826 pub(crate) fn new(writer: &'a W, bytes: Bytes) -> Self {
827 let buf0 = bytes;
828 let bytes_written = 0;
829 WriteAll {
830 inner: Self::create_write_future(writer, &buf0, bytes_written),
831 bytes_written,
832 writer,
833 buf0,
834 current_finished: false,
835 }
836 }
837
838 fn create_write_future(
839 writer: &'a W,
840 buf0: &Bytes,
841 bytes_written: usize,
842 ) -> Option<W::Future> {
843 let buf = buf0.slice(bytes_written..);
844 if buf.is_empty() {
845 None
846 } else {
847 Some(writer.write(buf))
848 }
849 }
850 }
851
852 impl<'a, W: AsyncWrite> Future for WriteAll<'a, W> {
853 type Output = Result<()>;
854
855 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
856 let mut this = self.project();
857 if replace(this.current_finished, false) {
858 this.inner.set(Self::create_write_future(
859 this.writer,
860 this.buf0,
861 *this.bytes_written,
862 ));
863 }
864 let Some(inner) = this.inner.as_pin_mut() else {
865 return Poll::Ready(Ok(()));
866 };
867 match ready!(inner.poll(cx)) {
868 Ok(bytes_written) => {
869 if bytes_written == 0 {
870 Poll::Ready(Err(Error::new(
871 std::io::ErrorKind::WriteZero,
872 "write zero",
873 )))
874 } else {
875 *this.bytes_written += bytes_written;
876 if *this.bytes_written < this.buf0.len() {
877 *this.current_finished = true;
878 cx.waker().wake_by_ref();
879 Poll::Pending
880 } else {
881 Poll::Ready(Ok(()))
882 }
883 }
884 }
885 Err(e) => Poll::Ready(Err(e)),
886 }
887 }
888 }
889 }
890}
891
892#[cfg_attr(feature = "nightly", doc(cfg(feature = "io")))]
893#[cfg(feature = "io")]
894pub use traits::*;
895
896#[doc(hidden)]
897#[macro_export]
898#[expect(clippy::module_name_repetitions)]
899macro_rules! async_io {
900 ($result:ident @ $op:ident $options:tt => $postfix:expr) => {
901 $crate::task::handle()
902 .async_io($crate::io::Request::$op($crate::io::request::$op $options))
903 .await
904 .map(|response| match response {
905 $crate::io::Response::$op($result) => $postfix,
906 _ => unreachable!(),
907 })
908 };
909
910 ($op:ident $options:tt) => {
911 {
912 #[allow(clippy::drop_non_drop)]
913 let r = async_io!(r @ $op $options => std::mem::drop(r));
914 r
915 }
916 };
917}
918
919#[doc(hidden)]
920#[macro_export]
921#[expect(clippy::module_name_repetitions)]
922macro_rules! threading_io {
923 ($fn:block) => {
924 match $crate::task::handle().spawn_blocking(move || $fn) {
925 Ok(jh) => jh.await.unwrap(),
926 Err(_) => std::future::pending().await,
927 }
928 };
929}
930
931#[doc(hidden)]
932#[macro_export]
933macro_rules! max {
934 ($v1:expr, $v2:expr) => {{
935 let v1 = $v1;
936 let v2 = $v2;
937 if v1 < v2 {
938 v2
939 } else {
940 v1
941 }
942 }};
943}
944
945#[cfg(all(feature = "io", any(feature = "fs", feature = "net")))]
946pub(crate) mod rw {
947 use std::future::Future;
948 use std::ops::{Deref, DerefMut};
949 use std::pin::Pin;
950 use std::task::{ready, Context, Poll};
951
952 use bytes::BytesMut;
953
954 use crate::io::{response, Completion, Response, Result};
955
956 #[must_use]
957 #[derive(Debug)]
958 pub struct Read(pub(crate) Completion);
959
960 impl Future for Read {
961 type Output = Result<(BytesMut, usize)>;
962 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(BytesMut, usize)>> {
963 Poll::Ready(
964 ready!(Pin::new(&mut self.get_mut().0).poll(cx)).map(|response| match response {
965 Response::Read(response::Read { buf, bytes_read }) => (buf, bytes_read),
966 Response::RecvMsg(response::RecvMsg {
967 buffers,
968 bytes_read,
969 ..
970 }) => {
971 debug_assert_eq!(buffers.len(), 1);
972 (buffers.into_iter().next().unwrap(), bytes_read)
973 }
974 _ => unreachable!(),
975 }),
976 )
977 }
978 }
979
980 impl Deref for Read {
981 type Target = Completion;
982
983 fn deref(&self) -> &Completion {
984 &self.0
985 }
986 }
987
988 impl DerefMut for Read {
989 fn deref_mut(&mut self) -> &mut Completion {
990 &mut self.0
991 }
992 }
993
994 impl AsRef<Completion> for Read {
995 fn as_ref(&self) -> &Completion {
996 &self.0
997 }
998 }
999
1000 impl AsMut<Completion> for Read {
1001 fn as_mut(&mut self) -> &mut Completion {
1002 &mut self.0
1003 }
1004 }
1005
1006 impl Read {
1007 #[inline]
1008 pub fn into_inner(self) -> Completion {
1009 self.0
1010 }
1011 }
1012
1013 #[must_use]
1014 #[derive(Debug)]
1015 pub struct ReadVectored(pub(crate) Completion);
1016
1017 impl Future for ReadVectored {
1018 type Output = Result<(Vec<BytesMut>, usize)>;
1019 fn poll(
1020 self: Pin<&mut Self>,
1021 cx: &mut Context<'_>,
1022 ) -> Poll<Result<(Vec<BytesMut>, usize)>> {
1023 Poll::Ready(
1024 ready!(Pin::new(&mut self.get_mut().0).poll(cx)).map(|response| match response {
1025 Response::RecvMsg(response::RecvMsg {
1026 buffers,
1027 bytes_read,
1028 ..
1029 }) => (buffers, bytes_read),
1030 _ => unreachable!(),
1031 }),
1032 )
1033 }
1034 }
1035
1036 impl Deref for ReadVectored {
1037 type Target = Completion;
1038
1039 fn deref(&self) -> &Completion {
1040 &self.0
1041 }
1042 }
1043
1044 impl DerefMut for ReadVectored {
1045 fn deref_mut(&mut self) -> &mut Completion {
1046 &mut self.0
1047 }
1048 }
1049
1050 impl AsRef<Completion> for ReadVectored {
1051 fn as_ref(&self) -> &Completion {
1052 &self.0
1053 }
1054 }
1055
1056 impl AsMut<Completion> for ReadVectored {
1057 fn as_mut(&mut self) -> &mut Completion {
1058 &mut self.0
1059 }
1060 }
1061
1062 impl ReadVectored {
1063 #[inline]
1064 pub fn into_inner(self) -> Completion {
1065 self.0
1066 }
1067 }
1068
1069 #[must_use]
1070 #[derive(Debug)]
1071 pub struct Write(pub(crate) Completion);
1072
1073 impl Future for Write {
1074 type Output = Result<usize>;
1075 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
1076 Poll::Ready(
1077 ready!(Pin::new(&mut self.get_mut().0).poll(cx)).map(|response| match response {
1078 Response::Write(response::Write { bytes_written })
1079 | Response::SendMsg(response::SendMsg { bytes_written }) => bytes_written,
1080 #[cfg(target_os = "linux")]
1081 Response::SendMsgZc(response::SendMsgZc { bytes_written }) => bytes_written,
1082 _ => unreachable!(),
1083 }),
1084 )
1085 }
1086 }
1087
1088 impl Deref for Write {
1089 type Target = Completion;
1090
1091 fn deref(&self) -> &Completion {
1092 &self.0
1093 }
1094 }
1095
1096 impl DerefMut for Write {
1097 fn deref_mut(&mut self) -> &mut Completion {
1098 &mut self.0
1099 }
1100 }
1101
1102 impl AsRef<Completion> for Write {
1103 fn as_ref(&self) -> &Completion {
1104 &self.0
1105 }
1106 }
1107
1108 impl AsMut<Completion> for Write {
1109 fn as_mut(&mut self) -> &mut Completion {
1110 &mut self.0
1111 }
1112 }
1113
1114 impl Write {
1115 #[inline]
1116 pub fn into_inner(self) -> Completion {
1117 self.0
1118 }
1119 }
1120
1121 pub use Write as WriteVectored;
1122}