essential_builder_api/
lib.rs

1//! The API for the Essential block builder.
2//!
3//! Find the available endpoints under the [`endpoint`] module.
4//!
5//! To serve the builder API, construct a [`router`], a [`TcpListener`] and call [`serve`].
6
7use axum::{
8    routing::{get, post},
9    Router,
10};
11use essential_builder_db as db;
12use std::{io, net::SocketAddr};
13use thiserror::Error;
14use tokio::{
15    net::{TcpListener, TcpStream},
16    task::JoinSet,
17};
18use tower_http::cors::CorsLayer;
19
20pub mod endpoint;
21
22/// State provided to the endpoints when serving connections.
23#[derive(Clone)]
24pub struct State {
25    /// A builder DB connection pool.
26    pub conn_pool: db::ConnectionPool,
27}
28
29/// An error occurred while attempting to serve a new connection.
30#[derive(Debug, Error)]
31pub enum ServeNextConnError {
32    /// Failed to acquire the next connection.
33    #[error("failed to acquire next connection: {0}")]
34    Next(#[from] io::Error),
35    /// Failed to serve the connection.
36    #[error("{0}")]
37    Serve(#[from] ServeConnError),
38}
39
40/// An error occurred while attempting to serve a connection.
41#[derive(Debug, Error)]
42#[error("Serve connection error: {0}")]
43pub struct ServeConnError(#[from] Box<dyn std::error::Error + Send + Sync>);
44
45/// The default value used by `essential-builder-cli` for the maximum number of
46/// TCP stream connections to maintain at once.
47pub const DEFAULT_CONNECTION_LIMIT: usize = 2_000;
48
49/// Continuously serve the Builder API using the given `router` and TCP `listener`.
50///
51/// The number of simultaneous TCP stream connections will be capped at the given
52/// `conn_limit`.
53///
54/// This constructs a new `JoinSet` to use for limiting connections and then
55/// calls [`serve_next_conn`] in a loop. Any outstanding connections will not be
56/// counted toward the connection limit.
57pub async fn serve(router: &Router, listener: &TcpListener, conn_limit: usize) {
58    let mut conn_set = JoinSet::new();
59    loop {
60        serve_next_conn(router, listener, conn_limit, &mut conn_set).await;
61    }
62}
63
64/// Accept and serve the next connection.
65///
66/// The number of simultaneous TCP stream connections will be capped at the given
67/// `conn_limit`.
68///
69/// If we're at the connection limit, this first awaits for a connection task to
70/// become available.
71///
72/// ```no_run
73/// # #[tokio::main]
74/// # async fn main() {
75/// # use essential_builder_api as builder_api;
76/// # use essential_builder_db as builder_db;
77/// let conf = builder_db::pool::Config::default();
78/// let conn_pool = builder_db::ConnectionPool::with_tables(&conf).unwrap();
79/// let state = builder_api::State { conn_pool };
80/// let router = builder_api::router(state);
81/// let listener = tokio::net::TcpListener::bind("127.0.0.1:3553").await.unwrap();
82/// let conn_limit = builder_api::DEFAULT_CONNECTION_LIMIT;
83/// let mut conn_set = tokio::task::JoinSet::new();
84/// // Accept and serve connections.
85/// loop {
86///     builder_api::serve_next_conn(&router, &listener, conn_limit, &mut conn_set).await;
87/// }
88/// # }
89/// ```
90#[tracing::instrument(skip_all)]
91pub async fn serve_next_conn(
92    router: &Router,
93    listener: &TcpListener,
94    conn_limit: usize,
95    conn_set: &mut JoinSet<()>,
96) {
97    // Await the next connection.
98    let stream = match next_conn(listener, conn_limit, conn_set).await {
99        Ok((stream, _remote_addr)) => {
100            #[cfg(feature = "tracing")]
101            tracing::trace!("Accepted new connection from: {_remote_addr}");
102            stream
103        }
104        Err(_err) => {
105            #[cfg(feature = "tracing")]
106            tracing::trace!("Failed to accept connection {_err}");
107            return;
108        }
109    };
110
111    // Serve the acquired connection.
112    let router = router.clone();
113    conn_set.spawn(async move {
114        if let Err(_err) = serve_conn(&router, stream).await {
115            #[cfg(feature = "tracing")]
116            tracing::trace!("Serve connection error: {_err}");
117        }
118    });
119}
120
121/// Accept and return the next TCP stream connection.
122///
123/// If we're at the connection limit, this first awaits for a connection task to
124/// become available.
125#[tracing::instrument(skip_all, err)]
126pub async fn next_conn(
127    listener: &TcpListener,
128    conn_limit: usize,
129    conn_set: &mut JoinSet<()>,
130) -> io::Result<(TcpStream, SocketAddr)> {
131    // If the `conn_set` size currently exceeds the limit, wait for the next to join.
132    if conn_set.len() >= conn_limit {
133        #[cfg(feature = "tracing")]
134        tracing::info!("Connection limit reached: {conn_limit}");
135        conn_set.join_next().await.expect("set cannot be empty")?;
136    }
137    // Await another connection.
138    tracing::trace!("Awaiting new connection at {}", listener.local_addr()?);
139    listener.accept().await
140}
141
142/// Serve a newly accepted TCP stream.
143#[tracing::instrument(skip_all, err)]
144pub async fn serve_conn(router: &Router, stream: TcpStream) -> Result<(), ServeConnError> {
145    // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use
146    // tokio. `TokioIo` converts between them.
147    let stream = hyper_util::rt::TokioIo::new(stream);
148
149    // Hyper also has its own `Service` trait and doesn't use tower. We can use
150    // `hyper::service::service_fn` to create a hyper `Service` that calls our
151    // app through `tower::Service::call`.
152    let hyper_service = hyper::service::service_fn(
153        move |request: axum::extract::Request<hyper::body::Incoming>| {
154            tower::Service::call(&mut router.clone(), request)
155        },
156    );
157
158    // `TokioExecutor` tells hyper to use `tokio::spawn` to spawn tasks.
159    let executor = hyper_util::rt::TokioExecutor::new();
160    let conn = hyper_util::server::conn::auto::Builder::new(executor).http2_only();
161    conn.serve_connection(stream, hyper_service)
162        .await
163        .map_err(ServeConnError)
164}
165
166/// Construct the endpoint router with the builder [`endpoint`]s, CORS layer and DB
167/// connection pool as state.
168pub fn router(state: State) -> Router {
169    with_endpoints(Router::new())
170        .layer(cors_layer())
171        .with_state(state)
172}
173
174/// Add the builder API [`endpoint`]s to the given `router`.
175pub fn with_endpoints(router: Router<State>) -> Router<State> {
176    use endpoint::*;
177    router
178        .route(health_check::PATH, get(health_check::handler))
179        .route(
180            latest_solution_set_failures::PATH,
181            get(latest_solution_set_failures::handler),
182        )
183        .route(
184            list_solution_set_failures::PATH,
185            get(list_solution_set_failures::handler),
186        )
187        .route(
188            submit_solution_set::PATH,
189            post(submit_solution_set::handler),
190        )
191}
192
193/// The default CORS layer.
194pub fn cors_layer() -> CorsLayer {
195    CorsLayer::new()
196        .allow_origin(tower_http::cors::Any)
197        .allow_methods([http::Method::GET, http::Method::OPTIONS, http::Method::POST])
198        .allow_headers([http::header::CONTENT_TYPE])
199}