active_storage/multi_store.rs
1//! # MultiStore Module
2//!
3//! The `multi_store` module defines a struct `MultiStore` for managing multiple
4//! stores, including a primary store and mirrors. It also introduces a `Mirror`
5//! struct for performing mirroring operations across multiple stores.
6//!
7//! ## Example
8//!
9//! ```rust
10//! # #[cfg(feature = "derive")] {
11#![doc = include_str!("../examples/multi.rs")]
12//! # }
13//! ```
14use std::{
15 collections::{BTreeMap, HashMap},
16 path::Path,
17};
18
19use crate::{
20 errors::{DriverError, MirrorError, MirrorResult},
21 store::Store,
22};
23
/// Failure-handling strategy applied by the mirrors created from a
/// [`MultiStore`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Policy {
    /// Record the failing store and keep going with the remaining stores.
    ContinueOnFailure,
    /// Abort the operation as soon as one store reports an error.
    StopOnFailure,
}
32
33/// Struct representing a [`MultiStore`] that manages multiple stores, including
34/// a primary store and mirrors.
35#[derive(Clone)]
36pub struct MultiStore {
37 pub primary: Store,
38 mirrors: HashMap<String, Vec<String>>,
39 mirrors_policy: Policy,
40 stores: HashMap<String, Store>,
41}
42
43impl MultiStore {
44 /// Creates a new [`MultiStore`] with the provided primary store.
45 ///
46 /// # Example
47 /// ```rust
48 /// use std::{collections::HashMap, path::PathBuf};
49 /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
50 ///
51 /// #[tokio::main]
52 /// async fn main() {
53 /// let config = drivers::disk::Config {
54 /// location: PathBuf::from("tmp").join("primary-storage"),
55 /// };
56 /// let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
57 ///
58 /// let inmem_driver = StoreConfig::InMem().build().await.unwrap();
59 ///
60 /// let mut multi_store = MultiStore::new(disk_driver);
61 /// multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
62 /// }
63 /// ```
64 #[must_use]
65 pub fn new(store: Store) -> Self {
66 Self {
67 primary: store,
68 mirrors: HashMap::new(),
69 mirrors_policy: Policy::ContinueOnFailure,
70 stores: HashMap::new(),
71 }
72 }
73
74 /// Adds a Stores to the [`MultiStore`].
75 pub fn add_stores(&mut self, stores: HashMap<&str, Store>) -> &mut Self {
76 for (name, stores) in stores {
77 self.stores.insert(name.to_string(), stores);
78 }
79
80 self
81 }
82
83 /// Sets the mirroring policy for the [`MultiStore`].
84 pub fn set_mirrors_policy(&mut self, policy: Policy) -> &mut Self {
85 self.mirrors_policy = policy;
86 self
87 }
88
89 /// Getting single store
90 pub fn get_store(&mut self, name: &str) -> Option<&Store> {
91 self.stores.get(name)
92 }
93
94 /// Adds mirrors to the [`MultiStore`] with the specified name and store
95 /// names.
96 ///
97 /// # Errors
98 ///
99 /// Returns an error if any of the specified stores are not defined in the
100 /// [`MultiStore`].
101 pub fn add_mirrors(&mut self, name: &str, stores_names: &[&str]) -> Result<&mut Self, String> {
102 let unknown_stores = stores_names
103 .iter()
104 .filter(|&&user_store_name| !self.stores.contains_key(user_store_name))
105 .map(std::string::ToString::to_string)
106 .collect::<Vec<_>>();
107
108 if !unknown_stores.is_empty() {
109 return Err(format!(
110 "the stores: {} not defined",
111 unknown_stores.join(",")
112 ));
113 };
114
115 self.mirrors.insert(
116 name.to_string(),
117 stores_names.iter().map(|&s| s.to_string()).collect(),
118 );
119 Ok(self)
120 }
121
122 /// Creates a Mirror struct for mirroring operations from the primary store.
123 #[must_use]
124 pub fn mirror_stores_from_primary(&self) -> Mirror<'_> {
125 let mut stores = BTreeMap::from([("primary", &self.primary)]);
126 for (name, store) in &self.stores {
127 stores.insert(name, store);
128 }
129
130 Mirror {
131 policy: &self.mirrors_policy,
132 stores,
133 }
134 }
135
136 /// Mirror stores by mirror key
137 #[must_use]
138 pub fn mirror(&self, name: &str) -> Option<Mirror<'_>> {
139 let stores_name = self.mirrors.get(name)?;
140 let mut stores = BTreeMap::new();
141 for (name, store) in &self.stores {
142 if stores_name.contains(name) {
143 stores.insert(name.as_str(), store);
144 }
145 }
146
147 Some(Mirror {
148 policy: &self.mirrors_policy,
149 stores,
150 })
151 }
152}
153
154/// Struct representing a mirror for mirroring operations across multiple
155/// stores.
156pub struct Mirror<'a> {
157 policy: &'a Policy,
158 stores: BTreeMap<&'a str, &'a Store>,
159}
160
161impl<'a> Mirror<'a> {
162 /// Writes content to all stores in the mirror.
163 ///
164 ///
165 /// # Example
166 ///
167 /// ```rust
168 /// use std::{collections::HashMap, path::PathBuf};
169 /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
170 ///
171 /// #[tokio::main]
172 /// async fn main() {
173 /// let config = drivers::disk::Config {
174 /// location: PathBuf::from("tmp").join("primary-storage"),
175 /// };
176 /// let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
177 ///
178 /// let inmem_driver = StoreConfig::InMem().build().await.unwrap();
179 ///
180 /// let mut multi_store = MultiStore::new(disk_driver);
181 /// multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
182 ///
183 /// let _ = multi_store
184 /// .mirror_stores_from_primary()
185 /// .write(PathBuf::from("test").as_path(), b"content")
186 /// .await;
187 /// }
188 /// ```
189 ///
190 /// # Errors
191 ///
192 /// Depend of the mirror policy return operation failure
193 pub async fn write<C>(&self, path: &Path, content: C) -> MirrorResult<()>
194 where
195 C: AsRef<[u8]> + Send,
196 {
197 let mut error_stores = BTreeMap::new();
198 for (name, store) in &self.stores {
199 if let Err(error) = store.write(path, content.as_ref().to_vec()).await {
200 self.handle_error_policy(name, error, &mut error_stores)?;
201 }
202 }
203
204 if error_stores.is_empty() {
205 Ok(())
206 } else {
207 Err(MirrorError::MirrorFailedOnStores(error_stores))
208 }
209 }
210
211 /// Deletes a file from all stores in the mirror.
212 ///
213 /// # Example
214 ///
215 /// ```rust
216 /// use std::{collections::HashMap, path::PathBuf};
217 /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
218 ///
219 /// #[tokio::main]
220 /// async fn main() {
221 /// let config = drivers::disk::Config {
222 /// location: PathBuf::from("tmp").join("store-1"),
223 /// };
224 /// let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
225 ///
226 /// let inmem_driver = StoreConfig::InMem().build().await.unwrap();
227 ///
228 /// let mut multi_store = MultiStore::new(disk_driver);
229 /// multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
230 ///
231 /// let _ = multi_store
232 /// .mirror_stores_from_primary()
233 /// .write(PathBuf::from("test").as_path(), b"content")
234 /// .await;
235 ///
236 /// let _ = multi_store
237 /// .mirror_stores_from_primary()
238 /// .delete(PathBuf::from("test").as_path())
239 /// .await;
240 /// }
241 /// ```
242 ///
243 /// # Errors
244 ///
245 /// Depend of the mirror policy return operation failure
246 pub async fn delete(&self, path: &Path) -> MirrorResult<()> {
247 let mut error_stores = BTreeMap::new();
248 for (name, store) in &self.stores {
249 if let Err(error) = store.delete(path).await {
250 self.handle_error_policy(name, error, &mut error_stores)?;
251 }
252 }
253
254 if error_stores.is_empty() {
255 Ok(())
256 } else {
257 Err(MirrorError::MirrorFailedOnStores(error_stores))
258 }
259 }
260
261 /// Deletes a directory from all stores in the mirror.
262 ///
263 /// # Example
264 ///
265 /// ```rust
266 /// use std::{collections::HashMap, path::PathBuf};
267 /// use active_storage::{drivers, multi_store::MultiStore, StoreConfig};
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let config = drivers::disk::Config {
272 /// location: PathBuf::from("tmp").join("primary-storage"),
273 /// };
274 /// let disk_driver = StoreConfig::Disk(config).build().await.unwrap();
275 ///
276 /// let inmem_driver = StoreConfig::InMem().build().await.unwrap();
277 ///
278 /// let mut multi_store = MultiStore::new(disk_driver);
279 /// multi_store.add_stores(HashMap::from([("secondary", inmem_driver)]));
280 ///
281 /// let _ = multi_store
282 /// .mirror_stores_from_primary()
283 /// .write(PathBuf::from("folder").join("file").as_path(), b"content")
284 /// .await;
285 ///
286 /// let _ = multi_store
287 /// .mirror_stores_from_primary()
288 /// .delete_directory(PathBuf::from("folder").as_path())
289 /// .await;
290 /// }
291 /// ```
292 ///
293 /// # Errors
294 ///
295 /// Returns an error if any store fails to delete the directory.
296 pub async fn delete_directory(&self, path: &Path) -> MirrorResult<()> {
297 let mut error_stores = BTreeMap::new();
298 for (name, store) in &self.stores {
299 if let Err(error) = store.delete_directory(path).await {
300 self.handle_error_policy(name, error, &mut error_stores)?;
301 }
302 }
303
304 if error_stores.is_empty() {
305 Ok(())
306 } else {
307 Err(MirrorError::MirrorFailedOnStores(error_stores))
308 }
309 }
310
311 /// Handles the mirroring error policy based on the specified store's
312 /// failure.
313 fn handle_error_policy(
314 &self,
315 store_name: &str,
316 error: DriverError,
317 error_stores: &mut BTreeMap<String, DriverError>,
318 ) -> MirrorResult<()> {
319 match self.policy {
320 Policy::ContinueOnFailure => {
321 error_stores.insert((*store_name).to_string(), error);
322 Ok(())
323 }
324 Policy::StopOnFailure => Err(MirrorError::MirrorFailedOnStore(
325 (*store_name).to_string(),
326 error,
327 )),
328 }
329 }
330}
331
#[cfg(test)]
mod tests {

    use super::*;
    use crate::StoreConfig;

    /// Builds a fresh in-memory store for use in a test.
    async fn in_mem_store() -> Store {
        StoreConfig::InMem().build().await.unwrap()
    }

    /// Registering stores makes them visible in the internal store map.
    #[tokio::test]
    async fn can_add_store() {
        let mut multi_store = MultiStore::new(in_mem_store().await);

        assert!(multi_store.stores.is_empty());

        let store_1 = in_mem_store().await;
        let store_2 = in_mem_store().await;
        multi_store.add_stores(HashMap::from([("foo", store_1), ("bar", store_2)]));

        assert_eq!(multi_store.stores.len(), 2);
    }

    /// Setting a policy replaces the one chosen at construction time.
    #[tokio::test]
    async fn can_update_policy() {
        let mut multi_store = MultiStore::new(in_mem_store().await);

        let init_policy = multi_store.mirrors_policy.clone();

        multi_store.set_mirrors_policy(Policy::StopOnFailure);

        assert_ne!(init_policy, multi_store.mirrors_policy);
    }

    /// Mirrors can only reference stores that were registered beforehand.
    #[tokio::test]
    async fn can_add_mirrors() {
        let mut multi_store = MultiStore::new(in_mem_store().await);

        let store_1 = in_mem_store().await;
        let store_2 = in_mem_store().await;
        multi_store.add_stores(HashMap::from([
            ("bar-store", store_1),
            ("baz-store", store_2),
        ]));

        assert!(multi_store.mirrors.is_empty());

        let added = multi_store
            .add_mirrors("mirror-bar-and-baz", &["bar-store", "baz-store"])
            .is_ok();
        assert!(added);

        let members = multi_store.mirrors.get("mirror-bar-and-baz").unwrap();
        assert_eq!(members.len(), 2);

        let rejected = multi_store
            .add_mirrors("bar", &["baz-store", "un-existing 1", "un-existing 2"])
            .err();
        assert_eq!(
            rejected,
            Some("the stores: un-existing 1,un-existing 2 not defined".to_string())
        );
    }
}
400}