1use crate::{
8 ansible::{
9 inventory::AnsibleInventoryType,
10 provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11 },
12 error::{Error, Result},
13 get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14 DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23 pub ansible_verbose: bool,
24 pub ant_version: Option<String>,
25 pub current_inventory: DeploymentInventory,
26 pub desired_full_cone_private_node_count: Option<u16>,
27 pub desired_full_cone_private_node_vm_count: Option<u16>,
28 pub desired_node_count: Option<u16>,
29 pub desired_node_vm_count: Option<u16>,
30 pub desired_peer_cache_node_count: Option<u16>,
31 pub desired_peer_cache_node_vm_count: Option<u16>,
32 pub desired_symmetric_private_node_count: Option<u16>,
33 pub desired_symmetric_private_node_vm_count: Option<u16>,
34 pub desired_uploader_vm_count: Option<u16>,
35 pub desired_uploaders_count: Option<u16>,
36 pub funding_wallet_secret_key: Option<String>,
37 pub gas_amount: Option<U256>,
38 pub interval: Duration,
39 pub infra_only: bool,
40 pub max_archived_log_files: u16,
41 pub max_log_files: u16,
42 pub plan: bool,
43 pub public_rpc: bool,
44 pub provision_only: bool,
45 pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49 pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50 let is_bootstrap_deploy = matches!(
51 options
52 .current_inventory
53 .environment_details
54 .deployment_type,
55 DeploymentType::Bootstrap
56 );
57
58 if is_bootstrap_deploy
59 && (options.desired_peer_cache_node_count.is_some()
60 || options.desired_peer_cache_node_vm_count.is_some()
61 || options.desired_uploader_vm_count.is_some())
62 {
63 return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64 }
65
66 let desired_peer_cache_node_vm_count = options
67 .desired_peer_cache_node_vm_count
68 .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69 if desired_peer_cache_node_vm_count
70 < options.current_inventory.peer_cache_node_vms.len() as u16
71 {
72 return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73 }
74 debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76 let desired_node_vm_count = options
77 .desired_node_vm_count
78 .unwrap_or(options.current_inventory.node_vms.len() as u16);
79 if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80 return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81 }
82 debug!("Using {desired_node_vm_count} for desired node VM count");
83
84 let desired_full_cone_private_node_vm_count = options
85 .desired_full_cone_private_node_vm_count
86 .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87 if desired_full_cone_private_node_vm_count
88 < options.current_inventory.full_cone_private_node_vms.len() as u16
89 {
90 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91 }
92 debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94 let desired_symmetric_private_node_vm_count = options
95 .desired_symmetric_private_node_vm_count
96 .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97 if desired_symmetric_private_node_vm_count
98 < options.current_inventory.symmetric_private_node_vms.len() as u16
99 {
100 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101 }
102 debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104 let desired_uploader_vm_count = options
105 .desired_uploader_vm_count
106 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109 }
110 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112 let desired_peer_cache_node_count = options
113 .desired_peer_cache_node_count
114 .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115 if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116 {
117 return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118 }
119 debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121 let desired_node_count = options
122 .desired_node_count
123 .unwrap_or(options.current_inventory.node_count() as u16);
124 if desired_node_count < options.current_inventory.node_count() as u16 {
125 return Err(Error::InvalidUpscaleDesiredNodeCount);
126 }
127 debug!("Using {desired_node_count} for desired node count");
128
129 let desired_full_cone_private_node_count = options
130 .desired_full_cone_private_node_count
131 .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132 if desired_full_cone_private_node_count
133 < options.current_inventory.full_cone_private_node_count() as u16
134 {
135 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136 }
137 debug!(
138 "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139 );
140
141 let desired_symmetric_private_node_count = options
142 .desired_symmetric_private_node_count
143 .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144 if desired_symmetric_private_node_count
145 < options.current_inventory.symmetric_private_node_count() as u16
146 {
147 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148 }
149 debug!(
150 "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151 );
152
153 let mut infra_run_options = InfraRunOptions::generate_existing(
154 &options.current_inventory.name,
155 &self.terraform_runner,
156 &options.current_inventory.environment_details,
157 )
158 .await?;
159 infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160 infra_run_options.node_vm_count = Some(desired_node_vm_count);
161 infra_run_options.full_cone_private_node_vm_count =
162 Some(desired_full_cone_private_node_vm_count);
163 infra_run_options.symmetric_private_node_vm_count =
164 Some(desired_symmetric_private_node_vm_count);
165 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167 if options.plan {
168 self.plan(&infra_run_options)?;
169 return Ok(());
170 }
171
172 self.create_or_update_infra(&infra_run_options)
173 .map_err(|err| {
174 println!("Failed to create infra {err:?}");
175 err
176 })?;
177
178 if options.infra_only {
179 return Ok(());
180 }
181
182 let mut provision_options = ProvisionOptions {
183 binary_option: options.current_inventory.binary_option.clone(),
184 chunk_size: None,
185 downloaders_count: 0,
186 enable_telegraf: true,
187 env_variables: None,
188 evm_network: options
189 .current_inventory
190 .environment_details
191 .evm_network
192 .clone(),
193 evm_data_payments_address: options
194 .current_inventory
195 .environment_details
196 .evm_data_payments_address
197 .clone(),
198 evm_payment_token_address: options
199 .current_inventory
200 .environment_details
201 .evm_payment_token_address
202 .clone(),
203 evm_rpc_url: options
204 .current_inventory
205 .environment_details
206 .evm_rpc_url
207 .clone(),
208 full_cone_private_node_count: desired_full_cone_private_node_count,
209 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
210 interval: options.interval,
211 log_format: None,
212 logstash_details: None,
213 name: options.current_inventory.name.clone(),
214 network_id: options.current_inventory.environment_details.network_id,
215 node_count: desired_node_count,
216 max_archived_log_files: options.max_archived_log_files,
217 max_log_files: options.max_log_files,
218 output_inventory_dir_path: self
219 .working_directory_path
220 .join("ansible")
221 .join("inventory"),
222 peer_cache_node_count: desired_peer_cache_node_count,
223 public_rpc: options.public_rpc,
224 rewards_address: options
225 .current_inventory
226 .environment_details
227 .rewards_address
228 .clone(),
229 symmetric_private_node_count: desired_symmetric_private_node_count,
230 ant_version: options.ant_version.clone(),
231 uploaders_count: options.desired_uploaders_count,
232 gas_amount: options.gas_amount,
233 token_amount: None,
234 };
235 let mut node_provision_failed = false;
236
237 let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
238 get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
239 |err| {
240 println!("Failed to get node multiaddr {err:?}");
241 err
242 },
243 )?
244 } else {
245 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
246 .map_err(|err| {
247 println!("Failed to get genesis multiaddr {err:?}");
248 err
249 })?
250 };
251 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
252 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
253
254 if !is_bootstrap_deploy {
255 self.wait_for_ssh_availability_on_new_machines(
256 AnsibleInventoryType::PeerCacheNodes,
257 &options.current_inventory,
258 )?;
259 self.ansible_provisioner
260 .print_ansible_run_banner("Provision Peer Cache Nodes");
261 match self.ansible_provisioner.provision_nodes(
262 &provision_options,
263 Some(initial_multiaddr.clone()),
264 Some(initial_network_contacts_url.clone()),
265 NodeType::PeerCache,
266 ) {
267 Ok(()) => {
268 println!("Provisioned Peer Cache nodes");
269 }
270 Err(err) => {
271 log::error!("Failed to provision Peer Cache nodes: {err}");
272 node_provision_failed = true;
273 }
274 }
275 }
276
277 self.wait_for_ssh_availability_on_new_machines(
278 AnsibleInventoryType::Nodes,
279 &options.current_inventory,
280 )?;
281 self.ansible_provisioner
282 .print_ansible_run_banner("Provision Normal Nodes");
283 match self.ansible_provisioner.provision_nodes(
284 &provision_options,
285 Some(initial_multiaddr.clone()),
286 Some(initial_network_contacts_url.clone()),
287 NodeType::Generic,
288 ) {
289 Ok(()) => {
290 println!("Provisioned normal nodes");
291 }
292 Err(err) => {
293 log::error!("Failed to provision normal nodes: {err}");
294 node_provision_failed = true;
295 }
296 }
297
298 let private_node_inventory = PrivateNodeProvisionInventory::new(
299 &self.ansible_provisioner,
300 Some(desired_full_cone_private_node_vm_count),
301 Some(desired_symmetric_private_node_vm_count),
302 )?;
303
304 if private_node_inventory.should_provision_full_cone_private_nodes() {
305 let full_cone_nat_gateway_inventory = self
306 .ansible_provisioner
307 .ansible_runner
308 .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
309
310 let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
311 .into_iter()
312 .filter(|item| {
313 !options
314 .current_inventory
315 .full_cone_nat_gateway_vms
316 .contains(item)
317 })
318 .collect();
319
320 for vm in full_cone_nat_gateway_new_vms.iter() {
321 self.ssh_client.wait_for_ssh_availability(
322 &vm.public_ip_addr,
323 &self.cloud_provider.get_ssh_user(),
324 )?;
325 }
326
327 let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
328 None
329 } else {
330 debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
331 Some(full_cone_nat_gateway_new_vms)
332 };
333
334 match self.ansible_provisioner.provision_full_cone(
335 &provision_options,
336 Some(initial_multiaddr.clone()),
337 Some(initial_network_contacts_url.clone()),
338 private_node_inventory.clone(),
339 full_cone_nat_gateway_new_vms,
340 ) {
341 Ok(()) => {
342 println!("Provisioned Full Cone nodes and Gateway");
343 }
344 Err(err) => {
345 log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
346 node_provision_failed = true;
347 }
348 }
349 }
350
351 if private_node_inventory.should_provision_symmetric_private_nodes() {
352 self.wait_for_ssh_availability_on_new_machines(
353 AnsibleInventoryType::SymmetricNatGateway,
354 &options.current_inventory,
355 )?;
356 self.ansible_provisioner
357 .print_ansible_run_banner("Provision Symmetric NAT Gateway");
358 self.ansible_provisioner
359 .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
360 .map_err(|err| {
361 println!("Failed to provision symmetric NAT gateway {err:?}");
362 err
363 })?;
364
365 self.wait_for_ssh_availability_on_new_machines(
366 AnsibleInventoryType::SymmetricPrivateNodes,
367 &options.current_inventory,
368 )?;
369 self.ansible_provisioner
370 .print_ansible_run_banner("Provision Symmetric Private Nodes");
371 match self.ansible_provisioner.provision_symmetric_private_nodes(
372 &mut provision_options,
373 Some(initial_multiaddr.clone()),
374 Some(initial_network_contacts_url.clone()),
375 &private_node_inventory,
376 ) {
377 Ok(()) => {
378 println!("Provisioned symmetric private nodes");
379 }
380 Err(err) => {
381 log::error!("Failed to provision symmetric private nodes: {err}");
382 node_provision_failed = true;
383 }
384 }
385 }
386
387 let should_provision_uploaders = options.desired_uploaders_count.is_some()
388 || options.desired_uploader_vm_count.is_some();
389 if should_provision_uploaders {
390 self.wait_for_ssh_availability_on_new_machines(
391 AnsibleInventoryType::Uploaders,
392 &options.current_inventory,
393 )?;
394 let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
395 self.ansible_provisioner
396 .print_ansible_run_banner("Provision Uploaders");
397 self.ansible_provisioner
398 .provision_uploaders(
399 &provision_options,
400 Some(initial_multiaddr.clone()),
401 Some(genesis_network_contacts.clone()),
402 )
403 .await
404 .map_err(|err| {
405 println!("Failed to provision uploaders {err:?}");
406 err
407 })?;
408 }
409
410 if node_provision_failed {
411 println!();
412 println!("{}", "WARNING!".yellow());
413 println!("Some nodes failed to provision without error.");
414 println!("This usually means a small number of nodes failed to start on a few VMs.");
415 println!("However, most of the time the deployment will still be usable.");
416 println!("See the output from Ansible to determine which VMs had failures.");
417 }
418
419 Ok(())
420 }
421
422 pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
423 let is_bootstrap_deploy = matches!(
424 options
425 .current_inventory
426 .environment_details
427 .deployment_type,
428 DeploymentType::Bootstrap
429 );
430
431 if is_bootstrap_deploy {
432 return Err(Error::InvalidUploaderUpscaleDeploymentType(
433 "bootstrap".to_string(),
434 ));
435 }
436
437 let desired_uploader_vm_count = options
438 .desired_uploader_vm_count
439 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
440 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
441 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
442 }
443 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
444
445 let mut infra_run_options = InfraRunOptions::generate_existing(
446 &options.current_inventory.name,
447 &self.terraform_runner,
448 &options.current_inventory.environment_details,
449 )
450 .await?;
451 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
452
453 if options.plan {
454 self.plan(&infra_run_options)?;
455 return Ok(());
456 }
457
458 if !options.provision_only {
459 self.create_or_update_infra(&infra_run_options)
460 .map_err(|err| {
461 println!("Failed to create infra {err:?}");
462 err
463 })?;
464 }
465
466 if options.infra_only {
467 return Ok(());
468 }
469
470 let (initial_multiaddr, initial_ip_addr) =
471 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
472 .map_err(|err| {
473 println!("Failed to get genesis multiaddr {err:?}");
474 err
475 })?;
476 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
477 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
478
479 let provision_options = ProvisionOptions {
480 binary_option: options.current_inventory.binary_option.clone(),
481 chunk_size: None,
482 downloaders_count: 0,
483 enable_telegraf: true,
484 env_variables: None,
485 evm_data_payments_address: options
486 .current_inventory
487 .environment_details
488 .evm_data_payments_address
489 .clone(),
490 evm_network: options
491 .current_inventory
492 .environment_details
493 .evm_network
494 .clone(),
495 evm_payment_token_address: options
496 .current_inventory
497 .environment_details
498 .evm_payment_token_address
499 .clone(),
500 evm_rpc_url: options
501 .current_inventory
502 .environment_details
503 .evm_rpc_url
504 .clone(),
505 full_cone_private_node_count: 0,
506 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
507 interval: options.interval,
508 log_format: None,
509 logstash_details: None,
510 name: options.current_inventory.name.clone(),
511 network_id: options.current_inventory.environment_details.network_id,
512 node_count: 0,
513 max_archived_log_files: options.max_archived_log_files,
514 max_log_files: options.max_log_files,
515 output_inventory_dir_path: self
516 .working_directory_path
517 .join("ansible")
518 .join("inventory"),
519 peer_cache_node_count: 0,
520 public_rpc: options.public_rpc,
521 rewards_address: options
522 .current_inventory
523 .environment_details
524 .rewards_address
525 .clone(),
526 symmetric_private_node_count: 0,
527 ant_version: options.ant_version.clone(),
528 uploaders_count: options.desired_uploaders_count,
529 gas_amount: options.gas_amount,
530 token_amount: options.token_amount,
531 };
532
533 self.wait_for_ssh_availability_on_new_machines(
534 AnsibleInventoryType::Uploaders,
535 &options.current_inventory,
536 )?;
537 self.ansible_provisioner
538 .print_ansible_run_banner("Provision Uploaders");
539 self.ansible_provisioner
540 .provision_uploaders(
541 &provision_options,
542 Some(initial_multiaddr),
543 Some(initial_network_contacts_url),
544 )
545 .await
546 .map_err(|err| {
547 println!("Failed to provision uploaders {err:?}");
548 err
549 })?;
550
551 Ok(())
552 }
553
554 fn wait_for_ssh_availability_on_new_machines(
555 &self,
556 inventory_type: AnsibleInventoryType,
557 current_inventory: &DeploymentInventory,
558 ) -> Result<()> {
559 let inventory = self
560 .ansible_provisioner
561 .ansible_runner
562 .get_inventory(inventory_type, true)?;
563 let old_set: HashSet<_> = match inventory_type {
564 AnsibleInventoryType::PeerCacheNodes => current_inventory
565 .peer_cache_node_vms
566 .iter()
567 .map(|node_vm| &node_vm.vm)
568 .cloned()
569 .collect(),
570 AnsibleInventoryType::Nodes => current_inventory
571 .node_vms
572 .iter()
573 .map(|node_vm| &node_vm.vm)
574 .cloned()
575 .collect(),
576 AnsibleInventoryType::Uploaders => current_inventory
577 .uploader_vms
578 .iter()
579 .map(|uploader_vm| &uploader_vm.vm)
580 .cloned()
581 .collect(),
582 AnsibleInventoryType::FullConeNatGateway => current_inventory
583 .full_cone_nat_gateway_vms
584 .iter()
585 .cloned()
586 .collect(),
587 AnsibleInventoryType::SymmetricNatGateway => current_inventory
588 .symmetric_nat_gateway_vms
589 .iter()
590 .cloned()
591 .collect(),
592 AnsibleInventoryType::FullConePrivateNodes => current_inventory
593 .full_cone_private_node_vms
594 .iter()
595 .map(|node_vm| &node_vm.vm)
596 .cloned()
597 .collect(),
598 AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
599 .symmetric_private_node_vms
600 .iter()
601 .map(|node_vm| &node_vm.vm)
602 .cloned()
603 .collect(),
604 it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
605 };
606 let new_vms: Vec<_> = inventory
607 .into_iter()
608 .filter(|item| !old_set.contains(item))
609 .collect();
610 for vm in new_vms.iter() {
611 self.ssh_client.wait_for_ssh_availability(
612 &vm.public_ip_addr,
613 &self.cloud_provider.get_ssh_user(),
614 )?;
615 }
616 Ok(())
617 }
618}