//! Adapters that expose a `reqwest` HTTP response body through `tokio::io::AsyncRead`.

use reqwest::{RequestBuilder, Response};
use std::{
    future::Future,
    io,
    pin::{pin, Pin},
    task::Poll,
};

/// A `tokio::io::AsyncRead` source backed by an HTTP request that is only sent when the
/// reader is polled for the first time.
///
/// The three fields track the reader's progress: a request waiting to be sent, the
/// in-flight `send()` future, and finally the body reader of the received response.
pub struct LazyResponseReader {
    /// Request that has not been sent yet; `poll_read` takes it (leaving `None`) when it
    /// dispatches the request.
    pub(crate) request: Option<RequestBuilder>,
    /// In-flight `send()` future, created by `poll_read` from `request`; reset to `None`
    /// once it yields a successful response.
    pub(crate) buf: Option<Box<dyn Unpin + Future<Output = reqwest::Result<Response>>>>,
    /// Reader over the response body, populated after a successful response is received.
    pub(crate) reader: Option<ResponseReader>,
}
// `poll_read` calls `Pin::get_mut` on `self`, which is only available for `Unpin` types.
impl Unpin for LazyResponseReader {}
impl tokio::io::AsyncRead for LazyResponseReader {
    /// Sends the request on the first call, waits for the response, then streams its body.
    ///
    /// * While the request is in flight this returns `Poll::Pending`.
    /// * A transport failure or a non-success HTTP status is reported once as an
    ///   `io::ErrorKind::Other` error (the message is the `reqwest` error text or the
    ///   status line respectively).
    /// * Once a successful response is available, every call is forwarded to the inner
    ///   [`ResponseReader`].
    /// * If there is nothing to read from (no request, no in-flight future and no reader,
    ///   e.g. after an error was already reported) the call completes with zero bytes,
    ///   which `AsyncRead` consumers interpret as end of stream.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let this = self.get_mut();

        // First poll: dispatch the request and keep the in-flight future around.
        if let Some(request) = this.request.take() {
            this.buf = Some(Box::new(request.send()));
        }

        if let Some(send) = this.buf.as_mut() {
            let outcome = match Future::poll(Pin::new(send), cx) {
                Poll::Ready(outcome) => outcome,
                Poll::Pending => return Poll::Pending,
            };
            // The future has resolved. Drop it before looking at the outcome so that no
            // path (including the error returns below) can poll a completed future again
            // on a later `poll_read` call.
            this.buf = None;

            let response = match outcome {
                Ok(response) => response,
                Err(e) => {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::Other,
                        e.to_string(),
                    )))
                }
            };

            let status = response.status();
            if !status.is_success() {
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::Other,
                    status.to_string(),
                )));
            }
            this.reader = Some(ResponseReader::new(Some(response)));
        }

        match this.reader.as_mut() {
            Some(reader) => tokio::io::AsyncRead::poll_read(Pin::new(reader), cx, buf),
            // Nothing left to read from: report end of stream.
            None => Poll::Ready(Ok(())),
        }
    }
}

/// Reads the body of a `reqwest` [`Response`] through `tokio::io::AsyncRead`.
#[derive(Default)]
pub struct ResponseReader {
    /// Response whose body is being read; `None` for a default-constructed reader.
    inner: Option<Response>,
    /// Body bytes already received from the response but not yet copied into a caller's
    /// read buffer.
    buf: Vec<u8>,
}

impl std::fmt::Debug for ResponseReader {
    /// Formats the wrapped response with its own `Debug` output, or writes `None` when the
    /// reader holds no response. The internal byte buffer is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.inner.as_ref() {
            // Fully qualified so the call does not depend on which `fmt` traits happen to
            // be imported in this module (and cannot become ambiguous with `Display`).
            Some(response) => std::fmt::Debug::fmt(response, f),
            None => f.write_str("None"),
        }
    }
}

impl ResponseReader {
    pub fn new(inner: Option<Response>) -> ResponseReader {
        Self {
            inner,
            ..Default::default()
        }
    }
}

impl tokio::io::AsyncRead for ResponseReader {
    /// Copies response body bytes into `buf`.
    ///
    /// Bytes left over from a previously received chunk are served first. Otherwise the
    /// next body chunk is awaited; as much of it as fits is written to `buf` and the rest
    /// is kept for the following call. The call returns as soon as any bytes are
    /// available rather than waiting for `buf` to be filled completely.
    ///
    /// Completing without writing any bytes signals end of stream; this happens when the
    /// body is exhausted or when the reader holds no response at all. Errors from the
    /// underlying response are reported as `io::ErrorKind::Other` carrying the error text.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let this = self.get_mut();

        // Nothing can be written into a full buffer; do not pull data we cannot deliver.
        if buf.remaining() == 0 {
            return Poll::Ready(Ok(()));
        }

        // Serve buffered leftovers before touching the network. Copy at most
        // `buf.remaining()` bytes: `put_slice` panics when given more than fits.
        if !this.buf.is_empty() {
            let n = this.buf.len().min(buf.remaining());
            buf.put_slice(&this.buf[..n]);
            this.buf.drain(..n);
            return Poll::Ready(Ok(()));
        }

        // Without a response there is nothing to wait for. Returning `Pending` here would
        // never wake the task, so report end of stream instead.
        let Some(res) = this.inner.as_mut() else {
            return Poll::Ready(Ok(()));
        };

        loop {
            let chunk = res.chunk();
            match Future::poll(pin!(chunk), cx) {
                Poll::Ready(Ok(Some(bytes))) => {
                    // An empty chunk must not be reported as a zero-byte read, because
                    // that would look like end of stream to the caller.
                    if bytes.is_empty() {
                        continue;
                    }
                    let n = bytes.len().min(buf.remaining());
                    buf.put_slice(&bytes[..n]);
                    // Keep whatever did not fit for the next call.
                    this.buf.extend_from_slice(&bytes[n..]);
                    return Poll::Ready(Ok(()));
                }
                // Body exhausted: a zero-byte completion marks end of stream.
                Poll::Ready(Ok(None)) => return Poll::Ready(Ok(())),
                Poll::Ready(Err(err)) => {
                    return Poll::Ready(Err(io::Error::new(
                        std::io::ErrorKind::Other,
                        err.to_string(),
                    )));
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

// `poll_read` calls `Pin::get_mut` on `self`, which is only available for `Unpin` types.
impl Unpin for ResponseReader {}