essential_node_api/
endpoint.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//! Provides a small module for each endpoint with associated `PATH` and `handler`.

use axum::{
    extract::{Path, Query, State},
    response::{
        sse::{self, Sse},
        IntoResponse,
    },
    Json,
};
use essential_node::{db, BlockRx};
use essential_types::{convert::word_from_bytes, Block, ContentAddress, Value, Word};
use futures::{Stream, StreamExt};
use serde::Deserialize;
use thiserror::Error;

/// A range in blocks, used for the `list-blocks` and `list-contracts` endpoints.
///
/// The range is non-inclusive of the `end`, i.e. it is equivalent to `start..end`.
#[derive(Deserialize)]
pub struct BlockRange {
    /// Start of the range.
    pub start: Word,
    /// The end of the range (exclusive).
    pub end: Word,
}

/// Type to deserialize a block number query parameter.
#[derive(Deserialize)]
pub struct StartBlock {
    /// The block number to start from.
    pub start_block: Word,
}

/// Any endpoint error that might occur.
#[derive(Debug, Error)]
pub enum Error {
    #[error("failed to decode from hex string: {0}")]
    HexDecode(#[from] hex::FromHexError),
    #[error("DB query failed: {0}")]
    ConnPoolQuery(#[from] db::AcquireThenQueryError),
}

/// An error produced by a subscription endpoint stream.
#[derive(Debug, Error)]
pub enum SubscriptionError {
    /// An axum error occurred.
    #[error("an axum error occurred: {0}")]
    Axum(#[from] axum::Error),
    /// A DB query failure occurred.
    #[error("DB query failed: {0}")]
    Query(#[from] db::QueryError),
}

/// Provides an [`db::AwaitNewBlock`] implementation for the API.
struct AwaitNewBlock(Option<BlockRx>);

impl IntoResponse for Error {
    fn into_response(self) -> axum::response::Response {
        use axum::http::StatusCode;
        match self {
            Error::ConnPoolQuery(e) => {
                (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
            }
            Error::HexDecode(e) => (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
        }
    }
}

impl db::AwaitNewBlock for AwaitNewBlock {
    async fn await_new_block(&mut self) -> Option<()> {
        match self.0 {
            None => None,
            Some(ref mut rx) => rx.changed().await.ok(),
        }
    }
}

/// The return a health check response.
pub mod health_check {
    pub const PATH: &str = "/";
    pub async fn handler() {}
}

/// The `list-blocks` get endpoint.
///
/// Takes a range of L2 blocks as a parameter.
pub mod list_blocks {
    use super::*;
    pub const PATH: &str = "/list-blocks";
    pub async fn handler(
        State(state): State<crate::State>,
        Query(block_range): Query<BlockRange>,
    ) -> Result<Json<Vec<Block>>, Error> {
        let blocks = state
            .conn_pool
            .list_blocks(block_range.start..block_range.end)
            .await?;
        Ok(Json(blocks))
    }
}

/// The `query-state` get endpoint.
///
/// Takes a contract content address and a byte array key as path parameters,
/// both encoded as hex.
pub mod query_state {
    use super::*;
    pub const PATH: &str = "/query-state/:contract-ca/:key";
    pub async fn handler(
        State(state): State<crate::State>,
        Path((contract_ca, key)): Path<(String, String)>,
    ) -> Result<Json<Option<Value>>, Error> {
        let contract_ca: ContentAddress = contract_ca.parse()?;
        let key: Vec<u8> = hex::decode(key)?;
        let key = key_words_from_bytes(&key);
        let value = state.conn_pool.query_state(contract_ca, key).await?;
        Ok(Json(value))
    }
}

/// The `subscribe-blocks` get endpoint.
///
/// Produces an event for every block starting from the given block number.
pub mod subscribe_blocks {
    use super::*;
    pub const PATH: &str = "/subscribe-blocks";
    pub async fn handler(
        State(state): State<crate::State>,
        Query(StartBlock { start_block }): Query<StartBlock>,
    ) -> Sse<impl Stream<Item = Result<sse::Event, SubscriptionError>>> {
        // The block stream.
        let new_block = AwaitNewBlock(state.new_block.clone());
        let blocks = state.conn_pool.subscribe_blocks(start_block, new_block);

        // Map the stream of blocks to SSE events.
        let sse_events = blocks.map(|res| {
            let block = res?;
            let event = sse::Event::default().json_data(block)?;
            Ok(event)
        });

        Sse::new(sse_events).keep_alive(sse::KeepAlive::default())
    }
}

fn key_words_from_bytes(key: &[u8]) -> Vec<Word> {
    key.chunks_exact(core::mem::size_of::<Word>())
        .map(|chunk| word_from_bytes(chunk.try_into().expect("safe due to chunk size")))
        .collect::<Vec<_>>()
}