use std::iter;
use std::sync::atomic;
use std::sync::atomic::AtomicU8;
use std::sync::Arc;
use std::time::Instant;
use derive_new::new;
use fail::fail_point;
use futures::prelude::*;
use log::{debug, error, info, trace, warn};
use tokio::time::Duration;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::kv::HexRepr;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::Collect;
use crate::request::CollectError;
use crate::request::CollectSingle;
use crate::request::CollectWithShard;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::request::PlanBuilder;
use crate::request::RetryOptions;
use crate::request::TruncateKeyspace;
use crate::timestamp::TimestampExt;
use crate::transaction::buffer::Buffer;
use crate::transaction::lowering::*;
use crate::BoundRange;
use crate::Error;
use crate::Key;
use crate::KvPair;
use crate::Result;
use crate::Value;
/// A transaction handle: a start timestamp, a local mutation buffer and the client used
/// to reach the cluster.
///
/// Reads and writes go through `buffer`; the buffered mutations are sent by `commit` or
/// undone by `rollback`. Dropping a transaction whose status is still `Active` triggers
/// the configured `CheckLevel` (see the `Drop` impl).
pub struct Transaction<PdC: PdClient = PdRpcClient> {
/// Current `TransactionStatus`, stored as its `u8` discriminant. It is shared through an
/// `Arc` because the spawned heartbeat task reads it as well.
status: Arc<AtomicU8>,
/// Start timestamp passed to every request this transaction builds.
timestamp: Timestamp,
/// Local buffer holding reads, writes and locks until commit or rollback.
buffer: Buffer,
/// Client handed to `PlanBuilder` and used to fetch fresh timestamps.
rpc: Arc<PdC>,
/// Options this transaction was created with; the pessimistic `for_update_ts` stored in
/// `kind` is updated as locks are acquired.
options: TransactionOptions,
/// Keyspace applied when encoding keys on the way out and truncating them on the way back.
keyspace: Keyspace,
/// Set once the background heartbeat task has been spawned so it is started at most once.
is_heartbeat_started: bool,
/// Moment the transaction object was created; elapsed time since then feeds TTL values.
start_instant: Instant,
}
impl<PdC: PdClient> Transaction<PdC> {
pub(crate) fn new(
timestamp: Timestamp,
rpc: Arc<PdC>,
options: TransactionOptions,
keyspace: Keyspace,
) -> Transaction<PdC> {
let status = if options.read_only {
TransactionStatus::ReadOnly
} else {
TransactionStatus::Active
};
Transaction {
status: Arc::new(AtomicU8::new(status as u8)),
timestamp,
buffer: Buffer::new(options.is_pessimistic()),
rpc,
options,
keyspace,
is_heartbeat_started: false,
start_instant: std::time::Instant::now(),
}
}
/// Reads the value stored under `key` as seen by this transaction.
///
/// The lookup is delegated to `Buffer::get_or_else`, which is given a closure that
/// fetches the key from the cluster at the transaction's start timestamp.
///
/// # Errors
///
/// Fails with `Error::OperationAfterCommitError` when the transaction no longer accepts
/// operations, or with whatever error the remote request produces.
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
    trace!("invoking transactional get request");
    self.check_allow_operation().await?;
    let keyspace = self.keyspace;
    let encoded_key = key.into().encode_keyspace(keyspace, KeyMode::Txn);
    let start_ts = self.timestamp.clone();
    let pd_client = self.rpc.clone();
    let retry = self.options.retry_options.clone();
    self.buffer
        .get_or_else(encoded_key, |missing_key| async move {
            let request = new_get_request(missing_key, start_ts.clone());
            // NOTE(review): region retries use `DEFAULT_REGION_BACKOFF` here, while
            // `batch_get` and `scan_inner` use the configured `region_backoff` — confirm
            // this difference is intended.
            let plan = PlanBuilder::new(pd_client, keyspace, request)
                .resolve_lock(start_ts, retry.lock_backoff, keyspace)
                .retry_multi_region(DEFAULT_REGION_BACKOFF)
                .merge(CollectSingle)
                .post_process_default()
                .plan();
            plan.execute().await
        })
        .await
}
/// Locks `key` and returns its value.
///
/// In a pessimistic transaction the lock request itself returns the value. Otherwise the
/// key is locked through `lock_keys` and then read with `get`.
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
    debug!("invoking transactional get_for_update request");
    self.check_allow_operation().await?;
    if self.is_pessimistic() {
        let encoded_key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
        let mut locked = self.pessimistic_lock(iter::once(encoded_key), true).await?;
        debug_assert!(locked.len() <= 1);
        return Ok(locked.pop().map(|pair| pair.1));
    }
    let raw_key = key.into();
    self.lock_keys(iter::once(raw_key.clone())).await?;
    self.get(raw_key).await
}
/// Reports whether `get` finds a value for `key`.
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
    debug!("invoking transactional key_exists request");
    let value = self.get(key).await?;
    Ok(value.is_some())
}
/// Reads several keys at once and returns the resulting key-value pairs.
///
/// The lookup is delegated to `Buffer::batch_get_or_else`, which is given a closure that
/// fetches keys from the cluster at the start timestamp. Returned pairs have the keyspace
/// encoding stripped again.
pub async fn batch_get(
    &mut self,
    keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
    debug!("invoking transactional batch_get request");
    self.check_allow_operation().await?;
    let keyspace = self.keyspace;
    let start_ts = self.timestamp.clone();
    let pd_client = self.rpc.clone();
    let retry = self.options.retry_options.clone();
    let encoded_keys = keys
        .into_iter()
        .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
    let pairs = self
        .buffer
        .batch_get_or_else(encoded_keys, move |missing_keys| async move {
            let request = new_batch_get_request(missing_keys, start_ts.clone());
            let plan = PlanBuilder::new(pd_client, keyspace, request)
                .resolve_lock(start_ts, retry.lock_backoff, keyspace)
                .retry_multi_region(retry.region_backoff)
                .merge(Collect)
                .plan();
            plan.execute()
                .await
                .map(|r| r.into_iter().map(Into::into).collect())
        })
        .await?;
    Ok(pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
}
/// Locks all `keys` and returns their key-value pairs.
///
/// In a pessimistic transaction the lock request returns the pairs directly. Otherwise
/// the keys are locked through `lock_keys` and then read with `batch_get`.
pub async fn batch_get_for_update(
    &mut self,
    keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
    debug!("invoking transactional batch_get_for_update request");
    self.check_allow_operation().await?;
    if self.is_pessimistic() {
        let keyspace = self.keyspace;
        let encoded_keys = keys
            .into_iter()
            .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
        let locked = self.pessimistic_lock(encoded_keys, true).await?;
        return Ok(locked.truncate_keyspace(keyspace));
    }
    let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
    self.lock_keys(keys.clone()).await?;
    let pairs = self.batch_get(keys).await?;
    Ok(pairs.collect())
}
/// Scans `range` and returns up to `limit` key-value pairs.
///
/// Forwards to `scan_inner` with values requested and the reverse flag off.
pub async fn scan(
    &mut self,
    range: impl Into<BoundRange>,
    limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
    debug!("invoking transactional scan request");
    let (key_only, reverse) = (false, false);
    self.scan_inner(range, limit, key_only, reverse).await
}
/// Scans `range` and returns up to `limit` keys, without their values.
///
/// Forwards to `scan_inner` in key-only mode with the reverse flag off.
pub async fn scan_keys(
    &mut self,
    range: impl Into<BoundRange>,
    limit: u32,
) -> Result<impl Iterator<Item = Key>> {
    debug!("invoking transactional scan_keys request");
    let (key_only, reverse) = (true, false);
    let pairs = self.scan_inner(range, limit, key_only, reverse).await?;
    Ok(pairs.map(KvPair::into_key))
}
/// Scans `range` with the reverse flag set and returns up to `limit` key-value pairs.
///
/// Forwards to `scan_inner` with values requested.
pub async fn scan_reverse(
    &mut self,
    range: impl Into<BoundRange>,
    limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
    debug!("invoking transactional scan_reverse request");
    let (key_only, reverse) = (false, true);
    self.scan_inner(range, limit, key_only, reverse).await
}
/// Scans `range` with the reverse flag set and returns up to `limit` keys, without values.
///
/// Forwards to `scan_inner` in key-only mode.
pub async fn scan_keys_reverse(
    &mut self,
    range: impl Into<BoundRange>,
    limit: u32,
) -> Result<impl Iterator<Item = Key>> {
    debug!("invoking transactional scan_keys_reverse request");
    let (key_only, reverse) = (true, true);
    let pairs = self.scan_inner(range, limit, key_only, reverse).await?;
    Ok(pairs.map(KvPair::into_key))
}
/// Buffers a write of `value` under `key`.
///
/// Pessimistic transactions first take a pessimistic lock on the key. The write itself
/// only goes into the local buffer.
pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
    trace!("invoking transactional put request");
    self.check_allow_operation().await?;
    let encoded_key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
    if self.is_pessimistic() {
        let lock_target = iter::once(encoded_key.clone());
        self.pessimistic_lock(lock_target, false).await?;
    }
    self.buffer.put(encoded_key, value.into());
    Ok(())
}
/// Buffers an insert of `value` under `key`.
///
/// Fails with `Error::DuplicateKeyInsertion` when the local buffer already holds a value
/// for the key. Pessimistic transactions lock the key with a `NotExist` assertion before
/// buffering.
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
    debug!("invoking transactional insert request");
    self.check_allow_operation().await?;
    let encoded_key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
    let already_buffered = self.buffer.get(&encoded_key).is_some();
    if already_buffered {
        return Err(Error::DuplicateKeyInsertion);
    }
    if self.is_pessimistic() {
        let lock_target = iter::once((encoded_key.clone(), kvrpcpb::Assertion::NotExist));
        self.pessimistic_lock(lock_target, false).await?;
    }
    self.buffer.insert(encoded_key, value.into());
    Ok(())
}
/// Buffers a deletion of `key`.
///
/// Pessimistic transactions first take a pessimistic lock on the key. The deletion itself
/// only goes into the local buffer.
pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
    debug!("invoking transactional delete request");
    self.check_allow_operation().await?;
    let encoded_key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
    if self.is_pessimistic() {
        let lock_target = iter::once(encoded_key.clone());
        self.pessimistic_lock(lock_target, false).await?;
    }
    self.buffer.delete(encoded_key);
    Ok(())
}
/// Buffers a batch of put/delete mutations.
///
/// Pessimistic transactions lock every mutated key in one request first; if that fails,
/// nothing is buffered.
pub async fn batch_mutate(
    &mut self,
    mutations: impl IntoIterator<Item = Mutation>,
) -> Result<()> {
    debug!("invoking transactional batch mutate request");
    self.check_allow_operation().await?;
    let keyspace = self.keyspace;
    let encoded: Vec<Mutation> = mutations
        .into_iter()
        .map(|mutation| mutation.encode_keyspace(keyspace, KeyMode::Txn))
        .collect();
    if self.is_pessimistic() {
        let lock_targets = encoded.iter().map(|m| m.key().clone());
        self.pessimistic_lock(lock_targets, false).await?;
    }
    // Both transaction kinds buffer the mutations the same way.
    for mutation in encoded {
        self.buffer.mutate(mutation);
    }
    Ok(())
}
/// Locks `keys` without writing to them.
///
/// Optimistic transactions only record the lock in the local buffer. Pessimistic
/// transactions send a pessimistic lock request.
pub async fn lock_keys(
    &mut self,
    keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<()> {
    debug!("invoking transactional lock_keys request");
    self.check_allow_operation().await?;
    let keyspace = self.keyspace;
    let encoded_keys = keys
        .into_iter()
        .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
    if self.is_pessimistic() {
        self.pessimistic_lock(encoded_keys, false).await?;
    } else {
        for key in encoded_keys {
            self.buffer.lock(key);
        }
    }
    Ok(())
}
pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
debug!("commiting transaction");
if !self.transit_status(
|status| {
matches!(
status,
TransactionStatus::StartedCommit | TransactionStatus::Active
)
},
TransactionStatus::StartedCommit,
) {
return Err(Error::OperationAfterCommitError);
}
let primary_key = self.buffer.get_primary_key();
let mutations = self.buffer.to_proto_mutations();
if mutations.is_empty() {
assert!(primary_key.is_none());
return Ok(None);
}
self.start_auto_heartbeat().await;
let res = Committer::new(
primary_key,
mutations,
self.timestamp.clone(),
self.rpc.clone(),
self.options.clone(),
self.keyspace,
self.buffer.get_write_size() as u64,
self.start_instant,
)
.commit()
.await;
if res.is_ok() {
self.set_status(TransactionStatus::Committed);
}
res
}
pub async fn rollback(&mut self) -> Result<()> {
debug!("rolling back transaction");
if !self.transit_status(
|status| {
matches!(
status,
TransactionStatus::StartedRollback
| TransactionStatus::Active
| TransactionStatus::StartedCommit
)
},
TransactionStatus::StartedRollback,
) {
return Err(Error::OperationAfterCommitError);
}
let primary_key = self.buffer.get_primary_key();
let mutations = self.buffer.to_proto_mutations();
let res = Committer::new(
primary_key,
mutations,
self.timestamp.clone(),
self.rpc.clone(),
self.options.clone(),
self.keyspace,
self.buffer.get_write_size() as u64,
self.start_instant,
)
.rollback()
.await;
if res.is_ok() {
self.set_status(TransactionStatus::Rolledback);
}
res
}
pub fn start_timestamp(&self) -> Timestamp {
self.timestamp.clone()
}
#[doc(hidden)]
pub async fn send_heart_beat(&mut self) -> Result<u64> {
debug!("sending heart_beat");
self.check_allow_operation().await?;
let primary_key = match self.buffer.get_primary_key() {
Some(k) => k,
None => return Err(Error::NoPrimaryKey),
};
let request = new_heart_beat_request(
self.timestamp.clone(),
primary_key,
self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
);
let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.resolve_lock(
self.timestamp.clone(),
self.options.retry_options.lock_backoff.clone(),
self.keyspace,
)
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
}
/// Shared implementation behind the public `scan*` methods.
///
/// Delegates to `Buffer::scan_and_fetch`, giving it a closure that issues a scan request
/// at the start timestamp for the range and limit the buffer asks for. `key_only` is
/// passed to the remote request and, negated, to the buffer. Returned pairs have the
/// keyspace encoding stripped.
async fn scan_inner(
    &mut self,
    range: impl Into<BoundRange>,
    limit: u32,
    key_only: bool,
    reverse: bool,
) -> Result<impl Iterator<Item = KvPair>> {
    self.check_allow_operation().await?;
    let keyspace = self.keyspace;
    let start_ts = self.timestamp.clone();
    let pd_client = self.rpc.clone();
    let retry = self.options.retry_options.clone();
    let encoded_range = range.into().encode_keyspace(keyspace, KeyMode::Txn);
    let with_values = !key_only;
    let pairs = self
        .buffer
        .scan_and_fetch(
            encoded_range,
            limit,
            with_values,
            reverse,
            move |remote_range, remote_limit| async move {
                let request = new_scan_request(
                    remote_range,
                    start_ts.clone(),
                    remote_limit,
                    key_only,
                    reverse,
                );
                let plan = PlanBuilder::new(pd_client, keyspace, request)
                    .resolve_lock(start_ts, retry.lock_backoff, keyspace)
                    .retry_multi_region(retry.region_backoff)
                    .merge(Collect)
                    .plan();
                plan.execute()
                    .await
                    .map(|r| r.into_iter().map(Into::into).collect())
            },
        )
        .await?;
    Ok(pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
}
async fn pessimistic_lock(
&mut self,
keys: impl IntoIterator<Item = impl PessimisticLock>,
need_value: bool,
) -> Result<Vec<KvPair>> {
debug!("acquiring pessimistic lock");
assert!(
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
"`pessimistic_lock` is only valid to use with pessimistic transactions"
);
let keys: Vec<_> = keys.into_iter().collect();
if keys.is_empty() {
return Ok(vec![]);
}
let first_key = keys[0].clone().key();
let primary_lock = self
.buffer
.get_primary_key()
.unwrap_or_else(|| first_key.clone());
let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone());
let request = new_pessimistic_lock_request(
keys.clone().into_iter(),
primary_lock,
self.timestamp.clone(),
MAX_TTL,
for_update_ts.clone(),
need_value,
);
let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.resolve_lock(
self.timestamp.clone(),
self.options.retry_options.lock_backoff.clone(),
self.keyspace,
)
.preserve_shard()
.retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
.merge(CollectWithShard)
.plan();
let pairs = plan.execute().await;
if let Err(err) = pairs {
match err {
Error::PessimisticLockError {
inner,
success_keys,
} if !success_keys.is_empty() => {
let keys = success_keys.into_iter().map(Key::from);
self.pessimistic_lock_rollback(keys, self.timestamp.clone(), for_update_ts)
.await?;
Err(*inner)
}
_ => Err(err),
}
} else {
self.buffer.primary_key_or(&first_key);
self.start_auto_heartbeat().await;
for key in keys {
self.buffer.lock(key.key());
}
pairs
}
}
/// Rolls back pessimistic locks on `keys` and then unlocks them in the local buffer.
///
/// Does nothing when `keys` is empty. The buffer is only touched after the remote
/// rollback succeeds.
async fn pessimistic_lock_rollback(
    &mut self,
    keys: impl Iterator<Item = Key>,
    start_version: Timestamp,
    for_update_ts: Timestamp,
) -> Result<()> {
    debug!("rollback pessimistic lock");
    let keys: Vec<_> = keys.into_iter().collect();
    if keys.is_empty() {
        return Ok(());
    }
    let request = new_pessimistic_rollback_request(
        keys.clone().into_iter(),
        start_version.clone(),
        for_update_ts,
    );
    let retry = &self.options.retry_options;
    let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
        .resolve_lock(start_version, retry.lock_backoff.clone(), self.keyspace)
        .retry_multi_region(retry.region_backoff.clone())
        .extract_error()
        .plan();
    plan.execute().await?;
    for key in &keys {
        self.buffer.unlock(key);
    }
    Ok(())
}
async fn check_allow_operation(&self) -> Result<()> {
match self.get_status() {
TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
TransactionStatus::Committed
| TransactionStatus::Rolledback
| TransactionStatus::StartedCommit
| TransactionStatus::StartedRollback
| TransactionStatus::Dropped => Err(Error::OperationAfterCommitError),
}
}
fn is_pessimistic(&self) -> bool {
matches!(self.options.kind, TransactionKind::Pessimistic(_))
}
/// Spawns the background task that periodically sends heartbeat requests for the
/// primary key.
///
/// Does nothing when the heartbeat option is `NoHeartbeat` or the task was already
/// started. The spawned task is detached: it stops when it observes a `Rolledback`,
/// `Committed` or `Dropped` status after a sleep, or when a heartbeat request fails, in
/// which case the error is logged.
///
/// # Panics
///
/// Panics if the buffer has no primary key yet.
async fn start_auto_heartbeat(&mut self) {
debug!("starting auto_heartbeat");
if !self.options.heartbeat_option.is_auto_heartbeat() || self.is_heartbeat_started {
return;
}
self.is_heartbeat_started = true;
// Everything the task needs is cloned or copied so the task does not borrow `self`.
let status = self.status.clone();
let primary_key = self
.buffer
.get_primary_key()
.expect("Primary key should exist");
let start_ts = self.timestamp.clone();
let region_backoff = self.options.retry_options.region_backoff.clone();
let rpc = self.rpc.clone();
let heartbeat_interval = match self.options.heartbeat_option {
// Not reachable in practice: `NoHeartbeat` already returned above.
HeartbeatOption::NoHeartbeat => DEFAULT_HEARTBEAT_INTERVAL,
HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval,
};
let start_instant = self.start_instant;
let keyspace = self.keyspace;
let heartbeat_task = async move {
loop {
// Sleep first, so the first heartbeat goes out one interval after the task starts.
tokio::time::sleep(heartbeat_interval).await;
{
let status: TransactionStatus = status.load(atomic::Ordering::Acquire).into();
if matches!(
status,
TransactionStatus::Rolledback
| TransactionStatus::Committed
| TransactionStatus::Dropped
) {
break;
}
}
// TTL sent with the heartbeat: milliseconds elapsed since the transaction was created,
// plus `MAX_TTL`.
let request = new_heart_beat_request(
start_ts.clone(),
primary_key.clone(),
start_instant.elapsed().as_millis() as u64 + MAX_TTL,
);
// Unlike `send_heart_beat`, this plan has no `resolve_lock` or `extract_error` step.
let plan = PlanBuilder::new(rpc.clone(), keyspace, request)
.retry_multi_region(region_backoff.clone())
.merge(CollectSingle)
.plan();
plan.execute().await?;
}
Ok::<(), Error>(())
};
// The join handle is discarded, so nothing awaits or cancels this task.
tokio::spawn(async {
if let Err(err) = heartbeat_task.await {
log::error!("Error: While sending heartbeat. {}", err);
}
});
}
fn get_status(&self) -> TransactionStatus {
self.status.load(atomic::Ordering::Acquire).into()
}
/// Stores `status` unconditionally, with release ordering.
fn set_status(&self, status: TransactionStatus) {
    let raw = status as u8;
    self.status.store(raw, atomic::Ordering::Release);
}
/// Atomically moves the status to `next` if `check_status` accepts the current status.
///
/// Returns `true` when the status is `next` afterwards (including when it already was),
/// and `false` as soon as `check_status` rejects the observed status. The
/// compare-exchange is retried with the freshly observed value when it fails.
fn transit_status<F>(&self, check_status: F, next: TransactionStatus) -> bool
where
    F: Fn(TransactionStatus) -> bool,
{
    let mut observed = self.get_status();
    loop {
        if !check_status(observed) {
            return false;
        }
        if observed == next {
            return true;
        }
        let swapped = self.status.compare_exchange_weak(
            observed as u8,
            next as u8,
            atomic::Ordering::AcqRel,
            atomic::Ordering::Acquire,
        );
        match swapped {
            Ok(_) => return true,
            Err(actual) => observed = actual.into(),
        }
    }
}
}
impl<PdC: PdClient> Drop for Transaction<PdC> {
    /// Marks the transaction as dropped and applies the configured drop check.
    ///
    /// The status is always set to `Dropped`. If the transaction was still `Active` and
    /// the thread is not already unwinding, `CheckLevel::Panic` panics, `CheckLevel::Warn`
    /// logs a warning and `CheckLevel::None` stays silent.
    ///
    /// # Panics
    ///
    /// Panics when an `Active` transaction is dropped under `CheckLevel::Panic`, unless
    /// the thread is already panicking.
    fn drop(&mut self) {
        debug!("dropping transaction");
        // Store the terminal status first. The detached heartbeat task only stops once it
        // observes a terminal status, so this must happen on every path, including the
        // early return while unwinding and the panic below.
        let was_active = self.get_status() == TransactionStatus::Active;
        self.set_status(TransactionStatus::Dropped);
        // A second panic while unwinding would abort the process, so skip the check then.
        if std::thread::panicking() || !was_active {
            return;
        }
        match self.options.check_level {
            CheckLevel::Panic => {
                panic!("Dropping an active transaction. Consider commit or rollback it.")
            }
            CheckLevel::Warn => {
                warn!("Dropping an active transaction. Consider commit or rollback it.")
            }
            CheckLevel::None => {}
        }
    }
}
/// Upper bound, in milliseconds, for the lock TTL computed by
/// `Committer::calc_txn_lock_ttl`. It is also the TTL sent with pessimistic lock requests
/// and the amount added to the elapsed time in heartbeat requests.
const MAX_TTL: u64 = 20000;
/// Baseline and lower bound for the lock TTL computed by `Committer::calc_txn_lock_ttl`.
/// Presumably milliseconds, like `MAX_TTL` — the two are clamped against each other.
const DEFAULT_LOCK_TTL: u64 = 3000;
/// Heartbeat period used by the default `TransactionOptions`: half of `MAX_TTL`, taken as
/// milliseconds.
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2);
/// Write-size threshold (16 * 1024): `Committer::calc_txn_lock_ttl` only scales the lock
/// TTL when the write size exceeds it.
/// NOTE(review): the name suggests it also bounds commit batch sizes elsewhere — not
/// visible in this file.
pub const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
/// Multiplier applied to the square root of the write size (divided by 1024 twice) when
/// the lock TTL is scaled.
const TTL_FACTOR: f64 = 6000.0;
/// Concurrency-control mode of a transaction.
#[derive(Clone, PartialEq, Debug)]
pub enum TransactionKind {
/// Locks are only recorded locally until commit (see `Transaction::lock_keys`).
Optimistic,
/// Pessimistic mode. The payload is the `for_update_ts`: it starts at version 0 in
/// `TransactionOptions::new_pessimistic` and is raised to the largest timestamp seen
/// each time a pessimistic lock is acquired.
Pessimistic(Timestamp),
}
/// Options that configure how a transaction behaves. Build them with `new_optimistic` or
/// `new_pessimistic` and adjust them with the chained builder methods.
#[derive(Clone, PartialEq, Debug)]
pub struct TransactionOptions {
/// Optimistic or pessimistic; the pessimistic variant also carries the `for_update_ts`.
kind: TransactionKind,
/// Copied into the prewrite request's `try_one_pc` flag.
try_one_pc: bool,
/// Copied into the prewrite request's `use_async_commit` flag.
async_commit: bool,
/// When set, the transaction starts in the `ReadOnly` status.
read_only: bool,
/// Backoff settings for lock resolution and region retries.
retry_options: RetryOptions,
/// What to do when an active transaction is dropped.
check_level: CheckLevel,
/// Whether, and how often, the background heartbeat is sent.
#[doc(hidden)]
heartbeat_option: HeartbeatOption,
}
/// Controls the background heartbeat started by `Transaction::start_auto_heartbeat`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum HeartbeatOption {
/// No background heartbeat task is spawned.
NoHeartbeat,
/// A heartbeat is sent repeatedly, sleeping for the given duration before each one.
FixedTime(Duration),
}
impl Default for TransactionOptions {
fn default() -> TransactionOptions {
Self::new_pessimistic()
}
}
impl TransactionOptions {
pub fn new_optimistic() -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Optimistic,
try_one_pc: false,
async_commit: false,
read_only: false,
retry_options: RetryOptions::default_optimistic(),
check_level: CheckLevel::Panic,
heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
}
}
pub fn new_pessimistic() -> TransactionOptions {
TransactionOptions {
kind: TransactionKind::Pessimistic(Timestamp::from_version(0)),
try_one_pc: false,
async_commit: false,
read_only: false,
retry_options: RetryOptions::default_pessimistic(),
check_level: CheckLevel::Panic,
heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
}
}
#[must_use]
pub fn use_async_commit(mut self) -> TransactionOptions {
self.async_commit = true;
self
}
#[must_use]
pub fn try_one_pc(mut self) -> TransactionOptions {
self.try_one_pc = true;
self
}
#[must_use]
pub fn read_only(mut self) -> TransactionOptions {
self.read_only = true;
self
}
#[must_use]
pub fn no_resolve_locks(mut self) -> TransactionOptions {
self.retry_options.lock_backoff = Backoff::no_backoff();
self
}
#[must_use]
pub fn no_resolve_regions(mut self) -> TransactionOptions {
self.retry_options.region_backoff = Backoff::no_backoff();
self
}
#[must_use]
pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions {
self.retry_options = options;
self
}
#[must_use]
pub fn drop_check(mut self, level: CheckLevel) -> TransactionOptions {
self.check_level = level;
self
}
fn push_for_update_ts(&mut self, for_update_ts: Timestamp) {
match &mut self.kind {
TransactionKind::Optimistic => unreachable!(),
TransactionKind::Pessimistic(old_for_update_ts) => {
self.kind = TransactionKind::Pessimistic(Timestamp::from_version(std::cmp::max(
old_for_update_ts.version(),
for_update_ts.version(),
)));
}
}
}
#[must_use]
pub fn heartbeat_option(mut self, heartbeat_option: HeartbeatOption) -> TransactionOptions {
self.heartbeat_option = heartbeat_option;
self
}
pub fn is_pessimistic(&self) -> bool {
match self.kind {
TransactionKind::Pessimistic(_) => true,
TransactionKind::Optimistic => false,
}
}
}
/// What `Transaction`'s `Drop` impl does when a transaction is dropped while still active.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum CheckLevel {
/// Panic. This is the level set by `new_optimistic` and `new_pessimistic`.
Panic,
/// Log a warning.
Warn,
/// Do nothing.
None,
}
impl HeartbeatOption {
    /// Whether a background heartbeat should be started for this option.
    pub fn is_auto_heartbeat(&self) -> bool {
        match self {
            HeartbeatOption::NoHeartbeat => false,
            HeartbeatOption::FixedTime(_) => true,
        }
    }
}
/// A single change accepted by `Transaction::batch_mutate`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum Mutation {
/// Write the value under the key.
Put(Key, Value),
/// Delete the key.
Delete(Key),
}
impl Mutation {
    /// Borrows the key this mutation applies to, whichever variant it is.
    pub fn key(&self) -> &Key {
        match self {
            Mutation::Put(key, _) | Mutation::Delete(key) => key,
        }
    }
}
/// One-shot helper that runs the commit or rollback protocol for a set of mutations.
///
/// `derive(new)` generates the constructor: its arguments follow the field order below,
/// skipping `undetermined`, which starts at its default.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
struct Committer<PdC: PdClient = PdRpcClient> {
/// Primary key of the transaction; `prewrite` unwraps it and panics when it is `None`.
primary_key: Option<Key>,
/// All buffered mutations, already in protobuf form.
mutations: Vec<kvrpcpb::Mutation>,
/// Start timestamp of the transaction.
start_version: Timestamp,
/// Client handed to `PlanBuilder` and used to fetch commit timestamps.
rpc: Arc<PdC>,
/// Copy of the transaction's options; `prewrite` may clear `try_one_pc` on it.
options: TransactionOptions,
/// Keyspace passed to every plan.
keyspace: Keyspace,
/// Set when committing the primary key fails with a gRPC error; `commit` then wraps the
/// error in `Error::UndeterminedError`.
#[new(default)]
undetermined: bool,
/// Size of the buffered writes, used to scale the lock TTL. Presumably bytes — TODO confirm.
write_size: u64,
/// Moment the transaction was created; elapsed time is added to the lock TTL.
start_instant: Instant,
}
impl<PdC: PdClient> Committer<PdC> {
async fn commit(mut self) -> Result<Option<Timestamp>> {
debug!("committing");
let min_commit_ts = self.prewrite().await?;
fail_point!("after-prewrite", |_| {
Err(Error::StringError(
"failpoint: after-prewrite return error".to_owned(),
))
});
if self.options.try_one_pc {
return Ok(min_commit_ts);
}
let commit_ts = if self.options.async_commit {
min_commit_ts.unwrap()
} else {
match self.commit_primary_with_retry().await {
Ok(commit_ts) => commit_ts,
Err(e) => {
return if self.undetermined {
Err(Error::UndeterminedError(Box::new(e)))
} else {
Err(e)
};
}
}
};
tokio::spawn(self.commit_secondary(commit_ts.clone()).map(|res| {
if let Err(e) = res {
log::warn!("Failed to commit secondary keys: {}", e);
}
}));
Ok(Some(commit_ts))
}
/// Sends the prewrite request for all mutations.
///
/// Returns the one-phase-commit timestamp when one-phase commit was requested and a
/// single response came back; otherwise the largest `min_commit_ts` among the responses
/// (`None` when there are none). In the second case `options.try_one_pc` is cleared so
/// `commit` continues with the normal protocol.
///
/// # Errors
///
/// Returns `Error::OnePcFailure` when one-phase commit was requested and the single
/// response carries a zero `one_pc_commit_ts`.
///
/// # Panics
///
/// Panics when `primary_key` is `None`, or when a response outside the one-phase-commit
/// path carries a non-zero `one_pc_commit_ts`.
async fn prewrite(&mut self) -> Result<Option<Timestamp>> {
debug!("prewriting");
let primary_lock = self.primary_key.clone().unwrap();
// The lock TTL sent is the size-based TTL plus the milliseconds elapsed since the
// transaction was created.
let elapsed = self.start_instant.elapsed().as_millis() as u64;
let lock_ttl = self.calc_txn_lock_ttl();
let mut request = match &self.options.kind {
TransactionKind::Optimistic => new_prewrite_request(
self.mutations.clone(),
primary_lock,
self.start_version.clone(),
lock_ttl + elapsed,
),
TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request(
self.mutations.clone(),
primary_lock,
self.start_version.clone(),
lock_ttl + elapsed,
for_update_ts.clone(),
),
};
request.use_async_commit = self.options.async_commit;
request.try_one_pc = self.options.try_one_pc;
// Secondaries are every mutated key except the primary key.
request.secondaries = self
.mutations
.iter()
.filter(|m| self.primary_key.as_ref().unwrap() != m.key.as_ref())
.map(|m| m.key.clone())
.collect();
let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.resolve_lock(
self.start_version.clone(),
self.options.retry_options.lock_backoff.clone(),
self.keyspace,
)
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.merge(CollectError)
.extract_error()
.plan();
let response = plan.execute().await?;
// The one-phase-commit path is only taken when exactly one response came back.
if self.options.try_one_pc && response.len() == 1 {
if response[0].one_pc_commit_ts == 0 {
return Err(Error::OnePcFailure);
}
return Ok(Timestamp::try_from_version(response[0].one_pc_commit_ts));
}
// One-phase commit was not used; `commit` checks this flag to decide how to go on.
self.options.try_one_pc = false;
let min_commit_ts = response
.iter()
.map(|r| {
assert_eq!(r.one_pc_commit_ts, 0);
r.min_commit_ts
})
.max()
.map(Timestamp::from_version);
Ok(min_commit_ts)
}
async fn commit_primary(&mut self) -> Result<Timestamp> {
debug!("committing primary");
let primary_key = self.primary_key.clone().into_iter();
let commit_version = self.rpc.clone().get_timestamp().await?;
let req = new_commit_request(
primary_key,
self.start_version.clone(),
commit_version.clone(),
);
let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
.resolve_lock(
self.start_version.clone(),
self.options.retry_options.lock_backoff.clone(),
self.keyspace,
)
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.plan();
plan.execute()
.inspect_err(|e| {
debug!(
"commit primary error: {:?}, start_ts: {}",
e,
self.start_version.version()
);
if let Error::Grpc(_) = e {
self.undetermined = true;
}
})
.await?;
Ok(commit_version)
}
/// Commits the primary key, retrying for as long as the attempt is rejected with a
/// "commit_ts expired" key error.
///
/// Each retry calls `commit_primary` again, which fetches a new commit timestamp. Any
/// other error is returned to the caller.
///
/// # Errors
///
/// Besides the errors from `commit_primary`, returns `Error::StringError` when the
/// expired key is not the primary key, or when the reported `min_commit_ts` is too far
/// ahead of the attempted commit timestamp.
async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
loop {
match self.commit_primary().await {
Ok(commit_version) => return Ok(commit_version),
// Only the last extracted error is inspected.
Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
Some(Error::KeyError(key_err)) => {
if let Some(expired) = key_err.commit_ts_expired {
info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
self.start_version.version());
let primary_key = self.primary_key.as_ref().unwrap();
if primary_key != expired.key.as_ref() {
error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
self.start_version.version(), HexRepr(&expired.key), primary_key);
return Err(Error::StringError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
}
// 943718400000 == 3_600_000 << 18; presumably one hour of physical time in the
// timestamp encoding (milliseconds shifted left by 18 bits) — TODO confirm.
if expired
.min_commit_ts
.saturating_sub(expired.attempted_commit_ts)
> 943718400000
{
let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
expired.min_commit_ts, expired.attempted_commit_ts);
return Err(Error::StringError(msg));
}
// Retry: the next `commit_primary` call fetches a new commit timestamp.
continue;
} else {
return Err(Error::KeyError(key_err));
}
}
Some(err) => return Err(err),
// Relies on `ExtractedErrors` never being empty.
None => unreachable!(),
},
Err(err) => return Err(err),
}
}
}
/// Commits keys at `commit_version` after the primary key has been committed.
///
/// With async commit, all mutated keys are sent. Otherwise the primary key is filtered
/// out, and nothing is sent when there is only one mutation. With the
/// `integration-tests` feature, the `before-commit-secondary` failpoint can truncate the
/// list of mutations or fail the call.
///
/// # Panics
///
/// Without async commit and with more than one mutation, panics when `primary_key` is
/// `None`.
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
debug!("committing secondary");
let start_version = self.start_version.clone();
let mutations_len = self.mutations.len();
// A single mutation means the primary key is the only key.
let primary_only = mutations_len == 1;
#[cfg(not(feature = "integration-tests"))]
let mutations = self.mutations.into_iter();
#[cfg(feature = "integration-tests")]
let mutations = self.mutations.into_iter().take({
// The failpoint argument is a percentage of the mutations to keep; reaching zero is
// turned into an error.
let fp = || -> Result<usize> {
let mut new_len = mutations_len;
fail_point!("before-commit-secondary", |percent| {
let percent = percent.unwrap().parse::<usize>().unwrap();
new_len = mutations_len * percent / 100;
if new_len == 0 {
Err(Error::StringError(
"failpoint: before-commit-secondary return error".to_owned(),
))
} else {
debug!(
"failpoint: before-commit-secondary truncate mutation {} -> {}",
mutations_len, new_len
);
Ok(new_len)
}
});
Ok(new_len)
};
fp()?
});
let req = if self.options.async_commit {
let keys = mutations.map(|m| m.key.into());
new_commit_request(keys, start_version.clone(), commit_version)
} else if primary_only {
return Ok(());
} else {
let primary_key = self.primary_key.unwrap();
let keys = mutations
.map(|m| m.key.into())
.filter(|key| &primary_key != key);
new_commit_request(keys, start_version.clone(), commit_version)
};
let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
.resolve_lock(
start_version,
self.options.retry_options.lock_backoff,
self.keyspace,
)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}
async fn rollback(self) -> Result<()> {
debug!("rolling back");
if self.options.kind == TransactionKind::Optimistic && self.mutations.is_empty() {
return Ok(());
}
let keys = self
.mutations
.into_iter()
.map(|mutation| mutation.key.into());
let start_version = self.start_version.clone();
match self.options.kind {
TransactionKind::Optimistic => {
let req = new_batch_rollback_request(keys, start_version.clone());
let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
.resolve_lock(
start_version.clone(),
self.options.retry_options.lock_backoff,
self.keyspace,
)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
.plan();
plan.execute().await?;
}
TransactionKind::Pessimistic(for_update_ts) => {
let req =
new_pessimistic_rollback_request(keys, start_version.clone(), for_update_ts);
let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
.resolve_lock(
start_version.clone(),
self.options.retry_options.lock_backoff,
self.keyspace,
)
.retry_multi_region(self.options.retry_options.region_backoff)
.extract_error()
.plan();
plan.execute().await?;
}
}
Ok(())
}
/// Computes the size-based lock TTL for the prewrite request.
///
/// Write sizes up to `TXN_COMMIT_BATCH_SIZE` get `DEFAULT_LOCK_TTL`. Larger ones get
/// `TTL_FACTOR * sqrt(write_size / 1024 / 1024)`, clamped to
/// `DEFAULT_LOCK_TTL..=MAX_TTL`.
fn calc_txn_lock_ttl(&mut self) -> u64 {
    if self.write_size <= TXN_COMMIT_BATCH_SIZE {
        return DEFAULT_LOCK_TTL;
    }
    let size_mb = self.write_size as f64 / 1024.0 / 1024.0;
    let scaled_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64;
    scaled_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL)
}
}
/// Lifecycle state of a `Transaction`, stored as its `u8` discriminant inside an atomic.
#[derive(PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
enum TransactionStatus {
    /// Created with the read-only option. Operations are allowed.
    ReadOnly = 0,
    /// Created writable and not yet committed, rolled back or dropped.
    Active = 1,
    /// Commit finished successfully.
    Committed = 2,
    /// `commit` has been entered but has not succeeded (yet).
    StartedCommit = 3,
    /// Rollback finished successfully.
    Rolledback = 4,
    /// `rollback` has been entered but has not succeeded (yet).
    StartedRollback = 5,
    /// The transaction object has been dropped.
    Dropped = 6,
}

impl From<u8> for TransactionStatus {
    /// Decodes a discriminant produced by `status as u8`.
    ///
    /// # Panics
    ///
    /// Panics when `num` is not the discriminant of any variant.
    fn from(num: u8) -> Self {
        const KNOWN: [TransactionStatus; 7] = [
            TransactionStatus::ReadOnly,
            TransactionStatus::Active,
            TransactionStatus::Committed,
            TransactionStatus::StartedCommit,
            TransactionStatus::Rolledback,
            TransactionStatus::StartedRollback,
            TransactionStatus::Dropped,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|status| *status as u8 == num)
            .unwrap_or_else(|| panic!("Unknown transaction status {}", num))
    }
}
// Heartbeat tests. Both use a mock PD/KV client whose dispatch hook counts heartbeat
// requests and answers every request with a default response.
#[cfg(test)]
mod tests {
use std::any::Any;
use std::io;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use fail::FailScenario;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::Keyspace;
use crate::transaction::HeartbeatOption;
use crate::Transaction;
use crate::TransactionOptions;
// Optimistic transaction with a 1 s heartbeat interval. The `after-prewrite` failpoint
// holds the commit for 1.5 s, and the test expects exactly one heartbeat by the time the
// commit returns.
#[rstest::rstest]
#[case(Keyspace::Disable)]
#[case(Keyspace::Enable { keyspace_id: 0 })]
#[tokio::test]
async fn test_optimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> {
let scenario = FailScenario::setup();
fail::cfg("after-prewrite", "sleep(1500)").unwrap();
let heartbeats = Arc::new(AtomicUsize::new(0));
let heartbeats_cloned = heartbeats.clone();
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
move |req: &dyn Any| {
if req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>().is_some() {
heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
Ok(Box::<kvrpcpb::TxnHeartBeatResponse>::default() as Box<dyn Any>)
} else if req.downcast_ref::<kvrpcpb::PrewriteRequest>().is_some() {
Ok(Box::<kvrpcpb::PrewriteResponse>::default() as Box<dyn Any>)
} else {
Ok(Box::<kvrpcpb::CommitResponse>::default() as Box<dyn Any>)
}
},
)));
let key1 = "key1".to_owned();
let mut heartbeat_txn = Transaction::new(
Timestamp::default(),
pd_client,
TransactionOptions::new_optimistic()
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
keyspace,
);
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
// The commit runs on a blocking thread: the failpoint's sleep is blocking and would
// otherwise stall the async test task.
let heartbeat_txn_handle = tokio::task::spawn_blocking(move || {
assert!(futures::executor::block_on(heartbeat_txn.commit()).is_ok())
});
// No heartbeat can have been sent yet: the first one comes a full interval after start.
assert_eq!(heartbeats.load(Ordering::SeqCst), 0);
heartbeat_txn_handle.await.unwrap();
assert_eq!(heartbeats.load(Ordering::SeqCst), 1);
scenario.teardown();
Ok(())
}
// Pessimistic transaction with a 1 s heartbeat interval. The heartbeat starts when the
// first pessimistic lock is taken by `put`, so one heartbeat is expected after waiting
// 1.5 s, before commit is called.
#[rstest::rstest]
#[case(Keyspace::Disable)]
#[case(Keyspace::Enable { keyspace_id: 0 })]
#[tokio::test]
async fn test_pessimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> {
let heartbeats = Arc::new(AtomicUsize::new(0));
let heartbeats_cloned = heartbeats.clone();
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
move |req: &dyn Any| {
if req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>().is_some() {
heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
Ok(Box::<kvrpcpb::TxnHeartBeatResponse>::default() as Box<dyn Any>)
} else if req.downcast_ref::<kvrpcpb::PrewriteRequest>().is_some() {
Ok(Box::<kvrpcpb::PrewriteResponse>::default() as Box<dyn Any>)
} else if req
.downcast_ref::<kvrpcpb::PessimisticLockRequest>()
.is_some()
{
Ok(Box::<kvrpcpb::PessimisticLockResponse>::default() as Box<dyn Any>)
} else {
Ok(Box::<kvrpcpb::CommitResponse>::default() as Box<dyn Any>)
}
},
)));
let key1 = "key1".to_owned();
let mut heartbeat_txn = Transaction::new(
Timestamp::default(),
pd_client,
TransactionOptions::new_pessimistic()
.heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
keyspace,
);
heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
assert_eq!(heartbeats.load(Ordering::SeqCst), 0);
tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;
assert_eq!(heartbeats.load(Ordering::SeqCst), 1);
let heartbeat_txn_handle = tokio::spawn(async move {
assert!(heartbeat_txn.commit().await.is_ok());
});
heartbeat_txn_handle.await.unwrap();
Ok(())
}
}