sn_testnet_deploy/lib.rs

// Copyright (c) 2023, MaidSafe.
// All rights reserved.
//
// This SAFE Network Software is licensed under the BSD-3-Clause license.
// Please see the LICENSE file for more details.

pub mod ansible;
pub mod bootstrap;
pub mod deploy;
pub mod digital_ocean;
pub mod error;
pub mod funding;
pub mod infra;
pub mod inventory;
pub mod logs;
pub mod reserved_ip;
pub mod rpc_client;
pub mod s3;
pub mod safe;
pub mod setup;
pub mod ssh;
pub mod terraform;
pub mod uploaders;
pub mod upscale;

const STORAGE_REQUIRED_PER_NODE: u16 = 7;

use crate::{
    ansible::{
        extra_vars::ExtraVarsDocBuilder,
        inventory::{cleanup_environment_inventory, AnsibleInventoryType},
        provisioning::AnsibleProvisioner,
        AnsibleRunner,
    },
    error::{Error, Result},
    inventory::{DeploymentInventory, VirtualMachine},
    rpc_client::RpcClient,
    s3::S3Repository,
    ssh::SshClient,
    terraform::TerraformRunner,
};
use ant_service_management::ServiceStatus;
use flate2::read::GzDecoder;
use indicatif::{ProgressBar, ProgressStyle};
use infra::{build_terraform_args, InfraRunOptions};
use log::{debug, trace};
use semver::Version;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
    fs::File,
    io::{BufRead, BufReader, BufWriter, Write},
    net::IpAddr,
    path::{Path, PathBuf},
    process::{Command, Stdio},
    str::FromStr,
    time::Duration,
};
use tar::Archive;

const ANSIBLE_DEFAULT_FORKS: usize = 50;

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    /// The deployment has been bootstrapped from an existing network.
    Bootstrap,
    /// The deployment is a new network.
    #[default]
    New,
    Uploaders,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    pub data_payments_address: String,
    pub deployer_wallet_private_key: String,
    pub payment_token_address: String,
    pub rpc_url: String,
}

impl std::fmt::Display for DeploymentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DeploymentType::Bootstrap => write!(f, "bootstrap"),
            DeploymentType::New => write!(f, "new"),
            DeploymentType::Uploaders => write!(f, "uploaders"),
        }
    }
}

impl std::str::FromStr for DeploymentType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "bootstrap" => Ok(DeploymentType::Bootstrap),
            "new" => Ok(DeploymentType::New),
            "uploaders" => Ok(DeploymentType::Uploaders),
            _ => Err(format!("Invalid deployment type: {}", s)),
        }
    }
}
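
// Illustrative round-trip check (not part of the original file): each
// `DeploymentType` variant should parse back from its display form.
#[cfg(test)]
mod deployment_type_tests {
    use super::*;

    #[test]
    fn display_and_from_str_round_trip() {
        for deployment_type in [
            DeploymentType::Bootstrap,
            DeploymentType::New,
            DeploymentType::Uploaders,
        ] {
            let parsed: DeploymentType = deployment_type.to_string().parse().unwrap();
            assert_eq!(parsed, deployment_type);
        }
    }
}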

#[derive(Debug, Clone)]
pub enum NodeType {
    FullConePrivateNode,
    Generic,
    Genesis,
    PeerCache,
    SymmetricPrivateNode,
}

impl std::fmt::Display for NodeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
            NodeType::Generic => write!(f, "generic"),
            NodeType::Genesis => write!(f, "genesis"),
            NodeType::PeerCache => write!(f, "peer-cache"),
            NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
        }
    }
}

impl std::str::FromStr for NodeType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "full-cone-private" => Ok(NodeType::FullConePrivateNode),
            "generic" => Ok(NodeType::Generic),
            "genesis" => Ok(NodeType::Genesis),
            "peer-cache" => Ok(NodeType::PeerCache),
            "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
            _ => Err(format!("Invalid node type: {}", s)),
        }
    }
}

impl NodeType {
    pub fn telegraf_role(&self) -> &'static str {
        match self {
            NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
            NodeType::Generic => "GENERIC_NODE",
            NodeType::Genesis => "GENESIS_NODE",
            NodeType::PeerCache => "PEER_CACHE_NODE",
            NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
        }
    }

    pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
        match self {
            NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
            NodeType::Generic => AnsibleInventoryType::Nodes,
            NodeType::Genesis => AnsibleInventoryType::Genesis,
            NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
            NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
        }
    }
}

#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    #[default]
    Anvil,
    ArbitrumOne,
    ArbitrumSepolia,
    Custom,
}

impl std::fmt::Display for EvmNetwork {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepolia => write!(f, "evm-arbitrum-sepolia"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}

impl std::str::FromStr for EvmNetwork {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "anvil" => Ok(EvmNetwork::Anvil),
            "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
            "arbitrum-sepolia" => Ok(EvmNetwork::ArbitrumSepolia),
            "custom" => Ok(EvmNetwork::Custom),
            _ => Err(format!("Invalid EVM network type: {}", s)),
        }
    }
}

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    pub network: EvmNetwork,
    pub data_payments_address: Option<String>,
    pub payment_token_address: Option<String>,
    pub rpc_url: Option<String>,
}

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    pub deployment_type: DeploymentType,
    pub environment_type: EnvironmentType,
    pub evm_details: EvmDetails,
    pub funding_wallet_address: Option<String>,
    pub network_id: Option<u8>,
    pub rewards_address: Option<String>,
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    #[default]
    Development,
    Production,
    Staging,
}

impl EnvironmentType {
    pub fn get_tfvars_filename(&self, name: &str) -> String {
        match self {
            EnvironmentType::Development => "dev.tfvars".to_string(),
            EnvironmentType::Staging => "staging.tfvars".to_string(),
            EnvironmentType::Production => format!("{name}.tfvars"),
        }
    }

    pub fn get_default_peer_cache_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 5,
            EnvironmentType::Production => 5,
            EnvironmentType::Staging => 5,
        }
    }

    pub fn get_default_node_count(&self) -> u16 {
        match self {
            EnvironmentType::Development => 25,
            EnvironmentType::Production => 25,
            EnvironmentType::Staging => 25,
        }
    }

    pub fn get_default_symmetric_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }

    pub fn get_default_full_cone_private_node_count(&self) -> u16 {
        self.get_default_node_count()
    }
}

impl std::fmt::Display for EnvironmentType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EnvironmentType::Development => write!(f, "development"),
            EnvironmentType::Production => write!(f, "production"),
            EnvironmentType::Staging => write!(f, "staging"),
        }
    }
}

impl FromStr for EnvironmentType {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "development" => Ok(EnvironmentType::Development),
            "production" => Ok(EnvironmentType::Production),
            "staging" => Ok(EnvironmentType::Staging),
            _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
        }
    }
}
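
// Illustrative check (not part of the original file) of the tfvars file
// selection above: development and staging use fixed files, while production
// derives the filename from the environment name. "prod-env" is hypothetical.
#[cfg(test)]
mod environment_type_tests {
    use super::*;

    #[test]
    fn tfvars_filename_follows_environment_type() {
        assert_eq!(
            EnvironmentType::Development.get_tfvars_filename("ignored"),
            "dev.tfvars"
        );
        assert_eq!(
            EnvironmentType::Staging.get_tfvars_filename("ignored"),
            "staging.tfvars"
        );
        assert_eq!(
            EnvironmentType::Production.get_tfvars_filename("prod-env"),
            "prod-env.tfvars"
        );
    }
}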

/// Specify the binary option for the deployment.
///
/// There are several binaries involved in the deployment:
/// * safenode
/// * safenode_rpc_client
/// * faucet
/// * safe
///
/// The `safe` binary is only used for smoke testing the deployment, although we don't really do
/// that at the moment.
///
/// The options are to build from source, or supply a pre-built, versioned binary, which will be
/// fetched from S3. Building from source adds significant time to the deployment.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Binaries will be built from source.
    BuildFromSource {
        /// A comma-separated list that will be passed to the `--features` argument.
        antnode_features: Option<String>,
        branch: String,
        repo_owner: String,
    },
    /// Pre-built, versioned binaries will be fetched from S3.
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Option<Version>,
        antnode_version: Option<Version>,
    },
}

impl BinaryOption {
    pub fn print(&self) {
        match self {
            BinaryOption::BuildFromSource {
                antnode_features,
                branch,
                repo_owner,
            } => {
                println!("Source configuration:");
                println!("  Repository owner: {}", repo_owner);
                println!("  Branch: {}", branch);
                if let Some(features) = antnode_features {
                    println!("  Antnode features: {}", features);
                }
            }
            BinaryOption::Versioned {
                ant_version,
                antctl_version,
                antnode_version,
            } => {
                println!("Versioned binaries configuration:");
                if let Some(version) = ant_version {
                    println!("  ant version: {}", version);
                }
                if let Some(version) = antctl_version {
                    println!("  antctl version: {}", version);
                }
                if let Some(version) = antnode_version {
                    println!("  antnode version: {}", version);
                }
            }
        }
    }
}
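
// Illustrative sketch (not part of the original file): constructing each
// `BinaryOption` variant and printing it. The feature, branch, owner, and
// version values here are hypothetical examples.
#[cfg(test)]
mod binary_option_tests {
    use super::*;

    #[test]
    fn print_handles_both_variants() {
        let from_source = BinaryOption::BuildFromSource {
            antnode_features: Some("example-feature".to_string()),
            branch: "main".to_string(),
            repo_owner: "maidsafe".to_string(),
        };
        from_source.print();

        let versioned = BinaryOption::Versioned {
            ant_version: Some(Version::new(0, 1, 0)),
            antctl_version: None,
            antnode_version: Some(Version::new(0, 1, 0)),
        };
        versioned.print();
    }
}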

#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}

impl std::fmt::Display for CloudProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CloudProvider::Aws => write!(f, "aws"),
            CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
        }
    }
}

impl CloudProvider {
    pub fn get_ssh_user(&self) -> String {
        match self {
            CloudProvider::Aws => "ubuntu".to_string(),
            CloudProvider::DigitalOcean => "root".to_string(),
        }
    }
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    Default,
    Json,
}

impl LogFormat {
    pub fn parse_from_str(val: &str) -> Result<Self> {
        match val {
            "default" => Ok(LogFormat::Default),
            "json" => Ok(LogFormat::Json),
            _ => Err(Error::LoggingConfiguration(
                "The only valid values for this argument are \"default\" or \"json\"".to_string(),
            )),
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            LogFormat::Default => "default",
            LogFormat::Json => "json",
        }
    }
}

#[derive(Clone)]
pub struct UpgradeOptions {
    pub ansible_verbose: bool,
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    pub env_variables: Option<Vec<(String, String)>>,
    pub force: bool,
    pub forks: usize,
    pub interval: Duration,
    pub name: String,
    pub node_type: Option<NodeType>,
    pub pre_upgrade_delay: Option<u64>,
    pub provider: CloudProvider,
    pub version: Option<String>,
}

impl UpgradeOptions {
    pub fn get_ansible_vars(&self) -> String {
        let mut extra_vars = ExtraVarsDocBuilder::default();
        extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
        if let Some(env_variables) = &self.env_variables {
            extra_vars.add_env_variable_list("env_variables", env_variables.clone());
        }
        if self.force {
            extra_vars.add_variable("force", &self.force.to_string());
        }
        if let Some(version) = &self.version {
            extra_vars.add_variable("antnode_version", version);
        }
        if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
            extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
        }
        extra_vars.build()
    }
}
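
// Illustrative sketch (not part of the original file): a typical
// `UpgradeOptions` value. The environment name, interval, and version below
// are hypothetical examples.
//
//     let options = UpgradeOptions {
//         ansible_verbose: false,
//         custom_inventory: None,
//         env_variables: None,
//         force: false,
//         forks: ANSIBLE_DEFAULT_FORKS,
//         interval: Duration::from_millis(200),
//         name: "example-env".to_string(),
//         node_type: Some(NodeType::Generic),
//         pre_upgrade_delay: None,
//         provider: CloudProvider::DigitalOcean,
//         version: Some("0.1.0".to_string()),
//     };
//     let extra_vars = options.get_ansible_vars();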

#[derive(Default)]
pub struct TestnetDeployBuilder {
    ansible_forks: Option<usize>,
    ansible_verbose_mode: bool,
    deployment_type: EnvironmentType,
    environment_name: String,
    provider: Option<CloudProvider>,
    ssh_secret_key_path: Option<PathBuf>,
    state_bucket_name: Option<String>,
    terraform_binary_path: Option<PathBuf>,
    vault_password_path: Option<PathBuf>,
    working_directory_path: Option<PathBuf>,
}

impl TestnetDeployBuilder {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                // The DO_PAT variable is not actually read by either Terraform or Ansible.
                // Each tool uses a different variable, so instead we set each of those variables
                // to the value of DO_PAT. This means the user only needs to set one variable.
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        // Remove any `safe` binary from a previous deployment. Otherwise you can end up with
        // mismatched binaries.
        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
        )?;

        Ok(testnet)
    }
}
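
// Illustrative usage sketch (not part of the original file). It assumes the
// `DO_PAT`, `TERRAFORM_STATE_BUCKET_NAME`, `SSH_KEY_PATH`, and
// `ANSIBLE_VAULT_PASSWORD_PATH` environment variables are set, as `build`
// above requires; the environment name is hypothetical.
//
//     let deployer = TestnetDeployBuilder::new()
//         .environment_name("example-env")
//         .provider(CloudProvider::DigitalOcean)
//         .deployment_type(EnvironmentType::Development)
//         .build()?;
//     deployer.init().await?;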

#[derive(Clone)]
pub struct TestnetDeployer {
    pub ansible_provisioner: AnsibleProvisioner,
    pub cloud_provider: CloudProvider,
    pub deployment_type: EnvironmentType,
    pub environment_name: String,
    pub inventory_file_path: PathBuf,
    pub rpc_client: RpcClient,
    pub s3_repository: S3Repository,
    pub ssh_client: SshClient,
    pub terraform_runner: TerraformRunner,
    pub working_directory_path: PathBuf,
}

impl TestnetDeployer {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    pub async fn init(&self) -> Result<()> {
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755); // rwxr-xr-x
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), options.tfvars_filename.clone())?;
        Ok(())
    }

    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Get the status of all nodes in a network.
    ///
    /// First, a playbook runs `safenode-manager status` against all the machines, to get the
    /// current state of all the nodes. Then all the node registry files are retrieved and
    /// deserialized to a `NodeRegistry`, allowing us to output the status of each node on each VM.
    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let symmetric_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let full_cone_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        symmetric_private_node_registries.print();
        full_cone_private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &symmetric_private_node_registries,
            &full_cone_private_node_registries,
            &genesis_node_registry,
        ];

        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let symmetric_private_nodes = symmetric_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let full_cone_private_nodes = full_cone_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        println!("-------");
        println!("Summary");
        println!("-------");
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            if peer_cache_hosts > 0 {
                peer_cache_nodes / peer_cache_hosts
            } else {
                0
            },
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            if generic_hosts > 0 {
                generic_nodes / generic_hosts
            } else {
                0
            },
            generic_nodes
        );
        println!(
            "Total symmetric private nodes ({}x{}): {}",
            symmetric_private_hosts,
            if symmetric_private_hosts > 0 {
                symmetric_private_nodes / symmetric_private_hosts
            } else {
                0
            },
            symmetric_private_nodes
        );
        println!(
            "Total full cone private nodes ({}x{}): {}",
            full_cone_private_hosts,
            if full_cone_private_hosts > 0 {
                full_cone_private_nodes / full_cone_private_hosts
            } else {
                0
            },
            full_cone_private_nodes
        );
        println!("Total nodes: {}", total_nodes);
        println!("Running nodes: {}", running_nodes);
        println!("Stopped nodes: {}", stopped_nodes);
        println!("Added nodes: {}", added_nodes);
        println!("Removed nodes: {}", removed_nodes);

        Ok(())
    }

    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }

    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    pub fn upgrade_uploader_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_uploader_telegraf(name)?;
        Ok(())
    }

    pub async fn clean(&self) -> Result<()> {
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository)
                .await
                .inspect_err(|err| {
                    println!("Failed to get environment details: {err}. Continuing cleanup...");
                })
                .ok();
        if let Some(environment_details) = &environment_details {
            funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
        }

        self.destroy_infra(environment_details).await?;

        cleanup_environment_inventory(
            &self.environment_name,
            &self
                .working_directory_path
                .join("ansible")
                .join("inventory"),
            None,
        )?;

        println!("Deleted Ansible inventory for {}", self.environment_name);

        if let Err(err) = self
            .s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await
        {
            println!("Failed to delete environment type: {err}. Continuing cleanup...");
        }
        Ok(())
    }

    async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
        infra::select_workspace(&self.terraform_runner, &self.environment_name)?;

        let options = InfraRunOptions::generate_existing(
            &self.environment_name,
            &self.terraform_runner,
            environment_details.as_ref(),
        )
        .await?;

        let args = build_terraform_args(&options)?;

        self.terraform_runner.destroy(Some(args), None)?;

        infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;

        Ok(())
    }
}

//
// Shared Helpers
//

pub fn get_genesis_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
    let genesis_ip = genesis_inventory[0].public_ip_addr;

    // The genesis host may have been altered from its original state, in which a node was
    // started with the `--first` flag, so two attempts are made to obtain the multiaddr.
    // First attempt: try to find the node with `first=true`.
    let multiaddr = ssh_client
        .run_command(
            &genesis_ip,
            "root",
            "jq -r '.nodes[] | select(.peers_args.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
            false,
        )
        .map(|output| output.first().cloned())
        .unwrap_or_else(|err| {
            log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
            None
        });

    // Second attempt: if the first attempt failed, see if any node is available.
    let multiaddr = match multiaddr {
        Some(addr) => addr,
        None => ssh_client
            .run_command(
                &genesis_ip,
                "root",
                "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
                false,
            )?
            .first()
            .cloned()
            .ok_or_else(|| Error::GenesisListenAddress)?,
    };

    Ok((multiaddr, genesis_ip))
}
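
// For reference, the jq filters above assume a node registry shaped roughly
// like the following (an illustrative, abbreviated sketch inferred from the
// filters; the IP address is a placeholder and the peer ID is truncated):
//
//     {
//       "nodes": [
//         {
//           "peers_args": { "first": true },
//           "listen_addr": [
//             "/ip4/203.0.113.10/udp/12000/quic-v1/p2p/12D3Koo..."
//           ]
//         }
//       ]
//     }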

pub fn get_anvil_node_data(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<AnvilNodeData> {
    let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
    if evm_inventory.is_empty() {
        return Err(Error::EvmNodeNotFound);
    }

    let evm_ip = evm_inventory[0].public_ip_addr;
    debug!("Retrieved IP address for EVM node: {evm_ip}");
    let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";

    const MAX_ATTEMPTS: u8 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    for attempt in 1..=MAX_ATTEMPTS {
        match ssh_client.run_command(&evm_ip, "ant", &format!("cat {}", csv_file_path), false) {
            Ok(output) => {
                if let Some(csv_contents) = output.first() {
                    let parts: Vec<&str> = csv_contents.split(',').collect();
                    if parts.len() != 4 {
                        return Err(Error::EvmTestnetDataParsingError(
                            "Expected 4 fields in the CSV".to_string(),
                        ));
                    }

                    let evm_testnet_data = AnvilNodeData {
                        rpc_url: parts[0].trim().to_string(),
                        payment_token_address: parts[1].trim().to_string(),
                        data_payments_address: parts[2].trim().to_string(),
                        deployer_wallet_private_key: parts[3].trim().to_string(),
                    };
                    return Ok(evm_testnet_data);
                }
            }
            Err(e) => {
                if attempt == MAX_ATTEMPTS {
                    return Err(e);
                }
                println!(
                    "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
                    attempt,
                    RETRY_DELAY.as_secs()
                );
            }
        }
        std::thread::sleep(RETRY_DELAY);
    }

    Err(Error::EvmTestnetDataNotFound)
}

pub fn get_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
    // For upscaling a bootstrap deployment, we'd need to select one of the nodes that's already
    // provisioned, so just try the first one.
    let node_ip = node_inventory
        .iter()
        .find(|vm| vm.name.ends_with("-node-1"))
        .ok_or_else(|| Error::NodeAddressNotFound)?
        .public_ip_addr;

    debug!("Getting multiaddr from node {node_ip}");

    let multiaddr = ssh_client
        .run_command(
            &node_ip,
            "root",
            // Fetch the first multiaddr that does not contain the localhost address.
            "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
            false,
        )?
        .first()
        .cloned()
        .ok_or_else(|| Error::NodeAddressNotFound)?;

    // The node IP is already contained in the multiaddr; it is returned as a separate item
    // purely for convenience.
    Ok((multiaddr, node_ip))
}

pub async fn get_and_extract_archive_from_s3(
    s3_repository: &S3Repository,
    bucket_name: &str,
    archive_bucket_path: &str,
    dest_path: &Path,
) -> Result<()> {
    // Using `unwrap` here avoids introducing a trivial error variant that wouldn't add much
    // value.
    let archive_file_name = archive_bucket_path.split('/').last().unwrap();
    let archive_dest_path = dest_path.join(archive_file_name);
    s3_repository
        .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
        .await?;
    extract_archive(&archive_dest_path, dest_path)?;
    Ok(())
}

pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
    let archive_file = File::open(archive_path)?;
    let decoder = GzDecoder::new(archive_file);
    let mut archive = Archive::new(decoder);
    let entries = archive.entries()?;
    for entry_result in entries {
        let mut entry = entry_result?;
        let extract_path = dest_path.join(entry.path()?);
        if entry.header().entry_type() == tar::EntryType::Directory {
            std::fs::create_dir_all(extract_path)?;
            continue;
        }
        let mut file = BufWriter::new(File::create(extract_path)?);
        std::io::copy(&mut entry, &mut file)?;
    }
    std::fs::remove_file(archive_path)?;
    Ok(())
}
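
// Illustrative round-trip test (not part of the original file): pack a single
// file into a tar.gz, then verify `extract_archive` unpacks it and removes
// the archive afterwards. It relies on the crate's `Error` converting from
// `std::io::Error`, as the helpers above already do.
#[cfg(test)]
mod extract_archive_tests {
    use super::*;
    use flate2::{write::GzEncoder, Compression};

    #[test]
    fn extracts_a_simple_gzipped_tarball() -> Result<()> {
        let src_dir = tempfile::tempdir()?;
        let dest_dir = tempfile::tempdir()?;
        let file_path = src_dir.path().join("hello.txt");
        std::fs::write(&file_path, b"hello")?;

        let archive_path = src_dir.path().join("archive.tar.gz");
        let encoder = GzEncoder::new(File::create(&archive_path)?, Compression::default());
        let mut builder = tar::Builder::new(encoder);
        builder.append_path_with_name(&file_path, "hello.txt")?;
        builder.into_inner()?.finish()?;

        extract_archive(&archive_path, dest_dir.path())?;

        assert!(dest_dir.path().join("hello.txt").exists());
        assert!(!archive_path.exists());
        Ok(())
    }
}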

pub fn run_external_command(
    binary_path: PathBuf,
    working_directory_path: PathBuf,
    args: Vec<String>,
    suppress_stdout: bool,
    suppress_stderr: bool,
) -> Result<Vec<String>> {
    let mut command = Command::new(binary_path.clone());
    for arg in &args {
        command.arg(arg);
    }
    command.stdout(Stdio::piped());
    command.stderr(Stdio::piped());
    command.current_dir(working_directory_path.clone());
    debug!("Running {binary_path:#?} with args {args:#?}");
    debug!("Working directory set to {working_directory_path:#?}");

    let mut child = command.spawn()?;
    let mut output_lines = Vec::new();

    if let Some(ref mut stdout) = child.stdout {
        let reader = BufReader::new(stdout);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stdout {
                println!("{line}");
            }
            output_lines.push(line);
        }
    }

    if let Some(ref mut stderr) = child.stderr {
        let reader = BufReader::new(stderr);
        for line in reader.lines() {
            let line = line?;
            if !suppress_stderr {
                eprintln!("{line}");
            }
            output_lines.push(line);
        }
    }

    let output = child.wait()?;
    if !output.success() {
        // Using `unwrap` here avoids introducing another error variant, which seems excessive.
        let binary_path = binary_path.to_str().unwrap();
        return Err(Error::ExternalCommandRunFailed {
            binary: binary_path.to_string(),
            exit_status: output,
        });
    }

    Ok(output_lines)
}
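
// Illustrative test (not part of the original file): run a trivial command
// and capture its output. Unix-only, since it relies on `echo` being present
// on the PATH.
#[cfg(all(test, unix))]
mod run_external_command_tests {
    use super::*;

    #[test]
    fn captures_stdout_lines() -> Result<()> {
        let output = run_external_command(
            PathBuf::from("echo"),
            std::env::current_dir()?,
            vec!["hello".to_string()],
            true,
            true,
        )?;
        assert_eq!(output, vec!["hello".to_string()]);
        Ok(())
    }
}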

pub fn is_binary_on_path(binary_name: &str) -> bool {
    if let Ok(path) = std::env::var("PATH") {
        for dir in path.split(':') {
            let mut full_path = PathBuf::from(dir);
            full_path.push(binary_name);
            if full_path.exists() {
                return true;
            }
        }
    }
    false
}
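
// Quick sanity check (not part of the original file). Note that the lookup
// above splits `PATH` on ':', so it is effectively Unix-specific; `sh` should
// be on the `PATH` of any Unix host running the tests.
#[cfg(all(test, unix))]
mod path_lookup_tests {
    use super::*;

    #[test]
    fn finds_a_common_binary() {
        assert!(is_binary_on_path("sh"));
    }
}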

pub fn get_wallet_directory() -> Result<PathBuf> {
    Ok(dirs_next::data_dir()
        .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
        .join("safe")
        .join("client")
        .join("wallet"))
}

pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
    let webhook_url =
        std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;

    let mut message = String::new();
    message.push_str("*Testnet Details*\n");
    message.push_str(&format!("Name: {}\n", inventory.name));
    message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
    message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
    match inventory.binary_option {
        BinaryOption::BuildFromSource {
            ref repo_owner,
            ref branch,
            ..
        } => {
            message.push_str("*Branch Details*\n");
            message.push_str(&format!("Repo owner: {}\n", repo_owner));
            message.push_str(&format!("Branch: {}\n", branch));
        }
        BinaryOption::Versioned {
            ref ant_version,
            ref antnode_version,
            ref antctl_version,
        } => {
            message.push_str("*Version Details*\n");
            message.push_str(&format!(
                "ant version: {}\n",
                ant_version
                    .as_ref()
                    .map_or("None".to_string(), |v| v.to_string())
            ));
            message.push_str(&format!(
                "antnode version: {}\n",
                antnode_version
                    .as_ref()
                    .map_or("None".to_string(), |v| v.to_string())
            ));
            message.push_str(&format!(
                "antctl version: {}\n",
                antctl_version
                    .as_ref()
                    .map_or("None".to_string(), |v| v.to_string())
            ));
        }
    }

    message.push_str("*Sample Peers*\n");
    message.push_str("```\n");
    for peer in inventory.peers().iter().take(20) {
        message.push_str(&format!("{peer}\n"));
    }
    message.push_str("```\n");
    message.push_str("*Available Files*\n");
    message.push_str("```\n");
    for (addr, file_name) in inventory.uploaded_files.iter() {
        message.push_str(&format!("{}: {}\n", addr, file_name));
    }
    message.push_str("```\n");

    let payload = json!({
        "text": message,
    });
    reqwest::Client::new()
        .post(webhook_url)
        .json(&payload)
        .send()
        .await?;
    println!("{message}");
    println!("Posted notification to Slack");
    Ok(())
}

fn print_duration(duration: Duration) {
    let total_seconds = duration.as_secs();
    let minutes = total_seconds / 60;
    let seconds = total_seconds % 60;
    debug!("Time taken: {} minutes and {} seconds", minutes, seconds);
}

pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
    let progress_bar = ProgressBar::new(length);
    progress_bar.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
            .progress_chars("#>-"),
    );
    progress_bar.enable_steady_tick(Duration::from_millis(100));
    Ok(progress_bar)
}

pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying read of the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {}", content);

                match serde_json::from_str(&content) {
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying parse of the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}

pub async fn write_environment_details(
    s3_repository: &S3Repository,
    environment_name: &str,
    environment_details: &EnvironmentDetails,
) -> Result<()> {
    let temp_dir = tempfile::tempdir()?;
    let path = temp_dir.path().join(environment_name);
    let mut file = File::create(&path)?;
    let json = serde_json::to_string(environment_details)?;
    file.write_all(json.as_bytes())?;
    s3_repository
        .upload_file("sn-environment-type", &path, true)
        .await?;
    Ok(())
}

pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
    if node_count == 0 {
        return 0;
    }
    let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;

    // 7 attached volumes per VM
    (total_volume_required as f64 / 7.0).ceil() as u16
}

pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    format!("http://{ip_addr}/bootstrap_cache.json")
}
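
// Illustrative worked examples (not part of the original file) for the two
// helpers above. With `STORAGE_REQUIRED_PER_NODE` at 7 and 7 attached volumes
// per VM, the size per volume works out to the node count itself. The IP
// address used below is a documentation placeholder.
#[cfg(test)]
mod helper_tests {
    use super::*;

    #[test]
    fn size_per_attached_volume_scales_with_node_count() {
        assert_eq!(calculate_size_per_attached_volume(0), 0);
        assert_eq!(calculate_size_per_attached_volume(1), 1);
        assert_eq!(calculate_size_per_attached_volume(25), 25);
    }

    #[test]
    fn bootstrap_cache_url_is_formatted_from_ip() {
        let ip: IpAddr = "203.0.113.10".parse().unwrap();
        assert_eq!(
            get_bootstrap_cache_url(&ip),
            "http://203.0.113.10/bootstrap_cache.json"
        );
    }
}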