sn_testnet_deploy/
upscale.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    ansible::{
9        inventory::AnsibleInventoryType,
10        provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11    },
12    error::{Error, Result},
13    get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14    DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23    pub ansible_verbose: bool,
24    pub ant_version: Option<String>,
25    pub current_inventory: DeploymentInventory,
26    pub desired_full_cone_private_node_count: Option<u16>,
27    pub desired_full_cone_private_node_vm_count: Option<u16>,
28    pub desired_node_count: Option<u16>,
29    pub desired_node_vm_count: Option<u16>,
30    pub desired_peer_cache_node_count: Option<u16>,
31    pub desired_peer_cache_node_vm_count: Option<u16>,
32    pub desired_symmetric_private_node_count: Option<u16>,
33    pub desired_symmetric_private_node_vm_count: Option<u16>,
34    pub desired_uploader_vm_count: Option<u16>,
35    pub desired_uploaders_count: Option<u16>,
36    pub funding_wallet_secret_key: Option<String>,
37    pub gas_amount: Option<U256>,
38    pub interval: Duration,
39    pub infra_only: bool,
40    pub max_archived_log_files: u16,
41    pub max_log_files: u16,
42    pub plan: bool,
43    pub public_rpc: bool,
44    pub provision_only: bool,
45    pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49    pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50        let is_bootstrap_deploy = matches!(
51            options
52                .current_inventory
53                .environment_details
54                .deployment_type,
55            DeploymentType::Bootstrap
56        );
57
58        if is_bootstrap_deploy
59            && (options.desired_peer_cache_node_count.is_some()
60                || options.desired_peer_cache_node_vm_count.is_some()
61                || options.desired_uploader_vm_count.is_some())
62        {
63            return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64        }
65
66        let desired_peer_cache_node_vm_count = options
67            .desired_peer_cache_node_vm_count
68            .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69        if desired_peer_cache_node_vm_count
70            < options.current_inventory.peer_cache_node_vms.len() as u16
71        {
72            return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73        }
74        debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76        let desired_node_vm_count = options
77            .desired_node_vm_count
78            .unwrap_or(options.current_inventory.node_vms.len() as u16);
79        if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80            return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81        }
82        debug!("Using {desired_node_vm_count} for desired node VM count");
83
84        let desired_full_cone_private_node_vm_count = options
85            .desired_full_cone_private_node_vm_count
86            .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87        if desired_full_cone_private_node_vm_count
88            < options.current_inventory.full_cone_private_node_vms.len() as u16
89        {
90            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91        }
92        debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94        let desired_symmetric_private_node_vm_count = options
95            .desired_symmetric_private_node_vm_count
96            .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97        if desired_symmetric_private_node_vm_count
98            < options.current_inventory.symmetric_private_node_vms.len() as u16
99        {
100            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101        }
102        debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104        let desired_uploader_vm_count = options
105            .desired_uploader_vm_count
106            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109        }
110        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112        let desired_peer_cache_node_count = options
113            .desired_peer_cache_node_count
114            .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115        if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116        {
117            return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118        }
119        debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121        let desired_node_count = options
122            .desired_node_count
123            .unwrap_or(options.current_inventory.node_count() as u16);
124        if desired_node_count < options.current_inventory.node_count() as u16 {
125            return Err(Error::InvalidUpscaleDesiredNodeCount);
126        }
127        debug!("Using {desired_node_count} for desired node count");
128
129        let desired_full_cone_private_node_count = options
130            .desired_full_cone_private_node_count
131            .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132        if desired_full_cone_private_node_count
133            < options.current_inventory.full_cone_private_node_count() as u16
134        {
135            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136        }
137        debug!(
138            "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139        );
140
141        let desired_symmetric_private_node_count = options
142            .desired_symmetric_private_node_count
143            .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144        if desired_symmetric_private_node_count
145            < options.current_inventory.symmetric_private_node_count() as u16
146        {
147            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148        }
149        debug!(
150            "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151        );
152
153        let mut infra_run_options = InfraRunOptions::generate_existing(
154            &options.current_inventory.name,
155            &self.terraform_runner,
156            &options.current_inventory.environment_details,
157        )
158        .await?;
159        infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160        infra_run_options.node_vm_count = Some(desired_node_vm_count);
161        infra_run_options.full_cone_private_node_vm_count =
162            Some(desired_full_cone_private_node_vm_count);
163        infra_run_options.symmetric_private_node_vm_count =
164            Some(desired_symmetric_private_node_vm_count);
165        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167        if options.plan {
168            self.plan(&infra_run_options)?;
169            return Ok(());
170        }
171
172        self.create_or_update_infra(&infra_run_options)
173            .map_err(|err| {
174                println!("Failed to create infra {err:?}");
175                err
176            })?;
177
178        if options.infra_only {
179            return Ok(());
180        }
181
182        let mut provision_options = ProvisionOptions {
183            binary_option: options.current_inventory.binary_option.clone(),
184            chunk_size: None,
185            downloaders_count: 0,
186            env_variables: None,
187            evm_network: options
188                .current_inventory
189                .environment_details
190                .evm_network
191                .clone(),
192            evm_data_payments_address: options
193                .current_inventory
194                .environment_details
195                .evm_data_payments_address
196                .clone(),
197            evm_payment_token_address: options
198                .current_inventory
199                .environment_details
200                .evm_payment_token_address
201                .clone(),
202            evm_rpc_url: options
203                .current_inventory
204                .environment_details
205                .evm_rpc_url
206                .clone(),
207            full_cone_private_node_count: desired_full_cone_private_node_count,
208            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
209            interval: options.interval,
210            log_format: None,
211            logstash_details: None,
212            name: options.current_inventory.name.clone(),
213            network_id: options.current_inventory.environment_details.network_id,
214            node_count: desired_node_count,
215            max_archived_log_files: options.max_archived_log_files,
216            max_log_files: options.max_log_files,
217            output_inventory_dir_path: self
218                .working_directory_path
219                .join("ansible")
220                .join("inventory"),
221            peer_cache_node_count: desired_peer_cache_node_count,
222            public_rpc: options.public_rpc,
223            rewards_address: options
224                .current_inventory
225                .environment_details
226                .rewards_address
227                .clone(),
228            symmetric_private_node_count: desired_symmetric_private_node_count,
229            ant_version: options.ant_version.clone(),
230            uploaders_count: options.desired_uploaders_count,
231            gas_amount: options.gas_amount,
232            token_amount: None,
233        };
234        let mut node_provision_failed = false;
235
236        let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
237            get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
238                |err| {
239                    println!("Failed to get node multiaddr {err:?}");
240                    err
241                },
242            )?
243        } else {
244            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
245                .map_err(|err| {
246                    println!("Failed to get genesis multiaddr {err:?}");
247                    err
248                })?
249        };
250        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
251        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
252
253        if !is_bootstrap_deploy {
254            self.wait_for_ssh_availability_on_new_machines(
255                AnsibleInventoryType::PeerCacheNodes,
256                &options.current_inventory,
257            )?;
258            self.ansible_provisioner
259                .print_ansible_run_banner("Provision Peer Cache Nodes");
260            match self.ansible_provisioner.provision_nodes(
261                &provision_options,
262                Some(initial_multiaddr.clone()),
263                Some(initial_network_contacts_url.clone()),
264                NodeType::PeerCache,
265            ) {
266                Ok(()) => {
267                    println!("Provisioned Peer Cache nodes");
268                }
269                Err(err) => {
270                    log::error!("Failed to provision Peer Cache nodes: {err}");
271                    node_provision_failed = true;
272                }
273            }
274        }
275
276        self.wait_for_ssh_availability_on_new_machines(
277            AnsibleInventoryType::Nodes,
278            &options.current_inventory,
279        )?;
280        self.ansible_provisioner
281            .print_ansible_run_banner("Provision Normal Nodes");
282        match self.ansible_provisioner.provision_nodes(
283            &provision_options,
284            Some(initial_multiaddr.clone()),
285            Some(initial_network_contacts_url.clone()),
286            NodeType::Generic,
287        ) {
288            Ok(()) => {
289                println!("Provisioned normal nodes");
290            }
291            Err(err) => {
292                log::error!("Failed to provision normal nodes: {err}");
293                node_provision_failed = true;
294            }
295        }
296
297        let private_node_inventory = PrivateNodeProvisionInventory::new(
298            &self.ansible_provisioner,
299            Some(desired_full_cone_private_node_vm_count),
300            Some(desired_symmetric_private_node_vm_count),
301        )?;
302
303        if private_node_inventory.should_provision_full_cone_private_nodes() {
304            let full_cone_nat_gateway_inventory = self
305                .ansible_provisioner
306                .ansible_runner
307                .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
308
309            let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
310                .into_iter()
311                .filter(|item| {
312                    !options
313                        .current_inventory
314                        .full_cone_nat_gateway_vms
315                        .contains(item)
316                })
317                .collect();
318
319            for vm in full_cone_nat_gateway_new_vms.iter() {
320                self.ssh_client.wait_for_ssh_availability(
321                    &vm.public_ip_addr,
322                    &self.cloud_provider.get_ssh_user(),
323                )?;
324            }
325
326            let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
327                None
328            } else {
329                debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
330                Some(full_cone_nat_gateway_new_vms)
331            };
332
333            match self.ansible_provisioner.provision_full_cone(
334                &provision_options,
335                Some(initial_multiaddr.clone()),
336                Some(initial_network_contacts_url.clone()),
337                private_node_inventory.clone(),
338                full_cone_nat_gateway_new_vms,
339            ) {
340                Ok(()) => {
341                    println!("Provisioned Full Cone nodes and Gateway");
342                }
343                Err(err) => {
344                    log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
345                    node_provision_failed = true;
346                }
347            }
348        }
349
350        if private_node_inventory.should_provision_symmetric_private_nodes() {
351            self.wait_for_ssh_availability_on_new_machines(
352                AnsibleInventoryType::SymmetricNatGateway,
353                &options.current_inventory,
354            )?;
355            self.ansible_provisioner
356                .print_ansible_run_banner("Provision Symmetric NAT Gateway");
357            self.ansible_provisioner
358                .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
359                .map_err(|err| {
360                    println!("Failed to provision symmetric NAT gateway {err:?}");
361                    err
362                })?;
363
364            self.wait_for_ssh_availability_on_new_machines(
365                AnsibleInventoryType::SymmetricPrivateNodes,
366                &options.current_inventory,
367            )?;
368            self.ansible_provisioner
369                .print_ansible_run_banner("Provision Symmetric Private Nodes");
370            match self.ansible_provisioner.provision_symmetric_private_nodes(
371                &mut provision_options,
372                Some(initial_multiaddr.clone()),
373                Some(initial_network_contacts_url.clone()),
374                &private_node_inventory,
375            ) {
376                Ok(()) => {
377                    println!("Provisioned symmetric private nodes");
378                }
379                Err(err) => {
380                    log::error!("Failed to provision symmetric private nodes: {err}");
381                    node_provision_failed = true;
382                }
383            }
384        }
385
386        let should_provision_uploaders = options.desired_uploaders_count.is_some()
387            || options.desired_uploader_vm_count.is_some();
388        if should_provision_uploaders {
389            self.wait_for_ssh_availability_on_new_machines(
390                AnsibleInventoryType::Uploaders,
391                &options.current_inventory,
392            )?;
393            let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
394            self.ansible_provisioner
395                .print_ansible_run_banner("Provision Uploaders");
396            self.ansible_provisioner
397                .provision_uploaders(
398                    &provision_options,
399                    Some(initial_multiaddr.clone()),
400                    Some(genesis_network_contacts.clone()),
401                )
402                .await
403                .map_err(|err| {
404                    println!("Failed to provision uploaders {err:?}");
405                    err
406                })?;
407        }
408
409        if node_provision_failed {
410            println!();
411            println!("{}", "WARNING!".yellow());
412            println!("Some nodes failed to provision without error.");
413            println!("This usually means a small number of nodes failed to start on a few VMs.");
414            println!("However, most of the time the deployment will still be usable.");
415            println!("See the output from Ansible to determine which VMs had failures.");
416        }
417
418        Ok(())
419    }
420
421    pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
422        let is_bootstrap_deploy = matches!(
423            options
424                .current_inventory
425                .environment_details
426                .deployment_type,
427            DeploymentType::Bootstrap
428        );
429
430        if is_bootstrap_deploy {
431            return Err(Error::InvalidUploaderUpscaleDeploymentType(
432                "bootstrap".to_string(),
433            ));
434        }
435
436        let desired_uploader_vm_count = options
437            .desired_uploader_vm_count
438            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
439        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
440            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
441        }
442        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
443
444        let mut infra_run_options = InfraRunOptions::generate_existing(
445            &options.current_inventory.name,
446            &self.terraform_runner,
447            &options.current_inventory.environment_details,
448        )
449        .await?;
450        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
451
452        if options.plan {
453            self.plan(&infra_run_options)?;
454            return Ok(());
455        }
456
457        if !options.provision_only {
458            self.create_or_update_infra(&infra_run_options)
459                .map_err(|err| {
460                    println!("Failed to create infra {err:?}");
461                    err
462                })?;
463        }
464
465        if options.infra_only {
466            return Ok(());
467        }
468
469        let (initial_multiaddr, initial_ip_addr) =
470            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
471                .map_err(|err| {
472                    println!("Failed to get genesis multiaddr {err:?}");
473                    err
474                })?;
475        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
476        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
477
478        let provision_options = ProvisionOptions {
479            binary_option: options.current_inventory.binary_option.clone(),
480            chunk_size: None,
481            downloaders_count: 0,
482            env_variables: None,
483            evm_data_payments_address: options
484                .current_inventory
485                .environment_details
486                .evm_data_payments_address
487                .clone(),
488            evm_network: options
489                .current_inventory
490                .environment_details
491                .evm_network
492                .clone(),
493            evm_payment_token_address: options
494                .current_inventory
495                .environment_details
496                .evm_payment_token_address
497                .clone(),
498            evm_rpc_url: options
499                .current_inventory
500                .environment_details
501                .evm_rpc_url
502                .clone(),
503            full_cone_private_node_count: 0,
504            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
505            interval: options.interval,
506            log_format: None,
507            logstash_details: None,
508            name: options.current_inventory.name.clone(),
509            network_id: options.current_inventory.environment_details.network_id,
510            node_count: 0,
511            max_archived_log_files: options.max_archived_log_files,
512            max_log_files: options.max_log_files,
513            output_inventory_dir_path: self
514                .working_directory_path
515                .join("ansible")
516                .join("inventory"),
517            peer_cache_node_count: 0,
518
519            public_rpc: options.public_rpc,
520            rewards_address: options
521                .current_inventory
522                .environment_details
523                .rewards_address
524                .clone(),
525            symmetric_private_node_count: 0,
526            ant_version: options.ant_version.clone(),
527            uploaders_count: options.desired_uploaders_count,
528            gas_amount: options.gas_amount,
529            token_amount: options.token_amount,
530        };
531
532        self.wait_for_ssh_availability_on_new_machines(
533            AnsibleInventoryType::Uploaders,
534            &options.current_inventory,
535        )?;
536        self.ansible_provisioner
537            .print_ansible_run_banner("Provision Uploaders");
538        self.ansible_provisioner
539            .provision_uploaders(
540                &provision_options,
541                Some(initial_multiaddr),
542                Some(initial_network_contacts_url),
543            )
544            .await
545            .map_err(|err| {
546                println!("Failed to provision uploaders {err:?}");
547                err
548            })?;
549
550        Ok(())
551    }
552
553    fn wait_for_ssh_availability_on_new_machines(
554        &self,
555        inventory_type: AnsibleInventoryType,
556        current_inventory: &DeploymentInventory,
557    ) -> Result<()> {
558        let inventory = self
559            .ansible_provisioner
560            .ansible_runner
561            .get_inventory(inventory_type, true)?;
562        let old_set: HashSet<_> = match inventory_type {
563            AnsibleInventoryType::PeerCacheNodes => current_inventory
564                .peer_cache_node_vms
565                .iter()
566                .map(|node_vm| &node_vm.vm)
567                .cloned()
568                .collect(),
569            AnsibleInventoryType::Nodes => current_inventory
570                .node_vms
571                .iter()
572                .map(|node_vm| &node_vm.vm)
573                .cloned()
574                .collect(),
575            AnsibleInventoryType::Uploaders => current_inventory
576                .uploader_vms
577                .iter()
578                .map(|uploader_vm| &uploader_vm.vm)
579                .cloned()
580                .collect(),
581            AnsibleInventoryType::FullConeNatGateway => current_inventory
582                .full_cone_nat_gateway_vms
583                .iter()
584                .cloned()
585                .collect(),
586            AnsibleInventoryType::SymmetricNatGateway => current_inventory
587                .symmetric_nat_gateway_vms
588                .iter()
589                .cloned()
590                .collect(),
591            AnsibleInventoryType::FullConePrivateNodes => current_inventory
592                .full_cone_private_node_vms
593                .iter()
594                .map(|node_vm| &node_vm.vm)
595                .cloned()
596                .collect(),
597            AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
598                .symmetric_private_node_vms
599                .iter()
600                .map(|node_vm| &node_vm.vm)
601                .cloned()
602                .collect(),
603            it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
604        };
605        let new_vms: Vec<_> = inventory
606            .into_iter()
607            .filter(|item| !old_set.contains(item))
608            .collect();
609        for vm in new_vms.iter() {
610            self.ssh_client.wait_for_ssh_availability(
611                &vm.public_ip_addr,
612                &self.cloud_provider.get_ssh_user(),
613            )?;
614        }
615        Ok(())
616    }
617}