use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use bytes::Bytes;
use tracing::warn;
use url::Url;
use super::{DynStorageService, StorageError, StorageService};
#[derive(Clone)]
pub struct MirrorService {
name: String,
primary: DynStorageService,
mirrors: Vec<DynStorageService>,
}
impl std::fmt::Debug for MirrorService {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("MirrorService")
.field("name", &self.name)
.field("primary", &self.primary.name())
.field("mirrors", &self.mirrors.len())
.finish()
}
}
impl MirrorService {
#[must_use]
pub fn new(
name: impl Into<String>,
primary: Arc<dyn StorageService>,
mirrors: Vec<Arc<dyn StorageService>>,
) -> Self {
Self {
name: name.into(),
primary,
mirrors,
}
}
#[must_use]
pub fn primary(&self) -> &DynStorageService {
&self.primary
}
#[must_use]
pub fn mirrors(&self) -> &[DynStorageService] {
&self.mirrors
}
}
#[async_trait]
impl StorageService for MirrorService {
fn name(&self) -> &str {
&self.name
}
async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
self.primary.upload(key, data.clone()).await?;
for mirror in &self.mirrors {
if let Err(error) = mirror.upload(key, data.clone()).await {
warn!(service = %mirror.name(), %key, error = %error, "mirror upload failed");
}
}
Ok(())
}
async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
self.primary.download(key).await
}
async fn delete(&self, key: &str) -> Result<(), StorageError> {
self.primary.delete(key).await?;
for mirror in &self.mirrors {
if let Err(error) = mirror.delete(key).await {
warn!(service = %mirror.name(), %key, error = %error, "mirror delete failed");
}
}
Ok(())
}
async fn exists(&self, key: &str) -> Result<bool, StorageError> {
self.primary.exists(key).await
}
async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
self.primary.url(key, expires_in).await
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use super::*;
use crate::service::memory::MemoryService;
#[derive(Debug)]
struct FailingService {
name: String,
uploaded: Mutex<Vec<String>>,
deleted: Mutex<Vec<String>>,
}
impl FailingService {
fn new(name: &str) -> Self {
Self {
name: name.to_owned(),
uploaded: Mutex::new(Vec::new()),
deleted: Mutex::new(Vec::new()),
}
}
}
#[async_trait]
impl StorageService for FailingService {
fn name(&self) -> &str {
&self.name
}
async fn upload(&self, key: &str, _data: Bytes) -> Result<(), StorageError> {
self.uploaded
.lock()
.expect("lock should succeed")
.push(key.to_owned());
Err(StorageError::InvalidUrl("mirror failed".to_owned()))
}
async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
Err(StorageError::NotFound(key.to_owned()))
}
async fn delete(&self, _key: &str) -> Result<(), StorageError> {
self.deleted
.lock()
.expect("lock should succeed")
.push(_key.to_owned());
Err(StorageError::InvalidUrl("mirror failed".to_owned()))
}
async fn exists(&self, _key: &str) -> Result<bool, StorageError> {
Ok(false)
}
async fn url(&self, _key: &str, _expires_in: Duration) -> Result<Url, StorageError> {
Url::parse("https://mirror.invalid/")
.map_err(|error| StorageError::InvalidUrl(error.to_string()))
}
}
fn memory(name: &str) -> Arc<dyn StorageService> {
Arc::new(MemoryService::new(name).expect("service should build"))
}
#[tokio::test]
async fn test_accessors_expose_primary_and_mirrors() {
let primary = memory("primary");
let mirror_a = memory("mirror-a");
let mirror_b = memory("mirror-b");
let service = MirrorService::new(
"mirror",
primary.clone(),
vec![mirror_a.clone(), mirror_b.clone()],
);
assert_eq!(service.name(), "mirror");
assert_eq!(service.primary().name(), "primary");
assert_eq!(service.mirrors().len(), 2);
assert_eq!(service.mirrors()[0].name(), "mirror-a");
assert_eq!(service.mirrors()[1].name(), "mirror-b");
}
#[tokio::test]
async fn test_upload_writes_to_primary() {
let primary = memory("primary");
let mirror = memory("mirror");
let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
service
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect("upload should succeed");
assert_eq!(
primary
.download("a.txt")
.await
.expect("download should succeed"),
Bytes::from_static(b"hello")
);
}
#[tokio::test]
async fn test_upload_writes_to_mirrors() {
let primary = memory("primary");
let mirror = memory("mirror");
let service = MirrorService::new("mirror", primary, vec![mirror.clone()]);
service
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect("upload should succeed");
assert_eq!(
mirror
.download("a.txt")
.await
.expect("download should succeed"),
Bytes::from_static(b"hello")
);
}
#[tokio::test]
async fn test_download_reads_from_primary_only() {
let primary = memory("primary");
let mirror = memory("mirror");
mirror
.upload("a.txt", Bytes::from_static(b"mirror"))
.await
.expect("upload should succeed");
primary
.upload("a.txt", Bytes::from_static(b"primary"))
.await
.expect("upload should succeed");
let service = MirrorService::new("mirror", primary, vec![mirror]);
assert_eq!(
service
.download("a.txt")
.await
.expect("download should succeed"),
Bytes::from_static(b"primary")
);
}
#[tokio::test]
async fn test_exists_reads_from_primary_only() {
let primary = memory("primary");
let mirror = memory("mirror");
mirror
.upload("a.txt", Bytes::from_static(b"mirror"))
.await
.expect("upload should succeed");
let service = MirrorService::new("mirror", primary, vec![mirror]);
assert!(
!service
.exists("a.txt")
.await
.expect("exists should succeed")
);
}
#[tokio::test]
async fn test_failed_mirror_does_not_fail_upload() {
let primary = memory("primary");
let failing = Arc::new(FailingService::new("failing"));
let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);
service
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect("upload should succeed");
assert!(
primary
.exists("a.txt")
.await
.expect("exists should succeed")
);
assert_eq!(
failing
.uploaded
.lock()
.expect("lock should succeed")
.as_slice(),
["a.txt"]
);
}
#[tokio::test]
async fn test_primary_upload_failure_does_not_write_to_mirrors() {
let primary = Arc::new(FailingService::new("primary"));
let mirror = memory("mirror");
let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
let error = service
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect_err("upload should fail");
assert!(matches!(error, StorageError::InvalidUrl(_)));
assert_eq!(
primary
.uploaded
.lock()
.expect("lock should succeed")
.as_slice(),
["a.txt"]
);
assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
}
#[tokio::test]
async fn test_delete_removes_from_primary_and_mirrors() {
let primary = memory("primary");
let mirror = memory("mirror");
let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
service
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect("upload should succeed");
service
.delete("a.txt")
.await
.expect("delete should succeed");
assert!(
!primary
.exists("a.txt")
.await
.expect("exists should succeed")
);
assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
}
#[tokio::test]
async fn test_failed_mirror_delete_does_not_fail_delete() {
let primary = memory("primary");
primary
.upload("a.txt", Bytes::from_static(b"hello"))
.await
.expect("upload should succeed");
let failing = Arc::new(FailingService::new("failing"));
let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);
service
.delete("a.txt")
.await
.expect("delete should succeed");
assert!(
!primary
.exists("a.txt")
.await
.expect("exists should succeed")
);
assert_eq!(
failing
.deleted
.lock()
.expect("lock should succeed")
.as_slice(),
["a.txt"]
);
}
#[tokio::test]
async fn test_primary_delete_failure_skips_mirrors() {
let primary = Arc::new(FailingService::new("primary"));
let mirror = Arc::new(FailingService::new("mirror"));
let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
let error = service
.delete("a.txt")
.await
.expect_err("delete should fail");
assert!(matches!(error, StorageError::InvalidUrl(_)));
assert_eq!(
primary
.deleted
.lock()
.expect("lock should succeed")
.as_slice(),
["a.txt"]
);
assert!(
mirror
.deleted
.lock()
.expect("lock should succeed")
.is_empty()
);
}
#[tokio::test]
async fn test_url_is_delegated_to_primary() {
let primary = memory("primary");
let mirror = memory("mirror");
let service = MirrorService::new("mirror", primary.clone(), vec![mirror]);
let url = service
.url("a.txt", Duration::from_secs(5))
.await
.expect("url should build");
let primary_url = primary
.url("a.txt", Duration::from_secs(5))
.await
.expect("url should build");
assert_eq!(url, primary_url);
}
}