any_storage/http/
mod.rs

1use std::borrow::Cow;
2use std::io::{Error, ErrorKind, Result};
3use std::ops::{Bound, RangeBounds};
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
12use reqwest::{StatusCode, Url, header};
13use time::OffsetDateTime;
14use time::format_description::well_known::Rfc2822;
15
16mod parser;
17
18/// Converts an HTTP status code into a `Result`, returning an `io::Error`
19/// for client or server errors, and `Ok(code)` otherwise.
20pub(crate) fn error_from_status(code: StatusCode) -> Result<StatusCode> {
21    if code.is_server_error() {
22        Err(Error::other(
23            code.canonical_reason().unwrap_or(code.as_str()),
24        ))
25    } else if code.is_client_error() {
26        let kind = match code {
27            StatusCode::NOT_FOUND => ErrorKind::NotFound,
28            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => ErrorKind::PermissionDenied,
29            _ => ErrorKind::Other,
30        };
31        let msg = code.canonical_reason().unwrap_or(code.as_str());
32        Err(Error::new(kind, msg))
33    } else {
34        Ok(code)
35    }
36}
37
/// Formats a `RangeBounds<u64>` as the value of an HTTP `Range` header.
///
/// HTTP byte ranges are inclusive on both ends (RFC 9110, "Byte Ranges").
/// A Rust range therefore has to be translated:
/// - `0..5` becomes `bytes=0-4`.
/// - `0..=4` also becomes `bytes=0-4`.
/// - `10..` becomes `bytes=10-`, meaning "from byte 10 to the end".
///
/// An empty range such as `0..0` cannot be expressed in HTTP, so callers
/// should not pass one.
pub(crate) struct RangeHeader<R: RangeBounds<u64>>(pub R);

impl<R: RangeBounds<u64>> std::fmt::Display for RangeHeader<R> {
    /// Writes the header value, e.g. `bytes=0-99`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let first = match self.0.start_bound() {
            Bound::Unbounded => 0,
            Bound::Included(&v) => v,
            // Saturating: `u64::MAX + 1` must not overflow/panic.
            Bound::Excluded(&v) => v.saturating_add(1),
        };
        write!(f, "bytes={first}-")?;
        match self.0.end_bound() {
            // No last position: the server sends everything up to the end.
            Bound::Unbounded => Ok(()),
            // Inclusive Rust end == inclusive HTTP last-pos.
            Bound::Included(&v) => write!(f, "{v}"),
            // Exclusive Rust end `n` is HTTP last-pos `n - 1`; saturating keeps
            // `..0` from underflowing even though it cannot be represented.
            Bound::Excluded(&v) => write!(f, "{}", v.saturating_sub(1)),
        }
    }
}
62
63#[derive(Clone, Debug)]
64#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
65pub struct HttpStoreConfig {
66    pub base_url: String,
67}
68
69impl HttpStoreConfig {
70    pub fn build(&self) -> Result<HttpStore> {
71        HttpStore::new(&self.base_url)
72    }
73}
74
75/// Internal representation of the HTTP-backed store.
76struct InnerHttpStore {
77    base_url: Url,
78    parser: parser::Parser,
79    client: reqwest::Client,
80}
81
82impl InnerHttpStore {
83    /// Resolves a relative file or directory path into a full URL.
84    fn get_url(&self, path: &Path) -> Result<Url> {
85        let clean = crate::util::clean_path(path)?;
86        self.base_url
87            .join(&clean.to_string_lossy())
88            .map_err(|err| Error::new(ErrorKind::InvalidData, err))
89    }
90}
91
92/// Public HTTP-backed file store supporting asynchronous access to remote files
93/// and directories.
94#[derive(Clone)]
95pub struct HttpStore(Arc<InnerHttpStore>);
96
97impl std::fmt::Debug for HttpStore {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct(stringify!(HttpStore))
100            .field("base_url", &self.0.base_url)
101            .finish_non_exhaustive()
102    }
103}
104
105impl HttpStore {
106    /// Creates a new `HttpStore` from a base URL.
107    ///
108    /// Ensures the base URL ends with a trailing slash and initializes the HTTP
109    /// client and parser.
110    pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
111        let base_url = base_url.as_ref();
112        let base_url = if base_url.ends_with("/") {
113            Cow::Borrowed(base_url)
114        } else {
115            Cow::Owned(format!("{base_url}/"))
116        };
117        let base_url = Url::parse(base_url.as_ref())
118            .map_err(|err| Error::new(ErrorKind::InvalidInput, err))?;
119        Ok(Self(Arc::new(InnerHttpStore {
120            base_url,
121            parser: parser::Parser::default(),
122            client: reqwest::Client::new(),
123        })))
124    }
125}
126
127impl crate::Store for HttpStore {
128    type Directory = HttpStoreDirectory;
129    type File = HttpStoreFile;
130
131    /// Retrieves a file from the HTTP store at the given path.
132    async fn get_file<P: Into<std::path::PathBuf>>(&self, path: P) -> Result<Self::File> {
133        Ok(HttpStoreFile {
134            store: self.0.clone(),
135            path: path.into(),
136        })
137    }
138
139    /// Retrieves a directory from the HTTP store at the given path.
140    async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
141        Ok(HttpStoreDirectory {
142            store: self.0.clone(),
143            path: path.into(),
144        })
145    }
146}
147
148/// Representation of a directory in the HTTP store.
149pub struct HttpStoreDirectory {
150    store: Arc<InnerHttpStore>,
151    path: PathBuf,
152}
153
154impl std::fmt::Debug for HttpStoreDirectory {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct(stringify!(HttpStoreDirectory))
157            .field("path", &self.path)
158            .finish_non_exhaustive()
159    }
160}
161
162impl crate::StoreDirectory for HttpStoreDirectory {
163    type Entry = HttpStoreEntry;
164    type Reader = HttpStoreDirectoryReader;
165
166    /// Checks if the HTTP directory exists via a HEAD request.
167    async fn exists(&self) -> Result<bool> {
168        let url = self.store.get_url(&self.path)?;
169        match self.store.client.head(url).send().await {
170            Ok(res) => match res.status() {
171                StatusCode::NOT_FOUND => Ok(false),
172                other => error_from_status(other).map(|_| true),
173            },
174            Err(err) => Err(Error::other(err)),
175        }
176    }
177
178    /// Lists the entries in the HTTP directory by fetching and parsing HTML.
179    async fn read(&self) -> Result<Self::Reader> {
180        let url = self.store.get_url(&self.path)?;
181        let res = self
182            .store
183            .client
184            .get(url)
185            .send()
186            .await
187            .map_err(Error::other)?;
188        error_from_status(res.status())?;
189        let html = res.text().await.map_err(Error::other)?;
190        let mut entries = self.store.parser.parse(&html).collect::<Vec<_>>();
191        entries.reverse();
192
193        Ok(HttpStoreDirectoryReader {
194            store: self.store.clone(),
195            path: self.path.clone(),
196            entries,
197        })
198    }
199}
200
201/// Stream reader over entries within an HTTP directory listing.
202pub struct HttpStoreDirectoryReader {
203    store: Arc<InnerHttpStore>,
204    path: PathBuf,
205    entries: Vec<String>,
206}
207
208impl std::fmt::Debug for HttpStoreDirectoryReader {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct(stringify!(HttpStoreDirectoryReader))
211            .field("path", &self.path)
212            .field("entries", &self.entries)
213            .finish_non_exhaustive()
214    }
215}
216
217impl Stream for HttpStoreDirectoryReader {
218    type Item = Result<HttpStoreEntry>;
219
220    /// Returns the next directory entry from the parsed HTML listing.
221    fn poll_next(
222        mut self: Pin<&mut Self>,
223        _cx: &mut std::task::Context<'_>,
224    ) -> Poll<Option<Self::Item>> {
225        let mut this = self.as_mut();
226
227        if let Some(entry) = this.entries.pop() {
228            Poll::Ready(Some(HttpStoreEntry::new(
229                self.store.clone(),
230                self.path.clone(),
231                entry,
232            )))
233        } else {
234            Poll::Ready(None)
235        }
236    }
237}
238
239impl crate::StoreDirectoryReader<HttpStoreEntry> for HttpStoreDirectoryReader {}
240
241/// Representation of a file in the HTTP store.
242pub struct HttpStoreFile {
243    store: Arc<InnerHttpStore>,
244    path: PathBuf,
245}
246
247impl std::fmt::Debug for HttpStoreFile {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        f.debug_struct(stringify!(HttpStoreFile))
250            .field("path", &self.path)
251            .finish_non_exhaustive()
252    }
253}
254
255impl crate::StoreFile for HttpStoreFile {
256    type FileReader = HttpStoreFileReader;
257    type FileWriter = crate::NoopFileWriter;
258    type Metadata = HttpStoreFileMetadata;
259
260    /// Returns the filename portion of the HTTP path.
261    fn filename(&self) -> Option<Cow<'_, str>> {
262        let cmp = self.path.components().next_back()?;
263        Some(cmp.as_os_str().to_string_lossy())
264    }
265
266    /// Checks if the HTTP file exists via a HEAD request.
267    async fn exists(&self) -> Result<bool> {
268        let url = self.store.get_url(&self.path)?;
269        let res = self
270            .store
271            .client
272            .head(url)
273            .send()
274            .await
275            .map_err(Error::other)?;
276        match res.status() {
277            StatusCode::NOT_FOUND => Ok(false),
278            other => error_from_status(other).map(|_| true),
279        }
280    }
281
282    /// Retrieves the HTTP file metadata (size and last modified).
283    async fn metadata(&self) -> Result<Self::Metadata> {
284        let url = self.store.get_url(&self.path)?;
285        let res = self
286            .store
287            .client
288            .head(url)
289            .send()
290            .await
291            .map_err(Error::other)?;
292        error_from_status(res.status())?;
293        let size = res
294            .headers()
295            .get(CONTENT_LENGTH)
296            .and_then(|value| value.to_str().ok())
297            .and_then(|value| value.parse::<u64>().ok())
298            .unwrap_or(0);
299        let modified = res
300            .headers()
301            .get(LAST_MODIFIED)
302            .and_then(|value| value.to_str().ok())
303            .and_then(|value| OffsetDateTime::parse(value, &Rfc2822).ok())
304            .map(|dt| dt.unix_timestamp() as u64)
305            .unwrap_or(0);
306        Ok(HttpStoreFileMetadata { size, modified })
307    }
308
309    /// Begins reading a file from the HTTP store for the given byte range.
310    async fn read<R: std::ops::RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
311        let url = self.store.get_url(&self.path)?;
312        let res = self
313            .store
314            .client
315            .get(url)
316            .header(header::RANGE, RangeHeader(range).to_string())
317            .send()
318            .await
319            .map_err(Error::other)?;
320        HttpStoreFileReader::from_response(res)
321    }
322
323    async fn write(&self, _options: crate::WriteOptions) -> Result<Self::FileWriter> {
324        Err(Error::new(
325            ErrorKind::Unsupported,
326            "http store doesn't support write operations",
327        ))
328    }
329}
330
331/// Metadata for an HTTP file, containing size and last modification time.
332#[derive(Clone, Debug)]
333pub struct HttpStoreFileMetadata {
334    size: u64,
335    modified: u64,
336}
337
338impl super::StoreMetadata for HttpStoreFileMetadata {
339    /// Returns the file size in bytes.
340    fn size(&self) -> u64 {
341        self.size
342    }
343
344    /// Returns 0 as creation time is not available over HTTP.
345    fn created(&self) -> u64 {
346        0
347    }
348
349    /// Returns the last modified time (as a UNIX timestamp).
350    fn modified(&self) -> u64 {
351        self.modified
352    }
353}
354
355/// Reader for streaming bytes from a remote HTTP file.
356pub struct HttpStoreFileReader {
357    stream: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + std::marker::Send>>,
358}
359
360impl std::fmt::Debug for HttpStoreFileReader {
361    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362        f.debug_struct(stringify!(HttpStoreFileReader))
363            .finish_non_exhaustive()
364    }
365}
366
367impl HttpStoreFileReader {
368    /// Creates a `HttpStoreFileReader` from a `reqwest::Response`.
369    ///
370    /// Validates the response and initializes the byte stream.
371    pub(crate) fn from_response(res: reqwest::Response) -> Result<Self> {
372        crate::http::error_from_status(res.status())?;
373        // TODO handle when status code is not 206
374        let stream = res.bytes_stream().boxed();
375        Ok(Self { stream })
376    }
377}
378
379impl tokio::io::AsyncRead for HttpStoreFileReader {
380    /// Polls the next chunk of data from the HTTP byte stream.
381    ///
382    /// Copies bytes into the provided buffer.
383    fn poll_read(
384        self: std::pin::Pin<&mut Self>,
385        cx: &mut std::task::Context<'_>,
386        buf: &mut tokio::io::ReadBuf<'_>,
387    ) -> std::task::Poll<std::io::Result<()>> {
388        let stream = &mut self.get_mut().stream;
389
390        match Pin::new(stream).poll_next(cx) {
391            Poll::Ready(Some(Ok(chunk))) => {
392                let len = buf.remaining();
393                let to_read = chunk.len().min(len);
394                buf.put_slice(&chunk[..to_read]);
395                Poll::Ready(Ok(()))
396            }
397            // Stream has ended with an error, propagate it
398            Poll::Ready(Some(Err(err))) => Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
399            // No more data to read
400            Poll::Ready(None) => Poll::Ready(Ok(())),
401            Poll::Pending => Poll::Pending,
402        }
403    }
404}
405
406impl crate::StoreFileReader for HttpStoreFileReader {}
407
408/// Represents an entry in the HTTP store (file or directory).
409pub type HttpStoreEntry = crate::Entry<HttpStoreFile, HttpStoreDirectory>;
410
411impl HttpStoreEntry {
412    /// Constructs a new `HttpStoreEntry` (either file or directory) from a path
413    /// component.
414    ///
415    /// Assumes directory entries end with a `/`.
416    fn new(store: Arc<InnerHttpStore>, parent: PathBuf, entry: String) -> Result<Self> {
417        let path = parent.join(&entry);
418        Ok(if entry.ends_with('/') {
419            Self::Directory(HttpStoreDirectory { store, path })
420        } else {
421            Self::File(HttpStoreFile { store, path })
422        })
423    }
424}
425
#[cfg(test)]
mod tests {
    use std::io::ErrorKind;
    use std::path::PathBuf;

    use futures::StreamExt;
    use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
    use tokio::io::AsyncReadExt;

    use crate::http::HttpStore;
    use crate::{Store, StoreDirectory, StoreFile, StoreMetadata};

    // Store paths land below the base URL whatever the slashes, and spaces
    // are percent-encoded.
    #[test_case::test_case("http://localhost", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with prefix")]
    #[test_case::test_case("http://localhost", "foo.txt", "http://localhost/foo.txt"; "root with simple path without prefix")]
    #[test_case::test_case("http://localhost/", "foo.txt", "http://localhost/foo.txt"; "root with simple path with slash on base")]
    #[test_case::test_case("http://localhost/", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with slashes")]
    #[test_case::test_case("http://localhost/foo", "/bar/baz.txt", "http://localhost/foo/bar/baz.txt"; "with more children")]
    #[test_case::test_case("http://localhost/foo", "/bar/with space.txt", "http://localhost/foo/bar/with%20space.txt"; "with spaces")]
    fn building_path(base_url: &str, path: &str, expected: &str) {
        let store = HttpStore::new(base_url).unwrap();
        let resolved = store.0.get_url(&PathBuf::from(path)).unwrap();
        assert_eq!(resolved.as_str(), expected);
    }

    // A base URL that already ends with `/` still resolves files inside its
    // last segment.
    #[tokio::test]
    async fn file_should_handle_base_with_ending_slash() {
        let mut server = mockito::Server::new_async().await;
        let head = server
            .mock("HEAD", "/foo/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(format!("{}/foo/", server.url())).unwrap();

        let file = store.get_file("/not-found.txt").await.unwrap();
        let exists = file.exists().await.unwrap();

        assert!(!exists);
        head.assert_async().await;
    }

    // A 404 on HEAD is reported as "does not exist", not as an error.
    #[tokio::test]
    async fn file_should_check_if_file_exists() {
        let mut server = mockito::Server::new_async().await;
        let head = server
            .mock("HEAD", "/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(server.url()).unwrap();

        let file = store.get_file("/not-found.txt").await.unwrap();
        let exists = file.exists().await.unwrap();

        assert!(!exists);
        head.assert_async().await;
    }

    #[tokio::test]
    async fn file_should_get_filename() {
        let server = mockito::Server::new_async().await;
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        assert_eq!(file.filename().unwrap(), "file.txt");
    }

    #[tokio::test]
    async fn file_should_get_filename_with_space() {
        let server = mockito::Server::new_async().await;
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/with space.txt").await.unwrap();
        assert_eq!(file.filename().unwrap(), "with space.txt");
    }

    // Size and mtime come from Content-Length / Last-Modified; creation time
    // is always 0.
    #[tokio::test]
    async fn file_meta_should_give_all() {
        let mut server = mockito::Server::new_async().await;
        let head = server
            .mock("HEAD", "/test/file.txt")
            .with_status(200)
            .with_header(CONTENT_LENGTH, "1234")
            .with_header(LAST_MODIFIED, "Thu, 01 May 2025 09:57:28 GMT")
            .create_async()
            .await;
        let store = HttpStore::new(server.url()).unwrap();

        let file = store.get_file("/test/file.txt").await.unwrap();
        let meta = file.metadata().await.unwrap();

        assert_eq!(meta.size, 1234);
        assert_eq!(meta.created(), 0);
        assert_eq!(meta.modified(), 1746093448);
        head.assert_async().await;
    }

    #[tokio::test]
    async fn file_reader_should_read_entire_file() {
        let mut server = mockito::Server::new_async().await;
        // Bound to a name (not `_`) so the mock stays registered for the test.
        let _mock = server
            .mock("GET", "/test/file")
            .with_status(200)
            .with_header("Content-Type", "application/octet-stream")
            .with_body("Hello, world!")
            .create();
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let mut reader = tokio::io::BufReader::new(file.read(0..5).await.unwrap());
        let mut buf = vec![0; 5];
        let n = reader.read(&mut buf).await.unwrap();

        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");
    }

    #[tokio::test]
    async fn file_reader_should_read_single_range() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/test/file")
            .with_status(206) // Partial content status for range requests
            .with_header("Content-Type", "application/octet-stream")
            .with_header("Content-Range", "bytes 0-4/12")
            .with_body("Hello, world!")
            .create();
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let mut reader = tokio::io::BufReader::new(file.read(0..5).await.unwrap());
        let mut buf = vec![0; 5];
        let n = reader.read(&mut buf).await.unwrap();

        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");
    }

    // A 404 on GET surfaces as `ErrorKind::NotFound` from `read`.
    #[tokio::test]
    async fn file_reader_should_fail_with_not_found() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server.mock("GET", "/test/file").with_status(404).create();
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let Err(err) = file.read(0..5).await else {
            panic!("should fail");
        };
        assert_eq!(err.kind(), ErrorKind::NotFound);
    }

    // Entries from the Apache listing fixture are split into directories
    // (names ending in `/`) and files.
    #[tokio::test]
    async fn dir_should_list_entries() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/NEH")
            .with_status(200)
            .with_body(include_str!("../../assets/apache.html"))
            .create();
        let store = HttpStore::new(server.url()).unwrap();
        let dir = store.get_dir("/NEH").await.unwrap();

        let listing = dir.read().await.unwrap();
        let result: Vec<_> = listing.map(|entry| entry.unwrap()).collect().await;

        assert_eq!(result.len(), 46);
        assert_eq!(result.iter().filter(|item| item.is_directory()).count(), 41);
        assert_eq!(result.iter().filter(|item| item.is_file()).count(), 5);
    }
}