tikv_client/transaction/client.rs
1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::sync::Arc;
4
5use log::debug;
6use log::info;
7
8use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
9use crate::config::Config;
10use crate::pd::PdClient;
11use crate::pd::PdRpcClient;
12use crate::proto::pdpb::Timestamp;
13use crate::request::plan::CleanupLocksResult;
14use crate::request::EncodeKeyspace;
15use crate::request::KeyMode;
16use crate::request::Keyspace;
17use crate::request::Plan;
18use crate::timestamp::TimestampExt;
19use crate::transaction::lock::ResolveLocksOptions;
20use crate::transaction::lowering::new_scan_lock_request;
21use crate::transaction::lowering::new_unsafe_destroy_range_request;
22use crate::transaction::resolve_locks;
23use crate::transaction::ResolveLocksContext;
24use crate::transaction::Snapshot;
25use crate::transaction::Transaction;
26use crate::transaction::TransactionOptions;
27use crate::Backoff;
28use crate::BoundRange;
29use crate::Result;
30
31/// Protobuf-generated lock information returned by TiKV.
32///
33/// This type is generated from TiKV's protobuf definitions and may change in a
34/// future release even if the wire format is compatible.
35#[doc(inline)]
36pub use crate::proto::kvrpcpb::LockInfo as ProtoLockInfo;
37
/// Batch size handed to the scan-lock request when [`Client::gc`] cleans up locks.
// FIXME: this value was picked without any tuning or justification; benchmark and revisit.
const SCAN_LOCK_BATCH_SIZE: u32 = 1 << 10;
40
41/// The TiKV transactional `Client` is used to interact with TiKV using transactional requests.
42///
43/// Transactions support optimistic and pessimistic modes. For more details see the SIG-transaction
44/// [docs](https://github.com/tikv/sig-transaction/tree/master/doc/tikv#optimistic-and-pessimistic-transactions).
45///
46/// Begin a [`Transaction`] by calling [`begin_optimistic`](Client::begin_optimistic) or
47/// [`begin_pessimistic`](Client::begin_pessimistic). A transaction must be rolled back or committed.
48///
49/// Besides transactions, the client provides some further functionality:
50/// - `gc`: trigger a GC process which clears stale data in the cluster.
51/// - `current_timestamp`: get the current `Timestamp` from PD.
52/// - `snapshot`: get a [`Snapshot`] of the database at a specified timestamp.
53/// A `Snapshot` is a read-only transaction.
54///
55/// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
56/// awaited to execute.
57pub struct Client {
58 pd: Arc<PdRpcClient>,
59 keyspace: Keyspace,
60}
61
62impl Clone for Client {
63 fn clone(&self) -> Self {
64 Self {
65 pd: self.pd.clone(),
66 keyspace: self.keyspace,
67 }
68 }
69}
70
71impl Client {
72 /// Create a transactional [`Client`] and connect to the TiKV cluster.
73 ///
74 /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
75 /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
76 /// (include all endpoints, if possible), this helps avoid having a single point of failure.
77 ///
78 /// # Examples
79 ///
80 /// ```rust,no_run
81 /// # use tikv_client::{Config, TransactionClient};
82 /// # use futures::prelude::*;
83 /// # futures::executor::block_on(async {
84 /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
85 /// # });
86 /// ```
87 pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
88 // debug!("creating transactional client");
89 Self::new_with_config(pd_endpoints, Config::default()).await
90 }
91
92 /// Create a transactional [`Client`] with a custom configuration, and connect to the TiKV cluster.
93 ///
94 /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
95 /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
96 /// (include all endpoints, if possible), this helps avoid having a single point of failure.
97 ///
98 /// # Examples
99 ///
100 /// ```rust,no_run
101 /// # use tikv_client::{Config, TransactionClient};
102 /// # use futures::prelude::*;
103 /// # use std::time::Duration;
104 /// # futures::executor::block_on(async {
105 /// let client = TransactionClient::new_with_config(
106 /// vec!["192.168.0.100"],
107 /// Config::default().with_timeout(Duration::from_secs(60)),
108 /// )
109 /// .await
110 /// .unwrap();
111 /// # });
112 /// ```
113 pub async fn new_with_config<S: Into<String>>(
114 pd_endpoints: Vec<S>,
115 config: Config,
116 ) -> Result<Client> {
117 debug!("creating new transactional client");
118 let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
119 let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
120 let keyspace = match config.keyspace {
121 Some(name) => {
122 let keyspace = pd.load_keyspace(&name).await?;
123 Keyspace::Enable {
124 keyspace_id: keyspace.id,
125 }
126 }
127 None => Keyspace::Disable,
128 };
129 Ok(Client { pd, keyspace })
130 }
131
132 /// Create a transactional [`Client`] that uses API V2 without adding or removing any API V2
133 /// keyspace/key-mode prefix, with a custom configuration.
134 ///
135 /// This is intended for **server-side embedding** use cases. `config.keyspace` must be unset.
136 pub async fn new_with_config_api_v2_no_prefix<S: Into<String>>(
137 pd_endpoints: Vec<S>,
138 config: Config,
139 ) -> Result<Client> {
140 if config.keyspace.is_some() {
141 return Err(crate::Error::StringError(
142 "config.keyspace must be unset when using api-v2-no-prefix mode".to_owned(),
143 ));
144 }
145
146 debug!("creating new transactional client (api-v2-no-prefix)");
147 let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
148 let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
149 Ok(Client {
150 pd,
151 keyspace: Keyspace::ApiV2NoPrefix,
152 })
153 }
154
155 /// Creates a new optimistic [`Transaction`].
156 ///
157 /// Use the transaction to issue requests like [`get`](Transaction::get) or
158 /// [`put`](Transaction::put).
159 ///
160 /// Write operations do not lock data in TiKV, thus the commit request may fail due to a write
161 /// conflict.
162 ///
163 /// # Examples
164 ///
165 /// ```rust,no_run
166 /// # use tikv_client::{Config, TransactionClient};
167 /// # use futures::prelude::*;
168 /// # futures::executor::block_on(async {
169 /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
170 /// let mut transaction = client.begin_optimistic().await.unwrap();
171 /// // ... Issue some commands.
172 /// transaction.commit().await.unwrap();
173 /// # });
174 /// ```
175 pub async fn begin_optimistic(&self) -> Result<Transaction> {
176 debug!("creating new optimistic transaction");
177 let timestamp = self.current_timestamp().await?;
178 Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
179 }
180
181 /// Creates a new pessimistic [`Transaction`].
182 ///
183 /// Write operations will lock the data until committed, thus commit requests should not suffer
184 /// from write conflicts.
185 ///
186 /// # Examples
187 ///
188 /// ```rust,no_run
189 /// # use tikv_client::{Config, TransactionClient};
190 /// # use futures::prelude::*;
191 /// # futures::executor::block_on(async {
192 /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
193 /// let mut transaction = client.begin_pessimistic().await.unwrap();
194 /// // ... Issue some commands.
195 /// transaction.commit().await.unwrap();
196 /// # });
197 /// ```
198 pub async fn begin_pessimistic(&self) -> Result<Transaction> {
199 debug!("creating new pessimistic transaction");
200 let timestamp = self.current_timestamp().await?;
201 Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
202 }
203
204 /// Create a new customized [`Transaction`].
205 ///
206 /// # Examples
207 ///
208 /// ```rust,no_run
209 /// # use tikv_client::{Config, TransactionClient, TransactionOptions};
210 /// # use futures::prelude::*;
211 /// # futures::executor::block_on(async {
212 /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
213 /// let mut transaction = client
214 /// .begin_with_options(TransactionOptions::default().use_async_commit())
215 /// .await
216 /// .unwrap();
217 /// // ... Issue some commands.
218 /// transaction.commit().await.unwrap();
219 /// # });
220 /// ```
221 pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
222 debug!("creating new customized transaction");
223 let timestamp = self.current_timestamp().await?;
224 Ok(self.new_transaction(timestamp, options))
225 }
226
227 /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
228 pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
229 debug!("creating new snapshot");
230 Snapshot::new(self.new_transaction(timestamp, options.read_only()))
231 }
232
233 /// Retrieve the current [`Timestamp`].
234 ///
235 /// # Examples
236 ///
237 /// ```rust,no_run
238 /// # use tikv_client::{Config, TransactionClient};
239 /// # use futures::prelude::*;
240 /// # futures::executor::block_on(async {
241 /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
242 /// let timestamp = client.current_timestamp().await.unwrap();
243 /// # });
244 /// ```
245 pub async fn current_timestamp(&self) -> Result<Timestamp> {
246 self.pd.clone().get_timestamp().await
247 }
248
249 /// Request garbage collection (GC) of the TiKV cluster.
250 ///
251 /// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
252 /// that all transactions started before this timestamp had committed. We can keep an active
253 /// transaction list in application to decide which is the minimal start timestamp of them.
254 ///
255 /// For each key, the last mutation record (unless it's a deletion) before `safepoint` is retained.
256 ///
257 /// GC is performed by:
258 /// 1. resolving all locks with timestamp <= `safepoint`
259 /// 2. updating PD's known safepoint
260 ///
261 /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
262 /// We skip the second step "delete ranges" which is an optimization for TiDB.
263 pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
264 debug!("invoking transactional gc request");
265
266 let options = ResolveLocksOptions {
267 batch_size: SCAN_LOCK_BATCH_SIZE,
268 ..Default::default()
269 };
270 self.cleanup_locks(.., &safepoint, options).await?;
271
272 // update safepoint to PD
273 let res: bool = self
274 .pd
275 .clone()
276 .update_safepoint(safepoint.version())
277 .await?;
278 if !res {
279 info!("new safepoint != user-specified safepoint");
280 }
281 Ok(res)
282 }
283
284 pub async fn cleanup_locks(
285 &self,
286 range: impl Into<BoundRange>,
287 safepoint: &Timestamp,
288 options: ResolveLocksOptions,
289 ) -> Result<CleanupLocksResult> {
290 debug!("invoking cleanup async commit locks");
291 // scan all locks with ts <= safepoint
292 let ctx = ResolveLocksContext::default();
293 let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
294 let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
295 let req = new_scan_lock_request(range, safepoint, options.batch_size);
296 let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
297 .cleanup_locks(ctx.clone(), options, backoff, self.keyspace)
298 .retry_multi_region(DEFAULT_REGION_BACKOFF)
299 .extract_error()
300 .merge(crate::request::Collect)
301 .plan();
302 plan.execute().await
303 }
304
305 // Note: `batch_size` must be >= expected number of locks.
306 pub async fn scan_locks(
307 &self,
308 safepoint: &Timestamp,
309 range: impl Into<BoundRange>,
310 batch_size: u32,
311 ) -> Result<Vec<ProtoLockInfo>> {
312 use crate::request::TruncateKeyspace;
313
314 let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
315 let req = new_scan_lock_request(range, safepoint, batch_size);
316 let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
317 .retry_multi_region(DEFAULT_REGION_BACKOFF)
318 .merge(crate::request::Collect)
319 .plan();
320 Ok(plan.execute().await?.truncate_keyspace(self.keyspace))
321 }
322
323 /// Resolves the given locks and returns any that remain live.
324 ///
325 /// This method retries until either all locks are resolved or the provided
326 /// `backoff` is exhausted. The `timestamp` is used as the caller start
327 /// timestamp when checking transaction status.
328 pub async fn resolve_locks(
329 &self,
330 locks: Vec<ProtoLockInfo>,
331 timestamp: Timestamp,
332 mut backoff: Backoff,
333 ) -> Result<Vec<ProtoLockInfo>> {
334 use crate::request::TruncateKeyspace;
335
336 let mut live_locks = locks;
337 loop {
338 let resolved_locks = resolve_locks(
339 live_locks.encode_keyspace(self.keyspace, KeyMode::Txn),
340 timestamp.clone(),
341 self.pd.clone(),
342 self.keyspace,
343 )
344 .await?;
345 live_locks = resolved_locks.truncate_keyspace(self.keyspace);
346 if live_locks.is_empty() {
347 return Ok(live_locks);
348 }
349
350 match backoff.next_delay_duration() {
351 None => return Ok(live_locks),
352 Some(delay_duration) => {
353 tokio::time::sleep(delay_duration).await;
354 }
355 }
356 }
357 }
358
359 /// Cleans up all keys in a range and quickly reclaim disk space.
360 ///
361 /// The range can span over multiple regions.
362 ///
363 /// Note that the request will directly delete data from RocksDB, and all MVCC will be erased.
364 ///
365 /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB.
366 pub async fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
367 let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
368 let req = new_unsafe_destroy_range_request(range);
369 let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
370 .all_stores(DEFAULT_STORE_BACKOFF)
371 .merge(crate::request::Collect)
372 .plan();
373 plan.execute().await
374 }
375
376 fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
377 Transaction::new(timestamp, self.pd.clone(), options, self.keyspace)
378 }
379}