// rustrails-storage 0.1.2
//
// File storage (ActiveStorage equivalent)
// Documentation
//! Object-store backed S3-style service.

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload, path::Path};
use url::Url;

use super::{StorageError, StorageService, checked_key};

/// Minimal S3-compatible storage service backed by [`object_store`].
///
/// The backend is held behind an [`Arc`], so clones of the service share the
/// same underlying [`ObjectStore`].
#[derive(Clone)]
pub struct S3Service {
    /// Service name; returned by `name()` and emitted as the `service` query
    /// parameter of generated URLs.
    name: String,
    /// Bucket name; used as the first path segment of generated URLs.
    bucket: String,
    /// Backend that performs the actual put/get/head/delete operations.
    store: Arc<dyn ObjectStore>,
    /// Synthetic `https://s3.local/{bucket}/` URL used as the template for
    /// URLs returned by `url()`.
    base_url: Url,
}

impl std::fmt::Debug for S3Service {
    /// Formats the service showing only `name` and `bucket`.
    ///
    /// The store handle and base URL are omitted; the non-exhaustive finish
    /// marks the output as partial.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { name, bucket, .. } = self;
        let mut builder = formatter.debug_struct("S3Service");
        builder.field("name", name);
        builder.field("bucket", bucket);
        builder.finish_non_exhaustive()
    }
}

impl S3Service {
    /// Creates a new service using the provided object store.
    ///
    /// # Errors
    ///
    /// Returns an error when the synthetic base URL cannot be constructed.
    pub fn new(
        name: impl Into<String>,
        bucket: impl Into<String>,
        store: Arc<dyn ObjectStore>,
    ) -> Result<Self, StorageError> {
        let bucket = bucket.into();
        let base_url = Url::parse(&format!("https://s3.local/{bucket}/"))
            .map_err(|error| StorageError::InvalidUrl(error.to_string()))?;
        Ok(Self {
            name: name.into(),
            bucket,
            store,
            base_url,
        })
    }

    fn path_for(&self, key: &str) -> Result<Path, StorageError> {
        let key = checked_key(key)?;
        Ok(Path::from(key))
    }
}

#[async_trait]
impl StorageService for S3Service {
    fn name(&self) -> &str {
        &self.name
    }

    async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
        let path = self.path_for(key)?;
        if self.exists(key).await? {
            return Err(StorageError::DuplicateKey(key.to_owned()));
        }
        self.store
            .put(&path, PutPayload::from(data))
            .await
            .map(|_| ())
            .map_err(|error| StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            })
    }

    async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
        let path = self.path_for(key)?;
        match self.store.get(&path).await {
            Ok(result) => result
                .bytes()
                .await
                .map_err(|error| StorageError::ObjectStore {
                    path: key.to_owned(),
                    message: error.to_string(),
                }),
            Err(object_store::Error::NotFound { .. }) => {
                Err(StorageError::NotFound(key.to_owned()))
            }
            Err(error) => Err(StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            }),
        }
    }

    async fn delete(&self, key: &str) -> Result<(), StorageError> {
        let path = self.path_for(key)?;
        match self.store.delete(&path).await {
            Ok(()) => Ok(()),
            Err(object_store::Error::NotFound { .. }) => Ok(()),
            Err(error) => Err(StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            }),
        }
    }

    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
        let path = self.path_for(key)?;
        match self.store.head(&path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { .. }) => Ok(false),
            Err(error) => Err(StorageError::ObjectStore {
                path: key.to_owned(),
                message: error.to_string(),
            }),
        }
    }

    async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
        let key = checked_key(key)?;
        let mut url = self.base_url.clone();
        url.set_path(&format!("{}/{}", self.bucket, key));
        url.query_pairs_mut()
            .append_pair("service", &self.name)
            .append_pair("expires_in", &expires_in.as_secs().to_string());
        Ok(url)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a service named `"s3"` for bucket `"bucket"` over a fresh
    /// in-memory object store.
    fn in_memory_service() -> S3Service {
        let backend = Arc::new(object_store::memory::InMemory::new());
        S3Service::new("s3", "bucket", backend).expect("service should build")
    }

    /// Uploading a payload and downloading the same key returns the payload.
    #[tokio::test]
    async fn test_s3_service_round_trip_with_in_memory_store() {
        let service = in_memory_service();
        let payload = Bytes::from_static(b"hello");

        service
            .upload("a.txt", Bytes::from_static(b"hello"))
            .await
            .expect("upload should succeed");

        let fetched = service
            .download("a.txt")
            .await
            .expect("download should succeed");
        assert_eq!(fetched, payload);
    }

    /// Downloading a key that was never uploaded yields `NotFound` carrying
    /// that key.
    #[tokio::test]
    async fn test_s3_service_reports_missing_keys() {
        let service = in_memory_service();

        let failure = service
            .download("missing")
            .await
            .expect_err("download should fail");

        assert!(matches!(failure, StorageError::NotFound(reported) if reported == "missing"));
    }
}