rustrails-storage 0.1.2

File storage (ActiveStorage equivalent)
Documentation
//! Mirror service that writes to one primary and best-effort mirrors.

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use tracing::warn;
use url::Url;

use super::{DynStorageService, StorageError, StorageService};

/// Storage service that reads from the primary and replicates writes to mirrors.
///
/// Uploads and deletes are sent to the primary first and then attempted on every
/// mirror; downloads, existence checks, and URL generation use the primary only.
#[derive(Clone)]
pub struct MirrorService {
    /// Name reported by [`StorageService::name`] for this composite service.
    name: String,
    /// Service that answers every read and whose write errors are returned to the caller.
    primary: DynStorageService,
    /// Services that receive each upload and delete after the primary has succeeded.
    mirrors: Vec<DynStorageService>,
}

// Hand-written `Debug`: prints the service name, the primary's name, and the
// number of mirrors instead of the wrapped services themselves.
impl std::fmt::Debug for MirrorService {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("MirrorService");
        builder.field("name", &self.name);
        builder.field("primary", &self.primary.name());
        builder.field("mirrors", &self.mirrors.len());
        builder.finish()
    }
}

impl MirrorService {
    /// Builds a mirror service called `name` around `primary` and `mirrors`.
    ///
    /// The services are stored as given; none of them is called here.
    #[must_use]
    pub fn new(
        name: impl Into<String>,
        primary: Arc<dyn StorageService>,
        mirrors: Vec<Arc<dyn StorageService>>,
    ) -> Self {
        let name = name.into();
        Self { name, primary, mirrors }
    }

    /// Borrows the primary service.
    #[must_use]
    pub fn primary(&self) -> &DynStorageService {
        let Self { primary, .. } = self;
        primary
    }

    /// Borrows the mirror services as a slice, in the order they were supplied.
    #[must_use]
    pub fn mirrors(&self) -> &[DynStorageService] {
        self.mirrors.as_slice()
    }
}

#[async_trait]
impl StorageService for MirrorService {
    /// Returns the name this mirror service was constructed with.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Uploads `data` under `key` to the primary, then to each mirror in turn.
    ///
    /// # Errors
    ///
    /// Returns the primary's error without contacting any mirror. A mirror
    /// failure is logged as a warning and does not change the result.
    async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        self.primary.upload(key, data.clone()).await?;
        for mirror in &self.mirrors {
            // Best effort: a successful mirror needs no further handling.
            let Err(error) = mirror.upload(key, data.clone()).await else {
                continue;
            };
            warn!(service = %mirror.name(), %key, error = %error, "mirror upload failed");
        }
        Ok(())
    }

    /// Downloads `key` from the primary; mirrors are not consulted.
    async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
        self.primary.download(key).await
    }

    /// Deletes `key` from the primary, then from each mirror in turn.
    ///
    /// # Errors
    ///
    /// Returns the primary's error without contacting any mirror. A mirror
    /// failure is logged as a warning and does not change the result.
    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        self.primary.delete(key).await?;
        for mirror in &self.mirrors {
            // Best effort: a successful mirror needs no further handling.
            let Err(error) = mirror.delete(key).await else {
                continue;
            };
            warn!(service = %mirror.name(), %key, error = %error, "mirror delete failed");
        }
        Ok(())
    }

    /// Reports whether `key` exists on the primary; mirrors are not consulted.
    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
        self.primary.exists(key).await
    }

    /// Asks the primary for a URL to `key` using the given `expires_in`.
    async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
        self.primary.url(key, expires_in).await
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::service::memory::MemoryService;

    /// Test double whose `upload` and `delete` record the key they receive and then fail.
    #[derive(Debug)]
    struct FailingService {
        // Name returned from `StorageService::name`.
        name: String,
        // Keys passed to `upload`, in call order.
        uploaded: Mutex<Vec<String>>,
        // Keys passed to `delete`, in call order.
        deleted: Mutex<Vec<String>>,
    }

    impl FailingService {
        /// Creates a failing service called `name` with empty call logs.
        fn new(name: &str) -> Self {
            let name = name.to_owned();
            let uploaded = Mutex::new(Vec::new());
            let deleted = Mutex::new(Vec::new());
            Self { name, uploaded, deleted }
        }
    }

    #[async_trait]
    impl StorageService for FailingService {
        /// Returns the name supplied to `FailingService::new`.
        fn name(&self) -> &str {
            &self.name
        }

        /// Records `key` in `uploaded`, then fails with `StorageError::InvalidUrl`.
        async fn upload(&self, key: &str, _data: Bytes) -> Result<(), StorageError> {
            self.uploaded
                .lock()
                .expect("lock should succeed")
                .push(key.to_owned());
            Err(StorageError::InvalidUrl("mirror failed".to_owned()))
        }

        /// Always fails with `StorageError::NotFound` carrying `key`.
        async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
            Err(StorageError::NotFound(key.to_owned()))
        }

        /// Records `key` in `deleted`, then fails with `StorageError::InvalidUrl`.
        // The key is read below, so the parameter is named `key` rather than
        // `_key`; a leading underscore would mark it as intentionally unused.
        async fn delete(&self, key: &str) -> Result<(), StorageError> {
            self.deleted
                .lock()
                .expect("lock should succeed")
                .push(key.to_owned());
            Err(StorageError::InvalidUrl("mirror failed".to_owned()))
        }

        /// Always reports that the key does not exist.
        async fn exists(&self, _key: &str) -> Result<bool, StorageError> {
            Ok(false)
        }

        /// Returns a fixed placeholder URL regardless of key or expiry.
        async fn url(&self, _key: &str, _expires_in: Duration) -> Result<Url, StorageError> {
            Url::parse("https://mirror.invalid/")
                .map_err(|error| StorageError::InvalidUrl(error.to_string()))
        }
    }

    /// Builds a `MemoryService` called `name` and returns it as a trait object.
    fn memory(name: &str) -> Arc<dyn StorageService> {
        let service = MemoryService::new(name).expect("service should build");
        Arc::new(service)
    }

    /// The accessors report the configured name, primary, and mirrors in order.
    #[tokio::test]
    async fn test_accessors_expose_primary_and_mirrors() {
        let service = MirrorService::new(
            "mirror",
            memory("primary"),
            vec![memory("mirror-a"), memory("mirror-b")],
        );

        assert_eq!(service.name(), "mirror");
        assert_eq!(service.primary().name(), "primary");

        // Comparing the collected names checks both the count and the order.
        let mirror_names: Vec<&str> = service
            .mirrors()
            .iter()
            .map(|mirror| mirror.name())
            .collect();
        assert_eq!(mirror_names, ["mirror-a", "mirror-b"]);
    }

    #[tokio::test]
    async fn test_upload_writes_to_primary() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        assert_eq!(
            primary
                .download("a.txt")
                .await
                .expect("download should succeed"),
            Bytes::from_static(b"hello")
        );
    }

    #[tokio::test]
    async fn test_upload_writes_to_mirrors() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary, vec![mirror.clone()]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        assert_eq!(
            mirror
                .download("a.txt")
                .await
                .expect("download should succeed"),
            Bytes::from_static(b"hello")
        );
    }

    /// When primary and mirror hold different bytes, download returns the primary's.
    #[tokio::test]
    async fn test_download_reads_from_primary_only() {
        let primary = memory("primary");
        let mirror = memory("mirror");

        // Seed each backend directly with distinguishable content.
        let mirror_seed = mirror.upload("a.txt", Bytes::from_static(b"mirror")).await;
        mirror_seed.expect("upload should succeed");
        let primary_seed = primary.upload("a.txt", Bytes::from_static(b"primary")).await;
        primary_seed.expect("upload should succeed");

        let service = MirrorService::new("mirror", primary, vec![mirror]);
        let fetched = service
            .download("a.txt")
            .await
            .expect("download should succeed");
        assert_eq!(fetched, Bytes::from_static(b"primary"));
    }

    /// A key present only on the mirror is reported as absent.
    #[tokio::test]
    async fn test_exists_reads_from_primary_only() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let seeded = mirror.upload("a.txt", Bytes::from_static(b"mirror")).await;
        seeded.expect("upload should succeed");

        let service = MirrorService::new("mirror", primary, vec![mirror]);
        let found = service
            .exists("a.txt")
            .await
            .expect("exists should succeed");

        assert!(!found);
    }

    /// A mirror that rejects the upload is still called, and the upload succeeds.
    #[tokio::test]
    async fn test_failed_mirror_does_not_fail_upload() {
        let primary = memory("primary");
        let failing = Arc::new(FailingService::new("failing"));
        let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);

        let outcome = service.upload("a.txt", Bytes::from_static(b"hello")).await;
        outcome.expect("upload should succeed");

        let primary_has_key = primary
            .exists("a.txt")
            .await
            .expect("exists should succeed");
        assert!(primary_has_key);

        // No await follows, so holding the guard in a local is fine here.
        let attempted = failing.uploaded.lock().expect("lock should succeed");
        assert_eq!(attempted.as_slice(), ["a.txt"]);
    }

    /// When the primary rejects the upload, the error is returned and the mirror is untouched.
    #[tokio::test]
    async fn test_primary_upload_failure_does_not_write_to_mirrors() {
        let primary = Arc::new(FailingService::new("primary"));
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);

        let outcome = service.upload("a.txt", Bytes::from_static(b"hello")).await;
        let error = outcome.expect_err("upload should fail");
        assert!(matches!(error, StorageError::InvalidUrl(_)));

        // Copy the log out so no lock guard is alive across the await below.
        let attempted = primary
            .uploaded
            .lock()
            .expect("lock should succeed")
            .clone();
        assert_eq!(attempted, ["a.txt"]);

        let mirror_has_key = mirror.exists("a.txt").await.expect("exists should succeed");
        assert!(!mirror_has_key);
    }

    #[tokio::test]
    async fn test_delete_removes_from_primary_and_mirrors() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);
        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");
        service
            .delete("a.txt")
            .await
            .expect("delete should succeed");
        assert!(
            !primary
                .exists("a.txt")
                .await
                .expect("exists should succeed")
        );
        assert!(!mirror.exists("a.txt").await.expect("exists should succeed"));
    }

    /// A mirror that rejects the delete is still called, and the delete succeeds.
    #[tokio::test]
    async fn test_failed_mirror_delete_does_not_fail_delete() {
        let primary = memory("primary");
        let seeded = primary.upload("a.txt", Bytes::from_static(b"hello")).await;
        seeded.expect("upload should succeed");
        let failing = Arc::new(FailingService::new("failing"));
        let service = MirrorService::new("mirror", primary.clone(), vec![failing.clone()]);

        let outcome = service.delete("a.txt").await;
        outcome.expect("delete should succeed");

        let still_on_primary = primary
            .exists("a.txt")
            .await
            .expect("exists should succeed");
        assert!(!still_on_primary);

        // No await follows, so holding the guard in a local is fine here.
        let attempted = failing.deleted.lock().expect("lock should succeed");
        assert_eq!(attempted.as_slice(), ["a.txt"]);
    }

    /// When the primary rejects the delete, the error is returned and no mirror is called.
    #[tokio::test]
    async fn test_primary_delete_failure_skips_mirrors() {
        let primary = Arc::new(FailingService::new("primary"));
        let mirror = Arc::new(FailingService::new("mirror"));
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror.clone()]);

        let outcome = service.delete("a.txt").await;
        let error = outcome.expect_err("delete should fail");
        assert!(matches!(error, StorageError::InvalidUrl(_)));

        let primary_calls = primary.deleted.lock().expect("lock should succeed");
        assert_eq!(primary_calls.as_slice(), ["a.txt"]);

        let mirror_calls = mirror.deleted.lock().expect("lock should succeed");
        assert!(mirror_calls.is_empty());
    }

    #[tokio::test]
    async fn test_url_is_delegated_to_primary() {
        let primary = memory("primary");
        let mirror = memory("mirror");
        let service = MirrorService::new("mirror", primary.clone(), vec![mirror]);
        let url = service
            .url("a.txt", Duration::from_secs(5))
            .await
            .expect("url should build");
        let primary_url = primary
            .url("a.txt", Duration::from_secs(5))
            .await
            .expect("url should build");
        assert_eq!(url, primary_url);
    }
}