use crate::config::UploaderConfig;
use crate::download::{DownloadBody, Downloader, ReqwestDownloader, STREAM_LIMIT_ERROR};
use crate::error::{Error, Result};
use crate::keygen::{KeyGenerator, UuidKeyGenerator};
use crate::storage::{S3Storage, StorageClient, StorageInput};
use crate::types::UploadResult;
use aws_config::BehaviorVersion;
use aws_credential_types::Credentials;
use aws_sdk_s3::{Client, primitives::ByteStream};
use aws_types::region::Region;
use futures_util::TryStreamExt;
use http_body::Frame;
use http_body_util::StreamBody;
use std::error::Error as StdError;
use std::path::Path;
use tracing::{debug, instrument, warn};
use url::Url;
/// Uploader that sends local files, downloaded URLs, or in-memory bytes to a
/// storage backend and reports where the stored object ended up.
///
/// The three type parameters are the pluggable collaborators. Their defaults
/// ([`ReqwestDownloader`], [`S3Storage`], [`UuidKeyGenerator`]) are the ones
/// `GarageUploader::new` wires together.
#[derive(Clone)]
pub struct GarageUploader<D = ReqwestDownloader, S = S3Storage, K = UuidKeyGenerator> {
/// Uploader settings; both constructors in this file call `validate()` on
/// it before storing it here.
config: UploaderConfig,
/// Fetches remote content for URL uploads.
downloader: D,
/// Receives the `put_object` calls.
storage: S,
/// Produces the object key for every upload.
key_generator: K,
}
impl GarageUploader {
pub fn new(config: UploaderConfig) -> Result<Self> {
config.validate()?;
let credentials = Credentials::new(
&config.access_key_id,
&config.secret_access_key,
None,
None,
"garage-sdk",
);
let s3_config = aws_sdk_s3::config::Builder::new()
.endpoint_url(&config.endpoint)
.force_path_style(true)
.region(Region::new(config.region.clone()))
.credentials_provider(credentials)
.behavior_version(BehaviorVersion::latest())
.build();
let http = reqwest::Client::builder()
.timeout(config.download_timeout)
.user_agent("garage-sdk/0.1.0")
.build()
.map_err(|e| Error::Config {
message: format!("failed to create HTTP client: {}", e),
})?;
let storage = S3Storage::new(Client::from_conf(s3_config), config.bucket.clone());
let downloader = ReqwestDownloader::new(http);
Ok(Self {
config,
downloader,
storage,
key_generator: UuidKeyGenerator,
})
}
}
impl<D, S, K> GarageUploader<D, S, K>
where
D: Downloader,
S: StorageClient,
K: KeyGenerator,
S::Error: StdError + 'static,
{
/// Assembles an uploader from caller-supplied components instead of the
/// defaults, which makes it possible to substitute the downloader, the
/// storage client and the key generator.
///
/// # Errors
///
/// Returns whatever `config.validate()` reports; no uploader is built in
/// that case.
pub fn with_components(
    config: UploaderConfig,
    downloader: D,
    storage: S,
    key_generator: K,
) -> Result<Self> {
    config.validate()?;

    let uploader = Self {
        config,
        downloader,
        storage,
        key_generator,
    };
    Ok(uploader)
}
/// Uploads the file found at `path`.
///
/// The file is stat'ed first so that non-files and files larger than
/// `config.max_file_size` are rejected before any body is opened. The key
/// extension comes from the path's extension (falling back to `"bin"`), the
/// content type is guessed from the path (falling back to the octet-stream
/// type), and the body is created with `ByteStream::from_path`.
///
/// The `size` in the returned result is the length taken from the metadata
/// read at the start of the call.
///
/// # Errors
///
/// * `Error::FileRead` - the metadata lookup or opening the file failed.
/// * `Error::InvalidPath` - `path` does not refer to a file, or its file
///   name is missing or not valid UTF-8.
/// * `Error::Config` - the file is larger than `config.max_file_size`.
/// * Any error produced while storing the object.
#[instrument(skip(self), fields(path = %path.as_ref().display()))]
pub async fn upload_from_path(&self, path: impl AsRef<Path>) -> Result<UploadResult> {
    let path = path.as_ref();
    let metadata = tokio::fs::metadata(path)
        .await
        .map_err(|e| Error::FileRead {
            path: path.display().to_string(),
            source: e,
        })?;
    if !metadata.is_file() {
        return Err(Error::InvalidPath {
            reason: "path is not a file".into(),
        });
    }
    let size = metadata.len();
    if size > self.config.max_file_size {
        return Err(Error::Config {
            message: format!(
                "file size {} exceeds maximum {}",
                size, self.config.max_file_size
            ),
        });
    }
    // The file name is only used for logging, but a missing or non-UTF-8
    // name is still treated as an invalid path.
    let file_name =
        path.file_name()
            .and_then(|n| n.to_str())
            .ok_or_else(|| Error::InvalidPath {
                reason: "could not extract filename".into(),
            })?;
    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("bin");
    let content_type = mime_guess::from_path(path)
        .first_or_octet_stream()
        .to_string();
    debug!(
        file_name = %file_name,
        content_type = %content_type,
        size = size,
        "uploading file from path"
    );
    let body = ByteStream::from_path(path)
        .await
        .map_err(|e| Error::FileRead {
            path: path.display().to_string(),
            // Wrap the SDK error itself rather than its rendered message:
            // the top-level text is generic, and the real cause is only
            // reachable through `source()`, which a string copy discards.
            source: std::io::Error::other(e),
        })?;
    let key = self
        .key_generator
        .generate_key(ext, self.config.key_prefix.as_deref());
    self.upload_to_storage(key, body, content_type, size).await
}
#[instrument(skip(self), fields(url = %url))]
pub async fn upload_from_url(&self, url: &str) -> Result<UploadResult> {
let parsed_url = Url::parse(url).map_err(|e| Error::InvalidUrl {
url: url.to_string(),
reason: e.to_string(),
})?;
match parsed_url.scheme() {
"http" | "https" => {}
scheme => {
return Err(Error::InvalidUrl {
url: url.to_string(),
reason: format!("unsupported scheme '{}', expected http or https", scheme),
});
}
}
debug!(url = %url, "downloading content from URL");
let download = self
.downloader
.fetch(
parsed_url,
self.config.max_buffered_bytes,
self.config.max_file_size,
)
.await?;
let key = self
.key_generator
.generate_key(&download.extension, self.config.key_prefix.as_deref());
match download.body {
DownloadBody::Buffered { bytes } => {
let size = bytes.len() as u64;
let body = ByteStream::from(bytes);
self.upload_to_storage(key, body, download.content_type, size)
.await
}
DownloadBody::Streamed {
stream,
size_counter,
} => {
let body_stream = stream.map_ok(Frame::data);
let body = ByteStream::from_body_1_x(StreamBody::new(body_stream));
let result = self
.storage
.put_object(StorageInput {
key: key.clone(),
content_type: download.content_type.clone(),
body,
})
.await
.map_err(|e| {
warn!(error = %e, key = %key, "S3 put_object failed");
map_s3_error(
"put_object",
&e,
Some(STREAM_LIMIT_ERROR),
Some(self.config.max_file_size),
)
})?;
let size = size_counter.load(std::sync::atomic::Ordering::Relaxed);
Ok(self.build_upload_result(key, download.content_type, size, result.etag))
}
}
}
/// Uploads an in-memory payload.
///
/// `extension` is used for the object key when given; otherwise the first
/// extension `mime_guess` associates with `content_type` is used, and
/// `"bin"` when there is none.
///
/// # Errors
///
/// * `Error::Config` - the payload is larger than `config.max_file_size`.
/// * Any error produced while storing the object.
#[instrument(skip(self, bytes), fields(content_type = %content_type))]
pub async fn upload_bytes(
    &self,
    bytes: impl Into<Vec<u8>>,
    content_type: &str,
    extension: Option<&str>,
) -> Result<UploadResult> {
    let payload: Vec<u8> = bytes.into();
    let size = payload.len() as u64;
    let limit = self.config.max_file_size;
    if size > limit {
        return Err(Error::Config {
            message: format!("content size {} exceeds maximum {}", size, limit),
        });
    }

    let ext = match extension {
        Some(explicit) => explicit,
        None => mime_guess::get_mime_extensions_str(content_type)
            .and_then(|candidates| candidates.first().copied())
            .unwrap_or("bin"),
    };
    debug!(size = size, content_type = %content_type, ext = %ext, "uploading bytes");

    let key = self
        .key_generator
        .generate_key(ext, self.config.key_prefix.as_deref());
    let body = ByteStream::from(payload);
    self.upload_to_storage(key, body, content_type.to_string(), size)
        .await
}
/// Returns a reference to the configuration held by this uploader.
pub fn config(&self) -> &UploaderConfig {
&self.config
}
/// Stores `body` under `key` and turns the storage response into an
/// [`UploadResult`].
///
/// `size` is not verified here; it is copied into the result as given.
///
/// # Errors
///
/// A failed `put_object` is logged at `warn` level and returned as the
/// error built by `map_s3_error` without a size-limit translation.
async fn upload_to_storage(
    &self,
    key: String,
    body: ByteStream,
    content_type: String,
    size: u64,
) -> Result<UploadResult> {
    // The key and content type are needed again for the result and the log
    // line, so the storage input receives copies.
    let input = StorageInput {
        key: key.clone(),
        content_type: content_type.clone(),
        body,
    };

    match self.storage.put_object(input).await {
        Ok(stored) => Ok(self.build_upload_result(key, content_type, size, stored.etag)),
        Err(e) => {
            warn!(error = %e, key = %key, "S3 put_object failed");
            Err(map_s3_error("put_object", &e, None, None))
        }
    }
}
/// Builds the [`UploadResult`] for an object that has been stored.
///
/// The public URL is `config.public_base_url` with trailing `/` characters
/// removed, followed by `/` and the key. The bucket name is copied from the
/// configuration; all other fields are passed through unchanged.
fn build_upload_result(
    &self,
    key: String,
    content_type: String,
    size: u64,
    etag: Option<String>,
) -> UploadResult {
    let base = self.config.public_base_url.trim_end_matches('/');
    let public_url = format!("{base}/{key}");
    debug!(key = %key, public_url = %public_url, "upload complete");

    let bucket = self.config.bucket.clone();
    UploadResult {
        bucket,
        key,
        public_url,
        etag,
        content_type,
        size,
    }
}
}
fn map_s3_error(
operation: &str,
error: &(dyn StdError + 'static),
limit_message: Option<&'static str>,
size_limit: Option<u64>,
) -> Error {
if let Some(message) = limit_message
&& error_chain_contains_message(error, message)
{
return Error::Config {
message: match size_limit {
Some(limit) => format!("downloaded content size exceeds maximum {}", limit),
None => "downloaded content size exceeds maximum".into(),
},
};
}
Error::S3Operation {
operation: operation.to_string(),
reason: error.to_string(),
}
}
fn error_chain_contains_message(error: &(dyn StdError + 'static), message: &str) -> bool {
if error.to_string().contains(message) {
return true;
}
let mut source = error.source();
while let Some(err) = source {
if err.to_string().contains(message) {
return true;
}
source = err.source();
}
false
}