loco_rs/storage/mod.rs
1//! # Storage Module
2//!
3//! This module defines a generic storage abstraction represented by the
4//! [`Storage`] struct. It provides methods for performing common storage
5//! operations such as upload, download, delete, rename, and copy.
6//!
7//! ## Storage Strategy
8//!
9//! The [`Storage`] struct is designed to work with different storage
10//! strategies. A storage strategy defines the behavior of the storage
11//! operations. Strategies implement the [`strategies::StorageStrategy`].
12//! The selected strategy can be dynamically changed at runtime.
13mod contents;
14pub mod drivers;
15pub mod strategies;
16pub mod stream;
17use std::{
18 collections::BTreeMap,
19 path::{Path, PathBuf},
20};
21
22use bytes::Bytes;
23
24use self::{drivers::StoreDriver, stream::BytesStream};
25
26#[derive(thiserror::Error, Debug)]
27#[allow(clippy::module_name_repetitions)]
28pub enum StorageError {
29 #[error("store not found by the given key: {0}")]
30 StoreNotFound(String),
31
32 #[error(transparent)]
33 Store(#[from] Box<opendal::Error>),
34
35 #[error("Unable to read data from file {}", path.display().to_string())]
36 UnableToReadBytes { path: PathBuf },
37
38 #[error("secondaries errors")]
39 Multi(BTreeMap<String, String>),
40
41 #[error(transparent)]
42 Any(#[from] Box<dyn std::error::Error + Send + Sync>),
43}
44
45pub type StorageResult<T> = std::result::Result<T, StorageError>;
46
47impl From<opendal::Error> for StorageError {
48 fn from(val: opendal::Error) -> Self {
49 Self::Store(Box::new(val))
50 }
51}
52
53pub struct Storage {
54 pub stores: BTreeMap<String, Box<dyn StoreDriver>>,
55 pub strategy: Box<dyn strategies::StorageStrategy>,
56}
57
58impl Storage {
59 /// Creates a new storage instance with a single store and the default
60 /// strategy.
61 ///
62 /// # Examples
63 ///```
64 /// use loco_rs::storage;
65 ///
66 /// let storage = storage::Storage::single(storage::drivers::mem::new());
67 /// ```
68 #[must_use]
69 pub fn single(store: Box<dyn StoreDriver>) -> Self {
70 let default_key = "store";
71 Self {
72 strategy: Box::new(strategies::single::SingleStrategy::new(default_key)),
73 stores: BTreeMap::from([(default_key.to_string(), store)]),
74 }
75 }
76
77 /// Creates a new storage instance with the provided stores and strategy.
78 #[must_use]
79 pub fn new(
80 stores: BTreeMap<String, Box<dyn StoreDriver>>,
81 strategy: Box<dyn strategies::StorageStrategy>,
82 ) -> Self {
83 Self { stores, strategy }
84 }
85
86 /// Uploads content to the storage at the specified path.
87 ///
88 /// This method uses the selected strategy for the upload operation.
89 ///
90 /// # Examples
91 ///```
92 /// use loco_rs::storage;
93 /// use std::path::Path;
94 /// use bytes::Bytes;
95 /// pub async fn upload() {
96 /// let storage = storage::Storage::single(storage::drivers::mem::new());
97 /// let path = Path::new("example.txt");
98 /// let content = "Loco!";
99 /// let result = storage.upload(path, &Bytes::from(content)).await;
100 /// assert!(result.is_ok());
101 /// }
102 /// ```
103 ///
104 /// # Errors
105 ///
106 /// This method returns an error if the upload operation fails or if there
107 /// is an issue with the strategy configuration.
108 pub async fn upload(&self, path: &Path, content: &Bytes) -> StorageResult<()> {
109 self.upload_with_strategy(path, content, &*self.strategy)
110 .await
111 }
112
113 /// Uploads content to the storage at the specified path using a specific
114 /// strategy.
115 ///
116 /// This method allows specifying a custom strategy for the upload
117 /// operation.
118 ///
119 /// # Errors
120 ///
121 /// This method returns an error if the upload operation fails or if there
122 /// is an issue with the strategy configuration.
123 pub async fn upload_with_strategy(
124 &self,
125 path: &Path,
126 content: &Bytes,
127 strategy: &dyn strategies::StorageStrategy,
128 ) -> StorageResult<()> {
129 strategy.upload(self, path, content).await
130 }
131
132 /// Downloads content from the storage at the specified path.
133 ///
134 /// This method uses the selected strategy for the download operation.
135 ///
136 /// # Examples
137 ///```
138 /// use loco_rs::storage;
139 /// use std::path::Path;
140 /// use bytes::Bytes;
141 /// pub async fn download() {
142 /// let storage = storage::Storage::single(storage::drivers::mem::new());
143 /// let path = Path::new("example.txt");
144 /// let content = "Loco!";
145 /// storage.upload(path, &Bytes::from(content)).await;
146 ///
147 /// let result: String = storage.download(path).await.unwrap();
148 /// assert_eq!(result, "Loco!");
149 /// }
150 /// ```
151 ///
152 /// # Errors
153 ///
154 /// This method returns an error if the download operation fails or if there
155 /// is an issue with the strategy configuration.
156 pub async fn download<T: TryFrom<contents::Contents>>(&self, path: &Path) -> StorageResult<T> {
157 self.download_with_policy(path, &*self.strategy).await
158 }
159
160 /// Downloads content from the storage at the specified path using a
161 /// specific strategy.
162 ///
163 /// This method allows specifying a custom strategy for the download
164 /// operation.
165 ///
166 /// # Errors
167 ///
168 /// This method returns an error if the download operation fails or if there
169 /// is an issue with the strategy configuration.
170 pub async fn download_with_policy<T: TryFrom<contents::Contents>>(
171 &self,
172 path: &Path,
173 strategy: &dyn strategies::StorageStrategy,
174 ) -> StorageResult<T> {
175 let res = strategy.download(self, path).await?;
176 contents::Contents::from(res).try_into().map_or_else(
177 |_| {
178 Err(StorageError::UnableToReadBytes {
179 path: path.to_path_buf(),
180 })
181 },
182 |content| Ok(content),
183 )
184 }
185
186 /// Deletes content from the storage at the specified path.
187 ///
188 /// This method uses the selected strategy for the delete operation.
189 ///
190 /// # Examples
191 ///```
192 /// use loco_rs::storage;
193 /// use std::path::Path;
194 /// use bytes::Bytes;
195 /// pub async fn download() {
196 /// let storage = storage::Storage::single(storage::drivers::mem::new());
197 /// let path = Path::new("example.txt");
198 /// let content = "Loco!";
199 /// storage.upload(path, &Bytes::from(content)).await;
200 ///
201 /// let result = storage.delete(path).await;
202 /// assert!(result.is_ok());
203 /// }
204 /// ```
205 ///
206 /// # Errors
207 ///
208 /// This method returns an error if the delete operation fails or if there
209 /// is an issue with the strategy configuration.
210 pub async fn delete(&self, path: &Path) -> StorageResult<()> {
211 self.delete_with_policy(path, &*self.strategy).await
212 }
213
214 /// Deletes content from the storage at the specified path using a specific
215 /// strategy.
216 ///
217 /// This method allows specifying a custom strategy for the delete
218 /// operation.
219 ///
220 /// # Errors
221 ///
222 /// This method returns an error if the delete operation fails or if there
223 /// is an issue with the strategy configuration.
224 pub async fn delete_with_policy(
225 &self,
226 path: &Path,
227 strategy: &dyn strategies::StorageStrategy,
228 ) -> StorageResult<()> {
229 strategy.delete(self, path).await
230 }
231
232 /// Renames content from one path to another in the storage.
233 ///
234 /// This method uses the selected strategy for the rename operation.
235 ///
236 /// # Examples
237 ///```
238 /// use loco_rs::storage;
239 /// use std::path::Path;
240 /// use bytes::Bytes;
241 /// pub async fn download() {
242 /// let storage = storage::Storage::single(storage::drivers::mem::new());
243 /// let path = Path::new("example.txt");
244 /// let content = "Loco!";
245 /// storage.upload(path, &Bytes::from(content)).await;
246 ///
247 /// let new_path = Path::new("new_path.txt");
248 /// let store = storage.as_store("default").unwrap();
249 /// assert!(storage.rename(&path, &new_path).await.is_ok());
250 /// assert!(!store.exists(&path).await.unwrap());
251 /// assert!(store.exists(&new_path).await.unwrap());
252 /// }
253 /// ```
254 ///
255 /// # Errors
256 ///
257 /// This method returns an error if the rename operation fails or if there
258 /// is an issue with the strategy configuration.
259 pub async fn rename(&self, from: &Path, to: &Path) -> StorageResult<()> {
260 self.rename_with_policy(from, to, &*self.strategy).await
261 }
262
263 /// Renames content from one path to another in the storage using a specific
264 /// strategy.
265 ///
266 /// This method allows specifying a custom strategy for the rename
267 /// operation.
268 ///
269 /// # Errors
270 ///
271 /// This method returns an error if the rename operation fails or if there
272 /// is an issue with the strategy configuration.
273 pub async fn rename_with_policy(
274 &self,
275 from: &Path,
276 to: &Path,
277 strategy: &dyn strategies::StorageStrategy,
278 ) -> StorageResult<()> {
279 strategy.rename(self, from, to).await
280 }
281
282 /// Copies content from one path to another in the storage.
283 ///
284 /// This method uses the selected strategy for the copy operation.
285 ///
286 /// # Examples
287 ///```
288 /// use loco_rs::storage;
289 /// use std::path::Path;
290 /// use bytes::Bytes;
291 /// pub async fn download() {
292 /// let storage = storage::Storage::single(storage::drivers::mem::new());
293 /// let path = Path::new("example.txt");
294 /// let content = "Loco!";
295 /// storage.upload(path, &Bytes::from(content)).await;
296 ///
297 /// let new_path = Path::new("new_path.txt");
298 /// let store = storage.as_store("default").unwrap();
299 /// assert!(storage.copy(&path, &new_path).await.is_ok());
300 /// assert!(store.exists(&path).await.unwrap());
301 /// assert!(store.exists(&new_path).await.unwrap());
302 /// }
303 /// ```
304 ///
305 /// # Errors
306 ///
307 /// This method returns an error if the copy operation fails or if there is
308 /// an issue with the strategy configuration.
309 pub async fn copy(&self, from: &Path, to: &Path) -> StorageResult<()> {
310 self.copy_with_policy(from, to, &*self.strategy).await
311 }
312
313 /// Copies content from one path to another in the storage using a specific
314 /// strategy.
315 ///
316 /// This method allows specifying a custom strategy for the copy operation.
317 ///
318 /// # Errors
319 ///
320 /// This method returns an error if the copy operation fails or if there is
321 /// an issue with the strategy configuration.
322 pub async fn copy_with_policy(
323 &self,
324 from: &Path,
325 to: &Path,
326 strategy: &dyn strategies::StorageStrategy,
327 ) -> StorageResult<()> {
328 strategy.copy(self, from, to).await
329 }
330
331 /// Returns a reference to the store with the specified name if exists.
332 ///
333 /// # Examples
334 ///```
335 /// use loco_rs::storage;
336 /// use std::path::Path;
337 /// use bytes::Bytes;
338 /// pub async fn download() {
339 /// let storage = storage::Storage::single(storage::drivers::mem::new());
340 /// assert!(storage.as_store("default").is_some());
341 /// assert!(storage.as_store("store_2").is_none());
342 /// }
343 /// ```
344 ///
345 /// # Returns
346 /// Return None if the given name not found.
347 #[must_use]
348 pub fn as_store(&self, name: &str) -> Option<&dyn StoreDriver> {
349 self.stores.get(name).map(|s| &**s)
350 }
351
352 /// Returns a reference to the store with the specified name.
353 ///
354 /// # Examples
355 ///```
356 /// use loco_rs::storage;
357 /// use std::path::Path;
358 /// use bytes::Bytes;
359 /// pub async fn download() {
360 /// let storage = storage::Storage::single(storage::drivers::mem::new());
361 /// assert!(storage.as_store_err("default").is_ok());
362 /// assert!(storage.as_store_err("store_2").is_err());
363 /// }
364 /// ```
365 ///
366 /// # Errors
367 ///
368 /// Return an error if the given store name not exists
369 // REVIEW(nd): not sure bout the name 'as_store_err' -- it returns result
370 pub fn as_store_err(&self, name: &str) -> StorageResult<&dyn StoreDriver> {
371 self.as_store(name)
372 .ok_or(StorageError::StoreNotFound(name.to_string()))
373 }
374
375 /// Downloads content from storage as a stream, enabling efficient
376 /// handling of large files without loading them entirely into memory.
377 ///
378 /// This method uses the selected strategy for the download operation.
379 ///
380 /// # Examples
381 ///```
382 /// use loco_rs::storage;
383 /// use std::path::Path;
384 /// pub async fn stream_download() {
385 /// let storage = storage::Storage::single(storage::drivers::mem::new());
386 /// let path = Path::new("large_file.mp4");
387 ///
388 /// let stream = storage.download_stream(path).await.unwrap();
389 /// // Stream can be converted to axum Body for HTTP response
390 /// // let body = stream.into_body();
391 /// }
392 /// ```
393 ///
394 /// # Errors
395 ///
396 /// This method returns an error if the download operation fails or if there
397 /// is an issue with the strategy configuration.
398 pub async fn download_stream(&self, path: &Path) -> StorageResult<BytesStream> {
399 self.download_stream_with_policy(path, &*self.strategy)
400 .await
401 }
402
403 /// Downloads content from storage as a stream using a specific strategy.
404 ///
405 /// # Errors
406 ///
407 /// This method returns an error if the download operation fails or if there
408 /// is an issue with the strategy configuration.
409 pub async fn download_stream_with_policy(
410 &self,
411 path: &Path,
412 strategy: &dyn strategies::StorageStrategy,
413 ) -> StorageResult<BytesStream> {
414 strategy.download_stream(self, path).await
415 }
416
417 /// Uploads content from a stream to storage, enabling efficient
418 /// handling of large files without loading them entirely into memory.
419 ///
420 /// This method uses the selected strategy for the upload operation.
421 ///
422 /// # Examples
423 ///```
424 /// use loco_rs::storage;
425 /// use std::path::Path;
426 /// pub async fn stream_upload(stream: storage::stream::BytesStream) {
427 /// let storage = storage::Storage::single(storage::drivers::mem::new());
428 /// let path = Path::new("large_file.mp4");
429 ///
430 /// storage.upload_stream(path, stream).await.unwrap();
431 /// }
432 /// ```
433 ///
434 /// # Errors
435 ///
436 /// This method returns an error if the upload operation fails or if there
437 /// is an issue with the strategy configuration.
438 pub async fn upload_stream(&self, path: &Path, stream: BytesStream) -> StorageResult<()> {
439 self.upload_stream_with_policy(path, stream, &*self.strategy)
440 .await
441 }
442
443 /// Uploads content from a stream using a specific strategy.
444 ///
445 /// # Errors
446 ///
447 /// This method returns an error if the upload operation fails or if there
448 /// is an issue with the strategy configuration.
449 pub async fn upload_stream_with_policy(
450 &self,
451 path: &Path,
452 stream: BytesStream,
453 strategy: &dyn strategies::StorageStrategy,
454 ) -> StorageResult<()> {
455 strategy.upload_stream(self, path, stream).await
456 }
457}