use essential_node::{self as node};
use essential_node_api as node_api;
use essential_node_types::{block_notify::BlockTx, Block};
use essential_types::{convert::bytes_from_word, Value};
use futures::{StreamExt, TryStreamExt};
use tokio_util::{
bytes::{self, Buf},
codec::FramedRead,
io::StreamReader,
};
use util::{
client, get_url, init_tracing_subscriber, reqwest_get, state_db_only, test_conn_pool,
with_test_server,
};
mod util;
#[tokio::test]
async fn test_health_check() {
#[cfg(feature = "tracing")]
init_tracing_subscriber();
let db = test_conn_pool();
with_test_server(state_db_only(db), |port| async move {
let response = reqwest_get(port, node_api::endpoint::health_check::PATH).await;
assert!(response.status().is_success());
})
.await;
}
#[tokio::test]
async fn test_query_state() {
#[cfg(feature = "tracing")]
init_tracing_subscriber();
let db = test_conn_pool();
let n_blocks = 100;
let (blocks, _, _) = node::test_utils::test_blocks(n_blocks);
let mut mutations = vec![];
for block in &blocks {
let iter = block
.solution_sets
.iter()
.flat_map(|ss| ss.solutions.iter())
.flat_map(|s| {
s.state_mutations
.iter()
.cloned()
.map(|m| (s.predicate_to_solve.contract.clone(), m))
});
mutations.extend(iter);
let block_ca = db
.insert_block(std::sync::Arc::new(block.clone()))
.await
.unwrap();
db.finalize_block(block_ca).await.unwrap();
}
with_test_server(state_db_only(db), |port| async move {
for (contract, m) in &mutations {
let key_bytes: Vec<_> = m.key.iter().copied().flat_map(bytes_from_word).collect();
let key = hex::encode(&key_bytes);
let response = client()
.get(get_url(
port,
&format!("/query-state/{contract}/{key}?block_inclusive={}", n_blocks),
))
.send()
.await
.unwrap();
assert!(
response.status().is_success(),
"{:?}",
response.text().await
);
let r = response.json::<Option<Value>>().await.unwrap();
assert_eq!(Some(&m.value), r.as_ref());
let response = reqwest_get(port, &format!("/query-state/{contract}/{key}")).await;
assert!(response.status().is_success());
let response_value = response.json::<Option<Value>>().await.unwrap();
assert_eq!(Some(&m.value), response_value.as_ref());
}
})
.await;
}
#[tokio::test]
async fn test_list_blocks() {
#[cfg(feature = "tracing")]
init_tracing_subscriber();
let db = test_conn_pool();
let n_blocks = 100;
let (blocks, _, _) = node::test_utils::test_blocks(n_blocks);
for block in &blocks {
db.insert_block(std::sync::Arc::new(block.clone()))
.await
.unwrap();
}
let fetched_blocks = with_test_server(state_db_only(db), |port| async move {
let response = client()
.get(get_url(
port,
&format!("/list-blocks?start={}&end={}", 0, n_blocks),
))
.send()
.await
.unwrap();
assert!(response.status().is_success());
response.json::<Vec<Block>>().await.unwrap()
})
.await;
assert_eq!(blocks, fetched_blocks);
}
#[tokio::test]
async fn test_subscribe_blocks() {
#[cfg(feature = "tracing")]
init_tracing_subscriber();
let db = test_conn_pool();
let (blocks, _, _) = node::test_utils::test_blocks(1000);
let block_tx = BlockTx::new();
let block_rx = block_tx.new_listener();
for block in &blocks[..10] {
let block = std::sync::Arc::new(block.clone());
db.insert_block(block).await.unwrap();
}
let blocks2 = blocks.clone();
let state = node_api::State {
conn_pool: db.clone(),
new_block: Some(block_rx),
};
let server = with_test_server(state, |port| async move {
let response = reqwest_get(port, "/subscribe-blocks?start_block=0").await;
let bytes_stream = StreamReader::new(
response
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e))),
);
let mut frame_stream = FramedRead::new(bytes_stream, SseDecoder::<Block>::new());
let fetched_blocks: Vec<_> = frame_stream
.by_ref()
.take(10)
.map(Result::unwrap)
.collect()
.await;
assert_eq!(&blocks2[..10], &fetched_blocks);
let fetched_blocks: Vec<_> = frame_stream.map(Result::unwrap).collect().await;
assert_eq!(&blocks2[10..], &fetched_blocks);
});
let blocks_remaining = blocks[10..].to_vec();
let write_remaining_blocks = tokio::spawn(async move {
for block in blocks_remaining {
db.insert_block(block.into()).await.unwrap();
block_tx.notify();
}
std::mem::drop(block_tx);
});
let ((), res) = tokio::join!(server, write_remaining_blocks);
res.unwrap();
}
struct SseDecoder<T>(core::marker::PhantomData<T>);
impl<T> SseDecoder<T> {
fn new() -> Self {
Self(core::marker::PhantomData)
}
}
#[derive(Debug, thiserror::Error)]
#[error("SSE decode error")]
pub enum SseDecodeError {
#[error("an I/O error occurred: {0}")]
Io(#[from] std::io::Error),
}
impl<T> tokio_util::codec::Decoder for SseDecoder<T>
where
T: serde::de::DeserializeOwned,
{
type Item = T;
type Error = SseDecodeError;
fn decode(&mut self, buf: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let end = buf
.iter()
.zip(buf.iter().skip(1))
.position(|(&a, &b)| a == b'\n' && b == b'\n');
match end {
Some(end) => {
let Ok(s) = std::str::from_utf8(&buf[..end]) else {
buf.advance(end + 2);
return Ok(None);
};
let s = s.trim_start_matches("data: ").trim();
let data = serde_json::from_str::<T>(s);
let r = match data {
Ok(data) => Ok(Some(data)),
Err(_) => {
if s == ":" {
Ok(None)
} else {
panic!("stream error: {s}");
}
}
};
buf.advance(end + 2);
r
}
None => Ok(None),
}
}
}