essential_node_api/lib.rs
//! The Essential Node HTTP API.
//!
//! Find the available endpoints under the [`endpoint`] module.
//!
//! To serve the node API, construct a [`router`], a [`TcpListener`] and call [`serve`].
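//!
//! A minimal sketch, assuming the default DB config and no block notifications
//! (see [`serve_next_conn`] for a lower-level example that manages the
//! connection `JoinSet` directly):
//!
//! ```no_run
//! # #[tokio::main]
//! # async fn main() {
//! # use essential_node::{self as node};
//! # use essential_node_api as node_api;
//! // Open a DB connection pool with the node tables created.
//! let conf = node::db::pool::Config::default();
//! let db = node::db::ConnectionPool::with_tables(&conf).unwrap();
//! // API state: the connection pool and (optionally) a new-block receiver.
//! let state = node_api::State {
//!     conn_pool: db,
//!     new_block: None,
//! };
//! // Build the router, bind a listener and serve.
//! let router = node_api::router(state);
//! let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
//! node_api::serve(&router, &listener, node_api::DEFAULT_CONNECTION_LIMIT).await;
//! # }
//! ```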

use axum::{routing::get, Router};
use essential_node::db;
use essential_node_types::block_notify::BlockRx;
use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::{
    net::{TcpListener, TcpStream},
    task::JoinSet,
};
use tower_http::cors::CorsLayer;

pub mod endpoint;

/// State provided to the endpoints when serving connections.
#[derive(Clone)]
pub struct State {
    /// A node DB connection pool.
    pub conn_pool: db::ConnectionPool,
    /// Notifies on availability of a new block (e.g. from the relayer).
    ///
    /// In the case that this is `None`, subscription streams will close after
    /// the last available item in the DB.
    pub new_block: Option<BlockRx>,
}

/// An error occurred while attempting to serve a new connection.
#[derive(Debug, Error)]
pub enum ServeNextConnError {
    /// Failed to acquire the next connection.
    #[error("failed to acquire next connection: {0}")]
    Next(#[from] io::Error),
    /// Failed to serve the connection.
    #[error("{0}")]
    Serve(#[from] ServeConnError),
}

/// An error occurred while attempting to serve a connection.
#[derive(Debug, Error)]
#[error("Serve connection error: {0}")]
pub struct ServeConnError(#[from] Box<dyn std::error::Error + Send + Sync>);

/// The default value used by `essential-node-cli` for the maximum number of
/// TCP stream connections to maintain at once.
pub const DEFAULT_CONNECTION_LIMIT: usize = 2_000;

/// Continuously serve the Node API using the given `router` and TCP `listener`.
///
/// The number of simultaneous TCP stream connections will be capped at the given
/// `conn_limit`.
///
/// This constructs a new `JoinSet` to use for limiting connections and then
/// calls [`serve_next_conn`] in a loop. Because the `JoinSet` is constructed
/// internally, connections served outside of this call are not counted toward
/// the connection limit.
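///
/// A minimal usage sketch, assuming a `router` and bound `listener` constructed
/// as in the crate-level example:
///
/// ```no_run
/// # async fn example(router: axum::Router, listener: tokio::net::TcpListener) {
/// let limit = essential_node_api::DEFAULT_CONNECTION_LIMIT;
/// essential_node_api::serve(&router, &listener, limit).await;
/// # }
/// ```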
pub async fn serve(router: &Router, listener: &TcpListener, conn_limit: usize) {
    let mut conn_set = JoinSet::new();
    loop {
        serve_next_conn(router, listener, conn_limit, &mut conn_set).await;
    }
}

/// Accept and serve the next connection.
///
/// The number of simultaneous TCP stream connections will be capped at the given
/// `conn_limit`.
///
/// If we're at the connection limit, this first waits for an existing connection
/// task to complete.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// # use essential_node::{self as node};
/// # use essential_node_api as node_api;
/// let conf = node::db::pool::Config::default();
/// let db = node::db::ConnectionPool::with_tables(&conf).unwrap();
/// let state = node_api::State {
///     conn_pool: db,
///     new_block: None,
/// };
/// let router = node_api::router(state);
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
/// let conn_limit = node_api::DEFAULT_CONNECTION_LIMIT;
/// let mut conn_set = tokio::task::JoinSet::new();
/// // Accept and serve connections.
/// loop {
///     node_api::serve_next_conn(&router, &listener, conn_limit, &mut conn_set).await;
/// }
/// # }
/// ```
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn serve_next_conn(
    router: &Router,
    listener: &TcpListener,
    conn_limit: usize,
    conn_set: &mut JoinSet<()>,
) {
    // Await the next connection.
    let stream = match next_conn(listener, conn_limit, conn_set).await {
        Ok((stream, _remote_addr)) => {
            #[cfg(feature = "tracing")]
            tracing::trace!("Accepted new connection from: {_remote_addr}");
            stream
        }
        Err(_err) => {
            #[cfg(feature = "tracing")]
            tracing::trace!("Failed to accept connection: {_err}");
            return;
        }
    };

    // Serve the acquired connection.
    let router = router.clone();
    conn_set.spawn(async move {
        if let Err(_err) = serve_conn(&router, stream).await {
            #[cfg(feature = "tracing")]
            tracing::trace!("Serve connection error: {_err}");
        }
    });
}

/// Accept and return the next TCP stream connection.
///
/// If we're at the connection limit, this first waits for an existing connection
/// task to complete.
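///
/// A minimal sketch of accepting a single connection, assuming a bound
/// `listener` and a `JoinSet` tracking the currently served connections:
///
/// ```no_run
/// # async fn example(listener: tokio::net::TcpListener) -> std::io::Result<()> {
/// let mut conn_set = tokio::task::JoinSet::new();
/// let (_stream, _addr) = essential_node_api::next_conn(
///     &listener,
///     essential_node_api::DEFAULT_CONNECTION_LIMIT,
///     &mut conn_set,
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```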
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
pub async fn next_conn(
    listener: &TcpListener,
    conn_limit: usize,
    conn_set: &mut JoinSet<()>,
) -> io::Result<(TcpStream, SocketAddr)> {
    // If the `conn_set` is at or above the limit, wait for the next task to join.
    if conn_set.len() >= conn_limit {
        #[cfg(feature = "tracing")]
        tracing::info!("Connection limit reached: {conn_limit}");
        conn_set.join_next().await.expect("set cannot be empty")?;
    }
    // Await another connection.
    #[cfg(feature = "tracing")]
    tracing::trace!("Awaiting new connection at {}", listener.local_addr()?);
    listener.accept().await
}

/// Serve a newly accepted TCP stream.
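///
/// A minimal sketch, assuming a `router` and a freshly accepted `stream`:
///
/// ```no_run
/// # async fn example(router: axum::Router, stream: tokio::net::TcpStream) {
/// if let Err(err) = essential_node_api::serve_conn(&router, stream).await {
///     eprintln!("serve connection error: {err}");
/// }
/// # }
/// ```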
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
pub async fn serve_conn(router: &Router, stream: TcpStream) -> Result<(), ServeConnError> {
    // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use
    // tokio. `TokioIo` converts between them.
    let stream = hyper_util::rt::TokioIo::new(stream);

    // Hyper also has its own `Service` trait and doesn't use tower. We can use
    // `hyper::service::service_fn` to create a hyper `Service` that calls our
    // app through `tower::Service::call`.
    let hyper_service = hyper::service::service_fn(
        move |request: axum::extract::Request<hyper::body::Incoming>| {
            tower::Service::call(&mut router.clone(), request)
        },
    );

    // `TokioExecutor` tells hyper to use `tokio::spawn` to spawn tasks.
    let executor = hyper_util::rt::TokioExecutor::new();
    let conn = hyper_util::server::conn::auto::Builder::new(executor).http2_only();
    conn.serve_connection(stream, hyper_service)
        .await
        .map_err(ServeConnError)
}

/// Construct the endpoint router with the node [`endpoint`]s, CORS layer and DB
/// connection pool as state.
pub fn router(state: State) -> Router {
    with_endpoints(Router::new())
        .layer(cors_layer())
        .with_state(state)
}

/// Add the node API [`endpoint`]s to the given `router`.
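///
/// A minimal sketch of mixing the node endpoints into a custom router before
/// attaching [`State`] (assumes a `state` value constructed as in the
/// crate-level example):
///
/// ```no_run
/// # use essential_node_api as node_api;
/// # fn build(state: node_api::State) -> axum::Router {
/// node_api::with_endpoints(axum::Router::new())
///     .layer(node_api::cors_layer())
///     .with_state(state)
/// # }
/// ```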
pub fn with_endpoints(router: Router<State>) -> Router<State> {
    use endpoint::*;
    router
        .route(health_check::PATH, get(health_check::handler))
        .route(list_blocks::PATH, get(list_blocks::handler))
        .route(query_state::PATH, get(query_state::handler))
        .route(subscribe_blocks::PATH, get(subscribe_blocks::handler))
}

/// The default CORS layer.
pub fn cors_layer() -> CorsLayer {
    CorsLayer::new()
        .allow_origin(tower_http::cors::Any)
        .allow_methods([http::Method::GET, http::Method::OPTIONS])
        .allow_headers([http::header::CONTENT_TYPE])
}