ant_service_management/
rpc.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::error::{Error, Result};
10use ant_protocol::{
11    CLOSE_GROUP_SIZE,
12    antnode_proto::{
13        NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest, RestartRequest, StopRequest,
14        UpdateLogLevelRequest, UpdateRequest, ant_node_client::AntNodeClient,
15    },
16};
17use async_trait::async_trait;
18use libp2p::{Multiaddr, PeerId, kad::RecordKey};
19use std::{net::SocketAddr, path::PathBuf, str::FromStr};
20use tokio::time::Duration;
21use tonic::Request;
22use tracing::error;
23
/// Process and identity details of a node, as obtained through the `node_info` RPC.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// Process id reported by the node.
    pub pid: u32,
    /// Peer id decoded from the raw bytes carried in the RPC response.
    pub peer_id: PeerId,
    /// Log directory reported by the node (`log_dir` in the response).
    pub log_path: PathBuf,
    /// Data directory reported by the node (`data_dir` in the response).
    pub data_path: PathBuf,
    /// Binary version string reported by the node (`bin_version` in the response).
    pub version: String,
    /// Node uptime; the response carries it as whole seconds (`uptime_secs`).
    pub uptime: Duration,
    /// Wallet balance as reported by the node. NOTE(review): the unit is not visible
    /// in this file — confirm against the protocol definition.
    pub wallet_balance: u64,
}
34
/// Connectivity details of a node, as obtained through the `network_info` RPC.
#[derive(Debug, Clone)]
pub struct NetworkInfo {
    /// Peers listed in the response's `connected_peers` field, decoded into peer ids.
    pub connected_peers: Vec<PeerId>,
    /// Entries of the response's `listeners` field, parsed as multiaddrs.
    pub listeners: Vec<Multiaddr>,
}
40
/// A single entry of the list returned by the `record_addresses` RPC.
#[derive(Debug, Clone)]
pub struct RecordAddress {
    /// Kademlia record key built from the raw address bytes in the response.
    pub key: RecordKey,
}
45
/// The set of operations that can be performed against a node's RPC service.
///
/// Abstracted as a trait so that callers can depend on the operations rather than on a
/// concrete client; `RpcClient` is the implementation provided in this file.
#[async_trait]
pub trait RpcActions: Sync {
    /// Fetch process and identity details (pid, peer id, paths, version, uptime) of the node.
    async fn node_info(&self) -> Result<NodeInfo>;
    /// Fetch the node's connected peers and listener addresses.
    async fn network_info(&self) -> Result<NetworkInfo>;
    /// Fetch the record addresses reported by the node.
    async fn record_addresses(&self) -> Result<Vec<RecordAddress>>;
    /// Request a restart of the node. `delay_millis` and `retain_peer_id` are forwarded
    /// to the node as part of the request.
    async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()>;
    /// Request that the node stops. `delay_millis` is forwarded to the node.
    async fn node_stop(&self, delay_millis: u64) -> Result<()>;
    /// Request that the node updates itself. `delay_millis` is forwarded to the node.
    async fn node_update(&self, delay_millis: u64) -> Result<()>;
    /// Poll the node until it reports enough connected peers, giving up with an error
    /// once the attempts derived from `timeout` are exhausted.
    async fn is_node_connected_to_network(&self, timeout: Duration) -> Result<()>;
    /// Send a new log level specification to the node.
    async fn update_log_level(&self, log_levels: String) -> Result<()>;
}
57
/// Client for a node's RPC service.
///
/// No connection is held by this type: each operation opens a new connection to
/// `endpoint`, retrying according to `max_attempts` and `retry_delay`.
#[derive(Debug, Clone)]
pub struct RpcClient {
    /// Endpoint string handed to the gRPC client when connecting.
    endpoint: String,
    /// Number of connection attempts made before a connection error is returned.
    max_attempts: u8,
    /// Time slept between consecutive attempts.
    retry_delay: Duration,
}
64
65impl RpcClient {
66    const MAX_CONNECTION_RETRY_ATTEMPTS: u8 = 5;
67    const CONNECTION_RETRY_DELAY_SEC: Duration = Duration::from_secs(1);
68
69    pub fn new(endpoint: &str) -> Self {
70        Self {
71            endpoint: endpoint.to_string(),
72            max_attempts: Self::MAX_CONNECTION_RETRY_ATTEMPTS,
73            retry_delay: Self::CONNECTION_RETRY_DELAY_SEC,
74        }
75    }
76
77    pub fn from_socket_addr(socket: SocketAddr) -> Self {
78        let endpoint = format!("https://{socket}");
79        Self::new(&endpoint)
80    }
81
82    /// Set the maximum number of retry attempts when connecting to the RPC endpoint. Default is 5.
83    pub fn set_max_attempts(&mut self, max_retry_attempts: u8) {
84        self.max_attempts = max_retry_attempts;
85    }
86
87    /// Set the delay between retry attempts when connecting to the RPC endpoint. Default is 1 second.
88    pub fn set_retry_delay(&mut self, retry_delay: Duration) {
89        self.retry_delay = retry_delay;
90    }
91
92    // Connect to the RPC endpoint with retry
93    async fn connect_with_retry(&self) -> Result<AntNodeClient<tonic::transport::Channel>> {
94        let mut attempts = 0;
95        loop {
96            debug!(
97                "Attempting connection to node RPC endpoint at {}...",
98                self.endpoint
99            );
100            match AntNodeClient::connect(self.endpoint.clone()).await {
101                Ok(rpc_client) => {
102                    debug!("Connection successful");
103                    break Ok(rpc_client);
104                }
105                Err(_) => {
106                    attempts += 1;
107                    tokio::time::sleep(self.retry_delay).await;
108                    if attempts >= self.max_attempts {
109                        return Err(Error::RpcConnectionError(self.endpoint.clone()));
110                    }
111                    error!(
112                        "Could not connect to RPC endpoint {:?}. Retrying {attempts}/{}",
113                        self.endpoint, self.max_attempts
114                    );
115                }
116            }
117        }
118    }
119}
120
121#[async_trait]
122impl RpcActions for RpcClient {
123    async fn node_info(&self) -> Result<NodeInfo> {
124        let mut client = self.connect_with_retry().await?;
125        let response = client
126            .node_info(Request::new(NodeInfoRequest {}))
127            .await
128            .map_err(|e| {
129                error!("Could not obtain node info through RPC: {e:?}");
130                Error::RpcNodeInfoError(e.to_string())
131            })?;
132        let node_info_resp = response.get_ref();
133        let peer_id = PeerId::from_bytes(&node_info_resp.peer_id)?;
134        let node_info = NodeInfo {
135            pid: node_info_resp.pid,
136            peer_id,
137            log_path: PathBuf::from(node_info_resp.log_dir.clone()),
138            data_path: PathBuf::from(node_info_resp.data_dir.clone()),
139            version: node_info_resp.bin_version.clone(),
140            uptime: Duration::from_secs(node_info_resp.uptime_secs),
141            wallet_balance: node_info_resp.wallet_balance,
142        };
143        Ok(node_info)
144    }
145    async fn network_info(&self) -> Result<NetworkInfo> {
146        let mut client = self.connect_with_retry().await?;
147        let response = client
148            .network_info(Request::new(NetworkInfoRequest {}))
149            .await
150            .map_err(|e| {
151                error!("Could not obtain network info through RPC: {e:?}");
152                Error::RpcNodeInfoError(e.to_string())
153            })?;
154        let network_info = response.get_ref();
155
156        let mut connected_peers = Vec::new();
157        for bytes in network_info.connected_peers.iter() {
158            let peer_id = PeerId::from_bytes(bytes)?;
159            connected_peers.push(peer_id);
160        }
161
162        let mut listeners = Vec::new();
163        for multiaddr_str in network_info.listeners.iter() {
164            let multiaddr = Multiaddr::from_str(multiaddr_str)?;
165            listeners.push(multiaddr);
166        }
167
168        Ok(NetworkInfo {
169            connected_peers,
170            listeners,
171        })
172    }
173
174    async fn record_addresses(&self) -> Result<Vec<RecordAddress>> {
175        let mut client = self.connect_with_retry().await?;
176        let response = client
177            .record_addresses(Request::new(RecordAddressesRequest {}))
178            .await
179            .map_err(|e| {
180                error!("Could not obtain record addresses through RPC: {e:?}");
181                Error::RpcRecordAddressError(e.to_string())
182            })?;
183        let mut record_addresses = vec![];
184        for bytes in response.get_ref().addresses.iter() {
185            let key = libp2p::kad::RecordKey::from(bytes.clone());
186            record_addresses.push(RecordAddress { key });
187        }
188        Ok(record_addresses)
189    }
190
191    async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()> {
192        let mut client = self.connect_with_retry().await?;
193        let _response = client
194            .restart(Request::new(RestartRequest {
195                delay_millis,
196                retain_peer_id,
197            }))
198            .await
199            .map_err(|e| {
200                error!("Could not restart node through RPC: {e:?}");
201                Error::RpcNodeRestartError(e.to_string())
202            })?;
203        Ok(())
204    }
205
206    async fn node_stop(&self, delay_millis: u64) -> Result<()> {
207        let mut client = self.connect_with_retry().await?;
208        let _response = client
209            .stop(Request::new(StopRequest { delay_millis }))
210            .await
211            .map_err(|e| {
212                error!("Could not restart node through RPC: {e:?}");
213                Error::RpcNodeStopError(e.to_string())
214            })?;
215        Ok(())
216    }
217
218    async fn node_update(&self, delay_millis: u64) -> Result<()> {
219        let mut client = self.connect_with_retry().await?;
220        let _response = client
221            .update(Request::new(UpdateRequest { delay_millis }))
222            .await
223            .map_err(|e| {
224                error!("Could not update node through RPC: {e:?}");
225                Error::RpcNodeUpdateError(e.to_string())
226            })?;
227        Ok(())
228    }
229
230    async fn is_node_connected_to_network(&self, timeout: Duration) -> Result<()> {
231        let max_attempts = std::cmp::max(1, timeout.as_secs() / self.retry_delay.as_secs());
232        trace!(
233            "RPC conneciton max attempts set to: {max_attempts} with retry_delay of {:?}",
234            self.retry_delay
235        );
236        let mut attempts = 0;
237        loop {
238            debug!(
239                "Attempting connection to node RPC endpoint at {}...",
240                self.endpoint
241            );
242            if let Ok(mut client) = AntNodeClient::connect(self.endpoint.clone()).await {
243                debug!("Connection to RPC successful");
244                if let Ok(response) = client
245                    .network_info(Request::new(NetworkInfoRequest {}))
246                    .await
247                {
248                    if response.get_ref().connected_peers.len() > CLOSE_GROUP_SIZE {
249                        return Ok(());
250                    } else {
251                        error!(
252                            "Node does not have enough peers connected yet. Retrying {attempts}/{max_attempts}",
253                        );
254                    }
255                } else {
256                    error!(
257                        "Could not obtain NetworkInfo through RPC. Retrying {attempts}/{max_attempts}"
258                    );
259                }
260            } else {
261                error!(
262                    "Could not connect to RPC endpoint {:?}. Retrying {attempts}/{max_attempts}",
263                    self.endpoint
264                );
265            }
266
267            attempts += 1;
268            tokio::time::sleep(self.retry_delay).await;
269            if attempts >= max_attempts {
270                return Err(Error::RpcConnectionError(self.endpoint.clone()));
271            }
272        }
273    }
274
275    async fn update_log_level(&self, log_levels: String) -> Result<()> {
276        let mut client = self.connect_with_retry().await?;
277        let _response = client
278            .update_log_level(Request::new(UpdateLogLevelRequest {
279                log_level: log_levels,
280            }))
281            .await
282            .map_err(|e| {
283                error!("Could not update node through RPC: {e:?}");
284                Error::RpcNodeUpdateError(e.to_string())
285            })?;
286        Ok(())
287    }
288}