essential_node_api/
lib.rs

1//! The Essential Node HTTP API.
2//!
3//! Find the available endpoints under the [`endpoint`] module.
4//!
5//! To serve the node API, construct a [`router`], a [`TcpListener`] and call [`serve`].
6
7use axum::{routing::get, Router};
8use essential_node::db;
9use essential_node_types::block_notify::BlockRx;
10use std::{io, net::SocketAddr};
11use thiserror::Error;
12use tokio::{
13    net::{TcpListener, TcpStream},
14    task::JoinSet,
15};
16use tower_http::cors::CorsLayer;
17
18pub mod endpoint;
19
20/// State provided to the endpoints when serving connections.
21#[derive(Clone)]
22pub struct State {
23    /// A node DB connection pool.
24    pub conn_pool: db::ConnectionPool,
25    /// Notifies on availability of a new block (e.g. from the relayer).
26    ///
27    /// In the case that this is `None`, subscription streams will close after
28    /// the last available item in the DB.
29    pub new_block: Option<BlockRx>,
30}
31
32/// An error occurred while attempting to serve a new connection.
33#[derive(Debug, Error)]
34pub enum ServeNextConnError {
35    /// Failed to acquire the next connection.
36    #[error("failed to acquire next connection: {0}")]
37    Next(#[from] io::Error),
38    /// Failed to serve the connection.
39    #[error("{0}")]
40    Serve(#[from] ServeConnError),
41}
42
43/// An error occurred while attempting to serve a connection.
44#[derive(Debug, Error)]
45#[error("Serve connection error: {0}")]
46pub struct ServeConnError(#[from] Box<dyn std::error::Error + Send + Sync>);
47
/// Default cap on the number of TCP stream connections maintained at the same
/// time, as used by `essential-node-cli`.
pub const DEFAULT_CONNECTION_LIMIT: usize = 2000;
51
52/// Continuously serve the Node API using the given `router` and TCP `listener`.
53///
54/// The number of simultaneous TCP stream connections will be capped at the given
55/// `conn_limit`.
56///
57/// This constructs a new `JoinSet` to use for limiting connections and then
58/// calls [`serve_next_conn`] in a loop. Any outstanding connections will not be
59/// counted toward the connection limit.
60pub async fn serve(router: &Router, listener: &TcpListener, conn_limit: usize) {
61    let mut conn_set = JoinSet::new();
62    loop {
63        serve_next_conn(router, listener, conn_limit, &mut conn_set).await;
64    }
65}
66
67/// Accept and serve the next connection.
68///
69/// The number of simultaneous TCP stream connections will be capped at the given
70/// `conn_limit`.
71///
72/// If we're at the connection limit, this first awaits for a connection task to
73/// become available.
74///
75/// ```no_run
76/// # #[tokio::main]
77/// # async fn main() {
78/// # use essential_node::{self as node};
79/// # use essential_node_api as node_api;
80/// let conf = node::db::pool::Config::default();
81/// let db = node::db::ConnectionPool::with_tables(&conf).unwrap();
82/// let state = node_api::State {
83///     conn_pool: db,
84///     new_block: None,
85/// };
86/// let router = node_api::router(state);
87/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
88/// let conn_limit = node_api::DEFAULT_CONNECTION_LIMIT;
89/// let mut conn_set = tokio::task::JoinSet::new();
90/// // Accept and serve connections.
91/// loop {
92///     node_api::serve_next_conn(&router, &listener, conn_limit, &mut conn_set).await;
93/// }
94/// # }
95/// ```
96#[tracing::instrument(skip_all)]
97pub async fn serve_next_conn(
98    router: &Router,
99    listener: &TcpListener,
100    conn_limit: usize,
101    conn_set: &mut JoinSet<()>,
102) {
103    // Await the next connection.
104    let stream = match next_conn(listener, conn_limit, conn_set).await {
105        Ok((stream, _remote_addr)) => {
106            #[cfg(feature = "tracing")]
107            tracing::trace!("Accepted new connection from: {_remote_addr}");
108            stream
109        }
110        Err(_err) => {
111            #[cfg(feature = "tracing")]
112            tracing::trace!("Failed to accept connection {_err}");
113            return;
114        }
115    };
116
117    // Serve the acquired connection.
118    let router = router.clone();
119    conn_set.spawn(async move {
120        if let Err(_err) = serve_conn(&router, stream).await {
121            #[cfg(feature = "tracing")]
122            tracing::trace!("Serve connection error: {_err}");
123        }
124    });
125}
126
127/// Accept and return the next TCP stream connection.
128///
129/// If we're at the connection limit, this first awaits for a connection task to
130/// become available.
131#[tracing::instrument(skip_all, err)]
132pub async fn next_conn(
133    listener: &TcpListener,
134    conn_limit: usize,
135    conn_set: &mut JoinSet<()>,
136) -> io::Result<(TcpStream, SocketAddr)> {
137    // If the `conn_set` size currently exceeds the limit, wait for the next to join.
138    if conn_set.len() >= conn_limit {
139        #[cfg(feature = "tracing")]
140        tracing::info!("Connection limit reached: {conn_limit}");
141        conn_set.join_next().await.expect("set cannot be empty")?;
142    }
143    // Await another connection.
144    tracing::trace!("Awaiting new connection at {}", listener.local_addr()?);
145    listener.accept().await
146}
147
148/// Serve a newly accepted TCP stream.
149#[tracing::instrument(skip_all, err)]
150pub async fn serve_conn(router: &Router, stream: TcpStream) -> Result<(), ServeConnError> {
151    // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use
152    // tokio. `TokioIo` converts between them.
153    let stream = hyper_util::rt::TokioIo::new(stream);
154
155    // Hyper also has its own `Service` trait and doesn't use tower. We can use
156    // `hyper::service::service_fn` to create a hyper `Service` that calls our
157    // app through `tower::Service::call`.
158    let hyper_service = hyper::service::service_fn(
159        move |request: axum::extract::Request<hyper::body::Incoming>| {
160            tower::Service::call(&mut router.clone(), request)
161        },
162    );
163
164    // `TokioExecutor` tells hyper to use `tokio::spawn` to spawn tasks.
165    let executor = hyper_util::rt::TokioExecutor::new();
166    let conn = hyper_util::server::conn::auto::Builder::new(executor).http2_only();
167    conn.serve_connection(stream, hyper_service)
168        .await
169        .map_err(ServeConnError)
170}
171
172/// Construct the endpoint router with the node [`endpoint`]s, CORS layer and DB
173/// connection pool as state.
174pub fn router(state: State) -> Router {
175    with_endpoints(Router::new())
176        .layer(cors_layer())
177        .with_state(state)
178}
179
180/// Add the node API [`endpoint`]s to the given `router`.
181pub fn with_endpoints(router: Router<State>) -> Router<State> {
182    use endpoint::*;
183    router
184        .route(health_check::PATH, get(health_check::handler))
185        .route(list_blocks::PATH, get(list_blocks::handler))
186        .route(query_state::PATH, get(query_state::handler))
187        .route(subscribe_blocks::PATH, get(subscribe_blocks::handler))
188}
189
190/// The default CORS layer.
191pub fn cors_layer() -> CorsLayer {
192    CorsLayer::new()
193        .allow_origin(tower_http::cors::Any)
194        .allow_methods([http::Method::GET, http::Method::OPTIONS])
195        .allow_headers([http::header::CONTENT_TYPE])
196}