// zaino-state 0.3.0
//
// A mempool and chain-fetching service built on top of Zebra's ReadStateService and TrustedChainSync.
// Documentation
//! Holds tests for the V0 database.

use std::path::PathBuf;
use tempfile::TempDir;

use zaino_common::network::ActivationHeights;
use zaino_common::{DatabaseConfig, Network, StorageConfig};
use zaino_proto::proto::utils::{compact_block_with_pool_types, PoolTypeFilter};

use crate::chain_index::finalised_state::reader::DbReader;
use crate::chain_index::finalised_state::ZainoDB;
use crate::chain_index::source::mockchain_source::MockchainSource;
use crate::chain_index::tests::init_tracing;
use crate::chain_index::tests::vectors::{
    build_mockchain_source, indexed_block_chain, load_test_vectors, TestVectorData,
};
use crate::error::FinalisedStateError;
use crate::{BlockCacheConfig, Height};

/// Spawns a version-0 [`ZainoDB`] stored in a freshly created temporary directory.
///
/// The database is configured for regtest with default activation heights and is
/// handed `source` as its chain source. All other storage settings are defaults.
///
/// Returns the [`TempDir`] guard together with the database handle. Keep the guard
/// alive for as long as the database is in use: the directory is deleted when the
/// guard is dropped.
///
/// # Errors
///
/// Returns the [`FinalisedStateError`] produced by [`ZainoDB::spawn`] instead of
/// panicking, so callers can decide how to handle a failed start-up.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created.
pub(crate) async fn spawn_v0_zaino_db(
    source: MockchainSource,
) -> Result<(TempDir, ZainoDB), FinalisedStateError> {
    let temp_dir: TempDir =
        tempfile::tempdir().expect("failed to create temporary database directory");

    let config = BlockCacheConfig {
        storage: StorageConfig {
            database: DatabaseConfig {
                path: temp_dir.path().to_path_buf(),
                ..Default::default()
            },
            ..Default::default()
        },
        // Selects the V0 on-disk schema under test.
        db_version: 0,
        network: Network::Regtest(ActivationHeights::default()),
    };

    // Propagate start-up failures through the declared `Result` rather than unwrapping.
    let zaino_db = ZainoDB::spawn(config, source).await?;

    Ok((temp_dir, zaino_db))
}

/// Loads the test vectors, spawns a V0 [`ZainoDB`] over a mock chain source built
/// from their blocks, and feeds those blocks to `sync_db_with_blockdata`.
///
/// Returns the loaded vectors, the temporary directory guard and the database.
///
/// # Panics
///
/// Panics if the vectors cannot be loaded or the database cannot be spawned.
pub(crate) async fn load_vectors_and_spawn_and_sync_v0_zaino_db(
) -> (TestVectorData, TempDir, ZainoDB) {
    use crate::chain_index::tests::vectors::sync_db_with_blockdata;

    let vectors = load_test_vectors().unwrap();
    let mock_source = build_mockchain_source(vectors.blocks.clone());
    let (temp_dir, db) = spawn_v0_zaino_db(mock_source).await.unwrap();

    // The vectors are returned to the caller, so the sync works on its own copy.
    sync_db_with_blockdata(db.router(), vectors.blocks.clone(), None).await;

    (vectors, temp_dir, db)
}

/// Builds a synced V0 database from the test vectors and creates a [`DbReader`]
/// from it.
///
/// The database is moved into an `Arc` before the reader is created. Both the
/// database and the reader are queried for their height once, and the results are
/// unwrapped, so this helper panics if either query fails or yields no value.
///
/// Returns the vectors, the temporary directory guard, the shared database handle
/// and the reader.
pub(crate) async fn load_vectors_v0db_and_reader(
) -> (TestVectorData, TempDir, std::sync::Arc<ZainoDB>, DbReader) {
    use std::sync::Arc;

    let (test_data, db_dir, synced_db) = load_vectors_and_spawn_and_sync_v0_zaino_db().await;
    let zaino_db = Arc::new(synced_db);

    zaino_db.wait_until_ready().await;
    dbg!(zaino_db.status());
    // Sanity check: the database itself must report a height.
    dbg!(zaino_db.db_height().await.unwrap()).unwrap();

    let db_reader = zaino_db.to_reader();
    // Sanity check: the reader must report a height as well.
    dbg!(db_reader.db_height().await.unwrap()).unwrap();

    (test_data, db_dir, zaino_db, db_reader)
}

// *** ZainoDB Tests ***

/// Runs the shared `assert_shutdown_returns_promptly` check from the parent module
/// against the V0 database, passing the `"DbV0"` label and [`spawn_v0_zaino_db`].
#[tokio::test(flavor = "multi_thread")]
async fn shutdown_returns_promptly() {
    super::assert_shutdown_returns_promptly("DbV0", spawn_v0_zaino_db).await;
}

/// Syncs a freshly spawned V0 database from the mock chain source up to a fixed
/// height and checks that the database reports exactly that height afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn sync_to_height() {
    /// Height the database is synced to and is expected to report.
    const TARGET_HEIGHT: u32 = 200;

    init_tracing();

    // The blocks are only needed to build the source, so move them instead of cloning.
    let blocks = load_test_vectors().unwrap().blocks;
    let source = build_mockchain_source(blocks);

    // The source is cloned because the database takes ownership of one copy while
    // `sync_to_height` borrows another.
    let (_db_dir, zaino_db) = spawn_v0_zaino_db(source.clone()).await.unwrap();

    zaino_db
        .sync_to_height(Height(TARGET_HEIGHT), &source)
        .await
        .unwrap();

    zaino_db.wait_until_ready().await;
    dbg!(zaino_db.status());
    let built_db_height = dbg!(zaino_db.db_height().await.unwrap()).unwrap();

    assert_eq!(built_db_height, Height(TARGET_HEIGHT));
}

/// Writes every test-vector block into a V0 database and verifies that the
/// database height matches the height of the last vector block.
#[tokio::test(flavor = "multi_thread")]
async fn add_blocks_to_db_and_verify() {
    init_tracing();

    let (test_vector, _db_dir, zaino_db) = load_vectors_and_spawn_and_sync_v0_zaino_db().await;
    zaino_db.wait_until_ready().await;
    dbg!(zaino_db.status());
    let db_height = dbg!(zaino_db.db_height().await.unwrap()).unwrap();

    // Without this check the test would pass even if no block had been stored.
    let last_vector_block = test_vector.blocks.last().unwrap();
    assert_eq!(last_vector_block.height, db_height.0);
}

#[tokio::test(flavor = "multi_thread")]
async fn delete_blocks_from_db() {
    init_tracing();

    let (_test_vector, _db_dir, zaino_db) = load_vectors_and_spawn_and_sync_v0_zaino_db().await;

    for h in (1..=200).rev() {
        // dbg!("Deleting block at height {}", h);
        zaino_db
            .delete_block_at_height(crate::Height(h))
            .await
            .unwrap();
    }

    zaino_db.wait_until_ready().await;
    dbg!(zaino_db.status());
    dbg!(zaino_db.db_height().await.unwrap());
}

/// Builds a V0 database on disk, shuts it down, then spawns a second database over
/// the same directory and checks that it reports height 200.
///
/// Each phase runs on its own OS thread with its own Tokio runtime, so the first
/// runtime and database handle are dropped before the second phase starts.
#[tokio::test(flavor = "multi_thread")]
async fn save_db_to_file_and_reload() {
    init_tracing();

    let blocks = load_test_vectors().unwrap().blocks;

    // Both phases share this directory; `temp_dir` lives until the end of the test.
    let temp_dir: TempDir = tempfile::tempdir().unwrap();
    let db_path: PathBuf = temp_dir.path().to_path_buf();
    let config = BlockCacheConfig {
        storage: StorageConfig {
            database: DatabaseConfig {
                path: db_path,
                ..Default::default()
            },
            ..Default::default()
        },
        db_version: 0,
        network: Network::Regtest(ActivationHeights::default()),
    };

    let source = build_mockchain_source(blocks.clone());
    let source_clone = source.clone();

    // Copies moved into the first thread; `config` and `source_clone` are kept for
    // the reload phase.
    let blocks_clone = blocks.clone();
    let config_clone = config.clone();
    // Phase 1: create the database, write all blocks, then shut it down.
    std::thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async move {
            let zaino_db = ZainoDB::spawn(config_clone, source).await.unwrap();

            crate::chain_index::tests::vectors::sync_db_with_blockdata(
                zaino_db.router(),
                blocks_clone,
                None,
            )
            .await;

            zaino_db.wait_until_ready().await;
            dbg!(zaino_db.status());
            dbg!(zaino_db.db_height().await.unwrap());

            dbg!(zaino_db.shutdown().await.unwrap());
        });
    })
    .join()
    .unwrap();

    // NOTE(review): presumably gives the first instance time to release the
    // database files before reopening — confirm whether this pause is still needed.
    std::thread::sleep(std::time::Duration::from_millis(1000));

    // Phase 2: reopen the same directory and check the persisted height.
    std::thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async move {
            // Print the directory contents left behind by phase 1.
            dbg!(config
                .storage
                .database
                .path
                .read_dir()
                .unwrap()
                .collect::<Vec<_>>());
            let zaino_db_2 = ZainoDB::spawn(config, source_clone).await.unwrap();

            zaino_db_2.wait_until_ready().await;
            dbg!(zaino_db_2.status());
            let db_height = dbg!(zaino_db_2.db_height().await.unwrap()).unwrap();

            assert_eq!(db_height.0, 200);

            dbg!(zaino_db_2.shutdown().await.unwrap());
        });
    })
    .join()
    .unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn create_db_reader() {
    init_tracing();

    let (TestVectorData { blocks, .. }, _db_dir, zaino_db, db_reader) =
        load_vectors_v0db_and_reader().await;

    let test_vector_block = blocks.last().unwrap();
    let db_height = dbg!(zaino_db.db_height().await.unwrap()).unwrap();
    let db_reader_height = dbg!(db_reader.db_height().await.unwrap()).unwrap();

    assert_eq!(test_vector_block.height, db_height.0);
    assert_eq!(db_height, db_reader_height);
}

/// For every block in the test vectors, fetches the compact block from the reader
/// under both the default and the include-everything pool filter, and compares it
/// with the compact block derived from the vector data under the same filter.
#[tokio::test(flavor = "multi_thread")]
async fn get_compact_blocks() {
    init_tracing();

    let (TestVectorData { blocks, .. }, _db_dir, _zaino_db, db_reader) =
        load_vectors_v0db_and_reader().await;

    for indexed_block in indexed_block_chain(&blocks) {
        let height = indexed_block.context.index.height;
        let unfiltered_expected = indexed_block.to_compact_block();

        // Default filter first, then the filter that keeps every pool.
        for filter in [PoolTypeFilter::default(), PoolTypeFilter::includes_all()] {
            let from_reader = db_reader
                .get_compact_block(height, filter.clone())
                .await
                .unwrap();

            let pool_types = filter.to_pool_types_vector();
            let expected = compact_block_with_pool_types(unfiltered_expected.clone(), &pool_types);

            assert_eq!(expected, from_reader);
        }

        println!("CompactBlock at height {} OK", height.0);
    }
}

/// Streams compact blocks over the full test-vector height range under both pool
/// filters and checks that the stream:
///
/// - yields heights in strictly consecutive order starting at the first height,
/// - yields blocks equal to those returned by the single-block getter, and
/// - yields exactly one block per height in the inclusive range.
#[tokio::test(flavor = "multi_thread")]
async fn get_compact_block_stream() {
    use futures::StreamExt;

    init_tracing();

    let (TestVectorData { blocks, .. }, _db_dir, _zaino_db, db_reader) =
        load_vectors_v0db_and_reader().await;

    let first_height = Height(blocks.first().unwrap().height);
    let last_height = Height(blocks.last().unwrap().height);

    // Number of heights in the inclusive range `first_height..=last_height`.
    let heights_in_range: usize = (last_height
        .0
        .saturating_sub(first_height.0)
        .saturating_add(1)) as usize;

    for filter in [PoolTypeFilter::default(), PoolTypeFilter::includes_all()] {
        let stream = db_reader
            .get_compact_block_stream(first_height, last_height, filter.clone())
            .await
            .unwrap();
        futures::pin_mut!(stream);

        let mut next_height: u32 = first_height.0;
        let mut blocks_seen: usize = 0;

        while let Some(item) = stream.next().await {
            let streamed_block = item.unwrap();
            let streamed_height: u32 = u32::try_from(streamed_block.height).unwrap();

            // Heights must arrive in order with no gaps.
            assert_eq!(streamed_height, next_height);

            // The streamed block must match what the point lookup returns.
            let looked_up_block = db_reader
                .get_compact_block(Height(streamed_height), filter.clone())
                .await
                .unwrap();
            assert_eq!(looked_up_block, streamed_block);

            next_height = next_height.saturating_add(1);
            blocks_seen = blocks_seen.saturating_add(1);
        }

        assert_eq!(blocks_seen, heights_in_range);
        assert_eq!(next_height, last_height.0.saturating_add(1));
    }
}