1pub mod ansible;
8pub mod bootstrap;
9pub mod deploy;
10pub mod digital_ocean;
11pub mod error;
12pub mod funding;
13pub mod infra;
14pub mod inventory;
15pub mod logs;
16pub mod reserved_ip;
17pub mod rpc_client;
18pub mod s3;
19pub mod safe;
20pub mod setup;
21pub mod ssh;
22pub mod terraform;
23pub mod uploaders;
24pub mod upscale;
25
// Disk space each node is expected to require. Units are not stated here —
// presumably GB, matching the volume-size variables used by the Terraform
// options elsewhere in this file. TODO(review): confirm units.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51 fs::File,
52 io::{BufRead, BufReader, BufWriter, Write},
53 net::IpAddr,
54 path::{Path, PathBuf},
55 process::{Command, Stdio},
56 str::FromStr,
57 time::Duration,
58};
59use tar::Archive;
60
/// Number of parallel Ansible forks used when the builder does not override it.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
/// The flavour of deployment an environment was created as.
///
/// Serialised to and parsed from the tokens "bootstrap", "new" and
/// "uploaders" via the `Display`/`FromStr` impls below; `New` is the default.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    Bootstrap,
    #[default]
    New,
    Uploaders,
}
72
/// Details read back from a deployed Anvil EVM node.
///
/// Populated from the node's `evm_testnet_data.csv` by `get_anvil_node_data`,
/// which parses the four CSV fields in the order: rpc_url, payment token
/// address, data payments address, deployer wallet private key.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    // Address of the data payments contract.
    pub data_payments_address: String,
    // Private key of the wallet that deployed the contracts.
    pub deployer_wallet_private_key: String,
    // Address of the payment token contract.
    pub payment_token_address: String,
    // RPC endpoint of the Anvil node.
    pub rpc_url: String,
}
80
81impl std::fmt::Display for DeploymentType {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 DeploymentType::Bootstrap => write!(f, "bootstrap"),
85 DeploymentType::New => write!(f, "new"),
86 DeploymentType::Uploaders => write!(f, "uploaders"),
87 }
88 }
89}
90
91impl std::str::FromStr for DeploymentType {
92 type Err = String;
93
94 fn from_str(s: &str) -> Result<Self, Self::Err> {
95 match s.to_lowercase().as_str() {
96 "bootstrap" => Ok(DeploymentType::Bootstrap),
97 "new" => Ok(DeploymentType::New),
98 "uploaders" => Ok(DeploymentType::Uploaders),
99 _ => Err(format!("Invalid deployment type: {}", s)),
100 }
101 }
102}
103
/// The category of node a VM hosts.
///
/// Round-trips through the tokens "full-cone-private", "generic", "genesis",
/// "peer-cache" and "symmetric-private" via `Display`/`FromStr`; each category
/// maps to its own Ansible inventory via `to_ansible_inventory_type`.
#[derive(Debug, Clone)]
pub enum NodeType {
    FullConePrivateNode,
    Generic,
    Genesis,
    PeerCache,
    SymmetricPrivateNode,
}
112
113impl std::fmt::Display for NodeType {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 match self {
116 NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
117 NodeType::Generic => write!(f, "generic"),
118 NodeType::Genesis => write!(f, "genesis"),
119 NodeType::PeerCache => write!(f, "peer-cache"),
120 NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
121 }
122 }
123}
124
125impl std::str::FromStr for NodeType {
126 type Err = String;
127
128 fn from_str(s: &str) -> Result<Self, Self::Err> {
129 match s.to_lowercase().as_str() {
130 "full-cone-private" => Ok(NodeType::FullConePrivateNode),
131 "generic" => Ok(NodeType::Generic),
132 "genesis" => Ok(NodeType::Genesis),
133 "peer-cache" => Ok(NodeType::PeerCache),
134 "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
135 _ => Err(format!("Invalid node type: {}", s)),
136 }
137 }
138}
139
140impl NodeType {
141 pub fn telegraf_role(&self) -> &'static str {
142 match self {
143 NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
144 NodeType::Generic => "GENERIC_NODE",
145 NodeType::Genesis => "GENESIS_NODE",
146 NodeType::PeerCache => "PEER_CACHE_NODE",
147 NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
148 }
149 }
150
151 pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
152 match self {
153 NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
154 NodeType::Generic => AnsibleInventoryType::Nodes,
155 NodeType::Genesis => AnsibleInventoryType::Genesis,
156 NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
157 NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
158 }
159 }
160}
161
/// The EVM network an environment pays into.
///
/// Parsed from "anvil", "arbitrum-one", "arbitrum-sepolia" or "custom";
/// `Anvil` (a locally deployed node) is the default.
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    #[default]
    Anvil,
    ArbitrumOne,
    ArbitrumSepolia,
    Custom,
}
170
impl std::fmt::Display for EvmNetwork {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // NOTE(review): `Anvil` renders the same token as `Custom`, so
            // Display/FromStr do not round-trip for these two variants
            // ("anvil" parses, but nothing prints "anvil"). This looks
            // deliberate (an Anvil node is a custom network from the
            // consumer's perspective) — confirm before changing.
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}
181
182impl std::str::FromStr for EvmNetwork {
183 type Err = String;
184
185 fn from_str(s: &str) -> Result<Self, Self::Err> {
186 match s.to_lowercase().as_str() {
187 "anvil" => Ok(EvmNetwork::Anvil),
188 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
189 "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
190 "custom" => Ok(EvmNetwork::Custom),
191 _ => Err(format!("Invalid EVM network type: {}", s)),
192 }
193 }
194}
195
/// EVM configuration recorded for an environment.
///
/// The optional fields carry overrides for the contract addresses and RPC
/// endpoint; when absent, presumably the defaults for `network` apply —
/// TODO(review): confirm against the consumers of this struct.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    pub network: EvmNetwork,
    pub data_payments_address: Option<String>,
    pub payment_token_address: Option<String>,
    pub rpc_url: Option<String>,
}
203
/// Durable metadata describing a deployed environment.
///
/// Retrieved via `get_environment_details` (backed by S3; see
/// `TestnetDeployer::clean`) and used when destroying or funding an
/// environment.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    pub deployment_type: DeploymentType,
    pub environment_type: EnvironmentType,
    pub evm_details: EvmDetails,
    pub funding_wallet_address: Option<String>,
    pub network_id: Option<u8>,
    pub rewards_address: Option<String>,
}
213
/// The tier of an environment, controlling tfvars selection and default
/// node counts (see the inherent impl below). `Development` is the default.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    #[default]
    Development,
    Production,
    Staging,
}
221
impl EnvironmentType {
    /// Returns the tfvars filename for this environment type.
    ///
    /// Development and staging share fixed files; each production
    /// environment gets a file named after the environment itself.
    pub fn get_tfvars_filename(&self, name: &str) -> String {
        match self {
            EnvironmentType::Development => "dev.tfvars".to_string(),
            EnvironmentType::Staging => "staging.tfvars".to_string(),
            EnvironmentType::Production => {
                format!("{name}.tfvars",)
            }
        }
    }

    /// Default peer-cache node count (currently the same for every tier).
    pub fn get_default_peer_cache_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 5,
            EnvironmentType::Production => 5,
            EnvironmentType::Staging => 5,
        }
    }

    /// Default generic node count (currently the same for every tier).
    pub fn get_default_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 25,
            EnvironmentType::Production => 25,
            EnvironmentType::Staging => 25,
        }
    }

    /// Symmetric-NAT private nodes default to the generic node count.
    pub fn get_default_symmetric_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }

    /// Full-cone-NAT private nodes default to the generic node count.
    pub fn get_default_full_cone_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }
}
257
258impl std::fmt::Display for EnvironmentType {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 match self {
261 EnvironmentType::Development => write!(f, "development"),
262 EnvironmentType::Production => write!(f, "production"),
263 EnvironmentType::Staging => write!(f, "staging"),
264 }
265 }
266}
267
268impl FromStr for EnvironmentType {
269 type Err = Error;
270
271 fn from_str(s: &str) -> Result<Self, Self::Err> {
272 match s.to_lowercase().as_str() {
273 "development" => Ok(EnvironmentType::Development),
274 "production" => Ok(EnvironmentType::Production),
275 "staging" => Ok(EnvironmentType::Staging),
276 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
277 }
278 }
279}
280
/// How the deployed binaries are obtained: built from a source branch, or
/// fetched as pre-built versioned releases.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Build the binaries from a repository branch.
    BuildFromSource {
        // Optional comma-separated feature list for the antnode build —
        // TODO(review): confirm format against the build playbook.
        antnode_features: Option<String>,
        branch: String,
        repo_owner: String,
    },
    /// Use pre-built binaries at specific versions; any field left `None`
    /// is simply not pinned (see `print`).
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Option<Version>,
        antnode_version: Option<Version>,
    },
}
310
311impl BinaryOption {
312 pub fn print(&self) {
313 match self {
314 BinaryOption::BuildFromSource {
315 antnode_features,
316 branch,
317 repo_owner,
318 } => {
319 println!("Source configuration:");
320 println!(" Repository owner: {}", repo_owner);
321 println!(" Branch: {}", branch);
322 if let Some(features) = antnode_features {
323 println!(" Antnode features: {}", features);
324 }
325 }
326 BinaryOption::Versioned {
327 ant_version,
328 antctl_version,
329 antnode_version,
330 } => {
331 println!("Versioned binaries configuration:");
332 if let Some(version) = ant_version {
333 println!(" ant version: {}", version);
334 }
335 if let Some(version) = antctl_version {
336 println!(" antctl version: {}", version);
337 }
338 if let Some(version) = antnode_version {
339 println!(" antnode version: {}", version);
340 }
341 }
342 }
343 }
344}
345
/// Supported infrastructure providers. Note that only Digital Ocean is
/// currently accepted by `TestnetDeployBuilder::build`.
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}
351
352impl std::fmt::Display for CloudProvider {
353 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354 match self {
355 CloudProvider::Aws => write!(f, "aws"),
356 CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
357 }
358 }
359}
360
361impl CloudProvider {
362 pub fn get_ssh_user(&self) -> String {
363 match self {
364 CloudProvider::Aws => "ubuntu".to_string(),
365 CloudProvider::DigitalOcean => "root".to_string(),
366 }
367 }
368}
369
/// Log output format passed through to the nodes ("default" or "json").
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    Default,
    Json,
}
375
376impl LogFormat {
377 pub fn parse_from_str(val: &str) -> Result<Self> {
378 match val {
379 "default" => Ok(LogFormat::Default),
380 "json" => Ok(LogFormat::Json),
381 _ => Err(Error::LoggingConfiguration(
382 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
383 )),
384 }
385 }
386
387 pub fn as_str(&self) -> &'static str {
388 match self {
389 LogFormat::Default => "default",
390 LogFormat::Json => "json",
391 }
392 }
393}
394
/// Options controlling a node upgrade run; converted into Ansible extra-vars
/// by `get_ansible_vars`.
#[derive(Clone)]
pub struct UpgradeOptions {
    pub ansible_verbose: bool,
    // Restrict the run to specific VMs rather than a whole inventory.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    pub env_variables: Option<Vec<(String, String)>>,
    // Only forwarded to the playbook when true.
    pub force: bool,
    pub forks: usize,
    // Delay between node upgrades; sent to Ansible in milliseconds.
    pub interval: Duration,
    pub name: String,
    pub node_type: Option<NodeType>,
    pub pre_upgrade_delay: Option<u64>,
    pub provider: CloudProvider,
    // Target antnode version; omitted when `None`.
    pub version: Option<String>,
}
409
410impl UpgradeOptions {
411 pub fn get_ansible_vars(&self) -> String {
412 let mut extra_vars = ExtraVarsDocBuilder::default();
413 extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
414 if let Some(env_variables) = &self.env_variables {
415 extra_vars.add_env_variable_list("env_variables", env_variables.clone());
416 }
417 if self.force {
418 extra_vars.add_variable("force", &self.force.to_string());
419 }
420 if let Some(version) = &self.version {
421 extra_vars.add_variable("antnode_version", version);
422 }
423 if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
424 extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
425 }
426 extra_vars.build()
427 }
428}
429
/// Builder for `TestnetDeployer`.
///
/// Unset options are resolved in `build` from defaults or environment
/// variables (see the setter docs on the impl).
#[derive(Default)]
pub struct TestnetDeployBuilder {
    ansible_forks: Option<usize>,
    ansible_verbose_mode: bool,
    deployment_type: EnvironmentType,
    environment_name: String,
    provider: Option<CloudProvider>,
    ssh_secret_key_path: Option<PathBuf>,
    state_bucket_name: Option<String>,
    terraform_binary_path: Option<PathBuf>,
    vault_password_path: Option<PathBuf>,
    working_directory_path: Option<PathBuf>,
}
443
impl TestnetDeployBuilder {
    /// Creates a builder with every option unset.
    pub fn new() -> Self {
        Default::default()
    }

    /// Enables verbose output on the underlying Ansible runs.
    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    /// Overrides the parallel Ansible fork count (default: `ANSIBLE_DEFAULT_FORKS`).
    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    /// Sets the environment tier (development/staging/production).
    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    /// Sets the name of the environment to operate on.
    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    /// Selects the cloud provider (default in `build`: Digital Ocean).
    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    /// Overrides the Terraform state bucket; `build` otherwise falls back to
    /// the `TERRAFORM_STATE_BUCKET_NAME` environment variable.
    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    /// Overrides the terraform binary path (default: "terraform" on PATH).
    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    /// Overrides the working directory (default: `<cwd>/resources`).
    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    /// Overrides the SSH secret key path; `build` otherwise falls back to
    /// the `SSH_KEY_PATH` environment variable.
    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    /// Overrides the Ansible vault password file; `build` otherwise falls
    /// back to the `ANSIBLE_VAULT_PASSWORD_PATH` environment variable.
    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    /// Builds a `TestnetDeployer` from the collected options.
    ///
    /// Resolves unset options from environment variables, exports the
    /// Digital Ocean token for downstream tooling, and wires up the
    /// Terraform/Ansible runners, SSH client and RPC client.
    ///
    /// # Errors
    ///
    /// Fails if the provider is not Digital Ocean, if `DO_PAT` is unset, or
    /// if any required fallback environment variable is missing.
    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                // The token is exported under both names — presumably the
                // Terraform provider and other tooling read different
                // variables. TODO(review): confirm which consumers need which.
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        // Remove any previously downloaded `safe` binary — presumably so a
        // stale copy is never reused. TODO(review): confirm intent.
        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
        )?;

        Ok(testnet)
    }
}
591
/// Orchestrates the lifecycle of a testnet environment: infra provisioning,
/// node control, telegraf management, upgrades and teardown.
///
/// Construct via `TestnetDeployBuilder`.
#[derive(Clone)]
pub struct TestnetDeployer {
    pub ansible_provisioner: AnsibleProvisioner,
    pub cloud_provider: CloudProvider,
    pub deployment_type: EnvironmentType,
    pub environment_name: String,
    pub inventory_file_path: PathBuf,
    pub rpc_client: RpcClient,
    pub s3_repository: S3Repository,
    pub ssh_client: SshClient,
    pub terraform_runner: TerraformRunner,
    pub working_directory_path: PathBuf,
}
605
impl TestnetDeployer {
    /// Creates a deployer for an environment.
    ///
    /// # Errors
    ///
    /// Returns `Error::EnvironmentNameRequired` if `environment_name` is empty.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    /// Prepares the environment for deployment.
    ///
    /// Refuses to run if logs from a previous testnet with the same name
    /// still exist in S3, initialises Terraform and creates the workspace if
    /// needed, and downloads the `safenode_rpc_client` binary if missing.
    pub async fn init(&self) -> Result<()> {
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                // Make the downloaded binary executable (rwxr-xr-x).
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755);
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    /// Runs `terraform plan` for the environment described by `options`.
    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), Some(options.tfvars_filename.clone()))?;
        Ok(())
    }

    /// Starts nodes, optionally restricted to one node type or a custom
    /// inventory; `interval` spaces out the node starts.
    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Prints a per-inventory status report followed by a summary of node
    /// counts broken down by node type and service status.
    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        // Fetch the node registries for every inventory type.
        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let symmetric_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let full_cone_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        symmetric_private_node_registries.print();
        full_cone_private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &symmetric_private_node_registries,
            &full_cone_private_node_registries,
            &genesis_node_registry,
        ];

        // Tally every node across all registries by service status.
        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        // Host and node counts per node type, for the "hosts x nodes-per-host"
        // summary lines below.
        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let symmetric_private_nodes = symmetric_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let full_cone_private_nodes = full_cone_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        println!("-------");
        println!("Summary");
        println!("-------");
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            // Guard against division by zero when there are no hosts.
            if peer_cache_hosts > 0 {
                peer_cache_nodes / peer_cache_hosts
            } else {
                0
            },
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            if generic_hosts > 0 {
                generic_nodes / generic_hosts
            } else {
                0
            },
            generic_nodes
        );
        println!(
            "Total symmetric private nodes ({}x{}): {}",
            symmetric_private_hosts,
            if symmetric_private_hosts > 0 {
                symmetric_private_nodes / symmetric_private_hosts
            } else {
                0
            },
            symmetric_private_nodes
        );
        println!(
            "Total full cone private nodes ({}x{}): {}",
            full_cone_private_hosts,
            if full_cone_private_hosts > 0 {
                full_cone_private_nodes / full_cone_private_hosts
            } else {
                0
            },
            full_cone_private_nodes
        );
        println!("Total nodes: {}", total_nodes);
        println!("Running nodes: {}", running_nodes);
        println!("Stopped nodes: {}", stopped_nodes);
        println!("Added nodes: {}", added_nodes);
        println!("Removed nodes: {}", removed_nodes);

        Ok(())
    }

    /// Cleans node logs; with `setup_cron`, installs a recurring cleanup job.
    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    /// Starts Telegraf, optionally restricted to a node type or custom inventory.
    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Stops nodes, optionally restricted by node type, custom inventory or
    /// explicit service names; `delay` is an optional pre-stop delay.
    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }

    /// Stops Telegraf, optionally restricted to a node type or custom inventory.
    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Upgrades antnode services according to `options`.
    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    /// Upgrades the antctl binary to `version` on the targeted machines.
    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Upgrades the Telegraf configuration on node machines.
    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    /// Upgrades the Telegraf configuration on uploader machines.
    pub fn upgrade_uploader_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_uploader_telegraf(name)?;
        Ok(())
    }

    /// Tears the environment down completely: drains funds, destroys the
    /// infrastructure, deletes the local Ansible inventory, and removes the
    /// environment record from S3.
    pub async fn clean(&self) -> Result<()> {
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository).await?;
        funding::drain_funds(&self.ansible_provisioner, &environment_details).await?;

        self.destroy_infra(&environment_details).await?;

        cleanup_environment_inventory(
            &self.environment_name,
            &self
                .working_directory_path
                .join("ansible")
                .join("inventory"),
            None,
        )?;

        println!("Deleted Ansible inventory for {}", self.environment_name);

        self.s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await?;
        Ok(())
    }

    /// Destroys the Terraform-managed infrastructure and deletes the workspace.
    async fn destroy_infra(&self, environment_details: &EnvironmentDetails) -> Result<()> {
        infra::select_workspace(&self.terraform_runner, &self.environment_name)?;

        let options = InfraRunOptions::generate_existing(
            &self.environment_name,
            &self.terraform_runner,
            environment_details,
        )
        .await?;

        // The volume-size variables present in the existing state are passed
        // through to `destroy` — presumably so the destroy plan matches the
        // deployed configuration. TODO(review): confirm they are required.
        let mut args = Vec::new();
        if let Some(full_cone_private_node_volume_size) = options.full_cone_private_node_volume_size
        {
            args.push((
                "full_cone_private_node_volume_size".to_string(),
                full_cone_private_node_volume_size.to_string(),
            ));
        }
        if let Some(genesis_node_volume_size) = options.genesis_node_volume_size {
            args.push((
                "genesis_node_volume_size".to_string(),
                genesis_node_volume_size.to_string(),
            ));
        }
        if let Some(node_volume_size) = options.node_volume_size {
            args.push(("node_volume_size".to_string(), node_volume_size.to_string()));
        }
        if let Some(peer_cache_node_volume_size) = options.peer_cache_node_volume_size {
            args.push((
                "peer_cache_node_volume_size".to_string(),
                peer_cache_node_volume_size.to_string(),
            ));
        }
        if let Some(symmetric_private_node_volume_size) = options.symmetric_private_node_volume_size
        {
            args.push((
                "symmetric_private_node_volume_size".to_string(),
                symmetric_private_node_volume_size.to_string(),
            ));
        }

        self.terraform_runner.destroy(
            Some(args),
            Some(
                environment_details
                    .environment_type
                    .get_tfvars_filename(&self.environment_name),
            ),
        )?;

        infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;

        Ok(())
    }
}
1009
1010pub fn get_genesis_multiaddr(
1015 ansible_runner: &AnsibleRunner,
1016 ssh_client: &SshClient,
1017) -> Result<(String, IpAddr)> {
1018 let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
1019 let genesis_ip = genesis_inventory[0].public_ip_addr;
1020
1021 let multiaddr = ssh_client
1025 .run_command(
1026 &genesis_ip,
1027 "root",
1028 "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1029 false,
1030 )
1031 .map(|output| output.first().cloned())
1032 .unwrap_or_else(|err| {
1033 log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1034 None
1035 });
1036
1037 let multiaddr = match multiaddr {
1039 Some(addr) => addr,
1040 None => ssh_client
1041 .run_command(
1042 &genesis_ip,
1043 "root",
1044 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1045 false,
1046 )?
1047 .first()
1048 .cloned()
1049 .ok_or_else(|| Error::GenesisListenAddress)?,
1050 };
1051
1052 Ok((multiaddr, genesis_ip))
1053}
1054
1055pub fn get_anvil_node_data(
1056 ansible_runner: &AnsibleRunner,
1057 ssh_client: &SshClient,
1058) -> Result<AnvilNodeData> {
1059 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1060 if evm_inventory.is_empty() {
1061 return Err(Error::EvmNodeNotFound);
1062 }
1063
1064 let evm_ip = evm_inventory[0].public_ip_addr;
1065 debug!("Retrieved IP address for EVM node: {evm_ip}");
1066 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1067
1068 const MAX_ATTEMPTS: u8 = 5;
1069 const RETRY_DELAY: Duration = Duration::from_secs(5);
1070
1071 for attempt in 1..=MAX_ATTEMPTS {
1072 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
1073 Ok(output) => {
1074 if let Some(csv_contents) = output.first() {
1075 let parts: Vec<&str> = csv_contents.split(',').collect();
1076 if parts.len() != 4 {
1077 return Err(Error::EvmTestnetDataParsingError(
1078 "Expected 4 fields in the CSV".to_string(),
1079 ));
1080 }
1081
1082 let evm_testnet_data = AnvilNodeData {
1083 rpc_url: parts[0].trim().to_string(),
1084 payment_token_address: parts[1].trim().to_string(),
1085 data_payments_address: parts[2].trim().to_string(),
1086 deployer_wallet_private_key: parts[3].trim().to_string(),
1087 };
1088 return Ok(evm_testnet_data);
1089 }
1090 }
1091 Err(e) => {
1092 if attempt == MAX_ATTEMPTS {
1093 return Err(e);
1094 }
1095 println!(
1096 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1097 attempt,
1098 RETRY_DELAY.as_secs()
1099 );
1100 }
1101 }
1102 std::thread::sleep(RETRY_DELAY);
1103 }
1104
1105 Err(Error::EvmTestnetDataNotFound)
1106}
1107
1108pub fn get_multiaddr(
1109 ansible_runner: &AnsibleRunner,
1110 ssh_client: &SshClient,
1111) -> Result<(String, IpAddr)> {
1112 let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1113 let node_ip = node_inventory
1116 .iter()
1117 .find(|vm| vm.name.ends_with("-node-1"))
1118 .ok_or_else(|| Error::NodeAddressNotFound)?
1119 .public_ip_addr;
1120
1121 debug!("Getting multiaddr from node {node_ip}");
1122
1123 let multiaddr =
1124 ssh_client
1125 .run_command(
1126 &node_ip,
1127 "root",
1128 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1130 false,
1131 )?.first()
1132 .cloned()
1133 .ok_or_else(|| Error::NodeAddressNotFound)?;
1134
1135 Ok((multiaddr, node_ip))
1138}
1139
1140pub async fn get_and_extract_archive_from_s3(
1141 s3_repository: &S3Repository,
1142 bucket_name: &str,
1143 archive_bucket_path: &str,
1144 dest_path: &Path,
1145) -> Result<()> {
1146 let archive_file_name = archive_bucket_path.split('/').last().unwrap();
1149 let archive_dest_path = dest_path.join(archive_file_name);
1150 s3_repository
1151 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1152 .await?;
1153 extract_archive(&archive_dest_path, dest_path)?;
1154 Ok(())
1155}
1156
1157pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1158 let archive_file = File::open(archive_path)?;
1159 let decoder = GzDecoder::new(archive_file);
1160 let mut archive = Archive::new(decoder);
1161 let entries = archive.entries()?;
1162 for entry_result in entries {
1163 let mut entry = entry_result?;
1164 let extract_path = dest_path.join(entry.path()?);
1165 if entry.header().entry_type() == tar::EntryType::Directory {
1166 std::fs::create_dir_all(extract_path)?;
1167 continue;
1168 }
1169 let mut file = BufWriter::new(File::create(extract_path)?);
1170 std::io::copy(&mut entry, &mut file)?;
1171 }
1172 std::fs::remove_file(archive_path)?;
1173 Ok(())
1174}
1175
1176pub fn run_external_command(
1177 binary_path: PathBuf,
1178 working_directory_path: PathBuf,
1179 args: Vec<String>,
1180 suppress_stdout: bool,
1181 suppress_stderr: bool,
1182) -> Result<Vec<String>> {
1183 let mut command = Command::new(binary_path.clone());
1184 for arg in &args {
1185 command.arg(arg);
1186 }
1187 command.stdout(Stdio::piped());
1188 command.stderr(Stdio::piped());
1189 command.current_dir(working_directory_path.clone());
1190 debug!("Running {binary_path:#?} with args {args:#?}");
1191 debug!("Working directory set to {working_directory_path:#?}");
1192
1193 let mut child = command.spawn()?;
1194 let mut output_lines = Vec::new();
1195
1196 if let Some(ref mut stdout) = child.stdout {
1197 let reader = BufReader::new(stdout);
1198 for line in reader.lines() {
1199 let line = line?;
1200 if !suppress_stdout {
1201 println!("{line}");
1202 }
1203 output_lines.push(line);
1204 }
1205 }
1206
1207 if let Some(ref mut stderr) = child.stderr {
1208 let reader = BufReader::new(stderr);
1209 for line in reader.lines() {
1210 let line = line?;
1211 if !suppress_stderr {
1212 eprintln!("{line}");
1213 }
1214 output_lines.push(line);
1215 }
1216 }
1217
1218 let output = child.wait()?;
1219 if !output.success() {
1220 let binary_path = binary_path.to_str().unwrap();
1222 return Err(Error::ExternalCommandRunFailed {
1223 binary: binary_path.to_string(),
1224 exit_status: output,
1225 });
1226 }
1227
1228 Ok(output_lines)
1229}
1230
/// Returns `true` if a file named `binary_name` exists in any directory
/// listed on the `PATH` environment variable.
///
/// Uses `env::split_paths`, so the platform's path-list separator is
/// respected (`:` on Unix, `;` on Windows) — the previous implementation
/// hard-coded `:`. `var_os` is used so a non-UTF-8 `PATH` is still handled.
///
/// Note: this only checks for existence, not for execute permission.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    match std::env::var_os("PATH") {
        Some(path) => std::env::split_paths(&path).any(|dir| dir.join(binary_name).exists()),
        None => false,
    }
}
1243
1244pub fn get_wallet_directory() -> Result<PathBuf> {
1245 Ok(dirs_next::data_dir()
1246 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1247 .join("safe")
1248 .join("client")
1249 .join("wallet"))
1250}
1251
1252pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1253 let webhook_url =
1254 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1255
1256 let mut message = String::new();
1257 message.push_str("*Testnet Details*\n");
1258 message.push_str(&format!("Name: {}\n", inventory.name));
1259 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1260 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1261 match inventory.binary_option {
1262 BinaryOption::BuildFromSource {
1263 ref repo_owner,
1264 ref branch,
1265 ..
1266 } => {
1267 message.push_str("*Branch Details*\n");
1268 message.push_str(&format!("Repo owner: {}\n", repo_owner));
1269 message.push_str(&format!("Branch: {}\n", branch));
1270 }
1271 BinaryOption::Versioned {
1272 ant_version: ref safe_version,
1273 antnode_version: ref safenode_version,
1274 antctl_version: ref safenode_manager_version,
1275 ..
1276 } => {
1277 message.push_str("*Version Details*\n");
1278 message.push_str(&format!(
1279 "ant version: {}\n",
1280 safe_version
1281 .as_ref()
1282 .map_or("None".to_string(), |v| v.to_string())
1283 ));
1284 message.push_str(&format!(
1285 "safenode version: {}\n",
1286 safenode_version
1287 .as_ref()
1288 .map_or("None".to_string(), |v| v.to_string())
1289 ));
1290 message.push_str(&format!(
1291 "antctl version: {}\n",
1292 safenode_manager_version
1293 .as_ref()
1294 .map_or("None".to_string(), |v| v.to_string())
1295 ));
1296 }
1297 }
1298
1299 message.push_str("*Sample Peers*\n");
1300 message.push_str("```\n");
1301 for peer in inventory.peers().iter().take(20) {
1302 message.push_str(&format!("{peer}\n"));
1303 }
1304 message.push_str("```\n");
1305 message.push_str("*Available Files*\n");
1306 message.push_str("```\n");
1307 for (addr, file_name) in inventory.uploaded_files.iter() {
1308 message.push_str(&format!("{}: {}\n", addr, file_name))
1309 }
1310 message.push_str("```\n");
1311
1312 let payload = json!({
1313 "text": message,
1314 });
1315 reqwest::Client::new()
1316 .post(webhook_url)
1317 .json(&payload)
1318 .send()
1319 .await?;
1320 println!("{message}");
1321 println!("Posted notification to Slack");
1322 Ok(())
1323}
1324
1325fn print_duration(duration: Duration) {
1326 let total_seconds = duration.as_secs();
1327 let minutes = total_seconds / 60;
1328 let seconds = total_seconds % 60;
1329 debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
1330}
1331
1332pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1333 let progress_bar = ProgressBar::new(length);
1334 progress_bar.set_style(
1335 ProgressStyle::default_bar()
1336 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1337 .progress_chars("#>-"),
1338 );
1339 progress_bar.enable_steady_tick(Duration::from_millis(100));
1340 Ok(progress_bar)
1341}
1342
/// Fetches and deserializes the environment details for `environment_name`
/// from the `sn-environment-type` S3 bucket.
///
/// Each of the three steps — download, read from disk, JSON parse — is
/// retried on failure by restarting the whole loop (which re-downloads the
/// file). A shared retry counter allows up to `max_retries` additional
/// attempts in total, with no delay between attempts.
///
/// # Errors
///
/// Returns `Error::EnvironmentDetailsNotFound` once the retries are
/// exhausted, regardless of which step failed.
pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    // The S3 object is downloaded into a temp file that lives for the whole
    // retry loop; each retry overwrites it.
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                // Step 2: read the downloaded file back from disk.
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to read the environment details file");
                            retries += 1;
                            // NOTE: restarting the loop re-downloads the
                            // object even though only the read failed.
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                // Step 3: parse the JSON payload; success breaks the loop
                // with the deserialized value.
                match serde_json::from_str(&content) {
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to parse the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                // Step 1 (download) failed; retry or give up.
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}
1412
1413pub async fn write_environment_details(
1414 s3_repository: &S3Repository,
1415 environment_name: &str,
1416 environment_details: &EnvironmentDetails,
1417) -> Result<()> {
1418 let temp_dir = tempfile::tempdir()?;
1419 let path = temp_dir.path().to_path_buf().join(environment_name);
1420 let mut file = File::create(&path)?;
1421 let json = serde_json::to_string(environment_details)?;
1422 file.write_all(json.as_bytes())?;
1423 s3_repository
1424 .upload_file("sn-environment-type", &path, true)
1425 .await?;
1426 Ok(())
1427}
1428
1429pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1430 if node_count == 0 {
1431 return 0;
1432 }
1433 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1434
1435 (total_volume_required as f64 / 7.0).ceil() as u16
1437}
1438
/// Builds the URL from which a node's bootstrap cache file can be fetched.
///
/// IPv6 literals are wrapped in brackets as required by RFC 3986 — the
/// previous implementation produced an invalid URL such as
/// `http://::1/bootstrap_cache.json` for IPv6 hosts. IPv4 output is
/// unchanged.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    match ip_addr {
        IpAddr::V4(_) => format!("http://{ip_addr}/bootstrap_cache.json"),
        IpAddr::V6(_) => format!("http://[{ip_addr}]/bootstrap_cache.json"),
    }
}