Skip to main content

tikv_client/transaction/
client.rs

// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::sync::Arc;
4
5use log::debug;
6use log::info;
7
8use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
9use crate::config::Config;
10use crate::pd::PdClient;
11use crate::pd::PdRpcClient;
12use crate::proto::pdpb::Timestamp;
13use crate::request::plan::CleanupLocksResult;
14use crate::request::EncodeKeyspace;
15use crate::request::KeyMode;
16use crate::request::Keyspace;
17use crate::request::Plan;
18use crate::timestamp::TimestampExt;
19use crate::transaction::lock::ResolveLocksOptions;
20use crate::transaction::lowering::new_scan_lock_request;
21use crate::transaction::lowering::new_unsafe_destroy_range_request;
22use crate::transaction::resolve_locks;
23use crate::transaction::ResolveLocksContext;
24use crate::transaction::Snapshot;
25use crate::transaction::Transaction;
26use crate::transaction::TransactionOptions;
27use crate::Backoff;
28use crate::BoundRange;
29use crate::Result;
30
31/// Protobuf-generated lock information returned by TiKV.
32///
33/// This type is generated from TiKV's protobuf definitions and may change in a
34/// future release even if the wire format is compatible.
35#[doc(inline)]
36pub use crate::proto::kvrpcpb::LockInfo as ProtoLockInfo;
37
/// Batch size (number of locks) requested per lock scan when [`Client::gc`] resolves locks.
///
/// FIXME: this value was cargo-culted and has not been tuned.
const SCAN_LOCK_BATCH_SIZE: u32 = 1 << 10;
40
41/// The TiKV transactional `Client` is used to interact with TiKV using transactional requests.
42///
43/// Transactions support optimistic and pessimistic modes. For more details see the SIG-transaction
44/// [docs](https://github.com/tikv/sig-transaction/tree/master/doc/tikv#optimistic-and-pessimistic-transactions).
45///
46/// Begin a [`Transaction`] by calling [`begin_optimistic`](Client::begin_optimistic) or
47/// [`begin_pessimistic`](Client::begin_pessimistic). A transaction must be rolled back or committed.
48///
49/// Besides transactions, the client provides some further functionality:
50/// - `gc`: trigger a GC process which clears stale data in the cluster.
51/// - `current_timestamp`: get the current `Timestamp` from PD.
52/// - `snapshot`: get a [`Snapshot`] of the database at a specified timestamp.
53///   A `Snapshot` is a read-only transaction.
54///
55/// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
56/// awaited to execute.
57pub struct Client {
58    pd: Arc<PdRpcClient>,
59    keyspace: Keyspace,
60}
61
62impl Clone for Client {
63    fn clone(&self) -> Self {
64        Self {
65            pd: self.pd.clone(),
66            keyspace: self.keyspace,
67        }
68    }
69}
70
71impl Client {
72    /// Create a transactional [`Client`] and connect to the TiKV cluster.
73    ///
74    /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
75    /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
76    /// (include all endpoints, if possible), this helps avoid having a single point of failure.
77    ///
78    /// # Examples
79    ///
80    /// ```rust,no_run
81    /// # use tikv_client::{Config, TransactionClient};
82    /// # use futures::prelude::*;
83    /// # futures::executor::block_on(async {
84    /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
85    /// # });
86    /// ```
87    pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
88        // debug!("creating transactional client");
89        Self::new_with_config(pd_endpoints, Config::default()).await
90    }
91
92    /// Create a transactional [`Client`] with a custom configuration, and connect to the TiKV cluster.
93    ///
94    /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
95    /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
96    /// (include all endpoints, if possible), this helps avoid having a single point of failure.
97    ///
98    /// # Examples
99    ///
100    /// ```rust,no_run
101    /// # use tikv_client::{Config, TransactionClient};
102    /// # use futures::prelude::*;
103    /// # use std::time::Duration;
104    /// # futures::executor::block_on(async {
105    /// let client = TransactionClient::new_with_config(
106    ///     vec!["192.168.0.100"],
107    ///     Config::default().with_timeout(Duration::from_secs(60)),
108    /// )
109    /// .await
110    /// .unwrap();
111    /// # });
112    /// ```
113    pub async fn new_with_config<S: Into<String>>(
114        pd_endpoints: Vec<S>,
115        config: Config,
116    ) -> Result<Client> {
117        debug!("creating new transactional client");
118        let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
119        let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
120        let keyspace = match config.keyspace {
121            Some(name) => {
122                let keyspace = pd.load_keyspace(&name).await?;
123                Keyspace::Enable {
124                    keyspace_id: keyspace.id,
125                }
126            }
127            None => Keyspace::Disable,
128        };
129        Ok(Client { pd, keyspace })
130    }
131
132    /// Create a transactional [`Client`] that uses API V2 without adding or removing any API V2
133    /// keyspace/key-mode prefix, with a custom configuration.
134    ///
135    /// This is intended for **server-side embedding** use cases. `config.keyspace` must be unset.
136    pub async fn new_with_config_api_v2_no_prefix<S: Into<String>>(
137        pd_endpoints: Vec<S>,
138        config: Config,
139    ) -> Result<Client> {
140        if config.keyspace.is_some() {
141            return Err(crate::Error::StringError(
142                "config.keyspace must be unset when using api-v2-no-prefix mode".to_owned(),
143            ));
144        }
145
146        debug!("creating new transactional client (api-v2-no-prefix)");
147        let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
148        let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?);
149        Ok(Client {
150            pd,
151            keyspace: Keyspace::ApiV2NoPrefix,
152        })
153    }
154
155    /// Creates a new optimistic [`Transaction`].
156    ///
157    /// Use the transaction to issue requests like [`get`](Transaction::get) or
158    /// [`put`](Transaction::put).
159    ///
160    /// Write operations do not lock data in TiKV, thus the commit request may fail due to a write
161    /// conflict.
162    ///
163    /// # Examples
164    ///
165    /// ```rust,no_run
166    /// # use tikv_client::{Config, TransactionClient};
167    /// # use futures::prelude::*;
168    /// # futures::executor::block_on(async {
169    /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
170    /// let mut transaction = client.begin_optimistic().await.unwrap();
171    /// // ... Issue some commands.
172    /// transaction.commit().await.unwrap();
173    /// # });
174    /// ```
175    pub async fn begin_optimistic(&self) -> Result<Transaction> {
176        debug!("creating new optimistic transaction");
177        let timestamp = self.current_timestamp().await?;
178        Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
179    }
180
181    /// Creates a new pessimistic [`Transaction`].
182    ///
183    /// Write operations will lock the data until committed, thus commit requests should not suffer
184    /// from write conflicts.
185    ///
186    /// # Examples
187    ///
188    /// ```rust,no_run
189    /// # use tikv_client::{Config, TransactionClient};
190    /// # use futures::prelude::*;
191    /// # futures::executor::block_on(async {
192    /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
193    /// let mut transaction = client.begin_pessimistic().await.unwrap();
194    /// // ... Issue some commands.
195    /// transaction.commit().await.unwrap();
196    /// # });
197    /// ```
198    pub async fn begin_pessimistic(&self) -> Result<Transaction> {
199        debug!("creating new pessimistic transaction");
200        let timestamp = self.current_timestamp().await?;
201        Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
202    }
203
204    /// Create a new customized [`Transaction`].
205    ///
206    /// # Examples
207    ///
208    /// ```rust,no_run
209    /// # use tikv_client::{Config, TransactionClient, TransactionOptions};
210    /// # use futures::prelude::*;
211    /// # futures::executor::block_on(async {
212    /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
213    /// let mut transaction = client
214    ///     .begin_with_options(TransactionOptions::default().use_async_commit())
215    ///     .await
216    ///     .unwrap();
217    /// // ... Issue some commands.
218    /// transaction.commit().await.unwrap();
219    /// # });
220    /// ```
221    pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
222        debug!("creating new customized transaction");
223        let timestamp = self.current_timestamp().await?;
224        Ok(self.new_transaction(timestamp, options))
225    }
226
227    /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
228    pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
229        debug!("creating new snapshot");
230        Snapshot::new(self.new_transaction(timestamp, options.read_only()))
231    }
232
233    /// Retrieve the current [`Timestamp`].
234    ///
235    /// # Examples
236    ///
237    /// ```rust,no_run
238    /// # use tikv_client::{Config, TransactionClient};
239    /// # use futures::prelude::*;
240    /// # futures::executor::block_on(async {
241    /// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
242    /// let timestamp = client.current_timestamp().await.unwrap();
243    /// # });
244    /// ```
245    pub async fn current_timestamp(&self) -> Result<Timestamp> {
246        self.pd.clone().get_timestamp().await
247    }
248
249    /// Request garbage collection (GC) of the TiKV cluster.
250    ///
251    /// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
252    ///  that all transactions started before this timestamp had committed. We can keep an active
253    /// transaction list in application to decide which is the minimal start timestamp of them.
254    ///
255    /// For each key, the last mutation record (unless it's a deletion) before `safepoint` is retained.
256    ///
257    /// GC is performed by:
258    /// 1. resolving all locks with timestamp <= `safepoint`
259    /// 2. updating PD's known safepoint
260    ///
261    /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
262    /// We skip the second step "delete ranges" which is an optimization for TiDB.
263    pub async fn gc(&self, safepoint: Timestamp) -> Result<bool> {
264        debug!("invoking transactional gc request");
265
266        let options = ResolveLocksOptions {
267            batch_size: SCAN_LOCK_BATCH_SIZE,
268            ..Default::default()
269        };
270        self.cleanup_locks(.., &safepoint, options).await?;
271
272        // update safepoint to PD
273        let res: bool = self
274            .pd
275            .clone()
276            .update_safepoint(safepoint.version())
277            .await?;
278        if !res {
279            info!("new safepoint != user-specified safepoint");
280        }
281        Ok(res)
282    }
283
284    pub async fn cleanup_locks(
285        &self,
286        range: impl Into<BoundRange>,
287        safepoint: &Timestamp,
288        options: ResolveLocksOptions,
289    ) -> Result<CleanupLocksResult> {
290        debug!("invoking cleanup async commit locks");
291        // scan all locks with ts <= safepoint
292        let ctx = ResolveLocksContext::default();
293        let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
294        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
295        let req = new_scan_lock_request(range, safepoint, options.batch_size);
296        let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
297            .cleanup_locks(ctx.clone(), options, backoff, self.keyspace)
298            .retry_multi_region(DEFAULT_REGION_BACKOFF)
299            .extract_error()
300            .merge(crate::request::Collect)
301            .plan();
302        plan.execute().await
303    }
304
305    // Note: `batch_size` must be >= expected number of locks.
306    pub async fn scan_locks(
307        &self,
308        safepoint: &Timestamp,
309        range: impl Into<BoundRange>,
310        batch_size: u32,
311    ) -> Result<Vec<ProtoLockInfo>> {
312        use crate::request::TruncateKeyspace;
313
314        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
315        let req = new_scan_lock_request(range, safepoint, batch_size);
316        let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
317            .retry_multi_region(DEFAULT_REGION_BACKOFF)
318            .merge(crate::request::Collect)
319            .plan();
320        Ok(plan.execute().await?.truncate_keyspace(self.keyspace))
321    }
322
323    /// Resolves the given locks and returns any that remain live.
324    ///
325    /// This method retries until either all locks are resolved or the provided
326    /// `backoff` is exhausted. The `timestamp` is used as the caller start
327    /// timestamp when checking transaction status.
328    pub async fn resolve_locks(
329        &self,
330        locks: Vec<ProtoLockInfo>,
331        timestamp: Timestamp,
332        mut backoff: Backoff,
333    ) -> Result<Vec<ProtoLockInfo>> {
334        use crate::request::TruncateKeyspace;
335
336        let mut live_locks = locks;
337        loop {
338            let resolved_locks = resolve_locks(
339                live_locks.encode_keyspace(self.keyspace, KeyMode::Txn),
340                timestamp.clone(),
341                self.pd.clone(),
342                self.keyspace,
343            )
344            .await?;
345            live_locks = resolved_locks.truncate_keyspace(self.keyspace);
346            if live_locks.is_empty() {
347                return Ok(live_locks);
348            }
349
350            match backoff.next_delay_duration() {
351                None => return Ok(live_locks),
352                Some(delay_duration) => {
353                    tokio::time::sleep(delay_duration).await;
354                }
355            }
356        }
357    }
358
359    /// Cleans up all keys in a range and quickly reclaim disk space.
360    ///
361    /// The range can span over multiple regions.
362    ///
363    /// Note that the request will directly delete data from RocksDB, and all MVCC will be erased.
364    ///
365    /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB.
366    pub async fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
367        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
368        let req = new_unsafe_destroy_range_request(range);
369        let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req)
370            .all_stores(DEFAULT_STORE_BACKOFF)
371            .merge(crate::request::Collect)
372            .plan();
373        plan.execute().await
374    }
375
376    fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
377        Transaction::new(timestamp, self.pd.clone(), options, self.keyspace)
378    }
379}