sn_testnet_deploy/
upscale.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    ansible::{
9        inventory::AnsibleInventoryType,
10        provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11    },
12    error::{Error, Result},
13    get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14    DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23    pub ansible_verbose: bool,
24    pub ant_version: Option<String>,
25    pub current_inventory: DeploymentInventory,
26    pub desired_full_cone_private_node_count: Option<u16>,
27    pub desired_full_cone_private_node_vm_count: Option<u16>,
28    pub desired_node_count: Option<u16>,
29    pub desired_node_vm_count: Option<u16>,
30    pub desired_peer_cache_node_count: Option<u16>,
31    pub desired_peer_cache_node_vm_count: Option<u16>,
32    pub desired_symmetric_private_node_count: Option<u16>,
33    pub desired_symmetric_private_node_vm_count: Option<u16>,
34    pub desired_uploader_vm_count: Option<u16>,
35    pub desired_uploaders_count: Option<u16>,
36    pub funding_wallet_secret_key: Option<String>,
37    pub gas_amount: Option<U256>,
38    pub interval: Duration,
39    pub infra_only: bool,
40    pub max_archived_log_files: u16,
41    pub max_log_files: u16,
42    pub plan: bool,
43    pub public_rpc: bool,
44    pub provision_only: bool,
45    pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49    pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50        let is_bootstrap_deploy = matches!(
51            options
52                .current_inventory
53                .environment_details
54                .deployment_type,
55            DeploymentType::Bootstrap
56        );
57
58        if is_bootstrap_deploy
59            && (options.desired_peer_cache_node_count.is_some()
60                || options.desired_peer_cache_node_vm_count.is_some()
61                || options.desired_uploader_vm_count.is_some())
62        {
63            return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64        }
65
66        let desired_peer_cache_node_vm_count = options
67            .desired_peer_cache_node_vm_count
68            .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69        if desired_peer_cache_node_vm_count
70            < options.current_inventory.peer_cache_node_vms.len() as u16
71        {
72            return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73        }
74        debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76        let desired_node_vm_count = options
77            .desired_node_vm_count
78            .unwrap_or(options.current_inventory.node_vms.len() as u16);
79        if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80            return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81        }
82        debug!("Using {desired_node_vm_count} for desired node VM count");
83
84        let desired_full_cone_private_node_vm_count = options
85            .desired_full_cone_private_node_vm_count
86            .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87        if desired_full_cone_private_node_vm_count
88            < options.current_inventory.full_cone_private_node_vms.len() as u16
89        {
90            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91        }
92        debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94        let desired_symmetric_private_node_vm_count = options
95            .desired_symmetric_private_node_vm_count
96            .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97        if desired_symmetric_private_node_vm_count
98            < options.current_inventory.symmetric_private_node_vms.len() as u16
99        {
100            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101        }
102        debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104        let desired_uploader_vm_count = options
105            .desired_uploader_vm_count
106            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109        }
110        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112        let desired_peer_cache_node_count = options
113            .desired_peer_cache_node_count
114            .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115        if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116        {
117            return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118        }
119        debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121        let desired_node_count = options
122            .desired_node_count
123            .unwrap_or(options.current_inventory.node_count() as u16);
124        if desired_node_count < options.current_inventory.node_count() as u16 {
125            return Err(Error::InvalidUpscaleDesiredNodeCount);
126        }
127        debug!("Using {desired_node_count} for desired node count");
128
129        let desired_full_cone_private_node_count = options
130            .desired_full_cone_private_node_count
131            .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132        if desired_full_cone_private_node_count
133            < options.current_inventory.full_cone_private_node_count() as u16
134        {
135            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136        }
137        debug!(
138            "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139        );
140
141        let desired_symmetric_private_node_count = options
142            .desired_symmetric_private_node_count
143            .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144        if desired_symmetric_private_node_count
145            < options.current_inventory.symmetric_private_node_count() as u16
146        {
147            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148        }
149        debug!(
150            "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151        );
152
153        let mut infra_run_options = InfraRunOptions::generate_existing(
154            &options.current_inventory.name,
155            &self.terraform_runner,
156            &options.current_inventory.environment_details,
157        )
158        .await?;
159        infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160        infra_run_options.node_vm_count = Some(desired_node_vm_count);
161        infra_run_options.full_cone_private_node_vm_count =
162            Some(desired_full_cone_private_node_vm_count);
163        infra_run_options.symmetric_private_node_vm_count =
164            Some(desired_symmetric_private_node_vm_count);
165        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167        if options.plan {
168            self.plan(&infra_run_options)?;
169            return Ok(());
170        }
171
172        self.create_or_update_infra(&infra_run_options)
173            .map_err(|err| {
174                println!("Failed to create infra {err:?}");
175                err
176            })?;
177
178        if options.infra_only {
179            return Ok(());
180        }
181
182        let mut provision_options = ProvisionOptions {
183            binary_option: options.current_inventory.binary_option.clone(),
184            chunk_size: None,
185            client_env_variables: None,
186            downloaders_count: 0,
187            enable_telegraf: true,
188            evm_network: options
189                .current_inventory
190                .environment_details
191                .evm_network
192                .clone(),
193            evm_data_payments_address: options
194                .current_inventory
195                .environment_details
196                .evm_data_payments_address
197                .clone(),
198            evm_payment_token_address: options
199                .current_inventory
200                .environment_details
201                .evm_payment_token_address
202                .clone(),
203            evm_rpc_url: options
204                .current_inventory
205                .environment_details
206                .evm_rpc_url
207                .clone(),
208            full_cone_private_node_count: desired_full_cone_private_node_count,
209            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
210            interval: options.interval,
211            log_format: None,
212            logstash_details: None,
213            name: options.current_inventory.name.clone(),
214            network_id: options.current_inventory.environment_details.network_id,
215            node_count: desired_node_count,
216            node_env_variables: None,
217            max_archived_log_files: options.max_archived_log_files,
218            max_log_files: options.max_log_files,
219            output_inventory_dir_path: self
220                .working_directory_path
221                .join("ansible")
222                .join("inventory"),
223            peer_cache_node_count: desired_peer_cache_node_count,
224            public_rpc: options.public_rpc,
225            rewards_address: options
226                .current_inventory
227                .environment_details
228                .rewards_address
229                .clone(),
230            symmetric_private_node_count: desired_symmetric_private_node_count,
231            ant_version: options.ant_version.clone(),
232            uploaders_count: options.desired_uploaders_count,
233            gas_amount: options.gas_amount,
234            token_amount: None,
235        };
236        let mut node_provision_failed = false;
237
238        let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
239            get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
240                |err| {
241                    println!("Failed to get node multiaddr {err:?}");
242                    err
243                },
244            )?
245        } else {
246            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
247                .map_err(|err| {
248                    println!("Failed to get genesis multiaddr {err:?}");
249                    err
250                })?
251        };
252        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
253        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
254
255        if !is_bootstrap_deploy {
256            self.wait_for_ssh_availability_on_new_machines(
257                AnsibleInventoryType::PeerCacheNodes,
258                &options.current_inventory,
259            )?;
260            self.ansible_provisioner
261                .print_ansible_run_banner("Provision Peer Cache Nodes");
262            match self.ansible_provisioner.provision_nodes(
263                &provision_options,
264                Some(initial_multiaddr.clone()),
265                Some(initial_network_contacts_url.clone()),
266                NodeType::PeerCache,
267            ) {
268                Ok(()) => {
269                    println!("Provisioned Peer Cache nodes");
270                }
271                Err(err) => {
272                    log::error!("Failed to provision Peer Cache nodes: {err}");
273                    node_provision_failed = true;
274                }
275            }
276        }
277
278        self.wait_for_ssh_availability_on_new_machines(
279            AnsibleInventoryType::Nodes,
280            &options.current_inventory,
281        )?;
282        self.ansible_provisioner
283            .print_ansible_run_banner("Provision Normal Nodes");
284        match self.ansible_provisioner.provision_nodes(
285            &provision_options,
286            Some(initial_multiaddr.clone()),
287            Some(initial_network_contacts_url.clone()),
288            NodeType::Generic,
289        ) {
290            Ok(()) => {
291                println!("Provisioned normal nodes");
292            }
293            Err(err) => {
294                log::error!("Failed to provision normal nodes: {err}");
295                node_provision_failed = true;
296            }
297        }
298
299        let private_node_inventory = PrivateNodeProvisionInventory::new(
300            &self.ansible_provisioner,
301            Some(desired_full_cone_private_node_vm_count),
302            Some(desired_symmetric_private_node_vm_count),
303        )?;
304
305        if private_node_inventory.should_provision_full_cone_private_nodes() {
306            let full_cone_nat_gateway_inventory = self
307                .ansible_provisioner
308                .ansible_runner
309                .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
310
311            let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
312                .into_iter()
313                .filter(|item| {
314                    !options
315                        .current_inventory
316                        .full_cone_nat_gateway_vms
317                        .contains(item)
318                })
319                .collect();
320
321            for vm in full_cone_nat_gateway_new_vms.iter() {
322                self.ssh_client.wait_for_ssh_availability(
323                    &vm.public_ip_addr,
324                    &self.cloud_provider.get_ssh_user(),
325                )?;
326            }
327
328            let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
329                None
330            } else {
331                debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
332                Some(full_cone_nat_gateway_new_vms)
333            };
334
335            match self.ansible_provisioner.provision_full_cone(
336                &provision_options,
337                Some(initial_multiaddr.clone()),
338                Some(initial_network_contacts_url.clone()),
339                private_node_inventory.clone(),
340                full_cone_nat_gateway_new_vms,
341            ) {
342                Ok(()) => {
343                    println!("Provisioned Full Cone nodes and Gateway");
344                }
345                Err(err) => {
346                    log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
347                    node_provision_failed = true;
348                }
349            }
350        }
351
352        if private_node_inventory.should_provision_symmetric_private_nodes() {
353            self.wait_for_ssh_availability_on_new_machines(
354                AnsibleInventoryType::SymmetricNatGateway,
355                &options.current_inventory,
356            )?;
357            self.ansible_provisioner
358                .print_ansible_run_banner("Provision Symmetric NAT Gateway");
359            self.ansible_provisioner
360                .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
361                .map_err(|err| {
362                    println!("Failed to provision symmetric NAT gateway {err:?}");
363                    err
364                })?;
365
366            self.wait_for_ssh_availability_on_new_machines(
367                AnsibleInventoryType::SymmetricPrivateNodes,
368                &options.current_inventory,
369            )?;
370            self.ansible_provisioner
371                .print_ansible_run_banner("Provision Symmetric Private Nodes");
372            match self.ansible_provisioner.provision_symmetric_private_nodes(
373                &mut provision_options,
374                Some(initial_multiaddr.clone()),
375                Some(initial_network_contacts_url.clone()),
376                &private_node_inventory,
377            ) {
378                Ok(()) => {
379                    println!("Provisioned symmetric private nodes");
380                }
381                Err(err) => {
382                    log::error!("Failed to provision symmetric private nodes: {err}");
383                    node_provision_failed = true;
384                }
385            }
386        }
387
388        let should_provision_uploaders = options.desired_uploaders_count.is_some()
389            || options.desired_uploader_vm_count.is_some();
390        if should_provision_uploaders {
391            self.wait_for_ssh_availability_on_new_machines(
392                AnsibleInventoryType::Uploaders,
393                &options.current_inventory,
394            )?;
395            let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
396            self.ansible_provisioner
397                .print_ansible_run_banner("Provision Uploaders");
398            self.ansible_provisioner
399                .provision_uploaders(
400                    &provision_options,
401                    Some(initial_multiaddr.clone()),
402                    Some(genesis_network_contacts.clone()),
403                )
404                .await
405                .map_err(|err| {
406                    println!("Failed to provision uploaders {err:?}");
407                    err
408                })?;
409        }
410
411        if node_provision_failed {
412            println!();
413            println!("{}", "WARNING!".yellow());
414            println!("Some nodes failed to provision without error.");
415            println!("This usually means a small number of nodes failed to start on a few VMs.");
416            println!("However, most of the time the deployment will still be usable.");
417            println!("See the output from Ansible to determine which VMs had failures.");
418        }
419
420        Ok(())
421    }
422
423    pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
424        let is_bootstrap_deploy = matches!(
425            options
426                .current_inventory
427                .environment_details
428                .deployment_type,
429            DeploymentType::Bootstrap
430        );
431
432        if is_bootstrap_deploy {
433            return Err(Error::InvalidUploaderUpscaleDeploymentType(
434                "bootstrap".to_string(),
435            ));
436        }
437
438        let desired_uploader_vm_count = options
439            .desired_uploader_vm_count
440            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
441        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
442            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
443        }
444        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
445
446        let mut infra_run_options = InfraRunOptions::generate_existing(
447            &options.current_inventory.name,
448            &self.terraform_runner,
449            &options.current_inventory.environment_details,
450        )
451        .await?;
452        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
453
454        if options.plan {
455            self.plan(&infra_run_options)?;
456            return Ok(());
457        }
458
459        if !options.provision_only {
460            self.create_or_update_infra(&infra_run_options)
461                .map_err(|err| {
462                    println!("Failed to create infra {err:?}");
463                    err
464                })?;
465        }
466
467        if options.infra_only {
468            return Ok(());
469        }
470
471        let (initial_multiaddr, initial_ip_addr) =
472            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
473                .map_err(|err| {
474                    println!("Failed to get genesis multiaddr {err:?}");
475                    err
476                })?;
477        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
478        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
479
480        let provision_options = ProvisionOptions {
481            binary_option: options.current_inventory.binary_option.clone(),
482            chunk_size: None,
483            client_env_variables: None,
484            downloaders_count: 0,
485            enable_telegraf: true,
486            evm_data_payments_address: options
487                .current_inventory
488                .environment_details
489                .evm_data_payments_address
490                .clone(),
491            evm_network: options
492                .current_inventory
493                .environment_details
494                .evm_network
495                .clone(),
496            evm_payment_token_address: options
497                .current_inventory
498                .environment_details
499                .evm_payment_token_address
500                .clone(),
501            evm_rpc_url: options
502                .current_inventory
503                .environment_details
504                .evm_rpc_url
505                .clone(),
506            full_cone_private_node_count: 0,
507            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
508            interval: options.interval,
509            log_format: None,
510            logstash_details: None,
511            name: options.current_inventory.name.clone(),
512            network_id: options.current_inventory.environment_details.network_id,
513            node_count: 0,
514            node_env_variables: None,
515            max_archived_log_files: options.max_archived_log_files,
516            max_log_files: options.max_log_files,
517            output_inventory_dir_path: self
518                .working_directory_path
519                .join("ansible")
520                .join("inventory"),
521            peer_cache_node_count: 0,
522            public_rpc: options.public_rpc,
523            rewards_address: options
524                .current_inventory
525                .environment_details
526                .rewards_address
527                .clone(),
528            symmetric_private_node_count: 0,
529            ant_version: options.ant_version.clone(),
530            uploaders_count: options.desired_uploaders_count,
531            gas_amount: options.gas_amount,
532            token_amount: options.token_amount,
533        };
534
535        self.wait_for_ssh_availability_on_new_machines(
536            AnsibleInventoryType::Uploaders,
537            &options.current_inventory,
538        )?;
539        self.ansible_provisioner
540            .print_ansible_run_banner("Provision Uploaders");
541        self.ansible_provisioner
542            .provision_uploaders(
543                &provision_options,
544                Some(initial_multiaddr),
545                Some(initial_network_contacts_url),
546            )
547            .await
548            .map_err(|err| {
549                println!("Failed to provision uploaders {err:?}");
550                err
551            })?;
552
553        Ok(())
554    }
555
556    fn wait_for_ssh_availability_on_new_machines(
557        &self,
558        inventory_type: AnsibleInventoryType,
559        current_inventory: &DeploymentInventory,
560    ) -> Result<()> {
561        let inventory = self
562            .ansible_provisioner
563            .ansible_runner
564            .get_inventory(inventory_type, true)?;
565        let old_set: HashSet<_> = match inventory_type {
566            AnsibleInventoryType::PeerCacheNodes => current_inventory
567                .peer_cache_node_vms
568                .iter()
569                .map(|node_vm| &node_vm.vm)
570                .cloned()
571                .collect(),
572            AnsibleInventoryType::Nodes => current_inventory
573                .node_vms
574                .iter()
575                .map(|node_vm| &node_vm.vm)
576                .cloned()
577                .collect(),
578            AnsibleInventoryType::Uploaders => current_inventory
579                .uploader_vms
580                .iter()
581                .map(|uploader_vm| &uploader_vm.vm)
582                .cloned()
583                .collect(),
584            AnsibleInventoryType::FullConeNatGateway => current_inventory
585                .full_cone_nat_gateway_vms
586                .iter()
587                .cloned()
588                .collect(),
589            AnsibleInventoryType::SymmetricNatGateway => current_inventory
590                .symmetric_nat_gateway_vms
591                .iter()
592                .cloned()
593                .collect(),
594            AnsibleInventoryType::FullConePrivateNodes => current_inventory
595                .full_cone_private_node_vms
596                .iter()
597                .map(|node_vm| &node_vm.vm)
598                .cloned()
599                .collect(),
600            AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
601                .symmetric_private_node_vms
602                .iter()
603                .map(|node_vm| &node_vm.vm)
604                .cloned()
605                .collect(),
606            it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
607        };
608        let new_vms: Vec<_> = inventory
609            .into_iter()
610            .filter(|item| !old_set.contains(item))
611            .collect();
612        for vm in new_vms.iter() {
613            self.ssh_client.wait_for_ssh_availability(
614                &vm.public_ip_addr,
615                &self.cloud_provider.get_ssh_user(),
616            )?;
617        }
618        Ok(())
619    }
620}