1mod config;
22mod event;
23mod filters;
24mod http_server;
25
26use std::{fmt::Debug, time::Instant};
27
28use datasize::DataSize;
29use futures::{future::BoxFuture, join, FutureExt};
30use tokio::{sync::oneshot, task::JoinHandle};
31use tracing::{debug, error, info, warn};
32
33use casper_json_rpc::CorsOrigin;
34use casper_types::ProtocolVersion;
35
36use super::Component;
37use crate::{
38 components::{
39 rpc_server::rpcs::docs::OPEN_RPC_SCHEMA, ComponentState, InitializedComponent,
40 PortBoundComponent,
41 },
42 effect::{
43 requests::{
44 BlockSynchronizerRequest, ChainspecRawBytesRequest, ConsensusRequest, MetricsRequest,
45 NetworkInfoRequest, ReactorStatusRequest, RestRequest, StorageRequest,
46 UpgradeWatcherRequest,
47 },
48 EffectBuilder, EffectExt, Effects,
49 },
50 reactor::{main_reactor::MainEvent, Finalize},
51 types::{ChainspecInfo, StatusFeed},
52 utils::{self, ListeningError},
53 NodeRng,
54};
55pub use config::Config;
56pub(crate) use event::Event;
57
/// Name returned by this component's `Component::name` implementation; it is attached as the
/// `name` field to the component's log events.
const COMPONENT_NAME: &str = "rest_server";
59
60pub(crate) trait ReactorEventT:
62 From<Event>
63 + From<RestRequest>
64 + From<NetworkInfoRequest>
65 + From<StorageRequest>
66 + From<ChainspecRawBytesRequest>
67 + From<UpgradeWatcherRequest>
68 + From<ConsensusRequest>
69 + From<MetricsRequest>
70 + From<ReactorStatusRequest>
71 + From<BlockSynchronizerRequest>
72 + Send
73{
74}
75
76impl<REv> ReactorEventT for REv where
77 REv: From<Event>
78 + From<RestRequest>
79 + From<NetworkInfoRequest>
80 + From<StorageRequest>
81 + From<ChainspecRawBytesRequest>
82 + From<UpgradeWatcherRequest>
83 + From<ConsensusRequest>
84 + From<MetricsRequest>
85 + From<ReactorStatusRequest>
86 + From<BlockSynchronizerRequest>
87 + Send
88 + 'static
89{
90}
91
92#[derive(DataSize, Debug)]
93pub(crate) struct InnerRestServer {
94 #[data_size(skip)]
96 shutdown_sender: oneshot::Sender<()>,
97 #[data_size(skip)]
99 server_join_handle: Option<JoinHandle<()>>,
100 node_startup_instant: Instant,
102 network_name: String,
104}
105
106#[derive(DataSize, Debug)]
107pub(crate) struct RestServer {
108 state: ComponentState,
110 config: Config,
111 api_version: ProtocolVersion,
112 network_name: String,
113 node_startup_instant: Instant,
114 inner_rest: Option<InnerRestServer>,
116}
117
118impl RestServer {
119 pub(crate) fn new(
120 config: Config,
121 api_version: ProtocolVersion,
122 network_name: String,
123 node_startup_instant: Instant,
124 ) -> Self {
125 RestServer {
126 state: ComponentState::Uninitialized,
127 config,
128 api_version,
129 network_name,
130 node_startup_instant,
131 inner_rest: None,
132 }
133 }
134}
135
136impl<REv> Component<REv> for RestServer
137where
138 REv: ReactorEventT,
139{
140 type Event = Event;
141
142 fn handle_event(
143 &mut self,
144 effect_builder: EffectBuilder<REv>,
145 _rng: &mut NodeRng,
146 event: Self::Event,
147 ) -> Effects<Self::Event> {
148 match &self.state {
149 ComponentState::Fatal(msg) => {
150 error!(
151 msg,
152 ?event,
153 name = <Self as Component<MainEvent>>::name(self),
154 "should not handle this event when this component has fatal error"
155 );
156 Effects::new()
157 }
158 ComponentState::Uninitialized => {
159 warn!(
160 ?event,
161 name = <Self as Component<MainEvent>>::name(self),
162 "should not handle this event when component is uninitialized"
163 );
164 Effects::new()
165 }
166 ComponentState::Initializing => match event {
167 Event::Initialize => {
168 let (effects, state) = self.bind(self.config.enable_server, effect_builder);
169 <Self as InitializedComponent<MainEvent>>::set_state(self, state);
170 effects
171 }
172 Event::RestRequest(_) | Event::GetMetricsResult { .. } => {
173 warn!(
174 ?event,
175 name = <Self as Component<MainEvent>>::name(self),
176 "should not handle this event when component is pending initialization"
177 );
178 Effects::new()
179 }
180 },
181 ComponentState::Initialized => match event {
182 Event::Initialize => {
183 error!(
184 ?event,
185 name = <Self as Component<MainEvent>>::name(self),
186 "component already initialized"
187 );
188 Effects::new()
189 }
190 Event::RestRequest(RestRequest::Status { responder }) => {
191 let node_uptime = self.node_startup_instant.elapsed();
192 let network_name = self.network_name.clone();
193 async move {
194 let (
195 last_added_block,
196 peers,
197 next_upgrade,
198 consensus_status,
199 (reactor_state, last_progress),
200 available_block_range,
201 block_sync,
202 ) = join!(
203 effect_builder.get_highest_complete_block_from_storage(),
204 effect_builder.network_peers(),
205 effect_builder.get_next_upgrade(),
206 effect_builder.consensus_status(),
207 effect_builder.get_reactor_status(),
208 effect_builder.get_available_block_range_from_storage(),
209 effect_builder.get_block_synchronizer_status(),
210 );
211 let starting_state_root_hash = effect_builder
212 .get_block_header_at_height_from_storage(
213 available_block_range.low(),
214 true,
215 )
216 .await
217 .map(|header| *header.state_root_hash())
218 .unwrap_or_default();
219 let status_feed = StatusFeed::new(
220 last_added_block,
221 peers,
222 ChainspecInfo::new(network_name, next_upgrade),
223 consensus_status,
224 node_uptime,
225 reactor_state,
226 last_progress,
227 available_block_range,
228 block_sync,
229 starting_state_root_hash,
230 );
231 responder.respond(status_feed).await;
232 }
233 }
234 .ignore(),
235 Event::RestRequest(RestRequest::Metrics { responder }) => effect_builder
236 .get_metrics()
237 .event(move |text| Event::GetMetricsResult {
238 text,
239 main_responder: responder,
240 }),
241 Event::RestRequest(RestRequest::RpcSchema { responder }) => {
242 let schema = OPEN_RPC_SCHEMA.clone();
243 responder.respond(schema).ignore()
244 }
245 Event::GetMetricsResult {
246 text,
247 main_responder,
248 } => main_responder.respond(text).ignore(),
249 },
250 }
251 }
252
253 fn name(&self) -> &str {
254 COMPONENT_NAME
255 }
256}
257
258impl<REv> InitializedComponent<REv> for RestServer
259where
260 REv: ReactorEventT,
261{
262 fn state(&self) -> &ComponentState {
263 &self.state
264 }
265
266 fn set_state(&mut self, new_state: ComponentState) {
267 info!(
268 ?new_state,
269 name = <Self as Component<MainEvent>>::name(self),
270 "component state changed"
271 );
272
273 self.state = new_state;
274 }
275}
276
277impl<REv> PortBoundComponent<REv> for RestServer
278where
279 REv: ReactorEventT,
280{
281 type Error = ListeningError;
282 type ComponentEvent = Event;
283
284 fn listen(
285 &mut self,
286 effect_builder: EffectBuilder<REv>,
287 ) -> Result<Effects<Self::ComponentEvent>, Self::Error> {
288 let cfg = &self.config;
289 let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
290
291 let builder = utils::start_listening(&cfg.address)?;
292
293 let server_join_handle = match cfg.cors_origin.as_str() {
294 "" => Some(tokio::spawn(http_server::run(
295 builder,
296 effect_builder,
297 self.api_version,
298 shutdown_receiver,
299 cfg.qps_limit,
300 ))),
301 "*" => Some(tokio::spawn(http_server::run_with_cors(
302 builder,
303 effect_builder,
304 self.api_version,
305 shutdown_receiver,
306 cfg.qps_limit,
307 CorsOrigin::Any,
308 ))),
309 _ => Some(tokio::spawn(http_server::run_with_cors(
310 builder,
311 effect_builder,
312 self.api_version,
313 shutdown_receiver,
314 cfg.qps_limit,
315 CorsOrigin::Specified(cfg.cors_origin.clone()),
316 ))),
317 };
318
319 let node_startup_instant = self.node_startup_instant;
320 let network_name = self.network_name.clone();
321 self.inner_rest = Some(InnerRestServer {
322 shutdown_sender,
323 server_join_handle,
324 node_startup_instant,
325 network_name,
326 });
327
328 Ok(Effects::new())
329 }
330}
331
332impl Finalize for RestServer {
333 fn finalize(self) -> BoxFuture<'static, ()> {
334 async {
335 if let Some(mut rest_server) = self.inner_rest {
336 let _ = rest_server.shutdown_sender.send(());
337
338 if let Some(join_handle) = rest_server.server_join_handle.take() {
340 match join_handle.await {
341 Ok(_) => debug!("rest server exited cleanly"),
342 Err(error) => error!(%error, "could not join rest server task cleanly"),
343 }
344 } else {
345 warn!("rest server shutdown while already shut down")
346 }
347 } else {
348 info!("rest server was disabled in config, no shutdown performed")
349 }
350 }
351 .boxed()
352 }
353}
354
#[cfg(test)]
mod schema_tests {
    use crate::{
        rpcs::{
            docs::OpenRpcSchema,
            info::{GetChainspecResult, GetValidatorChangesResult},
        },
        testing::assert_schema,
        types::GetStatusResult,
    };
    use schemars::schema_for;

    /// Builds the path of the schema fixture `file_name` inside `resources/test`, relative to
    /// this crate's manifest directory.
    fn fixture_path(file_name: &str) -> String {
        format!(
            "{}/../resources/test/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        )
    }

    /// The generated schema of `GetStatusResult` is checked against its stored fixture.
    #[test]
    fn schema_status() {
        let generated = serde_json::to_string_pretty(&schema_for!(GetStatusResult)).unwrap();
        assert_schema(fixture_path("rest_schema_status.json"), generated);
    }

    /// The generated schema of `GetValidatorChangesResult` is checked against its stored
    /// fixture.
    #[test]
    fn schema_validator_changes() {
        let generated =
            serde_json::to_string_pretty(&schema_for!(GetValidatorChangesResult)).unwrap();
        assert_schema(fixture_path("rest_schema_validator_changes.json"), generated);
    }

    /// The generated schema of `OpenRpcSchema` is checked against its stored fixture.
    #[test]
    fn schema_rpc_schema() {
        let generated = serde_json::to_string_pretty(&schema_for!(OpenRpcSchema)).unwrap();
        assert_schema(fixture_path("rest_schema_rpc_schema.json"), generated);
    }

    /// The generated schema of `GetChainspecResult` is checked against its stored fixture.
    #[test]
    fn schema_chainspec_bytes() {
        let generated = serde_json::to_string_pretty(&schema_for!(GetChainspecResult)).unwrap();
        assert_schema(fixture_path("rest_schema_chainspec_bytes.json"), generated);
    }
}