1use crate::{
8 ansible::{
9 inventory::AnsibleInventoryType,
10 provisioning::{PrivateNodeProvisionInventory, ProvisionOptions},
11 },
12 error::{Error, Result},
13 get_bootstrap_cache_url, get_genesis_multiaddr, get_multiaddr, DeploymentInventory,
14 DeploymentType, InfraRunOptions, NodeType, TestnetDeployer,
15};
16use colored::Colorize;
17use evmlib::common::U256;
18use log::debug;
19use std::{collections::HashSet, time::Duration};
20
21#[derive(Clone)]
22pub struct UpscaleOptions {
23 pub ansible_verbose: bool,
24 pub ant_version: Option<String>,
25 pub current_inventory: DeploymentInventory,
26 pub desired_full_cone_private_node_count: Option<u16>,
27 pub desired_full_cone_private_node_vm_count: Option<u16>,
28 pub desired_node_count: Option<u16>,
29 pub desired_node_vm_count: Option<u16>,
30 pub desired_peer_cache_node_count: Option<u16>,
31 pub desired_peer_cache_node_vm_count: Option<u16>,
32 pub desired_symmetric_private_node_count: Option<u16>,
33 pub desired_symmetric_private_node_vm_count: Option<u16>,
34 pub desired_uploader_vm_count: Option<u16>,
35 pub desired_uploaders_count: Option<u16>,
36 pub funding_wallet_secret_key: Option<String>,
37 pub gas_amount: Option<U256>,
38 pub interval: Duration,
39 pub infra_only: bool,
40 pub max_archived_log_files: u16,
41 pub max_log_files: u16,
42 pub plan: bool,
43 pub public_rpc: bool,
44 pub provision_only: bool,
45 pub token_amount: Option<U256>,
46}
47
48impl TestnetDeployer {
49 pub async fn upscale(&self, options: &UpscaleOptions) -> Result<()> {
50 let is_bootstrap_deploy = matches!(
51 options
52 .current_inventory
53 .environment_details
54 .deployment_type,
55 DeploymentType::Bootstrap
56 );
57
58 if is_bootstrap_deploy
59 && (options.desired_peer_cache_node_count.is_some()
60 || options.desired_peer_cache_node_vm_count.is_some()
61 || options.desired_uploader_vm_count.is_some())
62 {
63 return Err(Error::InvalidUpscaleOptionsForBootstrapDeployment);
64 }
65
66 let desired_peer_cache_node_vm_count = options
67 .desired_peer_cache_node_vm_count
68 .unwrap_or(options.current_inventory.peer_cache_node_vms.len() as u16);
69 if desired_peer_cache_node_vm_count
70 < options.current_inventory.peer_cache_node_vms.len() as u16
71 {
72 return Err(Error::InvalidUpscaleDesiredPeerCacheVmCount);
73 }
74 debug!("Using {desired_peer_cache_node_vm_count} for desired Peer Cache node VM count");
75
76 let desired_node_vm_count = options
77 .desired_node_vm_count
78 .unwrap_or(options.current_inventory.node_vms.len() as u16);
79 if desired_node_vm_count < options.current_inventory.node_vms.len() as u16 {
80 return Err(Error::InvalidUpscaleDesiredNodeVmCount);
81 }
82 debug!("Using {desired_node_vm_count} for desired node VM count");
83
84 let desired_full_cone_private_node_vm_count = options
85 .desired_full_cone_private_node_vm_count
86 .unwrap_or(options.current_inventory.full_cone_private_node_vms.len() as u16);
87 if desired_full_cone_private_node_vm_count
88 < options.current_inventory.full_cone_private_node_vms.len() as u16
89 {
90 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeVmCount);
91 }
92 debug!("Using {desired_full_cone_private_node_vm_count} for desired full cone private node VM count");
93
94 let desired_symmetric_private_node_vm_count = options
95 .desired_symmetric_private_node_vm_count
96 .unwrap_or(options.current_inventory.symmetric_private_node_vms.len() as u16);
97 if desired_symmetric_private_node_vm_count
98 < options.current_inventory.symmetric_private_node_vms.len() as u16
99 {
100 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeVmCount);
101 }
102 debug!("Using {desired_symmetric_private_node_vm_count} for desired full cone private node VM count");
103
104 let desired_uploader_vm_count = options
105 .desired_uploader_vm_count
106 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
107 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
108 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
109 }
110 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
111
112 let desired_peer_cache_node_count = options
113 .desired_peer_cache_node_count
114 .unwrap_or(options.current_inventory.peer_cache_node_count() as u16);
115 if desired_peer_cache_node_count < options.current_inventory.peer_cache_node_count() as u16
116 {
117 return Err(Error::InvalidUpscaleDesiredPeerCacheNodeCount);
118 }
119 debug!("Using {desired_peer_cache_node_count} for desired peer cache node count");
120
121 let desired_node_count = options
122 .desired_node_count
123 .unwrap_or(options.current_inventory.node_count() as u16);
124 if desired_node_count < options.current_inventory.node_count() as u16 {
125 return Err(Error::InvalidUpscaleDesiredNodeCount);
126 }
127 debug!("Using {desired_node_count} for desired node count");
128
129 let desired_full_cone_private_node_count = options
130 .desired_full_cone_private_node_count
131 .unwrap_or(options.current_inventory.full_cone_private_node_count() as u16);
132 if desired_full_cone_private_node_count
133 < options.current_inventory.full_cone_private_node_count() as u16
134 {
135 return Err(Error::InvalidUpscaleDesiredFullConePrivateNodeCount);
136 }
137 debug!(
138 "Using {desired_full_cone_private_node_count} for desired full cone private node count"
139 );
140
141 let desired_symmetric_private_node_count = options
142 .desired_symmetric_private_node_count
143 .unwrap_or(options.current_inventory.symmetric_private_node_count() as u16);
144 if desired_symmetric_private_node_count
145 < options.current_inventory.symmetric_private_node_count() as u16
146 {
147 return Err(Error::InvalidUpscaleDesiredSymmetricPrivateNodeCount);
148 }
149 debug!(
150 "Using {desired_symmetric_private_node_count} for desired symmetric private node count"
151 );
152
153 let mut infra_run_options = InfraRunOptions::generate_existing(
154 &options.current_inventory.name,
155 &self.terraform_runner,
156 &options.current_inventory.environment_details,
157 )
158 .await?;
159 infra_run_options.peer_cache_node_vm_count = Some(desired_peer_cache_node_vm_count);
160 infra_run_options.node_vm_count = Some(desired_node_vm_count);
161 infra_run_options.full_cone_private_node_vm_count =
162 Some(desired_full_cone_private_node_vm_count);
163 infra_run_options.symmetric_private_node_vm_count =
164 Some(desired_symmetric_private_node_vm_count);
165 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
166
167 if options.plan {
168 self.plan(&infra_run_options)?;
169 return Ok(());
170 }
171
172 self.create_or_update_infra(&infra_run_options)
173 .map_err(|err| {
174 println!("Failed to create infra {err:?}");
175 err
176 })?;
177
178 if options.infra_only {
179 return Ok(());
180 }
181
182 let mut provision_options = ProvisionOptions {
183 binary_option: options.current_inventory.binary_option.clone(),
184 chunk_size: None,
185 downloaders_count: 0,
186 env_variables: None,
187 evm_network: options
188 .current_inventory
189 .environment_details
190 .evm_network
191 .clone(),
192 evm_data_payments_address: options
193 .current_inventory
194 .environment_details
195 .evm_data_payments_address
196 .clone(),
197 evm_payment_token_address: options
198 .current_inventory
199 .environment_details
200 .evm_payment_token_address
201 .clone(),
202 evm_rpc_url: options
203 .current_inventory
204 .environment_details
205 .evm_rpc_url
206 .clone(),
207 full_cone_private_node_count: desired_full_cone_private_node_count,
208 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
209 interval: options.interval,
210 log_format: None,
211 logstash_details: None,
212 name: options.current_inventory.name.clone(),
213 network_id: options.current_inventory.environment_details.network_id,
214 node_count: desired_node_count,
215 max_archived_log_files: options.max_archived_log_files,
216 max_log_files: options.max_log_files,
217 output_inventory_dir_path: self
218 .working_directory_path
219 .join("ansible")
220 .join("inventory"),
221 peer_cache_node_count: desired_peer_cache_node_count,
222 public_rpc: options.public_rpc,
223 rewards_address: options
224 .current_inventory
225 .environment_details
226 .rewards_address
227 .clone(),
228 symmetric_private_node_count: desired_symmetric_private_node_count,
229 ant_version: options.ant_version.clone(),
230 uploaders_count: options.desired_uploaders_count,
231 gas_amount: options.gas_amount,
232 token_amount: None,
233 };
234 let mut node_provision_failed = false;
235
236 let (initial_multiaddr, initial_ip_addr) = if is_bootstrap_deploy {
237 get_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client).map_err(
238 |err| {
239 println!("Failed to get node multiaddr {err:?}");
240 err
241 },
242 )?
243 } else {
244 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
245 .map_err(|err| {
246 println!("Failed to get genesis multiaddr {err:?}");
247 err
248 })?
249 };
250 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
251 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
252
253 if !is_bootstrap_deploy {
254 self.wait_for_ssh_availability_on_new_machines(
255 AnsibleInventoryType::PeerCacheNodes,
256 &options.current_inventory,
257 )?;
258 self.ansible_provisioner
259 .print_ansible_run_banner("Provision Peer Cache Nodes");
260 match self.ansible_provisioner.provision_nodes(
261 &provision_options,
262 Some(initial_multiaddr.clone()),
263 Some(initial_network_contacts_url.clone()),
264 NodeType::PeerCache,
265 ) {
266 Ok(()) => {
267 println!("Provisioned Peer Cache nodes");
268 }
269 Err(err) => {
270 log::error!("Failed to provision Peer Cache nodes: {err}");
271 node_provision_failed = true;
272 }
273 }
274 }
275
276 self.wait_for_ssh_availability_on_new_machines(
277 AnsibleInventoryType::Nodes,
278 &options.current_inventory,
279 )?;
280 self.ansible_provisioner
281 .print_ansible_run_banner("Provision Normal Nodes");
282 match self.ansible_provisioner.provision_nodes(
283 &provision_options,
284 Some(initial_multiaddr.clone()),
285 Some(initial_network_contacts_url.clone()),
286 NodeType::Generic,
287 ) {
288 Ok(()) => {
289 println!("Provisioned normal nodes");
290 }
291 Err(err) => {
292 log::error!("Failed to provision normal nodes: {err}");
293 node_provision_failed = true;
294 }
295 }
296
297 let private_node_inventory = PrivateNodeProvisionInventory::new(
298 &self.ansible_provisioner,
299 Some(desired_full_cone_private_node_vm_count),
300 Some(desired_symmetric_private_node_vm_count),
301 )?;
302
303 if private_node_inventory.should_provision_full_cone_private_nodes() {
304 let full_cone_nat_gateway_inventory = self
305 .ansible_provisioner
306 .ansible_runner
307 .get_inventory(AnsibleInventoryType::FullConeNatGateway, true)?;
308
309 let full_cone_nat_gateway_new_vms: Vec<_> = full_cone_nat_gateway_inventory
310 .into_iter()
311 .filter(|item| {
312 !options
313 .current_inventory
314 .full_cone_nat_gateway_vms
315 .contains(item)
316 })
317 .collect();
318
319 for vm in full_cone_nat_gateway_new_vms.iter() {
320 self.ssh_client.wait_for_ssh_availability(
321 &vm.public_ip_addr,
322 &self.cloud_provider.get_ssh_user(),
323 )?;
324 }
325
326 let full_cone_nat_gateway_new_vms = if full_cone_nat_gateway_new_vms.is_empty() {
327 None
328 } else {
329 debug!("Full Cone NAT Gateway new VMs: {full_cone_nat_gateway_new_vms:?}");
330 Some(full_cone_nat_gateway_new_vms)
331 };
332
333 match self.ansible_provisioner.provision_full_cone(
334 &provision_options,
335 Some(initial_multiaddr.clone()),
336 Some(initial_network_contacts_url.clone()),
337 private_node_inventory.clone(),
338 full_cone_nat_gateway_new_vms,
339 ) {
340 Ok(()) => {
341 println!("Provisioned Full Cone nodes and Gateway");
342 }
343 Err(err) => {
344 log::error!("Failed to provision Full Cone nodes and Gateway: {err}");
345 node_provision_failed = true;
346 }
347 }
348 }
349
350 if private_node_inventory.should_provision_symmetric_private_nodes() {
351 self.wait_for_ssh_availability_on_new_machines(
352 AnsibleInventoryType::SymmetricNatGateway,
353 &options.current_inventory,
354 )?;
355 self.ansible_provisioner
356 .print_ansible_run_banner("Provision Symmetric NAT Gateway");
357 self.ansible_provisioner
358 .provision_symmetric_nat_gateway(&provision_options, &private_node_inventory)
359 .map_err(|err| {
360 println!("Failed to provision symmetric NAT gateway {err:?}");
361 err
362 })?;
363
364 self.wait_for_ssh_availability_on_new_machines(
365 AnsibleInventoryType::SymmetricPrivateNodes,
366 &options.current_inventory,
367 )?;
368 self.ansible_provisioner
369 .print_ansible_run_banner("Provision Symmetric Private Nodes");
370 match self.ansible_provisioner.provision_symmetric_private_nodes(
371 &mut provision_options,
372 Some(initial_multiaddr.clone()),
373 Some(initial_network_contacts_url.clone()),
374 &private_node_inventory,
375 ) {
376 Ok(()) => {
377 println!("Provisioned symmetric private nodes");
378 }
379 Err(err) => {
380 log::error!("Failed to provision symmetric private nodes: {err}");
381 node_provision_failed = true;
382 }
383 }
384 }
385
386 let should_provision_uploaders = options.desired_uploaders_count.is_some()
387 || options.desired_uploader_vm_count.is_some();
388 if should_provision_uploaders {
389 self.wait_for_ssh_availability_on_new_machines(
390 AnsibleInventoryType::Uploaders,
391 &options.current_inventory,
392 )?;
393 let genesis_network_contacts = get_bootstrap_cache_url(&initial_ip_addr);
394 self.ansible_provisioner
395 .print_ansible_run_banner("Provision Uploaders");
396 self.ansible_provisioner
397 .provision_uploaders(
398 &provision_options,
399 Some(initial_multiaddr.clone()),
400 Some(genesis_network_contacts.clone()),
401 )
402 .await
403 .map_err(|err| {
404 println!("Failed to provision uploaders {err:?}");
405 err
406 })?;
407 }
408
409 if node_provision_failed {
410 println!();
411 println!("{}", "WARNING!".yellow());
412 println!("Some nodes failed to provision without error.");
413 println!("This usually means a small number of nodes failed to start on a few VMs.");
414 println!("However, most of the time the deployment will still be usable.");
415 println!("See the output from Ansible to determine which VMs had failures.");
416 }
417
418 Ok(())
419 }
420
421 pub async fn upscale_uploaders(&self, options: &UpscaleOptions) -> Result<()> {
422 let is_bootstrap_deploy = matches!(
423 options
424 .current_inventory
425 .environment_details
426 .deployment_type,
427 DeploymentType::Bootstrap
428 );
429
430 if is_bootstrap_deploy {
431 return Err(Error::InvalidUploaderUpscaleDeploymentType(
432 "bootstrap".to_string(),
433 ));
434 }
435
436 let desired_uploader_vm_count = options
437 .desired_uploader_vm_count
438 .unwrap_or(options.current_inventory.uploader_vms.len() as u16);
439 if desired_uploader_vm_count < options.current_inventory.uploader_vms.len() as u16 {
440 return Err(Error::InvalidUpscaleDesiredUploaderVmCount);
441 }
442 debug!("Using {desired_uploader_vm_count} for desired uploader VM count");
443
444 let mut infra_run_options = InfraRunOptions::generate_existing(
445 &options.current_inventory.name,
446 &self.terraform_runner,
447 &options.current_inventory.environment_details,
448 )
449 .await?;
450 infra_run_options.uploader_vm_count = Some(desired_uploader_vm_count);
451
452 if options.plan {
453 self.plan(&infra_run_options)?;
454 return Ok(());
455 }
456
457 if !options.provision_only {
458 self.create_or_update_infra(&infra_run_options)
459 .map_err(|err| {
460 println!("Failed to create infra {err:?}");
461 err
462 })?;
463 }
464
465 if options.infra_only {
466 return Ok(());
467 }
468
469 let (initial_multiaddr, initial_ip_addr) =
470 get_genesis_multiaddr(&self.ansible_provisioner.ansible_runner, &self.ssh_client)
471 .map_err(|err| {
472 println!("Failed to get genesis multiaddr {err:?}");
473 err
474 })?;
475 let initial_network_contacts_url = get_bootstrap_cache_url(&initial_ip_addr);
476 debug!("Retrieved initial peer {initial_multiaddr} and initial network contacts {initial_network_contacts_url}");
477
478 let provision_options = ProvisionOptions {
479 binary_option: options.current_inventory.binary_option.clone(),
480 chunk_size: None,
481 downloaders_count: 0,
482 env_variables: None,
483 evm_data_payments_address: options
484 .current_inventory
485 .environment_details
486 .evm_data_payments_address
487 .clone(),
488 evm_network: options
489 .current_inventory
490 .environment_details
491 .evm_network
492 .clone(),
493 evm_payment_token_address: options
494 .current_inventory
495 .environment_details
496 .evm_payment_token_address
497 .clone(),
498 evm_rpc_url: options
499 .current_inventory
500 .environment_details
501 .evm_rpc_url
502 .clone(),
503 full_cone_private_node_count: 0,
504 funding_wallet_secret_key: options.funding_wallet_secret_key.clone(),
505 interval: options.interval,
506 log_format: None,
507 logstash_details: None,
508 name: options.current_inventory.name.clone(),
509 network_id: options.current_inventory.environment_details.network_id,
510 node_count: 0,
511 max_archived_log_files: options.max_archived_log_files,
512 max_log_files: options.max_log_files,
513 output_inventory_dir_path: self
514 .working_directory_path
515 .join("ansible")
516 .join("inventory"),
517 peer_cache_node_count: 0,
518
519 public_rpc: options.public_rpc,
520 rewards_address: options
521 .current_inventory
522 .environment_details
523 .rewards_address
524 .clone(),
525 symmetric_private_node_count: 0,
526 ant_version: options.ant_version.clone(),
527 uploaders_count: options.desired_uploaders_count,
528 gas_amount: options.gas_amount,
529 token_amount: options.token_amount,
530 };
531
532 self.wait_for_ssh_availability_on_new_machines(
533 AnsibleInventoryType::Uploaders,
534 &options.current_inventory,
535 )?;
536 self.ansible_provisioner
537 .print_ansible_run_banner("Provision Uploaders");
538 self.ansible_provisioner
539 .provision_uploaders(
540 &provision_options,
541 Some(initial_multiaddr),
542 Some(initial_network_contacts_url),
543 )
544 .await
545 .map_err(|err| {
546 println!("Failed to provision uploaders {err:?}");
547 err
548 })?;
549
550 Ok(())
551 }
552
553 fn wait_for_ssh_availability_on_new_machines(
554 &self,
555 inventory_type: AnsibleInventoryType,
556 current_inventory: &DeploymentInventory,
557 ) -> Result<()> {
558 let inventory = self
559 .ansible_provisioner
560 .ansible_runner
561 .get_inventory(inventory_type, true)?;
562 let old_set: HashSet<_> = match inventory_type {
563 AnsibleInventoryType::PeerCacheNodes => current_inventory
564 .peer_cache_node_vms
565 .iter()
566 .map(|node_vm| &node_vm.vm)
567 .cloned()
568 .collect(),
569 AnsibleInventoryType::Nodes => current_inventory
570 .node_vms
571 .iter()
572 .map(|node_vm| &node_vm.vm)
573 .cloned()
574 .collect(),
575 AnsibleInventoryType::Uploaders => current_inventory
576 .uploader_vms
577 .iter()
578 .map(|uploader_vm| &uploader_vm.vm)
579 .cloned()
580 .collect(),
581 AnsibleInventoryType::FullConeNatGateway => current_inventory
582 .full_cone_nat_gateway_vms
583 .iter()
584 .cloned()
585 .collect(),
586 AnsibleInventoryType::SymmetricNatGateway => current_inventory
587 .symmetric_nat_gateway_vms
588 .iter()
589 .cloned()
590 .collect(),
591 AnsibleInventoryType::FullConePrivateNodes => current_inventory
592 .full_cone_private_node_vms
593 .iter()
594 .map(|node_vm| &node_vm.vm)
595 .cloned()
596 .collect(),
597 AnsibleInventoryType::SymmetricPrivateNodes => current_inventory
598 .symmetric_private_node_vms
599 .iter()
600 .map(|node_vm| &node_vm.vm)
601 .cloned()
602 .collect(),
603 it => return Err(Error::UpscaleInventoryTypeNotSupported(it.to_string())),
604 };
605 let new_vms: Vec<_> = inventory
606 .into_iter()
607 .filter(|item| !old_set.contains(item))
608 .collect();
609 for vm in new_vms.iter() {
610 self.ssh_client.wait_for_ssh_availability(
611 &vm.public_ip_addr,
612 &self.cloud_provider.get_ssh_user(),
613 )?;
614 }
615 Ok(())
616 }
617}