eigen_nodeapi/
lib.rs

1#![doc(
2    html_logo_url = "https://github.com/Layr-Labs/eigensdk-rs/assets/91280922/bd13caec-3c00-4afc-839a-b83d2890beb5",
3    issue_tracker_base_url = "https://github.com/Layr-Labs/eigensdk-rs/issues/"
4)]
5#![cfg_attr(not(test), warn(unused_crate_dependencies))]
6
7pub mod error;
8
9use error::NodeApiError;
10use ntex::web::{self, App, HttpResponse, HttpServer, Responder};
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::info;
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
16pub enum NodeHealth {
17    Healthy,
18    PartiallyHealthy,
19    Unhealthy,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub enum ServiceStatus {
24    Up,
25    Down,
26    Initializing,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct NodeService {
31    id: String,
32    name: String,
33    description: String,
34    status: ServiceStatus,
35}
36
37#[derive(Clone, Deserialize)]
38pub struct NodeInfo {
39    node_name: String,
40    node_version: String,
41    health: NodeHealth,
42    services: Vec<NodeService>,
43}
44
45impl NodeInfo {
46    /// Creates a new instance of [`NodeApi`].
47    ///
48    /// # Arguments
49    ///
50    /// * `node_name` - A string slice that holds the name of the node.
51    /// * `node_version` - A string slice that holds the version of the node.
52    ///
53    /// # Returns
54    ///
55    /// A new instance of [`NodeApi`] with the specified node name and version,
56    /// and default health status set to `Healthy` and an empty list of services.
57    pub fn new(node_name: &str, node_version: &str) -> Self {
58        Self {
59            node_name: node_name.to_string(),
60            node_version: node_version.to_string(),
61            health: NodeHealth::Healthy,
62            services: Vec::new(),
63        }
64    }
65
66    /// Add a service to the node
67    fn register_service(&mut self, id: &str, name: &str, description: &str, status: ServiceStatus) {
68        self.services.push(NodeService {
69            id: id.to_string(),
70            name: name.to_string(),
71            description: description.to_string(),
72            status,
73        });
74    }
75
76    fn update_service_status(
77        &mut self,
78        service_id: &str,
79        status: ServiceStatus,
80    ) -> Result<(), NodeApiError> {
81        for service in self.services.iter_mut() {
82            if service.id == service_id {
83                service.status = status;
84                return Ok(());
85            }
86        }
87
88        Err(NodeApiError::ServiceIdNotFound(service_id.to_string()))
89    }
90
91    fn deregister_service(&mut self, service_id: &str) -> Result<(), NodeApiError> {
92        if let Some(index) = self.services.iter().position(|s| s.id == service_id) {
93            self.services.remove(index);
94            return Ok(());
95        }
96        Err(NodeApiError::ServiceIdNotFound(service_id.to_string()))
97    }
98}
99
100#[derive(Clone)]
101pub struct NodeApi(Arc<Mutex<NodeInfo>>);
102
103impl NodeApi {
104    /// Creates a new instance of [`NodeApi`].
105    ///
106    /// # Arguments
107    ///
108    /// * `node_info` - A [`NodeInfo`] instance that holds the node information.
109    ///
110    /// # Returns
111    ///
112    /// A new instance of [`NodeApi`] with the provided node information.
113    pub fn new(node_info: NodeInfo) -> Self {
114        Self(Arc::new(Mutex::new(node_info)))
115    }
116
117    /// Function to create the Ntex HTTP server
118    /// This function sets up the server and routes.
119    /// External users can call this function to create and run the server.
120    pub fn start_server(&self, ip_port_addr: &str) -> std::io::Result<ntex::server::Server> {
121        let state = Arc::clone(&self.0);
122        let server = HttpServer::new(move || {
123            App::new()
124                .state(state.clone()) // Use the provided NodeApi instance
125                .route("/eigen/node", web::get().to(node_info))
126                .route("/eigen/node/health", web::get().to(health_check))
127                .route("/eigen/node/services", web::get().to(list_services))
128                .route(
129                    "/eigen/node/services/{id}/health",
130                    web::get().to(service_health),
131                )
132        })
133        .bind(ip_port_addr)?
134        .run();
135        info!("node api server running at port :{}", ip_port_addr);
136        Ok(server)
137    }
138
139    ///
140    /// Updates the health status of the node.
141    ///
142    /// # Arguments
143    ///
144    /// * `new_health` - The new health status to be set for the node.
145    pub fn update_health(&mut self, new_health: NodeHealth) {
146        let mut info = self.0.lock().unwrap();
147        info.health = new_health;
148    }
149
150    /// Registers a new service with the node.
151    ///
152    /// # Arguments
153    ///
154    /// * `id` - A string slice that holds the unique identifier of the service.
155    /// * `name` - A string slice that holds the name of the service.
156    /// * `description` - A string slice that holds the description of the service.
157    /// * `status` - The status of the service, represented by the `ServiceStatus` enum.
158    pub fn register_service(
159        &mut self,
160        id: &str,
161        name: &str,
162        description: &str,
163        status: ServiceStatus,
164    ) -> Result<(), NodeApiError> {
165        self.0
166            .lock()
167            .map_err(|_| NodeApiError::InternalServerError)?
168            .register_service(id, name, description, status);
169
170        Ok(())
171    }
172
173    /// Updates the status of a registered service.
174    ///
175    /// # Arguments
176    ///
177    /// * `service_id` - A string slice that holds the unique identifier of the service.
178    /// * `status` - The new status to be set for the service, represented by the `ServiceStatus` enum.
179    ///
180    /// # Returns
181    ///
182    /// A `Result` which is `Ok(())` if the service status was updated successfully,
183    /// or an `Err` with a message if the service with the specified id was not found.
184    pub fn update_service_status(
185        &mut self,
186        service_id: &str,
187        status: ServiceStatus,
188    ) -> Result<(), NodeApiError> {
189        self.0
190            .lock()
191            .map_err(|_| NodeApiError::InternalServerError)?
192            .update_service_status(service_id, status)
193    }
194
195    /// Deregisters a service from the node.
196    ///
197    /// # Arguments
198    ///
199    /// * `service_id` - A string slice that holds the unique identifier of the service.
200    ///
201    /// # Returns
202    ///
203    /// A `Result` which is `Ok(())` if the service was deregistered successfully,
204    /// or an `Err` with a message if the service with the specified id was not found.
205    pub fn deregister_service(&mut self, service_id: &str) -> Result<(), NodeApiError> {
206        self.0
207            .lock()
208            .map_err(|_| NodeApiError::InternalServerError)?
209            .deregister_service(service_id)
210    }
211}
212
213async fn node_info(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
214    let data = match api.lock() {
215        Ok(guard) => guard,
216        Err(err) => {
217            return HttpResponse::InternalServerError()
218                .body(format!("Internal Server Error: {}", err));
219        }
220    };
221    let response = serde_json::json!({
222        "node_name": data.node_name,
223        "node_version": data.node_version,
224        "spec_version": "v0.0.1",
225    });
226    HttpResponse::Ok().json(&response)
227}
228
229async fn health_check(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
230    let data = match api.lock() {
231        Ok(guard) => guard,
232        Err(err) => {
233            return HttpResponse::InternalServerError()
234                .body(format!("Internal Server Error: {}", err));
235        }
236    };
237    let health = &data.health;
238
239    match health {
240        NodeHealth::Healthy => HttpResponse::Ok().finish(),
241        NodeHealth::PartiallyHealthy => HttpResponse::PartialContent().finish(),
242        NodeHealth::Unhealthy => HttpResponse::ServiceUnavailable().finish(),
243    }
244}
245
246async fn list_services(api: web::types::State<Arc<Mutex<NodeInfo>>>) -> impl Responder {
247    let data = match api.lock() {
248        Ok(guard) => guard,
249        Err(err) => {
250            return HttpResponse::InternalServerError()
251                .body(format!("Internal Server Error: {}", err));
252        }
253    };
254    let services = &data.services;
255    HttpResponse::Ok().json(&serde_json::json!({ "services": *services }))
256}
257
258async fn service_health(
259    api: web::types::State<Arc<Mutex<NodeInfo>>>,
260    path: web::types::Path<String>,
261) -> impl Responder {
262    let service_id = path.into_inner();
263    let data = match api.lock() {
264        Ok(guard) => guard,
265        Err(err) => {
266            return HttpResponse::InternalServerError()
267                .body(format!("Internal Server Error: {}", err));
268        }
269    };
270    let services = &data.services;
271
272    if let Some(service) = services.iter().find(|s| s.id == service_id) {
273        match service.status {
274            ServiceStatus::Up => HttpResponse::Ok().finish(),
275            ServiceStatus::Down => HttpResponse::ServiceUnavailable().finish(),
276            ServiceStatus::Initializing => HttpResponse::PartialContent().finish(),
277        }
278    } else {
279        HttpResponse::NotFound().finish()
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use ntex::{http, web::test};
287    use reqwest::Client;
288    use std::sync::Arc;
289
290    #[tokio::test]
291    async fn test_node_handler() {
292        let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
293        node_info.register_service(
294            "test_service_id",
295            "test_service_name",
296            "test_service_description",
297            ServiceStatus::Initializing,
298        );
299        let state = Arc::new(Mutex::new(node_info));
300
301        let app = App::new()
302            .state(state.clone())
303            .route("/eigen/node", web::get().to(super::node_info));
304        let app = test::init_service(app).await;
305
306        let req = test::TestRequest::get().uri("/eigen/node").to_request();
307        let resp = app.call(req).await.unwrap();
308        assert_eq!(resp.status(), http::StatusCode::OK);
309        let body = test::read_body(resp).await;
310        let body_str = String::from_utf8_lossy(&body);
311
312        // The expected JSON response
313        let expected_body =
314            "{\"node_name\":\"test_avs\",\"node_version\":\"v0.0.1\",\"spec_version\":\"v0.0.1\"}";
315
316        assert_eq!(body_str, expected_body);
317    }
318
319    #[tokio::test]
320    async fn test_list_services_handler() {
321        let tests = vec![
322            (
323                Arc::new(Mutex::new(NodeInfo::new("test_avs", "v0.0.1"))),
324                http::StatusCode::OK,
325                "{\"services\":[]}",
326            ),
327            (
328                {
329                    let mut node_api = NodeInfo::new("test_avs", "v0.0.1");
330                    node_api.register_service(
331                        "testServiceId",
332                        "testServiceName",
333                        "testServiceDescription",
334                        ServiceStatus::Up,
335                    );
336                    Arc::new(Mutex::new(node_api))
337                },
338                http::StatusCode::OK,
339                "{\"services\":[{\"id\":\"testServiceId\",\"name\":\"testServiceName\",\"description\":\"testServiceDescription\",\"status\":\"Up\"}]}",
340            ),
341            (
342                {
343                    let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
344                    node_info.register_service(
345                        "testServiceId",
346                        "testServiceName",
347                        "testServiceDescription",
348                        ServiceStatus::Up,
349                    );
350                    node_info.register_service(
351                        "testServiceId2",
352                        "testServiceName2",
353                        "testServiceDescription2",
354                        ServiceStatus::Down,
355                    );
356                    Arc::new(Mutex::new(node_info))
357                },
358                http::StatusCode::OK,
359                "{\"services\":[{\"id\":\"testServiceId\",\"name\":\"testServiceName\",\"description\":\"testServiceDescription\",\"status\":\"Up\"},{\"id\":\"testServiceId2\",\"name\":\"testServiceName2\",\"description\":\"testServiceDescription2\",\"status\":\"Down\"}]}",
360            ),
361        ];
362
363        for (node_api, expected_status, expected_body) in tests {
364            let app = test::init_service(
365                App::new()
366                    .state(node_api.clone())
367                    .route("/eigen/node/services", web::get().to(list_services)),
368            )
369            .await;
370
371            let req = test::TestRequest::get()
372                .uri("/eigen/node/services")
373                .to_request();
374            let resp = app.call(req).await.unwrap();
375
376            // Assert status code
377            assert_eq!(resp.status(), expected_status);
378
379            let body = test::read_body(resp).await;
380            let body_str = String::from_utf8_lossy(&body);
381
382            let actual_json: serde_json::Value = serde_json::from_str(&body_str).unwrap();
383            let expected_json: serde_json::Value = serde_json::from_str(expected_body).unwrap();
384
385            assert_eq!(actual_json, expected_json);
386        }
387    }
388
389    #[tokio::test]
390    async fn test_service_health_handler() {
391        let mut node_info = NodeInfo::new("test_avs", "v0.0.1");
392
393        // Register a service to the NodeApi
394        node_info.register_service(
395            "testServiceId",
396            "testServiceName",
397            "testServiceDescription",
398            ServiceStatus::Up,
399        );
400
401        let state = Arc::new(Mutex::new(node_info));
402
403        // Initialize the app with the NodeApi
404        let app = test::init_service(App::new().state(state.clone()).route(
405            "/eigen/node/services/{id}/health",
406            web::get().to(service_health),
407        ))
408        .await;
409
410        // Test Case 1: Service exists (should return 200 OK)
411        let req = test::TestRequest::get()
412            .uri("/eigen/node/services/testServiceId/health")
413            .to_request();
414
415        let resp = app.call(req).await.unwrap();
416        assert_eq!(resp.status(), http::StatusCode::OK); // Expect 200 OK for existing service
417
418        // Test Case 2: Service does not exist (should return 404 Not Found)
419        let req = test::TestRequest::get()
420            .uri("/eigen/node/services/nonexistentService/health")
421            .to_request();
422
423        let resp = app.call(req).await.unwrap();
424        assert_eq!(resp.status(), http::StatusCode::NOT_FOUND);
425        let body = test::read_body(resp).await;
426        assert_eq!(body.len(), 0);
427    }
428
429    #[ntex::test]
430    async fn test_create_server() -> std::io::Result<()> {
431        // Create a NodeApi instance and register a service
432        let mut node_info = NodeInfo::new("test_node", "v1.0.0");
433        node_info.register_service(
434            "test_service",
435            "Test Service",
436            "Test service description",
437            ServiceStatus::Up,
438        );
439
440        // Set up a server running on a test address (e.g., 127.0.0.1:8081)
441        let ip_port_addr = "127.0.0.1:8081";
442
443        let mut node_api = NodeApi::new(node_info);
444        let server = node_api.start_server(ip_port_addr).unwrap();
445
446        // Start the server in a background task
447        ntex::rt::spawn(server);
448
449        // Give the server some time to start up
450        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
451
452        // Test the /eigen/node route
453        // Use Reqwest to send HTTP requests to the running server
454        let client = Client::new();
455
456        // Test the /eigen/node route
457        let resp = client
458            .get(format!("http://{}/eigen/node", ip_port_addr))
459            .send()
460            .await
461            .unwrap();
462        assert_eq!(resp.status(), reqwest::StatusCode::OK);
463
464        // Test the /eigen/node/health route
465        let resp = client
466            .get(format!("http://{}/eigen/node/health", ip_port_addr))
467            .send()
468            .await
469            .unwrap();
470        assert_eq!(resp.status(), reqwest::StatusCode::OK);
471
472        // // Test the /eigen/node/services route
473        let resp = client
474            .get(format!("http://{}/eigen/node/services", ip_port_addr))
475            .send()
476            .await
477            .unwrap();
478        assert_eq!(resp.status(), reqwest::StatusCode::OK);
479
480        // Test the /eigen/node/services/{id}/health route
481        let resp = client
482            .get(format!(
483                "http://{}/eigen/node/services/test_service/health",
484                ip_port_addr
485            ))
486            .send()
487            .await
488            .unwrap();
489        assert_eq!(resp.status(), reqwest::StatusCode::OK);
490
491        // ------------------------------
492        // The following example shows how to modify the state of the service.
493        // Modifying the service status of a given service to ServiceStatus::Down will
494        // give a 503 Service Unavailable response.
495
496        node_api
497            .update_service_status("test_service", ServiceStatus::Down)
498            .unwrap();
499
500        // Test health endpoint
501        let resp = client
502            .get(format!(
503                "http://{}/eigen/node/services/test_service/health",
504                ip_port_addr
505            ))
506            .send()
507            .await
508            .unwrap();
509        // Expect 503 SERVICE_UNAVAILABLE
510        assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
511
512        Ok(())
513    }
514}