snarkos_node_rest/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21mod helpers;
22// Imports custom `Path` type, to be used instead of `axum`'s.
23pub use helpers::*;
24
25mod routes;
26
27mod version;
28
29use snarkos_node_cdn::CdnBlockSync;
30use snarkos_node_consensus::Consensus;
31use snarkos_node_router::{
32    Routing,
33    messages::{Message, UnconfirmedTransaction},
34};
35use snarkos_node_sync::BlockSync;
36use snarkvm::{
37    console::{program::ProgramID, types::Field},
38    ledger::narwhal::Data,
39    prelude::{Ledger, Network, cfg_into_iter, store::ConsensusStorage},
40};
41
42use anyhow::{Context, Result};
43use axum::{
44    body::Body,
45    extract::{ConnectInfo, DefaultBodyLimit, Query, State},
46    http::{Method, Request, StatusCode, header::CONTENT_TYPE},
47    middleware,
48    response::Response,
49    routing::{get, post},
50};
51use axum_extra::response::ErasedJson;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{
57    net::SocketAddr,
58    sync::{Arc, atomic::AtomicUsize},
59};
60use tokio::{net::TcpListener, task::JoinHandle};
61use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder};
62use tower_http::{
63    cors::{Any, CorsLayer},
64    trace::TraceLayer,
65};
66
67/// The default port used for the REST API
68pub const DEFAULT_REST_PORT: u16 = 3030;
69
70/// The API version prefixes.
71pub const API_VERSION_V1: &str = "v1";
72pub const API_VERSION_V2: &str = "v2";
73
74/// A REST API server for the ledger.
75#[derive(Clone)]
76pub struct Rest<N: Network, C: ConsensusStorage<N>, R: Routing<N>> {
77    /// CDN sync (only if node is using the CDN to sync).
78    cdn_sync: Option<Arc<CdnBlockSync>>,
79    /// The consensus module.
80    consensus: Option<Consensus<N>>,
81    /// The ledger.
82    ledger: Ledger<N, C>,
83    /// The node (routing).
84    routing: Arc<R>,
85    /// The server handles.
86    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
87    /// A reference to BlockSync,
88    block_sync: Arc<BlockSync<N>>,
89    /// The number of ongoing deploy transaction verifications via REST.
90    num_verifying_deploys: Arc<AtomicUsize>,
91    /// The number of ongoing execute transaction verifications via REST.
92    num_verifying_executions: Arc<AtomicUsize>,
93    /// The number of ongoing solution verifications via REST.
94    num_verifying_solutions: Arc<AtomicUsize>,
95}
96
97impl<N: Network, C: 'static + ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
98    /// Initializes a new instance of the server.
99    pub async fn start(
100        rest_ip: SocketAddr,
101        rest_rps: u32,
102        consensus: Option<Consensus<N>>,
103        ledger: Ledger<N, C>,
104        routing: Arc<R>,
105        cdn_sync: Option<Arc<CdnBlockSync>>,
106        block_sync: Arc<BlockSync<N>>,
107    ) -> Result<Self> {
108        // Initialize the server.
109        let mut server = Self {
110            consensus,
111            ledger,
112            routing,
113            cdn_sync,
114            block_sync,
115            handles: Default::default(),
116            num_verifying_deploys: Default::default(),
117            num_verifying_executions: Default::default(),
118            num_verifying_solutions: Default::default(),
119        };
120        // Spawn the server.
121        server.spawn_server(rest_ip, rest_rps).await?;
122        // Return the server.
123        Ok(server)
124    }
125}
126
127impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
128    /// Returns the ledger.
129    pub const fn ledger(&self) -> &Ledger<N, C> {
130        &self.ledger
131    }
132
133    /// Returns the handles.
134    pub const fn handles(&self) -> &Arc<Mutex<Vec<JoinHandle<()>>>> {
135        &self.handles
136    }
137}
138
139impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
140    fn build_routes(&self, rest_rps: u32) -> axum::Router {
141        let cors = CorsLayer::new()
142            .allow_origin(Any)
143            .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
144            .allow_headers([CONTENT_TYPE]);
145
146        // Prepare the rate limiting setup.
147        let governor_config = Box::new(
148            GovernorConfigBuilder::default()
149                .per_nanosecond((1_000_000_000 / rest_rps) as u64)
150                .burst_size(rest_rps)
151                .error_handler(|error| {
152                    // Properly return a 429 Too Many Requests error
153                    let error_message = error.to_string();
154                    let mut response = Response::new(error_message.clone().into());
155                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
156                    if error_message.contains("Too Many Requests") {
157                        *response.status_mut() = StatusCode::TOO_MANY_REQUESTS;
158                    }
159                    response
160                })
161                .finish()
162                .expect("Couldn't set up rate limiting for the REST server!"),
163        );
164
165        let routes = axum::Router::new()
166
167            // All the endpoints before the call to `route_layer` are protected with JWT auth.
168            .route("/node/address", get(Self::get_node_address))
169            .route("/program/{id}/mapping/{name}", get(Self::get_mapping_values))
170            .route("/db_backup", post(Self::db_backup))
171            .route_layer(middleware::from_fn(auth_middleware))
172
173             // Get ../consensus_version
174            .route("/consensus_version", get(Self::get_consensus_version))
175
176            // GET ../block/..
177            .route("/block/height/latest", get(Self::get_block_height_latest))
178            .route("/block/hash/latest", get(Self::get_block_hash_latest))
179            .route("/block/latest", get(Self::get_block_latest))
180            .route("/block/{height_or_hash}", get(Self::get_block))
181            // The path param here is actually only the height, but the name must match the route
182            // above, otherwise there'll be a conflict at runtime.
183            .route("/block/{height_or_hash}/header", get(Self::get_block_header))
184            .route("/block/{height_or_hash}/transactions", get(Self::get_block_transactions))
185
186            // GET and POST ../transaction/..
187            .route("/transaction/{id}", get(Self::get_transaction))
188            .route("/transaction/confirmed/{id}", get(Self::get_confirmed_transaction))
189            .route("/transaction/unconfirmed/{id}", get(Self::get_unconfirmed_transaction))
190            .route("/transaction/broadcast", post(Self::transaction_broadcast))
191
192            // POST ../solution/broadcast
193            .route("/solution/broadcast", post(Self::solution_broadcast))
194
195            // GET ../find/..
196            .route("/find/blockHash/{tx_id}", get(Self::find_block_hash))
197            .route("/find/blockHeight/{state_root}", get(Self::find_block_height_from_state_root))
198            .route("/find/transactionID/deployment/{program_id}", get(Self::find_latest_transaction_id_from_program_id))
199            .route("/find/transactionID/deployment/{program_id}/{edition}", get(Self::find_transaction_id_from_program_id_and_edition))
200            .route("/find/transactionID/{transition_id}", get(Self::find_transaction_id_from_transition_id))
201            .route("/find/transitionID/{input_or_output_id}", get(Self::find_transition_id))
202
203            // GET ../peers/..
204            .route("/peers/count", get(Self::get_peers_count))
205            .route("/peers/all", get(Self::get_peers_all))
206            .route("/peers/all/metrics", get(Self::get_peers_all_metrics))
207
208            // GET ../program/..
209            .route("/program/{id}", get(Self::get_program))
210            .route("/program/{id}/latest_edition", get(Self::get_latest_program_edition))
211            .route("/program/{id}/{edition}", get(Self::get_program_for_edition))
212            .route("/program/{id}/mappings", get(Self::get_mapping_names))
213            .route("/program/{id}/mapping/{name}/{key}", get(Self::get_mapping_value))
214
215            // GET ../sync/..
216            // Note: keeping ../sync_status for compatibility
217            .route("/sync_status", get(Self::get_sync_status))
218            .route("/sync/status", get(Self::get_sync_status))
219            .route("/sync/peers", get(Self::get_sync_peers))
220            .route("/sync/requests", get(Self::get_sync_requests_summary))
221            .route("/sync/requests/list", get(Self::get_sync_requests_list))
222
223            // GET misc endpoints.
224            .route("/version", get(Self::get_version))
225            .route("/blocks", get(Self::get_blocks))
226            .route("/height/{hash}", get(Self::get_height))
227            .route("/memoryPool/transmissions", get(Self::get_memory_pool_transmissions))
228            .route("/memoryPool/solutions", get(Self::get_memory_pool_solutions))
229            .route("/memoryPool/transactions", get(Self::get_memory_pool_transactions))
230            .route("/statePath/{commitment}", get(Self::get_state_path_for_commitment))
231            .route("/statePaths", get(Self::get_state_paths_for_commitments))
232            .route("/stateRoot/latest", get(Self::get_state_root_latest))
233            .route("/stateRoot/{height}", get(Self::get_state_root))
234            .route("/committee/latest", get(Self::get_committee_latest))
235            .route("/committee/{height}", get(Self::get_committee))
236            .route("/delegators/{validator}", get(Self::get_delegators_for_validator));
237
238        // If the node is a validator and `telemetry` features is enabled, enable the additional endpoint.
239        #[cfg(feature = "telemetry")]
240        let routes = match self.consensus {
241            Some(_) => routes.route("/validators/participation", get(Self::get_validator_participation_scores)),
242            None => routes,
243        };
244
245        // If the `history` feature is enabled, enable the additional endpoint.
246        #[cfg(feature = "history")]
247        let routes = routes.route("/block/{blockHeight}/history/{mapping}", get(Self::get_history));
248
249        routes
250            // Pass in `Rest` to make things convenient.
251            .with_state(self.clone())
252            // Enable tower-http tracing.
253            .layer(TraceLayer::new_for_http())
254            // Custom logging.
255            .layer(middleware::map_request(log_middleware))
256            // Enable CORS.
257            .layer(cors)
258            // Cap the request body size at 512KiB.
259            .layer(DefaultBodyLimit::max(512 * 1024))
260            .layer(GovernorLayer {
261                config: governor_config.into(),
262            })
263    }
264
265    async fn spawn_server(&mut self, rest_ip: SocketAddr, rest_rps: u32) -> Result<()> {
266        // Log the REST rate limit per IP.
267        debug!("REST rate limit per IP - {rest_rps} RPS");
268
269        // Add the v1 API as default and under "/v1".
270        let default_router = axum::Router::new().nest(
271            &format!("/{}", N::SHORT_NAME),
272            self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
273        );
274        let v1_router = axum::Router::new().nest(
275            &format!("/{API_VERSION_V1}/{}", N::SHORT_NAME),
276            self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
277        );
278
279        // Add the v2 API under "/v2".
280        let v2_router =
281            axum::Router::new().nest(&format!("/{API_VERSION_V2}/{}", N::SHORT_NAME), self.build_routes(rest_rps));
282
283        // Combine all routes.
284        let router = default_router.merge(v1_router).merge(v2_router);
285
286        let rest_listener =
287            TcpListener::bind(rest_ip).await.with_context(|| "Failed to bind TCP port for REST endpoints")?;
288
289        let handle = tokio::spawn(async move {
290            axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>())
291                .await
292                .expect("couldn't start rest server");
293        });
294
295        self.handles.lock().push(handle);
296        Ok(())
297    }
298}
299
300/// Creates a log message for every HTTP request.
301async fn log_middleware(ConnectInfo(addr): ConnectInfo<SocketAddr>, request: Request<Body>) -> Request<Body> {
302    info!("Received '{} {}' from '{addr}'", request.method(), request.uri());
303    request
304}
305
306/// Converts errors to the old style for the v1 API.
307/// The error code will always be 500 and the content a simple string.
308async fn v1_error_middleware(response: Response) -> Response {
309    // The status code used by all v1 errors
310    const V1_STATUS_CODE: StatusCode = StatusCode::INTERNAL_SERVER_ERROR;
311
312    if response.status().is_success() {
313        return response;
314    }
315
316    // Returns a opaque error instead of panicking.
317    let fallback = || {
318        let mut response = Response::new(Body::from("Failed to convert error"));
319        *response.status_mut() = V1_STATUS_CODE;
320        response
321    };
322
323    let Ok(bytes) = axum::body::to_bytes(response.into_body(), usize::MAX).await else {
324        return fallback();
325    };
326
327    // Deserialize REST error so we can convert it to a string
328    let Ok(json_err) = serde_json::from_slice::<SerializedRestError>(&bytes) else {
329        return fallback();
330    };
331
332    let mut message = json_err.message;
333    for next in json_err.chain.into_iter() {
334        message = format!("{message} — {next}");
335    }
336
337    let mut response = Response::new(Body::from(message));
338
339    *response.status_mut() = V1_STATUS_CODE;
340
341    response
342}
343
344/// Formats an ID into a truncated identifier (for logging purposes).
345pub fn fmt_id(id: impl ToString) -> String {
346    let id = id.to_string();
347    let mut formatted_id = id.chars().take(16).collect::<String>();
348    if id.chars().count() > 16 {
349        formatted_id.push_str("..");
350    }
351    formatted_id
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357    use anyhow::anyhow;
358    use axum::{
359        Router,
360        body::Body,
361        http::{Request, StatusCode},
362        middleware,
363        routing::get,
364    };
365    use tower::ServiceExt; // for `oneshot`
366
367    fn test_app() -> Router {
368        let build_routes = || {
369            Router::new()
370                .route("/not_found", get(|| async { Err::<(), RestError>(RestError::not_found(anyhow!("missing"))) }))
371                .route("/bad_request", get(|| async { Err::<(), RestError>(RestError::bad_request(anyhow!("bad"))) }))
372                .route(
373                    "/service_unavailable",
374                    get(|| async { Err::<(), RestError>(RestError::service_unavailable(anyhow!("gone"))) }),
375                )
376        };
377        let router_v1 = build_routes().route_layer(middleware::map_response(v1_error_middleware));
378        let router_v2 = Router::new().nest(&format!("/{API_VERSION_V2}"), build_routes());
379        router_v1.merge(router_v2)
380    }
381
382    #[tokio::test]
383    async fn v1_routes_force_internal_server_error() {
384        let app = test_app();
385
386        let res = app.clone().oneshot(Request::builder().uri("/not_found").body(Body::empty()).unwrap()).await.unwrap();
387        assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
388
389        let res =
390            app.clone().oneshot(Request::builder().uri("/bad_request").body(Body::empty()).unwrap()).await.unwrap();
391        assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
392
393        let res =
394            app.oneshot(Request::builder().uri("/service_unavailable").body(Body::empty()).unwrap()).await.unwrap();
395        assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
396    }
397
398    #[tokio::test]
399    async fn v2_routes_return_specific_errors() {
400        let app = test_app();
401
402        let res =
403            app.clone().oneshot(Request::builder().uri("/v2/not_found").body(Body::empty()).unwrap()).await.unwrap();
404        assert_eq!(res.status(), StatusCode::NOT_FOUND);
405
406        let res =
407            app.clone().oneshot(Request::builder().uri("/v2/bad_request").body(Body::empty()).unwrap()).await.unwrap();
408        assert_eq!(res.status(), StatusCode::BAD_REQUEST);
409
410        let res =
411            app.oneshot(Request::builder().uri("/v2/service_unavailable").body(Body::empty()).unwrap()).await.unwrap();
412        assert_eq!(res.status(), StatusCode::SERVICE_UNAVAILABLE);
413    }
414}