1use crate::{error::Result, rpc::RpcActions, ServiceStateActions, ServiceStatus, UpgradeOptions};
10use ant_bootstrap::InitialPeersConfig;
11use ant_evm::{AttoTokens, EvmNetwork, RewardsAddress};
12use ant_logging::LogFormat;
13use ant_protocol::get_port_from_multiaddr;
14use async_trait::async_trait;
15use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
16use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
17use service_manager::{ServiceInstallCtx, ServiceLabel};
18use std::{
19 ffi::OsString,
20 net::{Ipv4Addr, SocketAddr},
21 path::PathBuf,
22 str::FromStr,
23 time::Duration,
24};
25
26pub struct NodeService<'a> {
27 pub service_data: &'a mut NodeServiceData,
28 pub rpc_actions: Box<dyn RpcActions + Send>,
29 pub connection_timeout: Option<Duration>,
31}
32
33impl<'a> NodeService<'a> {
34 pub fn new(
35 service_data: &'a mut NodeServiceData,
36 rpc_actions: Box<dyn RpcActions + Send>,
37 ) -> NodeService<'a> {
38 NodeService {
39 rpc_actions,
40 service_data,
41 connection_timeout: None,
42 }
43 }
44
45 pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> NodeService<'a> {
48 self.connection_timeout = Some(connection_timeout);
49 self
50 }
51}
52
53#[async_trait]
54impl ServiceStateActions for NodeService<'_> {
55 fn bin_path(&self) -> PathBuf {
56 self.service_data.antnode_path.clone()
57 }
58
59 fn build_upgrade_install_context(&self, options: UpgradeOptions) -> Result<ServiceInstallCtx> {
60 let label: ServiceLabel = self.service_data.service_name.parse()?;
61 let mut args = vec![
62 OsString::from("--rpc"),
63 OsString::from(self.service_data.rpc_socket_addr.to_string()),
64 OsString::from("--root-dir"),
65 OsString::from(
66 self.service_data
67 .data_dir_path
68 .to_string_lossy()
69 .to_string(),
70 ),
71 OsString::from("--log-output-dest"),
72 OsString::from(self.service_data.log_dir_path.to_string_lossy().to_string()),
73 ];
74
75 push_arguments_from_initial_peers_config(&self.service_data.peers_args, &mut args);
76 if let Some(log_fmt) = self.service_data.log_format {
77 args.push(OsString::from("--log-format"));
78 args.push(OsString::from(log_fmt.as_str()));
79 }
80 if let Some(id) = self.service_data.network_id {
81 args.push(OsString::from("--network-id"));
82 args.push(OsString::from(id.to_string()));
83 }
84 if self.service_data.upnp {
85 args.push(OsString::from("--upnp"));
86 }
87 if self.service_data.home_network {
88 args.push(OsString::from("--home-network"));
89 }
90
91 if let Some(node_ip) = self.service_data.node_ip {
92 args.push(OsString::from("--ip"));
93 args.push(OsString::from(node_ip.to_string()));
94 }
95
96 if let Some(node_port) = self.service_data.node_port {
97 args.push(OsString::from("--port"));
98 args.push(OsString::from(node_port.to_string()));
99 }
100 if let Some(metrics_port) = self.service_data.metrics_port {
101 args.push(OsString::from("--metrics-server-port"));
102 args.push(OsString::from(metrics_port.to_string()));
103 }
104 if let Some(max_archived_log_files) = self.service_data.max_archived_log_files {
105 args.push(OsString::from("--max-archived-log-files"));
106 args.push(OsString::from(max_archived_log_files.to_string()));
107 }
108 if let Some(max_log_files) = self.service_data.max_log_files {
109 args.push(OsString::from("--max-log-files"));
110 args.push(OsString::from(max_log_files.to_string()));
111 }
112
113 args.push(OsString::from("--rewards-address"));
114 args.push(OsString::from(
115 self.service_data.rewards_address.to_string(),
116 ));
117
118 args.push(OsString::from(self.service_data.evm_network.to_string()));
119 if let EvmNetwork::Custom(custom_network) = &self.service_data.evm_network {
120 args.push(OsString::from("--rpc-url"));
121 args.push(OsString::from(custom_network.rpc_url_http.to_string()));
122 args.push(OsString::from("--payment-token-address"));
123 args.push(OsString::from(
124 custom_network.payment_token_address.to_string(),
125 ));
126 args.push(OsString::from("--data-payments-address"));
127 args.push(OsString::from(
128 custom_network.data_payments_address.to_string(),
129 ));
130 }
131
132 Ok(ServiceInstallCtx {
133 args,
134 autostart: options.auto_restart,
135 contents: None,
136 environment: options.env_variables,
137 label: label.clone(),
138 program: self.service_data.antnode_path.to_path_buf(),
139 username: self.service_data.user.clone(),
140 working_directory: None,
141 disable_restart_on_failure: true,
142 })
143 }
144
145 fn data_dir_path(&self) -> PathBuf {
146 self.service_data.data_dir_path.clone()
147 }
148
149 fn is_user_mode(&self) -> bool {
150 self.service_data.user_mode
151 }
152
153 fn log_dir_path(&self) -> PathBuf {
154 self.service_data.log_dir_path.clone()
155 }
156
157 fn name(&self) -> String {
158 self.service_data.service_name.clone()
159 }
160
161 fn pid(&self) -> Option<u32> {
162 self.service_data.pid
163 }
164
165 fn on_remove(&mut self) {
166 self.service_data.status = ServiceStatus::Removed;
167 }
168
169 async fn on_start(&mut self, pid: Option<u32>, full_refresh: bool) -> Result<()> {
170 let (connected_peers, pid, peer_id) = if full_refresh {
171 debug!(
172 "Performing full refresh for {}",
173 self.service_data.service_name
174 );
175 if let Some(connection_timeout) = self.connection_timeout {
176 debug!(
177 "Performing dynamic startup delay for {}",
178 self.service_data.service_name
179 );
180 self.rpc_actions
181 .is_node_connected_to_network(connection_timeout)
182 .await?;
183 }
184
185 let node_info = self
186 .rpc_actions
187 .node_info()
188 .await
189 .inspect_err(|err| error!("Error obtaining node_info via RPC: {err:?}"))?;
190 let network_info = self
191 .rpc_actions
192 .network_info()
193 .await
194 .inspect_err(|err| error!("Error obtaining network_info via RPC: {err:?}"))?;
195
196 self.service_data.listen_addr = Some(
197 network_info
198 .listeners
199 .iter()
200 .cloned()
201 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
202 .collect(),
203 );
204 for addr in &network_info.listeners {
205 if let Some(port) = get_port_from_multiaddr(addr) {
206 debug!(
207 "Found antnode port for {}: {port}",
208 self.service_data.service_name
209 );
210 self.service_data.node_port = Some(port);
211 break;
212 }
213 }
214
215 if self.service_data.node_port.is_none() {
216 error!("Could not find antnode port");
217 error!("This will cause the node to have a different port during upgrade");
218 }
219
220 (
221 Some(network_info.connected_peers),
222 pid,
223 Some(node_info.peer_id),
224 )
225 } else {
226 debug!(
227 "Performing partial refresh for {}",
228 self.service_data.service_name
229 );
230 debug!("Previously assigned data will be used");
231 (
232 self.service_data.connected_peers.clone(),
233 pid,
234 self.service_data.peer_id,
235 )
236 };
237
238 self.service_data.connected_peers = connected_peers;
239 self.service_data.peer_id = peer_id;
240 self.service_data.pid = pid;
241 self.service_data.status = ServiceStatus::Running;
242 Ok(())
243 }
244
245 async fn on_stop(&mut self) -> Result<()> {
246 debug!("Marking {} as stopped", self.service_data.service_name);
247 self.service_data.pid = None;
248 self.service_data.status = ServiceStatus::Stopped;
249 self.service_data.connected_peers = None;
250 Ok(())
251 }
252
253 fn set_version(&mut self, version: &str) {
254 self.service_data.version = version.to_string();
255 }
256
257 fn status(&self) -> ServiceStatus {
258 self.service_data.status.clone()
259 }
260
261 fn version(&self) -> String {
262 self.service_data.version.clone()
263 }
264}
265
266#[derive(Clone, Debug, Serialize, Deserialize)]
267pub struct NodeServiceData {
268 pub antnode_path: PathBuf,
269 #[serde(default)]
270 pub auto_restart: bool,
271 #[serde(
272 serialize_with = "serialize_connected_peers",
273 deserialize_with = "deserialize_connected_peers"
274 )]
275 pub connected_peers: Option<Vec<PeerId>>,
276 pub data_dir_path: PathBuf,
277 #[serde(default)]
278 pub evm_network: EvmNetwork,
279 pub home_network: bool,
280 pub listen_addr: Option<Vec<Multiaddr>>,
281 pub log_dir_path: PathBuf,
282 pub log_format: Option<LogFormat>,
283 pub max_archived_log_files: Option<usize>,
284 pub max_log_files: Option<usize>,
285 #[serde(default)]
286 pub metrics_port: Option<u16>,
287 pub network_id: Option<u8>,
288 #[serde(default)]
289 pub node_ip: Option<Ipv4Addr>,
290 #[serde(default)]
291 pub node_port: Option<u16>,
292 pub number: u16,
293 #[serde(
294 serialize_with = "serialize_peer_id",
295 deserialize_with = "deserialize_peer_id"
296 )]
297 pub peer_id: Option<PeerId>,
298 pub peers_args: InitialPeersConfig,
299 pub pid: Option<u32>,
300 #[serde(default)]
301 pub rewards_address: RewardsAddress,
302 pub reward_balance: Option<AttoTokens>,
303 pub rpc_socket_addr: SocketAddr,
304 pub service_name: String,
305 pub status: ServiceStatus,
306 #[serde(default = "default_upnp")]
307 pub upnp: bool,
308 pub user: Option<String>,
309 pub user_mode: bool,
310 pub version: String,
311}
312
/// Serde default for `NodeServiceData::upnp`: records without the field have UPnP disabled.
fn default_upnp() -> bool {
    false
}
316
317fn serialize_peer_id<S>(value: &Option<PeerId>, serializer: S) -> Result<S::Ok, S::Error>
318where
319 S: Serializer,
320{
321 if let Some(peer_id) = value {
322 return serializer.serialize_str(&peer_id.to_string());
323 }
324 serializer.serialize_none()
325}
326
327fn deserialize_peer_id<'de, D>(deserializer: D) -> Result<Option<PeerId>, D::Error>
328where
329 D: Deserializer<'de>,
330{
331 let s: Option<String> = Option::deserialize(deserializer)?;
332 if let Some(peer_id_str) = s {
333 PeerId::from_str(&peer_id_str)
334 .map(Some)
335 .map_err(DeError::custom)
336 } else {
337 Ok(None)
338 }
339}
340
341fn serialize_connected_peers<S>(
342 connected_peers: &Option<Vec<PeerId>>,
343 serializer: S,
344) -> Result<S::Ok, S::Error>
345where
346 S: Serializer,
347{
348 match connected_peers {
349 Some(peers) => {
350 let peer_strs: Vec<String> = peers.iter().map(|p| p.to_string()).collect();
351 serializer.serialize_some(&peer_strs)
352 }
353 None => serializer.serialize_none(),
354 }
355}
356
357fn deserialize_connected_peers<'de, D>(deserializer: D) -> Result<Option<Vec<PeerId>>, D::Error>
358where
359 D: Deserializer<'de>,
360{
361 let vec: Option<Vec<String>> = Option::deserialize(deserializer)?;
362 match vec {
363 Some(peer_strs) => {
364 let peers: Result<Vec<PeerId>, _> = peer_strs
365 .into_iter()
366 .map(|s| PeerId::from_str(&s).map_err(DeError::custom))
367 .collect();
368 peers.map(Some)
369 }
370 None => Ok(None),
371 }
372}
373
374impl NodeServiceData {
375 pub fn get_antnode_port(&self) -> Option<u16> {
377 if let Some(multi_addrs) = &self.listen_addr {
379 println!("Listening addresses are defined");
380 for addr in multi_addrs {
381 if let Some(port) = get_port_from_multiaddr(addr) {
382 println!("Found port: {}", port);
383 return Some(port);
384 }
385 }
386 }
387 None
388 }
389
390 pub fn get_critical_failure(&self) -> Option<(chrono::DateTime<chrono::Utc>, String)> {
392 const CRITICAL_FAILURE_LOG_FILE: &str = "critical_failure.log";
393
394 let log_path = self.log_dir_path.join(CRITICAL_FAILURE_LOG_FILE);
395
396 if let Ok(content) = std::fs::read_to_string(log_path) {
397 if let Some((timestamp, message)) = content.split_once(']') {
398 let timestamp_trimmed = timestamp.trim_start_matches('[').trim();
399 if let Ok(datetime) = timestamp_trimmed.parse::<chrono::DateTime<chrono::Utc>>() {
400 let message_trimmed = message
401 .trim()
402 .trim_start_matches("Node terminated due to: ");
403 return Some((datetime, message_trimmed.to_string()));
404 }
405 }
406 }
407
408 None
409 }
410}
411
412pub fn push_arguments_from_initial_peers_config(
414 init_peers_config: &InitialPeersConfig,
415 args: &mut Vec<OsString>,
416) {
417 if init_peers_config.first {
418 args.push(OsString::from("--first"));
419 }
420 if init_peers_config.local {
421 args.push(OsString::from("--local"));
422 }
423 if !init_peers_config.addrs.is_empty() {
424 let peers_str = init_peers_config
425 .addrs
426 .iter()
427 .map(|peer| peer.to_string())
428 .collect::<Vec<_>>()
429 .join(",");
430 args.push(OsString::from("--peer"));
431 args.push(OsString::from(peers_str));
432 }
433 if !init_peers_config.network_contacts_url.is_empty() {
434 args.push(OsString::from("--network-contacts-url"));
435 args.push(OsString::from(
436 init_peers_config
437 .network_contacts_url
438 .iter()
439 .map(|url| url.to_string())
440 .collect::<Vec<_>>()
441 .join(","),
442 ));
443 }
444 if init_peers_config.disable_mainnet_contacts {
445 args.push(OsString::from("--testnet"));
446 }
447 if init_peers_config.ignore_cache {
448 args.push(OsString::from("--ignore-cache"));
449 }
450 if let Some(path) = &init_peers_config.bootstrap_cache_dir {
451 args.push(OsString::from("--bootstrap-cache-dir"));
452 args.push(OsString::from(path.to_string_lossy().to_string()));
453 }
454}