// essential_node_api/endpoint.rs
use axum::{
extract::{Path, Query, State},
response::{
sse::{self, Sse},
IntoResponse,
},
Json,
};
use essential_node::{db, BlockRx};
use essential_types::{convert::word_from_bytes, Block, ContentAddress, Value, Word};
use futures::{Stream, StreamExt};
use serde::Deserialize;
use thiserror::Error;
/// Query-string parameters naming a span of blocks.
///
/// The `list_blocks` handler forwards these to the connection pool as the
/// range `start..end`.
#[derive(Deserialize)]
pub struct BlockRange {
/// Left-hand bound of the `start..end` range.
pub start: Word,
/// Right-hand bound of the `start..end` range.
pub end: Word,
}
/// Query-string parameter accepted by the `subscribe_blocks` handler.
#[derive(Deserialize)]
pub struct StartBlock {
/// Forwarded as the first argument to the connection pool's `subscribe_blocks`.
/// NOTE(review): presumably the block number the stream starts from — confirm
/// against `essential_node::db`.
pub start_block: Word,
}
/// Failures returned by the request/response handlers in this file.
///
/// Both variants carry `#[from]`, so handlers can propagate the underlying
/// errors with `?`.
#[derive(Debug, Error)]
pub enum Error {
/// A hex string supplied by the caller could not be decoded.
#[error("failed to decode from hex string: {0}")]
HexDecode(#[from] hex::FromHexError),
/// A query issued through the connection pool failed.
#[error("DB query failed: {0}")]
ConnPoolQuery(#[from] db::AcquireThenQueryError),
}
/// Failures that can appear as items of the `subscribe_blocks` event stream.
///
/// Both variants carry `#[from]`, so the stream's mapping closure can use `?`.
#[derive(Debug, Error)]
pub enum SubscriptionError {
/// An error reported by axum.
#[error("an axum error occurred: {0}")]
Axum(#[from] axum::Error),
/// A database query failed.
#[error("DB query failed: {0}")]
Query(#[from] db::QueryError),
}
/// Wrapper around an optional `BlockRx` that implements `db::AwaitNewBlock`.
///
/// When the inner value is `None`, `await_new_block` returns `None` without waiting.
struct AwaitNewBlock(Option<BlockRx>);
impl IntoResponse for Error {
fn into_response(self) -> axum::response::Response {
use axum::http::StatusCode;
match self {
Error::ConnPoolQuery(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
}
Error::HexDecode(e) => (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
}
}
}
/// Implements the DB layer's new-block notification hook on top of the
/// wrapped, optional receiver.
impl db::AwaitNewBlock for AwaitNewBlock {
    /// Waits for the wrapped receiver to report a change.
    ///
    /// Returns `None` immediately when no receiver is present, and also when
    /// `changed()` reports an error; otherwise returns `Some(())`.
    async fn await_new_block(&mut self) -> Option<()> {
        // No receiver configured: nothing to wait on.
        let rx = self.0.as_mut()?;
        rx.changed().await.ok()
    }
}
/// The health-check endpoint.
pub mod health_check {
/// Route at which the health-check handler is served.
pub const PATH: &str = "/";
/// Takes no input and returns `()`.
/// NOTE(review): presumably axum turns the unit return into an empty
/// success response — confirm.
pub async fn handler() {}
}
pub mod list_blocks {
use super::*;
pub const PATH: &str = "/list-blocks";
pub async fn handler(
State(state): State<crate::State>,
Query(block_range): Query<BlockRange>,
) -> Result<Json<Vec<Block>>, Error> {
let blocks = state
.conn_pool
.list_blocks(block_range.start..block_range.end)
.await?;
Ok(Json(blocks))
}
}
pub mod query_state {
use super::*;
pub const PATH: &str = "/query-state/:contract-ca/:key";
pub async fn handler(
State(state): State<crate::State>,
Path((contract_ca, key)): Path<(String, String)>,
) -> Result<Json<Option<Value>>, Error> {
let contract_ca: ContentAddress = contract_ca.parse()?;
let key: Vec<u8> = hex::decode(key)?;
let key = key_words_from_bytes(&key);
let value = state.conn_pool.query_state(contract_ca, key).await?;
Ok(Json(value))
}
}
/// The endpoint that streams blocks as server-sent events.
pub mod subscribe_blocks {
    use super::*;

    /// Route at which [`handler`] is served.
    pub const PATH: &str = "/subscribe-blocks";

    /// Opens a block subscription on the connection pool, passing it the
    /// `start_block` query parameter, and exposes it as an SSE stream.
    ///
    /// Each block from the subscription is serialized as the JSON data of one
    /// event. A failed subscription item or a failed serialization is yielded
    /// as a [`SubscriptionError`] item. The response uses the default
    /// keep-alive settings.
    pub async fn handler(
        State(state): State<crate::State>,
        Query(StartBlock { start_block }): Query<StartBlock>,
    ) -> Sse<impl Stream<Item = Result<sse::Event, SubscriptionError>>> {
        // Hand the subscription its own copy of the (optional) new-block receiver.
        let notifier = AwaitNewBlock(state.new_block.clone());
        let events = state
            .conn_pool
            .subscribe_blocks(start_block, notifier)
            .map(|item| -> Result<sse::Event, SubscriptionError> {
                let event = sse::Event::default().json_data(item?)?;
                Ok(event)
            });
        Sse::new(events).keep_alive(sse::KeepAlive::default())
    }
}
/// Converts a byte slice into words, one word per `size_of::<Word>()` bytes.
///
/// Bytes are consumed in consecutive, complete word-sized chunks and each
/// chunk is passed to `word_from_bytes`. Trailing bytes that do not fill a
/// whole word are ignored, as is the nature of `chunks_exact`.
fn key_words_from_bytes(key: &[u8]) -> Vec<Word> {
    const WORD_SIZE: usize = core::mem::size_of::<Word>();

    let mut words = Vec::with_capacity(key.len() / WORD_SIZE);
    for chunk in key.chunks_exact(WORD_SIZE) {
        // `chunks_exact` yields only slices of exactly `WORD_SIZE` bytes.
        words.push(word_from_bytes(
            chunk.try_into().expect("safe due to chunk size"),
        ));
    }
    words
}