Skip to main content

tikv_client/transaction/
transaction.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::iter;
4use std::sync::atomic;
5use std::sync::atomic::AtomicU8;
6use std::sync::Arc;
7use std::time::Instant;
8
9use derive_new::new;
10use fail::fail_point;
11use futures::prelude::*;
12use log::{debug, error, info, trace, warn};
13use tokio::time::Duration;
14
15use crate::backoff::Backoff;
16use crate::backoff::DEFAULT_REGION_BACKOFF;
17use crate::kv::HexRepr;
18use crate::pd::PdClient;
19use crate::pd::PdRpcClient;
20use crate::proto::kvrpcpb;
21use crate::proto::pdpb::Timestamp;
22use crate::request::Collect;
23use crate::request::CollectError;
24use crate::request::CollectSingle;
25use crate::request::CollectWithShard;
26use crate::request::EncodeKeyspace;
27use crate::request::KeyMode;
28use crate::request::Keyspace;
29use crate::request::Plan;
30use crate::request::PlanBuilder;
31use crate::request::RetryOptions;
32use crate::request::TruncateKeyspace;
33use crate::timestamp::TimestampExt;
34use crate::transaction::buffer::Buffer;
35use crate::transaction::lowering::*;
36use crate::BoundRange;
37use crate::Error;
38use crate::Key;
39use crate::KvPair;
40use crate::Result;
41use crate::Value;
42
43/// An undo-able set of actions on the dataset.
44///
45/// Create a transaction using a [`TransactionClient`](crate::TransactionClient), then run actions
46/// (such as `get`, or `put`) on the transaction. Reads are executed immediately, writes are
47/// buffered locally. Once complete, `commit` the transaction. Behind the scenes, the client will
48/// perform a two phase commit and return success as soon as the writes are guaranteed to be
49/// committed (some finalisation may continue in the background after the return, but no data can be
50/// lost).
51///
52/// TiKV transactions use multi-version concurrency control. All reads logically happen at the start
53/// of the transaction (at the start timestamp, `start_ts`). Once a transaction is commited, a
54/// its writes atomically become visible to other transactions at (logically) the commit timestamp.
55///
56/// In other words, a transaction can read data that was committed at `commit_ts` < its `start_ts`,
57/// and its writes are readable by transactions with `start_ts` >= its `commit_ts`.
58///
59/// Mutations are buffered locally and sent to the TiKV cluster at the time of commit.
60/// In a pessimistic transaction, all write operations and `xxx_for_update` operations will immediately
61/// acquire locks from TiKV. Such a lock blocks other transactions from writing to that key.
62/// A lock exists until the transaction is committed or rolled back, or the lock reaches its time to
63/// live (TTL).
64///
65/// For details, the [SIG-Transaction](https://github.com/tikv/sig-transaction)
66/// provides materials explaining designs and implementations of TiKV transactions.
67///
68/// # Examples
69///
70/// ```rust,no_run
71/// # use tikv_client::{Config, TransactionClient};
72/// # use futures::prelude::*;
73/// # futures::executor::block_on(async {
74/// let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
75/// let mut txn = client.begin_optimistic().await.unwrap();
76/// let foo = txn.get("foo".to_owned()).await.unwrap().unwrap();
77/// txn.put("bar".to_owned(), foo).await.unwrap();
78/// txn.commit().await.unwrap();
79/// # });
80/// ```
81pub struct Transaction<PdC: PdClient = PdRpcClient> {
82    status: Arc<AtomicU8>,
83    timestamp: Timestamp,
84    buffer: Buffer,
85    rpc: Arc<PdC>,
86    options: TransactionOptions,
87    keyspace: Keyspace,
88    is_heartbeat_started: bool,
89    start_instant: Instant,
90}
91
92impl<PdC: PdClient> Transaction<PdC> {
93    pub(crate) fn new(
94        timestamp: Timestamp,
95        rpc: Arc<PdC>,
96        options: TransactionOptions,
97        keyspace: Keyspace,
98    ) -> Transaction<PdC> {
99        let status = if options.read_only {
100            TransactionStatus::ReadOnly
101        } else {
102            TransactionStatus::Active
103        };
104        Transaction {
105            status: Arc::new(AtomicU8::new(status as u8)),
106            timestamp,
107            buffer: Buffer::new(options.is_pessimistic()),
108            rpc,
109            options,
110            keyspace,
111            is_heartbeat_started: false,
112            start_instant: std::time::Instant::now(),
113        }
114    }
115
116    /// Create a new 'get' request
117    ///
118    /// Once resolved this request will result in the fetching of the value associated with the
119    /// given key.
120    ///
121    /// Retuning `Ok(None)` indicates the key does not exist in TiKV.
122    ///
123    /// # Examples
124    /// ```rust,no_run
125    /// # use tikv_client::{Value, Config, TransactionClient};
126    /// # use futures::prelude::*;
127    /// # futures::executor::block_on(async {
128    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
129    /// let mut txn = client.begin_optimistic().await.unwrap();
130    /// let key = "TiKV".to_owned();
131    /// let result: Option<Value> = txn.get(key).await.unwrap();
132    /// # });
133    /// ```
134    pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
135        trace!("invoking transactional get request");
136        self.check_allow_operation().await?;
137        let timestamp = self.timestamp.clone();
138        let rpc = self.rpc.clone();
139        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
140        let retry_options = self.options.retry_options.clone();
141        let keyspace = self.keyspace;
142
143        self.buffer
144            .get_or_else(key, |key| async move {
145                let request = new_get_request(key, timestamp.clone());
146                let plan = PlanBuilder::new(rpc, keyspace, request)
147                    .resolve_lock(timestamp, retry_options.lock_backoff, keyspace)
148                    .retry_multi_region(DEFAULT_REGION_BACKOFF)
149                    .merge(CollectSingle)
150                    .post_process_default()
151                    .plan();
152                plan.execute().await
153            })
154            .await
155    }
156
157    /// Create a `get for update` request.
158    ///
159    /// The request reads and "locks" a key. It is similar to `SELECT ... FOR
160    /// UPDATE` in TiDB, and has different behavior in optimistic and
161    /// pessimistic transactions.
162    ///
163    /// # Optimistic transaction
164    ///
165    /// It reads at the "start timestamp" and caches the value, just like normal
166    /// get requests. The lock is written in prewrite and commit, so it cannot
167    /// prevent concurrent transactions from writing the same key, but can only
168    /// prevent itself from committing.
169    ///
170    /// # Pessimistic transaction
171    ///
172    /// It reads at the "current timestamp" and thus does not cache the value.
173    /// So following read requests won't be affected by the `get_for_udpate`.
174    /// A lock will be acquired immediately with this request, which prevents
175    /// concurrent transactions from mutating the keys.
176    ///
177    /// The "current timestamp" (also called `for_update_ts` of the request) is fetched from PD.
178    ///
179    /// Note: The behavior of this command under pessimistic transaction does not follow snapshot.
180    /// It reads the latest value (using current timestamp), and the value is not cached in the
181    /// local buffer. So normal `get`-like commands after `get_for_update` will not be influenced,
182    /// they still read values at the transaction's `start_ts`.
183    ///
184    /// # Examples
185    ///
186    /// ```rust,no_run
187    /// # use tikv_client::{Value, Config, TransactionClient};
188    /// # use futures::prelude::*;
189    /// # futures::executor::block_on(async {
190    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
191    /// let mut txn = client.begin_pessimistic().await.unwrap();
192    /// let key = "TiKV".to_owned();
193    /// let result: Value = txn.get_for_update(key).await.unwrap().unwrap();
194    /// // now the key "TiKV" is locked, other transactions cannot modify it
195    /// // Finish the transaction...
196    /// txn.commit().await.unwrap();
197    /// # });
198    /// ```
199    pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
200        debug!("invoking transactional get_for_update request");
201        self.check_allow_operation().await?;
202        if !self.is_pessimistic() {
203            let key = key.into();
204            self.lock_keys(iter::once(key.clone())).await?;
205            self.get(key).await
206        } else {
207            let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
208            let mut pairs = self.pessimistic_lock(iter::once(key), true).await?;
209            debug_assert!(pairs.len() <= 1);
210            match pairs.pop() {
211                Some(pair) => Ok(Some(pair.1)),
212                None => Ok(None),
213            }
214        }
215    }
216
217    /// Check whether a key exists.
218    ///
219    /// # Examples
220    ///
221    /// ```rust,no_run
222    /// # use tikv_client::{Value, Config, TransactionClient};
223    /// # use futures::prelude::*;
224    /// # futures::executor::block_on(async {
225    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
226    /// let mut txn = client.begin_pessimistic().await.unwrap();
227    /// let exists = txn.key_exists("k1".to_owned()).await.unwrap();
228    /// txn.commit().await.unwrap();
229    /// # });
230    /// ```
231    pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
232        debug!("invoking transactional key_exists request");
233        Ok(self.get(key).await?.is_some())
234    }
235
236    /// Create a new 'batch get' request.
237    ///
238    /// Once resolved this request will result in the fetching of the values associated with the
239    /// given keys.
240    ///
241    /// Non-existent entries will not appear in the result. The order of the keys is not retained in
242    /// the result.
243    ///
244    /// # Examples
245    ///
246    /// ```rust,no_run
247    /// # use tikv_client::{Key, Value, Config, TransactionClient};
248    /// # use futures::prelude::*;
249    /// # use std::collections::HashMap;
250    /// # futures::executor::block_on(async {
251    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
252    /// let mut txn = client.begin_optimistic().await.unwrap();
253    /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
254    /// let result: HashMap<Key, Value> = txn
255    ///     .batch_get(keys)
256    ///     .await
257    ///     .unwrap()
258    ///     .map(|pair| (pair.0, pair.1))
259    ///     .collect();
260    /// // Finish the transaction...
261    /// txn.commit().await.unwrap();
262    /// # });
263    /// ```
264    pub async fn batch_get(
265        &mut self,
266        keys: impl IntoIterator<Item = impl Into<Key>>,
267    ) -> Result<impl Iterator<Item = KvPair>> {
268        debug!("invoking transactional batch_get request");
269        self.check_allow_operation().await?;
270        let timestamp = self.timestamp.clone();
271        let rpc = self.rpc.clone();
272        let keyspace = self.keyspace;
273        let keys = keys
274            .into_iter()
275            .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
276        let retry_options = self.options.retry_options.clone();
277
278        self.buffer
279            .batch_get_or_else(keys, move |keys| async move {
280                let request = new_batch_get_request(keys, timestamp.clone());
281                let plan = PlanBuilder::new(rpc, keyspace, request)
282                    .resolve_lock(timestamp, retry_options.lock_backoff, keyspace)
283                    .retry_multi_region(retry_options.region_backoff)
284                    .merge(Collect)
285                    .plan();
286                plan.execute()
287                    .await
288                    .map(|r| r.into_iter().map(Into::into).collect())
289            })
290            .await
291            .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
292    }
293
294    /// Create a new 'batch get for update' request.
295    ///
296    /// Similar to [`get_for_update`](Transaction::get_for_update), but it works
297    /// for a batch of keys.
298    ///
299    /// Non-existent entries will not appear in the result. The order of the
300    /// keys is not retained in the result.
301    ///
302    /// # Examples
303    ///
304    /// ```rust,no_run
305    /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
306    /// # use futures::prelude::*;
307    /// # use std::collections::HashMap;
308    /// # futures::executor::block_on(async {
309    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
310    /// let mut txn = client.begin_pessimistic().await.unwrap();
311    /// let keys = vec!["foo".to_owned(), "bar".to_owned()];
312    /// let result: Vec<KvPair> = txn
313    ///     .batch_get_for_update(keys)
314    ///     .await
315    ///     .unwrap();
316    /// // now "foo" and "bar" are both locked
317    /// // Finish the transaction...
318    /// txn.commit().await.unwrap();
319    /// # });
320    /// ```
321    pub async fn batch_get_for_update(
322        &mut self,
323        keys: impl IntoIterator<Item = impl Into<Key>>,
324    ) -> Result<Vec<KvPair>> {
325        debug!("invoking transactional batch_get_for_update request");
326        self.check_allow_operation().await?;
327        if !self.is_pessimistic() {
328            let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
329            self.lock_keys(keys.clone()).await?;
330            Ok(self.batch_get(keys).await?.collect())
331        } else {
332            let keyspace = self.keyspace;
333            let keys = keys
334                .into_iter()
335                .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
336            let pairs = self
337                .pessimistic_lock(keys, true)
338                .await?
339                .truncate_keyspace(keyspace);
340            Ok(pairs)
341        }
342    }
343
344    /// Create a new 'scan' request.
345    ///
346    /// Once resolved this request will result in a `Vec` of all key-value pairs that lie in the
347    /// specified range.
348    ///
349    /// If the number of eligible key-value pairs are greater than `limit`,
350    /// only the first `limit` pairs are returned, ordered by key.
351    ///
352    /// # Examples
353    ///
354    /// ```rust,no_run
355    /// # use tikv_client::{Key, KvPair, Value, Config, TransactionClient};
356    /// # use futures::prelude::*;
357    /// # use std::collections::HashMap;
358    /// # futures::executor::block_on(async {
359    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
360    /// let mut txn = client.begin_optimistic().await.unwrap();
361    /// let key1: Key = b"foo".to_vec().into();
362    /// let key2: Key = b"bar".to_vec().into();
363    /// let result: Vec<KvPair> = txn
364    ///     .scan(key1..key2, 10)
365    ///     .await
366    ///     .unwrap()
367    ///     .collect();
368    /// // Finish the transaction...
369    /// txn.commit().await.unwrap();
370    /// # });
371    /// ```
372    pub async fn scan(
373        &mut self,
374        range: impl Into<BoundRange>,
375        limit: u32,
376    ) -> Result<impl Iterator<Item = KvPair>> {
377        debug!("invoking transactional scan request");
378        self.scan_inner(range, limit, false, false).await
379    }
380
381    /// Create a new 'scan' request that only returns the keys.
382    ///
383    /// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
384    ///
385    /// If the number of eligible keys are greater than `limit`,
386    /// only the first `limit` keys are returned, ordered by key.
387    ///
388    /// # Examples
389    ///
390    /// ```rust,no_run
391    /// # use tikv_client::{Key, KvPair, Value, Config, TransactionClient};
392    /// # use futures::prelude::*;
393    /// # use std::collections::HashMap;
394    /// # futures::executor::block_on(async {
395    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
396    /// let mut txn = client.begin_optimistic().await.unwrap();
397    /// let key1: Key = b"foo".to_vec().into();
398    /// let key2: Key = b"bar".to_vec().into();
399    /// let result: Vec<Key> = txn
400    ///     .scan_keys(key1..key2, 10)
401    ///     .await
402    ///     .unwrap()
403    ///     .collect();
404    /// // Finish the transaction...
405    /// txn.commit().await.unwrap();
406    /// # });
407    /// ```
408    pub async fn scan_keys(
409        &mut self,
410        range: impl Into<BoundRange>,
411        limit: u32,
412    ) -> Result<impl Iterator<Item = Key>> {
413        debug!("invoking transactional scan_keys request");
414        Ok(self
415            .scan_inner(range, limit, true, false)
416            .await?
417            .map(KvPair::into_key))
418    }
419
420    /// Create a 'scan_reverse' request.
421    ///
422    /// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
423    pub async fn scan_reverse(
424        &mut self,
425        range: impl Into<BoundRange>,
426        limit: u32,
427    ) -> Result<impl Iterator<Item = KvPair>> {
428        debug!("invoking transactional scan_reverse request");
429        self.scan_inner(range, limit, false, true).await
430    }
431
432    /// Create a 'scan_keys_reverse' request.
433    ///
434    /// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
435    pub async fn scan_keys_reverse(
436        &mut self,
437        range: impl Into<BoundRange>,
438        limit: u32,
439    ) -> Result<impl Iterator<Item = Key>> {
440        debug!("invoking transactional scan_keys_reverse request");
441        Ok(self
442            .scan_inner(range, limit, true, true)
443            .await?
444            .map(KvPair::into_key))
445    }
446
447    /// Sets the value associated with the given key.
448    ///
449    /// # Examples
450    ///
451    /// ```rust,no_run
452    /// # use tikv_client::{Key, Value, Config, TransactionClient};
453    /// # use futures::prelude::*;
454    /// # futures::executor::block_on(async {
455    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
456    /// let mut txn = client.begin_optimistic().await.unwrap();
457    /// let key = "foo".to_owned();
458    /// let val = "FOO".to_owned();
459    /// txn.put(key, val);
460    /// txn.commit().await.unwrap();
461    /// # });
462    /// ```
463    pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
464        trace!("invoking transactional put request");
465        self.check_allow_operation().await?;
466        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
467        if self.is_pessimistic() {
468            self.pessimistic_lock(iter::once(key.clone()), false)
469                .await?;
470        }
471        self.buffer.put(key, value.into());
472        Ok(())
473    }
474
475    /// Inserts the value associated with the given key.
476    ///
477    /// Similar to [`put'], but it has an additional constraint that the key should not exist
478    /// before this operation.
479    ///
480    /// # Examples
481    ///
482    /// ```rust,no_run
483    /// # use tikv_client::{Key, Value, Config, TransactionClient};
484    /// # use futures::prelude::*;
485    /// # futures::executor::block_on(async {
486    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
487    /// let mut txn = client.begin_optimistic().await.unwrap();
488    /// let key = "foo".to_owned();
489    /// let val = "FOO".to_owned();
490    /// txn.insert(key, val);
491    /// txn.commit().await.unwrap();
492    /// # });
493    /// ```
494    pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
495        debug!("invoking transactional insert request");
496        self.check_allow_operation().await?;
497        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
498        if self.buffer.get(&key).is_some() {
499            return Err(Error::DuplicateKeyInsertion);
500        }
501        if self.is_pessimistic() {
502            self.pessimistic_lock(
503                iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
504                false,
505            )
506            .await?;
507        }
508        self.buffer.insert(key, value.into());
509        Ok(())
510    }
511
512    /// Deletes the given key and its value from the database.
513    ///
514    /// Deleting a non-existent key will not result in an error.
515    ///
516    /// # Examples
517    ///
518    /// ```rust,no_run
519    /// # use tikv_client::{Key, Config, TransactionClient};
520    /// # use futures::prelude::*;
521    /// # futures::executor::block_on(async {
522    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
523    /// let mut txn = client.begin_optimistic().await.unwrap();
524    /// let key = "foo".to_owned();
525    /// txn.delete(key);
526    /// txn.commit().await.unwrap();
527    /// # });
528    /// ```
529    pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
530        debug!("invoking transactional delete request");
531        self.check_allow_operation().await?;
532        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
533        if self.is_pessimistic() {
534            self.pessimistic_lock(iter::once(key.clone()), false)
535                .await?;
536        }
537        self.buffer.delete(key);
538        Ok(())
539    }
540
541    /// Batch mutate the database.
542    ///
543    /// Only `Put` and `Delete` are supported.
544    ///
545    /// # Examples
546    ///
547    /// ```rust,no_run
548    /// # use tikv_client::{Key, Config, TransactionClient, transaction::Mutation};
549    /// # use futures::prelude::*;
550    /// # futures::executor::block_on(async {
551    /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
552    /// let mut txn = client.begin_optimistic().await.unwrap();
553    /// let mutations = vec![
554    ///     Mutation::Delete("k0".to_owned().into()),
555    ///     Mutation::Put("k1".to_owned().into(), b"v1".to_vec()),
556    /// ];
557    /// txn.batch_mutate(mutations).await.unwrap();
558    /// txn.commit().await.unwrap();
559    /// # });
560    /// ```
561    pub async fn batch_mutate(
562        &mut self,
563        mutations: impl IntoIterator<Item = Mutation>,
564    ) -> Result<()> {
565        debug!("invoking transactional batch mutate request");
566        self.check_allow_operation().await?;
567        let mutations: Vec<Mutation> = mutations
568            .into_iter()
569            .map(|mutation| mutation.encode_keyspace(self.keyspace, KeyMode::Txn))
570            .collect();
571        if self.is_pessimistic() {
572            self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false)
573                .await?;
574            for m in mutations {
575                self.buffer.mutate(m);
576            }
577        } else {
578            for m in mutations.into_iter() {
579                self.buffer.mutate(m);
580            }
581        }
582        Ok(())
583    }
584
585    /// Lock the given keys without mutating their values.
586    ///
587    /// In optimistic mode, write conflicts are not checked until commit.
588    /// So use this command to indicate that
589    /// "I do not want to commit if the value associated with this key has been modified".
590    /// It's useful to avoid the *write skew* anomaly.
591    ///
592    /// In pessimistic mode, it is similar to [`batch_get_for_update`](Transaction::batch_get_for_update),
593    /// except that it does not read values.
594    ///
595    /// # Examples
596    ///
597    /// ```rust,no_run
598    /// # use tikv_client::{Config, TransactionClient};
599    /// # use futures::prelude::*;
600    /// # futures::executor::block_on(async {
601    /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
602    /// let mut txn = client.begin_optimistic().await.unwrap();
603    /// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
604    /// // ... Do some actions.
605    /// txn.commit().await.unwrap();
606    /// # });
607    /// ```
608    pub async fn lock_keys(
609        &mut self,
610        keys: impl IntoIterator<Item = impl Into<Key>>,
611    ) -> Result<()> {
612        debug!("invoking transactional lock_keys request");
613        self.check_allow_operation().await?;
614        let keyspace = self.keyspace;
615        let keys = keys
616            .into_iter()
617            .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
618        match self.options.kind {
619            TransactionKind::Optimistic => {
620                for key in keys {
621                    self.buffer.lock(key);
622                }
623            }
624            TransactionKind::Pessimistic(_) => {
625                self.pessimistic_lock(keys, false).await?;
626            }
627        }
628        Ok(())
629    }
630
631    /// Commits the actions of the transaction. On success, we return the commit timestamp (or
632    /// `None` if there was nothing to commit).
633    ///
634    /// # Examples
635    ///
636    /// ```rust,no_run
637    /// # use tikv_client::{Config, Timestamp, TransactionClient};
638    /// # use futures::prelude::*;
639    /// # futures::executor::block_on(async {
640    /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
641    /// let mut txn = client.begin_optimistic().await.unwrap();
642    /// // ... Do some actions.
643    /// let result: Timestamp = txn.commit().await.unwrap().unwrap();
644    /// # });
645    /// ```
646    pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
647        debug!("commiting transaction");
648        if !self.transit_status(
649            |status| {
650                matches!(
651                    status,
652                    TransactionStatus::StartedCommit | TransactionStatus::Active
653                )
654            },
655            TransactionStatus::StartedCommit,
656        ) {
657            return Err(Error::OperationAfterCommitError);
658        }
659
660        let primary_key = self.buffer.get_primary_key();
661        let mutations = self.buffer.to_proto_mutations();
662        if mutations.is_empty() {
663            assert!(primary_key.is_none());
664            return Ok(None);
665        }
666
667        self.start_auto_heartbeat().await;
668
669        let res = Committer::new(
670            primary_key,
671            mutations,
672            self.timestamp.clone(),
673            self.rpc.clone(),
674            self.options.clone(),
675            self.keyspace,
676            self.buffer.get_write_size() as u64,
677            self.start_instant,
678        )
679        .commit()
680        .await;
681
682        if res.is_ok() {
683            self.set_status(TransactionStatus::Committed);
684        }
685        res
686    }
687
688    /// Rollback the transaction.
689    ///
690    /// If it succeeds, all mutations made by this transaction will be discarded.
691    ///
692    /// # Examples
693    ///
694    /// ```rust,no_run
695    /// # use tikv_client::{Config, Timestamp, TransactionClient};
696    /// # use futures::prelude::*;
697    /// # futures::executor::block_on(async {
698    /// # let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
699    /// let mut txn = client.begin_optimistic().await.unwrap();
700    /// // ... Do some actions.
701    /// txn.rollback().await.unwrap();
702    /// # });
703    /// ```
704    pub async fn rollback(&mut self) -> Result<()> {
705        debug!("rolling back transaction");
706        if !self.transit_status(
707            |status| {
708                matches!(
709                    status,
710                    TransactionStatus::StartedRollback
711                        | TransactionStatus::Active
712                        | TransactionStatus::StartedCommit
713                )
714            },
715            TransactionStatus::StartedRollback,
716        ) {
717            return Err(Error::OperationAfterCommitError);
718        }
719
720        let primary_key = self.buffer.get_primary_key();
721        let mutations = self.buffer.to_proto_mutations();
722        let res = Committer::new(
723            primary_key,
724            mutations,
725            self.timestamp.clone(),
726            self.rpc.clone(),
727            self.options.clone(),
728            self.keyspace,
729            self.buffer.get_write_size() as u64,
730            self.start_instant,
731        )
732        .rollback()
733        .await;
734
735        if res.is_ok() {
736            self.set_status(TransactionStatus::Rolledback);
737        }
738        res
739    }
740
741    /// Get the start timestamp of this transaction.
742    pub fn start_timestamp(&self) -> Timestamp {
743        self.timestamp.clone()
744    }
745
746    /// Send a heart beat message to keep the transaction alive on the server and update its TTL.
747    ///
748    /// Returns the TTL set on the transaction's locks by TiKV.
749    #[doc(hidden)]
750    pub async fn send_heart_beat(&mut self) -> Result<u64> {
751        debug!("sending heart_beat");
752        self.check_allow_operation().await?;
753        let primary_key = match self.buffer.get_primary_key() {
754            Some(k) => k,
755            None => return Err(Error::NoPrimaryKey),
756        };
757        let request = new_heart_beat_request(
758            self.timestamp.clone(),
759            primary_key,
760            self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
761        );
762        let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
763            .resolve_lock(
764                self.timestamp.clone(),
765                self.options.retry_options.lock_backoff.clone(),
766                self.keyspace,
767            )
768            .retry_multi_region(self.options.retry_options.region_backoff.clone())
769            .extract_error()
770            .merge(CollectSingle)
771            .post_process_default()
772            .plan();
773        plan.execute().await
774    }
775
776    async fn scan_inner(
777        &mut self,
778        range: impl Into<BoundRange>,
779        limit: u32,
780        key_only: bool,
781        reverse: bool,
782    ) -> Result<impl Iterator<Item = KvPair>> {
783        self.check_allow_operation().await?;
784        let timestamp = self.timestamp.clone();
785        let rpc = self.rpc.clone();
786        let retry_options = self.options.retry_options.clone();
787        let keyspace = self.keyspace;
788        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
789
790        self.buffer
791            .scan_and_fetch(
792                range,
793                limit,
794                !key_only,
795                reverse,
796                move |new_range, new_limit| async move {
797                    let request = new_scan_request(
798                        new_range,
799                        timestamp.clone(),
800                        new_limit,
801                        key_only,
802                        reverse,
803                    );
804                    let plan = PlanBuilder::new(rpc, keyspace, request)
805                        .resolve_lock(timestamp, retry_options.lock_backoff, keyspace)
806                        .retry_multi_region(retry_options.region_backoff)
807                        .merge(Collect)
808                        .plan();
809                    plan.execute()
810                        .await
811                        .map(|r| r.into_iter().map(Into::into).collect())
812                },
813            )
814            .await
815            .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
816    }
817
818    /// Pessimistically lock the keys, and optionally retrieve corresponding values.
819    /// If a key does not exist, the corresponding pair will not appear in the result.
820    ///
821    /// Once resolved it acquires locks on the keys in TiKV.
822    /// A lock prevents other transactions from mutating the entry until it is released.
823    ///
824    /// # Panics
825    ///
826    /// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
827    async fn pessimistic_lock(
828        &mut self,
829        keys: impl IntoIterator<Item = impl PessimisticLock>,
830        need_value: bool,
831    ) -> Result<Vec<KvPair>> {
832        debug!("acquiring pessimistic lock");
833        assert!(
834            matches!(self.options.kind, TransactionKind::Pessimistic(_)),
835            "`pessimistic_lock` is only valid to use with pessimistic transactions"
836        );
837
838        let keys: Vec<_> = keys.into_iter().collect();
839        if keys.is_empty() {
840            return Ok(vec![]);
841        }
842
843        let first_key = keys[0].clone().key();
844        // we do not set the primary key here, because pessimistic lock request
845        // can fail, in which case the keys may not be part of the transaction.
846        let primary_lock = self
847            .buffer
848            .get_primary_key()
849            .unwrap_or_else(|| first_key.clone());
850        let for_update_ts = self.rpc.clone().get_timestamp().await?;
851        self.options.push_for_update_ts(for_update_ts.clone());
852        let request = new_pessimistic_lock_request(
853            keys.clone().into_iter(),
854            primary_lock,
855            self.timestamp.clone(),
856            MAX_TTL,
857            for_update_ts.clone(),
858            need_value,
859        );
860        let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
861            .resolve_lock(
862                self.timestamp.clone(),
863                self.options.retry_options.lock_backoff.clone(),
864                self.keyspace,
865            )
866            .preserve_shard()
867            .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
868            .merge(CollectWithShard)
869            .plan();
870        let pairs = plan.execute().await;
871
872        if let Err(err) = pairs {
873            match err {
874                Error::PessimisticLockError {
875                    inner,
876                    success_keys,
877                } if !success_keys.is_empty() => {
878                    let keys = success_keys.into_iter().map(Key::from);
879                    self.pessimistic_lock_rollback(keys, self.timestamp.clone(), for_update_ts)
880                        .await?;
881                    Err(*inner)
882                }
883                _ => Err(err),
884            }
885        } else {
886            // primary key will be set here if needed
887            self.buffer.primary_key_or(&first_key);
888
889            self.start_auto_heartbeat().await;
890
891            for key in keys {
892                self.buffer.lock(key.key());
893            }
894
895            pairs
896        }
897    }
898
899    /// Rollback pessimistic lock
900    async fn pessimistic_lock_rollback(
901        &mut self,
902        keys: impl Iterator<Item = Key>,
903        start_version: Timestamp,
904        for_update_ts: Timestamp,
905    ) -> Result<()> {
906        debug!("rollback pessimistic lock");
907
908        let keys: Vec<_> = keys.into_iter().collect();
909        if keys.is_empty() {
910            return Ok(());
911        }
912
913        let req = new_pessimistic_rollback_request(
914            keys.clone().into_iter(),
915            start_version.clone(),
916            for_update_ts,
917        );
918        let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
919            .resolve_lock(
920                start_version,
921                self.options.retry_options.lock_backoff.clone(),
922                self.keyspace,
923            )
924            .retry_multi_region(self.options.retry_options.region_backoff.clone())
925            .extract_error()
926            .plan();
927        plan.execute().await?;
928
929        for key in keys {
930            self.buffer.unlock(&key);
931        }
932        Ok(())
933    }
934
935    /// Checks if the transaction can perform arbitrary operations.
936    async fn check_allow_operation(&self) -> Result<()> {
937        match self.get_status() {
938            TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
939            TransactionStatus::Committed
940            | TransactionStatus::Rolledback
941            | TransactionStatus::StartedCommit
942            | TransactionStatus::StartedRollback
943            | TransactionStatus::Dropped => Err(Error::OperationAfterCommitError),
944        }
945    }
946
947    fn is_pessimistic(&self) -> bool {
948        matches!(self.options.kind, TransactionKind::Pessimistic(_))
949    }
950
951    async fn start_auto_heartbeat(&mut self) {
952        debug!("starting auto_heartbeat");
953        if !self.options.heartbeat_option.is_auto_heartbeat() || self.is_heartbeat_started {
954            return;
955        }
956        self.is_heartbeat_started = true;
957
958        let status = self.status.clone();
959        let primary_key = self
960            .buffer
961            .get_primary_key()
962            .expect("Primary key should exist");
963        let start_ts = self.timestamp.clone();
964        let region_backoff = self.options.retry_options.region_backoff.clone();
965        let rpc = self.rpc.clone();
966        let heartbeat_interval = match self.options.heartbeat_option {
967            HeartbeatOption::NoHeartbeat => DEFAULT_HEARTBEAT_INTERVAL,
968            HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval,
969        };
970        let start_instant = self.start_instant;
971        let keyspace = self.keyspace;
972
973        let heartbeat_task = async move {
974            loop {
975                tokio::time::sleep(heartbeat_interval).await;
976                {
977                    let status: TransactionStatus = status.load(atomic::Ordering::Acquire).into();
978                    if matches!(
979                        status,
980                        TransactionStatus::Rolledback
981                            | TransactionStatus::Committed
982                            | TransactionStatus::Dropped
983                    ) {
984                        break;
985                    }
986                }
987                let request = new_heart_beat_request(
988                    start_ts.clone(),
989                    primary_key.clone(),
990                    start_instant.elapsed().as_millis() as u64 + MAX_TTL,
991                );
992                let plan = PlanBuilder::new(rpc.clone(), keyspace, request)
993                    .retry_multi_region(region_backoff.clone())
994                    .merge(CollectSingle)
995                    .plan();
996                plan.execute().await?;
997            }
998            Ok::<(), Error>(())
999        };
1000
1001        tokio::spawn(async {
1002            if let Err(err) = heartbeat_task.await {
1003                log::error!("Error: While sending heartbeat. {}", err);
1004            }
1005        });
1006    }
1007
1008    fn get_status(&self) -> TransactionStatus {
1009        self.status.load(atomic::Ordering::Acquire).into()
1010    }
1011
1012    fn set_status(&self, status: TransactionStatus) {
1013        self.status.store(status as u8, atomic::Ordering::Release);
1014    }
1015
1016    fn transit_status<F>(&self, check_status: F, next: TransactionStatus) -> bool
1017    where
1018        F: Fn(TransactionStatus) -> bool,
1019    {
1020        let mut current = self.get_status();
1021        while check_status(current) {
1022            if current == next {
1023                return true;
1024            }
1025            match self.status.compare_exchange_weak(
1026                current as u8,
1027                next as u8,
1028                atomic::Ordering::AcqRel,
1029                atomic::Ordering::Acquire,
1030            ) {
1031                Ok(_) => return true,
1032                Err(x) => current = x.into(),
1033            }
1034        }
1035        false
1036    }
1037}
1038
1039impl<PdC: PdClient> Drop for Transaction<PdC> {
1040    fn drop(&mut self) {
1041        debug!("dropping transaction");
1042        if std::thread::panicking() {
1043            return;
1044        }
1045        if self.get_status() == TransactionStatus::Active {
1046            match self.options.check_level {
1047                CheckLevel::Panic => {
1048                    panic!("Dropping an active transaction. Consider commit or rollback it.")
1049                }
1050                CheckLevel::Warn => {
1051                    warn!("Dropping an active transaction. Consider commit or rollback it.")
1052                }
1053
1054                CheckLevel::None => {}
1055            }
1056        }
1057        self.set_status(TransactionStatus::Dropped);
1058    }
1059}
1060
1061/// The default max TTL of a lock in milliseconds. Also called `ManagedLockTTL` in TiDB.
1062const MAX_TTL: u64 = 20000;
1063/// The default TTL of a lock in milliseconds.
1064const DEFAULT_LOCK_TTL: u64 = 3000;
1065/// The default heartbeat interval
1066const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2);
1067/// TiKV recommends each RPC packet should be less than around 1MB. We keep KV size of
1068/// each request below 16KB.
1069pub const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
1070const TTL_FACTOR: f64 = 6000.0;
1071
1072/// Optimistic or pessimistic transaction.
1073#[derive(Clone, PartialEq, Debug)]
1074pub enum TransactionKind {
1075    Optimistic,
1076    /// Argument is the transaction's for_update_ts
1077    Pessimistic(Timestamp),
1078}
1079
1080/// Options for configuring a transaction.
1081///
1082/// `TransactionOptions` has a builder-style API.
1083#[derive(Clone, PartialEq, Debug)]
1084pub struct TransactionOptions {
1085    /// Optimistic or pessimistic (default) transaction.
1086    kind: TransactionKind,
1087    /// Try using 1pc rather than 2pc (default is to always use 2pc).
1088    try_one_pc: bool,
1089    /// Try to use async commit (default is not to).
1090    async_commit: bool,
1091    /// Is the transaction read only? (Default is no).
1092    read_only: bool,
1093    /// How to retry in the event of certain errors.
1094    retry_options: RetryOptions,
1095    /// What to do if the transaction is dropped without an attempt to commit or rollback
1096    check_level: CheckLevel,
1097    #[doc(hidden)]
1098    heartbeat_option: HeartbeatOption,
1099}
1100
1101#[derive(Clone, PartialEq, Eq, Debug)]
1102pub enum HeartbeatOption {
1103    NoHeartbeat,
1104    FixedTime(Duration),
1105}
1106
1107impl Default for TransactionOptions {
1108    fn default() -> TransactionOptions {
1109        Self::new_pessimistic()
1110    }
1111}
1112
1113impl TransactionOptions {
1114    /// Default options for an optimistic transaction.
1115    pub fn new_optimistic() -> TransactionOptions {
1116        TransactionOptions {
1117            kind: TransactionKind::Optimistic,
1118            try_one_pc: false,
1119            async_commit: false,
1120            read_only: false,
1121            retry_options: RetryOptions::default_optimistic(),
1122            check_level: CheckLevel::Panic,
1123            heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
1124        }
1125    }
1126
1127    /// Default options for a pessimistic transaction.
1128    pub fn new_pessimistic() -> TransactionOptions {
1129        TransactionOptions {
1130            kind: TransactionKind::Pessimistic(Timestamp::from_version(0)),
1131            try_one_pc: false,
1132            async_commit: false,
1133            read_only: false,
1134            retry_options: RetryOptions::default_pessimistic(),
1135            check_level: CheckLevel::Panic,
1136            heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
1137        }
1138    }
1139
1140    /// Try to use async commit.
1141    #[must_use]
1142    pub fn use_async_commit(mut self) -> TransactionOptions {
1143        self.async_commit = true;
1144        self
1145    }
1146
1147    /// Try to use 1pc.
1148    #[must_use]
1149    pub fn try_one_pc(mut self) -> TransactionOptions {
1150        self.try_one_pc = true;
1151        self
1152    }
1153
1154    /// Make the transaction read only.
1155    #[must_use]
1156    pub fn read_only(mut self) -> TransactionOptions {
1157        self.read_only = true;
1158        self
1159    }
1160
1161    /// Don't automatically resolve locks and retry if keys are locked.
1162    #[must_use]
1163    pub fn no_resolve_locks(mut self) -> TransactionOptions {
1164        self.retry_options.lock_backoff = Backoff::no_backoff();
1165        self
1166    }
1167
1168    /// Don't automatically resolve regions with PD if we have outdated region information.
1169    #[must_use]
1170    pub fn no_resolve_regions(mut self) -> TransactionOptions {
1171        self.retry_options.region_backoff = Backoff::no_backoff();
1172        self
1173    }
1174
1175    /// Set RetryOptions.
1176    #[must_use]
1177    pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions {
1178        self.retry_options = options;
1179        self
1180    }
1181
1182    /// Set the behavior when dropping a transaction without an attempt to commit or rollback it.
1183    #[must_use]
1184    pub fn drop_check(mut self, level: CheckLevel) -> TransactionOptions {
1185        self.check_level = level;
1186        self
1187    }
1188
1189    fn push_for_update_ts(&mut self, for_update_ts: Timestamp) {
1190        match &mut self.kind {
1191            TransactionKind::Optimistic => unreachable!(),
1192            TransactionKind::Pessimistic(old_for_update_ts) => {
1193                self.kind = TransactionKind::Pessimistic(Timestamp::from_version(std::cmp::max(
1194                    old_for_update_ts.version(),
1195                    for_update_ts.version(),
1196                )));
1197            }
1198        }
1199    }
1200
1201    #[must_use]
1202    pub fn heartbeat_option(mut self, heartbeat_option: HeartbeatOption) -> TransactionOptions {
1203        self.heartbeat_option = heartbeat_option;
1204        self
1205    }
1206
1207    // Returns true if these options describe a pessimistic transaction.
1208    pub fn is_pessimistic(&self) -> bool {
1209        match self.kind {
1210            TransactionKind::Pessimistic(_) => true,
1211            TransactionKind::Optimistic => false,
1212        }
1213    }
1214}
1215
1216/// Determines what happens when a transaction is dropped without being rolled back or committed.
1217///
1218/// The default is to panic.
1219#[derive(Clone, Eq, PartialEq, Debug)]
1220pub enum CheckLevel {
1221    /// The program will panic.
1222    ///
1223    /// Note that if the thread is already panicking, then we will not double-panic and abort, but
1224    /// just ignore the issue.
1225    Panic,
1226    /// Log a warning.
1227    Warn,
1228    /// Do nothing
1229    None,
1230}
1231
1232impl HeartbeatOption {
1233    pub fn is_auto_heartbeat(&self) -> bool {
1234        !matches!(self, HeartbeatOption::NoHeartbeat)
1235    }
1236}
1237
1238#[derive(Clone, Eq, PartialEq, Debug)]
1239pub enum Mutation {
1240    Put(Key, Value),
1241    Delete(Key),
1242}
1243
1244impl Mutation {
1245    pub fn key(&self) -> &Key {
1246        match self {
1247            Mutation::Put(key, _) => key,
1248            Mutation::Delete(key) => key,
1249        }
1250    }
1251}
1252
1253/// A struct wrapping the details of two-phase commit protocol (2PC).
1254///
1255/// The two phases are `prewrite` and `commit`.
1256/// Generally, the `prewrite` phase is to send data to all regions and write them.
1257/// The `commit` phase is to mark all written data as successfully committed.
1258///
1259/// The committer implements `prewrite`, `commit` and `rollback` functions.
1260#[allow(clippy::too_many_arguments)]
1261#[derive(new)]
1262struct Committer<PdC: PdClient = PdRpcClient> {
1263    primary_key: Option<Key>,
1264    mutations: Vec<kvrpcpb::Mutation>,
1265    start_version: Timestamp,
1266    rpc: Arc<PdC>,
1267    options: TransactionOptions,
1268    keyspace: Keyspace,
1269    #[new(default)]
1270    undetermined: bool,
1271    write_size: u64,
1272    start_instant: Instant,
1273}
1274
1275impl<PdC: PdClient> Committer<PdC> {
1276    async fn commit(mut self) -> Result<Option<Timestamp>> {
1277        debug!("committing");
1278
1279        let min_commit_ts = self.prewrite().await?;
1280
1281        fail_point!("after-prewrite", |_| {
1282            Err(Error::StringError(
1283                "failpoint: after-prewrite return error".to_owned(),
1284            ))
1285        });
1286
1287        // If we didn't use 1pc, prewrite will set `try_one_pc` to false.
1288        if self.options.try_one_pc {
1289            return Ok(min_commit_ts);
1290        }
1291
1292        let commit_ts = if self.options.async_commit {
1293            // FIXME: min_commit_ts == 0 => fallback to normal 2PC
1294            min_commit_ts.unwrap()
1295        } else {
1296            match self.commit_primary_with_retry().await {
1297                Ok(commit_ts) => commit_ts,
1298                Err(e) => {
1299                    return if self.undetermined {
1300                        Err(Error::UndeterminedError(Box::new(e)))
1301                    } else {
1302                        Err(e)
1303                    };
1304                }
1305            }
1306        };
1307        tokio::spawn(self.commit_secondary(commit_ts.clone()).map(|res| {
1308            if let Err(e) = res {
1309                log::warn!("Failed to commit secondary keys: {}", e);
1310            }
1311        }));
1312        Ok(Some(commit_ts))
1313    }
1314
1315    async fn prewrite(&mut self) -> Result<Option<Timestamp>> {
1316        debug!("prewriting");
1317        let primary_lock = self.primary_key.clone().unwrap();
1318        let elapsed = self.start_instant.elapsed().as_millis() as u64;
1319        let lock_ttl = self.calc_txn_lock_ttl();
1320        let mut request = match &self.options.kind {
1321            TransactionKind::Optimistic => new_prewrite_request(
1322                self.mutations.clone(),
1323                primary_lock,
1324                self.start_version.clone(),
1325                lock_ttl + elapsed,
1326            ),
1327            TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request(
1328                self.mutations.clone(),
1329                primary_lock,
1330                self.start_version.clone(),
1331                lock_ttl + elapsed,
1332                for_update_ts.clone(),
1333            ),
1334        };
1335
1336        request.use_async_commit = self.options.async_commit;
1337        request.try_one_pc = self.options.try_one_pc;
1338        request.secondaries = self
1339            .mutations
1340            .iter()
1341            .filter(|m| self.primary_key.as_ref().unwrap() != m.key.as_ref())
1342            .map(|m| m.key.clone())
1343            .collect();
1344        // FIXME set max_commit_ts and min_commit_ts
1345
1346        let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
1347            .resolve_lock(
1348                self.start_version.clone(),
1349                self.options.retry_options.lock_backoff.clone(),
1350                self.keyspace,
1351            )
1352            .retry_multi_region(self.options.retry_options.region_backoff.clone())
1353            .merge(CollectError)
1354            .extract_error()
1355            .plan();
1356        let response = plan.execute().await?;
1357
1358        if self.options.try_one_pc && response.len() == 1 {
1359            if response[0].one_pc_commit_ts == 0 {
1360                return Err(Error::OnePcFailure);
1361            }
1362
1363            return Ok(Timestamp::try_from_version(response[0].one_pc_commit_ts));
1364        }
1365
1366        self.options.try_one_pc = false;
1367
1368        let min_commit_ts = response
1369            .iter()
1370            .map(|r| {
1371                assert_eq!(r.one_pc_commit_ts, 0);
1372                r.min_commit_ts
1373            })
1374            .max()
1375            .map(Timestamp::from_version);
1376
1377        Ok(min_commit_ts)
1378    }
1379
1380    /// Commits the primary key and returns the commit version
1381    async fn commit_primary(&mut self) -> Result<Timestamp> {
1382        debug!("committing primary");
1383        let primary_key = self.primary_key.clone().into_iter();
1384        let commit_version = self.rpc.clone().get_timestamp().await?;
1385        let req = new_commit_request(
1386            primary_key,
1387            self.start_version.clone(),
1388            commit_version.clone(),
1389        );
1390        let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
1391            .resolve_lock(
1392                self.start_version.clone(),
1393                self.options.retry_options.lock_backoff.clone(),
1394                self.keyspace,
1395            )
1396            .retry_multi_region(self.options.retry_options.region_backoff.clone())
1397            .extract_error()
1398            .plan();
1399        plan.execute()
1400            .inspect_err(|e| {
1401                debug!(
1402                    "commit primary error: {:?}, start_ts: {}",
1403                    e,
1404                    self.start_version.version()
1405                );
1406                // We don't know whether the transaction is committed or not if we fail to receive
1407                // the response. Then, we mark the transaction as undetermined and propagate the
1408                // error to the user.
1409                if let Error::Grpc(_) = e {
1410                    self.undetermined = true;
1411                }
1412            })
1413            .await?;
1414
1415        Ok(commit_version)
1416    }
1417
1418    async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
1419        loop {
1420            match self.commit_primary().await {
1421                Ok(commit_version) => return Ok(commit_version),
1422                Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
1423                    Some(Error::KeyError(key_err)) => {
1424                        if let Some(expired) = key_err.commit_ts_expired {
1425                            // Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
1426                            info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
1427                                self.start_version.version());
1428
1429                            let primary_key = self.primary_key.as_ref().unwrap();
1430                            if primary_key != expired.key.as_ref() {
1431                                error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
1432                                    self.start_version.version(), HexRepr(&expired.key), primary_key);
1433                                return Err(Error::StringError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
1434                            }
1435
1436                            // Do not retry for a txn which has a too large min_commit_ts.
1437                            // 3600000 << 18 = 943718400000
1438                            if expired
1439                                .min_commit_ts
1440                                .saturating_sub(expired.attempted_commit_ts)
1441                                > 943718400000
1442                            {
1443                                let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
1444                                                     expired.min_commit_ts, expired.attempted_commit_ts);
1445                                return Err(Error::StringError(msg));
1446                            }
1447                            continue;
1448                        } else {
1449                            return Err(Error::KeyError(key_err));
1450                        }
1451                    }
1452                    Some(err) => return Err(err),
1453                    None => unreachable!(),
1454                },
1455                Err(err) => return Err(err),
1456            }
1457        }
1458    }
1459
1460    async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
1461        debug!("committing secondary");
1462        let start_version = self.start_version.clone();
1463        let mutations_len = self.mutations.len();
1464        let primary_only = mutations_len == 1;
1465        #[cfg(not(feature = "integration-tests"))]
1466        let mutations = self.mutations.into_iter();
1467
1468        #[cfg(feature = "integration-tests")]
1469        let mutations = self.mutations.into_iter().take({
1470            // Truncate mutation to a new length as `percent/100`.
1471            // Return error when truncate to zero.
1472            let fp = || -> Result<usize> {
1473                let mut new_len = mutations_len;
1474                fail_point!("before-commit-secondary", |percent| {
1475                    let percent = percent.unwrap().parse::<usize>().unwrap();
1476                    new_len = mutations_len * percent / 100;
1477                    if new_len == 0 {
1478                        Err(Error::StringError(
1479                            "failpoint: before-commit-secondary return error".to_owned(),
1480                        ))
1481                    } else {
1482                        debug!(
1483                            "failpoint: before-commit-secondary truncate mutation {} -> {}",
1484                            mutations_len, new_len
1485                        );
1486                        Ok(new_len)
1487                    }
1488                });
1489                Ok(new_len)
1490            };
1491            fp()?
1492        });
1493
1494        let req = if self.options.async_commit {
1495            let keys = mutations.map(|m| m.key.into());
1496            new_commit_request(keys, start_version.clone(), commit_version)
1497        } else if primary_only {
1498            return Ok(());
1499        } else {
1500            let primary_key = self.primary_key.unwrap();
1501            let keys = mutations
1502                .map(|m| m.key.into())
1503                .filter(|key| &primary_key != key);
1504            new_commit_request(keys, start_version.clone(), commit_version)
1505        };
1506        let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
1507            .resolve_lock(
1508                start_version,
1509                self.options.retry_options.lock_backoff,
1510                self.keyspace,
1511            )
1512            .retry_multi_region(self.options.retry_options.region_backoff)
1513            .extract_error()
1514            .plan();
1515        plan.execute().await?;
1516        Ok(())
1517    }
1518
1519    async fn rollback(self) -> Result<()> {
1520        debug!("rolling back");
1521        if self.options.kind == TransactionKind::Optimistic && self.mutations.is_empty() {
1522            return Ok(());
1523        }
1524        let keys = self
1525            .mutations
1526            .into_iter()
1527            .map(|mutation| mutation.key.into());
1528        let start_version = self.start_version.clone();
1529        match self.options.kind {
1530            TransactionKind::Optimistic => {
1531                let req = new_batch_rollback_request(keys, start_version.clone());
1532                let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
1533                    .resolve_lock(
1534                        start_version.clone(),
1535                        self.options.retry_options.lock_backoff,
1536                        self.keyspace,
1537                    )
1538                    .retry_multi_region(self.options.retry_options.region_backoff)
1539                    .extract_error()
1540                    .plan();
1541                plan.execute().await?;
1542            }
1543            TransactionKind::Pessimistic(for_update_ts) => {
1544                let req =
1545                    new_pessimistic_rollback_request(keys, start_version.clone(), for_update_ts);
1546                let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
1547                    .resolve_lock(
1548                        start_version.clone(),
1549                        self.options.retry_options.lock_backoff,
1550                        self.keyspace,
1551                    )
1552                    .retry_multi_region(self.options.retry_options.region_backoff)
1553                    .extract_error()
1554                    .plan();
1555                plan.execute().await?;
1556            }
1557        }
1558        Ok(())
1559    }
1560
1561    fn calc_txn_lock_ttl(&mut self) -> u64 {
1562        let mut lock_ttl = DEFAULT_LOCK_TTL;
1563        if self.write_size > TXN_COMMIT_BATCH_SIZE {
1564            let size_mb = self.write_size as f64 / 1024.0 / 1024.0;
1565            lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64;
1566            lock_ttl = lock_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL);
1567        }
1568        lock_ttl
1569    }
1570}
1571
1572#[derive(PartialEq, Eq, Clone, Copy)]
1573#[repr(u8)]
1574enum TransactionStatus {
1575    /// The transaction is read-only [`Snapshot`](super::Snapshot), no need to commit or rollback or panic on drop.
1576    ReadOnly = 0,
1577    /// The transaction have not been committed or rolled back.
1578    Active = 1,
1579    /// The transaction has committed.
1580    Committed = 2,
1581    /// The transaction has tried to commit. Only `commit` is allowed.
1582    StartedCommit = 3,
1583    /// The transaction has rolled back.
1584    Rolledback = 4,
1585    /// The transaction has tried to rollback. Only `rollback` is allowed.
1586    StartedRollback = 5,
1587    /// The transaction has been dropped.
1588    Dropped = 6,
1589}
1590
1591impl From<u8> for TransactionStatus {
1592    fn from(num: u8) -> Self {
1593        match num {
1594            0 => TransactionStatus::ReadOnly,
1595            1 => TransactionStatus::Active,
1596            2 => TransactionStatus::Committed,
1597            3 => TransactionStatus::StartedCommit,
1598            4 => TransactionStatus::Rolledback,
1599            5 => TransactionStatus::StartedRollback,
1600            6 => TransactionStatus::Dropped,
1601            _ => panic!("Unknown transaction status {}", num),
1602        }
1603    }
1604}
1605
1606#[cfg(test)]
1607mod tests {
1608    use std::any::Any;
1609    use std::io;
1610    use std::sync::atomic::AtomicUsize;
1611    use std::sync::atomic::Ordering;
1612    use std::sync::Arc;
1613    use std::time::Duration;
1614
1615    use fail::FailScenario;
1616
1617    use crate::mock::MockKvClient;
1618    use crate::mock::MockPdClient;
1619    use crate::proto::kvrpcpb;
1620    use crate::proto::pdpb::Timestamp;
1621    use crate::request::Keyspace;
1622    use crate::transaction::HeartbeatOption;
1623    use crate::Transaction;
1624    use crate::TransactionOptions;
1625
1626    #[rstest::rstest]
1627    #[case(Keyspace::Disable)]
1628    #[case(Keyspace::Enable { keyspace_id: 0 })]
1629    #[tokio::test]
1630    async fn test_optimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> {
1631        let scenario = FailScenario::setup();
1632        fail::cfg("after-prewrite", "sleep(1500)").unwrap();
1633        let heartbeats = Arc::new(AtomicUsize::new(0));
1634        let heartbeats_cloned = heartbeats.clone();
1635        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
1636            move |req: &dyn Any| {
1637                if req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>().is_some() {
1638                    heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
1639                    Ok(Box::<kvrpcpb::TxnHeartBeatResponse>::default() as Box<dyn Any>)
1640                } else if req.downcast_ref::<kvrpcpb::PrewriteRequest>().is_some() {
1641                    Ok(Box::<kvrpcpb::PrewriteResponse>::default() as Box<dyn Any>)
1642                } else {
1643                    Ok(Box::<kvrpcpb::CommitResponse>::default() as Box<dyn Any>)
1644                }
1645            },
1646        )));
1647        let key1 = "key1".to_owned();
1648        let mut heartbeat_txn = Transaction::new(
1649            Timestamp::default(),
1650            pd_client,
1651            TransactionOptions::new_optimistic()
1652                .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
1653            keyspace,
1654        );
1655        heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
1656        let heartbeat_txn_handle = tokio::task::spawn_blocking(move || {
1657            assert!(futures::executor::block_on(heartbeat_txn.commit()).is_ok())
1658        });
1659        assert_eq!(heartbeats.load(Ordering::SeqCst), 0);
1660        heartbeat_txn_handle.await.unwrap();
1661        assert_eq!(heartbeats.load(Ordering::SeqCst), 1);
1662        scenario.teardown();
1663        Ok(())
1664    }
1665
1666    #[rstest::rstest]
1667    #[case(Keyspace::Disable)]
1668    #[case(Keyspace::Enable { keyspace_id: 0 })]
1669    #[tokio::test]
1670    async fn test_pessimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> {
1671        let heartbeats = Arc::new(AtomicUsize::new(0));
1672        let heartbeats_cloned = heartbeats.clone();
1673        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
1674            move |req: &dyn Any| {
1675                if req.downcast_ref::<kvrpcpb::TxnHeartBeatRequest>().is_some() {
1676                    heartbeats_cloned.fetch_add(1, Ordering::SeqCst);
1677                    Ok(Box::<kvrpcpb::TxnHeartBeatResponse>::default() as Box<dyn Any>)
1678                } else if req.downcast_ref::<kvrpcpb::PrewriteRequest>().is_some() {
1679                    Ok(Box::<kvrpcpb::PrewriteResponse>::default() as Box<dyn Any>)
1680                } else if req
1681                    .downcast_ref::<kvrpcpb::PessimisticLockRequest>()
1682                    .is_some()
1683                {
1684                    Ok(Box::<kvrpcpb::PessimisticLockResponse>::default() as Box<dyn Any>)
1685                } else {
1686                    Ok(Box::<kvrpcpb::CommitResponse>::default() as Box<dyn Any>)
1687                }
1688            },
1689        )));
1690        let key1 = "key1".to_owned();
1691        let mut heartbeat_txn = Transaction::new(
1692            Timestamp::default(),
1693            pd_client,
1694            TransactionOptions::new_pessimistic()
1695                .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))),
1696            keyspace,
1697        );
1698        heartbeat_txn.put(key1.clone(), "foo").await.unwrap();
1699        assert_eq!(heartbeats.load(Ordering::SeqCst), 0);
1700        tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;
1701        assert_eq!(heartbeats.load(Ordering::SeqCst), 1);
1702        let heartbeat_txn_handle = tokio::spawn(async move {
1703            assert!(heartbeat_txn.commit().await.is_ok());
1704        });
1705        heartbeat_txn_handle.await.unwrap();
1706        Ok(())
1707    }
1708}