1use axum::{
4 extract::{Path, Query, State},
5 response::{
6 sse::{self, Sse},
7 IntoResponse,
8 },
9 Json,
10};
11use essential_node::db;
12use essential_node_types::{block_notify::BlockRx, Block};
13use essential_types::{convert::word_from_bytes, ContentAddress, Value, Word};
14use futures::{Stream, StreamExt};
15use serde::Deserialize;
16use thiserror::Error;
17
18#[derive(Deserialize)]
22pub struct BlockRange {
23 pub start: Word,
25 pub end: Word,
27}
28
29#[derive(Deserialize)]
31pub struct StartBlock {
32 pub start_block: Word,
34}
35
36#[derive(Debug, Error)]
38pub enum Error {
39 #[error("failed to decode from hex string: {0}")]
40 HexDecode(#[from] hex::FromHexError),
41 #[error("DB query failed: {0}")]
42 ConnPoolQuery(#[from] db::pool::AcquireThenQueryError),
43 #[error(
44 "Invalid query parameter for /query-state: {0}. {}",
45 query_state::HELP_MSG
46 )]
47 InvalidQueryParameters(query_state::QueryStateParams),
48}
49
50#[derive(Debug, Error)]
52pub enum SubscriptionError {
53 #[error("an axum error occurred: {0}")]
55 Axum(#[from] axum::Error),
56 #[error("DB query failed: {0}")]
58 Query(#[from] db::QueryError),
59}
60
61struct AwaitNewBlock(Option<BlockRx>);
63
64impl IntoResponse for Error {
65 fn into_response(self) -> axum::response::Response {
66 use axum::http::StatusCode;
67 match self {
68 Error::ConnPoolQuery(e) => {
69 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
70 }
71 e @ Error::HexDecode(_) => (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
72 e @ Error::InvalidQueryParameters(_) => {
73 (StatusCode::BAD_REQUEST, e.to_string()).into_response()
74 }
75 }
76 }
77}
78
79impl db::AwaitNewBlock for AwaitNewBlock {
80 async fn await_new_block(&mut self) -> Option<()> {
81 match self.0 {
82 None => None,
83 Some(ref mut rx) => rx.changed().await.ok(),
84 }
85 }
86}
87
/// The health-check endpoint.
pub mod health_check {
    /// Route at which the health check is served.
    pub const PATH: &str = "/";

    /// Does no work and returns `()`; a successful call only demonstrates
    /// that the server is reachable and routing requests.
    pub async fn handler() {}
}
93
94pub mod list_blocks {
98 use super::*;
99 pub const PATH: &str = "/list-blocks";
100 pub async fn handler(
101 State(state): State<crate::State>,
102 Query(block_range): Query<BlockRange>,
103 ) -> Result<Json<Vec<Block>>, Error> {
104 let blocks = state
105 .conn_pool
106 .list_blocks(block_range.start..block_range.end)
107 .await?;
108 Ok(Json(blocks))
109 }
110}
111
112pub mod query_state {
117 use std::fmt::Display;
118
119 use serde::Serialize;
120
121 use super::*;
122
123 pub const HELP_MSG: &str = r#"
124The query parameters must be empty or one of the following combinations:
125 - block_inclusive
126 - block_exclusive
127 - block_inclusive, solution_inclusive
128 - block_inclusive, solution_exclusive
129"#;
130
131 #[derive(Deserialize, Serialize, Default, Debug)]
132 pub struct QueryStateParams {
133 pub block_inclusive: Option<Word>,
134 pub block_exclusive: Option<Word>,
135 pub solution_inclusive: Option<u64>,
136 pub solution_exclusive: Option<u64>,
137 }
138
139 impl Display for QueryStateParams {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 write!(
142 f,
143 "block_inclusive: {:?}, block_exclusive: {:?}, solution_inclusive: {:?}, solution_exclusive: {:?}",
144 self.block_inclusive, self.block_exclusive, self.solution_inclusive, self.solution_exclusive
145 )
146 }
147 }
148
149 pub const PATH: &str = "/query-state/:contract-ca/:key";
150 pub async fn handler(
151 State(state): State<crate::State>,
152 Path((contract_ca, key)): Path<(String, String)>,
153 Query(params): Query<QueryStateParams>,
154 ) -> Result<Json<Option<Value>>, Error> {
155 let contract_ca: ContentAddress = contract_ca.parse()?;
156 let key: Vec<u8> = hex::decode(key)?;
157 let key = key_words_from_bytes(&key);
158 let value = match params {
167 QueryStateParams {
168 block_inclusive: Some(block),
169 block_exclusive: None,
170 solution_inclusive: None,
171 solution_exclusive: None,
172 } => {
173 state
174 .conn_pool
175 .query_state_finalized_inclusive_block(contract_ca, key, block)
176 .await?
177 }
178 QueryStateParams {
179 block_inclusive: None,
180 block_exclusive: Some(block),
181 solution_inclusive: None,
182 solution_exclusive: None,
183 } => {
184 state
185 .conn_pool
186 .query_state_finalized_exclusive_block(contract_ca, key, block)
187 .await?
188 }
189 QueryStateParams {
190 block_inclusive: Some(block),
191 block_exclusive: None,
192 solution_inclusive: Some(solution_ix),
193 solution_exclusive: None,
194 } => {
195 state
196 .conn_pool
197 .query_state_finalized_inclusive_solution_set(
198 contract_ca,
199 key,
200 block,
201 solution_ix,
202 )
203 .await?
204 }
205 QueryStateParams {
206 block_inclusive: Some(block),
207 block_exclusive: None,
208 solution_inclusive: None,
209 solution_exclusive: Some(solution_ix),
210 } => {
211 state
212 .conn_pool
213 .query_state_finalized_exclusive_solution_set(
214 contract_ca,
215 key,
216 block,
217 solution_ix,
218 )
219 .await?
220 }
221 QueryStateParams {
222 block_inclusive: None,
223 block_exclusive: None,
224 solution_inclusive: None,
225 solution_exclusive: None,
226 } => {
227 state
228 .conn_pool
229 .query_latest_finalized_block(contract_ca, key)
230 .await?
231 }
232 _ => return Err(Error::InvalidQueryParameters(params)),
233 };
234 Ok(Json(value))
235 }
236}
237
238pub mod subscribe_blocks {
242 use super::*;
243 pub const PATH: &str = "/subscribe-blocks";
244 pub async fn handler(
245 State(state): State<crate::State>,
246 Query(StartBlock { start_block }): Query<StartBlock>,
247 ) -> Sse<impl Stream<Item = Result<sse::Event, SubscriptionError>>> {
248 let new_block = AwaitNewBlock(state.new_block.clone());
250 let blocks = state.conn_pool.subscribe_blocks(start_block, new_block);
251
252 let sse_events = blocks.map(|res| {
254 let block = res?;
255 let event = sse::Event::default().json_data(block)?;
256 Ok(event)
257 });
258
259 Sse::new(sse_events).keep_alive(sse::KeepAlive::default())
260 }
261}
262
263fn key_words_from_bytes(key: &[u8]) -> Vec<Word> {
264 key.chunks_exact(core::mem::size_of::<Word>())
265 .map(|chunk| word_from_bytes(chunk.try_into().expect("safe due to chunk size")))
266 .collect::<Vec<_>>()
267}