#![doc(
html_logo_url = "https://github.com/Layr-Labs/eigensdk-rs/assets/91280922/bd13caec-3c00-4afc-839a-b83d2890beb5",
issue_tracker_base_url = "https://github.com/Layr-Labs/eigensdk-rs/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
pub mod error;
use error::NodeApiError;
use ntex::web::{self, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use tracing::info;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NodeHealth {
Healthy,
PartiallyHealthy,
Unhealthy,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServiceStatus {
Up,
Down,
Initializing,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeService {
id: String,
name: String,
description: String,
status: ServiceStatus,
}
#[derive(Clone, Deserialize)]
pub struct NodeInfo {
node_name: String,
node_version: String,
health: NodeHealth,
services: Vec<NodeService>,
}
impl NodeInfo {
pub fn new(node_name: &str, node_version: &str) -> Self {
Self {
node_name: node_name.to_string(),
node_version: node_version.to_string(),
health: NodeHealth::Healthy,
services: Vec::new(),
}
}
fn register_service(&mut self, id: &str, name: &str, description: &str, status: ServiceStatus) {
self.services.push(NodeService {
id: id.to_string(),
name: name.to_string(),
description: description.to_string(),
status,
});
}
fn update_service_status(
&mut self,
service_id: &str,
status: ServiceStatus,
) -> Result<(), NodeApiError> {
for service in self.services.iter_mut() {
if service.id == service_id {
service.status = status;
return Ok(());
}
}
Err(NodeApiError::ServiceIdNotFound(service_id.to_string()))
}
fn deregister_service(&mut self, service_id: &str) -> Result<(), NodeApiError> {
if let Some(index) = self.services.iter().position(|s| s.id == service_id) {
self.services.remove(index);
return Ok(());
}
Err(NodeApiError::ServiceIdNotFound(service_id.to_string()))
}
}
#[derive(Clone)]
pub struct NodeApi(Arc<Mutex<NodeInfo>>);
impl NodeApi {
pub fn new(node_info: NodeInfo) -> Self {
Self(Arc::new(Mutex::new(node_info)))
}
pub fn start_server(&self, ip_port_addr: &str) -> std::io::Result<ntex::server::Server> {
let state = Arc::clone(&self.0);
let server = HttpServer::new(move || {
App::new()
.state(state.clone()) .route("/eigen/node", web::get().to(node_info))
.route("/eigen/node/health", web::get().to(health_check))
.route("/eigen/node/services", web::get().to(list_services))
.route(
"/eigen/node/services/{id}/health",
web::get().to(service_health),
)
})
.bind(ip_port_addr)?
.run();
info!("node api server running at port :{}", ip_port_addr);
Ok(server)
}
pub fn update_health(&mut self, new_health: NodeHealth) {
let mut info = self.0.lock().unwrap();
info.health = new_health;
}
pub fn register_service(
&mut self,
id: &str,
name: &str,
description: &str,
status: ServiceStatus,
) -> Result<(), NodeApiError> {
self.0
.lock()
.map_err(|_| NodeApiError::InternalServerError)?
.register_service(id, name, description, status);
Ok(())
}
pub fn update_service_status(
&mut self,
service_id: &str,
status: ServiceStatus,
) -> Result<(), NodeApiError> {
self.0
.lock()
.map_err(|_| NodeApiError::InternalServerError)?
.update_service_status(service_id, status)
}
pub fn deregister_service(&mut self, service_id: &str) -> Result<(), NodeApiError> {
self.0
.lock()
.map_err(|_| NodeApiError::InternalServerError)?
.deregister_service(service_id)
}
}
async fn node_info(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
let data = match api.lock() {
Ok(guard) => guard,
Err(err) => {
return HttpResponse::InternalServerError()
.body(format!("Internal Server Error: {}", err));
}
};
let response = serde_json::json!({
"node_name": data.node_name,
"node_version": data.node_version,
"spec_version": "v0.0.1",
});
HttpResponse::Ok().json(&response)
}
async fn health_check(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
let data = match api.lock() {
Ok(guard) => guard,
Err(err) => {
return HttpResponse::InternalServerError()
.body(format!("Internal Server Error: {}", err));
}
};
let health = &data.health;
match health {
NodeHealth::Healthy => HttpResponse::Ok().finish(),
NodeHealth::PartiallyHealthy => HttpResponse::PartialContent().finish(),
NodeHealth::Unhealthy => HttpResponse::ServiceUnavailable().finish(),
}
}
async fn list_services(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
let data = match api.lock() {
Ok(guard) => guard,
Err(err) => {
return HttpResponse::InternalServerError()
.body(format!("Internal Server Error: {}", err));
}
};
let services = &data.services;
HttpResponse::Ok().json(&serde_json::json!({ "services": *services }))
}
async fn service_health(
api: web::types::State<Arc<Mutex<NodeInfo>>>,
path: web::types::Path<String>,
) -> impl Responder {
let service_id = path.into_inner();
let data = match api.lock() {
Ok(guard) => guard,
Err(err) => {
return HttpResponse::InternalServerError()
.body(format!("Internal Server Error: {}", err));
}
};
let services = &data.services;
if let Some(service) = services.iter().find(|s| s.id == service_id) {
match service.status {
ServiceStatus::Up => HttpResponse::Ok().finish(),
ServiceStatus::Down => HttpResponse::ServiceUnavailable().finish(),
ServiceStatus::Initializing => HttpResponse::PartialContent().finish(),
}
} else {
HttpResponse::NotFound().finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use ntex::{http, web::test};
use reqwest::Client;
use std::sync::Arc;
#[tokio::test]
async fn test_node_handler() {
let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
node_info.register_service(
"test_service_id",
"test_service_name",
"test_service_description",
ServiceStatus::Initializing,
);
let state = Arc::new(Mutex::new(node_info));
let app = App::new()
.state(state.clone())
.route("/eigen/node", web::get().to(super::node_info));
let app = test::init_service(app).await;
let req = test::TestRequest::get().uri("/eigen/node").to_request();
let resp = app.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let body = test::read_body(resp).await;
let body_str = String::from_utf8_lossy(&body);
let expected_body =
"{\"node_name\":\"test_avs\",\"node_version\":\"v0.0.1\",\"spec_version\":\"v0.0.1\"}";
assert_eq!(body_str, expected_body);
}
#[tokio::test]
async fn test_list_services_handler() {
let tests = vec![
(
Arc::new(Mutex::new(NodeInfo::new("test_avs", "v0.0.1"))),
http::StatusCode::OK,
"{\"services\":[]}",
),
(
{
let mut node_api = NodeInfo::new("test_avs", "v0.0.1");
node_api.register_service(
"testServiceId",
"testServiceName",
"testServiceDescription",
ServiceStatus::Up,
);
Arc::new(Mutex::new(node_api))
},
http::StatusCode::OK,
"{\"services\":[{\"id\":\"testServiceId\",\"name\":\"testServiceName\",\"description\":\"testServiceDescription\",\"status\":\"Up\"}]}",
),
(
{
let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
node_info.register_service(
"testServiceId",
"testServiceName",
"testServiceDescription",
ServiceStatus::Up,
);
node_info.register_service(
"testServiceId2",
"testServiceName2",
"testServiceDescription2",
ServiceStatus::Down,
);
Arc::new(Mutex::new(node_info))
},
http::StatusCode::OK,
"{\"services\":[{\"id\":\"testServiceId\",\"name\":\"testServiceName\",\"description\":\"testServiceDescription\",\"status\":\"Up\"},{\"id\":\"testServiceId2\",\"name\":\"testServiceName2\",\"description\":\"testServiceDescription2\",\"status\":\"Down\"}]}",
),
];
for (node_api, expected_status, expected_body) in tests {
let app = test::init_service(
App::new()
.state(node_api.clone())
.route("/eigen/node/services", web::get().to(list_services)),
)
.await;
let req = test::TestRequest::get()
.uri("/eigen/node/services")
.to_request();
let resp = app.call(req).await.unwrap();
assert_eq!(resp.status(), expected_status);
let body = test::read_body(resp).await;
let body_str = String::from_utf8_lossy(&body);
let actual_json: serde_json::Value = serde_json::from_str(&body_str).unwrap();
let expected_json: serde_json::Value = serde_json::from_str(expected_body).unwrap();
assert_eq!(actual_json, expected_json);
}
}
#[tokio::test]
async fn test_service_health_handler() {
let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
node_info.register_service(
"testServiceId",
"testServiceName",
"testServiceDescription",
ServiceStatus::Up,
);
let state = Arc::new(Mutex::new(node_info));
let app = test::init_service(App::new().state(state.clone()).route(
"/eigen/node/services/{id}/health",
web::get().to(service_health),
))
.await;
let req = test::TestRequest::get()
.uri("/eigen/node/services/testServiceId/health")
.to_request();
let resp = app.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = test::TestRequest::get()
.uri("/eigen/node/services/nonexistentService/health")
.to_request();
let resp = app.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::NOT_FOUND);
let body = test::read_body(resp).await;
assert_eq!(body.len(), 0);
}
#[ntex::test]
async fn test_create_server() -> std::io::Result<()> {
let mut node_info = NodeInfo::new("test_node", "v1.0.0");
node_info.register_service(
"test_service",
"Test Service",
"Test service description",
ServiceStatus::Up,
);
let ip_port_addr = "127.0.0.1:8081";
let mut node_api = NodeApi::new(node_info);
let server = node_api.start_server(ip_port_addr).unwrap();
ntex::rt::spawn(server);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let client = Client::new();
let resp = client
.get(format!("http://{}/eigen/node", ip_port_addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let resp = client
.get(format!("http://{}/eigen/node/health", ip_port_addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let resp = client
.get(format!("http://{}/eigen/node/services", ip_port_addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let resp = client
.get(format!(
"http://{}/eigen/node/services/test_service/health",
ip_port_addr
))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
node_api
.update_service_status("test_service", ServiceStatus::Down)
.unwrap();
let resp = client
.get(format!(
"http://{}/eigen/node/services/test_service/health",
ip_port_addr
))
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
Ok(())
}
}