1use crate::{
8 ansible::{
9 inventory::AnsibleInventoryType,
10 provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11 },
12 error::{Error, Result},
13 get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14 DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23 pub ansible_verbose: bool,
24 pub ant_version: Option<String>,
25 pub current_inventory: DeploymentInventory,
26 pub desired_full_cone_private_node_count: Option<u16>,
27 pub desired_full_cone_private_node_vm_count: Option<u16>,
28 pub desired_node_count: Option<u16>,
29 pub desired_node_vm_count: Option<u16>,
30 pub desired_peer_cache_node_count: Option<u16>,
31 pub desired_peer_cache_node_vm_count: Option<u16>,
32 pub desired_symmetric_private_node_count: Option<u16>,
33 pub desired_symmetric_private_node_vm_count: Option<u16>,
34 pub desired_uploader_vm_count: Option<u16>,
35 pub desired_uploaders_count: Option<u16>,
36 pub funding_wallet_secret_key: Option<String>,
37 pub gas_amount: Option<U256>,
38 pub interval: Duration,
39 pub infra_only: bool,
40 pub max_archived_log_files: u16,
41 pub max_log_files: u16,
42 pub plan: bool,
43 pub public_rpc: bool,
44 pub provision_only: bool,
45 pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49 pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50 let is_bootstrap_deploy = matches!(
51 options
52 .current_inventory
53 .environment_details
54 .deployment_type,
55 DeploymentType::Bootstrap
56 );
57
58 if is_bootstrap_deploy
59 && (options.desired_peer_cache_node_count.is_some()
60 || options.desired_peer_cache_node_vm_count.is_some()
61 || options.desired_uploader_vm_count.is_some())
62 {
63 return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64 }
65
66 let desired_peer_cache_node_vm_count = options
67 .desired_peer_cache_node_vm_count
68 .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69 if desired_peer_cache_node_vm_count
70 < options.current_inventory.peer_cache_node_vms.len() as u16
71 {
72 return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73 }
74 debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76 let desired_node_vm_count = options
77 .desired_node_vm_count
78 .unwrap_or(options.current_inventory.node_vms.len() as u16);
79 if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80 return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81 }
82 debug!("Using {desired_node_vm_count} for desired node VM count");
83
84 let desired_full_cone_private_node_vm_count = options
85 .desired_full_cone_private_node_vm_count
86 .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87 if desired_full_cone_private_node_vm_count
88 < options.current_inventory.full_cone_private_node_vms.len() as u16
89 {
90 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91 }
92 debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94 let desired_symmetric_private_node_vm_count = options
95 .desired_symmetric_private_node_vm_count
96 .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97 if desired_symmetric_private_node_vm_count
98 < options.current_inventory.symmetric_private_node_vms.len() as u16
99 {
100 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101 }
102 debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104 let desired_uploader_vm_count = options
105 .desired_uploader_vm_count
106 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109 }
110 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112 let desired_peer_cache_node_count = options
113 .desired_peer_cache_node_count
114 .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115 if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116 {
117 return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118 }
119 debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121 let desired_node_count = options
122 .desired_node_count
123 .unwrap_or(options.current_inventory.node_count() as u16);
124 if desired_node_count < options.current_inventory.node_count() as u16 {
125 return Err(Error::InvalidUpscaleDesiredNodeCount);
126 }
127 debug!("Using {desired_node_count} for desired node count");
128
129 let desired_full_cone_private_node_count = options
130 .desired_full_cone_private_node_count
131 .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132 if desired_full_cone_private_node_count
133 < options.current_inventory.full_cone_private_node_count() as u16
134 {
135 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136 }
137 debug!(
138 "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139 );
140
141 let desired_symmetric_private_node_count = options
142 .desired_symmetric_private_node_count
143 .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144 if desired_symmetric_private_node_count
145 < options.current_inventory.symmetric_private_node_count() as u16
146 {
147 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148 }
149 debug!(
150 "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151 );
152
153 let mut infra_run_options = InfraRunOptions::generate_existing(
154 &options.current_inventory.name,
155 &self.terraform_runner,
156 &options.current_inventory.environment_details,
157 )
158 .await?;
159 infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160 infra_run_options.node_vm_count = Some(desired_node_vm_count);
161 infra_run_options.full_cone_private_node_vm_count =
162 Some(desired_full_cone_private_node_vm_count);
163 infra_run_options.symmetric_private_node_vm_count =
164 Some(desired_symmetric_private_node_vm_count);
165 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167 if options.plan {
168 self.plan(&infra_run_options)?;
169 return Ok(());
170 }
171
172 self.create_or_update_infra(&infra_run_options)
173 .map_err(|err| {
174 println!("Failed to create infra {err:?}");
175 err
176 })?;
177
178 if options.infra_only {
179 return Ok(());
180 }
181
182 let mut provision_options = ProvisionOptions {
183 binary_option: options.current_inventory.binary_option.clone(),
184 chunk_size: None,
185 client_env_variables: None,
186 downloaders_count: 0,
187 enable_telegraf: true,
188 evm_network: options
189 .current_inventory
190 .environment_details
191 .evm_network
192 .clone(),
193 evm_data_payments_address: options
194 .current_inventory
195 .environment_details
196 .evm_data_payments_address
197 .clone(),
198 evm_payment_token_address: options
199 .current_inventory
200 .environment_details
201 .evm_payment_token_address
202 .clone(),
203 evm_rpc_url: options
204 .current_inventory
205 .environment_details
206 .evm_rpc_url
207 .clone(),
208 full_cone_private_node_count: desired_full_cone_private_node_count,
209 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
210 interval: options.interval,
211 log_format: None,
212 logstash_details: None,
213 name: options.current_inventory.name.clone(),
214 network_id: options.current_inventory.environment_details.network_id,
215 node_count: desired_node_count,
216 node_env_variables: None,
217 max_archived_log_files: options.max_archived_log_files,
218 max_log_files: options.max_log_files,
219 output_inventory_dir_path: self
220 .working_directory_path
221 .join("ansible")
222 .join("inventory"),
223 peer_cache_node_count: desired_peer_cache_node_count,
224 public_rpc: options.public_rpc,
225 rewards_address: options
226 .current_inventory
227 .environment_details
228 .rewards_address
229 .clone(),
230 symmetric_private_node_count: desired_symmetric_private_node_count,
231 ant_version: options.ant_version.clone(),
232 uploaders_count: options.desired_uploaders_count,
233 gas_amount: options.gas_amount,
234 token_amount: None,
235 };
236 let mut node_provision_failed = false;
237
238 let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
239 get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
240 |err| {
241 println!("Failed to get node multiaddr {err:?}");
242 err
243 },
244 )?
245 } else {
246 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
247 .map_err(|err| {
248 println!("Failed to get genesis multiaddr {err:?}");
249 err
250 })?
251 };
252 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
253 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
254
255 if !is_bootstrap_deploy {
256 self.wait_for_ssh_availability_on_new_machines(
257 AnsibleInventoryType::PeerCacheNodes,
258 &options.current_inventory,
259 )?;
260 self.ansible_provisioner
261 .print_ansible_run_banner("Provision Peer Cache Nodes");
262 match self.ansible_provisioner.provision_nodes(
263 &provision_options,
264 Some(initial_multiaddr.clone()),
265 Some(initial_network_contacts_url.clone()),
266 NodeType::PeerCache,
267 ) {
268 Ok(()) => {
269 println!("Provisioned Peer Cache nodes");
270 }
271 Err(err) => {
272 log::error!("Failed to provision Peer Cache nodes: {err}");
273 node_provision_failed = true;
274 }
275 }
276 }
277
278 self.wait_for_ssh_availability_on_new_machines(
279 AnsibleInventoryType::Nodes,
280 &options.current_inventory,
281 )?;
282 self.ansible_provisioner
283 .print_ansible_run_banner("Provision Normal Nodes");
284 match self.ansible_provisioner.provision_nodes(
285 &provision_options,
286 Some(initial_multiaddr.clone()),
287 Some(initial_network_contacts_url.clone()),
288 NodeType::Generic,
289 ) {
290 Ok(()) => {
291 println!("Provisioned normal nodes");
292 }
293 Err(err) => {
294 log::error!("Failed to provision normal nodes: {err}");
295 node_provision_failed = true;
296 }
297 }
298
299 let private_node_inventory = PrivateNodeProvisionInventory::new(
300 &self.ansible_provisioner,
301 Some(desired_full_cone_private_node_vm_count),
302 Some(desired_symmetric_private_node_vm_count),
303 )?;
304
305 if private_node_inventory.should_provision_full_cone_private_nodes() {
306 let full_cone_nat_gateway_inventory = self
307 .ansible_provisioner
308 .ansible_runner
309 .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
310
311 let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
312 .into_iter()
313 .filter(|item| {
314 !options
315 .current_inventory
316 .full_cone_nat_gateway_vms
317 .contains(item)
318 })
319 .collect();
320
321 for vm in full_cone_nat_gateway_new_vms.iter() {
322 self.ssh_client.wait_for_ssh_availability(
323 &vm.public_ip_addr,
324 &self.cloud_provider.get_ssh_user(),
325 )?;
326 }
327
328 let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
329 None
330 } else {
331 debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
332 Some(full_cone_nat_gateway_new_vms)
333 };
334
335 match self.ansible_provisioner.provision_full_cone(
336 &provision_options,
337 Some(initial_multiaddr.clone()),
338 Some(initial_network_contacts_url.clone()),
339 private_node_inventory.clone(),
340 full_cone_nat_gateway_new_vms,
341 ) {
342 Ok(()) => {
343 println!("Provisioned Full Cone nodes and Gateway");
344 }
345 Err(err) => {
346 log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
347 node_provision_failed = true;
348 }
349 }
350 }
351
352 if private_node_inventory.should_provision_symmetric_private_nodes() {
353 self.wait_for_ssh_availability_on_new_machines(
354 AnsibleInventoryType::SymmetricNatGateway,
355 &options.current_inventory,
356 )?;
357 self.ansible_provisioner
358 .print_ansible_run_banner("Provision Symmetric NAT Gateway");
359 self.ansible_provisioner
360 .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
361 .map_err(|err| {
362 println!("Failed to provision symmetric NAT gateway {err:?}");
363 err
364 })?;
365
366 self.wait_for_ssh_availability_on_new_machines(
367 AnsibleInventoryType::SymmetricPrivateNodes,
368 &options.current_inventory,
369 )?;
370 self.ansible_provisioner
371 .print_ansible_run_banner("Provision Symmetric Private Nodes");
372 match self.ansible_provisioner.provision_symmetric_private_nodes(
373 &mut provision_options,
374 Some(initial_multiaddr.clone()),
375 Some(initial_network_contacts_url.clone()),
376 &private_node_inventory,
377 ) {
378 Ok(()) => {
379 println!("Provisioned symmetric private nodes");
380 }
381 Err(err) => {
382 log::error!("Failed to provision symmetric private nodes: {err}");
383 node_provision_failed = true;
384 }
385 }
386 }
387
388 let should_provision_uploaders = options.desired_uploaders_count.is_some()
389 || options.desired_uploader_vm_count.is_some();
390 if should_provision_uploaders {
391 self.wait_for_ssh_availability_on_new_machines(
392 AnsibleInventoryType::Uploaders,
393 &options.current_inventory,
394 )?;
395 let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
396 self.ansible_provisioner
397 .print_ansible_run_banner("Provision Uploaders");
398 self.ansible_provisioner
399 .provision_uploaders(
400 &provision_options,
401 Some(initial_multiaddr.clone()),
402 Some(genesis_network_contacts.clone()),
403 )
404 .await
405 .map_err(|err| {
406 println!("Failed to provision uploaders {err:?}");
407 err
408 })?;
409 }
410
411 if node_provision_failed {
412 println!();
413 println!("{}", "WARNING!".yellow());
414 println!("Some nodes failed to provision without error.");
415 println!("This usually means a small number of nodes failed to start on a few VMs.");
416 println!("However, most of the time the deployment will still be usable.");
417 println!("See the output from Ansible to determine which VMs had failures.");
418 }
419
420 Ok(())
421 }
422
423 pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
424 let is_bootstrap_deploy = matches!(
425 options
426 .current_inventory
427 .environment_details
428 .deployment_type,
429 DeploymentType::Bootstrap
430 );
431
432 if is_bootstrap_deploy {
433 return Err(Error::InvalidUploaderUpscaleDeploymentType(
434 "bootstrap".to_string(),
435 ));
436 }
437
438 let desired_uploader_vm_count = options
439 .desired_uploader_vm_count
440 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
441 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
442 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
443 }
444 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
445
446 let mut infra_run_options = InfraRunOptions::generate_existing(
447 &options.current_inventory.name,
448 &self.terraform_runner,
449 &options.current_inventory.environment_details,
450 )
451 .await?;
452 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
453
454 if options.plan {
455 self.plan(&infra_run_options)?;
456 return Ok(());
457 }
458
459 if !options.provision_only {
460 self.create_or_update_infra(&infra_run_options)
461 .map_err(|err| {
462 println!("Failed to create infra {err:?}");
463 err
464 })?;
465 }
466
467 if options.infra_only {
468 return Ok(());
469 }
470
471 let (initial_multiaddr, initial_ip_addr) =
472 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
473 .map_err(|err| {
474 println!("Failed to get genesis multiaddr {err:?}");
475 err
476 })?;
477 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
478 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
479
480 let provision_options = ProvisionOptions {
481 binary_option: options.current_inventory.binary_option.clone(),
482 chunk_size: None,
483 client_env_variables: None,
484 downloaders_count: 0,
485 enable_telegraf: true,
486 evm_data_payments_address: options
487 .current_inventory
488 .environment_details
489 .evm_data_payments_address
490 .clone(),
491 evm_network: options
492 .current_inventory
493 .environment_details
494 .evm_network
495 .clone(),
496 evm_payment_token_address: options
497 .current_inventory
498 .environment_details
499 .evm_payment_token_address
500 .clone(),
501 evm_rpc_url: options
502 .current_inventory
503 .environment_details
504 .evm_rpc_url
505 .clone(),
506 full_cone_private_node_count: 0,
507 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
508 interval: options.interval,
509 log_format: None,
510 logstash_details: None,
511 name: options.current_inventory.name.clone(),
512 network_id: options.current_inventory.environment_details.network_id,
513 node_count: 0,
514 node_env_variables: None,
515 max_archived_log_files: options.max_archived_log_files,
516 max_log_files: options.max_log_files,
517 output_inventory_dir_path: self
518 .working_directory_path
519 .join("ansible")
520 .join("inventory"),
521 peer_cache_node_count: 0,
522 public_rpc: options.public_rpc,
523 rewards_address: options
524 .current_inventory
525 .environment_details
526 .rewards_address
527 .clone(),
528 symmetric_private_node_count: 0,
529 ant_version: options.ant_version.clone(),
530 uploaders_count: options.desired_uploaders_count,
531 gas_amount: options.gas_amount,
532 token_amount: options.token_amount,
533 };
534
535 self.wait_for_ssh_availability_on_new_machines(
536 AnsibleInventoryType::Uploaders,
537 &options.current_inventory,
538 )?;
539 self.ansible_provisioner
540 .print_ansible_run_banner("Provision Uploaders");
541 self.ansible_provisioner
542 .provision_uploaders(
543 &provision_options,
544 Some(initial_multiaddr),
545 Some(initial_network_contacts_url),
546 )
547 .await
548 .map_err(|err| {
549 println!("Failed to provision uploaders {err:?}");
550 err
551 })?;
552
553 Ok(())
554 }
555
556 fn wait_for_ssh_availability_on_new_machines(
557 &self,
558 inventory_type: AnsibleInventoryType,
559 current_inventory: &DeploymentInventory,
560 ) -> Result<()> {
561 let inventory = self
562 .ansible_provisioner
563 .ansible_runner
564 .get_inventory(inventory_type, true)?;
565 let old_set: HashSet<_> = match inventory_type {
566 AnsibleInventoryType::PeerCacheNodes => current_inventory
567 .peer_cache_node_vms
568 .iter()
569 .map(|node_vm| &node_vm.vm)
570 .cloned()
571 .collect(),
572 AnsibleInventoryType::Nodes => current_inventory
573 .node_vms
574 .iter()
575 .map(|node_vm| &node_vm.vm)
576 .cloned()
577 .collect(),
578 AnsibleInventoryType::Uploaders => current_inventory
579 .uploader_vms
580 .iter()
581 .map(|uploader_vm| &uploader_vm.vm)
582 .cloned()
583 .collect(),
584 AnsibleInventoryType::FullConeNatGateway => current_inventory
585 .full_cone_nat_gateway_vms
586 .iter()
587 .cloned()
588 .collect(),
589 AnsibleInventoryType::SymmetricNatGateway => current_inventory
590 .symmetric_nat_gateway_vms
591 .iter()
592 .cloned()
593 .collect(),
594 AnsibleInventoryType::FullConePrivateNodes => current_inventory
595 .full_cone_private_node_vms
596 .iter()
597 .map(|node_vm| &node_vm.vm)
598 .cloned()
599 .collect(),
600 AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
601 .symmetric_private_node_vms
602 .iter()
603 .map(|node_vm| &node_vm.vm)
604 .cloned()
605 .collect(),
606 it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
607 };
608 let new_vms: Vec<_> = inventory
609 .into_iter()
610 .filter(|item| !old_set.contains(item))
611 .collect();
612 for vm in new_vms.iter() {
613 self.ssh_client.wait_for_ssh_availability(
614 &vm.public_ip_addr,
615 &self.cloud_provider.get_ssh_user(),
616 )?;
617 }
618 Ok(())
619 }
620}