rustrails_storage/service/
mirror.rs1use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use tracing::warn;
8use url::Url;
9
10use super::{DynStorageService, StorageError, StorageService};
11
12#[derive(Clone)]
14pub struct MirrorService {
15 name: String,
16 primary: DynStorageService,
17 mirrors: Vec<DynStorageService>,
18}
19
20impl std::fmt::Debug for MirrorService {
21 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 formatter
23 .debug_struct("MirrorService")
24 .field("name", &self.name)
25 .field("primary", &self.primary.name())
26 .field("mirrors", &self.mirrors.len())
27 .finish()
28 }
29}
30
31impl MirrorService {
32 #[must_use]
34 pub fn new(
35 name: impl Into<String>,
36 primary: Arc<dyn StorageService>,
37 mirrors: Vec<Arc<dyn StorageService>>,
38 ) -> Self {
39 Self {
40 name: name.into(),
41 primary,
42 mirrors,
43 }
44 }
45
46 #[must_use]
48 pub fn primary(&self) -> &DynStorageService {
49 &self.primary
50 }
51
52 #[must_use]
54 pub fn mirrors(&self) -> &[DynStorageService] {
55 &self.mirrors
56 }
57}
58
59#[async_trait]
60impl StorageService for MirrorService {
61 fn name(&self) -> &str {
62 &self.name
63 }
64
65 async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
66 self.primary.upload(key, data.clone()).await?;
67 for mirror in &self.mirrors {
68 if let Err(error) = mirror.upload(key, data.clone()).await {
69 warn!(service = %mirror.name(), %key, error = %error, "mirror upload failed");
70 }
71 }
72 Ok(())
73 }
74
75 async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
76 self.primary.download(key).await
77 }
78
79 async fn delete(&self, key: &str) -> Result<(), StorageError> {
80 self.primary.delete(key).await?;
81 for mirror in &self.mirrors {
82 if let Err(error) = mirror.delete(key).await {
83 warn!(service = %mirror.name(), %key, error = %error, "mirror delete failed");
84 }
85 }
86 Ok(())
87 }
88
89 async fn exists(&self, key: &str) -> Result<bool, StorageError> {
90 self.primary.exists(key).await
91 }
92
93 async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
94 self.primary.url(key, expires_in).await
95 }
96}
97
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::service::memory::MemoryService;

    /// Test double whose `upload` and `delete` always fail, after recording the key
    /// they were called with so tests can tell whether the call happened at all.
    #[derive(Debug)]
    struct FailingService {
        name: String,
        /// Keys passed to `upload`, in call order.
        uploaded: Mutex<Vec<String>>,
        /// Keys passed to `delete`, in call order.
        deleted: Mutex<Vec<String>>,
    }

    impl FailingService {
        /// Creates a failing service with empty call records.
        fn new(name: &str) -> Self {
            Self {
                name: name.to_owned(),
                uploaded: Mutex::new(Vec::new()),
                deleted: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl StorageService for FailingService {
        fn name(&self) -> &str {
            &self.name
        }

        /// Records `key`, then fails with `StorageError::InvalidUrl`.
        async fn upload(&self, key: &str, _data: Bytes) -> Result<(), StorageError> {
            self.uploaded
                .lock()
                .expect("lock should succeed")
                .push(key.to_owned());
            Err(StorageError::InvalidUrl("mirror failed".to_owned()))
        }

        /// Always fails with `StorageError::NotFound` for `key`.
        async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
            Err(StorageError::NotFound(key.to_owned()))
        }

        /// Records `key`, then fails with `StorageError::InvalidUrl`.
        // The parameter is read, so it must not carry the "unused" underscore prefix.
        async fn delete(&self, key: &str) -> Result<(), StorageError> {
            self.deleted
                .lock()
                .expect("lock should succeed")
                .push(key.to_owned());
            Err(StorageError::InvalidUrl("mirror failed".to_owned()))
        }

        /// Always reports that the key does not exist.
        async fn exists(&self, _key: &str) -> Result<bool, StorageError> {
            Ok(false)
        }

        /// Returns a fixed placeholder URL regardless of the arguments.
        async fn url(&self, _key: &str, _expires_in: Duration) -> Result<Url, StorageError> {
            Url::parse("https://mirror.invalid/")
                .map_err(|error| StorageError::InvalidUrl(error.to_string()))
        }
    }

    /// Builds an in-memory backend named `name` as a trait object.
    fn memory(name: &str) -> Arc<dyn StorageService> {
        Arc::new(MemoryService::new(name).expect("service should build"))
    }

    /// The accessors expose the configured name, primary and mirrors in order.
    #[tokio::test]
    async fn test_accessors_expose_primary_and_mirrors() {
        let primary = memory("primary");
        let mirror_a = memory("mirror-a");
        let mirror_b = memory("mirror-b");
        // The handles are not needed afterwards, so they are moved rather than cloned.
        let service = MirrorService::new("mirror", primary, vec![mirror_a, mirror_b]);

        assert_eq!(service.name(), "mirror");
        assert_eq!(service.primary().name(), "primary");
        assert_eq!(service.mirrors().len(), 2);
        assert_eq!(service.mirrors()[0].name(), "mirror-a");
        assert_eq!(service.mirrors()[1].name(), "mirror-b");
    }

    /// An upload through the mirror service lands on the primary.
    #[tokio::test]
    async fn test_upload_writes_to_primary() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        // Only `primary` is inspected below, so `mirror` is moved in.
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        assert_eq!(
            primary
                .download("a.txt")
                .await
                .expect("download should succeed"),
            Bytes::from_static(b"hello")
        );
    }

    /// An upload through the mirror service is also copied to the mirrors.
    #[tokio::test]
    async fn test_upload_writes_to_mirrors() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary, vec![mirror.clone()]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        assert_eq!(
            mirror
                .download("a.txt")
                .await
                .expect("download should succeed"),
            Bytes::from_static(b"hello")
        );
    }

    /// When primary and mirror hold different data, `download` returns the primary's.
    #[tokio::test]
    async fn test_download_reads_from_primary_only() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        mirror
            .upload("a.txt", Bytes::from_static(b"mirror"))
            .await
            .expect("upload should succeed");
        primary
            .upload("a.txt", Bytes::from_static(b"primary"))
            .await
            .expect("upload should succeed");
        let service = MirrorService::new("mirror", primary, vec![mirror]);
        assert_eq!(
            service
                .download("a.txt")
                .await
                .expect("download should succeed"),
            Bytes::from_static(b"primary")
        );
    }

    /// A key present only on a mirror is reported as missing.
    #[tokio::test]
    async fn test_exists_reads_from_primary_only() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        mirror
            .upload("a.txt", Bytes::from_static(b"mirror"))
            .await
            .expect("upload should succeed");
        let service = MirrorService::new("mirror", primary, vec![mirror]);

        assert!(
            !service
                .exists("a.txt")
                .await
                .expect("exists should succeed")
        );
    }

    /// A mirror that rejects the upload is still attempted but does not fail the call.
    #[tokio::test]
    async fn test_failed_mirror_does_not_fail_upload() {
        let primary = memory("primary");
        let failing = Arc::new(FailingService::new("failing"));
        let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        assert!(
            primary
                .exists("a.txt")
                .await
                .expect("exists should succeed")
        );
        assert_eq!(
            failing
                .uploaded
                .lock()
                .expect("lock should succeed")
                .as_slice(),
            ["a.txt"]
        );
    }

    /// If the primary rejects the upload, the error is returned and mirrors stay untouched.
    #[tokio::test]
    async fn test_primary_upload_failure_does_not_write_to_mirrors() {
        let primary = Arc::new(FailingService::new("primary"));
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);

        let error = service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect_err("upload should fail");

        assert!(matches!(error, StorageError::InvalidUrl(_)));
        assert_eq!(
            primary
                .uploaded
                .lock()
                .expect("lock should succeed")
                .as_slice(),
            ["a.txt"]
        );
        assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
    }

    /// A delete through the mirror service removes the key from primary and mirrors.
    #[tokio::test]
    async fn test_delete_removes_from_primary_and_mirrors() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        service
            .delete("a.txt")
            .await
            .expect("delete should succeed");
        assert!(
            !primary
                .exists("a.txt")
                .await
                .expect("exists should succeed")
        );
        assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
    }

    /// A mirror that rejects the delete is still attempted but does not fail the call.
    #[tokio::test]
    async fn test_failed_mirror_delete_does_not_fail_delete() {
        let primary = memory("primary");
        primary
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        let failing = Arc::new(FailingService::new("failing"));
        let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);

        service
            .delete("a.txt")
            .await
            .expect("delete should succeed");

        assert!(
            !primary
                .exists("a.txt")
                .await
                .expect("exists should succeed")
        );
        assert_eq!(
            failing
                .deleted
                .lock()
                .expect("lock should succeed")
                .as_slice(),
            ["a.txt"]
        );
    }

    /// If the primary rejects the delete, the error is returned and mirrors are not called.
    #[tokio::test]
    async fn test_primary_delete_failure_skips_mirrors() {
        let primary = Arc::new(FailingService::new("primary"));
        let mirror = Arc::new(FailingService::new("mirror"));
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);

        let error = service
            .delete("a.txt")
            .await
            .expect_err("delete should fail");

        assert!(matches!(error, StorageError::InvalidUrl(_)));
        assert_eq!(
            primary
                .deleted
                .lock()
                .expect("lock should succeed")
                .as_slice(),
            ["a.txt"]
        );
        assert!(
            mirror
                .deleted
                .lock()
                .expect("lock should succeed")
                .is_empty()
        );
    }

    /// The URL returned by the mirror service equals the one the primary builds itself.
    #[tokio::test]
    async fn test_url_is_delegated_to_primary() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror]);
        let url = service
            .url("a.txt", Duration::from_secs(5))
            .await
            .expect("url should build");
        let primary_url = primary
            .url("a.txt", Duration::from_secs(5))
            .await
            .expect("url should build");
        assert_eq!(url, primary_url);
    }
}