webdav_request/
reader.rs

1use crate::{RequestBuilder, Response};
2use std::{
3    future::Future,
4    io::{self},
5    pin::{pin, Pin},
6    task::Poll,
7};
8type ResponseStream = Box<dyn Unpin + Future<Output = reqwest::Result<Response>>>;
9
10pub struct LazyResponseReader {
11    request: Option<RequestBuilder>,
12    stream: Option<ResponseStream>,
13    reader: Option<ResponseReader>,
14}
15
16impl From<RequestBuilder> for LazyResponseReader {
17    fn from(value: RequestBuilder) -> Self {
18        Self::new(value)
19    }
20}
21impl From<ResponseStream> for LazyResponseReader {
22    fn from(value: ResponseStream) -> Self {
23        Self {
24            request: None,
25            stream: Some(value),
26            reader: None,
27        }
28    }
29}
30impl LazyResponseReader {
31    pub fn new(builder: RequestBuilder) -> Self {
32        Self {
33            request: Some(builder),
34            stream: None,
35            reader: None,
36        }
37    }
38}
39impl Unpin for LazyResponseReader {}
40
41macro_rules! ready {
42    ($n:expr) => {
43        if ($n == 0) {
44            return Poll::Ready(Ok(()));
45        }
46    };
47    ($v1:expr, $v2:expr) => {
48        if ($v1 == $v2) {
49            return Poll::Ready(Ok(()));
50        }
51    };
52}
53
54impl tokio::io::AsyncRead for LazyResponseReader {
55    fn poll_read(
56        self: std::pin::Pin<&mut Self>,
57        cx: &mut std::task::Context<'_>,
58        buf: &mut tokio::io::ReadBuf<'_>,
59    ) -> std::task::Poll<io::Result<()>> {
60        ready!(buf.remaining());
61
62        let this = self.get_mut();
63
64        if this.request.is_some() {
65            let request = this.request.take().unwrap();
66            this.stream = Some(Box::new(request.send()));
67        }
68        if let Some(send) = &mut this.stream {
69            match Future::poll(Pin::new(send), cx) {
70                Poll::Ready(data) => match data {
71                    Ok(response) => {
72                        if !response.status().is_success() {
73                            return Poll::Ready(Err(io::Error::other(
74                                response.status().to_string(),
75                            )));
76                        }
77                        this.stream = None;
78                        this.reader = Some(ResponseReader::new(response))
79                    }
80                    Err(e) => return Poll::Ready(Err(io::Error::other(e.to_string()))),
81                },
82                Poll::Pending => return Poll::Pending,
83            }
84        }
85
86        if let Some(val) = &mut this.reader {
87            return Pin::new(val).poll_read(cx, buf);
88        }
89
90        Poll::Ready(Ok(()))
91    }
92}
93
94#[derive(Default)]
95pub struct ResponseReader {
96    inner: Option<Response>,
97    buf: Vec<u8>,
98}
99
100impl std::fmt::Debug for ResponseReader {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        match &self.inner {
103            Some(res) => res.fmt(f),
104            _ => f.write_str("None"),
105        }
106    }
107}
108impl From<Response> for ResponseReader {
109    fn from(value: Response) -> Self {
110        Self::new(value)
111    }
112}
113impl ResponseReader {
114    pub fn new(response: Response) -> ResponseReader {
115        Self {
116            inner: Some(response),
117            ..Default::default()
118        }
119    }
120}
121
122impl tokio::io::AsyncRead for ResponseReader {
123    fn poll_read(
124        self: std::pin::Pin<&mut Self>,
125        cx: &mut std::task::Context<'_>,
126        buf: &mut tokio::io::ReadBuf<'_>,
127    ) -> std::task::Poll<io::Result<()>> {
128        ready!(buf.remaining());
129        let this = self.get_mut();
130        if let Some(res) = &mut this.inner {
131            loop {
132                let chunk = res.chunk();
133                match Future::poll(pin!(chunk), cx) {
134                    Poll::Ready(Ok(bytes)) => {
135                        if let Some(bytes) = bytes {
136                            this.buf.extend_from_slice(&bytes.slice(0..bytes.len()));
137                            let remain = buf.remaining();
138                            if this.buf.len() >= remain {
139                                buf.put_slice(&this.buf[..remain]);
140                                this.buf = this.buf[remain..].to_owned();
141                                return Poll::Ready(Ok(()));
142                            }
143                        } else {
144                            let remain = buf.remaining();
145                            if this.buf.len() >= remain {
146                                buf.put_slice(&this.buf[..remain]);
147                                this.buf = this.buf[remain..].to_owned();
148                            } else {
149                                buf.put_slice(&this.buf);
150                                this.buf = Vec::new();
151                            }
152                            return Poll::Ready(Ok(()));
153                        }
154                    }
155                    Poll::Ready(Err(err)) => {
156                        return Poll::Ready(Err(io::Error::other(err.to_string())));
157                    }
158                    Poll::Pending => return Poll::Pending,
159                }
160            }
161        } else {
162            Poll::Pending
163        }
164    }
165}
166
167impl Unpin for ResponseReader {}