ant_service_management/
rpc.rs1use crate::error::{Error, Result};
10use ant_protocol::{
11 CLOSE_GROUP_SIZE,
12 antnode_proto::{
13 NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest, RestartRequest, StopRequest,
14 UpdateLogLevelRequest, UpdateRequest, ant_node_client::AntNodeClient,
15 },
16};
17use async_trait::async_trait;
18use libp2p::{Multiaddr, PeerId, kad::RecordKey};
19use std::{net::SocketAddr, path::PathBuf, str::FromStr};
20use tokio::time::Duration;
21use tonic::Request;
22use tracing::error;
23
24#[derive(Debug, Clone)]
25pub struct NodeInfo {
26 pub pid: u32,
27 pub peer_id: PeerId,
28 pub log_path: PathBuf,
29 pub data_path: PathBuf,
30 pub version: String,
31 pub uptime: Duration,
32 pub wallet_balance: u64,
33}
34
35#[derive(Debug, Clone)]
36pub struct NetworkInfo {
37 pub connected_peers: Vec<PeerId>,
38 pub listeners: Vec<Multiaddr>,
39}
40
41#[derive(Debug, Clone)]
42pub struct RecordAddress {
43 pub key: RecordKey,
44}
45
46#[async_trait]
47pub trait RpcActions: Sync {
48 async fn node_info(&self) -> Result<NodeInfo>;
49 async fn network_info(&self) -> Result<NetworkInfo>;
50 async fn record_addresses(&self) -> Result<Vec<RecordAddress>>;
51 async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()>;
52 async fn node_stop(&self, delay_millis: u64) -> Result<()>;
53 async fn node_update(&self, delay_millis: u64) -> Result<()>;
54 async fn is_node_connected_to_network(&self, timeout: Duration) -> Result<()>;
55 async fn update_log_level(&self, log_levels: String) -> Result<()>;
56}
57
/// Client for a node's RPC service.
///
/// Only the endpoint and retry settings are stored; the RPC methods that go
/// through `connect_with_retry` open a new connection on each call.
#[derive(Debug, Clone)]
pub struct RpcClient {
    /// Endpoint URI handed to `AntNodeClient::connect`.
    endpoint: String,
    /// Upper bound on connection attempts made by `connect_with_retry`.
    max_attempts: u8,
    /// Pause between consecutive connection attempts.
    retry_delay: Duration,
}
64
65impl RpcClient {
66 const MAX_CONNECTION_RETRY_ATTEMPTS: u8 = 5;
67 const CONNECTION_RETRY_DELAY_SEC: Duration = Duration::from_secs(1);
68
69 pub fn new(endpoint: &str) -> Self {
70 Self {
71 endpoint: endpoint.to_string(),
72 max_attempts: Self::MAX_CONNECTION_RETRY_ATTEMPTS,
73 retry_delay: Self::CONNECTION_RETRY_DELAY_SEC,
74 }
75 }
76
77 pub fn from_socket_addr(socket: SocketAddr) -> Self {
78 let endpoint = format!("https://{socket}");
79 Self::new(&endpoint)
80 }
81
82 pub fn set_max_attempts(&mut self, max_retry_attempts: u8) {
84 self.max_attempts = max_retry_attempts;
85 }
86
87 pub fn set_retry_delay(&mut self, retry_delay: Duration) {
89 self.retry_delay = retry_delay;
90 }
91
92 async fn connect_with_retry(&self) -> Result<AntNodeClient<tonic::transport::Channel>> {
94 let mut attempts = 0;
95 loop {
96 debug!(
97 "Attempting connection to node RPC endpoint at {}...",
98 self.endpoint
99 );
100 match AntNodeClient::connect(self.endpoint.clone()).await {
101 Ok(rpc_client) => {
102 debug!("Connection successful");
103 break Ok(rpc_client);
104 }
105 Err(_) => {
106 attempts += 1;
107 tokio::time::sleep(self.retry_delay).await;
108 if attempts >= self.max_attempts {
109 return Err(Error::RpcConnectionError(self.endpoint.clone()));
110 }
111 error!(
112 "Could not connect to RPC endpoint {:?}. Retrying {attempts}/{}",
113 self.endpoint, self.max_attempts
114 );
115 }
116 }
117 }
118 }
119}
120
121#[async_trait]
122impl RpcActions for RpcClient {
123 async fn node_info(&self) -> Result<NodeInfo> {
124 let mut client = self.connect_with_retry().await?;
125 let response = client
126 .node_info(Request::new(NodeInfoRequest {}))
127 .await
128 .map_err(|e| {
129 error!("Could not obtain node info through RPC: {e:?}");
130 Error::RpcNodeInfoError(e.to_string())
131 })?;
132 let node_info_resp = response.get_ref();
133 let peer_id = PeerId::from_bytes(&node_info_resp.peer_id)?;
134 let node_info = NodeInfo {
135 pid: node_info_resp.pid,
136 peer_id,
137 log_path: PathBuf::from(node_info_resp.log_dir.clone()),
138 data_path: PathBuf::from(node_info_resp.data_dir.clone()),
139 version: node_info_resp.bin_version.clone(),
140 uptime: Duration::from_secs(node_info_resp.uptime_secs),
141 wallet_balance: node_info_resp.wallet_balance,
142 };
143 Ok(node_info)
144 }
145 async fn network_info(&self) -> Result<NetworkInfo> {
146 let mut client = self.connect_with_retry().await?;
147 let response = client
148 .network_info(Request::new(NetworkInfoRequest {}))
149 .await
150 .map_err(|e| {
151 error!("Could not obtain network info through RPC: {e:?}");
152 Error::RpcNodeInfoError(e.to_string())
153 })?;
154 let network_info = response.get_ref();
155
156 let mut connected_peers = Vec::new();
157 for bytes in network_info.connected_peers.iter() {
158 let peer_id = PeerId::from_bytes(bytes)?;
159 connected_peers.push(peer_id);
160 }
161
162 let mut listeners = Vec::new();
163 for multiaddr_str in network_info.listeners.iter() {
164 let multiaddr = Multiaddr::from_str(multiaddr_str)?;
165 listeners.push(multiaddr);
166 }
167
168 Ok(NetworkInfo {
169 connected_peers,
170 listeners,
171 })
172 }
173
174 async fn record_addresses(&self) -> Result<Vec<RecordAddress>> {
175 let mut client = self.connect_with_retry().await?;
176 let response = client
177 .record_addresses(Request::new(RecordAddressesRequest {}))
178 .await
179 .map_err(|e| {
180 error!("Could not obtain record addresses through RPC: {e:?}");
181 Error::RpcRecordAddressError(e.to_string())
182 })?;
183 let mut record_addresses = vec![];
184 for bytes in response.get_ref().addresses.iter() {
185 let key = libp2p::kad::RecordKey::from(bytes.clone());
186 record_addresses.push(RecordAddress { key });
187 }
188 Ok(record_addresses)
189 }
190
191 async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()> {
192 let mut client = self.connect_with_retry().await?;
193 let _response = client
194 .restart(Request::new(RestartRequest {
195 delay_millis,
196 retain_peer_id,
197 }))
198 .await
199 .map_err(|e| {
200 error!("Could not restart node through RPC: {e:?}");
201 Error::RpcNodeRestartError(e.to_string())
202 })?;
203 Ok(())
204 }
205
206 async fn node_stop(&self, delay_millis: u64) -> Result<()> {
207 let mut client = self.connect_with_retry().await?;
208 let _response = client
209 .stop(Request::new(StopRequest { delay_millis }))
210 .await
211 .map_err(|e| {
212 error!("Could not restart node through RPC: {e:?}");
213 Error::RpcNodeStopError(e.to_string())
214 })?;
215 Ok(())
216 }
217
218 async fn node_update(&self, delay_millis: u64) -> Result<()> {
219 let mut client = self.connect_with_retry().await?;
220 let _response = client
221 .update(Request::new(UpdateRequest { delay_millis }))
222 .await
223 .map_err(|e| {
224 error!("Could not update node through RPC: {e:?}");
225 Error::RpcNodeUpdateError(e.to_string())
226 })?;
227 Ok(())
228 }
229
230 async fn is_node_connected_to_network(&self, timeout: Duration) -> Result<()> {
231 let max_attempts = std::cmp::max(1, timeout.as_secs() / self.retry_delay.as_secs());
232 trace!(
233 "RPC conneciton max attempts set to: {max_attempts} with retry_delay of {:?}",
234 self.retry_delay
235 );
236 let mut attempts = 0;
237 loop {
238 debug!(
239 "Attempting connection to node RPC endpoint at {}...",
240 self.endpoint
241 );
242 if let Ok(mut client) = AntNodeClient::connect(self.endpoint.clone()).await {
243 debug!("Connection to RPC successful");
244 if let Ok(response) = client
245 .network_info(Request::new(NetworkInfoRequest {}))
246 .await
247 {
248 if response.get_ref().connected_peers.len() > CLOSE_GROUP_SIZE {
249 return Ok(());
250 } else {
251 error!(
252 "Node does not have enough peers connected yet. Retrying {attempts}/{max_attempts}",
253 );
254 }
255 } else {
256 error!(
257 "Could not obtain NetworkInfo through RPC. Retrying {attempts}/{max_attempts}"
258 );
259 }
260 } else {
261 error!(
262 "Could not connect to RPC endpoint {:?}. Retrying {attempts}/{max_attempts}",
263 self.endpoint
264 );
265 }
266
267 attempts += 1;
268 tokio::time::sleep(self.retry_delay).await;
269 if attempts >= max_attempts {
270 return Err(Error::RpcConnectionError(self.endpoint.clone()));
271 }
272 }
273 }
274
275 async fn update_log_level(&self, log_levels: String) -> Result<()> {
276 let mut client = self.connect_with_retry().await?;
277 let _response = client
278 .update_log_level(Request::new(UpdateLogLevelRequest {
279 log_level: log_levels,
280 }))
281 .await
282 .map_err(|e| {
283 error!("Could not update node through RPC: {e:?}");
284 Error::RpcNodeUpdateError(e.to_string())
285 })?;
286 Ok(())
287 }
288}