1use crate::ec2::{
4 aws::*, deployer_directory, s3::*, services::*, utils::*, Config, Error, InstanceConfig,
5 CREATED_FILE_NAME, DESTROYED_FILE_NAME, MONITORING_NAME, MONITORING_REGION,
6};
7use aws_sdk_ec2::types::Filter;
8use futures::future::try_join_all;
9use std::{
10 collections::{BTreeMap, HashMap},
11 fs::File,
12 path::PathBuf,
13};
14use tracing::{error, info};
15
16pub async fn update(config_path: &PathBuf) -> Result<(), Error> {
18 let config: Config = {
20 let config_file = File::open(config_path)?;
21 serde_yaml::from_reader(config_file)?
22 };
23 let tag = &config.tag;
24 info!(tag = tag.as_str(), "loaded configuration");
25
26 let tag_directory = deployer_directory(tag);
28 let created_file = tag_directory.join(CREATED_FILE_NAME);
29 if !created_file.exists() {
30 return Err(Error::DeploymentNotComplete(tag.clone()));
31 }
32
33 let destroyed_file = tag_directory.join(DESTROYED_FILE_NAME);
35 if destroyed_file.exists() {
36 return Err(Error::DeploymentAlreadyDestroyed(tag.clone()));
37 }
38
39 let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
41 if !private_key_path.exists() {
42 return Err(Error::PrivateKeyNotFound);
43 }
44
45 let instance_map: HashMap<String, InstanceConfig> = config
47 .instances
48 .iter()
49 .map(|i| (i.name.clone(), i.clone()))
50 .collect();
51
52 let s3_client = create_s3_client(Region::new(MONITORING_REGION)).await;
55
56 let mut binary_digests: BTreeMap<String, String> = BTreeMap::new();
58 let mut config_digests: BTreeMap<String, String> = BTreeMap::new();
59 let mut instance_binary_digest: HashMap<String, String> = HashMap::new();
60 let mut instance_config_digest: HashMap<String, String> = HashMap::new();
61 for instance in &config.instances {
62 let binary_digest = hash_file(std::path::Path::new(&instance.binary))?;
63 let config_digest = hash_file(std::path::Path::new(&instance.config))?;
64 binary_digests.insert(binary_digest.clone(), instance.binary.clone());
65 config_digests.insert(config_digest.clone(), instance.config.clone());
66 instance_binary_digest.insert(instance.name.clone(), binary_digest);
67 instance_config_digest.insert(instance.name.clone(), config_digest);
68 }
69
70 info!("uploading unique binaries and configs to S3");
72 let (binary_digest_to_url, config_digest_to_url): (
73 HashMap<String, String>,
74 HashMap<String, String>,
75 ) = tokio::try_join!(
76 async {
77 Ok::<_, Error>(
78 try_join_all(binary_digests.iter().map(|(digest, path)| {
79 let s3_client = s3_client.clone();
80 let digest = digest.clone();
81 let key = binary_s3_key(tag, &digest);
82 let path = path.clone();
83 async move {
84 let url = cache_file_and_presign(
85 &s3_client,
86 S3_BUCKET_NAME,
87 &key,
88 path.as_ref(),
89 PRESIGN_DURATION,
90 )
91 .await?;
92 Ok::<_, Error>((digest, url))
93 }
94 }))
95 .await?
96 .into_iter()
97 .collect(),
98 )
99 },
100 async {
101 Ok::<_, Error>(
102 try_join_all(config_digests.iter().map(|(digest, path)| {
103 let s3_client = s3_client.clone();
104 let digest = digest.clone();
105 let key = config_s3_key(tag, &digest);
106 let path = path.clone();
107 async move {
108 let url = cache_file_and_presign(
109 &s3_client,
110 S3_BUCKET_NAME,
111 &key,
112 path.as_ref(),
113 PRESIGN_DURATION,
114 )
115 .await?;
116 Ok::<_, Error>((digest, url))
117 }
118 }))
119 .await?
120 .into_iter()
121 .collect(),
122 )
123 },
124 )?;
125
126 let mut instance_binary_urls: HashMap<String, String> = HashMap::new();
128 let mut instance_config_urls: HashMap<String, String> = HashMap::new();
129 for instance in &config.instances {
130 let binary_digest = &instance_binary_digest[&instance.name];
131 let config_digest = &instance_config_digest[&instance.name];
132 instance_binary_urls.insert(
133 instance.name.clone(),
134 binary_digest_to_url[binary_digest].clone(),
135 );
136 instance_config_urls.insert(
137 instance.name.clone(),
138 config_digest_to_url[config_digest].clone(),
139 );
140 }
141 info!("uploaded all updated binaries and configs");
142
143 let mut regions = config
145 .instances
146 .iter()
147 .map(|i| i.region.clone())
148 .collect::<std::collections::HashSet<_>>();
149 regions.insert(MONITORING_REGION.to_string());
150
151 let mut binary_instances = Vec::new();
153 for region in ®ions {
154 let ec2_client = create_ec2_client(Region::new(region.clone())).await;
155 let resp = ec2_client
156 .describe_instances()
157 .filters(Filter::builder().name("tag:deployer").values(tag).build())
158 .send()
159 .await
160 .map_err(|err| err.into_service_error())?;
161 for reservation in resp.reservations.unwrap_or_default() {
162 for instance in reservation.instances.unwrap_or_default() {
163 if let Some(tags) = &instance.tags {
164 if let Some(name_tag) = tags.iter().find(|t| t.key.as_deref() == Some("name")) {
165 if name_tag.value.as_deref() != Some(MONITORING_NAME) {
166 if let Some(public_ip) = &instance.public_ip_address {
167 binary_instances
168 .push((name_tag.value.clone().unwrap(), public_ip.clone()));
169 info!(
170 region,
171 name = name_tag.value.clone().unwrap(),
172 ip = public_ip,
173 "found instance"
174 );
175 }
176 }
177 }
178 }
179 }
180 }
181 }
182
183 let private_key = private_key_path.to_str().unwrap();
185 try_join_all(binary_instances.into_iter().filter_map(|(name, ip)| {
186 if instance_map.contains_key(&name) {
187 let binary_url = instance_binary_urls[&name].clone();
188 let config_url = instance_config_urls[&name].clone();
189 Some(async move {
190 update_instance(private_key, &ip, &binary_url, &config_url).await?;
191 info!(name, ip, "updated instance");
192 Ok::<(), Error>(())
193 })
194 } else {
195 error!(name, "instance config not found in config file");
196 None
197 }
198 }))
199 .await?;
200 info!("update complete");
201 Ok(())
202}
203
204async fn update_instance(
206 private_key: &str,
207 ip: &str,
208 binary_url: &str,
209 config_url: &str,
210) -> Result<(), Error> {
211 ssh_execute(private_key, ip, "sudo systemctl stop binary").await?;
213
214 poll_service_inactive(private_key, ip, "binary").await?;
216
217 ssh_execute(private_key, ip, "rm -f /home/ubuntu/binary").await?;
219 ssh_execute(private_key, ip, "rm -f /home/ubuntu/config.conf").await?;
220
221 let download_cmd = format!(
223 r#"wget -q --tries=10 --retry-connrefused --waitretry=5 -O /home/ubuntu/binary '{}' &
224wget -q --tries=10 --retry-connrefused --waitretry=5 -O /home/ubuntu/config.conf '{}' &
225wait
226
227# Verify all downloads succeeded
228for f in binary config.conf; do
229 if [ ! -f "/home/ubuntu/$f" ]; then
230 echo "ERROR: Failed to download $f" >&2
231 exit 1
232 fi
233done
234
235# Ensure the binary is executable
236chmod +x /home/ubuntu/binary"#,
237 binary_url, config_url
238 );
239 ssh_execute(private_key, ip, &download_cmd).await?;
240
241 ssh_execute(private_key, ip, "sudo systemctl start binary").await?;
243
244 poll_service_active(private_key, ip, "binary").await?;
246 Ok(())
247}