1pub mod ansible;
8pub mod bootstrap;
9pub mod clients;
10pub mod deploy;
11pub mod digital_ocean;
12pub mod error;
13pub mod funding;
14pub mod infra;
15pub mod inventory;
16pub mod logs;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
// Storage figure reserved for each node.
// NOTE(review): the unit is not stated here (presumably GB) and the constant is not
// referenced in this part of the file — confirm against its users.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51 fs::File,
52 io::{BufRead, BufReader, BufWriter, Write},
53 net::IpAddr,
54 path::{Path, PathBuf},
55 process::{Command, Stdio},
56 str::FromStr,
57 time::Duration,
58};
59use tar::Archive;
60
// Fork count handed to `AnsibleRunner::new` when the builder was not given an explicit
// `ansible_forks` value.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
63#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
64pub enum DeploymentType {
65 Bootstrap,
67 Client,
69 #[default]
71 New,
72}
73
74#[derive(Debug, Clone, Default, Serialize, Deserialize)]
75pub struct AnvilNodeData {
76 pub data_payments_address: String,
77 pub deployer_wallet_private_key: String,
78 pub payment_token_address: String,
79 pub rpc_url: String,
80}
81
82impl std::fmt::Display for DeploymentType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 DeploymentType::Bootstrap => write!(f, "bootstrap"),
86 DeploymentType::Client => write!(f, "clients"),
87 DeploymentType::New => write!(f, "new"),
88 }
89 }
90}
91
92impl std::str::FromStr for DeploymentType {
93 type Err = String;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 match s.to_lowercase().as_str() {
97 "bootstrap" => Ok(DeploymentType::Bootstrap),
98 "clients" => Ok(DeploymentType::Client),
99 "new" => Ok(DeploymentType::New),
100 _ => Err(format!("Invalid deployment type: {}", s)),
101 }
102 }
103}
104
/// The categories of node hosts this crate distinguishes.
///
/// Each variant maps to a textual name (`Display`/`FromStr`), a Telegraf role and an
/// Ansible inventory type through the impls on this type.
#[derive(Clone, Debug)]
pub enum NodeType {
    /// Rendered as `full-cone-private`.
    FullConePrivateNode,
    /// Rendered as `generic`.
    Generic,
    /// Rendered as `genesis`.
    Genesis,
    /// Rendered as `peer-cache`.
    PeerCache,
    /// Rendered as `symmetric-private`.
    SymmetricPrivateNode,
}
113
114impl std::fmt::Display for NodeType {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
118 NodeType::Generic => write!(f, "generic"),
119 NodeType::Genesis => write!(f, "genesis"),
120 NodeType::PeerCache => write!(f, "peer-cache"),
121 NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
122 }
123 }
124}
125
126impl std::str::FromStr for NodeType {
127 type Err = String;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 match s.to_lowercase().as_str() {
131 "full-cone-private" => Ok(NodeType::FullConePrivateNode),
132 "generic" => Ok(NodeType::Generic),
133 "genesis" => Ok(NodeType::Genesis),
134 "peer-cache" => Ok(NodeType::PeerCache),
135 "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
136 _ => Err(format!("Invalid node type: {}", s)),
137 }
138 }
139}
140
141impl NodeType {
142 pub fn telegraf_role(&self) -> &'static str {
143 match self {
144 NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
145 NodeType::Generic => "GENERIC_NODE",
146 NodeType::Genesis => "GENESIS_NODE",
147 NodeType::PeerCache => "PEER_CACHE_NODE",
148 NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
149 }
150 }
151
152 pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
153 match self {
154 NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
155 NodeType::Generic => AnsibleInventoryType::Nodes,
156 NodeType::Genesis => AnsibleInventoryType::Genesis,
157 NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
158 NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
159 }
160 }
161}
162
163#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
164pub enum EvmNetwork {
165 #[default]
166 Anvil,
167 ArbitrumOne,
168 ArbitrumSepolia,
169 Custom,
170}
171
172impl std::fmt::Display for EvmNetwork {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 match self {
175 EvmNetwork::Anvil => write!(f, "evm-custom"),
176 EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
177 EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
178 EvmNetwork::Custom => write!(f, "evm-custom"),
179 }
180 }
181}
182
183impl std::str::FromStr for EvmNetwork {
184 type Err = String;
185
186 fn from_str(s: &str) -> Result<Self, Self::Err> {
187 match s.to_lowercase().as_str() {
188 "anvil" => Ok(EvmNetwork::Anvil),
189 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
190 "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
191 "custom" => Ok(EvmNetwork::Custom),
192 _ => Err(format!("Invalid EVM network type: {}", s)),
193 }
194 }
195}
196
197#[derive(Clone, Debug, Default, Serialize, Deserialize)]
198pub struct EvmDetails {
199 pub network: EvmNetwork,
200 pub data_payments_address: Option<String>,
201 pub payment_token_address: Option<String>,
202 pub rpc_url: Option<String>,
203}
204
205#[derive(Clone, Debug, Default, Serialize, Deserialize)]
206pub struct EnvironmentDetails {
207 pub deployment_type: DeploymentType,
208 pub environment_type: EnvironmentType,
209 pub evm_details: EvmDetails,
210 pub funding_wallet_address: Option<String>,
211 pub network_id: Option<u8>,
212 pub rewards_address: Option<String>,
213}
214
215#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
216pub enum EnvironmentType {
217 #[default]
218 Development,
219 Production,
220 Staging,
221}
222
223impl EnvironmentType {
224 pub fn get_tfvars_filename(&self, name: &str) -> String {
225 match self {
226 EnvironmentType::Development => "dev.tfvars".to_string(),
227 EnvironmentType::Staging => "staging.tfvars".to_string(),
228 EnvironmentType::Production => {
229 format!("{name}.tfvars",)
230 }
231 }
232 }
233
234 pub fn get_default_peer_cache_node_count(&self) -> u16 {
235 match self {
236 EnvironmentType::Development => 5,
237 EnvironmentType::Production => 5,
238 EnvironmentType::Staging => 5,
239 }
240 }
241
242 pub fn get_default_node_count(&self) -> u16 {
243 match self {
244 EnvironmentType::Development => 25,
245 EnvironmentType::Production => 25,
246 EnvironmentType::Staging => 25,
247 }
248 }
249
250 pub fn get_default_symmetric_private_node_count(&self) -> u16 {
251 self.get_default_node_count()
252 }
253
254 pub fn get_default_full_cone_private_node_count(&self) -> u16 {
255 self.get_default_node_count()
256 }
257}
258
259impl std::fmt::Display for EnvironmentType {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 match self {
262 EnvironmentType::Development => write!(f, "development"),
263 EnvironmentType::Production => write!(f, "production"),
264 EnvironmentType::Staging => write!(f, "staging"),
265 }
266 }
267}
268
269impl FromStr for EnvironmentType {
270 type Err = Error;
271
272 fn from_str(s: &str) -> Result<Self, Self::Err> {
273 match s.to_lowercase().as_str() {
274 "development" => Ok(EnvironmentType::Development),
275 "production" => Ok(EnvironmentType::Production),
276 "staging" => Ok(EnvironmentType::Staging),
277 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
278 }
279 }
280}
281
282#[derive(Clone, Debug, Serialize, Deserialize)]
296pub enum BinaryOption {
297 BuildFromSource {
299 antnode_features: Option<String>,
301 branch: String,
302 repo_owner: String,
303 },
304 Versioned {
306 ant_version: Option<Version>,
307 antctl_version: Option<Version>,
308 antnode_version: Option<Version>,
309 },
310}
311
312impl BinaryOption {
313 pub fn print(&self) {
314 match self {
315 BinaryOption::BuildFromSource {
316 antnode_features,
317 branch,
318 repo_owner,
319 } => {
320 println!("Source configuration:");
321 println!(" Repository owner: {}", repo_owner);
322 println!(" Branch: {}", branch);
323 if let Some(features) = antnode_features {
324 println!(" Antnode features: {}", features);
325 }
326 }
327 BinaryOption::Versioned {
328 ant_version,
329 antctl_version,
330 antnode_version,
331 } => {
332 println!("Versioned binaries configuration:");
333 if let Some(version) = ant_version {
334 println!(" ant version: {}", version);
335 }
336 if let Some(version) = antctl_version {
337 println!(" antctl version: {}", version);
338 }
339 if let Some(version) = antnode_version {
340 println!(" antnode version: {}", version);
341 }
342 }
343 }
344 }
345}
346
/// The cloud providers known to the tool.
#[derive(Clone, Copy, Debug)]
pub enum CloudProvider {
    /// Displayed as `aws`; SSH user `ubuntu`.
    Aws,
    /// Displayed as `digital-ocean`; SSH user `root`.
    DigitalOcean,
}
352
353impl std::fmt::Display for CloudProvider {
354 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355 match self {
356 CloudProvider::Aws => write!(f, "aws"),
357 CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
358 }
359 }
360}
361
362impl CloudProvider {
363 pub fn get_ssh_user(&self) -> String {
364 match self {
365 CloudProvider::Aws => "ubuntu".to_string(),
366 CloudProvider::DigitalOcean => "root".to_string(),
367 }
368 }
369}
370
371#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
372pub enum LogFormat {
373 Default,
374 Json,
375}
376
377impl LogFormat {
378 pub fn parse_from_str(val: &str) -> Result<Self> {
379 match val {
380 "default" => Ok(LogFormat::Default),
381 "json" => Ok(LogFormat::Json),
382 _ => Err(Error::LoggingConfiguration(
383 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
384 )),
385 }
386 }
387
388 pub fn as_str(&self) -> &'static str {
389 match self {
390 LogFormat::Default => "default",
391 LogFormat::Json => "json",
392 }
393 }
394}
395
396#[derive(Clone)]
397pub struct UpgradeOptions {
398 pub ansible_verbose: bool,
399 pub custom_inventory: Option<Vec<VirtualMachine>>,
400 pub env_variables: Option<Vec<(String, String)>>,
401 pub force: bool,
402 pub forks: usize,
403 pub interval: Duration,
404 pub name: String,
405 pub node_type: Option<NodeType>,
406 pub pre_upgrade_delay: Option<u64>,
407 pub provider: CloudProvider,
408 pub version: Option<String>,
409}
410
411impl UpgradeOptions {
412 pub fn get_ansible_vars(&self) -> String {
413 let mut extra_vars = ExtraVarsDocBuilder::default();
414 extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
415 if let Some(env_variables) = &self.env_variables {
416 extra_vars.add_env_variable_list("env_variables", env_variables.clone());
417 }
418 if self.force {
419 extra_vars.add_variable("force", &self.force.to_string());
420 }
421 if let Some(version) = &self.version {
422 extra_vars.add_variable("antnode_version", version);
423 }
424 if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
425 extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
426 }
427 extra_vars.build()
428 }
429}
430
431#[derive(Default)]
432pub struct TestnetDeployBuilder {
433 ansible_forks: Option<usize>,
434 ansible_verbose_mode: bool,
435 deployment_type: EnvironmentType,
436 environment_name: String,
437 provider: Option<CloudProvider>,
438 ssh_secret_key_path: Option<PathBuf>,
439 state_bucket_name: Option<String>,
440 terraform_binary_path: Option<PathBuf>,
441 vault_password_path: Option<PathBuf>,
442 working_directory_path: Option<PathBuf>,
443}
444
445impl TestnetDeployBuilder {
446 pub fn new() -> Self {
447 Default::default()
448 }
449
450 pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
451 self.ansible_verbose_mode = ansible_verbose_mode;
452 self
453 }
454
455 pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
456 self.ansible_forks = Some(ansible_forks);
457 self
458 }
459
460 pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
461 self.deployment_type = deployment_type;
462 self
463 }
464
465 pub fn environment_name(&mut self, name: &str) -> &mut Self {
466 self.environment_name = name.to_string();
467 self
468 }
469
470 pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
471 self.provider = Some(provider);
472 self
473 }
474
475 pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
476 self.state_bucket_name = Some(state_bucket_name);
477 self
478 }
479
480 pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
481 self.terraform_binary_path = Some(terraform_binary_path);
482 self
483 }
484
485 pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
486 self.working_directory_path = Some(working_directory_path);
487 self
488 }
489
490 pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
491 self.ssh_secret_key_path = Some(ssh_secret_key_path);
492 self
493 }
494
495 pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
496 self.vault_password_path = Some(vault_password_path);
497 self
498 }
499
500 pub fn build(&self) -> Result<TestnetDeployer> {
501 let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
502 match provider {
503 CloudProvider::DigitalOcean => {
504 let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
505 Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
506 })?;
507 std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
511 std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
512 }
513 _ => {
514 return Err(Error::CloudProviderNotSupported(provider.to_string()));
515 }
516 }
517
518 let state_bucket_name = match self.state_bucket_name {
519 Some(ref bucket_name) => bucket_name.clone(),
520 None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
521 };
522
523 let default_terraform_bin_path = PathBuf::from("terraform");
524 let terraform_binary_path = self
525 .terraform_binary_path
526 .as_ref()
527 .unwrap_or(&default_terraform_bin_path);
528
529 let working_directory_path = match self.working_directory_path {
530 Some(ref work_dir_path) => work_dir_path.clone(),
531 None => std::env::current_dir()?.join("resources"),
532 };
533
534 let ssh_secret_key_path = match self.ssh_secret_key_path {
535 Some(ref ssh_sk_path) => ssh_sk_path.clone(),
536 None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
537 };
538
539 let vault_password_path = match self.vault_password_path {
540 Some(ref vault_pw_path) => vault_pw_path.clone(),
541 None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
542 };
543
544 let terraform_runner = TerraformRunner::new(
545 terraform_binary_path.to_path_buf(),
546 working_directory_path
547 .join("terraform")
548 .join("testnet")
549 .join(provider.to_string()),
550 provider,
551 &state_bucket_name,
552 )?;
553 let ansible_runner = AnsibleRunner::new(
554 self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
555 self.ansible_verbose_mode,
556 &self.environment_name,
557 provider,
558 ssh_secret_key_path.clone(),
559 vault_password_path,
560 working_directory_path.join("ansible"),
561 )?;
562 let ssh_client = SshClient::new(ssh_secret_key_path);
563 let ansible_provisioner =
564 AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
565 let rpc_client = RpcClient::new(
566 PathBuf::from("/usr/local/bin/safenode_rpc_client"),
567 working_directory_path.clone(),
568 );
569
570 let safe_path = working_directory_path.join("safe");
573 if safe_path.exists() {
574 std::fs::remove_file(safe_path)?;
575 }
576
577 let testnet = TestnetDeployer::new(
578 ansible_provisioner,
579 provider,
580 self.deployment_type.clone(),
581 &self.environment_name,
582 rpc_client,
583 S3Repository {},
584 ssh_client,
585 terraform_runner,
586 working_directory_path,
587 )?;
588
589 Ok(testnet)
590 }
591}
592
593#[derive(Clone)]
594pub struct TestnetDeployer {
595 pub ansible_provisioner: AnsibleProvisioner,
596 pub cloud_provider: CloudProvider,
597 pub deployment_type: EnvironmentType,
598 pub environment_name: String,
599 pub inventory_file_path: PathBuf,
600 pub rpc_client: RpcClient,
601 pub s3_repository: S3Repository,
602 pub ssh_client: SshClient,
603 pub terraform_runner: TerraformRunner,
604 pub working_directory_path: PathBuf,
605}
606
607impl TestnetDeployer {
608 #[allow(clippy::too_many_arguments)]
609 pub fn new(
610 ansible_provisioner: AnsibleProvisioner,
611 cloud_provider: CloudProvider,
612 deployment_type: EnvironmentType,
613 environment_name: &str,
614 rpc_client: RpcClient,
615 s3_repository: S3Repository,
616 ssh_client: SshClient,
617 terraform_runner: TerraformRunner,
618 working_directory_path: PathBuf,
619 ) -> Result<TestnetDeployer> {
620 if environment_name.is_empty() {
621 return Err(Error::EnvironmentNameRequired);
622 }
623 let inventory_file_path = working_directory_path
624 .join("ansible")
625 .join("inventory")
626 .join("dev_inventory_digital_ocean.yml");
627 Ok(TestnetDeployer {
628 ansible_provisioner,
629 cloud_provider,
630 deployment_type,
631 environment_name: environment_name.to_string(),
632 inventory_file_path,
633 rpc_client,
634 ssh_client,
635 s3_repository,
636 terraform_runner,
637 working_directory_path,
638 })
639 }
640
641 pub async fn init(&self) -> Result<()> {
642 if self
643 .s3_repository
644 .folder_exists(
645 "sn-testnet",
646 &format!("testnet-logs/{}", self.environment_name),
647 )
648 .await?
649 {
650 return Err(Error::LogsForPreviousTestnetExist(
651 self.environment_name.clone(),
652 ));
653 }
654
655 self.terraform_runner.init()?;
656 let workspaces = self.terraform_runner.workspace_list()?;
657 if !workspaces.contains(&self.environment_name) {
658 self.terraform_runner
659 .workspace_new(&self.environment_name)?;
660 } else {
661 println!("Workspace {} already exists", self.environment_name);
662 }
663
664 let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
665 if !rpc_client_path.is_file() {
666 println!("Downloading the rpc client for safenode...");
667 let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
668 get_and_extract_archive_from_s3(
669 &self.s3_repository,
670 "sn-node-rpc-client",
671 archive_name,
672 &self.working_directory_path,
673 )
674 .await?;
675 #[cfg(unix)]
676 {
677 use std::os::unix::fs::PermissionsExt;
678 let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
679 permissions.set_mode(0o755); std::fs::set_permissions(&rpc_client_path, permissions)?;
681 }
682 }
683
684 Ok(())
685 }
686
687 pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
688 println!("Selecting {} workspace...", options.name);
689 self.terraform_runner.workspace_select(&options.name)?;
690
691 let args = build_terraform_args(options)?;
692
693 self.terraform_runner
694 .plan(Some(args), options.tfvars_filename.clone())?;
695 Ok(())
696 }
697
698 pub fn start(
699 &self,
700 interval: Duration,
701 node_type: Option<NodeType>,
702 custom_inventory: Option<Vec<VirtualMachine>>,
703 ) -> Result<()> {
704 self.ansible_provisioner.start_nodes(
705 &self.environment_name,
706 interval,
707 node_type,
708 custom_inventory,
709 )?;
710 Ok(())
711 }
712
713 pub fn status(&self) -> Result<()> {
719 self.ansible_provisioner.status()?;
720
721 let peer_cache_node_registries = self
722 .ansible_provisioner
723 .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
724 let generic_node_registries = self
725 .ansible_provisioner
726 .get_node_registries(&AnsibleInventoryType::Nodes)?;
727 let symmetric_private_node_registries = self
728 .ansible_provisioner
729 .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
730 let full_cone_private_node_registries = self
731 .ansible_provisioner
732 .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
733 let genesis_node_registry = self
734 .ansible_provisioner
735 .get_node_registries(&AnsibleInventoryType::Genesis)?
736 .clone();
737
738 peer_cache_node_registries.print();
739 generic_node_registries.print();
740 symmetric_private_node_registries.print();
741 full_cone_private_node_registries.print();
742 genesis_node_registry.print();
743
744 let all_registries = [
745 &peer_cache_node_registries,
746 &generic_node_registries,
747 &symmetric_private_node_registries,
748 &full_cone_private_node_registries,
749 &genesis_node_registry,
750 ];
751
752 let mut total_nodes = 0;
753 let mut running_nodes = 0;
754 let mut stopped_nodes = 0;
755 let mut added_nodes = 0;
756 let mut removed_nodes = 0;
757
758 for (_, registry) in all_registries
759 .iter()
760 .flat_map(|r| r.retrieved_registries.iter())
761 {
762 for node in registry.nodes.iter() {
763 total_nodes += 1;
764 match node.status {
765 ServiceStatus::Running => running_nodes += 1,
766 ServiceStatus::Stopped => stopped_nodes += 1,
767 ServiceStatus::Added => added_nodes += 1,
768 ServiceStatus::Removed => removed_nodes += 1,
769 }
770 }
771 }
772
773 let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
774 let generic_hosts = generic_node_registries.retrieved_registries.len();
775 let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
776 let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();
777
778 let peer_cache_nodes = peer_cache_node_registries
779 .retrieved_registries
780 .iter()
781 .flat_map(|(_, n)| n.nodes.iter())
782 .count();
783 let generic_nodes = generic_node_registries
784 .retrieved_registries
785 .iter()
786 .flat_map(|(_, n)| n.nodes.iter())
787 .count();
788 let symmetric_private_nodes = symmetric_private_node_registries
789 .retrieved_registries
790 .iter()
791 .flat_map(|(_, n)| n.nodes.iter())
792 .count();
793 let full_cone_private_nodes = full_cone_private_node_registries
794 .retrieved_registries
795 .iter()
796 .flat_map(|(_, n)| n.nodes.iter())
797 .count();
798
799 println!("-------");
800 println!("Summary");
801 println!("-------");
802 println!(
803 "Total peer cache nodes ({}x{}): {}",
804 peer_cache_hosts,
805 if peer_cache_hosts > 0 {
806 peer_cache_nodes / peer_cache_hosts
807 } else {
808 0
809 },
810 peer_cache_nodes
811 );
812 println!(
813 "Total generic nodes ({}x{}): {}",
814 generic_hosts,
815 if generic_hosts > 0 {
816 generic_nodes / generic_hosts
817 } else {
818 0
819 },
820 generic_nodes
821 );
822 println!(
823 "Total symmetric private nodes ({}x{}): {}",
824 symmetric_private_hosts,
825 if symmetric_private_hosts > 0 {
826 symmetric_private_nodes / symmetric_private_hosts
827 } else {
828 0
829 },
830 symmetric_private_nodes
831 );
832 println!(
833 "Total full cone private nodes ({}x{}): {}",
834 full_cone_private_hosts,
835 if full_cone_private_hosts > 0 {
836 full_cone_private_nodes / full_cone_private_hosts
837 } else {
838 0
839 },
840 full_cone_private_nodes
841 );
842 println!("Total nodes: {}", total_nodes);
843 println!("Running nodes: {}", running_nodes);
844 println!("Stopped nodes: {}", stopped_nodes);
845 println!("Added nodes: {}", added_nodes);
846 println!("Removed nodes: {}", removed_nodes);
847
848 Ok(())
849 }
850
851 pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
852 self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
853 Ok(())
854 }
855
856 pub fn start_telegraf(
857 &self,
858 node_type: Option<NodeType>,
859 custom_inventory: Option<Vec<VirtualMachine>>,
860 ) -> Result<()> {
861 self.ansible_provisioner.start_telegraf(
862 &self.environment_name,
863 node_type,
864 custom_inventory,
865 )?;
866 Ok(())
867 }
868
869 pub fn stop(
870 &self,
871 interval: Duration,
872 node_type: Option<NodeType>,
873 custom_inventory: Option<Vec<VirtualMachine>>,
874 delay: Option<u64>,
875 service_names: Option<Vec<String>>,
876 ) -> Result<()> {
877 self.ansible_provisioner.stop_nodes(
878 &self.environment_name,
879 interval,
880 node_type,
881 custom_inventory,
882 delay,
883 service_names,
884 )?;
885 Ok(())
886 }
887
888 pub fn stop_telegraf(
889 &self,
890 node_type: Option<NodeType>,
891 custom_inventory: Option<Vec<VirtualMachine>>,
892 ) -> Result<()> {
893 self.ansible_provisioner.stop_telegraf(
894 &self.environment_name,
895 node_type,
896 custom_inventory,
897 )?;
898 Ok(())
899 }
900
901 pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
902 self.ansible_provisioner.upgrade_nodes(&options)?;
903 Ok(())
904 }
905
906 pub fn upgrade_antctl(
907 &self,
908 version: Version,
909 node_type: Option<NodeType>,
910 custom_inventory: Option<Vec<VirtualMachine>>,
911 ) -> Result<()> {
912 self.ansible_provisioner.upgrade_antctl(
913 &self.environment_name,
914 &version,
915 node_type,
916 custom_inventory,
917 )?;
918 Ok(())
919 }
920
921 pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
922 self.ansible_provisioner.upgrade_node_telegraf(name)?;
923 Ok(())
924 }
925
926 pub fn upgrade_client_telegraf(&self, name: &str) -> Result<()> {
927 self.ansible_provisioner.upgrade_client_telegraf(name)?;
928 Ok(())
929 }
930
931 pub async fn clean(&self) -> Result<()> {
932 let environment_details =
933 get_environment_details(&self.environment_name, &self.s3_repository)
934 .await
935 .inspect_err(|err| {
936 println!("Failed to get environment details: {err}. Continuing cleanup...");
937 })
938 .ok();
939 if let Some(environment_details) = &environment_details {
940 funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
941 }
942
943 self.destroy_infra(environment_details).await?;
944
945 cleanup_environment_inventory(
946 &self.environment_name,
947 &self
948 .working_directory_path
949 .join("ansible")
950 .join("inventory"),
951 None,
952 )?;
953
954 println!("Deleted Ansible inventory for {}", self.environment_name);
955
956 if let Err(err) = self
957 .s3_repository
958 .delete_object("sn-environment-type", &self.environment_name)
959 .await
960 {
961 println!("Failed to delete environment type: {err}. Continuing cleanup...");
962 }
963 Ok(())
964 }
965
966 async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
967 infra::select_workspace(&self.terraform_runner, &self.environment_name)?;
968
969 let options = InfraRunOptions::generate_existing(
970 &self.environment_name,
971 &self.terraform_runner,
972 environment_details.as_ref(),
973 )
974 .await?;
975
976 let args = build_terraform_args(&options)?;
977
978 self.terraform_runner.destroy(Some(args), None)?;
979
980 infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;
981
982 Ok(())
983 }
984}
985
986pub fn get_genesis_multiaddr(
991 ansible_runner: &AnsibleRunner,
992 ssh_client: &SshClient,
993) -> Result<(String, IpAddr)> {
994 let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
995 let genesis_ip = genesis_inventory[0].public_ip_addr;
996
997 let multiaddr = ssh_client
1001 .run_command(
1002 &genesis_ip,
1003 "root",
1004 "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1005 false,
1006 )
1007 .map(|output| output.first().cloned())
1008 .unwrap_or_else(|err| {
1009 log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1010 None
1011 });
1012
1013 let multiaddr = match multiaddr {
1015 Some(addr) => addr,
1016 None => ssh_client
1017 .run_command(
1018 &genesis_ip,
1019 "root",
1020 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1021 false,
1022 )?
1023 .first()
1024 .cloned()
1025 .ok_or_else(|| Error::GenesisListenAddress)?,
1026 };
1027
1028 Ok((multiaddr, genesis_ip))
1029}
1030
1031pub fn get_anvil_node_data(
1032 ansible_runner: &AnsibleRunner,
1033 ssh_client: &SshClient,
1034) -> Result<AnvilNodeData> {
1035 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1036 if evm_inventory.is_empty() {
1037 return Err(Error::EvmNodeNotFound);
1038 }
1039
1040 let evm_ip = evm_inventory[0].public_ip_addr;
1041 debug!("Retrieved IP address for EVM node: {evm_ip}");
1042 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1043
1044 const MAX_ATTEMPTS: u8 = 5;
1045 const RETRY_DELAY: Duration = Duration::from_secs(5);
1046
1047 for attempt in 1..=MAX_ATTEMPTS {
1048 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
1049 Ok(output) => {
1050 if let Some(csv_contents) = output.first() {
1051 let parts: Vec<&str> = csv_contents.split(',').collect();
1052 if parts.len() != 4 {
1053 return Err(Error::EvmTestnetDataParsingError(
1054 "Expected 4 fields in the CSV".to_string(),
1055 ));
1056 }
1057
1058 let evm_testnet_data = AnvilNodeData {
1059 rpc_url: parts[0].trim().to_string(),
1060 payment_token_address: parts[1].trim().to_string(),
1061 data_payments_address: parts[2].trim().to_string(),
1062 deployer_wallet_private_key: parts[3].trim().to_string(),
1063 };
1064 return Ok(evm_testnet_data);
1065 }
1066 }
1067 Err(e) => {
1068 if attempt == MAX_ATTEMPTS {
1069 return Err(e);
1070 }
1071 println!(
1072 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1073 attempt,
1074 RETRY_DELAY.as_secs()
1075 );
1076 }
1077 }
1078 std::thread::sleep(RETRY_DELAY);
1079 }
1080
1081 Err(Error::EvmTestnetDataNotFound)
1082}
1083
1084pub fn get_multiaddr(
1085 ansible_runner: &AnsibleRunner,
1086 ssh_client: &SshClient,
1087) -> Result<(String, IpAddr)> {
1088 let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1089 let node_ip = node_inventory
1092 .iter()
1093 .find(|vm| vm.name.ends_with("-node-1"))
1094 .ok_or_else(|| Error::NodeAddressNotFound)?
1095 .public_ip_addr;
1096
1097 debug!("Getting multiaddr from node {node_ip}");
1098
1099 let multiaddr =
1100 ssh_client
1101 .run_command(
1102 &node_ip,
1103 "root",
1104 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1106 false,
1107 )?.first()
1108 .cloned()
1109 .ok_or_else(|| Error::NodeAddressNotFound)?;
1110
1111 Ok((multiaddr, node_ip))
1114}
1115
1116pub async fn get_and_extract_archive_from_s3(
1117 s3_repository: &S3Repository,
1118 bucket_name: &str,
1119 archive_bucket_path: &str,
1120 dest_path: &Path,
1121) -> Result<()> {
1122 let archive_file_name = archive_bucket_path.split('/').last().unwrap();
1125 let archive_dest_path = dest_path.join(archive_file_name);
1126 s3_repository
1127 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1128 .await?;
1129 extract_archive(&archive_dest_path, dest_path)?;
1130 Ok(())
1131}
1132
1133pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1134 let archive_file = File::open(archive_path)?;
1135 let decoder = GzDecoder::new(archive_file);
1136 let mut archive = Archive::new(decoder);
1137 let entries = archive.entries()?;
1138 for entry_result in entries {
1139 let mut entry = entry_result?;
1140 let extract_path = dest_path.join(entry.path()?);
1141 if entry.header().entry_type() == tar::EntryType::Directory {
1142 std::fs::create_dir_all(extract_path)?;
1143 continue;
1144 }
1145 let mut file = BufWriter::new(File::create(extract_path)?);
1146 std::io::copy(&mut entry, &mut file)?;
1147 }
1148 std::fs::remove_file(archive_path)?;
1149 Ok(())
1150}
1151
1152pub fn run_external_command(
1153 binary_path: PathBuf,
1154 working_directory_path: PathBuf,
1155 args: Vec<String>,
1156 suppress_stdout: bool,
1157 suppress_stderr: bool,
1158) -> Result<Vec<String>> {
1159 let mut command = Command::new(binary_path.clone());
1160 for arg in &args {
1161 command.arg(arg);
1162 }
1163 command.stdout(Stdio::piped());
1164 command.stderr(Stdio::piped());
1165 command.current_dir(working_directory_path.clone());
1166 debug!("Running {binary_path:#?} with args {args:#?}");
1167 debug!("Working directory set to {working_directory_path:#?}");
1168
1169 let mut child = command.spawn()?;
1170 let mut output_lines = Vec::new();
1171
1172 if let Some(ref mut stdout) = child.stdout {
1173 let reader = BufReader::new(stdout);
1174 for line in reader.lines() {
1175 let line = line?;
1176 if !suppress_stdout {
1177 println!("{line}");
1178 }
1179 output_lines.push(line);
1180 }
1181 }
1182
1183 if let Some(ref mut stderr) = child.stderr {
1184 let reader = BufReader::new(stderr);
1185 for line in reader.lines() {
1186 let line = line?;
1187 if !suppress_stderr {
1188 eprintln!("{line}");
1189 }
1190 output_lines.push(line);
1191 }
1192 }
1193
1194 let output = child.wait()?;
1195 if !output.success() {
1196 let binary_path = binary_path.to_str().unwrap();
1198 return Err(Error::ExternalCommandRunFailed {
1199 binary: binary_path.to_string(),
1200 exit_status: output,
1201 });
1202 }
1203
1204 Ok(output_lines)
1205}
1206
/// Reports whether a file named `binary_name` exists in any directory listed in `PATH`.
///
/// `PATH` is read with `std::env::var_os` and split with `std::env::split_paths`, so the
/// platform's own separator is honoured and a non-UTF-8 `PATH` is still searched. Only regular
/// files (after following symlinks) count as a match; a directory that happens to share the
/// name does not.
///
/// Returns `false` when `PATH` is not set.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    std::env::var_os("PATH")
        .map(|path| std::env::split_paths(&path).any(|dir| dir.join(binary_name).is_file()))
        .unwrap_or(false)
}
1219
1220pub fn get_wallet_directory() -> Result<PathBuf> {
1221 Ok(dirs_next::data_dir()
1222 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1223 .join("safe")
1224 .join("client")
1225 .join("wallet"))
1226}
1227
1228pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1229 let webhook_url =
1230 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1231
1232 let mut message = String::new();
1233 message.push_str("*Testnet Details*\n");
1234 message.push_str(&format!("Name: {}\n", inventory.name));
1235 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1236 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1237 match inventory.binary_option {
1238 BinaryOption::BuildFromSource {
1239 ref repo_owner,
1240 ref branch,
1241 ..
1242 } => {
1243 message.push_str("*Branch Details*\n");
1244 message.push_str(&format!("Repo owner: {}\n", repo_owner));
1245 message.push_str(&format!("Branch: {}\n", branch));
1246 }
1247 BinaryOption::Versioned {
1248 ant_version: ref safe_version,
1249 antnode_version: ref safenode_version,
1250 antctl_version: ref safenode_manager_version,
1251 ..
1252 } => {
1253 message.push_str("*Version Details*\n");
1254 message.push_str(&format!(
1255 "ant version: {}\n",
1256 safe_version
1257 .as_ref()
1258 .map_or("None".to_string(), |v| v.to_string())
1259 ));
1260 message.push_str(&format!(
1261 "safenode version: {}\n",
1262 safenode_version
1263 .as_ref()
1264 .map_or("None".to_string(), |v| v.to_string())
1265 ));
1266 message.push_str(&format!(
1267 "antctl version: {}\n",
1268 safenode_manager_version
1269 .as_ref()
1270 .map_or("None".to_string(), |v| v.to_string())
1271 ));
1272 }
1273 }
1274
1275 message.push_str("*Sample Peers*\n");
1276 message.push_str("```\n");
1277 for peer in inventory.peers().iter().take(20) {
1278 message.push_str(&format!("{peer}\n"));
1279 }
1280 message.push_str("```\n");
1281 message.push_str("*Available Files*\n");
1282 message.push_str("```\n");
1283 for (addr, file_name) in inventory.uploaded_files.iter() {
1284 message.push_str(&format!("{}: {}\n", addr, file_name))
1285 }
1286 message.push_str("```\n");
1287
1288 let payload = json!({
1289 "text": message,
1290 });
1291 reqwest::Client::new()
1292 .post(webhook_url)
1293 .json(&payload)
1294 .send()
1295 .await?;
1296 println!("{message}");
1297 println!("Posted notification to Slack");
1298 Ok(())
1299}
1300
1301fn print_duration(duration: Duration) {
1302 let total_seconds = duration.as_secs();
1303 let minutes = total_seconds / 60;
1304 let seconds = total_seconds % 60;
1305 debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
1306}
1307
1308pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1309 let progress_bar = ProgressBar::new(length);
1310 progress_bar.set_style(
1311 ProgressStyle::default_bar()
1312 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1313 .progress_chars("#>-"),
1314 );
1315 progress_bar.enable_steady_tick(Duration::from_millis(100));
1316 Ok(progress_bar)
1317}
1318
1319pub async fn get_environment_details(
1320 environment_name: &str,
1321 s3_repository: &S3Repository,
1322) -> Result<EnvironmentDetails> {
1323 let temp_file = tempfile::NamedTempFile::new()?;
1324
1325 let max_retries = 3;
1326 let mut retries = 0;
1327 let env_details = loop {
1328 debug!("Downloading the environment details file for {environment_name} from S3");
1329 match s3_repository
1330 .download_object("sn-environment-type", environment_name, temp_file.path())
1331 .await
1332 {
1333 Ok(_) => {
1334 debug!("Downloaded the environment details file for {environment_name} from S3");
1335 let content = match std::fs::read_to_string(temp_file.path()) {
1336 Ok(content) => content,
1337 Err(err) => {
1338 log::error!("Could not read the environment details file: {err:?}");
1339 if retries < max_retries {
1340 debug!("Retrying to read the environment details file");
1341 retries += 1;
1342 continue;
1343 } else {
1344 return Err(Error::EnvironmentDetailsNotFound(
1345 environment_name.to_string(),
1346 ));
1347 }
1348 }
1349 };
1350 trace!("Content of the environment details file: {}", content);
1351
1352 match serde_json::from_str(&content) {
1353 Ok(environment_details) => break environment_details,
1354 Err(err) => {
1355 log::error!("Could not parse the environment details file: {err:?}");
1356 if retries < max_retries {
1357 debug!("Retrying to parse the environment details file");
1358 retries += 1;
1359 continue;
1360 } else {
1361 return Err(Error::EnvironmentDetailsNotFound(
1362 environment_name.to_string(),
1363 ));
1364 }
1365 }
1366 }
1367 }
1368 Err(err) => {
1369 log::error!(
1370 "Could not download the environment details file for {environment_name} from S3: {err:?}"
1371 );
1372 if retries < max_retries {
1373 retries += 1;
1374 continue;
1375 } else {
1376 return Err(Error::EnvironmentDetailsNotFound(
1377 environment_name.to_string(),
1378 ));
1379 }
1380 }
1381 }
1382 };
1383
1384 debug!("Fetched environment details: {env_details:?}");
1385
1386 Ok(env_details)
1387}
1388
1389pub async fn write_environment_details(
1390 s3_repository: &S3Repository,
1391 environment_name: &str,
1392 environment_details: &EnvironmentDetails,
1393) -> Result<()> {
1394 let temp_dir = tempfile::tempdir()?;
1395 let path = temp_dir.path().to_path_buf().join(environment_name);
1396 let mut file = File::create(&path)?;
1397 let json = serde_json::to_string(environment_details)?;
1398 file.write_all(json.as_bytes())?;
1399 s3_repository
1400 .upload_file("sn-environment-type", &path, true)
1401 .await?;
1402 Ok(())
1403}
1404
1405pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1406 if node_count == 0 {
1407 return 0;
1408 }
1409 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1410
1411 (total_volume_required as f64 / 7.0).ceil() as u16
1413}
1414
/// Builds the `http://<host>/bootstrap_cache.json` URL for the host at `ip_addr`.
///
/// IPv6 addresses are wrapped in square brackets, as URL syntax requires for IP literals in the
/// host position; IPv4 addresses are used as-is.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    match ip_addr {
        IpAddr::V4(addr) => format!("http://{addr}/bootstrap_cache.json"),
        IpAddr::V6(addr) => format!("http://[{addr}]/bootstrap_cache.json"),
    }
}