rustrails-storage 0.1.2

File storage (ActiveStorage equivalent)
Documentation
//! Object-store backed GCS-style service.

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload, path::Path};
use url::Url;

use super::{StorageError, StorageService, checked_key};

/// Minimal Google Cloud Storage-compatible service backed by [`object_store`].
///
/// Every object operation is delegated to the wrapped store. Generated URLs
/// are synthetic `https://gcs.local/...` addresses assembled locally from the
/// bucket name and key. Cloning the service clones the `Arc`, so clones share
/// the same underlying store.
#[derive(Clone)]
pub struct GcsService {
    /// Service name; returned by `name()` and emitted as the `service` query
    /// parameter of generated URLs.
    name: String,
    /// Bucket name; used as the leading path segment of generated URLs.
    bucket: String,
    /// Backend that performs the actual object reads, writes and deletes.
    store: Arc<dyn ObjectStore>,
    /// Synthetic `https://gcs.local/{bucket}/` URL parsed once in `new` and
    /// cloned as the starting point of every generated URL.
    base_url: Url,
}

impl std::fmt::Debug for GcsService {
    /// Formats the service as `GcsService { name, bucket, .. }`.
    ///
    /// Only the name and bucket are printed; the remaining fields are
    /// represented by the trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("GcsService");
        builder.field("name", &self.name);
        builder.field("bucket", &self.bucket);
        builder.finish_non_exhaustive()
    }
}

impl GcsService {
    /// Creates a new service using the provided object store.
    ///
    /// # Errors
    ///
    /// Returns an error when the synthetic base URL cannot be constructed.
    pub fn new(
        name: impl Into<String>,
        bucket: impl Into<String>,
        store: Arc<dyn ObjectStore>,
    ) -> Result<Self, StorageError> {
        let bucket = bucket.into();
        let base_url = Url::parse(&format!("https://gcs.local/{bucket}/"))
            .map_err(|error| StorageError::InvalidUrl(error.to_string()))?;
        Ok(Self {
            name: name.into(),
            bucket,
            store,
            base_url,
        })
    }

    fn path_for(&self, key: &str) -> Result<Path, StorageError> {
        let key = checked_key(key)?;
        Ok(Path::from(key))
    }
}

#[async_trait]
impl StorageService for GcsService {
    fn name(&self) -> &str {
        &self.name
    }

    async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        let path = self.path_for(key)?;
        if self.exists(key).await? {
            return Err(StorageError::DuplicateKey(key.to_owned()));
        }
        self.store
            .put(&path, PutPayload::from(data))
            .await
            .map(|_| ())
            .map_err(|error| StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            })
    }

    async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
        let path = self.path_for(key)?;
        match self.store.get(&path).await {
            Ok(result) => result
                .bytes()
                .await
                .map_err(|error| StorageError::ObjectStore {
                    path: key.to_owned(),
                    message: error.to_string(),
                }),
            Err(object_store::Error::NotFound { .. }) => {
                Err(StorageError::NotFound(key.to_owned()))
            }
            Err(error) => Err(StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            }),
        }
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        let path = self.path_for(key)?;
        match self.store.delete(&path).await {
            Ok(()) => Ok(()),
            Err(object_store::Error::NotFound { .. }) => Ok(()),
            Err(error) => Err(StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            }),
        }
    }

    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
        let path = self.path_for(key)?;
        match self.store.head(&path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => Ok(false),
            Err(error) => Err(StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            }),
        }
    }

    async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
        let key = checked_key(key)?;
        let mut url = self.base_url.clone();
        url.set_path(&format!("{}/{}", self.bucket, key));
        url.query_pairs_mut()
            .append_pair("service", &self.name)
            .append_pair("expires_in", &expires_in.as_secs().to_string());
        Ok(url)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the service under test: name `gcs`, bucket `bucket`, backed by
    /// a fresh in-memory object store.
    fn in_memory_service() -> GcsService {
        let backend = Arc::new(object_store::memory::InMemory::new());
        GcsService::new("gcs", "bucket", backend).expect("service should build")
    }

    /// An uploaded payload can be downloaded again unchanged.
    #[tokio::test]
    async fn test_gcs_service_round_trip_with_in_memory_store() {
        let service = in_memory_service();

        let stored = service.upload("a.txt", Bytes::from_static(b"hello")).await;
        stored.expect("upload should succeed");

        let fetched = service
            .download("a.txt")
            .await
            .expect("download should succeed");
        assert_eq!(fetched, Bytes::from_static(b"hello"));
    }

    /// Generated URLs carry the bucket, key, service name and expiry.
    #[tokio::test]
    async fn test_gcs_service_url_includes_bucket() {
        let service = in_memory_service();
        let ttl = Duration::from_secs(60);

        let url = service.url("a.txt", ttl).await.expect("url should build");

        let expected = "https://gcs.local/bucket/a.txt?service=gcs&expires_in=60";
        assert_eq!(url.as_str(), expected);
    }
}