1use std::cmp;
4use std::iter;
5use std::sync::Arc;
6
7use either::Either;
8use futures::stream::BoxStream;
9use futures::stream::{self};
10use futures::StreamExt;
11
12use super::transaction::TXN_COMMIT_BATCH_SIZE;
13use crate::collect_single;
14use crate::common::Error::PessimisticLockError;
15use crate::pd::PdClient;
16use crate::proto::kvrpcpb::Action;
17use crate::proto::kvrpcpb::LockInfo;
18use crate::proto::kvrpcpb::TxnHeartBeatResponse;
19use crate::proto::kvrpcpb::TxnInfo;
20use crate::proto::kvrpcpb::{self};
21use crate::proto::pdpb::Timestamp;
22use crate::region::RegionWithLeader;
23use crate::request::Collect;
24use crate::request::CollectSingle;
25use crate::request::CollectWithShard;
26use crate::request::DefaultProcessor;
27use crate::request::HasNextBatch;
28use crate::request::KvRequest;
29use crate::request::Merge;
30use crate::request::NextBatch;
31use crate::request::Process;
32use crate::request::RangeRequest;
33use crate::request::ResponseWithShard;
34use crate::request::Shardable;
35use crate::request::SingleKey;
36use crate::request::{Batchable, StoreRequest};
37use crate::reversible_range_request;
38use crate::shardable_key;
39use crate::shardable_keys;
40use crate::shardable_range;
41use crate::store::RegionStore;
42use crate::store::Request;
43use crate::store::Store;
44use crate::store::{region_stream_for_keys, region_stream_for_range};
45use crate::timestamp::TimestampExt;
46use crate::transaction::requests::kvrpcpb::prewrite_request::PessimisticAction;
47use crate::transaction::HasLocks;
48use crate::util::iter::FlatMapOkIterExt;
49use crate::KvPair;
50use crate::Result;
51use crate::Value;
52
53macro_rules! pair_locks {
56 ($response_type:ty) => {
57 impl HasLocks for $response_type {
58 fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
59 if self.pairs.is_empty() {
60 self.error
61 .as_mut()
62 .and_then(|error| error.locked.take())
63 .into_iter()
64 .collect()
65 } else {
66 self.pairs
67 .iter_mut()
68 .filter_map(|pair| {
69 pair.error.as_mut().and_then(|error| error.locked.take())
70 })
71 .collect()
72 }
73 }
74 }
75 };
76}
77
78macro_rules! error_locks {
81 ($response_type:ty) => {
82 impl HasLocks for $response_type {
83 fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
84 self.error
85 .as_mut()
86 .and_then(|error| error.locked.take())
87 .into_iter()
88 .collect()
89 }
90 }
91 };
92}
93
94pub fn new_get_request(key: Vec<u8>, timestamp: u64) -> kvrpcpb::GetRequest {
95 let mut req = kvrpcpb::GetRequest::default();
96 req.key = key;
97 req.version = timestamp;
98 req
99}
100
101impl KvRequest for kvrpcpb::GetRequest {
102 type Response = kvrpcpb::GetResponse;
103}
104
105shardable_key!(kvrpcpb::GetRequest);
106collect_single!(kvrpcpb::GetResponse);
107impl SingleKey for kvrpcpb::GetRequest {
108 fn key(&self) -> &Vec<u8> {
109 &self.key
110 }
111}
112
113impl Process<kvrpcpb::GetResponse> for DefaultProcessor {
114 type Out = Option<Value>;
115
116 fn process(&self, input: Result<kvrpcpb::GetResponse>) -> Result<Self::Out> {
117 let input = input?;
118 Ok(if input.not_found {
119 None
120 } else {
121 Some(input.value)
122 })
123 }
124}
125
126pub fn new_batch_get_request(keys: Vec<Vec<u8>>, timestamp: u64) -> kvrpcpb::BatchGetRequest {
127 let mut req = kvrpcpb::BatchGetRequest::default();
128 req.keys = keys;
129 req.version = timestamp;
130 req
131}
132
133impl KvRequest for kvrpcpb::BatchGetRequest {
134 type Response = kvrpcpb::BatchGetResponse;
135}
136
137shardable_keys!(kvrpcpb::BatchGetRequest);
138
139impl Merge<kvrpcpb::BatchGetResponse> for Collect {
140 type Out = Vec<KvPair>;
141
142 fn merge(&self, input: Vec<Result<kvrpcpb::BatchGetResponse>>) -> Result<Self::Out> {
143 input
144 .into_iter()
145 .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
146 .collect()
147 }
148}
149
150pub fn new_scan_request(
151 start_key: Vec<u8>,
152 end_key: Vec<u8>,
153 timestamp: u64,
154 limit: u32,
155 key_only: bool,
156 reverse: bool,
157) -> kvrpcpb::ScanRequest {
158 let mut req = kvrpcpb::ScanRequest::default();
159 if !reverse {
160 req.start_key = start_key;
161 req.end_key = end_key;
162 } else {
163 req.start_key = end_key;
164 req.end_key = start_key;
165 }
166 req.limit = limit;
167 req.key_only = key_only;
168 req.version = timestamp;
169 req.reverse = reverse;
170 req
171}
172
173impl KvRequest for kvrpcpb::ScanRequest {
174 type Response = kvrpcpb::ScanResponse;
175}
176
177reversible_range_request!(kvrpcpb::ScanRequest);
178shardable_range!(kvrpcpb::ScanRequest);
179
180impl Merge<kvrpcpb::ScanResponse> for Collect {
181 type Out = Vec<KvPair>;
182
183 fn merge(&self, input: Vec<Result<kvrpcpb::ScanResponse>>) -> Result<Self::Out> {
184 input
185 .into_iter()
186 .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
187 .collect()
188 }
189}
190
191pub fn new_resolve_lock_request(
192 start_version: u64,
193 commit_version: u64,
194 is_txn_file: bool,
195) -> kvrpcpb::ResolveLockRequest {
196 let mut req = kvrpcpb::ResolveLockRequest::default();
197 req.start_version = start_version;
198 req.commit_version = commit_version;
199 req.is_txn_file = is_txn_file;
200 req
201}
202
203pub fn new_batch_resolve_lock_request(txn_infos: Vec<TxnInfo>) -> kvrpcpb::ResolveLockRequest {
204 let mut req = kvrpcpb::ResolveLockRequest::default();
205 req.txn_infos = txn_infos;
206 req
207}
208
209impl KvRequest for kvrpcpb::ResolveLockRequest {
214 type Response = kvrpcpb::ResolveLockResponse;
215}
216
217pub fn new_prewrite_request(
218 mutations: Vec<kvrpcpb::Mutation>,
219 primary_lock: Vec<u8>,
220 start_version: u64,
221 lock_ttl: u64,
222) -> kvrpcpb::PrewriteRequest {
223 let mut req = kvrpcpb::PrewriteRequest::default();
224 req.mutations = mutations;
225 req.primary_lock = primary_lock;
226 req.start_version = start_version;
227 req.lock_ttl = lock_ttl;
228 req.txn_size = u64::MAX;
230
231 req
232}
233
234pub fn new_pessimistic_prewrite_request(
235 mutations: Vec<kvrpcpb::Mutation>,
236 primary_lock: Vec<u8>,
237 start_version: u64,
238 lock_ttl: u64,
239 for_update_ts: u64,
240) -> kvrpcpb::PrewriteRequest {
241 let len = mutations.len();
242 let mut req = new_prewrite_request(mutations, primary_lock, start_version, lock_ttl);
243 req.for_update_ts = for_update_ts;
244 req.pessimistic_actions = iter::repeat(PessimisticAction::DoPessimisticCheck.into())
245 .take(len)
246 .collect();
247 req
248}
249
250impl KvRequest for kvrpcpb::PrewriteRequest {
251 type Response = kvrpcpb::PrewriteResponse;
252}
253
254impl Shardable for kvrpcpb::PrewriteRequest {
255 type Shard = Vec<kvrpcpb::Mutation>;
256
257 fn shards(
258 &self,
259 pd_client: &Arc<impl PdClient>,
260 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
261 let mut mutations = self.mutations.clone();
262 mutations.sort_by(|a, b| a.key.cmp(&b.key));
263
264 region_stream_for_keys(mutations.into_iter(), pd_client.clone())
265 .flat_map(|result| match result {
266 Ok((mutations, region)) => stream::iter(kvrpcpb::PrewriteRequest::batches(
267 mutations,
268 TXN_COMMIT_BATCH_SIZE,
269 ))
270 .map(move |batch| Ok((batch, region.clone())))
271 .boxed(),
272 Err(e) => stream::iter(Err(e)).boxed(),
273 })
274 .boxed()
275 }
276
277 fn apply_shard(&mut self, shard: Self::Shard) {
278 if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
280 self.secondaries = vec![];
281 }
282
283 if self.try_one_pc && shard.len() != self.secondaries.len() + 1 {
285 self.try_one_pc = false;
286 }
287
288 self.mutations = shard;
289 }
290
291 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
292 self.set_leader(&store.region_with_leader)
293 }
294}
295
296impl Batchable for kvrpcpb::PrewriteRequest {
297 type Item = kvrpcpb::Mutation;
298
299 fn item_size(item: &Self::Item) -> u64 {
300 let mut size = item.key.len() as u64;
301 size += item.value.len() as u64;
302 size
303 }
304}
305
306pub fn new_commit_request(
307 keys: Vec<Vec<u8>>,
308 start_version: u64,
309 commit_version: u64,
310) -> kvrpcpb::CommitRequest {
311 let mut req = kvrpcpb::CommitRequest::default();
312 req.keys = keys;
313 req.start_version = start_version;
314 req.commit_version = commit_version;
315
316 req
317}
318
319impl KvRequest for kvrpcpb::CommitRequest {
320 type Response = kvrpcpb::CommitResponse;
321}
322
323impl Shardable for kvrpcpb::CommitRequest {
324 type Shard = Vec<Vec<u8>>;
325
326 fn shards(
327 &self,
328 pd_client: &Arc<impl PdClient>,
329 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
330 let mut keys = self.keys.clone();
331 keys.sort();
332
333 region_stream_for_keys(keys.into_iter(), pd_client.clone())
334 .flat_map(|result| match result {
335 Ok((keys, region)) => {
336 stream::iter(kvrpcpb::CommitRequest::batches(keys, TXN_COMMIT_BATCH_SIZE))
337 .map(move |batch| Ok((batch, region.clone())))
338 .boxed()
339 }
340 Err(e) => stream::iter(Err(e)).boxed(),
341 })
342 .boxed()
343 }
344
345 fn apply_shard(&mut self, shard: Self::Shard) {
346 self.keys = shard.into_iter().map(Into::into).collect();
347 }
348
349 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
350 self.set_leader(&store.region_with_leader)
351 }
352}
353
354impl Batchable for kvrpcpb::CommitRequest {
355 type Item = Vec<u8>;
356
357 fn item_size(item: &Self::Item) -> u64 {
358 item.len() as u64
359 }
360}
361
362pub fn new_batch_rollback_request(
363 keys: Vec<Vec<u8>>,
364 start_version: u64,
365) -> kvrpcpb::BatchRollbackRequest {
366 let mut req = kvrpcpb::BatchRollbackRequest::default();
367 req.keys = keys;
368 req.start_version = start_version;
369
370 req
371}
372
373impl KvRequest for kvrpcpb::BatchRollbackRequest {
374 type Response = kvrpcpb::BatchRollbackResponse;
375}
376
377shardable_keys!(kvrpcpb::BatchRollbackRequest);
378
379pub fn new_pessimistic_rollback_request(
380 keys: Vec<Vec<u8>>,
381 start_version: u64,
382 for_update_ts: u64,
383) -> kvrpcpb::PessimisticRollbackRequest {
384 let mut req = kvrpcpb::PessimisticRollbackRequest::default();
385 req.keys = keys;
386 req.start_version = start_version;
387 req.for_update_ts = for_update_ts;
388
389 req
390}
391
392impl KvRequest for kvrpcpb::PessimisticRollbackRequest {
393 type Response = kvrpcpb::PessimisticRollbackResponse;
394}
395
396shardable_keys!(kvrpcpb::PessimisticRollbackRequest);
397
398pub fn new_pessimistic_lock_request(
399 mutations: Vec<kvrpcpb::Mutation>,
400 primary_lock: Vec<u8>,
401 start_version: u64,
402 lock_ttl: u64,
403 for_update_ts: u64,
404 need_value: bool,
405) -> kvrpcpb::PessimisticLockRequest {
406 let mut req = kvrpcpb::PessimisticLockRequest::default();
407 req.mutations = mutations;
408 req.primary_lock = primary_lock;
409 req.start_version = start_version;
410 req.lock_ttl = lock_ttl;
411 req.for_update_ts = for_update_ts;
412 req.is_first_lock = false;
414 req.wait_timeout = 0;
415 req.return_values = need_value;
416 req.min_commit_ts = 0;
418
419 req
420}
421
422impl KvRequest for kvrpcpb::PessimisticLockRequest {
423 type Response = kvrpcpb::PessimisticLockResponse;
424}
425
426impl Shardable for kvrpcpb::PessimisticLockRequest {
427 type Shard = Vec<kvrpcpb::Mutation>;
428
429 fn shards(
430 &self,
431 pd_client: &Arc<impl PdClient>,
432 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
433 let mut mutations = self.mutations.clone();
434 mutations.sort_by(|a, b| a.key.cmp(&b.key));
435 region_stream_for_keys(mutations.into_iter(), pd_client.clone())
436 }
437
438 fn apply_shard(&mut self, shard: Self::Shard) {
439 self.mutations = shard;
440 }
441
442 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
443 self.set_leader(&store.region_with_leader)
444 }
445}
446
447impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>
450 for CollectWithShard
451{
452 type Out = Vec<KvPair>;
453
454 fn merge(
455 &self,
456 input: Vec<
457 Result<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>,
458 >,
459 ) -> Result<Self::Out> {
460 if input.iter().any(Result::is_err) {
461 let (success, mut errors): (Vec<_>, Vec<_>) =
462 input.into_iter().partition(Result::is_ok);
463 let first_err = errors.pop().unwrap();
464 let success_keys = success
465 .into_iter()
466 .map(Result::unwrap)
467 .flat_map(|ResponseWithShard(_resp, mutations)| {
468 mutations.into_iter().map(|m| m.key)
469 })
470 .collect();
471 Err(PessimisticLockError {
472 inner: Box::new(first_err.unwrap_err()),
473 success_keys,
474 })
475 } else {
476 Ok(input
477 .into_iter()
478 .map(Result::unwrap)
479 .flat_map(|ResponseWithShard(resp, mutations)| {
480 let values: Vec<Vec<u8>> = resp.values;
481 let values_len = values.len();
482 let not_founds = resp.not_founds;
483 let kvpairs = mutations
484 .into_iter()
485 .map(|m| m.key)
486 .zip(values)
487 .map(KvPair::from);
488 assert_eq!(kvpairs.len(), values_len);
489 if not_founds.is_empty() {
490 Either::Left(kvpairs.filter(|kvpair| !kvpair.value().is_empty()))
494 } else {
495 assert_eq!(kvpairs.len(), not_founds.len());
496 Either::Right(kvpairs.zip(not_founds).filter_map(|(kvpair, not_found)| {
497 if not_found {
498 None
499 } else {
500 Some(kvpair)
501 }
502 }))
503 }
504 })
505 .collect())
506 }
507 }
508}
509
510pub fn new_scan_lock_request(
511 start_key: Vec<u8>,
512 end_key: Vec<u8>,
513 safepoint: u64,
514 limit: u32,
515) -> kvrpcpb::ScanLockRequest {
516 let mut req = kvrpcpb::ScanLockRequest::default();
517 req.start_key = start_key;
518 req.end_key = end_key;
519 req.max_version = safepoint;
520 req.limit = limit;
521 req
522}
523
524impl KvRequest for kvrpcpb::ScanLockRequest {
525 type Response = kvrpcpb::ScanLockResponse;
526}
527
528impl Shardable for kvrpcpb::ScanLockRequest {
529 type Shard = (Vec<u8>, Vec<u8>);
530
531 fn shards(
532 &self,
533 pd_client: &Arc<impl PdClient>,
534 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
535 region_stream_for_range(
536 (self.start_key.clone(), self.end_key.clone()),
537 pd_client.clone(),
538 )
539 }
540
541 fn apply_shard(&mut self, shard: Self::Shard) {
542 self.start_key = shard.0;
543 }
544
545 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
546 self.set_leader(&store.region_with_leader)
547 }
548}
549
550impl HasNextBatch for kvrpcpb::ScanLockResponse {
551 fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)> {
552 self.locks.last().map(|lock| {
553 let mut start_key: Vec<u8> = lock.key.clone();
555 start_key.push(0);
556 (start_key, vec![])
557 })
558 }
559}
560
561impl NextBatch for kvrpcpb::ScanLockRequest {
562 fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
563 self.start_key = range.0;
564 }
565}
566
567impl Merge<kvrpcpb::ScanLockResponse> for Collect {
568 type Out = Vec<kvrpcpb::LockInfo>;
569
570 fn merge(&self, input: Vec<Result<kvrpcpb::ScanLockResponse>>) -> Result<Self::Out> {
571 input
572 .into_iter()
573 .flat_map_ok(|mut resp| resp.take_locks().into_iter().map(Into::into))
574 .collect()
575 }
576}
577
578pub fn new_heart_beat_request(
579 start_ts: u64,
580 primary_lock: Vec<u8>,
581 ttl: u64,
582) -> kvrpcpb::TxnHeartBeatRequest {
583 let mut req = kvrpcpb::TxnHeartBeatRequest::default();
584 req.start_version = start_ts;
585 req.primary_lock = primary_lock;
586 req.advise_lock_ttl = ttl;
587 req
588}
589
590impl KvRequest for kvrpcpb::TxnHeartBeatRequest {
591 type Response = kvrpcpb::TxnHeartBeatResponse;
592}
593
594impl Shardable for kvrpcpb::TxnHeartBeatRequest {
595 type Shard = Vec<Vec<u8>>;
596
597 fn shards(
598 &self,
599 pd_client: &Arc<impl PdClient>,
600 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
601 region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
602 }
603
604 fn apply_shard(&mut self, mut shard: Self::Shard) {
605 assert!(shard.len() == 1);
606 self.primary_lock = shard.pop().unwrap();
607 }
608
609 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
610 self.set_leader(&store.region_with_leader)
611 }
612}
613
614collect_single!(TxnHeartBeatResponse);
615
616impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
617 fn key(&self) -> &Vec<u8> {
618 &self.primary_lock
619 }
620}
621
622impl Process<kvrpcpb::TxnHeartBeatResponse> for DefaultProcessor {
623 type Out = u64;
624
625 fn process(&self, input: Result<kvrpcpb::TxnHeartBeatResponse>) -> Result<Self::Out> {
626 Ok(input?.lock_ttl)
627 }
628}
629
630#[allow(clippy::too_many_arguments)]
631pub fn new_check_txn_status_request(
632 primary_key: Vec<u8>,
633 lock_ts: u64,
634 caller_start_ts: u64,
635 current_ts: u64,
636 rollback_if_not_exist: bool,
637 force_sync_commit: bool,
638 resolving_pessimistic_lock: bool,
639 is_txn_file: bool,
640) -> kvrpcpb::CheckTxnStatusRequest {
641 let mut req = kvrpcpb::CheckTxnStatusRequest::default();
642 req.primary_key = primary_key;
643 req.lock_ts = lock_ts;
644 req.caller_start_ts = caller_start_ts;
645 req.current_ts = current_ts;
646 req.rollback_if_not_exist = rollback_if_not_exist;
647 req.force_sync_commit = force_sync_commit;
648 req.resolving_pessimistic_lock = resolving_pessimistic_lock;
649 req.verify_is_primary = true;
650 req.is_txn_file = is_txn_file;
651 req
652}
653
654impl KvRequest for kvrpcpb::CheckTxnStatusRequest {
655 type Response = kvrpcpb::CheckTxnStatusResponse;
656}
657
658impl Shardable for kvrpcpb::CheckTxnStatusRequest {
659 type Shard = Vec<Vec<u8>>;
660
661 fn shards(
662 &self,
663 pd_client: &Arc<impl PdClient>,
664 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
665 region_stream_for_keys(std::iter::once(self.key().clone()), pd_client.clone())
666 }
667
668 fn apply_shard(&mut self, mut shard: Self::Shard) {
669 assert!(shard.len() == 1);
670 self.primary_key = shard.pop().unwrap();
671 }
672
673 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
674 self.set_leader(&store.region_with_leader)
675 }
676}
677
678impl SingleKey for kvrpcpb::CheckTxnStatusRequest {
679 fn key(&self) -> &Vec<u8> {
680 &self.primary_key
681 }
682}
683
684collect_single!(kvrpcpb::CheckTxnStatusResponse);
685
686impl Process<kvrpcpb::CheckTxnStatusResponse> for DefaultProcessor {
687 type Out = TransactionStatus;
688
689 fn process(&self, input: Result<kvrpcpb::CheckTxnStatusResponse>) -> Result<Self::Out> {
690 Ok(input?.into())
691 }
692}
693
694#[derive(Debug, Clone)]
695pub struct TransactionStatus {
696 pub kind: TransactionStatusKind,
697 pub action: kvrpcpb::Action,
698 pub is_expired: bool, }
700
701impl From<kvrpcpb::CheckTxnStatusResponse> for TransactionStatus {
702 fn from(mut resp: kvrpcpb::CheckTxnStatusResponse) -> TransactionStatus {
703 TransactionStatus {
704 action: Action::try_from(resp.action).unwrap(),
705 kind: (resp.commit_version, resp.lock_ttl, resp.lock_info.take()).into(),
706 is_expired: false,
707 }
708 }
709}
710
711#[derive(Debug, Clone)]
712pub enum TransactionStatusKind {
713 Committed(Timestamp),
714 RolledBack,
715 Locked(u64, kvrpcpb::LockInfo), }
717
718impl TransactionStatus {
719 pub fn check_ttl(&mut self, current: Timestamp) {
720 if let TransactionStatusKind::Locked(ref ttl, ref lock_info) = self.kind {
721 if current.physical - Timestamp::from_version(lock_info.lock_version).physical
722 >= *ttl as i64
723 {
724 self.is_expired = true
725 }
726 }
727 }
728
729 pub fn is_cacheable(&self) -> bool {
739 match &self.kind {
740 TransactionStatusKind::RolledBack | TransactionStatusKind::Committed(..) => true,
741 TransactionStatusKind::Locked(..) if self.is_expired => matches!(
742 self.action,
743 kvrpcpb::Action::NoAction
744 | kvrpcpb::Action::LockNotExistRollback
745 | kvrpcpb::Action::TtlExpireRollback
746 ),
747 _ => false,
748 }
749 }
750}
751
752impl From<(u64, u64, Option<kvrpcpb::LockInfo>)> for TransactionStatusKind {
753 fn from((ts, ttl, info): (u64, u64, Option<kvrpcpb::LockInfo>)) -> TransactionStatusKind {
754 match (ts, ttl, info) {
755 (0, 0, None) => TransactionStatusKind::RolledBack,
756 (ts, 0, None) => TransactionStatusKind::Committed(Timestamp::from_version(ts)),
757 (0, ttl, Some(info)) => TransactionStatusKind::Locked(ttl, info),
758 _ => unreachable!(),
759 }
760 }
761}
762
763pub fn new_check_secondary_locks_request(
764 keys: Vec<Vec<u8>>,
765 start_version: u64,
766) -> kvrpcpb::CheckSecondaryLocksRequest {
767 let mut req = kvrpcpb::CheckSecondaryLocksRequest::default();
768 req.keys = keys;
769 req.start_version = start_version;
770 req
771}
772
773impl KvRequest for kvrpcpb::CheckSecondaryLocksRequest {
774 type Response = kvrpcpb::CheckSecondaryLocksResponse;
775}
776
777shardable_keys!(kvrpcpb::CheckSecondaryLocksRequest);
778
779impl Merge<kvrpcpb::CheckSecondaryLocksResponse> for Collect {
780 type Out = SecondaryLocksStatus;
781
782 fn merge(&self, input: Vec<Result<kvrpcpb::CheckSecondaryLocksResponse>>) -> Result<Self::Out> {
783 let mut out = SecondaryLocksStatus {
784 commit_ts: None,
785 min_commit_ts: 0,
786 fallback_2pc: false,
787 };
788 for resp in input {
789 let resp = resp?;
790 for lock in resp.locks.into_iter() {
791 if !lock.use_async_commit {
792 out.fallback_2pc = true;
793 return Ok(out);
794 }
795 out.min_commit_ts = cmp::max(out.min_commit_ts, lock.min_commit_ts);
796 }
797 out.commit_ts = match (
798 out.commit_ts.take(),
799 Timestamp::try_from_version(resp.commit_ts),
800 ) {
801 (Some(a), Some(b)) => {
802 assert_eq!(a, b);
803 Some(a)
804 }
805 (Some(a), None) => Some(a),
806 (None, Some(b)) => Some(b),
807 (None, None) => None,
808 };
809 }
810 Ok(out)
811 }
812}
813
814pub struct SecondaryLocksStatus {
815 pub commit_ts: Option<Timestamp>,
816 pub min_commit_ts: u64,
817 pub fallback_2pc: bool,
818}
819
820pair_locks!(kvrpcpb::BatchGetResponse);
821pair_locks!(kvrpcpb::ScanResponse);
822error_locks!(kvrpcpb::GetResponse);
823error_locks!(kvrpcpb::ResolveLockResponse);
824error_locks!(kvrpcpb::CommitResponse);
825error_locks!(kvrpcpb::BatchRollbackResponse);
826error_locks!(kvrpcpb::TxnHeartBeatResponse);
827error_locks!(kvrpcpb::CheckTxnStatusResponse);
828error_locks!(kvrpcpb::CheckSecondaryLocksResponse);
829
830impl HasLocks for kvrpcpb::ScanLockResponse {
831 fn take_locks(&mut self) -> Vec<LockInfo> {
832 std::mem::take(&mut self.locks)
833 }
834}
835
836impl HasLocks for kvrpcpb::PessimisticRollbackResponse {
837 fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
838 self.errors
839 .iter_mut()
840 .filter_map(|error| error.locked.take())
841 .collect()
842 }
843}
844
845impl HasLocks for kvrpcpb::PessimisticLockResponse {
846 fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
847 self.errors
848 .iter_mut()
849 .filter_map(|error| error.locked.take())
850 .collect()
851 }
852}
853
854impl HasLocks for kvrpcpb::PrewriteResponse {
855 fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
856 self.errors
857 .iter_mut()
858 .filter_map(|error| error.locked.take())
859 .collect()
860 }
861}
862
863pub fn new_unsafe_destroy_range_request(
864 start_key: Vec<u8>,
865 end_key: Vec<u8>,
866) -> kvrpcpb::UnsafeDestroyRangeRequest {
867 let mut req = kvrpcpb::UnsafeDestroyRangeRequest::default();
868 req.start_key = start_key;
869 req.end_key = end_key;
870 req
871}
872
873impl KvRequest for kvrpcpb::UnsafeDestroyRangeRequest {
874 type Response = kvrpcpb::UnsafeDestroyRangeResponse;
875}
876
877impl StoreRequest for kvrpcpb::UnsafeDestroyRangeRequest {
878 fn apply_store(&mut self, _store: &Store) {}
879}
880
881impl HasLocks for kvrpcpb::UnsafeDestroyRangeResponse {}
882
883impl Merge<kvrpcpb::UnsafeDestroyRangeResponse> for Collect {
884 type Out = ();
885
886 fn merge(&self, input: Vec<Result<kvrpcpb::UnsafeDestroyRangeResponse>>) -> Result<Self::Out> {
887 let _: Vec<kvrpcpb::UnsafeDestroyRangeResponse> =
888 input.into_iter().collect::<Result<Vec<_>>>()?;
889 Ok(())
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use crate::common::Error::PessimisticLockError;
896 use crate::common::Error::ResolveLockError;
897 use crate::proto::kvrpcpb;
898 use crate::request::plan::Merge;
899 use crate::request::CollectWithShard;
900 use crate::request::ResponseWithShard;
901 use crate::KvPair;
902
903 #[tokio::test]
904 async fn test_merge_pessimistic_lock_response() {
905 let (key1, key2, key3, key4) = (b"key1", b"key2", b"key3", b"key4");
906 let (value1, value4) = (b"value1", b"value4");
907 let value_empty = b"";
908
909 let resp1 = ResponseWithShard(
910 kvrpcpb::PessimisticLockResponse {
911 values: vec![value1.to_vec()],
912 ..Default::default()
913 },
914 vec![kvrpcpb::Mutation {
915 op: kvrpcpb::Op::PessimisticLock.into(),
916 key: key1.to_vec(),
917 ..Default::default()
918 }],
919 );
920
921 let resp_empty_value = ResponseWithShard(
922 kvrpcpb::PessimisticLockResponse {
923 values: vec![value_empty.to_vec()],
924 ..Default::default()
925 },
926 vec![kvrpcpb::Mutation {
927 op: kvrpcpb::Op::PessimisticLock.into(),
928 key: key2.to_vec(),
929 ..Default::default()
930 }],
931 );
932
933 let resp_not_found = ResponseWithShard(
934 kvrpcpb::PessimisticLockResponse {
935 values: vec![value_empty.to_vec(), value4.to_vec()],
936 not_founds: vec![true, false],
937 ..Default::default()
938 },
939 vec![
940 kvrpcpb::Mutation {
941 op: kvrpcpb::Op::PessimisticLock.into(),
942 key: key3.to_vec(),
943 ..Default::default()
944 },
945 kvrpcpb::Mutation {
946 op: kvrpcpb::Op::PessimisticLock.into(),
947 key: key4.to_vec(),
948 ..Default::default()
949 },
950 ],
951 );
952
953 let merger = CollectWithShard {};
954 {
955 let input = vec![
957 Ok(resp1.clone()),
958 Ok(resp_empty_value.clone()),
959 Ok(resp_not_found.clone()),
960 ];
961 let result = merger.merge(input);
962
963 assert_eq!(
964 result.unwrap(),
965 vec![
966 KvPair::new(key1.to_vec(), value1.to_vec()),
967 KvPair::new(key4.to_vec(), value4.to_vec()),
968 ]
969 );
970 }
971 {
972 let input = vec![
973 Ok(resp1),
974 Ok(resp_empty_value),
975 Err(ResolveLockError(vec![])),
976 Ok(resp_not_found),
977 ];
978 let result = merger.merge(input);
979
980 if let PessimisticLockError {
981 inner,
982 success_keys,
983 } = result.unwrap_err()
984 {
985 assert!(matches!(*inner, ResolveLockError(_)));
986 assert_eq!(
987 success_keys,
988 vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()]
989 );
990 } else {
991 panic!();
992 }
993 }
994 }
995}