sn_testnet_deploy/
upscale.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    ansible::{
9        inventory::AnsibleInventoryType,
10        provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11    },
12    error::{Error, Result},
13    get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14    DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23    pub ansible_verbose: bool,
24    pub ant_version: Option<String>,
25    pub current_inventory: DeploymentInventory,
26    pub desired_full_cone_private_node_count: Option<u16>,
27    pub desired_full_cone_private_node_vm_count: Option<u16>,
28    pub desired_node_count: Option<u16>,
29    pub desired_node_vm_count: Option<u16>,
30    pub desired_peer_cache_node_count: Option<u16>,
31    pub desired_peer_cache_node_vm_count: Option<u16>,
32    pub desired_symmetric_private_node_count: Option<u16>,
33    pub desired_symmetric_private_node_vm_count: Option<u16>,
34    pub desired_uploader_vm_count: Option<u16>,
35    pub desired_uploaders_count: Option<u16>,
36    pub funding_wallet_secret_key: Option<String>,
37    pub gas_amount: Option<U256>,
38    pub interval: Duration,
39    pub infra_only: bool,
40    pub max_archived_log_files: u16,
41    pub max_log_files: u16,
42    pub plan: bool,
43    pub public_rpc: bool,
44    pub provision_only: bool,
45    pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49    pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50        let is_bootstrap_deploy = matches!(
51            options
52                .current_inventory
53                .environment_details
54                .deployment_type,
55            DeploymentType::Bootstrap
56        );
57
58        if is_bootstrap_deploy
59            && (options.desired_peer_cache_node_count.is_some()
60                || options.desired_peer_cache_node_vm_count.is_some()
61                || options.desired_uploader_vm_count.is_some())
62        {
63            return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64        }
65
66        let desired_peer_cache_node_vm_count = options
67            .desired_peer_cache_node_vm_count
68            .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69        if desired_peer_cache_node_vm_count
70            < options.current_inventory.peer_cache_node_vms.len() as u16
71        {
72            return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73        }
74        debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76        let desired_node_vm_count = options
77            .desired_node_vm_count
78            .unwrap_or(options.current_inventory.node_vms.len() as u16);
79        if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80            return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81        }
82        debug!("Using {desired_node_vm_count} for desired node VM count");
83
84        let desired_full_cone_private_node_vm_count = options
85            .desired_full_cone_private_node_vm_count
86            .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87        if desired_full_cone_private_node_vm_count
88            < options.current_inventory.full_cone_private_node_vms.len() as u16
89        {
90            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91        }
92        debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94        let desired_symmetric_private_node_vm_count = options
95            .desired_symmetric_private_node_vm_count
96            .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97        if desired_symmetric_private_node_vm_count
98            < options.current_inventory.symmetric_private_node_vms.len() as u16
99        {
100            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101        }
102        debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104        let desired_uploader_vm_count = options
105            .desired_uploader_vm_count
106            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109        }
110        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112        let desired_peer_cache_node_count = options
113            .desired_peer_cache_node_count
114            .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115        if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116        {
117            return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118        }
119        debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121        let desired_node_count = options
122            .desired_node_count
123            .unwrap_or(options.current_inventory.node_count() as u16);
124        if desired_node_count < options.current_inventory.node_count() as u16 {
125            return Err(Error::InvalidUpscaleDesiredNodeCount);
126        }
127        debug!("Using {desired_node_count} for desired node count");
128
129        let desired_full_cone_private_node_count = options
130            .desired_full_cone_private_node_count
131            .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132        if desired_full_cone_private_node_count
133            < options.current_inventory.full_cone_private_node_count() as u16
134        {
135            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136        }
137        debug!(
138            "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139        );
140
141        let desired_symmetric_private_node_count = options
142            .desired_symmetric_private_node_count
143            .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144        if desired_symmetric_private_node_count
145            < options.current_inventory.symmetric_private_node_count() as u16
146        {
147            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148        }
149        debug!(
150            "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151        );
152
153        let mut infra_run_options = InfraRunOptions::generate_existing(
154            &options.current_inventory.name,
155            &self.terraform_runner,
156            &options.current_inventory.environment_details,
157        )
158        .await?;
159        infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160        infra_run_options.node_vm_count = Some(desired_node_vm_count);
161        infra_run_options.full_cone_private_node_vm_count =
162            Some(desired_full_cone_private_node_vm_count);
163        infra_run_options.symmetric_private_node_vm_count =
164            Some(desired_symmetric_private_node_vm_count);
165        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167        if options.plan {
168            self.plan(&infra_run_options)?;
169            return Ok(());
170        }
171
172        self.create_or_update_infra(&infra_run_options)
173            .map_err(|err| {
174                println!("Failed to create infra {err:?}");
175                err
176            })?;
177
178        if options.infra_only {
179            return Ok(());
180        }
181
182        let mut provision_options = ProvisionOptions {
183            binary_option: options.current_inventory.binary_option.clone(),
184            chunk_size: None,
185            downloaders_count: 0,
186            enable_telegraf: true,
187            env_variables: None,
188            evm_network: options
189                .current_inventory
190                .environment_details
191                .evm_network
192                .clone(),
193            evm_data_payments_address: options
194                .current_inventory
195                .environment_details
196                .evm_data_payments_address
197                .clone(),
198            evm_payment_token_address: options
199                .current_inventory
200                .environment_details
201                .evm_payment_token_address
202                .clone(),
203            evm_rpc_url: options
204                .current_inventory
205                .environment_details
206                .evm_rpc_url
207                .clone(),
208            full_cone_private_node_count: desired_full_cone_private_node_count,
209            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
210            interval: options.interval,
211            log_format: None,
212            logstash_details: None,
213            name: options.current_inventory.name.clone(),
214            network_id: options.current_inventory.environment_details.network_id,
215            node_count: desired_node_count,
216            max_archived_log_files: options.max_archived_log_files,
217            max_log_files: options.max_log_files,
218            output_inventory_dir_path: self
219                .working_directory_path
220                .join("ansible")
221                .join("inventory"),
222            peer_cache_node_count: desired_peer_cache_node_count,
223            public_rpc: options.public_rpc,
224            rewards_address: options
225                .current_inventory
226                .environment_details
227                .rewards_address
228                .clone(),
229            symmetric_private_node_count: desired_symmetric_private_node_count,
230            ant_version: options.ant_version.clone(),
231            uploaders_count: options.desired_uploaders_count,
232            gas_amount: options.gas_amount,
233            token_amount: None,
234        };
235        let mut node_provision_failed = false;
236
237        let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
238            get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
239                |err| {
240                    println!("Failed to get node multiaddr {err:?}");
241                    err
242                },
243            )?
244        } else {
245            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
246                .map_err(|err| {
247                    println!("Failed to get genesis multiaddr {err:?}");
248                    err
249                })?
250        };
251        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
252        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
253
254        if !is_bootstrap_deploy {
255            self.wait_for_ssh_availability_on_new_machines(
256                AnsibleInventoryType::PeerCacheNodes,
257                &options.current_inventory,
258            )?;
259            self.ansible_provisioner
260                .print_ansible_run_banner("Provision Peer Cache Nodes");
261            match self.ansible_provisioner.provision_nodes(
262                &provision_options,
263                Some(initial_multiaddr.clone()),
264                Some(initial_network_contacts_url.clone()),
265                NodeType::PeerCache,
266            ) {
267                Ok(()) => {
268                    println!("Provisioned Peer Cache nodes");
269                }
270                Err(err) => {
271                    log::error!("Failed to provision Peer Cache nodes: {err}");
272                    node_provision_failed = true;
273                }
274            }
275        }
276
277        self.wait_for_ssh_availability_on_new_machines(
278            AnsibleInventoryType::Nodes,
279            &options.current_inventory,
280        )?;
281        self.ansible_provisioner
282            .print_ansible_run_banner("Provision Normal Nodes");
283        match self.ansible_provisioner.provision_nodes(
284            &provision_options,
285            Some(initial_multiaddr.clone()),
286            Some(initial_network_contacts_url.clone()),
287            NodeType::Generic,
288        ) {
289            Ok(()) => {
290                println!("Provisioned normal nodes");
291            }
292            Err(err) => {
293                log::error!("Failed to provision normal nodes: {err}");
294                node_provision_failed = true;
295            }
296        }
297
298        let private_node_inventory = PrivateNodeProvisionInventory::new(
299            &self.ansible_provisioner,
300            Some(desired_full_cone_private_node_vm_count),
301            Some(desired_symmetric_private_node_vm_count),
302        )?;
303
304        if private_node_inventory.should_provision_full_cone_private_nodes() {
305            let full_cone_nat_gateway_inventory = self
306                .ansible_provisioner
307                .ansible_runner
308                .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
309
310            let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
311                .into_iter()
312                .filter(|item| {
313                    !options
314                        .current_inventory
315                        .full_cone_nat_gateway_vms
316                        .contains(item)
317                })
318                .collect();
319
320            for vm in full_cone_nat_gateway_new_vms.iter() {
321                self.ssh_client.wait_for_ssh_availability(
322                    &vm.public_ip_addr,
323                    &self.cloud_provider.get_ssh_user(),
324                )?;
325            }
326
327            let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
328                None
329            } else {
330                debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
331                Some(full_cone_nat_gateway_new_vms)
332            };
333
334            match self.ansible_provisioner.provision_full_cone(
335                &provision_options,
336                Some(initial_multiaddr.clone()),
337                Some(initial_network_contacts_url.clone()),
338                private_node_inventory.clone(),
339                full_cone_nat_gateway_new_vms,
340            ) {
341                Ok(()) => {
342                    println!("Provisioned Full Cone nodes and Gateway");
343                }
344                Err(err) => {
345                    log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
346                    node_provision_failed = true;
347                }
348            }
349        }
350
351        if private_node_inventory.should_provision_symmetric_private_nodes() {
352            self.wait_for_ssh_availability_on_new_machines(
353                AnsibleInventoryType::SymmetricNatGateway,
354                &options.current_inventory,
355            )?;
356            self.ansible_provisioner
357                .print_ansible_run_banner("Provision Symmetric NAT Gateway");
358            self.ansible_provisioner
359                .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
360                .map_err(|err| {
361                    println!("Failed to provision symmetric NAT gateway {err:?}");
362                    err
363                })?;
364
365            self.wait_for_ssh_availability_on_new_machines(
366                AnsibleInventoryType::SymmetricPrivateNodes,
367                &options.current_inventory,
368            )?;
369            self.ansible_provisioner
370                .print_ansible_run_banner("Provision Symmetric Private Nodes");
371            match self.ansible_provisioner.provision_symmetric_private_nodes(
372                &mut provision_options,
373                Some(initial_multiaddr.clone()),
374                Some(initial_network_contacts_url.clone()),
375                &private_node_inventory,
376            ) {
377                Ok(()) => {
378                    println!("Provisioned symmetric private nodes");
379                }
380                Err(err) => {
381                    log::error!("Failed to provision symmetric private nodes: {err}");
382                    node_provision_failed = true;
383                }
384            }
385        }
386
387        let should_provision_uploaders = options.desired_uploaders_count.is_some()
388            || options.desired_uploader_vm_count.is_some();
389        if should_provision_uploaders {
390            self.wait_for_ssh_availability_on_new_machines(
391                AnsibleInventoryType::Uploaders,
392                &options.current_inventory,
393            )?;
394            let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
395            self.ansible_provisioner
396                .print_ansible_run_banner("Provision Uploaders");
397            self.ansible_provisioner
398                .provision_uploaders(
399                    &provision_options,
400                    Some(initial_multiaddr.clone()),
401                    Some(genesis_network_contacts.clone()),
402                )
403                .await
404                .map_err(|err| {
405                    println!("Failed to provision uploaders {err:?}");
406                    err
407                })?;
408        }
409
410        if node_provision_failed {
411            println!();
412            println!("{}", "WARNING!".yellow());
413            println!("Some nodes failed to provision without error.");
414            println!("This usually means a small number of nodes failed to start on a few VMs.");
415            println!("However, most of the time the deployment will still be usable.");
416            println!("See the output from Ansible to determine which VMs had failures.");
417        }
418
419        Ok(())
420    }
421
422    pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
423        let is_bootstrap_deploy = matches!(
424            options
425                .current_inventory
426                .environment_details
427                .deployment_type,
428            DeploymentType::Bootstrap
429        );
430
431        if is_bootstrap_deploy {
432            return Err(Error::InvalidUploaderUpscaleDeploymentType(
433                "bootstrap".to_string(),
434            ));
435        }
436
437        let desired_uploader_vm_count = options
438            .desired_uploader_vm_count
439            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
440        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
441            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
442        }
443        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
444
445        let mut infra_run_options = InfraRunOptions::generate_existing(
446            &options.current_inventory.name,
447            &self.terraform_runner,
448            &options.current_inventory.environment_details,
449        )
450        .await?;
451        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
452
453        if options.plan {
454            self.plan(&infra_run_options)?;
455            return Ok(());
456        }
457
458        if !options.provision_only {
459            self.create_or_update_infra(&infra_run_options)
460                .map_err(|err| {
461                    println!("Failed to create infra {err:?}");
462                    err
463                })?;
464        }
465
466        if options.infra_only {
467            return Ok(());
468        }
469
470        let (initial_multiaddr, initial_ip_addr) =
471            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
472                .map_err(|err| {
473                    println!("Failed to get genesis multiaddr {err:?}");
474                    err
475                })?;
476        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
477        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
478
479        let provision_options = ProvisionOptions {
480            binary_option: options.current_inventory.binary_option.clone(),
481            chunk_size: None,
482            downloaders_count: 0,
483            enable_telegraf: true,
484            env_variables: None,
485            evm_data_payments_address: options
486                .current_inventory
487                .environment_details
488                .evm_data_payments_address
489                .clone(),
490            evm_network: options
491                .current_inventory
492                .environment_details
493                .evm_network
494                .clone(),
495            evm_payment_token_address: options
496                .current_inventory
497                .environment_details
498                .evm_payment_token_address
499                .clone(),
500            evm_rpc_url: options
501                .current_inventory
502                .environment_details
503                .evm_rpc_url
504                .clone(),
505            full_cone_private_node_count: 0,
506            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
507            interval: options.interval,
508            log_format: None,
509            logstash_details: None,
510            name: options.current_inventory.name.clone(),
511            network_id: options.current_inventory.environment_details.network_id,
512            node_count: 0,
513            max_archived_log_files: options.max_archived_log_files,
514            max_log_files: options.max_log_files,
515            output_inventory_dir_path: self
516                .working_directory_path
517                .join("ansible")
518                .join("inventory"),
519            peer_cache_node_count: 0,
520            public_rpc: options.public_rpc,
521            rewards_address: options
522                .current_inventory
523                .environment_details
524                .rewards_address
525                .clone(),
526            symmetric_private_node_count: 0,
527            ant_version: options.ant_version.clone(),
528            uploaders_count: options.desired_uploaders_count,
529            gas_amount: options.gas_amount,
530            token_amount: options.token_amount,
531        };
532
533        self.wait_for_ssh_availability_on_new_machines(
534            AnsibleInventoryType::Uploaders,
535            &options.current_inventory,
536        )?;
537        self.ansible_provisioner
538            .print_ansible_run_banner("Provision Uploaders");
539        self.ansible_provisioner
540            .provision_uploaders(
541                &provision_options,
542                Some(initial_multiaddr),
543                Some(initial_network_contacts_url),
544            )
545            .await
546            .map_err(|err| {
547                println!("Failed to provision uploaders {err:?}");
548                err
549            })?;
550
551        Ok(())
552    }
553
554    fn wait_for_ssh_availability_on_new_machines(
555        &self,
556        inventory_type: AnsibleInventoryType,
557        current_inventory: &DeploymentInventory,
558    ) -> Result<()> {
559        let inventory = self
560            .ansible_provisioner
561            .ansible_runner
562            .get_inventory(inventory_type, true)?;
563        let old_set: HashSet<_> = match inventory_type {
564            AnsibleInventoryType::PeerCacheNodes => current_inventory
565                .peer_cache_node_vms
566                .iter()
567                .map(|node_vm| &node_vm.vm)
568                .cloned()
569                .collect(),
570            AnsibleInventoryType::Nodes => current_inventory
571                .node_vms
572                .iter()
573                .map(|node_vm| &node_vm.vm)
574                .cloned()
575                .collect(),
576            AnsibleInventoryType::Uploaders => current_inventory
577                .uploader_vms
578                .iter()
579                .map(|uploader_vm| &uploader_vm.vm)
580                .cloned()
581                .collect(),
582            AnsibleInventoryType::FullConeNatGateway => current_inventory
583                .full_cone_nat_gateway_vms
584                .iter()
585                .cloned()
586                .collect(),
587            AnsibleInventoryType::SymmetricNatGateway => current_inventory
588                .symmetric_nat_gateway_vms
589                .iter()
590                .cloned()
591                .collect(),
592            AnsibleInventoryType::FullConePrivateNodes => current_inventory
593                .full_cone_private_node_vms
594                .iter()
595                .map(|node_vm| &node_vm.vm)
596                .cloned()
597                .collect(),
598            AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
599                .symmetric_private_node_vms
600                .iter()
601                .map(|node_vm| &node_vm.vm)
602                .cloned()
603                .collect(),
604            it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
605        };
606        let new_vms: Vec<_> = inventory
607            .into_iter()
608            .filter(|item| !old_set.contains(item))
609            .collect();
610        for vm in new_vms.iter() {
611            self.ssh_client.wait_for_ssh_availability(
612                &vm.public_ip_addr,
613                &self.cloud_provider.get_ssh_user(),
614            )?;
615        }
616        Ok(())
617    }
618}