remi_fs/
service.rs

1// ๐Ÿปโ€โ„๏ธ๐Ÿงถ remi-rs: Asynchronous Rust crate to handle communication between applications and object storage providers
2// Copyright (c) 2022-2025 Noelware, LLC. <team@noelware.org>
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in all
12// copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20// SOFTWARE.
21
22use crate::{default_resolver, ContentTypeResolver, StorageConfig};
23use remi::{async_trait, Blob, Bytes, Directory, File, ListBlobsRequest, StorageService as _, UploadRequest};
24use std::{
25    borrow::Cow,
26    io,
27    path::{Path, PathBuf},
28    sync::Arc,
29    time::SystemTime,
30};
31use tokio::{fs, io::*};
32
33#[cfg(feature = "tracing")]
34use tracing::instrument;
35
36/// Represents an implementation of a [`StorageService`](remi::StorageService) for the
37/// local filesystem.
38#[derive(Clone)]
39pub struct StorageService {
40    resolver: Arc<dyn ContentTypeResolver>,
41    config: StorageConfig,
42}
43
44impl StorageService {
45    /// Creates a new [`StorageService`] instance.
46    pub fn new<P: AsRef<Path>>(path: P) -> StorageService {
47        Self::with_config(StorageConfig::new(path))
48    }
49
50    /// Creates a new [`StorageService`] instance with a provided configuration object.
51    pub fn with_config(config: StorageConfig) -> StorageService {
52        StorageService {
53            resolver: Arc::new(default_resolver),
54            config,
55        }
56    }
57
58    /// Updates the given [`ContentTypeResolver`] to something else.
59    pub fn with_resolver<R: ContentTypeResolver + 'static>(mut self, resolver: R) -> StorageService {
60        self.resolver = Arc::new(resolver);
61        self
62    }
63
64    /// Attempts to normalize a given path and returns a canonical, absolute
65    /// path. It must follow some strict rules:
66    ///
67    /// * If the path starts with `./`, then it will resolve from [`StorageConfig::directory`] if
68    ///   the directory was found. Otherwise, it'll use the current directory.
69    ///
70    /// * If the path starts with `~/`, then it will resolve from the home directory from [`etcetera::home_dir`].
71    #[cfg_attr(
72        feature = "tracing",
73        instrument(
74            name = "remi.filesystem.normalize",
75            skip_all,
76            fields(remi.service = "fs", path = %path.as_ref().display())
77        )
78    )]
79    pub fn normalize<P: AsRef<Path>>(&self, path: P) -> io::Result<Option<PathBuf>> {
80        let path = path.as_ref();
81
82        #[cfg(feature = "tracing")]
83        tracing::trace!("resolving path");
84
85        #[cfg(feature = "log")]
86        log::trace!("resolving path: {}", path.display());
87
88        if path == self.config.directory {
89            return std::fs::canonicalize(&self.config.directory).map(|x| Ok(Some(x)))?;
90        }
91
92        if path.starts_with("./") {
93            let Some(directory) = self.normalize(&self.config.directory)? else {
94                #[cfg(feature = "tracing")]
95                tracing::warn!(
96                    directory = %self.config.directory.display(),
97                    "unable to resolve directory from config"
98                );
99
100                #[cfg(feature = "log")]
101                log::warn!("unable to resolve given directory from config");
102
103                return Ok(None);
104            };
105
106            let normalized = format!("{}/{}", directory.display(), path.strip_prefix("./").unwrap().display());
107
108            #[cfg(feature = "tracing")]
109            tracing::trace!(%normalized, "resolved path to");
110
111            #[cfg(feature = "log")]
112            log::trace!("resolved path {} ~> {normalized}", path.display());
113
114            return Ok(Some(Path::new(&normalized).to_path_buf()));
115        }
116
117        if path.starts_with("~/") {
118            let homedir = etcetera::home_dir()
119                .inspect_err(|e| {
120                    #[cfg(feature = "tracing")]
121                    tracing::error!(error = %e, "failed to get home directory");
122
123                    #[cfg(feature = "log")]
124                    log::error!("failed to get home directory: {e}");
125
126                    let _ = e;
127                })
128                .map_err(|_| <std::io::ErrorKind as Into<std::io::Error>>::into(io::ErrorKind::InvalidData))?;
129
130            let normalized = format!("{}/{}", homedir.display(), path.strip_prefix("~/").unwrap().display());
131
132            #[cfg(feature = "tracing")]
133            tracing::trace!(%normalized, "resolved path to");
134
135            #[cfg(feature = "log")]
136            log::trace!("resolved path {} ~> {normalized}", path.display());
137
138            return Ok(Some(Path::new(&normalized).to_path_buf()));
139        }
140
141        Ok(Some(path.to_path_buf()))
142    }
143
144    async fn create_file(&self, path: &Path) -> io::Result<File> {
145        let metadata = path.metadata();
146        let is_symlink = metadata.as_ref().map(|m| m.is_symlink()).unwrap_or(false);
147        let size = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
148        let last_modified_at = match metadata {
149            Ok(ref m) => Some(
150                m.modified()?
151                    .duration_since(SystemTime::UNIX_EPOCH)
152                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
153                    .as_millis(),
154            ),
155
156            Err(_) => None,
157        };
158
159        let created_at = match metadata {
160            Ok(ref m) => Some(
161                m.created()?
162                    .duration_since(SystemTime::UNIX_EPOCH)
163                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
164                    .as_millis(),
165            ),
166
167            Err(_) => None,
168        };
169
170        let bytes = self.open(path).await?.map_or(Bytes::new(), |x| x);
171        let content_type = self.resolver.resolve(bytes.as_ref());
172
173        Ok(File {
174            last_modified_at,
175            content_type: Some(content_type.to_string()),
176            metadata: Default::default(),
177            created_at,
178            is_symlink,
179            data: bytes,
180            name: path.file_name().unwrap().to_string_lossy().into_owned(),
181            path: format!("fs://{}", path.display()),
182            size: size as usize,
183        })
184    }
185
186    async fn create_file_from_entry(&self, path: &Path, entry: fs::DirEntry) -> io::Result<File> {
187        let metadata = entry.metadata().await;
188        let is_symlink = metadata.as_ref().map(|m| m.is_symlink()).unwrap_or(false);
189        let size = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
190        let last_modified_at = match metadata {
191            Ok(ref m) => Some(
192                m.modified()?
193                    .duration_since(SystemTime::UNIX_EPOCH)
194                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
195                    .as_millis(),
196            ),
197
198            Err(_) => None,
199        };
200
201        let created_at = match metadata {
202            Ok(ref m) => Some(
203                m.created()?
204                    .duration_since(SystemTime::UNIX_EPOCH)
205                    .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
206                    .as_millis(),
207            ),
208
209            Err(_) => None,
210        };
211
212        let bytes = self.open(path).await?.map_or(Bytes::new(), |x| x);
213        let content_type = self.resolver.resolve(bytes.as_ref());
214
215        Ok(File {
216            last_modified_at,
217            content_type: Some(content_type.to_string()),
218            metadata: Default::default(),
219            created_at,
220            is_symlink,
221            data: bytes,
222            name: entry.file_name().to_string_lossy().into_owned(),
223            path: format!("fs://{}", path.display()),
224            size: size as usize,
225        })
226    }
227}
228
229#[async_trait]
230impl remi::StorageService for StorageService {
231    type Error = io::Error;
232
233    fn name(&self) -> Cow<'static, str> {
234        Cow::Borrowed("remi:gridfs")
235    }
236
237    #[cfg_attr(
238        feature = "tracing",
239        instrument(
240            name = "remi.filesystem.open",
241            skip_all,
242            fields(
243                remi.service = "fs",
244                directory = %self.config.directory.display()
245            )
246        )
247    )]
248    async fn init(&self) -> io::Result<()> {
249        if !self.config.directory.try_exists()? {
250            #[cfg(feature = "tracing")]
251            tracing::info!("creating directory since it doesn't exist");
252
253            #[cfg(feature = "log")]
254            log::info!(
255                "creating directory [{}] since it doesn't exist",
256                self.config.directory.display(),
257            );
258
259            fs::create_dir_all(&self.config.directory).await?;
260        }
261
262        if !self.config.directory.is_dir() {
263            #[cfg(not(no_io_errorkind))]
264            return Err(Error::new(
265                io::ErrorKind::NotADirectory,
266                format!("path [{}] is a file, not a directory", self.config.directory.display()),
267            ));
268
269            #[cfg(no_io_errorkind)]
270            return Err(Error::new(
271                io::ErrorKind::InvalidData,
272                format!("path [{}] is a file, not a directory", self.config.directory.display()),
273            ));
274        }
275
276        Ok(())
277    }
278
279    #[cfg_attr(
280        feature = "tracing",
281        instrument(
282            name = "remi.filesystem.open",
283            skip_all,
284            fields(
285                remi.service = "fs",
286                path = %path.as_ref().display()
287            )
288        )
289    )]
290    async fn open<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<Option<Bytes>> {
291        let path = path.as_ref();
292        let Some(path) = self.normalize(path)? else {
293            #[cfg(feature = "tracing")]
294            tracing::warn!("path given couldn't be normalized");
295
296            #[cfg(feature = "log")]
297            log::warn!("path given [{}] was a file, not a directory", path.display());
298
299            return Ok(None);
300        };
301
302        if !path.try_exists()? {
303            #[cfg(feature = "tracing")]
304            tracing::warn!("path doesn't exist");
305
306            #[cfg(feature = "log")]
307            log::warn!("path [{}] doesn't exist", path.display());
308
309            return Ok(None);
310        }
311
312        if path.is_dir() {
313            #[cfg(not(no_io_errorkind))]
314            return Err(Error::new(
315                io::ErrorKind::NotADirectory,
316                format!("path [{}] is a file, not a directory", self.config.directory.display()),
317            ));
318
319            #[cfg(no_io_errorkind)]
320            return Err(Error::new(
321                io::ErrorKind::InvalidData,
322                format!("path [{}] is a file, not a directory", self.config.directory.display()),
323            ));
324        }
325
326        #[cfg(feature = "tracing")]
327        tracing::trace!("attempting to open file");
328
329        #[cfg(feature = "log")]
330        log::trace!("attempting to open file [{}]", path.display());
331
332        let mut file = fs::OpenOptions::new()
333            .create(false)
334            .write(false)
335            .read(true)
336            .open(&path)
337            .await?;
338
339        let metadata = file.metadata().await?;
340        let size = metadata.len();
341        let mut buffer = vec![0; size as usize];
342
343        buffer.resize(size as usize, 0);
344        file.read_exact(&mut buffer).await?;
345
346        Ok(Some(Bytes::from(buffer)))
347    }
348
349    #[cfg_attr(
350        feature = "tracing",
351        instrument(
352            name = "remi.filesystem.blob",
353            skip_all,
354            fields(
355                remi.service = "fs",
356                path = %path.as_ref().display()
357            )
358        )
359    )]
360    async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<Option<Blob>> {
361        let path = path.as_ref();
362        let Some(path) = self.normalize(path)? else {
363            #[cfg(feature = "tracing")]
364            tracing::warn!("path given couldn't be normalized");
365
366            #[cfg(feature = "log")]
367            log::warn!("path given [{}] couldn't be normalized", path.display());
368
369            return Ok(None);
370        };
371
372        if path.is_dir() {
373            let metadata = path.metadata()?;
374            let created_at = match metadata.created() {
375                Ok(sys) => Some(
376                    sys.duration_since(SystemTime::UNIX_EPOCH)
377                        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "clock went backwards?!"))?
378                        .as_millis(),
379                ),
380
381                Err(_) => None,
382            };
383
384            let name = path
385                .file_name()
386                .map(|s| s.to_string_lossy())
387                .unwrap_or(Cow::Borrowed("<root or relative path>"))
388                .to_string();
389
390            return Ok(Some(Blob::Directory(Directory {
391                created_at,
392                name,
393                path: format!("fs://{}", path.display()),
394            })));
395        }
396
397        Ok(Some(Blob::File(self.create_file(&path).await?)))
398    }
399
400    #[cfg_attr(
401        feature = "tracing",
402        instrument(
403            name = "remi.filesystem.blobs",
404            skip_all,
405            fields(
406                remi.service = "fs",
407                path = ?path.as_ref().map(|path| path.as_ref().display())
408            )
409        )
410    )]
411    async fn blobs<P: AsRef<Path> + Send>(
412        &self,
413        path: Option<P>,
414        options: Option<ListBlobsRequest>,
415    ) -> io::Result<Vec<Blob>> {
416        let options = options.unwrap_or_default();
417        let prefix = options.prefix.clone().unwrap_or_default();
418        let path = match path {
419            Some(ref p) => p.as_ref(),
420            None => &self.config.directory,
421        };
422
423        let Some(path) = self.normalize(path)? else {
424            #[cfg(feature = "tracing")]
425            tracing::warn!("path given couldn't be normalized");
426
427            #[cfg(feature = "log")]
428            log::warn!("path given [{}] was a file, not a directory", path.display());
429
430            return Ok(vec![]);
431        };
432
433        if path.is_file() {
434            #[cfg(feature = "tracing")]
435            tracing::warn!("path given was a file, not a directory");
436
437            #[cfg(feature = "log")]
438            log::warn!("path given [{}] was a file, not a directory", path.display());
439
440            return Ok(vec![]);
441        }
442
443        let search = format!("{}{prefix}", path.display());
444        #[cfg(feature = "tracing")]
445        tracing::trace!(%search, "attempting to search all blobs in given path");
446
447        #[cfg(feature = "log")]
448        log::trace!(
449            "attempting to search in [{search}] for all blobs in given path [{}]",
450            path.display()
451        );
452
453        let mut files = fs::read_dir(search).await?;
454        let mut blobs = vec![];
455
456        while let Some(entry) = files.next_entry().await? {
457            if entry.path().is_dir() && options.include_dirs {
458                blobs.push(Blob::Directory(Directory {
459                    created_at: match entry.metadata().await {
460                        Ok(sys) => Some(
461                            sys.created()?
462                                .duration_since(SystemTime::UNIX_EPOCH)
463                                .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
464                                .as_millis(),
465                        ),
466
467                        Err(_) => None,
468                    },
469
470                    name: path
471                        .file_name()
472                        .map(|s| s.to_string_lossy())
473                        .unwrap_or(Cow::Borrowed("<root or relative path>"))
474                        .to_string(),
475
476                    path: format!("fs://{}", entry.path().display()),
477                }));
478
479                continue;
480            }
481
482            let path = entry.path();
483            let ext_allowed = match path.extension() {
484                Some(s) => options.is_ext_allowed(s.to_str().expect("valid utf-8 in path extension")),
485                None => true,
486            };
487
488            if !ext_allowed {
489                continue;
490            }
491
492            blobs.push(Blob::File(self.create_file_from_entry(&path, entry).await?));
493        }
494
495        Ok(blobs)
496    }
497
498    #[cfg_attr(
499        feature = "tracing",
500        instrument(
501            name = "remi.filesystem.delete",
502            skip_all,
503            fields(
504                remi.service = "fs",
505                path = %path.as_ref().display()
506            )
507        )
508    )]
509    async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<()> {
510        let path = path.as_ref();
511        let Some(path) = self.normalize(path)? else {
512            return Err(io::Error::new(
513                io::ErrorKind::InvalidInput,
514                "unable to normalize given path",
515            ));
516        };
517
518        if path.is_dir() {
519            #[cfg(feature = "tracing")]
520            tracing::trace!("deleting directory");
521
522            #[cfg(feature = "log")]
523            log::trace!("deleting directory [{}]", path.display());
524
525            fs::remove_dir(path).await?;
526            return Ok(());
527        }
528
529        #[cfg(feature = "tracing")]
530        tracing::trace!("deleting file");
531
532        #[cfg(feature = "log")]
533        log::trace!("deleting file [{}]...", path.display());
534
535        fs::remove_file(path).await
536    }
537
538    #[cfg_attr(
539        feature = "tracing",
540        instrument(
541            name = "remi.filesystem.exists",
542            skip_all,
543            fields(
544                remi.service = "fs",
545                path = %path.as_ref().display()
546            )
547        )
548    )]
549    async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<bool> {
550        let path = path.as_ref();
551        let Some(path) = self.normalize(path)? else {
552            return Err(io::Error::new(
553                io::ErrorKind::InvalidInput,
554                "unable to normalize given path",
555            ));
556        };
557
558        path.try_exists()
559    }
560
561    #[cfg_attr(
562        feature = "tracing",
563        instrument(
564            name = "remi.filesystem.upload",
565            skip_all,
566            fields(
567                remi.service = "fs",
568                path = %path.as_ref().display()
569            )
570        )
571    )]
572    async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> io::Result<()> {
573        let path = path.as_ref();
574        let Some(path) = self.normalize(path)? else {
575            return Err(io::Error::new(
576                io::ErrorKind::InvalidInput,
577                "unable to normalize given path",
578            ));
579        };
580
581        if path.try_exists()? {
582            #[cfg(feature = "tracing")]
583            tracing::warn!("contents in given path will be overwritten");
584
585            #[cfg(feature = "log")]
586            log::trace!("contents in given path [{}] will be overwritten", path.display());
587        }
588
589        #[cfg(feature = "tracing")]
590        tracing::warn!("uploading file");
591
592        #[cfg(feature = "log")]
593        log::trace!("uploading file [{}]", path.display());
594
595        // ensure that the parent exists, if not, it'll attempt
596        // to create all paths in the given parent
597        if let Some(parent) = path.parent() {
598            fs::create_dir_all(parent).await?;
599        }
600
601        let mut file = fs::OpenOptions::new();
602        file.write(true);
603
604        if !path.try_exists()? {
605            // atomically create the file if it doesn't exist
606            file.create_new(true);
607        }
608
609        let mut file = file.open(path).await?;
610        file.write_all(options.data.as_ref()).await?;
611        file.flush().await?;
612
613        Ok(())
614    }
615
616    #[cfg(feature = "unstable")]
617    #[cfg_attr(any(noeldoc, docsrs), doc(cfg(feature = "unstable")))]
618    async fn healthcheck(&self) -> io::Result<()> {
619        Ok(())
620    }
621}
622
#[cfg(test)]
mod tests {
    use super::*;

    // Generates one `#[tokio::test]` per case so the setup isn't repeated: every case
    // gets a fresh temporary directory and an initialized `StorageService` bound to
    // the identifier given in parentheses.
    macro_rules! build_testcases {
        ($(
            $(#[$meta:meta])*
            $name:ident($storage:ident) $code:block
        )*) => {
            $(
                $(#[$meta])*
                #[::tokio::test]
                async fn $name() -> ::std::io::Result<()> {
                    let tempdir = ::tempfile::tempdir().expect("failed to create tempdir");
                    assert!(fs::try_exists(&tempdir).await.expect("tempdir to actually exist"));

                    let $storage = $crate::StorageService::new(&tempdir);
                    ($storage).init().await.expect("initialization part to be successful");

                    // Borrow the tempdir here: passing it by value would drop it (and
                    // delete the directory) before the test body below gets to run.
                    assert!(fs::try_exists(&tempdir).await.expect("should actually exist?!"));

                    let __ret: ::std::io::Result<()> = $code;

                    // the directory only has to outlive the test body
                    drop(tempdir);
                    __ret
                }
            )*
        };
    }

    build_testcases! {
        init(_storage) {
            Ok(())
        }

        // Disabled round-trip test (upload, exists, open), kept for reference.
        //
        // open(storage) {
        //     #[cfg(feature = "tracing")]
        //     use tracing_subscriber::prelude::*;

        //     #[cfg(feature = "tracing")]
        //     let _guard = tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).set_default();

        //     // 1. upload the contents and see if we can do so
        //     let contents: remi::Bytes = "{\"wuff\":true}".into();
        //     storage.upload("./wuff.json", UploadRequest::default()
        //         .with_data(contents.clone())
        //         .with_content_type(Some(default_resolver(contents.as_ref())))
        //     ).await.expect("unable to upload ./wuff.json");

        //     // 2. assert that it exists
        //     assert!(storage.exists("./wuff.json").await?);

        //     // 3. open the file and check if it is the same
        //     assert_eq!(contents, storage.open("./wuff.json").await?.unwrap());

        //     Ok(())
        // }
    }
}