rustfs_rio/http_reader.rs
1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use futures::{Stream, TryStreamExt as _};
17use http::HeaderMap;
18use pin_project_lite::pin_project;
19use reqwest::{Client, Method, RequestBuilder};
20use std::error::Error as _;
21use std::io::{self, Error};
22use std::ops::Not as _;
23use std::pin::Pin;
24use std::sync::LazyLock;
25use std::task::{Context, Poll};
26use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
27use tokio::sync::mpsc;
28use tokio_util::io::StreamReader;
29
30use crate::{EtagResolvable, HashReaderDetector, HashReaderMut};
31
32fn get_http_client() -> Client {
33 // Reuse the HTTP connection pool in the global `reqwest::Client` instance
34 // TODO: interact with load balancing?
35 static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
36 CLIENT.clone()
37}
38
/// Switch for the ad-hoc HTTP trace output; it is `false`, so tracing is off.
static HTTP_DEBUG_LOG: bool = false;

/// Prints pre-formatted `args` to stdout when [`HTTP_DEBUG_LOG`] is `true`,
/// and does nothing otherwise.
#[inline(always)]
fn http_debug_log(args: std::fmt::Arguments) {
    if !HTTP_DEBUG_LOG {
        return;
    }
    println!("{args}");
}
/// `println!`-style front end for `http_debug_log`.
///
/// Accepts a format string plus arguments and forwards them as
/// `format_args!` to `http_debug_log`.
macro_rules! http_log {
    ($($fmt_tokens:tt)*) => {
        http_debug_log(format_args!($($fmt_tokens)*));
    };
}
51
52pin_project! {
53 pub struct HttpReader {
54 url:String,
55 method: Method,
56 headers: HeaderMap,
57 #[pin]
58 inner: StreamReader<Pin<Box<dyn Stream<Item=std::io::Result<Bytes>>+Send+Sync>>, Bytes>,
59 }
60}
61
62impl HttpReader {
63 pub async fn new(url: String, method: Method, headers: HeaderMap, body: Option<Vec<u8>>) -> io::Result<Self> {
64 // http_log!("[HttpReader::new] url: {url}, method: {method:?}, headers: {headers:?}");
65 Self::with_capacity(url, method, headers, body, 0).await
66 }
67 /// Create a new HttpReader from a URL. The request is performed immediately.
68 pub async fn with_capacity(
69 url: String,
70 method: Method,
71 headers: HeaderMap,
72 body: Option<Vec<u8>>,
73 _read_buf_size: usize,
74 ) -> io::Result<Self> {
75 // http_log!(
76 // "[HttpReader::with_capacity] url: {url}, method: {method:?}, headers: {headers:?}, buf_size: {}",
77 // _read_buf_size
78 // );
79 // First, check if the connection is available (HEAD)
80 let client = get_http_client();
81 let head_resp = client.head(&url).headers(headers.clone()).send().await;
82 match head_resp {
83 Ok(resp) => {
84 http_log!("[HttpReader::new] HEAD status: {}", resp.status());
85 if !resp.status().is_success() {
86 return Err(Error::other(format!("HEAD failed: url: {}, status {}", url, resp.status())));
87 }
88 }
89 Err(e) => {
90 http_log!("[HttpReader::new] HEAD error: {e}");
91 return Err(Error::other(e.source().map(|s| s.to_string()).unwrap_or_else(|| e.to_string())));
92 }
93 }
94
95 let client = get_http_client();
96 let mut request: RequestBuilder = client.request(method.clone(), url.clone()).headers(headers.clone());
97 if let Some(body) = body {
98 request = request.body(body);
99 }
100
101 let resp = request
102 .send()
103 .await
104 .map_err(|e| Error::other(format!("HttpReader HTTP request error: {e}")))?;
105
106 if resp.status().is_success().not() {
107 return Err(Error::other(format!(
108 "HttpReader HTTP request failed with non-200 status {}",
109 resp.status()
110 )));
111 }
112
113 let stream = resp
114 .bytes_stream()
115 .map_err(|e| Error::other(format!("HttpReader stream error: {e}")));
116
117 Ok(Self {
118 inner: StreamReader::new(Box::pin(stream)),
119 url,
120 method,
121 headers,
122 })
123 }
124 pub fn url(&self) -> &str {
125 &self.url
126 }
127 pub fn method(&self) -> &Method {
128 &self.method
129 }
130 pub fn headers(&self) -> &HeaderMap {
131 &self.headers
132 }
133}
134
135impl AsyncRead for HttpReader {
136 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
137 // http_log!(
138 // "[HttpReader::poll_read] url: {}, method: {:?}, buf.remaining: {}",
139 // self.url,
140 // self.method,
141 // buf.remaining()
142 // );
143 // Read from the inner stream
144 Pin::new(&mut self.inner).poll_read(cx, buf)
145 }
146}
147
148impl EtagResolvable for HttpReader {
149 fn is_etag_reader(&self) -> bool {
150 false
151 }
152 fn try_resolve_etag(&mut self) -> Option<String> {
153 None
154 }
155}
156
157impl HashReaderDetector for HttpReader {
158 fn is_hash_reader(&self) -> bool {
159 false
160 }
161
162 fn as_hash_reader_mut(&mut self) -> Option<&mut dyn HashReaderMut> {
163 None
164 }
165}
166
167struct ReceiverStream {
168 receiver: mpsc::Receiver<Option<Bytes>>,
169}
170
171impl Stream for ReceiverStream {
172 type Item = Result<Bytes, std::io::Error>;
173 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
174 let poll = Pin::new(&mut self.receiver).poll_recv(cx);
175 // match &poll {
176 // Poll::Ready(Some(Some(bytes))) => {
177 // // http_log!("[ReceiverStream] poll_next: got {} bytes", bytes.len());
178 // }
179 // Poll::Ready(Some(None)) => {
180 // // http_log!("[ReceiverStream] poll_next: sender shutdown");
181 // }
182 // Poll::Ready(None) => {
183 // // http_log!("[ReceiverStream] poll_next: channel closed");
184 // }
185 // Poll::Pending => {
186 // // http_log!("[ReceiverStream] poll_next: pending");
187 // }
188 // }
189 match poll {
190 Poll::Ready(Some(Some(bytes))) => Poll::Ready(Some(Ok(bytes))),
191 Poll::Ready(Some(None)) => Poll::Ready(None), // Sender shutdown
192 Poll::Ready(None) => Poll::Ready(None),
193 Poll::Pending => Poll::Pending,
194 }
195 }
196}
197
198pin_project! {
199 pub struct HttpWriter {
200 url:String,
201 method: Method,
202 headers: HeaderMap,
203 err_rx: tokio::sync::oneshot::Receiver<std::io::Error>,
204 sender: tokio::sync::mpsc::Sender<Option<Bytes>>,
205 handle: tokio::task::JoinHandle<std::io::Result<()>>,
206 finish:bool,
207
208 }
209}
210
211impl HttpWriter {
212 /// Create a new HttpWriter for the given URL. The HTTP request is performed in the background.
213 pub async fn new(url: String, method: Method, headers: HeaderMap) -> io::Result<Self> {
214 // http_log!("[HttpWriter::new] url: {url}, method: {method:?}, headers: {headers:?}");
215 let url_clone = url.clone();
216 let method_clone = method.clone();
217 let headers_clone = headers.clone();
218
219 // First, try to write empty data to check if writable
220 let client = get_http_client();
221 let resp = client.put(&url).headers(headers.clone()).body(Vec::new()).send().await;
222 match resp {
223 Ok(resp) => {
224 // http_log!("[HttpWriter::new] empty PUT status: {}", resp.status());
225 if !resp.status().is_success() {
226 return Err(Error::other(format!("Empty PUT failed: status {}", resp.status())));
227 }
228 }
229 Err(e) => {
230 // http_log!("[HttpWriter::new] empty PUT error: {e}");
231 return Err(Error::other(format!("Empty PUT failed: {e}")));
232 }
233 }
234
235 let (sender, receiver) = tokio::sync::mpsc::channel::<Option<Bytes>>(8);
236 let (err_tx, err_rx) = tokio::sync::oneshot::channel::<io::Error>();
237
238 let handle = tokio::spawn(async move {
239 let stream = ReceiverStream { receiver };
240 let body = reqwest::Body::wrap_stream(stream);
241 // http_log!(
242 // "[HttpWriter::spawn] sending HTTP request: url={url_clone}, method={method_clone:?}, headers={headers_clone:?}"
243 // );
244
245 let client = get_http_client();
246 let request = client
247 .request(method_clone, url_clone.clone())
248 .headers(headers_clone.clone())
249 .body(body);
250
251 // Hold the request until the shutdown signal is received
252 let response = request.send().await;
253
254 match response {
255 Ok(resp) => {
256 // http_log!("[HttpWriter::spawn] got response: status={}", resp.status());
257 if !resp.status().is_success() {
258 let _ = err_tx.send(Error::other(format!(
259 "HttpWriter HTTP request failed with non-200 status {}",
260 resp.status()
261 )));
262 return Err(Error::other(format!("HTTP request failed with non-200 status {}", resp.status())));
263 }
264 }
265 Err(e) => {
266 // http_log!("[HttpWriter::spawn] HTTP request error: {e}");
267 let _ = err_tx.send(Error::other(format!("HTTP request failed: {e}")));
268 return Err(Error::other(format!("HTTP request failed: {e}")));
269 }
270 }
271
272 // http_log!("[HttpWriter::spawn] HTTP request completed, exiting");
273 Ok(())
274 });
275
276 // http_log!("[HttpWriter::new] connection established successfully");
277 Ok(Self {
278 url,
279 method,
280 headers,
281 err_rx,
282 sender,
283 handle,
284 finish: false,
285 })
286 }
287
288 pub fn url(&self) -> &str {
289 &self.url
290 }
291
292 pub fn method(&self) -> &Method {
293 &self.method
294 }
295
296 pub fn headers(&self) -> &HeaderMap {
297 &self.headers
298 }
299}
300
301impl AsyncWrite for HttpWriter {
302 fn poll_write(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
303 // http_log!(
304 // "[HttpWriter::poll_write] url: {}, method: {:?}, buf.len: {}",
305 // self.url,
306 // self.method,
307 // buf.len()
308 // );
309 if let Ok(e) = Pin::new(&mut self.err_rx).try_recv() {
310 return Poll::Ready(Err(e));
311 }
312
313 self.sender
314 .try_send(Some(Bytes::copy_from_slice(buf)))
315 .map_err(|e| Error::other(format!("HttpWriter send error: {e}")))?;
316
317 Poll::Ready(Ok(buf.len()))
318 }
319
320 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
321 Poll::Ready(Ok(()))
322 }
323
324 fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
325 // let url = self.url.clone();
326 // let method = self.method.clone();
327
328 if !self.finish {
329 // http_log!("[HttpWriter::poll_shutdown] url: {}, method: {:?}", url, method);
330 self.sender
331 .try_send(None)
332 .map_err(|e| Error::other(format!("HttpWriter shutdown error: {e}")))?;
333 // http_log!(
334 // "[HttpWriter::poll_shutdown] sent shutdown signal to HTTP request, url: {}, method: {:?}",
335 // url,
336 // method
337 // );
338
339 self.finish = true;
340 }
341 // Wait for the HTTP request to complete
342 use futures::FutureExt;
343 match Pin::new(&mut self.get_mut().handle).poll_unpin(_cx) {
344 Poll::Ready(Ok(_)) => {
345 // http_log!(
346 // "[HttpWriter::poll_shutdown] HTTP request finished successfully, url: {}, method: {:?}",
347 // url,
348 // method
349 // );
350 }
351 Poll::Ready(Err(e)) => {
352 // http_log!("[HttpWriter::poll_shutdown] HTTP request failed: {e}, url: {}, method: {:?}", url, method);
353 return Poll::Ready(Err(Error::other(format!("HTTP request failed: {e}"))));
354 }
355 Poll::Pending => {
356 // http_log!("[HttpWriter::poll_shutdown] HTTP request pending, url: {}, method: {:?}", url, method);
357 return Poll::Pending;
358 }
359 }
360
361 Poll::Ready(Ok(()))
362 }
363}
364
365// #[cfg(test)]
366// mod tests {
367// use super::*;
368// use reqwest::Method;
369// use std::vec;
370// use tokio::io::{AsyncReadExt, AsyncWriteExt};
371
372// #[tokio::test]
373// async fn test_http_writer_err() {
374// // Use a real local server for integration, or mockito for unit test
375// // Here, we use the Go test server at 127.0.0.1:8081 (scripts/testfile.go)
376// let url = "http://127.0.0.1:8081/testfile".to_string();
377// let data = vec![42u8; 8];
378
379// // Write
//         // Add the header X-Deny-Write = 1 to simulate a target that cannot be written to
381// let mut headers = HeaderMap::new();
382// headers.insert("X-Deny-Write", "1".parse().unwrap());
//         // Use the PUT method here
384// let writer_result = HttpWriter::new(url.clone(), Method::PUT, headers).await;
385// match writer_result {
386// Ok(mut writer) => {
//                 // If construction succeeded, the write itself should fail
388// let write_result = writer.write_all(&data).await;
389// assert!(write_result.is_err(), "write_all should fail when server denies write");
390// if let Err(e) = write_result {
391// println!("write_all error: {e}");
392// }
393// let shutdown_result = writer.shutdown().await;
394// if let Err(e) = shutdown_result {
395// println!("shutdown error: {e}");
396// }
397// }
398// Err(e) => {
//                 // Failing right at construction is acceptable too
400// println!("HttpWriter::new error: {e}");
401// assert!(
402// e.to_string().contains("Empty PUT failed") || e.to_string().contains("Forbidden"),
403// "unexpected error: {e}"
404// );
405// return;
406// }
407// }
408// // Should not reach here
409// panic!("HttpWriter should not allow writing when server denies write");
410// }
411
412// #[tokio::test]
413// async fn test_http_writer_and_reader_ok() {
//         // Use the local Go test server
415// let url = "http://127.0.0.1:8081/testfile".to_string();
416// let data = vec![99u8; 512 * 1024]; // 512KB of data
417
//         // Write (without X-Deny-Write)
419// let headers = HeaderMap::new();
420// let mut writer = HttpWriter::new(url.clone(), Method::PUT, headers).await.unwrap();
421// writer.write_all(&data).await.unwrap();
422// writer.shutdown().await.unwrap();
423
424// http_log!("Wrote {} bytes to {} (ok case)", data.len(), url);
425
426// // Read back
427// let mut reader = HttpReader::with_capacity(url.clone(), Method::GET, HeaderMap::new(), 8192)
428// .await
429// .unwrap();
430// let mut buf = Vec::new();
431// reader.read_to_end(&mut buf).await.unwrap();
432// assert_eq!(buf, data);
433
434// // println!("Read {} bytes from {} (ok case)", buf.len(), url);
435// // tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Wait for server to process
436// // println!("[test_http_writer_and_reader_ok] completed successfully");
437// }
438// }