1pub mod ansible;
8pub mod bootstrap;
9pub mod clients;
10pub mod deploy;
11pub mod digital_ocean;
12pub mod error;
13pub mod funding;
14pub mod infra;
15pub mod inventory;
16pub mod logs;
17pub mod reserved_ip;
18pub mod rpc_client;
19pub mod s3;
20pub mod safe;
21pub mod setup;
22pub mod ssh;
23pub mod terraform;
24pub mod upscale;
25
/// Per-node storage requirement used when sizing infrastructure.
// NOTE(review): neither the unit (presumably GB) nor the consumers of this constant are
// visible in this part of the file — confirm before changing the value.
const STORAGE_REQUIRED_PER_NODE: u16 = 7;
27
28use crate::{
29 ansible::{
30 extra_vars::ExtraVarsDocBuilder,
31 inventory::{cleanup_environment_inventory, AnsibleInventoryType},
32 provisioning::AnsibleProvisioner,
33 AnsibleRunner,
34 },
35 error::{Error, Result},
36 inventory::{DeploymentInventory, VirtualMachine},
37 rpc_client::RpcClient,
38 s3::S3Repository,
39 ssh::SshClient,
40 terraform::TerraformRunner,
41};
42use ant_service_management::ServiceStatus;
43use flate2::read::GzDecoder;
44use indicatif::{ProgressBar, ProgressStyle};
45use infra::{build_terraform_args, InfraRunOptions};
46use log::{debug, trace};
47use semver::Version;
48use serde::{Deserialize, Serialize};
49use serde_json::json;
50use std::{
51 fs::File,
52 io::{BufRead, BufReader, BufWriter, Write},
53 net::IpAddr,
54 path::{Path, PathBuf},
55 process::{Command, Stdio},
56 str::FromStr,
57 time::Duration,
58};
59use tar::Archive;
60
/// Fork count handed to `AnsibleRunner::new` by `TestnetDeployBuilder::build` when the caller
/// did not set one through `TestnetDeployBuilder::ansible_forks`.
const ANSIBLE_DEFAULT_FORKS: usize = 50;
62
/// The kind of deployment an environment represents.
///
/// The `Display` and `FromStr` implementations use the identifiers `"bootstrap"`, `"clients"`
/// and `"new"`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum DeploymentType {
    /// Displayed and parsed as `"bootstrap"`.
    Bootstrap,
    /// Displayed and parsed as `"clients"` (note the plural in the string form).
    Client,
    /// The default variant; displayed and parsed as `"new"`.
    #[default]
    New,
}
73
/// Connection and contract details read from the EVM node's testnet data CSV.
///
/// `get_anvil_node_data` fills this from a four-field comma-separated line in the order
/// `rpc_url`, `payment_token_address`, `data_payments_address`, `deployer_wallet_private_key`.
///
/// The struct derives `Debug` and `Serialize` and carries a private key, so take care where
/// values of this type are logged or persisted.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnvilNodeData {
    /// Third CSV field.
    pub data_payments_address: String,
    /// Fourth CSV field.
    pub deployer_wallet_private_key: String,
    /// Second CSV field.
    pub payment_token_address: String,
    /// First CSV field.
    pub rpc_url: String,
}
81
82impl std::fmt::Display for DeploymentType {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 DeploymentType::Bootstrap => write!(f, "bootstrap"),
86 DeploymentType::Client => write!(f, "clients"),
87 DeploymentType::New => write!(f, "new"),
88 }
89 }
90}
91
92impl std::str::FromStr for DeploymentType {
93 type Err = String;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 match s.to_lowercase().as_str() {
97 "bootstrap" => Ok(DeploymentType::Bootstrap),
98 "clients" => Ok(DeploymentType::Client),
99 "new" => Ok(DeploymentType::New),
100 _ => Err(format!("Invalid deployment type: {s}")),
101 }
102 }
103}
104
/// The role a node plays in a deployment.
///
/// Each variant has a kebab-case identifier that `Display` emits and `FromStr` accepts
/// (case-insensitively).
#[derive(Debug, Clone)]
pub enum NodeType {
    /// Identifier: `full-cone-private`.
    FullConePrivateNode,
    /// Identifier: `generic`.
    Generic,
    /// Identifier: `genesis`.
    Genesis,
    /// Identifier: `peer-cache`.
    PeerCache,
    /// Identifier: `symmetric-private`.
    SymmetricPrivateNode,
    /// Identifier: `upnp`.
    Upnp,
}

impl std::fmt::Display for NodeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let identifier = match self {
            Self::FullConePrivateNode => "full-cone-private",
            Self::Generic => "generic",
            Self::Genesis => "genesis",
            Self::PeerCache => "peer-cache",
            Self::SymmetricPrivateNode => "symmetric-private",
            Self::Upnp => "upnp",
        };
        f.write_str(identifier)
    }
}

impl std::str::FromStr for NodeType {
    type Err = String;

    /// Parses a node type identifier; the comparison is done on the lowercased input, while
    /// the error message quotes the input exactly as supplied.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalised = s.to_lowercase();
        let node_type = match normalised.as_str() {
            "full-cone-private" => Self::FullConePrivateNode,
            "generic" => Self::Generic,
            "genesis" => Self::Genesis,
            "peer-cache" => Self::PeerCache,
            "symmetric-private" => Self::SymmetricPrivateNode,
            "upnp" => Self::Upnp,
            _ => return Err(format!("Invalid node type: {s}")),
        };
        Ok(node_type)
    }
}
143
144impl NodeType {
145 pub fn telegraf_role(&self) -> &'static str {
146 match self {
147 NodeType::FullConePrivateNode => "NAT_STATIC_FULL_CONE_NODE",
148 NodeType::Generic => "GENERIC_NODE",
149 NodeType::Genesis => "GENESIS_NODE",
150 NodeType::PeerCache => "PEER_CACHE_NODE",
151 NodeType::SymmetricPrivateNode => "NAT_RANDOMIZED_NODE",
152 NodeType::Upnp => "UPNP_NODE",
153 }
154 }
155
156 pub fn to_ansible_inventory_type(&self) -> AnsibleInventoryType {
157 match self {
158 NodeType::FullConePrivateNode => AnsibleInventoryType::FullConePrivateNodes,
159 NodeType::Generic => AnsibleInventoryType::Nodes,
160 NodeType::Genesis => AnsibleInventoryType::Genesis,
161 NodeType::PeerCache => AnsibleInventoryType::PeerCacheNodes,
162 NodeType::SymmetricPrivateNode => AnsibleInventoryType::SymmetricPrivateNodes,
163 NodeType::Upnp => AnsibleInventoryType::Upnp,
164 }
165 }
166}
167
/// The EVM network a deployment uses.
///
/// `FromStr` accepts `"anvil"`, `"arbitrum-one"`, `"arbitrum-sepolia-test"` and `"custom"`.
/// `Display` is not the inverse of `FromStr`: it emits `evm-`-prefixed names, and both
/// [`EvmNetwork::Anvil`] and [`EvmNetwork::Custom`] are displayed as `"evm-custom"`.
#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
pub enum EvmNetwork {
    /// The default variant; displayed as `"evm-custom"`.
    #[default]
    Anvil,
    /// Displayed as `"evm-arbitrum-one"`.
    ArbitrumOne,
    /// Displayed as `"evm-arbitrum-sepolia-test"`.
    ArbitrumSepoliaTest,
    /// Displayed as `"evm-custom"`.
    Custom,
}
176
177impl std::fmt::Display for EvmNetwork {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 match self {
180 EvmNetwork::Anvil => write!(f, "evm-custom"),
181 EvmNetwork::ArbitrumOne => write!(f, "evm-arbitrum-one"),
182 EvmNetwork::ArbitrumSepoliaTest => write!(f, "evm-arbitrum-sepolia-test"),
183 EvmNetwork::Custom => write!(f, "evm-custom"),
184 }
185 }
186}
187
188impl std::str::FromStr for EvmNetwork {
189 type Err = String;
190
191 fn from_str(s: &str) -> Result<Self, Self::Err> {
192 match s.to_lowercase().as_str() {
193 "anvil" => Ok(EvmNetwork::Anvil),
194 "arbitrum-one" => Ok(EvmNetwork::ArbitrumOne),
195 "arbitrum-sepolia-test" => Ok(EvmNetwork::ArbitrumSepoliaTest),
196 "custom" => Ok(EvmNetwork::Custom),
197 _ => Err(format!("Invalid EVM network type: {s}")),
198 }
199 }
200}
201
/// EVM settings recorded for an environment: the network plus optional contract addresses
/// and RPC endpoint.
// NOTE(review): the optional fields are presumably only populated for custom/Anvil
// networks — confirm against the code that constructs this struct.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EvmDetails {
    /// Which EVM network the environment uses.
    pub network: EvmNetwork,
    /// Address of the data payments contract, when one is recorded.
    pub data_payments_address: Option<String>,
    /// Address of the payment token contract, when one is recorded.
    pub payment_token_address: Option<String>,
    /// RPC endpoint URL, when one is recorded.
    pub rpc_url: Option<String>,
}
209
/// Serializable description of a deployed environment.
///
/// `TestnetDeployer::clean` loads this for the environment being removed and passes it on
/// to fund draining and infrastructure teardown; `destroy_infra` reads `environment_type`
/// to select the tfvars files.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct EnvironmentDetails {
    /// The kind of deployment (bootstrap, clients or new).
    pub deployment_type: DeploymentType,
    /// Development, staging or production.
    pub environment_type: EnvironmentType,
    /// EVM network settings for the environment.
    pub evm_details: EvmDetails,
    /// Address of the funding wallet, when one is recorded.
    pub funding_wallet_address: Option<String>,
    /// Network identifier, when one is recorded.
    pub network_id: Option<u8>,
    /// Region the environment was deployed to.
    pub region: String,
    /// Rewards address, when one is recorded.
    pub rewards_address: Option<String>,
}
220
/// The tier of an environment.
///
/// Determines which tfvars files are used (see `get_tfvars_filenames`) and the default node
/// counts. `Display`/`FromStr` use `"development"`, `"production"` and `"staging"`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum EnvironmentType {
    /// The default variant; uses `dev.tfvars`.
    #[default]
    Development,
    /// Uses a tfvars file named after the environment.
    Production,
    /// Uses `staging.tfvars`.
    Staging,
}
228
229impl EnvironmentType {
230 pub fn get_tfvars_filenames(&self, name: &str, region: &str) -> Vec<String> {
231 match self {
232 EnvironmentType::Development => vec![
233 "dev.tfvars".to_string(),
234 format!("dev-images-{region}.tfvars", region = region),
235 ],
236 EnvironmentType::Staging => vec![
237 "staging.tfvars".to_string(),
238 format!("staging-images-{region}.tfvars", region = region),
239 ],
240 EnvironmentType::Production => {
241 vec![
242 format!("{name}.tfvars", name = name),
243 format!("production-images-{region}.tfvars", region = region),
244 ]
245 }
246 }
247 }
248
249 pub fn get_default_peer_cache_node_count(&self) -> u16 {
250 match self {
251 EnvironmentType::Development => 5,
252 EnvironmentType::Production => 5,
253 EnvironmentType::Staging => 5,
254 }
255 }
256
257 pub fn get_default_node_count(&self) -> u16 {
258 match self {
259 EnvironmentType::Development => 25,
260 EnvironmentType::Production => 25,
261 EnvironmentType::Staging => 25,
262 }
263 }
264
265 pub fn get_default_symmetric_private_node_count(&self) -> u16 {
266 self.get_default_node_count()
267 }
268
269 pub fn get_default_full_cone_private_node_count(&self) -> u16 {
270 self.get_default_node_count()
271 }
272 pub fn get_default_upnp_private_node_count(&self) -> u16 {
273 self.get_default_node_count()
274 }
275}
276
277impl std::fmt::Display for EnvironmentType {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 match self {
280 EnvironmentType::Development => write!(f, "development"),
281 EnvironmentType::Production => write!(f, "production"),
282 EnvironmentType::Staging => write!(f, "staging"),
283 }
284 }
285}
286
287impl FromStr for EnvironmentType {
288 type Err = Error;
289
290 fn from_str(s: &str) -> Result<Self, Self::Err> {
291 match s.to_lowercase().as_str() {
292 "development" => Ok(EnvironmentType::Development),
293 "production" => Ok(EnvironmentType::Production),
294 "staging" => Ok(EnvironmentType::Staging),
295 _ => Err(Error::EnvironmentNameFromStringError(s.to_string())),
296 }
297 }
298}
299
/// How the binaries for a deployment are obtained: built from a source branch or taken from
/// versioned releases.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOption {
    /// Binaries come from a branch of a repository.
    BuildFromSource {
        /// Optional feature list for the antnode build; printed by `print` when present.
        antnode_features: Option<String>,
        /// Branch to build from.
        branch: String,
        /// Owner of the repository to build from.
        repo_owner: String,
        /// When `true`, `should_provision_build_machine` reports that no build machine is
        /// needed.
        skip_binary_build: bool,
    },
    /// Binaries come from released versions; a build machine is never provisioned for this
    /// variant.
    Versioned {
        /// Version of the `ant` binary, if pinned.
        ant_version: Option<Version>,
        /// Version of the `antctl` binary, if pinned.
        antctl_version: Option<Version>,
        /// Version of the `antnode` binary, if pinned.
        antnode_version: Option<Version>,
    },
}
332
333impl BinaryOption {
334 pub fn should_provision_build_machine(&self) -> bool {
335 match self {
336 BinaryOption::BuildFromSource {
337 skip_binary_build, ..
338 } => !skip_binary_build,
339 BinaryOption::Versioned { .. } => false,
340 }
341 }
342
343 pub fn print(&self) {
344 match self {
345 BinaryOption::BuildFromSource {
346 antnode_features,
347 branch,
348 repo_owner,
349 skip_binary_build: _,
350 } => {
351 println!("Source configuration:");
352 println!(" Repository owner: {repo_owner}");
353 println!(" Branch: {branch}");
354 if let Some(features) = antnode_features {
355 println!(" Antnode features: {features}");
356 }
357 }
358 BinaryOption::Versioned {
359 ant_version,
360 antctl_version,
361 antnode_version,
362 } => {
363 println!("Versioned binaries configuration:");
364 if let Some(version) = ant_version {
365 println!(" ant version: {version}");
366 }
367 if let Some(version) = antctl_version {
368 println!(" antctl version: {version}");
369 }
370 if let Some(version) = antnode_version {
371 println!(" antnode version: {version}");
372 }
373 }
374 }
375 }
376}
377
/// The cloud provider that hosts a deployment.
#[derive(Debug, Clone, Copy)]
pub enum CloudProvider {
    /// Displayed as `aws`; SSH user `ubuntu`.
    Aws,
    /// Displayed as `digital-ocean`; SSH user `root`.
    DigitalOcean,
}

impl std::fmt::Display for CloudProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let provider_name = match self {
            Self::Aws => "aws",
            Self::DigitalOcean => "digital-ocean",
        };
        f.write_str(provider_name)
    }
}

impl CloudProvider {
    /// Returns the login user used for SSH connections to machines on this provider.
    pub fn get_ssh_user(&self) -> String {
        let user = match self {
            Self::Aws => "ubuntu",
            Self::DigitalOcean => "root",
        };
        String::from(user)
    }
}
401
/// Log output format selector; the string forms are `"default"` and `"json"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LogFormat {
    /// String form `"default"`.
    Default,
    /// String form `"json"`.
    Json,
}
407
408impl LogFormat {
409 pub fn parse_from_str(val: &str) -> Result<Self> {
410 match val {
411 "default" => Ok(LogFormat::Default),
412 "json" => Ok(LogFormat::Json),
413 _ => Err(Error::LoggingConfiguration(
414 "The only valid values for this argument are \"default\" or \"json\"".to_string(),
415 )),
416 }
417 }
418
419 pub fn as_str(&self) -> &'static str {
420 match self {
421 LogFormat::Default => "default",
422 LogFormat::Json => "json",
423 }
424 }
425}
426
/// Options for a node upgrade run.
///
/// `get_ansible_vars` turns a subset of these fields into the extra-vars document; the
/// remaining fields are not read by that method.
#[derive(Clone)]
pub struct UpgradeOptions {
    /// Not read by `get_ansible_vars`.
    pub ansible_verbose: bool,
    /// Source branch; only used when `repo_owner` is also set.
    pub branch: Option<String>,
    /// Not read by `get_ansible_vars`.
    pub custom_inventory: Option<Vec<VirtualMachine>>,
    /// Key/value pairs emitted as the `env_variables` list when present.
    pub env_variables: Option<Vec<(String, String)>>,
    /// Emitted as the `force` variable only when `true`.
    pub force: bool,
    /// Not read by `get_ansible_vars`.
    pub forks: usize,
    /// Emitted as the `interval` variable, in milliseconds.
    pub interval: Duration,
    /// Environment name passed to `add_node_url_or_version` for source builds.
    pub name: String,
    /// Not read by `get_ansible_vars`.
    pub node_type: Option<NodeType>,
    /// Emitted as the `pre_upgrade_delay` variable when present.
    // NOTE(review): the unit is not visible here — confirm against the playbook.
    pub pre_upgrade_delay: Option<u64>,
    /// Not read by `get_ansible_vars`.
    pub provider: CloudProvider,
    /// Repository owner; only used when `branch` is also set.
    pub repo_owner: Option<String>,
    /// Emitted as the `antnode_version` variable when present.
    pub version: Option<String>,
}
443
444impl UpgradeOptions {
445 pub fn get_ansible_vars(&self) -> String {
446 let mut extra_vars = ExtraVarsDocBuilder::default();
447 extra_vars.add_variable("interval", &self.interval.as_millis().to_string());
448 if let Some(env_variables) = &self.env_variables {
449 extra_vars.add_env_variable_list("env_variables", env_variables.clone());
450 }
451 if self.force {
452 extra_vars.add_variable("force", &self.force.to_string());
453 }
454 if let Some(version) = &self.version {
455 extra_vars.add_variable("antnode_version", version);
456 }
457 if let Some(pre_upgrade_delay) = &self.pre_upgrade_delay {
458 extra_vars.add_variable("pre_upgrade_delay", &pre_upgrade_delay.to_string());
459 }
460
461 if let (Some(repo_owner), Some(branch)) = (&self.repo_owner, &self.branch) {
462 let binary_option = BinaryOption::BuildFromSource {
463 antnode_features: None,
464 branch: branch.clone(),
465 repo_owner: repo_owner.clone(),
466 skip_binary_build: true,
467 };
468 extra_vars.add_node_url_or_version(&self.name, &binary_option);
469 }
470
471 extra_vars.build()
472 }
473}
474
/// Builder for [`TestnetDeployer`].
///
/// Unset optional fields are resolved by `build` from defaults or environment variables, as
/// noted on each field.
#[derive(Default)]
pub struct TestnetDeployBuilder {
    /// Ansible fork count; `build` falls back to `ANSIBLE_DEFAULT_FORKS`.
    ansible_forks: Option<usize>,
    /// Passed to `AnsibleRunner::new` as the verbose flag.
    ansible_verbose_mode: bool,
    /// Environment tier; despite the field name this is an `EnvironmentType`.
    deployment_type: EnvironmentType,
    /// Environment name; `TestnetDeployer::new` rejects an empty value.
    environment_name: String,
    /// Cloud provider; `build` falls back to `CloudProvider::DigitalOcean`.
    provider: Option<CloudProvider>,
    /// Region; `build` falls back to `"lon1"`.
    region: Option<String>,
    /// SSH secret key path; `build` falls back to the `SSH_KEY_PATH` environment variable.
    ssh_secret_key_path: Option<PathBuf>,
    /// Terraform state bucket; `build` falls back to the `TERRAFORM_STATE_BUCKET_NAME`
    /// environment variable.
    state_bucket_name: Option<String>,
    /// Terraform binary; `build` falls back to `terraform`.
    terraform_binary_path: Option<PathBuf>,
    /// Ansible vault password file; `build` falls back to the `ANSIBLE_VAULT_PASSWORD_PATH`
    /// environment variable.
    vault_password_path: Option<PathBuf>,
    /// Working directory; `build` falls back to `<current dir>/resources`.
    working_directory_path: Option<PathBuf>,
}
489
490impl TestnetDeployBuilder {
491 pub fn new() -> Self {
492 Default::default()
493 }
494
495 pub fn ansible_verbose_mode(&mut self, ansible_verbose_mode: bool) -> &mut Self {
496 self.ansible_verbose_mode = ansible_verbose_mode;
497 self
498 }
499
500 pub fn ansible_forks(&mut self, ansible_forks: usize) -> &mut Self {
501 self.ansible_forks = Some(ansible_forks);
502 self
503 }
504
505 pub fn deployment_type(&mut self, deployment_type: EnvironmentType) -> &mut Self {
506 self.deployment_type = deployment_type;
507 self
508 }
509
510 pub fn environment_name(&mut self, name: &str) -> &mut Self {
511 self.environment_name = name.to_string();
512 self
513 }
514
515 pub fn provider(&mut self, provider: CloudProvider) -> &mut Self {
516 self.provider = Some(provider);
517 self
518 }
519
520 pub fn state_bucket_name(&mut self, state_bucket_name: String) -> &mut Self {
521 self.state_bucket_name = Some(state_bucket_name);
522 self
523 }
524
525 pub fn terraform_binary_path(&mut self, terraform_binary_path: PathBuf) -> &mut Self {
526 self.terraform_binary_path = Some(terraform_binary_path);
527 self
528 }
529
530 pub fn working_directory(&mut self, working_directory_path: PathBuf) -> &mut Self {
531 self.working_directory_path = Some(working_directory_path);
532 self
533 }
534
535 pub fn ssh_secret_key_path(&mut self, ssh_secret_key_path: PathBuf) -> &mut Self {
536 self.ssh_secret_key_path = Some(ssh_secret_key_path);
537 self
538 }
539
540 pub fn vault_password_path(&mut self, vault_password_path: PathBuf) -> &mut Self {
541 self.vault_password_path = Some(vault_password_path);
542 self
543 }
544
545 pub fn region(&mut self, region: String) -> &mut Self {
546 self.region = Some(region);
547 self
548 }
549
550 pub fn build(&self) -> Result<TestnetDeployer> {
551 let provider = self.provider.unwrap_or(CloudProvider::DigitalOcean);
552 match provider {
553 CloudProvider::DigitalOcean => {
554 let digital_ocean_pat = std::env::var("DO_PAT").map_err(|_| {
555 Error::CloudProviderCredentialsNotSupplied("DO_PAT".to_string())
556 })?;
557 std::env::set_var("DIGITALOCEAN_TOKEN", digital_ocean_pat.clone());
561 std::env::set_var("DO_API_TOKEN", digital_ocean_pat);
562 }
563 _ => {
564 return Err(Error::CloudProviderNotSupported(provider.to_string()));
565 }
566 }
567
568 let state_bucket_name = match self.state_bucket_name {
569 Some(ref bucket_name) => bucket_name.clone(),
570 None => std::env::var("TERRAFORM_STATE_BUCKET_NAME")?,
571 };
572
573 let default_terraform_bin_path = PathBuf::from("terraform");
574 let terraform_binary_path = self
575 .terraform_binary_path
576 .as_ref()
577 .unwrap_or(&default_terraform_bin_path);
578
579 let working_directory_path = match self.working_directory_path {
580 Some(ref work_dir_path) => work_dir_path.clone(),
581 None => std::env::current_dir()?.join("resources"),
582 };
583
584 let ssh_secret_key_path = match self.ssh_secret_key_path {
585 Some(ref ssh_sk_path) => ssh_sk_path.clone(),
586 None => PathBuf::from(std::env::var("SSH_KEY_PATH")?),
587 };
588
589 let vault_password_path = match self.vault_password_path {
590 Some(ref vault_pw_path) => vault_pw_path.clone(),
591 None => PathBuf::from(std::env::var("ANSIBLE_VAULT_PASSWORD_PATH")?),
592 };
593
594 let region = match self.region {
595 Some(ref region) => region.clone(),
596 None => "lon1".to_string(),
597 };
598
599 let terraform_runner = TerraformRunner::new(
600 terraform_binary_path.to_path_buf(),
601 working_directory_path
602 .join("terraform")
603 .join("testnet")
604 .join(provider.to_string()),
605 provider,
606 &state_bucket_name,
607 )?;
608 let ansible_runner = AnsibleRunner::new(
609 self.ansible_forks.unwrap_or(ANSIBLE_DEFAULT_FORKS),
610 self.ansible_verbose_mode,
611 &self.environment_name,
612 provider,
613 ssh_secret_key_path.clone(),
614 vault_password_path,
615 working_directory_path.join("ansible"),
616 )?;
617 let ssh_client = SshClient::new(ssh_secret_key_path);
618 let ansible_provisioner =
619 AnsibleProvisioner::new(ansible_runner, provider, ssh_client.clone());
620 let rpc_client = RpcClient::new(
621 PathBuf::from("/usr/local/bin/safenode_rpc_client"),
622 working_directory_path.clone(),
623 );
624
625 let safe_path = working_directory_path.join("safe");
628 if safe_path.exists() {
629 std::fs::remove_file(safe_path)?;
630 }
631
632 let testnet = TestnetDeployer::new(
633 ansible_provisioner,
634 provider,
635 self.deployment_type.clone(),
636 &self.environment_name,
637 rpc_client,
638 S3Repository {},
639 ssh_client,
640 terraform_runner,
641 working_directory_path,
642 region,
643 )?;
644
645 Ok(testnet)
646 }
647}
648
/// Entry point for operating on a testnet environment: it bundles the Terraform, Ansible,
/// SSH, RPC and S3 helpers together with the environment's identity.
#[derive(Clone)]
pub struct TestnetDeployer {
    /// Runs the Ansible-backed operations (start/stop/upgrade/status and so on).
    pub ansible_provisioner: AnsibleProvisioner,
    /// Provider hosting the environment.
    pub cloud_provider: CloudProvider,
    /// Environment tier; despite the field name this is an `EnvironmentType`.
    pub deployment_type: EnvironmentType,
    /// Name of the environment; also used as the Terraform workspace name.
    pub environment_name: String,
    /// Set by `new` to `<working dir>/ansible/inventory/dev_inventory_digital_ocean.yml`.
    pub inventory_file_path: PathBuf,
    /// Region of the environment.
    pub region: String,
    /// Client for the node RPC binary.
    pub rpc_client: RpcClient,
    /// Access to S3 buckets (logs, archives, environment metadata).
    pub s3_repository: S3Repository,
    /// SSH client for running commands on the machines.
    pub ssh_client: SshClient,
    /// Runs Terraform for the environment's infrastructure.
    pub terraform_runner: TerraformRunner,
    /// Directory holding the `ansible` resources and downloaded binaries.
    pub working_directory_path: PathBuf,
}
663
664impl TestnetDeployer {
665 #[allow(clippy::too_many_arguments)]
666 pub fn new(
667 ansible_provisioner: AnsibleProvisioner,
668 cloud_provider: CloudProvider,
669 deployment_type: EnvironmentType,
670 environment_name: &str,
671 rpc_client: RpcClient,
672 s3_repository: S3Repository,
673 ssh_client: SshClient,
674 terraform_runner: TerraformRunner,
675 working_directory_path: PathBuf,
676 region: String,
677 ) -> Result<TestnetDeployer> {
678 if environment_name.is_empty() {
679 return Err(Error::EnvironmentNameRequired);
680 }
681 let inventory_file_path = working_directory_path
682 .join("ansible")
683 .join("inventory")
684 .join("dev_inventory_digital_ocean.yml");
685 Ok(TestnetDeployer {
686 ansible_provisioner,
687 cloud_provider,
688 deployment_type,
689 environment_name: environment_name.to_string(),
690 inventory_file_path,
691 region,
692 rpc_client,
693 ssh_client,
694 s3_repository,
695 terraform_runner,
696 working_directory_path,
697 })
698 }
699
700 pub async fn init(&self) -> Result<()> {
701 if self
702 .s3_repository
703 .folder_exists(
704 "sn-testnet",
705 &format!("testnet-logs/{}", self.environment_name),
706 )
707 .await?
708 {
709 return Err(Error::LogsForPreviousTestnetExist(
710 self.environment_name.clone(),
711 ));
712 }
713
714 self.terraform_runner.init()?;
715 let workspaces = self.terraform_runner.workspace_list()?;
716 if !workspaces.contains(&self.environment_name) {
717 self.terraform_runner
718 .workspace_new(&self.environment_name)?;
719 } else {
720 println!("Workspace {} already exists", self.environment_name);
721 }
722
723 let rpc_client_path = self.working_directory_path.join("safenode_rpc_client");
724 if !rpc_client_path.is_file() {
725 println!("Downloading the rpc client for safenode...");
726 let archive_name = "safenode_rpc_client-latest-x86_64-unknown-linux-musl.tar.gz";
727 get_and_extract_archive_from_s3(
728 &self.s3_repository,
729 "sn-node-rpc-client",
730 archive_name,
731 &self.working_directory_path,
732 )
733 .await?;
734 #[cfg(unix)]
735 {
736 use std::os::unix::fs::PermissionsExt;
737 let mut permissions = std::fs::metadata(&rpc_client_path)?.permissions();
738 permissions.set_mode(0o755); std::fs::set_permissions(&rpc_client_path, permissions)?;
740 }
741 }
742
743 Ok(())
744 }
745
    /// Produces a Terraform plan for the environment described by `options`.
    ///
    /// Selects the workspace named `options.name`, builds the argument list with
    /// `build_terraform_args`, and calls the Terraform runner's `plan` with those arguments
    /// and `options.tfvars_filenames`.
    ///
    /// # Errors
    ///
    /// Propagates any error from workspace selection, argument building or the plan run.
    pub fn plan(&self, options: &InfraRunOptions) -> Result<()> {
        println!("Selecting {} workspace...", options.name);
        self.terraform_runner.workspace_select(&options.name)?;

        let args = build_terraform_args(options)?;

        self.terraform_runner
            .plan(Some(args), options.tfvars_filenames.clone())?;
        Ok(())
    }
756
    /// Starts nodes in this environment by delegating to the provisioner's `start_nodes`.
    ///
    /// `interval`, `node_type` and `custom_inventory` are forwarded unchanged together with
    /// the environment name.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn start(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
771
772 pub fn status(&self) -> Result<()> {
778 self.ansible_provisioner.status()?;
779
780 let peer_cache_node_registries = self
781 .ansible_provisioner
782 .get_node_registries(&AnsibleInventoryType::PeerCacheNodes)?;
783 let generic_node_registries = self
784 .ansible_provisioner
785 .get_node_registries(&AnsibleInventoryType::Nodes)?;
786 let symmetric_private_node_registries = self
787 .ansible_provisioner
788 .get_node_registries(&AnsibleInventoryType::SymmetricPrivateNodes)?;
789 let full_cone_private_node_registries = self
790 .ansible_provisioner
791 .get_node_registries(&AnsibleInventoryType::FullConePrivateNodes)?;
792 let genesis_node_registry = self
793 .ansible_provisioner
794 .get_node_registries(&AnsibleInventoryType::Genesis)?
795 .clone();
796
797 peer_cache_node_registries.print();
798 generic_node_registries.print();
799 symmetric_private_node_registries.print();
800 full_cone_private_node_registries.print();
801 genesis_node_registry.print();
802
803 let all_registries = [
804 &peer_cache_node_registries,
805 &generic_node_registries,
806 &symmetric_private_node_registries,
807 &full_cone_private_node_registries,
808 &genesis_node_registry,
809 ];
810
811 let mut total_nodes = 0;
812 let mut running_nodes = 0;
813 let mut stopped_nodes = 0;
814 let mut added_nodes = 0;
815 let mut removed_nodes = 0;
816
817 for (_, registry) in all_registries
818 .iter()
819 .flat_map(|r| r.retrieved_registries.iter())
820 {
821 for node in registry.nodes.iter() {
822 total_nodes += 1;
823 match node.status {
824 ServiceStatus::Running => running_nodes += 1,
825 ServiceStatus::Stopped => stopped_nodes += 1,
826 ServiceStatus::Added => added_nodes += 1,
827 ServiceStatus::Removed => removed_nodes += 1,
828 }
829 }
830 }
831
832 let peer_cache_hosts = peer_cache_node_registries.retrieved_registries.len();
833 let generic_hosts = generic_node_registries.retrieved_registries.len();
834 let symmetric_private_hosts = symmetric_private_node_registries.retrieved_registries.len();
835 let full_cone_private_hosts = full_cone_private_node_registries.retrieved_registries.len();
836
837 let peer_cache_nodes = peer_cache_node_registries
838 .retrieved_registries
839 .iter()
840 .flat_map(|(_, n)| n.nodes.iter())
841 .count();
842 let generic_nodes = generic_node_registries
843 .retrieved_registries
844 .iter()
845 .flat_map(|(_, n)| n.nodes.iter())
846 .count();
847 let symmetric_private_nodes = symmetric_private_node_registries
848 .retrieved_registries
849 .iter()
850 .flat_map(|(_, n)| n.nodes.iter())
851 .count();
852 let full_cone_private_nodes = full_cone_private_node_registries
853 .retrieved_registries
854 .iter()
855 .flat_map(|(_, n)| n.nodes.iter())
856 .count();
857
858 println!("-------");
859 println!("Summary");
860 println!("-------");
861 println!(
862 "Total peer cache nodes ({}x{}): {}",
863 peer_cache_hosts,
864 if peer_cache_hosts > 0 {
865 peer_cache_nodes / peer_cache_hosts
866 } else {
867 0
868 },
869 peer_cache_nodes
870 );
871 println!(
872 "Total generic nodes ({}x{}): {}",
873 generic_hosts,
874 if generic_hosts > 0 {
875 generic_nodes / generic_hosts
876 } else {
877 0
878 },
879 generic_nodes
880 );
881 println!(
882 "Total symmetric private nodes ({}x{}): {}",
883 symmetric_private_hosts,
884 if symmetric_private_hosts > 0 {
885 symmetric_private_nodes / symmetric_private_hosts
886 } else {
887 0
888 },
889 symmetric_private_nodes
890 );
891 println!(
892 "Total full cone private nodes ({}x{}): {}",
893 full_cone_private_hosts,
894 if full_cone_private_hosts > 0 {
895 full_cone_private_nodes / full_cone_private_hosts
896 } else {
897 0
898 },
899 full_cone_private_nodes
900 );
901 println!("Total nodes: {total_nodes}");
902 println!("Running nodes: {running_nodes}");
903 println!("Stopped nodes: {stopped_nodes}");
904 println!("Added nodes: {added_nodes}");
905 println!("Removed nodes: {removed_nodes}");
906
907 Ok(())
908 }
909
    /// Delegates to the provisioner's `cleanup_node_logs`, forwarding `setup_cron` unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn cleanup_node_logs(&self, setup_cron: bool) -> Result<()> {
        self.ansible_provisioner.cleanup_node_logs(setup_cron)?;
        Ok(())
    }
914
    /// Starts Telegraf in this environment by delegating to the provisioner's
    /// `start_telegraf`; `node_type` and `custom_inventory` are forwarded unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn start_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.start_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
927
    /// Stops nodes in this environment by delegating to the provisioner's `stop_nodes`.
    ///
    /// All arguments are forwarded unchanged together with the environment name.
    // NOTE(review): the unit of `delay` is not visible in this file — confirm in the
    // provisioner.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn stop(
        &self,
        interval: Duration,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
        delay: Option<u64>,
        service_names: Option<Vec<String>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_nodes(
            &self.environment_name,
            interval,
            node_type,
            custom_inventory,
            delay,
            service_names,
        )?;
        Ok(())
    }
946
    /// Stops Telegraf in this environment by delegating to the provisioner's
    /// `stop_telegraf`; `node_type` and `custom_inventory` are forwarded unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn stop_telegraf(
        &self,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.stop_telegraf(
            &self.environment_name,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
959
    /// Upgrades nodes by delegating to the provisioner's `upgrade_nodes` with `options`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn upgrade(&self, options: UpgradeOptions) -> Result<()> {
        self.ansible_provisioner.upgrade_nodes(&options)?;
        Ok(())
    }
964
    /// Upgrades `antctl` to `version` by delegating to the provisioner's `upgrade_antctl`;
    /// `node_type` and `custom_inventory` are forwarded unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn upgrade_antctl(
        &self,
        version: Version,
        node_type: Option<NodeType>,
        custom_inventory: Option<Vec<VirtualMachine>>,
    ) -> Result<()> {
        self.ansible_provisioner.upgrade_antctl(
            &self.environment_name,
            &version,
            node_type,
            custom_inventory,
        )?;
        Ok(())
    }
979
    /// Delegates to the provisioner's `upgrade_geoip_telegraf` with `name`.
    ///
    /// Note that the caller-supplied `name` is forwarded, not this deployer's
    /// `environment_name`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn upgrade_geoip_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_geoip_telegraf(name)?;
        Ok(())
    }
984
    /// Delegates to the provisioner's `upgrade_node_telegraf` with `name`.
    ///
    /// Note that the caller-supplied `name` is forwarded, not this deployer's
    /// `environment_name`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn upgrade_node_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_node_telegraf(name)?;
        Ok(())
    }
989
    /// Delegates to the provisioner's `upgrade_client_telegraf` with `name`.
    ///
    /// Note that the caller-supplied `name` is forwarded, not this deployer's
    /// `environment_name`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the provisioner.
    pub fn upgrade_client_telegraf(&self, name: &str) -> Result<()> {
        self.ansible_provisioner.upgrade_client_telegraf(name)?;
        Ok(())
    }
994
995 pub async fn clean(&self) -> Result<()> {
996 let environment_details =
997 get_environment_details(&self.environment_name, &self.s3_repository)
998 .await
999 .inspect_err(|err| {
1000 println!("Failed to get environment details: {err}. Continuing cleanup...");
1001 })
1002 .ok();
1003 if let Some(environment_details) = &environment_details {
1004 funding::drain_funds(&self.ansible_provisioner, environment_details).await?;
1005 }
1006
1007 self.destroy_infra(environment_details).await?;
1008
1009 cleanup_environment_inventory(
1010 &self.environment_name,
1011 &self
1012 .working_directory_path
1013 .join("ansible")
1014 .join("inventory"),
1015 None,
1016 )?;
1017
1018 println!("Deleted Ansible inventory for {}", self.environment_name);
1019
1020 if let Err(err) = self
1021 .s3_repository
1022 .delete_object("sn-environment-type", &self.environment_name)
1023 .await
1024 {
1025 println!("Failed to delete environment type: {err}. Continuing cleanup...");
1026 }
1027 Ok(())
1028 }
1029
1030 async fn destroy_infra(&self, environment_details: Option<EnvironmentDetails>) -> Result<()> {
1031 infra::select_workspace(&self.terraform_runner, &self.environment_name)?;
1032
1033 let options = InfraRunOptions::generate_existing(
1034 &self.environment_name,
1035 &self.region,
1036 &self.terraform_runner,
1037 environment_details.as_ref(),
1038 )
1039 .await?;
1040
1041 let args = build_terraform_args(&options)?;
1042 let tfvars_filenames = if let Some(environment_details) = &environment_details {
1043 environment_details
1044 .environment_type
1045 .get_tfvars_filenames(&self.environment_name, &self.region)
1046 } else {
1047 vec![]
1048 };
1049
1050 self.terraform_runner
1051 .destroy(Some(args), Some(tfvars_filenames))?;
1052
1053 infra::delete_workspace(&self.terraform_runner, &self.environment_name)?;
1054
1055 Ok(())
1056 }
1057}
1058
1059pub fn get_genesis_multiaddr(
1064 ansible_runner: &AnsibleRunner,
1065 ssh_client: &SshClient,
1066) -> Result<(String, IpAddr)> {
1067 let genesis_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Genesis, true)?;
1068 let genesis_ip = genesis_inventory[0].public_ip_addr;
1069
1070 let multiaddr = ssh_client
1074 .run_command(
1075 &genesis_ip,
1076 "root",
1077 "jq -r '.nodes[] | select(.initial_peers_config.first == true) | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1078 false,
1079 )
1080 .map(|output| output.first().cloned())
1081 .unwrap_or_else(|err| {
1082 log::error!("Failed to find first node with quic-v1 protocol: {err:?}");
1083 None
1084 });
1085
1086 let multiaddr = match multiaddr {
1088 Some(addr) => addr,
1089 None => ssh_client
1090 .run_command(
1091 &genesis_ip,
1092 "root",
1093 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not) | select(contains(\"quic-v1\"))' /var/antctl/node_registry.json | head -n 1",
1094 false,
1095 )?
1096 .first()
1097 .cloned()
1098 .ok_or_else(|| Error::GenesisListenAddress)?,
1099 };
1100
1101 Ok((multiaddr, genesis_ip))
1102}
1103
1104pub fn get_anvil_node_data(
1105 ansible_runner: &AnsibleRunner,
1106 ssh_client: &SshClient,
1107) -> Result<AnvilNodeData> {
1108 let evm_inventory = ansible_runner.get_inventory(AnsibleInventoryType::EvmNodes, true)?;
1109 if evm_inventory.is_empty() {
1110 return Err(Error::EvmNodeNotFound);
1111 }
1112
1113 let evm_ip = evm_inventory[0].public_ip_addr;
1114 debug!("Retrieved IP address for EVM node: {evm_ip}");
1115 let csv_file_path = "/home/ant/.local/share/autonomi/evm_testnet_data.csv";
1116
1117 const MAX_ATTEMPTS: u8 = 5;
1118 const RETRY_DELAY: Duration = Duration::from_secs(5);
1119
1120 for attempt in 1..=MAX_ATTEMPTS {
1121 match ssh_client.run_command(&evm_ip, "ant", &format!("cat {csv_file_path}"), false) {
1122 Ok(output) => {
1123 if let Some(csv_contents) = output.first() {
1124 let parts: Vec<&str> = csv_contents.split(',').collect();
1125 if parts.len() != 4 {
1126 return Err(Error::EvmTestnetDataParsingError(
1127 "Expected 4 fields in the CSV".to_string(),
1128 ));
1129 }
1130
1131 let evm_testnet_data = AnvilNodeData {
1132 rpc_url: parts[0].trim().to_string(),
1133 payment_token_address: parts[1].trim().to_string(),
1134 data_payments_address: parts[2].trim().to_string(),
1135 deployer_wallet_private_key: parts[3].trim().to_string(),
1136 };
1137 return Ok(evm_testnet_data);
1138 }
1139 }
1140 Err(e) => {
1141 if attempt == MAX_ATTEMPTS {
1142 return Err(e);
1143 }
1144 println!(
1145 "Attempt {} failed to read EVM testnet data. Retrying in {} seconds...",
1146 attempt,
1147 RETRY_DELAY.as_secs()
1148 );
1149 }
1150 }
1151 std::thread::sleep(RETRY_DELAY);
1152 }
1153
1154 Err(Error::EvmTestnetDataNotFound)
1155}
1156
1157pub fn get_multiaddr(
1158 ansible_runner: &AnsibleRunner,
1159 ssh_client: &SshClient,
1160) -> Result<(String, IpAddr)> {
1161 let node_inventory = ansible_runner.get_inventory(AnsibleInventoryType::Nodes, true)?;
1162 let node_ip = node_inventory
1165 .iter()
1166 .find(|vm| vm.name.ends_with("-node-1"))
1167 .ok_or_else(|| Error::NodeAddressNotFound)?
1168 .public_ip_addr;
1169
1170 debug!("Getting multiaddr from node {node_ip}");
1171
1172 let multiaddr =
1173 ssh_client
1174 .run_command(
1175 &node_ip,
1176 "root",
1177 "jq -r '.nodes[] | .listen_addr[] | select(contains(\"127.0.0.1\") | not)' /var/antctl/node_registry.json | head -n 1",
1179 false,
1180 )?.first()
1181 .cloned()
1182 .ok_or_else(|| Error::NodeAddressNotFound)?;
1183
1184 Ok((multiaddr, node_ip))
1187}
1188
1189pub async fn get_and_extract_archive_from_s3(
1190 s3_repository: &S3Repository,
1191 bucket_name: &str,
1192 archive_bucket_path: &str,
1193 dest_path: &Path,
1194) -> Result<()> {
1195 let archive_file_name = archive_bucket_path.split('/').next_back().unwrap();
1198 let archive_dest_path = dest_path.join(archive_file_name);
1199 s3_repository
1200 .download_object(bucket_name, archive_bucket_path, &archive_dest_path)
1201 .await?;
1202 extract_archive(&archive_dest_path, dest_path)?;
1203 Ok(())
1204}
1205
1206pub fn extract_archive(archive_path: &Path, dest_path: &Path) -> Result<()> {
1207 let archive_file = File::open(archive_path)?;
1208 let decoder = GzDecoder::new(archive_file);
1209 let mut archive = Archive::new(decoder);
1210 let entries = archive.entries()?;
1211 for entry_result in entries {
1212 let mut entry = entry_result?;
1213 let extract_path = dest_path.join(entry.path()?);
1214 if entry.header().entry_type() == tar::EntryType::Directory {
1215 std::fs::create_dir_all(extract_path)?;
1216 continue;
1217 }
1218 let mut file = BufWriter::new(File::create(extract_path)?);
1219 std::io::copy(&mut entry, &mut file)?;
1220 }
1221 std::fs::remove_file(archive_path)?;
1222 Ok(())
1223}
1224
1225pub fn run_external_command(
1226 binary_path: PathBuf,
1227 working_directory_path: PathBuf,
1228 args: Vec<String>,
1229 suppress_stdout: bool,
1230 suppress_stderr: bool,
1231) -> Result<Vec<String>> {
1232 let mut command = Command::new(binary_path.clone());
1233 for arg in &args {
1234 command.arg(arg);
1235 }
1236 command.stdout(Stdio::piped());
1237 command.stderr(Stdio::piped());
1238 command.current_dir(working_directory_path.clone());
1239 debug!("Running {binary_path:#?} with args {args:#?}");
1240 debug!("Working directory set to {working_directory_path:#?}");
1241
1242 let mut child = command.spawn()?;
1243 let mut output_lines = Vec::new();
1244
1245 if let Some(ref mut stdout) = child.stdout {
1246 let reader = BufReader::new(stdout);
1247 for line in reader.lines() {
1248 let line = line?;
1249 if !suppress_stdout {
1250 println!("{line}");
1251 }
1252 output_lines.push(line);
1253 }
1254 }
1255
1256 if let Some(ref mut stderr) = child.stderr {
1257 let reader = BufReader::new(stderr);
1258 for line in reader.lines() {
1259 let line = line?;
1260 if !suppress_stderr {
1261 eprintln!("{line}");
1262 }
1263 output_lines.push(line);
1264 }
1265 }
1266
1267 let output = child.wait()?;
1268 if !output.success() {
1269 let binary_path = binary_path.to_str().unwrap();
1271 return Err(Error::ExternalCommandRunFailed {
1272 binary: binary_path.to_string(),
1273 exit_status: output,
1274 });
1275 }
1276
1277 Ok(output_lines)
1278}
1279
/// Returns `true` if an entry named `binary_name` exists in any directory listed in `PATH`.
///
/// `PATH` is split with [`std::env::split_paths`], so the platform's separator is used and a
/// `PATH` containing non-Unicode data is still searched. Returns `false` when `PATH` is not
/// set. Only existence is checked, not whether the entry is an executable file.
pub fn is_binary_on_path(binary_name: &str) -> bool {
    let Some(search_path) = std::env::var_os("PATH") else {
        return false;
    };
    std::env::split_paths(&search_path).any(|dir| dir.join(binary_name).exists())
}
1292
1293pub fn get_wallet_directory() -> Result<PathBuf> {
1294 Ok(dirs_next::data_dir()
1295 .ok_or_else(|| Error::CouldNotRetrieveDataDirectory)?
1296 .join("safe")
1297 .join("client")
1298 .join("wallet"))
1299}
1300
1301pub async fn notify_slack(inventory: DeploymentInventory) -> Result<()> {
1302 let webhook_url =
1303 std::env::var("SLACK_WEBHOOK_URL").map_err(|_| Error::SlackWebhookUrlNotSupplied)?;
1304
1305 let mut message = String::new();
1306 message.push_str("*Testnet Details*\n");
1307 message.push_str(&format!("Name: {}\n", inventory.name));
1308 message.push_str(&format!("Node count: {}\n", inventory.peers().len()));
1309 message.push_str(&format!("Faucet address: {:?}\n", inventory.faucet_address));
1310 match inventory.binary_option {
1311 BinaryOption::BuildFromSource {
1312 ref repo_owner,
1313 ref branch,
1314 ..
1315 } => {
1316 message.push_str("*Branch Details*\n");
1317 message.push_str(&format!("Repo owner: {repo_owner}\n"));
1318 message.push_str(&format!("Branch: {branch}\n"));
1319 }
1320 BinaryOption::Versioned {
1321 ant_version: ref safe_version,
1322 antnode_version: ref safenode_version,
1323 antctl_version: ref safenode_manager_version,
1324 ..
1325 } => {
1326 message.push_str("*Version Details*\n");
1327 message.push_str(&format!(
1328 "ant version: {}\n",
1329 safe_version
1330 .as_ref()
1331 .map_or("None".to_string(), |v| v.to_string())
1332 ));
1333 message.push_str(&format!(
1334 "safenode version: {}\n",
1335 safenode_version
1336 .as_ref()
1337 .map_or("None".to_string(), |v| v.to_string())
1338 ));
1339 message.push_str(&format!(
1340 "antctl version: {}\n",
1341 safenode_manager_version
1342 .as_ref()
1343 .map_or("None".to_string(), |v| v.to_string())
1344 ));
1345 }
1346 }
1347
1348 message.push_str("*Sample Peers*\n");
1349 message.push_str("```\n");
1350 for peer in inventory.peers().iter().take(20) {
1351 message.push_str(&format!("{peer}\n"));
1352 }
1353 message.push_str("```\n");
1354 message.push_str("*Available Files*\n");
1355 message.push_str("```\n");
1356 for (addr, file_name) in inventory.uploaded_files.iter() {
1357 message.push_str(&format!("{addr}: {file_name}\n"))
1358 }
1359 message.push_str("```\n");
1360
1361 let payload = json!({
1362 "text": message,
1363 });
1364 reqwest::Client::new()
1365 .post(webhook_url)
1366 .json(&payload)
1367 .send()
1368 .await?;
1369 println!("{message}");
1370 println!("Posted notification to Slack");
1371 Ok(())
1372}
1373
1374fn print_duration(duration: Duration) {
1375 let total_seconds = duration.as_secs();
1376 let minutes = total_seconds / 60;
1377 let seconds = total_seconds % 60;
1378 debug!("Time taken: {minutes} minutes and {seconds} seconds");
1379}
1380
1381pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
1382 let progress_bar = ProgressBar::new(length);
1383 progress_bar.set_style(
1384 ProgressStyle::default_bar()
1385 .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len}")?
1386 .progress_chars("#>-"),
1387 );
1388 progress_bar.enable_steady_tick(Duration::from_millis(100));
1389 Ok(progress_bar)
1390}
1391
1392pub async fn get_environment_details(
1393 environment_name: &str,
1394 s3_repository: &S3Repository,
1395) -> Result<EnvironmentDetails> {
1396 let temp_file = tempfile::NamedTempFile::new()?;
1397
1398 let max_retries = 3;
1399 let mut retries = 0;
1400 let env_details = loop {
1401 debug!("Downloading the environment details file for {environment_name} from S3");
1402 match s3_repository
1403 .download_object("sn-environment-type", environment_name, temp_file.path())
1404 .await
1405 {
1406 Ok(_) => {
1407 debug!("Downloaded the environment details file for {environment_name} from S3");
1408 let content = match std::fs::read_to_string(temp_file.path()) {
1409 Ok(content) => content,
1410 Err(err) => {
1411 log::error!("Could not read the environment details file: {err:?}");
1412 if retries < max_retries {
1413 debug!("Retrying to read the environment details file");
1414 retries += 1;
1415 continue;
1416 } else {
1417 return Err(Error::EnvironmentDetailsNotFound(
1418 environment_name.to_string(),
1419 ));
1420 }
1421 }
1422 };
1423 trace!("Content of the environment details file: {content}");
1424
1425 match serde_json::from_str(&content) {
1426 Ok(environment_details) => break environment_details,
1427 Err(err) => {
1428 log::error!("Could not parse the environment details file: {err:?}");
1429 if retries < max_retries {
1430 debug!("Retrying to parse the environment details file");
1431 retries += 1;
1432 continue;
1433 } else {
1434 return Err(Error::EnvironmentDetailsNotFound(
1435 environment_name.to_string(),
1436 ));
1437 }
1438 }
1439 }
1440 }
1441 Err(err) => {
1442 log::error!(
1443 "Could not download the environment details file for {environment_name} from S3: {err:?}"
1444 );
1445 if retries < max_retries {
1446 retries += 1;
1447 continue;
1448 } else {
1449 return Err(Error::EnvironmentDetailsNotFound(
1450 environment_name.to_string(),
1451 ));
1452 }
1453 }
1454 }
1455 };
1456
1457 debug!("Fetched environment details: {env_details:?}");
1458
1459 Ok(env_details)
1460}
1461
1462pub async fn write_environment_details(
1463 s3_repository: &S3Repository,
1464 environment_name: &str,
1465 environment_details: &EnvironmentDetails,
1466) -> Result<()> {
1467 let temp_dir = tempfile::tempdir()?;
1468 let path = temp_dir.path().to_path_buf().join(environment_name);
1469 let mut file = File::create(&path)?;
1470 let json = serde_json::to_string(environment_details)?;
1471 file.write_all(json.as_bytes())?;
1472 s3_repository
1473 .upload_file("sn-environment-type", &path, true)
1474 .await?;
1475 Ok(())
1476}
1477
1478pub fn calculate_size_per_attached_volume(node_count: u16) -> u16 {
1479 if node_count == 0 {
1480 return 0;
1481 }
1482 let total_volume_required = node_count * STORAGE_REQUIRED_PER_NODE;
1483
1484 (total_volume_required as f64 / 7.0).ceil() as u16
1486}
1487
/// Builds the HTTP URL of the `bootstrap_cache.json` file served from `ip_addr`.
///
/// IPv6 addresses are wrapped in square brackets, as a URL authority requires.
pub fn get_bootstrap_cache_url(ip_addr: &IpAddr) -> String {
    match ip_addr {
        IpAddr::V4(addr) => format!("http://{addr}/bootstrap_cache.json"),
        IpAddr::V6(addr) => format!("http://[{addr}]/bootstrap_cache.json"),
    }
}