any_storage/
local.rs

1use std::borrow::Cow;
2use std::io::{Error, ErrorKind, Result, SeekFrom};
3use std::ops::{Bound, RangeBounds};
4use std::os::unix::fs::MetadataExt;
5use std::path::{Component, PathBuf};
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::Poll;
9use std::time::SystemTime;
10
11use futures::Stream;
12use tokio::io::AsyncSeekExt;
13
14use crate::{Entry, Store, StoreDirectory, StoreFile, StoreFileReader, WriteMode};
15
/// Configuration for [`LocalStore`].
///
/// Carries the only setting a local store needs: its root directory.
/// `serde::Deserialize` is derived only when the `serde` feature is enabled.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub struct LocalStoreConfig {
    /// Root directory of the store; it is cloned into the [`LocalStore`]
    /// created by [`LocalStoreConfig::build`].
    pub path: PathBuf,
}
23
24impl LocalStoreConfig {
25    pub fn build(&self) -> Result<LocalStore> {
26        Ok(LocalStore::new(self.path.clone()))
27    }
28}
29
/// Shared state behind a [`LocalStore`] handle.
///
/// Kept private so that the public handle can be cloned cheaply through an
/// `Arc` without exposing the layout.
#[derive(Debug)]
struct InnerLocalStore {
    /// Directory against which every requested path is resolved.
    root: PathBuf,
}
35
/// Handle to a store backed by a directory of the local filesystem.
///
/// The root path lives behind an `Arc`, so cloning the handle only bumps a
/// reference count and all clones share the same root.
#[derive(Debug, Clone)]
pub struct LocalStore(Arc<InnerLocalStore>);
39
40impl LocalStore {
41    /// Constructor of the localstore
42    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
43        Self::from(path.into())
44    }
45}
46
47impl From<PathBuf> for LocalStore {
48    /// Converts a `PathBuf` into a `LocalStore`.
49    ///
50    /// Takes the root path of the local store and wraps it in an `Arc`.
51    fn from(value: PathBuf) -> Self {
52        Self(Arc::new(InnerLocalStore { root: value }))
53    }
54}
55
56impl Store for LocalStore {
57    type Directory = LocalStoreDirectory;
58    type File = LocalStoreFile;
59
60    /// Retrieves a directory at the specified path in the local store.
61    ///
62    /// Merges the root path with the given path to obtain the full directory
63    /// path.
64    async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
65        let path = path.into();
66        crate::util::merge_path(&self.0.root, &path).map(|path| LocalStoreDirectory { path })
67    }
68
69    /// Retrieves a file at the specified path in the local store.
70    ///
71    /// Merges the root path with the given path to obtain the full file path.
72    async fn get_file<P: Into<PathBuf>>(&self, path: P) -> Result<Self::File> {
73        let path = path.into();
74        crate::util::merge_path(&self.0.root, &path).map(|path| LocalStoreFile { path })
75    }
76}
77
/// Entry yielded when listing a local directory: either a
/// [`LocalStoreFile`] or a [`LocalStoreDirectory`].
pub type LocalStoreEntry = Entry<LocalStoreFile, LocalStoreDirectory>;
81
82impl LocalStoreEntry {
83    /// Creates a new `LocalStoreEntry` from a `tokio::fs::DirEntry`.
84    ///
85    /// The entry is classified as either a file or directory based on its path.
86    pub fn new(entry: tokio::fs::DirEntry) -> Result<Self> {
87        let path = entry.path();
88        if path.is_dir() {
89            Ok(Self::Directory(LocalStoreDirectory { path }))
90        } else if path.is_file() {
91            Ok(Self::File(LocalStoreFile { path }))
92        } else {
93            Err(Error::new(
94                ErrorKind::Unsupported,
95                "expected a file or a directory",
96            ))
97        }
98    }
99}
100
/// Handle to a directory of the local store.
///
/// Holding a handle does not imply the directory exists on disk.
#[derive(Debug)]
pub struct LocalStoreDirectory {
    /// Filesystem path of the directory, as produced by the store or by a
    /// directory listing.
    path: PathBuf,
}
106
107impl StoreDirectory for LocalStoreDirectory {
108    type Entry = LocalStoreEntry;
109    type Reader = LocalStoreDirectoryReader;
110
111    /// Checks if the directory exists.
112    ///
113    /// Returns a future that resolves to `true` if the directory exists,
114    /// otherwise `false`.
115    async fn exists(&self) -> Result<bool> {
116        tokio::fs::try_exists(&self.path).await
117    }
118
119    /// Reads the contents of the directory.
120    ///
121    /// Returns a future that resolves to a reader for iterating over the
122    /// directory's entries.
123    async fn read(&self) -> Result<Self::Reader> {
124        tokio::fs::read_dir(&self.path)
125            .await
126            .map(|value| LocalStoreDirectoryReader {
127                inner: Box::pin(value),
128            })
129    }
130}
131
/// Stream of the entries contained in a local store directory.
///
/// Created by `LocalStoreDirectory::read`.
#[derive(Debug)]
pub struct LocalStoreDirectoryReader {
    /// Underlying tokio directory iterator, boxed and pinned.
    inner: Pin<Box<tokio::fs::ReadDir>>,
}
137
138impl Stream for LocalStoreDirectoryReader {
139    type Item = Result<LocalStoreEntry>;
140
141    /// Polls for the next directory entry.
142    ///
143    /// This function is used to asynchronously retrieve the next entry in the
144    /// directory.
145    fn poll_next(
146        self: Pin<&mut Self>,
147        cx: &mut std::task::Context<'_>,
148    ) -> Poll<Option<Self::Item>> {
149        let mut inner = self.get_mut().inner.as_mut();
150
151        match inner.poll_next_entry(cx) {
152            Poll::Ready(Ok(Some(entry))) => Poll::Ready(Some(LocalStoreEntry::new(entry))),
153            Poll::Ready(Ok(None)) => Poll::Ready(None),
154            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
155            Poll::Pending => Poll::Pending,
156        }
157    }
158}
159
// Marker implementation: no methods are provided here, the `Stream`
// implementation carries the behaviour.
impl crate::StoreDirectoryReader<LocalStoreEntry> for LocalStoreDirectoryReader {}
161
/// Handle to a file of the local store.
///
/// Holding a handle does not imply the file exists on disk.
#[derive(Debug)]
pub struct LocalStoreFile {
    /// Filesystem path of the file, as produced by the store or by a
    /// directory listing.
    path: PathBuf,
}
167
168impl StoreFile for LocalStoreFile {
169    type FileReader = LocalStoreFileReader;
170    type FileWriter = LocalStoreFileWriter;
171    type Metadata = LocalStoreFileMetadata;
172
173    /// Retrieves the file name from the path.
174    ///
175    /// This function extracts the file name by iterating over the components of
176    /// the path in reverse order.
177    fn filename(&self) -> Option<Cow<'_, str>> {
178        self.path
179            .components()
180            .rev()
181            .filter_map(|item| match item {
182                Component::Normal(inner) => Some(inner),
183                _ => None,
184            })
185            .next()
186            .map(|value| value.to_string_lossy())
187    }
188
189    /// Checks if the file exists.
190    ///
191    /// Returns a future that resolves to `true` if the file exists, otherwise
192    /// `false`.
193    async fn exists(&self) -> Result<bool> {
194        tokio::fs::try_exists(&self.path).await
195    }
196
197    /// Retrieves the metadata of the file.
198    ///
199    /// Returns a future that resolves to the file's metadata, such as size and
200    /// timestamps.
201    async fn metadata(&self) -> Result<Self::Metadata> {
202        let meta = tokio::fs::metadata(&self.path).await?;
203        let size = meta.size();
204        let created = meta
205            .created()
206            .ok()
207            .and_then(|v| v.duration_since(SystemTime::UNIX_EPOCH).ok())
208            .map(|d| d.as_secs())
209            .unwrap_or(0);
210        let modified = meta
211            .modified()
212            .ok()
213            .and_then(|v| v.duration_since(SystemTime::UNIX_EPOCH).ok())
214            .map(|d| d.as_secs())
215            .unwrap_or(0);
216        Ok(LocalStoreFileMetadata {
217            size,
218            created,
219            modified,
220        })
221    }
222
223    /// Reads a portion of the file's content, specified by a byte range.
224    ///
225    /// Returns a future that resolves to a reader that can read the specified
226    /// range of the file.
227    async fn read<R: RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
228        use tokio::io::AsyncSeekExt;
229
230        let start = match range.start_bound() {
231            Bound::Included(&n) => n,
232            Bound::Excluded(&n) => n + 1,
233            Bound::Unbounded => 0,
234        };
235
236        let end = match range.end_bound() {
237            Bound::Included(&n) => Some(n + 1),
238            Bound::Excluded(&n) => Some(n),
239            Bound::Unbounded => None, // no limit
240        };
241
242        let mut file = tokio::fs::OpenOptions::new()
243            .read(true)
244            .open(&self.path)
245            .await?;
246        file.seek(std::io::SeekFrom::Start(start)).await?;
247        Ok(LocalStoreFileReader {
248            file,
249            start,
250            end,
251            position: start,
252        })
253    }
254
255    async fn write(&self, options: crate::WriteOptions) -> Result<Self::FileWriter> {
256        let mut file = tokio::fs::OpenOptions::new()
257            .append(matches!(options.mode, WriteMode::Append))
258            .truncate(matches!(options.mode, WriteMode::Truncate { .. }))
259            .write(true)
260            .create(true)
261            .open(&self.path)
262            .await?;
263        match options.mode {
264            WriteMode::Truncate { offset } if offset > 0 => {
265                file.seek(SeekFrom::Start(offset)).await?;
266            }
267            _ => {}
268        };
269        Ok(LocalStoreFileWriter(file))
270    }
271}
272
/// Metadata associated with a file in the local store.
///
/// Timestamps are whole seconds since the Unix epoch; `0` stands for a
/// timestamp that could not be determined.
#[derive(Clone, Debug)]
pub struct LocalStoreFileMetadata {
    /// Size of the file in bytes.
    size: u64,
    /// Creation time, in seconds since the Unix epoch.
    created: u64,
    /// Last modification time, in seconds since the Unix epoch.
    modified: u64,
}
281
impl super::StoreMetadata for LocalStoreFileMetadata {
    /// Returns the size of the file in bytes.
    fn size(&self) -> u64 {
        self.size
    }

    /// Returns the creation timestamp of the file, in seconds since the Unix
    /// epoch (`0` when it could not be determined).
    fn created(&self) -> u64 {
        self.created
    }

    /// Returns the last modification timestamp of the file, in seconds since
    /// the Unix epoch (`0` when it could not be determined).
    fn modified(&self) -> u64 {
        self.modified
    }
}
298
/// Reader for asynchronously reading a byte range of a file in the local
/// store.
#[derive(Debug)]
pub struct LocalStoreFileReader {
    /// Open file, already positioned at `start` when the reader is created.
    file: tokio::fs::File,
    // Never read after construction; kept alongside `end` to record the
    // requested range, hence the lint exemption.
    #[allow(unused)]
    start: u64,
    /// Exclusive end offset of the range; `None` reads up to the end of the
    /// file.
    end: Option<u64>,
    /// Absolute offset of the next byte to be returned.
    position: u64,
}
308
309impl tokio::io::AsyncRead for LocalStoreFileReader {
310    /// Polls for reading data from the file.
311    ///
312    /// This function reads data into the provided buffer, handling partial
313    /// reads within the given range.
314    fn poll_read(
315        mut self: std::pin::Pin<&mut Self>,
316        cx: &mut std::task::Context<'_>,
317        buf: &mut tokio::io::ReadBuf<'_>,
318    ) -> Poll<std::io::Result<()>> {
319        let remaining = match self.end {
320            Some(end) => end.saturating_sub(self.position) as usize,
321            None => buf.remaining(),
322        };
323
324        if remaining == 0 {
325            return std::task::Poll::Ready(Ok(()));
326        }
327
328        // Limit the read buffer to the remaining range
329        let read_len = std::cmp::min(remaining, buf.remaining()) as usize;
330        let mut temp_buf = vec![0u8; read_len];
331        let mut temp_read_buf = tokio::io::ReadBuf::new(&mut temp_buf);
332
333        let this = self.as_mut().get_mut();
334        let pinned_file = Pin::new(&mut this.file);
335
336        match pinned_file.poll_read(cx, &mut temp_read_buf) {
337            Poll::Ready(Ok(())) => {
338                let bytes_read = temp_read_buf.filled().len();
339                buf.put_slice(temp_read_buf.filled());
340                this.position += bytes_read as u64;
341                Poll::Ready(Ok(()))
342            }
343            other => other,
344        }
345    }
346}
347
// Marker implementation: no methods are provided here, the `AsyncRead`
// implementation carries the behaviour.
impl StoreFileReader for LocalStoreFileReader {}
349
/// Writer for a file in the local store.
///
/// Thin wrapper around the `tokio::fs::File` opened by
/// `LocalStoreFile::write`.
#[derive(Debug)]
pub struct LocalStoreFileWriter(tokio::fs::File);
352
353impl tokio::io::AsyncWrite for LocalStoreFileWriter {
354    fn poll_write(
355        mut self: Pin<&mut Self>,
356        cx: &mut std::task::Context<'_>,
357        buf: &[u8],
358    ) -> Poll<Result<usize>> {
359        // Pinning the inner file (unwrap since it is wrapped in Pin)
360        let file = &mut self.as_mut().0;
361
362        // Use tokio::io::AsyncWriteExt::write to write to the file
363        Pin::new(file).poll_write(cx, buf)
364    }
365
366    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
367        let file = &mut self.as_mut().0;
368        Pin::new(file).poll_flush(cx)
369    }
370
371    fn poll_shutdown(
372        mut self: Pin<&mut Self>,
373        cx: &mut std::task::Context<'_>,
374    ) -> Poll<Result<()>> {
375        let file = &mut self.as_mut().0;
376        Pin::new(file).poll_shutdown(cx)
377    }
378}
379
// Marker implementation: no methods are provided here, the `AsyncWrite`
// implementation carries the behaviour.
impl crate::StoreFileWriter for LocalStoreFileWriter {}
381
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tokio::io::AsyncReadExt;

    use super::*;
    use crate::Store;

    /// Builds a store rooted at this crate's directory.
    ///
    /// `CARGO_MANIFEST_DIR` always points at the crate root, whereas `PWD`
    /// captures whatever directory the shell was in at compile time (for
    /// instance a workspace root), which made `src/lib.rs` unreachable.
    fn crate_store() -> LocalStore {
        LocalStore::from(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
    }

    #[tokio::test]
    async fn should_not_go_in_parent_folder() {
        let store = crate_store();

        // `..` is accepted as long as the result stays inside the root.
        let _ = store.get_file("anywhere/../hello.txt").await.unwrap();

        let err = store.get_file("../hello.txt").await.unwrap_err();
        assert_eq!(err.to_string(), "No such file or directory");
    }

    #[tokio::test]
    async fn should_find_existing_files() {
        let store = crate_store();

        let lib = store.get_file("/src/lib.rs").await.unwrap();
        assert!(lib.exists().await.unwrap());

        let lib = store.get_file("src/lib.rs").await.unwrap();
        assert!(lib.exists().await.unwrap());

        let lib = store.get_file("nothing/../src/lib.rs").await.unwrap();
        assert!(lib.exists().await.unwrap());

        let missing = store.get_file("nothing.rs").await.unwrap();
        assert!(!missing.exists().await.unwrap());
    }

    #[tokio::test]
    async fn should_read_lib_file() {
        let store = crate_store();

        let lib = store.get_file("/src/lib.rs").await.unwrap();
        let mut reader = lib.read(0..10).await.unwrap();
        let mut buffer = vec![];
        reader.read_to_end(&mut buffer).await.unwrap();

        // Resolved relative to this source file, i.e. `src/lib.rs`.
        let content = include_bytes!("./lib.rs");
        assert_eq!(buffer, content[0..10]);
    }

    #[tokio::test]
    async fn should_read_lib_metadata() {
        let store = crate_store();

        let lib = store.get_file("/src/lib.rs").await.unwrap();
        let meta = lib.metadata().await.unwrap();

        assert!(meta.size > 0);
        // NOTE(review): `created` falls back to 0 where the platform cannot
        // report a creation time, so this assertion is platform-dependent.
        assert!(meta.created > 0);
        assert!(meta.modified > 0);
    }
}
444        assert!(meta.modified > 0);
445    }
446}