1mod config;
22mod docs;
23mod event;
24mod filters;
25mod http_server;
26mod info;
27
28use std::{net::SocketAddr, sync::Arc};
29
30use datasize::DataSize;
31use futures::join;
32use once_cell::sync::OnceCell;
33use tokio::{sync::oneshot, task::JoinHandle};
34use tracing::{error, info, warn};
35
36#[cfg(test)]
37use futures::{future::BoxFuture, FutureExt};
38
39#[cfg(test)]
40use tracing::debug;
41
42use casper_types::ProtocolVersion;
43
44use super::{Component, ComponentState, InitializedComponent};
45use crate::{
46 components::PortBoundComponent,
47 effect::{
48 requests::{
49 BlockSynchronizerRequest, ChainspecRawBytesRequest, ConsensusRequest, MetricsRequest,
50 NetworkInfoRequest, ReactorInfoRequest, RestRequest, StorageRequest,
51 UpgradeWatcherRequest,
52 },
53 EffectBuilder, EffectExt, Effects,
54 },
55 reactor::main_reactor::MainEvent,
56 types::{ChainspecInfo, StatusFeed},
57 utils::{self, ListeningError},
58 NodeRng,
59};
60pub use config::Config;
61pub use docs::DocExample;
62pub(crate) use docs::DOCS_EXAMPLE_PROTOCOL_VERSION;
63pub(crate) use event::Event;
64pub(crate) use info::{GetChainspecResult, GetValidatorChangesResult};
65
/// Fixed name of this component, returned by its `Component::name` implementation.
const COMPONENT_NAME: &str = "rest_server";
67
68pub(crate) trait ReactorEventT:
70 From<Event>
71 + From<RestRequest>
72 + From<NetworkInfoRequest>
73 + From<StorageRequest>
74 + From<ChainspecRawBytesRequest>
75 + From<UpgradeWatcherRequest>
76 + From<ConsensusRequest>
77 + From<MetricsRequest>
78 + From<ReactorInfoRequest>
79 + From<BlockSynchronizerRequest>
80 + Send
81{
82}
83
84impl<REv> ReactorEventT for REv where
85 REv: From<Event>
86 + From<RestRequest>
87 + From<NetworkInfoRequest>
88 + From<StorageRequest>
89 + From<ChainspecRawBytesRequest>
90 + From<UpgradeWatcherRequest>
91 + From<ConsensusRequest>
92 + From<MetricsRequest>
93 + From<ReactorInfoRequest>
94 + From<BlockSynchronizerRequest>
95 + Send
96 + 'static
97{
98}
99
100#[derive(DataSize, Debug)]
101pub(crate) struct InnerRestServer {
102 #[data_size(skip)]
104 #[allow(dead_code)]
105 shutdown_sender: oneshot::Sender<()>,
106 local_addr: Arc<OnceCell<SocketAddr>>,
108 #[data_size(skip)]
110 #[allow(dead_code)]
111 server_join_handle: Option<JoinHandle<()>>,
112 network_name: String,
114}
115
116#[derive(DataSize, Debug)]
117pub(crate) struct RestServer {
118 state: ComponentState,
120 config: Config,
121 api_version: ProtocolVersion,
122 network_name: String,
123 inner_rest: Option<InnerRestServer>,
125}
126
127impl RestServer {
128 pub(crate) fn new(config: Config, api_version: ProtocolVersion, network_name: String) -> Self {
129 RestServer {
130 state: ComponentState::Uninitialized,
131 config,
132 api_version,
133 network_name,
134 inner_rest: None,
135 }
136 }
137}
138
139impl<REv> Component<REv> for RestServer
140where
141 REv: ReactorEventT,
142{
143 type Event = Event;
144
145 fn handle_event(
146 &mut self,
147 effect_builder: EffectBuilder<REv>,
148 _rng: &mut NodeRng,
149 event: Self::Event,
150 ) -> Effects<Self::Event> {
151 match &self.state {
152 ComponentState::Fatal(msg) => {
153 error!(
154 msg,
155 ?event,
156 name = <Self as Component<MainEvent>>::name(self),
157 "should not handle this event when this component has fatal error"
158 );
159 Effects::new()
160 }
161 ComponentState::Uninitialized => {
162 warn!(
163 ?event,
164 name = <Self as Component<MainEvent>>::name(self),
165 "should not handle this event when component is uninitialized"
166 );
167 Effects::new()
168 }
169 ComponentState::Initializing => match event {
170 Event::Initialize => {
171 let (effects, state) = self.bind(self.config.enable_server, effect_builder);
172 <Self as InitializedComponent<MainEvent>>::set_state(self, state);
173 effects
174 }
175 Event::RestRequest(_) | Event::GetMetricsResult { .. } => {
176 warn!(
177 ?event,
178 name = <Self as Component<MainEvent>>::name(self),
179 "should not handle this event when component is pending initialization"
180 );
181 Effects::new()
182 }
183 },
184 ComponentState::Initialized => match event {
185 Event::Initialize => {
186 error!(
187 ?event,
188 name = <Self as Component<MainEvent>>::name(self),
189 "component already initialized"
190 );
191 Effects::new()
192 }
193 Event::RestRequest(RestRequest::Status { responder }) => {
194 let network_name = self.network_name.clone();
195 async move {
196 let (
197 last_added_block,
198 peers,
199 next_upgrade,
200 consensus_status,
201 reactor_state,
202 last_progress,
203 node_uptime,
204 available_block_range,
205 block_sync,
206 latest_switch_block_header,
207 ) = join!(
208 effect_builder.get_highest_complete_block_from_storage(),
209 effect_builder.network_peers(),
210 effect_builder.get_next_upgrade(),
211 effect_builder.consensus_status(),
212 effect_builder.get_reactor_state(),
213 effect_builder.get_last_progress(),
214 effect_builder.get_uptime(),
215 effect_builder.get_available_block_range_from_storage(),
216 effect_builder.get_block_synchronizer_status(),
217 effect_builder.get_latest_switch_block_header_from_storage()
218 );
219 let starting_state_root_hash = effect_builder
220 .get_block_header_at_height_from_storage(
221 available_block_range.low(),
222 true,
223 )
224 .await
225 .map(|header| *header.state_root_hash())
226 .unwrap_or_default();
227 let status_feed = StatusFeed::new(
228 last_added_block,
229 peers,
230 ChainspecInfo::new(network_name, next_upgrade),
231 consensus_status,
232 node_uptime.into(),
233 reactor_state,
234 last_progress.into_inner(),
235 available_block_range,
236 block_sync,
237 starting_state_root_hash,
238 latest_switch_block_header.map(|header| header.block_hash()),
239 );
240 responder.respond(status_feed).await;
241 }
242 }
243 .ignore(),
244 Event::RestRequest(RestRequest::Metrics { responder }) => effect_builder
245 .get_metrics()
246 .event(move |text| Event::GetMetricsResult {
247 text,
248 main_responder: responder,
249 }),
250 Event::GetMetricsResult {
251 text,
252 main_responder,
253 } => main_responder.respond(text).ignore(),
254 },
255 }
256 }
257
258 fn name(&self) -> &str {
259 COMPONENT_NAME
260 }
261}
262
263impl<REv> InitializedComponent<REv> for RestServer
264where
265 REv: ReactorEventT,
266{
267 fn state(&self) -> &ComponentState {
268 &self.state
269 }
270
271 fn set_state(&mut self, new_state: ComponentState) {
272 info!(
273 ?new_state,
274 name = <Self as Component<MainEvent>>::name(self),
275 "component state changed"
276 );
277
278 self.state = new_state;
279 }
280}
281
282impl<REv> PortBoundComponent<REv> for RestServer
283where
284 REv: ReactorEventT,
285{
286 type Error = ListeningError;
287 type ComponentEvent = Event;
288
289 fn listen(
290 &mut self,
291 effect_builder: EffectBuilder<REv>,
292 ) -> Result<Effects<Self::ComponentEvent>, Self::Error> {
293 let cfg = &self.config;
294 let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
295
296 let builder = utils::start_listening(&cfg.address)?;
297 let local_addr: Arc<OnceCell<SocketAddr>> = Default::default();
298
299 let server_join_handle = if cfg.cors_origin.is_empty() {
300 Some(tokio::spawn(http_server::run(
301 builder,
302 effect_builder,
303 self.api_version,
304 shutdown_receiver,
305 cfg.qps_limit,
306 local_addr.clone(),
307 )))
308 } else {
309 Some(tokio::spawn(http_server::run_with_cors(
310 builder,
311 effect_builder,
312 self.api_version,
313 shutdown_receiver,
314 cfg.qps_limit,
315 local_addr.clone(),
316 cfg.cors_origin.clone(),
317 )))
318 };
319
320 let network_name = self.network_name.clone();
321 self.inner_rest = Some(InnerRestServer {
322 local_addr,
323 shutdown_sender,
324 server_join_handle,
325 network_name,
326 });
327
328 Ok(Effects::new())
329 }
330}
331
#[cfg(test)]
impl crate::reactor::Finalize for RestServer {
    /// Test-only shutdown of the REST server.
    ///
    /// If a server was started, a shutdown signal is sent (a failed send is deliberately ignored)
    /// and the server task is awaited, logging whether the join succeeded. If no server was ever
    /// started, this only logs that fact.
    fn finalize(self) -> BoxFuture<'static, ()> {
        async {
            match self.inner_rest {
                None => info!("rest server was disabled in config, no shutdown performed"),
                Some(mut rest_server) => {
                    let _ = rest_server.shutdown_sender.send(());

                    match rest_server.server_join_handle.take() {
                        None => warn!("rest server shutdown while already shut down"),
                        // Wait for the server task to finish.
                        Some(join_handle) => match join_handle.await {
                            Ok(_) => debug!("rest server exited cleanly"),
                            Err(error) => error!(%error, "could not join rest server task cleanly"),
                        },
                    }
                }
            }
        }
        .boxed()
    }
}
355
/// Passes the JSON schemas generated for the REST response types, together with the paths of
/// the schema files under `resources/test`, to `assert_schema`.
#[cfg(test)]
mod schema_tests {
    use crate::{testing::assert_schema, types::GetStatusResult};
    use schemars::schema_for;

    use super::{GetChainspecResult, GetValidatorChangesResult};

    /// Checks the schema of `GetStatusResult` against `rest_schema_status.json`.
    #[test]
    fn json_schema_status_check() {
        let schema_path = format!(
            "{}/../resources/test/rest_schema_status.json",
            env!("CARGO_MANIFEST_DIR")
        );
        assert_schema(
            schema_path,
            serde_json::to_string_pretty(&schema_for!(GetStatusResult)).unwrap(),
        );
    }

    /// Checks the schema of `GetValidatorChangesResult` against
    /// `rest_schema_validator_changes.json`.
    #[test]
    fn json_schema_validator_changes_check() {
        let schema_path = format!(
            "{}/../resources/test/rest_schema_validator_changes.json",
            env!("CARGO_MANIFEST_DIR")
        );
        let actual_schema =
            serde_json::to_string_pretty(&schema_for!(GetValidatorChangesResult)).unwrap();
        assert_schema(schema_path, actual_schema);
    }

    /// Checks the schema of `GetChainspecResult` against `rest_schema_chainspec_bytes.json`.
    #[test]
    fn json_schema_chainspec_bytes_check() {
        let schema_path = format!(
            "{}/../resources/test/rest_schema_chainspec_bytes.json",
            env!("CARGO_MANIFEST_DIR")
        );
        let actual_schema =
            serde_json::to_string_pretty(&schema_for!(GetChainspecResult)).unwrap();
        assert_schema(schema_path, actual_schema);
    }
}