use s3::{creds::Credentials, request::ResponseData, Bucket, BucketConfiguration, Region};
use super::errors::ArtefactError;
pub async fn bucket_exists(bucket: &Bucket) -> Result<bool, ArtefactError> {
match bucket.exists().await {
Ok(exists) => Ok(exists),
Err(error) => match error {
s3::error::S3Error::HttpFailWithBody(404, _) => Ok(false),
_ => {
tracing::error!(%error, "Unable to check if bucket exists");
Err(ArtefactError::UnableToCheckExistence(error))
}
},
}
}
pub async fn create_if_does_not_exist(bucket: Box<Bucket>) -> Result<Box<Bucket>, ArtefactError> {
let credentials = bucket.credentials().await.unwrap();
match bucket_exists(&bucket).await? {
true => {
tracing::info!("Using existing bucket");
Ok(bucket)
}
false => match Bucket::create_with_path_style(
&bucket.name,
bucket.region.clone(),
credentials,
BucketConfiguration::public(),
)
.await
{
Ok(response) => match response.success() {
true => {
tracing::info!("Created a new bucket");
Ok(bucket)
}
false => {
tracing::error!(
"Unable to create bucket, got failure response {}",
response.response_text
);
Err(ArtefactError::UnableToCreateBucketFailResponse(
response.response_text,
))
}
},
Err(error) => {
tracing::error!(%error, "Unable to create bucket");
Err(ArtefactError::UnableToCreateBucket(error))
}
},
}
}
#[tracing::instrument(skip(access_key, secret_key))]
pub async fn get_bucket(
access_key: &str,
secret_key: &str,
bucket_name: &str,
store_url: String,
) -> Result<Box<Bucket>, ArtefactError> {
let bucket_creds = match Credentials::new(Some(access_key), Some(secret_key), None, None, None)
{
Ok(creds) => creds,
Err(error) => {
tracing::error!(%error, "Unable to create creds for bucket");
return Err(ArtefactError::UnableToExistingOpenBucket(
s3::error::S3Error::Credentials(error),
));
}
};
let bucket_region = Region::Custom {
region: "custom".to_owned(),
endpoint: store_url,
};
let bucket = match Bucket::new(bucket_name, bucket_region.clone(), bucket_creds.clone()) {
Ok(bucket) => bucket.with_path_style(),
Err(error) => {
tracing::error!(%error, "Unable to open bucket");
return Err(ArtefactError::UnableToExistingOpenBucket(error));
}
};
create_if_does_not_exist(bucket).await
}
/// Creates every missing directory leading up to `local_path`.
///
/// Only the parent of `local_path` is created; the final path component is
/// left untouched. When the path has no parent, nothing is done and `Ok(())`
/// is returned.
///
/// The parameter is a `&str`, so existing callers passing `&String` keep
/// working through deref coercion.
///
/// # Errors
///
/// Returns the I/O error reported by `tokio::fs::create_dir_all`.
pub async fn create_parent_directories(local_path: &str) -> tokio::io::Result<()> {
    match std::path::Path::new(local_path).parent() {
        Some(parent) => tokio::fs::create_dir_all(parent).await,
        None => Ok(()),
    }
}
pub async fn get_artefact(
bucket: &Bucket,
store_path: String,
) -> Result<ResponseData, ArtefactError> {
let response = match bucket.get_object(&store_path).await {
Ok(response) => response,
Err(error) => match error {
s3::error::S3Error::HttpFailWithBody(404, _) => {
tracing::error!("Got 404 response while downloading artefact");
return Err(ArtefactError::ArtefactDoesNotExist(store_path));
}
error => {
tracing::error!(%error, "Could not download artefact");
return Err(ArtefactError::UnableToDownloadInput(error));
}
},
};
let status_code = response.status_code();
if status_code != 200 {
tracing::error!(
"Response was non ok code {} while downloading artefact",
status_code
);
return Err(ArtefactError::UnableToDownloadInputApi(status_code));
}
Ok(response)
}
#[tracing::instrument(skip(bucket))]
pub async fn download_input(
bucket: &Bucket,
local_path: String,
store_path: String,
) -> Result<(), ArtefactError> {
tracing::info!("Downloading input");
let response = get_artefact(bucket, store_path).await?;
if let Err(error) = create_parent_directories(&local_path).await {
tracing::error!(%error, "Unable to create parent directories for input");
return Err(ArtefactError::UnableToWriteInput(error));
}
if let std::io::Result::Err(error) = tokio::fs::write(local_path, &response.bytes()).await {
tracing::error!(%error, "File error while downloading input");
return Err(ArtefactError::UnableToWriteInput(error));
}
Ok(())
}
#[tracing::instrument(skip(bucket))]
pub async fn upload_output(
bucket: &Bucket,
local_path: String,
store_path: String,
) -> Result<(), ArtefactError> {
tracing::info!("Uploading output");
let content = match tokio::fs::read(local_path).await {
Ok(content) => content,
Err(error) => {
tracing::error!(%error, "File error while uploading output");
return Err(ArtefactError::UnableToReadOutput(error));
}
};
let response = match bucket.put_object(store_path, &content).await {
Ok(response) => response,
Err(error) => {
tracing::error!(%error, "Could not upload output");
return Err(ArtefactError::UnableToUploadArtifact(error));
}
};
let status_cost = response.status_code();
if status_cost != 200 {
tracing::error!(
"Response was non ok code {} while uploading output",
status_cost
);
return Err(ArtefactError::UnableToUploadArtifactApi(status_cost));
}
Ok(())
}