flatty_io/async_/
io.rs

1use super::{AsyncReadBuffer, AsyncWriteBuffer, IoBuffer};
2use futures::{
3    io::{AsyncRead, AsyncWrite},
4    ready, Future,
5};
6use std::{
7    io,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12impl<P: AsyncWrite + Unpin> AsyncWriteBuffer for IoBuffer<P> {
13    type Error = io::Error;
14
15    fn poll_alloc(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
16        let n = self.buffer.vacant_len();
17        if n > 0 {
18            self.buffer.advance(n);
19        }
20        Poll::Ready(Ok(()))
21    }
22
23    type WriteAll<'a> = WriteAll<'a, P> where P: 'a;
24
25    fn write_all(&mut self, count: usize) -> Self::WriteAll<'_> {
26        WriteAll {
27            owner: self,
28            pos: 0,
29            count,
30        }
31    }
32}
33
34pub struct WriteAll<'a, P: AsyncWrite + Unpin + 'a> {
35    owner: &'a mut IoBuffer<P>,
36    pos: usize,
37    count: usize,
38}
39
40impl<'a, P: AsyncWrite + Unpin + 'a> Unpin for WriteAll<'a, P> {}
41
42impl<'a, P: AsyncWrite + Unpin + 'a> Future for WriteAll<'a, P> {
43    type Output = io::Result<()>;
44    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        assert!(!self.owner.poisoned);
46        while self.pos < self.count {
47            let (pos, count) = (self.pos, self.count);
48            let (pipe, buffer) = self.owner.split_mut();
49            match ready!(Pin::new(pipe).poll_write(cx, &buffer.occupied()[pos..count])) {
50                Ok(n) => {
51                    if n == 0 {
52                        if self.pos != 0 {
53                            self.owner.poisoned = true;
54                        }
55                        return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
56                    } else {
57                        self.pos += n;
58                    }
59                }
60                Err(e) => {
61                    if self.pos != 0 {
62                        self.owner.poisoned = true;
63                        return Poll::Ready(Err(e));
64                    }
65                }
66            }
67        }
68        ready!(Pin::new(&mut self.owner.pipe).poll_flush(cx))?;
69        self.owner.buffer.clear();
70        Poll::Ready(Ok(()))
71    }
72}
73
74impl<P: AsyncRead + Unpin> AsyncReadBuffer for IoBuffer<P> {
75    type Error = io::Error;
76
77    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize, io::Error>> {
78        assert!(!self.poisoned);
79        if self.buffer.vacant_len() == 0 {
80            if self.buffer.preceding_len() > 0 {
81                self.buffer.make_contiguous();
82            } else {
83                return Poll::Ready(Err(io::ErrorKind::OutOfMemory.into()));
84            }
85        }
86        let (pipe, buffer) = self.split_mut();
87        let res = ready!(Pin::new(pipe).poll_read(cx, buffer.vacant_mut()));
88        if let Ok(n) = res {
89            self.buffer.advance(n);
90        }
91        Poll::Ready(res)
92    }
93
94    fn skip(&mut self, count: usize) {
95        self.buffer.skip(count);
96    }
97}