commonware_deployer/aws/
update.rs1use crate::aws::{
4 deployer_directory,
5 ec2::{self, *},
6 s3::{self, *},
7 utils::*,
8 Config, Error, InstanceConfig, CREATED_FILE_NAME, DESTROYED_FILE_NAME, MONITORING_NAME,
9 MONITORING_REGION,
10};
11use aws_sdk_ec2::types::Filter;
12use futures::{
13 future::try_join_all,
14 stream::{self, StreamExt, TryStreamExt},
15};
16use std::{collections::HashMap, fs::File, path::PathBuf};
17use tracing::{error, info};
18
19pub async fn update(config_path: &PathBuf, concurrency: usize) -> Result<(), Error> {
21 let config: Config = {
23 let config_file = File::open(config_path)?;
24 serde_yaml::from_reader(config_file)?
25 };
26 let tag = &config.tag;
27 info!(tag = tag.as_str(), "loaded configuration");
28
29 let tag_directory = deployer_directory(Some(tag));
31 let created_file = tag_directory.join(CREATED_FILE_NAME);
32 if !created_file.exists() {
33 return Err(Error::DeploymentNotComplete(tag.clone()));
34 }
35
36 let destroyed_file = tag_directory.join(DESTROYED_FILE_NAME);
38 if destroyed_file.exists() {
39 return Err(Error::DeploymentAlreadyDestroyed(tag.clone()));
40 }
41
42 let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
44 if !private_key_path.exists() {
45 return Err(Error::PrivateKeyNotFound);
46 }
47
48 let instance_map: HashMap<String, InstanceConfig> = config
50 .instances
51 .iter()
52 .map(|i| (i.name.clone(), i.clone()))
53 .collect();
54
55 let bucket_name = get_bucket_name();
58 let s3_client = s3::create_client(Region::new(MONITORING_REGION)).await;
59
60 info!("uploading unique binaries and configs to S3");
62 let instance_file_urls =
63 s3::upload_instance_files(&s3_client, &bucket_name, tag, &config.instances).await?;
64 let instance_binary_urls = instance_file_urls.binary_urls;
65 let instance_config_urls = instance_file_urls.config_urls;
66 info!("uploaded all updated binaries and configs");
67
68 let mut regions = config
70 .instances
71 .iter()
72 .map(|i| i.region.clone())
73 .collect::<std::collections::HashSet<_>>();
74 regions.insert(MONITORING_REGION.to_string());
75
76 let region_futures = regions.iter().map(|region| {
78 let region = region.clone();
79 let tag = tag.clone();
80 async move {
81 let ec2_client = ec2::create_client(Region::new(region.clone())).await;
82 let resp = ec2_client
83 .describe_instances()
84 .filters(Filter::builder().name("tag:deployer").values(&tag).build())
85 .send()
86 .await
87 .map_err(|err| err.into_service_error())?;
88 let mut instances = Vec::new();
89 for reservation in resp.reservations.unwrap_or_default() {
90 for instance in reservation.instances.unwrap_or_default() {
91 if let Some(tags) = &instance.tags {
92 if let Some(name_tag) =
93 tags.iter().find(|t| t.key.as_deref() == Some("name"))
94 {
95 if name_tag.value.as_deref() != Some(MONITORING_NAME) {
96 if let Some(public_ip) = &instance.public_ip_address {
97 let name = name_tag.value.clone().unwrap();
98 info!(
99 region = region.as_str(),
100 name = name.as_str(),
101 ip = public_ip.as_str(),
102 "found instance"
103 );
104 instances.push((name, public_ip.clone()));
105 }
106 }
107 }
108 }
109 }
110 }
111 Ok::<_, Error>(instances)
112 }
113 });
114 let binary_instances: Vec<(String, String)> = try_join_all(region_futures)
115 .await?
116 .into_iter()
117 .flatten()
118 .collect();
119
120 let private_key = private_key_path.to_str().unwrap();
122 stream::iter(binary_instances.into_iter().filter_map(|(name, ip)| {
123 if instance_map.contains_key(&name) {
124 let binary_url = instance_binary_urls[&name].clone();
125 let config_url = instance_config_urls[&name].clone();
126 Some(async move {
127 update_instance(private_key, &ip, &binary_url, &config_url).await?;
128 info!(name, ip, "updated instance");
129 Ok::<(), Error>(())
130 })
131 } else {
132 error!(name, "instance config not found in config file");
133 None
134 }
135 }))
136 .buffer_unordered(concurrency)
137 .try_collect::<Vec<_>>()
138 .await?;
139 info!("update complete");
140 Ok(())
141}
142
143async fn update_instance(
145 private_key: &str,
146 ip: &str,
147 binary_url: &str,
148 config_url: &str,
149) -> Result<(), Error> {
150 ssh_execute(private_key, ip, "sudo systemctl stop binary").await?;
152
153 poll_service_inactive(private_key, ip, "binary").await?;
155
156 ssh_execute(private_key, ip, "rm -f /home/ubuntu/binary").await?;
158 ssh_execute(private_key, ip, "rm -f /home/ubuntu/config.conf").await?;
159
160 let download_cmd = format!(
162 r#"{WGET} -O /home/ubuntu/binary '{}' &
163{WGET} -O /home/ubuntu/config.conf '{}' &
164wait
165
166# Verify all downloads succeeded
167for f in binary config.conf; do
168 if [ ! -f "/home/ubuntu/$f" ]; then
169 echo "ERROR: Failed to download $f" >&2
170 exit 1
171 fi
172done
173
174# Ensure the binary is executable
175chmod +x /home/ubuntu/binary"#,
176 binary_url, config_url
177 );
178 ssh_execute(private_key, ip, &download_cmd).await?;
179
180 ssh_execute(private_key, ip, "sudo systemctl start binary").await?;
182
183 poll_service_active(private_key, ip, "binary").await?;
185 Ok(())
186}