use std::sync::Arc;
use log::debug;
use log::info;
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::pdpb::Timestamp;
use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
use crate::request::plan::CleanupLocksResult;
use crate::request::Plan;
use crate::timestamp::TimestampExt;
use crate::transaction::lock::ResolveLocksOptions;
use crate::transaction::lowering::new_scan_lock_request;
use crate::transaction::lowering::new_unsafe_destroy_range_request;
use crate::transaction::ResolveLocksContext;
use crate::transaction::Snapshot;
use crate::transaction::Transaction;
use crate::transaction::TransactionOptions;
use crate::Backoff;
use crate::BoundRange;
use crate::Result;
const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
pub struct Client<Cod: Codec = ApiV1TxnCodec> {
pd: Arc<PdRpcClient<Cod>>,
}
impl<Cod: Codec> Clone for Client<Cod> {
fn clone(&self) -> Self {
Self {
pd: self.pd.clone(),
}
}
}
impl Client<ApiV1TxnCodec> {
pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
Self::new_with_config(pd_endpoints, Config::default()).await
}
pub async fn new_with_config<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client> {
Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await
}
}
impl Client<ApiV2TxnCodec> {
pub async fn new_with_config_v2<S: Into<String>>(
_keyspace_name: &str,
pd_endpoints: Vec<S>,
config: Config,
) -> Result<Client<ApiV2TxnCodec>> {
debug!("creating new transactional client APIv2");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?;
let keyspace_id = 0; pd.set_codec(ApiV2TxnCodec::new(keyspace_id));
Ok(Client { pd: Arc::new(pd) })
}
}
impl<Cod: Codec> Client<Cod> {
pub async fn new_with_codec<S: Into<String>>(
pd_endpoints: Vec<S>,
config: Config,
codec: Cod,
) -> Result<Client<Cod>> {
debug!("creating new transactional client");
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let pd =
Arc::new(PdRpcClient::<Cod>::connect(&pd_endpoints, config, true, Some(codec)).await?);
Ok(Client { pd })
}
pub async fn begin_optimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
debug!("creating new optimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
}
pub async fn begin_pessimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
debug!("creating new pessimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
}
pub async fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
debug!("creating new customized transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
}
pub fn snapshot(
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Snapshot<Cod, PdRpcClient<Cod>> {
debug!("creating new snapshot");
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}
pub async fn current_timestamp(&self) -> Result<Timestamp> {
self.pd.clone().get_timestamp().await
}
pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
debug!("invoking transactional gc request");
let options = ResolveLocksOptions {
batch_size: SCAN_LOCK_BATCH_SIZE,
..Default::default()
};
self.cleanup_locks(.., &safepoint, options).await?;
let res: bool = self
.pd
.clone()
.update_safepoint(safepoint.version())
.await?;
if !res {
info!("new safepoint != user-specified safepoint");
}
Ok(res)
}
pub async fn cleanup_locks(
&self,
range: impl Into<BoundRange>,
safepoint: &Timestamp,
options: ResolveLocksOptions,
) -> Result<CleanupLocksResult> {
debug!("invoking cleanup async commit locks");
let ctx = ResolveLocksContext::default();
let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
.cleanup_locks(ctx.clone(), options, backoff)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.merge(crate::request::Collect)
.plan();
plan.execute().await
}
#[cfg(feature = "integration-tests")]
pub async fn scan_locks(
&self,
safepoint: &Timestamp,
range: impl Into<BoundRange>,
batch_size: u32,
) -> Result<Vec<crate::proto::kvrpcpb::LockInfo>> {
let req = new_scan_lock_request(range.into(), safepoint, batch_size);
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(crate::request::Collect)
.plan();
plan.execute().await
}
pub async fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
let req = new_unsafe_destroy_range_request(range.into());
let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
.all_stores(DEFAULT_STORE_BACKOFF)
.merge(crate::request::Collect)
.plan();
plan.execute().await
}
fn new_transaction(
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Transaction<Cod, PdRpcClient<Cod>> {
Transaction::new(timestamp, self.pd.clone(), options)
}
}