1use std::borrow::Cow;
2use std::io::{Error, ErrorKind, Result};
3use std::ops::{Bound, RangeBounds};
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
12use reqwest::{StatusCode, Url, header};
13use time::OffsetDateTime;
14use time::format_description::well_known::Rfc2822;
15
16mod parser;
17
18pub(crate) fn error_from_status(code: StatusCode) -> Result<StatusCode> {
21 if code.is_server_error() {
22 Err(Error::other(
23 code.canonical_reason().unwrap_or(code.as_str()),
24 ))
25 } else if code.is_client_error() {
26 let kind = match code {
27 StatusCode::NOT_FOUND => ErrorKind::NotFound,
28 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => ErrorKind::PermissionDenied,
29 _ => ErrorKind::Other,
30 };
31 let msg = code.canonical_reason().unwrap_or(code.as_str());
32 Err(Error::new(kind, msg))
33 } else {
34 Ok(code)
35 }
36}
37
/// Formats a byte range as the value of an HTTP `Range` request header.
///
/// HTTP byte ranges are inclusive on both ends (`bytes=first-last`), whereas Rust
/// ranges are usually half-open, so the bounds are converted accordingly: `0..5` and
/// `0..=4` both render as `bytes=0-4`. An unbounded start renders as `0` and an
/// unbounded end leaves the range open (`bytes=10-`).
///
/// HTTP cannot express an empty range; arithmetic saturates instead of overflowing, so
/// an exclusive end of `0` still renders as `0`.
pub(crate) struct RangeHeader<R: RangeBounds<u64>>(pub R);

impl<R: RangeBounds<u64>> std::fmt::Display for RangeHeader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // First byte position, inclusive.
        let first = match self.0.start_bound() {
            Bound::Unbounded => 0,
            Bound::Included(&v) => v,
            Bound::Excluded(&v) => v.saturating_add(1),
        };
        write!(f, "bytes={first}-")?;

        // Last byte position, inclusive: an exclusive Rust bound is one past it.
        match self.0.end_bound() {
            Bound::Unbounded => Ok(()),
            Bound::Included(&v) => write!(f, "{v}"),
            Bound::Excluded(&v) => write!(f, "{}", v.saturating_sub(1)),
        }
    }
}
62
63#[derive(Clone, Debug)]
64#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
65pub struct HttpStoreConfig {
66 pub base_url: String,
67}
68
69impl HttpStoreConfig {
70 pub fn build(&self) -> Result<HttpStore> {
71 HttpStore::new(&self.base_url)
72 }
73}
74
75struct InnerHttpStore {
77 base_url: Url,
78 parser: parser::Parser,
79 client: reqwest::Client,
80}
81
82impl InnerHttpStore {
83 fn get_url(&self, path: &Path) -> Result<Url> {
85 let clean = crate::util::clean_path(path)?;
86 self.base_url
87 .join(&clean.to_string_lossy())
88 .map_err(|err| Error::new(ErrorKind::InvalidData, err))
89 }
90}
91
92#[derive(Clone)]
95pub struct HttpStore(Arc<InnerHttpStore>);
96
97impl std::fmt::Debug for HttpStore {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct(stringify!(HttpStore))
100 .field("base_url", &self.0.base_url)
101 .finish_non_exhaustive()
102 }
103}
104
105impl HttpStore {
106 pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
111 let base_url = base_url.as_ref();
112 let base_url = if base_url.ends_with("/") {
113 Cow::Borrowed(base_url)
114 } else {
115 Cow::Owned(format!("{base_url}/"))
116 };
117 let base_url = Url::parse(base_url.as_ref())
118 .map_err(|err| Error::new(ErrorKind::InvalidInput, err))?;
119 Ok(Self(Arc::new(InnerHttpStore {
120 base_url,
121 parser: parser::Parser::default(),
122 client: reqwest::Client::new(),
123 })))
124 }
125}
126
127impl crate::Store for HttpStore {
128 type Directory = HttpStoreDirectory;
129 type File = HttpStoreFile;
130
131 async fn get_file<P: Into<std::path::PathBuf>>(&self, path: P) -> Result<Self::File> {
133 Ok(HttpStoreFile {
134 store: self.0.clone(),
135 path: path.into(),
136 })
137 }
138
139 async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
141 Ok(HttpStoreDirectory {
142 store: self.0.clone(),
143 path: path.into(),
144 })
145 }
146}
147
148pub struct HttpStoreDirectory {
150 store: Arc<InnerHttpStore>,
151 path: PathBuf,
152}
153
154impl std::fmt::Debug for HttpStoreDirectory {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct(stringify!(HttpStoreDirectory))
157 .field("path", &self.path)
158 .finish_non_exhaustive()
159 }
160}
161
162impl crate::StoreDirectory for HttpStoreDirectory {
163 type Entry = HttpStoreEntry;
164 type Reader = HttpStoreDirectoryReader;
165
166 async fn exists(&self) -> Result<bool> {
168 let url = self.store.get_url(&self.path)?;
169 match self.store.client.head(url).send().await {
170 Ok(res) => match res.status() {
171 StatusCode::NOT_FOUND => Ok(false),
172 other => error_from_status(other).map(|_| true),
173 },
174 Err(err) => Err(Error::other(err)),
175 }
176 }
177
178 async fn read(&self) -> Result<Self::Reader> {
180 let url = self.store.get_url(&self.path)?;
181 let res = self
182 .store
183 .client
184 .get(url)
185 .send()
186 .await
187 .map_err(Error::other)?;
188 error_from_status(res.status())?;
189 let html = res.text().await.map_err(Error::other)?;
190 let mut entries = self.store.parser.parse(&html).collect::<Vec<_>>();
191 entries.reverse();
192
193 Ok(HttpStoreDirectoryReader {
194 store: self.store.clone(),
195 path: self.path.clone(),
196 entries,
197 })
198 }
199}
200
201pub struct HttpStoreDirectoryReader {
203 store: Arc<InnerHttpStore>,
204 path: PathBuf,
205 entries: Vec<String>,
206}
207
208impl std::fmt::Debug for HttpStoreDirectoryReader {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct(stringify!(HttpStoreDirectoryReader))
211 .field("path", &self.path)
212 .field("entries", &self.entries)
213 .finish_non_exhaustive()
214 }
215}
216
217impl Stream for HttpStoreDirectoryReader {
218 type Item = Result<HttpStoreEntry>;
219
220 fn poll_next(
222 mut self: Pin<&mut Self>,
223 _cx: &mut std::task::Context<'_>,
224 ) -> Poll<Option<Self::Item>> {
225 let mut this = self.as_mut();
226
227 if let Some(entry) = this.entries.pop() {
228 Poll::Ready(Some(HttpStoreEntry::new(
229 self.store.clone(),
230 self.path.clone(),
231 entry,
232 )))
233 } else {
234 Poll::Ready(None)
235 }
236 }
237}
238
239impl crate::StoreDirectoryReader<HttpStoreEntry> for HttpStoreDirectoryReader {}
240
241pub struct HttpStoreFile {
243 store: Arc<InnerHttpStore>,
244 path: PathBuf,
245}
246
247impl std::fmt::Debug for HttpStoreFile {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 f.debug_struct(stringify!(HttpStoreFile))
250 .field("path", &self.path)
251 .finish_non_exhaustive()
252 }
253}
254
255impl crate::StoreFile for HttpStoreFile {
256 type FileReader = HttpStoreFileReader;
257 type FileWriter = crate::NoopFileWriter;
258 type Metadata = HttpStoreFileMetadata;
259
260 fn filename(&self) -> Option<Cow<'_, str>> {
262 let cmp = self.path.components().next_back()?;
263 Some(cmp.as_os_str().to_string_lossy())
264 }
265
266 async fn exists(&self) -> Result<bool> {
268 let url = self.store.get_url(&self.path)?;
269 let res = self
270 .store
271 .client
272 .head(url)
273 .send()
274 .await
275 .map_err(Error::other)?;
276 match res.status() {
277 StatusCode::NOT_FOUND => Ok(false),
278 other => error_from_status(other).map(|_| true),
279 }
280 }
281
282 async fn metadata(&self) -> Result<Self::Metadata> {
284 let url = self.store.get_url(&self.path)?;
285 let res = self
286 .store
287 .client
288 .head(url)
289 .send()
290 .await
291 .map_err(Error::other)?;
292 error_from_status(res.status())?;
293 let size = res
294 .headers()
295 .get(CONTENT_LENGTH)
296 .and_then(|value| value.to_str().ok())
297 .and_then(|value| value.parse::<u64>().ok())
298 .unwrap_or(0);
299 let modified = res
300 .headers()
301 .get(LAST_MODIFIED)
302 .and_then(|value| value.to_str().ok())
303 .and_then(|value| OffsetDateTime::parse(value, &Rfc2822).ok())
304 .map(|dt| dt.unix_timestamp() as u64)
305 .unwrap_or(0);
306 Ok(HttpStoreFileMetadata { size, modified })
307 }
308
309 async fn read<R: std::ops::RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
311 let url = self.store.get_url(&self.path)?;
312 let res = self
313 .store
314 .client
315 .get(url)
316 .header(header::RANGE, RangeHeader(range).to_string())
317 .send()
318 .await
319 .map_err(Error::other)?;
320 HttpStoreFileReader::from_response(res)
321 }
322
323 async fn write(&self, _options: crate::WriteOptions) -> Result<Self::FileWriter> {
324 Err(Error::new(
325 ErrorKind::Unsupported,
326 "http store doesn't support write operations",
327 ))
328 }
329}
330
331#[derive(Clone, Debug)]
333pub struct HttpStoreFileMetadata {
334 size: u64,
335 modified: u64,
336}
337
338impl super::StoreMetadata for HttpStoreFileMetadata {
339 fn size(&self) -> u64 {
341 self.size
342 }
343
344 fn created(&self) -> u64 {
346 0
347 }
348
349 fn modified(&self) -> u64 {
351 self.modified
352 }
353}
354
355pub struct HttpStoreFileReader {
357 stream: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + std::marker::Send>>,
358}
359
360impl std::fmt::Debug for HttpStoreFileReader {
361 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362 f.debug_struct(stringify!(HttpStoreFileReader))
363 .finish_non_exhaustive()
364 }
365}
366
367impl HttpStoreFileReader {
368 pub(crate) fn from_response(res: reqwest::Response) -> Result<Self> {
372 crate::http::error_from_status(res.status())?;
373 let stream = res.bytes_stream().boxed();
375 Ok(Self { stream })
376 }
377}
378
379impl tokio::io::AsyncRead for HttpStoreFileReader {
380 fn poll_read(
384 self: std::pin::Pin<&mut Self>,
385 cx: &mut std::task::Context<'_>,
386 buf: &mut tokio::io::ReadBuf<'_>,
387 ) -> std::task::Poll<std::io::Result<()>> {
388 let stream = &mut self.get_mut().stream;
389
390 match Pin::new(stream).poll_next(cx) {
391 Poll::Ready(Some(Ok(chunk))) => {
392 let len = buf.remaining();
393 let to_read = chunk.len().min(len);
394 buf.put_slice(&chunk[..to_read]);
395 Poll::Ready(Ok(()))
396 }
397 Poll::Ready(Some(Err(err))) => Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
399 Poll::Ready(None) => Poll::Ready(Ok(())),
401 Poll::Pending => Poll::Pending,
402 }
403 }
404}
405
406impl crate::StoreFileReader for HttpStoreFileReader {}
407
408pub type HttpStoreEntry = crate::Entry<HttpStoreFile, HttpStoreDirectory>;
410
411impl HttpStoreEntry {
412 fn new(store: Arc<InnerHttpStore>, parent: PathBuf, entry: String) -> Result<Self> {
417 let path = parent.join(&entry);
418 Ok(if entry.ends_with('/') {
419 Self::Directory(HttpStoreDirectory { store, path })
420 } else {
421 Self::File(HttpStoreFile { store, path })
422 })
423 }
424}
425
#[cfg(test)]
mod tests {
    //! Tests running the store against a local `mockito` server.

    use std::io::ErrorKind;
    use std::path::PathBuf;

    use futures::StreamExt;
    use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
    use tokio::io::AsyncReadExt;

    use crate::http::HttpStore;
    use crate::{Store, StoreDirectory, StoreFile, StoreMetadata};

    #[test_case::test_case("http://localhost", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with prefix")]
    #[test_case::test_case("http://localhost", "foo.txt", "http://localhost/foo.txt"; "root with simple path without prefix")]
    #[test_case::test_case("http://localhost/", "foo.txt", "http://localhost/foo.txt"; "root with simple path with slash on base")]
    #[test_case::test_case("http://localhost/", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with slashes")]
    #[test_case::test_case("http://localhost/foo", "/bar/baz.txt", "http://localhost/foo/bar/baz.txt"; "with more children")]
    #[test_case::test_case("http://localhost/foo", "/bar/with space.txt", "http://localhost/foo/bar/with%20space.txt"; "with spaces")]
    fn building_path(base_url: &str, path: &str, expected: &str) {
        let store = HttpStore::new(base_url).unwrap();
        let path = PathBuf::from(path);
        let url = store.0.get_url(&path).unwrap();
        assert_eq!(url.as_str(), expected);
    }

    #[tokio::test]
    async fn file_should_handle_base_with_ending_slash() {
        let mut srv = mockito::Server::new_async().await;
        let mock = srv
            .mock("HEAD", "/foo/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(format!("{}/foo/", srv.url())).unwrap();
        let file = store.get_file("/not-found.txt").await.unwrap();
        assert!(!file.exists().await.unwrap());
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn file_should_check_if_file_exists() {
        let mut srv = mockito::Server::new_async().await;
        let mock = srv
            .mock("HEAD", "/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/not-found.txt").await.unwrap();
        assert!(!file.exists().await.unwrap());
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn file_should_get_filename() {
        let srv = mockito::Server::new_async().await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        let name = file.filename().unwrap();
        assert_eq!(name, "file.txt");
    }

    #[tokio::test]
    async fn file_should_get_filename_with_space() {
        let srv = mockito::Server::new_async().await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/with space.txt").await.unwrap();
        let name = file.filename().unwrap();
        assert_eq!(name, "with space.txt");
    }

    #[tokio::test]
    async fn file_meta_should_give_all() {
        let mut srv = mockito::Server::new_async().await;
        let mock = srv
            .mock("HEAD", "/test/file.txt")
            .with_status(200)
            .with_header(CONTENT_LENGTH, "1234")
            .with_header(LAST_MODIFIED, "Thu, 01 May 2025 09:57:28 GMT")
            .create_async()
            .await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        let meta = file.metadata().await.unwrap();
        assert_eq!(meta.size(), 1234);
        assert_eq!(meta.created(), 0);
        assert_eq!(meta.modified(), 1746093448);
        mock.assert_async().await;
    }

    // The server ignores the requested range and answers 200 with the whole body;
    // the reader must still deliver the leading bytes.
    #[tokio::test]
    async fn file_reader_should_read_entire_file() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/test/file")
            .with_status(200)
            .with_header("Content-Type", "application/octet-stream")
            .with_body("Hello, world!")
            .create_async()
            .await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let reader = file.read(0..5).await.unwrap();

        let mut buf = vec![0; 5];
        let mut async_reader = tokio::io::BufReader::new(reader);
        let n = async_reader.read(&mut buf).await.unwrap();

        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");
    }

    // The server honours the range: a 206 answer carrying exactly the five requested
    // bytes of the 13-byte resource.
    #[tokio::test]
    async fn file_reader_should_read_single_range() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/test/file")
            .with_status(206)
            .with_header("Content-Type", "application/octet-stream")
            .with_header("Content-Range", "bytes 0-4/13")
            .with_body("Hello")
            .create_async()
            .await;

        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let reader = file.read(0..5).await.unwrap();
        let mut buf = vec![0; 5];

        let mut async_reader = tokio::io::BufReader::new(reader);
        let n = async_reader.read(&mut buf).await.unwrap();

        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");
    }

    #[tokio::test]
    async fn file_reader_should_fail_with_not_found() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/test/file")
            .with_status(404)
            .create_async()
            .await;

        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let result = file.read(0..5).await;
        match result {
            Ok(_) => panic!("should fail"),
            Err(err) => assert_eq!(err.kind(), ErrorKind::NotFound),
        }
    }

    #[tokio::test]
    async fn dir_should_list_entries() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/NEH")
            .with_status(200)
            .with_body(include_str!("../../assets/apache.html"))
            .create_async()
            .await;

        let store = HttpStore::new(srv.url()).unwrap();
        let dir = store.get_dir("/NEH").await.unwrap();
        let mut content = dir.read().await.unwrap();

        let mut result = Vec::new();
        while let Some(entry) = content.next().await {
            result.push(entry.unwrap());
        }
        assert_eq!(result.len(), 46);

        assert_eq!(result.iter().filter(|item| item.is_directory()).count(), 41);
        assert_eq!(result.iter().filter(|item| item.is_file()).count(), 5);
    }
}