ant_service_management/node/
mod.rs1mod node_service_data;
10mod node_service_data_v0;
11mod node_service_data_v1;
12mod node_service_data_v2;
13
14pub use node_service_data::{NODE_SERVICE_DATA_SCHEMA_LATEST, NodeServiceData};
16
17use crate::{ServiceStateActions, ServiceStatus, UpgradeOptions, error::Result, rpc::RpcActions};
18use ant_bootstrap::InitialPeersConfig;
19use ant_evm::EvmNetwork;
20use ant_protocol::get_port_from_multiaddr;
21use libp2p::multiaddr::Protocol;
22use service_manager::{ServiceInstallCtx, ServiceLabel};
23use std::{ffi::OsString, path::PathBuf, sync::Arc, time::Duration};
24use tokio::sync::RwLock;
25use tonic::async_trait;
26
27pub struct NodeService {
28 pub service_data: Arc<RwLock<NodeServiceData>>,
29 pub rpc_actions: Box<dyn RpcActions + Send>,
30 pub connection_timeout: Option<Duration>,
32}
33
34impl NodeService {
35 pub fn new(
36 service_data: Arc<RwLock<NodeServiceData>>,
37 rpc_actions: Box<dyn RpcActions + Send>,
38 ) -> NodeService {
39 NodeService {
40 rpc_actions,
41 service_data,
42 connection_timeout: None,
43 }
44 }
45
46 pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> NodeService {
49 self.connection_timeout = Some(connection_timeout);
50 self
51 }
52}
53
54#[async_trait]
55impl ServiceStateActions for NodeService {
56 async fn bin_path(&self) -> PathBuf {
57 self.service_data.read().await.antnode_path.clone()
58 }
59
60 async fn build_upgrade_install_context(
61 &self,
62 options: UpgradeOptions,
63 ) -> Result<ServiceInstallCtx> {
64 let service_data = self.service_data.read().await;
65 let label: ServiceLabel = service_data.service_name.parse()?;
66 let mut args = vec![
67 OsString::from("--rpc"),
68 OsString::from(service_data.rpc_socket_addr.to_string()),
69 OsString::from("--root-dir"),
70 OsString::from(service_data.data_dir_path.to_string_lossy().to_string()),
71 OsString::from("--log-output-dest"),
72 OsString::from(service_data.log_dir_path.to_string_lossy().to_string()),
73 ];
74
75 push_arguments_from_initial_peers_config(&service_data.initial_peers_config, &mut args);
76 if let Some(log_fmt) = service_data.log_format {
77 args.push(OsString::from("--log-format"));
78 args.push(OsString::from(log_fmt.as_str()));
79 }
80 if let Some(id) = service_data.network_id {
81 args.push(OsString::from("--network-id"));
82 args.push(OsString::from(id.to_string()));
83 }
84 if service_data.no_upnp {
85 args.push(OsString::from("--no-upnp"));
86 }
87 if service_data.relay {
88 args.push(OsString::from("--relay"));
89 }
90
91 if service_data.alpha {
92 args.push(OsString::from("--alpha"));
93 }
94
95 if let Some(node_ip) = service_data.node_ip {
96 args.push(OsString::from("--ip"));
97 args.push(OsString::from(node_ip.to_string()));
98 }
99
100 if let Some(node_port) = service_data.node_port {
101 args.push(OsString::from("--port"));
102 args.push(OsString::from(node_port.to_string()));
103 }
104 if let Some(metrics_port) = service_data.metrics_port {
105 args.push(OsString::from("--metrics-server-port"));
106 args.push(OsString::from(metrics_port.to_string()));
107 }
108 if let Some(max_archived_log_files) = service_data.max_archived_log_files {
109 args.push(OsString::from("--max-archived-log-files"));
110 args.push(OsString::from(max_archived_log_files.to_string()));
111 }
112 if let Some(max_log_files) = service_data.max_log_files {
113 args.push(OsString::from("--max-log-files"));
114 args.push(OsString::from(max_log_files.to_string()));
115 }
116
117 args.push(OsString::from("--rewards-address"));
118 args.push(OsString::from(service_data.rewards_address.to_string()));
119
120 if service_data.write_older_cache_files {
121 args.push(OsString::from("--write-older-cache-files"));
122 }
123
124 args.push(OsString::from(service_data.evm_network.to_string()));
125 if let EvmNetwork::Custom(custom_network) = &service_data.evm_network {
126 args.push(OsString::from("--rpc-url"));
127 args.push(OsString::from(custom_network.rpc_url_http.to_string()));
128 args.push(OsString::from("--payment-token-address"));
129 args.push(OsString::from(
130 custom_network.payment_token_address.to_string(),
131 ));
132 args.push(OsString::from("--data-payments-address"));
133 args.push(OsString::from(
134 custom_network.data_payments_address.to_string(),
135 ));
136 }
137
138 Ok(ServiceInstallCtx {
139 args,
140 autostart: options.auto_restart,
141 contents: None,
142 environment: options.env_variables,
143 label: label.clone(),
144 program: service_data.antnode_path.to_path_buf(),
145 username: service_data.user.clone(),
146 working_directory: None,
147 disable_restart_on_failure: true,
148 })
149 }
150
151 async fn data_dir_path(&self) -> PathBuf {
152 self.service_data.read().await.data_dir_path.clone()
153 }
154
155 async fn is_user_mode(&self) -> bool {
156 self.service_data.read().await.user_mode
157 }
158
159 async fn log_dir_path(&self) -> PathBuf {
160 self.service_data.read().await.log_dir_path.clone()
161 }
162
163 async fn name(&self) -> String {
164 self.service_data.read().await.service_name.clone()
165 }
166
167 async fn pid(&self) -> Option<u32> {
168 self.service_data.read().await.pid
169 }
170
171 async fn on_remove(&self) {
172 self.service_data.write().await.status = ServiceStatus::Removed;
173 }
174
175 async fn on_start(&self, pid: Option<u32>, full_refresh: bool) -> Result<()> {
176 let mut service_data = self.service_data.write().await;
177 let (connected_peers, pid, peer_id) = if full_refresh {
178 debug!("Performing full refresh for {}", service_data.service_name);
179 if let Some(connection_timeout) = self.connection_timeout {
180 debug!(
181 "Performing dynamic startup delay for {}",
182 service_data.service_name
183 );
184 self.rpc_actions
185 .is_node_connected_to_network(connection_timeout)
186 .await?;
187 }
188
189 let node_info = self
190 .rpc_actions
191 .node_info()
192 .await
193 .inspect_err(|err| error!("Error obtaining node_info via RPC: {err:?}"))?;
194 let network_info = self
195 .rpc_actions
196 .network_info()
197 .await
198 .inspect_err(|err| error!("Error obtaining network_info via RPC: {err:?}"))?;
199
200 service_data.listen_addr = Some(
201 network_info
202 .listeners
203 .iter()
204 .cloned()
205 .map(|addr| addr.with(Protocol::P2p(node_info.peer_id)))
206 .collect(),
207 );
208 for addr in &network_info.listeners {
209 if let Some(port) = get_port_from_multiaddr(addr) {
210 debug!(
211 "Found antnode port for {}: {port}",
212 service_data.service_name
213 );
214 service_data.node_port = Some(port);
215 break;
216 }
217 }
218
219 if service_data.node_port.is_none() {
220 error!("Could not find antnode port");
221 error!("This will cause the node to have a different port during upgrade");
222 }
223
224 (
225 Some(network_info.connected_peers),
226 pid,
227 Some(node_info.peer_id),
228 )
229 } else {
230 debug!(
231 "Performing partial refresh for {}",
232 service_data.service_name
233 );
234 debug!("Previously assigned data will be used");
235 (
236 service_data.connected_peers.clone(),
237 pid,
238 service_data.peer_id,
239 )
240 };
241
242 service_data.connected_peers = connected_peers;
243 service_data.peer_id = peer_id;
244 service_data.pid = pid;
245 service_data.status = ServiceStatus::Running;
246 Ok(())
247 }
248
249 async fn on_stop(&self) -> Result<()> {
250 let mut service_data = self.service_data.write().await;
251 debug!("Marking {} as stopped", service_data.service_name);
252 service_data.pid = None;
253 service_data.status = ServiceStatus::Stopped;
254 service_data.connected_peers = None;
255 Ok(())
256 }
257
258 async fn set_version(&self, version: &str) {
259 self.service_data.write().await.version = version.to_string();
260 }
261
262 async fn status(&self) -> ServiceStatus {
263 self.service_data.read().await.status.clone()
264 }
265
266 async fn version(&self) -> String {
267 self.service_data.read().await.version.clone()
268 }
269}
270
271pub fn push_arguments_from_initial_peers_config(
273 init_peers_config: &InitialPeersConfig,
274 args: &mut Vec<OsString>,
275) {
276 if init_peers_config.first {
277 args.push(OsString::from("--first"));
278 }
279 if init_peers_config.local {
280 args.push(OsString::from("--local"));
281 }
282 if !init_peers_config.addrs.is_empty() {
283 let peers_str = init_peers_config
284 .addrs
285 .iter()
286 .map(|peer| peer.to_string())
287 .collect::<Vec<_>>()
288 .join(",");
289 args.push(OsString::from("--peer"));
290 args.push(OsString::from(peers_str));
291 }
292 if !init_peers_config.network_contacts_url.is_empty() {
293 args.push(OsString::from("--network-contacts-url"));
294 args.push(OsString::from(
295 init_peers_config
296 .network_contacts_url
297 .iter()
298 .map(|url| url.to_string())
299 .collect::<Vec<_>>()
300 .join(","),
301 ));
302 }
303 if init_peers_config.ignore_cache {
304 args.push(OsString::from("--ignore-cache"));
305 }
306 if let Some(path) = &init_peers_config.bootstrap_cache_dir {
307 args.push(OsString::from("--bootstrap-cache-dir"));
308 args.push(OsString::from(path.to_string_lossy().to_string()));
309 }
310}