1pub mod ansible;
8pub mod bootstrap;
9pub mod clients;
10pub mod deploy;
11pub mod digital_ocean;
12pub mod error;
13pub mod funding;
14pub mod infra;
15pub mod inventory;
16pub mod logs;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
// Storage each node requires on its host volume — presumably in GB; not used
// in this portion of the file. TODO confirm unit against the terraform vars.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51 fs::File,
52 io::{BufRead, BufReader, BufWriter, Write},
53 net::IpAddr,
54 path::{Path, PathBuf},
55 process::{Command, Stdio},
56 str::FromStr,
57 time::Duration,
58};
59use tar::Archive;
60
// Number of parallel Ansible forks used when the builder does not specify one.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
/// The kind of deployment an environment was created as.
///
/// Serialized into environment details and parsed back via `FromStr`
/// ("bootstrap" / "clients" / "new"); `New` is the default.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    Bootstrap,
    Client,
    #[default]
    New,
}
73
/// Connection and contract details for a deployed Anvil EVM test node.
///
/// Populated from the single-line CSV the node writes (see
/// `get_anvil_node_data`): RPC URL, payment token address, data payments
/// address, and the deployer wallet private key.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    pub data_payments_address: String,
    pub deployer_wallet_private_key: String,
    pub payment_token_address: String,
    pub rpc_url: String,
}
81
82impl std::fmt::Display for DeploymentType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 DeploymentType::Bootstrap => write!(f, "bootstrap"),
86 DeploymentType::Client => write!(f, "clients"),
87 DeploymentType::New => write!(f, "new"),
88 }
89 }
90}
91
92impl std::str::FromStr for DeploymentType {
93 type Err = String;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 match s.to_lowercase().as_str() {
97 "bootstrap" => Ok(DeploymentType::Bootstrap),
98 "clients" => Ok(DeploymentType::Client),
99 "new" => Ok(DeploymentType::New),
100 _ => Err(format!("Invalid deployment type: {s}")),
101 }
102 }
103}
104
/// The role a node VM plays in the network topology.
#[derive(Debug, Clone)]
pub enum NodeType {
    // Node behind a full-cone NAT (see telegraf_role: NAT_FULL_CONE_NODE).
    FullConePrivateNode,
    Generic,
    Genesis,
    PeerCache,
    // Node behind a port-randomizing NAT (telegraf_role: NAT_RANDOMIZED_NODE).
    SymmetricPrivateNode,
}
113
114impl std::fmt::Display for NodeType {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 NodeType::FullConePrivateNode => write!(f, "full-cone-private"),
118 NodeType::Generic => write!(f, "generic"),
119 NodeType::Genesis => write!(f, "genesis"),
120 NodeType::PeerCache => write!(f, "peer-cache"),
121 NodeType::SymmetricPrivateNode => write!(f, "symmetric-private"),
122 }
123 }
124}
125
126impl std::str::FromStr for NodeType {
127 type Err = String;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 match s.to_lowercase().as_str() {
131 "full-cone-private" => Ok(NodeType::FullConePrivateNode),
132 "generic" => Ok(NodeType::Generic),
133 "genesis" => Ok(NodeType::Genesis),
134 "peer-cache" => Ok(NodeType::PeerCache),
135 "symmetric-private" => Ok(NodeType::SymmetricPrivateNode),
136 _ => Err(format!("Invalid node type: {s}")),
137 }
138 }
139}
140
impl NodeType {
    /// Returns the role tag reported to Telegraf for this node type.
    pub fn telegraf_role(&self) -> &'static str {
        match self {
            NodeType::FullConePrivateNode => "NAT_FULL_CONE_NODE",
            NodeType::Generic => "GENERIC_NODE",
            NodeType::Genesis => "GENESIS_NODE",
            NodeType::PeerCache => "PEER_CACHE_NODE",
            NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
        }
    }

    /// Maps the node type to the Ansible inventory group that hosts it.
    pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
        match self {
            NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
            NodeType::Generic => AnsibleInventoryType::Nodes,
            NodeType::Genesis => AnsibleInventoryType::Genesis,
            NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
            NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
        }
    }
}
162
/// The EVM network a deployment settles payments on; `Anvil` (a local test
/// node) is the default.
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    #[default]
    Anvil,
    ArbitrumOne,
    ArbitrumSepoliaTest,
    Custom,
}
171
impl std::fmt::Display for EvmNetwork {
    /// Renders the network as its infra identifier.
    ///
    /// NOTE(review): `Anvil` and `Custom` both render as "evm-custom", so this
    /// mapping is not reversible through `FromStr` (which expects "anvil" /
    /// "custom") — presumably intentional because both use custom contract
    /// addresses at the infra level; confirm before relying on round-trips.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EvmNetwork::Anvil => write!(f, "evm-custom"),
            EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
            EvmNetwork::ArbitrumSepoliaTest => write!(f, "evm-arbitrum-sepolia-test"),
            EvmNetwork::Custom => write!(f, "evm-custom"),
        }
    }
}
182
183impl std::str::FromStr for EvmNetwork {
184 type Err = String;
185
186 fn from_str(s: &str) -> Result<Self, Self::Err> {
187 match s.to_lowercase().as_str() {
188 "anvil" => Ok(EvmNetwork::Anvil),
189 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
190 "arbitrum-sepolia-test" => Ok(EvmNetwork::ArbitrumSepoliaTest),
191 "custom" => Ok(EvmNetwork::Custom),
192 _ => Err(format!("Invalid EVM network type: {s}")),
193 }
194 }
195}
196
/// EVM configuration for an environment: which network is used, plus the
/// contract addresses and RPC endpoint when they are not implied by the
/// network itself (e.g. for `Custom`/`Anvil`).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    pub network: EvmNetwork,
    pub data_payments_address: Option<String>,
    pub payment_token_address: Option<String>,
    pub rpc_url: Option<String>,
}
204
/// Persisted description of a deployed environment (stored in S3 and
/// retrieved via `get_environment_details`), used by cleanup and funding
/// operations after the original deploy command has exited.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    pub deployment_type: DeploymentType,
    pub environment_type: EnvironmentType,
    pub evm_details: EvmDetails,
    pub funding_wallet_address: Option<String>,
    pub network_id: Option<u8>,
    pub region: String,
    pub rewards_address: Option<String>,
}
215
/// The tier of an environment; controls which tfvars files are applied and
/// the default node counts.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    #[default]
    Development,
    Production,
    Staging,
}
223
224impl EnvironmentType {
225 pub fn get_tfvars_filenames(&self, name: &str, region: &str) -> Vec<String> {
226 match self {
227 EnvironmentType::Development => vec![
228 "dev.tfvars".to_string(),
229 format!("dev-images-{region}.tfvars", region = region),
230 ],
231 EnvironmentType::Staging => vec![
232 "staging.tfvars".to_string(),
233 format!("staging-images-{region}.tfvars", region = region),
234 ],
235 EnvironmentType::Production => {
236 vec![
237 format!("{name}.tfvars", name = name),
238 format!("production-images-{region}.tfvars", region = region),
239 ]
240 }
241 }
242 }
243
244 pub fn get_default_peer_cache_node_count(&self) -> u16 {
245 match self {
246 EnvironmentType::Development => 5,
247 EnvironmentType::Production => 5,
248 EnvironmentType::Staging => 5,
249 }
250 }
251
252 pub fn get_default_node_count(&self) -> u16 {
253 match self {
254 EnvironmentType::Development => 25,
255 EnvironmentType::Production => 25,
256 EnvironmentType::Staging => 25,
257 }
258 }
259
260 pub fn get_default_symmetric_private_node_count(&self) -> u16 {
261 self.get_default_node_count()
262 }
263
264 pub fn get_default_full_cone_private_node_count(&self) -> u16 {
265 self.get_default_node_count()
266 }
267}
268
269impl std::fmt::Display for EnvironmentType {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 match self {
272 EnvironmentType::Development => write!(f, "development"),
273 EnvironmentType::Production => write!(f, "production"),
274 EnvironmentType::Staging => write!(f, "staging"),
275 }
276 }
277}
278
279impl FromStr for EnvironmentType {
280 type Err = Error;
281
282 fn from_str(s: &str) -> Result<Self, Self::Err> {
283 match s.to_lowercase().as_str() {
284 "development" => Ok(EnvironmentType::Development),
285 "production" => Ok(EnvironmentType::Production),
286 "staging" => Ok(EnvironmentType::Staging),
287 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
288 }
289 }
290}
291
/// How the deployed binaries are obtained: built from a source branch or
/// downloaded as pre-built released versions.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Build the binaries from a repository branch.
    BuildFromSource {
        // Feature set for the antnode build — presumably a comma-separated
        // cargo feature list; TODO confirm against the build playbook.
        antnode_features: Option<String>,
        branch: String,
        repo_owner: String,
        // When true, binaries are assumed to already exist and no build
        // machine is provisioned (see `should_provision_build_machine`).
        skip_binary_build: bool,
    },
    /// Use released binaries at the given versions (None = playbook default).
    Versioned {
        ant_version: Option<Version>,
        antctl_version: Option<Version>,
        antnode_version: Option<Version>,
    },
}
324
325impl BinaryOption {
326 pub fn should_provision_build_machine(&self) -> bool {
327 match self {
328 BinaryOption::BuildFromSource {
329 skip_binary_build, ..
330 } => !skip_binary_build,
331 BinaryOption::Versioned { .. } => false,
332 }
333 }
334
335 pub fn print(&self) {
336 match self {
337 BinaryOption::BuildFromSource {
338 antnode_features,
339 branch,
340 repo_owner,
341 skip_binary_build: _,
342 } => {
343 println!("Source configuration:");
344 println!(" Repository owner: {repo_owner}");
345 println!(" Branch: {branch}");
346 if let Some(features) = antnode_features {
347 println!(" Antnode features: {features}");
348 }
349 }
350 BinaryOption::Versioned {
351 ant_version,
352 antctl_version,
353 antnode_version,
354 } => {
355 println!("Versioned binaries configuration:");
356 if let Some(version) = ant_version {
357 println!(" ant version: {version}");
358 }
359 if let Some(version) = antctl_version {
360 println!(" antctl version: {version}");
361 }
362 if let Some(version) = antnode_version {
363 println!(" antnode version: {version}");
364 }
365 }
366 }
367 }
368}
369
/// Supported infrastructure providers. Only DigitalOcean is currently wired
/// up end-to-end (see `TestnetDeployBuilder::build`).
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    Aws,
    DigitalOcean,
}
375
376impl std::fmt::Display for CloudProvider {
377 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378 match self {
379 CloudProvider::Aws => write!(f, "aws"),
380 CloudProvider::DigitalOcean => write!(f, "digital-ocean"),
381 }
382 }
383}
384
385impl CloudProvider {
386 pub fn get_ssh_user(&self) -> String {
387 match self {
388 CloudProvider::Aws => "ubuntu".to_string(),
389 CloudProvider::DigitalOcean => "root".to_string(),
390 }
391 }
392}
393
/// Output format for node logging: plain text or JSON.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    Default,
    Json,
}
399
400impl LogFormat {
401 pub fn parse_from_str(val: &str) -> Result<Self> {
402 match val {
403 "default" => Ok(LogFormat::Default),
404 "json" => Ok(LogFormat::Json),
405 _ => Err(Error::LoggingConfiguration(
406 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
407 )),
408 }
409 }
410
411 pub fn as_str(&self) -> &'static str {
412 match self {
413 LogFormat::Default => "default",
414 LogFormat::Json => "json",
415 }
416 }
417}
418
/// Options controlling a node upgrade run (see `TestnetDeployer::upgrade`).
#[derive(Clone)]
pub struct UpgradeOptions {
    pub ansible_verbose: bool,
    // When set, restricts the upgrade to these specific VMs.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    pub env_variables: Option<Vec<(String, String)>>,
    // Force the upgrade even when antctl considers it unnecessary.
    pub force: bool,
    pub forks: usize,
    // Pause between node upgrades on a host.
    pub interval: Duration,
    pub name: String,
    // When set, restricts the upgrade to one node type.
    pub node_type: Option<NodeType>,
    // Optional delay (seconds — TODO confirm unit) applied before upgrading.
    pub pre_upgrade_delay: Option<u64>,
    pub provider: CloudProvider,
    // Target antnode version; None lets the playbook decide.
    pub version: Option<String>,
}
433
impl UpgradeOptions {
    /// Builds the extra-vars document passed to the upgrade playbook.
    ///
    /// `interval` is always included (converted to milliseconds); `force`,
    /// the env variable list, `antnode_version`, and `pre_upgrade_delay` are
    /// only added when set/true.
    pub fn get_ansible_vars(&self) -> String {
        let mut extra_vars = ExtraVarsDocBuilder::default();
        extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
        if let Some(env_variables) = &self.env_variables {
            extra_vars.add_env_variable_list("env_variables", env_variables.clone());
        }
        if self.force {
            extra_vars.add_variable("force", &self.force.to_string());
        }
        if let Some(version) = &self.version {
            extra_vars.add_variable("antnode_version", version);
        }
        if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
            extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
        }
        extra_vars.build()
    }
}
453
/// Fluent builder for `TestnetDeployer`; unset fields are resolved from
/// environment variables or hard defaults in `build`.
#[derive(Default)]
pub struct TestnetDeployBuilder {
    ansible_forks: Option<usize>,
    ansible_verbose_mode: bool,
    deployment_type: EnvironmentType,
    environment_name: String,
    provider: Option<CloudProvider>,
    region: Option<String>,
    ssh_secret_key_path: Option<PathBuf>,
    state_bucket_name: Option<String>,
    terraform_binary_path: Option<PathBuf>,
    vault_password_path: Option<PathBuf>,
    working_directory_path: Option<PathBuf>,
}
468
impl TestnetDeployBuilder {
    /// Creates an empty builder; `build` applies all defaults.
    pub fn new() -> Self {
        Default::default()
    }

    pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
        self.ansible_verbose_mode = ansible_verbose_mode;
        self
    }

    pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
        self.ansible_forks = Some(ansible_forks);
        self
    }

    pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
        self.deployment_type = deployment_type;
        self
    }

    pub fn environment_name(&mut self, name: &str) -> &mut Self {
        self.environment_name = name.to_string();
        self
    }

    pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
        self.provider = Some(provider);
        self
    }

    pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
        self.state_bucket_name = Some(state_bucket_name);
        self
    }

    pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
        self.terraform_binary_path = Some(terraform_binary_path);
        self
    }

    pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
        self.working_directory_path = Some(working_directory_path);
        self
    }

    pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
        self.ssh_secret_key_path = Some(ssh_secret_key_path);
        self
    }

    pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
        self.vault_password_path = Some(vault_password_path);
        self
    }

    pub fn region(&mut self, region: String) -> &mut Self {
        self.region = Some(region);
        self
    }

    /// Resolves defaults, wires up the Terraform/Ansible runners and clients,
    /// and constructs a `TestnetDeployer`.
    ///
    /// Defaults: provider = DigitalOcean, terraform binary = "terraform" on
    /// PATH, working directory = ./resources, region = "lon1", forks =
    /// ANSIBLE_DEFAULT_FORKS. Falls back to the TERRAFORM_STATE_BUCKET_NAME,
    /// SSH_KEY_PATH and ANSIBLE_VAULT_PASSWORD_PATH environment variables for
    /// unset fields. Side effect: exports the DO_PAT token as
    /// DIGITALOCEAN_TOKEN and DO_API_TOKEN for downstream tooling.
    ///
    /// # Errors
    /// Fails when DO_PAT is missing, the provider is unsupported, a fallback
    /// environment variable is unavailable, or any runner fails to construct.
    pub fn build(&self) -> Result<TestnetDeployer> {
        let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
        match provider {
            CloudProvider::DigitalOcean => {
                let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
                    Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
                })?;
                // The same PAT is exported under both names because different
                // tools read different variables.
                std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
                std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
            }
            _ => {
                return Err(Error::CloudProviderNotSupported(provider.to_string()));
            }
        }

        let state_bucket_name = match self.state_bucket_name {
            Some(ref bucket_name) => bucket_name.clone(),
            None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
        };

        let default_terraform_bin_path = PathBuf::from("terraform");
        let terraform_binary_path = self
            .terraform_binary_path
            .as_ref()
            .unwrap_or(&default_terraform_bin_path);

        let working_directory_path = match self.working_directory_path {
            Some(ref work_dir_path) => work_dir_path.clone(),
            None => std::env::current_dir()?.join("resources"),
        };

        let ssh_secret_key_path = match self.ssh_secret_key_path {
            Some(ref ssh_sk_path) => ssh_sk_path.clone(),
            None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
        };

        let vault_password_path = match self.vault_password_path {
            Some(ref vault_pw_path) => vault_pw_path.clone(),
            None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
        };

        let region = match self.region {
            Some(ref region) => region.clone(),
            None => "lon1".to_string(),
        };

        let terraform_runner = TerraformRunner::new(
            terraform_binary_path.to_path_buf(),
            working_directory_path
                .join("terraform")
                .join("testnet")
                .join(provider.to_string()),
            provider,
            &state_bucket_name,
        )?;
        let ansible_runner = AnsibleRunner::new(
            self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
            self.ansible_verbose_mode,
            &self.environment_name,
            provider,
            ssh_secret_key_path.clone(),
            vault_password_path,
            working_directory_path.join("ansible"),
        )?;
        let ssh_client = SshClient::new(ssh_secret_key_path);
        let ansible_provisioner =
            AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
        let rpc_client = RpcClient::new(
            PathBuf::from("/usr/local/bin/safenode_rpc_client"),
            working_directory_path.clone(),
        );

        // Remove a stale `safe` artefact from a previous run, if present.
        // NOTE(review): remove_file errors if this path is a directory —
        // presumably it is always a file when it exists; confirm.
        let safe_path = working_directory_path.join("safe");
        if safe_path.exists() {
            std::fs::remove_file(safe_path)?;
        }

        let testnet = TestnetDeployer::new(
            ansible_provisioner,
            provider,
            self.deployment_type.clone(),
            &self.environment_name,
            rpc_client,
            S3Repository {},
            ssh_client,
            terraform_runner,
            working_directory_path,
            region,
        )?;

        Ok(testnet)
    }
}
627
/// Orchestrates the lifecycle of a testnet environment: infrastructure
/// (Terraform), provisioning (Ansible), node control, and cleanup.
/// Construct via `TestnetDeployBuilder`.
#[derive(Clone)]
pub struct TestnetDeployer {
    pub ansible_provisioner: AnsibleProvisioner,
    pub cloud_provider: CloudProvider,
    pub deployment_type: EnvironmentType,
    pub environment_name: String,
    pub inventory_file_path: PathBuf,
    pub region: String,
    pub rpc_client: RpcClient,
    pub s3_repository: S3Repository,
    pub ssh_client: SshClient,
    pub terraform_runner: TerraformRunner,
    pub working_directory_path: PathBuf,
}
642
impl TestnetDeployer {
    /// Constructs a deployer for the given environment.
    ///
    /// # Errors
    /// `Error::EnvironmentNameRequired` if `environment_name` is empty.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ansible_provisioner: AnsibleProvisioner,
        cloud_provider: CloudProvider,
        deployment_type: EnvironmentType,
        environment_name: &str,
        rpc_client: RpcClient,
        s3_repository: S3Repository,
        ssh_client: SshClient,
        terraform_runner: TerraformRunner,
        working_directory_path: PathBuf,
        region: String,
    ) -> Result<TestnetDeployer> {
        if environment_name.is_empty() {
            return Err(Error::EnvironmentNameRequired);
        }
        let inventory_file_path = working_directory_path
            .join("ansible")
            .join("inventory")
            .join("dev_inventory_digital_ocean.yml");
        Ok(TestnetDeployer {
            ansible_provisioner,
            cloud_provider,
            deployment_type,
            environment_name: environment_name.to_string(),
            inventory_file_path,
            region,
            rpc_client,
            ssh_client,
            s3_repository,
            terraform_runner,
            working_directory_path,
        })
    }

    /// Prepares for a deployment: refuses to reuse an environment name whose
    /// logs still exist in S3, initialises Terraform and selects/creates the
    /// workspace, and downloads the safenode RPC client if not present.
    ///
    /// # Errors
    /// `Error::LogsForPreviousTestnetExist` when logs for this name exist;
    /// otherwise any S3/Terraform/IO error from the steps above.
    pub async fn init(&self) -> Result<()> {
        if self
            .s3_repository
            .folder_exists(
                "sn-testnet",
                &format!("testnet-logs/{}", self.environment_name),
            )
            .await?
        {
            return Err(Error::LogsForPreviousTestnetExist(
                self.environment_name.clone(),
            ));
        }

        self.terraform_runner.init()?;
        let workspaces = self.terraform_runner.workspace_list()?;
        if !workspaces.contains(&self.environment_name) {
            self.terraform_runner
                .workspace_new(&self.environment_name)?;
        } else {
            println!("Workspace {} already exists", self.environment_name);
        }

        let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
        if !rpc_client_path.is_file() {
            println!("Downloading the rpc client for safenode...");
            let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
            get_and_extract_archive_from_s3(
                &self.s3_repository,
                "sn-node-rpc-client",
                archive_name,
                &self.working_directory_path,
            )
            .await?;
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                // Make the downloaded binary executable (rwxr-xr-x).
                let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
                permissions.set_mode(0o755);
                std::fs::set_permissions(&rpc_client_path, permissions)?;
            }
        }

        Ok(())
    }

    /// Runs `terraform plan` against the workspace described by `options`.
    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), options.tfvars_filenames.clone())?;
        Ok(())
    }

    /// Starts nodes, optionally limited to one node type or a custom VM list,
    /// pausing `interval` between nodes.
    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Retrieves every node registry, prints per-group details, then prints a
    /// summary of host counts and node counts broken down by service status.
    pub fn status(&self) -> Result<()> {
        self.ansible_provisioner.status()?;

        let peer_cache_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
        let generic_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Nodes)?;
        let symmetric_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
        let full_cone_private_node_registries = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
        let genesis_node_registry = self
            .ansible_provisioner
            .get_node_registries(&AnsibleInventoryType::Genesis)?
            .clone();

        peer_cache_node_registries.print();
        generic_node_registries.print();
        symmetric_private_node_registries.print();
        full_cone_private_node_registries.print();
        genesis_node_registry.print();

        let all_registries = [
            &peer_cache_node_registries,
            &generic_node_registries,
            &symmetric_private_node_registries,
            &full_cone_private_node_registries,
            &genesis_node_registry,
        ];

        // Tally every node across all groups by its service status.
        let mut total_nodes = 0;
        let mut running_nodes = 0;
        let mut stopped_nodes = 0;
        let mut added_nodes = 0;
        let mut removed_nodes = 0;

        for (_, registry) in all_registries
            .iter()
            .flat_map(|r| r.retrieved_registries.iter())
        {
            for node in registry.nodes.iter() {
                total_nodes += 1;
                match node.status {
                    ServiceStatus::Running => running_nodes += 1,
                    ServiceStatus::Stopped => stopped_nodes += 1,
                    ServiceStatus::Added => added_nodes += 1,
                    ServiceStatus::Removed => removed_nodes += 1,
                }
            }
        }

        let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
        let generic_hosts = generic_node_registries.retrieved_registries.len();
        let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
        let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();

        let peer_cache_nodes = peer_cache_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let generic_nodes = generic_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let symmetric_private_nodes = symmetric_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();
        let full_cone_private_nodes = full_cone_private_node_registries
            .retrieved_registries
            .iter()
            .flat_map(|(_, n)| n.nodes.iter())
            .count();

        println!("-------");
        println!("Summary");
        println!("-------");
        // The "{}x{}" figure below is hosts x nodes-per-host, where the
        // per-host figure is an integer-division average.
        println!(
            "Total peer cache nodes ({}x{}): {}",
            peer_cache_hosts,
            if peer_cache_hosts > 0 {
                peer_cache_nodes / peer_cache_hosts
            } else {
                0
            },
            peer_cache_nodes
        );
        println!(
            "Total generic nodes ({}x{}): {}",
            generic_hosts,
            if generic_hosts > 0 {
                generic_nodes / generic_hosts
            } else {
                0
            },
            generic_nodes
        );
        println!(
            "Total symmetric private nodes ({}x{}): {}",
            symmetric_private_hosts,
            if symmetric_private_hosts > 0 {
                symmetric_private_nodes / symmetric_private_hosts
            } else {
                0
            },
            symmetric_private_nodes
        );
        println!(
            "Total full cone private nodes ({}x{}): {}",
            full_cone_private_hosts,
            if full_cone_private_hosts > 0 {
                full_cone_private_nodes / full_cone_private_hosts
            } else {
                0
            },
            full_cone_private_nodes
        );
        println!("Total nodes: {total_nodes}");
        println!("Running nodes: {running_nodes}");
        println!("Stopped nodes: {stopped_nodes}");
        println!("Added nodes: {added_nodes}");
        println!("Removed nodes: {removed_nodes}");

        Ok(())
    }

    /// Cleans up node logs; when `setup_cron` is true, also installs the
    /// recurring cleanup job (delegated to the provisioner).
    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }

    /// Starts Telegraf, optionally limited to one node type or a custom VM
    /// list.
    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Stops nodes (optionally filtered by type, custom inventory, or
    /// explicit service names), pausing `interval` between nodes and applying
    /// an optional pre-stop `delay`.
    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }

    /// Stops Telegraf, optionally limited to one node type or a custom VM
    /// list.
    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Upgrades antnode services according to `options`.
    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }

    /// Upgrades antctl to `version`, optionally limited to one node type or a
    /// custom VM list.
    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }

    /// Upgrades the GeoIP Telegraf configuration for environment `name`.
    pub fn upgrade_geoip_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_geoip_telegraf(name)?;
        Ok(())
    }

    /// Upgrades the node Telegraf configuration for environment `name`.
    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }

    /// Upgrades the client Telegraf configuration for environment `name`.
    pub fn upgrade_client_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_client_telegraf(name)?;
        Ok(())
    }

    /// Tears the environment down: drains funds (best effort — failure to
    /// fetch environment details is logged, not fatal), destroys the
    /// infrastructure, deletes the local Ansible inventory, and removes the
    /// environment-type object from S3 (also best effort).
    pub async fn clean(&self) -> Result<()> {
        let environment_details =
            get_environment_details(&self.environment_name, &self.s3_repository)
                .await
                .inspect_err(|err| {
                    println!("Failed to get environment details: {err}. Continuing cleanup...");
                })
                .ok();
        if let Some(environment_details) = &environment_details {
            funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
        }

        self.destroy_infra(environment_details).await?;

        cleanup_environment_inventory(
            &self.environment_name,
            &self
                .working_directory_path
                .join("ansible")
                .join("inventory"),
            None,
        )?;

        println!("Deleted Ansible inventory for {}", self.environment_name);

        if let Err(err) = self
            .s3_repository
            .delete_object("sn-environment-type", &self.environment_name)
            .await
        {
            println!("Failed to delete environment type: {err}. Continuing cleanup...");
        }
        Ok(())
    }

    /// Destroys the Terraform-managed infrastructure for this environment and
    /// deletes its workspace. Without environment details, no tfvars files
    /// are passed to `destroy`.
    async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
        infra::select_workspace(&self.terraform_runner, &self.environment_name)?;

        let options = InfraRunOptions::generate_existing(
            &self.environment_name,
            &self.region,
            &self.terraform_runner,
            environment_details.as_ref(),
        )
        .await?;

        let args = build_terraform_args(&options)?;
        let tfvars_filenames = if let Some(environment_details) = &environment_details {
            environment_details
                .environment_type
                .get_tfvars_filenames(&self.environment_name, &self.region)
        } else {
            vec![]
        };

        self.terraform_runner
            .destroy(Some(args), Some(tfvars_filenames))?;

        infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;

        Ok(())
    }
}
1037
/// Resolves the genesis node's listen multiaddr and its public IP.
///
/// First attempts to read a quic-v1, non-loopback listen address from the
/// node flagged as the first peer (`initial_peers_config.first == true`) in
/// the genesis host's node registry; an SSH failure there is logged and
/// treated as "not found". It then falls back to any node's quic-v1,
/// non-loopback address.
///
/// # Errors
/// `Error::GenesisListenAddress` when no suitable address exists; otherwise
/// any inventory or SSH error from the fallback query.
pub fn get_genesis_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
    // NOTE(review): assumes the genesis inventory is non-empty — this indexing
    // panics otherwise; confirm callers always deploy a genesis host.
    let genesis_ip = genesis_inventory[0].public_ip_addr;

    let multiaddr = ssh_client
        .run_command(
            &genesis_ip,
            "root",
            "jq -r '.nodes[] | select(.initial_peers_config.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
            false,
        )
        .map(|output| output.first().cloned())
        .unwrap_or_else(|err| {
            // Deliberately swallowed: the broader fallback below still runs.
            log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
            None
        });

    let multiaddr = match multiaddr {
        Some(addr) => addr,
        None => ssh_client
            .run_command(
                &genesis_ip,
                "root",
                "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
                false,
            )?
            .first()
            .cloned()
            .ok_or_else(|| Error::GenesisListenAddress)?,
    };

    Ok((multiaddr, genesis_ip))
}
1082
1083pub fn get_anvil_node_data(
1084 ansible_runner: &AnsibleRunner,
1085 ssh_client: &SshClient,
1086) -> Result<AnvilNodeData> {
1087 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1088 if evm_inventory.is_empty() {
1089 return Err(Error::EvmNodeNotFound);
1090 }
1091
1092 let evm_ip = evm_inventory[0].public_ip_addr;
1093 debug!("Retrieved IP address for EVM node: {evm_ip}");
1094 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1095
1096 const MAX_ATTEMPTS: u8 = 5;
1097 const RETRY_DELAY: Duration = Duration::from_secs(5);
1098
1099 for attempt in 1..=MAX_ATTEMPTS {
1100 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {csv_file_path}"), false) {
1101 Ok(output) => {
1102 if let Some(csv_contents) = output.first() {
1103 let parts: Vec<&str> = csv_contents.split(',').collect();
1104 if parts.len() != 4 {
1105 return Err(Error::EvmTestnetDataParsingError(
1106 "Expected 4 fields in the CSV".to_string(),
1107 ));
1108 }
1109
1110 let evm_testnet_data = AnvilNodeData {
1111 rpc_url: parts[0].trim().to_string(),
1112 payment_token_address: parts[1].trim().to_string(),
1113 data_payments_address: parts[2].trim().to_string(),
1114 deployer_wallet_private_key: parts[3].trim().to_string(),
1115 };
1116 return Ok(evm_testnet_data);
1117 }
1118 }
1119 Err(e) => {
1120 if attempt == MAX_ATTEMPTS {
1121 return Err(e);
1122 }
1123 println!(
1124 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1125 attempt,
1126 RETRY_DELAY.as_secs()
1127 );
1128 }
1129 }
1130 std::thread::sleep(RETRY_DELAY);
1131 }
1132
1133 Err(Error::EvmTestnetDataNotFound)
1134}
1135
/// Returns a non-loopback listen multiaddr from the first generic node (the
/// VM whose name ends in "-node-1"), together with that node's public IP.
///
/// # Errors
/// `Error::NodeAddressNotFound` if no such VM exists or its node registry
/// contains no suitable listen address; otherwise any inventory/SSH error.
pub fn get_multiaddr(
    ansible_runner: &AnsibleRunner,
    ssh_client: &SshClient,
) -> Result<(String, IpAddr)> {
    let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
    let node_ip = node_inventory
        .iter()
        .find(|vm| vm.name.ends_with("-node-1"))
        .ok_or_else(|| Error::NodeAddressNotFound)?
        .public_ip_addr;

    debug!("Getting multiaddr from node {node_ip}");

    // Unlike get_genesis_multiaddr, this does not filter for quic-v1; it
    // takes the first non-loopback address of any node on the host.
    let multiaddr = ssh_client
        .run_command(
            &node_ip,
            "root",
            "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
            false,
        )?
        .first()
        .cloned()
        .ok_or_else(|| Error::NodeAddressNotFound)?;

    Ok((multiaddr, node_ip))
}
1167
1168pub async fn get_and_extract_archive_from_s3(
1169 s3_repository: &S3Repository,
1170 bucket_name: &str,
1171 archive_bucket_path: &str,
1172 dest_path: &Path,
1173) -> Result<()> {
1174 let archive_file_name = archive_bucket_path.split('/').next_back().unwrap();
1177 let archive_dest_path = dest_path.join(archive_file_name);
1178 s3_repository
1179 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1180 .await?;
1181 extract_archive(&archive_dest_path, dest_path)?;
1182 Ok(())
1183}
1184
1185pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1186 let archive_file = File::open(archive_path)?;
1187 let decoder = GzDecoder::new(archive_file);
1188 let mut archive = Archive::new(decoder);
1189 let entries = archive.entries()?;
1190 for entry_result in entries {
1191 let mut entry = entry_result?;
1192 let extract_path = dest_path.join(entry.path()?);
1193 if entry.header().entry_type() == tar::EntryType::Directory {
1194 std::fs::create_dir_all(extract_path)?;
1195 continue;
1196 }
1197 let mut file = BufWriter::new(File::create(extract_path)?);
1198 std::io::copy(&mut entry, &mut file)?;
1199 }
1200 std::fs::remove_file(archive_path)?;
1201 Ok(())
1202}
1203
1204pub fn run_external_command(
1205 binary_path: PathBuf,
1206 working_directory_path: PathBuf,
1207 args: Vec<String>,
1208 suppress_stdout: bool,
1209 suppress_stderr: bool,
1210) -> Result<Vec<String>> {
1211 let mut command = Command::new(binary_path.clone());
1212 for arg in &args {
1213 command.arg(arg);
1214 }
1215 command.stdout(Stdio::piped());
1216 command.stderr(Stdio::piped());
1217 command.current_dir(working_directory_path.clone());
1218 debug!("Running {binary_path:#?} with args {args:#?}");
1219 debug!("Working directory set to {working_directory_path:#?}");
1220
1221 let mut child = command.spawn()?;
1222 let mut output_lines = Vec::new();
1223
1224 if let Some(ref mut stdout) = child.stdout {
1225 let reader = BufReader::new(stdout);
1226 for line in reader.lines() {
1227 let line = line?;
1228 if !suppress_stdout {
1229 println!("{line}");
1230 }
1231 output_lines.push(line);
1232 }
1233 }
1234
1235 if let Some(ref mut stderr) = child.stderr {
1236 let reader = BufReader::new(stderr);
1237 for line in reader.lines() {
1238 let line = line?;
1239 if !suppress_stderr {
1240 eprintln!("{line}");
1241 }
1242 output_lines.push(line);
1243 }
1244 }
1245
1246 let output = child.wait()?;
1247 if !output.success() {
1248 let binary_path = binary_path.to_str().unwrap();
1250 return Err(Error::ExternalCommandRunFailed {
1251 binary: binary_path.to_string(),
1252 exit_status: output,
1253 });
1254 }
1255
1256 Ok(output_lines)
1257}
1258
/// Report whether `binary_name` exists in any directory listed in the `PATH`
/// environment variable.
///
/// Uses `std::env::split_paths`, which honours the platform's path-list separator
/// (`:` on Unix, `;` on Windows) instead of hard-coding `:`; `var_os` also tolerates
/// a non-UTF-8 `PATH` value. Returns `false` when `PATH` is unset.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    std::env::var_os("PATH")
        .map(|paths| std::env::split_paths(&paths).any(|dir| dir.join(binary_name).exists()))
        .unwrap_or(false)
}
1271
1272pub fn get_wallet_directory() -> Result<PathBuf> {
1273 Ok(dirs_next::data_dir()
1274 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1275 .join("safe")
1276 .join("client")
1277 .join("wallet"))
1278}
1279
1280pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1281 let webhook_url =
1282 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1283
1284 let mut message = String::new();
1285 message.push_str("*Testnet Details*\n");
1286 message.push_str(&format!("Name: {}\n", inventory.name));
1287 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1288 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1289 match inventory.binary_option {
1290 BinaryOption::BuildFromSource {
1291 ref repo_owner,
1292 ref branch,
1293 ..
1294 } => {
1295 message.push_str("*Branch Details*\n");
1296 message.push_str(&format!("Repo owner: {repo_owner}\n"));
1297 message.push_str(&format!("Branch: {branch}\n"));
1298 }
1299 BinaryOption::Versioned {
1300 ant_version: ref safe_version,
1301 antnode_version: ref safenode_version,
1302 antctl_version: ref safenode_manager_version,
1303 ..
1304 } => {
1305 message.push_str("*Version Details*\n");
1306 message.push_str(&format!(
1307 "ant version: {}\n",
1308 safe_version
1309 .as_ref()
1310 .map_or("None".to_string(), |v| v.to_string())
1311 ));
1312 message.push_str(&format!(
1313 "safenode version: {}\n",
1314 safenode_version
1315 .as_ref()
1316 .map_or("None".to_string(), |v| v.to_string())
1317 ));
1318 message.push_str(&format!(
1319 "antctl version: {}\n",
1320 safenode_manager_version
1321 .as_ref()
1322 .map_or("None".to_string(), |v| v.to_string())
1323 ));
1324 }
1325 }
1326
1327 message.push_str("*Sample Peers*\n");
1328 message.push_str("```\n");
1329 for peer in inventory.peers().iter().take(20) {
1330 message.push_str(&format!("{peer}\n"));
1331 }
1332 message.push_str("```\n");
1333 message.push_str("*Available Files*\n");
1334 message.push_str("```\n");
1335 for (addr, file_name) in inventory.uploaded_files.iter() {
1336 message.push_str(&format!("{addr}: {file_name}\n"))
1337 }
1338 message.push_str("```\n");
1339
1340 let payload = json!({
1341 "text": message,
1342 });
1343 reqwest::Client::new()
1344 .post(webhook_url)
1345 .json(&payload)
1346 .send()
1347 .await?;
1348 println!("{message}");
1349 println!("Posted notification to Slack");
1350 Ok(())
1351}
1352
1353fn print_duration(duration: Duration) {
1354 let total_seconds = duration.as_secs();
1355 let minutes = total_seconds / 60;
1356 let seconds = total_seconds % 60;
1357 debug!("Time taken: {minutes} minutes and {seconds} seconds");
1358}
1359
1360pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1361 let progress_bar = ProgressBar::new(length);
1362 progress_bar.set_style(
1363 ProgressStyle::default_bar()
1364 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1365 .progress_chars("#>-"),
1366 );
1367 progress_bar.enable_steady_tick(Duration::from_millis(100));
1368 Ok(progress_bar)
1369}
1370
/// Download and parse the `EnvironmentDetails` record stored for `environment_name`
/// in the `sn-environment-type` S3 bucket.
///
/// The download, file read, and JSON parse share a single retry budget: any failure
/// consumes one retry and restarts the whole download/read/parse sequence, up to
/// `max_retries` (3) additional attempts. When the budget is exhausted, the failure
/// is reported as `Error::EnvironmentDetailsNotFound`.
pub async fn get_environment_details(
    environment_name: &str,
    s3_repository: &S3Repository,
) -> Result<EnvironmentDetails> {
    // Download target; the temp file is removed automatically when dropped.
    let temp_file = tempfile::NamedTempFile::new()?;

    let max_retries = 3;
    let mut retries = 0;
    let env_details = loop {
        debug!("Downloading the environment details file for {environment_name} from S3");
        match s3_repository
            .download_object("sn-environment-type", environment_name, temp_file.path())
            .await
        {
            Ok(_) => {
                debug!("Downloaded the environment details file for {environment_name} from S3");
                let content = match std::fs::read_to_string(temp_file.path()) {
                    Ok(content) => content,
                    Err(err) => {
                        log::error!("Could not read the environment details file: {err:?}");
                        // A read failure restarts the loop from the download step.
                        if retries < max_retries {
                            debug!("Retrying to read the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                };
                trace!("Content of the environment details file: {content}");

                match serde_json::from_str(&content) {
                    // A successful parse is the only way out of the retry loop with a value.
                    Ok(environment_details) => break environment_details,
                    Err(err) => {
                        log::error!("Could not parse the environment details file: {err:?}");
                        if retries < max_retries {
                            debug!("Retrying to parse the environment details file");
                            retries += 1;
                            continue;
                        } else {
                            return Err(Error::EnvironmentDetailsNotFound(
                                environment_name.to_string(),
                            ));
                        }
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Could not download the environment details file for {environment_name} from S3: {err:?}"
                );
                // Download failures retry immediately — there is no backoff delay here.
                if retries < max_retries {
                    retries += 1;
                    continue;
                } else {
                    return Err(Error::EnvironmentDetailsNotFound(
                        environment_name.to_string(),
                    ));
                }
            }
        }
    };

    debug!("Fetched environment details: {env_details:?}");

    Ok(env_details)
}
1440
1441pub async fn write_environment_details(
1442 s3_repository: &S3Repository,
1443 environment_name: &str,
1444 environment_details: &EnvironmentDetails,
1445) -> Result<()> {
1446 let temp_dir = tempfile::tempdir()?;
1447 let path = temp_dir.path().to_path_buf().join(environment_name);
1448 let mut file = File::create(&path)?;
1449 let json = serde_json::to_string(environment_details)?;
1450 file.write_all(json.as_bytes())?;
1451 s3_repository
1452 .upload_file("sn-environment-type", &path, true)
1453 .await?;
1454 Ok(())
1455}
1456
1457pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1458 if node_count == 0 {
1459 return 0;
1460 }
1461 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1462
1463 (total_volume_required as f64 / 7.0).ceil() as u16
1465}
1466
/// Build the bootstrap-cache URL served by the node at `ip_addr`.
///
/// IPv6 addresses are wrapped in brackets as required by URL syntax
/// (RFC 3986 section 3.2.2); the previous bare formatting produced an
/// invalid URL for IPv6 hosts.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    match ip_addr {
        IpAddr::V4(ip) => format!("http://{ip}/bootstrap_cache.json"),
        IpAddr::V6(ip) => format!("http://[{ip}]/bootstrap_cache.json"),
    }
}