// commonware_deployer/aws/create.rs

1//! `create` subcommand for `ec2`
2
3use crate::aws::{
4    deployer_directory,
5    ec2::{self, *},
6    s3::{self, *},
7    services::*,
8    utils::*,
9    Architecture, Config, Error, Host, Hosts, InstanceConfig, Metadata, CREATED_FILE_NAME,
10    LOGS_PORT, METADATA_FILE_NAME, MONITORING_NAME, MONITORING_REGION, PROFILES_PORT, TRACES_PORT,
11};
12use commonware_cryptography::{Hasher as _, Sha256};
13use futures::{
14    future::try_join_all,
15    stream::{self, StreamExt, TryStreamExt},
16};
17use std::{
18    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
19    fs::File,
20    net::IpAddr,
21    path::PathBuf,
22    slice,
23    time::Instant,
24};
25use tokio::process::Command;
26use tracing::info;
27
/// Upper bound on the number of instance IDs included in one `DescribeInstances` API call.
const MAX_DESCRIBE_BATCH: usize = 1_000;
30
/// Pre-signed S3 download URLs for the observability stack, resolved once per
/// architecture.
///
/// Each field holds a time-limited URL for a single tool or supporting package
/// that instances fetch during provisioning.
struct ToolUrls {
    // Core observability services.
    prometheus: String,
    grafana: String,
    loki: String,
    pyroscope: String,
    tempo: String,
    // Per-host agents feeding the services above.
    node_exporter: String,
    promtail: String,
    // Supporting libraries and utilities.
    libjemalloc: String,
    logrotate: String,
    unzip: String,
    adduser: String,
    musl: String,
    // Font and fontconfig packages (architecture-independent ones are shared
    // across all `ToolUrls` values via cloned URLs).
    fonts_dejavu_mono: String,
    fonts_dejavu_core: String,
    fontconfig_config: String,
    libfontconfig: String,
}
50
51/// Represents a deployed instance with its configuration and public IP
52#[derive(Clone)]
53pub struct Deployment {
54    pub instance: InstanceConfig,
55    pub id: String,
56    pub ip: String,
57}
58
/// Networking and security-group handles created for a single AWS region.
pub struct RegionResources {
    /// Identifier of the VPC created in this region.
    pub vpc_id: String,
    /// CIDR block assigned to the VPC.
    pub vpc_cidr: String,
    /// Route table associated with the VPC.
    pub route_table_id: String,
    /// `(availability_zone, subnet_id)` pairs, one subnet per usable AZ.
    pub subnets: Vec<(String, String)>,
    /// Per-AZ support data from `find_az_instance_support`; keys are
    /// availability-zone names.
    pub az_support: BTreeMap<String, BTreeSet<String>>,
    /// Security group for binary instances; starts as `None` and is filled in
    /// after the group is created.
    pub binary_sg_id: Option<String>,
    /// Security group for the monitoring instance; only `Some` in the
    /// monitoring region.
    pub monitoring_sg_id: Option<String>,
}
69
70/// Sets up EC2 instances, deploys files, and configures monitoring and logging
71pub async fn create(config: &PathBuf, concurrency: usize) -> Result<(), Error> {
72    // Load configuration from YAML file
73    let config: Config = {
74        let config_file = File::open(config)?;
75        serde_yaml::from_reader(config_file)?
76    };
77    let tag = &config.tag;
78    info!(tag = tag.as_str(), "loaded configuration");
79
80    // Ensure no instance is duplicated or named MONITORING_NAME
81    let mut instance_names = HashSet::new();
82    for instance in &config.instances {
83        if !instance_names.insert(&instance.name) {
84            return Err(Error::DuplicateInstanceName(instance.name.clone()));
85        }
86        if instance.name == MONITORING_NAME {
87            return Err(Error::InvalidInstanceName(instance.name.clone()));
88        }
89    }
90
91    // Determine unique regions
92    let mut regions: BTreeSet<String> = config.instances.iter().map(|i| i.region.clone()).collect();
93    regions.insert(MONITORING_REGION.to_string());
94
95    // Validate that all regions are enabled (before writing anything to disk)
96    let ec2_client = ec2::create_client(Region::new(MONITORING_REGION)).await;
97    let enabled_regions = ec2::get_enabled_regions(&ec2_client).await?;
98    let disabled: Vec<_> = regions
99        .iter()
100        .filter(|r| !enabled_regions.contains(*r))
101        .cloned()
102        .collect();
103    if !disabled.is_empty() {
104        return Err(Error::RegionsNotEnabled(disabled));
105    }
106    info!(?regions, "validated all regions are enabled");
107
108    // Create a temporary directory for local files
109    let tag_directory = deployer_directory(Some(tag));
110    if tag_directory.exists() {
111        return Err(Error::CreationAttempted);
112    }
113    std::fs::create_dir_all(&tag_directory)?;
114    info!(path = ?tag_directory, "created tag directory");
115
116    // Get public IP address of the deployer
117    let deployer_ip = get_public_ip().await?;
118    info!(ip = deployer_ip.as_str(), "recovered public IP");
119
120    // Generate SSH key pair
121    let key_name = format!("deployer-{tag}");
122    let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
123    let public_key_path = tag_directory.join(format!("id_rsa_{tag}.pub"));
124    let output = Command::new("ssh-keygen")
125        .arg("-t")
126        .arg("rsa")
127        .arg("-b")
128        .arg("4096")
129        .arg("-f")
130        .arg(private_key_path.to_str().unwrap())
131        .arg("-N")
132        .arg("")
133        .output()
134        .await?;
135    if !output.status.success() {
136        return Err(Error::KeygenFailed);
137    }
138    let public_key = std::fs::read_to_string(&public_key_path)?;
139    let private_key = private_key_path.to_str().unwrap();
140
141    // Persist deployment metadata early to enable `destroy --tag` on failure
142    let metadata = Metadata {
143        tag: tag.clone(),
144        created_at: std::time::SystemTime::now()
145            .duration_since(std::time::UNIX_EPOCH)
146            .unwrap()
147            .as_secs(),
148        regions: regions.iter().cloned().collect(),
149        instance_count: config.instances.len(),
150    };
151    let metadata_file = File::create(tag_directory.join(METADATA_FILE_NAME))?;
152    serde_yaml::to_writer(metadata_file, &metadata)?;
153    info!("persisted deployment metadata");
154
155    // Collect instance types by region (for availability zone selection) and unique types (for architecture detection)
156    let mut instance_types_by_region: HashMap<String, HashSet<String>> = HashMap::new();
157    let mut unique_instance_types: HashSet<String> = HashSet::new();
158    instance_types_by_region
159        .entry(MONITORING_REGION.to_string())
160        .or_default()
161        .insert(config.monitoring.instance_type.clone());
162    unique_instance_types.insert(config.monitoring.instance_type.clone());
163    for instance in &config.instances {
164        instance_types_by_region
165            .entry(instance.region.clone())
166            .or_default()
167            .insert(instance.instance_type.clone());
168        unique_instance_types.insert(instance.instance_type.clone());
169    }
170
171    // Detect architecture for each unique instance type (architecture is global, not region-specific)
172    info!("detecting architectures for instance types");
173    let ec2_client = ec2::create_client(Region::new(MONITORING_REGION)).await;
174    let mut arch_by_instance_type: HashMap<String, Architecture> = HashMap::new();
175    for instance_type in &unique_instance_types {
176        let arch = detect_architecture(&ec2_client, instance_type).await?;
177        info!(
178            architecture = %arch,
179            instance_type = instance_type.as_str(),
180            "detected architecture"
181        );
182        arch_by_instance_type.insert(instance_type.clone(), arch);
183    }
184
185    // Build per-instance architecture map and collect architectures needed
186    let monitoring_architecture = arch_by_instance_type[&config.monitoring.instance_type];
187    let mut instance_architectures: HashMap<String, Architecture> = HashMap::new();
188    let mut architectures_needed: HashSet<Architecture> = HashSet::new();
189    architectures_needed.insert(monitoring_architecture);
190    for instance in &config.instances {
191        let arch = arch_by_instance_type[&instance.instance_type];
192        instance_architectures.insert(instance.name.clone(), arch);
193        architectures_needed.insert(arch);
194    }
195
196    // Setup S3 bucket and cache observability tools
197    let bucket_name = get_bucket_name();
198    info!(bucket = bucket_name.as_str(), "setting up S3 bucket");
199    let s3_client = s3::create_client(Region::new(MONITORING_REGION)).await;
200    ensure_bucket_exists(&s3_client, &bucket_name, MONITORING_REGION).await?;
201
202    // Cache observability tools for each architecture needed
203    info!("uploading observability tools to S3");
204    let cache_tool = |s3_key: String, download_url: String| {
205        let tag_directory = tag_directory.clone();
206        let s3_client = s3_client.clone();
207        let bucket_name = bucket_name.clone();
208        async move {
209            if object_exists(&s3_client, &bucket_name, &s3_key).await? {
210                info!(key = s3_key.as_str(), "tool already in S3");
211                return presign_url(&s3_client, &bucket_name, &s3_key, PRESIGN_DURATION).await;
212            }
213            info!(
214                key = s3_key.as_str(),
215                "tool not in S3, downloading and uploading"
216            );
217            let temp_path = tag_directory.join(s3_key.replace('/', "_"));
218            download_file(&download_url, &temp_path).await?;
219            let url = cache_and_presign(
220                &s3_client,
221                &bucket_name,
222                &s3_key,
223                UploadSource::File(&temp_path),
224                PRESIGN_DURATION,
225            )
226            .await?;
227            std::fs::remove_file(&temp_path)?;
228            Ok::<_, Error>(url)
229        }
230    };
231
232    // Cache arch-independent packages once before the loop
233    let adduser_url = cache_tool(
234        adduser_bin_s3_key(ADDUSER_VERSION),
235        adduser_download_url(ADDUSER_VERSION),
236    )
237    .await?;
238    let fonts_dejavu_mono_url = cache_tool(
239        fonts_dejavu_mono_bin_s3_key(FONTS_DEJAVU_MONO_VERSION),
240        fonts_dejavu_mono_download_url(FONTS_DEJAVU_MONO_VERSION),
241    )
242    .await?;
243    let fonts_dejavu_core_url = cache_tool(
244        fonts_dejavu_core_bin_s3_key(FONTS_DEJAVU_CORE_VERSION),
245        fonts_dejavu_core_download_url(FONTS_DEJAVU_CORE_VERSION),
246    )
247    .await?;
248    // Cache tools for each architecture and store URLs per-architecture
249    let mut tool_urls_by_arch: HashMap<Architecture, ToolUrls> = HashMap::new();
250    for arch in &architectures_needed {
251        let [prometheus_url, grafana_url, loki_url, pyroscope_url, tempo_url, node_exporter_url, promtail_url,
252             libjemalloc_url, logrotate_url, fontconfig_config_url, libfontconfig_url, unzip_url, musl_url]: [String; 13] =
253            try_join_all([
254                cache_tool(prometheus_bin_s3_key(PROMETHEUS_VERSION, *arch), prometheus_download_url(PROMETHEUS_VERSION, *arch)),
255                cache_tool(grafana_bin_s3_key(GRAFANA_VERSION, *arch), grafana_download_url(GRAFANA_VERSION, *arch)),
256                cache_tool(loki_bin_s3_key(LOKI_VERSION, *arch), loki_download_url(LOKI_VERSION, *arch)),
257                cache_tool(pyroscope_bin_s3_key(PYROSCOPE_VERSION, *arch), pyroscope_download_url(PYROSCOPE_VERSION, *arch)),
258                cache_tool(tempo_bin_s3_key(TEMPO_VERSION, *arch), tempo_download_url(TEMPO_VERSION, *arch)),
259                cache_tool(node_exporter_bin_s3_key(NODE_EXPORTER_VERSION, *arch), node_exporter_download_url(NODE_EXPORTER_VERSION, *arch)),
260                cache_tool(promtail_bin_s3_key(PROMTAIL_VERSION, *arch), promtail_download_url(PROMTAIL_VERSION, *arch)),
261                cache_tool(libjemalloc_bin_s3_key(LIBJEMALLOC2_VERSION, *arch), libjemalloc_download_url(LIBJEMALLOC2_VERSION, *arch)),
262                cache_tool(logrotate_bin_s3_key(LOGROTATE_VERSION, *arch), logrotate_download_url(LOGROTATE_VERSION, *arch)),
263                cache_tool(fontconfig_config_bin_s3_key(FONTCONFIG_CONFIG_VERSION, *arch), fontconfig_config_download_url(FONTCONFIG_CONFIG_VERSION, *arch)),
264                cache_tool(libfontconfig_bin_s3_key(LIBFONTCONFIG1_VERSION, *arch), libfontconfig_download_url(LIBFONTCONFIG1_VERSION, *arch)),
265                cache_tool(unzip_bin_s3_key(UNZIP_VERSION, *arch), unzip_download_url(UNZIP_VERSION, *arch)),
266                cache_tool(musl_bin_s3_key(MUSL_VERSION, *arch), musl_download_url(MUSL_VERSION, *arch)),
267            ])
268            .await?
269            .try_into()
270            .unwrap();
271        tool_urls_by_arch.insert(
272            *arch,
273            ToolUrls {
274                prometheus: prometheus_url,
275                grafana: grafana_url,
276                loki: loki_url,
277                pyroscope: pyroscope_url,
278                tempo: tempo_url,
279                node_exporter: node_exporter_url,
280                promtail: promtail_url,
281                libjemalloc: libjemalloc_url,
282                logrotate: logrotate_url,
283                fonts_dejavu_mono: fonts_dejavu_mono_url.clone(),
284                fonts_dejavu_core: fonts_dejavu_core_url.clone(),
285                fontconfig_config: fontconfig_config_url,
286                libfontconfig: libfontconfig_url,
287                unzip: unzip_url,
288                adduser: adduser_url.clone(),
289                musl: musl_url,
290            },
291        );
292    }
293    info!("observability tools uploaded");
294
295    // Upload unique binaries and configs to S3 (deduplicated by digest)
296    info!("uploading unique binaries and configs to S3");
297    let instance_file_urls =
298        s3::upload_instance_files(&s3_client, &bucket_name, tag, &config.instances).await?;
299    let instance_binary_urls = instance_file_urls.binary_urls;
300    let instance_config_urls = instance_file_urls.config_urls;
301    info!("uploaded all instance binaries and configs");
302
303    // Initialize resources for each region concurrently
304    info!(?regions, "initializing resources");
305    let region_init_futures: Vec<_> = regions
306        .iter()
307        .enumerate()
308        .map(|(idx, region)| {
309            let region = region.clone();
310            let tag = tag.clone();
311            let deployer_ip = deployer_ip.clone();
312            let key_name = key_name.clone();
313            let public_key = public_key.clone();
314            let instance_types: Vec<String> =
315                instance_types_by_region[&region].iter().cloned().collect();
316
317            async move {
318                // Create client for region
319                let ec2_client = ec2::create_client(Region::new(region.clone())).await;
320                info!(region = region.as_str(), "created EC2 client");
321
322                // Find which AZs support which instance types
323                let az_support = find_az_instance_support(&ec2_client, &instance_types).await?;
324                let mut azs: Vec<String> = az_support.keys().cloned().collect();
325                azs.sort();
326                info!(?azs, region = region.as_str(), "found availability zones");
327
328                // Create VPC, IGW, route table
329                let vpc_cidr = format!("10.{idx}.0.0/16");
330                let vpc_id = create_vpc(&ec2_client, &vpc_cidr, &tag).await?;
331                info!(
332                    vpc = vpc_id.as_str(),
333                    region = region.as_str(),
334                    "created VPC"
335                );
336                let igw_id = create_and_attach_igw(&ec2_client, &vpc_id, &tag).await?;
337                info!(
338                    igw = igw_id.as_str(),
339                    vpc = vpc_id.as_str(),
340                    region = region.as_str(),
341                    "created and attached IGW"
342                );
343                let route_table_id =
344                    create_route_table(&ec2_client, &vpc_id, &igw_id, &tag).await?;
345                info!(
346                    route_table = route_table_id.as_str(),
347                    vpc = vpc_id.as_str(),
348                    region = region.as_str(),
349                    "created route table"
350                );
351
352                // Create a subnet in each AZ concurrently
353                let subnet_futures: Vec<_> = azs
354                    .iter()
355                    .enumerate()
356                    .map(|(az_idx, az)| {
357                        let ec2_client = ec2_client.clone();
358                        let vpc_id = vpc_id.clone();
359                        let route_table_id = route_table_id.clone();
360                        let tag = tag.clone();
361                        let az = az.clone();
362                        let region = region.clone();
363                        async move {
364                            let subnet_cidr = format!("10.{idx}.{az_idx}.0/24");
365                            let subnet_id = create_subnet(
366                                &ec2_client,
367                                &vpc_id,
368                                &route_table_id,
369                                &subnet_cidr,
370                                &az,
371                                &tag,
372                            )
373                            .await?;
374                            info!(
375                                subnet = subnet_id.as_str(),
376                                az = az.as_str(),
377                                region = region.as_str(),
378                                "created subnet"
379                            );
380                            Ok::<(String, String), Error>((az, subnet_id))
381                        }
382                    })
383                    .collect();
384                let subnets = try_join_all(subnet_futures).await?;
385
386                // Create monitoring security group in monitoring region
387                let monitoring_sg_id = if region == MONITORING_REGION {
388                    let sg_id =
389                        create_security_group_monitoring(&ec2_client, &vpc_id, &deployer_ip, &tag)
390                            .await?;
391                    info!(
392                        sg = sg_id.as_str(),
393                        vpc = vpc_id.as_str(),
394                        region = region.as_str(),
395                        "created monitoring security group"
396                    );
397                    Some(sg_id)
398                } else {
399                    None
400                };
401
402                // Import key pair
403                import_key_pair(&ec2_client, &key_name, &public_key).await?;
404                info!(
405                    key = key_name.as_str(),
406                    region = region.as_str(),
407                    "imported key pair"
408                );
409
410                info!(
411                    vpc = vpc_id.as_str(),
412                    subnet_count = subnets.len(),
413                    region = region.as_str(),
414                    "initialized resources"
415                );
416
417                Ok::<_, Error>((
418                    region,
419                    ec2_client,
420                    RegionResources {
421                        vpc_id,
422                        vpc_cidr,
423                        route_table_id,
424                        subnets,
425                        az_support,
426                        binary_sg_id: None,
427                        monitoring_sg_id,
428                    },
429                ))
430            }
431        })
432        .collect();
433
434    let region_results = try_join_all(region_init_futures).await?;
435    let (ec2_clients, mut region_resources): (HashMap<_, _>, HashMap<_, _>) = region_results
436        .into_iter()
437        .map(|(region, client, resources)| ((region.clone(), client), (region, resources)))
438        .unzip();
439    info!(?regions, "initialized resources");
440
441    // Create binary security groups (without monitoring IP - added later for parallel launch)
442    info!("creating binary security groups");
443    let binary_sg_futures: Vec<_> = region_resources
444        .iter()
445        .map(|(region, resources)| {
446            let region = region.clone();
447            let ec2_client = ec2_clients[&region].clone();
448            let vpc_id = resources.vpc_id.clone();
449            let deployer_ip = deployer_ip.clone();
450            let tag = tag.clone();
451            let ports = config.ports.clone();
452            async move {
453                let binary_sg_id =
454                    create_security_group_binary(&ec2_client, &vpc_id, &deployer_ip, &tag, &ports)
455                        .await?;
456                info!(
457                    sg = binary_sg_id.as_str(),
458                    vpc = vpc_id.as_str(),
459                    region = region.as_str(),
460                    "created binary security group"
461                );
462                Ok::<_, Error>((region, binary_sg_id))
463            }
464        })
465        .collect();
466    for (region, binary_sg_id) in try_join_all(binary_sg_futures).await? {
467        region_resources.get_mut(&region).unwrap().binary_sg_id = Some(binary_sg_id);
468    }
469    info!("created binary security groups");
470
471    // Setup VPC peering connections concurrently
472    info!("initializing VPC peering connections");
473    let monitoring_region = MONITORING_REGION.to_string();
474    let monitoring_resources = region_resources.get(&monitoring_region).unwrap();
475    let monitoring_vpc_id = &monitoring_resources.vpc_id;
476    let monitoring_cidr = &monitoring_resources.vpc_cidr;
477    let monitoring_route_table_id = &monitoring_resources.route_table_id;
478    let binary_regions: HashSet<String> =
479        config.instances.iter().map(|i| i.region.clone()).collect();
480    let peering_futures: Vec<_> = regions
481        .iter()
482        .filter(|region| *region != &monitoring_region && binary_regions.contains(*region))
483        .map(|region| {
484            let region = region.clone();
485            let monitoring_ec2_client = ec2_clients[&monitoring_region].clone();
486            let binary_ec2_client = ec2_clients[&region].clone();
487            let monitoring_vpc_id = monitoring_vpc_id.clone();
488            let monitoring_cidr = monitoring_cidr.clone();
489            let monitoring_route_table_id = monitoring_route_table_id.clone();
490            let binary_resources = region_resources.get(&region).unwrap();
491            let binary_vpc_id = binary_resources.vpc_id.clone();
492            let binary_cidr = binary_resources.vpc_cidr.clone();
493            let binary_route_table_id = binary_resources.route_table_id.clone();
494            let tag = tag.clone();
495            async move {
496                let peer_id = create_vpc_peering_connection(
497                    &monitoring_ec2_client,
498                    &monitoring_vpc_id,
499                    &binary_vpc_id,
500                    &region,
501                    &tag,
502                )
503                .await?;
504                info!(
505                    peer = peer_id.as_str(),
506                    monitoring = monitoring_vpc_id.as_str(),
507                    binary = binary_vpc_id.as_str(),
508                    region = region.as_str(),
509                    "created VPC peering connection"
510                );
511                wait_for_vpc_peering_connection(&binary_ec2_client, &peer_id).await?;
512                info!(
513                    peer = peer_id.as_str(),
514                    region = region.as_str(),
515                    "VPC peering connection is available"
516                );
517                accept_vpc_peering_connection(&binary_ec2_client, &peer_id).await?;
518                info!(
519                    peer = peer_id.as_str(),
520                    region = region.as_str(),
521                    "accepted VPC peering connection"
522                );
523                add_route(
524                    &monitoring_ec2_client,
525                    &monitoring_route_table_id,
526                    &binary_cidr,
527                    &peer_id,
528                )
529                .await?;
530                add_route(
531                    &binary_ec2_client,
532                    &binary_route_table_id,
533                    &monitoring_cidr,
534                    &peer_id,
535                )
536                .await?;
537                info!(
538                    peer = peer_id.as_str(),
539                    monitoring = monitoring_vpc_id.as_str(),
540                    binary = binary_vpc_id.as_str(),
541                    region = region.as_str(),
542                    "added routes for VPC peering connection"
543                );
544                Ok::<_, Error>(())
545            }
546        })
547        .collect();
548    try_join_all(peering_futures).await?;
549    info!("initialized VPC peering connections");
550
551    // Prepare launch configurations for all instances
552    info!("launching instances");
553    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
554    let monitoring_ami_id = find_latest_ami(monitoring_ec2_client, monitoring_architecture).await?;
555    let monitoring_instance_type =
556        InstanceType::try_parse(&config.monitoring.instance_type).expect("Invalid instance type");
557    let monitoring_storage_class =
558        VolumeType::try_parse(&config.monitoring.storage_class).expect("Invalid storage class");
559    let monitoring_sg_id = monitoring_resources
560        .monitoring_sg_id
561        .as_ref()
562        .unwrap()
563        .clone();
564    let monitoring_subnets = monitoring_resources.subnets.clone();
565    let monitoring_az_support = monitoring_resources.az_support.clone();
566
567    // Lookup AMI IDs for binary instances
568    let mut ami_cache: HashMap<(String, Architecture), String> = HashMap::new();
569    ami_cache.insert(
570        (monitoring_region.clone(), monitoring_architecture),
571        monitoring_ami_id.clone(),
572    );
573    info!(
574        region = monitoring_region.as_str(),
575        architecture = %monitoring_architecture,
576        ami_id = monitoring_ami_id.as_str(),
577        "selected AMI"
578    );
579    let mut binary_launch_configs = Vec::new();
580    for instance in &config.instances {
581        let region = instance.region.clone();
582        let resources = region_resources.get(&region).unwrap();
583        let ec2_client = ec2_clients.get(&region).unwrap();
584        let arch = instance_architectures[&instance.name];
585        let ami_id = match ami_cache.get(&(region.clone(), arch)) {
586            Some(id) => id.clone(),
587            None => {
588                let id = find_latest_ami(ec2_client, arch).await?;
589                ami_cache.insert((region.clone(), arch), id.clone());
590                info!(
591                    region = region.as_str(),
592                    architecture = %arch,
593                    ami_id = id.as_str(),
594                    "selected AMI"
595                );
596                id
597            }
598        };
599        binary_launch_configs.push((instance, ec2_client, resources, ami_id, arch));
600    }
601
602    // Launch monitoring instance (uses start_idx=0 since there's only one)
603    let monitoring_launch_future = {
604        let key_name = key_name.clone();
605        let tag = tag.clone();
606        let sg_id = monitoring_sg_id.clone();
607        async move {
608            let (mut ids, az) = launch_instances(
609                monitoring_ec2_client,
610                &monitoring_ami_id,
611                monitoring_instance_type,
612                config.monitoring.storage_size,
613                monitoring_storage_class,
614                &key_name,
615                &monitoring_subnets,
616                &monitoring_az_support,
617                0,
618                &sg_id,
619                1,
620                MONITORING_NAME,
621                &tag,
622            )
623            .await?;
624            let instance_id = ids.remove(0);
625            info!(
626                instance_id = instance_id.as_str(),
627                az = az.as_str(),
628                "launched monitoring instance"
629            );
630            Ok::<String, Error>(instance_id)
631        }
632    };
633
634    // Launch binary instances, distributing across AZs by using instance index as start_idx
635    let binary_launch_futures = binary_launch_configs.iter().enumerate().map(
636        |(idx, (instance, ec2_client, resources, ami_id, _arch))| {
637            let key_name = key_name.clone();
638            let instance_type =
639                InstanceType::try_parse(&instance.instance_type).expect("Invalid instance type");
640            let storage_class =
641                VolumeType::try_parse(&instance.storage_class).expect("Invalid storage class");
642            let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
643            let tag = tag.clone();
644            let instance_name = instance.name.clone();
645            let region = instance.region.clone();
646            let subnets = resources.subnets.clone();
647            let az_support = resources.az_support.clone();
648            async move {
649                let (mut ids, az) = launch_instances(
650                    ec2_client,
651                    ami_id,
652                    instance_type,
653                    instance.storage_size,
654                    storage_class,
655                    &key_name,
656                    &subnets,
657                    &az_support,
658                    idx,
659                    binary_sg_id,
660                    1,
661                    &instance.name,
662                    &tag,
663                )
664                .await?;
665                let instance_id = ids.remove(0);
666                info!(
667                    instance_id = instance_id.as_str(),
668                    instance = instance_name.as_str(),
669                    az = az.as_str(),
670                    "launched instance"
671                );
672                Ok::<(String, String, InstanceConfig), Error>((
673                    instance_id,
674                    region,
675                    (*instance).clone(),
676                ))
677            }
678        },
679    );
680
681    // Wait for all launches to complete (get instance IDs)
682    let (monitoring_instance_id, binary_launches) = tokio::try_join!(
683        monitoring_launch_future,
684        try_join_all(binary_launch_futures)
685    )?;
686    info!("instances requested");
687
688    // Group binary instances by region for batched DescribeInstances calls
689    let mut instances_by_region: HashMap<String, Vec<(String, InstanceConfig)>> = HashMap::new();
690    for (instance_id, region, instance_config) in binary_launches {
691        instances_by_region
692            .entry(region)
693            .or_default()
694            .push((instance_id, instance_config));
695    }
696
697    // Wait for instances to be running, batched by region
       // Each chunk stays within MAX_DESCRIBE_BATCH (the per-call instance-ID cap
       // defined at the top of this file) so a single DescribeInstances poll covers it.
698    let wait_futures = instances_by_region
699        .into_iter()
700        .flat_map(|(region, instances)| {
701            let ec2_client = ec2_clients[&region].clone();
702            instances
703                .chunks(MAX_DESCRIBE_BATCH)
704                .map(move |chunk| {
                       // Clone per chunk: each value is moved into its own
                       // 'static async block so the futures can run concurrently.
705                    let ec2_client = ec2_client.clone();
706                    let chunk: Vec<_> = chunk.to_vec();
707                    let region = region.clone();
708                    async move {
709                        let instance_ids: Vec<String> =
710                            chunk.iter().map(|(id, _)| id.clone()).collect();
711                        let ips = wait_for_instances_running(&ec2_client, &instance_ids).await?;
712                        info!(
713                            region = region.as_str(),
714                            count = chunk.len(),
715                            "instances running in region"
716                        );
                           // Pair each (id, config) with its public IP.
                           // NOTE(review): assumes wait_for_instances_running returns IPs
                           // in the same order as `instance_ids` — confirm in its impl.
717                        let deployments: Vec<Deployment> = chunk
718                            .into_iter()
719                            .zip(ips)
720                            .map(|((instance_id, instance_config), ip)| Deployment {
721                                instance: instance_config,
722                                id: instance_id,
723                                ip,
724                            })
725                            .collect();
726                        Ok::<Vec<Deployment>, Error>(deployments)
727                    }
728                })
729                .collect::<Vec<_>>()
730        });
731
732    // Wait for monitoring instance and all binary instances in parallel
733    let (monitoring_ips, binary_deployment_batches) = tokio::try_join!(
734        async {
735            wait_for_instances_running(
736                monitoring_ec2_client,
737                slice::from_ref(&monitoring_instance_id),
738            )
739            .await
740            .map_err(Error::AwsEc2)
741        },
742        try_join_all(wait_futures)
743    )?;
       // Index [0] is safe: exactly one monitoring instance ID was passed above.
744    let monitoring_ip = monitoring_ips[0].clone();
745    let monitoring_private_ip =
746        get_private_ip(monitoring_ec2_client, &monitoring_instance_id).await?;
       // Flatten the per-region/per-chunk batches back into one deployment list.
747    let deployments: Vec<Deployment> = binary_deployment_batches.into_iter().flatten().collect();
748    info!(ip = monitoring_ip.as_str(), "monitoring instance running");
749    info!("launched instances");
750
751    // Add monitoring IP rules to binary security groups (for Prometheus scraping).
752    // This happens after instance launch but before instance configuration, so there's
753    // no window where Prometheus would try to scrape unconfigured instances.
754    info!("adding monitoring ingress rules");
755    for (region, resources) in region_resources.iter() {
           // unwrap: binary_sg_id is presumably populated for every region in
           // region_resources by earlier setup — confirm against the code that builds it.
756        let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
757        add_monitoring_ingress(&ec2_clients[region], binary_sg_id, &monitoring_ip).await?;
758    }
759    info!("added monitoring ingress rules");
760
761    // Cache static config files globally (these don't change between deployments)
       // The destructuring pattern below must stay in the same order as the futures
       // array: try_join_all preserves input order, and the [String; 14] try_into
       // cannot fail for a 14-element Vec (hence the unwrap).
762    info!("uploading config files to S3");
763    let [
764        datasources_url,
765        all_yml_url,
766        loki_yml_url,
767        pyroscope_yml_url,
768        tempo_yml_url,
769        prometheus_service_url,
770        loki_service_url,
771        pyroscope_service_url,
772        tempo_service_url,
773        monitoring_node_exporter_service_url,
774        promtail_service_url,
775        logrotate_conf_url,
776        pyroscope_agent_service_url,
777        pyroscope_agent_timer_url,
778    ]: [String; 14] = try_join_all([
779        cache_and_presign(&s3_client, &bucket_name, &grafana_datasources_s3_key(), UploadSource::Static(DATASOURCES_YML.as_bytes()), PRESIGN_DURATION),
780        cache_and_presign(&s3_client, &bucket_name, &grafana_dashboards_s3_key(), UploadSource::Static(ALL_YML.as_bytes()), PRESIGN_DURATION),
781        cache_and_presign(&s3_client, &bucket_name, &loki_config_s3_key(), UploadSource::Static(LOKI_CONFIG.as_bytes()), PRESIGN_DURATION),
782        cache_and_presign(&s3_client, &bucket_name, &pyroscope_config_s3_key(), UploadSource::Static(PYROSCOPE_CONFIG.as_bytes()), PRESIGN_DURATION),
783        cache_and_presign(&s3_client, &bucket_name, &tempo_config_s3_key(), UploadSource::Static(TEMPO_CONFIG.as_bytes()), PRESIGN_DURATION),
784        cache_and_presign(&s3_client, &bucket_name, &prometheus_service_s3_key(), UploadSource::Static(PROMETHEUS_SERVICE.as_bytes()), PRESIGN_DURATION),
785        cache_and_presign(&s3_client, &bucket_name, &loki_service_s3_key(), UploadSource::Static(LOKI_SERVICE.as_bytes()), PRESIGN_DURATION),
786        cache_and_presign(&s3_client, &bucket_name, &pyroscope_service_s3_key(), UploadSource::Static(PYROSCOPE_SERVICE.as_bytes()), PRESIGN_DURATION),
787        cache_and_presign(&s3_client, &bucket_name, &tempo_service_s3_key(), UploadSource::Static(TEMPO_SERVICE.as_bytes()), PRESIGN_DURATION),
788        cache_and_presign(&s3_client, &bucket_name, &node_exporter_service_s3_key(), UploadSource::Static(NODE_EXPORTER_SERVICE.as_bytes()), PRESIGN_DURATION),
789        cache_and_presign(&s3_client, &bucket_name, &promtail_service_s3_key(), UploadSource::Static(PROMTAIL_SERVICE.as_bytes()), PRESIGN_DURATION),
790        cache_and_presign(&s3_client, &bucket_name, &logrotate_config_s3_key(), UploadSource::Static(LOGROTATE_CONF.as_bytes()), PRESIGN_DURATION),
791        cache_and_presign(&s3_client, &bucket_name, &pyroscope_agent_service_s3_key(), UploadSource::Static(PYROSCOPE_AGENT_SERVICE.as_bytes()), PRESIGN_DURATION),
792        cache_and_presign(&s3_client, &bucket_name, &pyroscope_agent_timer_s3_key(), UploadSource::Static(PYROSCOPE_AGENT_TIMER.as_bytes()), PRESIGN_DURATION),
793    ])
794    .await?
795    .try_into()
796    .unwrap();
797
798    // Cache binary_service per architecture
       // The rendered unit file is staged on disk only long enough to upload, then
       // removed. NOTE(review): std::fs calls block the async runtime briefly —
       // these are tiny writes, presumably acceptable here.
799    let mut binary_service_urls_by_arch: HashMap<Architecture, String> = HashMap::new();
800    for arch in &architectures_needed {
801        let binary_service_content = binary_service(*arch);
802        let temp_path = tag_directory.join(format!("binary-{}.service", arch.as_str()));
803        std::fs::write(&temp_path, &binary_service_content)?;
804        let binary_service_url = cache_and_presign(
805            &s3_client,
806            &bucket_name,
807            &binary_service_s3_key_for_arch(*arch),
808            UploadSource::File(&temp_path),
809            PRESIGN_DURATION,
810        )
811        .await?;
812        std::fs::remove_file(&temp_path)?;
813        binary_service_urls_by_arch.insert(*arch, binary_service_url);
814    }
815
816    // Upload deployment-specific monitoring config files (deduplicated by digest)
       // S3 keys embed a SHA-256 of the content, so identical configs across runs
       // hit the cache instead of re-uploading.
817    let instances: Vec<(&str, &str, &str, &str)> = deployments
818        .iter()
819        .map(|d| {
820            let arch = instance_architectures[&d.instance.name];
               // (name, public ip, region, arch) tuples consumed by the
               // Prometheus scrape-config generator below.
821            (
822                d.instance.name.as_str(),
823                d.ip.as_str(),
824                d.instance.region.as_str(),
825                arch.as_str(),
826            )
827        })
828        .collect();
829    let prom_config = generate_prometheus_config(&instances);
830    let prom_digest = Sha256::hash(prom_config.as_bytes()).to_string();
831    let prom_path = tag_directory.join("prometheus.yml");
832    std::fs::write(&prom_path, &prom_config)?;
       // The dashboard comes from the user's config; hash the file itself.
833    let dashboard_path = std::path::PathBuf::from(&config.monitoring.dashboard);
834    let dashboard_digest = hash_file(&dashboard_path).await?;
       // unwrap: try_join_all over 2 futures always yields exactly 2 elements.
835    let [prometheus_config_url, dashboard_url]: [String; 2] = try_join_all([
836        cache_and_presign(
837            &s3_client,
838            &bucket_name,
839            &monitoring_s3_key(tag, &prom_digest),
840            UploadSource::File(&prom_path),
841            PRESIGN_DURATION,
842        ),
843        cache_and_presign(
844            &s3_client,
845            &bucket_name,
846            &monitoring_s3_key(tag, &dashboard_digest),
847            UploadSource::File(&dashboard_path),
848            PRESIGN_DURATION,
849        ),
850    ])
851    .await?
852    .try_into()
853    .unwrap();
854
855    // Generate hosts.yaml and upload once (shared by all instances)
       // parse().unwrap(): IPs come from the EC2 API; a malformed address would
       // panic here rather than produce a bad hosts file.
856    let hosts = Hosts {
857        monitoring: monitoring_private_ip.clone().parse::<IpAddr>().unwrap(),
858        hosts: deployments
859            .iter()
860            .map(|d| Host {
861                name: d.instance.name.clone(),
862                region: d.instance.region.clone(),
863                ip: d.ip.clone().parse::<IpAddr>().unwrap(),
864            })
865            .collect(),
866    };
867    let hosts_yaml = serde_yaml::to_string(&hosts)?;
868    let hosts_digest = Sha256::hash(hosts_yaml.as_bytes()).to_string();
869    let hosts_path = tag_directory.join("hosts.yaml");
870    std::fs::write(&hosts_path, &hosts_yaml)?;
871    let hosts_url = cache_and_presign(
872        &s3_client,
873        &bucket_name,
874        &hosts_s3_key(tag, &hosts_digest),
875        UploadSource::File(&hosts_path),
876        PRESIGN_DURATION,
877    )
878    .await?;
879
880    // Write per-instance config files locally and compute digests
       // Two digest->path maps collect the UNIQUE configs to upload, while the two
       // name->digest maps record which config each instance should receive.
881    let mut promtail_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
882    let mut pyroscope_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
883    let mut instance_promtail_digest: HashMap<String, String> = HashMap::new();
884    let mut instance_pyroscope_digest: HashMap<String, String> = HashMap::new();
885    for deployment in &deployments {
886        let instance = &deployment.instance;
887        let ip = &deployment.ip;
888        let arch = instance_architectures[&instance.name].as_str();
889
890        let promtail_cfg = promtail_config(
891            &monitoring_private_ip,
892            &instance.name,
893            ip,
894            &instance.region,
895            arch,
896        );
897        let promtail_digest = Sha256::hash(promtail_cfg.as_bytes()).to_string();
898        let promtail_path = tag_directory.join(format!("promtail_{}.yml", instance.name));
899        std::fs::write(&promtail_path, &promtail_cfg)?;
900
901        let pyroscope_script = generate_pyroscope_script(
902            &monitoring_private_ip,
903            &instance.name,
904            ip,
905            &instance.region,
906            arch,
907        );
908        let pyroscope_digest = Sha256::hash(pyroscope_script.as_bytes()).to_string();
909        let pyroscope_path = tag_directory.join(format!("pyroscope-agent_{}.sh", instance.name));
910        std::fs::write(&pyroscope_path, &pyroscope_script)?;
911
           // or_insert: first instance with a given digest wins; identical configs
           // from other instances reuse that file (content is the same by definition).
912        promtail_digests
913            .entry(promtail_digest.clone())
914            .or_insert(promtail_path);
915        pyroscope_digests
916            .entry(pyroscope_digest.clone())
917            .or_insert(pyroscope_path);
918        instance_promtail_digest.insert(instance.name.clone(), promtail_digest);
919        instance_pyroscope_digest.insert(instance.name.clone(), pyroscope_digest);
920    }
921
922    // Upload unique promtail and pyroscope configs
       // The two families upload concurrently; within each family all unique
       // configs upload concurrently via try_join_all.
923    let (promtail_digest_to_url, pyroscope_digest_to_url): (
924        HashMap<String, String>,
925        HashMap<String, String>,
926    ) = tokio::try_join!(
927        async {
928            Ok::<_, Error>(
929                try_join_all(promtail_digests.iter().map(|(digest, path)| {
                       // Clone captures so each upload future is self-contained.
930                    let s3_client = s3_client.clone();
931                    let bucket_name = bucket_name.clone();
932                    let digest = digest.clone();
933                    let key = promtail_s3_key(tag, &digest);
934                    let path = path.clone();
935                    async move {
936                        let url = cache_and_presign(
937                            &s3_client,
938                            &bucket_name,
939                            &key,
940                            UploadSource::File(&path),
941                            PRESIGN_DURATION,
942                        )
943                        .await?;
944                        Ok::<_, Error>((digest, url))
945                    }
946                }))
947                .await?
948                .into_iter()
949                .collect(),
950            )
951        },
952        async {
953            Ok::<_, Error>(
954                try_join_all(pyroscope_digests.iter().map(|(digest, path)| {
955                    let s3_client = s3_client.clone();
956                    let bucket_name = bucket_name.clone();
957                    let digest = digest.clone();
958                    let key = pyroscope_s3_key(tag, &digest);
959                    let path = path.clone();
960                    async move {
961                        let url = cache_and_presign(
962                            &s3_client,
963                            &bucket_name,
964                            &key,
965                            UploadSource::File(&path),
966                            PRESIGN_DURATION,
967                        )
968                        .await?;
969                        Ok::<_, Error>((digest, url))
970                    }
971                }))
972                .await?
973                .into_iter()
974                .collect(),
975            )
976        },
977    )?;
978
979    // Build instance URLs map (using architecture-specific tool URLs)
       // All index lookups here (instance_architectures, digest maps, tool_urls_by_arch,
       // instance_binary_urls, instance_config_urls) are keyed by data derived from the
       // same `deployments` list, so a missing key would be a construction bug upstream
       // and panics loudly rather than deploying a half-configured instance.
980    let mut instance_urls_map: HashMap<String, (InstanceUrls, Architecture)> = HashMap::new();
981    for deployment in &deployments {
982        let name = &deployment.instance.name;
983        let arch = instance_architectures[name];
984        let promtail_digest = &instance_promtail_digest[name];
985        let pyroscope_digest = &instance_pyroscope_digest[name];
986        let tool_urls = &tool_urls_by_arch[&arch];
987
988        instance_urls_map.insert(
989            name.clone(),
990            (
991                InstanceUrls {
992                    binary: instance_binary_urls[name].clone(),
993                    config: instance_config_urls[name].clone(),
994                    hosts: hosts_url.clone(),
995                    promtail_bin: tool_urls.promtail.clone(),
996                    promtail_config: promtail_digest_to_url[promtail_digest].clone(),
997                    promtail_service: promtail_service_url.clone(),
998                    node_exporter_bin: tool_urls.node_exporter.clone(),
999                    node_exporter_service: monitoring_node_exporter_service_url.clone(),
1000                    binary_service: binary_service_urls_by_arch[&arch].clone(),
1001                    logrotate_conf: logrotate_conf_url.clone(),
1002                    pyroscope_script: pyroscope_digest_to_url[pyroscope_digest].clone(),
1003                    pyroscope_service: pyroscope_agent_service_url.clone(),
1004                    pyroscope_timer: pyroscope_agent_timer_url.clone(),
1005                    libjemalloc_deb: tool_urls.libjemalloc.clone(),
1006                    logrotate_deb: tool_urls.logrotate.clone(),
1007                    unzip_deb: tool_urls.unzip.clone(),
1008                },
1009                arch,
1010            ),
1011        );
1012    }
1013    info!("uploaded config files to S3");
1014
1015    // Build monitoring URLs struct for SSH configuration (using monitoring architecture)
        // Shadowing `tool_urls` with the monitoring host's architecture entry; the
        // *_url variables are moved in here (no clone) since this is their last use.
1016    let tool_urls = &tool_urls_by_arch[&monitoring_architecture];
1017    let monitoring_urls = MonitoringUrls {
1018        prometheus_bin: tool_urls.prometheus.clone(),
1019        grafana_bin: tool_urls.grafana.clone(),
1020        loki_bin: tool_urls.loki.clone(),
1021        pyroscope_bin: tool_urls.pyroscope.clone(),
1022        tempo_bin: tool_urls.tempo.clone(),
1023        node_exporter_bin: tool_urls.node_exporter.clone(),
1024        fonts_dejavu_mono_deb: tool_urls.fonts_dejavu_mono.clone(),
1025        fonts_dejavu_core_deb: tool_urls.fonts_dejavu_core.clone(),
1026        fontconfig_config_deb: tool_urls.fontconfig_config.clone(),
1027        libfontconfig_deb: tool_urls.libfontconfig.clone(),
1028        unzip_deb: tool_urls.unzip.clone(),
1029        adduser_deb: tool_urls.adduser.clone(),
1030        musl_deb: tool_urls.musl.clone(),
1031        prometheus_config: prometheus_config_url,
1032        datasources_yml: datasources_url,
1033        all_yml: all_yml_url,
1034        dashboard: dashboard_url,
1035        loki_yml: loki_yml_url,
1036        pyroscope_yml: pyroscope_yml_url,
1037        tempo_yml: tempo_yml_url,
1038        prometheus_service: prometheus_service_url,
1039        loki_service: loki_service_url,
1040        pyroscope_service: pyroscope_service_url,
1041        tempo_service: tempo_service_url,
1042        node_exporter_service: monitoring_node_exporter_service_url.clone(),
1043    };
1044
1045    // Prepare binary instance configuration futures
1046    info!("configuring monitoring and binary instances");
        // Materialize owned per-instance tuples first so the async closures below
        // borrow nothing from the surrounding scope.
1047    let binary_configs: Vec<_> = deployments
1048        .iter()
1049        .map(|deployment| {
1050            let instance = deployment.instance.clone();
1051            let deployment_id = deployment.id.clone();
1052            let ec2_client = ec2_clients[&instance.region].clone();
1053            let ip = deployment.ip.clone();
                // remove + unwrap: each name was inserted exactly once when building
                // instance_urls_map; remove transfers ownership without cloning.
                // NOTE(review): presumes instance names are unique — verify upstream.
1054            let (urls, arch) = instance_urls_map.remove(&instance.name).unwrap();
1055            (instance, deployment_id, ec2_client, ip, urls, arch)
1056        })
1057        .collect();
        // One lazy future per instance: wait for ready -> apt (optional, profiling
        // only) -> download artifacts -> setup -> poll services active. Each phase
        // is timed for the structured log line at the end.
1058    let binary_futures = binary_configs.into_iter().map(
1059        |(instance, deployment_id, ec2_client, ip, urls, arch)| async move {
1060            let start = Instant::now();
1061
1062            wait_for_instances_ready(&ec2_client, slice::from_ref(&deployment_id)).await?;
1063            let deploy = format!("{:.1}s", start.elapsed().as_secs_f64());
1064
1065            let download_start = Instant::now();
1066            if let Some(apt_cmd) = install_binary_apt_cmd(instance.profiling) {
1067                ssh_execute(private_key, &ip, apt_cmd).await?;
1068            }
1069            ssh_execute(private_key, &ip, &install_binary_download_cmd(&urls)).await?;
1070            let download = format!("{:.1}s", download_start.elapsed().as_secs_f64());
1071
1072            let setup_start = Instant::now();
1073            ssh_execute(
1074                private_key,
1075                &ip,
1076                &install_binary_setup_cmd(instance.profiling, arch),
1077            )
1078            .await?;
1079            let setup = format!("{:.1}s", setup_start.elapsed().as_secs_f64());
1080
1081            let start_time = Instant::now();
1082            poll_service_active(private_key, &ip, "promtail").await?;
1083            poll_service_active(private_key, &ip, "node_exporter").await?;
1084            poll_service_active(private_key, &ip, "binary").await?;
1085            let start_dur = format!("{:.1}s", start_time.elapsed().as_secs_f64());
1086
1087            info!(
1088                ip,
1089                instance = instance.name.as_str(),
1090                deploy,
1091                download,
1092                setup,
1093                start = start_dur,
1094                "configured instance"
1095            );
                // Resolve to the instance's IP so the caller can log all of them.
1096            Ok::<String, Error>(ip)
1097        },
1098    );
1099
1100    // Run monitoring and binary configuration in parallel
1101    let (_, all_binary_ips) = tokio::try_join!(
1102        async {
1103            // Configure monitoring instance
                // Same phase sequence as the binary instances: ready -> download ->
                // setup/start services -> poll each service active, with per-phase timing.
1104            let start = Instant::now();
1105
1106            let monitoring_ec2_client = &ec2_clients[&monitoring_region];
1107            wait_for_instances_ready(
1108                monitoring_ec2_client,
1109                slice::from_ref(&monitoring_instance_id),
1110            )
1111            .await?;
1112            let deploy = format!("{:.1}s", start.elapsed().as_secs_f64());
1113
1114            let download_start = Instant::now();
1115            ssh_execute(
1116                private_key,
1117                &monitoring_ip,
1118                &install_monitoring_download_cmd(&monitoring_urls),
1119            )
1120            .await?;
1121            let download = format!("{:.1}s", download_start.elapsed().as_secs_f64());
1122
1123            let setup_start = Instant::now();
1124            ssh_execute(
1125                private_key,
1126                &monitoring_ip,
1127                &install_monitoring_setup_cmd(PROMETHEUS_VERSION, monitoring_architecture),
1128            )
1129            .await?;
1130            ssh_execute(private_key, &monitoring_ip, start_monitoring_services_cmd()).await?;
1131            let setup = format!("{:.1}s", setup_start.elapsed().as_secs_f64());
1132
1133            let start_time = Instant::now();
1134            poll_service_active(private_key, &monitoring_ip, "node_exporter").await?;
1135            poll_service_active(private_key, &monitoring_ip, "prometheus").await?;
1136            poll_service_active(private_key, &monitoring_ip, "loki").await?;
1137            poll_service_active(private_key, &monitoring_ip, "pyroscope").await?;
1138            poll_service_active(private_key, &monitoring_ip, "tempo").await?;
1139            poll_service_active(private_key, &monitoring_ip, "grafana-server").await?;
1140            let start_dur = format!("{:.1}s", start_time.elapsed().as_secs_f64());
1141
1142            info!(
1143                ip = monitoring_ip.as_str(),
1144                deploy,
1145                download,
1146                setup,
1147                start = start_dur,
1148                "configured monitoring instance"
1149            );
1150            Ok::<(), Error>(())
1151        },
1152        async {
1153            // Configure binary instances (limited concurrency to avoid SSH overload)
                // buffer_unordered caps in-flight SSH sessions at `concurrency`;
                // completion order is arbitrary, so the IP list is unordered.
1154            let all_binary_ips: Vec<String> = stream::iter(binary_futures)
1155                .buffer_unordered(concurrency)
1156                .try_collect()
1157                .await?;
1158            info!("configured binary instances");
1159            Ok::<Vec<String>, Error>(all_binary_ips)
1160        }
1161    )?;
1162
1163    // Update monitoring security group to restrict Loki port (3100)
        // Two rule shapes: same-region binary instances are allowed by SG reference
        // (UserIdGroupPair), while other regions are allowed by their VPC CIDR.
        // NOTE(review): presumably CIDR rules are used cross-region because SG
        // references don't resolve across regions — confirm against network setup.
1164    info!("updating monitoring security group to allow traffic from binary instances");
1165    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
1166    if binary_regions.contains(&monitoring_region) {
            // unwrap: a region in binary_regions is expected to have a binary SG
            // provisioned earlier — a None here is a setup bug.
1167        let binary_sg_id = region_resources[&monitoring_region]
1168            .binary_sg_id
1169            .clone()
1170            .unwrap();
            // One API call carrying three permissions: logs, profiles, traces ports.
1171        monitoring_ec2_client
1172            .authorize_security_group_ingress()
1173            .group_id(&monitoring_sg_id)
1174            .ip_permissions(
1175                IpPermission::builder()
1176                    .ip_protocol("tcp")
1177                    .from_port(LOGS_PORT as i32)
1178                    .to_port(LOGS_PORT as i32)
1179                    .user_id_group_pairs(
1180                        UserIdGroupPair::builder()
1181                            .group_id(binary_sg_id.clone())
1182                            .build(),
1183                    )
1184                    .build(),
1185            )
1186            .ip_permissions(
1187                IpPermission::builder()
1188                    .ip_protocol("tcp")
1189                    .from_port(PROFILES_PORT as i32)
1190                    .to_port(PROFILES_PORT as i32)
1191                    .user_id_group_pairs(
1192                        UserIdGroupPair::builder()
1193                            .group_id(binary_sg_id.clone())
1194                            .build(),
1195                    )
1196                    .build(),
1197            )
1198            .ip_permissions(
1199                IpPermission::builder()
1200                    .ip_protocol("tcp")
1201                    .from_port(TRACES_PORT as i32)
1202                    .to_port(TRACES_PORT as i32)
1203                    .user_id_group_pairs(
1204                        UserIdGroupPair::builder()
1205                            .group_id(binary_sg_id.clone())
1206                            .build(),
1207                    )
1208                    .build(),
1209            )
1210            .send()
1211            .await
1212            .map_err(|err| err.into_service_error())?;
1213        info!(
1214            monitoring = monitoring_sg_id.as_str(),
1215            binary = binary_sg_id.as_str(),
1216            region = monitoring_region.as_str(),
1217            "linked monitoring and binary security groups in monitoring region"
1218        );
1219    }
        // Remaining regions: allow ingress on the same three ports from each
        // region's whole VPC CIDR (sequential calls; one per region).
1220    for region in &regions {
1221        if region != &monitoring_region && binary_regions.contains(region) {
1222            let binary_cidr = &region_resources[region].vpc_cidr;
1223            monitoring_ec2_client
1224                .authorize_security_group_ingress()
1225                .group_id(&monitoring_sg_id)
1226                .ip_permissions(
1227                    IpPermission::builder()
1228                        .ip_protocol("tcp")
1229                        .from_port(LOGS_PORT as i32)
1230                        .to_port(LOGS_PORT as i32)
1231                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
1232                        .build(),
1233                )
1234                .ip_permissions(
1235                    IpPermission::builder()
1236                        .ip_protocol("tcp")
1237                        .from_port(PROFILES_PORT as i32)
1238                        .to_port(PROFILES_PORT as i32)
1239                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
1240                        .build(),
1241                )
1242                .ip_permissions(
1243                    IpPermission::builder()
1244                        .ip_protocol("tcp")
1245                        .from_port(TRACES_PORT as i32)
1246                        .to_port(TRACES_PORT as i32)
1247                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
1248                        .build(),
1249                )
1250                .send()
1251                .await
1252                .map_err(|err| err.into_service_error())?;
1253            info!(
1254                monitoring = monitoring_sg_id.as_str(),
1255                binary = binary_cidr.as_str(),
1256                region = region.as_str(),
1257                "opened monitoring port to traffic from binary VPC"
1258            );
1259        }
1260    }
1261    info!("updated monitoring security group");
1262
1263    // Mark deployment as complete
        // An empty marker file in the tag directory; its presence (not contents)
        // signals to later subcommands that this deployment finished.
1264    File::create(tag_directory.join(CREATED_FILE_NAME))?;
1265    info!(
1266        monitoring = monitoring_ip.as_str(),
1267        binary = ?all_binary_ips,
1268        "deployment complete"
1269    );
1270    Ok(())
1271}