1use crate::{
8 ansible::{
9 inventory::AnsibleInventoryType,
10 provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11 },
12 error::{Error, Result},
13 get_anvil_node_data, get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr,
14 DeploymentInventory, DeploymentType, EvmNetwork, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23 pub ansible_verbose: bool,
24 pub ant_version: Option<String>,
25 pub current_inventory: DeploymentInventory,
26 pub desired_client_vm_count: Option<u16>,
27 pub desired_full_cone_private_node_count: Option<u16>,
28 pub desired_full_cone_private_node_vm_count: Option<u16>,
29 pub desired_node_count: Option<u16>,
30 pub desired_node_vm_count: Option<u16>,
31 pub desired_peer_cache_node_count: Option<u16>,
32 pub desired_peer_cache_node_vm_count: Option<u16>,
33 pub desired_symmetric_private_node_count: Option<u16>,
34 pub desired_symmetric_private_node_vm_count: Option<u16>,
35 pub desired_uploaders_count: Option<u16>,
36 pub enable_downloaders: bool,
37 pub funding_wallet_secret_key: Option<String>,
38 pub gas_amount: Option<U256>,
39 pub interval: Duration,
40 pub infra_only: bool,
41 pub max_archived_log_files: u16,
42 pub max_log_files: u16,
43 pub plan: bool,
44 pub public_rpc: bool,
45 pub provision_only: bool,
46 pub token_amount: Option<U256>,
47}
48
49impl TestnetDeployer {
50 pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
51 let is_bootstrap_deploy = matches!(
52 options
53 .current_inventory
54 .environment_details
55 .deployment_type,
56 DeploymentType::Bootstrap
57 );
58
59 if is_bootstrap_deploy
60 && (options.desired_peer_cache_node_count.is_some()
61 || options.desired_peer_cache_node_vm_count.is_some()
62 || options.desired_client_vm_count.is_some())
63 {
64 return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
65 }
66
67 let desired_peer_cache_node_vm_count = options
68 .desired_peer_cache_node_vm_count
69 .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
70 if desired_peer_cache_node_vm_count
71 < options.current_inventory.peer_cache_node_vms.len() as u16
72 {
73 return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
74 }
75 debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
76
77 let desired_node_vm_count = options
78 .desired_node_vm_count
79 .unwrap_or(options.current_inventory.node_vms.len() as u16);
80 if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
81 return Err(Error::InvalidUpscaleDesiredNodeVmCount);
82 }
83 debug!("Using {desired_node_vm_count} for desired node VM count");
84
85 let desired_full_cone_private_node_vm_count = options
86 .desired_full_cone_private_node_vm_count
87 .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
88 if desired_full_cone_private_node_vm_count
89 < options.current_inventory.full_cone_private_node_vms.len() as u16
90 {
91 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
92 }
93 debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
94
95 let desired_symmetric_private_node_vm_count = options
96 .desired_symmetric_private_node_vm_count
97 .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
98 if desired_symmetric_private_node_vm_count
99 < options.current_inventory.symmetric_private_node_vms.len() as u16
100 {
101 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
102 }
103 debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
104
105 let desired_client_vm_count = options
106 .desired_client_vm_count
107 .unwrap_or(options.current_inventory.client_vms.len() as u16);
108 if desired_client_vm_count < options.current_inventory.client_vms.len() as u16 {
109 return Err(Error::InvalidUpscaleDesiredClientVmCount);
110 }
111 debug!("Using {desired_client_vm_count} for desired Client VM count");
112
113 let desired_peer_cache_node_count = options
114 .desired_peer_cache_node_count
115 .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
116 if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
117 {
118 return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
119 }
120 debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
121
122 let desired_node_count = options
123 .desired_node_count
124 .unwrap_or(options.current_inventory.node_count() as u16);
125 if desired_node_count < options.current_inventory.node_count() as u16 {
126 return Err(Error::InvalidUpscaleDesiredNodeCount);
127 }
128 debug!("Using {desired_node_count} for desired node count");
129
130 let desired_full_cone_private_node_count = options
131 .desired_full_cone_private_node_count
132 .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
133 if desired_full_cone_private_node_count
134 < options.current_inventory.full_cone_private_node_count() as u16
135 {
136 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
137 }
138 debug!(
139 "Using {desired_full_cone_private_node_count} for desired full cone private node count"
140 );
141
142 let desired_symmetric_private_node_count = options
143 .desired_symmetric_private_node_count
144 .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
145 if desired_symmetric_private_node_count
146 < options.current_inventory.symmetric_private_node_count() as u16
147 {
148 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
149 }
150 debug!(
151 "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
152 );
153
154 let mut infra_run_options = InfraRunOptions::generate_existing(
155 &options.current_inventory.name,
156 &options.current_inventory.environment_details.region,
157 &self.terraform_runner,
158 Some(&options.current_inventory.environment_details),
159 )
160 .await?;
161 infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
162 infra_run_options.node_vm_count = Some(desired_node_vm_count);
163 infra_run_options.full_cone_private_node_vm_count =
164 Some(desired_full_cone_private_node_vm_count);
165 infra_run_options.symmetric_private_node_vm_count =
166 Some(desired_symmetric_private_node_vm_count);
167 infra_run_options.client_vm_count = Some(desired_client_vm_count);
168
169 if options.plan {
170 self.plan(&infra_run_options)?;
171 return Ok(());
172 }
173
174 self.create_or_update_infra(&infra_run_options)
175 .map_err(|err| {
176 println!("Failed to create infra {err:?}");
177 err
178 })?;
179
180 if options.infra_only {
181 return Ok(());
182 }
183
184 let mut provision_options = ProvisionOptions {
185 binary_option: options.current_inventory.binary_option.clone(),
186 chunk_size: None,
187 client_env_variables: None,
188 enable_downloaders: options.enable_downloaders,
189 enable_telegraf: true,
190 evm_network: options
191 .current_inventory
192 .environment_details
193 .evm_details
194 .network
195 .clone(),
196 evm_data_payments_address: options
197 .current_inventory
198 .environment_details
199 .evm_details
200 .data_payments_address
201 .clone(),
202 evm_payment_token_address: options
203 .current_inventory
204 .environment_details
205 .evm_details
206 .payment_token_address
207 .clone(),
208 evm_rpc_url: options
209 .current_inventory
210 .environment_details
211 .evm_details
212 .rpc_url
213 .clone(),
214 full_cone_private_node_count: desired_full_cone_private_node_count,
215 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
216 interval: Some(options.interval),
217 log_format: None,
218 name: options.current_inventory.name.clone(),
219 network_id: options.current_inventory.environment_details.network_id,
220 node_count: desired_node_count,
221 node_env_variables: None,
222 max_archived_log_files: options.max_archived_log_files,
223 max_log_files: options.max_log_files,
224 output_inventory_dir_path: self
225 .working_directory_path
226 .join("ansible")
227 .join("inventory"),
228 peer_cache_node_count: desired_peer_cache_node_count,
229 public_rpc: options.public_rpc,
230 rewards_address: options
231 .current_inventory
232 .environment_details
233 .rewards_address
234 .clone(),
235 symmetric_private_node_count: desired_symmetric_private_node_count,
236 ant_version: options.ant_version.clone(),
237 uploaders_count: options.desired_uploaders_count,
238 gas_amount: options.gas_amount,
239 token_amount: None,
240 wallet_secret_keys: None,
241 max_uploads: None,
242 };
243 let mut node_provision_failed = false;
244
245 let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
246 get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
247 |err| {
248 println!("Failed to get node multiaddr {err:?}");
249 err
250 },
251 )?
252 } else {
253 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
254 .map_err(|err| {
255 println!("Failed to get genesis multiaddr {err:?}");
256 err
257 })?
258 };
259 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
260 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
261
262 if !is_bootstrap_deploy {
263 self.wait_for_ssh_availability_on_new_machines(
264 AnsibleInventoryType::PeerCacheNodes,
265 &options.current_inventory,
266 )?;
267 self.ansible_provisioner
268 .print_ansible_run_banner("Provision Peer Cache Nodes");
269 match self.ansible_provisioner.provision_nodes(
270 &provision_options,
271 Some(initial_multiaddr.clone()),
272 Some(initial_network_contacts_url.clone()),
273 NodeType::PeerCache,
274 ) {
275 Ok(()) => {
276 println!("Provisioned Peer Cache nodes");
277 }
278 Err(err) => {
279 log::error!("Failed to provision Peer Cache nodes: {err}");
280 node_provision_failed = true;
281 }
282 }
283 }
284
285 self.wait_for_ssh_availability_on_new_machines(
286 AnsibleInventoryType::Nodes,
287 &options.current_inventory,
288 )?;
289 self.ansible_provisioner
290 .print_ansible_run_banner("Provision Normal Nodes");
291 match self.ansible_provisioner.provision_nodes(
292 &provision_options,
293 Some(initial_multiaddr.clone()),
294 Some(initial_network_contacts_url.clone()),
295 NodeType::Generic,
296 ) {
297 Ok(()) => {
298 println!("Provisioned normal nodes");
299 }
300 Err(err) => {
301 log::error!("Failed to provision normal nodes: {err}");
302 node_provision_failed = true;
303 }
304 }
305
306 let private_node_inventory = PrivateNodeProvisionInventory::new(
307 &self.ansible_provisioner,
308 Some(desired_full_cone_private_node_vm_count),
309 Some(desired_symmetric_private_node_vm_count),
310 )?;
311
312 if private_node_inventory.should_provision_full_cone_private_nodes() {
313 let full_cone_nat_gateway_inventory = self
314 .ansible_provisioner
315 .ansible_runner
316 .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
317
318 let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
319 .into_iter()
320 .filter(|item| {
321 !options
322 .current_inventory
323 .full_cone_nat_gateway_vms
324 .contains(item)
325 })
326 .collect();
327
328 for vm in full_cone_nat_gateway_new_vms.iter() {
329 self.ssh_client.wait_for_ssh_availability(
330 &vm.public_ip_addr,
331 &self.cloud_provider.get_ssh_user(),
332 )?;
333 }
334
335 let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
336 None
337 } else {
338 debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
339 Some(full_cone_nat_gateway_new_vms)
340 };
341
342 match self.ansible_provisioner.provision_full_cone(
343 &provision_options,
344 Some(initial_multiaddr.clone()),
345 Some(initial_network_contacts_url.clone()),
346 private_node_inventory.clone(),
347 full_cone_nat_gateway_new_vms,
348 ) {
349 Ok(()) => {
350 println!("Provisioned Full Cone nodes and Gateway");
351 }
352 Err(err) => {
353 log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
354 node_provision_failed = true;
355 }
356 }
357 }
358
359 if private_node_inventory.should_provision_symmetric_private_nodes() {
360 self.wait_for_ssh_availability_on_new_machines(
361 AnsibleInventoryType::SymmetricNatGateway,
362 &options.current_inventory,
363 )?;
364 self.ansible_provisioner
365 .print_ansible_run_banner("Provision Symmetric NAT Gateway");
366 self.ansible_provisioner
367 .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
368 .map_err(|err| {
369 println!("Failed to provision symmetric NAT gateway {err:?}");
370 err
371 })?;
372
373 self.wait_for_ssh_availability_on_new_machines(
374 AnsibleInventoryType::SymmetricPrivateNodes,
375 &options.current_inventory,
376 )?;
377 self.ansible_provisioner
378 .print_ansible_run_banner("Provision Symmetric Private Nodes");
379 match self.ansible_provisioner.provision_symmetric_private_nodes(
380 &mut provision_options,
381 Some(initial_multiaddr.clone()),
382 Some(initial_network_contacts_url.clone()),
383 &private_node_inventory,
384 ) {
385 Ok(()) => {
386 println!("Provisioned symmetric private nodes");
387 }
388 Err(err) => {
389 log::error!("Failed to provision symmetric private nodes: {err}");
390 node_provision_failed = true;
391 }
392 }
393 }
394
395 let should_provision_uploaders =
396 options.desired_uploaders_count.is_some() || options.desired_client_vm_count.is_some();
397 if should_provision_uploaders {
398 if provision_options.evm_network == EvmNetwork::Anvil {
400 let anvil_node_data =
401 get_anvil_node_data(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
402 .map_err(|err| {
403 println!("Failed to get evm testnet data {err:?}");
404 err
405 })?;
406
407 provision_options.funding_wallet_secret_key =
408 Some(anvil_node_data.deployer_wallet_private_key);
409 }
410
411 self.wait_for_ssh_availability_on_new_machines(
412 AnsibleInventoryType::Clients,
413 &options.current_inventory,
414 )?;
415 let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
416 self.ansible_provisioner
417 .print_ansible_run_banner("Provision Clients");
418 self.ansible_provisioner
419 .provision_clients(
420 &provision_options,
421 Some(initial_multiaddr.clone()),
422 Some(genesis_network_contacts.clone()),
423 )
424 .await
425 .map_err(|err| {
426 println!("Failed to provision Clients {err:?}");
427 err
428 })?;
429 }
430
431 if node_provision_failed {
432 println!();
433 println!("{}", "WARNING!".yellow());
434 println!("Some nodes failed to provision without error.");
435 println!("This usually means a small number of nodes failed to start on a few VMs.");
436 println!("However, most of the time the deployment will still be usable.");
437 println!("See the output from Ansible to determine which VMs had failures.");
438 }
439
440 Ok(())
441 }
442
443 pub async fn upscale_clients(&self, options: &UpscaleOptions) -> Result<()> {
444 let is_bootstrap_deploy = matches!(
445 options
446 .current_inventory
447 .environment_details
448 .deployment_type,
449 DeploymentType::Bootstrap
450 );
451
452 if is_bootstrap_deploy {
453 return Err(Error::InvalidClientUpscaleDeploymentType(
454 "bootstrap".to_string(),
455 ));
456 }
457
458 let desired_client_vm_count = options
459 .desired_client_vm_count
460 .unwrap_or(options.current_inventory.client_vms.len() as u16);
461 if desired_client_vm_count < options.current_inventory.client_vms.len() as u16 {
462 return Err(Error::InvalidUpscaleDesiredClientVmCount);
463 }
464 debug!("Using {desired_client_vm_count} for desired Client VM count");
465
466 let mut infra_run_options = InfraRunOptions::generate_existing(
467 &options.current_inventory.name,
468 &options.current_inventory.environment_details.region,
469 &self.terraform_runner,
470 Some(&options.current_inventory.environment_details),
471 )
472 .await?;
473 infra_run_options.client_vm_count = Some(desired_client_vm_count);
474
475 if options.plan {
476 self.plan(&infra_run_options)?;
477 return Ok(());
478 }
479
480 if !options.provision_only {
481 self.create_or_update_infra(&infra_run_options)
482 .map_err(|err| {
483 println!("Failed to create infra {err:?}");
484 err
485 })?;
486 }
487
488 if options.infra_only {
489 return Ok(());
490 }
491
492 let (initial_multiaddr, initial_ip_addr) =
493 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
494 .map_err(|err| {
495 println!("Failed to get genesis multiaddr {err:?}");
496 err
497 })?;
498 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
499 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
500
501 let provision_options = ProvisionOptions {
502 ant_version: options.ant_version.clone(),
503 binary_option: options.current_inventory.binary_option.clone(),
504 chunk_size: None,
505 client_env_variables: None,
506 enable_downloaders: options.enable_downloaders,
507 enable_telegraf: true,
508 evm_data_payments_address: options
509 .current_inventory
510 .environment_details
511 .evm_details
512 .data_payments_address
513 .clone(),
514 evm_network: options
515 .current_inventory
516 .environment_details
517 .evm_details
518 .network
519 .clone(),
520 evm_payment_token_address: options
521 .current_inventory
522 .environment_details
523 .evm_details
524 .payment_token_address
525 .clone(),
526 evm_rpc_url: options
527 .current_inventory
528 .environment_details
529 .evm_details
530 .rpc_url
531 .clone(),
532 full_cone_private_node_count: 0,
533 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
534 gas_amount: options.gas_amount,
535 interval: Some(options.interval),
536 log_format: None,
537 max_archived_log_files: options.max_archived_log_files,
538 max_log_files: options.max_log_files,
539 max_uploads: None,
540 name: options.current_inventory.name.clone(),
541 network_id: options.current_inventory.environment_details.network_id,
542 node_count: 0,
543 node_env_variables: None,
544 output_inventory_dir_path: self
545 .working_directory_path
546 .join("ansible")
547 .join("inventory"),
548 peer_cache_node_count: 0,
549 public_rpc: options.public_rpc,
550 rewards_address: options
551 .current_inventory
552 .environment_details
553 .rewards_address
554 .clone(),
555 symmetric_private_node_count: 0,
556 token_amount: options.token_amount,
557 uploaders_count: options.desired_uploaders_count,
558 wallet_secret_keys: None,
559 };
560
561 self.wait_for_ssh_availability_on_new_machines(
562 AnsibleInventoryType::Clients,
563 &options.current_inventory,
564 )?;
565 self.ansible_provisioner
566 .print_ansible_run_banner("Provision Clients");
567 self.ansible_provisioner
568 .provision_clients(
569 &provision_options,
570 Some(initial_multiaddr),
571 Some(initial_network_contacts_url),
572 )
573 .await
574 .map_err(|err| {
575 println!("Failed to provision clients {err:?}");
576 err
577 })?;
578
579 Ok(())
580 }
581
582 fn wait_for_ssh_availability_on_new_machines(
583 &self,
584 inventory_type: AnsibleInventoryType,
585 current_inventory: &DeploymentInventory,
586 ) -> Result<()> {
587 let inventory = self
588 .ansible_provisioner
589 .ansible_runner
590 .get_inventory(inventory_type, true)?;
591 let old_set: HashSet<_> = match inventory_type {
592 AnsibleInventoryType::Clients => current_inventory
593 .client_vms
594 .iter()
595 .map(|client_vm| &client_vm.vm)
596 .cloned()
597 .collect(),
598 AnsibleInventoryType::PeerCacheNodes => current_inventory
599 .peer_cache_node_vms
600 .iter()
601 .map(|node_vm| &node_vm.vm)
602 .cloned()
603 .collect(),
604 AnsibleInventoryType::Nodes => current_inventory
605 .node_vms
606 .iter()
607 .map(|node_vm| &node_vm.vm)
608 .cloned()
609 .collect(),
610 AnsibleInventoryType::FullConeNatGateway => current_inventory
611 .full_cone_nat_gateway_vms
612 .iter()
613 .cloned()
614 .collect(),
615 AnsibleInventoryType::SymmetricNatGateway => current_inventory
616 .symmetric_nat_gateway_vms
617 .iter()
618 .cloned()
619 .collect(),
620 AnsibleInventoryType::FullConePrivateNodes => current_inventory
621 .full_cone_private_node_vms
622 .iter()
623 .map(|node_vm| &node_vm.vm)
624 .cloned()
625 .collect(),
626 AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
627 .symmetric_private_node_vms
628 .iter()
629 .map(|node_vm| &node_vm.vm)
630 .cloned()
631 .collect(),
632 it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
633 };
634 let new_vms: Vec<_> = inventory
635 .into_iter()
636 .filter(|item| !old_set.contains(item))
637 .collect();
638 for vm in new_vms.iter() {
639 self.ssh_client.wait_for_ssh_availability(
640 &vm.public_ip_addr,
641 &self.cloud_provider.get_ssh_user(),
642 )?;
643 }
644 Ok(())
645 }
646}