1pub mod ansible;
8pub mod bootstrap;
9pub mod deploy;
10pub mod digital_ocean;
11pub mod error;
12pub mod funding;
13pub mod infra;
14pub mod inventory;
15pub mod logs;
16pub mod logstash;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
/// Amount of storage required for each node.
///
/// NOTE(review): the unit is not visible in this file — presumably gigabytes; confirm against
/// the code that consumes this constant.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use alloy::primitives::Address;
43use ant_service_management::ServiceStatus;
44use evmlib::Network;
45use flate2::read::GzDecoder;
46use indicatif::{ProgressBar, ProgressStyle};
47use infra::{build_terraform_args, InfraRunOptions};
48use log::{debug, trace};
49use semver::Version;
50use serde::{Deserialize, Serialize};
51use serde_json::json;
52use std::{
53 fs::File,
54 io::{BufRead, BufReader, BufWriter, Write},
55 net::IpAddr,
56 path::{Path, PathBuf},
57 process::{Command, Stdio},
58 str::FromStr,
59 time::Duration,
60};
61use tar::Archive;
62
/// Fork count handed to the Ansible runner by `TestnetDeployBuilder::build` when the builder
/// was not given an explicit value.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
64
/// The kind of deployment an environment represents.
///
/// The `Display` and `FromStr` implementations in this file use the lowercase names
/// "bootstrap" and "new".
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    /// Rendered as "bootstrap".
    /// NOTE(review): presumably a deployment that joins an already-existing network — confirm.
    Bootstrap,
    /// Rendered as "new". This is the default variant.
    #[default]
    New,
}
73
/// Details read from the EVM node's testnet data file.
///
/// `get_anvil_node_data` fills this from a single comma-separated line whose fields are, in
/// order: RPC URL, payment token address, data payments address, deployer wallet private key.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    /// Third field of the CSV line.
    pub data_payments_address: String,
    /// Fourth field of the CSV line. This is secret material; avoid logging it.
    pub deployer_wallet_private_key: String,
    /// Second field of the CSV line.
    pub payment_token_address: String,
    /// First field of the CSV line.
    pub rpc_url: String,
}
81
82impl std::fmt::Display for DeploymentType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 DeploymentType::Bootstrap => write!(f, "bootstrap"),
86 DeploymentType::New => write!(f, "new"),
87 }
88 }
89}
90
91impl std::str::FromStr for DeploymentType {
92 type Err = String;
93
94 fn from_str(s: &str) -> Result<Self, Self::Err> {
95 match s.to_lowercase().as_str() {
96 "bootstrap" => Ok(DeploymentType::Bootstrap),
97 "new" => Ok(DeploymentType::New),
98 _ => Err(format!("Invalid deployment type: {}", s)),
99 }
100 }
101}
102
/// The category a node belongs to.
///
/// Each variant has a textual name (`Display`/`FromStr`), a Telegraf role string
/// (`telegraf_role`) and a matching Ansible inventory type (`to_ansible_inventory_type`).
#[derive(Debug, Clone)]
pub enum NodeType {
    /// Named "full-cone-private"; Telegraf role `NAT_FULL_CONE_NODE`.
    FullConePrivateNode,
    /// Named "generic"; Telegraf role `GENERIC_NODE`.
    Generic,
    /// Named "genesis"; Telegraf role `GENESIS_NODE`.
    Genesis,
    /// Named "peer-cache"; Telegraf role `PEER_CACHE_NODE`.
    PeerCache,
    /// Named "symmetric-private"; Telegraf role `NAT_RANDOMIZED_NODE`.
    SymmetricPrivateNode,
}
111
112impl std::fmt::Display for NodeType {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 match self {
115 NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
116 NodeType::Generic => write!(f, "generic"),
117 NodeType::Genesis => write!(f, "genesis"),
118 NodeType::PeerCache => write!(f, "peer-cache"),
119 NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
120 }
121 }
122}
123
124impl std::str::FromStr for NodeType {
125 type Err = String;
126
127 fn from_str(s: &str) -> Result<Self, Self::Err> {
128 match s.to_lowercase().as_str() {
129 "full-cone-private" => Ok(NodeType::FullConePrivateNode),
130 "generic" => Ok(NodeType::Generic),
131 "genesis" => Ok(NodeType::Genesis),
132 "peer-cache" => Ok(NodeType::PeerCache),
133 "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
134 _ => Err(format!("Invalid node type: {}", s)),
135 }
136 }
137}
138
139impl NodeType {
140 pub fn telegraf_role(&self) -> &'static str {
141 match self {
142 NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
143 NodeType::Generic => "GENERIC_NODE",
144 NodeType::Genesis => "GENESIS_NODE",
145 NodeType::PeerCache => "PEER_CACHE_NODE",
146 NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
147 }
148 }
149
150 pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
151 match self {
152 NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
153 NodeType::Generic => AnsibleInventoryType::Nodes,
154 NodeType::Genesis => AnsibleInventoryType::Genesis,
155 NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
156 NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
157 }
158 }
159}
160
/// The EVM network an environment uses.
///
/// `FromStr` accepts "anvil", "arbitrum-one", "arbitrum-sepolia" and "custom". `Display`
/// renders both `Anvil` and `Custom` as "evm-custom".
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    /// The default variant. `TestnetDeployer::clean` does not drain funds for this network.
    #[default]
    Anvil,
    /// Rendered as "evm-arbitrum-one".
    ArbitrumOne,
    /// Rendered as "evm-arbitrum-sepolia".
    ArbitrumSepolia,
    /// A network described by the RPC URL and contract addresses stored in
    /// `EnvironmentDetails`.
    Custom,
}
169
170impl std::fmt::Display for EvmNetwork {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 match self {
173 EvmNetwork::Anvil => write!(f, "evm-custom"),
174 EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
175 EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
176 EvmNetwork::Custom => write!(f, "evm-custom"),
177 }
178 }
179}
180
181impl std::str::FromStr for EvmNetwork {
182 type Err = String;
183
184 fn from_str(s: &str) -> Result<Self, Self::Err> {
185 match s.to_lowercase().as_str() {
186 "anvil" => Ok(EvmNetwork::Anvil),
187 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
188 "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
189 "custom" => Ok(EvmNetwork::Custom),
190 _ => Err(format!("Invalid EVM network type: {}", s)),
191 }
192 }
193}
194
/// Serializable description of a deployed environment.
///
/// `TestnetDeployer::clean` loads these details to decide whether, and on which network,
/// funds should be drained before the environment is torn down.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    /// Kind of deployment this environment represents.
    pub deployment_type: DeploymentType,
    /// Development, staging or production.
    pub environment_type: EnvironmentType,
    /// The EVM network used by the environment.
    pub evm_network: EvmNetwork,
    /// Data payments contract address; read by `clean` when `evm_network` is `Custom`.
    pub evm_data_payments_address: Option<String>,
    /// Payment token contract address; read by `clean` when `evm_network` is `Custom`.
    pub evm_payment_token_address: Option<String>,
    /// RPC URL; read by `clean` when `evm_network` is `Custom`.
    pub evm_rpc_url: Option<String>,
    /// Address of the funding wallet. `clean` only drains funds when this is set.
    pub funding_wallet_address: Option<String>,
    /// Optional network identifier.
    pub network_id: Option<u8>,
    /// Rewards address for the environment.
    /// NOTE(review): presumably the address node rewards are paid to — confirm.
    pub rewards_address: String,
}
207
/// The tier of an environment, which selects the tfvars file and the default node counts.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    /// The default variant; uses `dev.tfvars`.
    #[default]
    Development,
    /// Uses a tfvars file named after the environment (`{name}.tfvars`).
    Production,
    /// Uses `staging.tfvars`.
    Staging,
}
215
216impl EnvironmentType {
217 pub fn get_tfvars_filename(&self, name: &str) -> String {
218 match self {
219 EnvironmentType::Development => "dev.tfvars".to_string(),
220 EnvironmentType::Staging => "staging.tfvars".to_string(),
221 EnvironmentType::Production => {
222 format!("{name}.tfvars",)
223 }
224 }
225 }
226
227 pub fn get_default_peer_cache_node_count(&self) -> u16 {
228 match self {
229 EnvironmentType::Development => 5,
230 EnvironmentType::Production => 5,
231 EnvironmentType::Staging => 5,
232 }
233 }
234
235 pub fn get_default_node_count(&self) -> u16 {
236 match self {
237 EnvironmentType::Development => 25,
238 EnvironmentType::Production => 25,
239 EnvironmentType::Staging => 25,
240 }
241 }
242
243 pub fn get_default_symmetric_private_node_count(&self) -> u16 {
244 self.get_default_node_count()
245 }
246
247 pub fn get_default_full_cone_private_node_count(&self) -> u16 {
248 self.get_default_node_count()
249 }
250}
251
252impl std::fmt::Display for EnvironmentType {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 match self {
255 EnvironmentType::Development => write!(f, "development"),
256 EnvironmentType::Production => write!(f, "production"),
257 EnvironmentType::Staging => write!(f, "staging"),
258 }
259 }
260}
261
262impl FromStr for EnvironmentType {
263 type Err = Error;
264
265 fn from_str(s: &str) -> Result<Self, Self::Err> {
266 match s.to_lowercase().as_str() {
267 "development" => Ok(EnvironmentType::Development),
268 "production" => Ok(EnvironmentType::Production),
269 "staging" => Ok(EnvironmentType::Staging),
270 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
271 }
272 }
273}
274
275#[derive(Clone, Debug, Serialize, Deserialize)]
289pub enum BinaryOption {
290 BuildFromSource {
292 antnode_features: Option<String>,
294 branch: String,
295 repo_owner: String,
296 },
297 Versioned {
299 ant_version: Option<Version>,
300 antctl_version: Version,
301 antnode_version: Version,
302 },
303}
304
305impl BinaryOption {
306 pub fn print(&self) {
307 match self {
308 BinaryOption::BuildFromSource {
309 antnode_features,
310 branch,
311 repo_owner,
312 } => {
313 println!("Source configuration:");
314 println!(" Repository owner: {}", repo_owner);
315 println!(" Branch: {}", branch);
316 if let Some(features) = antnode_features {
317 println!(" Antnode features: {}", features);
318 }
319 }
320 BinaryOption::Versioned {
321 ant_version,
322 antctl_version,
323 antnode_version,
324 } => {
325 println!("Versioned binaries configuration:");
326 if let Some(version) = ant_version {
327 println!(" ant version: {}", version);
328 }
329 println!(" antctl version: {}", antctl_version);
330 println!(" antnode version: {}", antnode_version);
331 }
332 }
333 }
334}
335
/// The cloud provider an environment is deployed to.
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    /// Rendered as "aws"; SSH user is "ubuntu". `TestnetDeployBuilder::build` currently
    /// rejects this provider as unsupported.
    Aws,
    /// Rendered as "digital-ocean"; SSH user is "root". Used by
    /// `TestnetDeployBuilder::build` when no provider is set.
    DigitalOcean,
}
341
342impl std::fmt::Display for CloudProvider {
343 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
344 match self {
345 CloudProvider::Aws => write!(f, "aws"),
346 CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
347 }
348 }
349}
350
351impl CloudProvider {
352 pub fn get_ssh_user(&self) -> String {
353 match self {
354 CloudProvider::Aws => "ubuntu".to_string(),
355 CloudProvider::DigitalOcean => "root".to_string(),
356 }
357 }
358}
359
/// The log output format, written as "default" or "json".
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    /// The "default" format.
    Default,
    /// The "json" format.
    Json,
}
365
366impl LogFormat {
367 pub fn parse_from_str(val: &str) -> Result<Self> {
368 match val {
369 "default" => Ok(LogFormat::Default),
370 "json" => Ok(LogFormat::Json),
371 _ => Err(Error::LoggingConfiguration(
372 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
373 )),
374 }
375 }
376
377 pub fn as_str(&self) -> &'static str {
378 match self {
379 LogFormat::Default => "default",
380 LogFormat::Json => "json",
381 }
382 }
383}
384
/// Options for upgrading the nodes of an environment.
///
/// `get_ansible_vars` turns a subset of these fields into the extra-vars document passed to
/// Ansible.
#[derive(Clone)]
pub struct UpgradeOptions {
    /// Whether Ansible should run verbosely.
    pub ansible_verbose: bool,
    /// Optional explicit list of machines to operate on.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    /// Optional environment variables, passed to Ansible as `env_variables`.
    pub env_variables: Option<Vec<(String, String)>>,
    /// When true, a `force` variable is passed to Ansible.
    pub force: bool,
    /// Number of Ansible forks.
    pub forks: usize,
    /// Passed to Ansible as `interval`, expressed in milliseconds.
    pub interval: Duration,
    /// Name of the environment.
    pub name: String,
    /// Optional node type to restrict the upgrade to.
    pub node_type: Option<NodeType>,
    /// Passed to Ansible as `pre_upgrade_delay` when set.
    /// NOTE(review): the unit is not visible here — presumably seconds; confirm.
    pub pre_upgrade_delay: Option<u64>,
    /// The cloud provider hosting the environment.
    pub provider: CloudProvider,
    /// Passed to Ansible as `antnode_version` when set.
    pub version: Option<String>,
}
399
400impl UpgradeOptions {
401 pub fn get_ansible_vars(&self) -> String {
402 let mut extra_vars = ExtraVarsDocBuilder::default();
403 extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
404 if let Some(env_variables) = &self.env_variables {
405 extra_vars.add_env_variable_list("env_variables", env_variables.clone());
406 }
407 if self.force {
408 extra_vars.add_variable("force", &self.force.to_string());
409 }
410 if let Some(version) = &self.version {
411 extra_vars.add_variable("antnode_version", version);
412 }
413 if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
414 extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
415 }
416 extra_vars.build()
417 }
418}
419
/// Builder for [`TestnetDeployer`].
///
/// Fields left unset are resolved by `build`, from environment variables or fixed defaults.
#[derive(Default)]
pub struct TestnetDeployBuilder {
    /// Ansible fork count; `build` falls back to `ANSIBLE_DEFAULT_FORKS`.
    ansible_forks: Option<usize>,
    /// Whether Ansible should run verbosely.
    ansible_verbose_mode: bool,
    /// The environment type; despite the field name this is an `EnvironmentType`.
    deployment_type: EnvironmentType,
    /// Name of the environment; must be non-empty for `build` to succeed.
    environment_name: String,
    /// Cloud provider; `build` falls back to `CloudProvider::DigitalOcean`.
    provider: Option<CloudProvider>,
    /// SSH secret key path; `build` falls back to the `SSH_KEY_PATH` environment variable.
    ssh_secret_key_path: Option<PathBuf>,
    /// Terraform state bucket; `build` falls back to `TERRAFORM_STATE_BUCKET_NAME`.
    state_bucket_name: Option<String>,
    /// Terraform binary; `build` falls back to `terraform`.
    terraform_binary_path: Option<PathBuf>,
    /// Ansible vault password file; `build` falls back to `ANSIBLE_VAULT_PASSWORD_PATH`.
    vault_password_path: Option<PathBuf>,
    /// Working directory; `build` falls back to `<current dir>/resources`.
    working_directory_path: Option<PathBuf>,
}
433
434impl TestnetDeployBuilder {
435 pub fn new() -> Self {
436 Default::default()
437 }
438
439 pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
440 self.ansible_verbose_mode = ansible_verbose_mode;
441 self
442 }
443
444 pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
445 self.ansible_forks = Some(ansible_forks);
446 self
447 }
448
449 pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
450 self.deployment_type = deployment_type;
451 self
452 }
453
454 pub fn environment_name(&mut self, name: &str) -> &mut Self {
455 self.environment_name = name.to_string();
456 self
457 }
458
459 pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
460 self.provider = Some(provider);
461 self
462 }
463
464 pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
465 self.state_bucket_name = Some(state_bucket_name);
466 self
467 }
468
469 pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
470 self.terraform_binary_path = Some(terraform_binary_path);
471 self
472 }
473
474 pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
475 self.working_directory_path = Some(working_directory_path);
476 self
477 }
478
479 pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
480 self.ssh_secret_key_path = Some(ssh_secret_key_path);
481 self
482 }
483
484 pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
485 self.vault_password_path = Some(vault_password_path);
486 self
487 }
488
489 pub fn build(&self) -> Result<TestnetDeployer> {
490 let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
491 match provider {
492 CloudProvider::DigitalOcean => {
493 let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
494 Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
495 })?;
496 std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
500 std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
501 }
502 _ => {
503 return Err(Error::CloudProviderNotSupported(provider.to_string()));
504 }
505 }
506
507 let state_bucket_name = match self.state_bucket_name {
508 Some(ref bucket_name) => bucket_name.clone(),
509 None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
510 };
511
512 let default_terraform_bin_path = PathBuf::from("terraform");
513 let terraform_binary_path = self
514 .terraform_binary_path
515 .as_ref()
516 .unwrap_or(&default_terraform_bin_path);
517
518 let working_directory_path = match self.working_directory_path {
519 Some(ref work_dir_path) => work_dir_path.clone(),
520 None => std::env::current_dir()?.join("resources"),
521 };
522
523 let ssh_secret_key_path = match self.ssh_secret_key_path {
524 Some(ref ssh_sk_path) => ssh_sk_path.clone(),
525 None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
526 };
527
528 let vault_password_path = match self.vault_password_path {
529 Some(ref vault_pw_path) => vault_pw_path.clone(),
530 None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
531 };
532
533 let terraform_runner = TerraformRunner::new(
534 terraform_binary_path.to_path_buf(),
535 working_directory_path
536 .join("terraform")
537 .join("testnet")
538 .join(provider.to_string()),
539 provider,
540 &state_bucket_name,
541 )?;
542 let ansible_runner = AnsibleRunner::new(
543 self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
544 self.ansible_verbose_mode,
545 &self.environment_name,
546 provider,
547 ssh_secret_key_path.clone(),
548 vault_password_path,
549 working_directory_path.join("ansible"),
550 )?;
551 let ssh_client = SshClient::new(ssh_secret_key_path);
552 let ansible_provisioner =
553 AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
554 let rpc_client = RpcClient::new(
555 PathBuf::from("/usr/local/bin/safenode_rpc_client"),
556 working_directory_path.clone(),
557 );
558
559 let safe_path = working_directory_path.join("safe");
562 if safe_path.exists() {
563 std::fs::remove_file(safe_path)?;
564 }
565
566 let testnet = TestnetDeployer::new(
567 ansible_provisioner,
568 provider,
569 self.deployment_type.clone(),
570 &self.environment_name,
571 rpc_client,
572 S3Repository {},
573 ssh_client,
574 terraform_runner,
575 working_directory_path,
576 )?;
577
578 Ok(testnet)
579 }
580}
581
/// Entry point for operating on a testnet environment: initialising workspaces, planning
/// infrastructure, starting, stopping and upgrading nodes, and cleaning up.
#[derive(Clone)]
pub struct TestnetDeployer {
    /// Runs the Ansible-based provisioning operations.
    pub ansible_provisioner: AnsibleProvisioner,
    /// The cloud provider hosting the environment.
    pub cloud_provider: CloudProvider,
    /// The environment type; despite the field name this is an `EnvironmentType`.
    pub deployment_type: EnvironmentType,
    /// Name of the environment; also used as the Terraform workspace name.
    pub environment_name: String,
    /// Path of the inventory file below the working directory's `ansible/inventory` folder.
    pub inventory_file_path: PathBuf,
    /// Client for the node RPC tool.
    pub rpc_client: RpcClient,
    /// Access to the S3 buckets used for logs, archives and environment details.
    pub s3_repository: S3Repository,
    /// Client used to run commands on remote machines.
    pub ssh_client: SshClient,
    /// Runs Terraform commands for the environment.
    pub terraform_runner: TerraformRunner,
    /// Directory containing the Terraform and Ansible resources.
    pub working_directory_path: PathBuf,
}
595
596impl TestnetDeployer {
597 #[allow(clippy::too_many_arguments)]
598 pub fn new(
599 ansible_provisioner: AnsibleProvisioner,
600 cloud_provider: CloudProvider,
601 deployment_type: EnvironmentType,
602 environment_name: &str,
603 rpc_client: RpcClient,
604 s3_repository: S3Repository,
605 ssh_client: SshClient,
606 terraform_runner: TerraformRunner,
607 working_directory_path: PathBuf,
608 ) -> Result<TestnetDeployer> {
609 if environment_name.is_empty() {
610 return Err(Error::EnvironmentNameRequired);
611 }
612 let inventory_file_path = working_directory_path
613 .join("ansible")
614 .join("inventory")
615 .join("dev_inventory_digital_ocean.yml");
616 Ok(TestnetDeployer {
617 ansible_provisioner,
618 cloud_provider,
619 deployment_type,
620 environment_name: environment_name.to_string(),
621 inventory_file_path,
622 rpc_client,
623 ssh_client,
624 s3_repository,
625 terraform_runner,
626 working_directory_path,
627 })
628 }
629
630 pub async fn init(&self) -> Result<()> {
631 if self
632 .s3_repository
633 .folder_exists(
634 "sn-testnet",
635 &format!("testnet-logs/{}", self.environment_name),
636 )
637 .await?
638 {
639 return Err(Error::LogsForPreviousTestnetExist(
640 self.environment_name.clone(),
641 ));
642 }
643
644 self.terraform_runner.init()?;
645 let workspaces = self.terraform_runner.workspace_list()?;
646 if !workspaces.contains(&self.environment_name) {
647 self.terraform_runner
648 .workspace_new(&self.environment_name)?;
649 } else {
650 println!("Workspace {} already exists", self.environment_name);
651 }
652
653 let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
654 if !rpc_client_path.is_file() {
655 println!("Downloading the rpc client for safenode...");
656 let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
657 get_and_extract_archive_from_s3(
658 &self.s3_repository,
659 "sn-node-rpc-client",
660 archive_name,
661 &self.working_directory_path,
662 )
663 .await?;
664 #[cfg(unix)]
665 {
666 use std::os::unix::fs::PermissionsExt;
667 let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
668 permissions.set_mode(0o755); std::fs::set_permissions(&rpc_client_path, permissions)?;
670 }
671 }
672
673 Ok(())
674 }
675
676 pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
677 println!("Selecting {} workspace...", options.name);
678 self.terraform_runner.workspace_select(&options.name)?;
679
680 let args = build_terraform_args(options)?;
681
682 self.terraform_runner
683 .plan(Some(args), Some(options.tfvars_filename.clone()))?;
684 Ok(())
685 }
686
687 pub fn start(
688 &self,
689 interval: Duration,
690 node_type: Option<NodeType>,
691 custom_inventory: Option<Vec<VirtualMachine>>,
692 ) -> Result<()> {
693 self.ansible_provisioner.start_nodes(
694 &self.environment_name,
695 interval,
696 node_type,
697 custom_inventory,
698 )?;
699 Ok(())
700 }
701
702 pub fn status(&self) -> Result<()> {
708 self.ansible_provisioner.status()?;
709
710 let peer_cache_node_registries = self
711 .ansible_provisioner
712 .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
713 let generic_node_registries = self
714 .ansible_provisioner
715 .get_node_registries(&AnsibleInventoryType::Nodes)?;
716 let symmetric_private_node_registries = self
717 .ansible_provisioner
718 .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
719 let full_cone_private_node_registries = self
720 .ansible_provisioner
721 .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
722 let genesis_node_registry = self
723 .ansible_provisioner
724 .get_node_registries(&AnsibleInventoryType::Genesis)?
725 .clone();
726
727 peer_cache_node_registries.print();
728 generic_node_registries.print();
729 symmetric_private_node_registries.print();
730 full_cone_private_node_registries.print();
731 genesis_node_registry.print();
732
733 let all_registries = [
734 &peer_cache_node_registries,
735 &generic_node_registries,
736 &symmetric_private_node_registries,
737 &full_cone_private_node_registries,
738 &genesis_node_registry,
739 ];
740
741 let mut total_nodes = 0;
742 let mut running_nodes = 0;
743 let mut stopped_nodes = 0;
744 let mut added_nodes = 0;
745 let mut removed_nodes = 0;
746
747 for (_, registry) in all_registries
748 .iter()
749 .flat_map(|r| r.retrieved_registries.iter())
750 {
751 for node in registry.nodes.iter() {
752 total_nodes += 1;
753 match node.status {
754 ServiceStatus::Running => running_nodes += 1,
755 ServiceStatus::Stopped => stopped_nodes += 1,
756 ServiceStatus::Added => added_nodes += 1,
757 ServiceStatus::Removed => removed_nodes += 1,
758 }
759 }
760 }
761
762 let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
763 let generic_hosts = generic_node_registries.retrieved_registries.len();
764 let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
765 let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();
766
767 let peer_cache_nodes = peer_cache_node_registries
768 .retrieved_registries
769 .iter()
770 .flat_map(|(_, n)| n.nodes.iter())
771 .count();
772 let generic_nodes = generic_node_registries
773 .retrieved_registries
774 .iter()
775 .flat_map(|(_, n)| n.nodes.iter())
776 .count();
777 let symmetric_private_nodes = symmetric_private_node_registries
778 .retrieved_registries
779 .iter()
780 .flat_map(|(_, n)| n.nodes.iter())
781 .count();
782 let full_cone_private_nodes = full_cone_private_node_registries
783 .retrieved_registries
784 .iter()
785 .flat_map(|(_, n)| n.nodes.iter())
786 .count();
787
788 println!("-------");
789 println!("Summary");
790 println!("-------");
791 println!(
792 "Total peer cache nodes ({}x{}): {}",
793 peer_cache_hosts,
794 if peer_cache_hosts > 0 {
795 peer_cache_nodes / peer_cache_hosts
796 } else {
797 0
798 },
799 peer_cache_nodes
800 );
801 println!(
802 "Total generic nodes ({}x{}): {}",
803 generic_hosts,
804 if generic_hosts > 0 {
805 generic_nodes / generic_hosts
806 } else {
807 0
808 },
809 generic_nodes
810 );
811 println!(
812 "Total symmetric private nodes ({}x{}): {}",
813 symmetric_private_hosts,
814 if symmetric_private_hosts > 0 {
815 symmetric_private_nodes / symmetric_private_hosts
816 } else {
817 0
818 },
819 symmetric_private_nodes
820 );
821 println!(
822 "Total full cone private nodes ({}x{}): {}",
823 full_cone_private_hosts,
824 if full_cone_private_hosts > 0 {
825 full_cone_private_nodes / full_cone_private_hosts
826 } else {
827 0
828 },
829 full_cone_private_nodes
830 );
831 println!("Total nodes: {}", total_nodes);
832 println!("Running nodes: {}", running_nodes);
833 println!("Stopped nodes: {}", stopped_nodes);
834 println!("Added nodes: {}", added_nodes);
835 println!("Removed nodes: {}", removed_nodes);
836
837 Ok(())
838 }
839
840 pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
841 self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
842 Ok(())
843 }
844
845 pub fn start_telegraf(
846 &self,
847 node_type: Option<NodeType>,
848 custom_inventory: Option<Vec<VirtualMachine>>,
849 ) -> Result<()> {
850 self.ansible_provisioner.start_telegraf(
851 &self.environment_name,
852 node_type,
853 custom_inventory,
854 )?;
855 Ok(())
856 }
857
858 pub fn stop(
859 &self,
860 interval: Duration,
861 node_type: Option<NodeType>,
862 custom_inventory: Option<Vec<VirtualMachine>>,
863 delay: Option<u64>,
864 service_names: Option<Vec<String>>,
865 ) -> Result<()> {
866 self.ansible_provisioner.stop_nodes(
867 &self.environment_name,
868 interval,
869 node_type,
870 custom_inventory,
871 delay,
872 service_names,
873 )?;
874 Ok(())
875 }
876
877 pub fn stop_telegraf(
878 &self,
879 node_type: Option<NodeType>,
880 custom_inventory: Option<Vec<VirtualMachine>>,
881 ) -> Result<()> {
882 self.ansible_provisioner.stop_telegraf(
883 &self.environment_name,
884 node_type,
885 custom_inventory,
886 )?;
887 Ok(())
888 }
889
890 pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
891 self.ansible_provisioner.upgrade_nodes(&options)?;
892 Ok(())
893 }
894
895 pub fn upgrade_antctl(
896 &self,
897 version: Version,
898 node_type: Option<NodeType>,
899 custom_inventory: Option<Vec<VirtualMachine>>,
900 ) -> Result<()> {
901 self.ansible_provisioner.upgrade_antctl(
902 &self.environment_name,
903 &version,
904 node_type,
905 custom_inventory,
906 )?;
907 Ok(())
908 }
909
910 pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
911 self.ansible_provisioner.upgrade_node_telegraf(name)?;
912 Ok(())
913 }
914
915 pub fn upgrade_uploader_telegraf(&self, name: &str) -> Result<()> {
916 self.ansible_provisioner.upgrade_uploader_telegraf(name)?;
917 Ok(())
918 }
919
920 pub async fn clean(&self) -> Result<()> {
921 let environment_details =
922 get_environment_details(&self.environment_name, &self.s3_repository).await?;
923
924 let evm_network = match environment_details.evm_network {
925 EvmNetwork::Anvil => None,
926 EvmNetwork::Custom => Some(Network::new_custom(
927 environment_details.evm_rpc_url.as_ref().unwrap(),
928 environment_details
929 .evm_payment_token_address
930 .as_ref()
931 .unwrap(),
932 environment_details
933 .evm_data_payments_address
934 .as_ref()
935 .unwrap(),
936 )),
937 EvmNetwork::ArbitrumOne => Some(Network::ArbitrumOne),
938 EvmNetwork::ArbitrumSepolia => Some(Network::ArbitrumSepolia),
939 };
940 if let (Some(network), Some(address)) =
941 (evm_network, &environment_details.funding_wallet_address)
942 {
943 if let Err(err) = self
944 .ansible_provisioner
945 .drain_funds_from_uploaders(
946 Address::from_str(address).map_err(|err| {
947 log::error!("Invalid funding wallet public key: {err:?}");
948 Error::FailedToParseKey
949 })?,
950 network,
951 )
952 .await
953 {
954 log::error!("Failed to drain funds from uploaders: {err:?}");
955 }
956 } else {
957 println!("Custom network provided. Not draining funds.");
958 log::info!("Custom network provided. Not draining funds.");
959 }
960
961 do_clean(
962 &self.environment_name,
963 Some(environment_details),
964 self.working_directory_path.clone(),
965 &self.terraform_runner,
966 None,
967 )
968 .await?;
969 self.s3_repository
970 .delete_object("sn-environment-type", &self.environment_name)
971 .await?;
972 Ok(())
973 }
974}
975
976pub fn get_genesis_multiaddr(
981 ansible_runner: &AnsibleRunner,
982 ssh_client: &SshClient,
983) -> Result<(String, IpAddr)> {
984 let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
985 let genesis_ip = genesis_inventory[0].public_ip_addr;
986
987 let multiaddr = ssh_client
991 .run_command(
992 &genesis_ip,
993 "root",
994 "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
995 false,
996 )
997 .map(|output| output.first().cloned())
998 .unwrap_or_else(|err| {
999 log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1000 None
1001 });
1002
1003 let multiaddr = match multiaddr {
1005 Some(addr) => addr,
1006 None => ssh_client
1007 .run_command(
1008 &genesis_ip,
1009 "root",
1010 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1011 false,
1012 )?
1013 .first()
1014 .cloned()
1015 .ok_or_else(|| Error::GenesisListenAddress)?,
1016 };
1017
1018 Ok((multiaddr, genesis_ip))
1019}
1020
1021pub fn get_anvil_node_data(
1022 ansible_runner: &AnsibleRunner,
1023 ssh_client: &SshClient,
1024) -> Result<AnvilNodeData> {
1025 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1026 if evm_inventory.is_empty() {
1027 return Err(Error::EvmNodeNotFound);
1028 }
1029
1030 let evm_ip = evm_inventory[0].public_ip_addr;
1031 debug!("Retrieved IP address for EVM node: {evm_ip}");
1032 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1033
1034 const MAX_ATTEMPTS: u8 = 5;
1035 const RETRY_DELAY: Duration = Duration::from_secs(5);
1036
1037 for attempt in 1..=MAX_ATTEMPTS {
1038 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
1039 Ok(output) => {
1040 if let Some(csv_contents) = output.first() {
1041 let parts: Vec<&str> = csv_contents.split(',').collect();
1042 if parts.len() != 4 {
1043 return Err(Error::EvmTestnetDataParsingError(
1044 "Expected 4 fields in the CSV".to_string(),
1045 ));
1046 }
1047
1048 let evm_testnet_data = AnvilNodeData {
1049 rpc_url: parts[0].trim().to_string(),
1050 payment_token_address: parts[1].trim().to_string(),
1051 data_payments_address: parts[2].trim().to_string(),
1052 deployer_wallet_private_key: parts[3].trim().to_string(),
1053 };
1054 return Ok(evm_testnet_data);
1055 }
1056 }
1057 Err(e) => {
1058 if attempt == MAX_ATTEMPTS {
1059 return Err(e);
1060 }
1061 println!(
1062 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1063 attempt,
1064 RETRY_DELAY.as_secs()
1065 );
1066 }
1067 }
1068 std::thread::sleep(RETRY_DELAY);
1069 }
1070
1071 Err(Error::EvmTestnetDataNotFound)
1072}
1073
1074pub fn get_multiaddr(
1075 ansible_runner: &AnsibleRunner,
1076 ssh_client: &SshClient,
1077) -> Result<(String, IpAddr)> {
1078 let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1079 let node_ip = node_inventory
1082 .iter()
1083 .find(|vm| vm.name.ends_with("-node-1"))
1084 .ok_or_else(|| Error::NodeAddressNotFound)?
1085 .public_ip_addr;
1086
1087 debug!("Getting multiaddr from node {node_ip}");
1088
1089 let multiaddr =
1090 ssh_client
1091 .run_command(
1092 &node_ip,
1093 "root",
1094 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1096 false,
1097 )?.first()
1098 .cloned()
1099 .ok_or_else(|| Error::NodeAddressNotFound)?;
1100
1101 Ok((multiaddr, node_ip))
1104}
1105
1106pub async fn get_and_extract_archive_from_s3(
1107 s3_repository: &S3Repository,
1108 bucket_name: &str,
1109 archive_bucket_path: &str,
1110 dest_path: &Path,
1111) -> Result<()> {
1112 let archive_file_name = archive_bucket_path.split('/').last().unwrap();
1115 let archive_dest_path = dest_path.join(archive_file_name);
1116 s3_repository
1117 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1118 .await?;
1119 extract_archive(&archive_dest_path, dest_path)?;
1120 Ok(())
1121}
1122
1123pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1124 let archive_file = File::open(archive_path)?;
1125 let decoder = GzDecoder::new(archive_file);
1126 let mut archive = Archive::new(decoder);
1127 let entries = archive.entries()?;
1128 for entry_result in entries {
1129 let mut entry = entry_result?;
1130 let extract_path = dest_path.join(entry.path()?);
1131 if entry.header().entry_type() == tar::EntryType::Directory {
1132 std::fs::create_dir_all(extract_path)?;
1133 continue;
1134 }
1135 let mut file = BufWriter::new(File::create(extract_path)?);
1136 std::io::copy(&mut entry, &mut file)?;
1137 }
1138 std::fs::remove_file(archive_path)?;
1139 Ok(())
1140}
1141
1142pub fn run_external_command(
1143 binary_path: PathBuf,
1144 working_directory_path: PathBuf,
1145 args: Vec<String>,
1146 suppress_stdout: bool,
1147 suppress_stderr: bool,
1148) -> Result<Vec<String>> {
1149 let mut command = Command::new(binary_path.clone());
1150 for arg in &args {
1151 command.arg(arg);
1152 }
1153 command.stdout(Stdio::piped());
1154 command.stderr(Stdio::piped());
1155 command.current_dir(working_directory_path.clone());
1156 debug!("Running {binary_path:#?} with args {args:#?}");
1157 debug!("Working directory set to {working_directory_path:#?}");
1158
1159 let mut child = command.spawn()?;
1160 let mut output_lines = Vec::new();
1161
1162 if let Some(ref mut stdout) = child.stdout {
1163 let reader = BufReader::new(stdout);
1164 for line in reader.lines() {
1165 let line = line?;
1166 if !suppress_stdout {
1167 println!("{line}");
1168 }
1169 output_lines.push(line);
1170 }
1171 }
1172
1173 if let Some(ref mut stderr) = child.stderr {
1174 let reader = BufReader::new(stderr);
1175 for line in reader.lines() {
1176 let line = line?;
1177 if !suppress_stderr {
1178 eprintln!("{line}");
1179 }
1180 output_lines.push(line);
1181 }
1182 }
1183
1184 let output = child.wait()?;
1185 if !output.success() {
1186 let binary_path = binary_path.to_str().unwrap();
1188 return Err(Error::ExternalCommandRunFailed {
1189 binary: binary_path.to_string(),
1190 exit_status: output,
1191 });
1192 }
1193
1194 Ok(output_lines)
1195}
1196
/// Returns `true` if a file named `binary_name` exists in any directory listed in the
/// `PATH` environment variable.
///
/// `PATH` is read as an OS string and split with [`std::env::split_paths`], so the
/// platform's own separator is honoured and a `PATH` that is not valid Unicode is still
/// searched. Returns `false` when `PATH` is not set.
///
/// Only existence is checked; the file is not verified to be executable.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    match std::env::var_os("PATH") {
        Some(paths) => std::env::split_paths(&paths).any(|dir| dir.join(binary_name).exists()),
        None => false,
    }
}
1209
1210pub async fn do_clean(
1211 name: &str,
1212 environment_details: Option<EnvironmentDetails>,
1213 working_directory_path: PathBuf,
1214 terraform_runner: &TerraformRunner,
1215 inventory_types: Option<Vec<AnsibleInventoryType>>,
1216) -> Result<()> {
1217 terraform_runner.init()?;
1218 let workspaces = terraform_runner.workspace_list()?;
1219 if !workspaces.contains(&name.to_string()) {
1220 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
1221 }
1222 terraform_runner.workspace_select(name)?;
1223 println!("Selected {name} workspace");
1224
1225 let environment_details = environment_details.ok_or(Error::EnvironmentDetailsNotFound(
1226 "Should be provided during do_clean".to_string(),
1227 ))?;
1228
1229 let options =
1230 InfraRunOptions::generate_existing(name, terraform_runner, &environment_details).await?;
1231 let mut args = Vec::new();
1232 if let Some(full_cone_private_node_volume_size) = options.full_cone_private_node_volume_size {
1233 args.push((
1234 "full_cone_private_node_volume_size".to_string(),
1235 full_cone_private_node_volume_size.to_string(),
1236 ));
1237 }
1238 if let Some(genesis_node_volume_size) = options.genesis_node_volume_size {
1239 args.push((
1240 "genesis_node_volume_size".to_string(),
1241 genesis_node_volume_size.to_string(),
1242 ));
1243 }
1244 if let Some(node_volume_size) = options.node_volume_size {
1245 args.push(("node_volume_size".to_string(), node_volume_size.to_string()));
1246 }
1247 if let Some(peer_cache_node_volume_size) = options.peer_cache_node_volume_size {
1248 args.push((
1249 "peer_cache_node_volume_size".to_string(),
1250 peer_cache_node_volume_size.to_string(),
1251 ));
1252 }
1253 if let Some(symmetric_private_node_volume_size) = options.symmetric_private_node_volume_size {
1254 args.push((
1255 "symmetric_private_node_volume_size".to_string(),
1256 symmetric_private_node_volume_size.to_string(),
1257 ));
1258 }
1259
1260 terraform_runner.destroy(
1261 Some(args),
1262 Some(
1263 environment_details
1264 .environment_type
1265 .get_tfvars_filename(name),
1266 ),
1267 )?;
1268
1269 terraform_runner.workspace_select("dev")?;
1273 terraform_runner.workspace_delete(name)?;
1274 println!("Deleted {name} workspace");
1275
1276 cleanup_environment_inventory(
1277 name,
1278 &working_directory_path.join("ansible").join("inventory"),
1279 inventory_types,
1280 )?;
1281
1282 println!("Deleted Ansible inventory for {name}");
1283 Ok(())
1284}
1285
1286pub fn get_wallet_directory() -> Result<PathBuf> {
1287 Ok(dirs_next::data_dir()
1288 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1289 .join("safe")
1290 .join("client")
1291 .join("wallet"))
1292}
1293
1294pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1295 let webhook_url =
1296 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1297
1298 let mut message = String::new();
1299 message.push_str("*Testnet Details*\n");
1300 message.push_str(&format!("Name: {}\n", inventory.name));
1301 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1302 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1303 match inventory.binary_option {
1304 BinaryOption::BuildFromSource {
1305 ref repo_owner,
1306 ref branch,
1307 ..
1308 } => {
1309 message.push_str("*Branch Details*\n");
1310 message.push_str(&format!("Repo owner: {}\n", repo_owner));
1311 message.push_str(&format!("Branch: {}\n", branch));
1312 }
1313 BinaryOption::Versioned {
1314 ant_version: ref safe_version,
1315 antnode_version: ref safenode_version,
1316 antctl_version: ref safenode_manager_version,
1317 ..
1318 } => {
1319 message.push_str("*Version Details*\n");
1320 message.push_str(&format!(
1321 "ant version: {}\n",
1322 safe_version
1323 .as_ref()
1324 .map_or("None".to_string(), |v| v.to_string())
1325 ));
1326 message.push_str(&format!("safenode version: {}\n", safenode_version));
1327 message.push_str(&format!("antctl version: {}\n", safenode_manager_version));
1328 }
1329 }
1330
1331 message.push_str("*Sample Peers*\n");
1332 message.push_str("```\n");
1333 for peer in inventory.peers().iter().take(20) {
1334 message.push_str(&format!("{peer}\n"));
1335 }
1336 message.push_str("```\n");
1337 message.push_str("*Available Files*\n");
1338 message.push_str("```\n");
1339 for (addr, file_name) in inventory.uploaded_files.iter() {
1340 message.push_str(&format!("{}: {}\n", addr, file_name))
1341 }
1342 message.push_str("```\n");
1343
1344 let payload = json!({
1345 "text": message,
1346 });
1347 reqwest::Client::new()
1348 .post(webhook_url)
1349 .json(&payload)
1350 .send()
1351 .await?;
1352 println!("{message}");
1353 println!("Posted notification to Slack");
1354 Ok(())
1355}
1356
1357fn print_duration(duration: Duration) {
1358 let total_seconds = duration.as_secs();
1359 let minutes = total_seconds / 60;
1360 let seconds = total_seconds % 60;
1361 debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
1362}
1363
1364pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1365 let progress_bar = ProgressBar::new(length);
1366 progress_bar.set_style(
1367 ProgressStyle::default_bar()
1368 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1369 .progress_chars("#>-"),
1370 );
1371 progress_bar.enable_steady_tick(Duration::from_millis(100));
1372 Ok(progress_bar)
1373}
1374pub async fn get_environment_details(
1375 environment_name: &str,
1376 s3_repository: &S3Repository,
1377) -> Result<EnvironmentDetails> {
1378 let temp_file = tempfile::NamedTempFile::new()?;
1379
1380 let max_retries = 3;
1381 let mut retries = 0;
1382 let env_details = loop {
1383 debug!("Downloading the environment details file for {environment_name} from S3");
1384 match s3_repository
1385 .download_object("sn-environment-type", environment_name, temp_file.path())
1386 .await
1387 {
1388 Ok(_) => {
1389 debug!("Downloaded the environment details file for {environment_name} from S3");
1390 let content = match std::fs::read_to_string(temp_file.path()) {
1391 Ok(content) => content,
1392 Err(err) => {
1393 log::error!("Could not read the environment details file: {err:?}");
1394 if retries < max_retries {
1395 debug!("Retrying to read the environment details file");
1396 retries += 1;
1397 continue;
1398 } else {
1399 return Err(Error::EnvironmentDetailsNotFound(
1400 environment_name.to_string(),
1401 ));
1402 }
1403 }
1404 };
1405 trace!("Content of the environment details file: {}", content);
1406
1407 match serde_json::from_str(&content) {
1408 Ok(environment_details) => break environment_details,
1409 Err(err) => {
1410 log::error!("Could not parse the environment details file: {err:?}");
1411 if retries < max_retries {
1412 debug!("Retrying to parse the environment details file");
1413 retries += 1;
1414 continue;
1415 } else {
1416 return Err(Error::EnvironmentDetailsNotFound(
1417 environment_name.to_string(),
1418 ));
1419 }
1420 }
1421 }
1422 }
1423 Err(err) => {
1424 log::error!(
1425 "Could not download the environment details file for {environment_name} from S3: {err:?}"
1426 );
1427 if retries < max_retries {
1428 retries += 1;
1429 continue;
1430 } else {
1431 return Err(Error::EnvironmentDetailsNotFound(
1432 environment_name.to_string(),
1433 ));
1434 }
1435 }
1436 }
1437 };
1438
1439 debug!("Fetched environment details: {env_details:?}");
1440
1441 Ok(env_details)
1442}
1443
1444pub async fn write_environment_details(
1445 s3_repository: &S3Repository,
1446 environment_name: &str,
1447 environment_details: &EnvironmentDetails,
1448) -> Result<()> {
1449 let temp_dir = tempfile::tempdir()?;
1450 let path = temp_dir.path().to_path_buf().join(environment_name);
1451 let mut file = File::create(&path)?;
1452 let json = serde_json::to_string(environment_details)?;
1453 file.write_all(json.as_bytes())?;
1454 s3_repository
1455 .upload_file("sn-environment-type", &path, true)
1456 .await?;
1457 Ok(())
1458}
1459
1460pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1461 if node_count == 0 {
1462 return 0;
1463 }
1464 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1465
1466 (total_volume_required as f64 / 7.0).ceil() as u16
1468}
1469
/// Builds the plain-HTTP URL of the `bootstrap_cache.json` file served by the host at
/// `ip_addr`.
///
/// IPv6 addresses are wrapped in square brackets, as URL syntax requires for IP literals;
/// IPv4 addresses are used as they are.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    match ip_addr {
        IpAddr::V4(v4) => format!("http://{v4}/bootstrap_cache.json"),
        IpAddr::V6(v6) => format!("http://[{v6}]/bootstrap_cache.json"),
    }
}