pub mod ansible;
pub mod bootstrap;
pub mod deploy;
pub mod digital_ocean;
pub mod error;
pub mod funding;
pub mod infra;
pub mod inventory;
pub mod logs;
pub mod logstash;
pub mod reserved_ip;
pub mod rpc_client;
pub mod s3;
pub mod safe;
pub mod setup;
pub mod ssh;
pub mod terraform;
pub mod upscale;

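/// The amount of storage each node requires on its attached volume; the unit
/// is assumed to be GB. Consumed by `calculate_size_per_attached_volume`.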
const STORAGE_REQUIRED_PER_NODE: u16 = 7;

use crate::{
    ansible::{
        extra_vars::ExtraVarsDocBuilder,
        inventory::{cleanup_environment_inventory, AnsibleInventoryType},
        provisioning::AnsibleProvisioner,
        AnsibleRunner,
    },
    error::{Error, Result},
    inventory::{DeploymentInventory, VirtualMachine},
    rpc_client::RpcClient,
    s3::S3Repository,
    ssh::SshClient,
    terraform::TerraformRunner,
};
use alloy::primitives::Address;
use ant_service_management::ServiceStatus;
use evmlib::Network;
use flate2::read::GzDecoder;
use indicatif::{ProgressBar, ProgressStyle};
use infra::{build_terraform_args, InfraRunOptions};
use log::{debug, trace};
use semver::Version;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
    fs::File,
    io::{BufRead, BufReader, BufWriter, Write},
    net::IpAddr,
    path::{Path, PathBuf},
    process::{Command, Stdio},
    str::FromStr,
    time::Duration,
};
use tar::Archive;

const ANSIBLE_DEFAULT_FORKS: usize = 50;

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    Bootstrap,
    #[default]
    New,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    pub data_payments_address: String,
    pub deployer_wallet_private_key: String,
    pub payment_token_address: String,
    pub rpc_url: String,
}

impl std::fmt::Display for DeploymentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DeploymentType::Bootstrap => write!(f, "bootstrap"),
            DeploymentType::New => write!(f, "new"),
        }
    }
}

impl std::str::FromStr for DeploymentType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "bootstrap" => Ok(DeploymentType::Bootstrap),
            "new" => Ok(DeploymentType::New),
            _ => Err(format!("Invalid deployment type: {}", s)),
        }
    }
}

#[derive(Debug, Clone)]
pub enum NodeType {
    FullConePrivateNode,
    Generic,
    Genesis,
    PeerCache,
    SymmetricPrivateNode,
}

impl std::str::FromStr for NodeType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "full-cone-private" => Ok(NodeType::FullConePrivateNode),
            "generic" => Ok(NodeType::Generic),
            "genesis" => Ok(NodeType::Genesis),
            "peer-cache" => Ok(NodeType::PeerCache),
            "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
            _ => Err(format!("Invalid node type: {}", s)),
        }
    }
}

impl NodeType {
    pub fn telegraf_role(&self) -> &'static str {
        match self {
            NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
            NodeType::Generic => "GENERIC_NODE",
            NodeType::Genesis => "GENESIS_NODE",
            NodeType::PeerCache => "PEER_CACHE_NODE",
            NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
        }
    }

    pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
        match self {
            NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
            NodeType::Generic => AnsibleInventoryType::Nodes,
            NodeType::Genesis => AnsibleInventoryType::Genesis,
            NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
            NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
        }
    }
}

#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    #[default]
    Anvil,
    ArbitrumOne,
    ArbitrumSepolia,
    Custom,
}

impl std::fmt::Display for EvmNetwork {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // An Anvil deployment is presented as a custom EVM network.
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}

impl std::str::FromStr for EvmNetwork {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "anvil" => Ok(EvmNetwork::Anvil),
            "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
            "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
            "custom" => Ok(EvmNetwork::Custom),
            _ => Err(format!("Invalid EVM network type: {}", s)),
        }
    }
}

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    pub deployment_type: DeploymentType,
    pub environment_type: EnvironmentType,
    pub evm_network: EvmNetwork,
    pub evm_data_payments_address: Option<String>,
    pub evm_payment_token_address: Option<String>,
    pub evm_rpc_url: Option<String>,
    pub funding_wallet_address: Option<String>,
    pub network_id: Option<u8>,
    pub rewards_address: String,
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    #[default]
    Development,
    Production,
    Staging,
}

impl EnvironmentType {
    pub fn get_tfvars_filename(&self, name: &str) -> String {
        match self {
            EnvironmentType::Development => "dev.tfvars".to_string(),
            EnvironmentType::Staging => "staging.tfvars".to_string(),
            EnvironmentType::Production => format!("{name}.tfvars"),
        }
    }

    // All environment types currently share the same default node counts.
    pub fn get_default_peer_cache_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 5,
            EnvironmentType::Production => 5,
            EnvironmentType::Staging => 5,
        }
    }

    pub fn get_default_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 25,
            EnvironmentType::Production => 25,
            EnvironmentType::Staging => 25,
        }
    }

    pub fn get_default_symmetric_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }

    pub fn get_default_full_cone_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }
}

impl std::fmt::Display for EnvironmentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EnvironmentType::Development => write!(f, "development"),
            EnvironmentType::Production => write!(f, "production"),
            EnvironmentType::Staging => write!(f, "staging"),
        }
    }
}

impl FromStr for EnvironmentType {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "development" => Ok(EnvironmentType::Development),
            "production" => Ok(EnvironmentType::Production),
            "staging" => Ok(EnvironmentType::Staging),
            _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
        }
    }
}

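/// Specifies how the antnode and related binaries for the deployment are
/// obtained: built from a source branch, or fetched as released versions.
///
/// A minimal construction sketch for the versioned variant (the version
/// numbers are placeholders, not real releases):
///
/// ```ignore
/// use semver::Version;
///
/// let binaries = BinaryOption::Versioned {
///     ant_version: None,
///     antctl_version: Version::new(0, 1, 0),
///     antnode_version: Version::new(0, 1, 0),
/// };
/// binaries.print();
/// ```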
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    BuildFromSource {
        antnode_features: Option<String>,
        branch: String,
        repo_owner: String,
    },
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Version,
        antnode_version: Version,
    },
}

impl BinaryOption {
    pub fn print(&self) {
        match self {
            BinaryOption::BuildFromSource {
                antnode_features,
                branch,
                repo_owner,
            } => {
                println!("Source configuration:");
                println!("  Repository owner: {}", repo_owner);
                println!("  Branch: {}", branch);
                if let Some(features) = antnode_features {
                    println!("  Antnode features: {}", features);
                }
            }
            BinaryOption::Versioned {
                ant_version,
                antctl_version,
                antnode_version,
            } => {
                println!("Versioned binaries configuration:");
                if let Some(version) = ant_version {
                    println!("  ant version: {}", version);
                }
                println!("  antctl version: {}", antctl_version);
                println!("  antnode version: {}", antnode_version);
            }
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}

impl std::fmt::Display for CloudProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CloudProvider::Aws => write!(f, "aws"),
            CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
        }
    }
}

impl CloudProvider {
    pub fn get_ssh_user(&self) -> String {
        match self {
            CloudProvider::Aws => "ubuntu".to_string(),
            CloudProvider::DigitalOcean => "root".to_string(),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
    Default,
    Json,
}

impl LogFormat {
    pub fn parse_from_str(val: &str) -> Result<Self> {
        match val {
            "default" => Ok(LogFormat::Default),
            "json" => Ok(LogFormat::Json),
            _ => Err(Error::LoggingConfiguration(
                "The only valid values for this argument are \"default\" or \"json\"".to_string(),
            )),
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            LogFormat::Default => "default",
            LogFormat::Json => "json",
        }
    }
}

#[derive(Clone)]
pub struct UpgradeOptions {
    pub ansible_verbose: bool,
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    pub env_variables: Option<Vec<(String, String)>>,
    pub force: bool,
    pub forks: usize,
    pub interval: Duration,
    pub name: String,
    pub node_type: Option<NodeType>,
    pub pre_upgrade_delay: Option<u64>,
    pub provider: CloudProvider,
    pub version: Option<String>,
}

impl UpgradeOptions {
    pub fn get_ansible_vars(&self) -> String {
        let mut extra_vars = ExtraVarsDocBuilder::default();
        extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
        if let Some(env_variables) = &self.env_variables {
            extra_vars.add_env_variable_list("env_variables", env_variables.clone());
        }
        if self.force {
            extra_vars.add_variable("force", &self.force.to_string());
        }
        if let Some(version) = &self.version {
            extra_vars.add_variable("antnode_version", version);
        }
        if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
            extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
        }
        extra_vars.build()
    }
}

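/// Builder for [`TestnetDeployer`].
///
/// A minimal usage sketch, assuming `DO_PAT`, `TERRAFORM_STATE_BUCKET_NAME`,
/// `SSH_KEY_PATH` and `ANSIBLE_VAULT_PASSWORD_PATH` are set in the
/// environment (the environment name is illustrative):
///
/// ```ignore
/// let deployer = TestnetDeployBuilder::new()
///     .environment_name("example-env")
///     .provider(CloudProvider::DigitalOcean)
///     .ansible_forks(25)
///     .build()?;
/// deployer.init().await?;
/// ```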
#[derive(Default)]
pub struct TestnetDeployBuilder {
    ansible_forks: Option<usize>,
    ansible_verbose_mode: bool,
    deployment_type: EnvironmentType,
    environment_name: String,
    provider: Option<CloudProvider>,
    ssh_secret_key_path: Option<PathBuf>,
    state_bucket_name: Option<String>,
    terraform_binary_path: Option<PathBuf>,
    vault_password_path: Option<PathBuf>,
    working_directory_path: Option<PathBuf>,
}

impl TestnetDeployBuilder {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
        )?;

        Ok(testnet)
    }
}

#[derive(Clone)]
pub struct TestnetDeployer {
    pub ansible_provisioner: AnsibleProvisioner,
    pub cloud_provider: CloudProvider,
    pub deployment_type: EnvironmentType,
    pub environment_name: String,
    pub inventory_file_path: PathBuf,
    pub rpc_client: RpcClient,
    pub s3_repository: S3Repository,
    pub ssh_client: SshClient,
    pub terraform_runner: TerraformRunner,
    pub working_directory_path: PathBuf,
}

impl TestnetDeployer {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    pub async fn init(&self) -> Result<()> {
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755);
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), Some(options.tfvars_filename.clone()))?;
        Ok(())
    }

    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let symmetric_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let full_cone_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        symmetric_private_node_registries.print();
        full_cone_private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &symmetric_private_node_registries,
            &full_cone_private_node_registries,
            &genesis_node_registry,
        ];

        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let symmetric_private_nodes = symmetric_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let full_cone_private_nodes = full_cone_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        println!("-------");
        println!("Summary");
        println!("-------");
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            if peer_cache_hosts > 0 {
                peer_cache_nodes / peer_cache_hosts
            } else {
                0
            },
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            if generic_hosts > 0 {
                generic_nodes / generic_hosts
            } else {
                0
            },
            generic_nodes
        );
        println!(
            "Total symmetric private nodes ({}x{}): {}",
            symmetric_private_hosts,
            if symmetric_private_hosts > 0 {
                symmetric_private_nodes / symmetric_private_hosts
            } else {
                0
            },
            symmetric_private_nodes
        );
        println!(
            "Total full cone private nodes ({}x{}): {}",
            full_cone_private_hosts,
            if full_cone_private_hosts > 0 {
                full_cone_private_nodes / full_cone_private_hosts
            } else {
                0
            },
            full_cone_private_nodes
        );
        println!("Total nodes: {}", total_nodes);
        println!("Running nodes: {}", running_nodes);
        println!("Stopped nodes: {}", stopped_nodes);
        println!("Added nodes: {}", added_nodes);
        println!("Removed nodes: {}", removed_nodes);

        Ok(())
    }

    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }

    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    pub fn upgrade_uploader_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_uploader_telegraf(name)?;
        Ok(())
    }

    pub async fn clean(&self) -> Result<()> {
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository).await?;

        let evm_network = match environment_details.evm_network {
            EvmNetwork::Anvil => None,
            EvmNetwork::Custom => Some(Network::new_custom(
                environment_details.evm_rpc_url.as_ref().unwrap(),
                environment_details
                    .evm_payment_token_address
                    .as_ref()
                    .unwrap(),
                environment_details
                    .evm_data_payments_address
                    .as_ref()
                    .unwrap(),
            )),
            EvmNetwork::ArbitrumOne => Some(Network::ArbitrumOne),
            EvmNetwork::ArbitrumSepolia => Some(Network::ArbitrumSepolia),
        };
        if let (Some(network), Some(address)) =
            (evm_network, &environment_details.funding_wallet_address)
        {
            if let Err(err) = self
                .ansible_provisioner
                .drain_funds_from_uploaders(
                    Address::from_str(address).map_err(|err| {
                        log::error!("Invalid funding wallet public key: {err:?}");
                        Error::FailedToParseKey
                    })?,
                    network,
                )
                .await
            {
                log::error!("Failed to drain funds from uploaders: {err:?}");
            }
        } else {
            println!("Anvil network or no funding wallet address provided. Not draining funds.");
            log::info!("Anvil network or no funding wallet address provided. Not draining funds.");
        }

        do_clean(
            &self.environment_name,
            Some(environment_details),
            self.working_directory_path.clone(),
            &self.terraform_runner,
            None,
        )
        .await?;
        self.s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await?;
        Ok(())
    }
}

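/// Returns the genesis node's multiaddr and public IP address.
///
/// The first `jq` query prefers the listen address of the node whose
/// `peers_args.first` flag is set, filtering out loopback entries and keeping
/// only `quic-v1` addresses; if that yields nothing, the same filter is run
/// against every node. The value returned is expected to look something like
/// this (illustrative only):
/// `/ip4/203.0.113.10/udp/12000/quic-v1/p2p/12D3KooW...`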
pub fn get_genesis_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
    let genesis_ip = genesis_inventory[0].public_ip_addr;

    let multiaddr = ssh_client
        .run_command(
            &genesis_ip,
            "root",
            "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
            false,
        )
        .map(|output| output.first().cloned())
        .unwrap_or_else(|err| {
            log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
            None
        });

    let multiaddr = match multiaddr {
        Some(addr) => addr,
        None => ssh_client
            .run_command(
                &genesis_ip,
                "root",
                "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
                false,
            )?
            .first()
            .cloned()
            .ok_or_else(|| Error::GenesisListenAddress)?,
    };

    Ok((multiaddr, genesis_ip))
}

pub fn get_anvil_node_data(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<AnvilNodeData> {
    let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
    if evm_inventory.is_empty() {
        return Err(Error::EvmNodeNotFound);
    }

    let evm_ip = evm_inventory[0].public_ip_addr;
    debug!("Retrieved IP address for EVM node: {evm_ip}");
    let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";

    const MAX_ATTEMPTS: u8 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    for attempt in 1..=MAX_ATTEMPTS {
        match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
            Ok(output) => {
                if let Some(csv_contents) = output.first() {
                    // Expected layout:
                    // rpc_url,payment_token_address,data_payments_address,deployer_wallet_private_key
                    let parts: Vec<&str> = csv_contents.split(',').collect();
                    if parts.len() != 4 {
                        return Err(Error::EvmTestnetDataParsingError(
                            "Expected 4 fields in the CSV".to_string(),
                        ));
                    }

                    let evm_testnet_data = AnvilNodeData {
                        rpc_url: parts[0].trim().to_string(),
                        payment_token_address: parts[1].trim().to_string(),
                        data_payments_address: parts[2].trim().to_string(),
                        deployer_wallet_private_key: parts[3].trim().to_string(),
                    };
                    return Ok(evm_testnet_data);
                }
            }
            Err(e) => {
                if attempt == MAX_ATTEMPTS {
                    return Err(e);
                }
                println!(
                    "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
                    attempt,
                    RETRY_DELAY.as_secs()
                );
            }
        }
        std::thread::sleep(RETRY_DELAY);
    }

    Err(Error::EvmTestnetDataNotFound)
}

pub fn get_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
    let node_ip = node_inventory
        .iter()
        .find(|vm| vm.name.ends_with("-node-1"))
        .ok_or_else(|| Error::NodeAddressNotFound)?
        .public_ip_addr;

    debug!("Getting multiaddr from node {node_ip}");

    let multiaddr = ssh_client
        .run_command(
            &node_ip,
            "root",
            "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
            false,
        )?
        .first()
        .cloned()
        .ok_or_else(|| Error::NodeAddressNotFound)?;

    Ok((multiaddr, node_ip))
}

pub async fn get_and_extract_archive_from_s3(
    s3_repository: &S3Repository,
    bucket_name: &str,
    archive_bucket_path: &str,
    dest_path: &Path,
) -> Result<()> {
    // `split` always yields at least one element, so the `unwrap` here is safe.
    let archive_file_name = archive_bucket_path.split('/').last().unwrap();
    let archive_dest_path = dest_path.join(archive_file_name);
    s3_repository
        .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
        .await?;
    extract_archive(&archive_dest_path, dest_path)?;
    Ok(())
}

pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
    let archive_file = File::open(archive_path)?;
    let decoder = GzDecoder::new(archive_file);
    let mut archive = Archive::new(decoder);
    let entries = archive.entries()?;
    for entry_result in entries {
        let mut entry = entry_result?;
        let extract_path = dest_path.join(entry.path()?);
        if entry.header().entry_type() == tar::EntryType::Directory {
            std::fs::create_dir_all(extract_path)?;
            continue;
        }
        let mut file = BufWriter::new(File::create(extract_path)?);
        std::io::copy(&mut entry, &mut file)?;
    }
    std::fs::remove_file(archive_path)?;
    Ok(())
}

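/// Runs an external binary, streaming its stdout and stderr to the terminal
/// (unless suppressed) while capturing both into the returned line buffer.
///
/// A minimal usage sketch (the binary and arguments are illustrative):
///
/// ```ignore
/// let output_lines = run_external_command(
///     PathBuf::from("terraform"),
///     std::env::current_dir()?,
///     vec!["version".to_string()],
///     false,
///     false,
/// )?;
/// ```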
pub fn run_external_command(
    binary_path: PathBuf,
    working_directory_path: PathBuf,
    args: Vec<String>,
    suppress_stdout: bool,
    suppress_stderr: bool,
) -> Result<Vec<String>> {
    let mut command = Command::new(binary_path.clone());
    for arg in &args {
        command.arg(arg);
    }
    command.stdout(Stdio::piped());
    command.stderr(Stdio::piped());
    command.current_dir(working_directory_path.clone());
    debug!("Running {binary_path:#?} with args {args:#?}");
    debug!("Working directory set to {working_directory_path:#?}");

    let mut child = command.spawn()?;
    let mut output_lines = Vec::new();

    // Note: stdout is drained before stderr, so a command that writes heavily
    // to stderr while its stdout is still open could block on a full pipe.
    if let Some(ref mut stdout) = child.stdout {
        let reader = BufReader::new(stdout);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stdout {
                println!("{line}");
            }
            output_lines.push(line);
        }
    }

    if let Some(ref mut stderr) = child.stderr {
        let reader = BufReader::new(stderr);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stderr {
                eprintln!("{line}");
            }
            output_lines.push(line);
        }
    }

    let output = child.wait()?;
    if !output.success() {
        let binary_path = binary_path.to_str().unwrap();
        return Err(Error::ExternalCommandRunFailed {
            binary: binary_path.to_string(),
            exit_status: output,
        });
    }

    Ok(output_lines)
}

pub fn is_binary_on_path(binary_name: &str) -> bool {
    if let Ok(path) = std::env::var("PATH") {
        // Note: splitting on ':' assumes a Unix-style PATH.
        for dir in path.split(':') {
            let mut full_path = PathBuf::from(dir);
            full_path.push(binary_name);
            if full_path.exists() {
                return true;
            }
        }
    }
    false
}

pub async fn do_clean(
    name: &str,
    environment_details: Option<EnvironmentDetails>,
    working_directory_path: PathBuf,
    terraform_runner: &TerraformRunner,
    inventory_types: Option<Vec<AnsibleInventoryType>>,
) -> Result<()> {
    terraform_runner.init()?;
    let workspaces = terraform_runner.workspace_list()?;
    if !workspaces.contains(&name.to_string()) {
        return Err(Error::EnvironmentDoesNotExist(name.to_string()));
    }
    terraform_runner.workspace_select(name)?;
    println!("Selected {name} workspace");

    let environment_details = environment_details.ok_or(Error::EnvironmentDetailsNotFound(
        "Should be provided during do_clean".to_string(),
    ))?;

    let options =
        InfraRunOptions::generate_existing(name, terraform_runner, &environment_details).await?;
    let mut args = Vec::new();
    if let Some(full_cone_private_node_volume_size) = options.full_cone_private_node_volume_size {
        args.push((
            "full_cone_private_node_volume_size".to_string(),
            full_cone_private_node_volume_size.to_string(),
        ));
    }
    if let Some(genesis_node_volume_size) = options.genesis_node_volume_size {
        args.push((
            "genesis_node_volume_size".to_string(),
            genesis_node_volume_size.to_string(),
        ));
    }
    if let Some(node_volume_size) = options.node_volume_size {
        args.push(("node_volume_size".to_string(), node_volume_size.to_string()));
    }
    if let Some(peer_cache_node_volume_size) = options.peer_cache_node_volume_size {
        args.push((
            "peer_cache_node_volume_size".to_string(),
            peer_cache_node_volume_size.to_string(),
        ));
    }
    if let Some(symmetric_private_node_volume_size) = options.symmetric_private_node_volume_size {
        args.push((
            "symmetric_private_node_volume_size".to_string(),
            symmetric_private_node_volume_size.to_string(),
        ));
    }

    terraform_runner.destroy(
        Some(args),
        Some(
            environment_details
                .environment_type
                .get_tfvars_filename(name),
        ),
    )?;

    // A workspace cannot be deleted while it is selected, so switch back to
    // "dev" before deleting it.
    terraform_runner.workspace_select("dev")?;
    terraform_runner.workspace_delete(name)?;
    println!("Deleted {name} workspace");

    cleanup_environment_inventory(
        name,
        &working_directory_path.join("ansible").join("inventory"),
        inventory_types,
    )?;

    println!("Deleted Ansible inventory for {name}");
    Ok(())
}

pub fn get_wallet_directory() -> Result<PathBuf> {
    Ok(dirs_next::data_dir()
        .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
        .join("safe")
        .join("client")
        .join("wallet"))
}

pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
    let webhook_url =
        std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;

    let mut message = String::new();
    message.push_str("*Testnet Details*\n");
    message.push_str(&format!("Name: {}\n", inventory.name));
    message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
    message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
    match inventory.binary_option {
        BinaryOption::BuildFromSource {
            ref repo_owner,
            ref branch,
            ..
        } => {
            message.push_str("*Branch Details*\n");
            message.push_str(&format!("Repo owner: {}\n", repo_owner));
            message.push_str(&format!("Branch: {}\n", branch));
        }
        BinaryOption::Versioned {
            ref ant_version,
            ref antnode_version,
            ref antctl_version,
        } => {
            message.push_str("*Version Details*\n");
            message.push_str(&format!(
                "ant version: {}\n",
                ant_version
                    .as_ref()
                    .map_or("None".to_string(), |v| v.to_string())
            ));
            message.push_str(&format!("antnode version: {}\n", antnode_version));
            message.push_str(&format!("antctl version: {}\n", antctl_version));
        }
    }

    message.push_str("*Sample Peers*\n");
    message.push_str("```\n");
    for peer in inventory.peers().iter().take(20) {
        message.push_str(&format!("{peer}\n"));
    }
    message.push_str("```\n");
    message.push_str("*Available Files*\n");
    message.push_str("```\n");
    for (addr, file_name) in inventory.uploaded_files.iter() {
        message.push_str(&format!("{}: {}\n", addr, file_name));
    }
    message.push_str("```\n");

    let payload = json!({
        "text": message,
    });
    reqwest::Client::new()
        .post(webhook_url)
        .json(&payload)
        .send()
        .await?;
    println!("{message}");
    println!("Posted notification to Slack");
    Ok(())
}

fn print_duration(duration: Duration) {
    let total_seconds = duration.as_secs();
    let minutes = total_seconds / 60;
    let seconds = total_seconds % 60;
    debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
}

pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
    let progress_bar = ProgressBar::new(length);
    progress_bar.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
            .progress_chars("#>-"),
    );
    progress_bar.enable_steady_tick(Duration::from_millis(100));
    Ok(progress_bar)
}
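
/// Fetches the environment details for `environment_name` from the
/// `sn-environment-type` bucket, retrying the download, read, and parse steps
/// up to three times before giving up.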
pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying the read of the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                match serde_json::from_str(&content) {
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying the parse of the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}

pub async fn write_environment_details(
    s3_repository: &S3Repository,
    environment_name: &str,
    environment_details: &EnvironmentDetails,
) -> Result<()> {
    let temp_dir = tempfile::tempdir()?;
    let path = temp_dir.path().join(environment_name);
    let mut file = File::create(&path)?;
    let json = serde_json::to_string(environment_details)?;
    file.write_all(json.as_bytes())?;
    s3_repository
        .upload_file("sn-environment-type", &path, true)
        .await?;
    Ok(())
}

/// Calculates the size of each attached volume from the number of nodes the
/// VM will run. The total storage requirement is spread across the volumes
/// attached to the machine; the divisor of 7 reflects the assumed number of
/// attached volumes per VM.
pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
    if node_count == 0 {
        return 0;
    }
    let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
    // Round up so the combined volumes always cover the total requirement.
    (total_volume_required as f64 / 7.0).ceil() as u16
}

pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    format!("http://{ip_addr}/bootstrap_cache.json")
}
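
// A small sanity check of the sizing arithmetic above, added as a sketch:
// 25 nodes at 7 (GB, assumed) each require 175 in total, which spread across
// seven attached volumes is 25 per volume.
#[cfg(test)]
mod size_per_attached_volume_tests {
    use super::calculate_size_per_attached_volume;

    #[test]
    fn spreads_required_storage_across_seven_volumes() {
        assert_eq!(calculate_size_per_attached_volume(0), 0);
        // A single node needs 7, which rounds up to 1 per volume.
        assert_eq!(calculate_size_per_attached_volume(1), 1);
        assert_eq!(calculate_size_per_attached_volume(25), 25);
    }
}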