yup_hyper_mock/
streams.rs

1use std::io;
2use std::str;
3use std::{
4    cmp::min,
5    pin::Pin,
6    task::{Context, Poll, Waker},
7};
8
9use hyper::rt::ReadBufCursor;
10use hyper_util::client::legacy::connect::{Connected, Connection};
11use log::trace;
12
/// In-memory mock connection that hands out a fixed byte buffer.
///
/// Reads stay pending until something has been written to the stream;
/// after that the stored bytes are returned chunk by chunk.
pub struct MockPollStream {
    /// Bytes returned to the reader by `poll_read`.
    data: Vec<u8>,
    /// Offset into `data` of the next byte that has not been read yet.
    pos: usize,
    /// Starts as `false`; `poll_write` flips it to `true`, which lets
    /// `poll_read` start returning data instead of `Poll::Pending`.
    ready_for_response: bool,
    /// Waker saved by the most recent `poll_read`; `poll_write` takes and
    /// wakes it.
    waker: Option<Waker>,
}

impl MockPollStream {
    /// Creates a stream that will serve `data`, starting at offset zero.
    ///
    /// The new stream is not yet ready for reading and holds no waker.
    pub fn new(data: Vec<u8>) -> Self {
        MockPollStream {
            data,
            pos: 0,
            waker: None,
            ready_for_response: false,
        }
    }
}
30
31impl hyper::rt::Read for MockPollStream {
32    fn poll_read(
33        mut self: Pin<&mut Self>,
34        cx: &mut Context<'_>,
35        mut buf: ReadBufCursor<'_>,
36    ) -> Poll<io::Result<()>> {
37        if !self.ready_for_response {
38            trace!("Not ready for read yet");
39            self.waker = Some(cx.waker().clone());
40            return Poll::Pending;
41        }
42        let mut read_buf = unsafe { tokio::io::ReadBuf::uninit(buf.as_mut()) };
43        trace!(
44            "Buffer size: {}, Data size: {}, Pos: {}",
45            read_buf.remaining(),
46            self.data.len(),
47            self.pos
48        );
49        let n = min(read_buf.remaining(), self.data.len() - self.pos);
50        let read_until = self.pos + n;
51        read_buf.put_slice(&self.data[self.pos..read_until]);
52        self.pos = read_until;
53        unsafe { buf.advance(n) };
54        trace!(
55            "Read {} bytes: '{}'",
56            n,
57            str::from_utf8(&self.data[self.pos..read_until]).unwrap_or("<bad utf-8>")
58        );
59        self.waker = Some(cx.waker().clone());
60        Poll::Ready(Ok(()))
61    }
62}
63
64impl hyper::rt::Write for MockPollStream {
65    fn poll_write(
66        self: Pin<&mut Self>,
67        _cx: &mut Context<'_>,
68        data: &[u8],
69    ) -> Poll<io::Result<usize>> {
70        trace!(
71            "Request data: {}",
72            str::from_utf8(data).unwrap_or("<bad utf-8>")
73        );
74        let Self {
75            ready_for_response,
76            waker,
77            ..
78        } = self.get_mut();
79        *ready_for_response = true;
80        waker.take().map(|w| w.wake());
81        Poll::Ready(Ok(data.len()))
82    }
83
84    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
85        Poll::Ready(Ok(()))
86    }
87
88    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
89        Poll::Ready(Ok(()))
90    }
91}
92
impl Connection for MockPollStream {
    /// Returns the connection metadata for this mock stream, which is simply
    /// the value produced by `Connected::new()`.
    fn connected(&self) -> Connected {
        Connected::new()
    }
}