async_http_codec/internal/
buffer_write.rs1use crate::internal::io_future::{IoFuture, IoFutureState};
2use futures::AsyncWrite;
3use std::io;
4use std::mem::replace;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pub struct BufferWriteState {
9 buffer: io::Result<Vec<u8>>,
10 completion: usize,
11}
12
13impl BufferWriteState {
14 pub fn new(buffer: io::Result<Vec<u8>>) -> Self {
15 Self {
16 buffer,
17 completion: 0,
18 }
19 }
20}
21
22impl<IO: AsyncWrite + Unpin> IoFutureState<IO> for BufferWriteState {
23 fn poll(&mut self, cx: &mut Context<'_>, io: &mut IO) -> Poll<io::Result<()>> {
24 let buffer = match &self.buffer {
25 Ok(buffer) => buffer,
26 Err(_) => {
27 let r = replace(&mut self.buffer, Ok(Vec::new()));
28 return Poll::Ready(Err(r.unwrap_err()));
29 }
30 };
31 loop {
32 let remainder = &buffer[self.completion..];
33 match Pin::new(&mut *io).poll_write(cx, &remainder) {
34 Poll::Ready(Ok(n)) => {
35 if n == remainder.len() {
36 return Poll::Ready(Ok(()));
37 }
38 self.completion += n;
39 }
40 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
41 Poll::Pending => return Poll::Pending,
42 }
43 }
44 }
45}
46
47pub type BufferWrite<IO> = IoFuture<BufferWriteState, IO>;
48
49#[allow(dead_code)]
50const fn check_if_send<T: Send>() {}
51const _: () = check_if_send::<BufferWrite<Box<dyn AsyncWrite + Send + Unpin>>>();
52
53#[cfg(test)]
54mod tests {
55 use crate::internal::buffer_write::{BufferWrite, BufferWriteState};
56 use crate::internal::io_future::IoFutureState;
57 use futures::executor::block_on;
58 use futures::io::Cursor;
59
60 #[test]
61 fn test() {
62 block_on(async {
63 const HELLO_WORLD: &[u8] = b"Hello World!";
64 let mut io = Cursor::new(Vec::new());
65 let fut: BufferWrite<_> =
66 BufferWriteState::new(Ok(HELLO_WORLD.to_vec())).into_future(&mut io);
67 fut.await.unwrap();
68
69 assert_eq!(
70 String::from_utf8(HELLO_WORLD.to_vec()).unwrap(),
71 String::from_utf8(io.into_inner()).unwrap()
72 );
73 })
74 }
75}