1pub mod ansible;
8pub mod bootstrap;
9pub mod clients;
10pub mod deploy;
11pub mod digital_ocean;
12pub mod error;
13pub mod funding;
14pub mod infra;
15pub mod inventory;
16pub mod logs;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
// Storage required per node; unit is not stated here — presumably GB (TODO confirm
// against the terraform volume sizing that consumes this constant).
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51 fs::File,
52 io::{BufRead, BufReader, BufWriter, Write},
53 net::IpAddr,
54 path::{Path, PathBuf},
55 process::{Command, Stdio},
56 str::FromStr,
57 time::Duration,
58};
59use tar::Archive;
60
// Default number of parallel Ansible forks, used when the builder's
// `ansible_forks` option is not set (see `TestnetDeployBuilder::build`).
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
/// The high-level kind of deployment an environment was created as.
///
/// Serialized/displayed as "bootstrap", "clients" and "new" respectively.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    /// A bootstrap deployment.
    Bootstrap,
    /// A clients-only deployment.
    Client,
    /// A fresh, full network deployment; the default.
    #[default]
    New,
}
73
/// Connection and contract details for an Anvil (local EVM) node, parsed from
/// the EVM testnet data CSV retrieved in `get_anvil_node_data`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    // Address of the data payments contract.
    pub data_payments_address: String,
    // Private key of the deployer wallet.
    pub deployer_wallet_private_key: String,
    // Address of the payment token contract.
    pub payment_token_address: String,
    // RPC endpoint of the Anvil node.
    pub rpc_url: String,
}
81
82impl std::fmt::Display for DeploymentType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 DeploymentType::Bootstrap => write!(f, "bootstrap"),
86 DeploymentType::Client => write!(f, "clients"),
87 DeploymentType::New => write!(f, "new"),
88 }
89 }
90}
91
92impl std::str::FromStr for DeploymentType {
93 type Err = String;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 match s.to_lowercase().as_str() {
97 "bootstrap" => Ok(DeploymentType::Bootstrap),
98 "clients" => Ok(DeploymentType::Client),
99 "new" => Ok(DeploymentType::New),
100 _ => Err(format!("Invalid deployment type: {}", s)),
101 }
102 }
103}
104
/// The role a node VM plays in the deployed network.
#[derive(Debug, Clone)]
pub enum NodeType {
    /// Node behind a full-cone NAT.
    FullConePrivateNode,
    /// Ordinary public node.
    Generic,
    /// The genesis node.
    Genesis,
    /// Peer-cache serving node.
    PeerCache,
    /// Node behind a symmetric NAT.
    SymmetricPrivateNode,
}
113
114impl std::fmt::Display for NodeType {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
118 NodeType::Generic => write!(f, "generic"),
119 NodeType::Genesis => write!(f, "genesis"),
120 NodeType::PeerCache => write!(f, "peer-cache"),
121 NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
122 }
123 }
124}
125
126impl std::str::FromStr for NodeType {
127 type Err = String;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 match s.to_lowercase().as_str() {
131 "full-cone-private" => Ok(NodeType::FullConePrivateNode),
132 "generic" => Ok(NodeType::Generic),
133 "genesis" => Ok(NodeType::Genesis),
134 "peer-cache" => Ok(NodeType::PeerCache),
135 "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
136 _ => Err(format!("Invalid node type: {}", s)),
137 }
138 }
139}
140
141impl NodeType {
142 pub fn telegraf_role(&self) -> &'static str {
143 match self {
144 NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
145 NodeType::Generic => "GENERIC_NODE",
146 NodeType::Genesis => "GENESIS_NODE",
147 NodeType::PeerCache => "PEER_CACHE_NODE",
148 NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
149 }
150 }
151
152 pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
153 match self {
154 NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
155 NodeType::Generic => AnsibleInventoryType::Nodes,
156 NodeType::Genesis => AnsibleInventoryType::Genesis,
157 NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
158 NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
159 }
160 }
161}
162
/// The EVM network a deployment pays against.
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    /// Local Anvil node (the default).
    #[default]
    Anvil,
    /// Arbitrum One mainnet.
    ArbitrumOne,
    /// Arbitrum Sepolia testnet.
    ArbitrumSepolia,
    /// A custom, externally supplied network.
    Custom,
}
171
172impl std::fmt::Display for EvmNetwork {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 match self {
175 EvmNetwork::Anvil => write!(f, "evm-custom"),
176 EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
177 EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
178 EvmNetwork::Custom => write!(f, "evm-custom"),
179 }
180 }
181}
182
183impl std::str::FromStr for EvmNetwork {
184 type Err = String;
185
186 fn from_str(s: &str) -> Result<Self, Self::Err> {
187 match s.to_lowercase().as_str() {
188 "anvil" => Ok(EvmNetwork::Anvil),
189 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
190 "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
191 "custom" => Ok(EvmNetwork::Custom),
192 _ => Err(format!("Invalid EVM network type: {}", s)),
193 }
194 }
195}
196
/// EVM network selection plus the optional contract/endpoint overrides that
/// accompany it.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    // Which EVM network this deployment targets.
    pub network: EvmNetwork,
    // Data payments contract address, when overridden.
    pub data_payments_address: Option<String>,
    // Payment token contract address, when overridden.
    pub payment_token_address: Option<String>,
    // RPC endpoint, when overridden.
    pub rpc_url: Option<String>,
}
204
/// Persisted description of a deployed environment (stored in S3; see
/// `get_environment_details` usage in `TestnetDeployer::clean`).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    // What kind of deployment created this environment.
    pub deployment_type: DeploymentType,
    // Development / staging / production.
    pub environment_type: EnvironmentType,
    // EVM network and overrides used by the environment.
    pub evm_details: EvmDetails,
    // Funding wallet address, if one was configured.
    pub funding_wallet_address: Option<String>,
    // Numeric network identifier, if set.
    pub network_id: Option<u8>,
    // Cloud region the environment was created in (e.g. "lon1").
    pub region: String,
    // Rewards address, if one was configured.
    pub rewards_address: Option<String>,
}
215
/// The tier of an environment, which selects tfvars files and default node
/// counts (see the `impl EnvironmentType` methods below).
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    /// Development tier; the default.
    #[default]
    Development,
    /// Production tier.
    Production,
    /// Staging tier.
    Staging,
}
223
224impl EnvironmentType {
225 pub fn get_tfvars_filenames(&self, name: &str, region: &str) -> Vec<String> {
226 match self {
227 EnvironmentType::Development => vec![
228 "dev.tfvars".to_string(),
229 format!("dev-images-{region}.tfvars", region = region),
230 ],
231 EnvironmentType::Staging => vec![
232 "staging.tfvars".to_string(),
233 format!("staging-images-{region}.tfvars", region = region),
234 ],
235 EnvironmentType::Production => {
236 vec![
237 format!("{name}.tfvars", name = name),
238 format!("production-images-{region}.tfvars", region = region),
239 ]
240 }
241 }
242 }
243
244 pub fn get_default_peer_cache_node_count(&self) -> u16 {
245 match self {
246 EnvironmentType::Development => 5,
247 EnvironmentType::Production => 5,
248 EnvironmentType::Staging => 5,
249 }
250 }
251
252 pub fn get_default_node_count(&self) -> u16 {
253 match self {
254 EnvironmentType::Development => 25,
255 EnvironmentType::Production => 25,
256 EnvironmentType::Staging => 25,
257 }
258 }
259
260 pub fn get_default_symmetric_private_node_count(&self) -> u16 {
261 self.get_default_node_count()
262 }
263
264 pub fn get_default_full_cone_private_node_count(&self) -> u16 {
265 self.get_default_node_count()
266 }
267}
268
269impl std::fmt::Display for EnvironmentType {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 match self {
272 EnvironmentType::Development => write!(f, "development"),
273 EnvironmentType::Production => write!(f, "production"),
274 EnvironmentType::Staging => write!(f, "staging"),
275 }
276 }
277}
278
279impl FromStr for EnvironmentType {
280 type Err = Error;
281
282 fn from_str(s: &str) -> Result<Self, Self::Err> {
283 match s.to_lowercase().as_str() {
284 "development" => Ok(EnvironmentType::Development),
285 "production" => Ok(EnvironmentType::Production),
286 "staging" => Ok(EnvironmentType::Staging),
287 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
288 }
289 }
290}
291
/// How the deployment obtains its binaries: built from a source branch, or
/// fetched as pre-built versioned releases.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Build binaries from a repository branch.
    BuildFromSource {
        // Optional feature set for the antnode build.
        antnode_features: Option<String>,
        // Branch to build from.
        branch: String,
        // GitHub owner of the repository.
        repo_owner: String,
    },
    /// Use pre-built binaries at specific versions; `None` means that binary
    /// is not pinned.
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Option<Version>,
        antnode_version: Option<Version>,
    },
}
321
322impl BinaryOption {
323 pub fn print(&self) {
324 match self {
325 BinaryOption::BuildFromSource {
326 antnode_features,
327 branch,
328 repo_owner,
329 } => {
330 println!("Source configuration:");
331 println!(" Repository owner: {}", repo_owner);
332 println!(" Branch: {}", branch);
333 if let Some(features) = antnode_features {
334 println!(" Antnode features: {}", features);
335 }
336 }
337 BinaryOption::Versioned {
338 ant_version,
339 antctl_version,
340 antnode_version,
341 } => {
342 println!("Versioned binaries configuration:");
343 if let Some(version) = ant_version {
344 println!(" ant version: {}", version);
345 }
346 if let Some(version) = antctl_version {
347 println!(" antctl version: {}", version);
348 }
349 if let Some(version) = antnode_version {
350 println!(" antnode version: {}", version);
351 }
352 }
353 }
354 }
355}
356
/// Supported cloud providers. Only DigitalOcean is currently accepted by
/// `TestnetDeployBuilder::build`.
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}
362
363impl std::fmt::Display for CloudProvider {
364 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365 match self {
366 CloudProvider::Aws => write!(f, "aws"),
367 CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
368 }
369 }
370}
371
372impl CloudProvider {
373 pub fn get_ssh_user(&self) -> String {
374 match self {
375 CloudProvider::Aws => "ubuntu".to_string(),
376 CloudProvider::DigitalOcean => "root".to_string(),
377 }
378 }
379}
380
/// Log output format for nodes: plain text or JSON.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    Default,
    Json,
}
386
387impl LogFormat {
388 pub fn parse_from_str(val: &str) -> Result<Self> {
389 match val {
390 "default" => Ok(LogFormat::Default),
391 "json" => Ok(LogFormat::Json),
392 _ => Err(Error::LoggingConfiguration(
393 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
394 )),
395 }
396 }
397
398 pub fn as_str(&self) -> &'static str {
399 match self {
400 LogFormat::Default => "default",
401 LogFormat::Json => "json",
402 }
403 }
404}
405
/// Options controlling a node upgrade run (consumed by
/// `TestnetDeployer::upgrade` and converted to Ansible extra-vars).
#[derive(Clone)]
pub struct UpgradeOptions {
    // Run ansible-playbook in verbose mode.
    pub ansible_verbose: bool,
    // Restrict the run to an explicit set of VMs, if provided.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    // Extra environment variables passed through to the nodes.
    pub env_variables: Option<Vec<(String, String)>>,
    // Force the upgrade even when versions match.
    pub force: bool,
    // Number of parallel Ansible forks.
    pub forks: usize,
    // Interval between node upgrades.
    pub interval: Duration,
    // Environment name.
    pub name: String,
    // Restrict the run to one node type, if provided.
    pub node_type: Option<NodeType>,
    // Optional delay (seconds — TODO confirm unit) applied before upgrading.
    pub pre_upgrade_delay: Option<u64>,
    // Cloud provider of the environment.
    pub provider: CloudProvider,
    // Target antnode version; latest when `None`.
    pub version: Option<String>,
}
420
421impl UpgradeOptions {
422 pub fn get_ansible_vars(&self) -> String {
423 let mut extra_vars = ExtraVarsDocBuilder::default();
424 extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
425 if let Some(env_variables) = &self.env_variables {
426 extra_vars.add_env_variable_list("env_variables", env_variables.clone());
427 }
428 if self.force {
429 extra_vars.add_variable("force", &self.force.to_string());
430 }
431 if let Some(version) = &self.version {
432 extra_vars.add_variable("antnode_version", version);
433 }
434 if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
435 extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
436 }
437 extra_vars.build()
438 }
439}
440
/// Builder for `TestnetDeployer`; unset options fall back to environment
/// variables or defaults in `build`.
#[derive(Default)]
pub struct TestnetDeployBuilder {
    // Parallel Ansible forks; defaults to ANSIBLE_DEFAULT_FORKS.
    ansible_forks: Option<usize>,
    // Run ansible-playbook in verbose mode.
    ansible_verbose_mode: bool,
    // Environment tier (development/staging/production).
    deployment_type: EnvironmentType,
    // Name of the environment; required by `build`.
    environment_name: String,
    // Cloud provider; defaults to DigitalOcean.
    provider: Option<CloudProvider>,
    // Cloud region; defaults to "lon1".
    region: Option<String>,
    // SSH private key path; falls back to SSH_KEY_PATH.
    ssh_secret_key_path: Option<PathBuf>,
    // Terraform state bucket; falls back to TERRAFORM_STATE_BUCKET_NAME.
    state_bucket_name: Option<String>,
    // Terraform binary; defaults to "terraform" on PATH.
    terraform_binary_path: Option<PathBuf>,
    // Ansible vault password file; falls back to ANSIBLE_VAULT_PASSWORD_PATH.
    vault_password_path: Option<PathBuf>,
    // Working directory; defaults to ./resources.
    working_directory_path: Option<PathBuf>,
}
455
impl TestnetDeployBuilder {
    /// Creates a builder with every option unset.
    pub fn new() -> Self {
        Default::default()
    }

    /// Enables or disables verbose ansible-playbook output.
    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    /// Sets the number of parallel Ansible forks.
    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    /// Sets the environment tier (development/staging/production).
    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    /// Sets the environment name; required for `build` to succeed.
    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    /// Sets the cloud provider.
    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    /// Sets the Terraform state bucket name.
    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    /// Sets the path to the Terraform binary.
    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    /// Sets the working directory holding the terraform/ansible resources.
    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    /// Sets the SSH private key path used to reach the VMs.
    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    /// Sets the Ansible vault password file path.
    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    /// Sets the cloud region.
    pub fn region(&mut self, region: String) -> &mut Self {
        self.region = Some(region);
        self
    }

    /// Resolves all options (falling back to environment variables and
    /// defaults) and assembles a ready-to-use `TestnetDeployer`.
    ///
    /// # Errors
    ///
    /// Fails when the provider is unsupported, required environment
    /// variables are missing, or any of the runner constructors fail.
    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                // The PAT is exported under both names; presumably different
                // tools read different variables — TODO confirm which reads
                // which.
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                // Only DigitalOcean is wired up at present.
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        // When no explicit path is given, "terraform" is resolved from PATH.
        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let region = match self.region {
            Some(ref region) => region.clone(),
            None => "lon1".to_string(),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        // Remove any stale "safe" file left in the working directory by a
        // previous run.
        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
            region,
        )?;

        Ok(testnet)
    }
}
614
/// Orchestrates the lifecycle of a testnet environment: infrastructure
/// (Terraform), provisioning (Ansible), node control and cleanup.
#[derive(Clone)]
pub struct TestnetDeployer {
    // Drives Ansible playbooks against the environment's inventories.
    pub ansible_provisioner: AnsibleProvisioner,
    // Provider the environment lives on.
    pub cloud_provider: CloudProvider,
    // Environment tier (development/staging/production).
    pub deployment_type: EnvironmentType,
    // Name of the environment; also the Terraform workspace name.
    pub environment_name: String,
    // Path to the Ansible inventory definition file.
    pub inventory_file_path: PathBuf,
    // Cloud region (e.g. "lon1").
    pub region: String,
    // Client for the safenode RPC interface.
    pub rpc_client: RpcClient,
    // Access to the S3 buckets used for state, logs and artefacts.
    pub s3_repository: S3Repository,
    // SSH access to the environment's VMs.
    pub ssh_client: SshClient,
    // Drives Terraform for the environment's infrastructure.
    pub terraform_runner: TerraformRunner,
    // Root of the terraform/ansible resources tree.
    pub working_directory_path: PathBuf,
}
629
impl TestnetDeployer {
    /// Constructs a deployer for the named environment.
    ///
    /// # Errors
    ///
    /// Returns `Error::EnvironmentNameRequired` when `environment_name` is
    /// empty.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
        region: String,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            region,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    /// Prepares the environment: refuses to proceed when logs from a
    /// previous testnet with this name still exist, initialises Terraform
    /// and its workspace, and fetches the safenode RPC client binary if it
    /// is not already present.
    pub async fn init(&self) -> Result<()> {
        // Guard against reusing a name whose logs were never cleared.
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        // A workspace per environment keeps Terraform state isolated.
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                // Mark the downloaded binary as executable (rwxr-xr-x).
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755); std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    /// Runs `terraform plan` for the environment described by `options`.
    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), options.tfvars_filenames.clone())?;
        Ok(())
    }

    /// Starts nodes across the environment, optionally restricted to a node
    /// type or a custom VM list, pausing `interval` between nodes.
    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Collects the node registries from every inventory, prints each one,
    /// then prints a per-type and per-status summary.
    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let symmetric_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let full_cone_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        symmetric_private_node_registries.print();
        full_cone_private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &symmetric_private_node_registries,
            &full_cone_private_node_registries,
            &genesis_node_registry,
        ];

        // Tally node counts by service status across every registry.
        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        // Host counts = number of registries retrieved per inventory type.
        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let symmetric_private_nodes = symmetric_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let full_cone_private_nodes = full_cone_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        // The "(hosts x nodes-per-host)" figures use integer division and are
        // guarded against division by zero for empty inventories.
        println!("-------");
        println!("Summary");
        println!("-------");
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            if peer_cache_hosts > 0 {
                peer_cache_nodes / peer_cache_hosts
            } else {
                0
            },
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            if generic_hosts > 0 {
                generic_nodes / generic_hosts
            } else {
                0
            },
            generic_nodes
        );
        println!(
            "Total symmetric private nodes ({}x{}): {}",
            symmetric_private_hosts,
            if symmetric_private_hosts > 0 {
                symmetric_private_nodes / symmetric_private_hosts
            } else {
                0
            },
            symmetric_private_nodes
        );
        println!(
            "Total full cone private nodes ({}x{}): {}",
            full_cone_private_hosts,
            if full_cone_private_hosts > 0 {
                full_cone_private_nodes / full_cone_private_hosts
            } else {
                0
            },
            full_cone_private_nodes
        );
        println!("Total nodes: {}", total_nodes);
        println!("Running nodes: {}", running_nodes);
        println!("Stopped nodes: {}", stopped_nodes);
        println!("Added nodes: {}", added_nodes);
        println!("Removed nodes: {}", removed_nodes);

        Ok(())
    }

    /// Cleans up node logs; when `setup_cron` is true, also installs the
    /// recurring cleanup job.
    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    /// Starts the Telegraf service, optionally restricted to a node type or
    /// a custom VM list.
    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Stops nodes, optionally restricted to a node type, custom VM list or
    /// explicit service names, with an optional delay before stopping.
    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }

    /// Stops the Telegraf service, optionally restricted to a node type or
    /// a custom VM list.
    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Upgrades the antnode services as described by `options`.
    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    /// Upgrades the antctl binary to `version`, optionally restricted to a
    /// node type or a custom VM list.
    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Upgrades the Telegraf configuration on node hosts.
    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    /// Upgrades the Telegraf configuration on client hosts.
    pub fn upgrade_client_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_client_telegraf(name)?;
        Ok(())
    }

    /// Tears the environment down: drains funds (best effort), destroys the
    /// infrastructure, deletes the Ansible inventory, and removes the stored
    /// environment type record (best effort).
    pub async fn clean(&self) -> Result<()> {
        // Missing environment details are tolerated so cleanup can proceed
        // for partially-created environments.
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository)
                .await
                .inspect_err(|err| {
                    println!("Failed to get environment details: {err}. Continuing cleanup...");
                })
                .ok();
        if let Some(environment_details) = &environment_details {
            funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
        }

        self.destroy_infra(environment_details).await?;

        cleanup_environment_inventory(
            &self.environment_name,
            &self
                .working_directory_path
                .join("ansible")
                .join("inventory"),
            None,
        )?;

        println!("Deleted Ansible inventory for {}", self.environment_name);

        if let Err(err) = self
            .s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await
        {
            println!("Failed to delete environment type: {err}. Continuing cleanup...");
        }
        Ok(())
    }

    /// Destroys the environment's Terraform-managed infrastructure and then
    /// deletes its workspace. Uses the environment details, when available,
    /// to pick the correct tfvars files.
    async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
        infra::select_workspace(&self.terraform_runner, &self.environment_name)?;

        let options = InfraRunOptions::generate_existing(
            &self.environment_name,
            &self.region,
            &self.terraform_runner,
            environment_details.as_ref(),
        )
        .await?;

        let args = build_terraform_args(&options)?;
        // Without details the destroy runs with no tfvars files at all.
        let tfvars_filenames = if let Some(environment_details) = &environment_details {
            environment_details
                .environment_type
                .get_tfvars_filenames(&self.environment_name, &self.region)
        } else {
            vec![]
        };

        self.terraform_runner
            .destroy(Some(args), Some(tfvars_filenames))?;

        infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;

        Ok(())
    }
}
1019
1020pub fn get_genesis_multiaddr(
1025 ansible_runner: &AnsibleRunner,
1026 ssh_client: &SshClient,
1027) -> Result<(String, IpAddr)> {
1028 let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
1029 let genesis_ip = genesis_inventory[0].public_ip_addr;
1030
1031 let multiaddr = ssh_client
1035 .run_command(
1036 &genesis_ip,
1037 "root",
1038 "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1039 false,
1040 )
1041 .map(|output| output.first().cloned())
1042 .unwrap_or_else(|err| {
1043 log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1044 None
1045 });
1046
1047 let multiaddr = match multiaddr {
1049 Some(addr) => addr,
1050 None => ssh_client
1051 .run_command(
1052 &genesis_ip,
1053 "root",
1054 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1055 false,
1056 )?
1057 .first()
1058 .cloned()
1059 .ok_or_else(|| Error::GenesisListenAddress)?,
1060 };
1061
1062 Ok((multiaddr, genesis_ip))
1063}
1064
1065pub fn get_anvil_node_data(
1066 ansible_runner: &AnsibleRunner,
1067 ssh_client: &SshClient,
1068) -> Result<AnvilNodeData> {
1069 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1070 if evm_inventory.is_empty() {
1071 return Err(Error::EvmNodeNotFound);
1072 }
1073
1074 let evm_ip = evm_inventory[0].public_ip_addr;
1075 debug!("Retrieved IP address for EVM node: {evm_ip}");
1076 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1077
1078 const MAX_ATTEMPTS: u8 = 5;
1079 const RETRY_DELAY: Duration = Duration::from_secs(5);
1080
1081 for attempt in 1..=MAX_ATTEMPTS {
1082 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
1083 Ok(output) => {
1084 if let Some(csv_contents) = output.first() {
1085 let parts: Vec<&str> = csv_contents.split(',').collect();
1086 if parts.len() != 4 {
1087 return Err(Error::EvmTestnetDataParsingError(
1088 "Expected 4 fields in the CSV".to_string(),
1089 ));
1090 }
1091
1092 let evm_testnet_data = AnvilNodeData {
1093 rpc_url: parts[0].trim().to_string(),
1094 payment_token_address: parts[1].trim().to_string(),
1095 data_payments_address: parts[2].trim().to_string(),
1096 deployer_wallet_private_key: parts[3].trim().to_string(),
1097 };
1098 return Ok(evm_testnet_data);
1099 }
1100 }
1101 Err(e) => {
1102 if attempt == MAX_ATTEMPTS {
1103 return Err(e);
1104 }
1105 println!(
1106 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1107 attempt,
1108 RETRY_DELAY.as_secs()
1109 );
1110 }
1111 }
1112 std::thread::sleep(RETRY_DELAY);
1113 }
1114
1115 Err(Error::EvmTestnetDataNotFound)
1116}
1117
1118pub fn get_multiaddr(
1119 ansible_runner: &AnsibleRunner,
1120 ssh_client: &SshClient,
1121) -> Result<(String, IpAddr)> {
1122 let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1123 let node_ip = node_inventory
1126 .iter()
1127 .find(|vm| vm.name.ends_with("-node-1"))
1128 .ok_or_else(|| Error::NodeAddressNotFound)?
1129 .public_ip_addr;
1130
1131 debug!("Getting multiaddr from node {node_ip}");
1132
1133 let multiaddr =
1134 ssh_client
1135 .run_command(
1136 &node_ip,
1137 "root",
1138 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1140 false,
1141 )?.first()
1142 .cloned()
1143 .ok_or_else(|| Error::NodeAddressNotFound)?;
1144
1145 Ok((multiaddr, node_ip))
1148}
1149
1150pub async fn get_and_extract_archive_from_s3(
1151 s3_repository: &S3Repository,
1152 bucket_name: &str,
1153 archive_bucket_path: &str,
1154 dest_path: &Path,
1155) -> Result<()> {
1156 let archive_file_name = archive_bucket_path.split('/').next_back().unwrap();
1159 let archive_dest_path = dest_path.join(archive_file_name);
1160 s3_repository
1161 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1162 .await?;
1163 extract_archive(&archive_dest_path, dest_path)?;
1164 Ok(())
1165}
1166
1167pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1168 let archive_file = File::open(archive_path)?;
1169 let decoder = GzDecoder::new(archive_file);
1170 let mut archive = Archive::new(decoder);
1171 let entries = archive.entries()?;
1172 for entry_result in entries {
1173 let mut entry = entry_result?;
1174 let extract_path = dest_path.join(entry.path()?);
1175 if entry.header().entry_type() == tar::EntryType::Directory {
1176 std::fs::create_dir_all(extract_path)?;
1177 continue;
1178 }
1179 let mut file = BufWriter::new(File::create(extract_path)?);
1180 std::io::copy(&mut entry, &mut file)?;
1181 }
1182 std::fs::remove_file(archive_path)?;
1183 Ok(())
1184}
1185
1186pub fn run_external_command(
1187 binary_path: PathBuf,
1188 working_directory_path: PathBuf,
1189 args: Vec<String>,
1190 suppress_stdout: bool,
1191 suppress_stderr: bool,
1192) -> Result<Vec<String>> {
1193 let mut command = Command::new(binary_path.clone());
1194 for arg in &args {
1195 command.arg(arg);
1196 }
1197 command.stdout(Stdio::piped());
1198 command.stderr(Stdio::piped());
1199 command.current_dir(working_directory_path.clone());
1200 debug!("Running {binary_path:#?} with args {args:#?}");
1201 debug!("Working directory set to {working_directory_path:#?}");
1202
1203 let mut child = command.spawn()?;
1204 let mut output_lines = Vec::new();
1205
1206 if let Some(ref mut stdout) = child.stdout {
1207 let reader = BufReader::new(stdout);
1208 for line in reader.lines() {
1209 let line = line?;
1210 if !suppress_stdout {
1211 println!("{line}");
1212 }
1213 output_lines.push(line);
1214 }
1215 }
1216
1217 if let Some(ref mut stderr) = child.stderr {
1218 let reader = BufReader::new(stderr);
1219 for line in reader.lines() {
1220 let line = line?;
1221 if !suppress_stderr {
1222 eprintln!("{line}");
1223 }
1224 output_lines.push(line);
1225 }
1226 }
1227
1228 let output = child.wait()?;
1229 if !output.success() {
1230 let binary_path = binary_path.to_str().unwrap();
1232 return Err(Error::ExternalCommandRunFailed {
1233 binary: binary_path.to_string(),
1234 exit_status: output,
1235 });
1236 }
1237
1238 Ok(output_lines)
1239}
1240
/// Return true if a file named `binary_name` exists in any directory listed
/// in the `PATH` environment variable.
///
/// Note: this only checks for existence, not for execute permission.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    // `var_os` + `split_paths` handle non-UTF-8 values and use the
    // platform's separator (':' on Unix, ';' on Windows), unlike a
    // hard-coded split on ':'.
    match std::env::var_os("PATH") {
        Some(path) => {
            std::env::split_paths(&path).any(|dir| dir.join(binary_name).exists())
        }
        None => false,
    }
}
1253
1254pub fn get_wallet_directory() -> Result<PathBuf> {
1255 Ok(dirs_next::data_dir()
1256 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1257 .join("safe")
1258 .join("client")
1259 .join("wallet"))
1260}
1261
1262pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1263 let webhook_url =
1264 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1265
1266 let mut message = String::new();
1267 message.push_str("*Testnet Details*\n");
1268 message.push_str(&format!("Name: {}\n", inventory.name));
1269 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1270 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1271 match inventory.binary_option {
1272 BinaryOption::BuildFromSource {
1273 ref repo_owner,
1274 ref branch,
1275 ..
1276 } => {
1277 message.push_str("*Branch Details*\n");
1278 message.push_str(&format!("Repo owner: {}\n", repo_owner));
1279 message.push_str(&format!("Branch: {}\n", branch));
1280 }
1281 BinaryOption::Versioned {
1282 ant_version: ref safe_version,
1283 antnode_version: ref safenode_version,
1284 antctl_version: ref safenode_manager_version,
1285 ..
1286 } => {
1287 message.push_str("*Version Details*\n");
1288 message.push_str(&format!(
1289 "ant version: {}\n",
1290 safe_version
1291 .as_ref()
1292 .map_or("None".to_string(), |v| v.to_string())
1293 ));
1294 message.push_str(&format!(
1295 "safenode version: {}\n",
1296 safenode_version
1297 .as_ref()
1298 .map_or("None".to_string(), |v| v.to_string())
1299 ));
1300 message.push_str(&format!(
1301 "antctl version: {}\n",
1302 safenode_manager_version
1303 .as_ref()
1304 .map_or("None".to_string(), |v| v.to_string())
1305 ));
1306 }
1307 }
1308
1309 message.push_str("*Sample Peers*\n");
1310 message.push_str("```\n");
1311 for peer in inventory.peers().iter().take(20) {
1312 message.push_str(&format!("{peer}\n"));
1313 }
1314 message.push_str("```\n");
1315 message.push_str("*Available Files*\n");
1316 message.push_str("```\n");
1317 for (addr, file_name) in inventory.uploaded_files.iter() {
1318 message.push_str(&format!("{}: {}\n", addr, file_name))
1319 }
1320 message.push_str("```\n");
1321
1322 let payload = json!({
1323 "text": message,
1324 });
1325 reqwest::Client::new()
1326 .post(webhook_url)
1327 .json(&payload)
1328 .send()
1329 .await?;
1330 println!("{message}");
1331 println!("Posted notification to Slack");
1332 Ok(())
1333}
1334
1335fn print_duration(duration: Duration) {
1336 let total_seconds = duration.as_secs();
1337 let minutes = total_seconds / 60;
1338 let seconds = total_seconds % 60;
1339 debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
1340}
1341
1342pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1343 let progress_bar = ProgressBar::new(length);
1344 progress_bar.set_style(
1345 ProgressStyle::default_bar()
1346 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1347 .progress_chars("#>-"),
1348 );
1349 progress_bar.enable_steady_tick(Duration::from_millis(100));
1350 Ok(progress_bar)
1351}
1352
/// Fetch and deserialize the environment details record for
/// `environment_name` from the `sn-environment-type` S3 bucket.
///
/// The download, the file read, and the JSON parse can each fail
/// independently; all three share a single retry budget (`retries` is never
/// reset), so up to `max_retries` additional full attempts are made before
/// giving up.
///
/// # Errors
///
/// Returns `Error::EnvironmentDetailsNotFound` once the retry budget is
/// exhausted, or an I/O error if the temp file cannot be created.
pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    // The object is downloaded into a named temp file, removed on drop.
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to read the environment details file");
                            retries += 1;
                            // Restarts the loop, which re-downloads the object.
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                match serde_json::from_str(&content) {
                    // Success: yield the parsed details as the loop's value.
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to parse the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                // Download failure: the underlying S3 error is logged but the
                // caller only sees EnvironmentDetailsNotFound.
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}
1422
1423pub async fn write_environment_details(
1424 s3_repository: &S3Repository,
1425 environment_name: &str,
1426 environment_details: &EnvironmentDetails,
1427) -> Result<()> {
1428 let temp_dir = tempfile::tempdir()?;
1429 let path = temp_dir.path().to_path_buf().join(environment_name);
1430 let mut file = File::create(&path)?;
1431 let json = serde_json::to_string(environment_details)?;
1432 file.write_all(json.as_bytes())?;
1433 s3_repository
1434 .upload_file("sn-environment-type", &path, true)
1435 .await?;
1436 Ok(())
1437}
1438
1439pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1440 if node_count == 0 {
1441 return 0;
1442 }
1443 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1444
1445 (total_volume_required as f64 / 7.0).ceil() as u16
1447}
1448
/// Build the URL from which a node's bootstrap cache JSON can be fetched.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    let mut url = String::from("http://");
    url.push_str(&ip_addr.to_string());
    url.push_str("/bootstrap_cache.json");
    url
}