Skip to main content

rustrails_storage/service/
mirror.rs

1//! Mirror service that writes to one primary and best-effort mirrors.
2
3use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use tracing::warn;
8use url::Url;
9
10use super::{DynStorageService, StorageError, StorageService};
11
12/// Storage service that reads from the primary and replicates writes to mirrors.
13#[derive(Clone)]
14pub struct MirrorService {
15    name: String,
16    primary: DynStorageService,
17    mirrors: Vec<DynStorageService>,
18}
19
20impl std::fmt::Debug for MirrorService {
21    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        formatter
23            .debug_struct("MirrorService")
24            .field("name", &self.name)
25            .field("primary", &self.primary.name())
26            .field("mirrors", &self.mirrors.len())
27            .finish()
28    }
29}
30
31impl MirrorService {
32    /// Creates a new mirror service.
33    #[must_use]
34    pub fn new(
35        name: impl Into<String>,
36        primary: Arc<dyn StorageService>,
37        mirrors: Vec<Arc<dyn StorageService>>,
38    ) -> Self {
39        Self {
40            name: name.into(),
41            primary,
42            mirrors,
43        }
44    }
45
46    /// Returns the primary service.
47    #[must_use]
48    pub fn primary(&self) -> &DynStorageService {
49        &self.primary
50    }
51
52    /// Returns the configured mirror services.
53    #[must_use]
54    pub fn mirrors(&self) -> &[DynStorageService] {
55        &self.mirrors
56    }
57}
58
59#[async_trait]
60impl StorageService for MirrorService {
61    fn name(&self) -> &str {
62        &self.name
63    }
64
65    async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
66        self.primary.upload(key, data.clone()).await?;
67        for mirror in &self.mirrors {
68            if let Err(error) = mirror.upload(key, data.clone()).await {
69                warn!(service = %mirror.name(), %key, error = %error, "mirror upload failed");
70            }
71        }
72        Ok(())
73    }
74
75    async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
76        self.primary.download(key).await
77    }
78
79    async fn delete(&self, key: &str) -> Result<(), StorageError> {
80        self.primary.delete(key).await?;
81        for mirror in &self.mirrors {
82            if let Err(error) = mirror.delete(key).await {
83                warn!(service = %mirror.name(), %key, error = %error, "mirror delete failed");
84            }
85        }
86        Ok(())
87    }
88
89    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
90        self.primary.exists(key).await
91    }
92
93    async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
94        self.primary.url(key, expires_in).await
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use std::sync::{Arc, Mutex};
101
102    use super::*;
103    use crate::service::memory::MemoryService;
104
105    #[derive(Debug)]
106    struct FailingService {
107        name: String,
108        uploaded: Mutex<Vec<String>>,
109        deleted: Mutex<Vec<String>>,
110    }
111
112    impl FailingService {
113        fn new(name: &str) -> Self {
114            Self {
115                name: name.to_owned(),
116                uploaded: Mutex::new(Vec::new()),
117                deleted: Mutex::new(Vec::new()),
118            }
119        }
120    }
121
122    #[async_trait]
123    impl StorageService for FailingService {
124        fn name(&self) -> &str {
125            &self.name
126        }
127
128        async fn upload(&self, key: &str, _data: Bytes) -> Result<(), StorageError> {
129            self.uploaded
130                .lock()
131                .expect("lock should succeed")
132                .push(key.to_owned());
133            Err(StorageError::InvalidUrl("mirror failed".to_owned()))
134        }
135
136        async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
137            Err(StorageError::NotFound(key.to_owned()))
138        }
139
140        async fn delete(&self, _key: &str) -> Result<(), StorageError> {
141            self.deleted
142                .lock()
143                .expect("lock should succeed")
144                .push(_key.to_owned());
145            Err(StorageError::InvalidUrl("mirror failed".to_owned()))
146        }
147
148        async fn exists(&self, _key: &str) -> Result<bool, StorageError> {
149            Ok(false)
150        }
151
152        async fn url(&self, _key: &str, _expires_in: Duration) -> Result<Url, StorageError> {
153            Url::parse("https://mirror.invalid/")
154                .map_err(|error| StorageError::InvalidUrl(error.to_string()))
155        }
156    }
157
158    fn memory(name: &str) -> Arc<dyn StorageService> {
159        Arc::new(MemoryService::new(name).expect("service should build"))
160    }
161
162    #[tokio::test]
163    async fn test_accessors_expose_primary_and_mirrors() {
164        let primary = memory("primary");
165        let mirror_a = memory("mirror-a");
166        let mirror_b = memory("mirror-b");
167        let service = MirrorService::new(
168            "mirror",
169            primary.clone(),
170            vec![mirror_a.clone(), mirror_b.clone()],
171        );
172
173        assert_eq!(service.name(), "mirror");
174        assert_eq!(service.primary().name(), "primary");
175        assert_eq!(service.mirrors().len(), 2);
176        assert_eq!(service.mirrors()[0].name(), "mirror-a");
177        assert_eq!(service.mirrors()[1].name(), "mirror-b");
178    }
179
180    #[tokio::test]
181    async fn test_upload_writes_to_primary() {
182        let primary = memory("primary");
183        let mirror = memory("mirror");
184        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
185        service
186            .upload("a.txt", Bytes::from_static(b"hello"))
187            .await
188            .expect("upload should succeed");
189        assert_eq!(
190            primary
191                .download("a.txt")
192                .await
193                .expect("download should succeed"),
194            Bytes::from_static(b"hello")
195        );
196    }
197
198    #[tokio::test]
199    async fn test_upload_writes_to_mirrors() {
200        let primary = memory("primary");
201        let mirror = memory("mirror");
202        let service = MirrorService::new("mirror", primary, vec![mirror.clone()]);
203        service
204            .upload("a.txt", Bytes::from_static(b"hello"))
205            .await
206            .expect("upload should succeed");
207        assert_eq!(
208            mirror
209                .download("a.txt")
210                .await
211                .expect("download should succeed"),
212            Bytes::from_static(b"hello")
213        );
214    }
215
216    #[tokio::test]
217    async fn test_download_reads_from_primary_only() {
218        let primary = memory("primary");
219        let mirror = memory("mirror");
220        mirror
221            .upload("a.txt", Bytes::from_static(b"mirror"))
222            .await
223            .expect("upload should succeed");
224        primary
225            .upload("a.txt", Bytes::from_static(b"primary"))
226            .await
227            .expect("upload should succeed");
228        let service = MirrorService::new("mirror", primary, vec![mirror]);
229        assert_eq!(
230            service
231                .download("a.txt")
232                .await
233                .expect("download should succeed"),
234            Bytes::from_static(b"primary")
235        );
236    }
237
238    #[tokio::test]
239    async fn test_exists_reads_from_primary_only() {
240        let primary = memory("primary");
241        let mirror = memory("mirror");
242        mirror
243            .upload("a.txt", Bytes::from_static(b"mirror"))
244            .await
245            .expect("upload should succeed");
246        let service = MirrorService::new("mirror", primary, vec![mirror]);
247
248        assert!(
249            !service
250                .exists("a.txt")
251                .await
252                .expect("exists should succeed")
253        );
254    }
255
256    #[tokio::test]
257    async fn test_failed_mirror_does_not_fail_upload() {
258        let primary = memory("primary");
259        let failing = Arc::new(FailingService::new("failing"));
260        let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);
261        service
262            .upload("a.txt", Bytes::from_static(b"hello"))
263            .await
264            .expect("upload should succeed");
265        assert!(
266            primary
267                .exists("a.txt")
268                .await
269                .expect("exists should succeed")
270        );
271        assert_eq!(
272            failing
273                .uploaded
274                .lock()
275                .expect("lock should succeed")
276                .as_slice(),
277            ["a.txt"]
278        );
279    }
280
281    #[tokio::test]
282    async fn test_primary_upload_failure_does_not_write_to_mirrors() {
283        let primary = Arc::new(FailingService::new("primary"));
284        let mirror = memory("mirror");
285        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
286
287        let error = service
288            .upload("a.txt", Bytes::from_static(b"hello"))
289            .await
290            .expect_err("upload should fail");
291
292        assert!(matches!(error, StorageError::InvalidUrl(_)));
293        assert_eq!(
294            primary
295                .uploaded
296                .lock()
297                .expect("lock should succeed")
298                .as_slice(),
299            ["a.txt"]
300        );
301        assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
302    }
303
304    #[tokio::test]
305    async fn test_delete_removes_from_primary_and_mirrors() {
306        let primary = memory("primary");
307        let mirror = memory("mirror");
308        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
309        service
310            .upload("a.txt", Bytes::from_static(b"hello"))
311            .await
312            .expect("upload should succeed");
313        service
314            .delete("a.txt")
315            .await
316            .expect("delete should succeed");
317        assert!(
318            !primary
319                .exists("a.txt")
320                .await
321                .expect("exists should succeed")
322        );
323        assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
324    }
325
326    #[tokio::test]
327    async fn test_failed_mirror_delete_does_not_fail_delete() {
328        let primary = memory("primary");
329        primary
330            .upload("a.txt", Bytes::from_static(b"hello"))
331            .await
332            .expect("upload should succeed");
333        let failing = Arc::new(FailingService::new("failing"));
334        let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);
335
336        service
337            .delete("a.txt")
338            .await
339            .expect("delete should succeed");
340
341        assert!(
342            !primary
343                .exists("a.txt")
344                .await
345                .expect("exists should succeed")
346        );
347        assert_eq!(
348            failing
349                .deleted
350                .lock()
351                .expect("lock should succeed")
352                .as_slice(),
353            ["a.txt"]
354        );
355    }
356
357    #[tokio::test]
358    async fn test_primary_delete_failure_skips_mirrors() {
359        let primary = Arc::new(FailingService::new("primary"));
360        let mirror = Arc::new(FailingService::new("mirror"));
361        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
362
363        let error = service
364            .delete("a.txt")
365            .await
366            .expect_err("delete should fail");
367
368        assert!(matches!(error, StorageError::InvalidUrl(_)));
369        assert_eq!(
370            primary
371                .deleted
372                .lock()
373                .expect("lock should succeed")
374                .as_slice(),
375            ["a.txt"]
376        );
377        assert!(
378            mirror
379                .deleted
380                .lock()
381                .expect("lock should succeed")
382                .is_empty()
383        );
384    }
385
386    #[tokio::test]
387    async fn test_url_is_delegated_to_primary() {
388        let primary = memory("primary");
389        let mirror = memory("mirror");
390        let service = MirrorService::new("mirror", primary.clone(), vec![mirror]);
391        let url = service
392            .url("a.txt", Duration::from_secs(5))
393            .await
394            .expect("url should build");
395        let primary_url = primary
396            .url("a.txt", Duration::from_secs(5))
397            .await
398            .expect("url should build");
399        assert_eq!(url, primary_url);
400    }
401}