ant_node_manager/
rpc_client.rs

1use ant_service_management::antctl_proto::ant_ctl_client::AntCtlClient;
2use ant_service_management::antctl_proto::NodeServiceRestartRequest;
3use color_eyre::eyre::bail;
4use color_eyre::{eyre::eyre, Result};
5use libp2p_identity::PeerId;
6use std::net::SocketAddr;
7use std::str::FromStr;
8use std::time::Duration;
9use tonic::transport::Channel;
10use tonic::Request;
11
12struct DaemonRpcClient {
13    addr: SocketAddr,
14    rpc: AntCtlClient<Channel>,
15}
16
17pub async fn restart_node(
18    peer_ids: Vec<String>,
19    rpc_server_address: SocketAddr,
20    retain_peer_id: bool,
21) -> Result<()> {
22    for peer_id in peer_ids {
23        debug!("Sending NodeServiceRestartRequest to {peer_id:?} at {rpc_server_address:?}");
24        let str_bytes = PeerId::from_str(&peer_id)?.to_bytes();
25
26        let mut daemon_client = get_rpc_client(rpc_server_address).await?;
27
28        let _response = daemon_client
29            .rpc
30            .restart_node_service(Request::new(NodeServiceRestartRequest {
31                peer_id: str_bytes,
32                delay_millis: 0,
33                retain_peer_id,
34            }))
35            .await
36            .map_err(|err| {
37                error!("Failed to restart node service with {peer_id:?} at {rpc_server_address:?} with err: {err:?}");
38                eyre!(
39                    "Failed to restart node service with {peer_id:?} at {:?} with err: {err:?}",
40                    daemon_client.addr
41                )
42            })?;
43    }
44    Ok(())
45}
46
47async fn get_rpc_client(socket_addr: SocketAddr) -> Result<DaemonRpcClient> {
48    let endpoint = format!("https://{socket_addr}");
49    let mut attempts = 0;
50    loop {
51        if let Ok(rpc_client) = AntCtlClient::connect(endpoint.clone()).await {
52            let rpc_client = DaemonRpcClient {
53                addr: socket_addr,
54                rpc: rpc_client,
55            };
56            return Ok(rpc_client);
57        }
58        attempts += 1;
59        error!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10");
60        println!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10");
61        tokio::time::sleep(Duration::from_secs(1)).await;
62        if attempts >= 10 {
63            error!("Failed to connect to {endpoint:?} even after 10 retries");
64            bail!("Failed to connect to {endpoint:?} even after 10 retries");
65        }
66    }
67}