#![allow(dead_code)]
pub mod client;
use self::client::LocalNetwork;
use ant_protocol::antnode_proto::{ant_node_client::AntNodeClient, NodeInfoRequest};
use ant_service_management::{
antctl_proto::ant_ctl_client::AntCtlClient, get_local_node_registry_path, NodeRegistryManager,
};
use eyre::{bail, eyre, OptionExt, Result};
use itertools::Either;
use libp2p::PeerId;
use std::{net::SocketAddr, time::Duration};
use test_utils::testnet::DeploymentInventory;
use tonic::Request;
use tracing::{debug, error, warn};
pub async fn get_antnode_rpc_client(
socket_addr: SocketAddr,
) -> Result<AntNodeClient<tonic::transport::Channel>> {
let endpoint = format!("https://{socket_addr}");
let mut attempts = 0;
loop {
if let Ok(rpc_client) = AntNodeClient::connect(endpoint.clone()).await {
break Ok(rpc_client);
}
attempts += 1;
println!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10");
error!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10");
tokio::time::sleep(Duration::from_secs(1)).await;
if attempts >= 10 {
bail!("Failed to connect to {endpoint:?} even after 10 retries");
}
}
}
pub async fn get_antctl_rpc_client(
socket_addr: SocketAddr,
) -> Result<AntCtlClient<tonic::transport::Channel>> {
let endpoint = format!("https://{socket_addr}");
let mut attempts = 0;
loop {
if let Ok(rpc_client) = AntCtlClient::connect(endpoint.clone()).await {
break Ok(rpc_client);
}
attempts += 1;
println!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10");
error!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10");
tokio::time::sleep(Duration::from_secs(1)).await;
if attempts >= 10 {
bail!("Failed to connect to {endpoint:?} even after 10 retries");
}
}
}
pub async fn get_all_peer_ids(node_rpc_addresses: &Vec<SocketAddr>) -> Result<Vec<PeerId>> {
let mut all_peers = Vec::new();
for addr in node_rpc_addresses {
let mut rpc_client = get_antnode_rpc_client(*addr).await?;
let response = rpc_client
.node_info(Request::new(NodeInfoRequest {}))
.await?;
let peer_id = PeerId::from_bytes(&response.get_ref().peer_id)?;
all_peers.push(peer_id);
}
debug!(
"Obtained the PeerId list for the running network with a node count of {}",
node_rpc_addresses.len()
);
Ok(all_peers)
}
/// Restarts the nodes of a running network one at a time, in list order.
///
/// The node list comes from a [`DeploymentInventory`] when one can be loaded. Otherwise it
/// comes from the local [`NodeRegistryManager`].
pub struct NodeRestart {
    /// Source of the node list: a deployment inventory (`Left`) or the local registry (`Right`).
    inventory_file: Either<DeploymentInventory, NodeRegistryManager>,
    /// Index of the node the next call to [`NodeRestart::restart_next`] will restart.
    next_to_restart_idx: usize,
    /// NOTE(review): stored by `new` but not read anywhere in this file.
    skip_genesis_for_droplet: bool,
    /// Forwarded to `LocalNetwork::restart_node` when restarting a local node.
    retain_peer_id: bool,
}

impl NodeRestart {
    /// Creates a restarter starting at index 0.
    ///
    /// Prefers a [`DeploymentInventory`]. If that fails to load, it falls back to the local
    /// node registry.
    ///
    /// # Errors
    /// Returns an error if the inventory cannot be loaded and either the local node registry
    /// path cannot be resolved or the registry cannot be loaded.
    pub async fn new(skip_genesis_for_droplet: bool, retain_peer_id: bool) -> Result<Self> {
        let inventory_file = match DeploymentInventory::load() {
            Ok(inv) => Either::Left(inv),
            Err(_) => {
                let reg = NodeRegistryManager::load(&get_local_node_registry_path()?).await?;
                Either::Right(reg)
            }
        };
        Ok(Self {
            inventory_file,
            next_to_restart_idx: 0,
            skip_genesis_for_droplet,
            retain_peer_id,
        })
    }

    /// Restarts the node at the current index.
    ///
    /// If `loop_over` is true and the index has run past the last node, it wraps back to 0, so
    /// repeated calls cycle through the list. The index advances after a successful restart. It
    /// also advances after a failed restart when `progress_on_error` is true.
    ///
    /// Returns the antnode RPC endpoint of the restarted node. Returns `None` when there is no
    /// node at the current index: either the list is exhausted and `loop_over` is false, or the
    /// list is empty.
    ///
    /// # Errors
    /// - Propagates restart failures.
    /// - Fails if the inventory has no RPC endpoint for the peer.
    /// - Fails if a registry node has no `PeerId`.
    ///
    /// # Panics
    /// Restarting through a [`DeploymentInventory`] is not implemented yet (`todo!`).
    pub async fn restart_next(
        &mut self,
        loop_over: bool,
        progress_on_error: bool,
    ) -> Result<Option<SocketAddr>> {
        // Work on a clone so `self` can be mutably borrowed by `restart` below.
        let antnode_rpc_endpoint = match self.inventory_file.clone() {
            Either::Left(inv) => {
                // `>=`: an index equal to the length is already past the last node.
                if loop_over && self.next_to_restart_idx >= inv.antctld_endpoints.len() {
                    self.next_to_restart_idx = 0;
                }
                if let Some((peer_id, daemon_endpoint)) =
                    inv.antctld_endpoints.iter().nth(self.next_to_restart_idx)
                {
                    self.restart(*peer_id, *daemon_endpoint, progress_on_error)
                        .await?;
                    let antnode_rpc_endpoint = inv
                        .rpc_endpoints
                        .get(peer_id)
                        .ok_or_eyre("Failed to obtain antnode rpc endpoint from inventory file")?;
                    Some(*antnode_rpc_endpoint)
                } else {
                    warn!("We have restarted all the nodes in the list. Since loop_over is false, we are not restarting any nodes now.");
                    None
                }
            }
            Either::Right(reg) => {
                // Hold the node-list lock once for both the bounds check and the lookup.
                let to_restart = {
                    let nodes = reg.nodes.read().await;
                    // `>=`: an index equal to the length is already past the last node.
                    if loop_over && self.next_to_restart_idx >= nodes.len() {
                        self.next_to_restart_idx = 0;
                    }
                    nodes.get(self.next_to_restart_idx).cloned()
                };
                if let Some(node_to_restart) = to_restart {
                    // Copy out what we need, then release the node's read lock before
                    // awaiting the restart.
                    let (peer_id, antnode_rpc_endpoint) = {
                        let node = node_to_restart.read().await;
                        let peer_id = node.peer_id.ok_or_eyre(
                            "PeerId should be present for a local node in NodeRegistryManager",
                        )?;
                        (peer_id, node.rpc_socket_addr)
                    };
                    self.restart(peer_id, antnode_rpc_endpoint, progress_on_error)
                        .await?;
                    Some(antnode_rpc_endpoint)
                } else {
                    warn!("We have restarted all the nodes in the list. Since loop_over is false, we are not restarting any nodes now.");
                    None
                }
            }
        };
        Ok(antnode_rpc_endpoint)
    }

    /// Restarts one node through `LocalNetwork::restart_node`.
    ///
    /// Advances `next_to_restart_idx` on success, or on failure when `progress_on_error` is true.
    ///
    /// # Errors
    /// Returns the restart failure, annotated with the peer id and endpoint.
    ///
    /// # Panics
    /// Not implemented for a [`DeploymentInventory`] source (`todo!`).
    async fn restart(
        &mut self,
        peer_id: PeerId,
        endpoint: SocketAddr,
        progress_on_error: bool,
    ) -> Result<()> {
        match &self.inventory_file {
            Either::Left(_inv) => todo!("Not implemented yet for WanNetwork"),
            Either::Right(_reg) => {
                let outcome = LocalNetwork::restart_node(endpoint, self.retain_peer_id)
                    .await
                    .map_err(|err| eyre!("Failed to restart peer {peer_id:?} on antnode RPC endpoint: {endpoint:?} with err {err:?}"));
                if outcome.is_ok() || progress_on_error {
                    self.next_to_restart_idx += 1;
                }
                outcome.map(|_| ())
            }
        }
    }

    /// Resets the restart cursor so the next restart targets the first node again.
    pub fn reset_index(&mut self) {
        self.next_to_restart_idx = 0;
    }
}