#![feature(fn_traits)]
#![feature(once_cell)]
#![feature(const_trait_impl)]
#![feature(unboxed_closures)]
#![feature(min_specialization)]
use std::cmp::Ordering as CmpOrdering;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use bytes::{Buf, BufMut};
use futures::{future::BoxFuture, stream::BoxStream};
use log::warn;
use pi_async_rt::rt::{multi_thread::MultiTaskRuntime, AsyncRuntime};
use pi_async_transaction::{AsyncCommitLog, ErrorLevel, TransactionError};
use pi_bon::{Decode, Encode, ReadBonErr, ReadBuffer, WriteBuffer};
use pi_guid::Guid;
use pi_ordmap::asbtree::TreeByteSize;
use pi_sinfo::EnumType;
pub mod db;
pub mod inspector;
pub mod tables;
pub mod utils;
/// Immutable, cheaply clonable byte buffer backed by a shared `Arc<Vec<u8>>`.
/// Used as the key and value representation throughout the KV database;
/// ordering/equality compare the bytes as bon-encoded data (see impls below).
#[derive(Debug, Hash)]
pub struct Binary(Arc<Vec<u8>>);
unsafe impl Send for Binary {}
impl Clone for Binary {
fn clone(&self) -> Self {
Binary(self.0.clone())
}
}
impl AsRef<[u8]> for Binary {
    /// Borrows the underlying bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
impl Deref for Binary {
    type Target = [u8];

    /// Dereferences directly to the shared byte slice.
    fn deref(&self) -> &[u8] {
        &self.0[..]
    }
}
impl From<KVTableMeta> for Binary {
    /// Serializes a table meta into the wire layout:
    /// type tag (u8), persistence flag (u8, 0/1), then the key and value
    /// `EnumType`s, each written as a little-endian u16 length prefix
    /// followed by its bon encoding.
    fn from(src: KVTableMeta) -> Self {
        let mut out = Vec::new();
        out.put_u8(src.table_type as u8);
        out.put_u8(if src.persistence { 1 } else { 0 });
        // Key first, then value — both share the same length-prefixed layout.
        for part in [&src.key, &src.value] {
            let mut encoded = WriteBuffer::new();
            part.encode(&mut encoded);
            out.put_u16_le(encoded.len() as u16);
            out.put_slice(encoded.get_byte().as_slice());
        }
        Binary::new(out)
    }
}
impl Ord for Binary {
    /// Total order delegating to `partial_cmp` (bon byte comparison).
    ///
    /// # Panics
    /// Panics if the two buffers cannot be compared as bon-encoded data.
    fn cmp(&self, other: &Binary) -> CmpOrdering {
        // `unwrap_or_else` builds the panic message only on failure;
        // the original `expect(&format!(..))` allocated it on every call.
        self.partial_cmp(other).unwrap_or_else(|| {
            panic!("Can't compare two binaries, {:?}, {:?}", self, other)
        })
    }
}
impl PartialOrd for Binary {
    /// Compares the two buffers through bon read buffers over their bytes.
    fn partial_cmp(&self, other: &Binary) -> Option<CmpOrdering> {
        let lhs = ReadBuffer::new(self.0.as_slice(), 0);
        let rhs = ReadBuffer::new(other.0.as_slice(), 0);
        lhs.partial_cmp(&rhs)
    }
}
impl Eq for Binary {}
impl PartialEq for Binary {
    /// Two binaries are equal when their bon comparison yields `Equal`.
    fn eq(&self, other: &Binary) -> bool {
        // `matches!` replaces the manual match-to-bool pyramid.
        matches!(self.partial_cmp(other), Some(CmpOrdering::Equal))
    }
}
impl Default for Binary {
fn default() -> Self {
Binary(Arc::new(Vec::default()))
}
}
impl TreeByteSize for Binary {
    /// Reports the buffer length in bytes for ordmap size accounting.
    fn tree_bytes_size(&self) -> u64 {
        self.len() as u64
    }
}
impl Binary {
    /// Creates a binary that takes ownership of `bin`.
    pub fn new(bin: Vec<u8>) -> Self {
        Binary(Arc::new(bin))
    }

    /// Pointer identity: true only when both handles share the same
    /// allocation. Use `==` for structural (bon) equality instead.
    pub fn binary_equal(this: &Self, other: &Self) -> bool {
        Arc::ptr_eq(&this.0, &other.0)
    }

    /// Wraps an already-shared buffer without copying.
    pub fn from_shared(shared: Arc<Vec<u8>>) -> Self {
        Binary(shared)
    }

    /// Copies `slice` into a fresh shared buffer.
    pub fn from_slice<B: AsRef<[u8]>>(slice: B) -> Self {
        Binary(Arc::new(Vec::from(slice.as_ref())))
    }

    /// Length of the underlying buffer in bytes.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// True when the buffer holds no bytes.
    /// Added to pair with `len` (clippy `len_without_is_empty`).
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Clones the shared handle to the underlying buffer (O(1), no copy).
    pub fn to_shared(&self) -> Arc<Vec<u8>> {
        self.0.clone()
    }
}
/// Asynchronous key-value table operations: point queries, upserts, deletes,
/// ordered key/value streaming and per-key locks. Each mutating/reading
/// operation also has a `dirty_*` variant.
/// NOTE(review): the exact isolation semantics of the `dirty_*` variants are
/// defined by implementors — confirm against the table implementations.
pub trait KVAction: Send + Sync + 'static {
    /// Key type: byte-sliceable, hashable and totally ordered.
    type Key: AsRef<[u8]>
        + Deref<Target = [u8]>
        + Hash
        + PartialEq
        + Eq
        + PartialOrd
        + Ord
        + Clone
        + Send
        + 'static;
    /// Value type: byte-sliceable, with a default value.
    type Value: AsRef<[u8]> + Deref<Target = [u8]> + Default + Clone + Send + 'static;
    /// Error type produced by mutating operations.
    type Error: Debug + 'static;
    /// Dirty variant of [`query`](KVAction::query).
    fn dirty_query(
        &self,
        key: <Self as KVAction>::Key,
    ) -> BoxFuture<Option<<Self as KVAction>::Value>>;
    /// Reads the value stored at `key`, or `None` when absent.
    fn query(&self, key: <Self as KVAction>::Key) -> BoxFuture<Option<<Self as KVAction>::Value>>;
    /// Dirty variant of [`upsert`](KVAction::upsert).
    fn dirty_upsert(
        &self,
        key: <Self as KVAction>::Key,
        value: <Self as KVAction>::Value,
    ) -> BoxFuture<Result<(), <Self as KVAction>::Error>>;
    /// Inserts or replaces the value at `key`.
    fn upsert(
        &self,
        key: <Self as KVAction>::Key,
        value: <Self as KVAction>::Value,
    ) -> BoxFuture<Result<(), <Self as KVAction>::Error>>;
    /// Dirty variant of [`delete`](KVAction::delete).
    fn dirty_delete(
        &self,
        key: <Self as KVAction>::Key,
    ) -> BoxFuture<Result<Option<<Self as KVAction>::Value>, <Self as KVAction>::Error>>;
    /// Removes `key`, returning the previous value if any.
    fn delete(
        &self,
        key: <Self as KVAction>::Key,
    ) -> BoxFuture<Result<Option<<Self as KVAction>::Value>, <Self as KVAction>::Error>>;
    /// Streams keys starting from `key` (or the table edge when `None`),
    /// in descending order when `descending` is true.
    fn keys<'a>(
        &self,
        key: Option<<Self as KVAction>::Key>,
        descending: bool,
    ) -> BoxStream<'a, <Self as KVAction>::Key>;
    /// Streams key-value pairs with the same range semantics as `keys`.
    fn values<'a>(
        &self,
        key: Option<<Self as KVAction>::Key>,
        descending: bool,
    ) -> BoxStream<'a, (<Self as KVAction>::Key, <Self as KVAction>::Value)>;
    /// Acquires the lock associated with `key`.
    fn lock_key(
        &self,
        key: <Self as KVAction>::Key,
    ) -> BoxFuture<Result<(), <Self as KVAction>::Error>>;
    /// Releases the lock associated with `key`.
    fn unlock_key(
        &self,
        key: <Self as KVAction>::Key,
    ) -> BoxFuture<Result<(), <Self as KVAction>::Error>>;
}
/// Kind of storage backing a KV table.
/// Discriminants start at 1 and must stay in sync with the `From<u8>` impl
/// below and with the on-disk meta encoding (`From<KVTableMeta> for Binary`).
#[derive(Debug, Clone, PartialEq)]
pub enum KVDBTableType {
    MemOrdTab = 1,
    LogOrdTab,
    LogWTab,
    BtreeOrdTab,
}
impl From<u8> for KVDBTableType {
fn from(src: u8) -> Self {
match src {
1 => KVDBTableType::MemOrdTab,
2 => KVDBTableType::LogOrdTab,
3 => KVDBTableType::LogWTab,
4 => KVDBTableType::BtreeOrdTab,
_ => panic!(
"From u8 to KVDBTableType failed, src: {}, reason: invalid src",
src
),
}
}
}
/// Metadata describing a KV table: backing storage type, whether it is
/// persisted, and the bon `EnumType` descriptors of its keys and values.
#[derive(Debug, Clone, PartialEq)]
pub struct KVTableMeta {
    table_type: KVDBTableType, // storage backend (see `KVDBTableType`)
    persistence: bool,         // true when the table is persisted
    key: EnumType,             // bon type descriptor for keys
    value: EnumType,           // bon type descriptor for values
}
impl From<Binary> for KVTableMeta {
    /// Deserializes a meta from the layout written by
    /// `From<KVTableMeta> for Binary`: type tag (u8), persistence flag (u8),
    /// then key and value `EnumType`s, each prefixed with a little-endian
    /// u16 length.
    ///
    /// # Panics
    /// Panics on a malformed buffer (invalid type tag, truncated data, or an
    /// undecodable `EnumType`).
    fn from(src: Binary) -> Self {
        // The dead `offset` bookkeeping from the original has been removed:
        // it was incremented after every read but never used.
        let mut buf = src.as_ref();
        let table_type = KVDBTableType::from(buf.get_u8());
        let persistence = buf.get_u8() != 0;
        // Key type: u16 length prefix followed by the bon encoding.
        let key_len = buf.get_u16_le() as usize;
        let mut read_buffer = ReadBuffer::new(&buf[0..key_len], 0);
        buf.advance(key_len);
        let key = EnumType::decode(&mut read_buffer).unwrap();
        // Value type: same layout as the key.
        let value_len = buf.get_u16_le() as usize;
        let mut read_buffer = ReadBuffer::new(&buf[0..value_len], 0);
        buf.advance(value_len);
        let value = EnumType::decode(&mut read_buffer).unwrap();
        KVTableMeta {
            table_type,
            persistence,
            key,
            value,
        }
    }
}
impl KVTableMeta {
    /// Creates a table meta from its parts.
    pub fn new(
        table_type: KVDBTableType,
        persistence: bool,
        key: EnumType,
        value: EnumType,
    ) -> Self {
        KVTableMeta {
            table_type,
            persistence,
            key,
            value,
        }
    }

    /// Builds a meta from a compatibility buffer containing only the
    /// bon-encoded key and value types (no type tag or persistence flag);
    /// the caller supplies those two fields separately.
    pub fn with_compatibled(
        table_type: KVDBTableType,
        persistence: bool,
        bin: &[u8],
    ) -> Result<Self, ReadBonErr> {
        let mut buffer = ReadBuffer::new(bin, 0);
        let key = EnumType::decode(&mut buffer)?;
        let value = EnumType::decode(&mut buffer)?;
        Ok(Self::new(table_type, persistence, key, value))
    }

    /// Storage backend of the table.
    pub fn table_type(&self) -> &KVDBTableType {
        &self.table_type
    }

    /// Whether the table is persisted.
    pub fn is_persistence(&self) -> bool {
        self.persistence
    }

    /// bon type descriptor of the table's keys.
    pub fn key_type(&self) -> &EnumType {
        &self.key
    }

    /// bon type descriptor of the table's values.
    pub fn value_type(&self) -> &EnumType {
        &self.value
    }
}
/// Quality of service requested for a table transaction.
/// NOTE(review): the precise guarantees of `Unsafe`/`ThreadSafe`/`Safe` are
/// not defined in this file — confirm against the transaction manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TableTrQos {
    Unsafe = 0,
    ThreadSafe,
    Safe,
}
impl Default for TableTrQos {
fn default() -> Self {
TableTrQos::Safe
}
}
/// Per-key action recorded by a transaction: a read, a transactional write,
/// or a dirty write.
/// NOTE(review): the `Option<Binary>` payload presumably distinguishes
/// upsert (`Some`) from delete (`None`) — confirm with the table impls.
#[derive(Debug, Clone)]
pub enum KVActionLog {
    Read,
    Write(Option<Binary>),
    DirtyWrite(Option<Binary>),
}
impl KVActionLog {
    /// True when this log entry records a dirty write.
    // `matches!` replaces the manual `if let … { true } else { false }`.
    #[inline]
    pub fn is_dirty_writed(&self) -> bool {
        matches!(self, KVActionLog::DirtyWrite(_))
    }
}
/// Shared commit-confirmation callback handed to the commit logger.
/// Counts down one slot per confirmation and, on the last one, asynchronously
/// confirms the commit log entry (see `confirm_commited`).
#[derive(Clone)]
pub struct KVDBCommitConfirm<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>>(
    Arc<(
        MultiTaskRuntime<()>, // runtime used to spawn the confirmation task
        Log,                  // commit logger to confirm against
        Guid,                 // transaction uid this confirmer belongs to
        Option<Guid>,         // commit uid (expected to be set before confirming)
        AtomicUsize,          // remaining confirmations before final confirm
    )>,
);
// SAFETY: all state is behind an `Arc`. NOTE(review): these impls force
// `Send`/`Sync` without requiring `Log: Sync` — soundness depends on every
// `AsyncCommitLog` implementor being safe to share across threads; confirm.
unsafe impl<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>> Send
    for KVDBCommitConfirm<C, Log>
{
}
// SAFETY: see the `Send` impl above — same caveat applies.
unsafe impl<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>> Sync
    for KVDBCommitConfirm<C, Log>
{
}
// Nightly `fn_traits`/`unboxed_closures`: a confirmer is directly callable as
// `confirmer(tid, cid, result)` so it can be passed to the commit logger as a
// plain callback. All three Fn* impls delegate to `call` below.
impl<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>>
    FnOnce<(Guid, Guid, Result<(), KVTableTrError>)> for KVDBCommitConfirm<C, Log>
{
    type Output = Result<(), KVTableTrError>;

    /// Consuming call: delegates to the shared `call` implementation.
    extern "rust-call" fn call_once(
        self,
        args: (Guid, Guid, Result<(), KVTableTrError>),
    ) -> Self::Output {
        self.call(args)
    }
}
impl<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>>
    FnMut<(Guid, Guid, Result<(), KVTableTrError>)> for KVDBCommitConfirm<C, Log>
{
    /// Mutable call: delegates to the shared `call` implementation.
    extern "rust-call" fn call_mut(
        &mut self,
        args: (Guid, Guid, Result<(), KVTableTrError>),
    ) -> Self::Output {
        self.call(args)
    }
}
impl<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>>
    Fn<(Guid, Guid, Result<(), KVTableTrError>)> for KVDBCommitConfirm<C, Log>
{
    /// Callback entry point: `(transaction_uid, commit_uid, result)`.
    /// A fatal error aborts immediately; any other result (including
    /// non-fatal errors) still registers the confirmation.
    extern "rust-call" fn call(
        &self,
        args: (Guid, Guid, Result<(), KVTableTrError>),
    ) -> Self::Output {
        if let Err(e) = args.2 {
            // Only fatal errors short-circuit the confirmation.
            if let ErrorLevel::Fatal = &e.level() {
                return Err(e);
            }
        }
        self.confirm_commited(args.0, args.1)
    }
}
impl<C: Clone + Send + 'static, Log: AsyncCommitLog<C = C, Cid = Guid>> KVDBCommitConfirm<C, Log> {
    /// Creates a confirmer for transaction `tid` / commit `cid` that waits
    /// for `count` confirmations before confirming the commit log entry.
    pub fn new(
        rt: MultiTaskRuntime<()>,
        commit_logger: Log,
        tid: Guid,
        cid: Option<Guid>,
        count: usize,
    ) -> Self {
        KVDBCommitConfirm(Arc::new((
            rt,
            commit_logger,
            tid,
            cid,
            AtomicUsize::new(count),
        )))
    }

    /// The commit logger this confirmer reports to.
    pub fn commit_logger(&self) -> &Log {
        &(self.0).1
    }

    /// Registers one confirmation for (`tid`, `cid`). When the counter
    /// reaches zero, spawns a task that confirms the commit log entry.
    ///
    /// Returns a `Normal`-level error if the ids do not match the ones this
    /// confirmer was created with.
    // NOTE(review): `(self.0).3.clone().unwrap()` panics when the commit uid
    // was never set — confirm callers always construct with `Some(cid)`.
    #[inline(always)]
    fn confirm_commited(&self, tid: Guid, cid: Guid) -> Result<(), KVTableTrError> {
        if (self.0).2 != tid || (self.0).3.clone().unwrap() != cid {
            return Err(KVTableTrError::new_transaction_error(ErrorLevel::Normal,
                format!("Confirm commited failed, require_transaction_uid: {:?}, require_commit_uid: {:?}, transaction_uid: {:?}, commit_uid: {:?}, reason: invalid transaction_uid or commit_uid", (self.0).2, (self.0).3, tid, cid)));
        }
        // `fetch_sub` returns the previous value, so `<= 1` means this was
        // the last outstanding confirmation.
        if (self.0).4.fetch_sub(1, Ordering::SeqCst) <= 1 {
            let confirmer = self.clone();
            // Optional test hook: delay the final confirmation by the number
            // of milliseconds in the env var (ignored unless it parses > 0).
            let replay_confirm_delay_ms = std::env::var(TEST_REPLAY_CONFIRM_DELAY_MS_ENV)
                .ok()
                .and_then(|value| value.trim().parse::<usize>().ok())
                .filter(|delay| *delay > 0);
            let _ = (self.0).0.spawn(async move {
                if let Some(delay_ms) = replay_confirm_delay_ms {
                    confirmer.0.0.timeout(delay_ms).await;
                }
                // Count globally confirmed commits for diagnostics.
                // (The previously bound `last` value was never used.)
                COMMITED_LEN.fetch_add(1, Ordering::Relaxed);
                if let Err(e) = (confirmer.0)
                    .1
                    .confirm(cid.clone())
                    .await {
                    warn!("Confirm commit log failed, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tid, cid, e);
                }
                // Record the end of the transaction when debug logging is on.
                #[cfg(feature = "log_table_debug")]
                {
                    let event = TransactionDebugEvent::End(tid.clone(), cid.clone());
                    let logger = transaction_debug_logger();
                    logger.log(event);
                }
            });
        }
        Ok(())
    }
}
/// Global count of confirmed commits, incremented by `confirm_commited`.
static COMMITED_LEN: AtomicUsize = AtomicUsize::new(0);
/// Env var (milliseconds) that delays the final commit confirmation —
/// a test-only replay hook read in `confirm_commited`.
const TEST_REPLAY_CONFIRM_DELAY_MS_ENV: &str = "PI_DB_TEST_REPLAY_CONFIRM_DELAY_MS";
/// Table transaction error: either a general error with a severity level and
/// a message, or a key conflict on a named table.
#[derive(Debug)]
pub enum KVTableTrError {
    Common(ErrorLevel, String),
    Conflicts(Atom, Binary),
}
// SAFETY: NOTE(review) — variants hold `ErrorLevel`, `String`, `Atom` and
// `Binary`; these manual impls are sound only if all of those are thread-safe.
// Confirm, or remove the impls and let the compiler derive them.
unsafe impl Send for KVTableTrError {}
unsafe impl Sync for KVTableTrError {}
impl TransactionError for KVTableTrError {
    /// Wraps any debuggable `reason` into the general `Common` variant at the
    /// given severity level.
    fn new_transaction_error<E>(level: ErrorLevel, reason: E) -> Self
    where
        E: Debug + Sized + 'static,
    {
        KVTableTrError::Common(
            level,
            format!("Table transaction error, reason: {:?}", reason),
        )
    }
}
impl KVTableTrError {
    /// Builds a key-conflict error for `key` in `table`.
    // NOTE(review): both arguments are deep-copied here (`as_ref().into()`
    // re-creates the atom and `from_slice` copies the bytes) — presumably to
    // detach the error from the transaction's buffers; confirm before
    // simplifying to move the arguments directly.
    pub fn new_conflicts_error(table: Atom, key: Binary) -> Self {
        KVTableTrError::Conflicts(table.as_ref().into(), Binary::from_slice(key))
    }

    /// True for the general `Common` variant.
    pub fn is_common(&self) -> bool {
        matches!(self, Self::Common(_, _))
    }

    /// True for the key-conflict variant.
    pub fn is_conflicts(&self) -> bool {
        matches!(self, Self::Conflicts(_, _))
    }

    /// Severity of this error; conflicts always report `Normal`.
    pub fn level(&self) -> ErrorLevel {
        if let Self::Common(level, _) = self {
            level.clone()
        } else {
            ErrorLevel::Normal
        }
    }

    /// The conflicting table and key, if this is a conflict error.
    pub fn conflicts(&self) -> Option<(&Atom, &Binary)> {
        if let Self::Conflicts(table, binary) = self {
            Some((table, binary))
        } else {
            None
        }
    }
}
use std::sync::OnceLock;
/// Process-wide transaction debug logger, set exactly once by
/// `init_transaction_debug_logger`.
static TRANSACTION_DEBUG_LOGGER: OnceLock<TransactionDebugLogger> = OnceLock::new();
/// Initializes the global transaction debug logger and starts its flush loop.
///
/// Only the first call wins: later calls create (and drop) a logger but do
/// not replace the global one and do not start a second loop. `interval` and
/// `timeout` are in milliseconds (clamped to >= 1000 by `startup`).
pub fn init_transaction_debug_logger<P: AsRef<Path>>(
    rt: MultiTaskRuntime<()>,
    path: P,
    interval: usize,
    timeout: usize,
) {
    let logger = TransactionDebugLogger::new(rt, path);
    // `set` fails when already initialized; only the winner starts the loop.
    // (`.is_ok()` replaces the `if let Ok(_)` pattern-match.)
    if TRANSACTION_DEBUG_LOGGER.set(logger.clone()).is_ok() {
        logger.startup(interval, timeout);
    }
}
/// Returns the global transaction debug logger.
///
/// # Panics
/// Panics if `init_transaction_debug_logger` has not been called first —
/// the `expect` message states the invariant instead of a bare `unwrap`.
pub fn transaction_debug_logger<'a>() -> &'a TransactionDebugLogger {
    TRANSACTION_DEBUG_LOGGER
        .get()
        .expect("transaction debug logger is not initialized; call init_transaction_debug_logger first")
}
use pi_async_transaction::manager_2pc::Transaction2PcStatus;
use pi_atom::Atom;
/// Lifecycle events recorded by the transaction debug logger. Field meanings
/// (per the destructuring in `TransactionDebugLogger::startup`):
/// `Begin(tid, status, writable, require_persistence, output_size)`,
/// `Commit(tid, cid, status, table, actions_len, log_index)`,
/// `CommitConfirm(tid, cid, table, writable, require_persistence)`,
/// `End(tid, cid)`.
pub enum TransactionDebugEvent {
    Begin(Guid, Transaction2PcStatus, bool, bool, usize),
    Commit(Guid, Guid, Transaction2PcStatus, Atom, usize, usize),
    CommitConfirm(Guid, Guid, Atom, bool, bool),
    End(Guid, Guid),
}
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use dashmap::{mapref::entry::Entry, DashMap};
use pi_store::log_store::log_file::{LogFile, LogMethod};
use std::path::Path;
use std::time::Instant;
pub struct TransactionDebugLogger(Arc<InnerTransactionDebugLogger>);
// SAFETY: NOTE(review) — the inner state (`LogFile`, `DashMap`, crossbeam
// channel endpoints) is shared via `Arc`; these manual impls assume all of
// those are thread-safe. Confirm, or let the compiler derive the traits.
unsafe impl Send for TransactionDebugLogger {}
unsafe impl Sync for TransactionDebugLogger {}
impl Clone for TransactionDebugLogger {
fn clone(&self) -> Self {
TransactionDebugLogger(self.0.clone())
}
}
impl TransactionDebugLogger {
    /// Creates a logger whose on-disk log lives under `path`
    /// (8096-byte blocks, 128 MiB max log size).
    ///
    /// Blocks the calling thread until the asynchronous open finishes.
    ///
    /// # Panics
    /// Panics if opening the log file fails or the open task is dropped
    /// before sending the result back.
    pub fn new<P: AsRef<Path>>(rt: MultiTaskRuntime<()>, path: P) -> Self {
        let (sender, receiver) = unbounded();
        let times = DashMap::new();
        let rt_copy = rt.clone();
        // One-shot channel used to hand the opened LogFile back to this thread.
        let (s, r) = bounded(1);
        let path = path.as_ref().to_path_buf();
        rt.spawn(async move {
            let log = LogFile::open(rt_copy.clone(), path, 8096, 128 * 1024 * 1024, None)
                .await
                .unwrap();
            // NOTE(review): send result ignored — a dropped receiver is silent.
            s.send(log);
        });
        // Synchronously wait for the async open above.
        let log = r.recv().unwrap();
        let inner = InnerTransactionDebugLogger {
            rt,
            sender,
            receiver,
            times,
            log,
        };
        TransactionDebugLogger(Arc::new(inner))
    }

    /// Queues an event for the flush loop (unbounded channel, never blocks).
    // NOTE(review): the send result is ignored — events are dropped silently
    // if the receiver side is gone.
    pub fn log(&self, event: TransactionDebugEvent) {
        self.0.sender.send(event);
    }

    /// Starts the flush loop: every `interval` ms it drains all queued
    /// events, appends one log entry per event, commits the last append with
    /// `timeout` ms, then sleeps. Both parameters are clamped to >= 1000 ms.
    /// Consumes `self`; the loop runs on an internal clone of the handle.
    pub fn startup(self, mut interval: usize, mut timeout: usize) {
        if interval < 1000 {
            interval = 1000;
        }
        if timeout < 1000 {
            timeout = 1000;
        }
        let logger = self.clone();
        self.0.rt.spawn(async move {
            loop {
                // Drain everything queued since the previous pass.
                let events: Vec<TransactionDebugEvent> = logger.0.receiver.try_iter().collect();
                // Id of the last append in this pass, used by the commit below.
                let mut log_id = 0;
                for event in events {
                    match event {
                        // Begin: record the start instant; a duplicate tid is
                        // logged as a conflict instead of overwriting it.
                        TransactionDebugEvent::Begin(tid, status, writable, require_persistence, output_size) => {
                            match logger.0.times.entry(tid.clone()) {
                                Entry::Occupied(o) => {
                                    log_id = logger
                                        .0
                                        .log
                                        .append(LogMethod::PlainAppend,
                                                format!("{:?}", tid).as_bytes(),
                                                format!("Transaction id conflict:\n\tstatus: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n\toutput_size: {:?}\n",
                                                        status,
                                                        writable,
                                                        require_persistence,
                                                        output_size).as_bytes());
                                },
                                Entry::Vacant(v) => {
                                    log_id = logger
                                        .0
                                        .log
                                        .append(LogMethod::PlainAppend,
                                                format!("{:?}", tid).as_bytes(),
                                                format!("Begin transaction:\n\tstatus: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n\toutput_size: {:?}\n",
                                                        status,
                                                        writable,
                                                        require_persistence,
                                                        output_size).as_bytes());
                                    v.insert(Instant::now());
                                },
                            }
                        },
                        // Commit: log elapsed time since Begin when known.
                        TransactionDebugEvent::Commit(tid, cid, status, table, actions_len, log_index) => {
                            if let Some(item) = logger.0.times.get(&tid) {
                                let time = item.value().elapsed();
                                log_id = logger
                                    .0
                                    .log
                                    .append(LogMethod::PlainAppend,
                                            format!("{:?}", tid).as_bytes(),
                                            format!("Commit transaction successed:\n\tlog_index: {:?}\n\tcid: {:?}\n\tstatus: {:?}\n\ttable: {:?}\n\tactions_len: {:?}\n\ttime: {:?}\n",
                                                    log_index,
                                                    cid,
                                                    status,
                                                    table.as_str(),
                                                    actions_len,
                                                    time).as_bytes());
                            } else {
                                log_id = logger
                                    .0
                                    .log
                                    .append(LogMethod::PlainAppend,
                                            format!("{:?}", tid).as_bytes(),
                                            format!("Commit transaction failed, transaction not exist:\n\tlog_index: {:?}\n\tcid: {:?}\n\tstatus: {:?}\n\ttable: {:?}\n\tactions_len: {:?}\n",
                                                    log_index,
                                                    cid,
                                                    status,
                                                    table.as_str(),
                                                    actions_len).as_bytes());
                            }
                        },
                        // CommitConfirm: like Commit, keyed by the same tid.
                        TransactionDebugEvent::CommitConfirm(tid, cid, table, writable, require_persistence) => {
                            if let Some(item) = logger.0.times.get(&tid) {
                                let time = item.value().elapsed();
                                log_id = logger
                                    .0
                                    .log
                                    .append(LogMethod::PlainAppend,
                                            format!("{:?}", tid).as_bytes(),
                                            format!("Commit confirm transaction successed:\n\tcid: {:?}\n\ttable: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n\ttime: {:?}\n",
                                                    cid,
                                                    table.as_str(),
                                                    writable,
                                                    require_persistence,
                                                    time).as_bytes());
                            } else {
                                log_id = logger
                                    .0
                                    .log
                                    .append(LogMethod::PlainAppend,
                                            format!("{:?}", tid).as_bytes(),
                                            format!("Commit confirm transaction failed, transaction not exist:\n\tcid: {:?}\n\ttable: {:?}\n\twritable: {:?}\n\trequire_persistence: {:?}\n",
                                                    cid,
                                                    table.as_str(),
                                                    writable,
                                                    require_persistence).as_bytes());
                            }
                        },
                        // End: remove the start instant and log total duration.
                        TransactionDebugEvent::End(tid, cid) => {
                            if let Some((_tid, now)) = logger.0.times.remove(&tid) {
                                let time = now.elapsed();
                                log_id = logger
                                    .0
                                    .log
                                    .append(LogMethod::PlainAppend,
                                            format!("{:?}", tid).as_bytes(),
                                            format!("End transaction:\n\tcid: {:?}\n\ttime: {:?}\n",
                                                    cid,
                                                    time).as_bytes());
                            } else {
                                log_id = logger
                                    .0
                                    .log
                                    .append(LogMethod::PlainAppend,
                                            format!("{:?}", tid).as_bytes(),
                                            format!("End transaction failed, transaction not exist:\n\tcid: {:?}\n",
                                                    cid).as_bytes());
                            }
                        },
                    }
                }
                // Commit everything appended this pass; errors are ignored.
                let _ = logger
                    .0
                    .log
                    .commit(log_id,
                            false,
                            false,
                            Some(timeout)).await;
                // Sleep until the next pass.
                logger
                    .0
                    .rt
                    .timeout(interval)
                    .await;
            }
        });
    }
}
/// Shared state behind a `TransactionDebugLogger` handle.
struct InnerTransactionDebugLogger {
    rt: MultiTaskRuntime<()>,                   // runtime running the flush loop
    sender: Sender<TransactionDebugEvent>,      // producer side of the event queue
    receiver: Receiver<TransactionDebugEvent>,  // drained by the flush loop
    times: DashMap<Guid, Instant>,              // per-transaction start instants, keyed by tid
    log: LogFile,                               // on-disk debug log
}