casper_node/components/
rest_server.rs

1//! REST server
2//!
3//! The REST server provides clients with a simple RESTful HTTP API. This component is (currently)
4//! intended for basic informational / GET endpoints only; more complex operations should be handled
5//! via the RPC server.
6//!
7//! The actual server is run in backgrounded tasks. HTTP requests are translated into reactor
8//! requests to various components.
9//!
10//! This module currently provides both halves of what is required for an API server:
11//! a component implementation that interfaces with other components via being plugged into a
12//! reactor, and an external facing http server that exposes various uri routes and converts
13//! HTTP requests into the appropriate component events.
14//!
15//! Currently this component supports two endpoints, each of which takes no arguments:
16//! /status : a human readable JSON equivalent of the info-get-status rpc method.
17//!     example: curl -X GET 'http://IP:8888/status'
18//! /metrics : time series data collected from the internals of the node being queried.
19//!     example: curl -X GET 'http://IP:8888/metrics'
20
21mod config;
22mod event;
23mod filters;
24mod http_server;
25
26use std::{fmt::Debug, time::Instant};
27
28use datasize::DataSize;
29use futures::{future::BoxFuture, join, FutureExt};
30use tokio::{sync::oneshot, task::JoinHandle};
31use tracing::{debug, error, info, warn};
32
33use casper_json_rpc::CorsOrigin;
34use casper_types::ProtocolVersion;
35
36use super::Component;
37use crate::{
38    components::{
39        rpc_server::rpcs::docs::OPEN_RPC_SCHEMA, ComponentState, InitializedComponent,
40        PortBoundComponent,
41    },
42    effect::{
43        requests::{
44            BlockSynchronizerRequest, ChainspecRawBytesRequest, ConsensusRequest, MetricsRequest,
45            NetworkInfoRequest, ReactorStatusRequest, RestRequest, StorageRequest,
46            UpgradeWatcherRequest,
47        },
48        EffectBuilder, EffectExt, Effects,
49    },
50    reactor::{main_reactor::MainEvent, Finalize},
51    types::{ChainspecInfo, StatusFeed},
52    utils::{self, ListeningError},
53    NodeRng,
54};
55pub use config::Config;
56pub(crate) use event::Event;
57
58const COMPONENT_NAME: &str = "rest_server";
59
60/// A helper trait capturing all of this components Request type dependencies.
61pub(crate) trait ReactorEventT:
62    From<Event>
63    + From<RestRequest>
64    + From<NetworkInfoRequest>
65    + From<StorageRequest>
66    + From<ChainspecRawBytesRequest>
67    + From<UpgradeWatcherRequest>
68    + From<ConsensusRequest>
69    + From<MetricsRequest>
70    + From<ReactorStatusRequest>
71    + From<BlockSynchronizerRequest>
72    + Send
73{
74}
75
76impl<REv> ReactorEventT for REv where
77    REv: From<Event>
78        + From<RestRequest>
79        + From<NetworkInfoRequest>
80        + From<StorageRequest>
81        + From<ChainspecRawBytesRequest>
82        + From<UpgradeWatcherRequest>
83        + From<ConsensusRequest>
84        + From<MetricsRequest>
85        + From<ReactorStatusRequest>
86        + From<BlockSynchronizerRequest>
87        + Send
88        + 'static
89{
90}
91
92#[derive(DataSize, Debug)]
93pub(crate) struct InnerRestServer {
94    /// When the message is sent, it signals the server loop to exit cleanly.
95    #[data_size(skip)]
96    shutdown_sender: oneshot::Sender<()>,
97    /// The task handle which will only join once the server loop has exited.
98    #[data_size(skip)]
99    server_join_handle: Option<JoinHandle<()>>,
100    /// The instant at which the node has started.
101    node_startup_instant: Instant,
102    /// The network name, as specified in the chainspec
103    network_name: String,
104}
105
106#[derive(DataSize, Debug)]
107pub(crate) struct RestServer {
108    /// The component state.
109    state: ComponentState,
110    config: Config,
111    api_version: ProtocolVersion,
112    network_name: String,
113    node_startup_instant: Instant,
114    /// Inner server is present only when enabled in the config.
115    inner_rest: Option<InnerRestServer>,
116}
117
118impl RestServer {
119    pub(crate) fn new(
120        config: Config,
121        api_version: ProtocolVersion,
122        network_name: String,
123        node_startup_instant: Instant,
124    ) -> Self {
125        RestServer {
126            state: ComponentState::Uninitialized,
127            config,
128            api_version,
129            network_name,
130            node_startup_instant,
131            inner_rest: None,
132        }
133    }
134}
135
136impl<REv> Component<REv> for RestServer
137where
138    REv: ReactorEventT,
139{
140    type Event = Event;
141
142    fn handle_event(
143        &mut self,
144        effect_builder: EffectBuilder<REv>,
145        _rng: &mut NodeRng,
146        event: Self::Event,
147    ) -> Effects<Self::Event> {
148        match &self.state {
149            ComponentState::Fatal(msg) => {
150                error!(
151                    msg,
152                    ?event,
153                    name = <Self as Component<MainEvent>>::name(self),
154                    "should not handle this event when this component has fatal error"
155                );
156                Effects::new()
157            }
158            ComponentState::Uninitialized => {
159                warn!(
160                    ?event,
161                    name = <Self as Component<MainEvent>>::name(self),
162                    "should not handle this event when component is uninitialized"
163                );
164                Effects::new()
165            }
166            ComponentState::Initializing => match event {
167                Event::Initialize => {
168                    let (effects, state) = self.bind(self.config.enable_server, effect_builder);
169                    <Self as InitializedComponent<MainEvent>>::set_state(self, state);
170                    effects
171                }
172                Event::RestRequest(_) | Event::GetMetricsResult { .. } => {
173                    warn!(
174                        ?event,
175                        name = <Self as Component<MainEvent>>::name(self),
176                        "should not handle this event when component is pending initialization"
177                    );
178                    Effects::new()
179                }
180            },
181            ComponentState::Initialized => match event {
182                Event::Initialize => {
183                    error!(
184                        ?event,
185                        name = <Self as Component<MainEvent>>::name(self),
186                        "component already initialized"
187                    );
188                    Effects::new()
189                }
190                Event::RestRequest(RestRequest::Status { responder }) => {
191                    let node_uptime = self.node_startup_instant.elapsed();
192                    let network_name = self.network_name.clone();
193                    async move {
194                        let (
195                            last_added_block,
196                            peers,
197                            next_upgrade,
198                            consensus_status,
199                            (reactor_state, last_progress),
200                            available_block_range,
201                            block_sync,
202                        ) = join!(
203                            effect_builder.get_highest_complete_block_from_storage(),
204                            effect_builder.network_peers(),
205                            effect_builder.get_next_upgrade(),
206                            effect_builder.consensus_status(),
207                            effect_builder.get_reactor_status(),
208                            effect_builder.get_available_block_range_from_storage(),
209                            effect_builder.get_block_synchronizer_status(),
210                        );
211                        let starting_state_root_hash = effect_builder
212                            .get_block_header_at_height_from_storage(
213                                available_block_range.low(),
214                                true,
215                            )
216                            .await
217                            .map(|header| *header.state_root_hash())
218                            .unwrap_or_default();
219                        let status_feed = StatusFeed::new(
220                            last_added_block,
221                            peers,
222                            ChainspecInfo::new(network_name, next_upgrade),
223                            consensus_status,
224                            node_uptime,
225                            reactor_state,
226                            last_progress,
227                            available_block_range,
228                            block_sync,
229                            starting_state_root_hash,
230                        );
231                        responder.respond(status_feed).await;
232                    }
233                }
234                .ignore(),
235                Event::RestRequest(RestRequest::Metrics { responder }) => effect_builder
236                    .get_metrics()
237                    .event(move |text| Event::GetMetricsResult {
238                        text,
239                        main_responder: responder,
240                    }),
241                Event::RestRequest(RestRequest::RpcSchema { responder }) => {
242                    let schema = OPEN_RPC_SCHEMA.clone();
243                    responder.respond(schema).ignore()
244                }
245                Event::GetMetricsResult {
246                    text,
247                    main_responder,
248                } => main_responder.respond(text).ignore(),
249            },
250        }
251    }
252
253    fn name(&self) -> &str {
254        COMPONENT_NAME
255    }
256}
257
258impl<REv> InitializedComponent<REv> for RestServer
259where
260    REv: ReactorEventT,
261{
262    fn state(&self) -> &ComponentState {
263        &self.state
264    }
265
266    fn set_state(&mut self, new_state: ComponentState) {
267        info!(
268            ?new_state,
269            name = <Self as Component<MainEvent>>::name(self),
270            "component state changed"
271        );
272
273        self.state = new_state;
274    }
275}
276
277impl<REv> PortBoundComponent<REv> for RestServer
278where
279    REv: ReactorEventT,
280{
281    type Error = ListeningError;
282    type ComponentEvent = Event;
283
284    fn listen(
285        &mut self,
286        effect_builder: EffectBuilder<REv>,
287    ) -> Result<Effects<Self::ComponentEvent>, Self::Error> {
288        let cfg = &self.config;
289        let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
290
291        let builder = utils::start_listening(&cfg.address)?;
292
293        let server_join_handle = match cfg.cors_origin.as_str() {
294            "" => Some(tokio::spawn(http_server::run(
295                builder,
296                effect_builder,
297                self.api_version,
298                shutdown_receiver,
299                cfg.qps_limit,
300            ))),
301            "*" => Some(tokio::spawn(http_server::run_with_cors(
302                builder,
303                effect_builder,
304                self.api_version,
305                shutdown_receiver,
306                cfg.qps_limit,
307                CorsOrigin::Any,
308            ))),
309            _ => Some(tokio::spawn(http_server::run_with_cors(
310                builder,
311                effect_builder,
312                self.api_version,
313                shutdown_receiver,
314                cfg.qps_limit,
315                CorsOrigin::Specified(cfg.cors_origin.clone()),
316            ))),
317        };
318
319        let node_startup_instant = self.node_startup_instant;
320        let network_name = self.network_name.clone();
321        self.inner_rest = Some(InnerRestServer {
322            shutdown_sender,
323            server_join_handle,
324            node_startup_instant,
325            network_name,
326        });
327
328        Ok(Effects::new())
329    }
330}
331
332impl Finalize for RestServer {
333    fn finalize(self) -> BoxFuture<'static, ()> {
334        async {
335            if let Some(mut rest_server) = self.inner_rest {
336                let _ = rest_server.shutdown_sender.send(());
337
338                // Wait for the server to exit cleanly.
339                if let Some(join_handle) = rest_server.server_join_handle.take() {
340                    match join_handle.await {
341                        Ok(_) => debug!("rest server exited cleanly"),
342                        Err(error) => error!(%error, "could not join rest server task cleanly"),
343                    }
344                } else {
345                    warn!("rest server shutdown while already shut down")
346                }
347            } else {
348                info!("rest server was disabled in config, no shutdown performed")
349            }
350        }
351        .boxed()
352    }
353}
354
355#[cfg(test)]
356mod schema_tests {
357    use crate::{
358        rpcs::{
359            docs::OpenRpcSchema,
360            info::{GetChainspecResult, GetValidatorChangesResult},
361        },
362        testing::assert_schema,
363        types::GetStatusResult,
364    };
365    use schemars::schema_for;
366
367    #[test]
368    fn schema_status() {
369        let schema_path = format!(
370            "{}/../resources/test/rest_schema_status.json",
371            env!("CARGO_MANIFEST_DIR")
372        );
373        assert_schema(
374            schema_path,
375            serde_json::to_string_pretty(&schema_for!(GetStatusResult)).unwrap(),
376        );
377    }
378
379    #[test]
380    fn schema_validator_changes() {
381        let schema_path = format!(
382            "{}/../resources/test/rest_schema_validator_changes.json",
383            env!("CARGO_MANIFEST_DIR")
384        );
385        assert_schema(
386            schema_path,
387            serde_json::to_string_pretty(&schema_for!(GetValidatorChangesResult)).unwrap(),
388        );
389    }
390
391    #[test]
392    fn schema_rpc_schema() {
393        let schema_path = format!(
394            "{}/../resources/test/rest_schema_rpc_schema.json",
395            env!("CARGO_MANIFEST_DIR")
396        );
397        assert_schema(
398            schema_path,
399            serde_json::to_string_pretty(&schema_for!(OpenRpcSchema)).unwrap(),
400        );
401    }
402
403    #[test]
404    fn schema_chainspec_bytes() {
405        let schema_path = format!(
406            "{}/../resources/test/rest_schema_chainspec_bytes.json",
407            env!("CARGO_MANIFEST_DIR")
408        );
409        assert_schema(
410            schema_path,
411            serde_json::to_string_pretty(&schema_for!(GetChainspecResult)).unwrap(),
412        );
413    }
414}