use essential_node_db::{self as node_db};
use essential_node_types::block_notify::BlockTx;
use futures::StreamExt;
use util::test_conn_pool;
mod util;
/// Exercises `node_db::subscribe_blocks` in two phases:
///
/// 1. blocks already stored before the subscription is created are yielded
///    first, in insertion order;
/// 2. blocks inserted afterwards (each followed by a `notify()` on the
///    `BlockTx`) are yielded next, and the stream finishes after the
///    `BlockTx` has been dropped.
#[tokio::test]
async fn subscribe_blocks() {
    // How many blocks are written to the DB before subscribing.
    const PRELOADED: usize = 10;

    let blocks = util::test_blocks(1000);
    let (preloaded, pending) = blocks.split_at(PRELOADED);

    let pool = test_conn_pool();
    let notifier = BlockTx::new();
    let listener = notifier.new_listener();

    // Create the tables and store the first batch in a single transaction.
    // The connection goes back to the pool when this scope ends.
    {
        let mut conn = pool.acquire().await.unwrap();
        let tx = conn.transaction().unwrap();
        node_db::create_tables(&tx).unwrap();
        preloaded.iter().for_each(|block| {
            node_db::insert_block(&tx, block).unwrap();
        });
        tx.commit().unwrap();
    }

    // Subscribe from the very first block.
    let start_block = 0;
    let mut stream = std::pin::pin!(node_db::subscribe_blocks(
        start_block,
        pool.clone(),
        listener
    ));

    // Phase 1: read at most `PRELOADED` items; nothing has been notified yet,
    // so these must be the blocks that were stored up front.
    let mut head = Vec::with_capacity(PRELOADED);
    while head.len() < PRELOADED {
        let Some(res) = stream.next().await else {
            break;
        };
        head.push(res.unwrap());
    }
    assert_eq!(preloaded, &head);

    // Phase 2: a background task stores the remaining blocks one at a time,
    // signalling the subscription after every insert.
    let to_insert = pending.to_vec();
    let writer = tokio::spawn(async move {
        for block in to_insert {
            let mut conn = pool.acquire().await.unwrap();
            node_db::with_tx(&mut conn, |tx| node_db::insert_block(tx, &block)).unwrap();
            notifier.notify();
        }
        // The subscription's listener is expected to be the only receiver.
        assert_eq!(notifier.receiver_count(), 1);
        // Dropping the notifier is the only shutdown signal this test sends;
        // the drain loop below relies on the stream ending afterwards.
        drop(notifier);
    });

    // Drain the stream until it finishes and compare with what was inserted.
    let mut tail = Vec::new();
    while let Some(res) = stream.next().await {
        tail.push(res.unwrap());
    }
    assert_eq!(pending, &tail);

    // Surface any panic raised inside the writer task.
    writer.await.unwrap();
}