essential_node_api/
endpoint.rs

1//! Provides a small module for each endpoint with associated `PATH` and `handler`.
2
3use axum::{
4    extract::{Path, Query, State},
5    response::{
6        sse::{self, Sse},
7        IntoResponse,
8    },
9    Json,
10};
11use essential_node::db;
12use essential_node_types::{block_notify::BlockRx, Block};
13use essential_types::{convert::word_from_bytes, ContentAddress, Value, Word};
14use futures::{Stream, StreamExt};
15use serde::Deserialize;
16use thiserror::Error;
17
18/// A range in blocks, used for the `list-blocks` and `list-contracts` endpoints.
19///
20/// The range is non-inclusive of the `end`, i.e. it is equivalent to `start..end`.
21#[derive(Deserialize)]
22pub struct BlockRange {
23    /// Start of the range.
24    pub start: Word,
25    /// The end of the range (exclusive).
26    pub end: Word,
27}
28
29/// Type to deserialize a block number query parameter.
30#[derive(Deserialize)]
31pub struct StartBlock {
32    /// The block number to start from.
33    pub start_block: Word,
34}
35
36/// Any endpoint error that might occur.
37#[derive(Debug, Error)]
38pub enum Error {
39    #[error("failed to decode from hex string: {0}")]
40    HexDecode(#[from] hex::FromHexError),
41    #[error("DB query failed: {0}")]
42    ConnPoolQuery(#[from] db::pool::AcquireThenQueryError),
43    #[error(
44        "Invalid query parameter for /query-state: {0}. {}",
45        query_state::HELP_MSG
46    )]
47    InvalidQueryParameters(query_state::QueryStateParams),
48}
49
50/// An error produced by a subscription endpoint stream.
51#[derive(Debug, Error)]
52pub enum SubscriptionError {
53    /// An axum error occurred.
54    #[error("an axum error occurred: {0}")]
55    Axum(#[from] axum::Error),
56    /// A DB query failure occurred.
57    #[error("DB query failed: {0}")]
58    Query(#[from] db::QueryError),
59}
60
61/// Provides an [`db::AwaitNewBlock`] implementation for the API.
62struct AwaitNewBlock(Option<BlockRx>);
63
64impl IntoResponse for Error {
65    fn into_response(self) -> axum::response::Response {
66        use axum::http::StatusCode;
67        match self {
68            Error::ConnPoolQuery(e) => {
69                (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
70            }
71            e @ Error::HexDecode(_) => (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
72            e @ Error::InvalidQueryParameters(_) => {
73                (StatusCode::BAD_REQUEST, e.to_string()).into_response()
74            }
75        }
76    }
77}
78
79impl db::AwaitNewBlock for AwaitNewBlock {
80    async fn await_new_block(&mut self) -> Option<()> {
81        match self.0 {
82            None => None,
83            Some(ref mut rx) => rx.changed().await.ok(),
84        }
85    }
86}
87
/// The health check endpoint.
///
/// Served at the root path; the handler takes no input and does no work.
pub mod health_check {
    /// The route at which the health check is served.
    pub const PATH: &str = "/";

    /// Handles a health check request by returning `()` without doing any work.
    pub async fn handler() {}
}
93
94/// The `list-blocks` get endpoint.
95///
96/// Takes a range of L2 blocks as a parameter.
97pub mod list_blocks {
98    use super::*;
99    pub const PATH: &str = "/list-blocks";
100    pub async fn handler(
101        State(state): State<crate::State>,
102        Query(block_range): Query<BlockRange>,
103    ) -> Result<Json<Vec<Block>>, Error> {
104        let blocks = state
105            .conn_pool
106            .list_blocks(block_range.start..block_range.end)
107            .await?;
108        Ok(Json(blocks))
109    }
110}
111
112/// The `query-state` get endpoint.
113///
114/// Takes a contract content address and a byte array key as path parameters,
115/// both encoded as hex.
116pub mod query_state {
117    use std::fmt::Display;
118
119    use serde::Serialize;
120
121    use super::*;
122
123    pub const HELP_MSG: &str = r#"
124The query parameters must be empty or one of the following combinations:
125    - block_inclusive
126    - block_exclusive
127    - block_inclusive, solution_inclusive
128    - block_inclusive, solution_exclusive
129"#;
130
131    #[derive(Deserialize, Serialize, Default, Debug)]
132    pub struct QueryStateParams {
133        pub block_inclusive: Option<Word>,
134        pub block_exclusive: Option<Word>,
135        pub solution_inclusive: Option<u64>,
136        pub solution_exclusive: Option<u64>,
137    }
138
139    impl Display for QueryStateParams {
140        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141            write!(
142                f,
143                "block_inclusive: {:?}, block_exclusive: {:?}, solution_inclusive: {:?}, solution_exclusive: {:?}",
144                self.block_inclusive, self.block_exclusive, self.solution_inclusive, self.solution_exclusive
145            )
146        }
147    }
148
149    pub const PATH: &str = "/query-state/:contract-ca/:key";
150    pub async fn handler(
151        State(state): State<crate::State>,
152        Path((contract_ca, key)): Path<(String, String)>,
153        Query(params): Query<QueryStateParams>,
154    ) -> Result<Json<Option<Value>>, Error> {
155        let contract_ca: ContentAddress = contract_ca.parse()?;
156        let key: Vec<u8> = hex::decode(key)?;
157        let key = key_words_from_bytes(&key);
158        // TODO: When state is compacted and blocks are discarded, this query should
159        // fall back to querying compacted state.
160
161        // TODO: When blocks aren't immediately finalized, this query will need to
162        // either take a block address or use a fork choice rule to determine the
163        // latest state to return. It's possible this query won't make much sense
164        // at that point.
165
166        let value = match params {
167            QueryStateParams {
168                block_inclusive: Some(block),
169                block_exclusive: None,
170                solution_inclusive: None,
171                solution_exclusive: None,
172            } => {
173                state
174                    .conn_pool
175                    .query_state_finalized_inclusive_block(contract_ca, key, block)
176                    .await?
177            }
178            QueryStateParams {
179                block_inclusive: None,
180                block_exclusive: Some(block),
181                solution_inclusive: None,
182                solution_exclusive: None,
183            } => {
184                state
185                    .conn_pool
186                    .query_state_finalized_exclusive_block(contract_ca, key, block)
187                    .await?
188            }
189            QueryStateParams {
190                block_inclusive: Some(block),
191                block_exclusive: None,
192                solution_inclusive: Some(solution_ix),
193                solution_exclusive: None,
194            } => {
195                state
196                    .conn_pool
197                    .query_state_finalized_inclusive_solution_set(
198                        contract_ca,
199                        key,
200                        block,
201                        solution_ix,
202                    )
203                    .await?
204            }
205            QueryStateParams {
206                block_inclusive: Some(block),
207                block_exclusive: None,
208                solution_inclusive: None,
209                solution_exclusive: Some(solution_ix),
210            } => {
211                state
212                    .conn_pool
213                    .query_state_finalized_exclusive_solution_set(
214                        contract_ca,
215                        key,
216                        block,
217                        solution_ix,
218                    )
219                    .await?
220            }
221            QueryStateParams {
222                block_inclusive: None,
223                block_exclusive: None,
224                solution_inclusive: None,
225                solution_exclusive: None,
226            } => {
227                state
228                    .conn_pool
229                    .query_latest_finalized_block(contract_ca, key)
230                    .await?
231            }
232            _ => return Err(Error::InvalidQueryParameters(params)),
233        };
234        Ok(Json(value))
235    }
236}
237
238/// The `subscribe-blocks` get endpoint.
239///
240/// Produces an event for every block starting from the given block number.
241pub mod subscribe_blocks {
242    use super::*;
243    pub const PATH: &str = "/subscribe-blocks";
244    pub async fn handler(
245        State(state): State<crate::State>,
246        Query(StartBlock { start_block }): Query<StartBlock>,
247    ) -> Sse<impl Stream<Item = Result<sse::Event, SubscriptionError>>> {
248        // The block stream.
249        let new_block = AwaitNewBlock(state.new_block.clone());
250        let blocks = state.conn_pool.subscribe_blocks(start_block, new_block);
251
252        // Map the stream of blocks to SSE events.
253        let sse_events = blocks.map(|res| {
254            let block = res?;
255            let event = sse::Event::default().json_data(block)?;
256            Ok(event)
257        });
258
259        Sse::new(sse_events).keep_alive(sse::KeepAlive::default())
260    }
261}
262
263fn key_words_from_bytes(key: &[u8]) -> Vec<Word> {
264    key.chunks_exact(core::mem::size_of::<Word>())
265        .map(|chunk| word_from_bytes(chunk.try_into().expect("safe due to chunk size")))
266        .collect::<Vec<_>>()
267}