use crate::aws::{deployer_directory, Error, InstanceConfig};
use aws_config::BehaviorVersion;
pub use aws_config::Region;
use aws_sdk_s3::{
    config::retry::ReconnectMode,
    operation::head_object::HeadObjectError,
    presigning::PresigningConfig,
    primitives::ByteStream,
    types::{BucketLocationConstraint, CreateBucketConfiguration, Delete, ObjectIdentifier},
    Client as S3Client,
};
use commonware_cryptography::{Hasher as _, Sha256};
use futures::{
    future::try_join_all,
    stream::{self, StreamExt, TryStreamExt},
};
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    io::Read,
    path::Path,
    time::Duration,
};
use tracing::{debug, info};

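/// Name of the file in the deployer directory that persists the generated bucket name.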
const BUCKET_CONFIG_FILE: &str = "bucket";

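/// Returns the S3 bucket name used by the deployer.
///
/// Reads the name from the local bucket config file if it exists and is
/// non-empty; otherwise generates a `commonware-deployer-<suffix>` name from
/// 16 hex characters of a random UUID and persists it for reuse across runs.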
pub fn get_bucket_name() -> String {
    let path = deployer_directory(None).join(BUCKET_CONFIG_FILE);

    if let Ok(contents) = std::fs::read_to_string(&path) {
        let name = contents.trim();
        if !name.is_empty() {
            return name.to_string();
        }
    }

    let suffix = &uuid::Uuid::new_v4().simple().to_string()[..16];
    let bucket_name = format!("commonware-deployer-{suffix}");

    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).expect("failed to create deployer directory");
    }
    std::fs::write(&path, &bucket_name).expect("failed to write bucket config");

    bucket_name
}

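/// Removes the persisted bucket config file; a missing file is ignored so
/// cleanup is best-effort.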
pub fn delete_bucket_config() {
    let path = deployer_directory(None).join(BUCKET_CONFIG_FILE);

    let _ = std::fs::remove_file(path);
}

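/// S3 key prefix under which tool binaries are cached.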
pub const TOOLS_BINARIES_PREFIX: &str = "tools/binaries";

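/// S3 key prefix under which tool configuration files are cached.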
pub const TOOLS_CONFIGS_PREFIX: &str = "tools/configs";

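/// S3 key prefix under which per-deployment artifacts are stored.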
pub const DEPLOYMENTS_PREFIX: &str = "deployments";

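/// Maximum buffer size (32 MiB) used when reading a file for hashing.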
pub const MAX_HASH_BUFFER_SIZE: usize = 32 * 1024 * 1024;

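/// Maximum number of files hashed concurrently by [`hash_files`].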
pub const MAX_CONCURRENT_HASHES: usize = 8;

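/// Lifetime of generated pre-signed URLs (6 hours).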
pub const PRESIGN_DURATION: Duration = Duration::from_secs(6 * 60 * 60);

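/// Base `wget` command used on instances to fetch pre-signed URLs: quiet
/// output, up to 10 attempts, retrying on refused connections and on
/// transient HTTP errors (404, 408, 429, 500, 502, 503, 504), waiting up to
/// 5 seconds between retries.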
pub const WGET: &str =
    "wget -q --tries=10 --retry-connrefused --retry-on-http-error=404,408,429,500,502,503,504 --waitretry=5";

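/// Creates an S3 client for `region` configured for aggressive retries.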
pub async fn create_client(region: Region) -> S3Client {
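    // Adaptive retry mode rate-limits client-side; effectively retry forever
    // with backoff between 500ms and 30s.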
    let retry = aws_config::retry::RetryConfig::adaptive()
        .with_max_attempts(u32::MAX)
        .with_initial_backoff(Duration::from_millis(500))
        .with_max_backoff(Duration::from_secs(30))
        .with_reconnect_mode(ReconnectMode::ReconnectOnTransientError);
    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(region)
        .retry_config(retry)
        .load()
        .await;
    S3Client::new(&config)
}

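/// Ensures `bucket_name` exists and is reachable.
///
/// Succeeds if the bucket already exists, including in a different region
/// (bucket names are global, so the bucket remains usable via cross-region
/// access); creates it in `region` if it is missing; any other `HeadBucket`
/// failure is reported as a forbidden bucket.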
pub async fn ensure_bucket_exists(
    client: &S3Client,
    bucket_name: &str,
    region: &str,
) -> Result<(), Error> {
    match client.head_bucket().bucket(bucket_name).send().await {
        Ok(_) => {
            info!(bucket = bucket_name, "bucket already exists");
            return Ok(());
        }
        Err(e) => {
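            // Even on a failed HeadBucket, S3 generally includes the bucket's
            // actual region in the x-amz-bucket-region response header.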
            let bucket_region = e
                .raw_response()
                .and_then(|r| r.headers().get("x-amz-bucket-region"))
                .map(|s| s.to_string());

            let service_err = e.into_service_error();
            if service_err.is_not_found() {
                debug!(bucket = bucket_name, "bucket not found, will create");
            } else if let Some(bucket_region) = bucket_region {
                info!(
                    bucket = bucket_name,
                    bucket_region = bucket_region.as_str(),
                    client_region = region,
                    "bucket exists in different region, using cross-region access"
                );
                return Ok(());
            } else {
                return Err(Error::S3BucketForbidden {
                    bucket: bucket_name.to_string(),
                    reason: super::BucketForbiddenReason::AccessDenied,
                });
            }
        }
    }

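    // us-east-1 is the default region and must not be passed as a location
    // constraint; every other region requires an explicit one.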
    let mut request = client.create_bucket().bucket(bucket_name);
    if region != "us-east-1" {
        let location_constraint = BucketLocationConstraint::from(region);
        let bucket_config = CreateBucketConfiguration::builder()
            .location_constraint(location_constraint)
            .build();
        request = request.create_bucket_configuration(bucket_config);
    }

    match request.send().await {
        Ok(_) => {
            info!(bucket = bucket_name, region = region, "created bucket");
        }
        Err(e) => {
            let service_err = e.into_service_error();
            let s3_err = aws_sdk_s3::Error::from(service_err);
            match &s3_err {
                aws_sdk_s3::Error::BucketAlreadyExists(_)
                | aws_sdk_s3::Error::BucketAlreadyOwnedByYou(_) => {
                    info!(bucket = bucket_name, "bucket already exists");
                }
                _ => {
                    return Err(Error::AwsS3 {
                        bucket: bucket_name.to_string(),
                        operation: super::S3Operation::CreateBucket,
                        source: Box::new(s3_err),
                    });
                }
            }
        }
    }
    Ok(())
}

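/// Returns whether `key` exists in `bucket`, treating a NotFound response to
/// `HeadObject` as `Ok(false)` and any other failure as an error.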
pub async fn object_exists(client: &S3Client, bucket: &str, key: &str) -> Result<bool, Error> {
    match client.head_object().bucket(bucket).key(key).send().await {
        Ok(_) => Ok(true),
        Err(e) => {
            let service_err = e.into_service_error();
            if matches!(service_err, HeadObjectError::NotFound(_)) {
                Ok(false)
            } else {
                Err(Error::AwsS3 {
                    bucket: bucket.to_string(),
                    operation: super::S3Operation::HeadObject,
                    source: Box::new(aws_sdk_s3::Error::from(service_err)),
                })
            }
        }
    }
}

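/// Uploads an object to S3, retrying indefinitely with exponential backoff
/// (500ms doubling up to ~8.5 minutes). `make_body` is called on every
/// attempt because a `ByteStream` is consumed by `put_object` and cannot be
/// reused.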
async fn upload_with_retry<F, Fut>(client: &S3Client, bucket: &str, key: &str, make_body: F)
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<ByteStream, Error>>,
{
    let mut attempt = 0u32;
    loop {
        let body = match make_body().await {
            Ok(b) => b,
            Err(e) => {
                debug!(
                    bucket = bucket,
                    key = key,
                    attempt = attempt + 1,
                    error = %e,
                    "failed to create body, retrying"
                );
                attempt = attempt.saturating_add(1);
                let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
                tokio::time::sleep(backoff).await;
                continue;
            }
        };

        match client
            .put_object()
            .bucket(bucket)
            .key(key)
            .body(body)
            .send()
            .await
        {
            Ok(_) => {
                debug!(bucket = bucket, key = key, "uploaded to S3");
                return;
            }
            Err(e) => {
                debug!(
                    bucket = bucket,
                    key = key,
                    attempt = attempt + 1,
                    error = %e,
                    "upload failed, retrying"
                );
                attempt = attempt.saturating_add(1);
                let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
                tokio::time::sleep(backoff).await;
            }
        }
    }
}

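/// Source of the bytes uploaded by [`cache_and_presign`].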
pub enum UploadSource<'a> {
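    /// Stream the object from a file on disk.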
    File(&'a Path),
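    /// Serve the object from a static in-memory slice.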
    Static(&'static [u8]),
}

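/// Uploads `source` to `bucket`/`key` if the object is not already present,
/// then returns a pre-signed GET URL valid for `expires_in`.
///
/// Illustrative usage (not compiled; the bucket and key are examples):
///
/// ```ignore
/// let client = create_client(Region::new("us-east-1")).await;
/// let url = cache_and_presign(
///     &client,
///     &get_bucket_name(),
///     "tools/configs/example",
///     UploadSource::Static(b"example config"),
///     PRESIGN_DURATION,
/// )
/// .await?;
/// ```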
#[must_use = "the pre-signed URL should be used to download the content"]
pub async fn cache_and_presign(
    client: &S3Client,
    bucket: &str,
    key: &str,
    source: UploadSource<'_>,
    expires_in: Duration,
) -> Result<String, Error> {
    if !object_exists(client, bucket, key).await? {
        debug!(key = key, "not in S3, uploading");
        match source {
            UploadSource::File(path) => {
                let path = path.to_path_buf();
                upload_with_retry(client, bucket, key, || {
                    let path = path.clone();
                    async move {
                        ByteStream::from_path(path)
                            .await
                            .map_err(|e| Error::Io(std::io::Error::other(e)))
                    }
                })
                .await;
            }
            UploadSource::Static(content) => {
                upload_with_retry(client, bucket, key, || async {
                    Ok(ByteStream::from_static(content))
                })
                .await;
            }
        }
    }
    presign_url(client, bucket, key, expires_in).await
}

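/// Computes the SHA-256 digest of the file at `path` on a blocking thread,
/// reading in chunks of at most [`MAX_HASH_BUFFER_SIZE`] bytes.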
pub async fn hash_file(path: &Path) -> Result<String, Error> {
    let path = path.to_path_buf();
    tokio::task::spawn_blocking(move || {
        let mut file = std::fs::File::open(&path)?;
        let file_size = file.metadata()?.len() as usize;
        let buffer_size = file_size.min(MAX_HASH_BUFFER_SIZE);
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; buffer_size];
        loop {
            let bytes_read = file.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            hasher.update(&buffer[..bytes_read]);
        }
        Ok(hasher.finalize().to_string())
    })
    .await
    .map_err(|e| Error::Io(std::io::Error::other(e)))?
}

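/// Hashes the given files concurrently (at most [`MAX_CONCURRENT_HASHES`] at
/// a time), returning a map from path to digest.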
pub async fn hash_files(paths: Vec<String>) -> Result<HashMap<String, String>, Error> {
    stream::iter(paths.into_iter().map(|path| async move {
        let digest = hash_file(Path::new(&path)).await?;
        Ok::<_, Error>((path, digest))
    }))
    .buffer_unordered(MAX_CONCURRENT_HASHES)
    .try_collect()
    .await
}

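/// Generates a pre-signed GET URL for `bucket`/`key` that expires after
/// `expires_in`.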
#[must_use = "the pre-signed URL should be used to download the object"]
pub async fn presign_url(
    client: &S3Client,
    bucket: &str,
    key: &str,
    expires_in: Duration,
) -> Result<String, Error> {
    let presigning_config = PresigningConfig::expires_in(expires_in)?;

    let presigned_request = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .presigned(presigning_config)
        .await?;

    Ok(presigned_request.uri().to_string())
}

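/// Deletes every object in `bucket` whose key starts with `prefix`, paging
/// through `ListObjectsV2` results and deleting in batches.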
pub async fn delete_prefix(client: &S3Client, bucket: &str, prefix: &str) -> Result<(), Error> {
    let mut continuation_token: Option<String> = None;
    let mut deleted_count = 0;

    loop {
        let mut request = client.list_objects_v2().bucket(bucket).prefix(prefix);

        if let Some(token) = continuation_token {
            request = request.continuation_token(token);
        }

        let response = request.send().await.map_err(|e| Error::AwsS3 {
            bucket: bucket.to_string(),
            operation: super::S3Operation::ListObjects,
            source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
        })?;

        if let Some(objects) = response.contents {
            let identifiers: Vec<ObjectIdentifier> = objects
                .into_iter()
                .filter_map(|obj| obj.key)
                .map(|key| ObjectIdentifier::builder().key(key).build())
                .collect::<Result<Vec<_>, _>>()?;

            if !identifiers.is_empty() {
                let count = identifiers.len();
                let delete = Delete::builder().set_objects(Some(identifiers)).build()?;

                client
                    .delete_objects()
                    .bucket(bucket)
                    .delete(delete)
                    .send()
                    .await
                    .map_err(|e| Error::AwsS3 {
                        bucket: bucket.to_string(),
                        operation: super::S3Operation::DeleteObjects,
                        source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
                    })?;

                deleted_count += count;
            }
        }

        if response.is_truncated == Some(true) {
            continuation_token = response.next_continuation_token;
        } else {
            break;
        }
    }

    info!(
        bucket = bucket,
        prefix = prefix,
        count = deleted_count,
        "deleted objects from S3"
    );
    Ok(())
}

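/// Deletes `bucket`, which must already be empty.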
pub async fn delete_bucket(client: &S3Client, bucket: &str) -> Result<(), Error> {
    client
        .delete_bucket()
        .bucket(bucket)
        .send()
        .await
        .map_err(|e| Error::AwsS3 {
            bucket: bucket.to_string(),
            operation: super::S3Operation::DeleteBucket,
            source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
        })?;
    info!(bucket = bucket, "deleted bucket");
    Ok(())
}

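/// Deletes all objects in `bucket`, then the bucket itself.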
pub async fn delete_bucket_and_contents(client: &S3Client, bucket: &str) -> Result<(), Error> {
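    // S3 refuses to delete a non-empty bucket, so remove all objects first.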
    delete_prefix(client, bucket, "").await?;

    delete_bucket(client, bucket).await?;

    Ok(())
}

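/// Returns whether `error` wraps an S3 `NoSuchBucket` error.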
pub fn is_no_such_bucket_error(error: &Error) -> bool {
    match error {
        Error::AwsS3 { source, .. } => {
            matches!(source.as_ref(), aws_sdk_s3::Error::NoSuchBucket(_))
        }
        _ => false,
    }
}

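/// Pre-signed URLs for the files each instance must download.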
pub struct InstanceFileUrls {
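    /// Map from instance name to a pre-signed URL for its binary.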
    pub binary_urls: HashMap<String, String>,
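    /// Map from instance name to a pre-signed URL for its configuration file.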
    pub config_urls: HashMap<String, String>,
}

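/// Uploads the binaries and configuration files referenced by `instances` to
/// S3 and returns pre-signed download URLs keyed by instance name.
///
/// Files are deduplicated by digest, so a binary shared by many instances is
/// hashed, uploaded, and pre-signed exactly once; binaries and configs are
/// processed concurrently.
///
/// Illustrative usage (not compiled; the tag and instance name are examples):
///
/// ```ignore
/// let urls = upload_instance_files(&client, &bucket, "my-tag", &instances).await?;
/// let binary_url = &urls.binary_urls["node-0"];
/// ```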
pub async fn upload_instance_files(
    client: &S3Client,
    bucket: &str,
    tag: &str,
    instances: &[InstanceConfig],
) -> Result<InstanceFileUrls, Error> {
    let mut unique_binary_paths: BTreeSet<String> = BTreeSet::new();
    let mut unique_config_paths: BTreeSet<String> = BTreeSet::new();
    for instance in instances {
        unique_binary_paths.insert(instance.binary.clone());
        unique_config_paths.insert(instance.config.clone());
    }

    let unique_paths: Vec<String> = unique_binary_paths
        .iter()
        .chain(unique_config_paths.iter())
        .cloned()
        .collect();
    info!(count = unique_paths.len(), "computing file digests");
    let path_to_digest = hash_files(unique_paths).await?;

    let mut binary_digests: BTreeMap<String, String> = BTreeMap::new();
    let mut config_digests: BTreeMap<String, String> = BTreeMap::new();
    let mut instance_binary_digest: HashMap<String, String> = HashMap::new();
    let mut instance_config_digest: HashMap<String, String> = HashMap::new();
    for instance in instances {
        let binary_digest = path_to_digest[&instance.binary].clone();
        let config_digest = path_to_digest[&instance.config].clone();
        binary_digests.insert(binary_digest.clone(), instance.binary.clone());
        config_digests.insert(config_digest.clone(), instance.config.clone());
        instance_binary_digest.insert(instance.name.clone(), binary_digest);
        instance_config_digest.insert(instance.name.clone(), config_digest);
    }

    let (binary_digest_to_url, config_digest_to_url): (
        HashMap<String, String>,
        HashMap<String, String>,
    ) = tokio::try_join!(
        async {
            Ok::<_, Error>(
                try_join_all(binary_digests.iter().map(|(digest, path)| {
                    let client = client.clone();
                    let bucket = bucket.to_string();
                    let digest = digest.clone();
                    let key = super::services::binary_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_and_presign(
                            &client,
                            &bucket,
                            &key,
                            UploadSource::File(path.as_ref()),
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
        async {
            Ok::<_, Error>(
                try_join_all(config_digests.iter().map(|(digest, path)| {
                    let client = client.clone();
                    let bucket = bucket.to_string();
                    let digest = digest.clone();
                    let key = super::services::config_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_and_presign(
                            &client,
                            &bucket,
                            &key,
                            UploadSource::File(path.as_ref()),
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
    )?;

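    // Map every instance to the pre-signed URLs for its binary and config.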
    let mut binary_urls: HashMap<String, String> = HashMap::new();
    let mut config_urls: HashMap<String, String> = HashMap::new();
    for instance in instances {
        let binary_digest = &instance_binary_digest[&instance.name];
        let config_digest = &instance_config_digest[&instance.name];
        binary_urls.insert(
            instance.name.clone(),
            binary_digest_to_url[binary_digest].clone(),
        );
        config_urls.insert(
            instance.name.clone(),
            config_digest_to_url[config_digest].clone(),
        );
    }

    Ok(InstanceFileUrls {
        binary_urls,
        config_urls,
    })
}