1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21mod helpers;
22pub use helpers::*;
24
25mod routes;
26
27mod version;
28
29use snarkos_node_cdn::CdnBlockSync;
30use snarkos_node_consensus::Consensus;
31use snarkos_node_router::{
32 Routing,
33 messages::{Message, UnconfirmedTransaction},
34};
35use snarkos_node_sync::BlockSync;
36use snarkvm::{
37 console::{program::ProgramID, types::Field},
38 ledger::narwhal::Data,
39 prelude::{Ledger, Network, VM, cfg_into_iter, store::ConsensusStorage},
40};
41
42use anyhow::{Context, Result};
43use axum::{
44 body::Body,
45 extract::{ConnectInfo, DefaultBodyLimit, Query, State},
46 http::{Method, Request, StatusCode, header::CONTENT_TYPE},
47 middleware,
48 response::Response,
49 routing::{get, post},
50};
51use axum_extra::response::ErasedJson;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{net::SocketAddr, sync::Arc};
57use tokio::{net::TcpListener, sync::Semaphore, task::JoinHandle};
58use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder};
59use tower_http::{
60 cors::{Any, CorsLayer},
61 trace::TraceLayer,
62};
63
64pub const DEFAULT_REST_PORT: u16 = 3030;
66
67pub const API_VERSION_V1: &str = "v1";
69pub const API_VERSION_V2: &str = "v2";
70
71#[derive(Clone)]
73pub struct Rest<N: Network, C: ConsensusStorage<N>, R: Routing<N>> {
74 cdn_sync: Option<Arc<CdnBlockSync>>,
76 consensus: Option<Consensus<N>>,
78 ledger: Ledger<N, C>,
80 routing: Arc<R>,
82 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
84 block_sync: Arc<BlockSync<N>>,
86 num_verifying_deploys: Arc<Semaphore>,
88 num_verifying_executions: Arc<Semaphore>,
90 num_verifying_solutions: Arc<Semaphore>,
92}
93
94impl<N: Network, C: 'static + ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
95 pub async fn start(
97 rest_ip: SocketAddr,
98 rest_rps: u32,
99 consensus: Option<Consensus<N>>,
100 ledger: Ledger<N, C>,
101 routing: Arc<R>,
102 cdn_sync: Option<Arc<CdnBlockSync>>,
103 block_sync: Arc<BlockSync<N>>,
104 ) -> Result<Self> {
105 let mut server = Self {
107 consensus,
108 ledger,
109 routing,
110 cdn_sync,
111 block_sync,
112 handles: Default::default(),
113 num_verifying_deploys: Arc::new(Semaphore::new(VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS)),
114 num_verifying_executions: Arc::new(Semaphore::new(VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS)),
115 num_verifying_solutions: Arc::new(Semaphore::new(N::MAX_SOLUTIONS)),
116 };
117 server.spawn_server(rest_ip, rest_rps).await?;
119 Ok(server)
121 }
122}
123
124impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
125 pub const fn ledger(&self) -> &Ledger<N, C> {
127 &self.ledger
128 }
129
130 pub const fn handles(&self) -> &Arc<Mutex<Vec<JoinHandle<()>>>> {
132 &self.handles
133 }
134}
135
136impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
137 fn build_routes(&self, rest_rps: u32) -> axum::Router {
138 let cors = CorsLayer::new()
139 .allow_origin(Any)
140 .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
141 .allow_headers([CONTENT_TYPE]);
142
143 let governor_config = Box::new(
145 GovernorConfigBuilder::default()
146 .per_nanosecond((1_000_000_000 / rest_rps) as u64)
147 .burst_size(rest_rps)
148 .error_handler(|error| {
149 let error_message = error.to_string();
151 let mut response = Response::new(error_message.clone().into());
152 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
153 if error_message.contains("Too Many Requests") {
154 *response.status_mut() = StatusCode::TOO_MANY_REQUESTS;
155 }
156 response
157 })
158 .finish()
159 .expect("Couldn't set up rate limiting for the REST server!"),
160 );
161
162 let routes = axum::Router::new()
163
164 .route("/node/address", get(Self::get_node_address))
166 .route("/program/{id}/mapping/{name}", get(Self::get_mapping_values))
167 .route("/db_backup", post(Self::db_backup))
168 .route_layer(middleware::from_fn(auth_middleware))
169
170 .route("/consensus_version", get(Self::get_consensus_version))
172
173 .route("/block/height/latest", get(Self::get_block_height_latest))
175 .route("/block/hash/latest", get(Self::get_block_hash_latest))
176 .route("/block/latest", get(Self::get_block_latest))
177 .route("/block/{height_or_hash}", get(Self::get_block))
178 .route("/block/{height_or_hash}/header", get(Self::get_block_header))
181 .route("/block/{height_or_hash}/transactions", get(Self::get_block_transactions))
182
183 .route("/transaction/{id}", get(Self::get_transaction))
185 .route("/transaction/confirmed/{id}", get(Self::get_confirmed_transaction))
186 .route("/transaction/unconfirmed/{id}", get(Self::get_unconfirmed_transaction))
187 .route("/transaction/broadcast", post(Self::transaction_broadcast))
188
189 .route("/solution/broadcast", post(Self::solution_broadcast))
191
192 .route("/find/blockHash/{tx_id}", get(Self::find_block_hash))
194 .route("/find/blockHeight/{state_root}", get(Self::find_block_height_from_state_root))
195 .route("/find/transactionID/deployment/{program_id}", get(Self::find_latest_transaction_id_from_program_id))
196 .route("/find/transactionID/deployment/{program_id}/{edition}", get(Self::find_latest_transaction_id_from_program_id_and_edition))
197 .route("/find/transactionID/deployment/{program_id}/{edition}/original", get(Self::find_original_deployment_transaction_id))
198 .route("/find/transactionID/deployment/{program_id}/{edition}/{amendment}", get(Self::find_transaction_id_from_program_id_edition_and_amendment))
199 .route("/find/transactionID/{transition_id}", get(Self::find_transaction_id_from_transition_id))
200 .route("/find/transitionID/{input_or_output_id}", get(Self::find_transition_id))
201
202 .route("/peers/count", get(Self::get_peers_count))
204 .route("/peers/all", get(Self::get_peers_all))
205 .route("/peers/all/metrics", get(Self::get_peers_all_metrics))
206 .route("/connections/p2p/count", get(Self::get_peers_count))
207 .route("/connections/p2p/all", get(Self::get_peers_all))
208 .route("/connections/p2p/all/metrics", get(Self::get_peers_all_metrics))
209
210 .route("/program/{id}", get(Self::get_program))
212 .route("/program/{id}/latest_edition", get(Self::get_latest_program_edition))
213 .route("/program/{id}/{edition}", get(Self::get_program_for_edition))
214 .route("/program/{id}/mappings", get(Self::get_mapping_names))
215 .route("/program/{id}/mapping/{name}/{key}", get(Self::get_mapping_value))
216 .route("/program/{id}/amendment_count", get(Self::get_program_amendment_count))
217 .route("/program/{id}/{edition}/amendment_count", get(Self::get_program_amendment_count_for_edition))
218
219 .route("/sync_status", get(Self::get_sync_status))
222 .route("/sync/status", get(Self::get_sync_status))
223 .route("/sync/peers", get(Self::get_sync_peers))
224 .route("/sync/requests", get(Self::get_sync_requests_summary))
225 .route("/sync/requests/list", get(Self::get_sync_requests_list))
226
227 .route("/version", get(Self::get_version))
229 .route("/blocks", get(Self::get_blocks))
230 .route("/height/{hash}", get(Self::get_height))
231 .route("/memoryPool/transmissions", get(Self::get_memory_pool_transmissions))
232 .route("/memoryPool/solutions", get(Self::get_memory_pool_solutions))
233 .route("/memoryPool/transactions", get(Self::get_memory_pool_transactions))
234 .route("/statePath/{commitment}", get(Self::get_state_path_for_commitment))
235 .route("/statePaths", get(Self::get_state_paths_for_commitments))
236 .route("/stateRoot/latest", get(Self::get_state_root_latest))
237 .route("/stateRoot/{height}", get(Self::get_state_root))
238 .route("/committee/latest", get(Self::get_committee_latest))
239 .route("/committee/{height}", get(Self::get_committee))
240 .route("/delegators/{validator}", get(Self::get_delegators_for_validator));
241
242 let routes = match self.consensus {
244 Some(_) => routes
245 .route("/connections/bft/count", get(Self::get_bft_connections_count))
246 .route("/connections/bft/all", get(Self::get_bft_connections_all)),
247 None => routes,
248 };
249
250 #[cfg(feature = "telemetry")]
252 let routes = match self.consensus {
253 Some(_) => routes.route("/validators/participation", get(Self::get_validator_participation_scores)),
254 None => routes,
255 };
256
257 #[cfg(feature = "history")]
259 let routes = routes.route("/program/{id}/mapping/{name}/{key}/history/{height}", get(Self::get_history));
260
261 #[cfg(feature = "history-staking-rewards")]
263 let routes = routes.route("/staking/rewards/{address}/{height}", get(Self::get_staking_reward));
264
265 routes
266 .with_state(self.clone())
268 .layer(TraceLayer::new_for_http())
270 .layer(middleware::map_request(log_middleware))
272 .layer(cors)
274 .layer(DefaultBodyLimit::max(512 * 1024))
276 .layer(GovernorLayer {
277 config: governor_config.into(),
278 })
279 }
280
281 async fn spawn_server(&mut self, rest_ip: SocketAddr, rest_rps: u32) -> Result<()> {
282 debug!("REST rate limit per IP - {rest_rps} RPS");
284
285 let default_router = axum::Router::new().nest(
287 &format!("/{}", N::SHORT_NAME),
288 self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
289 );
290 let v1_router = axum::Router::new().nest(
291 &format!("/{API_VERSION_V1}/{}", N::SHORT_NAME),
292 self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
293 );
294
295 let v2_router =
297 axum::Router::new().nest(&format!("/{API_VERSION_V2}/{}", N::SHORT_NAME), self.build_routes(rest_rps));
298
299 let router = default_router.merge(v1_router).merge(v2_router);
301
302 let rest_listener =
303 TcpListener::bind(rest_ip).await.with_context(|| "Failed to bind TCP port for REST endpoints")?;
304
305 let handle = tokio::spawn(async move {
306 axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>())
307 .await
308 .expect("couldn't start rest server");
309 });
310
311 self.handles.lock().push(handle);
312 Ok(())
313 }
314}
315
316async fn log_middleware(ConnectInfo(addr): ConnectInfo<SocketAddr>, request: Request<Body>) -> Request<Body> {
318 info!("Received '{} {}' from '{addr}'", request.method(), request.uri());
319 request
320}
321
322async fn v1_error_middleware(response: Response) -> Response {
325 const V1_STATUS_CODE: StatusCode = StatusCode::INTERNAL_SERVER_ERROR;
327
328 if response.status().is_success() {
329 return response;
330 }
331
332 let fallback = || {
334 let mut response = Response::new(Body::from("Failed to convert error"));
335 *response.status_mut() = V1_STATUS_CODE;
336 response
337 };
338
339 let Ok(bytes) = axum::body::to_bytes(response.into_body(), usize::MAX).await else {
340 return fallback();
341 };
342
343 let Ok(json_err) = serde_json::from_slice::<SerializedRestError>(&bytes) else {
345 return fallback();
346 };
347
348 let mut message = json_err.message;
349 for next in json_err.chain.into_iter() {
350 message = format!("{message} — {next}");
351 }
352
353 let mut response = Response::new(Body::from(message));
354
355 *response.status_mut() = V1_STATUS_CODE;
356
357 response
358}
359
360pub fn fmt_id(id: impl ToString) -> String {
362 let id = id.to_string();
363 let mut formatted_id = id.chars().take(16).collect::<String>();
364 if id.chars().count() > 16 {
365 formatted_id.push_str("..");
366 }
367 formatted_id
368}
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373 use anyhow::anyhow;
374 use axum::{
375 Router,
376 body::Body,
377 http::{Request, StatusCode},
378 middleware,
379 routing::get,
380 };
381 use tower::ServiceExt; fn test_app() -> Router {
384 let build_routes = || {
385 Router::new()
386 .route("/not_found", get(|| async { Err::<(), RestError>(RestError::not_found(anyhow!("missing"))) }))
387 .route("/bad_request", get(|| async { Err::<(), RestError>(RestError::bad_request(anyhow!("bad"))) }))
388 .route(
389 "/service_unavailable",
390 get(|| async { Err::<(), RestError>(RestError::service_unavailable(anyhow!("gone"))) }),
391 )
392 };
393 let router_v1 = build_routes().route_layer(middleware::map_response(v1_error_middleware));
394 let router_v2 = Router::new().nest(&format!("/{API_VERSION_V2}"), build_routes());
395 router_v1.merge(router_v2)
396 }
397
398 #[tokio::test]
399 async fn v1_routes_force_internal_server_error() {
400 let app = test_app();
401
402 let res = app.clone().oneshot(Request::builder().uri("/not_found").body(Body::empty()).unwrap()).await.unwrap();
403 assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
404
405 let res =
406 app.clone().oneshot(Request::builder().uri("/bad_request").body(Body::empty()).unwrap()).await.unwrap();
407 assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
408
409 let res =
410 app.oneshot(Request::builder().uri("/service_unavailable").body(Body::empty()).unwrap()).await.unwrap();
411 assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
412 }
413
414 #[tokio::test]
415 async fn v2_routes_return_specific_errors() {
416 let app = test_app();
417
418 let res =
419 app.clone().oneshot(Request::builder().uri("/v2/not_found").body(Body::empty()).unwrap()).await.unwrap();
420 assert_eq!(res.status(), StatusCode::NOT_FOUND);
421
422 let res =
423 app.clone().oneshot(Request::builder().uri("/v2/bad_request").body(Body::empty()).unwrap()).await.unwrap();
424 assert_eq!(res.status(), StatusCode::BAD_REQUEST);
425
426 let res =
427 app.oneshot(Request::builder().uri("/v2/service_unavailable").body(Body::empty()).unwrap()).await.unwrap();
428 assert_eq!(res.status(), StatusCode::SERVICE_UNAVAILABLE);
429 }
430}