yup_hyper_mock/
streams.rs1use std::io;
2use std::str;
3use std::{
4 cmp::min,
5 pin::Pin,
6 task::{Context, Poll, Waker},
7};
8
9use hyper::rt::ReadBufCursor;
10use hyper_util::client::legacy::connect::{Connected, Connection};
11use log::trace;
12
/// In-memory mock connection that hands out a fixed byte buffer to readers.
///
/// Reads stay pending until something has been written to the stream; after
/// that, `data` is served sequentially starting at `pos`.
#[derive(Debug)]
pub struct MockPollStream {
    /// Bytes returned by `poll_read`.
    data: Vec<u8>,
    /// Index into `data` of the next byte that has not been read yet.
    pos: usize,
    /// Set to `true` by `poll_write`; `poll_read` returns `Pending` while it is `false`.
    ready_for_response: bool,
    /// Waker stored by the most recent `poll_read`, woken (and cleared) by `poll_write`.
    waker: Option<Waker>,
}
19
20impl MockPollStream {
21 pub fn new(data: Vec<u8>) -> Self {
22 Self {
23 data,
24 pos: 0,
25 ready_for_response: false,
26 waker: None,
27 }
28 }
29}
30
31impl hyper::rt::Read for MockPollStream {
32 fn poll_read(
33 mut self: Pin<&mut Self>,
34 cx: &mut Context<'_>,
35 mut buf: ReadBufCursor<'_>,
36 ) -> Poll<io::Result<()>> {
37 if !self.ready_for_response {
38 trace!("Not ready for read yet");
39 self.waker = Some(cx.waker().clone());
40 return Poll::Pending;
41 }
42 let mut read_buf = unsafe { tokio::io::ReadBuf::uninit(buf.as_mut()) };
43 trace!(
44 "Buffer size: {}, Data size: {}, Pos: {}",
45 read_buf.remaining(),
46 self.data.len(),
47 self.pos
48 );
49 let n = min(read_buf.remaining(), self.data.len() - self.pos);
50 let read_until = self.pos + n;
51 read_buf.put_slice(&self.data[self.pos..read_until]);
52 self.pos = read_until;
53 unsafe { buf.advance(n) };
54 trace!(
55 "Read {} bytes: '{}'",
56 n,
57 str::from_utf8(&self.data[self.pos..read_until]).unwrap_or("<bad utf-8>")
58 );
59 self.waker = Some(cx.waker().clone());
60 Poll::Ready(Ok(()))
61 }
62}
63
64impl hyper::rt::Write for MockPollStream {
65 fn poll_write(
66 self: Pin<&mut Self>,
67 _cx: &mut Context<'_>,
68 data: &[u8],
69 ) -> Poll<io::Result<usize>> {
70 trace!(
71 "Request data: {}",
72 str::from_utf8(data).unwrap_or("<bad utf-8>")
73 );
74 let Self {
75 ready_for_response,
76 waker,
77 ..
78 } = self.get_mut();
79 *ready_for_response = true;
80 waker.take().map(|w| w.wake());
81 Poll::Ready(Ok(data.len()))
82 }
83
84 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
85 Poll::Ready(Ok(()))
86 }
87
88 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
89 Poll::Ready(Ok(()))
90 }
91}
92
93impl Connection for MockPollStream {
94 fn connected(&self) -> Connected {
95 Connected::new()
96 }
97}