garage-sdk 0.1.1

Async Rust SDK for Garage S3-compatible storage with uploads and public URL generation
Documentation
use crate::config::UploaderConfig;
use crate::download::{DownloadBody, Downloader, ReqwestDownloader, STREAM_LIMIT_ERROR};
use crate::error::{Error, Result};
use crate::keygen::{KeyGenerator, UuidKeyGenerator};
use crate::storage::{S3Storage, StorageClient, StorageInput};
use crate::types::UploadResult;
use aws_config::BehaviorVersion;
use aws_credential_types::Credentials;
use aws_sdk_s3::{Client, primitives::ByteStream};
use aws_types::region::Region;
use futures_util::TryStreamExt;
use http_body::Frame;
use http_body_util::StreamBody;
use std::error::Error as StdError;
use std::path::Path;
use tracing::{debug, instrument, warn};
use url::Url;

/// The main uploader client for Garage S3-compatible storage.
///
/// This is the primary interface for uploading content from a local path
/// ([`upload_from_path`](Self::upload_from_path)), a remote URL
/// ([`upload_from_url`](Self::upload_from_url)) or an in-memory buffer
/// ([`upload_bytes`](Self::upload_bytes)).
///
/// The type parameters select the pluggable components. [`GarageUploader::new`]
/// uses the defaults, and [`GarageUploader::with_components`] accepts custom ones.
///
/// Cloning clones the configuration and every component so the uploader can be
/// shared across tasks. Whether a clone is cheap depends on the component
/// types. NOTE(review): the default components presumably wrap
/// reference-counted clients; confirm before relying on clone cost.
#[derive(Clone)]
pub struct GarageUploader<D = ReqwestDownloader, S = S3Storage, K = UuidKeyGenerator> {
    // Configuration validated by both constructors (bucket, size limits,
    // public base URL, key prefix, ...).
    config: UploaderConfig,
    // Fetches remote content for `upload_from_url`.
    downloader: D,
    // Backend that receives the `put_object` calls.
    storage: S,
    // Produces object keys from an extension and the optional key prefix.
    key_generator: K,
}

impl GarageUploader {
    /// Create a new uploader with the given configuration and the default
    /// components: [`ReqwestDownloader`] for URL downloads, [`S3Storage`] for
    /// uploads and [`UuidKeyGenerator`] for object keys.
    ///
    /// The S3 client uses path-style addressing against `config.endpoint`,
    /// with static credentials taken from the configuration. The HTTP client
    /// used for downloads is built with `config.download_timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration is invalid or the HTTP download
    /// client cannot be built.
    pub fn new(config: UploaderConfig) -> Result<Self> {
        config.validate()?;

        let credentials = Credentials::new(
            &config.access_key_id,
            &config.secret_access_key,
            None,
            None,
            "garage-sdk",
        );

        // Garage is addressed by endpoint URL, so force path-style requests
        // instead of virtual-hosted bucket subdomains.
        let s3_config = aws_sdk_s3::config::Builder::new()
            .endpoint_url(&config.endpoint)
            .force_path_style(true)
            .region(Region::new(config.region.clone()))
            .credentials_provider(credentials)
            .behavior_version(BehaviorVersion::latest())
            .build();

        // Take the version from Cargo metadata so the User-Agent always
        // matches the crate version instead of a hand-maintained literal.
        let user_agent = format!(
            "garage-sdk/{}",
            option_env!("CARGO_PKG_VERSION").unwrap_or("unknown")
        );

        let http = reqwest::Client::builder()
            .timeout(config.download_timeout)
            .user_agent(user_agent)
            .build()
            .map_err(|e| Error::Config {
                message: format!("failed to create HTTP client: {}", e),
            })?;

        let storage = S3Storage::new(Client::from_conf(s3_config), config.bucket.clone());
        let downloader = ReqwestDownloader::new(http);

        Ok(Self {
            config,
            downloader,
            storage,
            key_generator: UuidKeyGenerator,
        })
    }
}

impl<D, S, K> GarageUploader<D, S, K>
where
    D: Downloader,
    S: StorageClient,
    K: KeyGenerator,
    S::Error: StdError + 'static,
{
    /// Create a new uploader with custom components.
    pub fn with_components(
        config: UploaderConfig,
        downloader: D,
        storage: S,
        key_generator: K,
    ) -> Result<Self> {
        config.validate()?;
        Ok(Self {
            config,
            downloader,
            storage,
            key_generator,
        })
    }

    /// Upload a file from a local path.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the local file to upload.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The file cannot be read
    /// - The file exceeds the maximum size limit
    /// - The S3 upload fails
    #[instrument(skip(self), fields(path = %path.as_ref().display()))]
    pub async fn upload_from_path(&self, path: impl AsRef<Path>) -> Result<UploadResult> {
        let path = path.as_ref();

        // Validate path exists and is a file
        let metadata = tokio::fs::metadata(path)
            .await
            .map_err(|e| Error::FileRead {
                path: path.display().to_string(),
                source: e,
            })?;

        if !metadata.is_file() {
            return Err(Error::InvalidPath {
                reason: "path is not a file".into(),
            });
        }

        let size = metadata.len();
        if size > self.config.max_file_size {
            return Err(Error::Config {
                message: format!(
                    "file size {} exceeds maximum {}",
                    size, self.config.max_file_size
                ),
            });
        }

        let file_name =
            path.file_name()
                .and_then(|n| n.to_str())
                .ok_or_else(|| Error::InvalidPath {
                    reason: "could not extract filename".into(),
                })?;

        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("bin");

        let content_type = mime_guess::from_path(path)
            .first_or_octet_stream()
            .to_string();

        debug!(
            file_name = %file_name,
            content_type = %content_type,
            size = size,
            "uploading file from path"
        );

        let body = ByteStream::from_path(path)
            .await
            .map_err(|e| Error::FileRead {
                path: path.display().to_string(),
                source: std::io::Error::other(e.to_string()),
            })?;

        let key = self
            .key_generator
            .generate_key(ext, self.config.key_prefix.as_deref());
        self.upload_to_storage(key, body, content_type, size).await
    }

    /// Upload content from a URL by downloading it first.
    ///
    /// The content is buffered for small responses and streamed for larger
    /// responses to avoid unbounded memory usage.
    ///
    /// # Arguments
    ///
    /// * `url` - The URL to download and upload.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URL is invalid
    /// - The download fails
    /// - The content exceeds the maximum size limit
    /// - The S3 upload fails
    #[instrument(skip(self), fields(url = %url))]
    pub async fn upload_from_url(&self, url: &str) -> Result<UploadResult> {
        let parsed_url = Url::parse(url).map_err(|e| Error::InvalidUrl {
            url: url.to_string(),
            reason: e.to_string(),
        })?;

        match parsed_url.scheme() {
            "http" | "https" => {}
            scheme => {
                return Err(Error::InvalidUrl {
                    url: url.to_string(),
                    reason: format!("unsupported scheme '{}', expected http or https", scheme),
                });
            }
        }

        debug!(url = %url, "downloading content from URL");

        let download = self
            .downloader
            .fetch(
                parsed_url,
                self.config.max_buffered_bytes,
                self.config.max_file_size,
            )
            .await?;

        let key = self
            .key_generator
            .generate_key(&download.extension, self.config.key_prefix.as_deref());

        match download.body {
            DownloadBody::Buffered { bytes } => {
                let size = bytes.len() as u64;
                let body = ByteStream::from(bytes);
                self.upload_to_storage(key, body, download.content_type, size)
                    .await
            }
            DownloadBody::Streamed {
                stream,
                size_counter,
            } => {
                let body_stream = stream.map_ok(Frame::data);
                let body = ByteStream::from_body_1_x(StreamBody::new(body_stream));
                let result = self
                    .storage
                    .put_object(StorageInput {
                        key: key.clone(),
                        content_type: download.content_type.clone(),
                        body,
                    })
                    .await
                    .map_err(|e| {
                        warn!(error = %e, key = %key, "S3 put_object failed");
                        map_s3_error(
                            "put_object",
                            &e,
                            Some(STREAM_LIMIT_ERROR),
                            Some(self.config.max_file_size),
                        )
                    })?;

                let size = size_counter.load(std::sync::atomic::Ordering::Relaxed);
                Ok(self.build_upload_result(key, download.content_type, size, result.etag))
            }
        }
    }

    /// Upload raw bytes directly.
    ///
    /// # Arguments
    ///
    /// * `bytes` - The raw bytes to upload.
    /// * `content_type` - The MIME type of the content.
    /// * `extension` - Optional file extension for the generated key.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The content exceeds the maximum size limit
    /// - The S3 upload fails
    #[instrument(skip(self, bytes), fields(content_type = %content_type))]
    pub async fn upload_bytes(
        &self,
        bytes: impl Into<Vec<u8>>,
        content_type: &str,
        extension: Option<&str>,
    ) -> Result<UploadResult> {
        let bytes = bytes.into();
        let size = bytes.len() as u64;

        if size > self.config.max_file_size {
            return Err(Error::Config {
                message: format!(
                    "content size {} exceeds maximum {}",
                    size, self.config.max_file_size
                ),
            });
        }

        let ext = extension
            .or_else(|| {
                mime_guess::get_mime_extensions_str(content_type)
                    .and_then(|arr| arr.first().copied())
            })
            .unwrap_or("bin");

        debug!(size = size, content_type = %content_type, ext = %ext, "uploading bytes");

        let body = ByteStream::from(bytes);
        let key = self
            .key_generator
            .generate_key(ext, self.config.key_prefix.as_deref());
        self.upload_to_storage(key, body, content_type.to_string(), size)
            .await
    }

    /// Get a reference to the configuration.
    pub fn config(&self) -> &UploaderConfig {
        &self.config
    }

    async fn upload_to_storage(
        &self,
        key: String,
        body: ByteStream,
        content_type: String,
        size: u64,
    ) -> Result<UploadResult> {
        let result = self
            .storage
            .put_object(StorageInput {
                key: key.clone(),
                content_type: content_type.clone(),
                body,
            })
            .await
            .map_err(|e| {
                warn!(error = %e, key = %key, "S3 put_object failed");
                map_s3_error("put_object", &e, None, None)
            })?;

        Ok(self.build_upload_result(key, content_type, size, result.etag))
    }

    fn build_upload_result(
        &self,
        key: String,
        content_type: String,
        size: u64,
        etag: Option<String>,
    ) -> UploadResult {
        let public_url = format!(
            "{}/{}",
            self.config.public_base_url.trim_end_matches('/'),
            key
        );

        debug!(key = %key, public_url = %public_url, "upload complete");

        UploadResult {
            bucket: self.config.bucket.clone(),
            key,
            public_url,
            etag,
            content_type,
            size,
        }
    }
}

fn map_s3_error(
    operation: &str,
    error: &(dyn StdError + 'static),
    limit_message: Option<&'static str>,
    size_limit: Option<u64>,
) -> Error {
    if let Some(message) = limit_message
        && error_chain_contains_message(error, message)
    {
        return Error::Config {
            message: match size_limit {
                Some(limit) => format!("downloaded content size exceeds maximum {}", limit),
                None => "downloaded content size exceeds maximum".into(),
            },
        };
    }

    Error::S3Operation {
        operation: operation.to_string(),
        reason: error.to_string(),
    }
}

fn error_chain_contains_message(error: &(dyn StdError + 'static), message: &str) -> bool {
    if error.to_string().contains(message) {
        return true;
    }

    let mut source = error.source();
    while let Some(err) = source {
        if err.to_string().contains(message) {
            return true;
        }
        source = err.source();
    }

    false
}