use crate::ec2::{
    aws::*, deployer_directory, s3::*, services::*, utils::*, Architecture, Config, Error, Host,
    Hosts, InstanceConfig, CREATED_FILE_NAME, LOGS_PORT, MONITORING_NAME, MONITORING_REGION,
    PROFILES_PORT, TRACES_PORT,
};
use futures::future::try_join_all;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    fs::File,
    net::IpAddr,
    path::PathBuf,
    slice,
};
use tokio::process::Command;
use tracing::info;

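/// Presigned URLs for the observability tool binaries of one architecture, in
/// the order (prometheus, grafana, loki, pyroscope, tempo, node_exporter,
/// promtail).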
type ToolUrls = (String, String, String, String, String, String, String);

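/// A launched binary instance: its configuration plus the EC2 instance id and
/// public IP assigned at launch.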
#[derive(Clone)]
pub struct Deployment {
    pub instance: InstanceConfig,
    pub id: String,
    pub ip: String,
}

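/// Network resources created in a single region. `binary_sg_id` is populated
/// once the binary security group is created; `monitoring_sg_id` is `Some`
/// only in the monitoring region.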
pub struct RegionResources {
    pub vpc_id: String,
    pub vpc_cidr: String,
    pub route_table_id: String,
    pub subnet_id: String,
    pub binary_sg_id: Option<String>,
    pub monitoring_sg_id: Option<String>,
}

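/// Creates a deployment from the YAML configuration at `config`: generates a
/// per-tag SSH key pair, provisions per-region VPCs, subnets, security groups,
/// and VPC peering to the monitoring region, launches the monitoring and
/// binary instances, and installs the services they run.
///
/// A minimal sketch of the expected configuration, inferred from the fields
/// read below (instance types, regions, and paths are illustrative, not a
/// complete schema; `ports` is consumed by `create_security_group_binary` and
/// omitted here):
///
/// ```yaml
/// tag: example
/// monitoring:
///   instance_type: t4g.small
///   storage_size: 10
///   storage_class: gp3
///   dashboard: ./dashboard.json
/// instances:
///   - name: node-0
///     region: us-east-1
///     instance_type: t4g.small
///     storage_size: 10
///     storage_class: gp3
///     binary: ./node
///     config: ./node.yaml
///     profiling: false
/// ```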
pub async fn create(config: &PathBuf) -> Result<(), Error> {
    let config: Config = {
        let config_file = File::open(config)?;
        serde_yaml::from_reader(config_file)?
    };
    let tag = &config.tag;
    info!(tag = tag.as_str(), "loaded configuration");

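    // The tag directory doubles as a guard: if it already exists, a deployment
    // with this tag has already been created (or attempted).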
    let tag_directory = deployer_directory(tag);
    if tag_directory.exists() {
        return Err(Error::CreationAttempted);
    }
    std::fs::create_dir_all(&tag_directory)?;
    info!(path = ?tag_directory, "created tag directory");

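    // Instance names must be unique and must not collide with the reserved
    // monitoring instance name.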
    let mut instance_names = HashSet::new();
    for instance in &config.instances {
        if !instance_names.insert(&instance.name) {
            return Err(Error::DuplicateInstanceName(instance.name.clone()));
        }
        if instance.name == MONITORING_NAME {
            return Err(Error::InvalidInstanceName(instance.name.clone()));
        }
    }

    let deployer_ip = get_public_ip().await?;
    info!(ip = deployer_ip.as_str(), "recovered public IP");

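    // Generate a fresh passphrase-less RSA key pair for this deployment; the
    // public key is imported into every region during initialization below.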
    let key_name = format!("deployer-{tag}");
    let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
    let public_key_path = tag_directory.join(format!("id_rsa_{tag}.pub"));
    let output = Command::new("ssh-keygen")
        .arg("-t")
        .arg("rsa")
        .arg("-b")
        .arg("4096")
        .arg("-f")
        .arg(private_key_path.to_str().unwrap())
        .arg("-N")
        .arg("")
        .output()
        .await?;
    if !output.status.success() {
        return Err(Error::KeygenFailed);
    }
    let public_key = std::fs::read_to_string(&public_key_path)?;
    let private_key = private_key_path.to_str().unwrap();

    let mut regions: BTreeSet<String> = config.instances.iter().map(|i| i.region.clone()).collect();
    regions.insert(MONITORING_REGION.to_string());

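    // Collect the instance types used in each region (including the monitoring
    // instance type) so each type's architecture is detected exactly once.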
    let mut instance_types_by_region: HashMap<String, HashSet<String>> = HashMap::new();
    let mut unique_instance_types: HashSet<String> = HashSet::new();
    instance_types_by_region
        .entry(MONITORING_REGION.to_string())
        .or_default()
        .insert(config.monitoring.instance_type.clone());
    unique_instance_types.insert(config.monitoring.instance_type.clone());
    for instance in &config.instances {
        instance_types_by_region
            .entry(instance.region.clone())
            .or_default()
            .insert(instance.instance_type.clone());
        unique_instance_types.insert(instance.instance_type.clone());
    }

    info!("detecting architectures for instance types");
    let ec2_client = create_ec2_client(Region::new(MONITORING_REGION)).await;
    let mut arch_by_instance_type: HashMap<String, Architecture> = HashMap::new();
    for instance_type in &unique_instance_types {
        let arch = detect_architecture(&ec2_client, instance_type).await?;
        info!(
            architecture = %arch,
            instance_type = instance_type.as_str(),
            "detected architecture"
        );
        arch_by_instance_type.insert(instance_type.clone(), arch);
    }

    let monitoring_architecture = arch_by_instance_type[&config.monitoring.instance_type];
    let mut instance_architectures: HashMap<String, Architecture> = HashMap::new();
    let mut architectures_needed: HashSet<Architecture> = HashSet::new();
    architectures_needed.insert(monitoring_architecture);
    for instance in &config.instances {
        let arch = arch_by_instance_type[&instance.instance_type];
        instance_architectures.insert(instance.name.clone(), arch);
        architectures_needed.insert(arch);
    }

    info!(bucket = S3_BUCKET_NAME, "setting up S3 bucket");
    let s3_client = create_s3_client(Region::new(MONITORING_REGION)).await;
    ensure_bucket_exists(&s3_client, S3_BUCKET_NAME, MONITORING_REGION).await?;

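    // Caches a tool in S3, downloading it first if absent, and returns a
    // presigned URL that instances can fetch without AWS credentials.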
    info!("uploading observability tools to S3");
    let cache_tool = |s3_key: String, download_url: String| {
        let tag_directory = tag_directory.clone();
        let s3_client = s3_client.clone();
        async move {
            if !object_exists(&s3_client, S3_BUCKET_NAME, &s3_key).await? {
                info!(
                    key = s3_key.as_str(),
                    "tool not in S3, downloading and uploading"
                );
                let temp_path = tag_directory.join(s3_key.replace('/', "_"));
                download_file(&download_url, &temp_path).await?;
                let url = upload_and_presign(
                    &s3_client,
                    S3_BUCKET_NAME,
                    &s3_key,
                    &temp_path,
                    PRESIGN_DURATION,
                )
                .await?;
                std::fs::remove_file(&temp_path)?;
                Ok::<_, Error>(url)
            } else {
                info!(key = s3_key.as_str(), "tool already in S3");
                presign_url(&s3_client, S3_BUCKET_NAME, &s3_key, PRESIGN_DURATION).await
            }
        }
    };

    let mut tool_urls_by_arch: HashMap<Architecture, ToolUrls> = HashMap::new();
    for arch in &architectures_needed {
        let [prometheus_url, grafana_url, loki_url, pyroscope_url, tempo_url, node_exporter_url, promtail_url]: [String; 7] =
            try_join_all([
                cache_tool(prometheus_bin_s3_key(PROMETHEUS_VERSION, *arch), prometheus_download_url(PROMETHEUS_VERSION, *arch)),
                cache_tool(grafana_bin_s3_key(GRAFANA_VERSION, *arch), grafana_download_url(GRAFANA_VERSION, *arch)),
                cache_tool(loki_bin_s3_key(LOKI_VERSION, *arch), loki_download_url(LOKI_VERSION, *arch)),
                cache_tool(pyroscope_bin_s3_key(PYROSCOPE_VERSION, *arch), pyroscope_download_url(PYROSCOPE_VERSION, *arch)),
                cache_tool(tempo_bin_s3_key(TEMPO_VERSION, *arch), tempo_download_url(TEMPO_VERSION, *arch)),
                cache_tool(node_exporter_bin_s3_key(NODE_EXPORTER_VERSION, *arch), node_exporter_download_url(NODE_EXPORTER_VERSION, *arch)),
                cache_tool(promtail_bin_s3_key(PROMTAIL_VERSION, *arch), promtail_download_url(PROMTAIL_VERSION, *arch)),
            ])
            .await?
            .try_into()
            .unwrap();
        tool_urls_by_arch.insert(
            *arch,
            (
                prometheus_url,
                grafana_url,
                loki_url,
                pyroscope_url,
                tempo_url,
                node_exporter_url,
                promtail_url,
            ),
        );
    }
    info!("observability tools uploaded");

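    // Hash every instance's binary and config so identical files are uploaded
    // to S3 only once, keyed by digest.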
    let mut binary_digests: BTreeMap<String, String> = BTreeMap::new();
    let mut config_digests: BTreeMap<String, String> = BTreeMap::new();
    let mut instance_binary_digest: HashMap<String, String> = HashMap::new();
    let mut instance_config_digest: HashMap<String, String> = HashMap::new();
    for instance in &config.instances {
        let binary_digest = hash_file(std::path::Path::new(&instance.binary))?;
        let config_digest = hash_file(std::path::Path::new(&instance.config))?;
        binary_digests.insert(binary_digest.clone(), instance.binary.clone());
        config_digests.insert(config_digest.clone(), instance.config.clone());
        instance_binary_digest.insert(instance.name.clone(), binary_digest);
        instance_config_digest.insert(instance.name.clone(), config_digest);
    }

    info!("uploading unique binaries and configs to S3");
    let (binary_digest_to_url, config_digest_to_url): (
        HashMap<String, String>,
        HashMap<String, String>,
    ) = tokio::try_join!(
        async {
            Ok::<_, Error>(
                try_join_all(binary_digests.iter().map(|(digest, path)| {
                    let s3_client = s3_client.clone();
                    let digest = digest.clone();
                    let key = binary_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_file_and_presign(
                            &s3_client,
                            S3_BUCKET_NAME,
                            &key,
                            path.as_ref(),
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
        async {
            Ok::<_, Error>(
                try_join_all(config_digests.iter().map(|(digest, path)| {
                    let s3_client = s3_client.clone();
                    let digest = digest.clone();
                    let key = config_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_file_and_presign(
                            &s3_client,
                            S3_BUCKET_NAME,
                            &key,
                            path.as_ref(),
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
    )?;

    let mut instance_binary_urls: HashMap<String, String> = HashMap::new();
    let mut instance_config_urls: HashMap<String, String> = HashMap::new();
    for instance in &config.instances {
        let binary_digest = &instance_binary_digest[&instance.name];
        let config_digest = &instance_config_digest[&instance.name];
        instance_binary_urls.insert(
            instance.name.clone(),
            binary_digest_to_url[binary_digest].clone(),
        );
        instance_config_urls.insert(
            instance.name.clone(),
            config_digest_to_url[config_digest].clone(),
        );
    }
    info!("uploaded all instance binaries and configs");

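    // Initialize network resources in all regions concurrently. Each region is
    // assigned a distinct 10.<idx>.0.0/16 VPC so the CIDRs never overlap and
    // can later be routed across VPC peering connections.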
    info!(?regions, "initializing resources");
    let region_init_futures: Vec<_> = regions
        .iter()
        .enumerate()
        .map(|(idx, region)| {
            let region = region.clone();
            let tag = tag.clone();
            let deployer_ip = deployer_ip.clone();
            let key_name = key_name.clone();
            let public_key = public_key.clone();
            let instance_types: Vec<String> =
                instance_types_by_region[&region].iter().cloned().collect();

            async move {
                let ec2_client = create_ec2_client(Region::new(region.clone())).await;
                info!(region = region.as_str(), "created EC2 client");

                let az = find_availability_zone(&ec2_client, &instance_types).await?;
                info!(
                    az = az.as_str(),
                    region = region.as_str(),
                    "selected availability zone"
                );

                let vpc_cidr = format!("10.{idx}.0.0/16");
                let vpc_id = create_vpc(&ec2_client, &vpc_cidr, &tag).await?;
                info!(
                    vpc = vpc_id.as_str(),
                    region = region.as_str(),
                    "created VPC"
                );
                let igw_id = create_and_attach_igw(&ec2_client, &vpc_id, &tag).await?;
                info!(
                    igw = igw_id.as_str(),
                    vpc = vpc_id.as_str(),
                    region = region.as_str(),
                    "created and attached IGW"
                );
                let route_table_id =
                    create_route_table(&ec2_client, &vpc_id, &igw_id, &tag).await?;
                info!(
                    route_table = route_table_id.as_str(),
                    vpc = vpc_id.as_str(),
                    region = region.as_str(),
                    "created route table"
                );
                let subnet_cidr = format!("10.{idx}.1.0/24");
                let subnet_id = create_subnet(
                    &ec2_client,
                    &vpc_id,
                    &route_table_id,
                    &subnet_cidr,
                    &az,
                    &tag,
                )
                .await?;
                info!(
                    subnet = subnet_id.as_str(),
                    vpc = vpc_id.as_str(),
                    region = region.as_str(),
                    "created subnet"
                );

                let monitoring_sg_id = if region == MONITORING_REGION {
                    let sg_id =
                        create_security_group_monitoring(&ec2_client, &vpc_id, &deployer_ip, &tag)
                            .await?;
                    info!(
                        sg = sg_id.as_str(),
                        vpc = vpc_id.as_str(),
                        region = region.as_str(),
                        "created monitoring security group"
                    );
                    Some(sg_id)
                } else {
                    None
                };

                import_key_pair(&ec2_client, &key_name, &public_key).await?;
                info!(
                    key = key_name.as_str(),
                    region = region.as_str(),
                    "imported key pair"
                );

                info!(
                    vpc = vpc_id.as_str(),
                    subnet = subnet_id.as_str(),
                    subnet_cidr = subnet_cidr.as_str(),
                    region = region.as_str(),
                    "initialized resources"
                );

                Ok::<_, Error>((
                    region,
                    ec2_client,
                    RegionResources {
                        vpc_id,
                        vpc_cidr,
                        route_table_id,
                        subnet_id,
                        binary_sg_id: None,
                        monitoring_sg_id,
                    },
                ))
            }
        })
        .collect();

    let region_results = try_join_all(region_init_futures).await?;
    let (ec2_clients, mut region_resources): (HashMap<_, _>, HashMap<_, _>) = region_results
        .into_iter()
        .map(|(region, client, resources)| ((region.clone(), client), (region, resources)))
        .unzip();
    info!(?regions, "initialized resources");

    info!("creating binary security groups");
    for (region, resources) in region_resources.iter_mut() {
        let binary_sg_id = create_security_group_binary(
            &ec2_clients[region],
            &resources.vpc_id,
            &deployer_ip,
            tag,
            &config.ports,
        )
        .await?;
        info!(
            sg = binary_sg_id.as_str(),
            vpc = resources.vpc_id.as_str(),
            region = region.as_str(),
            "created binary security group"
        );
        resources.binary_sg_id = Some(binary_sg_id);
    }
    info!("created binary security groups");

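    // Peer every binary region's VPC with the monitoring VPC and add routes in
    // both directions so telemetry can flow over private addresses.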
    info!("initializing VPC peering connections");
    let monitoring_region = MONITORING_REGION.to_string();
    let monitoring_resources = region_resources.get(&monitoring_region).unwrap();
    let monitoring_vpc_id = &monitoring_resources.vpc_id;
    let monitoring_cidr = &monitoring_resources.vpc_cidr;
    let monitoring_route_table_id = &monitoring_resources.route_table_id;
    let binary_regions: HashSet<String> =
        config.instances.iter().map(|i| i.region.clone()).collect();
    for region in &regions {
        if region != &monitoring_region && binary_regions.contains(region) {
            let binary_resources = region_resources.get(region).unwrap();
            let binary_vpc_id = &binary_resources.vpc_id;
            let binary_cidr = &binary_resources.vpc_cidr;
            let peer_id = create_vpc_peering_connection(
                &ec2_clients[&monitoring_region],
                monitoring_vpc_id,
                binary_vpc_id,
                region,
                tag,
            )
            .await?;
            info!(
                peer = peer_id.as_str(),
                monitoring = monitoring_vpc_id.as_str(),
                binary = binary_vpc_id.as_str(),
                region = region.as_str(),
                "created VPC peering connection"
            );
            wait_for_vpc_peering_connection(&ec2_clients[region], &peer_id).await?;
            info!(
                peer = peer_id.as_str(),
                region = region.as_str(),
                "VPC peering connection is available"
            );
            accept_vpc_peering_connection(&ec2_clients[region], &peer_id).await?;
            info!(
                peer = peer_id.as_str(),
                region = region.as_str(),
                "accepted VPC peering connection"
            );
            add_route(
                &ec2_clients[&monitoring_region],
                monitoring_route_table_id,
                binary_cidr,
                &peer_id,
            )
            .await?;
            add_route(
                &ec2_clients[region],
                &binary_resources.route_table_id,
                monitoring_cidr,
                &peer_id,
            )
            .await?;
            info!(
                peer = peer_id.as_str(),
                monitoring = monitoring_vpc_id.as_str(),
                binary = binary_vpc_id.as_str(),
                region = region.as_str(),
                "added routes for VPC peering connection"
            );
        }
    }
    info!("initialized VPC peering connections");

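    // Launch the monitoring instance and all binary instances in parallel,
    // using the latest AMI for each instance's detected architecture.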
    info!("launching instances");
    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
    let monitoring_ami_id = find_latest_ami(monitoring_ec2_client, monitoring_architecture).await?;
    let monitoring_instance_type =
        InstanceType::try_parse(&config.monitoring.instance_type).expect("Invalid instance type");
    let monitoring_storage_class =
        VolumeType::try_parse(&config.monitoring.storage_class).expect("Invalid storage class");
    let monitoring_sg_id = monitoring_resources
        .monitoring_sg_id
        .as_ref()
        .unwrap()
        .clone();
    let monitoring_subnet_id = monitoring_resources.subnet_id.clone();

    let mut binary_launch_configs = Vec::new();
    for instance in &config.instances {
        let region = instance.region.clone();
        let resources = region_resources.get(&region).unwrap();
        let ec2_client = ec2_clients.get(&region).unwrap();
        let arch = instance_architectures[&instance.name];
        let ami_id = find_latest_ami(ec2_client, arch).await?;
        binary_launch_configs.push((instance, ec2_client, resources, ami_id, arch));
    }

    let monitoring_launch_future = {
        let key_name = key_name.clone();
        let tag = tag.clone();
        let sg_id = monitoring_sg_id.clone();
        async move {
            let instance_id = launch_instances(
                monitoring_ec2_client,
                &monitoring_ami_id,
                monitoring_instance_type,
                config.monitoring.storage_size,
                monitoring_storage_class,
                &key_name,
                &monitoring_subnet_id,
                &sg_id,
                1,
                MONITORING_NAME,
                &tag,
            )
            .await?[0]
            .clone();
            let ip =
                wait_for_instances_running(monitoring_ec2_client, slice::from_ref(&instance_id))
                    .await?[0]
                    .clone();
            let private_ip = get_private_ip(monitoring_ec2_client, &instance_id).await?;
            info!(ip = ip.as_str(), "launched monitoring instance");
            Ok::<(String, String, String), Error>((instance_id, ip, private_ip))
        }
    };

    let binary_launch_futures =
        binary_launch_configs
            .iter()
            .map(|(instance, ec2_client, resources, ami_id, _arch)| {
                let key_name = key_name.clone();
                let instance_type = InstanceType::try_parse(&instance.instance_type)
                    .expect("Invalid instance type");
                let storage_class =
                    VolumeType::try_parse(&instance.storage_class).expect("Invalid storage class");
                let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
                let tag = tag.clone();
                async move {
                    let instance_id = launch_instances(
                        ec2_client,
                        ami_id,
                        instance_type,
                        instance.storage_size,
                        storage_class,
                        &key_name,
                        &resources.subnet_id,
                        binary_sg_id,
                        1,
                        &instance.name,
                        &tag,
                    )
                    .await?[0]
                    .clone();
                    let ip = wait_for_instances_running(ec2_client, slice::from_ref(&instance_id))
                        .await?[0]
                        .clone();
                    info!(
                        ip = ip.as_str(),
                        instance = instance.name.as_str(),
                        "launched instance"
                    );
                    Ok::<Deployment, Error>(Deployment {
                        instance: (*instance).clone(),
                        id: instance_id,
                        ip,
                    })
                }
            });

    let (monitoring_result, deployments) = tokio::try_join!(
        monitoring_launch_future,
        try_join_all(binary_launch_futures)
    )?;
    let (monitoring_instance_id, monitoring_ip, monitoring_private_ip) = monitoring_result;
    info!("launched instances");

    info!("adding monitoring ingress rules");
    for (region, resources) in region_resources.iter() {
        let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
        add_monitoring_ingress(&ec2_clients[region], binary_sg_id, &monitoring_ip).await?;
    }
    info!("added monitoring ingress rules");

    info!("uploading config files to S3");
    let [
        bbr_conf_url,
        datasources_url,
        all_yml_url,
        loki_yml_url,
        pyroscope_yml_url,
        tempo_yml_url,
        prometheus_service_url,
        loki_service_url,
        pyroscope_service_url,
        tempo_service_url,
        monitoring_node_exporter_service_url,
        promtail_service_url,
        logrotate_conf_url,
        pyroscope_agent_service_url,
        pyroscope_agent_timer_url,
    ]: [String; 15] = try_join_all([
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &bbr_config_s3_key(), BBR_CONF.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &grafana_datasources_s3_key(), DATASOURCES_YML.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &grafana_dashboards_s3_key(), ALL_YML.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &loki_config_s3_key(), LOKI_CONFIG.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_config_s3_key(), PYROSCOPE_CONFIG.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &tempo_config_s3_key(), TEMPO_CONFIG.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &prometheus_service_s3_key(), PROMETHEUS_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &loki_service_s3_key(), LOKI_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_service_s3_key(), PYROSCOPE_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &tempo_service_s3_key(), TEMPO_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &node_exporter_service_s3_key(), NODE_EXPORTER_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &promtail_service_s3_key(), PROMTAIL_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &logrotate_config_s3_key(), LOGROTATE_CONF.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_agent_service_s3_key(), PYROSCOPE_AGENT_SERVICE.as_bytes(), PRESIGN_DURATION),
        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_agent_timer_s3_key(), PYROSCOPE_AGENT_TIMER.as_bytes(), PRESIGN_DURATION),
    ])
    .await?
    .try_into()
    .unwrap();

    let mut binary_service_urls_by_arch: HashMap<Architecture, String> = HashMap::new();
    for arch in &architectures_needed {
        let binary_service_content = binary_service(*arch);
        let temp_path = tag_directory.join(format!("binary-{}.service", arch.as_str()));
        std::fs::write(&temp_path, &binary_service_content)?;
        let binary_service_url = cache_file_and_presign(
            &s3_client,
            S3_BUCKET_NAME,
            &binary_service_s3_key_for_arch(*arch),
            &temp_path,
            PRESIGN_DURATION,
        )
        .await?;
        std::fs::remove_file(&temp_path)?;
        binary_service_urls_by_arch.insert(*arch, binary_service_url);
    }

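    // Generate the Prometheus scrape configuration from the launched instances
    // and upload it alongside the user-supplied Grafana dashboard.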
    let instances: Vec<(&str, &str, &str, &str)> = deployments
        .iter()
        .map(|d| {
            let arch = instance_architectures[&d.instance.name];
            (
                d.instance.name.as_str(),
                d.ip.as_str(),
                d.instance.region.as_str(),
                arch.as_str(),
            )
        })
        .collect();
    let prom_config = generate_prometheus_config(&instances);
    let prom_path = tag_directory.join("prometheus.yml");
    std::fs::write(&prom_path, &prom_config)?;
    let prom_digest = hash_file(&prom_path)?;
    let dashboard_path = std::path::PathBuf::from(&config.monitoring.dashboard);
    let dashboard_digest = hash_file(&dashboard_path)?;
    let [prometheus_config_url, dashboard_url]: [String; 2] = try_join_all([
        cache_file_and_presign(
            &s3_client,
            S3_BUCKET_NAME,
            &monitoring_s3_key(tag, &prom_digest),
            &prom_path,
            PRESIGN_DURATION,
        ),
        cache_file_and_presign(
            &s3_client,
            S3_BUCKET_NAME,
            &monitoring_s3_key(tag, &dashboard_digest),
            &dashboard_path,
            PRESIGN_DURATION,
        ),
    ])
    .await?
    .try_into()
    .unwrap();

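    // Write hosts.yaml, mapping every instance to its public IP plus the
    // monitoring instance's private IP, and upload it so instances can fetch
    // it for peer discovery. With default serde field names it serializes
    // roughly as (values illustrative):
    //
    //   monitoring: 10.0.1.5
    //   hosts:
    //     - name: node-0
    //       region: us-east-1
    //       ip: 1.2.3.4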
    let hosts = Hosts {
        monitoring: monitoring_private_ip.parse::<IpAddr>().unwrap(),
        hosts: deployments
            .iter()
            .map(|d| Host {
                name: d.instance.name.clone(),
                region: d.instance.region.clone(),
                ip: d.ip.parse::<IpAddr>().unwrap(),
            })
            .collect(),
    };
    let hosts_yaml = serde_yaml::to_string(&hosts)?;
    let hosts_path = tag_directory.join("hosts.yaml");
    std::fs::write(&hosts_path, &hosts_yaml)?;
    let hosts_digest = hash_file(&hosts_path)?;
    let hosts_url = cache_file_and_presign(
        &s3_client,
        S3_BUCKET_NAME,
        &hosts_s3_key(tag, &hosts_digest),
        &hosts_path,
        PRESIGN_DURATION,
    )
    .await?;

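    // Render per-instance promtail configs and pyroscope agent scripts, again
    // deduplicated by digest before upload.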
    let mut promtail_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
    let mut pyroscope_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
    let mut instance_promtail_digest: HashMap<String, String> = HashMap::new();
    let mut instance_pyroscope_digest: HashMap<String, String> = HashMap::new();
    for deployment in &deployments {
        let instance = &deployment.instance;
        let ip = &deployment.ip;
        let arch = instance_architectures[&instance.name].as_str();

        let promtail_cfg = promtail_config(
            &monitoring_private_ip,
            &instance.name,
            ip,
            &instance.region,
            arch,
        );
        let promtail_path = tag_directory.join(format!("promtail_{}.yml", instance.name));
        std::fs::write(&promtail_path, &promtail_cfg)?;
        let promtail_digest = hash_file(&promtail_path)?;

        let pyroscope_script = generate_pyroscope_script(
            &monitoring_private_ip,
            &instance.name,
            ip,
            &instance.region,
            arch,
        );
        let pyroscope_path = tag_directory.join(format!("pyroscope-agent_{}.sh", instance.name));
        std::fs::write(&pyroscope_path, &pyroscope_script)?;
        let pyroscope_digest = hash_file(&pyroscope_path)?;

        promtail_digests.insert(promtail_digest.clone(), promtail_path);
        pyroscope_digests.insert(pyroscope_digest.clone(), pyroscope_path);
        instance_promtail_digest.insert(instance.name.clone(), promtail_digest);
        instance_pyroscope_digest.insert(instance.name.clone(), pyroscope_digest);
    }

    let (promtail_digest_to_url, pyroscope_digest_to_url): (
        HashMap<String, String>,
        HashMap<String, String>,
    ) = tokio::try_join!(
        async {
            Ok::<_, Error>(
                try_join_all(promtail_digests.iter().map(|(digest, path)| {
                    let s3_client = s3_client.clone();
                    let digest = digest.clone();
                    let key = promtail_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_file_and_presign(
                            &s3_client,
                            S3_BUCKET_NAME,
                            &key,
                            &path,
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
        async {
            Ok::<_, Error>(
                try_join_all(pyroscope_digests.iter().map(|(digest, path)| {
                    let s3_client = s3_client.clone();
                    let digest = digest.clone();
                    let key = pyroscope_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_file_and_presign(
                            &s3_client,
                            S3_BUCKET_NAME,
                            &key,
                            &path,
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
    )?;

    let mut instance_urls_map: HashMap<String, (InstanceUrls, Architecture)> = HashMap::new();
    for deployment in &deployments {
        let name = &deployment.instance.name;
        let arch = instance_architectures[name];
        let promtail_digest = &instance_promtail_digest[name];
        let pyroscope_digest = &instance_pyroscope_digest[name];
        let (_, _, _, _, _, node_exporter_url, promtail_url) = &tool_urls_by_arch[&arch];

        instance_urls_map.insert(
            name.clone(),
            (
                InstanceUrls {
                    binary: instance_binary_urls[name].clone(),
                    config: instance_config_urls[name].clone(),
                    hosts: hosts_url.clone(),
                    promtail_bin: promtail_url.clone(),
                    promtail_config: promtail_digest_to_url[promtail_digest].clone(),
                    promtail_service: promtail_service_url.clone(),
                    node_exporter_bin: node_exporter_url.clone(),
                    node_exporter_service: monitoring_node_exporter_service_url.clone(),
                    binary_service: binary_service_urls_by_arch[&arch].clone(),
                    logrotate_conf: logrotate_conf_url.clone(),
                    pyroscope_script: pyroscope_digest_to_url[pyroscope_digest].clone(),
                    pyroscope_service: pyroscope_agent_service_url.clone(),
                    pyroscope_timer: pyroscope_agent_timer_url.clone(),
                },
                arch,
            ),
        );
    }
    info!("uploaded config files to S3");

    let (prometheus_url, grafana_url, loki_url, pyroscope_url, tempo_url, node_exporter_url, _) =
        &tool_urls_by_arch[&monitoring_architecture];
    let monitoring_urls = MonitoringUrls {
        prometheus_bin: prometheus_url.clone(),
        grafana_bin: grafana_url.clone(),
        loki_bin: loki_url.clone(),
        pyroscope_bin: pyroscope_url.clone(),
        tempo_bin: tempo_url.clone(),
        node_exporter_bin: node_exporter_url.clone(),
        prometheus_config: prometheus_config_url,
        datasources_yml: datasources_url,
        all_yml: all_yml_url,
        dashboard: dashboard_url,
        loki_yml: loki_yml_url,
        pyroscope_yml: pyroscope_yml_url,
        tempo_yml: tempo_yml_url,
        prometheus_service: prometheus_service_url,
        loki_service: loki_service_url,
        pyroscope_service: pyroscope_service_url,
        tempo_service: tempo_service_url,
        node_exporter_service: monitoring_node_exporter_service_url.clone(),
    };

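    // Configure all instances over SSH in parallel: enable BBR, install and
    // start the services, then poll systemd until each unit reports active.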
    info!("configuring monitoring and binary instances");
    let binary_configs: Vec<_> = deployments
        .iter()
        .map(|deployment| {
            let instance = deployment.instance.clone();
            let deployment_id = deployment.id.clone();
            let ec2_client = ec2_clients[&instance.region].clone();
            let ip = deployment.ip.clone();
            let bbr_url = bbr_conf_url.clone();
            let (urls, arch) = instance_urls_map.remove(&instance.name).unwrap();
            (instance, deployment_id, ec2_client, ip, bbr_url, urls, arch)
        })
        .collect();
    let binary_futures = binary_configs.into_iter().map(
        |(instance, deployment_id, ec2_client, ip, bbr_url, urls, arch)| async move {
            wait_for_instances_ready(&ec2_client, slice::from_ref(&deployment_id)).await?;
            enable_bbr(private_key, &ip, &bbr_url).await?;
            ssh_execute(
                private_key,
                &ip,
                &install_binary_cmd(&urls, instance.profiling, arch),
            )
            .await?;
            poll_service_active(private_key, &ip, "promtail").await?;
            poll_service_active(private_key, &ip, "node_exporter").await?;
            poll_service_active(private_key, &ip, "binary").await?;
            info!(
                ip = ip.as_str(),
                instance = instance.name.as_str(),
                "configured instance"
            );
            Ok::<String, Error>(ip)
        },
    );

    let (_, all_binary_ips) = tokio::try_join!(
        async {
            let monitoring_ec2_client = &ec2_clients[&monitoring_region];
            wait_for_instances_ready(
                monitoring_ec2_client,
                slice::from_ref(&monitoring_instance_id),
            )
            .await?;
            enable_bbr(private_key, &monitoring_ip, &bbr_conf_url).await?;
            ssh_execute(
                private_key,
                &monitoring_ip,
                &install_monitoring_cmd(
                    &monitoring_urls,
                    PROMETHEUS_VERSION,
                    monitoring_architecture,
                ),
            )
            .await?;
            ssh_execute(private_key, &monitoring_ip, start_monitoring_services_cmd()).await?;
            poll_service_active(private_key, &monitoring_ip, "node_exporter").await?;
            poll_service_active(private_key, &monitoring_ip, "prometheus").await?;
            poll_service_active(private_key, &monitoring_ip, "loki").await?;
            poll_service_active(private_key, &monitoring_ip, "pyroscope").await?;
            poll_service_active(private_key, &monitoring_ip, "tempo").await?;
            poll_service_active(private_key, &monitoring_ip, "grafana-server").await?;
            info!(
                ip = monitoring_ip.as_str(),
                "configured monitoring instance"
            );
            Ok::<(), Error>(())
        },
        async {
            let all_binary_ips = try_join_all(binary_futures).await?;
            info!("configured binary instances");
            Ok::<Vec<String>, Error>(all_binary_ips)
        }
    )?;

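    // Open the telemetry ports (logs, profiles, traces) on the monitoring
    // security group. In the monitoring region the binary security group can
    // be referenced directly; binary VPCs in other regions are allowed by
    // CIDR instead, since security group references do not span regions.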
    info!("updating monitoring security group to allow traffic from binary instances");
    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
    if binary_regions.contains(&monitoring_region) {
        let binary_sg_id = region_resources[&monitoring_region]
            .binary_sg_id
            .clone()
            .unwrap();
        monitoring_ec2_client
            .authorize_security_group_ingress()
            .group_id(&monitoring_sg_id)
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(LOGS_PORT as i32)
                    .to_port(LOGS_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(PROFILES_PORT as i32)
                    .to_port(PROFILES_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(TRACES_PORT as i32)
                    .to_port(TRACES_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .send()
            .await
            .map_err(|err| err.into_service_error())?;
        info!(
            monitoring = monitoring_sg_id.as_str(),
            binary = binary_sg_id.as_str(),
            region = monitoring_region.as_str(),
            "linked monitoring and binary security groups in monitoring region"
        );
    }
    for region in &regions {
        if region != &monitoring_region && binary_regions.contains(region) {
            let binary_cidr = &region_resources[region].vpc_cidr;
            monitoring_ec2_client
                .authorize_security_group_ingress()
                .group_id(&monitoring_sg_id)
                .ip_permissions(
                    IpPermission::builder()
                        .ip_protocol("tcp")
                        .from_port(LOGS_PORT as i32)
                        .to_port(LOGS_PORT as i32)
                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
                        .build(),
                )
                .ip_permissions(
                    IpPermission::builder()
                        .ip_protocol("tcp")
                        .from_port(PROFILES_PORT as i32)
                        .to_port(PROFILES_PORT as i32)
                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
                        .build(),
                )
                .ip_permissions(
                    IpPermission::builder()
                        .ip_protocol("tcp")
                        .from_port(TRACES_PORT as i32)
                        .to_port(TRACES_PORT as i32)
                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
                        .build(),
                )
                .send()
                .await
                .map_err(|err| err.into_service_error())?;
            info!(
                monitoring = monitoring_sg_id.as_str(),
                binary = binary_cidr.as_str(),
                region = region.as_str(),
                "opened monitoring port to traffic from binary VPC"
            );
        }
    }
    info!("updated monitoring security group");

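    // Marker file recording that creation finished; later commands can check
    // for it before acting on the deployment.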
    File::create(tag_directory.join(CREATED_FILE_NAME))?;
    info!(
        monitoring = monitoring_ip.as_str(),
        binary = ?all_binary_ips,
        "deployment complete"
    );
    Ok(())
}