#![forbid(unsafe_code)]
#[macro_use]
extern crate tracing;
mod helpers;
pub use helpers::*;
mod routes;
mod version;
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_consensus::Consensus;
use snarkos_node_router::{
Routing,
messages::{Message, UnconfirmedTransaction},
};
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::{program::ProgramID, types::Field},
ledger::narwhal::Data,
prelude::{Ledger, Network, VM, cfg_into_iter, store::ConsensusStorage},
};
use anyhow::{Context, Result};
use axum::{
body::Body,
extract::{ConnectInfo, DefaultBodyLimit, Query, State},
http::{Method, Request, StatusCode, header::CONTENT_TYPE},
middleware,
response::Response,
routing::{get, post},
};
use axum_extra::response::ErasedJson;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{net::SocketAddr, sync::Arc};
use tokio::{net::TcpListener, sync::Semaphore, task::JoinHandle};
use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder};
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
/// Default REST port.
/// NOTE(review): not referenced in this file; presumably the port callers bind the REST server to
/// when none is configured - confirm against the CLI/node setup code.
pub const DEFAULT_REST_PORT: u16 = 3030;
/// Path segment of the v1 API; `spawn_server` nests the v1 routes under `/v1/<network short name>`.
pub const API_VERSION_V1: &str = "v1";
/// Path segment of the v2 API; `spawn_server` nests the v2 routes under `/v2/<network short name>`.
pub const API_VERSION_V2: &str = "v2";
/// State shared by the REST server.
///
/// The value is cloned into the router via `with_state` in `build_routes`, so every field is either
/// cheaply clonable or wrapped in an `Arc`.
#[derive(Clone)]
pub struct Rest<N: Network, C: ConsensusStorage<N>, R: Routing<N>> {
/// Optional handle to the CDN block sync.
/// NOTE(review): presumably `None` when the node does not sync from a CDN - confirm with the caller of `start`.
cdn_sync: Option<Arc<CdnBlockSync>>,
/// Optional consensus module; when it is `Some`, `build_routes` additionally registers the
/// `/connections/bft/*` endpoints (and `/validators/participation` with the `telemetry` feature).
consensus: Option<Consensus<N>>,
/// The ledger, exposed read-only through `Rest::ledger`.
ledger: Ledger<N, C>,
/// The routing implementation of the node.
routing: Arc<R>,
/// Join handles of spawned tasks; `spawn_server` pushes the HTTP server task here.
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// Handle to the block sync module.
block_sync: Arc<BlockSync<N>>,
/// Semaphore created in `start` with `VM::MAX_PARALLEL_DEPLOY_VERIFICATIONS` permits.
/// NOTE(review): presumably bounds concurrent deployment verifications triggered via REST - confirm in the route handlers.
num_verifying_deploys: Arc<Semaphore>,
/// Semaphore created in `start` with `VM::MAX_PARALLEL_EXECUTE_VERIFICATIONS` permits.
/// NOTE(review): presumably bounds concurrent execution verifications triggered via REST - confirm in the route handlers.
num_verifying_executions: Arc<Semaphore>,
/// Semaphore created in `start` with `N::MAX_SOLUTIONS` permits.
/// NOTE(review): presumably bounds concurrent solution verifications triggered via REST - confirm in the route handlers.
num_verifying_solutions: Arc<Semaphore>,
}
impl<N: Network, C: 'static + ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
    /// Creates the REST state and starts serving it on `rest_ip`.
    ///
    /// `rest_rps` is forwarded to `spawn_server`, which uses it to configure the per-IP rate limit.
    ///
    /// # Errors
    /// Propagates any error returned by `spawn_server` (e.g. the TCP listener could not be bound).
    pub async fn start(
        rest_ip: SocketAddr,
        rest_rps: u32,
        consensus: Option<Consensus<N>>,
        ledger: Ledger<N, C>,
        routing: Arc<R>,
        cdn_sync: Option<Arc<CdnBlockSync>>,
        block_sync: Arc<BlockSync<N>>,
    ) -> Result<Self> {
        // One semaphore per kind of verification work, sized by the corresponding protocol limit.
        let deploy_permits = Semaphore::new(VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS);
        let execution_permits = Semaphore::new(VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS);
        let solution_permits = Semaphore::new(N::MAX_SOLUTIONS);

        let mut rest = Self {
            cdn_sync,
            consensus,
            ledger,
            routing,
            handles: Default::default(),
            block_sync,
            num_verifying_deploys: Arc::new(deploy_permits),
            num_verifying_executions: Arc::new(execution_permits),
            num_verifying_solutions: Arc::new(solution_permits),
        };

        // Bind the listener and spawn the HTTP server task before handing the state back.
        rest.spawn_server(rest_ip, rest_rps).await?;
        Ok(rest)
    }
}
impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
/// Returns a reference to the ledger held by the REST state.
pub const fn ledger(&self) -> &Ledger<N, C> {
&self.ledger
}
/// Returns the shared list of task join handles (the HTTP server task is pushed here by `spawn_server`).
pub const fn handles(&self) -> &Arc<Mutex<Vec<JoinHandle<()>>>> {
&self.handles
}
}
impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
fn build_routes(&self, rest_rps: u32) -> axum::Router {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_headers([CONTENT_TYPE]);
let governor_config = Box::new(
GovernorConfigBuilder::default()
.per_nanosecond((1_000_000_000 / rest_rps) as u64)
.burst_size(rest_rps)
.error_handler(|error| {
let error_message = error.to_string();
let mut response = Response::new(error_message.clone().into());
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
if error_message.contains("Too Many Requests") {
*response.status_mut() = StatusCode::TOO_MANY_REQUESTS;
}
response
})
.finish()
.expect("Couldn't set up rate limiting for the REST server!"),
);
let routes = axum::Router::new()
.route("/node/address", get(Self::get_node_address))
.route("/program/{id}/mapping/{name}", get(Self::get_mapping_values))
.route("/db_backup", post(Self::db_backup))
.route_layer(middleware::from_fn(auth_middleware))
.route("/consensus_version", get(Self::get_consensus_version))
.route("/block/height/latest", get(Self::get_block_height_latest))
.route("/block/hash/latest", get(Self::get_block_hash_latest))
.route("/block/latest", get(Self::get_block_latest))
.route("/block/{height_or_hash}", get(Self::get_block))
.route("/block/{height_or_hash}/header", get(Self::get_block_header))
.route("/block/{height_or_hash}/transactions", get(Self::get_block_transactions))
.route("/transaction/{id}", get(Self::get_transaction))
.route("/transaction/confirmed/{id}", get(Self::get_confirmed_transaction))
.route("/transaction/unconfirmed/{id}", get(Self::get_unconfirmed_transaction))
.route("/transaction/broadcast", post(Self::transaction_broadcast))
.route("/solution/broadcast", post(Self::solution_broadcast))
.route("/find/blockHash/{tx_id}", get(Self::find_block_hash))
.route("/find/blockHeight/{state_root}", get(Self::find_block_height_from_state_root))
.route("/find/transactionID/deployment/{program_id}", get(Self::find_latest_transaction_id_from_program_id))
.route("/find/transactionID/deployment/{program_id}/{edition}", get(Self::find_latest_transaction_id_from_program_id_and_edition))
.route("/find/transactionID/deployment/{program_id}/{edition}/original", get(Self::find_original_deployment_transaction_id))
.route("/find/transactionID/deployment/{program_id}/{edition}/{amendment}", get(Self::find_transaction_id_from_program_id_edition_and_amendment))
.route("/find/transactionID/{transition_id}", get(Self::find_transaction_id_from_transition_id))
.route("/find/transitionID/{input_or_output_id}", get(Self::find_transition_id))
.route("/peers/count", get(Self::get_peers_count))
.route("/peers/all", get(Self::get_peers_all))
.route("/peers/all/metrics", get(Self::get_peers_all_metrics))
.route("/connections/p2p/count", get(Self::get_peers_count))
.route("/connections/p2p/all", get(Self::get_peers_all))
.route("/connections/p2p/all/metrics", get(Self::get_peers_all_metrics))
.route("/program/{id}", get(Self::get_program))
.route("/program/{id}/latest_edition", get(Self::get_latest_program_edition))
.route("/program/{id}/{edition}", get(Self::get_program_for_edition))
.route("/program/{id}/mappings", get(Self::get_mapping_names))
.route("/program/{id}/mapping/{name}/{key}", get(Self::get_mapping_value))
.route("/program/{id}/amendment_count", get(Self::get_program_amendment_count))
.route("/program/{id}/{edition}/amendment_count", get(Self::get_program_amendment_count_for_edition))
.route("/sync_status", get(Self::get_sync_status))
.route("/sync/status", get(Self::get_sync_status))
.route("/sync/peers", get(Self::get_sync_peers))
.route("/sync/requests", get(Self::get_sync_requests_summary))
.route("/sync/requests/list", get(Self::get_sync_requests_list))
.route("/version", get(Self::get_version))
.route("/blocks", get(Self::get_blocks))
.route("/height/{hash}", get(Self::get_height))
.route("/memoryPool/transmissions", get(Self::get_memory_pool_transmissions))
.route("/memoryPool/solutions", get(Self::get_memory_pool_solutions))
.route("/memoryPool/transactions", get(Self::get_memory_pool_transactions))
.route("/statePath/{commitment}", get(Self::get_state_path_for_commitment))
.route("/statePaths", get(Self::get_state_paths_for_commitments))
.route("/stateRoot/latest", get(Self::get_state_root_latest))
.route("/stateRoot/{height}", get(Self::get_state_root))
.route("/committee/latest", get(Self::get_committee_latest))
.route("/committee/{height}", get(Self::get_committee))
.route("/delegators/{validator}", get(Self::get_delegators_for_validator));
let routes = match self.consensus {
Some(_) => routes
.route("/connections/bft/count", get(Self::get_bft_connections_count))
.route("/connections/bft/all", get(Self::get_bft_connections_all)),
None => routes,
};
#[cfg(feature = "telemetry")]
let routes = match self.consensus {
Some(_) => routes.route("/validators/participation", get(Self::get_validator_participation_scores)),
None => routes,
};
#[cfg(feature = "history")]
let routes = routes.route("/program/{id}/mapping/{name}/{key}/history/{height}", get(Self::get_history));
#[cfg(feature = "history-staking-rewards")]
let routes = routes.route("/staking/rewards/{address}/{height}", get(Self::get_staking_reward));
routes
.with_state(self.clone())
.layer(TraceLayer::new_for_http())
.layer(middleware::map_request(log_middleware))
.layer(cors)
.layer(DefaultBodyLimit::max(512 * 1024))
.layer(GovernorLayer {
config: governor_config.into(),
})
}
async fn spawn_server(&mut self, rest_ip: SocketAddr, rest_rps: u32) -> Result<()> {
debug!("REST rate limit per IP - {rest_rps} RPS");
let default_router = axum::Router::new().nest(
&format!("/{}", N::SHORT_NAME),
self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
);
let v1_router = axum::Router::new().nest(
&format!("/{API_VERSION_V1}/{}", N::SHORT_NAME),
self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
);
let v2_router =
axum::Router::new().nest(&format!("/{API_VERSION_V2}/{}", N::SHORT_NAME), self.build_routes(rest_rps));
let router = default_router.merge(v1_router).merge(v2_router);
let rest_listener =
TcpListener::bind(rest_ip).await.with_context(|| "Failed to bind TCP port for REST endpoints")?;
let handle = tokio::spawn(async move {
axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>())
.await
.expect("couldn't start rest server");
});
self.handles.lock().push(handle);
Ok(())
}
}
/// Request middleware that logs the HTTP method, the URI and the peer address of every incoming
/// request, then passes the request on unchanged.
async fn log_middleware(ConnectInfo(addr): ConnectInfo<SocketAddr>, request: Request<Body>) -> Request<Body> {
    let (method, uri) = (request.method(), request.uri());
    info!("Received '{method} {uri}' from '{addr}'");
    request
}
async fn v1_error_middleware(response: Response) -> Response {
const V1_STATUS_CODE: StatusCode = StatusCode::INTERNAL_SERVER_ERROR;
if response.status().is_success() {
return response;
}
let fallback = || {
let mut response = Response::new(Body::from("Failed to convert error"));
*response.status_mut() = V1_STATUS_CODE;
response
};
let Ok(bytes) = axum::body::to_bytes(response.into_body(), usize::MAX).await else {
return fallback();
};
let Ok(json_err) = serde_json::from_slice::<SerializedRestError>(&bytes) else {
return fallback();
};
let mut message = json_err.message;
for next in json_err.chain.into_iter() {
message = format!("{message} — {next}");
}
let mut response = Response::new(Body::from(message));
*response.status_mut() = V1_STATUS_CODE;
response
}
/// Formats an ID for display: the first 16 characters of its string form, followed by ".." if
/// anything was cut off. IDs of 16 characters or fewer are returned unchanged.
pub fn fmt_id(id: impl ToString) -> String {
    const MAX_CHARS: usize = 16;

    let mut text = id.to_string();
    // Byte offset of the 17th character, if there is one; that is exactly where to cut.
    let cut = text.char_indices().nth(MAX_CHARS).map(|(offset, _)| offset);
    if let Some(offset) = cut {
        text.truncate(offset);
        text.push_str("..");
    }
    text
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use axum::{
        Router,
        body::Body,
        http::{Request, StatusCode},
        middleware,
        routing::get,
    };
    use tower::ServiceExt;

    /// Routes whose handlers always fail with a specific kind of `RestError`.
    fn error_routes() -> Router {
        Router::new()
            .route("/not_found", get(|| async { Err::<(), RestError>(RestError::not_found(anyhow!("missing"))) }))
            .route("/bad_request", get(|| async { Err::<(), RestError>(RestError::bad_request(anyhow!("bad"))) }))
            .route(
                "/service_unavailable",
                get(|| async { Err::<(), RestError>(RestError::service_unavailable(anyhow!("gone"))) }),
            )
    }

    /// The failing routes mounted twice: at the root behind the v1 error middleware, and under
    /// "/v2" without it.
    fn test_app() -> Router {
        let v1 = error_routes().route_layer(middleware::map_response(v1_error_middleware));
        let v2 = Router::new().nest(&format!("/{API_VERSION_V2}"), error_routes());
        v1.merge(v2)
    }

    /// Sends a body-less request for `uri` to `app` and returns the response status.
    async fn status_of(app: &Router, uri: &str) -> StatusCode {
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
        app.clone().oneshot(request).await.unwrap().status()
    }

    #[tokio::test]
    async fn v1_routes_force_internal_server_error() {
        let app = test_app();
        for uri in ["/not_found", "/bad_request", "/service_unavailable"] {
            assert_eq!(status_of(&app, uri).await, StatusCode::INTERNAL_SERVER_ERROR);
        }
    }

    #[tokio::test]
    async fn v2_routes_return_specific_errors() {
        let app = test_app();
        let expectations = [
            ("/v2/not_found", StatusCode::NOT_FOUND),
            ("/v2/bad_request", StatusCode::BAD_REQUEST),
            ("/v2/service_unavailable", StatusCode::SERVICE_UNAVAILABLE),
        ];
        for (uri, expected) in expectations {
            assert_eq!(status_of(&app, uri).await, expected);
        }
    }
}