ant_service_management/
node.rs

// Copyright (C) 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{error::Result, rpc::RpcActions, ServiceStateActions, ServiceStatus, UpgradeOptions};
10use ant_bootstrap::InitialPeersConfig;
11use ant_evm::{AttoTokens, EvmNetwork, RewardsAddress};
12use ant_logging::LogFormat;
13use ant_protocol::get_port_from_multiaddr;
14use async_trait::async_trait;
15use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
16use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
17use service_manager::{ServiceInstallCtx, ServiceLabel};
18use std::{
19    ffi::OsString,
20    net::{Ipv4Addr, SocketAddr},
21    path::PathBuf,
22    str::FromStr,
23    time::Duration,
24};
25
26pub struct NodeService<'a> {
27    pub service_data: &'a mut NodeServiceData,
28    pub rpc_actions: Box<dyn RpcActions + Send>,
29    /// Used to enable dynamic startup delay based on the time it takes for a node to connect to the network.
30    pub connection_timeout: Option<Duration>,
31}
32
33impl<'a> NodeService<'a> {
34    pub fn new(
35        service_data: &'a mut NodeServiceData,
36        rpc_actions: Box<dyn RpcActions + Send>,
37    ) -> NodeService<'a> {
38        NodeService {
39            rpc_actions,
40            service_data,
41            connection_timeout: None,
42        }
43    }
44
45    /// Set the max time to wait for the node to connect to the network.
46    /// If not set, we do not perform a dynamic startup delay.
47    pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> NodeService<'a> {
48        self.connection_timeout = Some(connection_timeout);
49        self
50    }
51}
52
53#[async_trait]
54impl ServiceStateActions for NodeService<'_> {
55    fn bin_path(&self) -> PathBuf {
56        self.service_data.antnode_path.clone()
57    }
58
59    fn build_upgrade_install_context(&self, options: UpgradeOptions) -> Result<ServiceInstallCtx> {
60        let label: ServiceLabel = self.service_data.service_name.parse()?;
61        let mut args = vec![
62            OsString::from("--rpc"),
63            OsString::from(self.service_data.rpc_socket_addr.to_string()),
64            OsString::from("--root-dir"),
65            OsString::from(
66                self.service_data
67                    .data_dir_path
68                    .to_string_lossy()
69                    .to_string(),
70            ),
71            OsString::from("--log-output-dest"),
72            OsString::from(self.service_data.log_dir_path.to_string_lossy().to_string()),
73        ];
74
75        push_arguments_from_initial_peers_config(&self.service_data.peers_args, &mut args);
76        if let Some(log_fmt) = self.service_data.log_format {
77            args.push(OsString::from("--log-format"));
78            args.push(OsString::from(log_fmt.as_str()));
79        }
80        if let Some(id) = self.service_data.network_id {
81            args.push(OsString::from("--network-id"));
82            args.push(OsString::from(id.to_string()));
83        }
84        if self.service_data.upnp {
85            args.push(OsString::from("--upnp"));
86        }
87        if self.service_data.home_network {
88            args.push(OsString::from("--home-network"));
89        }
90
91        if let Some(node_ip) = self.service_data.node_ip {
92            args.push(OsString::from("--ip"));
93            args.push(OsString::from(node_ip.to_string()));
94        }
95
96        if let Some(node_port) = self.service_data.node_port {
97            args.push(OsString::from("--port"));
98            args.push(OsString::from(node_port.to_string()));
99        }
100        if let Some(metrics_port) = self.service_data.metrics_port {
101            args.push(OsString::from("--metrics-server-port"));
102            args.push(OsString::from(metrics_port.to_string()));
103        }
104        if let Some(max_archived_log_files) = self.service_data.max_archived_log_files {
105            args.push(OsString::from("--max-archived-log-files"));
106            args.push(OsString::from(max_archived_log_files.to_string()));
107        }
108        if let Some(max_log_files) = self.service_data.max_log_files {
109            args.push(OsString::from("--max-log-files"));
110            args.push(OsString::from(max_log_files.to_string()));
111        }
112
113        args.push(OsString::from("--rewards-address"));
114        args.push(OsString::from(
115            self.service_data.rewards_address.to_string(),
116        ));
117
118        args.push(OsString::from(self.service_data.evm_network.to_string()));
119        if let EvmNetwork::Custom(custom_network) = &self.service_data.evm_network {
120            args.push(OsString::from("--rpc-url"));
121            args.push(OsString::from(custom_network.rpc_url_http.to_string()));
122            args.push(OsString::from("--payment-token-address"));
123            args.push(OsString::from(
124                custom_network.payment_token_address.to_string(),
125            ));
126            args.push(OsString::from("--data-payments-address"));
127            args.push(OsString::from(
128                custom_network.data_payments_address.to_string(),
129            ));
130        }
131
132        Ok(ServiceInstallCtx {
133            args,
134            autostart: options.auto_restart,
135            contents: None,
136            environment: options.env_variables,
137            label: label.clone(),
138            program: self.service_data.antnode_path.to_path_buf(),
139            username: self.service_data.user.clone(),
140            working_directory: None,
141            disable_restart_on_failure: true,
142        })
143    }
144
145    fn data_dir_path(&self) -> PathBuf {
146        self.service_data.data_dir_path.clone()
147    }
148
149    fn is_user_mode(&self) -> bool {
150        self.service_data.user_mode
151    }
152
153    fn log_dir_path(&self) -> PathBuf {
154        self.service_data.log_dir_path.clone()
155    }
156
157    fn name(&self) -> String {
158        self.service_data.service_name.clone()
159    }
160
161    fn pid(&self) -> Option<u32> {
162        self.service_data.pid
163    }
164
165    fn on_remove(&mut self) {
166        self.service_data.status = ServiceStatus::Removed;
167    }
168
169    async fn on_start(&mut self, pid: Option<u32>, full_refresh: bool) -> Result<()> {
170        let (connected_peers, pid, peer_id) = if full_refresh {
171            debug!(
172                "Performing full refresh for {}",
173                self.service_data.service_name
174            );
175            if let Some(connection_timeout) = self.connection_timeout {
176                debug!(
177                    "Performing dynamic startup delay for {}",
178                    self.service_data.service_name
179                );
180                self.rpc_actions
181                    .is_node_connected_to_network(connection_timeout)
182                    .await?;
183            }
184
185            let node_info = self
186                .rpc_actions
187                .node_info()
188                .await
189                .inspect_err(|err| error!("Error obtaining node_info via RPC: {err:?}"))?;
190            let network_info = self
191                .rpc_actions
192                .network_info()
193                .await
194                .inspect_err(|err| error!("Error obtaining network_info via RPC: {err:?}"))?;
195
196            self.service_data.listen_addr = Some(
197                network_info
198                    .listeners
199                    .iter()
200                    .cloned()
201                    .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
202                    .collect(),
203            );
204            for addr in &network_info.listeners {
205                if let Some(port) = get_port_from_multiaddr(addr) {
206                    debug!(
207                        "Found antnode port for {}: {port}",
208                        self.service_data.service_name
209                    );
210                    self.service_data.node_port = Some(port);
211                    break;
212                }
213            }
214
215            if self.service_data.node_port.is_none() {
216                error!("Could not find antnode port");
217                error!("This will cause the node to have a different port during upgrade");
218            }
219
220            (
221                Some(network_info.connected_peers),
222                pid,
223                Some(node_info.peer_id),
224            )
225        } else {
226            debug!(
227                "Performing partial refresh for {}",
228                self.service_data.service_name
229            );
230            debug!("Previously assigned data will be used");
231            (
232                self.service_data.connected_peers.clone(),
233                pid,
234                self.service_data.peer_id,
235            )
236        };
237
238        self.service_data.connected_peers = connected_peers;
239        self.service_data.peer_id = peer_id;
240        self.service_data.pid = pid;
241        self.service_data.status = ServiceStatus::Running;
242        Ok(())
243    }
244
245    async fn on_stop(&mut self) -> Result<()> {
246        debug!("Marking {} as stopped", self.service_data.service_name);
247        self.service_data.pid = None;
248        self.service_data.status = ServiceStatus::Stopped;
249        self.service_data.connected_peers = None;
250        Ok(())
251    }
252
253    fn set_version(&mut self, version: &str) {
254        self.service_data.version = version.to_string();
255    }
256
257    fn status(&self) -> ServiceStatus {
258        self.service_data.status.clone()
259    }
260
261    fn version(&self) -> String {
262        self.service_data.version.clone()
263    }
264}
265
266#[derive(Clone, Debug, Serialize, Deserialize)]
267pub struct NodeServiceData {
268    pub antnode_path: PathBuf,
269    #[serde(default)]
270    pub auto_restart: bool,
271    #[serde(
272        serialize_with = "serialize_connected_peers",
273        deserialize_with = "deserialize_connected_peers"
274    )]
275    pub connected_peers: Option<Vec<PeerId>>,
276    pub data_dir_path: PathBuf,
277    #[serde(default)]
278    pub evm_network: EvmNetwork,
279    pub home_network: bool,
280    pub listen_addr: Option<Vec<Multiaddr>>,
281    pub log_dir_path: PathBuf,
282    pub log_format: Option<LogFormat>,
283    pub max_archived_log_files: Option<usize>,
284    pub max_log_files: Option<usize>,
285    #[serde(default)]
286    pub metrics_port: Option<u16>,
287    pub network_id: Option<u8>,
288    #[serde(default)]
289    pub node_ip: Option<Ipv4Addr>,
290    #[serde(default)]
291    pub node_port: Option<u16>,
292    pub number: u16,
293    #[serde(
294        serialize_with = "serialize_peer_id",
295        deserialize_with = "deserialize_peer_id"
296    )]
297    pub peer_id: Option<PeerId>,
298    pub peers_args: InitialPeersConfig,
299    pub pid: Option<u32>,
300    #[serde(default)]
301    pub rewards_address: RewardsAddress,
302    pub reward_balance: Option<AttoTokens>,
303    pub rpc_socket_addr: SocketAddr,
304    pub service_name: String,
305    pub status: ServiceStatus,
306    #[serde(default = "default_upnp")]
307    pub upnp: bool,
308    pub user: Option<String>,
309    pub user_mode: bool,
310    pub version: String,
311}
312
/// Serde fallback for the `upnp` field of `NodeServiceData`: UPnP is off when the
/// serialized data does not mention it.
fn default_upnp() -> bool {
    false
}
316
317fn serialize_peer_id<S>(value: &Option<PeerId>, serializer: S) -> Result<S::Ok, S::Error>
318where
319    S: Serializer,
320{
321    if let Some(peer_id) = value {
322        return serializer.serialize_str(&peer_id.to_string());
323    }
324    serializer.serialize_none()
325}
326
327fn deserialize_peer_id<'de, D>(deserializer: D) -> Result<Option<PeerId>, D::Error>
328where
329    D: Deserializer<'de>,
330{
331    let s: Option<String> = Option::deserialize(deserializer)?;
332    if let Some(peer_id_str) = s {
333        PeerId::from_str(&peer_id_str)
334            .map(Some)
335            .map_err(DeError::custom)
336    } else {
337        Ok(None)
338    }
339}
340
341fn serialize_connected_peers<S>(
342    connected_peers: &Option<Vec<PeerId>>,
343    serializer: S,
344) -> Result<S::Ok, S::Error>
345where
346    S: Serializer,
347{
348    match connected_peers {
349        Some(peers) => {
350            let peer_strs: Vec<String> = peers.iter().map(|p| p.to_string()).collect();
351            serializer.serialize_some(&peer_strs)
352        }
353        None => serializer.serialize_none(),
354    }
355}
356
357fn deserialize_connected_peers<'de, D>(deserializer: D) -> Result<Option<Vec<PeerId>>, D::Error>
358where
359    D: Deserializer<'de>,
360{
361    let vec: Option<Vec<String>> = Option::deserialize(deserializer)?;
362    match vec {
363        Some(peer_strs) => {
364            let peers: Result<Vec<PeerId>, _> = peer_strs
365                .into_iter()
366                .map(|s| PeerId::from_str(&s).map_err(DeError::custom))
367                .collect();
368            peers.map(Some)
369        }
370        None => Ok(None),
371    }
372}
373
374impl NodeServiceData {
375    /// Returns the UDP port from our node's listen address.
376    pub fn get_antnode_port(&self) -> Option<u16> {
377        // assuming the listening addr contains /ip4/127.0.0.1/udp/56215/quic-v1/p2p/<peer_id>
378        if let Some(multi_addrs) = &self.listen_addr {
379            println!("Listening addresses are defined");
380            for addr in multi_addrs {
381                if let Some(port) = get_port_from_multiaddr(addr) {
382                    println!("Found port: {}", port);
383                    return Some(port);
384                }
385            }
386        }
387        None
388    }
389
390    /// Returns an optional critical failure of the node.
391    pub fn get_critical_failure(&self) -> Option<(chrono::DateTime<chrono::Utc>, String)> {
392        const CRITICAL_FAILURE_LOG_FILE: &str = "critical_failure.log";
393
394        let log_path = self.log_dir_path.join(CRITICAL_FAILURE_LOG_FILE);
395
396        if let Ok(content) = std::fs::read_to_string(log_path) {
397            if let Some((timestamp, message)) = content.split_once(']') {
398                let timestamp_trimmed = timestamp.trim_start_matches('[').trim();
399                if let Ok(datetime) = timestamp_trimmed.parse::<chrono::DateTime<chrono::Utc>>() {
400                    let message_trimmed = message
401                        .trim()
402                        .trim_start_matches("Node terminated due to: ");
403                    return Some((datetime, message_trimmed.to_string()));
404                }
405            }
406        }
407
408        None
409    }
410}
411
412/// Pushes arguments from the `InitialPeersConfig` struct to the provided `args` vector.
413pub fn push_arguments_from_initial_peers_config(
414    init_peers_config: &InitialPeersConfig,
415    args: &mut Vec<OsString>,
416) {
417    if init_peers_config.first {
418        args.push(OsString::from("--first"));
419    }
420    if init_peers_config.local {
421        args.push(OsString::from("--local"));
422    }
423    if !init_peers_config.addrs.is_empty() {
424        let peers_str = init_peers_config
425            .addrs
426            .iter()
427            .map(|peer| peer.to_string())
428            .collect::<Vec<_>>()
429            .join(",");
430        args.push(OsString::from("--peer"));
431        args.push(OsString::from(peers_str));
432    }
433    if !init_peers_config.network_contacts_url.is_empty() {
434        args.push(OsString::from("--network-contacts-url"));
435        args.push(OsString::from(
436            init_peers_config
437                .network_contacts_url
438                .iter()
439                .map(|url| url.to_string())
440                .collect::<Vec<_>>()
441                .join(","),
442        ));
443    }
444    if init_peers_config.disable_mainnet_contacts {
445        args.push(OsString::from("--testnet"));
446    }
447    if init_peers_config.ignore_cache {
448        args.push(OsString::from("--ignore-cache"));
449    }
450    if let Some(path) = &init_peers_config.bootstrap_cache_dir {
451        args.push(OsString::from("--bootstrap-cache-dir"));
452        args.push(OsString::from(path.to_string_lossy().to_string()));
453    }
454}