pub mod ansible;
pub mod bootstrap;
pub mod deploy;
pub mod digital_ocean;
pub mod error;
pub mod funding;
pub mod infra;
pub mod inventory;
pub mod logs;
pub mod logstash;
pub mod reserved_ip;
pub mod rpc_client;
pub mod s3;
pub mod safe;
pub mod setup;
pub mod ssh;
pub mod terraform;
pub mod upscale;

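// Storage required per node, used by `calculate_size_per_attached_volume`. The unit is assumed
// to be gigabytes, in line with the Terraform volume size variables.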
const STORAGE_REQUIRED_PER_NODE: u16 = 7;

use crate::{
    ansible::{
        extra_vars::ExtraVarsDocBuilder,
        inventory::{cleanup_environment_inventory, AnsibleInventoryType},
        provisioning::AnsibleProvisioner,
        AnsibleRunner,
    },
    error::{Error, Result},
    inventory::{DeploymentInventory, VirtualMachine},
    rpc_client::RpcClient,
    s3::S3Repository,
    ssh::SshClient,
    terraform::TerraformRunner,
};
use alloy::primitives::Address;
use ant_service_management::ServiceStatus;
use evmlib::Network;
use flate2::read::GzDecoder;
use indicatif::{ProgressBar, ProgressStyle};
use infra::{build_terraform_args, InfraRunOptions};
use log::{debug, trace};
use semver::Version;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
    fs::File,
    io::{BufRead, BufReader, BufWriter, Write},
    net::IpAddr,
    path::{Path, PathBuf},
    process::{Command, Stdio},
    str::FromStr,
    time::Duration,
};
use tar::Archive;

const ANSIBLE_DEFAULT_FORKS: usize = 50;

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    Bootstrap,
    #[default]
    New,
}

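/// Details of a deployed Anvil EVM node, parsed from the testnet data CSV produced on the node
/// (see `get_anvil_node_data`).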
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    pub data_payments_address: String,
    pub deployer_wallet_private_key: String,
    pub payment_token_address: String,
    pub rpc_url: String,
}

impl std::fmt::Display for DeploymentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DeploymentType::Bootstrap => write!(f, "bootstrap"),
            DeploymentType::New => write!(f, "new"),
        }
    }
}

impl std::str::FromStr for DeploymentType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "bootstrap" => Ok(DeploymentType::Bootstrap),
            "new" => Ok(DeploymentType::New),
            _ => Err(format!("Invalid deployment type: {}", s)),
        }
    }
}

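/// The role a node plays in the network, which determines its Telegraf role and the Ansible
/// inventory it belongs to.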
#[derive(Debug, Clone)]
pub enum NodeType {
    FullConePrivateNode,
    Generic,
    Genesis,
    PeerCache,
    SymmetricPrivateNode,
}

impl std::str::FromStr for NodeType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "full-cone-private" => Ok(NodeType::FullConePrivateNode),
            "generic" => Ok(NodeType::Generic),
            "genesis" => Ok(NodeType::Genesis),
            "peer-cache" => Ok(NodeType::PeerCache),
            "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
            _ => Err(format!("Invalid node type: {}", s)),
        }
    }
}

impl NodeType {
    pub fn telegraf_role(&self) -> &'static str {
        match self {
            NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
            NodeType::Generic => "GENERIC_NODE",
            NodeType::Genesis => "GENESIS_NODE",
            NodeType::PeerCache => "PEER_CACHE_NODE",
            NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
        }
    }

    pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
        match self {
            NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
            NodeType::Generic => AnsibleInventoryType::Nodes,
            NodeType::Genesis => AnsibleInventoryType::Genesis,
            NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
            NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
        }
    }
}

#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    #[default]
    Anvil,
    ArbitrumOne,
    ArbitrumSepolia,
    Custom,
}

impl std::fmt::Display for EvmNetwork {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}

impl std::str::FromStr for EvmNetwork {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "anvil" => Ok(EvmNetwork::Anvil),
            "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
            "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
            "custom" => Ok(EvmNetwork::Custom),
            _ => Err(format!("Invalid EVM network type: {}", s)),
        }
    }
}

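/// Details describing a deployed environment, stored as JSON in the `sn-environment-type` S3
/// bucket by `write_environment_details` and retrieved by `get_environment_details`.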
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    pub deployment_type: DeploymentType,
    pub environment_type: EnvironmentType,
    pub evm_network: EvmNetwork,
    pub evm_data_payments_address: Option<String>,
    pub evm_payment_token_address: Option<String>,
    pub evm_rpc_url: Option<String>,
    pub funding_wallet_address: Option<String>,
    pub network_id: Option<u8>,
    pub rewards_address: String,
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    #[default]
    Development,
    Production,
    Staging,
}

impl EnvironmentType {
    pub fn get_tfvars_filename(&self, name: &str) -> String {
        match self {
            EnvironmentType::Development => "dev.tfvars".to_string(),
            EnvironmentType::Staging => "staging.tfvars".to_string(),
            EnvironmentType::Production => format!("{name}.tfvars"),
        }
    }

    pub fn get_default_peer_cache_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 5,
            EnvironmentType::Production => 5,
            EnvironmentType::Staging => 5,
        }
    }

    pub fn get_default_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 25,
            EnvironmentType::Production => 25,
            EnvironmentType::Staging => 25,
        }
    }

    pub fn get_default_symmetric_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }

    pub fn get_default_full_cone_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }
}

impl std::fmt::Display for EnvironmentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EnvironmentType::Development => write!(f, "development"),
            EnvironmentType::Production => write!(f, "production"),
            EnvironmentType::Staging => write!(f, "staging"),
        }
    }
}

impl FromStr for EnvironmentType {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "development" => Ok(EnvironmentType::Development),
            "production" => Ok(EnvironmentType::Production),
            "staging" => Ok(EnvironmentType::Staging),
            _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
        }
    }
}

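/// Determines how the deployment's binaries are obtained: built from source on a given branch,
/// or fetched as released versions.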
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    BuildFromSource {
        antnode_features: Option<String>,
        branch: String,
        repo_owner: String,
    },
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Version,
        antnode_version: Version,
    },
}

impl BinaryOption {
    pub fn print(&self) {
        match self {
            BinaryOption::BuildFromSource {
                antnode_features,
                branch,
                repo_owner,
            } => {
                println!("Source configuration:");
                println!("  Repository owner: {}", repo_owner);
                println!("  Branch: {}", branch);
                if let Some(features) = antnode_features {
                    println!("  Antnode features: {}", features);
                }
            }
            BinaryOption::Versioned {
                ant_version,
                antctl_version,
                antnode_version,
            } => {
                println!("Versioned binaries configuration:");
                if let Some(version) = ant_version {
                    println!("  ant version: {}", version);
                }
                println!("  antctl version: {}", antctl_version);
                println!("  antnode version: {}", antnode_version);
            }
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}

impl std::fmt::Display for CloudProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CloudProvider::Aws => write!(f, "aws"),
            CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
        }
    }
}

impl CloudProvider {
    pub fn get_ssh_user(&self) -> String {
        match self {
            CloudProvider::Aws => "ubuntu".to_string(),
            CloudProvider::DigitalOcean => "root".to_string(),
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
    Default,
    Json,
}

impl LogFormat {
    pub fn parse_from_str(val: &str) -> Result<Self> {
        match val {
            "default" => Ok(LogFormat::Default),
            "json" => Ok(LogFormat::Json),
            _ => Err(Error::LoggingConfiguration(
                "The only valid values for this argument are \"default\" or \"json\"".to_string(),
            )),
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            LogFormat::Default => "default",
            LogFormat::Json => "json",
        }
    }
}

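/// Options for a node upgrade run; converted to Ansible extra vars by `get_ansible_vars`.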
#[derive(Clone)]
pub struct UpgradeOptions {
    pub ansible_verbose: bool,
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    pub env_variables: Option<Vec<(String, String)>>,
    pub force: bool,
    pub forks: usize,
    pub interval: Duration,
    pub name: String,
    pub node_type: Option<NodeType>,
    pub pre_upgrade_delay: Option<u64>,
    pub provider: CloudProvider,
    pub version: Option<String>,
}

impl UpgradeOptions {
    pub fn get_ansible_vars(&self) -> String {
        let mut extra_vars = ExtraVarsDocBuilder::default();
        extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
        if let Some(env_variables) = &self.env_variables {
            extra_vars.add_env_variable_list("env_variables", env_variables.clone());
        }
        if self.force {
            extra_vars.add_variable("force", &self.force.to_string());
        }
        if let Some(version) = &self.version {
            extra_vars.add_variable("antnode_version", version);
        }
        if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
            extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
        }
        extra_vars.build()
    }
}

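/// Builder for `TestnetDeployer`.
///
/// A minimal usage sketch (illustrative values; with the default DigitalOcean provider the
/// `DO_PAT` environment variable must be set, and values not supplied here fall back to the
/// `TERRAFORM_STATE_BUCKET_NAME`, `SSH_KEY_PATH` and `ANSIBLE_VAULT_PASSWORD_PATH` environment
/// variables):
///
/// ```ignore
/// let deployer = TestnetDeployBuilder::new()
///     .environment_name("my-testnet")
///     .provider(CloudProvider::DigitalOcean)
///     .deployment_type(EnvironmentType::Development)
///     .ansible_forks(25)
///     .build()?;
/// ```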
#[derive(Default)]
pub struct TestnetDeployBuilder {
    ansible_forks: Option<usize>,
    ansible_verbose_mode: bool,
    deployment_type: EnvironmentType,
    environment_name: String,
    provider: Option<CloudProvider>,
    ssh_secret_key_path: Option<PathBuf>,
    state_bucket_name: Option<String>,
    terraform_binary_path: Option<PathBuf>,
    vault_password_path: Option<PathBuf>,
    working_directory_path: Option<PathBuf>,
}

impl TestnetDeployBuilder {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
        )?;

        Ok(testnet)
    }
}

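/// Orchestrates a testnet deployment: Terraform for the infrastructure, Ansible for
/// provisioning, and S3 for binaries and environment metadata.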
#[derive(Clone)]
pub struct TestnetDeployer {
    pub ansible_provisioner: AnsibleProvisioner,
    pub cloud_provider: CloudProvider,
    pub deployment_type: EnvironmentType,
    pub environment_name: String,
    pub inventory_file_path: PathBuf,
    pub rpc_client: RpcClient,
    pub s3_repository: S3Repository,
    pub ssh_client: SshClient,
    pub terraform_runner: TerraformRunner,
    pub working_directory_path: PathBuf,
}

impl TestnetDeployer {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    pub async fn init(&self) -> Result<()> {
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755);
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), Some(options.tfvars_filename.clone()))?;
        Ok(())
    }

    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let symmetric_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let full_cone_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        symmetric_private_node_registries.print();
        full_cone_private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &symmetric_private_node_registries,
            &full_cone_private_node_registries,
            &genesis_node_registry,
        ];

        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let symmetric_private_nodes = symmetric_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let full_cone_private_nodes = full_cone_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        println!("-------");
        println!("Summary");
        println!("-------");
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            if peer_cache_hosts > 0 {
                peer_cache_nodes / peer_cache_hosts
            } else {
                0
            },
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            if generic_hosts > 0 {
                generic_nodes / generic_hosts
            } else {
                0
            },
            generic_nodes
        );
        println!(
            "Total symmetric private nodes ({}x{}): {}",
            symmetric_private_hosts,
            if symmetric_private_hosts > 0 {
                symmetric_private_nodes / symmetric_private_hosts
            } else {
                0
            },
            symmetric_private_nodes
        );
        println!(
            "Total full cone private nodes ({}x{}): {}",
            full_cone_private_hosts,
            if full_cone_private_hosts > 0 {
                full_cone_private_nodes / full_cone_private_hosts
            } else {
                0
            },
            full_cone_private_nodes
        );
        println!("Total nodes: {}", total_nodes);
        println!("Running nodes: {}", running_nodes);
        println!("Stopped nodes: {}", stopped_nodes);
        println!("Added nodes: {}", added_nodes);
        println!("Removed nodes: {}", removed_nodes);

        Ok(())
    }

    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
        )?;
        Ok(())
    }

    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    pub fn upgrade_uploader_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_uploader_telegraf(name)?;
        Ok(())
    }

    pub async fn clean(&self) -> Result<()> {
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository).await?;

        let evm_network = match environment_details.evm_network {
            EvmNetwork::Anvil => None,
            EvmNetwork::Custom => Some(Network::new_custom(
                environment_details.evm_rpc_url.as_ref().unwrap(),
                environment_details
                    .evm_payment_token_address
                    .as_ref()
                    .unwrap(),
                environment_details
                    .evm_data_payments_address
                    .as_ref()
                    .unwrap(),
            )),
            EvmNetwork::ArbitrumOne => Some(Network::ArbitrumOne),
            EvmNetwork::ArbitrumSepolia => Some(Network::ArbitrumSepolia),
        };
        if let (Some(network), Some(address)) =
            (evm_network, &environment_details.funding_wallet_address)
        {
            if let Err(err) = self
                .ansible_provisioner
                .drain_funds_from_uploaders(
                    Address::from_str(address).map_err(|err| {
                        log::error!("Invalid funding wallet public key: {err:?}");
                        Error::FailedToParseKey
                    })?,
                    network,
                )
                .await
            {
                log::error!("Failed to drain funds from uploaders: {err:?}");
            }
        } else {
            println!("Anvil network or no funding wallet address provided. Not draining funds.");
            log::info!("Anvil network or no funding wallet address provided. Not draining funds.");
        }

        do_clean(
            &self.environment_name,
            Some(environment_details),
            self.working_directory_path.clone(),
            &self.terraform_runner,
            None,
        )
        .await?;
        self.s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await?;
        Ok(())
    }
}

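/// Retrieves the genesis node's multiaddr by querying the node registry on the genesis VM over
/// SSH. It first looks for a non-loopback quic-v1 listen address belonging to the node started
/// with `first: true` in its peer arguments, then falls back to any non-loopback quic-v1
/// address.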
pub fn get_genesis_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
    let genesis_ip = genesis_inventory[0].public_ip_addr;

    let multiaddr = ssh_client
        .run_command(
            &genesis_ip,
            "root",
            "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
            false,
        )
        .map(|output| output.first().cloned())
        .unwrap_or_else(|err| {
            log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
            None
        });

    let multiaddr = match multiaddr {
        Some(addr) => addr,
        None => ssh_client
            .run_command(
                &genesis_ip,
                "root",
                "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
                false,
            )?
            .first()
            .cloned()
            .ok_or_else(|| Error::GenesisListenAddress)?,
    };

    Ok((multiaddr, genesis_ip))
}

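/// Reads the Anvil testnet data CSV from the first EVM node, retrying up to five times. The CSV
/// is expected to contain exactly four fields: RPC URL, payment token address, data payments
/// address, and the deployer wallet private key.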
pub fn get_anvil_node_data(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<AnvilNodeData> {
    let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
    if evm_inventory.is_empty() {
        return Err(Error::EvmNodeNotFound);
    }

    let evm_ip = evm_inventory[0].public_ip_addr;
    debug!("Retrieved IP address for EVM node: {evm_ip}");
    let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";

    const MAX_ATTEMPTS: u8 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    for attempt in 1..=MAX_ATTEMPTS {
        match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
            Ok(output) => {
                if let Some(csv_contents) = output.first() {
                    let parts: Vec<&str> = csv_contents.split(',').collect();
                    if parts.len() != 4 {
                        return Err(Error::EvmTestnetDataParsingError(
                            "Expected 4 fields in the CSV".to_string(),
                        ));
                    }

                    let evm_testnet_data = AnvilNodeData {
                        rpc_url: parts[0].trim().to_string(),
                        payment_token_address: parts[1].trim().to_string(),
                        data_payments_address: parts[2].trim().to_string(),
                        deployer_wallet_private_key: parts[3].trim().to_string(),
                    };
                    return Ok(evm_testnet_data);
                }
            }
            Err(e) => {
                if attempt == MAX_ATTEMPTS {
                    return Err(e);
                }
                println!(
                    "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
                    attempt,
                    RETRY_DELAY.as_secs()
                );
            }
        }
        std::thread::sleep(RETRY_DELAY);
    }

    Err(Error::EvmTestnetDataNotFound)
}

pub fn get_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
    let node_ip = node_inventory
        .iter()
        .find(|vm| vm.name.ends_with("-node-1"))
        .ok_or_else(|| Error::NodeAddressNotFound)?
        .public_ip_addr;

    debug!("Getting multiaddr from node {node_ip}");

    let multiaddr = ssh_client
        .run_command(
            &node_ip,
            "root",
            "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
            false,
        )?
        .first()
        .cloned()
        .ok_or_else(|| Error::NodeAddressNotFound)?;

    Ok((multiaddr, node_ip))
}

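/// Downloads a gzipped tar archive from S3 and extracts it into `dest_path`, deleting the
/// downloaded archive afterwards.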
pub async fn get_and_extract_archive_from_s3(
    s3_repository: &S3Repository,
    bucket_name: &str,
    archive_bucket_path: &str,
    dest_path: &Path,
) -> Result<()> {
    let archive_file_name = archive_bucket_path.split('/').last().unwrap();
    let archive_dest_path = dest_path.join(archive_file_name);
    s3_repository
        .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
        .await?;
    extract_archive(&archive_dest_path, dest_path)?;
    Ok(())
}

pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
    let archive_file = File::open(archive_path)?;
    let decoder = GzDecoder::new(archive_file);
    let mut archive = Archive::new(decoder);
    let entries = archive.entries()?;
    for entry_result in entries {
        let mut entry = entry_result?;
        let extract_path = dest_path.join(entry.path()?);
        if entry.header().entry_type() == tar::EntryType::Directory {
            std::fs::create_dir_all(extract_path)?;
            continue;
        }
        let mut file = BufWriter::new(File::create(extract_path)?);
        std::io::copy(&mut entry, &mut file)?;
    }
    std::fs::remove_file(archive_path)?;
    Ok(())
}

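/// Runs an external binary, echoing and capturing its output. Stdout is drained before stderr,
/// so the returned lines are grouped by stream rather than interleaved in real time.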
pub fn run_external_command(
    binary_path: PathBuf,
    working_directory_path: PathBuf,
    args: Vec<String>,
    suppress_stdout: bool,
    suppress_stderr: bool,
) -> Result<Vec<String>> {
    let mut command = Command::new(binary_path.clone());
    for arg in &args {
        command.arg(arg);
    }
    command.stdout(Stdio::piped());
    command.stderr(Stdio::piped());
    command.current_dir(working_directory_path.clone());
    debug!("Running {binary_path:#?} with args {args:#?}");
    debug!("Working directory set to {working_directory_path:#?}");

    let mut child = command.spawn()?;
    let mut output_lines = Vec::new();

    if let Some(ref mut stdout) = child.stdout {
        let reader = BufReader::new(stdout);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stdout {
                println!("{line}");
            }
            output_lines.push(line);
        }
    }

    if let Some(ref mut stderr) = child.stderr {
        let reader = BufReader::new(stderr);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stderr {
                eprintln!("{line}");
            }
            output_lines.push(line);
        }
    }

    let output = child.wait()?;
    if !output.success() {
        let binary_path = binary_path.to_str().unwrap();
        return Err(Error::ExternalCommandRunFailed {
            binary: binary_path.to_string(),
            exit_status: output,
        });
    }

    Ok(output_lines)
}

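/// Returns true if `binary_name` is found in any directory on `PATH`. Note the `:` separator
/// makes this check Unix-specific.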
pub fn is_binary_on_path(binary_name: &str) -> bool {
    if let Ok(path) = std::env::var("PATH") {
        for dir in path.split(':') {
            let mut full_path = PathBuf::from(dir);
            full_path.push(binary_name);
            if full_path.exists() {
                return true;
            }
        }
    }
    false
}

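/// Destroys an environment: tears down the infrastructure with Terraform, deletes the
/// workspace, and removes the corresponding Ansible inventory files. Volume sizes are read from
/// the existing deployment and passed through to `terraform destroy`.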
pub async fn do_clean(
    name: &str,
    environment_details: Option<EnvironmentDetails>,
    working_directory_path: PathBuf,
    terraform_runner: &TerraformRunner,
    inventory_types: Option<Vec<AnsibleInventoryType>>,
) -> Result<()> {
    terraform_runner.init()?;
    let workspaces = terraform_runner.workspace_list()?;
    if !workspaces.contains(&name.to_string()) {
        return Err(Error::EnvironmentDoesNotExist(name.to_string()));
    }
    terraform_runner.workspace_select(name)?;
    println!("Selected {name} workspace");

    let environment_details = environment_details.ok_or(Error::EnvironmentDetailsNotFound(
        "Should be provided during do_clean".to_string(),
    ))?;

    let options =
        InfraRunOptions::generate_existing(name, terraform_runner, &environment_details).await?;
    let mut args = Vec::new();
    if let Some(full_cone_private_node_volume_size) = options.full_cone_private_node_volume_size {
        args.push((
            "full_cone_private_node_volume_size".to_string(),
            full_cone_private_node_volume_size.to_string(),
        ));
    }
    if let Some(genesis_node_volume_size) = options.genesis_node_volume_size {
        args.push((
            "genesis_node_volume_size".to_string(),
            genesis_node_volume_size.to_string(),
        ));
    }
    if let Some(node_volume_size) = options.node_volume_size {
        args.push(("node_volume_size".to_string(), node_volume_size.to_string()));
    }
    if let Some(peer_cache_node_volume_size) = options.peer_cache_node_volume_size {
        args.push((
            "peer_cache_node_volume_size".to_string(),
            peer_cache_node_volume_size.to_string(),
        ));
    }
    if let Some(symmetric_private_node_volume_size) = options.symmetric_private_node_volume_size {
        args.push((
            "symmetric_private_node_volume_size".to_string(),
            symmetric_private_node_volume_size.to_string(),
        ));
    }

    terraform_runner.destroy(
        Some(args),
        Some(
            environment_details
                .environment_type
                .get_tfvars_filename(name),
        ),
    )?;

    terraform_runner.workspace_select("dev")?;
    terraform_runner.workspace_delete(name)?;
    println!("Deleted {name} workspace");

    cleanup_environment_inventory(
        name,
        &working_directory_path.join("ansible").join("inventory"),
        inventory_types,
    )?;

    println!("Deleted Ansible inventory for {name}");
    Ok(())
}

pub fn get_wallet_directory() -> Result<PathBuf> {
    Ok(dirs_next::data_dir()
        .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
        .join("safe")
        .join("client")
        .join("wallet"))
}

pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
    let webhook_url =
        std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;

    let mut message = String::new();
    message.push_str("*Testnet Details*\n");
    message.push_str(&format!("Name: {}\n", inventory.name));
    message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
    message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
    match inventory.binary_option {
        BinaryOption::BuildFromSource {
            ref repo_owner,
            ref branch,
            ..
        } => {
            message.push_str("*Branch Details*\n");
            message.push_str(&format!("Repo owner: {}\n", repo_owner));
            message.push_str(&format!("Branch: {}\n", branch));
        }
        BinaryOption::Versioned {
            ref ant_version,
            ref antnode_version,
            ref antctl_version,
        } => {
            message.push_str("*Version Details*\n");
            message.push_str(&format!(
                "ant version: {}\n",
                ant_version
                    .as_ref()
                    .map_or("None".to_string(), |v| v.to_string())
            ));
            message.push_str(&format!("antnode version: {}\n", antnode_version));
            message.push_str(&format!("antctl version: {}\n", antctl_version));
        }
    }

    message.push_str("*Sample Peers*\n");
    message.push_str("```\n");
    for peer in inventory.peers().iter().take(20) {
        message.push_str(&format!("{peer}\n"));
    }
    message.push_str("```\n");
    message.push_str("*Available Files*\n");
    message.push_str("```\n");
    for (addr, file_name) in inventory.uploaded_files.iter() {
        message.push_str(&format!("{}: {}\n", addr, file_name))
    }
    message.push_str("```\n");

    let payload = json!({
        "text": message,
    });
    reqwest::Client::new()
        .post(webhook_url)
        .json(&payload)
        .send()
        .await?;
    println!("{message}");
    println!("Posted notification to Slack");
    Ok(())
}

fn print_duration(duration: Duration) {
    let total_seconds = duration.as_secs();
    let minutes = total_seconds / 60;
    let seconds = total_seconds % 60;
    debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
}

pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
    let progress_bar = ProgressBar::new(length);
    progress_bar.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
            .progress_chars("#>-"),
    );
    progress_bar.enable_steady_tick(Duration::from_millis(100));
    Ok(progress_bar)
}

pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to read the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                match serde_json::from_str(&content) {
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to parse the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}

pub async fn write_environment_details(
    s3_repository: &S3Repository,
    environment_name: &str,
    environment_details: &EnvironmentDetails,
) -> Result<()> {
    let temp_dir = tempfile::tempdir()?;
    let path = temp_dir.path().to_path_buf().join(environment_name);
    let mut file = File::create(&path)?;
    let json = serde_json::to_string(environment_details)?;
    file.write_all(json.as_bytes())?;
    s3_repository
        .upload_file("sn-environment-type", &path, true)
        .await?;
    Ok(())
}

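/// Calculates the size of each attached volume for a VM hosting `node_count` nodes. The divisor
/// of 7 is assumed to be the number of volumes attached per VM, so the total storage
/// requirement is spread evenly across them.
///
/// For example, 25 nodes at 7 (GB) each require 175 in total, so each of the 7 volumes is sized
/// ceil(175 / 7) = 25.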
pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
    if node_count == 0 {
        return 0;
    }
    let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;

    (total_volume_required as f64 / 7.0).ceil() as u16
}

pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    format!("http://{ip_addr}/bootstrap_cache.json")
}