essential_node_api/lib.rs
//! The Essential Node HTTP API.
//!
//! Find the available endpoints under the [`endpoint`] module.
//!
//! To serve the node API, construct a [`router`] and a [`TcpListener`], then call [`serve`].
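//!
//! A minimal sketch of that flow (the bind address is illustrative, and the DB
//! setup mirrors the example on [`serve_next_conn`]):
//!
//! ```no_run
//! # #[tokio::main]
//! # async fn main() {
//! # use essential_node::{self as node};
//! # use essential_node_api as node_api;
//! let conf = node::db::Config::default();
//! let db = node::db(&conf).unwrap();
//! let state = node_api::State { conn_pool: db, new_block: None };
//! let router = node_api::router(state);
//! let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
//! node_api::serve(&router, &listener, node_api::DEFAULT_CONNECTION_LIMIT).await;
//! # }
//! ```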
use axum::{routing::get, Router};
use essential_node::{db, BlockRx};
use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::{
    net::{TcpListener, TcpStream},
    task::JoinSet,
};
use tower_http::cors::CorsLayer;
pub mod endpoint;
/// State provided to the endpoints when serving connections.
#[derive(Clone)]
pub struct State {
    /// A node DB connection pool.
    pub conn_pool: db::ConnectionPool,
    /// Notifies on availability of a new block (e.g. from the relayer).
    ///
    /// If this is `None`, subscription streams will close after the last
    /// available item in the DB.
    pub new_block: Option<BlockRx>,
}
/// An error occurred while attempting to serve a new connection.
#[derive(Debug, Error)]
pub enum ServeNextConnError {
    /// Failed to acquire the next connection.
    #[error("failed to acquire next connection: {0}")]
    Next(#[from] io::Error),
    /// Failed to serve the connection.
    #[error("{0}")]
    Serve(#[from] ServeConnError),
}
/// An error occurred while attempting to serve a connection.
#[derive(Debug, Error)]
#[error("Serve connection error: {0}")]
pub struct ServeConnError(#[from] Box<dyn std::error::Error + Send + Sync>);
/// The default value used by `essential-node-cli` for the maximum number of
/// TCP stream connections to maintain at once.
pub const DEFAULT_CONNECTION_LIMIT: usize = 2_000;
/// Continuously serve the Node API using the given `router` and TCP `listener`.
///
/// The number of simultaneous TCP stream connections will be capped at the given
/// `conn_limit`.
///
/// This constructs a new `JoinSet` for limiting connections and then calls
/// [`serve_next_conn`] in a loop. Connections already being served outside of
/// this call are not tracked by that set and do not count toward the limit.
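///
/// A minimal usage sketch (the bind address is illustrative; the DB setup
/// mirrors the example on [`serve_next_conn`]):
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// # use essential_node::{self as node};
/// # use essential_node_api as node_api;
/// let conf = node::db::Config::default();
/// let db = node::db(&conf).unwrap();
/// let state = node_api::State { conn_pool: db, new_block: None };
/// let router = node_api::router(state);
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
/// node_api::serve(&router, &listener, node_api::DEFAULT_CONNECTION_LIMIT).await;
/// # }
/// ```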
pub async fn serve(router: &Router, listener: &TcpListener, conn_limit: usize) {
    let mut conn_set = JoinSet::new();
    loop {
        serve_next_conn(router, listener, conn_limit, &mut conn_set).await;
    }
}
/// Accept and serve the next connection.
///
/// The number of simultaneous TCP stream connections will be capped at the given
/// `conn_limit`.
///
/// If we're at the connection limit, this first waits for an existing
/// connection task to complete.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// # use essential_node::{self as node};
/// # use essential_node_api as node_api;
/// let conf = node::db::Config::default();
/// let db = node::db(&conf).unwrap();
/// let state = node_api::State {
///     conn_pool: db,
///     new_block: None,
/// };
/// let router = node_api::router(state);
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
/// let conn_limit = node_api::DEFAULT_CONNECTION_LIMIT;
/// let mut conn_set = tokio::task::JoinSet::new();
/// // Accept and serve connections.
/// loop {
///     node_api::serve_next_conn(&router, &listener, conn_limit, &mut conn_set).await;
/// }
/// # }
/// ```
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn serve_next_conn(
    router: &Router,
    listener: &TcpListener,
    conn_limit: usize,
    conn_set: &mut JoinSet<()>,
) {
    // Await the next connection.
    let stream = match next_conn(listener, conn_limit, conn_set).await {
        Ok((stream, _remote_addr)) => {
            #[cfg(feature = "tracing")]
            tracing::trace!("Accepted new connection from: {_remote_addr}");
            stream
        }
        Err(_err) => {
            #[cfg(feature = "tracing")]
            tracing::trace!("Failed to accept connection: {_err}");
            return;
        }
    };
    // Serve the acquired connection.
    let router = router.clone();
    conn_set.spawn(async move {
        if let Err(_err) = serve_conn(&router, stream).await {
            #[cfg(feature = "tracing")]
            tracing::trace!("Serve connection error: {_err}");
        }
    });
}
/// Accept and return the next TCP stream connection.
///
/// If we're at the connection limit, this first waits for an existing
/// connection task to complete.
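///
/// A minimal sketch of accepting a single connection (the bind address is
/// illustrative):
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// # use essential_node_api as node_api;
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
/// let mut conn_set = tokio::task::JoinSet::new();
/// let (_stream, _remote_addr) =
///     node_api::next_conn(&listener, node_api::DEFAULT_CONNECTION_LIMIT, &mut conn_set)
///         .await
///         .unwrap();
/// # }
/// ```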
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
pub async fn next_conn(
    listener: &TcpListener,
    conn_limit: usize,
    conn_set: &mut JoinSet<()>,
) -> io::Result<(TcpStream, SocketAddr)> {
    // If the `conn_set` is at (or above) the limit, wait for the next task to join.
    if conn_set.len() >= conn_limit {
        #[cfg(feature = "tracing")]
        tracing::info!("Connection limit reached: {conn_limit}");
        conn_set.join_next().await.expect("set cannot be empty")?;
    }
    // Await another connection.
    #[cfg(feature = "tracing")]
    tracing::trace!("Awaiting new connection at {}", listener.local_addr()?);
    listener.accept().await
}
/// Serve a newly accepted TCP stream.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
pub async fn serve_conn(router: &Router, stream: TcpStream) -> Result<(), ServeConnError> {
    // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use
    // tokio. `TokioIo` converts between them.
    let stream = hyper_util::rt::TokioIo::new(stream);
    // Hyper also has its own `Service` trait and doesn't use tower. We can use
    // `hyper::service::service_fn` to create a hyper `Service` that calls our
    // app through `tower::Service::call`.
    let hyper_service = hyper::service::service_fn(
        move |request: axum::extract::Request<hyper::body::Incoming>| {
            // Clone the router per request: hyper's `Service::call` takes `&self`,
            // whereas tower's `Service::call` takes `&mut self`.
            tower::Service::call(&mut router.clone(), request)
        },
    );
    // `TokioExecutor` tells hyper to use `tokio::spawn` to spawn tasks.
    let executor = hyper_util::rt::TokioExecutor::new();
    let conn = hyper_util::server::conn::auto::Builder::new(executor).http2_only();
    conn.serve_connection(stream, hyper_service)
        .await
        .map_err(ServeConnError)
}
/// Construct the endpoint router with the node [`endpoint`]s, the default CORS
/// layer, and the given [`State`].
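///
/// A minimal construction sketch (the DB setup mirrors the example on
/// [`serve_next_conn`]):
///
/// ```no_run
/// # use essential_node::{self as node};
/// # use essential_node_api as node_api;
/// let conf = node::db::Config::default();
/// let db = node::db(&conf).unwrap();
/// let state = node_api::State { conn_pool: db, new_block: None };
/// let app = node_api::router(state);
/// # let _ = app;
/// ```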
pub fn router(state: State) -> Router {
    with_endpoints(Router::new())
        .layer(cors_layer())
        .with_state(state)
}
/// Add the node API [`endpoint`]s to the given `router`.
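///
/// A sketch of composing the endpoints with a custom router; the layering and
/// state mirror what [`router`] does:
///
/// ```no_run
/// # use axum::Router;
/// # use essential_node::{self as node};
/// # use essential_node_api as node_api;
/// let conf = node::db::Config::default();
/// let db = node::db(&conf).unwrap();
/// let state = node_api::State { conn_pool: db, new_block: None };
/// let app: Router = node_api::with_endpoints(Router::new())
///     .layer(node_api::cors_layer())
///     .with_state(state);
/// # let _ = app;
/// ```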
pub fn with_endpoints(router: Router<State>) -> Router<State> {
    use endpoint::*;
    router
        .route(health_check::PATH, get(health_check::handler))
        .route(list_blocks::PATH, get(list_blocks::handler))
        .route(query_state::PATH, get(query_state::handler))
        .route(subscribe_blocks::PATH, get(subscribe_blocks::handler))
}
/// The default CORS layer.
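///
/// A sketch of attaching the layer to a custom router (the route shown is
/// illustrative):
///
/// ```no_run
/// # use axum::{routing::get, Router};
/// # use essential_node_api as node_api;
/// let app: Router = Router::new()
///     .route("/health", get(|| async { "ok" }))
///     .layer(node_api::cors_layer());
/// # let _ = app;
/// ```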
pub fn cors_layer() -> CorsLayer {
    CorsLayer::new()
        .allow_origin(tower_http::cors::Any)
        .allow_methods([http::Method::GET, http::Method::OPTIONS])
        .allow_headers([http::header::CONTENT_TYPE])
}