1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21#[macro_use]
22extern crate amareleo_chain_tracing;
23
24mod helpers;
25pub use helpers::*;
26
27mod routes;
28
29use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
30use amareleo_node_consensus::Consensus;
31use snarkvm::{
32 console::{program::ProgramID, types::Field},
33 prelude::{Ledger, Network, cfg_into_iter, store::ConsensusStorage},
34};
35
36use anyhow::{Result, anyhow, bail};
37use axum::{
38 Json,
39 body::Body,
40 extract::{ConnectInfo, DefaultBodyLimit, Path, Query, State},
41 http::{Method, Request, StatusCode, header::CONTENT_TYPE},
42 middleware,
43 middleware::Next,
44 response::Response,
45 routing::{get, post},
46};
47use axum_extra::response::ErasedJson;
48#[cfg(feature = "locktick")]
49use locktick::parking_lot::Mutex;
50#[cfg(not(feature = "locktick"))]
51use parking_lot::Mutex;
52use std::{net::SocketAddr, sync::Arc};
53use tokio::{
54 net::TcpListener,
55 sync::{oneshot, watch},
56 task::JoinHandle,
57};
58use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder};
59use tower_http::{
60 cors::{Any, CorsLayer},
61 trace::TraceLayer,
62};
63use tracing::subscriber::DefaultGuard;
64
65#[derive(Clone)]
67pub struct Rest<N: Network, C: ConsensusStorage<N>> {
68 consensus: Option<Consensus<N>>,
70 ledger: Ledger<N, C>,
72 tracing: Option<TracingHandler>,
74 shutdown_trigger_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
76 shutdown_complete_rx: Arc<Mutex<Option<watch::Receiver<bool>>>>,
78 rest_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
80}
81
82impl<N: Network, C: 'static + ConsensusStorage<N>> Rest<N, C> {
83 pub async fn start(
85 rest_ip: SocketAddr,
86 rest_rps: u32,
87 consensus: Option<Consensus<N>>,
88 ledger: Ledger<N, C>,
89 tracing: Option<TracingHandler>,
90 ) -> Result<Self> {
91 let mut server = Self {
93 consensus,
94 ledger,
95 tracing,
96 shutdown_trigger_tx: Arc::new(Mutex::new(None)),
97 shutdown_complete_rx: Arc::new(Mutex::new(None)),
98 rest_handle: Arc::new(Mutex::new(None)),
99 };
100 server.spawn_server(rest_ip, rest_rps).await?;
102 Ok(server)
104 }
105
106 pub fn is_finished(&self) -> bool {
107 let lock = self.rest_handle.lock();
108 if let Some(handle) = lock.as_ref() { handle.is_finished() } else { true }
109 }
110
111 pub async fn wait_finish(&self) -> Result<()> {
112 if self.is_finished() {
113 guard_info!(self, "REST server already shutdown.");
114 return Ok(());
115 }
116
117 let rx_option = {
119 let lock = self.shutdown_complete_rx.lock();
120 lock.as_ref().map(|opt| opt.clone())
121 };
122
123 if let Some(mut rx) = rx_option {
124 while !*rx.borrow() {
126 if rx.changed().await.is_err() {
127 bail!("REST shutdown completed signal errored!");
128 }
129 }
130
131 guard_info!(self, "REST shutdown completed signal received.");
132 } else {
133 bail!("REST shutdown completed signal NOT found!");
134 }
135
136 Ok(())
137 }
138
139 pub async fn shut_down(&self) {
140 let shutdown_option = self.shutdown_trigger_tx.lock().take();
142 if let Some(tx) = shutdown_option {
143 let _ = tx.send(()); }
145
146 let _ = self.wait_finish().await;
148 }
149}
150
151impl<N: Network, C: ConsensusStorage<N>> Rest<N, C> {
152 pub const fn ledger(&self) -> &Ledger<N, C> {
154 &self.ledger
155 }
156
157 pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
159 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
160 }
161}
162
163impl<N: Network, C: ConsensusStorage<N>> Rest<N, C> {
164 async fn spawn_server(&mut self, rest_ip: SocketAddr, rest_rps: u32) -> Result<()> {
165 let cors = CorsLayer::new()
166 .allow_origin(Any)
167 .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
168 .allow_headers([CONTENT_TYPE]);
169
170 guard_debug!(self, "REST rate limit per IP - {rest_rps} RPS");
172
173 let governor_config = match GovernorConfigBuilder::default()
175 .per_nanosecond((1_000_000_000 / rest_rps) as u64)
176 .burst_size(rest_rps)
177 .error_handler(|error| {
178 let error_message = error.to_string();
180 let mut response = Response::new(error_message.clone().into());
181 *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
182 if error_message.contains("Too Many Requests") {
183 *response.status_mut() = StatusCode::TOO_MANY_REQUESTS;
184 }
185 response
186 })
187 .finish()
188 {
189 Some(config) => Box::new(config),
190 None => bail!("Couldn't set up rate limiting for the REST server"),
191 };
192
193 let network = match N::ID {
195 snarkvm::console::network::MainnetV0::ID => "mainnet",
196 snarkvm::console::network::TestnetV0::ID => "testnet",
197 snarkvm::console::network::CanaryV0::ID => "canary",
198 unknown_id => bail!("Unknown network ID ({unknown_id})"),
199 };
200
201 let router = {
202 let routes = axum::Router::new()
203 .route(
205 &format!("/{network}/node/address"),
206 get(Self::get_node_address),
207 )
208 .route(
209 &format!("/{network}/program/{{id}}/mapping/{{name}}"),
210 get(Self::get_mapping_values),
211 )
212 .route_layer(middleware::from_fn(auth_middleware))
213 .route(
215 &format!("/{network}/block/height/latest"),
216 get(Self::get_block_height_latest),
217 )
218 .route(
219 &format!("/{network}/block/hash/latest"),
220 get(Self::get_block_hash_latest),
221 )
222 .route(
223 &format!("/{network}/block/latest"),
224 get(Self::get_block_latest),
225 )
226 .route(
227 &format!("/{network}/block/{{height_or_hash}}"),
228 get(Self::get_block),
229 )
230 .route(&format!("/{network}/block/{{height_or_hash}}/header"), get(Self::get_block_header))
233 .route(&format!("/{network}/block/{{height_or_hash}}/transactions"), get(Self::get_block_transactions))
234 .route(
236 &format!("/{network}/transaction/{{id}}"),
237 get(Self::get_transaction),
238 )
239 .route(
240 &format!("/{network}/transaction/confirmed/{{id}}"),
241 get(Self::get_confirmed_transaction),
242 )
243 .route(&format!("/{network}/transaction/unconfirmed/{{id}}"), get(Self::get_unconfirmed_transaction))
244 .route(
245 &format!("/{network}/transaction/broadcast"),
246 post(Self::transaction_broadcast),
247 )
248 .route(
250 &format!("/{network}/solution/broadcast"),
251 post(Self::solution_broadcast),
252 )
253 .route(
255 &format!("/{network}/find/blockHash/{{tx_id}}"),
256 get(Self::find_block_hash),
257 )
258 .route(
259 &format!("/{network}/find/blockHeight/{{state_root}}"),
260 get(Self::find_block_height_from_state_root),
261 )
262 .route(
263 &format!("/{network}/find/transactionID/deployment/{{program_id}}"),
264 get(Self::find_transaction_id_from_program_id),
265 )
266 .route(
267 &format!("/{network}/find/transactionID/{{transition_id}}"),
268 get(Self::find_transaction_id_from_transition_id),
269 )
270 .route(
271 &format!("/{network}/find/transitionID/{{input_or_output_id}}"),
272 get(Self::find_transition_id),
273 )
274 .route(
276 &format!("/{network}/peers/count"),
277 get(Self::get_peers_count),
278 )
279 .route(&format!("/{network}/peers/all"), get(Self::get_peers_all))
280 .route(
281 &format!("/{network}/peers/all/metrics"),
282 get(Self::get_peers_all_metrics),
283 )
284 .route(&format!("/{network}/program/{{id}}"), get(Self::get_program))
286 .route(
287 &format!("/{network}/program/{{id}}/mappings"),
288 get(Self::get_mapping_names),
289 )
290 .route(
291 &format!("/{network}/program/{{id}}/mapping/{{name}}/{{key}}"),
292 get(Self::get_mapping_value),
293 )
294 .route(&format!("/{network}/blocks"), get(Self::get_blocks))
296 .route(&format!("/{network}/height/{{hash}}"), get(Self::get_height))
297 .route(
298 &format!("/{network}/memoryPool/transmissions"),
299 get(Self::get_memory_pool_transmissions),
300 )
301 .route(
302 &format!("/{network}/memoryPool/solutions"),
303 get(Self::get_memory_pool_solutions),
304 )
305 .route(
306 &format!("/{network}/memoryPool/transactions"),
307 get(Self::get_memory_pool_transactions),
308 )
309 .route(
310 &format!("/{network}/statePath/{{commitment}}"),
311 get(Self::get_state_path_for_commitment),
312 )
313 .route(
314 &format!("/{network}/stateRoot/latest"),
315 get(Self::get_state_root_latest),
316 )
317 .route(
318 &format!("/{network}/stateRoot/{{height}}"),
319 get(Self::get_state_root),
320 )
321 .route(
322 &format!("/{network}/committee/latest"),
323 get(Self::get_committee_latest),
324 )
325 .route(
326 &format!("/{network}/committee/{{height}}"),
327 get(Self::get_committee),
328 )
329 .route(
330 &format!("/{network}/delegators/{{validator}}"),
331 get(Self::get_delegators_for_validator),
332 );
333
334 #[cfg(feature = "history")]
336 let routes =
337 routes.route(&format!("/{network}/block/{{blockHeight}}/history/{{mapping}}"), get(Self::get_history));
338
339 routes
340 .with_state(self.clone())
342 .layer(TraceLayer::new_for_http())
344 .layer(middleware::from_fn_with_state(self.clone(), Self::log_middleware))
346 .layer(cors)
348 .layer(DefaultBodyLimit::max(512 * 1024))
350 .layer(GovernorLayer {
351 config: governor_config.into(),
353 })
354 };
355
356 let (shutdown_trigger_tx, shutdown_trigger_rx) = oneshot::channel::<()>();
358 let (shutdown_complete_tx, shutdown_complete_rx) = watch::channel::<bool>(false);
359 let tracing_: TracingHandler = self.tracing.clone().into();
360
361 let rest_listener =
363 TcpListener::bind(rest_ip).await.map_err(|err| anyhow!("Failed to bind to {}: {}", rest_ip, err))?;
364
365 let serve_handle = tokio::spawn(async move {
366 let result = axum::serve(rest_listener, router.into_make_service_with_connect_info::<SocketAddr>())
367 .with_graceful_shutdown(Self::shutdown_wait(tracing_.clone(), shutdown_trigger_rx))
368 .await;
369
370 if let Err(error) = result {
371 guard_error!(tracing_, "Couldn't start REST server: {}", error);
372 }
373
374 let _ = shutdown_complete_tx.send(true);
375 });
376
377 *self.rest_handle.lock() = Some(serve_handle);
378 *self.shutdown_trigger_tx.lock() = Some(shutdown_trigger_tx);
379 *self.shutdown_complete_rx.lock() = Some(shutdown_complete_rx);
380
381 Ok(())
382 }
383
384 async fn log_middleware(
385 State(rest): State<Self>,
386 ConnectInfo(addr): ConnectInfo<SocketAddr>,
387 request: Request<Body>,
388 next: Next,
389 ) -> Result<Response, StatusCode> {
390 guard_info!(rest, "Received '{} {}' from '{addr}'", request.method(), request.uri());
391 Ok(next.run(request).await)
392 }
393
394 async fn shutdown_wait(tracing: TracingHandler, shutdown_rx: oneshot::Receiver<()>) {
395 if let Err(error) = shutdown_rx.await {
396 guard_error!(tracing, "REST server shutdown signaling error: {}", error);
397 }
398
399 guard_info!(tracing, "REST server shutdown signal recieved...");
400 }
401}
402
/// Formats an ID for display, truncating it to its first 16 characters.
///
/// IDs longer than 16 characters get a `..` suffix after the truncated
/// prefix; shorter IDs are returned unchanged. Lengths are counted in
/// `char`s, not bytes.
pub fn fmt_id(id: impl ToString) -> String {
    let full = id.to_string();
    // The byte offset of the 17th character, if any, is where the prefix ends.
    match full.char_indices().nth(16) {
        Some((cut, _)) => format!("{}..", &full[..cut]),
        None => full,
    }
}