Skip to main content

rustfs_rio/
http_reader.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use futures::{Stream, TryStreamExt as _};
17use http::HeaderMap;
18use pin_project_lite::pin_project;
19use reqwest::{Client, Method, RequestBuilder};
20use std::error::Error as _;
21use std::io::{self, Error};
22use std::ops::Not as _;
23use std::pin::Pin;
24use std::sync::LazyLock;
25use std::task::{Context, Poll};
26use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
27use tokio::sync::mpsc;
28use tokio_util::io::StreamReader;
29
30use crate::{EtagResolvable, HashReaderDetector, HashReaderMut};
31
32fn get_http_client() -> Client {
33    // Reuse the HTTP connection pool in the global `reqwest::Client` instance
34    // TODO: interact with load balancing?
35    static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
36    CLIENT.clone()
37}
38
/// Switch for the ad-hoc HTTP debug output; flip to `true` locally to enable it.
static HTTP_DEBUG_LOG: bool = false;

/// Prints pre-formatted arguments to stdout when [`HTTP_DEBUG_LOG`] is enabled,
/// and does nothing otherwise.
#[inline(always)]
fn http_debug_log(args: std::fmt::Arguments) {
    if !HTTP_DEBUG_LOG {
        return;
    }
    println!("{args}");
}

/// `println!`-style front end for [`http_debug_log`]; accepts the usual
/// format string plus arguments.
macro_rules! http_log {
    ($($arg:tt)*) => {
        http_debug_log(format_args!($($arg)*));
    };
}
51
52pin_project! {
53    pub struct HttpReader {
54        url:String,
55        method: Method,
56        headers: HeaderMap,
57        #[pin]
58        inner: StreamReader<Pin<Box<dyn Stream<Item=std::io::Result<Bytes>>+Send+Sync>>, Bytes>,
59    }
60}
61
62impl HttpReader {
63    pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option<Vec<u8>>) -> io::Result<Self> {
64        // http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}");
65        Self::with_capacity(url, method, headers, body, 0).await
66    }
67    /// Create a new HttpReader from a URL. The request is performed immediately.
68    pub async fn with_capacity(
69        url: String,
70        method: Method,
71        headers: HeaderMap,
72        body: Option<Vec<u8>>,
73        _read_buf_size: usize,
74    ) -> io::Result<Self> {
75        // http_log!(
76        //     "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}",
77        //     _read_buf_size
78        // );
79        // First, check if the connection is available (HEAD)
80        let client = get_http_client();
81        let head_resp = client.head(&url).headers(headers.clone()).send().await;
82        match head_resp {
83            Ok(resp) => {
84                http_log!("[HttpReader::new] HEAD status: {}", resp.status());
85                if !resp.status().is_success() {
86                    return Err(Error::other(format!("HEAD failed: url: {}, status {}", url, resp.status())));
87                }
88            }
89            Err(e) => {
90                http_log!("[HttpReader::new] HEAD error: {e}");
91                return Err(Error::other(e.source().map(|s| s.to_string()).unwrap_or_else(|| e.to_string())));
92            }
93        }
94
95        let client = get_http_client();
96        let mut request: RequestBuilder = client.request(method.clone(), url.clone()).headers(headers.clone());
97        if let Some(body) = body {
98            request = request.body(body);
99        }
100
101        let resp = request
102            .send()
103            .await
104            .map_err(|e| Error::other(format!("HttpReader HTTP request error: {e}")))?;
105
106        if resp.status().is_success().not() {
107            return Err(Error::other(format!(
108                "HttpReader HTTP request failed with non-200 status {}",
109                resp.status()
110            )));
111        }
112
113        let stream = resp
114            .bytes_stream()
115            .map_err(|e| Error::other(format!("HttpReader stream error: {e}")));
116
117        Ok(Self {
118            inner: StreamReader::new(Box::pin(stream)),
119            url,
120            method,
121            headers,
122        })
123    }
124    pub fn url(&self) -> &str {
125        &self.url
126    }
127    pub fn method(&self) -> &Method {
128        &self.method
129    }
130    pub fn headers(&self) -> &HeaderMap {
131        &self.headers
132    }
133}
134
135impl AsyncRead for HttpReader {
136    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
137        // http_log!(
138        //     "[HttpReader::poll_read] url: {}, method: {:?}, buf.remaining: {}",
139        //     self.url,
140        //     self.method,
141        //     buf.remaining()
142        // );
143        // Read from the inner stream
144        Pin::new(&mut self.inner).poll_read(cx, buf)
145    }
146}
147
148impl EtagResolvable for HttpReader {
149    fn is_etag_reader(&self) -> bool {
150        false
151    }
152    fn try_resolve_etag(&mut self) -> Option<String> {
153        None
154    }
155}
156
157impl HashReaderDetector for HttpReader {
158    fn is_hash_reader(&self) -> bool {
159        false
160    }
161
162    fn as_hash_reader_mut(&mut self) -> Option<&mut dyn HashReaderMut> {
163        None
164    }
165}
166
167struct ReceiverStream {
168    receiver: mpsc::Receiver<Option<Bytes>>,
169}
170
171impl Stream for ReceiverStream {
172    type Item = Result<Bytes, std::io::Error>;
173    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
174        let poll = Pin::new(&mut self.receiver).poll_recv(cx);
175        // match &poll {
176        //     Poll::Ready(Some(Some(bytes))) => {
177        //         // http_log!("[ReceiverStream] poll_next: got {} bytes", bytes.len());
178        //     }
179        //     Poll::Ready(Some(None)) => {
180        //         // http_log!("[ReceiverStream] poll_next: sender shutdown");
181        //     }
182        //     Poll::Ready(None) => {
183        //         // http_log!("[ReceiverStream] poll_next: channel closed");
184        //     }
185        //     Poll::Pending => {
186        //         // http_log!("[ReceiverStream] poll_next: pending");
187        //     }
188        // }
189        match poll {
190            Poll::Ready(Some(Some(bytes))) => Poll::Ready(Some(Ok(bytes))),
191            Poll::Ready(Some(None)) => Poll::Ready(None), // Sender shutdown
192            Poll::Ready(None) => Poll::Ready(None),
193            Poll::Pending => Poll::Pending,
194        }
195    }
196}
197
198pin_project! {
199    pub struct HttpWriter {
200        url:String,
201        method: Method,
202        headers: HeaderMap,
203        err_rx: tokio::sync::oneshot::Receiver<std::io::Error>,
204        sender: tokio::sync::mpsc::Sender<Option<Bytes>>,
205        handle: tokio::task::JoinHandle<std::io::Result<()>>,
206        finish:bool,
207
208    }
209}
210
211impl HttpWriter {
212    /// Create a new HttpWriter for the given URL. The HTTP request is performed in the background.
213    pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result<Self> {
214        // http_log!("[HttpWriter::new] url: {url}, method: {method:?}, headers: {headers:?}");
215        let url_clone = url.clone();
216        let method_clone = method.clone();
217        let headers_clone = headers.clone();
218
219        // First, try to write empty data to check if writable
220        let client = get_http_client();
221        let resp = client.put(&url).headers(headers.clone()).body(Vec::new()).send().await;
222        match resp {
223            Ok(resp) => {
224                // http_log!("[HttpWriter::new] empty PUT status: {}", resp.status());
225                if !resp.status().is_success() {
226                    return Err(Error::other(format!("Empty PUT failed: status {}", resp.status())));
227                }
228            }
229            Err(e) => {
230                // http_log!("[HttpWriter::new] empty PUT error: {e}");
231                return Err(Error::other(format!("Empty PUT failed: {e}")));
232            }
233        }
234
235        let (sender, receiver) = tokio::sync::mpsc::channel::<Option<Bytes>>(8);
236        let (err_tx, err_rx) = tokio::sync::oneshot::channel::<io::Error>();
237
238        let handle = tokio::spawn(async move {
239            let stream = ReceiverStream { receiver };
240            let body = reqwest::Body::wrap_stream(stream);
241            // http_log!(
242            //     "[HttpWriter::spawn] sending HTTP request: url={url_clone}, method={method_clone:?}, headers={headers_clone:?}"
243            // );
244
245            let client = get_http_client();
246            let request = client
247                .request(method_clone, url_clone.clone())
248                .headers(headers_clone.clone())
249                .body(body);
250
251            // Hold the request until the shutdown signal is received
252            let response = request.send().await;
253
254            match response {
255                Ok(resp) => {
256                    // http_log!("[HttpWriter::spawn] got response: status={}", resp.status());
257                    if !resp.status().is_success() {
258                        let _ = err_tx.send(Error::other(format!(
259                            "HttpWriter HTTP request failed with non-200 status {}",
260                            resp.status()
261                        )));
262                        return Err(Error::other(format!("HTTP request failed with non-200 status {}", resp.status())));
263                    }
264                }
265                Err(e) => {
266                    // http_log!("[HttpWriter::spawn] HTTP request error: {e}");
267                    let _ = err_tx.send(Error::other(format!("HTTP request failed: {e}")));
268                    return Err(Error::other(format!("HTTP request failed: {e}")));
269                }
270            }
271
272            // http_log!("[HttpWriter::spawn] HTTP request completed, exiting");
273            Ok(())
274        });
275
276        // http_log!("[HttpWriter::new] connection established successfully");
277        Ok(Self {
278            url,
279            method,
280            headers,
281            err_rx,
282            sender,
283            handle,
284            finish: false,
285        })
286    }
287
288    pub fn url(&self) -> &str {
289        &self.url
290    }
291
292    pub fn method(&self) -> &Method {
293        &self.method
294    }
295
296    pub fn headers(&self) -> &HeaderMap {
297        &self.headers
298    }
299}
300
301impl AsyncWrite for HttpWriter {
302    fn poll_write(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
303        // http_log!(
304        //     "[HttpWriter::poll_write] url: {}, method: {:?}, buf.len: {}",
305        //     self.url,
306        //     self.method,
307        //     buf.len()
308        // );
309        if let Ok(e) = Pin::new(&mut self.err_rx).try_recv() {
310            return Poll::Ready(Err(e));
311        }
312
313        self.sender
314            .try_send(Some(Bytes::copy_from_slice(buf)))
315            .map_err(|e| Error::other(format!("HttpWriter send error: {e}")))?;
316
317        Poll::Ready(Ok(buf.len()))
318    }
319
320    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
321        Poll::Ready(Ok(()))
322    }
323
324    fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
325        // let url = self.url.clone();
326        // let method = self.method.clone();
327
328        if !self.finish {
329            // http_log!("[HttpWriter::poll_shutdown] url: {}, method: {:?}", url, method);
330            self.sender
331                .try_send(None)
332                .map_err(|e| Error::other(format!("HttpWriter shutdown error: {e}")))?;
333            // http_log!(
334            //     "[HttpWriter::poll_shutdown] sent shutdown signal to HTTP request, url: {}, method: {:?}",
335            //     url,
336            //     method
337            // );
338
339            self.finish = true;
340        }
341        // Wait for the HTTP request to complete
342        use futures::FutureExt;
343        match Pin::new(&mut self.get_mut().handle).poll_unpin(_cx) {
344            Poll::Ready(Ok(_)) => {
345                // http_log!(
346                //     "[HttpWriter::poll_shutdown] HTTP request finished successfully, url: {}, method: {:?}",
347                //     url,
348                //     method
349                // );
350            }
351            Poll::Ready(Err(e)) => {
352                // http_log!("[HttpWriter::poll_shutdown] HTTP request failed: {e}, url: {}, method: {:?}", url, method);
353                return Poll::Ready(Err(Error::other(format!("HTTP request failed: {e}"))));
354            }
355            Poll::Pending => {
356                // http_log!("[HttpWriter::poll_shutdown] HTTP request pending, url: {}, method: {:?}", url, method);
357                return Poll::Pending;
358            }
359        }
360
361        Poll::Ready(Ok(()))
362    }
363}
364
365// #[cfg(test)]
366// mod tests {
367//     use super::*;
368//     use reqwest::Method;
369//     use std::vec;
370//     use tokio::io::{AsyncReadExt, AsyncWriteExt};
371
372//     #[tokio::test]
373//     async fn test_http_writer_err() {
374//         // Use a real local server for integration, or mockito for unit test
375//         // Here, we use the Go test server at 127.0.0.1:8081 (scripts/testfile.go)
376//         let url = "http://127.0.0.1:8081/testfile".to_string();
377//         let data = vec![42u8; 8];
378
379//         // Write
380//         // 添加 header X-Deny-Write = 1 模拟不可写入的情况
381//         let mut headers = HeaderMap::new();
382//         headers.insert("X-Deny-Write", "1".parse().unwrap());
383//         // 这里我们使用 PUT 方法
384//         let writer_result = HttpWriter::new(url.clone(), Method::PUT, headers).await;
385//         match writer_result {
386//             Ok(mut writer) => {
387//                 // 如果能创建成功,写入应该报错
388//                 let write_result = writer.write_all(&data).await;
389//                 assert!(write_result.is_err(), "write_all should fail when server denies write");
390//                 if let Err(e) = write_result {
391//                     println!("write_all error: {e}");
392//                 }
393//                 let shutdown_result = writer.shutdown().await;
394//                 if let Err(e) = shutdown_result {
395//                     println!("shutdown error: {e}");
396//                 }
397//             }
398//             Err(e) => {
399//                 // 直接构造失败也可以
400//                 println!("HttpWriter::new error: {e}");
401//                 assert!(
402//                     e.to_string().contains("Empty PUT failed") || e.to_string().contains("Forbidden"),
403//                     "unexpected error: {e}"
404//                 );
405//                 return;
406//             }
407//         }
408//         // Should not reach here
409//         panic!("HttpWriter should not allow writing when server denies write");
410//     }
411
412//     #[tokio::test]
413//     async fn test_http_writer_and_reader_ok() {
414//         // 使用本地 Go 测试服务器
415//         let url = "http://127.0.0.1:8081/testfile".to_string();
416//         let data = vec![99u8; 512 * 1024]; // 512KB of data
417
418//         // Write (不加 X-Deny-Write)
419//         let headers = HeaderMap::new();
420//         let mut writer = HttpWriter::new(url.clone(), Method::PUT, headers).await.unwrap();
421//         writer.write_all(&data).await.unwrap();
422//         writer.shutdown().await.unwrap();
423
424//         http_log!("Wrote {} bytes to {} (ok case)", data.len(), url);
425
426//         // Read back
427//         let mut reader = HttpReader::with_capacity(url.clone(), Method::GET, HeaderMap::new(), 8192)
428//             .await
429//             .unwrap();
430//         let mut buf = Vec::new();
431//         reader.read_to_end(&mut buf).await.unwrap();
432//         assert_eq!(buf, data);
433
434//         // println!("Read {} bytes from {} (ok case)", buf.len(), url);
435//         // tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Wait for server to process
436//         // println!("[test_http_writer_and_reader_ok] completed successfully");
437//     }
438// }