rnacos 0.8.3

Nacos server re-implemented in Rust.
Documentation
use crate::common::constant::{CONFIG_TREE_NAME, NAMESPACE_TREE_NAME, USER_TREE_NAME};
use crate::config::core::{ConfigKey, ConfigValue};
use crate::config::model::ConfigValueDO;
use crate::namespace::model::NamespaceDO;
use crate::transfer::model::TransferRecordRef;
use crate::transfer::reader::{reader_transfer_record, TransferFileReader};
use crate::transfer::sqlite::dao::config::{ConfigDO, ConfigDao};
use crate::transfer::sqlite::dao::config_history::{ConfigHistoryDO, ConfigHistoryDao};
use crate::transfer::sqlite::dao::tenant::{TenantDO, TenantDao};
use crate::transfer::sqlite::dao::user::{UserDO, UserDao};
use crate::transfer::sqlite::TableSeq;
use crate::user::model::UserDo;
use rusqlite::Connection;

pub async fn data_to_sqlite(data_file: &str, db_path: &str) -> anyhow::Result<()> {
    let mut file_reader = TransferFileReader::new(data_file).await?;
    let conn = open_init_db(db_path)?;
    let mut config_count = 0;
    let mut tenant_count = 0;
    let mut user_count = 0;
    let mut ignore = 0;
    let mut table_seq = TableSeq::default();
    let config_dao = ConfigDao::new(&conn);
    let config_history_dao = ConfigHistoryDao::new(&conn);
    let user_dao = UserDao::new(&conn);
    let tenant_dao = TenantDao::new(&conn);
    while let Ok(Some(vec)) = file_reader.read_record_vec().await {
        let record = reader_transfer_record(&vec, &file_reader.header)?;
        if record.table_name.as_str() == CONFIG_TREE_NAME.as_str() {
            config_count += 1;
            insert_config(&mut table_seq, &config_dao, &config_history_dao, record)?;
        } else if record.table_name.as_str() == NAMESPACE_TREE_NAME.as_str() {
            tenant_count += 1;
            insert_namespace(&mut table_seq, &tenant_dao, record)?
        } else if record.table_name.as_str() == USER_TREE_NAME.as_str() {
            user_count += 1;
            insert_user(&mut table_seq, &user_dao, record)?
        } else {
            ignore += 1;
        }
    }
    log::info!(
        "transfer to sqlite db finished,config count:{},tenant count:{},use count:{},ignore count:{}",
        config_count,
        tenant_count,
        user_count,
        ignore
    );
    Ok(())
}

fn insert_config(
    table_seq: &mut TableSeq,
    config_dao: &ConfigDao<'_>,
    config_history_dao: &ConfigHistoryDao<'_>,
    record: TransferRecordRef<'_>,
) -> anyhow::Result<()> {
    let value_do = ConfigValueDO::from_bytes(&record.value)?;
    let key = String::from_utf8_lossy(&record.key).to_string();
    let key: ConfigKey = (&key as &str).into();
    let config_value: ConfigValue = value_do.into();
    let config_do = ConfigDO {
        id: Some(table_seq.next_config_id()),
        data_id: Some(key.data_id.clone()),
        group_id: Some(key.group.clone()),
        tenant_id: Some(key.tenant.clone()),
        content: Some(config_value.content.clone()),
        config_type: config_value.config_type.clone(),
        config_desc: config_value.desc,
        last_time: Some(config_value.last_modified),
    };
    config_dao.insert(&config_do)?;
    for history_item in config_value.histories {
        let history = ConfigHistoryDO {
            id: Some(table_seq.next_config_history_id()),
            data_id: Some(key.data_id.clone()),
            group_id: Some(key.group.clone()),
            tenant_id: Some(key.tenant.clone()),
            content: Some(history_item.content.clone()),
            config_type: None,
            config_desc: None,
            op_user: history_item.op_user.clone(),
            last_time: Some(history_item.modified_time),
        };
        config_history_dao.insert(&history)?;
    }
    Ok(())
}

fn insert_namespace(
    table_seq: &mut TableSeq,
    tenant_dao: &TenantDao<'_>,
    record: TransferRecordRef<'_>,
) -> anyhow::Result<()> {
    let value_do: NamespaceDO = NamespaceDO::from_bytes(&record.value)?;
    let tenant_do = TenantDO {
        id: Some(table_seq.next_tenant_id()),
        tenant_id: value_do.namespace_id,
        tenant_name: value_do.namespace_name,
        tenant_desc: None,
        create_flag: None,
    };
    tenant_dao.insert(&tenant_do)?;
    Ok(())
}

fn insert_user(
    table_seq: &mut TableSeq,
    user_dao: &UserDao<'_>,
    record: TransferRecordRef<'_>,
) -> anyhow::Result<()> {
    let value_do = UserDo::from_bytes(&record.value)?;
    let mut user_do: UserDO = value_do.into();
    user_do.id = Some(table_seq.next_user_id());
    user_dao.insert(&user_do)?;
    Ok(())
}

pub fn open_init_db(db_path: &str) -> anyhow::Result<Connection> {
    let conn = Connection::open(db_path)?;
    let create_table_sql = r"
create table if not exists tb_config(
    id integer primary key autoincrement,
    data_id text,
    group_id text,
    tenant_id text,
    content text,
    config_type text,
    config_desc text,
    last_time long
);
create index if not exists tb_config_key_idx on tb_config(data_id,group_id,tenant_id);

create table if not exists tb_config_history(
    id integer primary key autoincrement,
    data_id text,
    group_id text,
    tenant_id text,
    content text,
    config_type text,
    config_desc text,
    op_user text,
    last_time long
);
create index if not exists tb_config_history_key_idx on tb_config_history(data_id,group_id,tenant_id);

create table if not exists tb_tenant(
    id integer primary key autoincrement,
    tenant_id text,
    tenant_name text,
    tenant_desc text,
    create_flag integer
);

create table if not exists tb_user(
    id integer primary key autoincrement,
    username text,
    nickname text,
    password_hash text,
    gmt_create integer,
    gmt_modified integer,
    enabled text,
    roles text,
    extend_info text
);
        ";
    conn.execute_batch(create_table_sql)?;
    Ok(conn)
}