ant_service_management/node/
mod.rs

1// Copyright (C) 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9mod node_service_data;
10mod node_service_data_v0;
11mod node_service_data_v1;
12mod node_service_data_v2;
13
14// Re-export types
15pub use node_service_data::{NODE_SERVICE_DATA_SCHEMA_LATEST, NodeServiceData};
16
17use crate::{ServiceStateActions, ServiceStatus, UpgradeOptions, error::Result, rpc::RpcActions};
18use ant_bootstrap::InitialPeersConfig;
19use ant_evm::EvmNetwork;
20use ant_protocol::get_port_from_multiaddr;
21use libp2p::multiaddr::Protocol;
22use service_manager::{ServiceInstallCtx, ServiceLabel};
23use std::{ffi::OsString, path::PathBuf, sync::Arc, time::Duration};
24use tokio::sync::RwLock;
25use tonic::async_trait;
26
27pub struct NodeService {
28    pub service_data: Arc<RwLock<NodeServiceData>>,
29    pub rpc_actions: Box<dyn RpcActions + Send>,
30    /// Used to enable dynamic startup delay based on the time it takes for a node to connect to the network.
31    pub connection_timeout: Option<Duration>,
32}
33
34impl NodeService {
35    pub fn new(
36        service_data: Arc<RwLock<NodeServiceData>>,
37        rpc_actions: Box<dyn RpcActions + Send>,
38    ) -> NodeService {
39        NodeService {
40            rpc_actions,
41            service_data,
42            connection_timeout: None,
43        }
44    }
45
46    /// Set the max time to wait for the node to connect to the network.
47    /// If not set, we do not perform a dynamic startup delay.
48    pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> NodeService {
49        self.connection_timeout = Some(connection_timeout);
50        self
51    }
52}
53
54#[async_trait]
55impl ServiceStateActions for NodeService {
56    async fn bin_path(&self) -> PathBuf {
57        self.service_data.read().await.antnode_path.clone()
58    }
59
60    async fn build_upgrade_install_context(
61        &self,
62        options: UpgradeOptions,
63    ) -> Result<ServiceInstallCtx> {
64        let service_data = self.service_data.read().await;
65        let label: ServiceLabel = service_data.service_name.parse()?;
66        let mut args = vec![
67            OsString::from("--rpc"),
68            OsString::from(service_data.rpc_socket_addr.to_string()),
69            OsString::from("--root-dir"),
70            OsString::from(service_data.data_dir_path.to_string_lossy().to_string()),
71            OsString::from("--log-output-dest"),
72            OsString::from(service_data.log_dir_path.to_string_lossy().to_string()),
73        ];
74
75        push_arguments_from_initial_peers_config(&service_data.initial_peers_config, &mut args);
76        if let Some(log_fmt) = service_data.log_format {
77            args.push(OsString::from("--log-format"));
78            args.push(OsString::from(log_fmt.as_str()));
79        }
80        if let Some(id) = service_data.network_id {
81            args.push(OsString::from("--network-id"));
82            args.push(OsString::from(id.to_string()));
83        }
84        if service_data.no_upnp {
85            args.push(OsString::from("--no-upnp"));
86        }
87        if service_data.relay {
88            args.push(OsString::from("--relay"));
89        }
90
91        if service_data.alpha {
92            args.push(OsString::from("--alpha"));
93        }
94
95        if let Some(node_ip) = service_data.node_ip {
96            args.push(OsString::from("--ip"));
97            args.push(OsString::from(node_ip.to_string()));
98        }
99
100        if let Some(node_port) = service_data.node_port {
101            args.push(OsString::from("--port"));
102            args.push(OsString::from(node_port.to_string()));
103        }
104        if let Some(metrics_port) = service_data.metrics_port {
105            args.push(OsString::from("--metrics-server-port"));
106            args.push(OsString::from(metrics_port.to_string()));
107        }
108        if let Some(max_archived_log_files) = service_data.max_archived_log_files {
109            args.push(OsString::from("--max-archived-log-files"));
110            args.push(OsString::from(max_archived_log_files.to_string()));
111        }
112        if let Some(max_log_files) = service_data.max_log_files {
113            args.push(OsString::from("--max-log-files"));
114            args.push(OsString::from(max_log_files.to_string()));
115        }
116
117        args.push(OsString::from("--rewards-address"));
118        args.push(OsString::from(service_data.rewards_address.to_string()));
119
120        if service_data.write_older_cache_files {
121            args.push(OsString::from("--write-older-cache-files"));
122        }
123
124        args.push(OsString::from(service_data.evm_network.to_string()));
125        if let EvmNetwork::Custom(custom_network) = &service_data.evm_network {
126            args.push(OsString::from("--rpc-url"));
127            args.push(OsString::from(custom_network.rpc_url_http.to_string()));
128            args.push(OsString::from("--payment-token-address"));
129            args.push(OsString::from(
130                custom_network.payment_token_address.to_string(),
131            ));
132            args.push(OsString::from("--data-payments-address"));
133            args.push(OsString::from(
134                custom_network.data_payments_address.to_string(),
135            ));
136        }
137
138        Ok(ServiceInstallCtx {
139            args,
140            autostart: options.auto_restart,
141            contents: None,
142            environment: options.env_variables,
143            label: label.clone(),
144            program: service_data.antnode_path.to_path_buf(),
145            username: service_data.user.clone(),
146            working_directory: None,
147            disable_restart_on_failure: true,
148        })
149    }
150
151    async fn data_dir_path(&self) -> PathBuf {
152        self.service_data.read().await.data_dir_path.clone()
153    }
154
155    async fn is_user_mode(&self) -> bool {
156        self.service_data.read().await.user_mode
157    }
158
159    async fn log_dir_path(&self) -> PathBuf {
160        self.service_data.read().await.log_dir_path.clone()
161    }
162
163    async fn name(&self) -> String {
164        self.service_data.read().await.service_name.clone()
165    }
166
167    async fn pid(&self) -> Option<u32> {
168        self.service_data.read().await.pid
169    }
170
171    async fn on_remove(&self) {
172        self.service_data.write().await.status = ServiceStatus::Removed;
173    }
174
175    async fn on_start(&self, pid: Option<u32>, full_refresh: bool) -> Result<()> {
176        let mut service_data = self.service_data.write().await;
177        let (connected_peers, pid, peer_id) = if full_refresh {
178            debug!("Performing full refresh for {}", service_data.service_name);
179            if let Some(connection_timeout) = self.connection_timeout {
180                debug!(
181                    "Performing dynamic startup delay for {}",
182                    service_data.service_name
183                );
184                self.rpc_actions
185                    .is_node_connected_to_network(connection_timeout)
186                    .await?;
187            }
188
189            let node_info = self
190                .rpc_actions
191                .node_info()
192                .await
193                .inspect_err(|err| error!("Error obtaining node_info via RPC: {err:?}"))?;
194            let network_info = self
195                .rpc_actions
196                .network_info()
197                .await
198                .inspect_err(|err| error!("Error obtaining network_info via RPC: {err:?}"))?;
199
200            service_data.listen_addr = Some(
201                network_info
202                    .listeners
203                    .iter()
204                    .cloned()
205                    .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
206                    .collect(),
207            );
208            for addr in &network_info.listeners {
209                if let Some(port) = get_port_from_multiaddr(addr) {
210                    debug!(
211                        "Found antnode port for {}: {port}",
212                        service_data.service_name
213                    );
214                    service_data.node_port = Some(port);
215                    break;
216                }
217            }
218
219            if service_data.node_port.is_none() {
220                error!("Could not find antnode port");
221                error!("This will cause the node to have a different port during upgrade");
222            }
223
224            (
225                Some(network_info.connected_peers),
226                pid,
227                Some(node_info.peer_id),
228            )
229        } else {
230            debug!(
231                "Performing partial refresh for {}",
232                service_data.service_name
233            );
234            debug!("Previously assigned data will be used");
235            (
236                service_data.connected_peers.clone(),
237                pid,
238                service_data.peer_id,
239            )
240        };
241
242        service_data.connected_peers = connected_peers;
243        service_data.peer_id = peer_id;
244        service_data.pid = pid;
245        service_data.status = ServiceStatus::Running;
246        Ok(())
247    }
248
249    async fn on_stop(&self) -> Result<()> {
250        let mut service_data = self.service_data.write().await;
251        debug!("Marking {} as stopped", service_data.service_name);
252        service_data.pid = None;
253        service_data.status = ServiceStatus::Stopped;
254        service_data.connected_peers = None;
255        Ok(())
256    }
257
258    async fn set_version(&self, version: &str) {
259        self.service_data.write().await.version = version.to_string();
260    }
261
262    async fn status(&self) -> ServiceStatus {
263        self.service_data.read().await.status.clone()
264    }
265
266    async fn version(&self) -> String {
267        self.service_data.read().await.version.clone()
268    }
269}
270
271/// Pushes arguments from the `InitialPeersConfig` struct to the provided `args` vector.
272pub fn push_arguments_from_initial_peers_config(
273    init_peers_config: &InitialPeersConfig,
274    args: &mut Vec<OsString>,
275) {
276    if init_peers_config.first {
277        args.push(OsString::from("--first"));
278    }
279    if init_peers_config.local {
280        args.push(OsString::from("--local"));
281    }
282    if !init_peers_config.addrs.is_empty() {
283        let peers_str = init_peers_config
284            .addrs
285            .iter()
286            .map(|peer| peer.to_string())
287            .collect::<Vec<_>>()
288            .join(",");
289        args.push(OsString::from("--peer"));
290        args.push(OsString::from(peers_str));
291    }
292    if !init_peers_config.network_contacts_url.is_empty() {
293        args.push(OsString::from("--network-contacts-url"));
294        args.push(OsString::from(
295            init_peers_config
296                .network_contacts_url
297                .iter()
298                .map(|url| url.to_string())
299                .collect::<Vec<_>>()
300                .join(","),
301        ));
302    }
303    if init_peers_config.ignore_cache {
304        args.push(OsString::from("--ignore-cache"));
305    }
306    if let Some(path) = &init_peers_config.bootstrap_cache_dir {
307        args.push(OsString::from("--bootstrap-cache-dir"));
308        args.push(OsString::from(path.to_string_lossy().to_string()));
309    }
310}