commonware_deployer/ec2/
create.rs

1//! `create` subcommand for `ec2`
2
3use crate::ec2::{
4    aws::*, deployer_directory, s3::*, services::*, utils::*, Architecture, Config, Error, Host,
5    Hosts, InstanceConfig, CREATED_FILE_NAME, LOGS_PORT, MONITORING_NAME, MONITORING_REGION,
6    PROFILES_PORT, TRACES_PORT,
7};
8use futures::future::try_join_all;
9use std::{
10    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
11    fs::File,
12    net::IpAddr,
13    path::PathBuf,
14    slice,
15};
16use tokio::process::Command;
17use tracing::info;
18
/// Pre-signed S3 URLs for the observability tool binaries, in this fixed order:
/// (prometheus, grafana, loki, pyroscope, tempo, node_exporter, promtail)
type ToolUrls = (String, String, String, String, String, String, String);
21
/// Represents a deployed instance with its configuration and public IP
#[derive(Clone)]
pub struct Deployment {
    // Configuration the instance was launched with
    pub instance: InstanceConfig,
    // EC2 instance ID returned by the launch call
    pub id: String,
    // Public IP reported once the instance is running
    pub ip: String,
}
29
/// Represents AWS resources created in a specific region.
///
/// Built during per-region initialization. `binary_sg_id` starts as `None`
/// and is filled in after all regions are initialized; `monitoring_sg_id`
/// is `Some` only in the monitoring region.
#[derive(Clone, Debug)]
pub struct RegionResources {
    /// ID of the VPC created for this region.
    pub vpc_id: String,
    /// CIDR block assigned to the VPC (of the form `10.<idx>.0.0/16`).
    pub vpc_cidr: String,
    /// Route table associated with the VPC's subnet (also used for peering routes).
    pub route_table_id: String,
    /// Subnet that instances in this region are launched into.
    pub subnet_id: String,
    /// Security group for binary instances; populated after initial region setup.
    pub binary_sg_id: Option<String>,
    /// Security group for the monitoring instance; `Some` only in the monitoring region.
    pub monitoring_sg_id: Option<String>,
}
39
40/// Sets up EC2 instances, deploys files, and configures monitoring and logging
41pub async fn create(config: &PathBuf) -> Result<(), Error> {
42    // Load configuration from YAML file
43    let config: Config = {
44        let config_file = File::open(config)?;
45        serde_yaml::from_reader(config_file)?
46    };
47    let tag = &config.tag;
48    info!(tag = tag.as_str(), "loaded configuration");
49
50    // Create a temporary directory for local files
51    let tag_directory = deployer_directory(tag);
52    if tag_directory.exists() {
53        return Err(Error::CreationAttempted);
54    }
55    std::fs::create_dir_all(&tag_directory)?;
56    info!(path = ?tag_directory, "created tag directory");
57
58    // Ensure no instance is duplicated or named MONITORING_NAME
59    let mut instance_names = HashSet::new();
60    for instance in &config.instances {
61        if !instance_names.insert(&instance.name) {
62            return Err(Error::DuplicateInstanceName(instance.name.clone()));
63        }
64        if instance.name == MONITORING_NAME {
65            return Err(Error::InvalidInstanceName(instance.name.clone()));
66        }
67    }
68
69    // Get public IP address of the deployer
70    let deployer_ip = get_public_ip().await?;
71    info!(ip = deployer_ip.as_str(), "recovered public IP");
72
73    // Generate SSH key pair
74    let key_name = format!("deployer-{tag}");
75    let private_key_path = tag_directory.join(format!("id_rsa_{tag}"));
76    let public_key_path = tag_directory.join(format!("id_rsa_{tag}.pub"));
77    let output = Command::new("ssh-keygen")
78        .arg("-t")
79        .arg("rsa")
80        .arg("-b")
81        .arg("4096")
82        .arg("-f")
83        .arg(private_key_path.to_str().unwrap())
84        .arg("-N")
85        .arg("")
86        .output()
87        .await?;
88    if !output.status.success() {
89        return Err(Error::KeygenFailed);
90    }
91    let public_key = std::fs::read_to_string(&public_key_path)?;
92    let private_key = private_key_path.to_str().unwrap();
93
94    // Determine unique regions
95    let mut regions: BTreeSet<String> = config.instances.iter().map(|i| i.region.clone()).collect();
96    regions.insert(MONITORING_REGION.to_string());
97
98    // Collect instance types by region (for availability zone selection) and unique types (for architecture detection)
99    let mut instance_types_by_region: HashMap<String, HashSet<String>> = HashMap::new();
100    let mut unique_instance_types: HashSet<String> = HashSet::new();
101    instance_types_by_region
102        .entry(MONITORING_REGION.to_string())
103        .or_default()
104        .insert(config.monitoring.instance_type.clone());
105    unique_instance_types.insert(config.monitoring.instance_type.clone());
106    for instance in &config.instances {
107        instance_types_by_region
108            .entry(instance.region.clone())
109            .or_default()
110            .insert(instance.instance_type.clone());
111        unique_instance_types.insert(instance.instance_type.clone());
112    }
113
114    // Detect architecture for each unique instance type (architecture is global, not region-specific)
115    info!("detecting architectures for instance types");
116    let ec2_client = create_ec2_client(Region::new(MONITORING_REGION)).await;
117    let mut arch_by_instance_type: HashMap<String, Architecture> = HashMap::new();
118    for instance_type in &unique_instance_types {
119        let arch = detect_architecture(&ec2_client, instance_type).await?;
120        info!(
121            architecture = %arch,
122            instance_type = instance_type.as_str(),
123            "detected architecture"
124        );
125        arch_by_instance_type.insert(instance_type.clone(), arch);
126    }
127
128    // Build per-instance architecture map and collect architectures needed
129    let monitoring_architecture = arch_by_instance_type[&config.monitoring.instance_type];
130    let mut instance_architectures: HashMap<String, Architecture> = HashMap::new();
131    let mut architectures_needed: HashSet<Architecture> = HashSet::new();
132    architectures_needed.insert(monitoring_architecture);
133    for instance in &config.instances {
134        let arch = arch_by_instance_type[&instance.instance_type];
135        instance_architectures.insert(instance.name.clone(), arch);
136        architectures_needed.insert(arch);
137    }
138
139    // Setup S3 bucket and cache observability tools
140    info!(bucket = S3_BUCKET_NAME, "setting up S3 bucket");
141    let s3_client = create_s3_client(Region::new(MONITORING_REGION)).await;
142    ensure_bucket_exists(&s3_client, S3_BUCKET_NAME, MONITORING_REGION).await?;
143
144    // Cache observability tools for each architecture needed
145    info!("uploading observability tools to S3");
146    let cache_tool = |s3_key: String, download_url: String| {
147        let tag_directory = tag_directory.clone();
148        let s3_client = s3_client.clone();
149        async move {
150            if !object_exists(&s3_client, S3_BUCKET_NAME, &s3_key).await? {
151                info!(
152                    key = s3_key.as_str(),
153                    "tool not in S3, downloading and uploading"
154                );
155                let temp_path = tag_directory.join(s3_key.replace('/', "_"));
156                download_file(&download_url, &temp_path).await?;
157                let url = upload_and_presign(
158                    &s3_client,
159                    S3_BUCKET_NAME,
160                    &s3_key,
161                    &temp_path,
162                    PRESIGN_DURATION,
163                )
164                .await?;
165                std::fs::remove_file(&temp_path)?;
166                Ok::<_, Error>(url)
167            } else {
168                info!(key = s3_key.as_str(), "tool already in S3");
169                presign_url(&s3_client, S3_BUCKET_NAME, &s3_key, PRESIGN_DURATION).await
170            }
171        }
172    };
173
174    // Cache tools for each architecture and store URLs per-architecture
175    let mut tool_urls_by_arch: HashMap<Architecture, ToolUrls> = HashMap::new();
176    for arch in &architectures_needed {
177        let [prometheus_url, grafana_url, loki_url, pyroscope_url, tempo_url, node_exporter_url, promtail_url]: [String; 7] =
178            try_join_all([
179                cache_tool(prometheus_bin_s3_key(PROMETHEUS_VERSION, *arch), prometheus_download_url(PROMETHEUS_VERSION, *arch)),
180                cache_tool(grafana_bin_s3_key(GRAFANA_VERSION, *arch), grafana_download_url(GRAFANA_VERSION, *arch)),
181                cache_tool(loki_bin_s3_key(LOKI_VERSION, *arch), loki_download_url(LOKI_VERSION, *arch)),
182                cache_tool(pyroscope_bin_s3_key(PYROSCOPE_VERSION, *arch), pyroscope_download_url(PYROSCOPE_VERSION, *arch)),
183                cache_tool(tempo_bin_s3_key(TEMPO_VERSION, *arch), tempo_download_url(TEMPO_VERSION, *arch)),
184                cache_tool(node_exporter_bin_s3_key(NODE_EXPORTER_VERSION, *arch), node_exporter_download_url(NODE_EXPORTER_VERSION, *arch)),
185                cache_tool(promtail_bin_s3_key(PROMTAIL_VERSION, *arch), promtail_download_url(PROMTAIL_VERSION, *arch)),
186            ])
187            .await?
188            .try_into()
189            .unwrap();
190        tool_urls_by_arch.insert(
191            *arch,
192            (
193                prometheus_url,
194                grafana_url,
195                loki_url,
196                pyroscope_url,
197                tempo_url,
198                node_exporter_url,
199                promtail_url,
200            ),
201        );
202    }
203    info!("observability tools uploaded");
204
205    // Compute digests for binaries and configs, grouping by digest for deduplication
206    let mut binary_digests: BTreeMap<String, String> = BTreeMap::new(); // digest -> path
207    let mut config_digests: BTreeMap<String, String> = BTreeMap::new(); // digest -> path
208    let mut instance_binary_digest: HashMap<String, String> = HashMap::new(); // instance -> digest
209    let mut instance_config_digest: HashMap<String, String> = HashMap::new(); // instance -> digest
210    for instance in &config.instances {
211        let binary_digest = hash_file(std::path::Path::new(&instance.binary))?;
212        let config_digest = hash_file(std::path::Path::new(&instance.config))?;
213        binary_digests.insert(binary_digest.clone(), instance.binary.clone());
214        config_digests.insert(config_digest.clone(), instance.config.clone());
215        instance_binary_digest.insert(instance.name.clone(), binary_digest);
216        instance_config_digest.insert(instance.name.clone(), config_digest);
217    }
218
219    // Upload unique binaries and configs to S3 (deduplicated by digest)
220    info!("uploading unique binaries and configs to S3");
221    let (binary_digest_to_url, config_digest_to_url): (
222        HashMap<String, String>,
223        HashMap<String, String>,
224    ) = tokio::try_join!(
225        async {
226            Ok::<_, Error>(
227                try_join_all(binary_digests.iter().map(|(digest, path)| {
228                    let s3_client = s3_client.clone();
229                    let digest = digest.clone();
230                    let key = binary_s3_key(tag, &digest);
231                    let path = path.clone();
232                    async move {
233                        let url = cache_file_and_presign(
234                            &s3_client,
235                            S3_BUCKET_NAME,
236                            &key,
237                            path.as_ref(),
238                            PRESIGN_DURATION,
239                        )
240                        .await?;
241                        Ok::<_, Error>((digest, url))
242                    }
243                }))
244                .await?
245                .into_iter()
246                .collect(),
247            )
248        },
249        async {
250            Ok::<_, Error>(
251                try_join_all(config_digests.iter().map(|(digest, path)| {
252                    let s3_client = s3_client.clone();
253                    let digest = digest.clone();
254                    let key = config_s3_key(tag, &digest);
255                    let path = path.clone();
256                    async move {
257                        let url = cache_file_and_presign(
258                            &s3_client,
259                            S3_BUCKET_NAME,
260                            &key,
261                            path.as_ref(),
262                            PRESIGN_DURATION,
263                        )
264                        .await?;
265                        Ok::<_, Error>((digest, url))
266                    }
267                }))
268                .await?
269                .into_iter()
270                .collect(),
271            )
272        },
273    )?;
274
275    // Map instance names to URLs via their digests
276    let mut instance_binary_urls: HashMap<String, String> = HashMap::new();
277    let mut instance_config_urls: HashMap<String, String> = HashMap::new();
278    for instance in &config.instances {
279        let binary_digest = &instance_binary_digest[&instance.name];
280        let config_digest = &instance_config_digest[&instance.name];
281        instance_binary_urls.insert(
282            instance.name.clone(),
283            binary_digest_to_url[binary_digest].clone(),
284        );
285        instance_config_urls.insert(
286            instance.name.clone(),
287            config_digest_to_url[config_digest].clone(),
288        );
289    }
290    info!("uploaded all instance binaries and configs");
291
292    // Initialize resources for each region concurrently
293    info!(?regions, "initializing resources");
294    let region_init_futures: Vec<_> = regions
295        .iter()
296        .enumerate()
297        .map(|(idx, region)| {
298            let region = region.clone();
299            let tag = tag.clone();
300            let deployer_ip = deployer_ip.clone();
301            let key_name = key_name.clone();
302            let public_key = public_key.clone();
303            let instance_types: Vec<String> =
304                instance_types_by_region[&region].iter().cloned().collect();
305
306            async move {
307                // Create client for region
308                let ec2_client = create_ec2_client(Region::new(region.clone())).await;
309                info!(region = region.as_str(), "created EC2 client");
310
311                // Find availability zone that supports all instance types
312                let az = find_availability_zone(&ec2_client, &instance_types).await?;
313                info!(
314                    az = az.as_str(),
315                    region = region.as_str(),
316                    "selected availability zone"
317                );
318
319                // Create VPC, IGW, route table, subnet, security groups, and key pair
320                let vpc_cidr = format!("10.{idx}.0.0/16");
321                let vpc_id = create_vpc(&ec2_client, &vpc_cidr, &tag).await?;
322                info!(
323                    vpc = vpc_id.as_str(),
324                    region = region.as_str(),
325                    "created VPC"
326                );
327                let igw_id = create_and_attach_igw(&ec2_client, &vpc_id, &tag).await?;
328                info!(
329                    igw = igw_id.as_str(),
330                    vpc = vpc_id.as_str(),
331                    region = region.as_str(),
332                    "created and attached IGW"
333                );
334                let route_table_id =
335                    create_route_table(&ec2_client, &vpc_id, &igw_id, &tag).await?;
336                info!(
337                    route_table = route_table_id.as_str(),
338                    vpc = vpc_id.as_str(),
339                    region = region.as_str(),
340                    "created route table"
341                );
342                let subnet_cidr = format!("10.{idx}.1.0/24");
343                let subnet_id = create_subnet(
344                    &ec2_client,
345                    &vpc_id,
346                    &route_table_id,
347                    &subnet_cidr,
348                    &az,
349                    &tag,
350                )
351                .await?;
352                info!(
353                    subnet = subnet_id.as_str(),
354                    vpc = vpc_id.as_str(),
355                    region = region.as_str(),
356                    "created subnet"
357                );
358
359                // Create monitoring security group in monitoring region
360                let monitoring_sg_id = if region == MONITORING_REGION {
361                    let sg_id =
362                        create_security_group_monitoring(&ec2_client, &vpc_id, &deployer_ip, &tag)
363                            .await?;
364                    info!(
365                        sg = sg_id.as_str(),
366                        vpc = vpc_id.as_str(),
367                        region = region.as_str(),
368                        "created monitoring security group"
369                    );
370                    Some(sg_id)
371                } else {
372                    None
373                };
374
375                // Import key pair
376                import_key_pair(&ec2_client, &key_name, &public_key).await?;
377                info!(
378                    key = key_name.as_str(),
379                    region = region.as_str(),
380                    "imported key pair"
381                );
382
383                info!(
384                    vpc = vpc_id.as_str(),
385                    subnet = subnet_id.as_str(),
386                    subnet_cidr = subnet_cidr.as_str(),
387                    region = region.as_str(),
388                    "initialized resources"
389                );
390
391                Ok::<_, Error>((
392                    region,
393                    ec2_client,
394                    RegionResources {
395                        vpc_id,
396                        vpc_cidr,
397                        route_table_id,
398                        subnet_id,
399                        binary_sg_id: None,
400                        monitoring_sg_id,
401                    },
402                ))
403            }
404        })
405        .collect();
406
407    let region_results = try_join_all(region_init_futures).await?;
408    let (ec2_clients, mut region_resources): (HashMap<_, _>, HashMap<_, _>) = region_results
409        .into_iter()
410        .map(|(region, client, resources)| ((region.clone(), client), (region, resources)))
411        .unzip();
412    info!(?regions, "initialized resources");
413
414    // Create binary security groups (without monitoring IP - added later for parallel launch)
415    info!("creating binary security groups");
416    for (region, resources) in region_resources.iter_mut() {
417        let binary_sg_id = create_security_group_binary(
418            &ec2_clients[region],
419            &resources.vpc_id,
420            &deployer_ip,
421            tag,
422            &config.ports,
423        )
424        .await?;
425        info!(
426            sg = binary_sg_id.as_str(),
427            vpc = resources.vpc_id.as_str(),
428            region = region.as_str(),
429            "created binary security group"
430        );
431        resources.binary_sg_id = Some(binary_sg_id);
432    }
433    info!("created binary security groups");
434
435    // Setup VPC peering connections
436    info!("initializing VPC peering connections");
437    let monitoring_region = MONITORING_REGION.to_string();
438    let monitoring_resources = region_resources.get(&monitoring_region).unwrap();
439    let monitoring_vpc_id = &monitoring_resources.vpc_id;
440    let monitoring_cidr = &monitoring_resources.vpc_cidr;
441    let monitoring_route_table_id = &monitoring_resources.route_table_id;
442    let binary_regions: HashSet<String> =
443        config.instances.iter().map(|i| i.region.clone()).collect();
444    for region in &regions {
445        if region != &monitoring_region && binary_regions.contains(region) {
446            let binary_resources = region_resources.get(region).unwrap();
447            let binary_vpc_id = &binary_resources.vpc_id;
448            let binary_cidr = &binary_resources.vpc_cidr;
449            let peer_id = create_vpc_peering_connection(
450                &ec2_clients[&monitoring_region],
451                monitoring_vpc_id,
452                binary_vpc_id,
453                region,
454                tag,
455            )
456            .await?;
457            info!(
458                peer = peer_id.as_str(),
459                monitoring = monitoring_vpc_id.as_str(),
460                binary = binary_vpc_id.as_str(),
461                region = region.as_str(),
462                "created VPC peering connection"
463            );
464            wait_for_vpc_peering_connection(&ec2_clients[region], &peer_id).await?;
465            info!(
466                peer = peer_id.as_str(),
467                region = region.as_str(),
468                "VPC peering connection is available"
469            );
470            accept_vpc_peering_connection(&ec2_clients[region], &peer_id).await?;
471            info!(
472                peer = peer_id.as_str(),
473                region = region.as_str(),
474                "accepted VPC peering connection"
475            );
476            add_route(
477                &ec2_clients[&monitoring_region],
478                monitoring_route_table_id,
479                binary_cidr,
480                &peer_id,
481            )
482            .await?;
483            add_route(
484                &ec2_clients[region],
485                &binary_resources.route_table_id,
486                monitoring_cidr,
487                &peer_id,
488            )
489            .await?;
490            info!(
491                peer = peer_id.as_str(),
492                monitoring = monitoring_vpc_id.as_str(),
493                binary = binary_vpc_id.as_str(),
494                region = region.as_str(),
495                "added routes for VPC peering connection"
496            );
497        }
498    }
499    info!("initialized VPC peering connections");
500
501    // Prepare launch configurations for all instances
502    info!("launching instances");
503    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
504    let monitoring_ami_id = find_latest_ami(monitoring_ec2_client, monitoring_architecture).await?;
505    let monitoring_instance_type =
506        InstanceType::try_parse(&config.monitoring.instance_type).expect("Invalid instance type");
507    let monitoring_storage_class =
508        VolumeType::try_parse(&config.monitoring.storage_class).expect("Invalid storage class");
509    let monitoring_sg_id = monitoring_resources
510        .monitoring_sg_id
511        .as_ref()
512        .unwrap()
513        .clone();
514    let monitoring_subnet_id = monitoring_resources.subnet_id.clone();
515
516    let mut binary_launch_configs = Vec::new();
517    for instance in &config.instances {
518        let region = instance.region.clone();
519        let resources = region_resources.get(&region).unwrap();
520        let ec2_client = ec2_clients.get(&region).unwrap();
521        let arch = instance_architectures[&instance.name];
522        let ami_id = find_latest_ami(ec2_client, arch).await?;
523        binary_launch_configs.push((instance, ec2_client, resources, ami_id, arch));
524    }
525
526    // Launch monitoring instance (don't wait yet)
527    let monitoring_launch_future = {
528        let key_name = key_name.clone();
529        let tag = tag.clone();
530        let sg_id = monitoring_sg_id.clone();
531        async move {
532            let instance_id = launch_instances(
533                monitoring_ec2_client,
534                &monitoring_ami_id,
535                monitoring_instance_type,
536                config.monitoring.storage_size,
537                monitoring_storage_class,
538                &key_name,
539                &monitoring_subnet_id,
540                &sg_id,
541                1,
542                MONITORING_NAME,
543                &tag,
544            )
545            .await?[0]
546                .clone();
547            let ip =
548                wait_for_instances_running(monitoring_ec2_client, slice::from_ref(&instance_id))
549                    .await?[0]
550                    .clone();
551            let private_ip = get_private_ip(monitoring_ec2_client, &instance_id).await?;
552            info!(ip = ip.as_str(), "launched monitoring instance");
553            Ok::<(String, String, String), Error>((instance_id, ip, private_ip))
554        }
555    };
556
557    // Launch binary instances (don't wait yet)
558    let binary_launch_futures =
559        binary_launch_configs
560            .iter()
561            .map(|(instance, ec2_client, resources, ami_id, _arch)| {
562                let key_name = key_name.clone();
563                let instance_type = InstanceType::try_parse(&instance.instance_type)
564                    .expect("Invalid instance type");
565                let storage_class =
566                    VolumeType::try_parse(&instance.storage_class).expect("Invalid storage class");
567                let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
568                let tag = tag.clone();
569                async move {
570                    let instance_id = launch_instances(
571                        ec2_client,
572                        ami_id,
573                        instance_type,
574                        instance.storage_size,
575                        storage_class,
576                        &key_name,
577                        &resources.subnet_id,
578                        binary_sg_id,
579                        1,
580                        &instance.name,
581                        &tag,
582                    )
583                    .await?[0]
584                        .clone();
585                    let ip = wait_for_instances_running(ec2_client, slice::from_ref(&instance_id))
586                        .await?[0]
587                        .clone();
588                    info!(
589                        ip = ip.as_str(),
590                        instance = instance.name.as_str(),
591                        "launched instance"
592                    );
593                    Ok::<Deployment, Error>(Deployment {
594                        instance: (*instance).clone(),
595                        id: instance_id,
596                        ip,
597                    })
598                }
599            });
600
601    // Wait for all instances in parallel
602    let (monitoring_result, deployments) = tokio::try_join!(
603        monitoring_launch_future,
604        try_join_all(binary_launch_futures)
605    )?;
606    let (monitoring_instance_id, monitoring_ip, monitoring_private_ip) = monitoring_result;
607    info!("launched instances");
608
609    // Add monitoring IP rules to binary security groups (for Prometheus scraping).
610    // This happens after instance launch but before instance configuration, so there's
611    // no window where Prometheus would try to scrape unconfigured instances.
612    info!("adding monitoring ingress rules");
613    for (region, resources) in region_resources.iter() {
614        let binary_sg_id = resources.binary_sg_id.as_ref().unwrap();
615        add_monitoring_ingress(&ec2_clients[region], binary_sg_id, &monitoring_ip).await?;
616    }
617    info!("added monitoring ingress rules");
618
619    // Cache static config files globally (these don't change between deployments)
620    info!("uploading config files to S3");
621    let [
622        bbr_conf_url,
623        datasources_url,
624        all_yml_url,
625        loki_yml_url,
626        pyroscope_yml_url,
627        tempo_yml_url,
628        prometheus_service_url,
629        loki_service_url,
630        pyroscope_service_url,
631        tempo_service_url,
632        monitoring_node_exporter_service_url,
633        promtail_service_url,
634        logrotate_conf_url,
635        pyroscope_agent_service_url,
636        pyroscope_agent_timer_url,
637    ]: [String; 15] = try_join_all([
638        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &bbr_config_s3_key(), BBR_CONF.as_bytes(), PRESIGN_DURATION),
639        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &grafana_datasources_s3_key(), DATASOURCES_YML.as_bytes(), PRESIGN_DURATION),
640        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &grafana_dashboards_s3_key(), ALL_YML.as_bytes(), PRESIGN_DURATION),
641        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &loki_config_s3_key(), LOKI_CONFIG.as_bytes(), PRESIGN_DURATION),
642        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_config_s3_key(), PYROSCOPE_CONFIG.as_bytes(), PRESIGN_DURATION),
643        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &tempo_config_s3_key(), TEMPO_CONFIG.as_bytes(), PRESIGN_DURATION),
644        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &prometheus_service_s3_key(), PROMETHEUS_SERVICE.as_bytes(), PRESIGN_DURATION),
645        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &loki_service_s3_key(), LOKI_SERVICE.as_bytes(), PRESIGN_DURATION),
646        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_service_s3_key(), PYROSCOPE_SERVICE.as_bytes(), PRESIGN_DURATION),
647        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &tempo_service_s3_key(), TEMPO_SERVICE.as_bytes(), PRESIGN_DURATION),
648        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &node_exporter_service_s3_key(), NODE_EXPORTER_SERVICE.as_bytes(), PRESIGN_DURATION),
649        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &promtail_service_s3_key(), PROMTAIL_SERVICE.as_bytes(), PRESIGN_DURATION),
650        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &logrotate_config_s3_key(), LOGROTATE_CONF.as_bytes(), PRESIGN_DURATION),
651        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_agent_service_s3_key(), PYROSCOPE_AGENT_SERVICE.as_bytes(), PRESIGN_DURATION),
652        cache_content_and_presign(&s3_client, S3_BUCKET_NAME, &pyroscope_agent_timer_s3_key(), PYROSCOPE_AGENT_TIMER.as_bytes(), PRESIGN_DURATION),
653    ])
654    .await?
655    .try_into()
656    .unwrap();
657
658    // Cache binary_service per architecture
    // Upload one systemd unit file per architecture in use; each is written to a temp
    // file, cached in S3, pre-signed, and the temp file removed afterwards.
    let mut binary_service_urls_by_arch: HashMap<Architecture, String> = HashMap::new();
    for arch in &architectures_needed {
        let binary_service_content = binary_service(*arch);
        let temp_path = tag_directory.join(format!("binary-{}.service", arch.as_str()));
        std::fs::write(&temp_path, &binary_service_content)?;
        let binary_service_url = cache_file_and_presign(
            &s3_client,
            S3_BUCKET_NAME,
            &binary_service_s3_key_for_arch(*arch),
            &temp_path,
            PRESIGN_DURATION,
        )
        .await?;
        // Local copy no longer needed once cached in S3.
        std::fs::remove_file(&temp_path)?;
        binary_service_urls_by_arch.insert(*arch, binary_service_url);
    }

    // Upload deployment-specific monitoring config files (deduplicated by digest)
    // Scrape-target tuples: (name, ip, region, arch) for every deployed instance.
    let instances: Vec<(&str, &str, &str, &str)> = deployments
        .iter()
        .map(|d| {
            let arch = instance_architectures[&d.instance.name];
            (
                d.instance.name.as_str(),
                d.ip.as_str(),
                d.instance.region.as_str(),
                arch.as_str(),
            )
        })
        .collect();
    let prom_config = generate_prometheus_config(&instances);
    let prom_path = tag_directory.join("prometheus.yml");
    std::fs::write(&prom_path, &prom_config)?;
    // S3 keys are content-addressed by file digest so identical configs share an object.
    let prom_digest = hash_file(&prom_path)?;
    let dashboard_path = std::path::PathBuf::from(&config.monitoring.dashboard);
    let dashboard_digest = hash_file(&dashboard_path)?;
    // Upload prometheus config and Grafana dashboard concurrently; the `unwrap` on
    // `try_into` cannot fail because `try_join_all` over a 2-element array yields
    // exactly 2 results.
    let [prometheus_config_url, dashboard_url]: [String; 2] = try_join_all([
        cache_file_and_presign(
            &s3_client,
            S3_BUCKET_NAME,
            &monitoring_s3_key(tag, &prom_digest),
            &prom_path,
            PRESIGN_DURATION,
        ),
        cache_file_and_presign(
            &s3_client,
            S3_BUCKET_NAME,
            &monitoring_s3_key(tag, &dashboard_digest),
            &dashboard_path,
            PRESIGN_DURATION,
        ),
    ])
    .await?
    .try_into()
    .unwrap();

    // Generate hosts.yaml and upload once (shared by all instances)
    // NOTE(review): the `.parse::<IpAddr>().unwrap()` calls panic on a malformed IP
    // string — presumably AWS always returns well-formed addresses, but confirm.
    let hosts = Hosts {
        monitoring: monitoring_private_ip.clone().parse::<IpAddr>().unwrap(),
        hosts: deployments
            .iter()
            .map(|d| Host {
                name: d.instance.name.clone(),
                region: d.instance.region.clone(),
                ip: d.ip.clone().parse::<IpAddr>().unwrap(),
            })
            .collect(),
    };
    let hosts_yaml = serde_yaml::to_string(&hosts)?;
    let hosts_path = tag_directory.join("hosts.yaml");
    std::fs::write(&hosts_path, &hosts_yaml)?;
    let hosts_digest = hash_file(&hosts_path)?;
    let hosts_url = cache_file_and_presign(
        &s3_client,
        S3_BUCKET_NAME,
        &hosts_s3_key(tag, &hosts_digest),
        &hosts_path,
        PRESIGN_DURATION,
    )
    .await?;

    // Write per-instance config files locally, compute digests, and deduplicate
    // digest -> local path for each unique promtail/pyroscope config (BTreeMap keys
    // dedupe identical files); instance name -> digest records which config each
    // instance uses.
    let mut promtail_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
    let mut pyroscope_digests: BTreeMap<String, std::path::PathBuf> = BTreeMap::new();
    let mut instance_promtail_digest: HashMap<String, String> = HashMap::new();
    let mut instance_pyroscope_digest: HashMap<String, String> = HashMap::new();
    for deployment in &deployments {
        let instance = &deployment.instance;
        let ip = &deployment.ip;
        let arch = instance_architectures[&instance.name].as_str();

        // Promtail config labels logs with the instance's name/ip/region/arch.
        let promtail_cfg = promtail_config(
            &monitoring_private_ip,
            &instance.name,
            ip,
            &instance.region,
            arch,
        );
        let promtail_path = tag_directory.join(format!("promtail_{}.yml", instance.name));
        std::fs::write(&promtail_path, &promtail_cfg)?;
        let promtail_digest = hash_file(&promtail_path)?;

        // Pyroscope agent script pushes profiles to the monitoring host.
        let pyroscope_script = generate_pyroscope_script(
            &monitoring_private_ip,
            &instance.name,
            ip,
            &instance.region,
            arch,
        );
        let pyroscope_path = tag_directory.join(format!("pyroscope-agent_{}.sh", instance.name));
        std::fs::write(&pyroscope_path, &pyroscope_script)?;
        let pyroscope_digest = hash_file(&pyroscope_path)?;

        promtail_digests.insert(promtail_digest.clone(), promtail_path);
        pyroscope_digests.insert(pyroscope_digest.clone(), pyroscope_path);
        instance_promtail_digest.insert(instance.name.clone(), promtail_digest);
        instance_pyroscope_digest.insert(instance.name.clone(), pyroscope_digest);
    }

    // Upload unique promtail and pyroscope configs
    // Both families upload concurrently (try_join!), and within each family every
    // unique digest uploads concurrently (try_join_all), yielding digest -> URL maps.
    let (promtail_digest_to_url, pyroscope_digest_to_url): (
        HashMap<String, String>,
        HashMap<String, String>,
    ) = tokio::try_join!(
        async {
            Ok::<_, Error>(
                try_join_all(promtail_digests.iter().map(|(digest, path)| {
                    // Clone per future so each owns its inputs across `await`.
                    let s3_client = s3_client.clone();
                    let digest = digest.clone();
                    let key = promtail_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_file_and_presign(
                            &s3_client,
                            S3_BUCKET_NAME,
                            &key,
                            &path,
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
        async {
            Ok::<_, Error>(
                try_join_all(pyroscope_digests.iter().map(|(digest, path)| {
                    let s3_client = s3_client.clone();
                    let digest = digest.clone();
                    let key = pyroscope_s3_key(tag, &digest);
                    let path = path.clone();
                    async move {
                        let url = cache_file_and_presign(
                            &s3_client,
                            S3_BUCKET_NAME,
                            &key,
                            &path,
                            PRESIGN_DURATION,
                        )
                        .await?;
                        Ok::<_, Error>((digest, url))
                    }
                }))
                .await?
                .into_iter()
                .collect(),
            )
        },
    )?;

    // Build instance URLs map (using architecture-specific tool URLs)
    // For each instance, assemble the full set of pre-signed URLs it needs to
    // bootstrap itself, picking arch-specific binaries from `tool_urls_by_arch`.
    let mut instance_urls_map: HashMap<String, (InstanceUrls, Architecture)> = HashMap::new();
    for deployment in &deployments {
        let name = &deployment.instance.name;
        let arch = instance_architectures[name];
        let promtail_digest = &instance_promtail_digest[name];
        let pyroscope_digest = &instance_pyroscope_digest[name];
        // Only node_exporter and promtail binaries are needed per-instance here;
        // the other tool URLs are used for the monitoring host below.
        let (_, _, _, _, _, node_exporter_url, promtail_url) = &tool_urls_by_arch[&arch];

        instance_urls_map.insert(
            name.clone(),
            (
                InstanceUrls {
                    binary: instance_binary_urls[name].clone(),
                    config: instance_config_urls[name].clone(),
                    hosts: hosts_url.clone(),
                    promtail_bin: promtail_url.clone(),
                    promtail_config: promtail_digest_to_url[promtail_digest].clone(),
                    promtail_service: promtail_service_url.clone(),
                    node_exporter_bin: node_exporter_url.clone(),
                    node_exporter_service: monitoring_node_exporter_service_url.clone(),
                    binary_service: binary_service_urls_by_arch[&arch].clone(),
                    logrotate_conf: logrotate_conf_url.clone(),
                    pyroscope_script: pyroscope_digest_to_url[pyroscope_digest].clone(),
                    pyroscope_service: pyroscope_agent_service_url.clone(),
                    pyroscope_timer: pyroscope_agent_timer_url.clone(),
                },
                arch,
            ),
        );
    }
    info!("uploaded config files to S3");

    // Build monitoring URLs struct for SSH configuration (using monitoring architecture)
    let (prometheus_url, grafana_url, loki_url, pyroscope_url, tempo_url, node_exporter_url, _) =
        &tool_urls_by_arch[&monitoring_architecture];
    let monitoring_urls = MonitoringUrls {
        prometheus_bin: prometheus_url.clone(),
        grafana_bin: grafana_url.clone(),
        loki_bin: loki_url.clone(),
        pyroscope_bin: pyroscope_url.clone(),
        tempo_bin: tempo_url.clone(),
        node_exporter_bin: node_exporter_url.clone(),
        prometheus_config: prometheus_config_url,
        datasources_yml: datasources_url,
        all_yml: all_yml_url,
        dashboard: dashboard_url,
        loki_yml: loki_yml_url,
        pyroscope_yml: pyroscope_yml_url,
        tempo_yml: tempo_yml_url,
        prometheus_service: prometheus_service_url,
        loki_service: loki_service_url,
        pyroscope_service: pyroscope_service_url,
        tempo_service: tempo_service_url,
        node_exporter_service: monitoring_node_exporter_service_url.clone(),
    };

    // Prepare binary instance configuration futures
    info!("configuring monitoring and binary instances");
    // First gather owned per-instance inputs (cloned out of shared maps) so each
    // future below is self-contained; `remove(..).unwrap()` is safe because
    // `instance_urls_map` was populated from the same `deployments` list.
    let binary_configs: Vec<_> = deployments
        .iter()
        .map(|deployment| {
            let instance = deployment.instance.clone();
            let deployment_id = deployment.id.clone();
            let ec2_client = ec2_clients[&instance.region].clone();
            let ip = deployment.ip.clone();
            let bbr_url = bbr_conf_url.clone();
            let (urls, arch) = instance_urls_map.remove(&instance.name).unwrap();
            (instance, deployment_id, ec2_client, ip, bbr_url, urls, arch)
        })
        .collect();
    // Each future: wait for the instance, enable BBR, install everything over SSH,
    // then poll until promtail, node_exporter, and the binary services are active.
    let binary_futures = binary_configs.into_iter().map(
        |(instance, deployment_id, ec2_client, ip, bbr_url, urls, arch)| async move {
            wait_for_instances_ready(&ec2_client, slice::from_ref(&deployment_id)).await?;
            enable_bbr(private_key, &ip, &bbr_url).await?;
            ssh_execute(
                private_key,
                &ip,
                &install_binary_cmd(&urls, instance.profiling, arch),
            )
            .await?;
            poll_service_active(private_key, &ip, "promtail").await?;
            poll_service_active(private_key, &ip, "node_exporter").await?;
            poll_service_active(private_key, &ip, "binary").await?;
            info!(
                ip = ip.as_str(),
                instance = instance.name.as_str(),
                "configured instance"
            );
            Ok::<String, Error>(ip)
        },
    );

    // Run monitoring and binary configuration in parallel
    let (_, all_binary_ips) = tokio::try_join!(
        async {
            // Configure monitoring instance: install the observability stack, start
            // its services, and poll each one until systemd reports it active.
            let monitoring_ec2_client = &ec2_clients[&monitoring_region];
            wait_for_instances_ready(
                monitoring_ec2_client,
                slice::from_ref(&monitoring_instance_id),
            )
            .await?;
            enable_bbr(private_key, &monitoring_ip, &bbr_conf_url).await?;
            ssh_execute(
                private_key,
                &monitoring_ip,
                &install_monitoring_cmd(
                    &monitoring_urls,
                    PROMETHEUS_VERSION,
                    monitoring_architecture,
                ),
            )
            .await?;
            ssh_execute(private_key, &monitoring_ip, start_monitoring_services_cmd()).await?;
            poll_service_active(private_key, &monitoring_ip, "node_exporter").await?;
            poll_service_active(private_key, &monitoring_ip, "prometheus").await?;
            poll_service_active(private_key, &monitoring_ip, "loki").await?;
            poll_service_active(private_key, &monitoring_ip, "pyroscope").await?;
            poll_service_active(private_key, &monitoring_ip, "tempo").await?;
            poll_service_active(private_key, &monitoring_ip, "grafana-server").await?;
            info!(
                ip = monitoring_ip.as_str(),
                "configured monitoring instance"
            );
            Ok::<(), Error>(())
        },
        async {
            // Configure binary instances concurrently; collect their public IPs.
            let all_binary_ips = try_join_all(binary_futures).await?;
            info!("configured binary instances");
            Ok::<Vec<String>, Error>(all_binary_ips)
        }
    )?;

    // Open the monitoring security group's ingest ports (LOGS_PORT, PROFILES_PORT,
    // TRACES_PORT) to traffic from the binary instances only.
    info!("updating monitoring security group to allow traffic from binary instances");
    let monitoring_ec2_client = &ec2_clients[&monitoring_region];
    // Same-region binaries: authorize by security-group reference (tighter than CIDR).
    if binary_regions.contains(&monitoring_region) {
        let binary_sg_id = region_resources[&monitoring_region]
            .binary_sg_id
            .clone()
            .unwrap();
        monitoring_ec2_client
            .authorize_security_group_ingress()
            .group_id(&monitoring_sg_id)
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(LOGS_PORT as i32)
                    .to_port(LOGS_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(PROFILES_PORT as i32)
                    .to_port(PROFILES_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .ip_permissions(
                IpPermission::builder()
                    .ip_protocol("tcp")
                    .from_port(TRACES_PORT as i32)
                    .to_port(TRACES_PORT as i32)
                    .user_id_group_pairs(
                        UserIdGroupPair::builder()
                            .group_id(binary_sg_id.clone())
                            .build(),
                    )
                    .build(),
            )
            .send()
            .await
            .map_err(|err| err.into_service_error())?;
        info!(
            monitoring = monitoring_sg_id.as_str(),
            binary = binary_sg_id.as_str(),
            region = monitoring_region.as_str(),
            "linked monitoring and binary security groups in monitoring region"
        );
    }
    // Cross-region binaries: security-group references don't span regions, so
    // authorize the remote VPC's CIDR block instead.
    for region in &regions {
        if region != &monitoring_region && binary_regions.contains(region) {
            let binary_cidr = &region_resources[region].vpc_cidr;
            monitoring_ec2_client
                .authorize_security_group_ingress()
                .group_id(&monitoring_sg_id)
                .ip_permissions(
                    IpPermission::builder()
                        .ip_protocol("tcp")
                        .from_port(LOGS_PORT as i32)
                        .to_port(LOGS_PORT as i32)
                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
                        .build(),
                )
                .ip_permissions(
                    IpPermission::builder()
                        .ip_protocol("tcp")
                        .from_port(PROFILES_PORT as i32)
                        .to_port(PROFILES_PORT as i32)
                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
                        .build(),
                )
                .ip_permissions(
                    IpPermission::builder()
                        .ip_protocol("tcp")
                        .from_port(TRACES_PORT as i32)
                        .to_port(TRACES_PORT as i32)
                        .ip_ranges(IpRange::builder().cidr_ip(binary_cidr).build())
                        .build(),
                )
                .send()
                .await
                .map_err(|err| err.into_service_error())?;
            info!(
                monitoring = monitoring_sg_id.as_str(),
                binary = binary_cidr.as_str(),
                region = region.as_str(),
                "opened monitoring port to traffic from binary VPC"
            );
        }
    }
    info!("updated monitoring security group");

    // Mark deployment as complete: the marker file's presence is what makes a
    // subsequent `create` for this tag fail with `Error::CreationAttempted`.
    File::create(tag_directory.join(CREATED_FILE_NAME))?;
    info!(
        monitoring = monitoring_ip.as_str(),
        binary = ?all_binary_ips,
        "deployment complete"
    );
    Ok(())
}