use crate::aws::{deployer_directory, Error, InstanceConfig};
use aws_config::BehaviorVersion;
pub use aws_config::Region;
use aws_sdk_s3::{
config::retry::ReconnectMode,
operation::head_object::HeadObjectError,
presigning::PresigningConfig,
primitives::ByteStream,
types::{BucketLocationConstraint, CreateBucketConfiguration, Delete, ObjectIdentifier},
Client as S3Client,
};
use commonware_cryptography::{Hasher as _, Sha256};
use futures::{
future::try_join_all,
stream::{self, StreamExt, TryStreamExt},
};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
io::Read,
path::Path,
time::Duration,
};
use tracing::{debug, info};
/// Name of the file (inside the deployer directory) that persists the generated bucket name.
const BUCKET_CONFIG_FILE: &str = "bucket";
/// Returns the persisted deployer bucket name, generating and persisting a
/// fresh `commonware-deployer-<16 hex chars>` name on first use.
///
/// # Panics
///
/// Panics if the deployer directory or the bucket config file cannot be written.
pub fn get_bucket_name() -> String {
    let path = deployer_directory(None).join(BUCKET_CONFIG_FILE);

    // Reuse a previously persisted name when the file exists and is non-empty.
    if let Ok(contents) = std::fs::read_to_string(&path) {
        let stored = contents.trim();
        if !stored.is_empty() {
            return stored.to_string();
        }
    }

    // First use: derive a name from a random UUID and persist it for later runs.
    let id = uuid::Uuid::new_v4().simple().to_string();
    let bucket_name = format!("commonware-deployer-{}", &id[..16]);
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).expect("failed to create deployer directory");
    }
    std::fs::write(&path, &bucket_name).expect("failed to write bucket config");
    bucket_name
}
/// Removes the persisted bucket-name file, ignoring any error (e.g. it never existed).
pub fn delete_bucket_config() {
    let _ = std::fs::remove_file(deployer_directory(None).join(BUCKET_CONFIG_FILE));
}
/// S3 key prefix for cached tool binaries.
pub const TOOLS_BINARIES_PREFIX: &str = "tools/binaries";
/// S3 key prefix for cached tool configs.
pub const TOOLS_CONFIGS_PREFIX: &str = "tools/configs";
/// S3 key prefix for per-deployment objects.
pub const DEPLOYMENTS_PREFIX: &str = "deployments";
/// Upper bound (32 MiB) on the read buffer used when hashing a file.
pub const MAX_HASH_BUFFER_SIZE: usize = 32 * 1024 * 1024;
/// Maximum number of files hashed concurrently.
pub const MAX_CONCURRENT_HASHES: usize = 8;
/// Validity window (6 hours) for generated pre-signed URLs.
pub const PRESIGN_DURATION: Duration = Duration::from_secs(6 * 60 * 60);
/// `wget` invocation with aggressive retry flags, used by instances to fetch
/// pre-signed URLs that may not be immediately available.
pub const WGET: &str =
    "wget -q --tries=10 --retry-connrefused --retry-on-http-error=404,408,429,500,502,503,504 --waitretry=5";
/// Builds an S3 client for `region` with an adaptive, effectively-unbounded
/// retry policy (backoff capped at 30s).
pub async fn create_client(region: Region) -> S3Client {
    let retry_config = aws_config::retry::RetryConfig::adaptive()
        .with_max_attempts(u32::MAX) // retry indefinitely; the backoff cap bounds the wait
        .with_initial_backoff(Duration::from_millis(500))
        .with_max_backoff(Duration::from_secs(30))
        .with_reconnect_mode(ReconnectMode::ReconnectOnTransientError);
    let shared_config = aws_config::defaults(BehaviorVersion::v2026_01_12())
        .retry_config(retry_config)
        .region(region)
        .load()
        .await;
    S3Client::new(&shared_config)
}
/// Ensures `bucket_name` exists and is usable from this client.
///
/// Outcomes of the initial HEAD request:
/// - success: bucket exists in this region — done;
/// - failure with an `x-amz-bucket-region` response header: the bucket exists
///   in another region, so cross-region access is assumed and no bucket is
///   created;
/// - "not found": the bucket is created below;
/// - anything else: reported as `S3BucketForbidden`.
pub async fn ensure_bucket_exists(
    client: &S3Client,
    bucket_name: &str,
    region: &str,
) -> Result<(), Error> {
    match client.head_bucket().bucket(bucket_name).send().await {
        Ok(_) => {
            info!(bucket = bucket_name, "bucket already exists");
            return Ok(());
        }
        Err(e) => {
            // Capture the region header first: `into_service_error` consumes
            // `e`, so the raw response must be inspected before that call.
            let bucket_region = e
                .raw_response()
                .and_then(|r| r.headers().get("x-amz-bucket-region"))
                .map(|s| s.to_string());
            let service_err = e.into_service_error();
            if service_err.is_not_found() {
                debug!(bucket = bucket_name, "bucket not found, will create");
            } else if let Some(bucket_region) = bucket_region {
                // HEAD failed but S3 told us where the bucket lives; treat it
                // as usable via cross-region access rather than an error.
                info!(
                    bucket = bucket_name,
                    bucket_region = bucket_region.as_str(),
                    client_region = region,
                    "bucket exists in different region, using cross-region access"
                );
                return Ok(());
            } else {
                // No region header and not a 404 — presumably a permissions
                // failure (e.g. bucket owned by another account).
                return Err(Error::S3BucketForbidden {
                    bucket: bucket_name.to_string(),
                    reason: super::BucketForbiddenReason::AccessDenied,
                });
            }
        }
    }
    // us-east-1 must NOT be given a location constraint; all other regions
    // require one.
    let mut request = client.create_bucket().bucket(bucket_name);
    if region != "us-east-1" {
        let location_constraint = BucketLocationConstraint::from(region);
        let bucket_config = CreateBucketConfiguration::builder()
            .location_constraint(location_constraint)
            .build();
        request = request.create_bucket_configuration(bucket_config);
    }
    match request.send().await {
        Ok(_) => {
            info!(bucket = bucket_name, region = region, "created bucket");
        }
        Err(e) => {
            let service_err = e.into_service_error();
            let s3_err = aws_sdk_s3::Error::from(service_err);
            match &s3_err {
                // Benign race: another process created the bucket between the
                // HEAD above and this create call.
                aws_sdk_s3::Error::BucketAlreadyExists(_)
                | aws_sdk_s3::Error::BucketAlreadyOwnedByYou(_) => {
                    info!(bucket = bucket_name, "bucket already exists");
                }
                _ => {
                    return Err(Error::AwsS3 {
                        bucket: bucket_name.to_string(),
                        operation: super::S3Operation::CreateBucket,
                        source: Box::new(s3_err),
                    });
                }
            }
        }
    }
    Ok(())
}
/// Reports whether `key` exists in `bucket` via a HEAD request.
///
/// A 404 maps to `Ok(false)`; every other failure surfaces as `Error::AwsS3`.
pub async fn object_exists(client: &S3Client, bucket: &str, key: &str) -> Result<bool, Error> {
    let outcome = client.head_object().bucket(bucket).key(key).send().await;
    let Err(e) = outcome else {
        return Ok(true);
    };
    let service_err = e.into_service_error();
    if matches!(service_err, HeadObjectError::NotFound(_)) {
        return Ok(false);
    }
    Err(Error::AwsS3 {
        bucket: bucket.to_string(),
        operation: super::S3Operation::HeadObject,
        source: Box::new(aws_sdk_s3::Error::from(service_err)),
    })
}
/// Uploads `key` to `bucket`, retrying forever with capped exponential backoff.
///
/// `make_body` is called before every attempt because a `ByteStream` is
/// consumed by `put_object` and cannot be reused; failures to build the body
/// are retried just like failed uploads.
async fn upload_with_retry<F, Fut>(client: &S3Client, bucket: &str, key: &str, make_body: F)
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<ByteStream, Error>>,
{
    /// Capped exponential backoff: 500ms * 2^min(attempt, 10) (max ~8.5 min).
    fn backoff_for(attempt: u32) -> Duration {
        Duration::from_millis(500 * (1u64 << attempt.min(10)))
    }
    let mut attempt = 0u32;
    loop {
        // Build a fresh body for this attempt.
        let body = match make_body().await {
            Ok(b) => b,
            Err(e) => {
                debug!(
                    bucket = bucket,
                    key = key,
                    attempt = attempt + 1,
                    error = %e,
                    "failed to create body, retrying"
                );
                attempt = attempt.saturating_add(1);
                tokio::time::sleep(backoff_for(attempt)).await;
                continue;
            }
        };
        match client
            .put_object()
            .bucket(bucket)
            .key(key)
            .body(body)
            .send()
            .await
        {
            Ok(_) => {
                debug!(bucket = bucket, key = key, "uploaded to S3");
                return;
            }
            Err(e) => {
                debug!(
                    bucket = bucket,
                    key = key,
                    attempt = attempt + 1,
                    error = %e,
                    "upload failed, retrying"
                );
                attempt = attempt.saturating_add(1);
                tokio::time::sleep(backoff_for(attempt)).await;
            }
        }
    }
}
/// Source of bytes for an S3 upload.
pub enum UploadSource<'a> {
    /// Stream the contents of a file on disk.
    File(&'a Path),
    /// Upload a byte slice embedded in the binary.
    Static(&'static [u8]),
}
#[must_use = "the pre-signed URL should be used to download the content"]
pub async fn cache_and_presign(
client: &S3Client,
bucket: &str,
key: &str,
source: UploadSource<'_>,
expires_in: Duration,
) -> Result<String, Error> {
if !object_exists(client, bucket, key).await? {
debug!(key = key, "not in S3, uploading");
match source {
UploadSource::File(path) => {
let path = path.to_path_buf();
upload_with_retry(client, bucket, key, || {
let path = path.clone();
async move {
ByteStream::from_path(path)
.await
.map_err(|e| Error::Io(std::io::Error::other(e)))
}
})
.await;
}
UploadSource::Static(content) => {
upload_with_retry(client, bucket, key, || async {
Ok(ByteStream::from_static(content))
})
.await;
}
}
}
presign_url(client, bucket, key, expires_in).await
}
/// Computes the SHA-256 digest of the file at `path` on a blocking thread and
/// returns it as a string.
///
/// Reads in chunks of at most [`MAX_HASH_BUFFER_SIZE`] bytes to bound memory
/// use for large files.
pub async fn hash_file(path: &Path) -> Result<String, Error> {
    let path = path.to_path_buf();
    tokio::task::spawn_blocking(move || {
        let mut file = std::fs::File::open(&path)?;
        let file_size = file.metadata()?.len() as usize;
        // Clamp to at least 1 byte: a zero-length buffer would make `read`
        // return 0 immediately, so a file that gained content after the
        // metadata call would silently hash as empty.
        let buffer_size = file_size.clamp(1, MAX_HASH_BUFFER_SIZE);
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; buffer_size];
        loop {
            let bytes_read = file.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            hasher.update(&buffer[..bytes_read]);
        }
        Ok(hasher.finalize().to_string())
    })
    .await
    // A JoinError (panic/cancellation in the blocking task) is surfaced as I/O.
    .map_err(|e| Error::Io(std::io::Error::other(e)))?
}
/// Hashes every path concurrently (at most [`MAX_CONCURRENT_HASHES`] at a
/// time) and returns a map from path to digest.
pub async fn hash_files(paths: Vec<String>) -> Result<HashMap<String, String>, Error> {
    let jobs = paths.into_iter().map(|path| async move {
        hash_file(Path::new(&path))
            .await
            .map(|digest| (path, digest))
    });
    stream::iter(jobs)
        .buffer_unordered(MAX_CONCURRENT_HASHES)
        .try_collect()
        .await
}
/// Generates a pre-signed GET URL for `key` in `bucket`, valid for `expires_in`.
#[must_use = "the pre-signed URL should be used to download the object"]
pub async fn presign_url(
    client: &S3Client,
    bucket: &str,
    key: &str,
    expires_in: Duration,
) -> Result<String, Error> {
    let config = PresigningConfig::expires_in(expires_in)?;
    let request = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .presigned(config)
        .await?;
    Ok(request.uri().to_string())
}
pub async fn delete_prefix(client: &S3Client, bucket: &str, prefix: &str) -> Result<(), Error> {
let mut continuation_token: Option<String> = None;
let mut deleted_count = 0;
loop {
let mut request = client.list_objects_v2().bucket(bucket).prefix(prefix);
if let Some(token) = continuation_token {
request = request.continuation_token(token);
}
let response = request.send().await.map_err(|e| Error::AwsS3 {
bucket: bucket.to_string(),
operation: super::S3Operation::ListObjects,
source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
})?;
if let Some(objects) = response.contents {
let identifiers: Vec<ObjectIdentifier> = objects
.into_iter()
.filter_map(|obj| obj.key)
.map(|key| ObjectIdentifier::builder().key(key).build())
.collect::<Result<Vec<_>, _>>()?;
if !identifiers.is_empty() {
let count = identifiers.len();
let delete = Delete::builder().set_objects(Some(identifiers)).build()?;
client
.delete_objects()
.bucket(bucket)
.delete(delete)
.send()
.await
.map_err(|e| Error::AwsS3 {
bucket: bucket.to_string(),
operation: super::S3Operation::DeleteObjects,
source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
})?;
deleted_count += count;
}
}
if response.is_truncated == Some(true) {
continuation_token = response.next_continuation_token;
} else {
break;
}
}
info!(
bucket = bucket,
prefix = prefix,
count = deleted_count,
"deleted objects from S3"
);
Ok(())
}
pub async fn delete_bucket(client: &S3Client, bucket: &str) -> Result<(), Error> {
client
.delete_bucket()
.bucket(bucket)
.send()
.await
.map_err(|e| Error::AwsS3 {
bucket: bucket.to_string(),
operation: super::S3Operation::DeleteBucket,
source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
})?;
info!(bucket = bucket, "deleted bucket");
Ok(())
}
/// Empties `bucket` and then deletes it.
pub async fn delete_bucket_and_contents(client: &S3Client, bucket: &str) -> Result<(), Error> {
    delete_prefix(client, bucket, "").await?;
    delete_bucket(client, bucket).await
}
pub fn is_no_such_bucket_error(error: &Error) -> bool {
match error {
Error::AwsS3 { source, .. } => {
matches!(source.as_ref(), aws_sdk_s3::Error::NoSuchBucket(_))
}
_ => false,
}
}
/// Pre-signed download URLs for each instance's binary and config.
pub struct InstanceFileUrls {
    // instance name -> pre-signed URL of its binary
    pub binary_urls: HashMap<String, String>,
    // instance name -> pre-signed URL of its config
    pub config_urls: HashMap<String, String>,
}
pub async fn upload_instance_files(
client: &S3Client,
bucket: &str,
tag: &str,
instances: &[InstanceConfig],
) -> Result<InstanceFileUrls, Error> {
let mut unique_binary_paths: BTreeSet<String> = BTreeSet::new();
let mut unique_config_paths: BTreeSet<String> = BTreeSet::new();
for instance in instances {
unique_binary_paths.insert(instance.binary.clone());
unique_config_paths.insert(instance.config.clone());
}
let unique_paths: Vec<String> = unique_binary_paths
.iter()
.chain(unique_config_paths.iter())
.cloned()
.collect();
info!(count = unique_paths.len(), "computing file digests");
let path_to_digest = hash_files(unique_paths).await?;
let mut binary_digests: BTreeMap<String, String> = BTreeMap::new(); let mut config_digests: BTreeMap<String, String> = BTreeMap::new(); let mut instance_binary_digest: HashMap<String, String> = HashMap::new(); let mut instance_config_digest: HashMap<String, String> = HashMap::new(); for instance in instances {
let binary_digest = path_to_digest[&instance.binary].clone();
let config_digest = path_to_digest[&instance.config].clone();
binary_digests.insert(binary_digest.clone(), instance.binary.clone());
config_digests.insert(config_digest.clone(), instance.config.clone());
instance_binary_digest.insert(instance.name.clone(), binary_digest);
instance_config_digest.insert(instance.name.clone(), config_digest);
}
let (binary_digest_to_url, config_digest_to_url): (
HashMap<String, String>,
HashMap<String, String>,
) = tokio::try_join!(
async {
Ok::<_, Error>(
try_join_all(binary_digests.iter().map(|(digest, path)| {
let client = client.clone();
let bucket = bucket.to_string();
let digest = digest.clone();
let key = super::services::binary_s3_key(tag, &digest);
let path = path.clone();
async move {
let url = cache_and_presign(
&client,
&bucket,
&key,
UploadSource::File(path.as_ref()),
PRESIGN_DURATION,
)
.await?;
Ok::<_, Error>((digest, url))
}
}))
.await?
.into_iter()
.collect(),
)
},
async {
Ok::<_, Error>(
try_join_all(config_digests.iter().map(|(digest, path)| {
let client = client.clone();
let bucket = bucket.to_string();
let digest = digest.clone();
let key = super::services::config_s3_key(tag, &digest);
let path = path.clone();
async move {
let url = cache_and_presign(
&client,
&bucket,
&key,
UploadSource::File(path.as_ref()),
PRESIGN_DURATION,
)
.await?;
Ok::<_, Error>((digest, url))
}
}))
.await?
.into_iter()
.collect(),
)
},
)?;
let mut binary_urls: HashMap<String, String> = HashMap::new();
let mut config_urls: HashMap<String, String> = HashMap::new();
for instance in instances {
let binary_digest = &instance_binary_digest[&instance.name];
let config_digest = &instance_config_digest[&instance.name];
binary_urls.insert(
instance.name.clone(),
binary_digest_to_url[binary_digest].clone(),
);
config_urls.insert(
instance.name.clone(),
config_digest_to_url[config_digest].clone(),
);
}
Ok(InstanceFileUrls {
binary_urls,
config_urls,
})
}