Skip to main content

snarkos_node_rest/
lib.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21mod helpers;
22// Imports custom `Path` type, to be used instead of `axum`'s.
23pub use helpers::*;
24
25mod routes;
26
27mod version;
28
29use snarkos_node_cdn::CdnBlockSync;
30use snarkos_node_consensus::Consensus;
31use snarkos_node_router::{
32    Routing,
33    messages::{Message, UnconfirmedTransaction},
34};
35use snarkos_node_sync::BlockSync;
36use snarkvm::{
37    console::{program::ProgramID, types::Field},
38    ledger::narwhal::Data,
39    prelude::{Ledger, Network, VM, cfg_into_iter, store::ConsensusStorage},
40};
41
42use anyhow::{Context, Result};
43use axum::{
44    body::Body,
45    extract::{ConnectInfo, DefaultBodyLimit, Query, State},
46    http::{Method, Request, StatusCode, header::CONTENT_TYPE},
47    middleware,
48    response::Response,
49    routing::{get, post},
50};
51use axum_extra::response::ErasedJson;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54use lru::LruCache;
55#[cfg(not(feature = "locktick"))]
56use parking_lot::Mutex;
57use std::{net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
58use tokio::{net::TcpListener, sync::Semaphore, task::JoinHandle};
59use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder};
60use tower_http::{
61    cors::{Any, CorsLayer},
62    trace::TraceLayer,
63};
64use tracing::Span;
65
/// The default port used for the REST API.
pub const DEFAULT_REST_PORT: u16 = 3030;

/// The path prefix for version 1 of the API.
pub const API_VERSION_V1: &str = "v1";
/// The path prefix for version 2 of the API.
pub const API_VERSION_V2: &str = "v2";

/// The capacity of the LRU holding recently requested blocks.
const BLOCK_CACHE_SIZE: usize = 128;
75
76/// A REST API server for the ledger.
77#[derive(Clone)]
78pub struct Rest<N: Network, C: ConsensusStorage<N>, R: Routing<N>> {
79    /// CDN sync (only if node is using the CDN to sync).
80    cdn_sync: Option<Arc<CdnBlockSync>>,
81    /// The consensus module.
82    consensus: Option<Consensus<N>>,
83    /// The ledger.
84    ledger: Ledger<N, C>,
85    /// The node (routing).
86    routing: Arc<R>,
87    /// The server handles.
88    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
89    /// A reference to BlockSync,
90    block_sync: Arc<BlockSync<N>>,
91    /// The number of ongoing deploy transaction verifications via REST.
92    num_verifying_deploys: Arc<Semaphore>,
93    /// The number of ongoing execute transaction verifications via REST.
94    num_verifying_executions: Arc<Semaphore>,
95    /// The number of ongoing solution verifications via REST.
96    num_verifying_solutions: Arc<Semaphore>,
97    /// A cache containing recently requested blocks.
98    block_cache: Arc<Mutex<LruCache<N::BlockHash, ErasedJson>>>,
99}
100
101impl<N: Network, C: 'static + ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
102    /// Initializes a new instance of the server.
103    pub async fn start(
104        rest_ip: SocketAddr,
105        rest_rps: u32,
106        consensus: Option<Consensus<N>>,
107        ledger: Ledger<N, C>,
108        routing: Arc<R>,
109        cdn_sync: Option<Arc<CdnBlockSync>>,
110        block_sync: Arc<BlockSync<N>>,
111    ) -> Result<Self> {
112        // Initialize the server.
113        let mut server = Self {
114            consensus,
115            ledger,
116            routing,
117            cdn_sync,
118            block_sync,
119            handles: Default::default(),
120            num_verifying_deploys: Arc::new(Semaphore::new(VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS)),
121            num_verifying_executions: Arc::new(Semaphore::new(VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS)),
122            num_verifying_solutions: Arc::new(Semaphore::new(N::MAX_SOLUTIONS)),
123            block_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(BLOCK_CACHE_SIZE).unwrap()))),
124        };
125        // Spawn the server.
126        server.spawn_server(rest_ip, rest_rps).await?;
127        // Return the server.
128        Ok(server)
129    }
130}
131
132impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
133    /// Returns the ledger.
134    pub const fn ledger(&self) -> &Ledger<N, C> {
135        &self.ledger
136    }
137
138    /// Returns the handles.
139    pub const fn handles(&self) -> &Arc<Mutex<Vec<JoinHandle<()>>>> {
140        &self.handles
141    }
142
143    /// Shuts down the REST instance.
144    pub fn shut_down(&self) {
145        self.handles.lock().iter().for_each(|handle| handle.abort());
146    }
147}
148
149impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
150    fn build_routes(&self, rest_rps: u32) -> axum::Router {
151        let cors = CorsLayer::new()
152            .allow_origin(Any)
153            .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
154            .allow_headers([CONTENT_TYPE]);
155
156        // Prepare the rate limiting setup.
157        let governor_config = Box::new(
158            GovernorConfigBuilder::default()
159                .per_nanosecond((1_000_000_000 / rest_rps) as u64)
160                .burst_size(rest_rps)
161                .error_handler(|error| {
162                    // Properly return a 429 Too Many Requests error
163                    let error_message = error.to_string();
164                    let mut response = Response::new(error_message.clone().into());
165                    *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
166                    if error_message.contains("Too Many Requests") {
167                        *response.status_mut() = StatusCode::TOO_MANY_REQUESTS;
168                    }
169                    response
170                })
171                .finish()
172                .expect("Couldn't set up rate limiting for the REST server!"),
173        );
174
175        // Build the JWT auth-protected endpoints. #[cfg] cannot appear inside a method chain, so we
176        // build this router as a named binding and conditionally extend it before applying the layer.
177        let auth_routes = axum::Router::new()
178            .route("/node/address", get(Self::get_node_address))
179            .route("/program/{id}/mapping/{name}", get(Self::get_mapping_values))
180            .route("/db_backup", post(Self::db_backup));
181
182        // Slipstream plugin management endpoints require auth.
183        #[cfg(feature = "slipstream-plugins")]
184        let auth_routes = auth_routes
185            .route("/slipstream/plugins", get(Self::slipstream_list_plugins).post(Self::slipstream_load_plugin))
186            .route(
187                "/slipstream/plugins/{name}",
188                // TODO: PUT (reload) is not yet implemented.
189                axum::routing::delete(Self::slipstream_unload_plugin),
190            );
191
192        let routes = axum::Router::new()
193            .merge(auth_routes.route_layer(middleware::from_fn(auth_middleware)))
194
195            // All endpoints declared after here are not protected
196
197             // Get ../consensus_version
198            .route("/consensus_version", get(Self::get_consensus_version))
199
200            // GET ../block/..
201            .route("/block/height/latest", get(Self::get_block_height_latest))
202            .route("/block/hash/latest", get(Self::get_block_hash_latest))
203            .route("/block/latest", get(Self::get_block_latest))
204            .route("/block/{height_or_hash}", get(Self::get_block))
205            // The path param here is actually only the height, but the name must match the route
206            // above, otherwise there'll be a conflict at runtime.
207            .route("/block/{height_or_hash}/header", get(Self::get_block_header))
208            .route("/block/{height_or_hash}/transactions", get(Self::get_block_transactions))
209
210            // GET and POST ../transaction/..
211            .route("/transaction/{id}", get(Self::get_transaction))
212            .route("/transaction/confirmed/{id}", get(Self::get_confirmed_transaction))
213            .route("/transaction/unconfirmed/{id}", get(Self::get_unconfirmed_transaction))
214            .route("/transaction/broadcast", post(Self::transaction_broadcast))
215
216            // GET and POST ../solution/..
217            .route("/solution/limits/{prover_address}", get(Self::get_solution_limits_for_prover))
218            .route("/solution/broadcast", post(Self::solution_broadcast))
219
220            // GET ../find/..
221            .route("/find/blockHash/{tx_id}", get(Self::find_block_hash))
222            .route("/find/blockHeight/{state_root}", get(Self::find_block_height_from_state_root))
223            .route("/find/transactionID/deployment/{program_id}", get(Self::find_latest_transaction_id_from_program_id))
224            .route("/find/transactionID/deployment/{program_id}/{edition}", get(Self::find_latest_transaction_id_from_program_id_and_edition))
225            .route("/find/transactionID/deployment/{program_id}/{edition}/original", get(Self::find_original_deployment_transaction_id))
226            .route("/find/transactionID/deployment/{program_id}/{edition}/{amendment}", get(Self::find_transaction_id_from_program_id_edition_and_amendment))
227            .route("/find/transactionID/{transition_id}", get(Self::find_transaction_id_from_transition_id))
228            .route("/find/transitionID/{input_or_output_id}", get(Self::find_transition_id))
229
230            // GET ../connections/p2p/.. (with ../peers/.. aliases)
231            .route("/peers/count", get(Self::get_peers_count))
232            .route("/peers/all", get(Self::get_peers_all))
233            .route("/peers/all/metrics", get(Self::get_peers_all_metrics))
234            .route("/connections/p2p/count", get(Self::get_peers_count))
235            .route("/connections/p2p/all", get(Self::get_peers_all))
236            .route("/connections/p2p/all/metrics", get(Self::get_peers_all_metrics))
237
238            // GET ../program/..
239            .route("/program/{id}", get(Self::get_program))
240            .route("/program/{id}/latest_edition", get(Self::get_latest_program_edition))
241            .route("/program/{id}/{edition}", get(Self::get_program_for_edition))
242            .route("/program/{id}/mappings", get(Self::get_mapping_names))
243            .route("/program/{id}/mapping/{name}/{key}", get(Self::get_mapping_value))
244            .route("/program/{id}/amendment_count", get(Self::get_program_amendment_count))
245            .route("/program/{id}/{edition}/amendment_count", get(Self::get_program_amendment_count_for_edition))
246
247            // GET ../sync/..
248            // Note: keeping ../sync_status for compatibility
249            .route("/sync_status", get(Self::get_sync_status))
250            .route("/sync/status", get(Self::get_sync_status))
251            .route("/sync/peers", get(Self::get_sync_peers))
252            .route("/sync/requests", get(Self::get_sync_requests_summary))
253            .route("/sync/requests/list", get(Self::get_sync_requests_list))
254
255            // GET misc endpoints.
256            .route("/version", get(Self::get_version))
257            .route("/blocks", get(Self::get_blocks))
258            .route("/height/{hash}", get(Self::get_height))
259            .route("/memoryPool/transmissions", get(Self::get_memory_pool_transmissions))
260            .route("/memoryPool/solutions", get(Self::get_memory_pool_solutions))
261            .route("/memoryPool/transactions", get(Self::get_memory_pool_transactions))
262            .route("/statePath/{commitment}", get(Self::get_state_path_for_commitment))
263            .route("/statePaths", get(Self::get_state_paths_for_commitments))
264            .route("/stateRoot/latest", get(Self::get_state_root_latest))
265            .route("/stateRoot/{height}", get(Self::get_state_root))
266            .route("/committee/latest", get(Self::get_committee_latest))
267            .route("/committee/{height}", get(Self::get_committee))
268            .route("/delegators/{validator}", get(Self::get_delegators_for_validator));
269
270        // If the node is a validator, enable the BFT connections endpoints.
271        let routes = match self.consensus {
272            Some(_) => routes
273                .route("/connections/bft/count", get(Self::get_bft_connections_count))
274                .route("/connections/bft/all", get(Self::get_bft_connections_all)),
275            None => routes,
276        };
277
278        // If the node is a validator and `telemetry` features is enabled, enable the additional endpoint.
279        #[cfg(feature = "telemetry")]
280        let routes = match self.consensus {
281            Some(_) => routes.route("/validators/participation", get(Self::get_validator_participation_scores)),
282            None => routes,
283        };
284
285        // If the `history` feature is enabled, enable the additional endpoint.
286        #[cfg(feature = "history")]
287        let routes = routes
288            .route("/program/{id}/mapping/{name}/{key}/history/{height}", get(Self::get_history))
289            .route("/program/{id}/mapping/{name}/history/{height}", get(Self::get_history_batch));
290
291        // If the `history-staking-rewards` feature is enabled, enable the additional endpoint.
292        #[cfg(feature = "history-staking-rewards")]
293        let routes = routes.route("/staking/rewards/{address}/{height}", get(Self::get_staking_reward));
294
295        let trace_layer = TraceLayer::new_for_http()
296            .make_span_with(|request: &Request<_>| {
297                let addr = request
298                    .extensions()
299                    .get::<ConnectInfo<SocketAddr>>()
300                    .map(|ConnectInfo(addr)| addr.to_string())
301                    .unwrap_or_else(|| "unknown".to_string());
302
303                // Create a span that includes method, path, and our extracted IP
304                tracing::info_span!(
305                    "REST",
306                    method = %request.method(),
307                    uri = %request.uri().path(),
308                    addr = %addr,
309                )
310            })
311            .on_request(|_request: &Request<_>, _span: &Span| {
312                info!("Received a request");
313            })
314            .on_response(|_response: &Response<_>, latency: Duration, _span: &Span| {
315                info!("Finished request in {:?}", latency);
316            });
317
318        routes
319            // Pass in `Rest` to make things convenient.
320            .with_state(self.clone())
321            // Cap the request body size at 512KiB.
322            .layer(DefaultBodyLimit::max(512 * 1024))
323            .layer(GovernorLayer {
324                config: governor_config.into(),
325            })
326            // Enable CORS.
327            .layer(cors)
328            // Enable tower-http tracing.
329            .layer(trace_layer)
330    }
331
332    async fn spawn_server(&mut self, rest_ip: SocketAddr, rest_rps: u32) -> Result<()> {
333        // Log the REST rate limit per IP.
334        debug!("REST rate limit per IP - {rest_rps} RPS");
335
336        // Add the v1 API as default and under "/v1".
337        let default_router = axum::Router::new().nest(
338            &format!("/{}", N::SHORT_NAME),
339            self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
340        );
341        let v1_router = axum::Router::new().nest(
342            &format!("/{API_VERSION_V1}/{}", N::SHORT_NAME),
343            self.build_routes(rest_rps).layer(middleware::map_response(v1_error_middleware)),
344        );
345
346        // Add the v2 API under "/v2".
347        let v2_router =
348            axum::Router::new().nest(&format!("/{API_VERSION_V2}/{}", N::SHORT_NAME), self.build_routes(rest_rps));
349
350        // Combine all routes.
351        let router = default_router.merge(v1_router).merge(v2_router);
352
353        let rest_listener =
354            TcpListener::bind(rest_ip).await.with_context(|| "Failed to bind TCP port for REST endpoints")?;
355
356        let handle = tokio::spawn(async move {
357            axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>())
358                .await
359                .expect("couldn't start rest server");
360        });
361
362        self.handles.lock().push(handle);
363        Ok(())
364    }
365}
366
367/// Converts errors to the old style for the v1 API.
368/// The error code will always be 500 and the content a simple string.
369async fn v1_error_middleware(response: Response) -> Response {
370    // The status code used by all v1 errors
371    const V1_STATUS_CODE: StatusCode = StatusCode::INTERNAL_SERVER_ERROR;
372
373    if response.status().is_success() {
374        return response;
375    }
376
377    // Returns a opaque error instead of panicking.
378    let fallback = || {
379        let mut response = Response::new(Body::from("Failed to convert error"));
380        *response.status_mut() = V1_STATUS_CODE;
381        response
382    };
383
384    let Ok(bytes) = axum::body::to_bytes(response.into_body(), usize::MAX).await else {
385        return fallback();
386    };
387
388    // Deserialize REST error so we can convert it to a string
389    let Ok(json_err) = serde_json::from_slice::<SerializedRestError>(&bytes) else {
390        return fallback();
391    };
392
393    let mut message = json_err.message;
394    for next in json_err.chain.into_iter() {
395        message = format!("{message} — {next}");
396    }
397
398    let mut response = Response::new(Body::from(message));
399
400    *response.status_mut() = V1_STATUS_CODE;
401
402    response
403}
404
/// Formats an ID into a truncated identifier (for logging purposes).
///
/// IDs of up to 16 characters are returned unchanged; longer IDs are cut after their first
/// 16 characters and suffixed with `..`.
pub fn fmt_id(id: impl ToString) -> String {
    /// The number of leading characters that are kept.
    const VISIBLE_CHARS: usize = 16;

    let full = id.to_string();
    // The byte offset of the first character past the visible prefix, if there is one.
    match full.char_indices().nth(VISIBLE_CHARS) {
        Some((cut, _)) => format!("{}..", &full[..cut]),
        None => full,
    }
}
414
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use axum::{
        Router,
        body::Body,
        http::{Request, StatusCode},
        middleware,
        routing::get,
    };
    use tower::ServiceExt; // for `oneshot`

    /// Returns a router whose routes always fail with the error kind named by their path.
    fn failing_routes() -> Router {
        Router::new()
            .route("/not_found", get(|| async { Err::<(), RestError>(RestError::not_found(anyhow!("missing"))) }))
            .route("/bad_request", get(|| async { Err::<(), RestError>(RestError::bad_request(anyhow!("bad"))) }))
            .route(
                "/service_unavailable",
                get(|| async { Err::<(), RestError>(RestError::service_unavailable(anyhow!("gone"))) }),
            )
    }

    /// Serves the failing routes at the root with the v1 error conversion, and as-is under `/v2`.
    fn test_app() -> Router {
        let v1 = failing_routes().route_layer(middleware::map_response(v1_error_middleware));
        let v2 = Router::new().nest(&format!("/{API_VERSION_V2}"), failing_routes());
        v1.merge(v2)
    }

    /// Sends a `GET` request for `uri` to the app and returns the response status.
    async fn status_of(app: &Router, uri: &str) -> StatusCode {
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
        app.clone().oneshot(request).await.unwrap().status()
    }

    #[tokio::test]
    async fn v1_routes_force_internal_server_error() {
        let app = test_app();

        for uri in ["/not_found", "/bad_request", "/service_unavailable"] {
            assert_eq!(status_of(&app, uri).await, StatusCode::INTERNAL_SERVER_ERROR);
        }
    }

    #[tokio::test]
    async fn v2_routes_return_specific_errors() {
        let app = test_app();

        let expectations = [
            ("/v2/not_found", StatusCode::NOT_FOUND),
            ("/v2/bad_request", StatusCode::BAD_REQUEST),
            ("/v2/service_unavailable", StatusCode::SERVICE_UNAVAILABLE),
        ];
        for (uri, expected) in expectations {
            assert_eq!(status_of(&app, uri).await, expected);
        }
    }
}
475}