tikv_client/transaction/requests.rs

// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::cmp;
use std::iter;
use std::sync::Arc;

use either::Either;
use futures::stream::BoxStream;
use futures::stream::{self};
use futures::StreamExt;

use super::transaction::TXN_COMMIT_BATCH_SIZE;
use crate::collect_single;
use crate::common::Error::PessimisticLockError;
use crate::pd::PdClient;
use crate::proto::kvrpcpb::Action;
use crate::proto::kvrpcpb::LockInfo;
use crate::proto::kvrpcpb::TxnHeartBeatResponse;
use crate::proto::kvrpcpb::TxnInfo;
use crate::proto::kvrpcpb::{self};
use crate::proto::pdpb::Timestamp;
use crate::region::RegionWithLeader;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::CollectWithShard;
use crate::request::DefaultProcessor;
use crate::request::HasNextBatch;
use crate::request::KvRequest;
use crate::request::Merge;
use crate::request::NextBatch;
use crate::request::Process;
use crate::request::RangeRequest;
use crate::request::ResponseWithShard;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::request::{Batchable, StoreRequest};
use crate::reversible_range_request;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
use crate::store::RegionStore;
use crate::store::Request;
use crate::store::Store;
use crate::store::{region_stream_for_keys, region_stream_for_range};
use crate::timestamp::TimestampExt;
use crate::transaction::requests::kvrpcpb::prewrite_request::PessimisticAction;
use crate::transaction::HasLocks;
use crate::util::iter::FlatMapOkIterExt;
use crate::KvPair;
use crate::Result;
use crate::Value;

// implement HasLocks for a response type that has a `pairs` field,
// where locks can be extracted from both the `pairs` and `error` fields
macro_rules! pair_locks {
    ($response_type:ty) => {
        impl HasLocks for $response_type {
            fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
                if self.pairs.is_empty() {
                    self.error
                        .as_mut()
                        .and_then(|error| error.locked.take())
                        .into_iter()
                        .collect()
                } else {
                    self.pairs
                        .iter_mut()
                        .filter_map(|pair| {
                            pair.error.as_mut().and_then(|error| error.locked.take())
                        })
                        .collect()
                }
            }
        }
    };
}

// implement HasLocks for a response type that does not have a `pairs` field,
// where locks are only extracted from the `error` field
macro_rules! error_locks {
    ($response_type:ty) => {
        impl HasLocks for $response_type {
            fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
                self.error
                    .as_mut()
                    .and_then(|error| error.locked.take())
                    .into_iter()
                    .collect()
            }
        }
    };
}
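
// Illustrative sketch (not part of the original file): invoking
// `error_locks!(kvrpcpb::GetResponse)` expands to roughly the following impl;
// the invocations near the bottom of this file apply both macros to the
// concrete response types.
//
//     impl HasLocks for kvrpcpb::GetResponse {
//         fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
//             self.error
//                 .as_mut()
//                 .and_then(|error| error.locked.take())
//                 .into_iter()
//                 .collect()
//         }
//     }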

pub fn new_get_request(key: Vec<u8>, timestamp: u64) -> kvrpcpb::GetRequest {
    let mut req = kvrpcpb::GetRequest::default();
    req.key = key;
    req.version = timestamp;
    req
}

impl KvRequest for kvrpcpb::GetRequest {
    type Response = kvrpcpb::GetResponse;
}

shardable_key!(kvrpcpb::GetRequest);
collect_single!(kvrpcpb::GetResponse);
impl SingleKey for kvrpcpb::GetRequest {
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}

impl Process<kvrpcpb::GetResponse> for DefaultProcessor {
    type Out = Option<Value>;

    fn process(&self, input: Result<kvrpcpb::GetResponse>) -> Result<Self::Out> {
        let input = input?;
        Ok(if input.not_found {
            None
        } else {
            Some(input.value)
        })
    }
}
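
// Illustrative sketch (not part of the original file): a point get at
// timestamp 42. The response's `not_found` flag distinguishes an absent key
// from an existing key with an empty value, and `DefaultProcessor` maps it
// to `None` or `Some(value)` accordingly.
//
//     let req = new_get_request(b"k1".to_vec(), 42);
//     assert_eq!(req.version, 42);
//     assert_eq!(req.key, b"k1".to_vec());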

pub fn new_batch_get_request(keys: Vec<Vec<u8>>, timestamp: u64) -> kvrpcpb::BatchGetRequest {
    let mut req = kvrpcpb::BatchGetRequest::default();
    req.keys = keys;
    req.version = timestamp;
    req
}

impl KvRequest for kvrpcpb::BatchGetRequest {
    type Response = kvrpcpb::BatchGetResponse;
}

shardable_keys!(kvrpcpb::BatchGetRequest);

impl Merge<kvrpcpb::BatchGetResponse> for Collect {
    type Out = Vec<KvPair>;

    fn merge(&self, input: Vec<Result<kvrpcpb::BatchGetResponse>>) -> Result<Self::Out> {
        input
            .into_iter()
            .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
            .collect()
    }
}

pub fn new_scan_request(
    start_key: Vec<u8>,
    end_key: Vec<u8>,
    timestamp: u64,
    limit: u32,
    key_only: bool,
    reverse: bool,
) -> kvrpcpb::ScanRequest {
    let mut req = kvrpcpb::ScanRequest::default();
    if !reverse {
        req.start_key = start_key;
        req.end_key = end_key;
    } else {
        req.start_key = end_key;
        req.end_key = start_key;
    }
    req.limit = limit;
    req.key_only = key_only;
    req.version = timestamp;
    req.reverse = reverse;
    req
}
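
// Illustrative sketch (not part of the original file): a reverse scan swaps
// the caller's bounds, so the protobuf's `start_key` carries the upper bound.
//
//     let req = new_scan_request(b"a".to_vec(), b"z".to_vec(), 42, 100, false, true);
//     assert_eq!(req.start_key, b"z".to_vec());
//     assert_eq!(req.end_key, b"a".to_vec());
//     assert!(req.reverse);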

impl KvRequest for kvrpcpb::ScanRequest {
    type Response = kvrpcpb::ScanResponse;
}

reversible_range_request!(kvrpcpb::ScanRequest);
shardable_range!(kvrpcpb::ScanRequest);

impl Merge<kvrpcpb::ScanResponse> for Collect {
    type Out = Vec<KvPair>;

    fn merge(&self, input: Vec<Result<kvrpcpb::ScanResponse>>) -> Result<Self::Out> {
        input
            .into_iter()
            .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
            .collect()
    }
}

pub fn new_resolve_lock_request(
    start_version: u64,
    commit_version: u64,
    is_txn_file: bool,
) -> kvrpcpb::ResolveLockRequest {
    let mut req = kvrpcpb::ResolveLockRequest::default();
    req.start_version = start_version;
    req.commit_version = commit_version;
    req.is_txn_file = is_txn_file;
    req
}

pub fn new_batch_resolve_lock_request(txn_infos: Vec<TxnInfo>) -> kvrpcpb::ResolveLockRequest {
    let mut req = kvrpcpb::ResolveLockRequest::default();
    req.txn_infos = txn_infos;
    req
}

// Note: ResolveLockRequest is special: it can be sent to a specified region
// without any keys, so it is not Shardable. Its region errors are not retried
// automatically at the Plan level; they must be handled manually at the upper
// level.
impl KvRequest for kvrpcpb::ResolveLockRequest {
    type Response = kvrpcpb::ResolveLockResponse;
}

pub fn new_prewrite_request(
    mutations: Vec<kvrpcpb::Mutation>,
    primary_lock: Vec<u8>,
    start_version: u64,
    lock_ttl: u64,
) -> kvrpcpb::PrewriteRequest {
    let mut req = kvrpcpb::PrewriteRequest::default();
    req.mutations = mutations;
    req.primary_lock = primary_lock;
    req.start_version = start_version;
    req.lock_ttl = lock_ttl;
    // FIXME: Lite resolve lock is currently disabled
    req.txn_size = u64::MAX;

    req
}

pub fn new_pessimistic_prewrite_request(
    mutations: Vec<kvrpcpb::Mutation>,
    primary_lock: Vec<u8>,
    start_version: u64,
    lock_ttl: u64,
    for_update_ts: u64,
) -> kvrpcpb::PrewriteRequest {
    let len = mutations.len();
    let mut req = new_prewrite_request(mutations, primary_lock, start_version, lock_ttl);
    req.for_update_ts = for_update_ts;
    req.pessimistic_actions = iter::repeat(PessimisticAction::DoPessimisticCheck.into())
        .take(len)
        .collect();
    req
}
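
// Illustrative sketch (not part of the original file): a pessimistic prewrite
// pairs every mutation with a `DoPessimisticCheck` action, so the two vectors
// always have the same length.
//
//     let muts = vec![kvrpcpb::Mutation::default(); 3];
//     let req = new_pessimistic_prewrite_request(muts, b"pk".to_vec(), 10, 3000, 11);
//     assert_eq!(req.pessimistic_actions.len(), req.mutations.len());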

impl KvRequest for kvrpcpb::PrewriteRequest {
    type Response = kvrpcpb::PrewriteResponse;
}

impl Shardable for kvrpcpb::PrewriteRequest {
    type Shard = Vec<kvrpcpb::Mutation>;

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        let mut mutations = self.mutations.clone();
        mutations.sort_by(|a, b| a.key.cmp(&b.key));

        region_stream_for_keys(mutations.into_iter(), pd_client.clone())
            .flat_map(|result| match result {
                Ok((mutations, region)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
                    mutations,
                    TXN_COMMIT_BATCH_SIZE,
                ))
                .map(move |batch| Ok((batch, region.clone())))
                .boxed(),
                // propagate the region-lookup error as a single-item stream
                Err(e) => stream::iter(iter::once(Err(e))).boxed(),
            })
            .boxed()
    }

    fn apply_shard(&mut self, shard: Self::Shard) {
        // Secondary keys only need to be set on the request that contains the
        // primary key.
        if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
            self.secondaries = vec![];
        }

        // One-phase commit is only possible if all the transaction's keys
        // (the primary plus every secondary) fit into this single shard.
        if self.try_one_pc && shard.len() != self.secondaries.len() + 1 {
            self.try_one_pc = false;
        }

        self.mutations = shard;
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}

impl Batchable for kvrpcpb::PrewriteRequest {
    type Item = kvrpcpb::Mutation;

    fn item_size(item: &Self::Item) -> u64 {
        let mut size = item.key.len() as u64;
        size += item.value.len() as u64;
        size
    }
}
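
// Illustrative sketch (not part of the original file): prewrite batches are
// sized by key length plus value length, so a mutation with a 3-byte key and
// a 5-byte value contributes 8 bytes towards TXN_COMMIT_BATCH_SIZE.
//
//     let mut m = kvrpcpb::Mutation::default();
//     m.key = b"key".to_vec();
//     m.value = b"value".to_vec();
//     assert_eq!(<kvrpcpb::PrewriteRequest as Batchable>::item_size(&m), 8);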

pub fn new_commit_request(
    keys: Vec<Vec<u8>>,
    start_version: u64,
    commit_version: u64,
) -> kvrpcpb::CommitRequest {
    let mut req = kvrpcpb::CommitRequest::default();
    req.keys = keys;
    req.start_version = start_version;
    req.commit_version = commit_version;

    req
}

impl KvRequest for kvrpcpb::CommitRequest {
    type Response = kvrpcpb::CommitResponse;
}

impl Shardable for kvrpcpb::CommitRequest {
    type Shard = Vec<Vec<u8>>;

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        let mut keys = self.keys.clone();
        keys.sort();

        region_stream_for_keys(keys.into_iter(), pd_client.clone())
            .flat_map(|result| match result {
                Ok((keys, region)) => {
                    stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
                        .map(move |batch| Ok((batch, region.clone())))
                        .boxed()
                }
                // propagate the region-lookup error as a single-item stream
                Err(e) => stream::iter(iter::once(Err(e))).boxed(),
            })
            .boxed()
    }

    fn apply_shard(&mut self, shard: Self::Shard) {
        self.keys = shard.into_iter().map(Into::into).collect();
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}

impl Batchable for kvrpcpb::CommitRequest {
    type Item = Vec<u8>;

    fn item_size(item: &Self::Item) -> u64 {
        item.len() as u64
    }
}
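
// Illustrative note (not part of the original file): unlike prewrite, a
// commit batch is sized by key length alone, since a CommitRequest carries
// keys but no values.
//
//     assert_eq!(<kvrpcpb::CommitRequest as Batchable>::item_size(&b"key".to_vec()), 3);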

pub fn new_batch_rollback_request(
    keys: Vec<Vec<u8>>,
    start_version: u64,
) -> kvrpcpb::BatchRollbackRequest {
    let mut req = kvrpcpb::BatchRollbackRequest::default();
    req.keys = keys;
    req.start_version = start_version;

    req
}

impl KvRequest for kvrpcpb::BatchRollbackRequest {
    type Response = kvrpcpb::BatchRollbackResponse;
}

shardable_keys!(kvrpcpb::BatchRollbackRequest);

pub fn new_pessimistic_rollback_request(
    keys: Vec<Vec<u8>>,
    start_version: u64,
    for_update_ts: u64,
) -> kvrpcpb::PessimisticRollbackRequest {
    let mut req = kvrpcpb::PessimisticRollbackRequest::default();
    req.keys = keys;
    req.start_version = start_version;
    req.for_update_ts = for_update_ts;

    req
}

impl KvRequest for kvrpcpb::PessimisticRollbackRequest {
    type Response = kvrpcpb::PessimisticRollbackResponse;
}

shardable_keys!(kvrpcpb::PessimisticRollbackRequest);

pub fn new_pessimistic_lock_request(
    mutations: Vec<kvrpcpb::Mutation>,
    primary_lock: Vec<u8>,
    start_version: u64,
    lock_ttl: u64,
    for_update_ts: u64,
    need_value: bool,
) -> kvrpcpb::PessimisticLockRequest {
    let mut req = kvrpcpb::PessimisticLockRequest::default();
    req.mutations = mutations;
    req.primary_lock = primary_lock;
    req.start_version = start_version;
    req.lock_ttl = lock_ttl;
    req.for_update_ts = for_update_ts;
    // FIXME: make them configurable
    req.is_first_lock = false;
    req.wait_timeout = 0;
    req.return_values = need_value;
    // FIXME: support large transaction
    req.min_commit_ts = 0;

    req
}

impl KvRequest for kvrpcpb::PessimisticLockRequest {
    type Response = kvrpcpb::PessimisticLockResponse;
}

impl Shardable for kvrpcpb::PessimisticLockRequest {
    type Shard = Vec<kvrpcpb::Mutation>;

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        let mut mutations = self.mutations.clone();
        mutations.sort_by(|a, b| a.key.cmp(&b.key));
        region_stream_for_keys(mutations.into_iter(), pd_client.clone())
    }

    fn apply_shard(&mut self, shard: Self::Shard) {
        self.mutations = shard;
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}

// PessimisticLockResponse returns values that preserve the order of the keys
// in the request, so the kvpair result is produced by zipping the keys in the
// request with the values in the response.
impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>
    for CollectWithShard
{
    type Out = Vec<KvPair>;

    fn merge(
        &self,
        input: Vec<
            Result<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>,
        >,
    ) -> Result<Self::Out> {
        if input.iter().any(Result::is_err) {
            let (success, mut errors): (Vec<_>, Vec<_>) =
                input.into_iter().partition(Result::is_ok);
            let first_err = errors.pop().unwrap();
            let success_keys = success
                .into_iter()
                .map(Result::unwrap)
                .flat_map(|ResponseWithShard(_resp, mutations)| {
                    mutations.into_iter().map(|m| m.key)
                })
                .collect();
            Err(PessimisticLockError {
                inner: Box::new(first_err.unwrap_err()),
                success_keys,
            })
        } else {
            Ok(input
                .into_iter()
                .map(Result::unwrap)
                .flat_map(|ResponseWithShard(resp, mutations)| {
                    let values: Vec<Vec<u8>> = resp.values;
                    let values_len = values.len();
                    let not_founds = resp.not_founds;
                    let kvpairs = mutations
                        .into_iter()
                        .map(|m| m.key)
                        .zip(values)
                        .map(KvPair::from);
                    assert_eq!(kvpairs.len(), values_len);
                    if not_founds.is_empty() {
                        // Legacy TiKV does not distinguish between a
                        // non-existent key and an existing key with an empty
                        // value. We assume the key does not exist if the
                        // value is empty.
                        Either::Left(kvpairs.filter(|kvpair| !kvpair.value().is_empty()))
                    } else {
                        assert_eq!(kvpairs.len(), not_founds.len());
                        Either::Right(kvpairs.zip(not_founds).filter_map(|(kvpair, not_found)| {
                            if not_found {
                                None
                            } else {
                                Some(kvpair)
                            }
                        }))
                    }
                })
                .collect())
        }
    }
}

pub fn new_scan_lock_request(
    start_key: Vec<u8>,
    end_key: Vec<u8>,
    safepoint: u64,
    limit: u32,
) -> kvrpcpb::ScanLockRequest {
    let mut req = kvrpcpb::ScanLockRequest::default();
    req.start_key = start_key;
    req.end_key = end_key;
    req.max_version = safepoint;
    req.limit = limit;
    req
}

impl KvRequest for kvrpcpb::ScanLockRequest {
    type Response = kvrpcpb::ScanLockResponse;
}

impl Shardable for kvrpcpb::ScanLockRequest {
    type Shard = (Vec<u8>, Vec<u8>);

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        region_stream_for_range(
            (self.start_key.clone(), self.end_key.clone()),
            pd_client.clone(),
        )
    }

    fn apply_shard(&mut self, shard: Self::Shard) {
        self.start_key = shard.0;
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}

impl HasNextBatch for kvrpcpb::ScanLockResponse {
    fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)> {
        self.locks.last().map(|lock| {
            // TODO: return None if the last key is greater than or equal to ScanLockRequest.end_key.
            let mut start_key: Vec<u8> = lock.key.clone();
            start_key.push(0);
            (start_key, vec![])
        })
    }
}
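
// Illustrative note (not part of the original file): appending a zero byte
// yields the smallest key that sorts strictly after the last lock's key, so
// the next batch resumes immediately after the last lock seen.
//
//     // last lock key b"k" -> next batch starts at b"k\x00"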

impl NextBatch for kvrpcpb::ScanLockRequest {
    fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
        self.start_key = range.0;
    }
}

impl Merge<kvrpcpb::ScanLockResponse> for Collect {
    type Out = Vec<kvrpcpb::LockInfo>;

    fn merge(&self, input: Vec<Result<kvrpcpb::ScanLockResponse>>) -> Result<Self::Out> {
        input
            .into_iter()
            .flat_map_ok(|mut resp| resp.take_locks().into_iter().map(Into::into))
            .collect()
    }
}

pub fn new_heart_beat_request(
    start_ts: u64,
    primary_lock: Vec<u8>,
    ttl: u64,
) -> kvrpcpb::TxnHeartBeatRequest {
    let mut req = kvrpcpb::TxnHeartBeatRequest::default();
    req.start_version = start_ts;
    req.primary_lock = primary_lock;
    req.advise_lock_ttl = ttl;
    req
}

impl KvRequest for kvrpcpb::TxnHeartBeatRequest {
    type Response = kvrpcpb::TxnHeartBeatResponse;
}

impl Shardable for kvrpcpb::TxnHeartBeatRequest {
    type Shard = Vec<Vec<u8>>;

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
    }

    fn apply_shard(&mut self, mut shard: Self::Shard) {
        assert!(shard.len() == 1);
        self.primary_lock = shard.pop().unwrap();
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}

collect_single!(TxnHeartBeatResponse);

impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
    fn key(&self) -> &Vec<u8> {
        &self.primary_lock
    }
}

impl Process<kvrpcpb::TxnHeartBeatResponse> for DefaultProcessor {
    type Out = u64;

    fn process(&self, input: Result<kvrpcpb::TxnHeartBeatResponse>) -> Result<Self::Out> {
        Ok(input?.lock_ttl)
    }
}

#[allow(clippy::too_many_arguments)]
pub fn new_check_txn_status_request(
    primary_key: Vec<u8>,
    lock_ts: u64,
    caller_start_ts: u64,
    current_ts: u64,
    rollback_if_not_exist: bool,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
    is_txn_file: bool,
) -> kvrpcpb::CheckTxnStatusRequest {
    let mut req = kvrpcpb::CheckTxnStatusRequest::default();
    req.primary_key = primary_key;
    req.lock_ts = lock_ts;
    req.caller_start_ts = caller_start_ts;
    req.current_ts = current_ts;
    req.rollback_if_not_exist = rollback_if_not_exist;
    req.force_sync_commit = force_sync_commit;
    req.resolving_pessimistic_lock = resolving_pessimistic_lock;
    req.verify_is_primary = true;
    req.is_txn_file = is_txn_file;
    req
}

impl KvRequest for kvrpcpb::CheckTxnStatusRequest {
    type Response = kvrpcpb::CheckTxnStatusResponse;
}

impl Shardable for kvrpcpb::CheckTxnStatusRequest {
    type Shard = Vec<Vec<u8>>;

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
    }

    fn apply_shard(&mut self, mut shard: Self::Shard) {
        assert!(shard.len() == 1);
        self.primary_key = shard.pop().unwrap();
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}

impl SingleKey for kvrpcpb::CheckTxnStatusRequest {
    fn key(&self) -> &Vec<u8> {
        &self.primary_key
    }
}

collect_single!(kvrpcpb::CheckTxnStatusResponse);

impl Process<kvrpcpb::CheckTxnStatusResponse> for DefaultProcessor {
    type Out = TransactionStatus;

    fn process(&self, input: Result<kvrpcpb::CheckTxnStatusResponse>) -> Result<Self::Out> {
        Ok(input?.into())
    }
}

#[derive(Debug, Clone)]
pub struct TransactionStatus {
    pub kind: TransactionStatusKind,
    pub action: kvrpcpb::Action,
    pub is_expired: bool, // Available only when kind is Locked.
}

impl From<kvrpcpb::CheckTxnStatusResponse> for TransactionStatus {
    fn from(mut resp: kvrpcpb::CheckTxnStatusResponse) -> TransactionStatus {
        TransactionStatus {
            action: Action::try_from(resp.action).unwrap(),
            kind: (resp.commit_version, resp.lock_ttl, resp.lock_info.take()).into(),
            is_expired: false,
        }
    }
}

#[derive(Debug, Clone)]
pub enum TransactionStatusKind {
    Committed(Timestamp),
    RolledBack,
    Locked(u64, kvrpcpb::LockInfo), // (lock TTL, lock info); an elapsed TTL means the lock is expired.
}

impl TransactionStatus {
    pub fn check_ttl(&mut self, current: Timestamp) {
        if let TransactionStatusKind::Locked(ref ttl, ref lock_info) = self.kind {
            if current.physical - Timestamp::from_version(lock_info.lock_version).physical
                >= *ttl as i64
            {
                self.is_expired = true
            }
        }
    }
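
    // Illustrative note (not part of the original file): `check_ttl` compares
    // physical clock time. If the lock's start timestamp decodes to physical
    // time T (milliseconds) and the TTL is 3000, the status becomes expired
    // once `current.physical >= T + 3000`.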

    // is_cacheable checks whether the transaction status is certain.
    // If the transaction is already committed, the result can be cached.
    // Otherwise:
    //   If the lock is a pessimistic lock:
    //       - if its primary lock is pessimistic too, the check_txn_status result should not be cached;
    //       - if its primary lock is a prewrite lock, the check_txn_status result can be cached.
    //   If the lock is a prewrite lock:
    //       - always cache the check_txn_status result.
    // For prewrite locks, their primary keys are ALWAYS the correct ones and will NOT change.
    pub fn is_cacheable(&self) -> bool {
        match &self.kind {
            TransactionStatusKind::RolledBack | TransactionStatusKind::Committed(..) => true,
            TransactionStatusKind::Locked(..) if self.is_expired => matches!(
                self.action,
                kvrpcpb::Action::NoAction
                    | kvrpcpb::Action::LockNotExistRollback
                    | kvrpcpb::Action::TtlExpireRollback
            ),
            _ => false,
        }
    }
}

impl From<(u64, u64, Option<kvrpcpb::LockInfo>)> for TransactionStatusKind {
    fn from((ts, ttl, info): (u64, u64, Option<kvrpcpb::LockInfo>)) -> TransactionStatusKind {
        match (ts, ttl, info) {
            (0, 0, None) => TransactionStatusKind::RolledBack,
            (ts, 0, None) => TransactionStatusKind::Committed(Timestamp::from_version(ts)),
            (0, ttl, Some(info)) => TransactionStatusKind::Locked(ttl, info),
            _ => unreachable!(),
        }
    }
}
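
// Illustrative summary (not part of the original file) of how the
// (commit_version, lock_ttl, lock_info) triple of a CheckTxnStatusResponse
// maps to a status kind:
//
//     (0,  0,   None)       -> RolledBack
//     (ts, 0,   None)       -> Committed(ts)
//     (0,  ttl, Some(info)) -> Locked(ttl, info)
//     anything else         -> unreachable!()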

pub fn new_check_secondary_locks_request(
    keys: Vec<Vec<u8>>,
    start_version: u64,
) -> kvrpcpb::CheckSecondaryLocksRequest {
    let mut req = kvrpcpb::CheckSecondaryLocksRequest::default();
    req.keys = keys;
    req.start_version = start_version;
    req
}

impl KvRequest for kvrpcpb::CheckSecondaryLocksRequest {
    type Response = kvrpcpb::CheckSecondaryLocksResponse;
}

shardable_keys!(kvrpcpb::CheckSecondaryLocksRequest);

impl Merge<kvrpcpb::CheckSecondaryLocksResponse> for Collect {
    type Out = SecondaryLocksStatus;

    fn merge(&self, input: Vec<Result<kvrpcpb::CheckSecondaryLocksResponse>>) -> Result<Self::Out> {
        let mut out = SecondaryLocksStatus {
            commit_ts: None,
            min_commit_ts: 0,
            fallback_2pc: false,
        };
        for resp in input {
            let resp = resp?;
            for lock in resp.locks.into_iter() {
                if !lock.use_async_commit {
                    out.fallback_2pc = true;
                    return Ok(out);
                }
                out.min_commit_ts = cmp::max(out.min_commit_ts, lock.min_commit_ts);
            }
            out.commit_ts = match (
                out.commit_ts.take(),
                Timestamp::try_from_version(resp.commit_ts),
            ) {
                (Some(a), Some(b)) => {
                    assert_eq!(a, b);
                    Some(a)
                }
                (Some(a), None) => Some(a),
                (None, Some(b)) => Some(b),
                (None, None) => None,
            };
        }
        Ok(out)
    }
}
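
// Illustrative note (not part of the original file): the merge above
// short-circuits with `fallback_2pc = true` as soon as any secondary lock was
// not written with async commit, and asserts that any two shards reporting a
// commit_ts agree on its value (a mix of Some and None keeps the Some).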

pub struct SecondaryLocksStatus {
    pub commit_ts: Option<Timestamp>,
    pub min_commit_ts: u64,
    pub fallback_2pc: bool,
}

pair_locks!(kvrpcpb::BatchGetResponse);
pair_locks!(kvrpcpb::ScanResponse);
error_locks!(kvrpcpb::GetResponse);
error_locks!(kvrpcpb::ResolveLockResponse);
error_locks!(kvrpcpb::CommitResponse);
error_locks!(kvrpcpb::BatchRollbackResponse);
error_locks!(kvrpcpb::TxnHeartBeatResponse);
error_locks!(kvrpcpb::CheckTxnStatusResponse);
error_locks!(kvrpcpb::CheckSecondaryLocksResponse);

impl HasLocks for kvrpcpb::ScanLockResponse {
    fn take_locks(&mut self) -> Vec<LockInfo> {
        std::mem::take(&mut self.locks)
    }
}

impl HasLocks for kvrpcpb::PessimisticRollbackResponse {
    fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
        self.errors
            .iter_mut()
            .filter_map(|error| error.locked.take())
            .collect()
    }
}

impl HasLocks for kvrpcpb::PessimisticLockResponse {
    fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
        self.errors
            .iter_mut()
            .filter_map(|error| error.locked.take())
            .collect()
    }
}

impl HasLocks for kvrpcpb::PrewriteResponse {
    fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
        self.errors
            .iter_mut()
            .filter_map(|error| error.locked.take())
            .collect()
    }
}

pub fn new_unsafe_destroy_range_request(
    start_key: Vec<u8>,
    end_key: Vec<u8>,
) -> kvrpcpb::UnsafeDestroyRangeRequest {
    let mut req = kvrpcpb::UnsafeDestroyRangeRequest::default();
    req.start_key = start_key;
    req.end_key = end_key;
    req
}

impl KvRequest for kvrpcpb::UnsafeDestroyRangeRequest {
    type Response = kvrpcpb::UnsafeDestroyRangeResponse;
}

impl StoreRequest for kvrpcpb::UnsafeDestroyRangeRequest {
    fn apply_store(&mut self, _store: &Store) {}
}

impl HasLocks for kvrpcpb::UnsafeDestroyRangeResponse {}

impl Merge<kvrpcpb::UnsafeDestroyRangeResponse> for Collect {
    type Out = ();

    fn merge(&self, input: Vec<Result<kvrpcpb::UnsafeDestroyRangeResponse>>) -> Result<Self::Out> {
        let _: Vec<kvrpcpb::UnsafeDestroyRangeResponse> =
            input.into_iter().collect::<Result<Vec<_>>>()?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::common::Error::PessimisticLockError;
    use crate::common::Error::ResolveLockError;
    use crate::proto::kvrpcpb;
    use crate::request::plan::Merge;
    use crate::request::CollectWithShard;
    use crate::request::ResponseWithShard;
    use crate::KvPair;

    #[tokio::test]
    async fn test_merge_pessimistic_lock_response() {
        let (key1, key2, key3, key4) = (b"key1", b"key2", b"key3", b"key4");
        let (value1, value4) = (b"value1", b"value4");
        let value_empty = b"";

        let resp1 = ResponseWithShard(
            kvrpcpb::PessimisticLockResponse {
                values: vec![value1.to_vec()],
                ..Default::default()
            },
            vec![kvrpcpb::Mutation {
                op: kvrpcpb::Op::PessimisticLock.into(),
                key: key1.to_vec(),
                ..Default::default()
            }],
        );

        let resp_empty_value = ResponseWithShard(
            kvrpcpb::PessimisticLockResponse {
                values: vec![value_empty.to_vec()],
                ..Default::default()
            },
            vec![kvrpcpb::Mutation {
                op: kvrpcpb::Op::PessimisticLock.into(),
                key: key2.to_vec(),
                ..Default::default()
            }],
        );

        let resp_not_found = ResponseWithShard(
            kvrpcpb::PessimisticLockResponse {
                values: vec![value_empty.to_vec(), value4.to_vec()],
                not_founds: vec![true, false],
                ..Default::default()
            },
            vec![
                kvrpcpb::Mutation {
                    op: kvrpcpb::Op::PessimisticLock.into(),
                    key: key3.to_vec(),
                    ..Default::default()
                },
                kvrpcpb::Mutation {
                    op: kvrpcpb::Op::PessimisticLock.into(),
                    key: key4.to_vec(),
                    ..Default::default()
                },
            ],
        );

        let merger = CollectWithShard {};
        {
            // empty values & not founds are filtered.
            let input = vec![
                Ok(resp1.clone()),
                Ok(resp_empty_value.clone()),
                Ok(resp_not_found.clone()),
            ];
            let result = merger.merge(input);

            assert_eq!(
                result.unwrap(),
                vec![
                    KvPair::new(key1.to_vec(), value1.to_vec()),
                    KvPair::new(key4.to_vec(), value4.to_vec()),
                ]
            );
        }
        {
            let input = vec![
                Ok(resp1),
                Ok(resp_empty_value),
                Err(ResolveLockError(vec![])),
                Ok(resp_not_found),
            ];
            let result = merger.merge(input);

            if let PessimisticLockError {
                inner,
                success_keys,
            } = result.unwrap_err()
            {
                assert!(matches!(*inner, ResolveLockError(_)));
                assert_eq!(
                    success_keys,
                    vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()]
                );
            } else {
                panic!();
            }
        }
    }
}