1use crate::aws::{
4 deployer_directory,
5 ec2::{self, *},
6 s3::{self, *},
7 services::*,
8 utils::*,
9 Architecture, Config, Error, Host, Hosts, InstanceConfig, Metadata, CREATED_FILE_NAME,
10 LOGS_PORT, METADATA_FILE_NAME, MONITORING_NAME, MONITORING_REGION, PROFILES_PORT, TRACES_PORT,
11};
12use commonware_cryptography::{Hasher as _, Sha256};
13use futures::{
14 future::try_join_all,
15 stream::{self, StreamExt, TryStreamExt},
16};
17use std::{
18 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
19 fs::File,
20 net::IpAddr,
21 path::PathBuf,
22 slice,
23 time::Instant,
24};
25use tokio::process::Command;
26use tracing::info;
27
/// Maximum number of instance IDs passed to a single `wait_for_instances_running`
/// call; instance lists are split into chunks of this size per region.
/// NOTE(review): presumably chosen to match the EC2 DescribeInstances per-request
/// instance-ID limit — confirm against the AWS API reference.
const MAX_DESCRIBE_BATCH: usize = 1000;
30
/// Presigned S3 URLs for the observability tools and supporting packages that
/// instances download at startup. One `ToolUrls` is built per CPU architecture
/// (keyed by `Architecture` in `tool_urls_by_arch`); each URL is produced by
/// caching the artifact in the deployment bucket and presigning it.
struct ToolUrls {
    // Monitoring-stack binaries.
    prometheus: String,
    grafana: String,
    loki: String,
    pyroscope: String,
    tempo: String,
    // Per-instance agents.
    node_exporter: String,
    promtail: String,
    // Supporting .deb packages (libjemalloc, logrotate, fonts/fontconfig for
    // Grafana rendering, unzip, adduser, musl runtime).
    libjemalloc: String,
    logrotate: String,
    fonts_dejavu_mono: String,
    fonts_dejavu_core: String,
    fontconfig_config: String,
    libfontconfig: String,
    unzip: String,
    adduser: String,
    musl: String,
}
50
/// A launched binary instance: the configuration it was launched from plus the
/// identifiers assigned by EC2 once the instance is running.
#[derive(Clone)]
pub struct Deployment {
    // Configuration this instance was launched with.
    pub instance: InstanceConfig,
    // EC2 instance ID returned by `launch_instances`.
    pub id: String,
    // IP address reported once the instance reaches the running state
    // (from `wait_for_instances_running`).
    pub ip: String,
}
58
/// Network resources provisioned in a single AWS region for a deployment.
pub struct RegionResources {
    // VPC created for this region.
    pub vpc_id: String,
    // CIDR block of the VPC (e.g. "10.<idx>.0.0/16").
    pub vpc_cidr: String,
    // Route table associated with the VPC's internet gateway; peering routes
    // are added to it later.
    pub route_table_id: String,
    // (availability zone, subnet ID) pairs, one subnet per supported AZ.
    pub subnets: Vec<(String, String)>,
    // Per-AZ support map from `find_az_instance_support`.
    // NOTE(review): values appear to be the instance types supported in each
    // AZ — confirm against that helper's definition.
    pub az_support: BTreeMap<String, BTreeSet<String>>,
    // Security group for binary instances; `None` until created in the
    // post-init pass that fills it in for every region.
    pub binary_sg_id: Option<String>,
    // Security group for the monitoring instance; only `Some` in the
    // monitoring region.
    pub monitoring_sg_id: Option<String>,
}
69
70pub async fn create(config: &PathBuf, concurrency: usize) -> Result<(), Error> {
72 let config: Config = {
74 let config_file = File::open(config)?;
75 serde_yaml::from_reader(config_file)?
76 };
77 let tag = &config.tag;
78 info!(tag = tag.as_str(), "loaded configuration");
79
80 let mut instance_names = HashSet::new();
82 for instance in &config.instances {
83 if !instance_names.insert(&instance.name) {
84 return Err(Error::DuplicateInstanceName(instance.name.clone()));
85 }
86 if instance.name == MONITORING_NAME {
87 return Err(Error::InvalidInstanceName(instance.name.clone()));
88 }
89 }
90
91 let mut regions: BTreeSet<String> = config.instances.iter().map(|i| i.region.clone()).collect();
93 regions.insert(MONITORING_REGION.to_string());
94
95 let ec2_client = ec2::create_client(Region::new(MONITORING_REGION)).await;
97 let enabled_regions = ec2::get_enabled_regions(&ec2_client).await?;
98 let disabled: Vec<_> = regions
99 .iter()
100 .filter(|r| !enabled_regions.contains(*r))
101 .cloned()
102 .collect();
103 if !disabled.is_empty() {
104 return Err(Error::RegionsNotEnabled(disabled));
105 }
106 info!(?regions, "validated all regions are enabled");
107
108 let tag_directory = deployer_directory(Some(tag));
110 if tag_directory.exists() {
111 return Err(Error::CreationAttempted);
112 }
113 std::fs::create_dir_all(&tag_directory)?;
114 info!(path = ?tag_directory, "created tag directory");
115
116 let deployer_ip = get_public_ip().await?;
118 info!(ip = deployer_ip.as_str(), "recovered public IP");
119
120 let key_name = format!("deployer-{tag}");
122 let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
123 let public_key_path = tag_directory.join(format!("id_rsa_{tag}.pub"));
124 let output = Command::new("ssh-keygen")
125 .arg("-t")
126 .arg("rsa")
127 .arg("-b")
128 .arg("4096")
129 .arg("-f")
130 .arg(private_key_path.to_str().unwrap())
131 .arg("-N")
132 .arg("")
133 .output()
134 .await?;
135 if !output.status.success() {
136 return Err(Error::KeygenFailed);
137 }
138 let public_key = std::fs::read_to_string(&public_key_path)?;
139 let private_key = private_key_path.to_str().unwrap();
140
141 let metadata = Metadata {
143 tag: tag.clone(),
144 created_at: std::time::SystemTime::now()
145 .duration_since(std::time::UNIX_EPOCH)
146 .unwrap()
147 .as_secs(),
148 regions: regions.iter().cloned().collect(),
149 instance_count: config.instances.len(),
150 };
151 let metadata_file = File::create(tag_directory.join(METADATA_FILE_NAME))?;
152 serde_yaml::to_writer(metadata_file, &metadata)?;
153 info!("persisted deployment metadata");
154
155 let mut instance_types_by_region: HashMap<String, HashSet<String>> = HashMap::new();
157 let mut unique_instance_types: HashSet<String> = HashSet::new();
158 instance_types_by_region
159 .entry(MONITORING_REGION.to_string())
160 .or_default()
161 .insert(config.monitoring.instance_type.clone());
162 unique_instance_types.insert(config.monitoring.instance_type.clone());
163 for instance in &config.instances {
164 instance_types_by_region
165 .entry(instance.region.clone())
166 .or_default()
167 .insert(instance.instance_type.clone());
168 unique_instance_types.insert(instance.instance_type.clone());
169 }
170
171 info!("detecting architectures for instance types");
173 let ec2_client = ec2::create_client(Region::new(MONITORING_REGION)).await;
174 let mut arch_by_instance_type: HashMap<String, Architecture> = HashMap::new();
175 for instance_type in &unique_instance_types {
176 let arch = detect_architecture(&ec2_client, instance_type).await?;
177 info!(
178 architecture = %arch,
179 instance_type = instance_type.as_str(),
180 "detected architecture"
181 );
182 arch_by_instance_type.insert(instance_type.clone(), arch);
183 }
184
185 let monitoring_architecture = arch_by_instance_type[&config.monitoring.instance_type];
187 let mut instance_architectures: HashMap<String, Architecture> = HashMap::new();
188 let mut architectures_needed: HashSet<Architecture> = HashSet::new();
189 architectures_needed.insert(monitoring_architecture);
190 for instance in &config.instances {
191 let arch = arch_by_instance_type[&instance.instance_type];
192 instance_architectures.insert(instance.name.clone(), arch);
193 architectures_needed.insert(arch);
194 }
195
196 let bucket_name = get_bucket_name();
198 info!(bucket = bucket_name.as_str(), "setting up S3 bucket");
199 let s3_client = s3::create_client(Region::new(MONITORING_REGION)).await;
200 ensure_bucket_exists(&s3_client, &bucket_name, MONITORING_REGION).await?;
201
202 info!("uploading observability tools to S3");
204 let cache_tool = |s3_key: String, download_url: String| {
205 let tag_directory = tag_directory.clone();
206 let s3_client = s3_client.clone();
207 let bucket_name = bucket_name.clone();
208 async move {
209 if object_exists(&s3_client, &bucket_name, &s3_key).await? {
210 info!(key = s3_key.as_str(), "tool already in S3");
211 return presign_url(&s3_client, &bucket_name, &s3_key, PRESIGN_DURATION).await;
212 }
213 info!(
214 key = s3_key.as_str(),
215 "tool not in S3, downloading and uploading"
216 );
217 let temp_path = tag_directory.join(s3_key.replace('/', "_"));
218 download_file(&download_url, &temp_path).await?;
219 let url = cache_and_presign(
220 &s3_client,
221 &bucket_name,
222 &s3_key,
223 UploadSource::File(&temp_path),
224 PRESIGN_DURATION,
225 )
226 .await?;
227 std::fs::remove_file(&temp_path)?;
228 Ok::<_, Error>(url)
229 }
230 };
231
232 let adduser_url = cache_tool(
234 adduser_bin_s3_key(ADDUSER_VERSION),
235 adduser_download_url(ADDUSER_VERSION),
236 )
237 .await?;
238 let fonts_dejavu_mono_url = cache_tool(
239 fonts_dejavu_mono_bin_s3_key(FONTS_DEJAVU_MONO_VERSION),
240 fonts_dejavu_mono_download_url(FONTS_DEJAVU_MONO_VERSION),
241 )
242 .await?;
243 let fonts_dejavu_core_url = cache_tool(
244 fonts_dejavu_core_bin_s3_key(FONTS_DEJAVU_CORE_VERSION),
245 fonts_dejavu_core_download_url(FONTS_DEJAVU_CORE_VERSION),
246 )
247 .await?;
248 let mut tool_urls_by_arch: HashMap<Architecture, ToolUrls> = HashMap::new();
250 for arch in &architectures_needed {
251 let [prometheus_url, grafana_url, loki_url, pyroscope_url, tempo_url, node_exporter_url, promtail_url,
252 libjemalloc_url, logrotate_url, fontconfig_config_url, libfontconfig_url, unzip_url, musl_url]: [String; 13] =
253 try_join_all([
254 cache_tool(prometheus_bin_s3_key(PROMETHEUS_VERSION, *arch), prometheus_download_url(PROMETHEUS_VERSION, *arch)),
255 cache_tool(grafana_bin_s3_key(GRAFANA_VERSION, *arch), grafana_download_url(GRAFANA_VERSION, *arch)),
256 cache_tool(loki_bin_s3_key(LOKI_VERSION, *arch), loki_download_url(LOKI_VERSION, *arch)),
257 cache_tool(pyroscope_bin_s3_key(PYROSCOPE_VERSION, *arch), pyroscope_download_url(PYROSCOPE_VERSION, *arch)),
258 cache_tool(tempo_bin_s3_key(TEMPO_VERSION, *arch), tempo_download_url(TEMPO_VERSION, *arch)),
259 cache_tool(node_exporter_bin_s3_key(NODE_EXPORTER_VERSION, *arch), node_exporter_download_url(NODE_EXPORTER_VERSION, *arch)),
260 cache_tool(promtail_bin_s3_key(PROMTAIL_VERSION, *arch), promtail_download_url(PROMTAIL_VERSION, *arch)),
261 cache_tool(libjemalloc_bin_s3_key(LIBJEMALLOC2_VERSION, *arch), libjemalloc_download_url(LIBJEMALLOC2_VERSION, *arch)),
262 cache_tool(logrotate_bin_s3_key(LOGROTATE_VERSION, *arch), logrotate_download_url(LOGROTATE_VERSION, *arch)),
263 cache_tool(fontconfig_config_bin_s3_key(FONTCONFIG_CONFIG_VERSION, *arch), fontconfig_config_download_url(FONTCONFIG_CONFIG_VERSION, *arch)),
264 cache_tool(libfontconfig_bin_s3_key(LIBFONTCONFIG1_VERSION, *arch), libfontconfig_download_url(LIBFONTCONFIG1_VERSION, *arch)),
265 cache_tool(unzip_bin_s3_key(UNZIP_VERSION, *arch), unzip_download_url(UNZIP_VERSION, *arch)),
266 cache_tool(musl_bin_s3_key(MUSL_VERSION, *arch), musl_download_url(MUSL_VERSION, *arch)),
267 ])
268 .await?
269 .try_into()
270 .unwrap();
271 tool_urls_by_arch.insert(
272 *arch,
273 ToolUrls {
274 prometheus: prometheus_url,
275 grafana: grafana_url,
276 loki: loki_url,
277 pyroscope: pyroscope_url,
278 tempo: tempo_url,
279 node_exporter: node_exporter_url,
280 promtail: promtail_url,
281 libjemalloc: libjemalloc_url,
282 logrotate: logrotate_url,
283 fonts_dejavu_mono: fonts_dejavu_mono_url.clone(),
284 fonts_dejavu_core: fonts_dejavu_core_url.clone(),
285 fontconfig_config: fontconfig_config_url,
286 libfontconfig: libfontconfig_url,
287 unzip: unzip_url,
288 adduser: adduser_url.clone(),
289 musl: musl_url,
290 },
291 );
292 }
293 info!("observability tools uploaded");
294
295 info!("uploading unique binaries and configs to S3");
297 let instance_file_urls =
298 s3::upload_instance_files(&s3_client, &bucket_name, tag, &config.instances).await?;
299 let instance_binary_urls = instance_file_urls.binary_urls;
300 let instance_config_urls = instance_file_urls.config_urls;
301 info!("uploaded all instance binaries and configs");
302
303 info!(?regions, "initializing resources");
305 let region_init_futures: Vec<_> = regions
306 .iter()
307 .enumerate()
308 .map(|(idx, region)| {
309 let region = region.clone();
310 let tag = tag.clone();
311 let deployer_ip = deployer_ip.clone();
312 let key_name = key_name.clone();
313 let public_key = public_key.clone();
314 let instance_types: Vec<String> =
315 instance_types_by_region[®ion].iter().cloned().collect();
316
317 async move {
318 let ec2_client = ec2::create_client(Region::new(region.clone())).await;
320 info!(region = region.as_str(), "created EC2 client");
321
322 let az_support = find_az_instance_support(&ec2_client, &instance_types).await?;
324 let mut azs: Vec<String> = az_support.keys().cloned().collect();
325 azs.sort();
326 info!(?azs, region = region.as_str(), "found availability zones");
327
328 let vpc_cidr = format!("10.{idx}.0.0/16");
330 let vpc_id = create_vpc(&ec2_client, &vpc_cidr, &tag).await?;
331 info!(
332 vpc = vpc_id.as_str(),
333 region = region.as_str(),
334 "created VPC"
335 );
336 let igw_id = create_and_attach_igw(&ec2_client, &vpc_id, &tag).await?;
337 info!(
338 igw = igw_id.as_str(),
339 vpc = vpc_id.as_str(),
340 region = region.as_str(),
341 "created and attached IGW"
342 );
343 let route_table_id =
344 create_route_table(&ec2_client, &vpc_id, &igw_id, &tag).await?;
345 info!(
346 route_table = route_table_id.as_str(),
347 vpc = vpc_id.as_str(),
348 region = region.as_str(),
349 "created route table"
350 );
351
352 let subnet_futures: Vec<_> = azs
354 .iter()
355 .enumerate()
356 .map(|(az_idx, az)| {
357 let ec2_client = ec2_client.clone();
358 let vpc_id = vpc_id.clone();
359 let route_table_id = route_table_id.clone();
360 let tag = tag.clone();
361 let az = az.clone();
362 let region = region.clone();
363 async move {
364 let subnet_cidr = format!("10.{idx}.{az_idx}.0/24");
365 let subnet_id = create_subnet(
366 &ec2_client,
367 &vpc_id,
368 &route_table_id,
369 &subnet_cidr,
370 &az,
371 &tag,
372 )
373 .await?;
374 info!(
375 subnet = subnet_id.as_str(),
376 az = az.as_str(),
377 region = region.as_str(),
378 "created subnet"
379 );
380 Ok::<(String, String), Error>((az, subnet_id))
381 }
382 })
383 .collect();
384 let subnets = try_join_all(subnet_futures).await?;
385
386 let monitoring_sg_id = if region == MONITORING_REGION {
388 let sg_id =
389 create_security_group_monitoring(&ec2_client, &vpc_id, &deployer_ip, &tag)
390 .await?;
391 info!(
392 sg = sg_id.as_str(),
393 vpc = vpc_id.as_str(),
394 region = region.as_str(),
395 "created monitoring security group"
396 );
397 Some(sg_id)
398 } else {
399 None
400 };
401
402 import_key_pair(&ec2_client, &key_name, &public_key).await?;
404 info!(
405 key = key_name.as_str(),
406 region = region.as_str(),
407 "imported key pair"
408 );
409
410 info!(
411 vpc = vpc_id.as_str(),
412 subnet_count = subnets.len(),
413 region = region.as_str(),
414 "initialized resources"
415 );
416
417 Ok::<_, Error>((
418 region,
419 ec2_client,
420 RegionResources {
421 vpc_id,
422 vpc_cidr,
423 route_table_id,
424 subnets,
425 az_support,
426 binary_sg_id: None,
427 monitoring_sg_id,
428 },
429 ))
430 }
431 })
432 .collect();
433
434 let region_results = try_join_all(region_init_futures).await?;
435 let (ec2_clients, mut region_resources): (HashMap<_, _>, HashMap<_, _>) = region_results
436 .into_iter()
437 .map(|(region, client, resources)| ((region.clone(), client), (region, resources)))
438 .unzip();
439 info!(?regions, "initialized resources");
440
441 info!("creating binary security groups");
443 let binary_sg_futures: Vec<_> = region_resources
444 .iter()
445 .map(|(region, resources)| {
446 let region = region.clone();
447 let ec2_client = ec2_clients[®ion].clone();
448 let vpc_id = resources.vpc_id.clone();
449 let deployer_ip = deployer_ip.clone();
450 let tag = tag.clone();
451 let ports = config.ports.clone();
452 async move {
453 let binary_sg_id =
454 create_security_group_binary(&ec2_client, &vpc_id, &deployer_ip, &tag, &ports)
455 .await?;
456 info!(
457 sg = binary_sg_id.as_str(),
458 vpc = vpc_id.as_str(),
459 region = region.as_str(),
460 "created binary security group"
461 );
462 Ok::<_, Error>((region, binary_sg_id))
463 }
464 })
465 .collect();
466 for (region, binary_sg_id) in try_join_all(binary_sg_futures).await? {
467 region_resources.get_mut(®ion).unwrap().binary_sg_id = Some(binary_sg_id);
468 }
469 info!("created binary security groups");
470
471 info!("initializing VPC peering connections");
473 let monitoring_region = MONITORING_REGION.to_string();
474 let monitoring_resources = region_resources.get(&monitoring_region).unwrap();
475 let monitoring_vpc_id = &monitoring_resources.vpc_id;
476 let monitoring_cidr = &monitoring_resources.vpc_cidr;
477 let monitoring_route_table_id = &monitoring_resources.route_table_id;
478 let binary_regions: HashSet<String> =
479 config.instances.iter().map(|i| i.region.clone()).collect();
480 let peering_futures: Vec<_> = regions
481 .iter()
482 .filter(|region| *region != &monitoring_region && binary_regions.contains(*region))
483 .map(|region| {
484 let region = region.clone();
485 let monitoring_ec2_client = ec2_clients[&monitoring_region].clone();
486 let binary_ec2_client = ec2_clients[®ion].clone();
487 let monitoring_vpc_id = monitoring_vpc_id.clone();
488 let monitoring_cidr = monitoring_cidr.clone();
489 let monitoring_route_table_id = monitoring_route_table_id.clone();
490 let binary_resources = region_resources.get(®ion).unwrap();
491 let binary_vpc_id = binary_resources.vpc_id.clone();
492 let binary_cidr = binary_resources.vpc_cidr.clone();
493 let binary_route_table_id = binary_resources.route_table_id.clone();
494 let tag = tag.clone();
495 async move {
496 let peer_id = create_vpc_peering_connection(
497 &monitoring_ec2_client,
498 &monitoring_vpc_id,
499 &binary_vpc_id,
500 ®ion,
501 &tag,
502 )
503 .await?;
504 info!(
505 peer = peer_id.as_str(),
506 monitoring = monitoring_vpc_id.as_str(),
507 binary = binary_vpc_id.as_str(),
508 region = region.as_str(),
509 "created VPC peering connection"
510 );
511 wait_for_vpc_peering_connection(&binary_ec2_client, &peer_id).await?;
512 info!(
513 peer = peer_id.as_str(),
514 region = region.as_str(),
515 "VPC peering connection is available"
516 );
517 accept_vpc_peering_connection(&binary_ec2_client, &peer_id).await?;
518 info!(
519 peer = peer_id.as_str(),
520 region = region.as_str(),
521 "accepted VPC peering connection"
522 );
523 add_route(
524 &monitoring_ec2_client,
525 &monitoring_route_table_id,
526 &binary_cidr,
527 &peer_id,
528 )
529 .await?;
530 add_route(
531 &binary_ec2_client,
532 &binary_route_table_id,
533 &monitoring_cidr,
534 &peer_id,
535 )
536 .await?;
537 info!(
538 peer = peer_id.as_str(),
539 monitoring = monitoring_vpc_id.as_str(),
540 binary = binary_vpc_id.as_str(),
541 region = region.as_str(),
542 "added routes for VPC peering connection"
543 );
544 Ok::<_, Error>(())
545 }
546 })
547 .collect();
548 try_join_all(peering_futures).await?;
549 info!("initialized VPC peering connections");
550
551 info!("launching instances");
553 let monitoring_ec2_client = &ec2_clients[&monitoring_region];
554 let monitoring_ami_id = find_latest_ami(monitoring_ec2_client, monitoring_architecture).await?;
555 let monitoring_instance_type =
556 InstanceType::try_parse(&config.monitoring.instance_type).expect("Invalid instance type");
557 let monitoring_storage_class =
558 VolumeType::try_parse(&config.monitoring.storage_class).expect("Invalid storage class");
559 let monitoring_sg_id = monitoring_resources
560 .monitoring_sg_id
561 .as_ref()
562 .unwrap()
563 .clone();
564 let monitoring_subnets = monitoring_resources.subnets.clone();
565 let monitoring_az_support = monitoring_resources.az_support.clone();
566
567 let mut ami_cache: HashMap<(String, Architecture), String> = HashMap::new();
569 ami_cache.insert(
570 (monitoring_region.clone(), monitoring_architecture),
571 monitoring_ami_id.clone(),
572 );
573 info!(
574 region = monitoring_region.as_str(),
575 architecture = %monitoring_architecture,
576 ami_id = monitoring_ami_id.as_str(),
577 "selected AMI"
578 );
579 let mut binary_launch_configs = Vec::new();
580 for instance in &config.instances {
581 let region = instance.region.clone();
582 let resources = region_resources.get(®ion).unwrap();
583 let ec2_client = ec2_clients.get(®ion).unwrap();
584 let arch = instance_architectures[&instance.name];
585 let ami_id = match ami_cache.get(&(region.clone(), arch)) {
586 Some(id) => id.clone(),
587 None => {
588 let id = find_latest_ami(ec2_client, arch).await?;
589 ami_cache.insert((region.clone(), arch), id.clone());
590 info!(
591 region = region.as_str(),
592 architecture = %arch,
593 ami_id = id.as_str(),
594 "selected AMI"
595 );
596 id
597 }
598 };
599 binary_launch_configs.push((instance, ec2_client, resources, ami_id, arch));
600 }
601
602 let monitoring_launch_future = {
604 let key_name = key_name.clone();
605 let tag = tag.clone();
606 let sg_id = monitoring_sg_id.clone();
607 async move {
608 let (mut ids, az) = launch_instances(
609 monitoring_ec2_client,
610 &monitoring_ami_id,
611 monitoring_instance_type,
612 config.monitoring.storage_size,
613 monitoring_storage_class,
614 &key_name,
615 &monitoring_subnets,
616 &monitoring_az_support,
617 0,
618 &sg_id,
619 1,
620 MONITORING_NAME,
621 &tag,
622 )
623 .await?;
624 let instance_id = ids.remove(0);
625 info!(
626 instance_id = instance_id.as_str(),
627 az = az.as_str(),
628 "launched monitoring instance"
629 );
630 Ok::<String, Error>(instance_id)
631 }
632 };
633
634 let binary_launch_futures = binary_launch_configs.iter().enumerate().map(
636 |(idx, (instance, ec2_client, resources, ami_id, _arch))| {
637 let key_name = key_name.clone();
638 let instance_type =
639 InstanceType::try_parse(&instance.instance_type).expect("Invalid instance type");
640 let storage_class =
641 VolumeType::try_parse(&instance.storage_class).expect("Invalid storage class");
642 let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
643 let tag = tag.clone();
644 let instance_name = instance.name.clone();
645 let region = instance.region.clone();
646 let subnets = resources.subnets.clone();
647 let az_support = resources.az_support.clone();
648 async move {
649 let (mut ids, az) = launch_instances(
650 ec2_client,
651 ami_id,
652 instance_type,
653 instance.storage_size,
654 storage_class,
655 &key_name,
656 &subnets,
657 &az_support,
658 idx,
659 binary_sg_id,
660 1,
661 &instance.name,
662 &tag,
663 )
664 .await?;
665 let instance_id = ids.remove(0);
666 info!(
667 instance_id = instance_id.as_str(),
668 instance = instance_name.as_str(),
669 az = az.as_str(),
670 "launched instance"
671 );
672 Ok::<(String, String, InstanceConfig), Error>((
673 instance_id,
674 region,
675 (*instance).clone(),
676 ))
677 }
678 },
679 );
680
681 let (monitoring_instance_id, binary_launches) = tokio::try_join!(
683 monitoring_launch_future,
684 try_join_all(binary_launch_futures)
685 )?;
686 info!("instances requested");
687
688 let mut instances_by_region: HashMap<String, Vec<(String, InstanceConfig)>> = HashMap::new();
690 for (instance_id, region, instance_config) in binary_launches {
691 instances_by_region
692 .entry(region)
693 .or_default()
694 .push((instance_id, instance_config));
695 }
696
697 let wait_futures = instances_by_region
699 .into_iter()
700 .flat_map(|(region, instances)| {
701 let ec2_client = ec2_clients[®ion].clone();
702 instances
703 .chunks(MAX_DESCRIBE_BATCH)
704 .map(move |chunk| {
705 let ec2_client = ec2_client.clone();
706 let chunk: Vec<_> = chunk.to_vec();
707 let region = region.clone();
708 async move {
709 let instance_ids: Vec<String> =
710 chunk.iter().map(|(id, _)| id.clone()).collect();
711 let ips = wait_for_instances_running(&ec2_client, &instance_ids).await?;
712 info!(
713 region = region.as_str(),
714 count = chunk.len(),
715 "instances running in region"
716 );
717 let deployments: Vec<Deployment> = chunk
718 .into_iter()
719 .zip(ips)
720 .map(|((instance_id, instance_config), ip)| Deployment {
721 instance: instance_config,
722 id: instance_id,
723 ip,
724 })
725 .collect();
726 Ok::<Vec<Deployment>, Error>(deployments)
727 }
728 })
729 .collect::<Vec<_>>()
730 });
731
732 let (monitoring_ips, binary_deployment_batches) = tokio::try_join!(
734 async {
735 wait_for_instances_running(
736 monitoring_ec2_client,
737 slice::from_ref(&monitoring_instance_id),
738 )
739 .await
740 .map_err(Error::AwsEc2)
741 },
742 try_join_all(wait_futures)
743 )?;
744 let monitoring_ip = monitoring_ips[0].clone();
745 let monitoring_private_ip =
746 get_private_ip(monitoring_ec2_client, &monitoring_instance_id).await?;
747 let deployments: Vec<Deployment> = binary_deployment_batches.into_iter().flatten().collect();
748 info!(ip = monitoring_ip.as_str(), "monitoring instance running");
749 info!("launched instances");
750
751 info!("adding monitoring ingress rules");
755 for (region, resources) in region_resources.iter() {
756 let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
757 add_monitoring_ingress(&ec2_clients[region], binary_sg_id, &monitoring_ip).await?;
758 }
759 info!("added monitoring ingress rules");
760
761 info!("uploading config files to S3");
763 let [
764 datasources_url,
765 all_yml_url,
766 loki_yml_url,
767 pyroscope_yml_url,
768 tempo_yml_url,
769 prometheus_service_url,
770 loki_service_url,
771 pyroscope_service_url,
772 tempo_service_url,
773 monitoring_node_exporter_service_url,
774 promtail_service_url,
775 logrotate_conf_url,
776 pyroscope_agent_service_url,
777 pyroscope_agent_timer_url,
778 ]: [String; 14] = try_join_all([
779 cache_and_presign(&s3_client, &bucket_name, &grafana_datasources_s3_key(), UploadSource::Static(DATASOURCES_YML.as_bytes()), PRESIGN_DURATION),
780 cache_and_presign(&s3_client, &bucket_name, &grafana_dashboards_s3_key(), UploadSource::Static(ALL_YML.as_bytes()), PRESIGN_DURATION),
781 cache_and_presign(&s3_client, &bucket_name, &loki_config_s3_key(), UploadSource::Static(LOKI_CONFIG.as_bytes()), PRESIGN_DURATION),
782 cache_and_presign(&s3_client, &bucket_name, &pyroscope_config_s3_key(), UploadSource::Static(PYROSCOPE_CONFIG.as_bytes()), PRESIGN_DURATION),
783 cache_and_presign(&s3_client, &bucket_name, &tempo_config_s3_key(), UploadSource::Static(TEMPO_CONFIG.as_bytes()), PRESIGN_DURATION),
784 cache_and_presign(&s3_client, &bucket_name, &prometheus_service_s3_key(), UploadSource::Static(PROMETHEUS_SERVICE.as_bytes()), PRESIGN_DURATION),
785 cache_and_presign(&s3_client, &bucket_name, &loki_service_s3_key(), UploadSource::Static(LOKI_SERVICE.as_bytes()), PRESIGN_DURATION),
786 cache_and_presign(&s3_client, &bucket_name, &pyroscope_service_s3_key(), UploadSource::Static(PYROSCOPE_SERVICE.as_bytes()), PRESIGN_DURATION),
787 cache_and_presign(&s3_client, &bucket_name, &tempo_service_s3_key(), UploadSource::Static(TEMPO_SERVICE.as_bytes()), PRESIGN_DURATION),
788 cache_and_presign(&s3_client, &bucket_name, &node_exporter_service_s3_key(), UploadSource::Static(NODE_EXPORTER_SERVICE.as_bytes()), PRESIGN_DURATION),
789 cache_and_presign(&s3_client, &bucket_name, &promtail_service_s3_key(), UploadSource::Static(PROMTAIL_SERVICE.as_bytes()), PRESIGN_DURATION),
790 cache_and_presign(&s3_client, &bucket_name, &logrotate_config_s3_key(), UploadSource::Static(LOGROTATE_CONF.as_bytes()), PRESIGN_DURATION),
791 cache_and_presign(&s3_client, &bucket_name, &pyroscope_agent_service_s3_key(), UploadSource::Static(PYROSCOPE_AGENT_SERVICE.as_bytes()), PRESIGN_DURATION),
792 cache_and_presign(&s3_client, &bucket_name, &pyroscope_agent_timer_s3_key(), UploadSource::Static(PYROSCOPE_AGENT_TIMER.as_bytes()), PRESIGN_DURATION),
793 ])
794 .await?
795 .try_into()
796 .unwrap();
797
798 let mut binary_service_urls_by_arch: HashMap<Architecture, String> = HashMap::new();
800 for arch in &architectures_needed {
801 let binary_service_content = binary_service(*arch);
802 let temp_path = tag_directory.join(format!("binary-{}.service", arch.as_str()));
803 std::fs::write(&temp_path, &binary_service_content)?;
804 let binary_service_url = cache_and_presign(
805 &s3_client,
806 &bucket_name,
807 &binary_service_s3_key_for_arch(*arch),
808 UploadSource::File(&temp_path),
809 PRESIGN_DURATION,
810 )
811 .await?;
812 std::fs::remove_file(&temp_path)?;
813 binary_service_urls_by_arch.insert(*arch, binary_service_url);
814 }
815
816 let instances: Vec<(&str, &str, &str, &str)> = deployments
818 .iter()
819 .map(|d| {
820 let arch = instance_architectures[&d.instance.name];
821 (
822 d.instance.name.as_str(),
823 d.ip.as_str(),
824 d.instance.region.as_str(),
825 arch.as_str(),
826 )
827 })
828 .collect();
829 let prom_config = generate_prometheus_config(&instances);
830 let prom_digest = Sha256::hash(prom_config.as_bytes()).to_string();
831 let prom_path = tag_directory.join("prometheus.yml");
832 std::fs::write(&prom_path, &prom_config)?;
833 let dashboard_path = std::path::PathBuf::from(&config.monitoring.dashboard);
834 let dashboard_digest = hash_file(&dashboard_path).await?;
835 let [prometheus_config_url, dashboard_url]: [String; 2] = try_join_all([
836 cache_and_presign(
837 &s3_client,
838 &bucket_name,
839 &monitoring_s3_key(tag, &prom_digest),
840 UploadSource::File(&prom_path),
841 PRESIGN_DURATION,
842 ),
843 cache_and_presign(
844 &s3_client,
845 &bucket_name,
846 &monitoring_s3_key(tag, &dashboard_digest),
847 UploadSource::File(&dashboard_path),
848 PRESIGN_DURATION,
849 ),
850 ])
851 .await?
852 .try_into()
853 .unwrap();
854
855 let hosts = Hosts {
857 monitoring: monitoring_private_ip.clone().parse::<IpAddr>().unwrap(),
858 hosts: deployments
859 .iter()
860 .map(|d| Host {
861 name: d.instance.name.clone(),
862 region: d.instance.region.clone(),
863 ip: d.ip.clone().parse::<IpAddr>().unwrap(),
864 })
865 .collect(),
866 };
867 let hosts_yaml = serde_yaml::to_string(&hosts)?;
868 let hosts_digest = Sha256::hash(hosts_yaml.as_bytes()).to_string();
869 let hosts_path = tag_directory.join("hosts.yaml");
870 std::fs::write(&hosts_path, &hosts_yaml)?;
871 let hosts_url = cache_and_presign(
872 &s3_client,
873 &bucket_name,
874 &hosts_s3_key(tag, &hosts_digest),
875 UploadSource::File(&hosts_path),
876 PRESIGN_DURATION,
877 )
878 .await?;
879
880 let mut promtail_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
882 let mut pyroscope_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
883 let mut instance_promtail_digest: HashMap<String, String> = HashMap::new();
884 let mut instance_pyroscope_digest: HashMap<String, String> = HashMap::new();
885 for deployment in &deployments {
886 let instance = &deployment.instance;
887 let ip = &deployment.ip;
888 let arch = instance_architectures[&instance.name].as_str();
889
890 let promtail_cfg = promtail_config(
891 &monitoring_private_ip,
892 &instance.name,
893 ip,
894 &instance.region,
895 arch,
896 );
897 let promtail_digest = Sha256::hash(promtail_cfg.as_bytes()).to_string();
898 let promtail_path = tag_directory.join(format!("promtail_{}.yml", instance.name));
899 std::fs::write(&promtail_path, &promtail_cfg)?;
900
901 let pyroscope_script = generate_pyroscope_script(
902 &monitoring_private_ip,
903 &instance.name,
904 ip,
905 &instance.region,
906 arch,
907 );
908 let pyroscope_digest = Sha256::hash(pyroscope_script.as_bytes()).to_string();
909 let pyroscope_path = tag_directory.join(format!("pyroscope-agent_{}.sh", instance.name));
910 std::fs::write(&pyroscope_path, &pyroscope_script)?;
911
912 promtail_digests
913 .entry(promtail_digest.clone())
914 .or_insert(promtail_path);
915 pyroscope_digests
916 .entry(pyroscope_digest.clone())
917 .or_insert(pyroscope_path);
918 instance_promtail_digest.insert(instance.name.clone(), promtail_digest);
919 instance_pyroscope_digest.insert(instance.name.clone(), pyroscope_digest);
920 }
921
922 let (promtail_digest_to_url, pyroscope_digest_to_url): (
924 HashMap<String, String>,
925 HashMap<String, String>,
926 ) = tokio::try_join!(
927 async {
928 Ok::<_, Error>(
929 try_join_all(promtail_digests.iter().map(|(digest, path)| {
930 let s3_client = s3_client.clone();
931 let bucket_name = bucket_name.clone();
932 let digest = digest.clone();
933 let key = promtail_s3_key(tag, &digest);
934 let path = path.clone();
935 async move {
936 let url = cache_and_presign(
937 &s3_client,
938 &bucket_name,
939 &key,
940 UploadSource::File(&path),
941 PRESIGN_DURATION,
942 )
943 .await?;
944 Ok::<_, Error>((digest, url))
945 }
946 }))
947 .await?
948 .into_iter()
949 .collect(),
950 )
951 },
952 async {
953 Ok::<_, Error>(
954 try_join_all(pyroscope_digests.iter().map(|(digest, path)| {
955 let s3_client = s3_client.clone();
956 let bucket_name = bucket_name.clone();
957 let digest = digest.clone();
958 let key = pyroscope_s3_key(tag, &digest);
959 let path = path.clone();
960 async move {
961 let url = cache_and_presign(
962 &s3_client,
963 &bucket_name,
964 &key,
965 UploadSource::File(&path),
966 PRESIGN_DURATION,
967 )
968 .await?;
969 Ok::<_, Error>((digest, url))
970 }
971 }))
972 .await?
973 .into_iter()
974 .collect(),
975 )
976 },
977 )?;
978
    // Assemble the full set of presigned URLs each binary instance needs,
    // combining per-instance artifacts (binary, config, promtail/pyroscope
    // configs selected by digest) with per-architecture tool URLs and shared
    // service/config URLs. Keyed by instance name; consumed (removed) when
    // building `binary_configs` below.
    let mut instance_urls_map: HashMap<String, (InstanceUrls, Architecture)> = HashMap::new();
    for deployment in &deployments {
        let name = &deployment.instance.name;
        let arch = instance_architectures[name];
        let promtail_digest = &instance_promtail_digest[name];
        let pyroscope_digest = &instance_pyroscope_digest[name];
        let tool_urls = &tool_urls_by_arch[&arch];

        instance_urls_map.insert(
            name.clone(),
            (
                InstanceUrls {
                    binary: instance_binary_urls[name].clone(),
                    config: instance_config_urls[name].clone(),
                    hosts: hosts_url.clone(),
                    promtail_bin: tool_urls.promtail.clone(),
                    promtail_config: promtail_digest_to_url[promtail_digest].clone(),
                    promtail_service: promtail_service_url.clone(),
                    node_exporter_bin: tool_urls.node_exporter.clone(),
                    node_exporter_service: monitoring_node_exporter_service_url.clone(),
                    binary_service: binary_service_urls_by_arch[&arch].clone(),
                    logrotate_conf: logrotate_conf_url.clone(),
                    pyroscope_script: pyroscope_digest_to_url[pyroscope_digest].clone(),
                    pyroscope_service: pyroscope_agent_service_url.clone(),
                    pyroscope_timer: pyroscope_agent_timer_url.clone(),
                    libjemalloc_deb: tool_urls.libjemalloc.clone(),
                    logrotate_deb: tool_urls.logrotate.clone(),
                    unzip_deb: tool_urls.unzip.clone(),
                },
                arch,
            ),
        );
    }
    info!("uploaded config files to S3");
1014
    // Assemble the URL bundle for the monitoring instance: architecture-specific
    // tool binaries/debs plus the shared config and systemd service files
    // uploaded earlier (moved in here; the binary-instance URLs were built from
    // clones above).
    let tool_urls = &tool_urls_by_arch[&monitoring_architecture];
    let monitoring_urls = MonitoringUrls {
        prometheus_bin: tool_urls.prometheus.clone(),
        grafana_bin: tool_urls.grafana.clone(),
        loki_bin: tool_urls.loki.clone(),
        pyroscope_bin: tool_urls.pyroscope.clone(),
        tempo_bin: tool_urls.tempo.clone(),
        node_exporter_bin: tool_urls.node_exporter.clone(),
        fonts_dejavu_mono_deb: tool_urls.fonts_dejavu_mono.clone(),
        fonts_dejavu_core_deb: tool_urls.fonts_dejavu_core.clone(),
        fontconfig_config_deb: tool_urls.fontconfig_config.clone(),
        libfontconfig_deb: tool_urls.libfontconfig.clone(),
        unzip_deb: tool_urls.unzip.clone(),
        adduser_deb: tool_urls.adduser.clone(),
        musl_deb: tool_urls.musl.clone(),
        prometheus_config: prometheus_config_url,
        datasources_yml: datasources_url,
        all_yml: all_yml_url,
        dashboard: dashboard_url,
        loki_yml: loki_yml_url,
        pyroscope_yml: pyroscope_yml_url,
        tempo_yml: tempo_yml_url,
        prometheus_service: prometheus_service_url,
        loki_service: loki_service_url,
        pyroscope_service: pyroscope_service_url,
        tempo_service: tempo_service_url,
        node_exporter_service: monitoring_node_exporter_service_url.clone(),
    };
1044
    info!("configuring monitoring and binary instances");
    // Extract owned per-instance state up front so each configuration future is
    // self-contained (`async move` needs owned values). `instance_urls_map` is
    // drained here; every deployment must have an entry (unwrap).
    let binary_configs: Vec<_> = deployments
        .iter()
        .map(|deployment| {
            let instance = deployment.instance.clone();
            let deployment_id = deployment.id.clone();
            let ec2_client = ec2_clients[&instance.region].clone();
            let ip = deployment.ip.clone();
            let (urls, arch) = instance_urls_map.remove(&instance.name).unwrap();
            (instance, deployment_id, ec2_client, ip, urls, arch)
        })
        .collect();
    // One future per binary instance: wait for EC2 readiness, install over SSH
    // (optional apt step when profiling, download artifacts, run setup), then
    // poll until all services report active. Each phase is timed for the log
    // line. Lazy: driven below via `buffer_unordered`.
    let binary_futures = binary_configs.into_iter().map(
        |(instance, deployment_id, ec2_client, ip, urls, arch)| async move {
            let start = Instant::now();

            wait_for_instances_ready(&ec2_client, slice::from_ref(&deployment_id)).await?;
            let deploy = format!("{:.1}s", start.elapsed().as_secs_f64());

            let download_start = Instant::now();
            // apt install is only needed when profiling is enabled (helper
            // returns None otherwise).
            if let Some(apt_cmd) = install_binary_apt_cmd(instance.profiling) {
                ssh_execute(private_key, &ip, apt_cmd).await?;
            }
            ssh_execute(private_key, &ip, &install_binary_download_cmd(&urls)).await?;
            let download = format!("{:.1}s", download_start.elapsed().as_secs_f64());

            let setup_start = Instant::now();
            ssh_execute(
                private_key,
                &ip,
                &install_binary_setup_cmd(instance.profiling, arch),
            )
            .await?;
            let setup = format!("{:.1}s", setup_start.elapsed().as_secs_f64());

            let start_time = Instant::now();
            poll_service_active(private_key, &ip, "promtail").await?;
            poll_service_active(private_key, &ip, "node_exporter").await?;
            poll_service_active(private_key, &ip, "binary").await?;
            let start_dur = format!("{:.1}s", start_time.elapsed().as_secs_f64());

            info!(
                ip,
                instance = instance.name.as_str(),
                deploy,
                download,
                setup,
                start = start_dur,
                "configured instance"
            );
            // Resolve to the instance IP so the caller can collect all IPs.
            Ok::<String, Error>(ip)
        },
    );
1099
    // Configure the monitoring instance and all binary instances concurrently.
    // The first branch handles monitoring end-to-end; the second drives the
    // per-instance futures with bounded concurrency and yields their IPs.
    // `try_join!` aborts the whole deployment step on the first error.
    let (_, all_binary_ips) = tokio::try_join!(
        async {
            let start = Instant::now();

            let monitoring_ec2_client = &ec2_clients[&monitoring_region];
            wait_for_instances_ready(
                monitoring_ec2_client,
                slice::from_ref(&monitoring_instance_id),
            )
            .await?;
            let deploy = format!("{:.1}s", start.elapsed().as_secs_f64());

            let download_start = Instant::now();
            ssh_execute(
                private_key,
                &monitoring_ip,
                &install_monitoring_download_cmd(&monitoring_urls),
            )
            .await?;
            let download = format!("{:.1}s", download_start.elapsed().as_secs_f64());

            let setup_start = Instant::now();
            ssh_execute(
                private_key,
                &monitoring_ip,
                &install_monitoring_setup_cmd(PROMETHEUS_VERSION, monitoring_architecture),
            )
            .await?;
            ssh_execute(private_key, &monitoring_ip, start_monitoring_services_cmd()).await?;
            let setup = format!("{:.1}s", setup_start.elapsed().as_secs_f64());

            // Block until every monitoring service reports active.
            let start_time = Instant::now();
            poll_service_active(private_key, &monitoring_ip, "node_exporter").await?;
            poll_service_active(private_key, &monitoring_ip, "prometheus").await?;
            poll_service_active(private_key, &monitoring_ip, "loki").await?;
            poll_service_active(private_key, &monitoring_ip, "pyroscope").await?;
            poll_service_active(private_key, &monitoring_ip, "tempo").await?;
            poll_service_active(private_key, &monitoring_ip, "grafana-server").await?;
            let start_dur = format!("{:.1}s", start_time.elapsed().as_secs_f64());

            info!(
                ip = monitoring_ip.as_str(),
                deploy,
                download,
                setup,
                start = start_dur,
                "configured monitoring instance"
            );
            Ok::<(), Error>(())
        },
        async {
            // Run at most `concurrency` instance configurations at a time.
            let all_binary_ips: Vec<String> = stream::iter(binary_futures)
                .buffer_unordered(concurrency)
                .try_collect()
                .await?;
            info!("configured binary instances");
            Ok::<Vec<String>, Error>(all_binary_ips)
        }
    )?;
1162
    info!("updating monitoring security group to allow traffic from binary instances");
    // If binary instances share the monitoring region, authorize ingress from
    // their security group directly (group-id pairing) on the logs, profiles,
    // and traces ports.
    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
    if binary_regions.contains(&monitoring_region) {
        // binary_sg_id must exist for a region that hosts binary instances.
        let binary_sg_id = region_resources[&monitoring_region]
            .binary_sg_id
            .clone()
            .unwrap();
        monitoring_ec2_client
            .authorize_security_group_ingress()
            .group_id(&monitoring_sg_id)
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(LOGS_PORT as i32)
                    .to_port(LOGS_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(PROFILES_PORT as i32)
                    .to_port(PROFILES_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(TRACES_PORT as i32)
                    .to_port(TRACES_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .send()
            .await
            .map_err(|err| err.into_service_error())?;
        info!(
            monitoring = monitoring_sg_id.as_str(),
            binary = binary_sg_id.as_str(),
            region = monitoring_region.as_str(),
            "linked monitoring and binary security groups in monitoring region"
        );
    }
1220 for region in ®ions {
1221 if region != &monitoring_region && binary_regions.contains(region) {
1222 let binary_cidr = ®ion_resources[region].vpc_cidr;
1223 monitoring_ec2_client
1224 .authorize_security_group_ingress()
1225 .group_id(&monitoring_sg_id)
1226 .ip_permissions(
1227 IpPermission::builder()
1228 .ip_protocol("tcp")
1229 .from_port(LOGS_PORT as i32)
1230 .to_port(LOGS_PORT as i32)
1231 .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
1232 .build(),
1233 )
1234 .ip_permissions(
1235 IpPermission::builder()
1236 .ip_protocol("tcp")
1237 .from_port(PROFILES_PORT as i32)
1238 .to_port(PROFILES_PORT as i32)
1239 .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
1240 .build(),
1241 )
1242 .ip_permissions(
1243 IpPermission::builder()
1244 .ip_protocol("tcp")
1245 .from_port(TRACES_PORT as i32)
1246 .to_port(TRACES_PORT as i32)
1247 .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
1248 .build(),
1249 )
1250 .send()
1251 .await
1252 .map_err(|err| err.into_service_error())?;
1253 info!(
1254 monitoring = monitoring_sg_id.as_str(),
1255 binary = binary_cidr.as_str(),
1256 region = region.as_str(),
1257 "opened monitoring port to traffic from binary VPC"
1258 );
1259 }
1260 }
    info!("updated monitoring security group");

    // Write the marker file recording that this deployment completed; creation
    // errors propagate to the caller.
    File::create(tag_directory.join(CREATED_FILE_NAME))?;
    info!(
        monitoring = monitoring_ip.as_str(),
        binary = ?all_binary_ips,
        "deployment complete"
    );
    Ok(())
1271}