avalanche-types 0.1.5

Avalanche primitive types in Rust
Documentation
mod batch;
mod concurrency;
mod iterator;

use std::{io::ErrorKind, time::Duration};

pub use super::serve_test_database;
use avalanche_types::subnet::rpc::database::{
    corruptabledb::Database as CorruptableDb,
    manager::{versioned_database::VersionedDatabase, DatabaseManager},
    memdb::Database as MemDb,
    rpcdb::{client::DatabaseClient, server::Server as RpcDb},
};
use semver::Version;
use tokio::net::TcpListener;
use tonic::transport::Channel;

#[tokio::test]
async fn rpcdb_mutation_test() {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init();

    let bar_value = "bar".as_bytes().to_vec();
    let baz_value = "baz".as_bytes().to_vec();

    let db = MemDb::new_boxed();
    let server = RpcDb::new_boxed(db);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        serve_test_database(server, listener).await.unwrap();
    });
    tokio::time::sleep(Duration::from_millis(100)).await;

    let client_conn = Channel::builder(format!("http://{}", addr).parse().unwrap())
        .connect()
        .await
        .unwrap();

    let mut client = DatabaseClient::new_boxed(client_conn);

    log::info!("put foo:bar");
    let resp = client.put("foo".as_bytes(), "bar".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:bar");
    let resp = client.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, bar_value.clone());

    // verify valid response from cloning client
    let mut client = client.clone();

    log::info!("put foo:baz");
    let resp = client.put("foo".as_bytes(), "baz".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:baz");
    let resp = client.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, baz_value.clone());

    log::info!("has foo true");
    let resp = client.has("foo".as_bytes()).await;
    assert!(resp.is_ok());
    assert!(resp.unwrap());

    log::info!("get fool error not found");
    let resp = client.get("fool".as_bytes()).await;
    assert!(resp.is_err());
    assert_eq!(resp.unwrap_err().kind(), ErrorKind::NotFound);

    log::info!("has fool false");
    let resp = client.has("fool".as_bytes()).await;
    assert!(resp.is_ok());
    assert!(!resp.unwrap());

    log::info!("close client");
    let resp = client.close().await;
    assert!(resp.is_ok());

    log::info!("get foo error closed");
    let resp = client.get("foo".as_bytes()).await;
    assert!(resp.is_err());
    assert!(resp.unwrap_err().to_string().contains("database closed"));
}

#[tokio::test]
async fn corruptibledb_mutation_test() {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init();

    let bar_value = "bar".as_bytes().to_vec();

    let memdb = MemDb::new_boxed();
    let mut corruptible = CorruptableDb::new_boxed(memdb);

    log::info!("put foo:bar");
    let resp = corruptible.put("foo".as_bytes(), "bar".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:bar");
    let resp = corruptible.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, bar_value.clone());

    log::info!("put foo:baz");
    let resp = corruptible.put("foo".as_bytes(), "baz".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("has foo true");
    let resp = corruptible.has("foo".as_bytes()).await;
    assert!(resp.is_ok());
    assert!(resp.unwrap());

    log::info!("get fool error not found");
    let resp = corruptible.get("fool".as_bytes()).await;
    assert!(resp.is_err());
    assert_eq!(resp.unwrap_err().kind(), ErrorKind::NotFound);

    log::info!("has fool false");
    let resp = corruptible.has("fool".as_bytes()).await;
    assert!(resp.is_ok());
    assert!(!resp.unwrap());

    log::info!("close client");
    let resp = corruptible.close().await;
    assert!(resp.is_ok());

    log::info!("get foo error closed");
    let resp = corruptible.put("foo".as_bytes(), "baz".as_bytes()).await;
    assert!(resp.is_err());
    assert!(resp.unwrap_err().to_string().contains("database closed"));
}

#[tokio::test]
async fn test_rpcdb_corruptible() {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init();

    let bar_value = "bar".as_bytes().to_vec();
    let baz_value = "baz".as_bytes().to_vec();

    let memdb = MemDb::new_boxed();
    let rpc_server = RpcDb::new_boxed(memdb);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        serve_test_database(rpc_server, listener).await.unwrap();
    });
    tokio::time::sleep(Duration::from_millis(100)).await;

    let client_conn = Channel::builder(format!("http://{}", addr).parse().unwrap())
        .connect()
        .await
        .unwrap();

    let db = DatabaseClient::new_boxed(client_conn);
    let mut client = CorruptableDb::new_boxed(db);

    log::info!("put foo:bar");
    let resp = client.put("foo".as_bytes(), "bar".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:bar");
    let resp = client.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, bar_value.clone());

    // verify valid response from cloning client
    let mut client = client.clone();

    log::info!("put foo:baz");
    let resp = client.put("foo".as_bytes(), "baz".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:baz");
    let resp = client.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, baz_value.clone());

    log::info!("has foo true");
    let resp = client.has("foo".as_bytes()).await;
    assert!(resp.is_ok());
    assert!(resp.unwrap());

    log::info!("get fool error not found");
    let resp = client.get("fool".as_bytes()).await;
    assert!(resp.is_err());
    assert_eq!(resp.unwrap_err().kind(), ErrorKind::NotFound);

    log::info!("has fool false");
    let resp = client.has("fool".as_bytes()).await;
    assert!(resp.is_ok());
    assert!(!resp.unwrap());

    log::info!("close client");
    let resp = client.close().await;
    assert!(resp.is_ok());

    log::info!("get foo error closed");
    let resp = client.get("foo".as_bytes()).await;
    assert!(resp.is_err());
    assert!(resp.unwrap_err().to_string().contains("database closed"));
}

#[tokio::test]
async fn test_db_manager() {
    use avalanche_types::subnet::rpc::database::manager::Manager;
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .is_test(true)
        .try_init();

    let bar_value = "bar".as_bytes().to_vec();
    let baz_value = "baz".as_bytes().to_vec();

    let memdb = MemDb::new_boxed();
    let rpc_server = RpcDb::new_boxed(memdb);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        serve_test_database(rpc_server, listener).await.unwrap();
    });
    tokio::time::sleep(Duration::from_millis(100)).await;

    let client_conn = Channel::builder(format!("http://{}", addr).parse().unwrap())
        .connect()
        .await
        .unwrap();

    let vdb = VersionedDatabase::new(
        DatabaseClient::new_boxed(client_conn),
        Version::new(0, 0, 1),
    );

    let databases: Vec<VersionedDatabase> = vec![vdb];

    let manager = DatabaseManager::from_databases(databases);
    let current = manager.current().await.unwrap();

    let mut client = current.db;

    log::info!("put foo:bar");
    let resp = client.put("foo".as_bytes(), "bar".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:bar");
    let resp = client.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, bar_value.clone());

    // verify valid response from cloning client
    let mut client = client.clone();

    log::info!("put foo:baz");
    let resp = client.put("foo".as_bytes(), "baz".as_bytes()).await;
    assert!(resp.is_ok());

    log::info!("get foo:baz");
    let resp = client.get("foo".as_bytes()).await;
    let value = resp.unwrap();
    assert_eq!(value, baz_value.clone());

    log::info!("close all db with manager");
    let _ = manager.close().await;

    log::info!("get foo error closed");
    let resp = client.get("foo".as_bytes()).await;
    assert!(resp.is_err());
    assert!(resp.unwrap_err().to_string().contains("database closed"));
}