webdav_request/
reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use crate::{RequestBuilder, Response};
use std::{
    future::Future,
    io,
    pin::{pin, Pin},
    task::Poll,
};
/// An `AsyncRead` adapter that defers sending a request until it is first read.
///
/// At most one of the three fields is expected to be populated at a time as the
/// reader moves from "not sent" to "in flight" to "streaming the body".
pub struct LazyResponseReader {
    /// The request that has not been sent yet; taken on the first `poll_read`.
    request: Option<RequestBuilder>,
    /// The in-flight send future, polled until it yields a response or an error.
    buf: Option<Box<dyn Unpin + Future<Output = reqwest::Result<Response>>>>,
    /// The body reader, created once a successful response has arrived.
    reader: Option<ResponseReader>,
}

impl From<RequestBuilder> for LazyResponseReader {
    fn from(value: RequestBuilder) -> Self {
        Self::new(value)
    }
}
impl From<Box<dyn Unpin + Future<Output = reqwest::Result<Response>>>> for LazyResponseReader {
    fn from(value: Box<dyn Unpin + Future<Output = reqwest::Result<Response>>>) -> Self {
        Self { request: None, buf: Some(value), reader: None }
    }
}
impl LazyResponseReader {
    pub fn new(builder: RequestBuilder) -> Self {
        Self { request: Some(builder), buf: None, reader: None }
    }
}
// Explicitly `Unpin`: `poll_read` calls `Pin::get_mut` on `self`, which requires it.
impl Unpin for LazyResponseReader {}
impl tokio::io::AsyncRead for LazyResponseReader {
    /// Sends the request on first use, then streams the response body into `buf`.
    ///
    /// Steps, in order:
    /// 1. If the request has not been sent yet, start sending it.
    /// 2. If a send is in flight, poll it. `Pending` is propagated; a transport
    ///    error or a non-success HTTP status is reported as an
    ///    `io::ErrorKind::Other` error whose message is the error / status text.
    /// 3. Once a body reader exists, delegate the read to it.
    ///
    /// After an error has been reported, later reads return `Ok(())` with no
    /// bytes written (end of stream).
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let this = self.get_mut();

        // Lazily kick off the request the first time somebody reads.
        if let Some(request) = this.request.take() {
            this.buf = Some(Box::new(request.send()));
        }

        if let Some(send) = this.buf.as_mut() {
            let outcome = match Future::poll(Pin::new(send), cx) {
                Poll::Ready(outcome) => outcome,
                Poll::Pending => return Poll::Pending,
            };
            // The future has completed. Drop it *before* any early return so a
            // later `poll_read` can never poll a finished future again.
            this.buf = None;

            match outcome {
                Ok(response) => {
                    if !response.status().is_success() {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::Other,
                            response.status().to_string(),
                        )));
                    }
                    this.reader = Some(ResponseReader::new(response));
                }
                Err(e) => {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::Other,
                        e.to_string(),
                    )));
                }
            }
        }

        match this.reader.as_mut() {
            Some(reader) => tokio::io::AsyncRead::poll_read(Pin::new(reader), cx, buf),
            // Nothing to send and nothing to read: report end of stream.
            None => Poll::Ready(Ok(())),
        }
    }
}

/// An `AsyncRead` adapter over a response body that is received chunk by chunk.
#[derive(Default)]
pub struct ResponseReader {
    /// The response being read; `None` for a default-constructed reader.
    inner: Option<Response>,
    /// Bytes already received from the response but not yet handed to a caller.
    buf: Vec<u8>,
}

/// Formats as the wrapped response, or as the literal `None` when there is none.
impl std::fmt::Debug for ResponseReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Some(res) = self.inner.as_ref() {
            return res.fmt(f);
        }
        f.write_str("None")
    }
}
impl From<Response> for ResponseReader {
    fn from(value: Response) -> Self {
        Self::new(value)
    }
}
impl ResponseReader {
    pub fn new(response: Response) -> ResponseReader {
        Self {
            inner: Some(response),
            ..Default::default()
        }
    }
}

impl tokio::io::AsyncRead for ResponseReader {
    /// Copies response body bytes into `buf`.
    ///
    /// Bytes left over from a previous chunk are served first; the response is
    /// only polled for a new chunk when nothing is buffered. A single call
    /// writes at most `buf.remaining()` bytes and may write fewer (a partial
    /// read); the surplus of a chunk is kept for the next call.
    ///
    /// Returns `Ok(())` with no bytes written at end of body, or when the reader
    /// holds no response. Errors from fetching a chunk are reported as
    /// `io::ErrorKind::Other` carrying the error's text.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        let this = self.get_mut();

        // A zero-capacity read can never make progress; don't touch the network.
        if buf.remaining() == 0 {
            return Poll::Ready(Ok(()));
        }

        loop {
            // Hand out buffered bytes first, never more than the caller has room
            // for (`put_slice` panics if given more than `remaining()`).
            if !this.buf.is_empty() {
                let n = this.buf.len().min(buf.remaining());
                buf.put_slice(&this.buf[..n]);
                this.buf.drain(..n);
                return Poll::Ready(Ok(()));
            }

            // No response to read from: report end of stream. Returning
            // `Pending` here would hang the task because no waker is registered.
            let Some(res) = this.inner.as_mut() else {
                return Poll::Ready(Ok(()));
            };

            let chunk = res.chunk();
            match Future::poll(pin!(chunk), cx) {
                // Stash the chunk and loop: an empty chunk must not be mistaken
                // for end of stream, so keep polling until there are bytes.
                Poll::Ready(Ok(Some(bytes))) => this.buf.extend_from_slice(&bytes),
                // Body finished and nothing is buffered: end of stream.
                Poll::Ready(Ok(None)) => return Poll::Ready(Ok(())),
                Poll::Ready(Err(err)) => {
                    return Poll::Ready(Err(io::Error::new(
                        std::io::ErrorKind::Other,
                        err.to_string(),
                    )));
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

// Explicitly `Unpin`: `poll_read` calls `Pin::get_mut` on `self`, and
// `LazyResponseReader` re-pins its inner reader with `Pin::new`.
impl Unpin for ResponseReader {}