active_storage/
multi_store.rs

1//! # MultiStore Module
2//!
3//! The `multi_store` module defines a struct `MultiStore` for managing multiple
4//! stores, including a primary store and mirrors. It also introduces a `Mirror`
5//! struct for performing mirroring operations across multiple stores.
6//!
7//! ## Example
8//!
9//! ```rust
10//! # #[cfg(feature = "derive")] {
11#![doc = include_str!("../examples/multi.rs")]
12//! # }
13//! ```
14use std::{
15    collections::{BTreeMap, HashMap},
16    path::Path,
17};
18
19use crate::{
20    errors::{DriverError, MirrorError, MirrorResult},
21    store::Store,
22};
23
/// Mirroring policy used by a [`MultiStore`] when an operation fails on one
/// of the mirrored stores.
///
/// The default is [`Policy::ContinueOnFailure`], which is also the policy a
/// freshly created [`MultiStore`] starts with.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Policy {
    /// Keep mirroring to the remaining stores even if one of them fails; the
    /// failures are collected and reported together at the end.
    #[default]
    ContinueOnFailure,
    /// Stop at the first store that fails and report that single failure.
    StopOnFailure,
}
32
33/// Struct representing a [`MultiStore`] that manages multiple stores, including
34/// a primary store and mirrors.
35#[derive(Clone)]
36pub struct MultiStore {
37    pub primary: Store,
38    mirrors: HashMap<String, Vec<String>>,
39    mirrors_policy: Policy,
40    stores: HashMap<String, Store>,
41}
42
43impl MultiStore {
44    /// Creates a new [`MultiStore`] with the provided primary store.
45    ///
46    /// # Example
47    /// ```rust
48    /// use std::{collections::HashMap, path::PathBuf};
49    /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
50    ///
51    /// #[tokio::main]
52    /// async fn main() {
53    ///     let config = drivers::disk::Config {
54    ///         location: PathBuf::from("tmp").join("primary-storage"),
55    ///     };
56    ///     let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
57    ///
58    ///     let inmem_driver = StoreConfig::InMem().build().await.unwrap();
59    ///
60    ///     let mut multi_store = MultiStore::new(disk_driver);
61    ///     multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
62    /// }    
63    /// ```
64    #[must_use]
65    pub fn new(store: Store) -> Self {
66        Self {
67            primary: store,
68            mirrors: HashMap::new(),
69            mirrors_policy: Policy::ContinueOnFailure,
70            stores: HashMap::new(),
71        }
72    }
73
74    /// Adds a Stores to the [`MultiStore`].
75    pub fn add_stores(&mut self, stores: HashMap<&str, Store>) -> &mut Self {
76        for (name, stores) in stores {
77            self.stores.insert(name.to_string(), stores);
78        }
79
80        self
81    }
82
83    /// Sets the mirroring policy for the [`MultiStore`].
84    pub fn set_mirrors_policy(&mut self, policy: Policy) -> &mut Self {
85        self.mirrors_policy = policy;
86        self
87    }
88
89    /// Getting single store
90    pub fn get_store(&mut self, name: &str) -> Option<&Store> {
91        self.stores.get(name)
92    }
93
94    /// Adds mirrors to the [`MultiStore`] with the specified name and store
95    /// names.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if any of the specified stores are not defined in the
100    /// [`MultiStore`].
101    pub fn add_mirrors(&mut self, name: &str, stores_names: &[&str]) -> Result<&mut Self, String> {
102        let unknown_stores = stores_names
103            .iter()
104            .filter(|&&user_store_name| !self.stores.contains_key(user_store_name))
105            .map(std::string::ToString::to_string)
106            .collect::<Vec<_>>();
107
108        if !unknown_stores.is_empty() {
109            return Err(format!(
110                "the stores: {} not defined",
111                unknown_stores.join(",")
112            ));
113        };
114
115        self.mirrors.insert(
116            name.to_string(),
117            stores_names.iter().map(|&s| s.to_string()).collect(),
118        );
119        Ok(self)
120    }
121
122    /// Creates a Mirror struct for mirroring operations from the primary store.
123    #[must_use]
124    pub fn mirror_stores_from_primary(&self) -> Mirror<'_> {
125        let mut stores = BTreeMap::from([("primary", &self.primary)]);
126        for (name, store) in &self.stores {
127            stores.insert(name, store);
128        }
129
130        Mirror {
131            policy: &self.mirrors_policy,
132            stores,
133        }
134    }
135
136    /// Mirror stores by mirror key
137    #[must_use]
138    pub fn mirror(&self, name: &str) -> Option<Mirror<'_>> {
139        let stores_name = self.mirrors.get(name)?;
140        let mut stores = BTreeMap::new();
141        for (name, store) in &self.stores {
142            if stores_name.contains(name) {
143                stores.insert(name.as_str(), store);
144            }
145        }
146
147        Some(Mirror {
148            policy: &self.mirrors_policy,
149            stores,
150        })
151    }
152}
153
154/// Struct representing a mirror for mirroring operations across multiple
155/// stores.
156pub struct Mirror<'a> {
157    policy: &'a Policy,
158    stores: BTreeMap<&'a str, &'a Store>,
159}
160
161impl<'a> Mirror<'a> {
162    /// Writes content to all stores in the mirror.
163    ///
164    ///
165    /// # Example
166    ///
167    /// ```rust
168    /// use std::{collections::HashMap, path::PathBuf};
169    /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
170    ///
171    /// #[tokio::main]
172    /// async fn main() {
173    ///     let config = drivers::disk::Config {
174    ///         location: PathBuf::from("tmp").join("primary-storage"),
175    ///     };
176    ///     let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
177    ///
178    ///     let inmem_driver = StoreConfig::InMem().build().await.unwrap();
179    ///
180    ///     let mut multi_store = MultiStore::new(disk_driver);
181    ///     multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
182    ///
183    ///     let _ = multi_store
184    ///    .mirror_stores_from_primary()
185    ///    .write(PathBuf::from("test").as_path(), b"content")
186    ///    .await;
187    /// }    
188    /// ```
189    ///
190    /// # Errors
191    ///
192    /// Depend of the mirror policy return operation failure
193    pub async fn write<C>(&self, path: &Path, content: C) -> MirrorResult<()>
194    where
195        C: AsRef<[u8]> + Send,
196    {
197        let mut error_stores = BTreeMap::new();
198        for (name, store) in &self.stores {
199            if let Err(error) = store.write(path, content.as_ref().to_vec()).await {
200                self.handle_error_policy(name, error, &mut error_stores)?;
201            }
202        }
203
204        if error_stores.is_empty() {
205            Ok(())
206        } else {
207            Err(MirrorError::MirrorFailedOnStores(error_stores))
208        }
209    }
210
211    /// Deletes a file from all stores in the mirror.
212    ///
213    /// # Example
214    ///
215    /// ```rust
216    /// use std::{collections::HashMap, path::PathBuf};
217    /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
218    ///
219    /// #[tokio::main]
220    /// async fn main() {
221    ///     let config = drivers::disk::Config {
222    ///         location: PathBuf::from("tmp").join("store-1"),
223    ///     };
224    ///     let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
225    ///
226    ///     let inmem_driver = StoreConfig::InMem().build().await.unwrap();
227    ///
228    ///     let mut multi_store = MultiStore::new(disk_driver);
229    ///     multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
230    ///
231    ///     let _ = multi_store
232    ///    .mirror_stores_from_primary()
233    ///    .write(PathBuf::from("test").as_path(), b"content")
234    ///    .await;
235    ///
236    ///     let _ = multi_store
237    ///    .mirror_stores_from_primary()
238    ///    .delete(PathBuf::from("test").as_path())
239    ///    .await;
240    /// }    
241    /// ```
242    ///
243    /// # Errors
244    ///
245    /// Depend of the mirror policy return operation failure
246    pub async fn delete(&self, path: &Path) -> MirrorResult<()> {
247        let mut error_stores = BTreeMap::new();
248        for (name, store) in &self.stores {
249            if let Err(error) = store.delete(path).await {
250                self.handle_error_policy(name, error, &mut error_stores)?;
251            }
252        }
253
254        if error_stores.is_empty() {
255            Ok(())
256        } else {
257            Err(MirrorError::MirrorFailedOnStores(error_stores))
258        }
259    }
260
261    /// Deletes a directory from all stores in the mirror.
262    ///
263    /// # Example
264    ///
265    /// ```rust
266    /// use std::{collections::HashMap, path::PathBuf};
267    /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
268    ///
269    /// #[tokio::main]
270    /// async fn main() {
271    ///     let config = drivers::disk::Config {
272    ///         location: PathBuf::from("tmp").join("primary-storage"),
273    ///     };
274    ///     let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
275    ///
276    ///     let inmem_driver = StoreConfig::InMem().build().await.unwrap();
277    ///
278    ///     let mut multi_store = MultiStore::new(disk_driver);
279    ///     multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
280    ///
281    ///     let _ = multi_store
282    ///    .mirror_stores_from_primary()
283    ///    .write(PathBuf::from("folder").join("file").as_path(), b"content")
284    ///    .await;
285    ///
286    ///     let _ = multi_store
287    ///    .mirror_stores_from_primary()
288    ///    .delete_directory(PathBuf::from("folder").as_path())
289    ///    .await;
290    /// }    
291    /// ```
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if any store fails to delete the directory.
296    pub async fn delete_directory(&self, path: &Path) -> MirrorResult<()> {
297        let mut error_stores = BTreeMap::new();
298        for (name, store) in &self.stores {
299            if let Err(error) = store.delete_directory(path).await {
300                self.handle_error_policy(name, error, &mut error_stores)?;
301            }
302        }
303
304        if error_stores.is_empty() {
305            Ok(())
306        } else {
307            Err(MirrorError::MirrorFailedOnStores(error_stores))
308        }
309    }
310
311    /// Handles the mirroring error policy based on the specified store's
312    /// failure.
313    fn handle_error_policy(
314        &self,
315        store_name: &str,
316        error: DriverError,
317        error_stores: &mut BTreeMap<String, DriverError>,
318    ) -> MirrorResult<()> {
319        match self.policy {
320            Policy::ContinueOnFailure => {
321                error_stores.insert((*store_name).to_string(), error);
322                Ok(())
323            }
324            Policy::StopOnFailure => Err(MirrorError::MirrorFailedOnStore(
325                (*store_name).to_string(),
326                error,
327            )),
328        }
329    }
330}
331
#[cfg(test)]
mod tests {

    use super::*;
    use crate::StoreConfig;

    /// Registering stores makes them available by name.
    #[tokio::test]
    async fn can_add_store() {
        let store = StoreConfig::InMem().build().await.unwrap();
        let mut multi_store = MultiStore::new(store);

        assert_eq!(multi_store.stores.len(), 0);

        let store_1 = StoreConfig::InMem().build().await.unwrap();
        let store_2 = StoreConfig::InMem().build().await.unwrap();

        let stores = HashMap::from([("foo", store_1), ("bar", store_2)]);

        multi_store.add_stores(stores);
        assert_eq!(multi_store.stores.len(), 2);
        assert!(multi_store.get_store("foo").is_some());
        assert!(multi_store.get_store("baz").is_none());
    }

    /// The policy starts as `ContinueOnFailure` and can be replaced.
    #[tokio::test]
    async fn can_update_policy() {
        let store = StoreConfig::InMem().build().await.unwrap();
        let mut multi_store = MultiStore::new(store);

        assert_eq!(multi_store.mirrors_policy, Policy::ContinueOnFailure);

        multi_store.set_mirrors_policy(Policy::StopOnFailure);

        assert_eq!(multi_store.mirrors_policy, Policy::StopOnFailure);
    }

    /// Mirror groups can only reference stores that were registered.
    #[tokio::test]
    async fn can_add_mirrors() {
        let store = StoreConfig::InMem().build().await.unwrap();
        let mut multi_store = MultiStore::new(store);

        let store_1 = StoreConfig::InMem().build().await.unwrap();
        let store_2 = StoreConfig::InMem().build().await.unwrap();
        let stores = HashMap::from([("bar-store", store_1), ("baz-store", store_2)]);
        multi_store.add_stores(stores);

        assert_eq!(multi_store.mirrors.len(), 0);

        assert!(multi_store
            .add_mirrors("mirror-bar-and-baz", &["bar-store", "baz-store"])
            .is_ok());

        assert_eq!(
            multi_store.mirrors.get("mirror-bar-and-baz").unwrap().len(),
            2
        );

        assert_eq!(
            multi_store
                .add_mirrors("bar", &["baz-store", "un-existing 1", "un-existing 2"])
                .err(),
            Some("the stores: un-existing 1,un-existing 2 not defined".to_string())
        );
        // A rejected definition must not leave a partial mirror group behind.
        assert!(!multi_store.mirrors.contains_key("bar"));
    }

    /// Mirrors contain exactly the stores they were defined over.
    #[tokio::test]
    async fn can_build_mirrors() {
        let store = StoreConfig::InMem().build().await.unwrap();
        let mut multi_store = MultiStore::new(store);

        let store_1 = StoreConfig::InMem().build().await.unwrap();
        let store_2 = StoreConfig::InMem().build().await.unwrap();
        let stores = HashMap::from([("bar-store", store_1), ("baz-store", store_2)]);
        multi_store.add_stores(stores);

        assert!(multi_store.add_mirrors("only-bar", &["bar-store"]).is_ok());

        assert!(multi_store.mirror("unknown").is_none());

        let mirror = multi_store.mirror("only-bar").unwrap();
        assert_eq!(
            mirror.stores.keys().copied().collect::<Vec<_>>(),
            vec!["bar-store"]
        );

        let mirror = multi_store.mirror_stores_from_primary();
        assert_eq!(
            mirror.stores.keys().copied().collect::<Vec<_>>(),
            vec!["bar-store", "baz-store", "primary"]
        );
    }
}