1#![doc(
2 html_logo_url = "https://github.com/Layr-Labs/eigensdk-rs/assets/91280922/bd13caec-3c00-4afc-839a-b83d2890beb5",
3 issue_tracker_base_url = "https://github.com/Layr-Labs/eigensdk-rs/issues/"
4)]
5#![cfg_attr(not(test), warn(unused_crate_dependencies))]
6
7pub mod error;
8
9use error::NodeApiError;
10use ntex::web::{self, App, HttpResponse, HttpServer, Responder};
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::info;
14
/// Overall health of the node.
///
/// The `health_check` handler in this file translates each variant into an
/// HTTP status: `Healthy` -> 200 OK, `PartiallyHealthy` -> 206 Partial
/// Content, `Unhealthy` -> 503 Service Unavailable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NodeHealth {
    /// Reported as 200 OK.
    Healthy,
    /// Reported as 206 Partial Content.
    PartiallyHealthy,
    /// Reported as 503 Service Unavailable.
    Unhealthy,
}
21
/// Status of a single service registered on the node.
///
/// The `service_health` handler in this file translates each variant into an
/// HTTP status: `Up` -> 200 OK, `Down` -> 503 Service Unavailable,
/// `Initializing` -> 206 Partial Content.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServiceStatus {
    /// Reported as 200 OK.
    Up,
    /// Reported as 503 Service Unavailable.
    Down,
    /// Reported as 206 Partial Content.
    Initializing,
}
28
/// A service registered on the node.
///
/// Instances are serialized as-is by the `list_services` handler, so the
/// field names (and their declaration order) are what appears in the JSON
/// response: `id`, `name`, `description`, `status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeService {
    /// Identifier used to look the service up (status updates, removal, health route).
    id: String,
    /// Name of the service.
    name: String,
    /// Free-form description of the service.
    description: String,
    /// Current status of the service.
    status: ServiceStatus,
}
36
/// State served by the node API: node identity, overall health and the list
/// of registered services.
#[derive(Clone, Deserialize)]
pub struct NodeInfo {
    /// Returned as `node_name` by the `node_info` handler.
    node_name: String,
    /// Returned as `node_version` by the `node_info` handler.
    node_version: String,
    /// Overall node health, read by the `health_check` handler.
    health: NodeHealth,
    /// Registered services, in registration order.
    services: Vec<NodeService>,
}
44
45impl NodeInfo {
46 pub fn new(node_name: &str, node_version: &str) -> Self {
58 Self {
59 node_name: node_name.to_string(),
60 node_version: node_version.to_string(),
61 health: NodeHealth::Healthy,
62 services: Vec::new(),
63 }
64 }
65
66 fn register_service(&mut self, id: &str, name: &str, description: &str, status: ServiceStatus) {
68 self.services.push(NodeService {
69 id: id.to_string(),
70 name: name.to_string(),
71 description: description.to_string(),
72 status,
73 });
74 }
75
76 fn update_service_status(
77 &mut self,
78 service_id: &str,
79 status: ServiceStatus,
80 ) -> Result<(), NodeApiError> {
81 for service in self.services.iter_mut() {
82 if service.id == service_id {
83 service.status = status;
84 return Ok(());
85 }
86 }
87
88 Err(NodeApiError::ServiceIdNotFound(service_id.to_string()))
89 }
90
91 fn deregister_service(&mut self, service_id: &str) -> Result<(), NodeApiError> {
92 if let Some(index) = self.services.iter().position(|s| s.id == service_id) {
93 self.services.remove(index);
94 return Ok(());
95 }
96 Err(NodeApiError::ServiceIdNotFound(service_id.to_string()))
97 }
98}
99
/// Handle to a [`NodeInfo`] stored behind `Arc<Mutex<_>>`.
///
/// Cloning the handle clones the `Arc`, so every clone reads and writes the
/// same underlying `NodeInfo`.
#[derive(Clone)]
pub struct NodeApi(Arc<Mutex<NodeInfo>>);
102
103impl NodeApi {
104 pub fn new(node_info: NodeInfo) -> Self {
114 Self(Arc::new(Mutex::new(node_info)))
115 }
116
117 pub fn start_server(&self, ip_port_addr: &str) -> std::io::Result<ntex::server::Server> {
121 let state = Arc::clone(&self.0);
122 let server = HttpServer::new(move || {
123 App::new()
124 .state(state.clone()) .route("/eigen/node", web::get().to(node_info))
126 .route("/eigen/node/health", web::get().to(health_check))
127 .route("/eigen/node/services", web::get().to(list_services))
128 .route(
129 "/eigen/node/services/{id}/health",
130 web::get().to(service_health),
131 )
132 })
133 .bind(ip_port_addr)?
134 .run();
135 info!("node api server running at port :{}", ip_port_addr);
136 Ok(server)
137 }
138
139 pub fn update_health(&mut self, new_health: NodeHealth) {
146 let mut info = self.0.lock().unwrap();
147 info.health = new_health;
148 }
149
150 pub fn register_service(
159 &mut self,
160 id: &str,
161 name: &str,
162 description: &str,
163 status: ServiceStatus,
164 ) -> Result<(), NodeApiError> {
165 self.0
166 .lock()
167 .map_err(|_| NodeApiError::InternalServerError)?
168 .register_service(id, name, description, status);
169
170 Ok(())
171 }
172
173 pub fn update_service_status(
185 &mut self,
186 service_id: &str,
187 status: ServiceStatus,
188 ) -> Result<(), NodeApiError> {
189 self.0
190 .lock()
191 .map_err(|_| NodeApiError::InternalServerError)?
192 .update_service_status(service_id, status)
193 }
194
195 pub fn deregister_service(&mut self, service_id: &str) -> Result<(), NodeApiError> {
206 self.0
207 .lock()
208 .map_err(|_| NodeApiError::InternalServerError)?
209 .deregister_service(service_id)
210 }
211}
212
213async fn node_info(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
214 let data = match api.lock() {
215 Ok(guard) => guard,
216 Err(err) => {
217 return HttpResponse::InternalServerError()
218 .body(format!("Internal Server Error: {}", err));
219 }
220 };
221 let response = serde_json::json!({
222 "node_name": data.node_name,
223 "node_version": data.node_version,
224 "spec_version": "v0.0.1",
225 });
226 HttpResponse::Ok().json(&response)
227}
228
229async fn health_check(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
230 let data = match api.lock() {
231 Ok(guard) => guard,
232 Err(err) => {
233 return HttpResponse::InternalServerError()
234 .body(format!("Internal Server Error: {}", err));
235 }
236 };
237 let health = &data.health;
238
239 match health {
240 NodeHealth::Healthy => HttpResponse::Ok().finish(),
241 NodeHealth::PartiallyHealthy => HttpResponse::PartialContent().finish(),
242 NodeHealth::Unhealthy => HttpResponse::ServiceUnavailable().finish(),
243 }
244}
245
246async fn list_services(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
247 let data = match api.lock() {
248 Ok(guard) => guard,
249 Err(err) => {
250 return HttpResponse::InternalServerError()
251 .body(format!("Internal Server Error: {}", err));
252 }
253 };
254 let services = &data.services;
255 HttpResponse::Ok().json(&serde_json::json!({ "services": *services }))
256}
257
258async fn service_health(
259 api: web::types::State<Arc<Mutex<NodeInfo>>>,
260 path: web::types::Path<String>,
261) -> impl Responder {
262 let service_id = path.into_inner();
263 let data = match api.lock() {
264 Ok(guard) => guard,
265 Err(err) => {
266 return HttpResponse::InternalServerError()
267 .body(format!("Internal Server Error: {}", err));
268 }
269 };
270 let services = &data.services;
271
272 if let Some(service) = services.iter().find(|s| s.id == service_id) {
273 match service.status {
274 ServiceStatus::Up => HttpResponse::Ok().finish(),
275 ServiceStatus::Down => HttpResponse::ServiceUnavailable().finish(),
276 ServiceStatus::Initializing => HttpResponse::PartialContent().finish(),
277 }
278 } else {
279 HttpResponse::NotFound().finish()
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use ntex::{http, web::test};
    use reqwest::Client;
    use std::sync::Arc;

    // The node handler returns the node name, version and spec version as JSON.
    #[tokio::test]
    async fn test_node_handler() {
        let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
        node_info.register_service(
            "test_service_id",
            "test_service_name",
            "test_service_description",
            ServiceStatus::Initializing,
        );
        let state = Arc::new(Mutex::new(node_info));

        // Only the route under test is mounted; the handler is called in-process.
        let app = App::new()
            .state(state.clone())
            .route("/eigen/node", web::get().to(super::node_info));
        let app = test::init_service(app).await;

        let req = test::TestRequest::get().uri("/eigen/node").to_request();
        let resp = app.call(req).await.unwrap();
        assert_eq!(resp.status(), http::StatusCode::OK);
        let body = test::read_body(resp).await;
        let body_str = String::from_utf8_lossy(&body);

        // Compared as a raw string, so key order in the response matters here.
        let expected_body =
            "{\"node_name\":\"test_avs\",\"node_version\":\"v0.0.1\",\"spec_version\":\"v0.0.1\"}";

        assert_eq!(body_str, expected_body);
    }

    // Table-driven check of the services listing with zero, one and two services.
    #[tokio::test]
    async fn test_list_services_handler() {
        // Each case: (shared state, expected status, expected JSON body).
        let tests = vec![
            (
                Arc::new(Mutex::new(NodeInfo::new("test_avs", "v0.0.1"))),
                http::StatusCode::OK,
                "{\"services\":[]}",
            ),
            (
                {
                    let mut node_api = NodeInfo::new("test_avs", "v0.0.1");
                    node_api.register_service(
                        "testServiceId",
                        "testServiceName",
                        "testServiceDescription",
                        ServiceStatus::Up,
                    );
                    Arc::new(Mutex::new(node_api))
                },
                http::StatusCode::OK,
                "{\"services\":[{\"id\":\"testServiceId\",\"name\":\"testServiceName\",\"description\":\"testServiceDescription\",\"status\":\"Up\"}]}",
            ),
            (
                {
                    let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
                    node_info.register_service(
                        "testServiceId",
                        "testServiceName",
                        "testServiceDescription",
                        ServiceStatus::Up,
                    );
                    node_info.register_service(
                        "testServiceId2",
                        "testServiceName2",
                        "testServiceDescription2",
                        ServiceStatus::Down,
                    );
                    Arc::new(Mutex::new(node_info))
                },
                http::StatusCode::OK,
                "{\"services\":[{\"id\":\"testServiceId\",\"name\":\"testServiceName\",\"description\":\"testServiceDescription\",\"status\":\"Up\"},{\"id\":\"testServiceId2\",\"name\":\"testServiceName2\",\"description\":\"testServiceDescription2\",\"status\":\"Down\"}]}",
            ),
        ];

        for (node_api, expected_status, expected_body) in tests {
            let app = test::init_service(
                App::new()
                    .state(node_api.clone())
                    .route("/eigen/node/services", web::get().to(list_services)),
            )
            .await;

            let req = test::TestRequest::get()
                .uri("/eigen/node/services")
                .to_request();
            let resp = app.call(req).await.unwrap();

            assert_eq!(resp.status(), expected_status);

            let body = test::read_body(resp).await;
            let body_str = String::from_utf8_lossy(&body);

            // Both sides are parsed so the comparison is on JSON values, not raw text.
            let actual_json: serde_json::Value = serde_json::from_str(&body_str).unwrap();
            let expected_json: serde_json::Value = serde_json::from_str(expected_body).unwrap();

            assert_eq!(actual_json, expected_json);
        }
    }

    // A registered `Up` service yields 200; an unknown id yields 404 with an empty body.
    #[tokio::test]
    async fn test_service_health_handler() {
        let mut node_info = NodeInfo::new("test_avs", "v0.0.1");

        node_info.register_service(
            "testServiceId",
            "testServiceName",
            "testServiceDescription",
            ServiceStatus::Up,
        );

        let state = Arc::new(Mutex::new(node_info));

        let app = test::init_service(App::new().state(state.clone()).route(
            "/eigen/node/services/{id}/health",
            web::get().to(service_health),
        ))
        .await;

        // Known service id.
        let req = test::TestRequest::get()
            .uri("/eigen/node/services/testServiceId/health")
            .to_request();

        let resp = app.call(req).await.unwrap();
        assert_eq!(resp.status(), http::StatusCode::OK);

        // Unknown service id.
        let req = test::TestRequest::get()
            .uri("/eigen/node/services/nonexistentService/health")
            .to_request();

        let resp = app.call(req).await.unwrap();
        assert_eq!(resp.status(), http::StatusCode::NOT_FOUND);
        let body = test::read_body(resp).await;
        assert_eq!(body.len(), 0);
    }

    // End-to-end check: starts the server and queries every route over HTTP.
    #[ntex::test]
    async fn test_create_server() -> std::io::Result<()> {
        let mut node_info = NodeInfo::new("test_node", "v1.0.0");
        node_info.register_service(
            "test_service",
            "Test Service",
            "Test service description",
            ServiceStatus::Up,
        );

        // Fixed local address: `start_server(..).unwrap()` below panics if binding fails.
        let ip_port_addr = "127.0.0.1:8081";

        let mut node_api = NodeApi::new(node_info);
        let server = node_api.start_server(ip_port_addr).unwrap();

        // Drive the server on the ntex runtime in the background.
        ntex::rt::spawn(server);

        // Presumably gives the spawned server time to start accepting connections.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let client = Client::new();

        // Node info route.
        let resp = client
            .get(format!("http://{}/eigen/node", ip_port_addr))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::OK);

        // Node health route: a new `NodeInfo` starts out healthy.
        let resp = client
            .get(format!("http://{}/eigen/node/health", ip_port_addr))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::OK);

        // Services listing route.
        let resp = client
            .get(format!("http://{}/eigen/node/services", ip_port_addr))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::OK);

        // Service health route: the service was registered as `Up`.
        let resp = client
            .get(format!(
                "http://{}/eigen/node/services/test_service/health",
                ip_port_addr
            ))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::OK);

        // Change the status through the `NodeApi` handle while the server is running.
        node_api
            .update_service_status("test_service", ServiceStatus::Down)
            .unwrap();

        // The running server shares the same state, so the change is visible.
        let resp = client
            .get(format!(
                "http://{}/eigen/node/services/test_service/health",
                ip_port_addr
            ))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);

        Ok(())
    }
}