pub mod ansible;
pub mod bootstrap;
pub mod deploy;
pub mod digital_ocean;
pub mod error;
pub mod funding;
pub mod infra;
pub mod inventory;
pub mod logs;
pub mod logstash;
pub mod network_commands;
pub mod reserved_ip;
pub mod rpc_client;
pub mod s3;
pub mod safe;
pub mod setup;
pub mod ssh;
pub mod terraform;
pub mod upscale;

const STORAGE_REQUIRED_PER_NODE: u16 = 7;

use crate::{
    ansible::{
        extra_vars::ExtraVarsDocBuilder,
        inventory::{cleanup_environment_inventory, AnsibleInventoryType},
        provisioning::AnsibleProvisioner,
        AnsibleRunner,
    },
    error::{Error, Result},
    inventory::{DeploymentInventory, VirtualMachine},
    rpc_client::RpcClient,
    s3::S3Repository,
    ssh::SshClient,
    terraform::TerraformRunner,
};
use alloy::primitives::Address;
use ant_service_management::ServiceStatus;
use evmlib::Network;
use flate2::read::GzDecoder;
use indicatif::{ProgressBar, ProgressStyle};
use infra::{build_terraform_args, InfraRunOptions};
use log::{debug, trace};
use semver::Version;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
    fs::File,
    io::{BufRead, BufReader, BufWriter, Write},
    net::{IpAddr, SocketAddr},
    path::{Path, PathBuf},
    process::{Command, Stdio},
    str::FromStr,
    time::Duration,
};
use tar::Archive;

const ANSIBLE_DEFAULT_FORKS: usize = 50;

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    Bootstrap,
    #[default]
    New,
}

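/// Details of a running Anvil (local EVM test) node, parsed from the CSV
/// file that the node writes out; see [`get_anvil_node_data`].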
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    pub data_payments_address: String,
    pub deployer_wallet_private_key: String,
    pub payment_token_address: String,
    pub rpc_url: String,
}

impl std::fmt::Display for DeploymentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DeploymentType::Bootstrap => write!(f, "bootstrap"),
            DeploymentType::New => write!(f, "new"),
        }
    }
}

impl std::str::FromStr for DeploymentType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "bootstrap" => Ok(DeploymentType::Bootstrap),
            "new" => Ok(DeploymentType::New),
            _ => Err(format!("Invalid deployment type: {}", s)),
        }
    }
}

#[derive(Debug, Clone)]
pub enum NodeType {
    FullConePrivateNode,
    Generic,
    Genesis,
    PeerCache,
    SymmetricPrivateNode,
}

impl std::str::FromStr for NodeType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "full-cone-private" => Ok(NodeType::FullConePrivateNode),
            "generic" => Ok(NodeType::Generic),
            "genesis" => Ok(NodeType::Genesis),
            "peer-cache" => Ok(NodeType::PeerCache),
            "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
            _ => Err(format!("Invalid node type: {}", s)),
        }
    }
}

impl NodeType {
    pub fn telegraf_role(&self) -> &'static str {
        match self {
            NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
            NodeType::Generic => "GENERIC_NODE",
            NodeType::Genesis => "GENESIS_NODE",
            NodeType::PeerCache => "PEER_CACHE_NODE",
            NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
        }
    }

    pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
        match self {
            NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
            NodeType::Generic => AnsibleInventoryType::Nodes,
            NodeType::Genesis => AnsibleInventoryType::Genesis,
            NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
            NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
        }
    }
}

#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    #[default]
    Anvil,
    ArbitrumOne,
    ArbitrumSepolia,
    Custom,
}

impl std::fmt::Display for EvmNetwork {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // An Anvil-based network is rendered the same as a custom one.
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}

impl std::str::FromStr for EvmNetwork {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "anvil" => Ok(EvmNetwork::Anvil),
            "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
            "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
            "custom" => Ok(EvmNetwork::Custom),
            _ => Err(format!("Invalid EVM network type: {}", s)),
        }
    }
}

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    pub deployment_type: DeploymentType,
    pub environment_type: EnvironmentType,
    pub evm_network: EvmNetwork,
    pub evm_data_payments_address: Option<String>,
    pub evm_payment_token_address: Option<String>,
    pub evm_rpc_url: Option<String>,
    pub funding_wallet_address: Option<String>,
    pub network_id: Option<u8>,
    pub rewards_address: String,
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    #[default]
    Development,
    Production,
    Staging,
}

impl EnvironmentType {
    pub fn get_tfvars_filename(&self, name: &str) -> String {
        match self {
            EnvironmentType::Development => "dev.tfvars".to_string(),
            EnvironmentType::Staging => "staging.tfvars".to_string(),
            EnvironmentType::Production => format!("{name}.tfvars"),
        }
    }

    // The default counts are currently identical across environment types.
    pub fn get_default_peer_cache_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 5,
            EnvironmentType::Production => 5,
            EnvironmentType::Staging => 5,
        }
    }

    pub fn get_default_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 25,
            EnvironmentType::Production => 25,
            EnvironmentType::Staging => 25,
        }
    }

    pub fn get_default_symmetric_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }

    pub fn get_default_full_cone_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }
}

impl std::fmt::Display for EnvironmentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EnvironmentType::Development => write!(f, "development"),
            EnvironmentType::Production => write!(f, "production"),
            EnvironmentType::Staging => write!(f, "staging"),
        }
    }
}

impl FromStr for EnvironmentType {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "development" => Ok(EnvironmentType::Development),
            "production" => Ok(EnvironmentType::Production),
            "staging" => Ok(EnvironmentType::Staging),
            _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
        }
    }
}

pub struct DeployOptions {
    pub binary_option: BinaryOption,
    pub current_inventory: DeploymentInventory,
    pub env_variables: Option<Vec<(String, String)>>,
    pub evm_network: EvmNetwork,
    pub evm_node_vm_size: Option<String>,
    pub log_format: Option<LogFormat>,
    pub logstash_details: Option<(String, Vec<SocketAddr>)>,
    pub name: String,
    pub node_count: u16,
    pub node_vm_count: Option<u16>,
    pub node_vm_size: Option<String>,
    pub peer_cache_node_count: u16,
    pub peer_cache_node_vm_count: Option<u16>,
    pub peer_cache_node_vm_size: Option<String>,
    pub public_rpc: bool,
    pub rewards_address: String,
    pub uploader_vm_count: Option<u16>,
    pub uploader_vm_size: Option<String>,
}

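/// How the binaries for a deployment are supplied: built from a source
/// branch, or fetched as pre-built released versions.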
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Build the binaries from source, using the given owner's branch.
    BuildFromSource {
        antnode_features: Option<String>,
        branch: String,
        repo_owner: String,
    },
    /// Use released binaries at the given versions.
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Version,
        antnode_version: Version,
    },
}

impl BinaryOption {
    pub fn print(&self) {
        match self {
            BinaryOption::BuildFromSource {
                antnode_features,
                branch,
                repo_owner,
            } => {
                println!("Source configuration:");
                println!("  Repository owner: {}", repo_owner);
                println!("  Branch: {}", branch);
                if let Some(features) = antnode_features {
                    println!("  Antnode features: {}", features);
                }
            }
            BinaryOption::Versioned {
                ant_version,
                antctl_version,
                antnode_version,
            } => {
                println!("Versioned binaries configuration:");
                if let Some(version) = ant_version {
                    println!("  ant version: {}", version);
                }
                println!("  antctl version: {}", antctl_version);
                println!("  antnode version: {}", antnode_version);
            }
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}

impl std::fmt::Display for CloudProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CloudProvider::Aws => write!(f, "aws"),
            CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
        }
    }
}

impl CloudProvider {
    pub fn get_ssh_user(&self) -> String {
        match self {
            CloudProvider::Aws => "ubuntu".to_string(),
            CloudProvider::DigitalOcean => "root".to_string(),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
    Default,
    Json,
}

impl LogFormat {
    pub fn parse_from_str(val: &str) -> Result<Self> {
        match val {
            "default" => Ok(LogFormat::Default),
            "json" => Ok(LogFormat::Json),
            _ => Err(Error::LoggingConfiguration(
                "The only valid values for this argument are \"default\" or \"json\"".to_string(),
            )),
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            LogFormat::Default => "default",
            LogFormat::Json => "json",
        }
    }
}

#[derive(Clone)]
pub struct UpgradeOptions {
    pub ansible_verbose: bool,
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    pub env_variables: Option<Vec<(String, String)>>,
    pub force: bool,
    pub forks: usize,
    pub interval: Duration,
    pub name: String,
    pub node_type: Option<NodeType>,
    pub pre_upgrade_delay: Option<u64>,
    pub provider: CloudProvider,
    pub version: Option<String>,
}

impl UpgradeOptions {
    pub fn get_ansible_vars(&self) -> String {
        let mut extra_vars = ExtraVarsDocBuilder::default();
        extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
        if let Some(env_variables) = &self.env_variables {
            extra_vars.add_env_variable_list("env_variables", env_variables.clone());
        }
        if self.force {
            extra_vars.add_variable("force", &self.force.to_string());
        }
        if let Some(version) = &self.version {
            extra_vars.add_variable("antnode_version", version);
        }
        if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
            extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
        }
        extra_vars.build()
    }
}

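/// Builder for [`TestnetDeployer`]. Fields left unset fall back to
/// environment variables (`TERRAFORM_STATE_BUCKET_NAME`, `SSH_KEY_PATH`,
/// `ANSIBLE_VAULT_PASSWORD_PATH`) or defaults; `DO_PAT` must always be set
/// for Digital Ocean deployments.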
#[derive(Default)]
pub struct TestnetDeployBuilder {
    ansible_forks: Option<usize>,
    ansible_verbose_mode: bool,
    deployment_type: EnvironmentType,
    environment_name: String,
    provider: Option<CloudProvider>,
    ssh_secret_key_path: Option<PathBuf>,
    state_bucket_name: Option<String>,
    terraform_binary_path: Option<PathBuf>,
    vault_password_path: Option<PathBuf>,
    working_directory_path: Option<PathBuf>,
}

impl TestnetDeployBuilder {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        // Remove any previously fetched `safe` binary so a stale copy is not
        // reused by the new deployment.
        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
        )?;

        Ok(testnet)
    }
}

#[derive(Clone)]
pub struct TestnetDeployer {
    pub ansible_provisioner: AnsibleProvisioner,
    pub cloud_provider: CloudProvider,
    pub deployment_type: EnvironmentType,
    pub environment_name: String,
    pub inventory_file_path: PathBuf,
    pub rpc_client: RpcClient,
    pub s3_repository: S3Repository,
    pub ssh_client: SshClient,
    pub terraform_runner: TerraformRunner,
    pub working_directory_path: PathBuf,
}

impl TestnetDeployer {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    pub async fn init(&self) -> Result<()> {
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755);
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), Some(options.tfvars_filename.clone()))?;
        Ok(())
    }

    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Prints the status of every node registry in the environment, followed
    /// by a summary of totals.
    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &private_node_registries,
            &genesis_node_registry,
        ];

        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let private_hosts = private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let private_nodes = private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        // Guard against division by zero when the environment has no hosts of
        // a given type.
        let per_host = |nodes: usize, hosts: usize| if hosts == 0 { 0 } else { nodes / hosts };

        println!("-------");
        println!("Summary");
        println!("-------");
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            per_host(peer_cache_nodes, peer_cache_hosts),
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            per_host(generic_nodes, generic_hosts),
            generic_nodes
        );
        println!(
            "Total private nodes ({}x{}): {}",
            private_hosts,
            per_host(private_nodes, private_hosts),
            private_nodes
        );
        println!("Total nodes: {}", total_nodes);
        println!("Running nodes: {}", running_nodes);
        println!("Stopped nodes: {}", stopped_nodes);
        println!("Added nodes: {}", added_nodes);
        println!("Removed nodes: {}", removed_nodes);

        Ok(())
    }

    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
        )?;
        Ok(())
    }

    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    pub fn upgrade_uploader_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_uploader_telegraf(name)?;
        Ok(())
    }

    pub async fn clean(&self) -> Result<()> {
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository).await?;

        let evm_network = match environment_details.evm_network {
            EvmNetwork::Anvil => None,
            EvmNetwork::Custom => Some(Network::new_custom(
                environment_details.evm_rpc_url.as_ref().unwrap(),
                environment_details
                    .evm_payment_token_address
                    .as_ref()
                    .unwrap(),
                environment_details
                    .evm_data_payments_address
                    .as_ref()
                    .unwrap(),
            )),
            EvmNetwork::ArbitrumOne => Some(Network::ArbitrumOne),
            EvmNetwork::ArbitrumSepolia => Some(Network::ArbitrumSepolia),
        };
        if let (Some(network), Some(address)) =
            (evm_network, &environment_details.funding_wallet_address)
        {
            if let Err(err) = self
                .ansible_provisioner
                .drain_funds_from_uploaders(
                    Address::from_str(address).map_err(|err| {
                        log::error!("Invalid funding wallet public key: {err:?}");
                        Error::FailedToParseKey
                    })?,
                    network,
                )
                .await
            {
                log::error!("Failed to drain funds from uploaders: {err:?}");
            }
        } else {
            println!("Anvil network or no funding wallet address; not draining funds.");
            log::info!("Anvil network or no funding wallet address; not draining funds.");
        }

        do_clean(
            &self.environment_name,
            Some(environment_details),
            self.working_directory_path.clone(),
            &self.terraform_runner,
            None,
        )
        .await?;
        self.s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await?;
        Ok(())
    }
}

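/// Gets the genesis node's multiaddr and public IP by reading the node
/// registry on the genesis VM over SSH. The quic-v1 listen address of the
/// node that was started first is preferred, falling back to any node's
/// non-loopback quic-v1 address.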
pub fn get_genesis_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
    let genesis_ip = genesis_inventory[0].public_ip_addr;

    // Prefer the quic-v1 listen address of the node that was started first.
    let multiaddr = ssh_client
        .run_command(
            &genesis_ip,
            "root",
            "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
            false,
        )
        .map(|output| output.first().cloned())
        .unwrap_or_else(|err| {
            log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
            None
        });

    // Fall back to any node's non-loopback quic-v1 address.
    let multiaddr = match multiaddr {
        Some(addr) => addr,
        None => ssh_client
            .run_command(
                &genesis_ip,
                "root",
                "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
                false,
            )?
            .first()
            .cloned()
            .ok_or_else(|| Error::GenesisListenAddress)?,
    };

    Ok((multiaddr, genesis_ip))
}

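/// Reads the EVM testnet data CSV from the first EVM node over SSH, retrying
/// several times to give the node time to write it. The CSV is expected to
/// hold four fields: the RPC URL, the payment token address, the data
/// payments address, and the deployer wallet private key.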
pub fn get_anvil_node_data(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<AnvilNodeData> {
    let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
    if evm_inventory.is_empty() {
        return Err(Error::EvmNodeNotFound);
    }

    let evm_ip = evm_inventory[0].public_ip_addr;
    debug!("Retrieved IP address for EVM node: {evm_ip}");
    let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";

    const MAX_ATTEMPTS: u8 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    for attempt in 1..=MAX_ATTEMPTS {
        match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
            Ok(output) => {
                if let Some(csv_contents) = output.first() {
                    let parts: Vec<&str> = csv_contents.split(',').collect();
                    if parts.len() != 4 {
                        return Err(Error::EvmTestnetDataParsingError(
                            "Expected 4 fields in the CSV".to_string(),
                        ));
                    }

                    let evm_testnet_data = AnvilNodeData {
                        rpc_url: parts[0].trim().to_string(),
                        payment_token_address: parts[1].trim().to_string(),
                        data_payments_address: parts[2].trim().to_string(),
                        deployer_wallet_private_key: parts[3].trim().to_string(),
                    };
                    return Ok(evm_testnet_data);
                }
            }
            Err(e) => {
                if attempt == MAX_ATTEMPTS {
                    return Err(e);
                }
                println!(
                    "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
                    attempt,
                    RETRY_DELAY.as_secs()
                );
            }
        }
        std::thread::sleep(RETRY_DELAY);
    }

    Err(Error::EvmTestnetDataNotFound)
}

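/// Gets a non-loopback multiaddr and the public IP from the first generic
/// node VM (the one whose name ends in `-node-1`).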
pub fn get_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
    let node_ip = node_inventory
        .iter()
        .find(|vm| vm.name.ends_with("-node-1"))
        .ok_or_else(|| Error::NodeAddressNotFound)?
        .public_ip_addr;

    debug!("Getting multiaddr from node {node_ip}");

    let multiaddr = ssh_client
        .run_command(
            &node_ip,
            "root",
            "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
            false,
        )?
        .first()
        .cloned()
        .ok_or_else(|| Error::NodeAddressNotFound)?;

    Ok((multiaddr, node_ip))
}

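/// Downloads a gzipped tar archive from S3 and extracts it into `dest_path`.
/// The downloaded archive is deleted after extraction.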
pub async fn get_and_extract_archive_from_s3(
    s3_repository: &S3Repository,
    bucket_name: &str,
    archive_bucket_path: &str,
    dest_path: &Path,
) -> Result<()> {
    // `split` always yields at least one element, so `last` cannot be `None`.
    let archive_file_name = archive_bucket_path.split('/').last().unwrap();
    let archive_dest_path = dest_path.join(archive_file_name);
    s3_repository
        .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
        .await?;
    extract_archive(&archive_dest_path, dest_path)?;
    Ok(())
}

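/// Extracts a gzipped tar archive into `dest_path`, then removes the archive
/// file.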
pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
    let archive_file = File::open(archive_path)?;
    let decoder = GzDecoder::new(archive_file);
    let mut archive = Archive::new(decoder);
    let entries = archive.entries()?;
    for entry_result in entries {
        let mut entry = entry_result?;
        let extract_path = dest_path.join(entry.path()?);
        if entry.header().entry_type() == tar::EntryType::Directory {
            std::fs::create_dir_all(extract_path)?;
            continue;
        }
        let mut file = BufWriter::new(File::create(extract_path)?);
        std::io::copy(&mut entry, &mut file)?;
    }
    std::fs::remove_file(archive_path)?;
    Ok(())
}

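/// Runs an external binary, echoing its output as it runs (unless suppressed)
/// and returning all captured output lines. Note that stdout is drained
/// before stderr, so the two streams are not interleaved in the returned
/// list.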
pub fn run_external_command(
    binary_path: PathBuf,
    working_directory_path: PathBuf,
    args: Vec<String>,
    suppress_stdout: bool,
    suppress_stderr: bool,
) -> Result<Vec<String>> {
    let mut command = Command::new(binary_path.clone());
    for arg in &args {
        command.arg(arg);
    }
    command.stdout(Stdio::piped());
    command.stderr(Stdio::piped());
    command.current_dir(working_directory_path.clone());
    debug!("Running {binary_path:#?} with args {args:#?}");
    debug!("Working directory set to {working_directory_path:#?}");

    let mut child = command.spawn()?;
    let mut output_lines = Vec::new();

    if let Some(ref mut stdout) = child.stdout {
        let reader = BufReader::new(stdout);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stdout {
                println!("{line}");
            }
            output_lines.push(line);
        }
    }

    if let Some(ref mut stderr) = child.stderr {
        let reader = BufReader::new(stderr);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stderr {
                eprintln!("{line}");
            }
            output_lines.push(line);
        }
    }

    let output = child.wait()?;
    if !output.success() {
        let binary_path = binary_path.to_str().unwrap();
        return Err(Error::ExternalCommandRunFailed {
            binary: binary_path.to_string(),
            exit_status: output,
        });
    }

    Ok(output_lines)
}

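/// Checks whether `binary_name` exists in any directory on `PATH`. The `:`
/// separator makes this Unix-specific.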
pub fn is_binary_on_path(binary_name: &str) -> bool {
    if let Ok(path) = std::env::var("PATH") {
        for dir in path.split(':') {
            let mut full_path = PathBuf::from(dir);
            full_path.push(binary_name);
            if full_path.exists() {
                return true;
            }
        }
    }
    false
}

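/// Destroys the infrastructure for the given environment and removes its
/// Ansible inventory. The volume-size variables are regenerated from the
/// existing infrastructure so the destroy run uses the same values that were
/// applied.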
pub async fn do_clean(
    name: &str,
    environment_details: Option<EnvironmentDetails>,
    working_directory_path: PathBuf,
    terraform_runner: &TerraformRunner,
    inventory_types: Option<Vec<AnsibleInventoryType>>,
) -> Result<()> {
    terraform_runner.init()?;
    let workspaces = terraform_runner.workspace_list()?;
    if !workspaces.contains(&name.to_string()) {
        return Err(Error::EnvironmentDoesNotExist(name.to_string()));
    }
    terraform_runner.workspace_select(name)?;
    println!("Selected {name} workspace");

    let environment_details = environment_details.ok_or(Error::EnvironmentDetailsNotFound(
        "Should be provided during do_clean".to_string(),
    ))?;

    let options =
        InfraRunOptions::generate_existing(name, terraform_runner, &environment_details).await?;
    let mut args = Vec::new();
    if let Some(full_cone_private_node_volume_size) = options.full_cone_private_node_volume_size {
        args.push((
            "full_cone_private_node_volume_size".to_string(),
            full_cone_private_node_volume_size.to_string(),
        ));
    }
    if let Some(genesis_node_volume_size) = options.genesis_node_volume_size {
        args.push((
            "genesis_node_volume_size".to_string(),
            genesis_node_volume_size.to_string(),
        ));
    }
    if let Some(node_volume_size) = options.node_volume_size {
        args.push(("node_volume_size".to_string(), node_volume_size.to_string()));
    }
    if let Some(peer_cache_node_volume_size) = options.peer_cache_node_volume_size {
        args.push((
            "peer_cache_node_volume_size".to_string(),
            peer_cache_node_volume_size.to_string(),
        ));
    }
    if let Some(symmetric_private_node_volume_size) = options.symmetric_private_node_volume_size {
        args.push((
            "symmetric_private_node_volume_size".to_string(),
            symmetric_private_node_volume_size.to_string(),
        ));
    }

    terraform_runner.destroy(
        Some(args),
        Some(
            environment_details
                .environment_type
                .get_tfvars_filename(name),
        ),
    )?;

    // A workspace cannot be deleted while it is selected, so switch back to
    // `dev` before deleting.
    terraform_runner.workspace_select("dev")?;
    terraform_runner.workspace_delete(name)?;
    println!("Deleted {name} workspace");

    cleanup_environment_inventory(
        name,
        &working_directory_path.join("ansible").join("inventory"),
        inventory_types,
    )?;

    println!("Deleted Ansible inventory for {name}");
    Ok(())
}

pub fn get_wallet_directory() -> Result<PathBuf> {
    Ok(dirs_next::data_dir()
        .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
        .join("safe")
        .join("client")
        .join("wallet"))
}

pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
    let webhook_url =
        std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;

    let mut message = String::new();
    message.push_str("*Testnet Details*\n");
    message.push_str(&format!("Name: {}\n", inventory.name));
    message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
    message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
    match inventory.binary_option {
        BinaryOption::BuildFromSource {
            ref repo_owner,
            ref branch,
            ..
        } => {
            message.push_str("*Branch Details*\n");
            message.push_str(&format!("Repo owner: {}\n", repo_owner));
            message.push_str(&format!("Branch: {}\n", branch));
        }
        BinaryOption::Versioned {
            ref ant_version,
            ref antctl_version,
            ref antnode_version,
        } => {
            message.push_str("*Version Details*\n");
            message.push_str(&format!(
                "ant version: {}\n",
                ant_version
                    .as_ref()
                    .map_or("None".to_string(), |v| v.to_string())
            ));
            message.push_str(&format!("antnode version: {}\n", antnode_version));
            message.push_str(&format!("antctl version: {}\n", antctl_version));
        }
    }

    message.push_str("*Sample Peers*\n");
    message.push_str("```\n");
    for peer in inventory.peers().iter().take(20) {
        message.push_str(&format!("{peer}\n"));
    }
    message.push_str("```\n");
    message.push_str("*Available Files*\n");
    message.push_str("```\n");
    for (addr, file_name) in inventory.uploaded_files.iter() {
        message.push_str(&format!("{}: {}\n", addr, file_name));
    }
    message.push_str("```\n");

    let payload = json!({
        "text": message,
    });
    reqwest::Client::new()
        .post(webhook_url)
        .json(&payload)
        .send()
        .await?;
    println!("{message}");
    println!("Posted notification to Slack");
    Ok(())
}

fn print_duration(duration: Duration) {
    let total_seconds = duration.as_secs();
    let minutes = total_seconds / 60;
    let seconds = total_seconds % 60;
    debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
}

pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
    let progress_bar = ProgressBar::new(length);
    progress_bar.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
            .progress_chars("#>-"),
    );
    progress_bar.enable_steady_tick(Duration::from_millis(100));
    Ok(progress_bar)
}
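
/// Fetches the environment details for `environment_name` from the
/// `sn-environment-type` bucket on S3, retrying the download, read, and
/// parse steps up to three times before giving up.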
pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to read the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                match serde_json::from_str(&content) {
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to parse the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}

pub async fn write_environment_details(
    s3_repository: &S3Repository,
    environment_name: &str,
    environment_details: &EnvironmentDetails,
) -> Result<()> {
    let temp_dir = tempfile::tempdir()?;
    let path = temp_dir.path().to_path_buf().join(environment_name);
    let mut file = File::create(&path)?;
    let json = serde_json::to_string(environment_details)?;
    file.write_all(json.as_bytes())?;
    s3_repository
        .upload_file("sn-environment-type", &path, true)
        .await?;
    Ok(())
}

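/// Calculates the size required for each attached volume to hold the data of
/// `node_count` nodes. The divisor of 7 is assumed to correspond to the
/// number of volumes attached to each VM by the infrastructure.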
pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
    if node_count == 0 {
        return 0;
    }
    let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
    // Spread the total requirement evenly across the attached volumes,
    // rounding up.
    (total_volume_required as f64 / 7.0).ceil() as u16
}

pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    format!("http://{ip_addr}/bootstrap_cache.json")
}
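
// A minimal sanity-check sketch for the pure helpers in this module; the
// expected values are derived from the constants and formulas above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn calculate_size_per_attached_volume_spreads_storage_across_volumes() {
        assert_eq!(calculate_size_per_attached_volume(0), 0);
        // 25 nodes * 7 storage units, spread across 7 volumes -> 25 per volume.
        assert_eq!(calculate_size_per_attached_volume(25), 25);
    }

    #[test]
    fn bootstrap_cache_url_is_built_from_the_ip() {
        let ip: IpAddr = "203.0.113.7".parse().unwrap();
        assert_eq!(
            get_bootstrap_cache_url(&ip),
            "http://203.0.113.7/bootstrap_cache.json"
        );
    }

    #[test]
    fn deployment_type_round_trips_through_strings() {
        assert_eq!(
            "bootstrap".parse::<DeploymentType>().unwrap(),
            DeploymentType::Bootstrap
        );
        assert_eq!(DeploymentType::New.to_string(), "new");
    }
}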