commonware_deployer/aws/update.rs

//! `update` subcommand for `ec2`

use crate::aws::{
    deployer_directory,
    ec2::{self, *},
    s3::{self, *},
    utils::*,
    Config, Error, InstanceConfig, CREATED_FILE_NAME, DESTROYED_FILE_NAME, MONITORING_NAME,
    MONITORING_REGION,
};
use aws_sdk_ec2::types::Filter;
use futures::{
    future::try_join_all,
    stream::{self, StreamExt, TryStreamExt},
};
use std::{
    collections::{HashMap, HashSet},
    fs::File,
    path::PathBuf,
};
use tracing::{error, info};

/// Updates the binary and configuration on all binary nodes
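///
/// A sketch of a direct invocation (the config filename and concurrency value
/// are illustrative; in practice this is reached via the CLI's `update`
/// subcommand):
///
/// ```ignore
/// let config = PathBuf::from("deployer.yaml");
/// update(&config, 8).await?;
/// ```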
pub async fn update(config_path: &PathBuf, concurrency: usize) -> Result<(), Error> {
    // Load config
    let config: Config = {
        let config_file = File::open(config_path)?;
        serde_yaml::from_reader(config_file)?
    };
    let tag = &config.tag;
    info!(tag = tag.as_str(), "loaded configuration");
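
    // A minimal sketch of the expected YAML shape (values are illustrative;
    // this function reads `tag` plus each instance's `name` and `region`,
    // while `s3::upload_instance_files` consumes the full `InstanceConfig`):
    //
    //   tag: my-deployment
    //   instances:
    //     - name: node-0
    //       region: us-east-1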

    // Ensure the created marker file exists
    let tag_directory = deployer_directory(Some(tag));
    let created_file = tag_directory.join(CREATED_FILE_NAME);
    if !created_file.exists() {
        return Err(Error::DeploymentNotComplete(tag.clone()));
    }

    // Ensure the destroyed marker file does not exist
    let destroyed_file = tag_directory.join(DESTROYED_FILE_NAME);
    if destroyed_file.exists() {
        return Err(Error::DeploymentAlreadyDestroyed(tag.clone()));
    }

    // Construct the private key path (written by the create command)
    let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
    if !private_key_path.exists() {
        return Err(Error::PrivateKeyNotFound);
    }

    // Create a map from instance name to InstanceConfig for lookup
    let instance_map: HashMap<String, InstanceConfig> = config
        .instances
        .iter()
        .map(|i| (i.name.clone(), i.clone()))
        .collect();
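    // Instances discovered in EC2 but absent from this map are skipped (with
    // an error log) during the per-instance update below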

    // Upload updated binaries and configs to S3 and generate pre-signed URLs
    // Uses digest-based deduplication to avoid re-uploading identical files
    let bucket_name = get_bucket_name();
    let s3_client = s3::create_client(Region::new(MONITORING_REGION)).await;

    // Upload unique binaries and configs (deduplicated by digest)
    info!("uploading unique binaries and configs to S3");
    let instance_file_urls =
        s3::upload_instance_files(&s3_client, &bucket_name, tag, &config.instances).await?;
    let instance_binary_urls = instance_file_urls.binary_urls;
    let instance_config_urls = instance_file_urls.config_urls;
    info!("uploaded all updated binaries and configs");
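    // The pre-signed URLs grant time-limited GET access, so instances can
    // fetch their artifacts over HTTPS without holding AWS credentials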

    // Determine all regions (binary + monitoring)
    let mut regions = config
        .instances
        .iter()
        .map(|i| i.region.clone())
        .collect::<HashSet<_>>();
    regions.insert(MONITORING_REGION.to_string());
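    // The monitoring region is scanned too since binary instances may run
    // there; the monitoring instance itself is filtered out by name below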

    // Collect all binary instances across regions (in parallel)
    let region_futures = regions.iter().map(|region| {
        let region = region.clone();
        let tag = tag.clone();
        async move {
            let ec2_client = ec2::create_client(Region::new(region.clone())).await;
            let resp = ec2_client
                .describe_instances()
                .filters(Filter::builder().name("tag:deployer").values(&tag).build())
                .send()
                .await
                .map_err(|err| err.into_service_error())?;
            let mut instances = Vec::new();
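            // DescribeInstances groups results by reservation, so walk both
            // levels to reach the individual instances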
            for reservation in resp.reservations.unwrap_or_default() {
                for instance in reservation.instances.unwrap_or_default() {
                    // Skip instances without a "name" tag value, as well as
                    // the monitoring host
                    let Some(name) = instance
                        .tags
                        .as_deref()
                        .unwrap_or_default()
                        .iter()
                        .find(|t| t.key.as_deref() == Some("name"))
                        .and_then(|t| t.value.clone())
                    else {
                        continue;
                    };
                    if name == MONITORING_NAME {
                        continue;
                    }
                    let Some(public_ip) = instance.public_ip_address else {
                        continue;
                    };
                    info!(
                        region = region.as_str(),
                        name = name.as_str(),
                        ip = public_ip.as_str(),
                        "found instance"
                    );
                    instances.push((name, public_ip));
                }
            }
            Ok::<_, Error>(instances)
        }
    });
    let binary_instances: Vec<(String, String)> = try_join_all(region_futures)
        .await?
        .into_iter()
        .flatten()
        .collect();

    // Update each binary instance with limited concurrency to avoid SSH overload
    let private_key = private_key_path.to_str().unwrap();
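    // `try_collect` short-circuits on the first error, so a single failed
    // instance aborts the remaining updates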
    stream::iter(binary_instances.into_iter().filter_map(|(name, ip)| {
        if instance_map.contains_key(&name) {
            let binary_url = instance_binary_urls[&name].clone();
            let config_url = instance_config_urls[&name].clone();
            Some(async move {
                update_instance(private_key, &ip, &binary_url, &config_url).await?;
                info!(name, ip, "updated instance");
                Ok::<(), Error>(())
            })
        } else {
            error!(name, "instance config not found in config file");
            None
        }
    }))
    .buffer_unordered(concurrency)
    .try_collect::<Vec<_>>()
    .await?;
    info!("update complete");
    Ok(())
}

/// Updates a single instance with new binary and config via S3 pre-signed URLs
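///
/// The sequence is: stop the service, wait for it to report inactive, replace
/// the binary and config, then start the service and wait for it to report
/// active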
async fn update_instance(
    private_key: &str,
    ip: &str,
    binary_url: &str,
    config_url: &str,
) -> Result<(), Error> {
    // Stop the binary service
    ssh_execute(private_key, ip, "sudo systemctl stop binary").await?;

    // Wait for the service to become inactive
    poll_service_inactive(private_key, ip, "binary").await?;

    // Remove the existing binary and config (to ensure the new copies are used)
    ssh_execute(private_key, ip, "rm -f /home/ubuntu/binary").await?;
    ssh_execute(private_key, ip, "rm -f /home/ubuntu/config.conf").await?;

    // Download the latest binary and config from S3 concurrently via pre-signed URLs
    let download_cmd = format!(
        r#"{WGET} -O /home/ubuntu/binary '{binary_url}' &
{WGET} -O /home/ubuntu/config.conf '{config_url}' &
wait
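# (a bare `wait` exits 0 even when a background wget fails)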

# Verify all downloads succeeded
for f in binary config.conf; do
    if [ ! -f "/home/ubuntu/$f" ]; then
        echo "ERROR: Failed to download $f" >&2
        exit 1
    fi
done

# Ensure the binary is executable
chmod +x /home/ubuntu/binary"#
    );
    ssh_execute(private_key, ip, &download_cmd).await?;

    // Restart the binary service
    ssh_execute(private_key, ip, "sudo systemctl start binary").await?;

    // Verify the service is active before reporting success
    poll_service_active(private_key, ip, "binary").await?;
    Ok(())
}