1use std::sync::Arc;
4
5use axum::{
6 Router,
7 response::{IntoResponse, Response},
8 routing::get,
9};
10
11use crate::{Config, chain_sync::SyncStatus, libp2p::PeerManager, networks::ChainConfig};
12
13mod endpoints;
14
/// Default port for the healthcheck HTTP server (`/healthz`, `/readyz`, `/livez`).
///
/// NOTE(review): not referenced inside this module; presumably used by callers to build
/// the default `healthcheck_address` — confirm at the call sites.
pub const DEFAULT_HEALTHCHECK_PORT: u16 = 2346;
17
/// Node state handed to the healthcheck router (via `with_state`) so the probe
/// handlers in `endpoints` can inspect it on every request.
pub(crate) struct ForestState {
    /// Node configuration. The tests point `client.rpc_address` at a live listener and
    /// expect the RPC probe to fail once it is dropped, so the probes presumably dial that
    /// address — TODO confirm in `endpoints`.
    pub config: Config,
    /// Chain/network parameters for the running network.
    pub chain_config: Arc<ChainConfig>,
    /// Genesis timestamp of the chain. Presumably Unix seconds and used together with
    /// `chain_config` to decide whether the head epoch is "up to date" — TODO confirm.
    pub genesis_timestamp: u64,
    /// Shared, lock-protected sync status report. It is updated outside the server while
    /// it runs, and each probe request observes the latest value.
    pub sync_status: SyncStatus,
    /// Peer manager; the peer-connectivity probes consult it.
    pub peer_manager: Arc<PeerManager>,
}
26
27pub(crate) async fn init_healthcheck_server(
35 forest_state: ForestState,
36 tcp_listener: tokio::net::TcpListener,
37) -> anyhow::Result<()> {
38 let healthcheck_service = Router::new()
39 .route("/healthz", get(endpoints::healthz))
40 .route("/readyz", get(endpoints::readyz))
41 .route("/livez", get(endpoints::livez))
42 .with_state(forest_state.into());
43
44 axum::serve(tcp_listener, healthcheck_service).await?;
45 Ok(())
46}
47
48struct AppError(anyhow::Error);
50
51impl IntoResponse for AppError {
52 fn into_response(self) -> Response {
53 (http::StatusCode::SERVICE_UNAVAILABLE, self.0.to_string()).into_response()
54 }
55}
56
#[cfg(test)]
mod test {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;
    use crate::Client;
    use crate::chain_sync::{NodeSyncStatus, SyncStatusReport};
    use crate::cli_shared::cli::ChainIndexerConfig;
    use parking_lot::RwLock;
    use reqwest::StatusCode;

    /// Binds `forest_state.config.client.healthcheck_address`, then serves the healthcheck
    /// router in a background task.
    ///
    /// Returns the socket address actually bound, which matters when port `0` was
    /// requested. Binding happens before the task is spawned, so requests made right
    /// after this returns are queued by the OS rather than refused.
    async fn spawn_healthcheck_server(forest_state: ForestState) -> SocketAddr {
        let listener =
            tokio::net::TcpListener::bind(forest_state.config.client.healthcheck_address)
                .await
                .unwrap();
        let bound_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            init_healthcheck_server(forest_state, listener)
                .await
                .unwrap();
        });

        bound_addr
    }

    /// Sends `GET {path}` to the server at `addr`, appending `?verbose` when requested.
    ///
    /// The bound socket address (e.g. `127.0.0.1:PORT`) is used verbatim instead of
    /// `localhost`. This keeps the request independent of name resolution and of an IPv6
    /// (`::1`) attempt having to fall back to the IPv4-only listener.
    async fn call_healthcheck(addr: SocketAddr, path: &str, verbose: bool) -> reqwest::Response {
        let query = if verbose { "?verbose" } else { "" };
        reqwest::get(format!("http://{addr}{path}{query}"))
            .await
            .unwrap()
    }

    /// Asserts that every entry of `expected` occurs in a verbose probe body.
    /// On failure, prints the whole body so the failing check is visible.
    fn assert_reports(text: &str, expected: &[&str]) {
        for &line in expected {
            assert!(
                text.contains(line),
                "expected {line:?} in healthcheck response:\n{text}"
            );
        }
    }

    #[tokio::test]
    async fn test_check_readyz() {
        let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

        let sync_status = Arc::new(RwLock::new(SyncStatusReport::init()));

        let forest_state = ForestState {
            config: Config {
                chain_indexer: ChainIndexerConfig {
                    enable_indexer: true,
                    gc_retention_epochs: None,
                },
                client: Client {
                    healthcheck_address,
                    rpc_address: rpc_listener.local_addr().unwrap(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::new(ChainConfig::default()),
            genesis_timestamp: 0,
            sync_status: sync_status.clone(),
            peer_manager: Arc::new(PeerManager::default()),
        };

        let addr = spawn_healthcheck_server(forest_state).await;

        // Healthy node: fully synced, head far in the future, RPC listener alive.
        {
            let mut report = sync_status.write();
            report.status = NodeSyncStatus::Synced;
            report.current_head_epoch = i64::MAX;
        }

        assert_eq!(
            call_healthcheck(addr, "/readyz", false).await.status(),
            StatusCode::OK
        );
        let response = call_healthcheck(addr, "/readyz", true).await;
        assert_eq!(response.status(), StatusCode::OK);
        let text = response.text().await.unwrap();
        assert_reports(
            &text,
            &[
                "[+] sync complete",
                "[+] epoch up to date",
                "[+] rpc server running",
            ],
        );

        // Unhealthy node: RPC listener closed, sync errored, head at epoch 0.
        drop(rpc_listener);
        {
            let mut report = sync_status.write();
            report.status = NodeSyncStatus::Error;
            report.current_head_epoch = 0;
        }

        assert_eq!(
            call_healthcheck(addr, "/readyz", false).await.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );
        let response = call_healthcheck(addr, "/readyz", true).await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let text = response.text().await.unwrap();
        assert_reports(
            &text,
            &[
                "[!] sync incomplete",
                "[!] epoch outdated",
                "[!] rpc server not running",
            ],
        );
    }

    #[tokio::test]
    async fn test_check_livez() {
        let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();

        let sync_status = Arc::new(RwLock::new(SyncStatusReport::default()));
        let peer_manager = Arc::new(PeerManager::default());
        let forest_state = ForestState {
            config: Config {
                client: Client {
                    healthcheck_address,
                    rpc_address: rpc_listener.local_addr().unwrap(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::new(ChainConfig::default()),
            genesis_timestamp: 0,
            sync_status: sync_status.clone(),
            peer_manager: peer_manager.clone(),
        };

        let addr = spawn_healthcheck_server(forest_state).await;

        // Live node: still syncing, but with at least one peer.
        sync_status.write().status = NodeSyncStatus::Syncing;
        let peer = libp2p::PeerId::random();
        peer_manager.touch_peer(&peer);

        assert_eq!(
            call_healthcheck(addr, "/livez", false).await.status(),
            StatusCode::OK
        );
        let response = call_healthcheck(addr, "/livez", true).await;
        assert_eq!(response.status(), StatusCode::OK);
        let text = response.text().await.unwrap();
        assert_reports(&text, &["[+] sync ok", "[+] peers connected"]);

        // Dead node: sync errored and the only peer is gone.
        sync_status.write().status = NodeSyncStatus::Error;
        peer_manager.remove_peer(&peer);

        assert_eq!(
            call_healthcheck(addr, "/livez", false).await.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );
        let response = call_healthcheck(addr, "/livez", true).await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let text = response.text().await.unwrap();
        assert_reports(&text, &["[!] sync error", "[!] no peers connected"]);
    }

    #[tokio::test]
    async fn test_check_healthz() {
        let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let rpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let peer_manager = Arc::new(PeerManager::default());

        let sync_status = Arc::new(RwLock::new(SyncStatusReport::default()));
        let forest_state = ForestState {
            config: Config {
                client: Client {
                    healthcheck_address,
                    rpc_address: rpc_listener.local_addr().unwrap(),
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::new(ChainConfig::default()),
            genesis_timestamp: 0,
            sync_status: sync_status.clone(),
            peer_manager: peer_manager.clone(),
        };

        let addr = spawn_healthcheck_server(forest_state).await;

        // Healthy node: syncing, head far in the future, RPC alive, one peer.
        {
            let mut report = sync_status.write();
            report.current_head_epoch = i64::MAX;
            report.status = NodeSyncStatus::Syncing;
        }
        let peer = libp2p::PeerId::random();
        peer_manager.touch_peer(&peer);

        assert_eq!(
            call_healthcheck(addr, "/healthz", false).await.status(),
            StatusCode::OK
        );
        let response = call_healthcheck(addr, "/healthz", true).await;
        assert_eq!(response.status(), StatusCode::OK);
        let text = response.text().await.unwrap();
        assert_reports(
            &text,
            &[
                "[+] sync ok",
                "[+] epoch up to date",
                "[+] rpc server running",
                "[+] peers connected",
            ],
        );

        // Unhealthy node: every individual check fails.
        drop(rpc_listener);
        {
            let mut report = sync_status.write();
            report.status = NodeSyncStatus::Error;
            report.current_head_epoch = 0;
        }
        peer_manager.remove_peer(&peer);

        assert_eq!(
            call_healthcheck(addr, "/healthz", false).await.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );
        let response = call_healthcheck(addr, "/healthz", true).await;
        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let text = response.text().await.unwrap();
        assert_reports(
            &text,
            &[
                "[!] sync error",
                "[!] epoch outdated",
                "[!] rpc server not running",
                "[!] no peers connected",
            ],
        );
    }

    #[tokio::test]
    async fn test_check_unknown_healthcheck_endpoint() {
        let healthcheck_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let forest_state = ForestState {
            config: Config {
                client: Client {
                    healthcheck_address,
                    ..Default::default()
                },
                ..Default::default()
            },
            chain_config: Arc::default(),
            genesis_timestamp: 0,
            sync_status: Arc::new(RwLock::new(SyncStatusReport::default())),
            peer_manager: Arc::default(),
        };

        let addr = spawn_healthcheck_server(forest_state).await;

        let response = call_healthcheck(
            addr,
            "/phngluimglwnafhcthulhurlyehwgahnaglfhtagn",
            false,
        )
        .await;

        assert_eq!(response.status(), StatusCode::NOT_FOUND);
    }
}