sn_testnet_deploy/
lib.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7pub mod ansible;
8pub mod bootstrap;
9pub mod clients;
10pub mod deploy;
11pub mod digital_ocean;
12pub mod error;
13pub mod funding;
14pub mod infra;
15pub mod inventory;
16pub mod logs;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
/// Storage required per node when sizing machines.
/// NOTE(review): the unit is not stated anywhere in this file — presumably GB; confirm.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29    ansible::{
30        extra_vars::ExtraVarsDocBuilder,
31        inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32        provisioning::AnsibleProvisioner,
33        AnsibleRunner,
34    },
35    error::{Error, Result},
36    inventory::{DeploymentInventory, VirtualMachine},
37    rpc_client::RpcClient,
38    s3::S3Repository,
39    ssh::SshClient,
40    terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51    fs::File,
52    io::{BufRead, BufReader, BufWriter, Write},
53    net::IpAddr,
54    path::{Path, PathBuf},
55    process::{Command, Stdio},
56    str::FromStr,
57    time::Duration,
58};
59use tar::Archive;
60
/// Number of parallel Ansible forks used when the caller does not supply one.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
/// The kind of deployment an environment represents.
///
/// Converts to/from the string tokens "bootstrap", "clients" and "new" via the
/// `Display`/`FromStr` impls below.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    /// The deployment has been bootstrapped from an existing network.
    Bootstrap,
    /// Client deployment.
    Client,
    /// The deployment is a new network.
    #[default]
    New,
}
73
/// Details of an Anvil (local EVM test chain) node used by a deployment.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    /// Address of the data payments contract.
    pub data_payments_address: String,
    /// Private key of the wallet that deployed the contracts.
    pub deployer_wallet_private_key: String,
    /// Address of the payment token contract.
    pub payment_token_address: String,
    /// RPC endpoint of the Anvil node.
    pub rpc_url: String,
}
81
82impl std::fmt::Display for DeploymentType {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        match self {
85            DeploymentType::Bootstrap => write!(f, "bootstrap"),
86            DeploymentType::Client => write!(f, "clients"),
87            DeploymentType::New => write!(f, "new"),
88        }
89    }
90}
91
92impl std::str::FromStr for DeploymentType {
93    type Err = String;
94
95    fn from_str(s: &str) -> Result<Self, Self::Err> {
96        match s.to_lowercase().as_str() {
97            "bootstrap" => Ok(DeploymentType::Bootstrap),
98            "clients" => Ok(DeploymentType::Client),
99            "new" => Ok(DeploymentType::New),
100            _ => Err(format!("Invalid deployment type: {}", s)),
101        }
102    }
103}
104
/// The role a node plays in the network.
///
/// Converts to/from hyphenated string tokens via the `Display`/`FromStr` impls below.
#[derive(Debug, Clone)]
pub enum NodeType {
    /// Node behind a full-cone NAT.
    FullConePrivateNode,
    /// Ordinary public node.
    Generic,
    /// The genesis node of a new network.
    Genesis,
    /// Node serving the peer cache.
    PeerCache,
    /// Node behind a symmetric NAT.
    SymmetricPrivateNode,
}
113
114impl std::fmt::Display for NodeType {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self {
117            NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
118            NodeType::Generic => write!(f, "generic"),
119            NodeType::Genesis => write!(f, "genesis"),
120            NodeType::PeerCache => write!(f, "peer-cache"),
121            NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
122        }
123    }
124}
125
126impl std::str::FromStr for NodeType {
127    type Err = String;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        match s.to_lowercase().as_str() {
131            "full-cone-private" => Ok(NodeType::FullConePrivateNode),
132            "generic" => Ok(NodeType::Generic),
133            "genesis" => Ok(NodeType::Genesis),
134            "peer-cache" => Ok(NodeType::PeerCache),
135            "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
136            _ => Err(format!("Invalid node type: {}", s)),
137        }
138    }
139}
140
impl NodeType {
    /// The role tag attached to Telegraf metrics emitted by this node type.
    pub fn telegraf_role(&self) -> &'static str {
        match self {
            NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
            NodeType::Generic => "GENERIC_NODE",
            NodeType::Genesis => "GENESIS_NODE",
            NodeType::PeerCache => "PEER_CACHE_NODE",
            NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
        }
    }

    /// Map this node type to the Ansible inventory that holds its machines.
    pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
        match self {
            NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
            NodeType::Generic => AnsibleInventoryType::Nodes,
            NodeType::Genesis => AnsibleInventoryType::Genesis,
            NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
            NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
        }
    }
}
162
/// The EVM network a deployment pays into.
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    /// Local Anvil test chain (the default).
    #[default]
    Anvil,
    /// Arbitrum One mainnet.
    ArbitrumOne,
    /// Arbitrum Sepolia testnet.
    ArbitrumSepolia,
    /// A custom network supplied via explicit contract addresses and RPC URL.
    Custom,
}
171
impl std::fmt::Display for EvmNetwork {
    /// Render the network as its `evm-*` token.
    ///
    /// NOTE(review): `Anvil` and `Custom` both render as "evm-custom", so this
    /// mapping is lossy, and it does not round-trip through `FromStr` (which
    /// accepts "anvil"/"custom" without the `evm-` prefix). Confirm this is
    /// intentional before relying on `to_string()` to distinguish networks.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}
182
183impl std::str::FromStr for EvmNetwork {
184    type Err = String;
185
186    fn from_str(s: &str) -> Result<Self, Self::Err> {
187        match s.to_lowercase().as_str() {
188            "anvil" => Ok(EvmNetwork::Anvil),
189            "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
190            "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
191            "custom" => Ok(EvmNetwork::Custom),
192            _ => Err(format!("Invalid EVM network type: {}", s)),
193        }
194    }
195}
196
/// EVM configuration captured for a deployment.
///
/// The optional fields carry explicit contract addresses and an RPC endpoint;
/// presumably only required for `Anvil`/`Custom` networks — confirm with callers.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    /// Which EVM network the deployment uses.
    pub network: EvmNetwork,
    /// Address of the data payments contract, when explicitly supplied.
    pub data_payments_address: Option<String>,
    /// Address of the payment token contract, when explicitly supplied.
    pub payment_token_address: Option<String>,
    /// RPC endpoint, when explicitly supplied.
    pub rpc_url: Option<String>,
}
204
/// Persisted description of a deployed environment.
///
/// Serialized to/from S3 (see `get_environment_details` usage in `clean`).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    /// Whether the environment is new, bootstrapped, or a client deployment.
    pub deployment_type: DeploymentType,
    /// Development, staging or production.
    pub environment_type: EnvironmentType,
    /// EVM network configuration for the environment.
    pub evm_details: EvmDetails,
    /// Address of the funding wallet, if one was recorded.
    pub funding_wallet_address: Option<String>,
    /// Network identifier, if one was recorded.
    pub network_id: Option<u8>,
    /// Rewards address, if one was recorded.
    pub rewards_address: Option<String>,
}
214
/// The class of environment being deployed, which selects tfvars files and
/// default node counts.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    /// Development environment (the default).
    #[default]
    Development,
    /// Production environment.
    Production,
    /// Staging environment.
    Staging,
}
222
223impl EnvironmentType {
224    pub fn get_tfvars_filename(&self, name: &str) -> String {
225        match self {
226            EnvironmentType::Development => "dev.tfvars".to_string(),
227            EnvironmentType::Staging => "staging.tfvars".to_string(),
228            EnvironmentType::Production => {
229                format!("{name}.tfvars",)
230            }
231        }
232    }
233
234    pub fn get_default_peer_cache_node_count(&self) -> u16 {
235        match self {
236            EnvironmentType::Development => 5,
237            EnvironmentType::Production => 5,
238            EnvironmentType::Staging => 5,
239        }
240    }
241
242    pub fn get_default_node_count(&self) -> u16 {
243        match self {
244            EnvironmentType::Development => 25,
245            EnvironmentType::Production => 25,
246            EnvironmentType::Staging => 25,
247        }
248    }
249
250    pub fn get_default_symmetric_private_node_count(&self) -> u16 {
251        self.get_default_node_count()
252    }
253
254    pub fn get_default_full_cone_private_node_count(&self) -> u16 {
255        self.get_default_node_count()
256    }
257}
258
259impl std::fmt::Display for EnvironmentType {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        match self {
262            EnvironmentType::Development => write!(f, "development"),
263            EnvironmentType::Production => write!(f, "production"),
264            EnvironmentType::Staging => write!(f, "staging"),
265        }
266    }
267}
268
269impl FromStr for EnvironmentType {
270    type Err = Error;
271
272    fn from_str(s: &str) -> Result<Self, Self::Err> {
273        match s.to_lowercase().as_str() {
274            "development" => Ok(EnvironmentType::Development),
275            "production" => Ok(EnvironmentType::Production),
276            "staging" => Ok(EnvironmentType::Staging),
277            _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
278        }
279    }
280}
281
/// Specify the binary option for the deployment.
///
/// There are several binaries involved in the deployment:
/// * safenode
/// * safenode_rpc_client
/// * faucet
/// * safe
///
/// The `safe` binary is only used for smoke testing the deployment, although we don't really do
/// that at the moment.
///
/// The options are to build from source, or supply a pre-built, versioned binary, which will be
/// fetched from S3. Building from source adds significant time to the deployment.
///
/// NOTE(review): the binary list above refers to `safenode`/`faucet`/`safe`, while the variant
/// fields refer to `ant`/`antctl`/`antnode`; the list appears to predate a rename — confirm
/// which names are current and update this comment accordingly.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Binaries will be built from source.
    BuildFromSource {
        /// A comma-separated list that will be passed to the `--features` argument.
        antnode_features: Option<String>,
        /// Branch of the repository to build from.
        branch: String,
        /// Owner of the repository to build from.
        repo_owner: String,
    },
    /// Pre-built, versioned binaries will be fetched from S3.
    Versioned {
        /// Version of the `ant` binary, if pinned.
        ant_version: Option<Version>,
        /// Version of the `antctl` binary, if pinned.
        antctl_version: Option<Version>,
        /// Version of the `antnode` binary, if pinned.
        antnode_version: Option<Version>,
    },
}
311
312impl BinaryOption {
313    pub fn print(&self) {
314        match self {
315            BinaryOption::BuildFromSource {
316                antnode_features,
317                branch,
318                repo_owner,
319            } => {
320                println!("Source configuration:");
321                println!("  Repository owner: {}", repo_owner);
322                println!("  Branch: {}", branch);
323                if let Some(features) = antnode_features {
324                    println!("  Antnode features: {}", features);
325                }
326            }
327            BinaryOption::Versioned {
328                ant_version,
329                antctl_version,
330                antnode_version,
331            } => {
332                println!("Versioned binaries configuration:");
333                if let Some(version) = ant_version {
334                    println!("  ant version: {}", version);
335                }
336                if let Some(version) = antctl_version {
337                    println!("  antctl version: {}", version);
338                }
339                if let Some(version) = antnode_version {
340                    println!("  antnode version: {}", version);
341                }
342            }
343        }
344    }
345}
346
/// Supported cloud providers.
///
/// Note that `TestnetDeployBuilder::build` currently rejects anything other
/// than Digital Ocean.
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}
352
353impl std::fmt::Display for CloudProvider {
354    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355        match self {
356            CloudProvider::Aws => write!(f, "aws"),
357            CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
358        }
359    }
360}
361
362impl CloudProvider {
363    pub fn get_ssh_user(&self) -> String {
364        match self {
365            CloudProvider::Aws => "ubuntu".to_string(),
366            CloudProvider::DigitalOcean => "root".to_string(),
367        }
368    }
369}
370
/// Log output format for nodes.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    /// Plain-text logging.
    Default,
    /// JSON-structured logging.
    Json,
}
376
377impl LogFormat {
378    pub fn parse_from_str(val: &str) -> Result<Self> {
379        match val {
380            "default" => Ok(LogFormat::Default),
381            "json" => Ok(LogFormat::Json),
382            _ => Err(Error::LoggingConfiguration(
383                "The only valid values for this argument are \"default\" or \"json\"".to_string(),
384            )),
385        }
386    }
387
388    pub fn as_str(&self) -> &'static str {
389        match self {
390            LogFormat::Default => "default",
391            LogFormat::Json => "json",
392        }
393    }
394}
395
/// Options controlling a node upgrade run.
#[derive(Clone)]
pub struct UpgradeOptions {
    /// Run Ansible in verbose mode.
    pub ansible_verbose: bool,
    /// Restrict the run to these specific machines, when set.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    /// Extra environment variables to apply to the node services.
    pub env_variables: Option<Vec<(String, String)>>,
    /// Force the upgrade even when it would otherwise be skipped.
    pub force: bool,
    /// Number of parallel Ansible forks.
    pub forks: usize,
    /// Interval passed to the playbook (sent as milliseconds).
    pub interval: Duration,
    /// Name of the environment being upgraded.
    pub name: String,
    /// Restrict the run to one node type, when set.
    pub node_type: Option<NodeType>,
    /// Optional delay (seconds, presumably — confirm in the playbook) before upgrading.
    pub pre_upgrade_delay: Option<u64>,
    /// Cloud provider hosting the environment.
    pub provider: CloudProvider,
    /// Target antnode version; latest when unset.
    pub version: Option<String>,
}
410
411impl UpgradeOptions {
412    pub fn get_ansible_vars(&self) -> String {
413        let mut extra_vars = ExtraVarsDocBuilder::default();
414        extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
415        if let Some(env_variables) = &self.env_variables {
416            extra_vars.add_env_variable_list("env_variables", env_variables.clone());
417        }
418        if self.force {
419            extra_vars.add_variable("force", &self.force.to_string());
420        }
421        if let Some(version) = &self.version {
422            extra_vars.add_variable("antnode_version", version);
423        }
424        if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
425            extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
426        }
427        extra_vars.build()
428    }
429}
430
/// Builder for [`TestnetDeployer`].
///
/// Options left unset fall back to environment variables or defaults inside
/// `build` (see that method for the fallback rules).
#[derive(Default)]
pub struct TestnetDeployBuilder {
    /// Parallel Ansible forks; defaults to `ANSIBLE_DEFAULT_FORKS`.
    ansible_forks: Option<usize>,
    /// Run Ansible in verbose mode.
    ansible_verbose_mode: bool,
    /// Environment type (development/staging/production).
    deployment_type: EnvironmentType,
    /// Name of the environment; must be non-empty by the time `build` runs.
    environment_name: String,
    /// Cloud provider; defaults to Digital Ocean.
    provider: Option<CloudProvider>,
    /// SSH secret key; falls back to the `SSH_KEY_PATH` env var.
    ssh_secret_key_path: Option<PathBuf>,
    /// Terraform state bucket; falls back to `TERRAFORM_STATE_BUCKET_NAME`.
    state_bucket_name: Option<String>,
    /// Terraform binary; defaults to "terraform" on PATH.
    terraform_binary_path: Option<PathBuf>,
    /// Ansible vault password file; falls back to `ANSIBLE_VAULT_PASSWORD_PATH`.
    vault_password_path: Option<PathBuf>,
    /// Working directory; defaults to `<cwd>/resources`.
    working_directory_path: Option<PathBuf>,
}
444
impl TestnetDeployBuilder {
    /// Create a builder with every option unset (same as `Default::default()`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Run Ansible in verbose mode.
    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    /// Number of parallel Ansible forks; defaults to `ANSIBLE_DEFAULT_FORKS` when unset.
    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    /// The environment type (development/staging/production) for the deployment.
    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    /// Name of the environment; `TestnetDeployer::new` rejects an empty name.
    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    /// Cloud provider to target; defaults to Digital Ocean when unset.
    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    /// S3 bucket holding Terraform state; falls back to `TERRAFORM_STATE_BUCKET_NAME`.
    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    /// Path to the `terraform` binary; defaults to "terraform" (resolved on PATH).
    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    /// Working directory for deployment resources; defaults to `<cwd>/resources`.
    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    /// SSH secret key path; falls back to the `SSH_KEY_PATH` environment variable.
    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    /// Ansible vault password file; falls back to `ANSIBLE_VAULT_PASSWORD_PATH`.
    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    /// Build a [`TestnetDeployer`] from the collected options.
    ///
    /// Side effect: sets the `DIGITALOCEAN_TOKEN` and `DO_API_TOKEN` environment
    /// variables from `DO_PAT` so Terraform and Ansible can authenticate.
    ///
    /// # Errors
    ///
    /// * `CloudProviderCredentialsNotSupplied` when `DO_PAT` is unset.
    /// * `CloudProviderNotSupported` for any provider other than Digital Ocean.
    /// * Propagated errors from missing fallback environment variables and from
    ///   constructing the Terraform/Ansible runners.
    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                // The DO_PAT variable is not actually read by either Terraform or Ansible.
                // Each tool uses a different variable, so instead we set each of those variables
                // to the value of DO_PAT. This means the user only needs to set one variable.
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        // Terraform modules live under resources/terraform/testnet/<provider>.
        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        // Remove any `safe` binary from a previous deployment. Otherwise you can end up with
        // mismatched binaries.
        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
        )?;

        Ok(testnet)
    }
}
592
/// Orchestrates the deployment and management of a testnet environment.
///
/// Usually constructed via [`TestnetDeployBuilder`].
#[derive(Clone)]
pub struct TestnetDeployer {
    /// Runs Ansible playbooks against the environment's machines.
    pub ansible_provisioner: AnsibleProvisioner,
    /// Cloud provider hosting the environment.
    pub cloud_provider: CloudProvider,
    /// Environment type (development/staging/production).
    pub deployment_type: EnvironmentType,
    /// Name of the environment; always non-empty (enforced by `new`).
    pub environment_name: String,
    /// Path to the Ansible inventory file.
    pub inventory_file_path: PathBuf,
    /// Client for the safenode RPC service.
    pub rpc_client: RpcClient,
    /// S3 access for logs, binaries and environment records.
    pub s3_repository: S3Repository,
    /// SSH access to the environment's machines.
    pub ssh_client: SshClient,
    /// Runs Terraform for infrastructure management.
    pub terraform_runner: TerraformRunner,
    /// Directory holding deployment resources (terraform/ansible subdirectories).
    pub working_directory_path: PathBuf,
}
606
607impl TestnetDeployer {
    /// Construct a `TestnetDeployer`.
    ///
    /// # Errors
    ///
    /// Returns `Error::EnvironmentNameRequired` when `environment_name` is empty.
    ///
    /// Note: the inventory file path is hard-coded to the Digital Ocean dev
    /// inventory under the working directory, regardless of `cloud_provider`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }
640
    /// Prepare the environment for a fresh deployment.
    ///
    /// Initialises Terraform, creates the environment's workspace if it does not
    /// already exist, and downloads the `safenode_rpc_client` binary into the
    /// working directory if it is not already there.
    ///
    /// # Errors
    ///
    /// Returns `LogsForPreviousTestnetExist` if S3 still holds logs for an
    /// environment with this name; Terraform and S3 errors are propagated.
    pub async fn init(&self) -> Result<()> {
        // Refuse to reuse an environment name whose logs were never cleared,
        // which would mix logs from two different testnets under one S3 prefix.
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        // Fetch the RPC client binary once and cache it in the working directory.
        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            // The extracted binary must be executable before Ansible/SSH can run it.
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755); // rwxr-xr-x
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }
686
687    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
688        println!("Selecting {} workspace...", options.name);
689        self.terraform_runner.workspace_select(&options.name)?;
690
691        let args = build_terraform_args(options)?;
692
693        self.terraform_runner
694            .plan(Some(args), options.tfvars_filename.clone())?;
695        Ok(())
696    }
697
    /// Start nodes across the environment.
    ///
    /// Delegates to `AnsibleProvisioner::start_nodes`. `node_type` and
    /// `custom_inventory` optionally narrow which machines are targeted;
    /// `interval` is presumably the delay between starting successive nodes —
    /// confirm in the provisioner.
    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
712
713    /// Get the status of all nodes in a network.
714    ///
715    /// First, a playbook runs `safenode-manager status` against all the machines, to get the
716    /// current state of all the nodes. Then all the node registry files are retrieved and
717    /// deserialized to a `NodeRegistry`, allowing us to output the status of each node on each VM.
718    pub fn status(&self) -> Result<()> {
719        self.ansible_provisioner.status()?;
720
721        let peer_cache_node_registries = self
722            .ansible_provisioner
723            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
724        let generic_node_registries = self
725            .ansible_provisioner
726            .get_node_registries(&AnsibleInventoryType::Nodes)?;
727        let symmetric_private_node_registries = self
728            .ansible_provisioner
729            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
730        let full_cone_private_node_registries = self
731            .ansible_provisioner
732            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
733        let genesis_node_registry = self
734            .ansible_provisioner
735            .get_node_registries(&AnsibleInventoryType::Genesis)?
736            .clone();
737
738        peer_cache_node_registries.print();
739        generic_node_registries.print();
740        symmetric_private_node_registries.print();
741        full_cone_private_node_registries.print();
742        genesis_node_registry.print();
743
744        let all_registries = [
745            &peer_cache_node_registries,
746            &generic_node_registries,
747            &symmetric_private_node_registries,
748            &full_cone_private_node_registries,
749            &genesis_node_registry,
750        ];
751
752        let mut total_nodes = 0;
753        let mut running_nodes = 0;
754        let mut stopped_nodes = 0;
755        let mut added_nodes = 0;
756        let mut removed_nodes = 0;
757
758        for (_, registry) in all_registries
759            .iter()
760            .flat_map(|r| r.retrieved_registries.iter())
761        {
762            for node in registry.nodes.iter() {
763                total_nodes += 1;
764                match node.status {
765                    ServiceStatus::Running => running_nodes += 1,
766                    ServiceStatus::Stopped => stopped_nodes += 1,
767                    ServiceStatus::Added => added_nodes += 1,
768                    ServiceStatus::Removed => removed_nodes += 1,
769                }
770            }
771        }
772
773        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
774        let generic_hosts = generic_node_registries.retrieved_registries.len();
775        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
776        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();
777
778        let peer_cache_nodes = peer_cache_node_registries
779            .retrieved_registries
780            .iter()
781            .flat_map(|(_, n)| n.nodes.iter())
782            .count();
783        let generic_nodes = generic_node_registries
784            .retrieved_registries
785            .iter()
786            .flat_map(|(_, n)| n.nodes.iter())
787            .count();
788        let symmetric_private_nodes = symmetric_private_node_registries
789            .retrieved_registries
790            .iter()
791            .flat_map(|(_, n)| n.nodes.iter())
792            .count();
793        let full_cone_private_nodes = full_cone_private_node_registries
794            .retrieved_registries
795            .iter()
796            .flat_map(|(_, n)| n.nodes.iter())
797            .count();
798
799        println!("-------");
800        println!("Summary");
801        println!("-------");
802        println!(
803            "Total peer cache nodes ({}x{}): {}",
804            peer_cache_hosts,
805            if peer_cache_hosts > 0 {
806                peer_cache_nodes / peer_cache_hosts
807            } else {
808                0
809            },
810            peer_cache_nodes
811        );
812        println!(
813            "Total generic nodes ({}x{}): {}",
814            generic_hosts,
815            if generic_hosts > 0 {
816                generic_nodes / generic_hosts
817            } else {
818                0
819            },
820            generic_nodes
821        );
822        println!(
823            "Total symmetric private nodes ({}x{}): {}",
824            symmetric_private_hosts,
825            if symmetric_private_hosts > 0 {
826                symmetric_private_nodes / symmetric_private_hosts
827            } else {
828                0
829            },
830            symmetric_private_nodes
831        );
832        println!(
833            "Total full cone private nodes ({}x{}): {}",
834            full_cone_private_hosts,
835            if full_cone_private_hosts > 0 {
836                full_cone_private_nodes / full_cone_private_hosts
837            } else {
838                0
839            },
840            full_cone_private_nodes
841        );
842        println!("Total nodes: {}", total_nodes);
843        println!("Running nodes: {}", running_nodes);
844        println!("Stopped nodes: {}", stopped_nodes);
845        println!("Added nodes: {}", added_nodes);
846        println!("Removed nodes: {}", removed_nodes);
847
848        Ok(())
849    }
850
    /// Clean up node logs on the machines; when `setup_cron` is true a cron job
    /// is presumably installed to do it periodically — confirm in the provisioner.
    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }
855
    /// Start the Telegraf service on the environment's machines.
    ///
    /// `node_type` and `custom_inventory` optionally narrow which machines are targeted.
    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
868
    /// Stop nodes across the environment.
    ///
    /// Delegates to `AnsibleProvisioner::stop_nodes`. `node_type`,
    /// `custom_inventory` and `service_names` optionally narrow which services
    /// on which machines are stopped; `interval` and `delay` are passed through
    /// to the provisioner (their exact semantics live there — confirm).
    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }
887
    /// Stop the Telegraf service on the environment's machines.
    ///
    /// `node_type` and `custom_inventory` optionally narrow which machines are targeted.
    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
900
    /// Upgrade the node binaries across the environment, as configured by `options`.
    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }
905
    /// Upgrade the `antctl` binary to `version` across the environment.
    ///
    /// `node_type` and `custom_inventory` optionally narrow which machines are targeted.
    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
920
    /// Upgrade the Telegraf setup on node hosts.
    ///
    /// `name` is presumably the environment name — confirm with callers; the work is
    /// delegated to `AnsibleProvisioner::upgrade_node_telegraf`.
    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }
925
    /// Upgrade the Telegraf setup on client hosts.
    ///
    /// `name` is presumably the environment name — confirm with callers; the work is
    /// delegated to `AnsibleProvisioner::upgrade_client_telegraf`.
    pub fn upgrade_client_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_client_telegraf(name)?;
        Ok(())
    }
930
931    pub async fn clean(&self) -> Result<()> {
932        let environment_details =
933            get_environment_details(&self.environment_name, &self.s3_repository)
934                .await
935                .inspect_err(|err| {
936                    println!("Failed to get environment details: {err}. Continuing cleanup...");
937                })
938                .ok();
939        if let Some(environment_details) = &environment_details {
940            funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
941        }
942
943        self.destroy_infra(environment_details).await?;
944
945        cleanup_environment_inventory(
946            &self.environment_name,
947            &self
948                .working_directory_path
949                .join("ansible")
950                .join("inventory"),
951            None,
952        )?;
953
954        println!("Deleted Ansible inventory for {}", self.environment_name);
955
956        if let Err(err) = self
957            .s3_repository
958            .delete_object("sn-environment-type", &self.environment_name)
959            .await
960        {
961            println!("Failed to delete environment type: {err}. Continuing cleanup...");
962        }
963        Ok(())
964    }
965
966    async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
967        infra::select_workspace(&self.terraform_runner, &self.environment_name)?;
968
969        let options = InfraRunOptions::generate_existing(
970            &self.environment_name,
971            &self.terraform_runner,
972            environment_details.as_ref(),
973        )
974        .await?;
975
976        let args = build_terraform_args(&options)?;
977
978        self.terraform_runner.destroy(Some(args), None)?;
979
980        infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;
981
982        Ok(())
983    }
984}
985
986//
987// Shared Helpers
988//
989
990pub fn get_genesis_multiaddr(
991    ansible_runner: &AnsibleRunner,
992    ssh_client: &SshClient,
993) -> Result<(String, IpAddr)> {
994    let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
995    let genesis_ip = genesis_inventory[0].public_ip_addr;
996
997    // It's possible for the genesis host to be altered from its original state where a node was
998    // started with the `--first` flag.
999    // First attempt: try to find node with first=true
1000    let multiaddr = ssh_client
1001        .run_command(
1002            &genesis_ip,
1003            "root",
1004            "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1005            false,
1006        )
1007        .map(|output| output.first().cloned())
1008        .unwrap_or_else(|err| {
1009            log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1010            None
1011        });
1012
1013    // Second attempt: if first attempt failed, see if any node is available.
1014    let multiaddr = match multiaddr {
1015        Some(addr) => addr,
1016        None => ssh_client
1017            .run_command(
1018                &genesis_ip,
1019                "root",
1020                "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1021                false,
1022            )?
1023            .first()
1024            .cloned()
1025            .ok_or_else(|| Error::GenesisListenAddress)?,
1026    };
1027
1028    Ok((multiaddr, genesis_ip))
1029}
1030
1031pub fn get_anvil_node_data(
1032    ansible_runner: &AnsibleRunner,
1033    ssh_client: &SshClient,
1034) -> Result<AnvilNodeData> {
1035    let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1036    if evm_inventory.is_empty() {
1037        return Err(Error::EvmNodeNotFound);
1038    }
1039
1040    let evm_ip = evm_inventory[0].public_ip_addr;
1041    debug!("Retrieved IP address for EVM node: {evm_ip}");
1042    let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1043
1044    const MAX_ATTEMPTS: u8 = 5;
1045    const RETRY_DELAY: Duration = Duration::from_secs(5);
1046
1047    for attempt in 1..=MAX_ATTEMPTS {
1048        match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
1049            Ok(output) => {
1050                if let Some(csv_contents) = output.first() {
1051                    let parts: Vec<&str> = csv_contents.split(',').collect();
1052                    if parts.len() != 4 {
1053                        return Err(Error::EvmTestnetDataParsingError(
1054                            "Expected 4 fields in the CSV".to_string(),
1055                        ));
1056                    }
1057
1058                    let evm_testnet_data = AnvilNodeData {
1059                        rpc_url: parts[0].trim().to_string(),
1060                        payment_token_address: parts[1].trim().to_string(),
1061                        data_payments_address: parts[2].trim().to_string(),
1062                        deployer_wallet_private_key: parts[3].trim().to_string(),
1063                    };
1064                    return Ok(evm_testnet_data);
1065                }
1066            }
1067            Err(e) => {
1068                if attempt == MAX_ATTEMPTS {
1069                    return Err(e);
1070                }
1071                println!(
1072                    "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1073                    attempt,
1074                    RETRY_DELAY.as_secs()
1075                );
1076            }
1077        }
1078        std::thread::sleep(RETRY_DELAY);
1079    }
1080
1081    Err(Error::EvmTestnetDataNotFound)
1082}
1083
1084pub fn get_multiaddr(
1085    ansible_runner: &AnsibleRunner,
1086    ssh_client: &SshClient,
1087) -> Result<(String, IpAddr)> {
1088    let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1089    // For upscaling a bootstrap deployment, we'd need to select one of the nodes that's already
1090    // provisioned. So just try the first one.
1091    let node_ip = node_inventory
1092        .iter()
1093        .find(|vm| vm.name.ends_with("-node-1"))
1094        .ok_or_else(|| Error::NodeAddressNotFound)?
1095        .public_ip_addr;
1096
1097    debug!("Getting multiaddr from node {node_ip}");
1098
1099    let multiaddr =
1100        ssh_client
1101        .run_command(
1102            &node_ip,
1103            "root",
1104            // fetch the first multiaddr which does not contain the localhost addr.
1105            "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1106            false,
1107        )?.first()
1108        .cloned()
1109        .ok_or_else(|| Error::NodeAddressNotFound)?;
1110
1111    // The node_ip is obviously inside the multiaddr, but it's just being returned as a
1112    // separate item for convenience.
1113    Ok((multiaddr, node_ip))
1114}
1115
1116pub async fn get_and_extract_archive_from_s3(
1117    s3_repository: &S3Repository,
1118    bucket_name: &str,
1119    archive_bucket_path: &str,
1120    dest_path: &Path,
1121) -> Result<()> {
1122    // In this case, not using unwrap leads to having to provide a very trivial error variant that
1123    // doesn't seem very valuable.
1124    let archive_file_name = archive_bucket_path.split('/').last().unwrap();
1125    let archive_dest_path = dest_path.join(archive_file_name);
1126    s3_repository
1127        .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1128        .await?;
1129    extract_archive(&archive_dest_path, dest_path)?;
1130    Ok(())
1131}
1132
1133pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1134    let archive_file = File::open(archive_path)?;
1135    let decoder = GzDecoder::new(archive_file);
1136    let mut archive = Archive::new(decoder);
1137    let entries = archive.entries()?;
1138    for entry_result in entries {
1139        let mut entry = entry_result?;
1140        let extract_path = dest_path.join(entry.path()?);
1141        if entry.header().entry_type() == tar::EntryType::Directory {
1142            std::fs::create_dir_all(extract_path)?;
1143            continue;
1144        }
1145        let mut file = BufWriter::new(File::create(extract_path)?);
1146        std::io::copy(&mut entry, &mut file)?;
1147    }
1148    std::fs::remove_file(archive_path)?;
1149    Ok(())
1150}
1151
1152pub fn run_external_command(
1153    binary_path: PathBuf,
1154    working_directory_path: PathBuf,
1155    args: Vec<String>,
1156    suppress_stdout: bool,
1157    suppress_stderr: bool,
1158) -> Result<Vec<String>> {
1159    let mut command = Command::new(binary_path.clone());
1160    for arg in &args {
1161        command.arg(arg);
1162    }
1163    command.stdout(Stdio::piped());
1164    command.stderr(Stdio::piped());
1165    command.current_dir(working_directory_path.clone());
1166    debug!("Running {binary_path:#?} with args {args:#?}");
1167    debug!("Working directory set to {working_directory_path:#?}");
1168
1169    let mut child = command.spawn()?;
1170    let mut output_lines = Vec::new();
1171
1172    if let Some(ref mut stdout) = child.stdout {
1173        let reader = BufReader::new(stdout);
1174        for line in reader.lines() {
1175            let line = line?;
1176            if !suppress_stdout {
1177                println!("{line}");
1178            }
1179            output_lines.push(line);
1180        }
1181    }
1182
1183    if let Some(ref mut stderr) = child.stderr {
1184        let reader = BufReader::new(stderr);
1185        for line in reader.lines() {
1186            let line = line?;
1187            if !suppress_stderr {
1188                eprintln!("{line}");
1189            }
1190            output_lines.push(line);
1191        }
1192    }
1193
1194    let output = child.wait()?;
1195    if !output.success() {
1196        // Using `unwrap` here avoids introducing another error variant, which seems excessive.
1197        let binary_path = binary_path.to_str().unwrap();
1198        return Err(Error::ExternalCommandRunFailed {
1199            binary: binary_path.to_string(),
1200            exit_status: output,
1201        });
1202    }
1203
1204    Ok(output_lines)
1205}
1206
/// Check whether `binary_name` resolves to an existing file in any directory on `PATH`.
///
/// Uses `std::env::split_paths`, which honours the platform's path-list separator (`:`
/// on Unix, `;` on Windows) instead of hard-coding `:`, and `var_os` so a non-UTF-8
/// `PATH` is still searched.
///
/// Note: only existence is checked, not whether the file is actually executable.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    match std::env::var_os("PATH") {
        Some(path) => std::env::split_paths(&path).any(|dir| dir.join(binary_name).exists()),
        None => false,
    }
}
1219
1220pub fn get_wallet_directory() -> Result<PathBuf> {
1221    Ok(dirs_next::data_dir()
1222        .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1223        .join("safe")
1224        .join("client")
1225        .join("wallet"))
1226}
1227
1228pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1229    let webhook_url =
1230        std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1231
1232    let mut message = String::new();
1233    message.push_str("*Testnet Details*\n");
1234    message.push_str(&format!("Name: {}\n", inventory.name));
1235    message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1236    message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1237    match inventory.binary_option {
1238        BinaryOption::BuildFromSource {
1239            ref repo_owner,
1240            ref branch,
1241            ..
1242        } => {
1243            message.push_str("*Branch Details*\n");
1244            message.push_str(&format!("Repo owner: {}\n", repo_owner));
1245            message.push_str(&format!("Branch: {}\n", branch));
1246        }
1247        BinaryOption::Versioned {
1248            ant_version: ref safe_version,
1249            antnode_version: ref safenode_version,
1250            antctl_version: ref safenode_manager_version,
1251            ..
1252        } => {
1253            message.push_str("*Version Details*\n");
1254            message.push_str(&format!(
1255                "ant version: {}\n",
1256                safe_version
1257                    .as_ref()
1258                    .map_or("None".to_string(), |v| v.to_string())
1259            ));
1260            message.push_str(&format!(
1261                "safenode version: {}\n",
1262                safenode_version
1263                    .as_ref()
1264                    .map_or("None".to_string(), |v| v.to_string())
1265            ));
1266            message.push_str(&format!(
1267                "antctl version: {}\n",
1268                safenode_manager_version
1269                    .as_ref()
1270                    .map_or("None".to_string(), |v| v.to_string())
1271            ));
1272        }
1273    }
1274
1275    message.push_str("*Sample Peers*\n");
1276    message.push_str("```\n");
1277    for peer in inventory.peers().iter().take(20) {
1278        message.push_str(&format!("{peer}\n"));
1279    }
1280    message.push_str("```\n");
1281    message.push_str("*Available Files*\n");
1282    message.push_str("```\n");
1283    for (addr, file_name) in inventory.uploaded_files.iter() {
1284        message.push_str(&format!("{}: {}\n", addr, file_name))
1285    }
1286    message.push_str("```\n");
1287
1288    let payload = json!({
1289        "text": message,
1290    });
1291    reqwest::Client::new()
1292        .post(webhook_url)
1293        .json(&payload)
1294        .send()
1295        .await?;
1296    println!("{message}");
1297    println!("Posted notification to Slack");
1298    Ok(())
1299}
1300
1301fn print_duration(duration: Duration) {
1302    let total_seconds = duration.as_secs();
1303    let minutes = total_seconds / 60;
1304    let seconds = total_seconds % 60;
1305    debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
1306}
1307
1308pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1309    let progress_bar = ProgressBar::new(length);
1310    progress_bar.set_style(
1311        ProgressStyle::default_bar()
1312            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1313            .progress_chars("#>-"),
1314    );
1315    progress_bar.enable_steady_tick(Duration::from_millis(100));
1316    Ok(progress_bar)
1317}
1318
/// Download and parse the `EnvironmentDetails` record for `environment_name` from the
/// `sn-environment-type` S3 bucket.
///
/// The download, the file read, and the JSON parse each retry on failure, sharing a
/// single retry budget (`max_retries`). There is no delay between retries. A read or
/// parse retry restarts the loop, so the object is downloaded again as well.
///
/// # Errors
///
/// Returns `Error::EnvironmentDetailsNotFound` once the retry budget is exhausted.
pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    // The object is downloaded into a temp file that is cleaned up on drop.
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to read the environment details file");
                            retries += 1;
                            // Restarts the loop: the object is re-downloaded before re-reading.
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                match serde_json::from_str(&content) {
                    // Success: leave the loop with the parsed details.
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to parse the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}
1388
1389pub async fn write_environment_details(
1390    s3_repository: &S3Repository,
1391    environment_name: &str,
1392    environment_details: &EnvironmentDetails,
1393) -> Result<()> {
1394    let temp_dir = tempfile::tempdir()?;
1395    let path = temp_dir.path().to_path_buf().join(environment_name);
1396    let mut file = File::create(&path)?;
1397    let json = serde_json::to_string(environment_details)?;
1398    file.write_all(json.as_bytes())?;
1399    s3_repository
1400        .upload_file("sn-environment-type", &path, true)
1401        .await?;
1402    Ok(())
1403}
1404
1405pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1406    if node_count == 0 {
1407        return 0;
1408    }
1409    let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1410
1411    // 7 attached volumes per VM
1412    (total_volume_required as f64 / 7.0).ceil() as u16
1413}
1414
/// Build the URL from which a peer's bootstrap cache can be fetched over plain HTTP.
///
/// NOTE(review): an IPv6 address is emitted unbracketed (`http://::1/...`), which is not
/// a valid URL host form — fine if only IPv4 peers are expected; confirm before relying
/// on this with IPv6.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    let mut url = String::from("http://");
    url.push_str(&ip_addr.to_string());
    url.push_str("/bootstrap_cache.json");
    url
}