1pub mod ansible;
8pub mod bootstrap;
9pub mod clients;
10pub mod deploy;
11pub mod digital_ocean;
12pub mod error;
13pub mod funding;
14pub mod infra;
15pub mod inventory;
16pub mod logs;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
// Storage budgeted for each node.
// NOTE(review): the unit is not visible in this part of the file (presumably GB) — confirm at the use site.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51 fs::File,
52 io::{BufRead, BufReader, BufWriter, Write},
53 net::IpAddr,
54 path::{Path, PathBuf},
55 process::{Command, Stdio},
56 str::FromStr,
57 time::Duration,
58};
59use tar::Archive;
60
// Fork count passed to `AnsibleRunner::new` when the builder was not given an explicit value.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
/// The kind of deployment an environment represents.
///
/// The `Display` and `FromStr` implementations use the textual forms `bootstrap`, `clients`
/// and `new`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    /// Textual form: `bootstrap`.
    Bootstrap,
    /// Textual form: `clients` (plural, unlike the variant name).
    Client,
    /// Textual form: `new`. This is the default variant.
    #[default]
    New,
}
73
/// Connection and contract data describing an Anvil EVM node.
///
/// `get_anvil_node_data` fills this from a four-column CSV line in the order
/// `rpc_url, payment_token_address, data_payments_address, deployer_wallet_private_key`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    /// Third CSV column.
    pub data_payments_address: String,
    /// Fourth CSV column. This is secret material: avoid logging it.
    pub deployer_wallet_private_key: String,
    /// Second CSV column.
    pub payment_token_address: String,
    /// First CSV column.
    pub rpc_url: String,
}
81
82impl std::fmt::Display for DeploymentType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 DeploymentType::Bootstrap => write!(f, "bootstrap"),
86 DeploymentType::Client => write!(f, "clients"),
87 DeploymentType::New => write!(f, "new"),
88 }
89 }
90}
91
92impl std::str::FromStr for DeploymentType {
93 type Err = String;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 match s.to_lowercase().as_str() {
97 "bootstrap" => Ok(DeploymentType::Bootstrap),
98 "clients" => Ok(DeploymentType::Client),
99 "new" => Ok(DeploymentType::New),
100 _ => Err(format!("Invalid deployment type: {s}")),
101 }
102 }
103}
104
/// The role a node plays in a deployment.
///
/// Each variant has a textual name (`Display`/`FromStr`), a Telegraf role string
/// (`telegraf_role`) and a matching `AnsibleInventoryType` (`to_ansible_inventory_type`).
#[derive(Debug, Clone)]
pub enum NodeType {
    /// Textual form: `full-cone-private`.
    FullConePrivateNode,
    /// Textual form: `generic`.
    Generic,
    /// Textual form: `genesis`.
    Genesis,
    /// Textual form: `peer-cache`.
    PeerCache,
    /// Textual form: `symmetric-private`.
    SymmetricPrivateNode,
}
113
114impl std::fmt::Display for NodeType {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
118 NodeType::Generic => write!(f, "generic"),
119 NodeType::Genesis => write!(f, "genesis"),
120 NodeType::PeerCache => write!(f, "peer-cache"),
121 NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
122 }
123 }
124}
125
126impl std::str::FromStr for NodeType {
127 type Err = String;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 match s.to_lowercase().as_str() {
131 "full-cone-private" => Ok(NodeType::FullConePrivateNode),
132 "generic" => Ok(NodeType::Generic),
133 "genesis" => Ok(NodeType::Genesis),
134 "peer-cache" => Ok(NodeType::PeerCache),
135 "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
136 _ => Err(format!("Invalid node type: {s}")),
137 }
138 }
139}
140
141impl NodeType {
142 pub fn telegraf_role(&self) -> &'static str {
143 match self {
144 NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
145 NodeType::Generic => "GENERIC_NODE",
146 NodeType::Genesis => "GENESIS_NODE",
147 NodeType::PeerCache => "PEER_CACHE_NODE",
148 NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
149 }
150 }
151
152 pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
153 match self {
154 NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
155 NodeType::Generic => AnsibleInventoryType::Nodes,
156 NodeType::Genesis => AnsibleInventoryType::Genesis,
157 NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
158 NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
159 }
160 }
161}
162
/// The EVM network an environment uses for payments.
///
/// Parsed from `anvil`, `arbitrum-one`, `arbitrum-sepolia-test` and `custom`. Note that the
/// `Display` output is *not* the parse input: it is an `evm-`-prefixed name, and `Anvil`
/// displays as `evm-custom`, the same as `Custom`.
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    /// Default variant. Displays as `evm-custom`.
    #[default]
    Anvil,
    /// Displays as `evm-arbitrum-one`.
    ArbitrumOne,
    /// Displays as `evm-arbitrum-sepolia-test`.
    ArbitrumSepoliaTest,
    /// Displays as `evm-custom`.
    Custom,
}
171
172impl std::fmt::Display for EvmNetwork {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 match self {
175 EvmNetwork::Anvil => write!(f, "evm-custom"),
176 EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
177 EvmNetwork::ArbitrumSepoliaTest => write!(f, "evm-arbitrum-sepolia-test"),
178 EvmNetwork::Custom => write!(f, "evm-custom"),
179 }
180 }
181}
182
183impl std::str::FromStr for EvmNetwork {
184 type Err = String;
185
186 fn from_str(s: &str) -> Result<Self, Self::Err> {
187 match s.to_lowercase().as_str() {
188 "anvil" => Ok(EvmNetwork::Anvil),
189 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
190 "arbitrum-sepolia-test" => Ok(EvmNetwork::ArbitrumSepoliaTest),
191 "custom" => Ok(EvmNetwork::Custom),
192 _ => Err(format!("Invalid EVM network type: {s}")),
193 }
194 }
195}
196
/// EVM configuration recorded for an environment.
///
/// NOTE(review): the optional fields are presumably only populated for networks that need
/// explicit addresses (e.g. Anvil or custom networks) — confirm against the code that
/// constructs this struct.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    /// Which EVM network is in use.
    pub network: EvmNetwork,
    /// Data payments address, if one was supplied.
    pub data_payments_address: Option<String>,
    /// Payment token address, if one was supplied.
    pub payment_token_address: Option<String>,
    /// RPC endpoint URL, if one was supplied.
    pub rpc_url: Option<String>,
}
204
/// Persisted description of a deployed environment.
///
/// `TestnetDeployer::clean` retrieves this record to drain funds and to pick the tfvars
/// files used when destroying the infrastructure.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    /// The kind of deployment (bootstrap, clients or new).
    pub deployment_type: DeploymentType,
    /// Development, staging or production; selects the tfvars files.
    pub environment_type: EnvironmentType,
    /// EVM network configuration.
    pub evm_details: EvmDetails,
    /// Address of the funding wallet, if recorded.
    pub funding_wallet_address: Option<String>,
    /// Network identifier, if recorded.
    pub network_id: Option<u8>,
    /// Cloud region name.
    pub region: String,
    /// Rewards address, if recorded.
    pub rewards_address: Option<String>,
}
215
/// The tier of an environment.
///
/// The tier selects the tfvars files and the default node counts; its textual forms are
/// `development`, `production` and `staging`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    /// Default variant. Uses `dev.tfvars`.
    #[default]
    Development,
    /// Uses a tfvars file named after the environment.
    Production,
    /// Uses `staging.tfvars`.
    Staging,
}
223
224impl EnvironmentType {
225 pub fn get_tfvars_filenames(&self, name: &str, region: &str) -> Vec<String> {
226 match self {
227 EnvironmentType::Development => vec![
228 "dev.tfvars".to_string(),
229 format!("dev-images-{region}.tfvars", region = region),
230 ],
231 EnvironmentType::Staging => vec![
232 "staging.tfvars".to_string(),
233 format!("staging-images-{region}.tfvars", region = region),
234 ],
235 EnvironmentType::Production => {
236 vec![
237 format!("{name}.tfvars", name = name),
238 format!("production-images-{region}.tfvars", region = region),
239 ]
240 }
241 }
242 }
243
244 pub fn get_default_peer_cache_node_count(&self) -> u16 {
245 match self {
246 EnvironmentType::Development => 5,
247 EnvironmentType::Production => 5,
248 EnvironmentType::Staging => 5,
249 }
250 }
251
252 pub fn get_default_node_count(&self) -> u16 {
253 match self {
254 EnvironmentType::Development => 25,
255 EnvironmentType::Production => 25,
256 EnvironmentType::Staging => 25,
257 }
258 }
259
260 pub fn get_default_symmetric_private_node_count(&self) -> u16 {
261 self.get_default_node_count()
262 }
263
264 pub fn get_default_full_cone_private_node_count(&self) -> u16 {
265 self.get_default_node_count()
266 }
267}
268
269impl std::fmt::Display for EnvironmentType {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 match self {
272 EnvironmentType::Development => write!(f, "development"),
273 EnvironmentType::Production => write!(f, "production"),
274 EnvironmentType::Staging => write!(f, "staging"),
275 }
276 }
277}
278
279impl FromStr for EnvironmentType {
280 type Err = Error;
281
282 fn from_str(s: &str) -> Result<Self, Self::Err> {
283 match s.to_lowercase().as_str() {
284 "development" => Ok(EnvironmentType::Development),
285 "production" => Ok(EnvironmentType::Production),
286 "staging" => Ok(EnvironmentType::Staging),
287 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
288 }
289 }
290}
291
/// How the binaries for a deployment are obtained: built from a source branch or taken
/// from published versions.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Binaries come from a branch of a repository.
    BuildFromSource {
        /// Optional feature list for the antnode build.
        antnode_features: Option<String>,
        /// The branch to build.
        branch: String,
        /// The owner of the repository holding the branch.
        repo_owner: String,
        /// When `true`, no build machine is provisioned
        /// (see `should_provision_build_machine`).
        skip_binary_build: bool,
    },
    /// Binaries come from versioned releases; each version is optional.
    Versioned {
        /// Version of the `ant` binary.
        ant_version: Option<Version>,
        /// Version of the `antctl` binary.
        antctl_version: Option<Version>,
        /// Version of the `antnode` binary.
        antnode_version: Option<Version>,
    },
}
324
325impl BinaryOption {
326 pub fn should_provision_build_machine(&self) -> bool {
327 match self {
328 BinaryOption::BuildFromSource {
329 skip_binary_build, ..
330 } => !skip_binary_build,
331 BinaryOption::Versioned { .. } => false,
332 }
333 }
334
335 pub fn print(&self) {
336 match self {
337 BinaryOption::BuildFromSource {
338 antnode_features,
339 branch,
340 repo_owner,
341 skip_binary_build: _,
342 } => {
343 println!("Source configuration:");
344 println!(" Repository owner: {repo_owner}");
345 println!(" Branch: {branch}");
346 if let Some(features) = antnode_features {
347 println!(" Antnode features: {features}");
348 }
349 }
350 BinaryOption::Versioned {
351 ant_version,
352 antctl_version,
353 antnode_version,
354 } => {
355 println!("Versioned binaries configuration:");
356 if let Some(version) = ant_version {
357 println!(" ant version: {version}");
358 }
359 if let Some(version) = antctl_version {
360 println!(" antctl version: {version}");
361 }
362 if let Some(version) = antnode_version {
363 println!(" antnode version: {version}");
364 }
365 }
366 }
367 }
368}
369
/// The cloud provider hosting an environment.
///
/// Only `DigitalOcean` is accepted by `TestnetDeployBuilder::build`; any other provider is
/// rejected there with `Error::CloudProviderNotSupported`.
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    /// Displays as `aws`; SSH user `ubuntu`.
    Aws,
    /// Displays as `digital-ocean`; SSH user `root`.
    DigitalOcean,
}
375
376impl std::fmt::Display for CloudProvider {
377 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378 match self {
379 CloudProvider::Aws => write!(f, "aws"),
380 CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
381 }
382 }
383}
384
385impl CloudProvider {
386 pub fn get_ssh_user(&self) -> String {
387 match self {
388 CloudProvider::Aws => "ubuntu".to_string(),
389 CloudProvider::DigitalOcean => "root".to_string(),
390 }
391 }
392}
393
/// Log output format selection.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    /// Textual form: `default`.
    Default,
    /// Textual form: `json`.
    Json,
}
399
400impl LogFormat {
401 pub fn parse_from_str(val: &str) -> Result<Self> {
402 match val {
403 "default" => Ok(LogFormat::Default),
404 "json" => Ok(LogFormat::Json),
405 _ => Err(Error::LoggingConfiguration(
406 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
407 )),
408 }
409 }
410
411 pub fn as_str(&self) -> &'static str {
412 match self {
413 LogFormat::Default => "default",
414 LogFormat::Json => "json",
415 }
416 }
417}
418
/// Options for a node upgrade run.
///
/// `get_ansible_vars` turns the relevant fields into Ansible extra variables; the remaining
/// fields are consumed elsewhere (not visible in this part of the file).
#[derive(Clone)]
pub struct UpgradeOptions {
    /// Whether Ansible should run verbosely.
    pub ansible_verbose: bool,
    /// Source branch; only used for the extra vars when `repo_owner` is also set.
    pub branch: Option<String>,
    /// Optional explicit set of machines to target.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    /// Environment variables, emitted as the `env_variables` list.
    pub env_variables: Option<Vec<(String, String)>>,
    /// Emitted as the `force` variable, and only when `true`.
    pub force: bool,
    /// Number of Ansible forks.
    pub forks: usize,
    /// Emitted as the `interval` variable, in milliseconds.
    pub interval: Duration,
    /// Environment name; passed along when the binaries come from a branch.
    pub name: String,
    /// Optional restriction to one node type.
    pub node_type: Option<NodeType>,
    /// Emitted as the `pre_upgrade_delay` variable when set.
    /// NOTE(review): unit not visible here — confirm.
    pub pre_upgrade_delay: Option<u64>,
    /// The cloud provider of the environment.
    pub provider: CloudProvider,
    /// Repository owner; only used for the extra vars when `branch` is also set.
    pub repo_owner: Option<String>,
    /// Emitted as the `antnode_version` variable when set.
    pub version: Option<String>,
}
435
436impl UpgradeOptions {
437 pub fn get_ansible_vars(&self) -> String {
438 let mut extra_vars = ExtraVarsDocBuilder::default();
439 extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
440 if let Some(env_variables) = &self.env_variables {
441 extra_vars.add_env_variable_list("env_variables", env_variables.clone());
442 }
443 if self.force {
444 extra_vars.add_variable("force", &self.force.to_string());
445 }
446 if let Some(version) = &self.version {
447 extra_vars.add_variable("antnode_version", version);
448 }
449 if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
450 extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
451 }
452
453 if let (Some(repo_owner), Some(branch)) = (&self.repo_owner, &self.branch) {
454 let binary_option = BinaryOption::BuildFromSource {
455 antnode_features: None,
456 branch: branch.clone(),
457 repo_owner: repo_owner.clone(),
458 skip_binary_build: true,
459 };
460 extra_vars.add_node_url_or_version(&self.name, &binary_option);
461 }
462
463 extra_vars.build()
464 }
465}
466
/// Builder for [`TestnetDeployer`].
///
/// Unset options are resolved by `build` from defaults or environment variables, as noted
/// on each field.
#[derive(Default)]
pub struct TestnetDeployBuilder {
    /// Ansible fork count; falls back to `ANSIBLE_DEFAULT_FORKS`.
    ansible_forks: Option<usize>,
    /// Whether Ansible runs verbosely.
    ansible_verbose_mode: bool,
    /// The environment tier handed to the deployer.
    deployment_type: EnvironmentType,
    /// Name of the environment; `TestnetDeployer::new` rejects an empty name.
    environment_name: String,
    /// Cloud provider; falls back to `CloudProvider::DigitalOcean`.
    provider: Option<CloudProvider>,
    /// Cloud region; falls back to `lon1`.
    region: Option<String>,
    /// SSH private key path; falls back to the `SSH_KEY_PATH` environment variable.
    ssh_secret_key_path: Option<PathBuf>,
    /// Terraform state bucket; falls back to the `TERRAFORM_STATE_BUCKET_NAME` environment
    /// variable.
    state_bucket_name: Option<String>,
    /// Terraform binary; falls back to `terraform`.
    terraform_binary_path: Option<PathBuf>,
    /// Ansible vault password file; falls back to the `ANSIBLE_VAULT_PASSWORD_PATH`
    /// environment variable.
    vault_password_path: Option<PathBuf>,
    /// Working directory; falls back to `<current dir>/resources`.
    working_directory_path: Option<PathBuf>,
}
481
482impl TestnetDeployBuilder {
483 pub fn new() -> Self {
484 Default::default()
485 }
486
487 pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
488 self.ansible_verbose_mode = ansible_verbose_mode;
489 self
490 }
491
492 pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
493 self.ansible_forks = Some(ansible_forks);
494 self
495 }
496
497 pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
498 self.deployment_type = deployment_type;
499 self
500 }
501
502 pub fn environment_name(&mut self, name: &str) -> &mut Self {
503 self.environment_name = name.to_string();
504 self
505 }
506
507 pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
508 self.provider = Some(provider);
509 self
510 }
511
512 pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
513 self.state_bucket_name = Some(state_bucket_name);
514 self
515 }
516
517 pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
518 self.terraform_binary_path = Some(terraform_binary_path);
519 self
520 }
521
522 pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
523 self.working_directory_path = Some(working_directory_path);
524 self
525 }
526
527 pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
528 self.ssh_secret_key_path = Some(ssh_secret_key_path);
529 self
530 }
531
532 pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
533 self.vault_password_path = Some(vault_password_path);
534 self
535 }
536
537 pub fn region(&mut self, region: String) -> &mut Self {
538 self.region = Some(region);
539 self
540 }
541
542 pub fn build(&self) -> Result<TestnetDeployer> {
543 let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
544 match provider {
545 CloudProvider::DigitalOcean => {
546 let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
547 Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
548 })?;
549 std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
553 std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
554 }
555 _ => {
556 return Err(Error::CloudProviderNotSupported(provider.to_string()));
557 }
558 }
559
560 let state_bucket_name = match self.state_bucket_name {
561 Some(ref bucket_name) => bucket_name.clone(),
562 None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
563 };
564
565 let default_terraform_bin_path = PathBuf::from("terraform");
566 let terraform_binary_path = self
567 .terraform_binary_path
568 .as_ref()
569 .unwrap_or(&default_terraform_bin_path);
570
571 let working_directory_path = match self.working_directory_path {
572 Some(ref work_dir_path) => work_dir_path.clone(),
573 None => std::env::current_dir()?.join("resources"),
574 };
575
576 let ssh_secret_key_path = match self.ssh_secret_key_path {
577 Some(ref ssh_sk_path) => ssh_sk_path.clone(),
578 None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
579 };
580
581 let vault_password_path = match self.vault_password_path {
582 Some(ref vault_pw_path) => vault_pw_path.clone(),
583 None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
584 };
585
586 let region = match self.region {
587 Some(ref region) => region.clone(),
588 None => "lon1".to_string(),
589 };
590
591 let terraform_runner = TerraformRunner::new(
592 terraform_binary_path.to_path_buf(),
593 working_directory_path
594 .join("terraform")
595 .join("testnet")
596 .join(provider.to_string()),
597 provider,
598 &state_bucket_name,
599 )?;
600 let ansible_runner = AnsibleRunner::new(
601 self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
602 self.ansible_verbose_mode,
603 &self.environment_name,
604 provider,
605 ssh_secret_key_path.clone(),
606 vault_password_path,
607 working_directory_path.join("ansible"),
608 )?;
609 let ssh_client = SshClient::new(ssh_secret_key_path);
610 let ansible_provisioner =
611 AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
612 let rpc_client = RpcClient::new(
613 PathBuf::from("/usr/local/bin/safenode_rpc_client"),
614 working_directory_path.clone(),
615 );
616
617 let safe_path = working_directory_path.join("safe");
620 if safe_path.exists() {
621 std::fs::remove_file(safe_path)?;
622 }
623
624 let testnet = TestnetDeployer::new(
625 ansible_provisioner,
626 provider,
627 self.deployment_type.clone(),
628 &self.environment_name,
629 rpc_client,
630 S3Repository {},
631 ssh_client,
632 terraform_runner,
633 working_directory_path,
634 region,
635 )?;
636
637 Ok(testnet)
638 }
639}
640
/// Entry point for operating on a testnet environment: it bundles the Ansible, Terraform,
/// SSH, RPC and S3 helpers together with the environment's identity.
#[derive(Clone)]
pub struct TestnetDeployer {
    /// Runs the Ansible-based operations (start, stop, upgrade, status, ...).
    pub ansible_provisioner: AnsibleProvisioner,
    /// The cloud provider hosting the environment.
    pub cloud_provider: CloudProvider,
    /// The environment tier. Despite the field name, this is an `EnvironmentType`.
    pub deployment_type: EnvironmentType,
    /// Name of the environment; also used as the Terraform workspace name.
    pub environment_name: String,
    /// Set by `new` to `<working dir>/ansible/inventory/dev_inventory_digital_ocean.yml`.
    pub inventory_file_path: PathBuf,
    /// Cloud region name.
    pub region: String,
    /// Client for the node RPC tool.
    pub rpc_client: RpcClient,
    /// Access to the S3 buckets used for logs, archives and environment records.
    pub s3_repository: S3Repository,
    /// SSH client for running commands on machines.
    pub ssh_client: SshClient,
    /// Runs Terraform for the environment's infrastructure.
    pub terraform_runner: TerraformRunner,
    /// Directory holding the `ansible` and `terraform` resources.
    pub working_directory_path: PathBuf,
}
655
656impl TestnetDeployer {
657 #[allow(clippy::too_many_arguments)]
658 pub fn new(
659 ansible_provisioner: AnsibleProvisioner,
660 cloud_provider: CloudProvider,
661 deployment_type: EnvironmentType,
662 environment_name: &str,
663 rpc_client: RpcClient,
664 s3_repository: S3Repository,
665 ssh_client: SshClient,
666 terraform_runner: TerraformRunner,
667 working_directory_path: PathBuf,
668 region: String,
669 ) -> Result<TestnetDeployer> {
670 if environment_name.is_empty() {
671 return Err(Error::EnvironmentNameRequired);
672 }
673 let inventory_file_path = working_directory_path
674 .join("ansible")
675 .join("inventory")
676 .join("dev_inventory_digital_ocean.yml");
677 Ok(TestnetDeployer {
678 ansible_provisioner,
679 cloud_provider,
680 deployment_type,
681 environment_name: environment_name.to_string(),
682 inventory_file_path,
683 region,
684 rpc_client,
685 ssh_client,
686 s3_repository,
687 terraform_runner,
688 working_directory_path,
689 })
690 }
691
692 pub async fn init(&self) -> Result<()> {
693 if self
694 .s3_repository
695 .folder_exists(
696 "sn-testnet",
697 &format!("testnet-logs/{}", self.environment_name),
698 )
699 .await?
700 {
701 return Err(Error::LogsForPreviousTestnetExist(
702 self.environment_name.clone(),
703 ));
704 }
705
706 self.terraform_runner.init()?;
707 let workspaces = self.terraform_runner.workspace_list()?;
708 if !workspaces.contains(&self.environment_name) {
709 self.terraform_runner
710 .workspace_new(&self.environment_name)?;
711 } else {
712 println!("Workspace {} already exists", self.environment_name);
713 }
714
715 let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
716 if !rpc_client_path.is_file() {
717 println!("Downloading the rpc client for safenode...");
718 let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
719 get_and_extract_archive_from_s3(
720 &self.s3_repository,
721 "sn-node-rpc-client",
722 archive_name,
723 &self.working_directory_path,
724 )
725 .await?;
726 #[cfg(unix)]
727 {
728 use std::os::unix::fs::PermissionsExt;
729 let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
730 permissions.set_mode(0o755); std::fs::set_permissions(&rpc_client_path, permissions)?;
732 }
733 }
734
735 Ok(())
736 }
737
738 pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
739 println!("Selecting {} workspace...", options.name);
740 self.terraform_runner.workspace_select(&options.name)?;
741
742 let args = build_terraform_args(options)?;
743
744 self.terraform_runner
745 .plan(Some(args), options.tfvars_filenames.clone())?;
746 Ok(())
747 }
748
    /// Starts the nodes of this environment by delegating to the Ansible provisioner.
    ///
    /// `interval`, `node_type` and `custom_inventory` are forwarded unchanged.
    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
763
764 pub fn status(&self) -> Result<()> {
770 self.ansible_provisioner.status()?;
771
772 let peer_cache_node_registries = self
773 .ansible_provisioner
774 .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
775 let generic_node_registries = self
776 .ansible_provisioner
777 .get_node_registries(&AnsibleInventoryType::Nodes)?;
778 let symmetric_private_node_registries = self
779 .ansible_provisioner
780 .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
781 let full_cone_private_node_registries = self
782 .ansible_provisioner
783 .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
784 let genesis_node_registry = self
785 .ansible_provisioner
786 .get_node_registries(&AnsibleInventoryType::Genesis)?
787 .clone();
788
789 peer_cache_node_registries.print();
790 generic_node_registries.print();
791 symmetric_private_node_registries.print();
792 full_cone_private_node_registries.print();
793 genesis_node_registry.print();
794
795 let all_registries = [
796 &peer_cache_node_registries,
797 &generic_node_registries,
798 &symmetric_private_node_registries,
799 &full_cone_private_node_registries,
800 &genesis_node_registry,
801 ];
802
803 let mut total_nodes = 0;
804 let mut running_nodes = 0;
805 let mut stopped_nodes = 0;
806 let mut added_nodes = 0;
807 let mut removed_nodes = 0;
808
809 for (_, registry) in all_registries
810 .iter()
811 .flat_map(|r| r.retrieved_registries.iter())
812 {
813 for node in registry.nodes.iter() {
814 total_nodes += 1;
815 match node.status {
816 ServiceStatus::Running => running_nodes += 1,
817 ServiceStatus::Stopped => stopped_nodes += 1,
818 ServiceStatus::Added => added_nodes += 1,
819 ServiceStatus::Removed => removed_nodes += 1,
820 }
821 }
822 }
823
824 let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
825 let generic_hosts = generic_node_registries.retrieved_registries.len();
826 let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
827 let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();
828
829 let peer_cache_nodes = peer_cache_node_registries
830 .retrieved_registries
831 .iter()
832 .flat_map(|(_, n)| n.nodes.iter())
833 .count();
834 let generic_nodes = generic_node_registries
835 .retrieved_registries
836 .iter()
837 .flat_map(|(_, n)| n.nodes.iter())
838 .count();
839 let symmetric_private_nodes = symmetric_private_node_registries
840 .retrieved_registries
841 .iter()
842 .flat_map(|(_, n)| n.nodes.iter())
843 .count();
844 let full_cone_private_nodes = full_cone_private_node_registries
845 .retrieved_registries
846 .iter()
847 .flat_map(|(_, n)| n.nodes.iter())
848 .count();
849
850 println!("-------");
851 println!("Summary");
852 println!("-------");
853 println!(
854 "Total peer cache nodes ({}x{}): {}",
855 peer_cache_hosts,
856 if peer_cache_hosts > 0 {
857 peer_cache_nodes / peer_cache_hosts
858 } else {
859 0
860 },
861 peer_cache_nodes
862 );
863 println!(
864 "Total generic nodes ({}x{}): {}",
865 generic_hosts,
866 if generic_hosts > 0 {
867 generic_nodes / generic_hosts
868 } else {
869 0
870 },
871 generic_nodes
872 );
873 println!(
874 "Total symmetric private nodes ({}x{}): {}",
875 symmetric_private_hosts,
876 if symmetric_private_hosts > 0 {
877 symmetric_private_nodes / symmetric_private_hosts
878 } else {
879 0
880 },
881 symmetric_private_nodes
882 );
883 println!(
884 "Total full cone private nodes ({}x{}): {}",
885 full_cone_private_hosts,
886 if full_cone_private_hosts > 0 {
887 full_cone_private_nodes / full_cone_private_hosts
888 } else {
889 0
890 },
891 full_cone_private_nodes
892 );
893 println!("Total nodes: {total_nodes}");
894 println!("Running nodes: {running_nodes}");
895 println!("Stopped nodes: {stopped_nodes}");
896 println!("Added nodes: {added_nodes}");
897 println!("Removed nodes: {removed_nodes}");
898
899 Ok(())
900 }
901
    /// Cleans up node logs by delegating to the Ansible provisioner.
    ///
    /// `setup_cron` is forwarded unchanged.
    /// NOTE(review): presumably it also installs a recurring cleanup job — confirm.
    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }
906
    /// Starts Telegraf for this environment by delegating to the Ansible provisioner.
    ///
    /// `node_type` and `custom_inventory` are forwarded unchanged.
    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
919
    /// Stops the nodes of this environment by delegating to the Ansible provisioner.
    ///
    /// All arguments are forwarded unchanged.
    /// NOTE(review): the unit of `delay` is not visible here — confirm.
    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }
938
    /// Stops Telegraf for this environment by delegating to the Ansible provisioner.
    ///
    /// `node_type` and `custom_inventory` are forwarded unchanged.
    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
951
    /// Upgrades nodes according to `options` by delegating to the Ansible provisioner.
    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }
956
    /// Upgrades `antctl` to `version` by delegating to the Ansible provisioner.
    ///
    /// `node_type` and `custom_inventory` are forwarded unchanged.
    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
971
    /// Upgrades the GeoIP Telegraf configuration by delegating to the Ansible provisioner.
    ///
    /// The supplied `name` is forwarded; the deployer's own `environment_name` is not used.
    pub fn upgrade_geoip_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_geoip_telegraf(name)?;
        Ok(())
    }
976
    /// Upgrades the node Telegraf configuration by delegating to the Ansible provisioner.
    ///
    /// The supplied `name` is forwarded; the deployer's own `environment_name` is not used.
    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }
981
    /// Upgrades the client Telegraf configuration by delegating to the Ansible provisioner.
    ///
    /// The supplied `name` is forwarded; the deployer's own `environment_name` is not used.
    pub fn upgrade_client_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_client_telegraf(name)?;
        Ok(())
    }
986
987 pub async fn clean(&self) -> Result<()> {
988 let environment_details =
989 get_environment_details(&self.environment_name, &self.s3_repository)
990 .await
991 .inspect_err(|err| {
992 println!("Failed to get environment details: {err}. Continuing cleanup...");
993 })
994 .ok();
995 if let Some(environment_details) = &environment_details {
996 funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
997 }
998
999 self.destroy_infra(environment_details).await?;
1000
1001 cleanup_environment_inventory(
1002 &self.environment_name,
1003 &self
1004 .working_directory_path
1005 .join("ansible")
1006 .join("inventory"),
1007 None,
1008 )?;
1009
1010 println!("Deleted Ansible inventory for {}", self.environment_name);
1011
1012 if let Err(err) = self
1013 .s3_repository
1014 .delete_object("sn-environment-type", &self.environment_name)
1015 .await
1016 {
1017 println!("Failed to delete environment type: {err}. Continuing cleanup...");
1018 }
1019 Ok(())
1020 }
1021
1022 async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
1023 infra::select_workspace(&self.terraform_runner, &self.environment_name)?;
1024
1025 let options = InfraRunOptions::generate_existing(
1026 &self.environment_name,
1027 &self.region,
1028 &self.terraform_runner,
1029 environment_details.as_ref(),
1030 )
1031 .await?;
1032
1033 let args = build_terraform_args(&options)?;
1034 let tfvars_filenames = if let Some(environment_details) = &environment_details {
1035 environment_details
1036 .environment_type
1037 .get_tfvars_filenames(&self.environment_name, &self.region)
1038 } else {
1039 vec![]
1040 };
1041
1042 self.terraform_runner
1043 .destroy(Some(args), Some(tfvars_filenames))?;
1044
1045 infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;
1046
1047 Ok(())
1048 }
1049}
1050
1051pub fn get_genesis_multiaddr(
1056 ansible_runner: &AnsibleRunner,
1057 ssh_client: &SshClient,
1058) -> Result<(String, IpAddr)> {
1059 let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
1060 let genesis_ip = genesis_inventory[0].public_ip_addr;
1061
1062 let multiaddr = ssh_client
1066 .run_command(
1067 &genesis_ip,
1068 "root",
1069 "jq -r '.nodes[] | select(.initial_peers_config.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1070 false,
1071 )
1072 .map(|output| output.first().cloned())
1073 .unwrap_or_else(|err| {
1074 log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1075 None
1076 });
1077
1078 let multiaddr = match multiaddr {
1080 Some(addr) => addr,
1081 None => ssh_client
1082 .run_command(
1083 &genesis_ip,
1084 "root",
1085 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1086 false,
1087 )?
1088 .first()
1089 .cloned()
1090 .ok_or_else(|| Error::GenesisListenAddress)?,
1091 };
1092
1093 Ok((multiaddr, genesis_ip))
1094}
1095
1096pub fn get_anvil_node_data(
1097 ansible_runner: &AnsibleRunner,
1098 ssh_client: &SshClient,
1099) -> Result<AnvilNodeData> {
1100 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1101 if evm_inventory.is_empty() {
1102 return Err(Error::EvmNodeNotFound);
1103 }
1104
1105 let evm_ip = evm_inventory[0].public_ip_addr;
1106 debug!("Retrieved IP address for EVM node: {evm_ip}");
1107 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1108
1109 const MAX_ATTEMPTS: u8 = 5;
1110 const RETRY_DELAY: Duration = Duration::from_secs(5);
1111
1112 for attempt in 1..=MAX_ATTEMPTS {
1113 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {csv_file_path}"), false) {
1114 Ok(output) => {
1115 if let Some(csv_contents) = output.first() {
1116 let parts: Vec<&str> = csv_contents.split(',').collect();
1117 if parts.len() != 4 {
1118 return Err(Error::EvmTestnetDataParsingError(
1119 "Expected 4 fields in the CSV".to_string(),
1120 ));
1121 }
1122
1123 let evm_testnet_data = AnvilNodeData {
1124 rpc_url: parts[0].trim().to_string(),
1125 payment_token_address: parts[1].trim().to_string(),
1126 data_payments_address: parts[2].trim().to_string(),
1127 deployer_wallet_private_key: parts[3].trim().to_string(),
1128 };
1129 return Ok(evm_testnet_data);
1130 }
1131 }
1132 Err(e) => {
1133 if attempt == MAX_ATTEMPTS {
1134 return Err(e);
1135 }
1136 println!(
1137 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1138 attempt,
1139 RETRY_DELAY.as_secs()
1140 );
1141 }
1142 }
1143 std::thread::sleep(RETRY_DELAY);
1144 }
1145
1146 Err(Error::EvmTestnetDataNotFound)
1147}
1148
1149pub fn get_multiaddr(
1150 ansible_runner: &AnsibleRunner,
1151 ssh_client: &SshClient,
1152) -> Result<(String, IpAddr)> {
1153 let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1154 let node_ip = node_inventory
1157 .iter()
1158 .find(|vm| vm.name.ends_with("-node-1"))
1159 .ok_or_else(|| Error::NodeAddressNotFound)?
1160 .public_ip_addr;
1161
1162 debug!("Getting multiaddr from node {node_ip}");
1163
1164 let multiaddr =
1165 ssh_client
1166 .run_command(
1167 &node_ip,
1168 "root",
1169 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1171 false,
1172 )?.first()
1173 .cloned()
1174 .ok_or_else(|| Error::NodeAddressNotFound)?;
1175
1176 Ok((multiaddr, node_ip))
1179}
1180
1181pub async fn get_and_extract_archive_from_s3(
1182 s3_repository: &S3Repository,
1183 bucket_name: &str,
1184 archive_bucket_path: &str,
1185 dest_path: &Path,
1186) -> Result<()> {
1187 let archive_file_name = archive_bucket_path.split('/').next_back().unwrap();
1190 let archive_dest_path = dest_path.join(archive_file_name);
1191 s3_repository
1192 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1193 .await?;
1194 extract_archive(&archive_dest_path, dest_path)?;
1195 Ok(())
1196}
1197
1198pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1199 let archive_file = File::open(archive_path)?;
1200 let decoder = GzDecoder::new(archive_file);
1201 let mut archive = Archive::new(decoder);
1202 let entries = archive.entries()?;
1203 for entry_result in entries {
1204 let mut entry = entry_result?;
1205 let extract_path = dest_path.join(entry.path()?);
1206 if entry.header().entry_type() == tar::EntryType::Directory {
1207 std::fs::create_dir_all(extract_path)?;
1208 continue;
1209 }
1210 let mut file = BufWriter::new(File::create(extract_path)?);
1211 std::io::copy(&mut entry, &mut file)?;
1212 }
1213 std::fs::remove_file(archive_path)?;
1214 Ok(())
1215}
1216
1217pub fn run_external_command(
1218 binary_path: PathBuf,
1219 working_directory_path: PathBuf,
1220 args: Vec<String>,
1221 suppress_stdout: bool,
1222 suppress_stderr: bool,
1223) -> Result<Vec<String>> {
1224 let mut command = Command::new(binary_path.clone());
1225 for arg in &args {
1226 command.arg(arg);
1227 }
1228 command.stdout(Stdio::piped());
1229 command.stderr(Stdio::piped());
1230 command.current_dir(working_directory_path.clone());
1231 debug!("Running {binary_path:#?} with args {args:#?}");
1232 debug!("Working directory set to {working_directory_path:#?}");
1233
1234 let mut child = command.spawn()?;
1235 let mut output_lines = Vec::new();
1236
1237 if let Some(ref mut stdout) = child.stdout {
1238 let reader = BufReader::new(stdout);
1239 for line in reader.lines() {
1240 let line = line?;
1241 if !suppress_stdout {
1242 println!("{line}");
1243 }
1244 output_lines.push(line);
1245 }
1246 }
1247
1248 if let Some(ref mut stderr) = child.stderr {
1249 let reader = BufReader::new(stderr);
1250 for line in reader.lines() {
1251 let line = line?;
1252 if !suppress_stderr {
1253 eprintln!("{line}");
1254 }
1255 output_lines.push(line);
1256 }
1257 }
1258
1259 let output = child.wait()?;
1260 if !output.success() {
1261 let binary_path = binary_path.to_str().unwrap();
1263 return Err(Error::ExternalCommandRunFailed {
1264 binary: binary_path.to_string(),
1265 exit_status: output,
1266 });
1267 }
1268
1269 Ok(output_lines)
1270}
1271
/// Reports whether a file named `binary_name` exists in any directory listed in `PATH`.
///
/// `PATH` is split with the platform's own separator rules, and a value that is not valid
/// Unicode is still searched. Only files count as a match (symlinks are followed), so a
/// directory that happens to share the name is ignored. Returns `false` when `PATH` is unset.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    match std::env::var_os("PATH") {
        Some(search_path) => {
            std::env::split_paths(&search_path).any(|dir| dir.join(binary_name).is_file())
        }
        None => false,
    }
}
1284
1285pub fn get_wallet_directory() -> Result<PathBuf> {
1286 Ok(dirs_next::data_dir()
1287 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1288 .join("safe")
1289 .join("client")
1290 .join("wallet"))
1291}
1292
1293pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1294 let webhook_url =
1295 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1296
1297 let mut message = String::new();
1298 message.push_str("*Testnet Details*\n");
1299 message.push_str(&format!("Name: {}\n", inventory.name));
1300 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1301 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1302 match inventory.binary_option {
1303 BinaryOption::BuildFromSource {
1304 ref repo_owner,
1305 ref branch,
1306 ..
1307 } => {
1308 message.push_str("*Branch Details*\n");
1309 message.push_str(&format!("Repo owner: {repo_owner}\n"));
1310 message.push_str(&format!("Branch: {branch}\n"));
1311 }
1312 BinaryOption::Versioned {
1313 ant_version: ref safe_version,
1314 antnode_version: ref safenode_version,
1315 antctl_version: ref safenode_manager_version,
1316 ..
1317 } => {
1318 message.push_str("*Version Details*\n");
1319 message.push_str(&format!(
1320 "ant version: {}\n",
1321 safe_version
1322 .as_ref()
1323 .map_or("None".to_string(), |v| v.to_string())
1324 ));
1325 message.push_str(&format!(
1326 "safenode version: {}\n",
1327 safenode_version
1328 .as_ref()
1329 .map_or("None".to_string(), |v| v.to_string())
1330 ));
1331 message.push_str(&format!(
1332 "antctl version: {}\n",
1333 safenode_manager_version
1334 .as_ref()
1335 .map_or("None".to_string(), |v| v.to_string())
1336 ));
1337 }
1338 }
1339
1340 message.push_str("*Sample Peers*\n");
1341 message.push_str("```\n");
1342 for peer in inventory.peers().iter().take(20) {
1343 message.push_str(&format!("{peer}\n"));
1344 }
1345 message.push_str("```\n");
1346 message.push_str("*Available Files*\n");
1347 message.push_str("```\n");
1348 for (addr, file_name) in inventory.uploaded_files.iter() {
1349 message.push_str(&format!("{addr}: {file_name}\n"))
1350 }
1351 message.push_str("```\n");
1352
1353 let payload = json!({
1354 "text": message,
1355 });
1356 reqwest::Client::new()
1357 .post(webhook_url)
1358 .json(&payload)
1359 .send()
1360 .await?;
1361 println!("{message}");
1362 println!("Posted notification to Slack");
1363 Ok(())
1364}
1365
1366fn print_duration(duration: Duration) {
1367 let total_seconds = duration.as_secs();
1368 let minutes = total_seconds / 60;
1369 let seconds = total_seconds % 60;
1370 debug!("Time taken: {minutes} minutes and {seconds} seconds");
1371}
1372
1373pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1374 let progress_bar = ProgressBar::new(length);
1375 progress_bar.set_style(
1376 ProgressStyle::default_bar()
1377 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1378 .progress_chars("#>-"),
1379 );
1380 progress_bar.enable_steady_tick(Duration::from_millis(100));
1381 Ok(progress_bar)
1382}
1383
1384pub async fn get_environment_details(
1385 environment_name: &str,
1386 s3_repository: &S3Repository,
1387) -> Result<EnvironmentDetails> {
1388 let temp_file = tempfile::NamedTempFile::new()?;
1389
1390 let max_retries = 3;
1391 let mut retries = 0;
1392 let env_details = loop {
1393 debug!("Downloading the environment details file for {environment_name} from S3");
1394 match s3_repository
1395 .download_object("sn-environment-type", environment_name, temp_file.path())
1396 .await
1397 {
1398 Ok(_) => {
1399 debug!("Downloaded the environment details file for {environment_name} from S3");
1400 let content = match std::fs::read_to_string(temp_file.path()) {
1401 Ok(content) => content,
1402 Err(err) => {
1403 log::error!("Could not read the environment details file: {err:?}");
1404 if retries < max_retries {
1405 debug!("Retrying to read the environment details file");
1406 retries += 1;
1407 continue;
1408 } else {
1409 return Err(Error::EnvironmentDetailsNotFound(
1410 environment_name.to_string(),
1411 ));
1412 }
1413 }
1414 };
1415 trace!("Content of the environment details file: {content}");
1416
1417 match serde_json::from_str(&content) {
1418 Ok(environment_details) => break environment_details,
1419 Err(err) => {
1420 log::error!("Could not parse the environment details file: {err:?}");
1421 if retries < max_retries {
1422 debug!("Retrying to parse the environment details file");
1423 retries += 1;
1424 continue;
1425 } else {
1426 return Err(Error::EnvironmentDetailsNotFound(
1427 environment_name.to_string(),
1428 ));
1429 }
1430 }
1431 }
1432 }
1433 Err(err) => {
1434 log::error!(
1435 "Could not download the environment details file for {environment_name} from S3: {err:?}"
1436 );
1437 if retries < max_retries {
1438 retries += 1;
1439 continue;
1440 } else {
1441 return Err(Error::EnvironmentDetailsNotFound(
1442 environment_name.to_string(),
1443 ));
1444 }
1445 }
1446 }
1447 };
1448
1449 debug!("Fetched environment details: {env_details:?}");
1450
1451 Ok(env_details)
1452}
1453
1454pub async fn write_environment_details(
1455 s3_repository: &S3Repository,
1456 environment_name: &str,
1457 environment_details: &EnvironmentDetails,
1458) -> Result<()> {
1459 let temp_dir = tempfile::tempdir()?;
1460 let path = temp_dir.path().to_path_buf().join(environment_name);
1461 let mut file = File::create(&path)?;
1462 let json = serde_json::to_string(environment_details)?;
1463 file.write_all(json.as_bytes())?;
1464 s3_repository
1465 .upload_file("sn-environment-type", &path, true)
1466 .await?;
1467 Ok(())
1468}
1469
1470pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1471 if node_count == 0 {
1472 return 0;
1473 }
1474 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1475
1476 (total_volume_required as f64 / 7.0).ceil() as u16
1478}
1479
/// Builds the HTTP URL of the bootstrap cache served by the host at `ip_addr`.
///
/// IPv6 addresses are wrapped in square brackets, as URLs require for IP literals; IPv4
/// addresses are used as-is.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    match ip_addr {
        IpAddr::V4(addr) => format!("http://{addr}/bootstrap_cache.json"),
        IpAddr::V6(addr) => format!("http://[{addr}]/bootstrap_cache.json"),
    }
}