casper_node/components/
rest_server.rs

1//! REST server
2//!
3//! The REST server provides clients with a simple RESTful HTTP API. This component is (currently)
4//! intended for basic informational / GET endpoints only; more complex operations should be handled
5//! via the RPC server.
6//!
7//! The actual server is run in backgrounded tasks. HTTP requests are translated into reactor
8//! requests to various components.
9//!
10//! This module currently provides both halves of what is required for an API server:
11//! a component implementation that interfaces with other components via being plugged into a
12//! reactor, and an external facing http server that exposes various uri routes and converts
13//! HTTP requests into the appropriate component events.
14//!
15//! Currently this component supports two endpoints, each of which takes no arguments:
16//! /status : a human readable JSON equivalent of the info-get-status rpc method.
17//!     example: curl -X GET 'http://IP:8888/status'
18//! /metrics : time series data collected from the internals of the node being queried.
19//!     example: curl -X GET 'http://IP:8888/metrics'
20
21mod config;
22mod docs;
23mod event;
24mod filters;
25mod http_server;
26mod info;
27
28use std::{net::SocketAddr, sync::Arc};
29
30use datasize::DataSize;
31use futures::join;
32use once_cell::sync::OnceCell;
33use tokio::{sync::oneshot, task::JoinHandle};
34use tracing::{error, info, warn};
35
36#[cfg(test)]
37use futures::{future::BoxFuture, FutureExt};
38
39#[cfg(test)]
40use tracing::debug;
41
42use casper_types::ProtocolVersion;
43
44use super::{Component, ComponentState, InitializedComponent};
45use crate::{
46    components::PortBoundComponent,
47    effect::{
48        requests::{
49            BlockSynchronizerRequest, ChainspecRawBytesRequest, ConsensusRequest, MetricsRequest,
50            NetworkInfoRequest, ReactorInfoRequest, RestRequest, StorageRequest,
51            UpgradeWatcherRequest,
52        },
53        EffectBuilder, EffectExt, Effects,
54    },
55    reactor::main_reactor::MainEvent,
56    types::{ChainspecInfo, StatusFeed},
57    utils::{self, ListeningError},
58    NodeRng,
59};
60pub use config::Config;
61pub use docs::DocExample;
62pub(crate) use docs::DOCS_EXAMPLE_PROTOCOL_VERSION;
63pub(crate) use event::Event;
64pub(crate) use info::{GetChainspecResult, GetValidatorChangesResult};
65
66const COMPONENT_NAME: &str = "rest_server";
67
68/// A helper trait capturing all of this components Request type dependencies.
69pub(crate) trait ReactorEventT:
70    From<Event>
71    + From<RestRequest>
72    + From<NetworkInfoRequest>
73    + From<StorageRequest>
74    + From<ChainspecRawBytesRequest>
75    + From<UpgradeWatcherRequest>
76    + From<ConsensusRequest>
77    + From<MetricsRequest>
78    + From<ReactorInfoRequest>
79    + From<BlockSynchronizerRequest>
80    + Send
81{
82}
83
84impl<REv> ReactorEventT for REv where
85    REv: From<Event>
86        + From<RestRequest>
87        + From<NetworkInfoRequest>
88        + From<StorageRequest>
89        + From<ChainspecRawBytesRequest>
90        + From<UpgradeWatcherRequest>
91        + From<ConsensusRequest>
92        + From<MetricsRequest>
93        + From<ReactorInfoRequest>
94        + From<BlockSynchronizerRequest>
95        + Send
96        + 'static
97{
98}
99
100#[derive(DataSize, Debug)]
101pub(crate) struct InnerRestServer {
102    /// When the message is sent, it signals the server loop to exit cleanly.
103    #[data_size(skip)]
104    #[allow(dead_code)]
105    shutdown_sender: oneshot::Sender<()>,
106    /// The address the server is listening on.
107    local_addr: Arc<OnceCell<SocketAddr>>,
108    /// The task handle which will only join once the server loop has exited.
109    #[data_size(skip)]
110    #[allow(dead_code)]
111    server_join_handle: Option<JoinHandle<()>>,
112    /// The network name, as specified in the chainspec
113    network_name: String,
114}
115
116#[derive(DataSize, Debug)]
117pub(crate) struct RestServer {
118    /// The component state.
119    state: ComponentState,
120    config: Config,
121    api_version: ProtocolVersion,
122    network_name: String,
123    /// Inner server is present only when enabled in the config.
124    inner_rest: Option<InnerRestServer>,
125}
126
127impl RestServer {
128    pub(crate) fn new(config: Config, api_version: ProtocolVersion, network_name: String) -> Self {
129        RestServer {
130            state: ComponentState::Uninitialized,
131            config,
132            api_version,
133            network_name,
134            inner_rest: None,
135        }
136    }
137}
138
139impl<REv> Component<REv> for RestServer
140where
141    REv: ReactorEventT,
142{
143    type Event = Event;
144
145    fn handle_event(
146        &mut self,
147        effect_builder: EffectBuilder<REv>,
148        _rng: &mut NodeRng,
149        event: Self::Event,
150    ) -> Effects<Self::Event> {
151        match &self.state {
152            ComponentState::Fatal(msg) => {
153                error!(
154                    msg,
155                    ?event,
156                    name = <Self as Component<MainEvent>>::name(self),
157                    "should not handle this event when this component has fatal error"
158                );
159                Effects::new()
160            }
161            ComponentState::Uninitialized => {
162                warn!(
163                    ?event,
164                    name = <Self as Component<MainEvent>>::name(self),
165                    "should not handle this event when component is uninitialized"
166                );
167                Effects::new()
168            }
169            ComponentState::Initializing => match event {
170                Event::Initialize => {
171                    let (effects, state) = self.bind(self.config.enable_server, effect_builder);
172                    <Self as InitializedComponent<MainEvent>>::set_state(self, state);
173                    effects
174                }
175                Event::RestRequest(_) | Event::GetMetricsResult { .. } => {
176                    warn!(
177                        ?event,
178                        name = <Self as Component<MainEvent>>::name(self),
179                        "should not handle this event when component is pending initialization"
180                    );
181                    Effects::new()
182                }
183            },
184            ComponentState::Initialized => match event {
185                Event::Initialize => {
186                    error!(
187                        ?event,
188                        name = <Self as Component<MainEvent>>::name(self),
189                        "component already initialized"
190                    );
191                    Effects::new()
192                }
193                Event::RestRequest(RestRequest::Status { responder }) => {
194                    let network_name = self.network_name.clone();
195                    async move {
196                        let (
197                            last_added_block,
198                            peers,
199                            next_upgrade,
200                            consensus_status,
201                            reactor_state,
202                            last_progress,
203                            node_uptime,
204                            available_block_range,
205                            block_sync,
206                            latest_switch_block_header,
207                        ) = join!(
208                            effect_builder.get_highest_complete_block_from_storage(),
209                            effect_builder.network_peers(),
210                            effect_builder.get_next_upgrade(),
211                            effect_builder.consensus_status(),
212                            effect_builder.get_reactor_state(),
213                            effect_builder.get_last_progress(),
214                            effect_builder.get_uptime(),
215                            effect_builder.get_available_block_range_from_storage(),
216                            effect_builder.get_block_synchronizer_status(),
217                            effect_builder.get_latest_switch_block_header_from_storage()
218                        );
219                        let starting_state_root_hash = effect_builder
220                            .get_block_header_at_height_from_storage(
221                                available_block_range.low(),
222                                true,
223                            )
224                            .await
225                            .map(|header| *header.state_root_hash())
226                            .unwrap_or_default();
227                        let status_feed = StatusFeed::new(
228                            last_added_block,
229                            peers,
230                            ChainspecInfo::new(network_name, next_upgrade),
231                            consensus_status,
232                            node_uptime.into(),
233                            reactor_state,
234                            last_progress.into_inner(),
235                            available_block_range,
236                            block_sync,
237                            starting_state_root_hash,
238                            latest_switch_block_header.map(|header| header.block_hash()),
239                        );
240                        responder.respond(status_feed).await;
241                    }
242                }
243                .ignore(),
244                Event::RestRequest(RestRequest::Metrics { responder }) => effect_builder
245                    .get_metrics()
246                    .event(move |text| Event::GetMetricsResult {
247                        text,
248                        main_responder: responder,
249                    }),
250                Event::GetMetricsResult {
251                    text,
252                    main_responder,
253                } => main_responder.respond(text).ignore(),
254            },
255        }
256    }
257
258    fn name(&self) -> &str {
259        COMPONENT_NAME
260    }
261}
262
263impl<REv> InitializedComponent<REv> for RestServer
264where
265    REv: ReactorEventT,
266{
267    fn state(&self) -> &ComponentState {
268        &self.state
269    }
270
271    fn set_state(&mut self, new_state: ComponentState) {
272        info!(
273            ?new_state,
274            name = <Self as Component<MainEvent>>::name(self),
275            "component state changed"
276        );
277
278        self.state = new_state;
279    }
280}
281
282impl<REv> PortBoundComponent<REv> for RestServer
283where
284    REv: ReactorEventT,
285{
286    type Error = ListeningError;
287    type ComponentEvent = Event;
288
289    fn listen(
290        &mut self,
291        effect_builder: EffectBuilder<REv>,
292    ) -> Result<Effects<Self::ComponentEvent>, Self::Error> {
293        let cfg = &self.config;
294        let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
295
296        let builder = utils::start_listening(&cfg.address)?;
297        let local_addr: Arc<OnceCell<SocketAddr>> = Default::default();
298
299        let server_join_handle = if cfg.cors_origin.is_empty() {
300            Some(tokio::spawn(http_server::run(
301                builder,
302                effect_builder,
303                self.api_version,
304                shutdown_receiver,
305                cfg.qps_limit,
306                local_addr.clone(),
307            )))
308        } else {
309            Some(tokio::spawn(http_server::run_with_cors(
310                builder,
311                effect_builder,
312                self.api_version,
313                shutdown_receiver,
314                cfg.qps_limit,
315                local_addr.clone(),
316                cfg.cors_origin.clone(),
317            )))
318        };
319
320        let network_name = self.network_name.clone();
321        self.inner_rest = Some(InnerRestServer {
322            local_addr,
323            shutdown_sender,
324            server_join_handle,
325            network_name,
326        });
327
328        Ok(Effects::new())
329    }
330}
331
332#[cfg(test)]
333impl crate::reactor::Finalize for RestServer {
334    fn finalize(self) -> BoxFuture<'static, ()> {
335        async {
336            if let Some(mut rest_server) = self.inner_rest {
337                let _ = rest_server.shutdown_sender.send(());
338
339                // Wait for the server to exit cleanly.
340                if let Some(join_handle) = rest_server.server_join_handle.take() {
341                    match join_handle.await {
342                        Ok(_) => debug!("rest server exited cleanly"),
343                        Err(error) => error!(%error, "could not join rest server task cleanly"),
344                    }
345                } else {
346                    warn!("rest server shutdown while already shut down")
347                }
348            } else {
349                info!("rest server was disabled in config, no shutdown performed")
350            }
351        }
352        .boxed()
353    }
354}
355
356#[cfg(test)]
357mod schema_tests {
358    use crate::{testing::assert_schema, types::GetStatusResult};
359    use schemars::schema_for;
360
361    use super::{GetChainspecResult, GetValidatorChangesResult};
362
363    #[test]
364    fn json_schema_status_check() {
365        let schema_path = format!(
366            "{}/../resources/test/rest_schema_status.json",
367            env!("CARGO_MANIFEST_DIR")
368        );
369        let pretty = serde_json::to_string_pretty(&schema_for!(GetStatusResult)).unwrap();
370        assert_schema(schema_path, pretty);
371    }
372
373    #[test]
374    fn json_schema_validator_changes_check() {
375        let schema_path = format!(
376            "{}/../resources/test/rest_schema_validator_changes.json",
377            env!("CARGO_MANIFEST_DIR")
378        );
379        assert_schema(
380            schema_path,
381            serde_json::to_string_pretty(&schema_for!(GetValidatorChangesResult)).unwrap(),
382        );
383    }
384
385    #[test]
386    fn json_schema_chainspec_bytes_check() {
387        let schema_path = format!(
388            "{}/../resources/test/rest_schema_chainspec_bytes.json",
389            env!("CARGO_MANIFEST_DIR")
390        );
391        assert_schema(
392            schema_path,
393            serde_json::to_string_pretty(&schema_for!(GetChainspecResult)).unwrap(),
394        );
395    }
396}