1use crate::{
8 ansible::{
9 inventory::AnsibleInventoryType,
10 provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11 },
12 error::{Error, Result},
13 get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14 DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23 pub ansible_verbose: bool,
24 pub ant_version: Option<String>,
25 pub current_inventory: DeploymentInventory,
26 pub desired_full_cone_private_node_count: Option<u16>,
27 pub desired_full_cone_private_node_vm_count: Option<u16>,
28 pub desired_node_count: Option<u16>,
29 pub desired_node_vm_count: Option<u16>,
30 pub desired_peer_cache_node_count: Option<u16>,
31 pub desired_peer_cache_node_vm_count: Option<u16>,
32 pub desired_symmetric_private_node_count: Option<u16>,
33 pub desired_symmetric_private_node_vm_count: Option<u16>,
34 pub desired_uploader_vm_count: Option<u16>,
35 pub desired_uploaders_count: Option<u16>,
36 pub funding_wallet_secret_key: Option<String>,
37 pub gas_amount: Option<U256>,
38 pub interval: Duration,
39 pub infra_only: bool,
40 pub max_archived_log_files: u16,
41 pub max_log_files: u16,
42 pub plan: bool,
43 pub public_rpc: bool,
44 pub provision_only: bool,
45 pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49 pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50 let is_bootstrap_deploy = matches!(
51 options
52 .current_inventory
53 .environment_details
54 .deployment_type,
55 DeploymentType::Bootstrap
56 );
57
58 if is_bootstrap_deploy
59 && (options.desired_peer_cache_node_count.is_some()
60 || options.desired_peer_cache_node_vm_count.is_some()
61 || options.desired_uploader_vm_count.is_some())
62 {
63 return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64 }
65
66 let desired_peer_cache_node_vm_count = options
67 .desired_peer_cache_node_vm_count
68 .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69 if desired_peer_cache_node_vm_count
70 < options.current_inventory.peer_cache_node_vms.len() as u16
71 {
72 return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73 }
74 debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76 let desired_node_vm_count = options
77 .desired_node_vm_count
78 .unwrap_or(options.current_inventory.node_vms.len() as u16);
79 if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80 return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81 }
82 debug!("Using {desired_node_vm_count} for desired node VM count");
83
84 let desired_full_cone_private_node_vm_count = options
85 .desired_full_cone_private_node_vm_count
86 .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87 if desired_full_cone_private_node_vm_count
88 < options.current_inventory.full_cone_private_node_vms.len() as u16
89 {
90 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91 }
92 debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94 let desired_symmetric_private_node_vm_count = options
95 .desired_symmetric_private_node_vm_count
96 .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97 if desired_symmetric_private_node_vm_count
98 < options.current_inventory.symmetric_private_node_vms.len() as u16
99 {
100 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101 }
102 debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104 let desired_uploader_vm_count = options
105 .desired_uploader_vm_count
106 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109 }
110 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112 let desired_peer_cache_node_count = options
113 .desired_peer_cache_node_count
114 .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115 if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116 {
117 return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118 }
119 debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121 let desired_node_count = options
122 .desired_node_count
123 .unwrap_or(options.current_inventory.node_count() as u16);
124 if desired_node_count < options.current_inventory.node_count() as u16 {
125 return Err(Error::InvalidUpscaleDesiredNodeCount);
126 }
127 debug!("Using {desired_node_count} for desired node count");
128
129 let desired_full_cone_private_node_count = options
130 .desired_full_cone_private_node_count
131 .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132 if desired_full_cone_private_node_count
133 < options.current_inventory.full_cone_private_node_count() as u16
134 {
135 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136 }
137 debug!(
138 "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139 );
140
141 let desired_symmetric_private_node_count = options
142 .desired_symmetric_private_node_count
143 .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144 if desired_symmetric_private_node_count
145 < options.current_inventory.symmetric_private_node_count() as u16
146 {
147 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148 }
149 debug!(
150 "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151 );
152
153 let mut infra_run_options = InfraRunOptions::generate_existing(
154 &options.current_inventory.name,
155 &self.terraform_runner,
156 Some(&options.current_inventory.environment_details),
157 )
158 .await?;
159 infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160 infra_run_options.node_vm_count = Some(desired_node_vm_count);
161 infra_run_options.full_cone_private_node_vm_count =
162 Some(desired_full_cone_private_node_vm_count);
163 infra_run_options.symmetric_private_node_vm_count =
164 Some(desired_symmetric_private_node_vm_count);
165 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167 if options.plan {
168 self.plan(&infra_run_options)?;
169 return Ok(());
170 }
171
172 self.create_or_update_infra(&infra_run_options)
173 .map_err(|err| {
174 println!("Failed to create infra {err:?}");
175 err
176 })?;
177
178 if options.infra_only {
179 return Ok(());
180 }
181
182 let mut provision_options = ProvisionOptions {
183 binary_option: options.current_inventory.binary_option.clone(),
184 chunk_size: None,
185 client_env_variables: None,
186 downloaders_count: 0,
187 enable_telegraf: true,
188 evm_network: options
189 .current_inventory
190 .environment_details
191 .evm_details
192 .network
193 .clone(),
194 evm_data_payments_address: options
195 .current_inventory
196 .environment_details
197 .evm_details
198 .data_payments_address
199 .clone(),
200 evm_payment_token_address: options
201 .current_inventory
202 .environment_details
203 .evm_details
204 .payment_token_address
205 .clone(),
206 evm_rpc_url: options
207 .current_inventory
208 .environment_details
209 .evm_details
210 .rpc_url
211 .clone(),
212 full_cone_private_node_count: desired_full_cone_private_node_count,
213 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
214 interval: Some(options.interval),
215 log_format: None,
216 name: options.current_inventory.name.clone(),
217 network_id: options.current_inventory.environment_details.network_id,
218 node_count: desired_node_count,
219 node_env_variables: None,
220 max_archived_log_files: options.max_archived_log_files,
221 max_log_files: options.max_log_files,
222 output_inventory_dir_path: self
223 .working_directory_path
224 .join("ansible")
225 .join("inventory"),
226 peer_cache_node_count: desired_peer_cache_node_count,
227 public_rpc: options.public_rpc,
228 rewards_address: options
229 .current_inventory
230 .environment_details
231 .rewards_address
232 .clone(),
233 symmetric_private_node_count: desired_symmetric_private_node_count,
234 ant_version: options.ant_version.clone(),
235 uploaders_count: options.desired_uploaders_count,
236 gas_amount: options.gas_amount,
237 token_amount: None,
238 wallet_secret_keys: None,
239 max_uploads: None,
240 };
241 let mut node_provision_failed = false;
242
243 let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
244 get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
245 |err| {
246 println!("Failed to get node multiaddr {err:?}");
247 err
248 },
249 )?
250 } else {
251 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
252 .map_err(|err| {
253 println!("Failed to get genesis multiaddr {err:?}");
254 err
255 })?
256 };
257 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
258 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
259
260 if !is_bootstrap_deploy {
261 self.wait_for_ssh_availability_on_new_machines(
262 AnsibleInventoryType::PeerCacheNodes,
263 &options.current_inventory,
264 )?;
265 self.ansible_provisioner
266 .print_ansible_run_banner("Provision Peer Cache Nodes");
267 match self.ansible_provisioner.provision_nodes(
268 &provision_options,
269 Some(initial_multiaddr.clone()),
270 Some(initial_network_contacts_url.clone()),
271 NodeType::PeerCache,
272 ) {
273 Ok(()) => {
274 println!("Provisioned Peer Cache nodes");
275 }
276 Err(err) => {
277 log::error!("Failed to provision Peer Cache nodes: {err}");
278 node_provision_failed = true;
279 }
280 }
281 }
282
283 self.wait_for_ssh_availability_on_new_machines(
284 AnsibleInventoryType::Nodes,
285 &options.current_inventory,
286 )?;
287 self.ansible_provisioner
288 .print_ansible_run_banner("Provision Normal Nodes");
289 match self.ansible_provisioner.provision_nodes(
290 &provision_options,
291 Some(initial_multiaddr.clone()),
292 Some(initial_network_contacts_url.clone()),
293 NodeType::Generic,
294 ) {
295 Ok(()) => {
296 println!("Provisioned normal nodes");
297 }
298 Err(err) => {
299 log::error!("Failed to provision normal nodes: {err}");
300 node_provision_failed = true;
301 }
302 }
303
304 let private_node_inventory = PrivateNodeProvisionInventory::new(
305 &self.ansible_provisioner,
306 Some(desired_full_cone_private_node_vm_count),
307 Some(desired_symmetric_private_node_vm_count),
308 )?;
309
310 if private_node_inventory.should_provision_full_cone_private_nodes() {
311 let full_cone_nat_gateway_inventory = self
312 .ansible_provisioner
313 .ansible_runner
314 .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
315
316 let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
317 .into_iter()
318 .filter(|item| {
319 !options
320 .current_inventory
321 .full_cone_nat_gateway_vms
322 .contains(item)
323 })
324 .collect();
325
326 for vm in full_cone_nat_gateway_new_vms.iter() {
327 self.ssh_client.wait_for_ssh_availability(
328 &vm.public_ip_addr,
329 &self.cloud_provider.get_ssh_user(),
330 )?;
331 }
332
333 let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
334 None
335 } else {
336 debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
337 Some(full_cone_nat_gateway_new_vms)
338 };
339
340 match self.ansible_provisioner.provision_full_cone(
341 &provision_options,
342 Some(initial_multiaddr.clone()),
343 Some(initial_network_contacts_url.clone()),
344 private_node_inventory.clone(),
345 full_cone_nat_gateway_new_vms,
346 ) {
347 Ok(()) => {
348 println!("Provisioned Full Cone nodes and Gateway");
349 }
350 Err(err) => {
351 log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
352 node_provision_failed = true;
353 }
354 }
355 }
356
357 if private_node_inventory.should_provision_symmetric_private_nodes() {
358 self.wait_for_ssh_availability_on_new_machines(
359 AnsibleInventoryType::SymmetricNatGateway,
360 &options.current_inventory,
361 )?;
362 self.ansible_provisioner
363 .print_ansible_run_banner("Provision Symmetric NAT Gateway");
364 self.ansible_provisioner
365 .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
366 .map_err(|err| {
367 println!("Failed to provision symmetric NAT gateway {err:?}");
368 err
369 })?;
370
371 self.wait_for_ssh_availability_on_new_machines(
372 AnsibleInventoryType::SymmetricPrivateNodes,
373 &options.current_inventory,
374 )?;
375 self.ansible_provisioner
376 .print_ansible_run_banner("Provision Symmetric Private Nodes");
377 match self.ansible_provisioner.provision_symmetric_private_nodes(
378 &mut provision_options,
379 Some(initial_multiaddr.clone()),
380 Some(initial_network_contacts_url.clone()),
381 &private_node_inventory,
382 ) {
383 Ok(()) => {
384 println!("Provisioned symmetric private nodes");
385 }
386 Err(err) => {
387 log::error!("Failed to provision symmetric private nodes: {err}");
388 node_provision_failed = true;
389 }
390 }
391 }
392
393 let should_provision_uploaders = options.desired_uploaders_count.is_some()
394 || options.desired_uploader_vm_count.is_some();
395 if should_provision_uploaders {
396 self.wait_for_ssh_availability_on_new_machines(
397 AnsibleInventoryType::Uploaders,
398 &options.current_inventory,
399 )?;
400 let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
401 self.ansible_provisioner
402 .print_ansible_run_banner("Provision Uploaders");
403 self.ansible_provisioner
404 .provision_uploaders(
405 &provision_options,
406 Some(initial_multiaddr.clone()),
407 Some(genesis_network_contacts.clone()),
408 )
409 .await
410 .map_err(|err| {
411 println!("Failed to provision uploaders {err:?}");
412 err
413 })?;
414 }
415
416 if node_provision_failed {
417 println!();
418 println!("{}", "WARNING!".yellow());
419 println!("Some nodes failed to provision without error.");
420 println!("This usually means a small number of nodes failed to start on a few VMs.");
421 println!("However, most of the time the deployment will still be usable.");
422 println!("See the output from Ansible to determine which VMs had failures.");
423 }
424
425 Ok(())
426 }
427
428 pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
429 let is_bootstrap_deploy = matches!(
430 options
431 .current_inventory
432 .environment_details
433 .deployment_type,
434 DeploymentType::Bootstrap
435 );
436
437 if is_bootstrap_deploy {
438 return Err(Error::InvalidUploaderUpscaleDeploymentType(
439 "bootstrap".to_string(),
440 ));
441 }
442
443 let desired_uploader_vm_count = options
444 .desired_uploader_vm_count
445 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
446 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
447 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
448 }
449 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
450
451 let mut infra_run_options = InfraRunOptions::generate_existing(
452 &options.current_inventory.name,
453 &self.terraform_runner,
454 Some(&options.current_inventory.environment_details),
455 )
456 .await?;
457 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
458
459 if options.plan {
460 self.plan(&infra_run_options)?;
461 return Ok(());
462 }
463
464 if !options.provision_only {
465 self.create_or_update_infra(&infra_run_options)
466 .map_err(|err| {
467 println!("Failed to create infra {err:?}");
468 err
469 })?;
470 }
471
472 if options.infra_only {
473 return Ok(());
474 }
475
476 let (initial_multiaddr, initial_ip_addr) =
477 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
478 .map_err(|err| {
479 println!("Failed to get genesis multiaddr {err:?}");
480 err
481 })?;
482 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
483 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
484
485 let provision_options = ProvisionOptions {
486 ant_version: options.ant_version.clone(),
487 binary_option: options.current_inventory.binary_option.clone(),
488 chunk_size: None,
489 client_env_variables: None,
490 downloaders_count: 0,
491 enable_telegraf: true,
492 evm_data_payments_address: options
493 .current_inventory
494 .environment_details
495 .evm_details
496 .data_payments_address
497 .clone(),
498 evm_network: options
499 .current_inventory
500 .environment_details
501 .evm_details
502 .network
503 .clone(),
504 evm_payment_token_address: options
505 .current_inventory
506 .environment_details
507 .evm_details
508 .payment_token_address
509 .clone(),
510 evm_rpc_url: options
511 .current_inventory
512 .environment_details
513 .evm_details
514 .rpc_url
515 .clone(),
516 full_cone_private_node_count: 0,
517 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
518 gas_amount: options.gas_amount,
519 interval: Some(options.interval),
520 log_format: None,
521 max_archived_log_files: options.max_archived_log_files,
522 max_log_files: options.max_log_files,
523 max_uploads: None,
524 name: options.current_inventory.name.clone(),
525 network_id: options.current_inventory.environment_details.network_id,
526 node_count: 0,
527 node_env_variables: None,
528 output_inventory_dir_path: self
529 .working_directory_path
530 .join("ansible")
531 .join("inventory"),
532 peer_cache_node_count: 0,
533 public_rpc: options.public_rpc,
534 rewards_address: options
535 .current_inventory
536 .environment_details
537 .rewards_address
538 .clone(),
539 symmetric_private_node_count: 0,
540 token_amount: options.token_amount,
541 uploaders_count: options.desired_uploaders_count,
542 wallet_secret_keys: None,
543 };
544
545 self.wait_for_ssh_availability_on_new_machines(
546 AnsibleInventoryType::Uploaders,
547 &options.current_inventory,
548 )?;
549 self.ansible_provisioner
550 .print_ansible_run_banner("Provision Uploaders");
551 self.ansible_provisioner
552 .provision_uploaders(
553 &provision_options,
554 Some(initial_multiaddr),
555 Some(initial_network_contacts_url),
556 )
557 .await
558 .map_err(|err| {
559 println!("Failed to provision uploaders {err:?}");
560 err
561 })?;
562
563 Ok(())
564 }
565
566 fn wait_for_ssh_availability_on_new_machines(
567 &self,
568 inventory_type: AnsibleInventoryType,
569 current_inventory: &DeploymentInventory,
570 ) -> Result<()> {
571 let inventory = self
572 .ansible_provisioner
573 .ansible_runner
574 .get_inventory(inventory_type, true)?;
575 let old_set: HashSet<_> = match inventory_type {
576 AnsibleInventoryType::PeerCacheNodes => current_inventory
577 .peer_cache_node_vms
578 .iter()
579 .map(|node_vm| &node_vm.vm)
580 .cloned()
581 .collect(),
582 AnsibleInventoryType::Nodes => current_inventory
583 .node_vms
584 .iter()
585 .map(|node_vm| &node_vm.vm)
586 .cloned()
587 .collect(),
588 AnsibleInventoryType::Uploaders => current_inventory
589 .uploader_vms
590 .iter()
591 .map(|uploader_vm| &uploader_vm.vm)
592 .cloned()
593 .collect(),
594 AnsibleInventoryType::FullConeNatGateway => current_inventory
595 .full_cone_nat_gateway_vms
596 .iter()
597 .cloned()
598 .collect(),
599 AnsibleInventoryType::SymmetricNatGateway => current_inventory
600 .symmetric_nat_gateway_vms
601 .iter()
602 .cloned()
603 .collect(),
604 AnsibleInventoryType::FullConePrivateNodes => current_inventory
605 .full_cone_private_node_vms
606 .iter()
607 .map(|node_vm| &node_vm.vm)
608 .cloned()
609 .collect(),
610 AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
611 .symmetric_private_node_vms
612 .iter()
613 .map(|node_vm| &node_vm.vm)
614 .cloned()
615 .collect(),
616 it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
617 };
618 let new_vms: Vec<_> = inventory
619 .into_iter()
620 .filter(|item| !old_set.contains(item))
621 .collect();
622 for vm in new_vms.iter() {
623 self.ssh_client.wait_for_ssh_availability(
624 &vm.public_ip_addr,
625 &self.cloud_provider.get_ssh_user(),
626 )?;
627 }
628 Ok(())
629 }
630}