1use super::{AsyncReadBuffer, AsyncWriteBuffer, IoBuffer};
2use futures::{
3 io::{AsyncRead, AsyncWrite},
4 ready, Future,
5};
6use std::{
7 io,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12impl<P: AsyncWrite + Unpin> AsyncWriteBuffer for IoBuffer<P> {
13 type Error = io::Error;
14
15 fn poll_alloc(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
16 let n = self.buffer.vacant_len();
17 if n > 0 {
18 self.buffer.advance(n);
19 }
20 Poll::Ready(Ok(()))
21 }
22
23 type WriteAll<'a> = WriteAll<'a, P> where P: 'a;
24
25 fn write_all(&mut self, count: usize) -> Self::WriteAll<'_> {
26 WriteAll {
27 owner: self,
28 pos: 0,
29 count,
30 }
31 }
32}
33
34pub struct WriteAll<'a, P: AsyncWrite + Unpin + 'a> {
35 owner: &'a mut IoBuffer<P>,
36 pos: usize,
37 count: usize,
38}
39
40impl<'a, P: AsyncWrite + Unpin + 'a> Unpin for WriteAll<'a, P> {}
41
42impl<'a, P: AsyncWrite + Unpin + 'a> Future for WriteAll<'a, P> {
43 type Output = io::Result<()>;
44 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45 assert!(!self.owner.poisoned);
46 while self.pos < self.count {
47 let (pos, count) = (self.pos, self.count);
48 let (pipe, buffer) = self.owner.split_mut();
49 match ready!(Pin::new(pipe).poll_write(cx, &buffer.occupied()[pos..count])) {
50 Ok(n) => {
51 if n == 0 {
52 if self.pos != 0 {
53 self.owner.poisoned = true;
54 }
55 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
56 } else {
57 self.pos += n;
58 }
59 }
60 Err(e) => {
61 if self.pos != 0 {
62 self.owner.poisoned = true;
63 return Poll::Ready(Err(e));
64 }
65 }
66 }
67 }
68 ready!(Pin::new(&mut self.owner.pipe).poll_flush(cx))?;
69 self.owner.buffer.clear();
70 Poll::Ready(Ok(()))
71 }
72}
73
74impl<P: AsyncRead + Unpin> AsyncReadBuffer for IoBuffer<P> {
75 type Error = io::Error;
76
77 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize, io::Error>> {
78 assert!(!self.poisoned);
79 if self.buffer.vacant_len() == 0 {
80 if self.buffer.preceding_len() > 0 {
81 self.buffer.make_contiguous();
82 } else {
83 return Poll::Ready(Err(io::ErrorKind::OutOfMemory.into()));
84 }
85 }
86 let (pipe, buffer) = self.split_mut();
87 let res = ready!(Pin::new(pipe).poll_read(cx, buffer.vacant_mut()));
88 if let Ok(n) = res {
89 self.buffer.advance(n);
90 }
91 Poll::Ready(res)
92 }
93
94 fn skip(&mut self, count: usize) {
95 self.buffer.skip(count);
96 }
97}