use std::sync::Arc;
use log::debug;
use log::info;
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::pdpb::Timestamp;
use crate::request::plan::CleanupLocksResult;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::timestamp::TimestampExt;
use crate::transaction::lock::ResolveLocksOptions;
use crate::transaction::lowering::new_scan_lock_request;
use crate::transaction::lowering::new_unsafe_destroy_range_request;
use crate::transaction::resolve_locks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::Snapshot;
use crate::transaction::Transaction;
use crate::transaction::TransactionOptions;
use crate::Backoff;
use crate::BoundRange;
use crate::Result;
#[doc(inline)]
pub use crate::proto::kvrpcpb::LockInfo as ProtoLockInfo;
const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
pub struct Client {
pd: Arc<PdRpcClient>,
keyspace: Keyspace,
}
impl Clone for Client {
fn clone(&self) -> Self {
Self {
pd: self.pd.clone(),
keyspace: self.keyspace,
}
}
}
impl Client {
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
Self::new_with_config(pd_endpoints, Config::default()).await
}
pub async fn new_with_config<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client> {
debug!("creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
let keyspace = match config.keyspace {
Some(name) => {
let keyspace = pd.load_keyspace(&name).await?;
Keyspace::Enable {
keyspace_id: keyspace.id,
}
}
None => Keyspace::Disable,
};
Ok(Client { pd, keyspace })
}
pub async fn new_with_config_api_v2_no_prefix<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client> {
if config.keyspace.is_some() {
return Err(crate::Error::StringError(
"config.keyspace must be unset when using api-v2-no-prefix mode".to_owned(),
));
}
debug!("creating new transactional client (api-v2-no-prefix)");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
Ok(Client {
pd,
keyspace: Keyspace::ApiV2NoPrefix,
})
}
pub async fn begin_optimistic(&self) -> Result<Transaction> {
debug!("creating new optimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
}
pub async fn begin_pessimistic(&self) -> Result<Transaction> {
debug!("creating new pessimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
}
pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
debug!("creating new customized transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
}
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
debug!("creating new snapshot");
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}
pub async fn current_timestamp(&self) -> Result<Timestamp> {
self.pd.clone().get_timestamp().await
}
pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
debug!("invoking transactional gc request");
let options = ResolveLocksOptions {
batch_size: SCAN_LOCK_BATCH_SIZE,
..Default::default()
};
self.cleanup_locks(.., &safepoint, options).await?;
let res: bool = self
.pd
.clone()
.update_safepoint(safepoint.version())
.await?;
if !res {
info!("new safepoint != user-specified safepoint");
}
Ok(res)
}
pub async fn cleanup_locks(
&self,
range: impl Into<BoundRange>,
safepoint: &Timestamp,
options: ResolveLocksOptions,
) -> Result<CleanupLocksResult> {
debug!("invoking cleanup async commit locks");
let ctx = ResolveLocksContext::default();
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
let req = new_scan_lock_request(range, safepoint, options.batch_size);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
.cleanup_locks(ctx.clone(), options, backoff, self.keyspace)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.merge(crate::request::Collect)
.plan();
plan.execute().await
}
pub async fn scan_locks(
&self,
safepoint: &Timestamp,
range: impl Into<BoundRange>,
batch_size: u32,
) -> Result<Vec<ProtoLockInfo>> {
use crate::request::TruncateKeyspace;
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
let req = new_scan_lock_request(range, safepoint, batch_size);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(crate::request::Collect)
.plan();
Ok(plan.execute().await?.truncate_keyspace(self.keyspace))
}
pub async fn resolve_locks(
&self,
locks: Vec<ProtoLockInfo>,
timestamp: Timestamp,
mut backoff: Backoff,
) -> Result<Vec<ProtoLockInfo>> {
use crate::request::TruncateKeyspace;
let mut live_locks = locks;
loop {
let resolved_locks = resolve_locks(
live_locks.encode_keyspace(self.keyspace, KeyMode::Txn),
timestamp.clone(),
self.pd.clone(),
self.keyspace,
)
.await?;
live_locks = resolved_locks.truncate_keyspace(self.keyspace);
if live_locks.is_empty() {
return Ok(live_locks);
}
match backoff.next_delay_duration() {
None => return Ok(live_locks),
Some(delay_duration) => {
tokio::time::sleep(delay_duration).await;
}
}
}
}
pub async fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
let req = new_unsafe_destroy_range_request(range);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
.all_stores(DEFAULT_STORE_BACKOFF)
.merge(crate::request::Collect)
.plan();
plan.execute().await
}
fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
Transaction::new(timestamp, self.pd.clone(), options, self.keyspace)
}
}