1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use std::{
    future::Future, io::{self, Read}, pin::pin, task::Poll
};
use reqwest::Response;
use tokio::io::AsyncRead;
#[derive(Default)]
pub struct ResponseReader {
    inner: Option<Response>,
    buf: Vec<u8>,
}

impl std::fmt::Debug for ResponseReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.inner {
            Some(res) => res.fmt(f),
            _ => f.write_str("None"),
        }
    }
}

impl ResponseReader {
    pub fn new(inner: Option<Response>) -> ResponseReader {
        Self {
            inner,
            ..Default::default()
        }
    }
}

impl AsyncRead for ResponseReader {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let this = self.get_mut();
        if let Some(res) = &mut this.inner {
            loop {
                let chunk = res.chunk();
                match Future::poll(pin!(chunk), cx) {
                    Poll::Ready(Ok(bytes)) => {
                        if let Some(bytes) = bytes {
                            let mut cursor = io::Cursor::new(bytes);
                            match cursor.read_to_end(&mut this.buf) {
                                Ok(_) => {
                                    if this.buf.len() >= buf.capacity() {
                                        buf.put_slice(&this.buf[..buf.capacity()]);
                                        this.buf = this.buf[buf.capacity()..].to_owned();
                                        return Poll::Ready(Ok(()));
                                    }
                                }
                                Err(err) => {
                                    return Poll::Ready(Err(err));
                                }
                            }
                        } else {
                            buf.put_slice(&this.buf);
                            this.buf.clear();
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Err(err)) => {
                        return Poll::Ready(Err(io::Error::new(
                            std::io::ErrorKind::Other,
                            err.to_string(),
                        )));
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        } else {
            Poll::Pending
        }
    }
}