use std::fmt::Debug;
use std::path::Path;
use bytes::{Buf, BufMut};
use futures::future::BoxFuture;
use pi_async_transaction::{
AsyncTransaction, SequenceTransaction, Transaction2Pc, TransactionTree, UnitTransaction,
};
use pi_atom::Atom;
use crate::{Binary, KVAction};
pub mod log_ord_table;
pub mod log_write_table;
pub mod mem_ord_table;
pub mod meta_table;
pub mod b_tree_ord_table;
const DEFAULT_DB_TABLE_NAME_MAX_LEN: usize = 0xffff;
pub trait KVTable: Send + Sync + 'static {
type Name: AsRef<str> + Debug + Clone + Send + 'static;
type Tr: KVAction
+ TransactionTree
+ SequenceTransaction
+ UnitTransaction
+ Transaction2Pc
+ AsyncTransaction;
type Error: Debug + Send + 'static;
fn name(&self) -> <Self as KVTable>::Name;
fn path(&self) -> Option<&Path>;
fn is_persistent(&self) -> bool;
fn is_ordered(&self) -> bool;
fn len(&self) -> usize;
fn size(&self) -> u64;
fn transaction(
&self,
source: Atom,
is_writable: bool,
is_persistent: bool,
prepare_timeout: u64,
commit_timeout: u64,
) -> Self::Tr;
fn ready_collect(&self) -> BoxFuture<Result<(), Self::Error>>;
fn collect(&self) -> BoxFuture<Result<(), Self::Error>>;
fn init_table_prepare_output(
&self,
prepare_output: &mut <<Self as KVTable>::Tr as Transaction2Pc>::PrepareOutput,
writed_len: u64,
) {
let table_name = self.name().as_ref().to_string();
let bytes = table_name.as_bytes();
let bytes_len = bytes.len();
if bytes_len == 0 || bytes_len > DEFAULT_DB_TABLE_NAME_MAX_LEN {
panic!("Init table prepare output failed, table_name: {:?}, reason: invalid table name length", table_name.as_str());
}
prepare_output.put_u16_le(bytes_len as u16); prepare_output.put_slice(bytes); prepare_output.put_u64_le(writed_len); }
fn append_key_value_to_table_prepare_output(
&self,
prepare_output: &mut <<Self as KVTable>::Tr as Transaction2Pc>::PrepareOutput,
key: &<<Self as KVTable>::Tr as KVAction>::Key,
value: Option<&<<Self as KVTable>::Tr as KVAction>::Value>,
) {
let bytes: &[u8] = key.as_ref();
prepare_output.put_u16_le(bytes.len() as u16); prepare_output.put_slice(bytes);
if let Some(value) = value {
let bytes: &[u8] = value.as_ref();
prepare_output.put_u32_le(bytes.len() as u32); prepare_output.put_slice(bytes); } else {
prepare_output.put_u32_le(0); }
}
fn get_init_table_prepare_output(
prepare_output: &<<Self as KVTable>::Tr as Transaction2Pc>::PrepareOutput,
mut offset: usize,
) -> (Atom, u64, usize) {
let mut bytes: &[u8] = prepare_output.as_ref();
bytes.advance(offset);
let table_name_len = bytes.get_u16_le() as usize; offset += 2;
let table_name_string = String::from_utf8(bytes[0..table_name_len].to_vec()).unwrap();
let table_name = Atom::from(table_name_string); bytes.advance(table_name_len); offset += table_name_len;
let kvs_len = bytes.get_u64_le(); offset += 8;
(table_name, kvs_len, offset)
}
fn get_all_key_value_from_table_prepare_output(
prepare_output: &<<Self as KVTable>::Tr as Transaction2Pc>::PrepareOutput,
table: &Atom,
kvs_len: u64,
mut offset: usize,
) -> (Vec<TableKV>, usize) {
let mut bytes: &[u8] = prepare_output.as_ref();
bytes.advance(offset);
let mut tkvs = Vec::with_capacity(kvs_len as usize);
for _index in 0..kvs_len {
let key_len = bytes.get_u16_le() as usize; offset += 2;
let key = Binary::from_slice(&bytes[0..key_len]); bytes.advance(key_len); offset += key_len;
let value_len = bytes.get_u32_le() as usize; offset += 4;
if value_len > 0 {
let value = Binary::from_slice(&bytes[0..value_len]); bytes.advance(value_len); offset += value_len;
tkvs.push(TableKV::new(table.clone(), key, Some(value)));
} else {
tkvs.push(TableKV::new(table.clone(), key, None));
}
}
(tkvs, offset)
}
}
#[derive(Debug, Clone)]
pub struct TableKV {
pub table: Atom, pub key: Binary, pub value: Option<Binary>, }
unsafe impl Send for TableKV {}
unsafe impl Sync for TableKV {}
impl TableKV {
pub fn new(table: Atom, key: Binary, value: Option<Binary>) -> Self {
TableKV { table, key, value }
}
pub fn exist_value(&self) -> bool {
self.value.is_some()
}
}