commonware_deployer/ec2/
update.rs

1//! `update` subcommand for `ec2`
2
3use crate::ec2::{
4    aws::*, deployer_directory, s3::*, services::*, utils::*, Config, Error, InstanceConfig,
5    CREATED_FILE_NAME, DESTROYED_FILE_NAME, MONITORING_NAME, MONITORING_REGION,
6};
7use aws_sdk_ec2::types::Filter;
8use futures::future::try_join_all;
9use std::{
10    collections::{BTreeMap, HashMap},
11    fs::File,
12    path::PathBuf,
13};
14use tracing::{error, info};
15
16/// Updates the binary and configuration on all binary nodes
17pub async fn update(config_path: &PathBuf) -> Result<(), Error> {
18    // Load config
19    let config: Config = {
20        let config_file = File::open(config_path)?;
21        serde_yaml::from_reader(config_file)?
22    };
23    let tag = &config.tag;
24    info!(tag = tag.as_str(), "loaded configuration");
25
26    // Ensure created file exists
27    let tag_directory = deployer_directory(tag);
28    let created_file = tag_directory.join(CREATED_FILE_NAME);
29    if !created_file.exists() {
30        return Err(Error::DeploymentNotComplete(tag.clone()));
31    }
32
33    // Ensure destroyed file does not exist
34    let destroyed_file = tag_directory.join(DESTROYED_FILE_NAME);
35    if destroyed_file.exists() {
36        return Err(Error::DeploymentAlreadyDestroyed(tag.clone()));
37    }
38
39    // Construct private key path (assumes it exists from create command)
40    let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
41    if !private_key_path.exists() {
42        return Err(Error::PrivateKeyNotFound);
43    }
44
45    // Create a map from instance name to InstanceConfig for lookup
46    let instance_map: HashMap<String, InstanceConfig> = config
47        .instances
48        .iter()
49        .map(|i| (i.name.clone(), i.clone()))
50        .collect();
51
52    // Upload updated binaries and configs to S3 and generate pre-signed URLs
53    // Uses digest-based deduplication to avoid re-uploading identical files
54    let s3_client = create_s3_client(Region::new(MONITORING_REGION)).await;
55
56    // Compute digests and build deduplication maps
57    let mut binary_digests: BTreeMap<String, String> = BTreeMap::new();
58    let mut config_digests: BTreeMap<String, String> = BTreeMap::new();
59    let mut instance_binary_digest: HashMap<String, String> = HashMap::new();
60    let mut instance_config_digest: HashMap<String, String> = HashMap::new();
61    for instance in &config.instances {
62        let binary_digest = hash_file(std::path::Path::new(&instance.binary))?;
63        let config_digest = hash_file(std::path::Path::new(&instance.config))?;
64        binary_digests.insert(binary_digest.clone(), instance.binary.clone());
65        config_digests.insert(config_digest.clone(), instance.config.clone());
66        instance_binary_digest.insert(instance.name.clone(), binary_digest);
67        instance_config_digest.insert(instance.name.clone(), config_digest);
68    }
69
70    // Upload unique binaries and configs (deduplicated by digest)
71    info!("uploading unique binaries and configs to S3");
72    let (binary_digest_to_url, config_digest_to_url): (
73        HashMap<String, String>,
74        HashMap<String, String>,
75    ) = tokio::try_join!(
76        async {
77            Ok::<_, Error>(
78                try_join_all(binary_digests.iter().map(|(digest, path)| {
79                    let s3_client = s3_client.clone();
80                    let digest = digest.clone();
81                    let key = binary_s3_key(tag, &digest);
82                    let path = path.clone();
83                    async move {
84                        let url = cache_file_and_presign(
85                            &s3_client,
86                            S3_BUCKET_NAME,
87                            &key,
88                            path.as_ref(),
89                            PRESIGN_DURATION,
90                        )
91                        .await?;
92                        Ok::<_, Error>((digest, url))
93                    }
94                }))
95                .await?
96                .into_iter()
97                .collect(),
98            )
99        },
100        async {
101            Ok::<_, Error>(
102                try_join_all(config_digests.iter().map(|(digest, path)| {
103                    let s3_client = s3_client.clone();
104                    let digest = digest.clone();
105                    let key = config_s3_key(tag, &digest);
106                    let path = path.clone();
107                    async move {
108                        let url = cache_file_and_presign(
109                            &s3_client,
110                            S3_BUCKET_NAME,
111                            &key,
112                            path.as_ref(),
113                            PRESIGN_DURATION,
114                        )
115                        .await?;
116                        Ok::<_, Error>((digest, url))
117                    }
118                }))
119                .await?
120                .into_iter()
121                .collect(),
122            )
123        },
124    )?;
125
126    // Map instance names to URLs via their digests
127    let mut instance_binary_urls: HashMap<String, String> = HashMap::new();
128    let mut instance_config_urls: HashMap<String, String> = HashMap::new();
129    for instance in &config.instances {
130        let binary_digest = &instance_binary_digest[&instance.name];
131        let config_digest = &instance_config_digest[&instance.name];
132        instance_binary_urls.insert(
133            instance.name.clone(),
134            binary_digest_to_url[binary_digest].clone(),
135        );
136        instance_config_urls.insert(
137            instance.name.clone(),
138            config_digest_to_url[config_digest].clone(),
139        );
140    }
141    info!("uploaded all updated binaries and configs");
142
143    // Determine all regions (binary + monitoring)
144    let mut regions = config
145        .instances
146        .iter()
147        .map(|i| i.region.clone())
148        .collect::<std::collections::HashSet<_>>();
149    regions.insert(MONITORING_REGION.to_string());
150
151    // Collect all binary instances across regions
152    let mut binary_instances = Vec::new();
153    for region in &regions {
154        let ec2_client = create_ec2_client(Region::new(region.clone())).await;
155        let resp = ec2_client
156            .describe_instances()
157            .filters(Filter::builder().name("tag:deployer").values(tag).build())
158            .send()
159            .await
160            .map_err(|err| err.into_service_error())?;
161        for reservation in resp.reservations.unwrap_or_default() {
162            for instance in reservation.instances.unwrap_or_default() {
163                if let Some(tags) = &instance.tags {
164                    if let Some(name_tag) = tags.iter().find(|t| t.key.as_deref() == Some("name")) {
165                        if name_tag.value.as_deref() != Some(MONITORING_NAME) {
166                            if let Some(public_ip) = &instance.public_ip_address {
167                                binary_instances
168                                    .push((name_tag.value.clone().unwrap(), public_ip.clone()));
169                                info!(
170                                    region,
171                                    name = name_tag.value.clone().unwrap(),
172                                    ip = public_ip,
173                                    "found instance"
174                                );
175                            }
176                        }
177                    }
178                }
179            }
180        }
181    }
182
183    // Update each binary instance concurrently
184    let private_key = private_key_path.to_str().unwrap();
185    try_join_all(binary_instances.into_iter().filter_map(|(name, ip)| {
186        if instance_map.contains_key(&name) {
187            let binary_url = instance_binary_urls[&name].clone();
188            let config_url = instance_config_urls[&name].clone();
189            Some(async move {
190                update_instance(private_key, &ip, &binary_url, &config_url).await?;
191                info!(name, ip, "updated instance");
192                Ok::<(), Error>(())
193            })
194        } else {
195            error!(name, "instance config not found in config file");
196            None
197        }
198    }))
199    .await?;
200    info!("update complete");
201    Ok(())
202}
203
204/// Updates a single instance with new binary and config via S3 pre-signed URLs
205async fn update_instance(
206    private_key: &str,
207    ip: &str,
208    binary_url: &str,
209    config_url: &str,
210) -> Result<(), Error> {
211    // Stop the binary service
212    ssh_execute(private_key, ip, "sudo systemctl stop binary").await?;
213
214    // Wait for the service to become inactive
215    poll_service_inactive(private_key, ip, "binary").await?;
216
217    // Remove the existing binary and config (to ensure new copy is used)
218    ssh_execute(private_key, ip, "rm -f /home/ubuntu/binary").await?;
219    ssh_execute(private_key, ip, "rm -f /home/ubuntu/config.conf").await?;
220
221    // Download the latest binary and config from S3 concurrently via pre-signed URLs
222    let download_cmd = format!(
223        r#"wget -q --tries=10 --retry-connrefused --waitretry=5 -O /home/ubuntu/binary '{}' &
224wget -q --tries=10 --retry-connrefused --waitretry=5 -O /home/ubuntu/config.conf '{}' &
225wait
226
227# Verify all downloads succeeded
228for f in binary config.conf; do
229    if [ ! -f "/home/ubuntu/$f" ]; then
230        echo "ERROR: Failed to download $f" >&2
231        exit 1
232    fi
233done
234
235# Ensure the binary is executable
236chmod +x /home/ubuntu/binary"#,
237        binary_url, config_url
238    );
239    ssh_execute(private_key, ip, &download_cmd).await?;
240
241    // Restart the binary service
242    ssh_execute(private_key, ip, "sudo systemctl start binary").await?;
243
244    // Verify the service is active (optional but recommended for reliability)
245    poll_service_active(private_key, ip, "binary").await?;
246    Ok(())
247}