commonware_deployer/aws/s3.rs

//! AWS S3 SDK function wrappers for caching deployer artifacts

use crate::aws::{deployer_directory, Error, InstanceConfig};
use aws_config::BehaviorVersion;
pub use aws_config::Region;
use aws_sdk_s3::{
    config::retry::ReconnectMode,
    operation::head_object::HeadObjectError,
    presigning::PresigningConfig,
    primitives::ByteStream,
    types::{BucketLocationConstraint, CreateBucketConfiguration, Delete, ObjectIdentifier},
    Client as S3Client,
};
use commonware_cryptography::{Hasher as _, Sha256};
use futures::{
    future::try_join_all,
    stream::{self, StreamExt, TryStreamExt},
};
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    io::Read,
    path::Path,
    time::Duration,
};
use tracing::{debug, info};

/// File name for the bucket config (stores the S3 bucket name).
const BUCKET_CONFIG_FILE: &str = "bucket";

/// Gets the bucket name, generating one if it doesn't exist.
/// The bucket name is stored in ~/.commonware_deployer/bucket.
pub fn get_bucket_name() -> String {
    let path = deployer_directory(None).join(BUCKET_CONFIG_FILE);

    if let Ok(contents) = std::fs::read_to_string(&path) {
        let name = contents.trim();
        if !name.is_empty() {
            return name.to_string();
        }
    }

    let suffix = &uuid::Uuid::new_v4().simple().to_string()[..16];
    let bucket_name = format!("commonware-deployer-{suffix}");

    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).expect("failed to create deployer directory");
    }
    std::fs::write(&path, &bucket_name).expect("failed to write bucket config");

    bucket_name
}

/// Deletes the bucket config file so a new bucket name is generated on next use.
pub fn delete_bucket_config() {
    let path = deployer_directory(None).join(BUCKET_CONFIG_FILE);

    // If the bucket config file doesn't exist yet, do nothing (clean may have been run
    // out-of-order)
    let _ = std::fs::remove_file(path);
}

/// Prefix for tool binaries: tools/binaries/{tool}/{version}/{platform}/{filename}
pub const TOOLS_BINARIES_PREFIX: &str = "tools/binaries";

/// Prefix for tool configs: tools/configs/{deployer_version}/{component}/{file}
pub const TOOLS_CONFIGS_PREFIX: &str = "tools/configs";

/// Prefix for per-deployment data
pub const DEPLOYMENTS_PREFIX: &str = "deployments";

/// Maximum buffer size for file hashing (32MB)
pub const MAX_HASH_BUFFER_SIZE: usize = 32 * 1024 * 1024;

/// Maximum number of concurrent file hash operations
pub const MAX_CONCURRENT_HASHES: usize = 8;

/// Duration for pre-signed URLs (6 hours)
pub const PRESIGN_DURATION: Duration = Duration::from_secs(6 * 60 * 60);

/// Common wget prefix with retry settings for S3 downloads
///
/// Retries on connection failures and HTTP errors:
/// - 404: Not Found (S3 eventual consistency)
/// - 408: Request Timeout
/// - 429: Too Many Requests (rate limiting)
/// - 500: Internal Server Error
/// - 502: Bad Gateway
/// - 503: Service Unavailable
/// - 504: Gateway Timeout
pub const WGET: &str =
    "wget -q --tries=10 --retry-connrefused --retry-on-http-error=404,408,429,500,502,503,504 --waitretry=5";
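
// Illustrative sketch (not part of this module's API): how the `WGET` prefix might be
// combined with a pre-signed URL when composing an instance startup script. The output
// path below is hypothetical.
#[allow(dead_code)]
fn example_wget_command(presigned_url: &str) -> String {
    // Produces a single shell command that downloads the object with the retry settings above.
    format!("{WGET} -O /home/ubuntu/binary '{presigned_url}'")
}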

/// Creates an S3 client for the specified AWS region
pub async fn create_client(region: Region) -> S3Client {
    let retry = aws_config::retry::RetryConfig::adaptive()
        .with_max_attempts(u32::MAX)
        .with_initial_backoff(Duration::from_millis(500))
        .with_max_backoff(Duration::from_secs(30))
        .with_reconnect_mode(ReconnectMode::ReconnectOnTransientError);
    let config = aws_config::defaults(BehaviorVersion::v2026_01_12())
        .region(region)
        .retry_config(retry)
        .load()
        .await;
    S3Client::new(&config)
}
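
// Illustrative sketch (not part of this module's API): creating a client for one region and
// making sure the shared cache bucket exists before uploading anything. The region string is
// hypothetical.
#[allow(dead_code)]
async fn example_setup() -> Result<(S3Client, String), Error> {
    let region = "us-east-1";
    let client = create_client(Region::new(region)).await;
    // Reuses the persisted bucket name (or generates and persists a new one).
    let bucket = get_bucket_name();
    ensure_bucket_exists(&client, &bucket, region).await?;
    Ok((client, bucket))
}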

/// Ensures the S3 bucket exists, creating it if necessary
pub async fn ensure_bucket_exists(
    client: &S3Client,
    bucket_name: &str,
    region: &str,
) -> Result<(), Error> {
    // Check if bucket exists by trying to get its location
    match client.head_bucket().bucket(bucket_name).send().await {
        Ok(_) => {
            info!(bucket = bucket_name, "bucket already exists");
            return Ok(());
        }
        Err(e) => {
            // Check for region header before consuming the error
            let bucket_region = e
                .raw_response()
                .and_then(|r| r.headers().get("x-amz-bucket-region"))
                .map(|s| s.to_string());

            let service_err = e.into_service_error();
            if service_err.is_not_found() {
                // 404: bucket doesn't exist, we need to create it
                debug!(bucket = bucket_name, "bucket not found, will create");
            } else if let Some(bucket_region) = bucket_region {
                // Bucket exists in a different region - proceed with cross-region access
                info!(
                    bucket = bucket_name,
                    bucket_region = bucket_region.as_str(),
                    client_region = region,
                    "bucket exists in different region, using cross-region access"
                );
                return Ok(());
            } else {
                // 403 or other error without region header: access denied
                return Err(Error::S3BucketForbidden {
                    bucket: bucket_name.to_string(),
                    reason: super::BucketForbiddenReason::AccessDenied,
                });
            }
        }
    }

    // Create the bucket (us-east-1 must not have a location constraint)
    let mut request = client.create_bucket().bucket(bucket_name);
    if region != "us-east-1" {
        let location_constraint = BucketLocationConstraint::from(region);
        let bucket_config = CreateBucketConfiguration::builder()
            .location_constraint(location_constraint)
            .build();
        request = request.create_bucket_configuration(bucket_config);
    }

    match request.send().await {
        Ok(_) => {
            info!(bucket = bucket_name, region = region, "created bucket");
        }
        Err(e) => {
            let service_err = e.into_service_error();
            let s3_err = aws_sdk_s3::Error::from(service_err);
            match &s3_err {
                aws_sdk_s3::Error::BucketAlreadyExists(_)
                | aws_sdk_s3::Error::BucketAlreadyOwnedByYou(_) => {
                    info!(bucket = bucket_name, "bucket already exists");
                }
                _ => {
                    return Err(Error::AwsS3 {
                        bucket: bucket_name.to_string(),
                        operation: super::S3Operation::CreateBucket,
                        source: Box::new(s3_err),
                    });
                }
            }
        }
    }
    Ok(())
}

/// Checks if an object exists in S3
pub async fn object_exists(client: &S3Client, bucket: &str, key: &str) -> Result<bool, Error> {
    match client.head_object().bucket(bucket).key(key).send().await {
        Ok(_) => Ok(true),
        Err(e) => {
            let service_err = e.into_service_error();
            if matches!(service_err, HeadObjectError::NotFound(_)) {
                Ok(false)
            } else {
                Err(Error::AwsS3 {
                    bucket: bucket.to_string(),
                    operation: super::S3Operation::HeadObject,
                    source: Box::new(aws_sdk_s3::Error::from(service_err)),
                })
            }
        }
    }
}

/// Uploads a ByteStream to S3 with unlimited retries for transient failures.
/// Takes a closure that produces the ByteStream, allowing re-creation on retry.
async fn upload_with_retry<F, Fut>(client: &S3Client, bucket: &str, key: &str, make_body: F)
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<ByteStream, Error>>,
{
    let mut attempt = 0u32;
    loop {
        let body = match make_body().await {
            Ok(b) => b,
            Err(e) => {
                debug!(
                    bucket = bucket,
                    key = key,
                    attempt = attempt + 1,
                    error = %e,
                    "failed to create body, retrying"
                );
                attempt = attempt.saturating_add(1);
                let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
                tokio::time::sleep(backoff).await;
                continue;
            }
        };

        match client
            .put_object()
            .bucket(bucket)
            .key(key)
            .body(body)
            .send()
            .await
        {
            Ok(_) => {
                debug!(bucket = bucket, key = key, "uploaded to S3");
                return;
            }
            Err(e) => {
                debug!(
                    bucket = bucket,
                    key = key,
                    attempt = attempt + 1,
                    error = %e,
                    "upload failed, retrying"
                );
                attempt = attempt.saturating_add(1);
                let backoff = Duration::from_millis(500 * (1 << attempt.min(10)));
                tokio::time::sleep(backoff).await;
            }
        }
    }
}

/// Source for S3 upload
pub enum UploadSource<'a> {
    File(&'a Path),
    Static(&'static [u8]),
}

/// Caches content to S3 if it doesn't exist, then returns a pre-signed URL
#[must_use = "the pre-signed URL should be used to download the content"]
pub async fn cache_and_presign(
    client: &S3Client,
    bucket: &str,
    key: &str,
    source: UploadSource<'_>,
    expires_in: Duration,
) -> Result<String, Error> {
    if !object_exists(client, bucket, key).await? {
        debug!(key = key, "not in S3, uploading");
        match source {
            UploadSource::File(path) => {
                let path = path.to_path_buf();
                upload_with_retry(client, bucket, key, || {
                    let path = path.clone();
                    async move {
                        ByteStream::from_path(path)
                            .await
                            .map_err(|e| Error::Io(std::io::Error::other(e)))
                    }
                })
                .await;
            }
            UploadSource::Static(content) => {
                upload_with_retry(client, bucket, key, || async {
                    Ok(ByteStream::from_static(content))
                })
                .await;
            }
        }
    }
    presign_url(client, bucket, key, expires_in).await
}
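
// Illustrative sketch (not part of this module's API): caching a static script under a tools
// key and returning the resulting pre-signed URL. The key layout and script contents below are
// hypothetical.
#[allow(dead_code)]
async fn example_cache_script(client: &S3Client, bucket: &str) -> Result<String, Error> {
    let key = format!("{TOOLS_CONFIGS_PREFIX}/v0.0.0/example/setup.sh");
    // Uploads only if the object is missing, then presigns it for PRESIGN_DURATION.
    cache_and_presign(
        client,
        bucket,
        &key,
        UploadSource::Static(b"#!/bin/bash\necho hello\n"),
        PRESIGN_DURATION,
    )
    .await
}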

/// Computes the SHA256 hash of a file and returns it as a hex string.
/// Uses spawn_blocking internally to avoid blocking the async runtime.
pub async fn hash_file(path: &Path) -> Result<String, Error> {
    let path = path.to_path_buf();
    tokio::task::spawn_blocking(move || {
        let mut file = std::fs::File::open(&path)?;
        let file_size = file.metadata()?.len() as usize;
        let buffer_size = file_size.min(MAX_HASH_BUFFER_SIZE);
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; buffer_size];
        loop {
            let bytes_read = file.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            hasher.update(&buffer[..bytes_read]);
        }
        Ok(hasher.finalize().to_string())
    })
    .await
    .map_err(|e| Error::Io(std::io::Error::other(e)))?
}

/// Computes SHA256 hashes for multiple files concurrently.
/// Returns a map from file path to hex-encoded digest.
pub async fn hash_files(paths: Vec<String>) -> Result<HashMap<String, String>, Error> {
    stream::iter(paths.into_iter().map(|path| async move {
        let digest = hash_file(Path::new(&path)).await?;
        Ok::<_, Error>((path, digest))
    }))
    .buffer_unordered(MAX_CONCURRENT_HASHES)
    .try_collect()
    .await
}
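
// Illustrative sketch (not part of this module's API): hashing a couple of local artifacts
// before deciding which ones still need uploading. The paths below are hypothetical.
#[allow(dead_code)]
async fn example_hash_artifacts() -> Result<(), Error> {
    let digests = hash_files(vec![
        "/tmp/binary".to_string(),
        "/tmp/config.toml".to_string(),
    ])
    .await?;
    for (path, digest) in &digests {
        debug!(path = path.as_str(), digest = digest.as_str(), "hashed file");
    }
    Ok(())
}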

/// Generates a pre-signed URL for downloading an object from S3
#[must_use = "the pre-signed URL should be used to download the object"]
pub async fn presign_url(
    client: &S3Client,
    bucket: &str,
    key: &str,
    expires_in: Duration,
) -> Result<String, Error> {
    let presigning_config = PresigningConfig::expires_in(expires_in)?;

    let presigned_request = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .presigned(presigning_config)
        .await?;

    Ok(presigned_request.uri().to_string())
}

/// Deletes all objects under a prefix in S3 using batch delete (up to 1000 objects per request)
pub async fn delete_prefix(client: &S3Client, bucket: &str, prefix: &str) -> Result<(), Error> {
    let mut continuation_token: Option<String> = None;
    let mut deleted_count = 0;

    loop {
        let mut request = client.list_objects_v2().bucket(bucket).prefix(prefix);

        if let Some(token) = continuation_token {
            request = request.continuation_token(token);
        }

        let response = request.send().await.map_err(|e| Error::AwsS3 {
            bucket: bucket.to_string(),
            operation: super::S3Operation::ListObjects,
            source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
        })?;

        // Collect object identifiers for batch delete
        if let Some(objects) = response.contents {
            let identifiers: Vec<ObjectIdentifier> = objects
                .into_iter()
                .filter_map(|obj| obj.key)
                .map(|key| ObjectIdentifier::builder().key(key).build())
                .collect::<Result<Vec<_>, _>>()?;

            if !identifiers.is_empty() {
                let count = identifiers.len();
                let delete = Delete::builder().set_objects(Some(identifiers)).build()?;

                client
                    .delete_objects()
                    .bucket(bucket)
                    .delete(delete)
                    .send()
                    .await
                    .map_err(|e| Error::AwsS3 {
                        bucket: bucket.to_string(),
                        operation: super::S3Operation::DeleteObjects,
                        source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
                    })?;

                deleted_count += count;
            }
        }

        if response.is_truncated == Some(true) {
            continuation_token = response.next_continuation_token;
        } else {
            break;
        }
    }

    info!(
        bucket = bucket,
        prefix = prefix,
        count = deleted_count,
        "deleted objects from S3"
    );
    Ok(())
}

/// Deletes a bucket (must be empty first)
pub async fn delete_bucket(client: &S3Client, bucket: &str) -> Result<(), Error> {
    client
        .delete_bucket()
        .bucket(bucket)
        .send()
        .await
        .map_err(|e| Error::AwsS3 {
            bucket: bucket.to_string(),
            operation: super::S3Operation::DeleteBucket,
            source: Box::new(aws_sdk_s3::Error::from(e.into_service_error())),
        })?;
    info!(bucket = bucket, "deleted bucket");
    Ok(())
}

/// Deletes all objects in a bucket and then deletes the bucket itself
pub async fn delete_bucket_and_contents(client: &S3Client, bucket: &str) -> Result<(), Error> {
    // First delete all objects (no prefix means all objects)
    delete_prefix(client, bucket, "").await?;

    // Then delete the bucket
    delete_bucket(client, bucket).await?;

    Ok(())
}
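
// Illustrative sketch (not part of this module's API): removing a single deployment's objects,
// then tearing down the whole bucket once nothing else references it. The deployment key layout
// and tag below are hypothetical.
#[allow(dead_code)]
async fn example_cleanup(client: &S3Client, bucket: &str) -> Result<(), Error> {
    // Remove only one deployment's artifacts.
    delete_prefix(client, bucket, &format!("{DEPLOYMENTS_PREFIX}/example-tag/")).await?;

    // Remove everything and the bucket itself, tolerating a bucket that is already gone.
    if let Err(e) = delete_bucket_and_contents(client, bucket).await {
        if !is_no_such_bucket_error(&e) {
            return Err(e);
        }
    }

    // Forget the persisted bucket name so a new one is generated next time.
    delete_bucket_config();
    Ok(())
}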

/// Checks if an error is a "bucket does not exist" error
pub fn is_no_such_bucket_error(error: &Error) -> bool {
    match error {
        Error::AwsS3 { source, .. } => {
            matches!(source.as_ref(), aws_sdk_s3::Error::NoSuchBucket(_))
        }
        _ => false,
    }
}

/// Result of uploading instance files to S3
pub struct InstanceFileUrls {
    /// Map from instance name to binary pre-signed URL
    pub binary_urls: HashMap<String, String>,
    /// Map from instance name to config pre-signed URL
    pub config_urls: HashMap<String, String>,
}

/// Uploads binary and config files for instances to S3 with digest-based deduplication.
///
/// This function:
/// 1. Collects unique binary and config paths across all instances
/// 2. Computes SHA256 digests for deduplication
/// 3. Uploads unique files to S3 concurrently
/// 4. Returns pre-signed URLs mapped by instance name
///
/// Files with identical content are uploaded only once, reducing bandwidth and storage.
pub async fn upload_instance_files(
    client: &S3Client,
    bucket: &str,
    tag: &str,
    instances: &[InstanceConfig],
) -> Result<InstanceFileUrls, Error> {
    // Collect unique binary and config paths (dedup before hashing)
    let mut unique_binary_paths: BTreeSet<String> = BTreeSet::new();
    let mut unique_config_paths: BTreeSet<String> = BTreeSet::new();
    for instance in instances {
        unique_binary_paths.insert(instance.binary.clone());
        unique_config_paths.insert(instance.config.clone());
    }

    // Compute digests concurrently for unique files only
    let unique_paths: Vec<String> = unique_binary_paths
        .iter()
        .chain(unique_config_paths.iter())
        .cloned()
        .collect();
    info!(count = unique_paths.len(), "computing file digests");
    let path_to_digest = hash_files(unique_paths).await?;

    // Build dedup maps from digests
    let mut binary_digests: BTreeMap<String, String> = BTreeMap::new(); // digest -> path
    let mut config_digests: BTreeMap<String, String> = BTreeMap::new(); // digest -> path
    let mut instance_binary_digest: HashMap<String, String> = HashMap::new(); // instance -> digest
    let mut instance_config_digest: HashMap<String, String> = HashMap::new(); // instance -> digest
    for instance in instances {
        let binary_digest = path_to_digest[&instance.binary].clone();
        let config_digest = path_to_digest[&instance.config].clone();
        binary_digests.insert(binary_digest.clone(), instance.binary.clone());
        config_digests.insert(config_digest.clone(), instance.config.clone());
        instance_binary_digest.insert(instance.name.clone(), binary_digest);
        instance_config_digest.insert(instance.name.clone(), config_digest);
    }

    // Upload unique binaries and configs to S3 (deduplicated by digest)
    let (binary_digest_to_url, config_digest_to_url): (
        HashMap<String, String>,
        HashMap<String, String>,
    ) = tokio::try_join!(
        async {
            Ok::<_, Error>(
                try_join_all(binary_digests.iter().map(|(digest, path)| {
                    let client = client.clone();
                    let bucket = bucket.to_string();
                    let digest = digest.clone();
                    let key = super::services::binary_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_and_presign(
                            &client,
                            &bucket,
                            &key,
                            UploadSource::File(path.as_ref()),
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
        async {
            Ok::<_, Error>(
                try_join_all(config_digests.iter().map(|(digest, path)| {
                    let client = client.clone();
                    let bucket = bucket.to_string();
                    let digest = digest.clone();
                    let key = super::services::config_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_and_presign(
                            &client,
                            &bucket,
                            &key,
                            UploadSource::File(path.as_ref()),
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
    )?;

    // Map instance names to URLs via their digests
    let mut binary_urls: HashMap<String, String> = HashMap::new();
    let mut config_urls: HashMap<String, String> = HashMap::new();
    for instance in instances {
        let binary_digest = &instance_binary_digest[&instance.name];
        let config_digest = &instance_config_digest[&instance.name];
        binary_urls.insert(
            instance.name.clone(),
            binary_digest_to_url[binary_digest].clone(),
        );
        config_urls.insert(
            instance.name.clone(),
            config_digest_to_url[config_digest].clone(),
        );
    }

    Ok(InstanceFileUrls {
        binary_urls,
        config_urls,
    })
}
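
// Illustrative sketch (not part of this module's API): driving `upload_instance_files` during a
// deployment and looking up one instance's URLs afterwards. The tag and instance name below are
// hypothetical.
#[allow(dead_code)]
async fn example_upload_instances(
    client: &S3Client,
    bucket: &str,
    instances: &[InstanceConfig],
) -> Result<(), Error> {
    let urls = upload_instance_files(client, bucket, "example-tag", instances).await?;
    if let (Some(binary_url), Some(config_url)) = (
        urls.binary_urls.get("validator-0"),
        urls.config_urls.get("validator-0"),
    ) {
        debug!(
            binary_url = binary_url.as_str(),
            config_url = config_url.as_str(),
            "instance artifact URLs"
        );
    }
    Ok(())
}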