use std::any::Any;
use std::fmt::Debug;
use std::io::{Error, Result as IOResult, ErrorKind};
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use bytes::BufMut;
use dashmap::DashMap;
use futures::future::{FutureExt, BoxFuture};
use pi_async_rt::rt::{AsyncRuntime,
multi_thread::MultiTaskRuntime};
use pi_guid::{GuidGen, Guid};
use log::{debug, warn};
use pi_atom::Atom;
use super::{ErrorLevel,
TransactionError,
AsyncTransaction,
Transaction2Pc,
TransactionTree,
AsyncCommitLog};
const DEFAULT_TRANSACTION_CTRL_ID: u16 = 0;
const DEFAULT_TRANSACTION_PREPARE_CTRL_ID: u16 = 1;
const DEFAULT_TRANSACTION_COMMIT_CTRL_ID: u16 = 2;
const DEFAULT_MAX_PARALLEL_TRANSACTION_LIMIT: usize = usize::MAX;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Transaction2PcStatus {
Start = 0, Initing, Inited, InitFailed, Actioning, Actioned, ActionFailed, Prepareing, Prepared, PrepareFailed, LogCommiting, LogCommited, LogCommitFailed, Commiting, Commited, CommitFailed, Rollbacking, Rollbacked, RollbackFailed, }
impl Default for Transaction2PcStatus {
fn default() -> Self {
Transaction2PcStatus::Start
}
}
pub struct Transaction2PcManager<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
>(Arc<Inner2PcManager<C, Log>>);
unsafe impl<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
> Send for Transaction2PcManager<C, Log> {}
unsafe impl<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
> Sync for Transaction2PcManager<C, Log> {}
impl<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
> Clone for Transaction2PcManager<C, Log> {
fn clone(&self) -> Self {
Transaction2PcManager(self.0.clone())
}
}
impl<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
> Transaction2PcManager<C, Log> {
pub fn new(rt: MultiTaskRuntime<()>,
uid_gen: GuidGen,
commit_logger: Log) -> Self {
let inner = Inner2PcManager {
rt,
uid_gen,
commit_logger,
trans_table: DashMap::default(),
source_counter: DashMap::default(),
prepare_produced: AtomicUsize::new(0),
prepare_consumed: AtomicUsize::new(0),
commit_produced: AtomicUsize::new(0),
commit_consumed: AtomicUsize::new(0),
produced_total: AtomicUsize::new(0),
consumed_total: AtomicUsize::new(0),
};
Transaction2PcManager(Arc::new(inner))
}
pub fn commit_logger(&self) -> Log {
self.0.commit_logger.clone()
}
pub fn transaction_len(&self) -> usize {
self.0.trans_table.len()
}
pub fn source_len(&self, source: &Atom) -> Option<usize> {
if let Some(counter) = self.0.source_counter.get(source) {
let (add, sub, _parallel_limit) = counter.value();
Some(add
.load(Ordering::Relaxed)
.checked_sub(sub.load(Ordering::Relaxed))
.unwrap_or(0))
} else {
None
}
}
pub fn get_max_source_parallel_limit(&self, source: &Atom) -> Option<usize> {
if let Some(counter) = self.0.source_counter.get(source) {
let (_add, _sub, parallel_limit) = counter.value();
Some(parallel_limit.load(Ordering::Relaxed))
} else {
None
}
}
pub fn set_max_source_parallel_limit(&self, source: &Atom, limit: usize) {
if let Some(counter) = self.0.source_counter.get(source) {
let (_add, _sub, parallel_limit) = counter.value();
parallel_limit.store(limit, Ordering::SeqCst);
} else {
self.0.source_counter.insert(source.clone(),
(AtomicUsize::new(0), AtomicUsize::new(0), AtomicUsize::new(limit)));
}
}
pub fn prepare_len(&self) -> usize {
self
.0
.prepare_produced
.load(Ordering::Relaxed)
.checked_sub(self.0.prepare_consumed.load(Ordering::Relaxed))
.unwrap_or(0)
}
pub fn commit_len(&self) -> usize {
self.0
.commit_produced
.load(Ordering::Relaxed)
.checked_sub(self.0.commit_consumed.load(Ordering::Relaxed))
.unwrap_or(0)
}
pub fn produced_transaction_total(&self) -> usize {
self.0.produced_total.load(Ordering::Relaxed)
}
pub fn consumed_transaction_total(&self) -> usize {
self.0.consumed_total.load(Ordering::Relaxed)
}
pub fn get_transaction_status<T>(&self, uid: &Guid) -> Option<Transaction2PcStatus>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
if let Some(shared_tr) = self.0.trans_table.get(uid) {
if let Some(tr) = <dyn Any>::downcast_ref::<T>(shared_tr.value()) {
return Some(tr.get_status());
}
}
None
}
pub fn finish<T>(&self, tr: T)
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
let status = tr.get_status();
if status == Transaction2PcStatus::Start
|| status == Transaction2PcStatus::Initing
|| status == Transaction2PcStatus::Actioning
|| status == Transaction2PcStatus::Prepareing
|| status == Transaction2PcStatus::LogCommiting
|| status == Transaction2PcStatus::Commiting
|| status == Transaction2PcStatus::Rollbacking
|| status == Transaction2PcStatus::CommitFailed
|| status == Transaction2PcStatus::RollbackFailed {
warn!("Finish transaction failed, status: {:?}, reason: invalid status",
status);
return;
}
if let Some(transaction_uid) = tr.get_transaction_uid() {
if let Some(counter) = self.0.source_counter.get(&tr.get_source()) {
counter.value().1.fetch_add(1, Ordering::Relaxed);
}
let _ = self.0.trans_table.remove(&transaction_uid);
}
self.0.consumed_total.fetch_add(1, Ordering::Relaxed); }
}
impl<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
> Transaction2PcManager<C, Log> {
pub async fn start<T>(&self, tr: T)
-> Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::Start {
warn!("Start root transaction failed, status: {:?}, reason: invalid transaction status", current_tr_status);
tr.set_status(Transaction2PcStatus::InitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Init root failed, type: unit, status: {:?}, reason: invalid transaction status", current_tr_status)));
}
tr.set_transaction_uid(alloc_transaction_uid(&self.0.uid_gen, &tr)); if let Err(current_len) = register_transcation(&self, &tr) {
warn!("Start root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), current_len);
tr.set_status(Transaction2PcStatus::InitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Init root failed, type: unit, current: {}, status: {:?}, reason: same source transaction excessive", current_len, tr.get_status())));
}
tr.set_status(Transaction2PcStatus::Initing);
let result = self.init_childrens(tr.clone()).await;
if result.is_err() {
tr.set_status(Transaction2PcStatus::InitFailed); } else {
tr.set_status(Transaction2PcStatus::Inited); }
result
}
fn init_childrens<T>(&self, tr: T)
-> BoxFuture<Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
let mgr = self.clone();
async move {
if tr.is_unit() {
return tr.init().await;
}
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
if child.is_unit() {
child.set_transaction_uid(alloc_transaction_uid(&mgr.0.uid_gen, &tr)); child.set_status(Transaction2PcStatus::Initing);
if let Err(e) = child.init().await {
warn!("Start child transaction failed, type: unit, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::InitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Init children failed, type: unit, reason: {:?}", e)));
}
child.set_status(Transaction2PcStatus::Inited); } else if child.is_tree() {
if let Err(e) = mgr.start(child).await {
warn!("Start child transaction failed, type: tree, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Init children failed, type: tree, reason: {:?}", e)));
}
} else {
warn!("Start child transaction failed, type: invalid, status: {:?}, reason: invalid transaction type", child.get_status());
child.set_status(Transaction2PcStatus::InitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, "Init transaction failed, reason: invalid transaction type"));
}
}
match tr.init().await {
Err(e) => {
warn!("Start root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Init root failed, type: tree, reason: {:?}", e)))
},
Ok(output) => {
Ok(output)
},
}
}.boxed()
}
pub async fn prepare<T>(&self, tr: T)
-> Result<Option<<T as Transaction2Pc>::PrepareOutput>, <T as Transaction2Pc>::PrepareError>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::Inited
&& current_tr_status != Transaction2PcStatus::Actioned
&& current_tr_status != Transaction2PcStatus::Rollbacked {
warn!("Prepare root transaction failed, status: {:?}, reason: invalid transaction status", current_tr_status);
tr.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare root failed, type: unit, transaction_uid: {:?}, prepare_uid: {:?}, status: {:?}, reason: invalid transaction status", tr.get_transaction_uid(), tr.get_prepare_uid(), current_tr_status)));
}
if !tr.is_writable() {
tr.set_status(Transaction2PcStatus::Prepared); return Ok(None);
}
self.0.prepare_produced.fetch_add(1, Ordering::Relaxed); tr.set_status(Transaction2PcStatus::Prepareing);
let result = if tr.is_unit() {
tr.prepare().await
} else {
self.prepare_childrens(tr.clone()).await
};
if result.is_err() {
tr.set_status(Transaction2PcStatus::PrepareFailed); } else {
tr.set_status(Transaction2PcStatus::Prepared); }
self.0.prepare_consumed.fetch_add(1, Ordering::Relaxed);
result
}
fn prepare_childrens<T>(&self, tr: T)
-> BoxFuture<Result<Option<<T as Transaction2Pc>::PrepareOutput>, <T as Transaction2Pc>::PrepareError>>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
let mgr = self.clone();
async move {
if tr.is_require_persistence() {
tr.set_commit_uid(alloc_commit_uid(&mgr.0.uid_gen, &tr)); }
if tr.is_concurrent_prepare() {
let mut map_reduce = mgr.0.rt.map_reduce(tr.children_len());
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
if child.is_unit() {
if child.is_require_persistence() {
child.set_commit_uid(alloc_commit_uid(&mgr.0.uid_gen, &tr)); }
let child_copy = child.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
child_copy.set_status(Transaction2PcStatus::Prepareing);
match child_copy.prepare().await {
Err(e) => {
debug!("Prepare child transaction failed, type: unit, status: {:?}, reason: {:?}", child_copy.get_status(), e);
child_copy.set_status(Transaction2PcStatus::PrepareFailed); Err(Error::new(ErrorKind::Other, format!("Prepare children failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}", child_copy.get_transaction_uid(), child_copy.get_prepare_uid(), e)))
},
Ok(output) => {
child_copy.set_status(Transaction2PcStatus::Prepared); Ok(output)
},
}
}) {
debug!("Prepare child transaction failed, type: unit, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Map children prepare failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}", child_uid, prepare_uid, e)));
};
} else if child.is_tree() {
let mgr_copy = mgr.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
match mgr_copy.prepare(child).await {
Err(e) => {
debug!("Prepare child transaction failed, type: tree, uid: {:?}, reason: {:?}", (&child_uid, &prepare_uid), e);
Err(Error::new(ErrorKind::Other, format!("Prepare children failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}", child_uid, prepare_uid, e)))
},
Ok(output) => {
Ok(output)
},
}
}) {
debug!("Prepare child transaction failed, type: tree, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Map children prepare failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}", child_uid, prepare_uid, e)));
};
} else {
warn!("Prepare child transaction failed, type: invalid, status: {:?}, reason: invalid transaction type", tr.get_status());
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare transaction failed, child_uid: {:?}, prepare_uid: {:?}, reason: invalid transaction type", child_uid, prepare_uid)));
}
}
match map_reduce.reduce(true).await {
Err(e) => {
debug!("Prepare child transaction failed,status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Reduce children prepare failed, type: tree, transaction_uid: {:?}, prepare_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), e)))
},
Ok(results) => {
let mut childs_output: Vec<u8> = Vec::new();
for result in results {
match result {
Err(e) => {
return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, e));
},
Ok(child_output) => {
if let Some(child_output) = child_output {
let buf = child_output.as_ref();
if buf.len() > 0 {
childs_output.put_slice(buf);
}
}
},
}
}
match tr.prepare().await {
Err(e) => {
debug!("Prepare root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare root failed, type: tree, transaction_uid: {:?}, prepare_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), e)))
},
Ok(output) => {
if let Some(mut output) = output {
if childs_output.len() > 0 {
output.put_slice(childs_output.as_ref());
}
Ok(Some(output))
} else {
Ok(None)
}
},
}
}
}
} else {
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
let mut childs_output: Vec<u8> = Vec::new();
for child in childs {
if child.is_unit() {
if child.is_require_persistence() {
child.set_commit_uid(alloc_commit_uid(&mgr.0.uid_gen, &tr)); }
child.set_status(Transaction2PcStatus::Prepareing);
match child.prepare().await {
Err(e) => {
debug!("Prepare child transaction failed, type: unit, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare children failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}", child.get_transaction_uid(), child.get_prepare_uid(), e)));
},
Ok(child_output) => {
child.set_status(Transaction2PcStatus::Prepared);
if let Some(child_output) = child_output {
let buf = child_output.as_ref();
if buf.len() > 0 {
childs_output.put_slice(buf);
}
}
},
}
} else if child.is_tree() {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
match mgr.prepare(child).await {
Err(e) => {
debug!("Prepare child transaction failed, type: tree, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare children failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}", child_uid, prepare_uid, e)));
},
Ok(child_output) => {
if let Some(child_output) = child_output {
let buf = child_output.as_ref();
if buf.len() > 0 {
childs_output.put_slice(buf);
}
}
},
}
} else {
debug!("Prepare child transaction failed, type: invalid, status: {:?}, reason: invalid transaction type", child.get_status());
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare transaction failed, child_uid: {:?}, prepare_uid: {:?}, reason: invalid transaction type", child.get_transaction_uid(), child.get_prepare_uid())));
}
}
match tr.prepare().await {
Err(e) => {
debug!("Prepare root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, format!("Prepare root failed, type: tree, transaction_uid: {:?}, prepare_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), e)))
},
Ok(output) => {
if let Some(mut output) = output {
if childs_output.len() > 0 {
output.put_slice(childs_output.as_ref());
}
Ok(Some(output))
} else {
Ok(None)
}
},
}
}
}.boxed()
}
pub async fn prepare_conflicts<T>(&self, tr: T)
-> Result<Option<<T as Transaction2Pc>::PrepareOutput>, <T as Transaction2Pc>::PrepareError>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus>,
<T as Transaction2Pc>::PrepareError: Send,
{
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::Inited
&& current_tr_status != Transaction2PcStatus::Actioned
&& current_tr_status != Transaction2PcStatus::Rollbacked {
warn!("Prepare root transaction conflicts failed, status: {:?}, reason: invalid transaction status",
current_tr_status);
tr.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal,
format!("Prepare root conflicts failed, type: unit, transaction_uid: {:?}, prepare_uid: {:?}, status: {:?}, reason: invalid transaction status",
tr.get_transaction_uid(),
tr.get_prepare_uid(),
current_tr_status)));
}
if !tr.is_writable() {
tr.set_status(Transaction2PcStatus::Prepared); return Ok(None);
}
self.0.prepare_produced.fetch_add(1, Ordering::Relaxed); tr.set_status(Transaction2PcStatus::Prepareing);
let result = if tr.is_unit() {
tr.prepare_conflicts().await
} else {
self.prepare_childrens_conflicts(tr.clone()).await
};
if result.is_err() {
tr.set_status(Transaction2PcStatus::PrepareFailed); } else {
tr.set_status(Transaction2PcStatus::Prepared); }
self.0.prepare_consumed.fetch_add(1, Ordering::Relaxed);
result
}
fn prepare_childrens_conflicts<T>(&self, tr: T)
-> BoxFuture<Result<Option<<T as Transaction2Pc>::PrepareOutput>, <T as Transaction2Pc>::PrepareError>>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus>,
<T as Transaction2Pc>::PrepareError: Send,
{
let mgr = self.clone();
async move {
if tr.is_require_persistence() {
tr.set_commit_uid(alloc_commit_uid(&mgr.0.uid_gen, &tr)); }
if tr.is_concurrent_prepare() {
let mut map_reduce = mgr.0.rt.map_reduce(tr.children_len());
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
if child.is_unit() {
if child.is_require_persistence() {
child.set_commit_uid(alloc_commit_uid(&mgr.0.uid_gen, &tr)); }
let child_copy = child.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
child_copy.set_status(Transaction2PcStatus::Prepareing);
match child_copy.prepare_conflicts().await {
Err(e) => {
debug!("Prepare child transaction conflicts failed, type: unit, status: {:?}, reason: {:?}",
child_copy.get_status(),
e);
child_copy.set_status(Transaction2PcStatus::PrepareFailed); Ok(Err(e))
},
Ok(output) => {
child_copy.set_status(Transaction2PcStatus::Prepared); Ok(Ok(output))
},
}
}) {
debug!("Prepare child transaction conflicts failed, type: unit, status: {:?}, reason: {:?}",
child.get_status(),
e);
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal,
format!("Map children prepare conflicts failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}",
child_uid,
prepare_uid,
e)));
};
} else if child.is_tree() {
let mgr_copy = mgr.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
match mgr_copy.prepare_conflicts(child).await {
Err(e) => {
debug!("Prepare child transaction conflicts failed, type: tree, uid: {:?}, reason: {:?}",
(&child_uid, &prepare_uid),
e);
Ok(Err(e))
},
Ok(output) => {
Ok(Ok(output))
},
}
}) {
debug!("Prepare child transaction conflicts failed, type: tree, status: {:?}, reason: {:?}",
tr.get_status(),
e);
return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal,
format!("Map children prepare conflicts failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, reason: {:?}",
child_uid,
prepare_uid,
e)));
};
} else {
warn!("Prepare child transaction conflicts failed, type: invalid, status: {:?}, reason: invalid transaction type",
tr.get_status());
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal,
format!("Prepare transaction conflicts failed, child_uid: {:?}, prepare_uid: {:?}, reason: invalid transaction type",
child_uid,
prepare_uid)));
}
}
match map_reduce.reduce(true).await {
Err(e) => {
debug!("Prepare child transaction conflicts failed,status: {:?}, reason: {:?}",
tr.get_status(),
e);
Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal,
format!("Reduce children prepare conflicts failed, type: tree, transaction_uid: {:?}, prepare_uid: {:?}, reason: {:?}",
tr.get_transaction_uid(),
tr.get_prepare_uid(),
e)))
},
Ok(results) => {
let mut childs_output: Vec<u8> = Vec::new();
for result in results {
match result {
Err(e) => {
return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal, e));
},
Ok(Err(e)) => {
return Err(e);
},
Ok(Ok(child_output)) => {
if let Some(child_output) = child_output {
let buf = child_output.as_ref();
if buf.len() > 0 {
childs_output.put_slice(buf);
}
}
},
}
}
match tr.prepare_conflicts().await {
Err(e) => {
debug!("Prepare root transaction conflicts failed, status: {:?}, reason: {:?}",
tr.get_status(),
e);
Err(e)
},
Ok(output) => {
if let Some(mut output) = output {
if childs_output.len() > 0 {
output.put_slice(childs_output.as_ref());
}
Ok(Some(output))
} else {
Ok(None)
}
},
}
}
}
} else {
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
let mut childs_output: Vec<u8> = Vec::new();
for child in childs {
if child.is_unit() {
if child.is_require_persistence() {
child.set_commit_uid(alloc_commit_uid(&mgr.0.uid_gen, &tr)); }
child.set_status(Transaction2PcStatus::Prepareing);
match child.prepare_conflicts().await {
Err(e) => {
debug!("Prepare child transaction conflicts failed, type: unit, status: {:?}, reason: {:?}",
child.get_status(),
e);
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(e);
},
Ok(child_output) => {
child.set_status(Transaction2PcStatus::Prepared);
if let Some(child_output) = child_output {
let buf = child_output.as_ref();
if buf.len() > 0 {
childs_output.put_slice(buf);
}
}
},
}
} else if child.is_tree() {
match mgr.prepare_conflicts(child).await {
Err(e) => {
debug!("Prepare child transaction conflicts failed, type: tree, status: {:?}, reason: {:?}",
tr.get_status(),
e);
return Err(e);
},
Ok(child_output) => {
if let Some(child_output) = child_output {
let buf = child_output.as_ref();
if buf.len() > 0 {
childs_output.put_slice(buf);
}
}
},
}
} else {
debug!("Prepare child transaction conflicts failed, type: invalid, status: {:?}, reason: invalid transaction type",
child.get_status());
child.set_status(Transaction2PcStatus::PrepareFailed); return Err(<T as Transaction2Pc>::PrepareError::new_transaction_error(ErrorLevel::Normal,
format!("Prepare transaction conflicts failed, child_uid: {:?}, prepare_uid: {:?}, reason: invalid transaction type",
child.get_transaction_uid(),
child.get_prepare_uid())));
}
}
match tr.prepare_conflicts().await {
Err(e) => {
debug!("Prepare root transaction conflicts failed, status: {:?}, reason: {:?}",
tr.get_status(),
e);
Err(e)
},
Ok(output) => {
if let Some(mut output) = output {
if childs_output.len() > 0 {
output.put_slice(childs_output.as_ref());
}
Ok(Some(output))
} else {
Ok(None)
}
},
}
}
}.boxed()
}
pub async fn commit<T>(&self,
tr: T,
input: <T as Transaction2Pc>::PrepareOutput,
confirm: <T as Transaction2Pc>::CommitConfirm)
-> Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>
where T: TransactionTree<Cid = Guid, Node = T, Status = Transaction2PcStatus> {
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::Prepared {
warn!("Commmit root transaction failed, status: {:?}, reason: invalid transaction status", current_tr_status);
tr.set_status(Transaction2PcStatus::LogCommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit root failed, type: unit, transaction_uid: {:?}, commit_uid: {:?}, status: {:?}, reason: invalid transaction status", tr.get_transaction_uid(), tr.get_commit_uid(), current_tr_status)));
}
if tr.is_writable() && tr.is_require_persistence() {
if input.as_ref().len() == 0 {
tr.set_status(Transaction2PcStatus::Commited); return Ok(<T as AsyncTransaction>::Output::default());
}
self.0.commit_produced.fetch_add(1, Ordering::Relaxed); tr.set_status(Transaction2PcStatus::LogCommiting);
match self.0.commit_logger.append(tr.get_commit_uid().unwrap(),
input).await {
Err(e) => {
warn!("Commmit root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
tr.set_status(Transaction2PcStatus::LogCommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit transaction failed, type: tree, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)));
},
Ok(log_handle) => {
if let Err(e) = self.0.commit_logger.flush(log_handle).await {
warn!("Commmit root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
tr.set_status(Transaction2PcStatus::LogCommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit transaction failed, type: tree, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)));
}
tr.set_status(Transaction2PcStatus::LogCommited); },
}
} else if !tr.is_require_persistence() {
tr.set_status(Transaction2PcStatus::LogCommited); } else if !tr.is_writable() {
tr.set_status(Transaction2PcStatus::Commited); return Ok(<T as AsyncTransaction>::Output::default());
}
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::LogCommited {
warn!("Commmit root transaction failed, status: {:?}, reason: invalid transaction type", tr.get_status());
tr.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit root failed, type: unit, transaction_uid: {:?}, commit_uid: {:?}, status: {:?}, reason: invalid transaction status", tr.get_transaction_uid(), tr.get_commit_uid(), current_tr_status)));
}
let result = self.commit_confirm(tr.clone(), confirm).await;
if result.is_err() {
tr.set_status(Transaction2PcStatus::CommitFailed); } else {
tr.set_status(Transaction2PcStatus::Commited); }
self.0.commit_consumed.fetch_add(1, Ordering::Relaxed);
result
}
fn commit_confirm<T>(&self,
tr: T,
confirm: <T as Transaction2Pc>::CommitConfirm)
-> BoxFuture<Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>>
where T: TransactionTree<Node = T, Status = Transaction2PcStatus> {
let mgr = self.clone();
async move {
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::LogCommited {
warn!("Confirm root transaction failed, status: {:?}, reason: invalid transaction status", current_tr_status);
tr.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit root failed, type: unit, transaction_uid: {:?}, commit_uid: {:?}, status: {:?}, reason: invalid transaction status", tr.get_transaction_uid(), tr.get_commit_uid(), current_tr_status)));
}
tr.set_status(Transaction2PcStatus::LogCommiting);
if tr.is_unit() {
return tr.commit(confirm).await;
}
if tr.is_concurrent_commit() {
let mut map_reduce = mgr.0.rt.map_reduce(tr.children_len());
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
let child_uid = child.get_transaction_uid();
let commit_uid = child.get_commit_uid();
if child.is_unit() {
let child_copy = child.clone();
let confirm_copy = confirm.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
child_copy.set_status(Transaction2PcStatus::Commiting);
match child_copy.commit(confirm_copy).await {
Err(e) => {
warn!("Confirm child transaction failed, type: unit, status: {:?}, reason: {:?}", child_copy.get_status(), e);
child_copy.set_status(Transaction2PcStatus::Commiting); Err(Error::new(ErrorKind::Other, format!("Commit children failed, type: unit, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_copy.get_transaction_uid(), child_copy.get_commit_uid(), e)))
},
Ok(output) => {
child_copy.set_status(Transaction2PcStatus::Commited); Ok(output)
},
}
}) {
warn!("Confirm child transaction failed, type: unit, status: {:?}, reason: {:?}", tr.get_status(), e);
child.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Map children commit failed, type: unit, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, commit_uid, e)));
};
} else if child.is_tree() {
let mgr_copy = mgr.clone();
let child_copy = child.clone();
let confirm_copy = confirm.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
let child_uid = child.get_transaction_uid();
let commit_uid = child.get_commit_uid();
child_copy.set_status(Transaction2PcStatus::Commiting);
match mgr_copy.commit_confirm(child, confirm_copy).await {
Err(e) => {
warn!("Confirm child transaction failed, type: tree, status: {:?}, reason: {:?}", child_copy.get_status(), e);
child_copy.set_status(Transaction2PcStatus::CommitFailed); Err(Error::new(ErrorKind::Other, format!("Commit children failed, type: tree, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, commit_uid, e)))
},
Ok(output) => {
child_copy.set_status(Transaction2PcStatus::Commited); Ok(output)
},
}
}) {
warn!("Confirm child transaction failed, type: tree, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Map children commit failed, type: tree, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, commit_uid, e)));
};
} else {
warn!("Confirm child transaction failed, type: invalid, status: {:?}, reason: invalid transaction type", child.get_status());
child.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit transaction failed, child_uid: {:?}, commit_uid: {:?}, reason: invalid transaction type", child_uid, commit_uid)));
}
}
match map_reduce.reduce(true).await {
Err(e) => {
warn!("Confirm child transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Reduce children commit failed, type: tree, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)))
},
Ok(results) => {
for result in results {
if let Err(e) = result {
warn!("Confirm child transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit children failed, type: tree, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)));
}
}
match tr.commit(confirm).await {
Err(e) => {
warn!("Confirm root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit root failed, type: tree, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)))
},
Ok(output) => {
Ok(output)
},
}
}
}
} else {
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
if child.is_unit() {
child.set_status(Transaction2PcStatus::Commiting);
if let Err(e) = child.commit(confirm.clone()).await {
warn!("Confirm child transaction failed, type: unit, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit children failed, type: unit, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", child.get_transaction_uid(), child.get_commit_uid(), e)));
}
child.set_status(Transaction2PcStatus::Commited); } else if child.is_tree() {
let child_uid = child.get_transaction_uid();
let commit_uid = child.get_commit_uid();
child.set_status(Transaction2PcStatus::Commiting);
if let Err(e) = mgr.commit_confirm(child.clone(), confirm.clone()).await {
warn!("Confirm child transaction failed, type: tree, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit children failed, type: tree, child_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, commit_uid, e)));
}
child.set_status(Transaction2PcStatus::Commited); } else {
warn!("Confirm child transaction failed, type: invalid, status: {:?}, reason: invalid transaction type", child.get_status());
child.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit transaction failed, child_uid: {:?}, commit_uid: {:?}, reason: invalid transaction type", child.get_transaction_uid(), child.get_commit_uid())));
}
}
match tr.commit(confirm).await {
Err(e) => {
warn!("Confirm root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Commit root failed, type: tree, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)))
},
Ok(output) => {
Ok(output)
},
}
}
}.boxed()
}
pub async fn rollback<T>(&self, tr: T)
-> Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>
where T: TransactionTree<Node = T, Status = Transaction2PcStatus> {
let current_tr_status = tr.get_status();
if current_tr_status != Transaction2PcStatus::ActionFailed
&& current_tr_status != Transaction2PcStatus::PrepareFailed
&& current_tr_status != Transaction2PcStatus::LogCommitFailed {
warn!("Rollback root transaction failed, status: {:?}, reason: invalid transaction", current_tr_status);
tr.set_status(Transaction2PcStatus::RollbackFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Rollback root failed, type: unit, transaction_uid: {:?}, commit_uid: {:?}, status: {:?}, reason: invalid transaction status", tr.get_transaction_uid(), tr.get_commit_uid(), current_tr_status)));
}
if !tr.is_writable() {
tr.set_status(Transaction2PcStatus::Rollbacked); return Ok(<T as AsyncTransaction>::Output::default());
}
tr.set_status(Transaction2PcStatus::Rollbacking);
let result = if tr.is_unit() {
tr.rollback().await
} else {
self.rollback_childrens(tr.clone()).await
};
if result.is_err() {
tr.set_status(Transaction2PcStatus::RollbackFailed); } else {
tr.set_status(Transaction2PcStatus::Rollbacked); }
result
}
fn rollback_childrens<T>(&self, tr: T)
-> BoxFuture<Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>>
where T: TransactionTree<Node = T, Status = Transaction2PcStatus> {
let mgr = self.clone();
async move {
if tr.is_unit() {
return tr.rollback().await;
}
if tr.is_concurrent_rollback() {
let mut map_reduce = mgr.0.rt.map_reduce(tr.children_len());
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
let commit_uid = child.get_commit_uid();
if child.is_unit() {
let child_copy = child.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
child_copy.set_status(Transaction2PcStatus::Rollbacking);
match child_copy.rollback().await {
Err(e) => {
warn!("Rollback child transaction failed, type: unit, status: {:?}, reason: {:?}", child_copy.get_status(), e);
child_copy.set_status(Transaction2PcStatus::RollbackFailed); Err(Error::new(ErrorKind::Other, format!("Rollback children failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_copy.get_transaction_uid(), child_copy.get_prepare_uid(), child_copy.get_commit_uid(), e)))
},
Ok(output) => {
child_copy.set_status(Transaction2PcStatus::Rollbacked); Ok(output)
},
}
}) {
warn!("Rollback child transaction failed, type: tree, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::RollbackFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Map children rollback failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, prepare_uid, commit_uid, e)));
};
} else if child.is_tree() {
let mgr_copy = mgr.clone();
if let Err(e) = map_reduce.map(mgr.0.rt.clone(), async move {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
let commit_uid = child.get_commit_uid();
match mgr_copy.rollback(child).await {
Err(e) => {
warn!("Rollback child transaction failed, type: tree, uid: {:?}, reason: {:?}", (&child_uid, &prepare_uid, &commit_uid), e);
Err(Error::new(ErrorKind::Other, format!("Rollback children failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, prepare_uid, commit_uid, e)))
},
Ok(output) => {
Ok(output)
},
}
}) {
warn!("Rollback child transaction failed, type: tree, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Map children rollback failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, prepare_uid, commit_uid, e)));
};
} else {
warn!("Rollback child transaction failed, type: invalid, status: {:?}, reason: invalid transaction type", child.get_status());
child.set_status(Transaction2PcStatus::RollbackFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback transaction failed, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: invalid transaction type", child_uid, prepare_uid, commit_uid)));
}
}
match map_reduce.reduce(true).await {
Err(e) => {
warn!("Rollback child transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Reduce children rollback failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), tr.get_commit_uid(), e)))
},
Ok(results) => {
for result in results {
if let Err(e) = result {
warn!("Rollback child transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback children failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), tr.get_commit_uid(), e)));
}
}
match tr.rollback().await {
Err(e) => {
warn!("Rollback root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback root failed, type: tree, transaction_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), tr.get_commit_uid(), e)))
},
Ok(output) => {
Ok(output)
},
}
}
}
} else {
let childs: Vec<<T as TransactionTree>::Node> = tr.to_children().collect();
for child in childs {
if child.is_unit() {
child.set_status(Transaction2PcStatus::Rollbacking);
if let Err(e) = child.rollback().await {
warn!("Rollback child transaction failed, type: unit, status: {:?}, reason: {:?}", child.get_status(), e);
child.set_status(Transaction2PcStatus::RollbackFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback children failed, type: unit, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", child.get_transaction_uid(), child.get_prepare_uid(), child.get_commit_uid(), e)));
}
child.set_status(Transaction2PcStatus::Rollbacked); } else if child.is_tree() {
let child_uid = child.get_transaction_uid();
let prepare_uid = child.get_prepare_uid();
let commit_uid = child.get_commit_uid();
if let Err(e) = mgr.rollback(child).await {
warn!("Rollback child transaction failed, type: tree, uid: {:?}, reason: {:?}", (&child_uid, &prepare_uid, &commit_uid), e);
return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback children failed, type: tree, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", child_uid, prepare_uid, commit_uid, e)));
}
} else {
warn!("Rollback child transaction failed, type invalid, status: {:?}, reason: invalid transaction type", child.get_status());
child.set_status(Transaction2PcStatus::CommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback transaction failed, child_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: invalid transaction type", child.get_transaction_uid(), child.get_prepare_uid(), child.get_commit_uid())));
}
}
match tr.rollback().await {
Err(e) => {
warn!("Rollback root transaction failed, status: {:?}, reason: {:?}", tr.get_status(), e);
Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Fatal, format!("Rollback root failed, type: tree, transaction_uid: {:?}, prepare_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_prepare_uid(), tr.get_commit_uid(), e)))
},
Ok(output) => {
Ok(output)
},
}
}
}.boxed()
}
pub async fn replay_commit_log<B>(&self,
callback: impl Fn(Guid, B) -> IOResult<()> + Send + Sync + 'static)
-> IOResult<(usize, usize)>
where B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static {
self.0.commit_logger.start_replay(Arc::new(callback)).await
}
pub async fn replay_commit_log_by_file<B>(&self,
callback: impl Fn(Guid, B) -> IOResult<()> + Send + Sync + 'static,
file_finished: impl Fn() -> IOResult<()> + Send + Sync + 'static)
-> IOResult<(usize, usize)>
where B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static {
self.0
.commit_logger
.start_replay_by_file(Arc::new(callback), Arc::new(file_finished))
.await
}
pub async fn replay_commit<T>(&self,
tr: T,
transaction_uid: <T as Transaction2Pc>::Tid,
commit_uid: <T as Transaction2Pc>::Cid,
input: <T as Transaction2Pc>::PrepareOutput,
confirm: <T as Transaction2Pc>::CommitConfirm)
-> Result<<T as AsyncTransaction>::Output, <T as AsyncTransaction>::Error>
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
reset_transaction_uid::<C, Log, T>(tr.clone(), transaction_uid); if let Err(current_len) = register_transcation(&self, &tr) {
tr.set_status(Transaction2PcStatus::InitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Init root failed, type: unit, current: {}, status: {:?}, reason: same source transaction excessive", current_len, tr.get_status())));
}
reset_commit_uid::<C, Log, T>(tr.clone(), commit_uid);
self.0.commit_produced.fetch_add(1, Ordering::Relaxed);
match self.0.commit_logger.append_replay(tr.get_commit_uid().unwrap(),
input).await {
Err(e) => {
tr.set_status(Transaction2PcStatus::LogCommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit replay transaction failed, type: tree, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)));
},
Ok(log_handle) => {
if let Err(e) = self.0.commit_logger.flush_replay(log_handle).await {
tr.set_status(Transaction2PcStatus::LogCommitFailed); return Err(<T as AsyncTransaction>::Error::new_transaction_error(ErrorLevel::Normal, format!("Commit replay transaction failed, type: tree, transaction_uid: {:?}, commit_uid: {:?}, reason: {:?}", tr.get_transaction_uid(), tr.get_commit_uid(), e)));
}
tr.set_status(Transaction2PcStatus::LogCommited); },
}
let result = self.commit_confirm(tr.clone(), confirm).await;
if result.is_err() {
tr.set_status(Transaction2PcStatus::CommitFailed); } else {
tr.set_status(Transaction2PcStatus::Commited); }
self.0.commit_consumed.fetch_add(1, Ordering::Relaxed);
result
}
pub async fn finish_replay(&self) -> IOResult<()> {
self.0.commit_logger.finish_replay().await
}
pub async fn advance_replay_check_point(&self) -> IOResult<()> {
self.0.commit_logger.advance_replay_check_point().await
}
}
#[inline(always)]
fn alloc_transaction_uid<T>(uid_gen: &GuidGen, tr: &T) -> Guid
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
if tr.is_enable_inherit_uid() {
if let Some(uid) = tr.get_transaction_uid() {
return uid;
}
}
uid_gen.gen(DEFAULT_TRANSACTION_CTRL_ID)
}
#[inline(always)]
fn alloc_commit_uid<T>(uid_gen: &GuidGen, tr: &T) -> Guid
where T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus> {
if tr.is_enable_inherit_uid() {
if let Some(uid) = tr.get_commit_uid() {
return uid;
}
}
uid_gen.gen(DEFAULT_TRANSACTION_COMMIT_CTRL_ID)
}
fn register_transcation<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus>,
>(mgr: &Transaction2PcManager<C, Log>, tr: &T) -> Result<(), usize> {
let source = tr.get_source();
if let (Some(current_len), Some(limit)) = (mgr.source_len(&source), mgr.get_max_source_parallel_limit(&source)) {
if current_len >= limit {
return Err(current_len)
}
}
if let Some(transaction_uid) = tr.get_transaction_uid() {
let shared_tr = Arc::new(tr.clone()) as Arc<dyn Any + Send + Sync + 'static>;
mgr.0.trans_table.insert(transaction_uid.clone(), shared_tr.clone()); if let Some(counter) = mgr.0.source_counter.get(&tr.get_source()) {
counter.value().0.fetch_add(1, Ordering::Relaxed);
} else {
mgr.0.source_counter.insert(tr.get_source(),
(AtomicUsize::new(1), AtomicUsize::new(0), AtomicUsize::new(DEFAULT_MAX_PARALLEL_TRANSACTION_LIMIT)));
}
mgr.0.produced_total.fetch_add(1, Ordering::Relaxed); }
Ok(())
}
fn reset_transaction_uid<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus>,
>(tr: T,
transaction_uid: <T as Transaction2Pc>::Tid) {
tr.set_transaction_uid(transaction_uid.clone());
if tr.children_len() > 0 {
let mut childs = tr.to_children();
while let Some(child) = childs.next() {
reset_transaction_uid::<C, Log, T>(child, transaction_uid.clone());
}
}
}
fn reset_commit_uid<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
T: TransactionTree<Tid = Guid, Pid = Guid, Cid = Guid, Node = T, Status = Transaction2PcStatus>,
>(tr: T,
commit_uid: <T as Transaction2Pc>::Cid) {
tr.set_commit_uid(commit_uid.clone());
if tr.children_len() > 0 {
let mut childs = tr.to_children();
while let Some(child) = childs.next() {
reset_commit_uid::<C, Log, T>(child, commit_uid.clone());
}
}
}
struct Inner2PcManager<
C: Send + 'static,
Log: AsyncCommitLog<C = C, Cid = Guid>,
> {
rt: MultiTaskRuntime<()>, uid_gen: GuidGen, commit_logger: Log, trans_table: DashMap<Guid, Arc<dyn Any + Send + Sync + 'static>>, source_counter: DashMap<Atom, (AtomicUsize, AtomicUsize, AtomicUsize)>, prepare_produced: AtomicUsize, prepare_consumed: AtomicUsize, commit_produced: AtomicUsize, commit_consumed: AtomicUsize, produced_total: AtomicUsize, consumed_total: AtomicUsize, }