#![feature(fn_traits)]
#![feature(once_cell)]
#![feature(const_trait_impl)]
#![feature(unboxed_closures)]
#![feature(min_specialization)]
use std::ops::Deref;
use std::fmt::Debug;
use std::hash::Hash;
use std::cmp::Ordering as CmpOrdering;
use std::sync::{Arc,
atomic::{AtomicUsize, Ordering}};
use futures::{future::BoxFuture,
stream::BoxStream};
use bytes::{Buf, BufMut};
use log::warn;
use pi_bon::{WriteBuffer, ReadBuffer, Encode, Decode, ReadBonErr};
use pi_sinfo::EnumType;
use pi_async_rt::rt::{AsyncRuntime,
multi_thread::MultiTaskRuntime};
use pi_async_transaction::{AsyncCommitLog, TransactionError, ErrorLevel};
use pi_guid::Guid;
use pi_ordmap::asbtree::TreeByteSize;
pub mod db;
pub mod tables;
pub mod inspector;
pub mod utils;
/// Immutable byte buffer shared through an `Arc<Vec<u8>>`.
///
/// Cloning a `Binary` only bumps the reference count; the bytes are never copied.
/// `Send` and `Sync` are derived automatically from the `Arc<Vec<u8>>` field, so
/// no hand-written `unsafe impl` is needed.
#[derive(Debug, Hash)]
pub struct Binary(Arc<Vec<u8>>);
impl Clone for Binary {
fn clone(&self) -> Self {
Binary(self.0.clone())
}
}
impl AsRef<[u8]> for Binary {
    /// Borrows the whole buffer as a byte slice.
    fn as_ref(&self) -> &[u8] {
        &self.0[..]
    }
}
impl Deref for Binary {
    type Target = [u8];

    /// Dereferences to the underlying byte slice, so slice methods work directly on a `Binary`.
    fn deref(&self) -> &Self::Target {
        let bytes: &Vec<u8> = &self.0;
        bytes
    }
}
impl From<KVTableMeta> for Binary {
    /// Serializes table metadata into a `Binary`.
    ///
    /// Layout produced:
    /// 1. one byte: the table type discriminant,
    /// 2. one byte: `1` if the table is persistent, `0` otherwise,
    /// 3. the encoded key type, prefixed with its length as a little-endian `u16`,
    /// 4. the encoded value type, prefixed with its length as a little-endian `u16`.
    fn from(src: KVTableMeta) -> Self {
        let mut bytes = Vec::new();
        bytes.put_u8(src.table_type as u8);
        bytes.put_u8(if src.persistence { 1 } else { 0 });

        // The key type and the value type share the same length-prefixed encoding.
        for field_type in [&src.key, &src.value] {
            let mut encoded = WriteBuffer::new();
            field_type.encode(&mut encoded);
            // NOTE(review): the `as u16` cast silently truncates the length prefix if an
            // encoded type ever exceeds 65535 bytes — confirm this cannot happen.
            bytes.put_u16_le(encoded.len() as u16);
            bytes.put_slice(encoded.get_byte().as_slice());
        }

        Binary::new(bytes)
    }
}
impl Ord for Binary {
    /// Total ordering derived from `partial_cmp`.
    ///
    /// # Panics
    /// Panics if `partial_cmp` reports the two binaries as incomparable.
    fn cmp(&self, other: &Binary) -> CmpOrdering {
        // Build the panic message lazily: formatting both binaries on every
        // successful comparison would allocate for nothing.
        self.partial_cmp(other)
            .unwrap_or_else(|| panic!("Can't compare two binaries, {:?}, {:?}", self, other))
    }
}
impl PartialOrd for Binary {
    /// Compares two binaries by wrapping each byte buffer in a `ReadBuffer`
    /// (constructed with `0` as its second argument) and delegating to
    /// `ReadBuffer`'s own `partial_cmp`.
    fn partial_cmp(&self, other: &Binary) -> Option<CmpOrdering> {
        let lhs = ReadBuffer::new(self.0.as_slice(), 0);
        let rhs = ReadBuffer::new(other.0.as_slice(), 0);
        lhs.partial_cmp(&rhs)
    }
}
impl Eq for Binary {}

impl PartialEq for Binary {
    /// Two binaries are equal exactly when `partial_cmp` reports `Equal`;
    /// incomparable binaries are treated as unequal.
    fn eq(&self, other: &Binary) -> bool {
        matches!(self.partial_cmp(other), Some(CmpOrdering::Equal))
    }
}
impl Default for Binary {
fn default() -> Self {
Binary(Arc::new(Vec::default()))
}
}
impl TreeByteSize for Binary {
    /// Reports the number of bytes held by this binary.
    fn tree_bytes_size(&self) -> u64 {
        let byte_len: usize = self.0.len();
        byte_len as u64
    }
}
impl Binary {
pub fn new(bin: Vec<u8>) -> Self {
Binary(Arc::new(bin))
}
pub fn binary_equal(this: &Self, other: &Self) -> bool {
Arc::ptr_eq(&this.0, &other.0)
}
pub fn from_shared(shared: Arc<Vec<u8>>) -> Self {
Binary(shared)
}
pub fn from_slice<B: AsRef<[u8]>>(slice: B) -> Self {
Binary(Arc::new(Vec::from(slice.as_ref())))
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn to_shared(&self) -> Arc<Vec<u8>> {
self.0.clone()
}
}
pub trait KVAction: Send + Sync + 'static {
type Key: AsRef<[u8]> + Deref<Target = [u8]> + Hash + PartialEq + Eq + PartialOrd + Ord + Clone + Send + 'static;
type Value: AsRef<[u8]> + Deref<Target = [u8]> + Default + Clone + Send + 'static;
type Error: Debug + 'static;
fn dirty_query(&self, key: <Self as KVAction>::Key)
-> BoxFuture<Option<<Self as KVAction>::Value>>;
fn query(&self, key: <Self as KVAction>::Key)
-> BoxFuture<Option<<Self as KVAction>::Value>>;
fn dirty_upsert(&self,
key: <Self as KVAction>::Key,
value: <Self as KVAction>::Value)
-> BoxFuture<Result<(), <Self as KVAction>::Error>>;
fn upsert(&self,
key: <Self as KVAction>::Key,
value: <Self as KVAction>::Value)
-> BoxFuture<Result<(), <Self as KVAction>::Error>>;
fn dirty_delete(&self, key: <Self as KVAction>::Key)
-> BoxFuture<Result<Option<<Self as KVAction>::Value>, <Self as KVAction>::Error>>;
fn delete(&self, key: <Self as KVAction>::Key)
-> BoxFuture<Result<Option<<Self as KVAction>::Value>, <Self as KVAction>::Error>>;
fn keys<'a>(&self,
key: Option<<Self as KVAction>::Key>,
descending: bool)
-> BoxStream<'a, <Self as KVAction>::Key>;
fn values<'a>(&self,
key: Option<<Self as KVAction>::Key>,
descending: bool)
-> BoxStream<'a, (<Self as KVAction>::Key, <Self as KVAction>::Value)>;
fn lock_key(&self, key: <Self as KVAction>::Key)
-> BoxFuture<Result<(), <Self as KVAction>::Error>>;
fn unlock_key(&self, key: <Self as KVAction>::Key)
-> BoxFuture<Result<(), <Self as KVAction>::Error>>;
}
/// Kind of table stored in the database.
///
/// The discriminants are the one-byte tags used when table metadata is serialized.
#[derive(Debug, Clone, PartialEq)]
pub enum KVDBTableType {
    /// Tag `1`.
    MemOrdTab = 1,
    /// Tag `2`.
    LogOrdTab = 2,
    /// Tag `3`.
    LogWTab = 3,
    /// Tag `4`.
    BtreeOrdTab = 4,
}
impl From<u8> for KVDBTableType {
fn from(src: u8) -> Self {
match src {
1 => KVDBTableType::MemOrdTab,
2 => KVDBTableType::LogOrdTab,
3 => KVDBTableType::LogWTab,
4 => KVDBTableType::BtreeOrdTab,
_ => panic!("From u8 to KVDBTableType failed, src: {}, reason: invalid src", src),
}
}
}
/// Metadata describing one key-value table.
#[derive(Debug, Clone, PartialEq)]
pub struct KVTableMeta {
    /// Kind of table.
    table_type: KVDBTableType,
    /// Whether the table is persistent.
    persistence: bool,
    /// Type description of the table's keys.
    key: EnumType,
    /// Type description of the table's values.
    value: EnumType,
}
impl From<Binary> for KVTableMeta {
    /// Decodes table metadata from its serialized form.
    ///
    /// Expected layout:
    /// 1. one byte: table type tag,
    /// 2. one byte: persistence flag (`0` is `false`, anything else is `true`),
    /// 3. little-endian `u16` length followed by the encoded key type,
    /// 4. little-endian `u16` length followed by the encoded value type.
    ///
    /// # Panics
    /// Panics if the input is shorter than the layout requires, if the table type
    /// tag is unknown, or if either type description fails to decode.
    fn from(src: Binary) -> Self {
        // `&[u8]` is a `Buf`: every `get_*`/`advance` call moves the slice forward,
        // so `buf` always starts at the next unread byte.
        let mut buf = src.as_ref();

        let table_type = KVDBTableType::from(buf.get_u8());
        let persistence = buf.get_u8() != 0;

        let key_len = buf.get_u16_le() as usize;
        let mut read_buffer = ReadBuffer::new(&buf[0..key_len], 0);
        buf.advance(key_len);
        let key = EnumType::decode(&mut read_buffer)
            .expect("Decode key type of KVTableMeta failed");

        // The value type is the last field, so the cursor does not need to move past it.
        let value_len = buf.get_u16_le() as usize;
        let mut read_buffer = ReadBuffer::new(&buf[0..value_len], 0);
        let value = EnumType::decode(&mut read_buffer)
            .expect("Decode value type of KVTableMeta failed");

        KVTableMeta {
            table_type,
            persistence,
            key,
            value,
        }
    }
}
impl KVTableMeta {
    /// Builds table metadata from its four parts.
    pub fn new(table_type: KVDBTableType,
               persistence: bool,
               key: EnumType,
               value: EnumType) -> Self {
        Self { table_type, persistence, key, value }
    }

    /// Builds table metadata from `bin`, which holds the encoded key type
    /// immediately followed by the encoded value type (no length prefixes).
    ///
    /// # Errors
    /// Returns the decode error if either type description cannot be read.
    pub fn with_compatibled(table_type: KVDBTableType,
                            persistence: bool,
                            bin: &[u8]) -> Result<Self, ReadBonErr> {
        let mut reader = ReadBuffer::new(bin, 0);
        let key_type = EnumType::decode(&mut reader)?;
        let value_type = EnumType::decode(&mut reader)?;

        Ok(KVTableMeta {
            table_type,
            persistence,
            key: key_type,
            value: value_type,
        })
    }

    /// Kind of table.
    pub fn table_type(&self) -> &KVDBTableType {
        &self.table_type
    }

    /// Whether the table is persistent.
    pub fn is_persistence(&self) -> bool {
        self.persistence
    }

    /// Type description of the keys.
    pub fn key_type(&self) -> &EnumType {
        &self.key
    }

    /// Type description of the values.
    pub fn value_type(&self) -> &EnumType {
        &self.value
    }
}
/// Quality-of-service level of a table transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TableTrQos {
    /// Discriminant `0`.
    Unsafe = 0,
    /// Discriminant `1`.
    ThreadSafe = 1,
    /// Discriminant `2`.
    Safe = 2,
}
impl Default for TableTrQos {
fn default() -> Self {
TableTrQos::Safe
}
}
/// Record of one action performed on a key.
#[derive(Debug, Clone)]
pub enum KVActionLog {
    /// The key was read.
    Read,
    /// The key was written; carries an optional binary.
    /// NOTE(review): presumably `None` stands for a delete — confirm against the callers.
    Write(Option<Binary>),
    /// The key was written by a dirty write; carries an optional binary.
    DirtyWrite(Option<Binary>),
}
impl KVActionLog {
    /// Returns `true` only for the `DirtyWrite` variant.
    #[inline]
    pub fn is_dirty_writed(&self) -> bool {
        matches!(self, KVActionLog::DirtyWrite(_))
    }
}
/// Callable commit confirmer shared by the tables taking part in one transaction.
///
/// The shared tuple holds, in order:
/// 0. the runtime on which the final confirmation is spawned,
/// 1. the commit logger,
/// 2. the transaction uid this confirmer belongs to,
/// 3. the commit uid this confirmer belongs to, if any,
/// 4. the number of confirmations still expected.
#[derive(Clone)]
pub struct KVDBCommitConfirm<C, Log>(
    Arc<(
        MultiTaskRuntime<()>,
        Log,
        Guid,
        Option<Guid>,
        AtomicUsize,
    )>,
)
where
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>;

// SAFETY: NOTE(review): asserts that the shared tuple may be moved to another thread
// for any `Log`; this cannot be verified from this file — confirm against the
// `AsyncCommitLog` implementors.
unsafe impl<C, Log> Send for KVDBCommitConfirm<C, Log>
where
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>,
{}

// SAFETY: NOTE(review): asserts that the shared tuple may be accessed from several
// threads at once for any `Log`; this cannot be verified from this file — confirm
// against the `AsyncCommitLog` implementors.
unsafe impl<C, Log> Sync for KVDBCommitConfirm<C, Log>
where
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>,
{}
impl<C, Log> FnOnce<(Guid, Guid, Result<(), KVTableTrError>)> for KVDBCommitConfirm<C, Log>
where
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>,
{
    type Output = Result<(), KVTableTrError>;

    /// Consuming call; forwards to the shared-reference `Fn::call` implementation.
    extern "rust-call" fn call_once(self, args: (Guid, Guid, Result<(), KVTableTrError>))
                                    -> Self::Output {
        Fn::call(&self, args)
    }
}
impl<C, Log> FnMut<(Guid, Guid, Result<(), KVTableTrError>)> for KVDBCommitConfirm<C, Log>
where
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>,
{
    /// Mutable call; forwards to the shared-reference `Fn::call` implementation.
    extern "rust-call" fn call_mut(&mut self, args: (Guid, Guid, Result<(), KVTableTrError>))
                                   -> Self::Output {
        Fn::call(&*self, args)
    }
}
impl<C, Log> Fn<(Guid, Guid, Result<(), KVTableTrError>)> for KVDBCommitConfirm<C, Log>
where
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>,
{
    /// Handles one table's commit result, given as `(transaction uid, commit uid, result)`.
    ///
    /// A fatal error is handed straight back to the caller. Any other outcome —
    /// success or a non-fatal error — counts as a confirmation for this transaction.
    extern "rust-call" fn call(&self, args: (Guid, Guid, Result<(), KVTableTrError>))
                               -> Self::Output {
        let (tid, cid, outcome) = args;
        match outcome {
            Err(e) if matches!(e.level(), ErrorLevel::Fatal) => Err(e),
            _ => self.confirm_commited(tid, cid),
        }
    }
}
impl<
    C: Clone + Send + 'static,
    Log: AsyncCommitLog<C = C, Cid = Guid>,
> KVDBCommitConfirm<C, Log> {
    /// Creates a confirmer for transaction `tid` and commit `cid`.
    ///
    /// `count` is the number of confirmations expected before the commit log
    /// entry is confirmed on `rt` through `commit_logger`.
    pub fn new(rt: MultiTaskRuntime<()>,
               commit_logger: Log,
               tid: Guid,
               cid: Option<Guid>,
               count: usize) -> Self {
        KVDBCommitConfirm(Arc::new((
            rt,
            commit_logger,
            tid,
            cid,
            AtomicUsize::new(count),
        )))
    }

    /// Returns the commit logger used for the final confirmation.
    pub fn commit_logger(&self) -> &Log {
        &(self.0).1
    }

    /// Records one confirmation for `(tid, cid)`.
    ///
    /// When the last expected confirmation arrives, a task is spawned on the
    /// runtime to confirm the commit log entry; a failure of that confirmation
    /// is only logged as a warning.
    ///
    /// # Errors
    /// Returns a `Normal` level error when `tid` or `cid` does not match the ids
    /// this confirmer was created with. This includes the case where the
    /// confirmer was created without a commit uid, which previously panicked.
    #[inline(always)]
    fn confirm_commited(&self, tid: Guid, cid: Guid) -> Result<(), KVTableTrError> {
        // Compare through `Option<&Guid>` so a missing commit uid is reported as a
        // mismatch instead of unwrapping `None`.
        if (self.0).2 != tid || (self.0).3.as_ref() != Some(&cid) {
            return Err(KVTableTrError::new_transaction_error(ErrorLevel::Normal,
                                                             format!("Confirm commited failed, require_transaction_uid: {:?}, require_commit_uid: {:?}, transaction_uid: {:?}, commit_uid: {:?}, reason: invalid transaction_uid or commit_uid", (self.0).2, (self.0).3, tid, cid)));
        }

        // `fetch_sub` returns the value before the decrement, so `<= 1` means this
        // call consumed the last expected confirmation.
        // NOTE(review): a call made after the counter already reached 0 also passes
        // this test once (previous value 0) — confirm callers never over-confirm.
        if (self.0).4.fetch_sub(1, Ordering::SeqCst) <= 1 {
            let confirmer = self.clone();
            let _ = (self.0).0.spawn(async move {
                let _ = COMMITED_LEN.fetch_add(1, Ordering::Relaxed);

                if let Err(e) = (confirmer.0)
                    .1
                    .confirm(cid.clone())
                    .await {
                    warn!("Confirm commit log failed, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tid, cid, e);
                }

                #[cfg(feature = "log_table_debug")]
                {
                    let event = TransactionDebugEvent::End(tid.clone(), cid.clone());
                    let logger = transaction_debug_logger();
                    logger.log(event);
                }
            });
        }

        Ok(())
    }
}
/// Process-wide counter, incremented (with relaxed ordering) each time a
/// commit-confirmation task spawned by `KVDBCommitConfirm::confirm_commited` runs.
static COMMITED_LEN: AtomicUsize = AtomicUsize::new(0);
/// Error produced by a table transaction.
#[derive(Debug)]
pub enum KVTableTrError {
    /// General error: a severity level and a human-readable reason.
    Common(ErrorLevel, String),
    /// Conflict error: the table and the key involved.
    Conflicts(Atom, Binary),
}

// SAFETY: NOTE(review): asserts that every payload (`ErrorLevel`, `String`, `Atom`,
// `Binary`) may be sent to another thread; `ErrorLevel` and `Atom` are defined
// outside this file — confirm.
unsafe impl Send for KVTableTrError {}

// SAFETY: NOTE(review): asserts that every payload may be shared between threads;
// `ErrorLevel` and `Atom` are defined outside this file — confirm.
unsafe impl Sync for KVTableTrError {}
impl TransactionError for KVTableTrError {
    /// Builds a `Common` error whose message embeds the `Debug` rendering of `reason`.
    fn new_transaction_error<E>(level: ErrorLevel, reason: E) -> Self
    where
        E: Debug + Sized + 'static,
    {
        let message = format!("Table transaction error, reason: {:?}", reason);
        Self::Common(level, message)
    }
}
impl KVTableTrError {
    /// Builds a conflict error for `key` in `table`.
    ///
    /// Both arguments are already owned, so they are stored as-is instead of
    /// re-creating the atom and deep-copying the key bytes.
    pub fn new_conflicts_error(table: Atom, key: Binary) -> Self {
        KVTableTrError::Conflicts(table, key)
    }

    /// Returns `true` for the `Common` variant.
    pub fn is_common(&self) -> bool {
        matches!(self, Self::Common(_, _))
    }

    /// Returns `true` for the `Conflicts` variant.
    pub fn is_conflicts(&self) -> bool {
        matches!(self, Self::Conflicts(_, _))
    }

    /// Severity of the error; conflict errors always report `ErrorLevel::Normal`.
    pub fn level(&self) -> ErrorLevel {
        match self {
            Self::Common(level, _) => level.clone(),
            Self::Conflicts(_, _) => ErrorLevel::Normal,
        }
    }

    /// Returns the table and key of a conflict error, or `None` for a common error.
    pub fn conflicts(&self) -> Option<(&Atom, &Binary)> {
        match self {
            Self::Conflicts(table, binary) => Some((table, binary)),
            Self::Common(_, _) => None,
        }
    }
}
use std::sync::OnceLock;
/// Global transaction debug logger; set at most once by
/// `init_transaction_debug_logger` and read by `transaction_debug_logger`.
static TRANSACTION_DEBUG_LOGGER: OnceLock<TransactionDebugLogger> = OnceLock::new();
/// Initializes the global transaction debug logger and starts its background loop.
///
/// `path` is where the log file is opened; `interval` and `timeout` are passed to
/// `TransactionDebugLogger::startup`. Only the first call has any effect: later
/// calls return immediately, without opening the log file a second time.
pub fn init_transaction_debug_logger<P: AsRef<Path>>(rt: MultiTaskRuntime<()>,
                                                     path: P,
                                                     interval: usize,
                                                     timeout: usize) {
    // Constructing a logger opens the log file and blocks until that is done, so
    // skip it entirely when a logger is already installed.
    if TRANSACTION_DEBUG_LOGGER.get().is_some() {
        return;
    }

    let logger = TransactionDebugLogger::new(rt, path);
    // `set` can still lose a race against a concurrent initializer; only the
    // winner starts the background loop.
    if TRANSACTION_DEBUG_LOGGER.set(logger.clone()).is_ok() {
        logger.startup(interval, timeout);
    }
}
/// Returns the global transaction debug logger.
///
/// # Panics
/// Panics if `init_transaction_debug_logger` has not been called yet.
pub fn transaction_debug_logger<'a>() -> &'a TransactionDebugLogger {
    TRANSACTION_DEBUG_LOGGER
        .get()
        .expect("Transaction debug logger is not initialized, call init_transaction_debug_logger first")
}
use pi_atom::Atom;
use pi_async_transaction::manager_2pc::Transaction2PcStatus;
/// Event recorded by the transaction debug logger.
pub enum TransactionDebugEvent {
    /// A transaction began:
    /// `(transaction uid, status, writable, require_persistence, output_size)`.
    Begin(Guid, Transaction2PcStatus, bool, bool, usize),
    /// A table committed:
    /// `(transaction uid, commit uid, status, table, actions_len, log_index)`.
    Commit(Guid, Guid, Transaction2PcStatus, Atom, usize, usize),
    /// A table commit was confirmed:
    /// `(transaction uid, commit uid, table, writable, require_persistence)`.
    CommitConfirm(Guid, Guid, Atom, bool, bool),
    /// A transaction ended: `(transaction uid, commit uid)`.
    End(Guid, Guid),
}
use std::path::Path;
use std::time::Instant;
use crossbeam_channel::{Sender, Receiver, unbounded, bounded};
use dashmap::{DashMap, mapref::entry::Entry};
use pi_store::log_store::log_file::{LogFile, LogMethod};
/// Cheaply clonable handle to the shared transaction debug logger state.
pub struct TransactionDebugLogger(Arc<InnerTransactionDebugLogger>);
// SAFETY: NOTE(review): these impls assert that the inner state (runtime handle,
// channel endpoints, `DashMap` and `LogFile`) may be sent to and shared between
// threads; that cannot be verified from this file — confirm against the field types.
unsafe impl Send for TransactionDebugLogger {}
unsafe impl Sync for TransactionDebugLogger {}
impl Clone for TransactionDebugLogger {
fn clone(&self) -> Self {
TransactionDebugLogger(self.0.clone())
}
}
impl TransactionDebugLogger {
pub fn new<P: AsRef<Path>>(rt: MultiTaskRuntime<()>,
path: P) -> Self {
let (sender, receiver) = unbounded();
let times = DashMap::new();
let rt_copy = rt.clone();
let (s, r) = bounded(1);
let path = path.as_ref().to_path_buf();
rt.spawn(async move {
let log = LogFile
::open(rt_copy.clone(),
path,
8096,
128 * 1024 * 1024,
None)
.await
.unwrap();
s.send(log);
});
let log = r.recv().unwrap();
let inner = InnerTransactionDebugLogger {
rt,
sender,
receiver,
times,
log,
};
TransactionDebugLogger(Arc::new(inner))
}
pub fn log(&self, event: TransactionDebugEvent) {
self.0
.sender
.send(event);
}
pub fn startup(self,
mut interval: usize,
mut timeout: usize) {
if interval < 1000 {
interval = 1000;
}
if timeout < 1000 {
timeout = 1000;
}
let logger = self.clone();
self.0.rt.spawn(async move {
loop {
let events: Vec<TransactionDebugEvent> = logger.0.receiver.try_iter().collect();
let mut log_id = 0;
for event in events {
match event {
TransactionDebugEvent::Begin(tid, status, writable, require_persistence, output_size) => {
match logger.0.times.entry(tid.clone()) {
Entry::Occupied(o) => {
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("Transaction id conflict:\n\tstatus: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n\toutput_size: {:?}\n",
status,
writable,
require_persistence,
output_size).as_bytes());
},
Entry::Vacant(v) => {
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("Begin transaction:\n\tstatus: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n\toutput_size: {:?}\n",
status,
writable,
require_persistence,
output_size).as_bytes());
v.insert(Instant::now());
},
}
},
TransactionDebugEvent::Commit(tid, cid, status, table, actions_len, log_index) => {
if let Some(item) = logger.0.times.get(&tid) {
let time = item.value().elapsed();
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("Commit transaction successed:\n\tlog_index: {:?}\n\tcid: {:?}\n\tstatus: {:?}\n\ttable: {:?}\n\tactions_len: {:?}\n\ttime: {:?}\n",
log_index,
cid,
status,
table.as_str(),
actions_len,
time).as_bytes());
} else {
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("Commit transaction failed, transaction not exist:\n\tlog_index: {:?}\n\tcid: {:?}\n\tstatus: {:?}\n\ttable: {:?}\n\tactions_len: {:?}\n",
log_index,
cid,
status,
table.as_str(),
actions_len).as_bytes());
}
},
TransactionDebugEvent::CommitConfirm(tid, cid, table, writable, require_persistence) => {
if let Some(item) = logger.0.times.get(&tid) {
let time = item.value().elapsed();
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("Commit confirm transaction successed:\n\tcid: {:?}\n\ttable: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n\ttime: {:?}\n",
cid,
table.as_str(),
writable,
require_persistence,
time).as_bytes());
} else {
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("Commit confirm transaction failed, transaction not exist:\n\tcid: {:?}\n\ttable: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n",
cid,
table.as_str(),
writable,
require_persistence).as_bytes());
}
},
TransactionDebugEvent::End(tid, cid) => {
if let Some((_tid, now)) = logger.0.times.remove(&tid) {
let time = now.elapsed();
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("End transaction:\n\tcid: {:?}\n\ttime: {:?}\n",
cid,
time).as_bytes());
} else {
log_id = logger
.0
.log
.append(LogMethod::PlainAppend,
format!("{:?}", tid).as_bytes(),
format!("End transaction failed, transaction not exist:\n\tcid: {:?}\n",
cid).as_bytes());
}
},
}
}
let _ = logger
.0
.log
.commit(log_id,
false,
false,
Some(timeout)).await;
logger
.0
.rt
.timeout(interval)
.await;
}
});
}
}
/// Shared state behind a `TransactionDebugLogger` handle.
struct InnerTransactionDebugLogger {
    /// Runtime used to spawn the background loop and to wait between rounds.
    rt: MultiTaskRuntime<()>,
    /// Producer side of the event queue.
    sender: Sender<TransactionDebugEvent>,
    /// Consumer side of the event queue, drained by the background loop.
    receiver: Receiver<TransactionDebugEvent>,
    /// Start time of every transaction that has begun and not yet ended.
    times: DashMap<Guid, Instant>,
    /// Log file the events are appended to.
    log: LogFile,
}