use super::*;
use crate::{
db::{self, finalize_block, insert_block},
test_utils::{
assert_validation_progress_is_some, test_big_bang, test_blocks_with_contracts,
test_conn_pool_with_big_bang, test_invalid_block_with_contract,
},
};
use essential_node_db as node_db;
use essential_node_types::{block_notify::BlockTx, BigBang, Block};
use essential_types::Word;
use rusqlite::Connection;
use std::time::Duration;
fn insert_block_and_send_notification(conn: &mut Connection, block: &Block, block_tx: &BlockTx) {
node_db::with_tx(conn, |tx| {
let block_ca = insert_block(tx, block)?;
finalize_block(tx, &block_ca)
})
.unwrap();
block_tx.notify();
}
#[tokio::test]
async fn can_validate() {
#[cfg(feature = "tracing")]
let _ = tracing_subscriber::fmt::try_init();
let conn_pool = test_conn_pool_with_big_bang().await;
let mut conn = conn_pool.acquire().await.unwrap();
const NUM_TEST_BLOCKS: Word = 4;
let blocks = test_blocks_with_contracts(1, 1 + NUM_TEST_BLOCKS);
let block_addrs = blocks.iter().map(content_addr).collect::<Vec<_>>();
let block_tx = BlockTx::new();
let block_rx = block_tx.new_listener();
let big_bang = test_big_bang();
let contract_registry = big_bang.contract_registry.contract;
let program_registry = big_bang.program_registry.contract;
let handle = validation_stream(
conn_pool.clone(),
contract_registry,
program_registry,
block_rx,
)
.unwrap();
let bbb_ca = essential_hash::content_addr(&BigBang::default().block());
assert_validation_progress_is_some(&conn, &bbb_ca);
insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &block_addrs[0]);
insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
insert_block_and_send_notification(&mut conn, &blocks[2], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &block_addrs[2]);
insert_block_and_send_notification(&mut conn, &blocks[3], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &block_addrs[3]);
handle.close().await.unwrap();
}
#[tokio::test]
async fn test_invalid_block_validation() {
let conn_pool = test_conn_pool_with_big_bang().await;
let mut conn = conn_pool.acquire().await.unwrap();
let block = test_invalid_block_with_contract(1, Duration::from_secs(1));
let block_tx = BlockTx::new();
let block_rx = block_tx.new_listener();
let big_bang = test_big_bang();
let contract_registry = big_bang.contract_registry.contract;
let program_registry = big_bang.program_registry.contract;
let handle = validation_stream(
conn_pool.clone(),
contract_registry,
program_registry,
block_rx,
)
.unwrap();
let bbb_ca = essential_hash::content_addr(&BigBang::default().block());
assert_validation_progress_is_some(&conn, &bbb_ca);
insert_block_and_send_notification(&mut conn, &block, &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &bbb_ca);
let fetched_failed_blocks = db::list_failed_blocks(&conn, 0..10).unwrap();
assert_eq!(fetched_failed_blocks.len(), 1);
assert_eq!(fetched_failed_blocks[0].0, block.header.number);
assert_eq!(
fetched_failed_blocks[0].1,
content_addr(&block.solution_sets[0])
);
handle.close().await.unwrap();
}
#[ignore]
#[tokio::test]
async fn can_process_valid_and_invalid_blocks() {
#[cfg(feature = "tracing")]
let _ = tracing_subscriber::fmt::try_init();
let conn_pool = test_conn_pool_with_big_bang().await;
let mut conn = conn_pool.acquire().await.unwrap();
let test_blocks = test_blocks_with_contracts(1, 3);
let invalid_block = test_invalid_block_with_contract(2, Duration::from_secs(2));
let blocks = test_blocks;
let block_addrs = blocks.iter().map(content_addr).collect::<Vec<_>>();
let block_tx = BlockTx::new();
let block_rx = block_tx.new_listener();
let big_bang = test_big_bang();
let contract_registry = big_bang.contract_registry.contract;
let program_registry = big_bang.program_registry.contract;
let handle = validation_stream(
conn_pool.clone(),
contract_registry,
program_registry,
block_rx,
)
.unwrap();
let bbb_ca = essential_hash::content_addr(&BigBang::default().block());
assert_validation_progress_is_some(&conn, &bbb_ca);
insert_block_and_send_notification(&mut conn, &blocks[0], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &block_addrs[0]);
insert_block_and_send_notification(&mut conn, &invalid_block, &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &block_addrs[0]);
let fetched_failed_blocks = db::list_failed_blocks(&conn, 0..10).unwrap();
assert_eq!(fetched_failed_blocks.len(), 1);
assert_eq!(fetched_failed_blocks[0].0, invalid_block.header.number);
assert_eq!(
fetched_failed_blocks[0].1,
content_addr(&invalid_block.solution_sets[0])
);
insert_block_and_send_notification(&mut conn, &blocks[1], &block_tx);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_validation_progress_is_some(&conn, &block_addrs[1]);
handle.close().await.unwrap();
}