sn_testnet_deploy/
upscale.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    ansible::{
9        inventory::AnsibleInventoryType,
10        provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11    },
12    error::{Error, Result},
13    get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14    DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
/// Options controlling an upscale of an existing deployment.
///
/// Consumed by `TestnetDeployer::upscale` and `TestnetDeployer::upscale_uploaders`.
/// For every `desired_*` count that those methods validate, `None` means "keep the
/// current value taken from `current_inventory`", and a value lower than the current
/// one is rejected with an error (upscaling never shrinks a deployment).
#[derive(Clone)]
pub struct UpscaleOptions {
    /// Not read by the upscale methods in this file.
    /// NOTE(review): presumably toggles verbose Ansible output — confirm against the caller.
    pub ansible_verbose: bool,
    /// Forwarded unchanged to `ProvisionOptions::ant_version`.
    pub ant_version: Option<String>,
    /// Inventory of the deployment as it exists before the upscale. Supplies the
    /// current counts, the environment details and the set of already-known VMs.
    pub current_inventory: DeploymentInventory,
    /// Desired full cone private node count, validated by `upscale`.
    pub desired_full_cone_private_node_count: Option<u16>,
    /// Desired number of full cone private node VMs, validated by `upscale`.
    pub desired_full_cone_private_node_vm_count: Option<u16>,
    /// Desired generic node count, validated by `upscale`.
    pub desired_node_count: Option<u16>,
    /// Desired number of generic node VMs, validated by `upscale`.
    pub desired_node_vm_count: Option<u16>,
    /// Desired Peer Cache node count, validated by `upscale`.
    /// Setting it on a bootstrap deployment makes `upscale` fail.
    pub desired_peer_cache_node_count: Option<u16>,
    /// Desired number of Peer Cache node VMs, validated by `upscale`.
    /// Setting it on a bootstrap deployment makes `upscale` fail.
    pub desired_peer_cache_node_vm_count: Option<u16>,
    /// Desired symmetric private node count, validated by `upscale`.
    pub desired_symmetric_private_node_count: Option<u16>,
    /// Desired number of symmetric private node VMs, validated by `upscale`.
    pub desired_symmetric_private_node_vm_count: Option<u16>,
    /// Desired number of uploader VMs, validated by both upscale methods.
    /// Setting it on a bootstrap deployment makes `upscale` fail; when set, `upscale`
    /// also runs the uploader provisioning step.
    pub desired_uploader_vm_count: Option<u16>,
    /// Forwarded unchanged to `ProvisionOptions::uploaders_count`. When set, `upscale`
    /// also runs the uploader provisioning step.
    pub desired_uploaders_count: Option<u16>,
    /// Forwarded unchanged to `ProvisionOptions::funding_wallet_secret_key`.
    pub funding_wallet_secret_key: Option<String>,
    /// Forwarded unchanged to `ProvisionOptions::gas_amount`.
    pub gas_amount: Option<U256>,
    /// Forwarded to `ProvisionOptions::interval` (wrapped in `Some`).
    pub interval: Duration,
    /// When true, both upscale methods return right after the infrastructure step
    /// and skip all provisioning.
    pub infra_only: bool,
    /// Forwarded unchanged to `ProvisionOptions::max_archived_log_files`.
    pub max_archived_log_files: u16,
    /// Forwarded unchanged to `ProvisionOptions::max_log_files`.
    pub max_log_files: u16,
    /// When true, both upscale methods only call `plan` with the computed infra
    /// options and then return.
    pub plan: bool,
    /// Forwarded unchanged to `ProvisionOptions::public_rpc`.
    pub public_rpc: bool,
    /// When true, `upscale_uploaders` skips the infrastructure create/update step.
    /// `upscale` does not read this flag.
    pub provision_only: bool,
    /// Forwarded to `ProvisionOptions::token_amount` by `upscale_uploaders`;
    /// `upscale` passes `None` instead.
    pub token_amount: Option<U256>,
}
47
48impl TestnetDeployer {
49    pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50        let is_bootstrap_deploy = matches!(
51            options
52                .current_inventory
53                .environment_details
54                .deployment_type,
55            DeploymentType::Bootstrap
56        );
57
58        if is_bootstrap_deploy
59            && (options.desired_peer_cache_node_count.is_some()
60                || options.desired_peer_cache_node_vm_count.is_some()
61                || options.desired_uploader_vm_count.is_some())
62        {
63            return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64        }
65
66        let desired_peer_cache_node_vm_count = options
67            .desired_peer_cache_node_vm_count
68            .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69        if desired_peer_cache_node_vm_count
70            < options.current_inventory.peer_cache_node_vms.len() as u16
71        {
72            return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73        }
74        debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76        let desired_node_vm_count = options
77            .desired_node_vm_count
78            .unwrap_or(options.current_inventory.node_vms.len() as u16);
79        if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80            return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81        }
82        debug!("Using {desired_node_vm_count} for desired node VM count");
83
84        let desired_full_cone_private_node_vm_count = options
85            .desired_full_cone_private_node_vm_count
86            .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87        if desired_full_cone_private_node_vm_count
88            < options.current_inventory.full_cone_private_node_vms.len() as u16
89        {
90            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91        }
92        debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94        let desired_symmetric_private_node_vm_count = options
95            .desired_symmetric_private_node_vm_count
96            .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97        if desired_symmetric_private_node_vm_count
98            < options.current_inventory.symmetric_private_node_vms.len() as u16
99        {
100            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101        }
102        debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104        let desired_uploader_vm_count = options
105            .desired_uploader_vm_count
106            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109        }
110        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112        let desired_peer_cache_node_count = options
113            .desired_peer_cache_node_count
114            .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115        if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116        {
117            return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118        }
119        debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121        let desired_node_count = options
122            .desired_node_count
123            .unwrap_or(options.current_inventory.node_count() as u16);
124        if desired_node_count < options.current_inventory.node_count() as u16 {
125            return Err(Error::InvalidUpscaleDesiredNodeCount);
126        }
127        debug!("Using {desired_node_count} for desired node count");
128
129        let desired_full_cone_private_node_count = options
130            .desired_full_cone_private_node_count
131            .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132        if desired_full_cone_private_node_count
133            < options.current_inventory.full_cone_private_node_count() as u16
134        {
135            return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136        }
137        debug!(
138            "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139        );
140
141        let desired_symmetric_private_node_count = options
142            .desired_symmetric_private_node_count
143            .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144        if desired_symmetric_private_node_count
145            < options.current_inventory.symmetric_private_node_count() as u16
146        {
147            return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148        }
149        debug!(
150            "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151        );
152
153        let mut infra_run_options = InfraRunOptions::generate_existing(
154            &options.current_inventory.name,
155            &self.terraform_runner,
156            &options.current_inventory.environment_details,
157        )
158        .await?;
159        infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160        infra_run_options.node_vm_count = Some(desired_node_vm_count);
161        infra_run_options.full_cone_private_node_vm_count =
162            Some(desired_full_cone_private_node_vm_count);
163        infra_run_options.symmetric_private_node_vm_count =
164            Some(desired_symmetric_private_node_vm_count);
165        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167        if options.plan {
168            self.plan(&infra_run_options)?;
169            return Ok(());
170        }
171
172        self.create_or_update_infra(&infra_run_options)
173            .map_err(|err| {
174                println!("Failed to create infra {err:?}");
175                err
176            })?;
177
178        if options.infra_only {
179            return Ok(());
180        }
181
182        let mut provision_options = ProvisionOptions {
183            binary_option: options.current_inventory.binary_option.clone(),
184            chunk_size: None,
185            client_env_variables: None,
186            downloaders_count: 0,
187            enable_telegraf: true,
188            evm_network: options
189                .current_inventory
190                .environment_details
191                .evm_details
192                .network
193                .clone(),
194            evm_data_payments_address: options
195                .current_inventory
196                .environment_details
197                .evm_details
198                .data_payments_address
199                .clone(),
200            evm_payment_token_address: options
201                .current_inventory
202                .environment_details
203                .evm_details
204                .payment_token_address
205                .clone(),
206            evm_rpc_url: options
207                .current_inventory
208                .environment_details
209                .evm_details
210                .rpc_url
211                .clone(),
212            full_cone_private_node_count: desired_full_cone_private_node_count,
213            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
214            interval: Some(options.interval),
215            log_format: None,
216            name: options.current_inventory.name.clone(),
217            network_id: options.current_inventory.environment_details.network_id,
218            node_count: desired_node_count,
219            node_env_variables: None,
220            max_archived_log_files: options.max_archived_log_files,
221            max_log_files: options.max_log_files,
222            output_inventory_dir_path: self
223                .working_directory_path
224                .join("ansible")
225                .join("inventory"),
226            peer_cache_node_count: desired_peer_cache_node_count,
227            public_rpc: options.public_rpc,
228            rewards_address: options
229                .current_inventory
230                .environment_details
231                .rewards_address
232                .clone(),
233            symmetric_private_node_count: desired_symmetric_private_node_count,
234            ant_version: options.ant_version.clone(),
235            uploaders_count: options.desired_uploaders_count,
236            gas_amount: options.gas_amount,
237            token_amount: None,
238            wallet_secret_keys: None,
239            max_uploads: None,
240        };
241        let mut node_provision_failed = false;
242
243        let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
244            get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
245                |err| {
246                    println!("Failed to get node multiaddr {err:?}");
247                    err
248                },
249            )?
250        } else {
251            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
252                .map_err(|err| {
253                    println!("Failed to get genesis multiaddr {err:?}");
254                    err
255                })?
256        };
257        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
258        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
259
260        if !is_bootstrap_deploy {
261            self.wait_for_ssh_availability_on_new_machines(
262                AnsibleInventoryType::PeerCacheNodes,
263                &options.current_inventory,
264            )?;
265            self.ansible_provisioner
266                .print_ansible_run_banner("Provision Peer Cache Nodes");
267            match self.ansible_provisioner.provision_nodes(
268                &provision_options,
269                Some(initial_multiaddr.clone()),
270                Some(initial_network_contacts_url.clone()),
271                NodeType::PeerCache,
272            ) {
273                Ok(()) => {
274                    println!("Provisioned Peer Cache nodes");
275                }
276                Err(err) => {
277                    log::error!("Failed to provision Peer Cache nodes: {err}");
278                    node_provision_failed = true;
279                }
280            }
281        }
282
283        self.wait_for_ssh_availability_on_new_machines(
284            AnsibleInventoryType::Nodes,
285            &options.current_inventory,
286        )?;
287        self.ansible_provisioner
288            .print_ansible_run_banner("Provision Normal Nodes");
289        match self.ansible_provisioner.provision_nodes(
290            &provision_options,
291            Some(initial_multiaddr.clone()),
292            Some(initial_network_contacts_url.clone()),
293            NodeType::Generic,
294        ) {
295            Ok(()) => {
296                println!("Provisioned normal nodes");
297            }
298            Err(err) => {
299                log::error!("Failed to provision normal nodes: {err}");
300                node_provision_failed = true;
301            }
302        }
303
304        let private_node_inventory = PrivateNodeProvisionInventory::new(
305            &self.ansible_provisioner,
306            Some(desired_full_cone_private_node_vm_count),
307            Some(desired_symmetric_private_node_vm_count),
308        )?;
309
310        if private_node_inventory.should_provision_full_cone_private_nodes() {
311            let full_cone_nat_gateway_inventory = self
312                .ansible_provisioner
313                .ansible_runner
314                .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
315
316            let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
317                .into_iter()
318                .filter(|item| {
319                    !options
320                        .current_inventory
321                        .full_cone_nat_gateway_vms
322                        .contains(item)
323                })
324                .collect();
325
326            for vm in full_cone_nat_gateway_new_vms.iter() {
327                self.ssh_client.wait_for_ssh_availability(
328                    &vm.public_ip_addr,
329                    &self.cloud_provider.get_ssh_user(),
330                )?;
331            }
332
333            let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
334                None
335            } else {
336                debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
337                Some(full_cone_nat_gateway_new_vms)
338            };
339
340            match self.ansible_provisioner.provision_full_cone(
341                &provision_options,
342                Some(initial_multiaddr.clone()),
343                Some(initial_network_contacts_url.clone()),
344                private_node_inventory.clone(),
345                full_cone_nat_gateway_new_vms,
346            ) {
347                Ok(()) => {
348                    println!("Provisioned Full Cone nodes and Gateway");
349                }
350                Err(err) => {
351                    log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
352                    node_provision_failed = true;
353                }
354            }
355        }
356
357        if private_node_inventory.should_provision_symmetric_private_nodes() {
358            self.wait_for_ssh_availability_on_new_machines(
359                AnsibleInventoryType::SymmetricNatGateway,
360                &options.current_inventory,
361            )?;
362            self.ansible_provisioner
363                .print_ansible_run_banner("Provision Symmetric NAT Gateway");
364            self.ansible_provisioner
365                .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
366                .map_err(|err| {
367                    println!("Failed to provision symmetric NAT gateway {err:?}");
368                    err
369                })?;
370
371            self.wait_for_ssh_availability_on_new_machines(
372                AnsibleInventoryType::SymmetricPrivateNodes,
373                &options.current_inventory,
374            )?;
375            self.ansible_provisioner
376                .print_ansible_run_banner("Provision Symmetric Private Nodes");
377            match self.ansible_provisioner.provision_symmetric_private_nodes(
378                &mut provision_options,
379                Some(initial_multiaddr.clone()),
380                Some(initial_network_contacts_url.clone()),
381                &private_node_inventory,
382            ) {
383                Ok(()) => {
384                    println!("Provisioned symmetric private nodes");
385                }
386                Err(err) => {
387                    log::error!("Failed to provision symmetric private nodes: {err}");
388                    node_provision_failed = true;
389                }
390            }
391        }
392
393        let should_provision_uploaders = options.desired_uploaders_count.is_some()
394            || options.desired_uploader_vm_count.is_some();
395        if should_provision_uploaders {
396            self.wait_for_ssh_availability_on_new_machines(
397                AnsibleInventoryType::Uploaders,
398                &options.current_inventory,
399            )?;
400            let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
401            self.ansible_provisioner
402                .print_ansible_run_banner("Provision Uploaders");
403            self.ansible_provisioner
404                .provision_uploaders(
405                    &provision_options,
406                    Some(initial_multiaddr.clone()),
407                    Some(genesis_network_contacts.clone()),
408                )
409                .await
410                .map_err(|err| {
411                    println!("Failed to provision uploaders {err:?}");
412                    err
413                })?;
414        }
415
416        if node_provision_failed {
417            println!();
418            println!("{}", "WARNING!".yellow());
419            println!("Some nodes failed to provision without error.");
420            println!("This usually means a small number of nodes failed to start on a few VMs.");
421            println!("However, most of the time the deployment will still be usable.");
422            println!("See the output from Ansible to determine which VMs had failures.");
423        }
424
425        Ok(())
426    }
427
428    pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
429        let is_bootstrap_deploy = matches!(
430            options
431                .current_inventory
432                .environment_details
433                .deployment_type,
434            DeploymentType::Bootstrap
435        );
436
437        if is_bootstrap_deploy {
438            return Err(Error::InvalidUploaderUpscaleDeploymentType(
439                "bootstrap".to_string(),
440            ));
441        }
442
443        let desired_uploader_vm_count = options
444            .desired_uploader_vm_count
445            .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
446        if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
447            return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
448        }
449        debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
450
451        let mut infra_run_options = InfraRunOptions::generate_existing(
452            &options.current_inventory.name,
453            &self.terraform_runner,
454            &options.current_inventory.environment_details,
455        )
456        .await?;
457        infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
458
459        if options.plan {
460            self.plan(&infra_run_options)?;
461            return Ok(());
462        }
463
464        if !options.provision_only {
465            self.create_or_update_infra(&infra_run_options)
466                .map_err(|err| {
467                    println!("Failed to create infra {err:?}");
468                    err
469                })?;
470        }
471
472        if options.infra_only {
473            return Ok(());
474        }
475
476        let (initial_multiaddr, initial_ip_addr) =
477            get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
478                .map_err(|err| {
479                    println!("Failed to get genesis multiaddr {err:?}");
480                    err
481                })?;
482        let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
483        debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
484
485        let provision_options = ProvisionOptions {
486            ant_version: options.ant_version.clone(),
487            binary_option: options.current_inventory.binary_option.clone(),
488            chunk_size: None,
489            client_env_variables: None,
490            downloaders_count: 0,
491            enable_telegraf: true,
492            evm_data_payments_address: options
493                .current_inventory
494                .environment_details
495                .evm_details
496                .data_payments_address
497                .clone(),
498            evm_network: options
499                .current_inventory
500                .environment_details
501                .evm_details
502                .network
503                .clone(),
504            evm_payment_token_address: options
505                .current_inventory
506                .environment_details
507                .evm_details
508                .payment_token_address
509                .clone(),
510            evm_rpc_url: options
511                .current_inventory
512                .environment_details
513                .evm_details
514                .rpc_url
515                .clone(),
516            full_cone_private_node_count: 0,
517            funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
518            gas_amount: options.gas_amount,
519            interval: Some(options.interval),
520            log_format: None,
521            max_archived_log_files: options.max_archived_log_files,
522            max_log_files: options.max_log_files,
523            max_uploads: None,
524            name: options.current_inventory.name.clone(),
525            network_id: options.current_inventory.environment_details.network_id,
526            node_count: 0,
527            node_env_variables: None,
528            output_inventory_dir_path: self
529                .working_directory_path
530                .join("ansible")
531                .join("inventory"),
532            peer_cache_node_count: 0,
533            public_rpc: options.public_rpc,
534            rewards_address: options
535                .current_inventory
536                .environment_details
537                .rewards_address
538                .clone(),
539            symmetric_private_node_count: 0,
540            token_amount: options.token_amount,
541            uploaders_count: options.desired_uploaders_count,
542            wallet_secret_keys: None,
543        };
544
545        self.wait_for_ssh_availability_on_new_machines(
546            AnsibleInventoryType::Uploaders,
547            &options.current_inventory,
548        )?;
549        self.ansible_provisioner
550            .print_ansible_run_banner("Provision Uploaders");
551        self.ansible_provisioner
552            .provision_uploaders(
553                &provision_options,
554                Some(initial_multiaddr),
555                Some(initial_network_contacts_url),
556            )
557            .await
558            .map_err(|err| {
559                println!("Failed to provision uploaders {err:?}");
560                err
561            })?;
562
563        Ok(())
564    }
565
566    fn wait_for_ssh_availability_on_new_machines(
567        &self,
568        inventory_type: AnsibleInventoryType,
569        current_inventory: &DeploymentInventory,
570    ) -> Result<()> {
571        let inventory = self
572            .ansible_provisioner
573            .ansible_runner
574            .get_inventory(inventory_type, true)?;
575        let old_set: HashSet<_> = match inventory_type {
576            AnsibleInventoryType::PeerCacheNodes => current_inventory
577                .peer_cache_node_vms
578                .iter()
579                .map(|node_vm| &node_vm.vm)
580                .cloned()
581                .collect(),
582            AnsibleInventoryType::Nodes => current_inventory
583                .node_vms
584                .iter()
585                .map(|node_vm| &node_vm.vm)
586                .cloned()
587                .collect(),
588            AnsibleInventoryType::Uploaders => current_inventory
589                .uploader_vms
590                .iter()
591                .map(|uploader_vm| &uploader_vm.vm)
592                .cloned()
593                .collect(),
594            AnsibleInventoryType::FullConeNatGateway => current_inventory
595                .full_cone_nat_gateway_vms
596                .iter()
597                .cloned()
598                .collect(),
599            AnsibleInventoryType::SymmetricNatGateway => current_inventory
600                .symmetric_nat_gateway_vms
601                .iter()
602                .cloned()
603                .collect(),
604            AnsibleInventoryType::FullConePrivateNodes => current_inventory
605                .full_cone_private_node_vms
606                .iter()
607                .map(|node_vm| &node_vm.vm)
608                .cloned()
609                .collect(),
610            AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
611                .symmetric_private_node_vms
612                .iter()
613                .map(|node_vm| &node_vm.vm)
614                .cloned()
615                .collect(),
616            it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
617        };
618        let new_vms: Vec<_> = inventory
619            .into_iter()
620            .filter(|item| !old_set.contains(item))
621            .collect();
622        for vm in new_vms.iter() {
623            self.ssh_client.wait_for_ssh_availability(
624                &vm.public_ip_addr,
625                &self.cloud_provider.get_ssh_user(),
626            )?;
627        }
628        Ok(())
629    }
630}