loco_rs/storage/
mod.rs

1//! # Storage Module
2//!
3//! This module defines a generic storage abstraction represented by the
4//! [`Storage`] struct. It provides methods for performing common storage
5//! operations such as upload, download, delete, rename, and copy.
6//!
7//! ## Storage Strategy
8//!
9//! The [`Storage`] struct is designed to work with different storage
10//! strategies. A storage strategy defines the behavior of the storage
11//! operations. Strategies implement the [`strategies::StorageStrategy`].
12//! The selected strategy can be dynamically changed at runtime.
13mod contents;
14pub mod drivers;
15pub mod strategies;
16pub mod stream;
17use std::{
18    collections::BTreeMap,
19    path::{Path, PathBuf},
20};
21
22use bytes::Bytes;
23
24use self::{drivers::StoreDriver, stream::BytesStream};
25
26#[derive(thiserror::Error, Debug)]
27#[allow(clippy::module_name_repetitions)]
28pub enum StorageError {
29    #[error("store not found by the given key: {0}")]
30    StoreNotFound(String),
31
32    #[error(transparent)]
33    Store(#[from] Box<opendal::Error>),
34
35    #[error("Unable to read data from file {}", path.display().to_string())]
36    UnableToReadBytes { path: PathBuf },
37
38    #[error("secondaries errors")]
39    Multi(BTreeMap<String, String>),
40
41    #[error(transparent)]
42    Any(#[from] Box<dyn std::error::Error + Send + Sync>),
43}
44
45pub type StorageResult<T> = std::result::Result<T, StorageError>;
46
47impl From<opendal::Error> for StorageError {
48    fn from(val: opendal::Error) -> Self {
49        Self::Store(Box::new(val))
50    }
51}
52
53pub struct Storage {
54    pub stores: BTreeMap<String, Box<dyn StoreDriver>>,
55    pub strategy: Box<dyn strategies::StorageStrategy>,
56}
57
58impl Storage {
59    /// Creates a new storage instance with a single store and the default
60    /// strategy.
61    ///
62    /// # Examples
63    ///```
64    /// use loco_rs::storage;
65    ///
66    /// let storage = storage::Storage::single(storage::drivers::mem::new());
67    /// ```
68    #[must_use]
69    pub fn single(store: Box<dyn StoreDriver>) -> Self {
70        let default_key = "store";
71        Self {
72            strategy: Box::new(strategies::single::SingleStrategy::new(default_key)),
73            stores: BTreeMap::from([(default_key.to_string(), store)]),
74        }
75    }
76
77    /// Creates a new storage instance with the provided stores and strategy.
78    #[must_use]
79    pub fn new(
80        stores: BTreeMap<String, Box<dyn StoreDriver>>,
81        strategy: Box<dyn strategies::StorageStrategy>,
82    ) -> Self {
83        Self { stores, strategy }
84    }
85
86    /// Uploads content to the storage at the specified path.
87    ///
88    /// This method uses the selected strategy for the upload operation.
89    ///
90    /// # Examples
91    ///```
92    /// use loco_rs::storage;
93    /// use std::path::Path;
94    /// use bytes::Bytes;
95    /// pub async fn upload() {
96    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
97    ///     let path = Path::new("example.txt");
98    ///     let content = "Loco!";
99    ///     let result = storage.upload(path, &Bytes::from(content)).await;
100    ///     assert!(result.is_ok());
101    /// }
102    /// ```
103    ///
104    /// # Errors
105    ///
106    /// This method returns an error if the upload operation fails or if there
107    /// is an issue with the strategy configuration.
108    pub async fn upload(&self, path: &Path, content: &Bytes) -> StorageResult<()> {
109        self.upload_with_strategy(path, content, &*self.strategy)
110            .await
111    }
112
113    /// Uploads content to the storage at the specified path using a specific
114    /// strategy.
115    ///
116    /// This method allows specifying a custom strategy for the upload
117    /// operation.
118    ///
119    /// # Errors
120    ///
121    /// This method returns an error if the upload operation fails or if there
122    /// is an issue with the strategy configuration.
123    pub async fn upload_with_strategy(
124        &self,
125        path: &Path,
126        content: &Bytes,
127        strategy: &dyn strategies::StorageStrategy,
128    ) -> StorageResult<()> {
129        strategy.upload(self, path, content).await
130    }
131
132    /// Downloads content from the storage at the specified path.
133    ///
134    /// This method uses the selected strategy for the download operation.
135    ///
136    /// # Examples
137    ///```
138    /// use loco_rs::storage;
139    /// use std::path::Path;
140    /// use bytes::Bytes;
141    /// pub async fn download() {
142    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
143    ///     let path = Path::new("example.txt");
144    ///     let content = "Loco!";
145    ///     storage.upload(path, &Bytes::from(content)).await;
146    ///
147    ///     let result: String = storage.download(path).await.unwrap();
148    ///     assert_eq!(result, "Loco!");
149    /// }
150    /// ```
151    ///
152    /// # Errors
153    ///
154    /// This method returns an error if the download operation fails or if there
155    /// is an issue with the strategy configuration.
156    pub async fn download<T: TryFrom<contents::Contents>>(&self, path: &Path) -> StorageResult<T> {
157        self.download_with_policy(path, &*self.strategy).await
158    }
159
160    /// Downloads content from the storage at the specified path using a
161    /// specific strategy.
162    ///
163    /// This method allows specifying a custom strategy for the download
164    /// operation.
165    ///
166    /// # Errors
167    ///
168    /// This method returns an error if the download operation fails or if there
169    /// is an issue with the strategy configuration.
170    pub async fn download_with_policy<T: TryFrom<contents::Contents>>(
171        &self,
172        path: &Path,
173        strategy: &dyn strategies::StorageStrategy,
174    ) -> StorageResult<T> {
175        let res = strategy.download(self, path).await?;
176        contents::Contents::from(res).try_into().map_or_else(
177            |_| {
178                Err(StorageError::UnableToReadBytes {
179                    path: path.to_path_buf(),
180                })
181            },
182            |content| Ok(content),
183        )
184    }
185
186    /// Deletes content from the storage at the specified path.
187    ///
188    /// This method uses the selected strategy for the delete operation.
189    ///
190    /// # Examples
191    ///```
192    /// use loco_rs::storage;
193    /// use std::path::Path;
194    /// use bytes::Bytes;
195    /// pub async fn download() {
196    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
197    ///     let path = Path::new("example.txt");
198    ///     let content = "Loco!";
199    ///     storage.upload(path, &Bytes::from(content)).await;
200    ///
201    ///     let result = storage.delete(path).await;
202    ///     assert!(result.is_ok());
203    /// }
204    /// ```
205    ///
206    /// # Errors
207    ///
208    /// This method returns an error if the delete operation fails or if there
209    /// is an issue with the strategy configuration.
210    pub async fn delete(&self, path: &Path) -> StorageResult<()> {
211        self.delete_with_policy(path, &*self.strategy).await
212    }
213
214    /// Deletes content from the storage at the specified path using a specific
215    /// strategy.
216    ///
217    /// This method allows specifying a custom strategy for the delete
218    /// operation.
219    ///
220    /// # Errors
221    ///
222    /// This method returns an error if the delete operation fails or if there
223    /// is an issue with the strategy configuration.    
224    pub async fn delete_with_policy(
225        &self,
226        path: &Path,
227        strategy: &dyn strategies::StorageStrategy,
228    ) -> StorageResult<()> {
229        strategy.delete(self, path).await
230    }
231
232    /// Renames content from one path to another in the storage.
233    ///
234    /// This method uses the selected strategy for the rename operation.
235    ///
236    /// # Examples
237    ///```
238    /// use loco_rs::storage;
239    /// use std::path::Path;
240    /// use bytes::Bytes;
241    /// pub async fn download() {
242    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
243    ///     let path = Path::new("example.txt");
244    ///     let content = "Loco!";
245    ///     storage.upload(path, &Bytes::from(content)).await;
246    ///     
247    ///     let new_path = Path::new("new_path.txt");
248    ///     let store = storage.as_store("default").unwrap();
249    ///     assert!(storage.rename(&path, &new_path).await.is_ok());
250    ///     assert!(!store.exists(&path).await.unwrap());
251    ///     assert!(store.exists(&new_path).await.unwrap());
252    /// }
253    /// ```
254    ///
255    /// # Errors
256    ///
257    /// This method returns an error if the rename operation fails or if there
258    /// is an issue with the strategy configuration.
259    pub async fn rename(&self, from: &Path, to: &Path) -> StorageResult<()> {
260        self.rename_with_policy(from, to, &*self.strategy).await
261    }
262
263    /// Renames content from one path to another in the storage using a specific
264    /// strategy.
265    ///
266    /// This method allows specifying a custom strategy for the rename
267    /// operation.
268    ///
269    /// # Errors
270    ///
271    /// This method returns an error if the rename operation fails or if there
272    /// is an issue with the strategy configuration.
273    pub async fn rename_with_policy(
274        &self,
275        from: &Path,
276        to: &Path,
277        strategy: &dyn strategies::StorageStrategy,
278    ) -> StorageResult<()> {
279        strategy.rename(self, from, to).await
280    }
281
282    /// Copies content from one path to another in the storage.
283    ///
284    /// This method uses the selected strategy for the copy operation.
285    ///
286    /// # Examples
287    ///```
288    /// use loco_rs::storage;
289    /// use std::path::Path;
290    /// use bytes::Bytes;
291    /// pub async fn download() {
292    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
293    ///     let path = Path::new("example.txt");
294    ///     let content = "Loco!";
295    ///     storage.upload(path, &Bytes::from(content)).await;
296    ///     
297    ///     let new_path = Path::new("new_path.txt");
298    ///     let store = storage.as_store("default").unwrap();
299    ///     assert!(storage.copy(&path, &new_path).await.is_ok());
300    ///     assert!(store.exists(&path).await.unwrap());
301    ///     assert!(store.exists(&new_path).await.unwrap());
302    /// }
303    /// ```
304    ///
305    /// # Errors
306    ///
307    /// This method returns an error if the copy operation fails or if there is
308    /// an issue with the strategy configuration.
309    pub async fn copy(&self, from: &Path, to: &Path) -> StorageResult<()> {
310        self.copy_with_policy(from, to, &*self.strategy).await
311    }
312
313    /// Copies content from one path to another in the storage using a specific
314    /// strategy.
315    ///
316    /// This method allows specifying a custom strategy for the copy operation.
317    ///
318    /// # Errors
319    ///
320    /// This method returns an error if the copy operation fails or if there is
321    /// an issue with the strategy configuration.
322    pub async fn copy_with_policy(
323        &self,
324        from: &Path,
325        to: &Path,
326        strategy: &dyn strategies::StorageStrategy,
327    ) -> StorageResult<()> {
328        strategy.copy(self, from, to).await
329    }
330
331    /// Returns a reference to the store with the specified name if exists.
332    ///
333    /// # Examples
334    ///```
335    /// use loco_rs::storage;
336    /// use std::path::Path;
337    /// use bytes::Bytes;
338    /// pub async fn download() {
339    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
340    ///     assert!(storage.as_store("default").is_some());
341    ///     assert!(storage.as_store("store_2").is_none());
342    /// }
343    /// ```
344    ///
345    /// # Returns
346    /// Return None if the given name not found.
347    #[must_use]
348    pub fn as_store(&self, name: &str) -> Option<&dyn StoreDriver> {
349        self.stores.get(name).map(|s| &**s)
350    }
351
352    /// Returns a reference to the store with the specified name.
353    ///
354    /// # Examples
355    ///```
356    /// use loco_rs::storage;
357    /// use std::path::Path;
358    /// use bytes::Bytes;
359    /// pub async fn download() {
360    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
361    ///     assert!(storage.as_store_err("default").is_ok());
362    ///     assert!(storage.as_store_err("store_2").is_err());
363    /// }
364    /// ```
365    ///
366    /// # Errors
367    ///
368    /// Return an error if the given store name not exists
369    // REVIEW(nd): not sure bout the name 'as_store_err' -- it returns result
370    pub fn as_store_err(&self, name: &str) -> StorageResult<&dyn StoreDriver> {
371        self.as_store(name)
372            .ok_or(StorageError::StoreNotFound(name.to_string()))
373    }
374
375    /// Downloads content from storage as a stream, enabling efficient
376    /// handling of large files without loading them entirely into memory.
377    ///
378    /// This method uses the selected strategy for the download operation.
379    ///
380    /// # Examples
381    ///```
382    /// use loco_rs::storage;
383    /// use std::path::Path;
384    /// pub async fn stream_download() {
385    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
386    ///     let path = Path::new("large_file.mp4");
387    ///     
388    ///     let stream = storage.download_stream(path).await.unwrap();
389    ///     // Stream can be converted to axum Body for HTTP response
390    ///     // let body = stream.into_body();
391    /// }
392    /// ```
393    ///
394    /// # Errors
395    ///
396    /// This method returns an error if the download operation fails or if there
397    /// is an issue with the strategy configuration.
398    pub async fn download_stream(&self, path: &Path) -> StorageResult<BytesStream> {
399        self.download_stream_with_policy(path, &*self.strategy)
400            .await
401    }
402
403    /// Downloads content from storage as a stream using a specific strategy.
404    ///
405    /// # Errors
406    ///
407    /// This method returns an error if the download operation fails or if there
408    /// is an issue with the strategy configuration.
409    pub async fn download_stream_with_policy(
410        &self,
411        path: &Path,
412        strategy: &dyn strategies::StorageStrategy,
413    ) -> StorageResult<BytesStream> {
414        strategy.download_stream(self, path).await
415    }
416
417    /// Uploads content from a stream to storage, enabling efficient
418    /// handling of large files without loading them entirely into memory.
419    ///
420    /// This method uses the selected strategy for the upload operation.
421    ///
422    /// # Examples
423    ///```
424    /// use loco_rs::storage;
425    /// use std::path::Path;
426    /// pub async fn stream_upload(stream: storage::stream::BytesStream) {
427    ///     let storage = storage::Storage::single(storage::drivers::mem::new());
428    ///     let path = Path::new("large_file.mp4");
429    ///     
430    ///     storage.upload_stream(path, stream).await.unwrap();
431    /// }
432    /// ```
433    ///
434    /// # Errors
435    ///
436    /// This method returns an error if the upload operation fails or if there
437    /// is an issue with the strategy configuration.
438    pub async fn upload_stream(&self, path: &Path, stream: BytesStream) -> StorageResult<()> {
439        self.upload_stream_with_policy(path, stream, &*self.strategy)
440            .await
441    }
442
443    /// Uploads content from a stream using a specific strategy.
444    ///
445    /// # Errors
446    ///
447    /// This method returns an error if the upload operation fails or if there
448    /// is an issue with the strategy configuration.
449    pub async fn upload_stream_with_policy(
450        &self,
451        path: &Path,
452        stream: BytesStream,
453        strategy: &dyn strategies::StorageStrategy,
454    ) -> StorageResult<()> {
455        strategy.upload_stream(self, path, stream).await
456    }
457}