1use std::iter;
4use std::sync::atomic;
5use std::sync::atomic::AtomicU8;
6use std::sync::Arc;
7use std::time::Instant;
8
9use derive_new::new;
10use fail::fail_point;
11use futures::prelude::*;
12use log::{debug, error, info, trace, warn};
13use tokio::time::Duration;
14
15use crate::backoff::Backoff;
16use crate::backoff::DEFAULT_REGION_BACKOFF;
17use crate::kv::HexRepr;
18use crate::pd::PdClient;
19use crate::pd::PdRpcClient;
20use crate::proto::kvrpcpb;
21use crate::proto::pdpb::Timestamp;
22use crate::request::Collect;
23use crate::request::CollectError;
24use crate::request::CollectSingle;
25use crate::request::CollectWithShard;
26use crate::request::EncodeKeyspace;
27use crate::request::KeyMode;
28use crate::request::Keyspace;
29use crate::request::Plan;
30use crate::request::PlanBuilder;
31use crate::request::RetryOptions;
32use crate::request::TruncateKeyspace;
33use crate::timestamp::TimestampExt;
34use crate::transaction::buffer::Buffer;
35use crate::transaction::lowering::*;
36use crate::BoundRange;
37use crate::Error;
38use crate::Key;
39use crate::KvPair;
40use crate::Result;
41use crate::Value;
42
43pub struct Transaction<PdC: PdClient = PdRpcClient> {
82 status: Arc<AtomicU8>,
83 timestamp: Timestamp,
84 buffer: Buffer,
85 rpc: Arc<PdC>,
86 options: TransactionOptions,
87 keyspace: Keyspace,
88 is_heartbeat_started: bool,
89 start_instant: Instant,
90}
91
92impl<PdC: PdClient> Transaction<PdC> {
93 pub(crate) fn new(
94 timestamp: Timestamp,
95 rpc: Arc<PdC>,
96 options: TransactionOptions,
97 keyspace: Keyspace,
98 ) -> Transaction<PdC> {
99 let status = if options.read_only {
100 TransactionStatus::ReadOnly
101 } else {
102 TransactionStatus::Active
103 };
104 Transaction {
105 status: Arc::new(AtomicU8::new(status as u8)),
106 timestamp,
107 buffer: Buffer::new(options.is_pessimistic()),
108 rpc,
109 options,
110 keyspace,
111 is_heartbeat_started: false,
112 start_instant: std::time::Instant::now(),
113 }
114 }
115
116 pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
135 trace!("invoking transactional get request");
136 self.check_allow_operation().await?;
137 let timestamp = self.timestamp.clone();
138 let rpc = self.rpc.clone();
139 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
140 let retry_options = self.options.retry_options.clone();
141 let keyspace = self.keyspace;
142
143 self.buffer
144 .get_or_else(key, |key| async move {
145 let request = new_get_request(key, timestamp.clone());
146 let plan = PlanBuilder::new(rpc, keyspace, request)
147 .resolve_lock(timestamp, retry_options.lock_backoff, keyspace)
148 .retry_multi_region(DEFAULT_REGION_BACKOFF)
149 .merge(CollectSingle)
150 .post_process_default()
151 .plan();
152 plan.execute().await
153 })
154 .await
155 }
156
157 pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
200 debug!("invoking transactional get_for_update request");
201 self.check_allow_operation().await?;
202 if !self.is_pessimistic() {
203 let key = key.into();
204 self.lock_keys(iter::once(key.clone())).await?;
205 self.get(key).await
206 } else {
207 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
208 let mut pairs = self.pessimistic_lock(iter::once(key), true).await?;
209 debug_assert!(pairs.len() <= 1);
210 match pairs.pop() {
211 Some(pair) => Ok(Some(pair.1)),
212 None => Ok(None),
213 }
214 }
215 }
216
217 pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
232 debug!("invoking transactional key_exists request");
233 Ok(self.get(key).await?.is_some())
234 }
235
236 pub async fn batch_get(
265 &mut self,
266 keys: impl IntoIterator<Item = impl Into<Key>>,
267 ) -> Result<impl Iterator<Item = KvPair>> {
268 debug!("invoking transactional batch_get request");
269 self.check_allow_operation().await?;
270 let timestamp = self.timestamp.clone();
271 let rpc = self.rpc.clone();
272 let keyspace = self.keyspace;
273 let keys = keys
274 .into_iter()
275 .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
276 let retry_options = self.options.retry_options.clone();
277
278 self.buffer
279 .batch_get_or_else(keys, move |keys| async move {
280 let request = new_batch_get_request(keys, timestamp.clone());
281 let plan = PlanBuilder::new(rpc, keyspace, request)
282 .resolve_lock(timestamp, retry_options.lock_backoff, keyspace)
283 .retry_multi_region(retry_options.region_backoff)
284 .merge(Collect)
285 .plan();
286 plan.execute()
287 .await
288 .map(|r| r.into_iter().map(Into::into).collect())
289 })
290 .await
291 .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
292 }
293
294 pub async fn batch_get_for_update(
322 &mut self,
323 keys: impl IntoIterator<Item = impl Into<Key>>,
324 ) -> Result<Vec<KvPair>> {
325 debug!("invoking transactional batch_get_for_update request");
326 self.check_allow_operation().await?;
327 if !self.is_pessimistic() {
328 let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
329 self.lock_keys(keys.clone()).await?;
330 Ok(self.batch_get(keys).await?.collect())
331 } else {
332 let keyspace = self.keyspace;
333 let keys = keys
334 .into_iter()
335 .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
336 let pairs = self
337 .pessimistic_lock(keys, true)
338 .await?
339 .truncate_keyspace(keyspace);
340 Ok(pairs)
341 }
342 }
343
344 pub async fn scan(
373 &mut self,
374 range: impl Into<BoundRange>,
375 limit: u32,
376 ) -> Result<impl Iterator<Item = KvPair>> {
377 debug!("invoking transactional scan request");
378 self.scan_inner(range, limit, false, false).await
379 }
380
381 pub async fn scan_keys(
409 &mut self,
410 range: impl Into<BoundRange>,
411 limit: u32,
412 ) -> Result<impl Iterator<Item = Key>> {
413 debug!("invoking transactional scan_keys request");
414 Ok(self
415 .scan_inner(range, limit, true, false)
416 .await?
417 .map(KvPair::into_key))
418 }
419
420 pub async fn scan_reverse(
424 &mut self,
425 range: impl Into<BoundRange>,
426 limit: u32,
427 ) -> Result<impl Iterator<Item = KvPair>> {
428 debug!("invoking transactional scan_reverse request");
429 self.scan_inner(range, limit, false, true).await
430 }
431
432 pub async fn scan_keys_reverse(
436 &mut self,
437 range: impl Into<BoundRange>,
438 limit: u32,
439 ) -> Result<impl Iterator<Item = Key>> {
440 debug!("invoking transactional scan_keys_reverse request");
441 Ok(self
442 .scan_inner(range, limit, true, true)
443 .await?
444 .map(KvPair::into_key))
445 }
446
447 pub async fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
464 trace!("invoking transactional put request");
465 self.check_allow_operation().await?;
466 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
467 if self.is_pessimistic() {
468 self.pessimistic_lock(iter::once(key.clone()), false)
469 .await?;
470 }
471 self.buffer.put(key, value.into());
472 Ok(())
473 }
474
475 pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
495 debug!("invoking transactional insert request");
496 self.check_allow_operation().await?;
497 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
498 if self.buffer.get(&key).is_some() {
499 return Err(Error::DuplicateKeyInsertion);
500 }
501 if self.is_pessimistic() {
502 self.pessimistic_lock(
503 iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
504 false,
505 )
506 .await?;
507 }
508 self.buffer.insert(key, value.into());
509 Ok(())
510 }
511
512 pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()> {
530 debug!("invoking transactional delete request");
531 self.check_allow_operation().await?;
532 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn);
533 if self.is_pessimistic() {
534 self.pessimistic_lock(iter::once(key.clone()), false)
535 .await?;
536 }
537 self.buffer.delete(key);
538 Ok(())
539 }
540
541 pub async fn batch_mutate(
562 &mut self,
563 mutations: impl IntoIterator<Item = Mutation>,
564 ) -> Result<()> {
565 debug!("invoking transactional batch mutate request");
566 self.check_allow_operation().await?;
567 let mutations: Vec<Mutation> = mutations
568 .into_iter()
569 .map(|mutation| mutation.encode_keyspace(self.keyspace, KeyMode::Txn))
570 .collect();
571 if self.is_pessimistic() {
572 self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false)
573 .await?;
574 for m in mutations {
575 self.buffer.mutate(m);
576 }
577 } else {
578 for m in mutations.into_iter() {
579 self.buffer.mutate(m);
580 }
581 }
582 Ok(())
583 }
584
585 pub async fn lock_keys(
609 &mut self,
610 keys: impl IntoIterator<Item = impl Into<Key>>,
611 ) -> Result<()> {
612 debug!("invoking transactional lock_keys request");
613 self.check_allow_operation().await?;
614 let keyspace = self.keyspace;
615 let keys = keys
616 .into_iter()
617 .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn));
618 match self.options.kind {
619 TransactionKind::Optimistic => {
620 for key in keys {
621 self.buffer.lock(key);
622 }
623 }
624 TransactionKind::Pessimistic(_) => {
625 self.pessimistic_lock(keys, false).await?;
626 }
627 }
628 Ok(())
629 }
630
631 pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
647 debug!("commiting transaction");
648 if !self.transit_status(
649 |status| {
650 matches!(
651 status,
652 TransactionStatus::StartedCommit | TransactionStatus::Active
653 )
654 },
655 TransactionStatus::StartedCommit,
656 ) {
657 return Err(Error::OperationAfterCommitError);
658 }
659
660 let primary_key = self.buffer.get_primary_key();
661 let mutations = self.buffer.to_proto_mutations();
662 if mutations.is_empty() {
663 assert!(primary_key.is_none());
664 return Ok(None);
665 }
666
667 self.start_auto_heartbeat().await;
668
669 let res = Committer::new(
670 primary_key,
671 mutations,
672 self.timestamp.clone(),
673 self.rpc.clone(),
674 self.options.clone(),
675 self.keyspace,
676 self.buffer.get_write_size() as u64,
677 self.start_instant,
678 )
679 .commit()
680 .await;
681
682 if res.is_ok() {
683 self.set_status(TransactionStatus::Committed);
684 }
685 res
686 }
687
688 pub async fn rollback(&mut self) -> Result<()> {
705 debug!("rolling back transaction");
706 if !self.transit_status(
707 |status| {
708 matches!(
709 status,
710 TransactionStatus::StartedRollback
711 | TransactionStatus::Active
712 | TransactionStatus::StartedCommit
713 )
714 },
715 TransactionStatus::StartedRollback,
716 ) {
717 return Err(Error::OperationAfterCommitError);
718 }
719
720 let primary_key = self.buffer.get_primary_key();
721 let mutations = self.buffer.to_proto_mutations();
722 let res = Committer::new(
723 primary_key,
724 mutations,
725 self.timestamp.clone(),
726 self.rpc.clone(),
727 self.options.clone(),
728 self.keyspace,
729 self.buffer.get_write_size() as u64,
730 self.start_instant,
731 )
732 .rollback()
733 .await;
734
735 if res.is_ok() {
736 self.set_status(TransactionStatus::Rolledback);
737 }
738 res
739 }
740
741 pub fn start_timestamp(&self) -> Timestamp {
743 self.timestamp.clone()
744 }
745
746 #[doc(hidden)]
750 pub async fn send_heart_beat(&mut self) -> Result<u64> {
751 debug!("sending heart_beat");
752 self.check_allow_operation().await?;
753 let primary_key = match self.buffer.get_primary_key() {
754 Some(k) => k,
755 None => return Err(Error::NoPrimaryKey),
756 };
757 let request = new_heart_beat_request(
758 self.timestamp.clone(),
759 primary_key,
760 self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
761 );
762 let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
763 .resolve_lock(
764 self.timestamp.clone(),
765 self.options.retry_options.lock_backoff.clone(),
766 self.keyspace,
767 )
768 .retry_multi_region(self.options.retry_options.region_backoff.clone())
769 .extract_error()
770 .merge(CollectSingle)
771 .post_process_default()
772 .plan();
773 plan.execute().await
774 }
775
776 async fn scan_inner(
777 &mut self,
778 range: impl Into<BoundRange>,
779 limit: u32,
780 key_only: bool,
781 reverse: bool,
782 ) -> Result<impl Iterator<Item = KvPair>> {
783 self.check_allow_operation().await?;
784 let timestamp = self.timestamp.clone();
785 let rpc = self.rpc.clone();
786 let retry_options = self.options.retry_options.clone();
787 let keyspace = self.keyspace;
788 let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn);
789
790 self.buffer
791 .scan_and_fetch(
792 range,
793 limit,
794 !key_only,
795 reverse,
796 move |new_range, new_limit| async move {
797 let request = new_scan_request(
798 new_range,
799 timestamp.clone(),
800 new_limit,
801 key_only,
802 reverse,
803 );
804 let plan = PlanBuilder::new(rpc, keyspace, request)
805 .resolve_lock(timestamp, retry_options.lock_backoff, keyspace)
806 .retry_multi_region(retry_options.region_backoff)
807 .merge(Collect)
808 .plan();
809 plan.execute()
810 .await
811 .map(|r| r.into_iter().map(Into::into).collect())
812 },
813 )
814 .await
815 .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace)))
816 }
817
818 async fn pessimistic_lock(
828 &mut self,
829 keys: impl IntoIterator<Item = impl PessimisticLock>,
830 need_value: bool,
831 ) -> Result<Vec<KvPair>> {
832 debug!("acquiring pessimistic lock");
833 assert!(
834 matches!(self.options.kind, TransactionKind::Pessimistic(_)),
835 "`pessimistic_lock` is only valid to use with pessimistic transactions"
836 );
837
838 let keys: Vec<_> = keys.into_iter().collect();
839 if keys.is_empty() {
840 return Ok(vec![]);
841 }
842
843 let first_key = keys[0].clone().key();
844 let primary_lock = self
847 .buffer
848 .get_primary_key()
849 .unwrap_or_else(|| first_key.clone());
850 let for_update_ts = self.rpc.clone().get_timestamp().await?;
851 self.options.push_for_update_ts(for_update_ts.clone());
852 let request = new_pessimistic_lock_request(
853 keys.clone().into_iter(),
854 primary_lock,
855 self.timestamp.clone(),
856 MAX_TTL,
857 for_update_ts.clone(),
858 need_value,
859 );
860 let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
861 .resolve_lock(
862 self.timestamp.clone(),
863 self.options.retry_options.lock_backoff.clone(),
864 self.keyspace,
865 )
866 .preserve_shard()
867 .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
868 .merge(CollectWithShard)
869 .plan();
870 let pairs = plan.execute().await;
871
872 if let Err(err) = pairs {
873 match err {
874 Error::PessimisticLockError {
875 inner,
876 success_keys,
877 } if !success_keys.is_empty() => {
878 let keys = success_keys.into_iter().map(Key::from);
879 self.pessimistic_lock_rollback(keys, self.timestamp.clone(), for_update_ts)
880 .await?;
881 Err(*inner)
882 }
883 _ => Err(err),
884 }
885 } else {
886 self.buffer.primary_key_or(&first_key);
888
889 self.start_auto_heartbeat().await;
890
891 for key in keys {
892 self.buffer.lock(key.key());
893 }
894
895 pairs
896 }
897 }
898
899 async fn pessimistic_lock_rollback(
901 &mut self,
902 keys: impl Iterator<Item = Key>,
903 start_version: Timestamp,
904 for_update_ts: Timestamp,
905 ) -> Result<()> {
906 debug!("rollback pessimistic lock");
907
908 let keys: Vec<_> = keys.into_iter().collect();
909 if keys.is_empty() {
910 return Ok(());
911 }
912
913 let req = new_pessimistic_rollback_request(
914 keys.clone().into_iter(),
915 start_version.clone(),
916 for_update_ts,
917 );
918 let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
919 .resolve_lock(
920 start_version,
921 self.options.retry_options.lock_backoff.clone(),
922 self.keyspace,
923 )
924 .retry_multi_region(self.options.retry_options.region_backoff.clone())
925 .extract_error()
926 .plan();
927 plan.execute().await?;
928
929 for key in keys {
930 self.buffer.unlock(&key);
931 }
932 Ok(())
933 }
934
935 async fn check_allow_operation(&self) -> Result<()> {
937 match self.get_status() {
938 TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
939 TransactionStatus::Committed
940 | TransactionStatus::Rolledback
941 | TransactionStatus::StartedCommit
942 | TransactionStatus::StartedRollback
943 | TransactionStatus::Dropped => Err(Error::OperationAfterCommitError),
944 }
945 }
946
947 fn is_pessimistic(&self) -> bool {
948 matches!(self.options.kind, TransactionKind::Pessimistic(_))
949 }
950
951 async fn start_auto_heartbeat(&mut self) {
952 debug!("starting auto_heartbeat");
953 if !self.options.heartbeat_option.is_auto_heartbeat() || self.is_heartbeat_started {
954 return;
955 }
956 self.is_heartbeat_started = true;
957
958 let status = self.status.clone();
959 let primary_key = self
960 .buffer
961 .get_primary_key()
962 .expect("Primary key should exist");
963 let start_ts = self.timestamp.clone();
964 let region_backoff = self.options.retry_options.region_backoff.clone();
965 let rpc = self.rpc.clone();
966 let heartbeat_interval = match self.options.heartbeat_option {
967 HeartbeatOption::NoHeartbeat => DEFAULT_HEARTBEAT_INTERVAL,
968 HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval,
969 };
970 let start_instant = self.start_instant;
971 let keyspace = self.keyspace;
972
973 let heartbeat_task = async move {
974 loop {
975 tokio::time::sleep(heartbeat_interval).await;
976 {
977 let status: TransactionStatus = status.load(atomic::Ordering::Acquire).into();
978 if matches!(
979 status,
980 TransactionStatus::Rolledback
981 | TransactionStatus::Committed
982 | TransactionStatus::Dropped
983 ) {
984 break;
985 }
986 }
987 let request = new_heart_beat_request(
988 start_ts.clone(),
989 primary_key.clone(),
990 start_instant.elapsed().as_millis() as u64 + MAX_TTL,
991 );
992 let plan = PlanBuilder::new(rpc.clone(), keyspace, request)
993 .retry_multi_region(region_backoff.clone())
994 .merge(CollectSingle)
995 .plan();
996 plan.execute().await?;
997 }
998 Ok::<(), Error>(())
999 };
1000
1001 tokio::spawn(async {
1002 if let Err(err) = heartbeat_task.await {
1003 log::error!("Error: While sending heartbeat. {}", err);
1004 }
1005 });
1006 }
1007
1008 fn get_status(&self) -> TransactionStatus {
1009 self.status.load(atomic::Ordering::Acquire).into()
1010 }
1011
1012 fn set_status(&self, status: TransactionStatus) {
1013 self.status.store(status as u8, atomic::Ordering::Release);
1014 }
1015
1016 fn transit_status<F>(&self, check_status: F, next: TransactionStatus) -> bool
1017 where
1018 F: Fn(TransactionStatus) -> bool,
1019 {
1020 let mut current = self.get_status();
1021 while check_status(current) {
1022 if current == next {
1023 return true;
1024 }
1025 match self.status.compare_exchange_weak(
1026 current as u8,
1027 next as u8,
1028 atomic::Ordering::AcqRel,
1029 atomic::Ordering::Acquire,
1030 ) {
1031 Ok(_) => return true,
1032 Err(x) => current = x.into(),
1033 }
1034 }
1035 false
1036 }
1037}
1038
1039impl<PdC: PdClient> Drop for Transaction<PdC> {
1040 fn drop(&mut self) {
1041 debug!("dropping transaction");
1042 if std::thread::panicking() {
1043 return;
1044 }
1045 if self.get_status() == TransactionStatus::Active {
1046 match self.options.check_level {
1047 CheckLevel::Panic => {
1048 panic!("Dropping an active transaction. Consider commit or rollback it.")
1049 }
1050 CheckLevel::Warn => {
1051 warn!("Dropping an active transaction. Consider commit or rollback it.")
1052 }
1053
1054 CheckLevel::None => {}
1055 }
1056 }
1057 self.set_status(TransactionStatus::Dropped);
1058 }
1059}
1060
/// Upper bound, in milliseconds, for the computed lock TTL. Also the TTL used for
/// pessimistic lock requests and the amount added to the elapsed time in heartbeats.
const MAX_TTL: u64 = 20_000;
/// Lock TTL, in milliseconds, used when the write size does not exceed
/// `TXN_COMMIT_BATCH_SIZE`; also the lower bound of the computed TTL.
const DEFAULT_LOCK_TTL: u64 = 3_000;
/// Interval between automatic heartbeats: half of `MAX_TTL`.
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(MAX_TTL / 2);
/// Write size, in bytes, above which the lock TTL is scaled with the write size.
pub const TXN_COMMIT_BATCH_SIZE: u64 = 16 * 1024;
/// Multiplier applied to the square root of the write size (in MiB) to get the lock TTL.
const TTL_FACTOR: f64 = 6000.0;
1071
1072#[derive(Clone, PartialEq, Debug)]
1074pub enum TransactionKind {
1075 Optimistic,
1076 Pessimistic(Timestamp),
1078}
1079
1080#[derive(Clone, PartialEq, Debug)]
1084pub struct TransactionOptions {
1085 kind: TransactionKind,
1087 try_one_pc: bool,
1089 async_commit: bool,
1091 read_only: bool,
1093 retry_options: RetryOptions,
1095 check_level: CheckLevel,
1097 #[doc(hidden)]
1098 heartbeat_option: HeartbeatOption,
1099}
1100
/// Controls the automatic heartbeat of a transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeartbeatOption {
    /// No automatic heartbeat task is started.
    NoHeartbeat,
    /// A heartbeat is sent each time the given interval has elapsed.
    FixedTime(Duration),
}
1106
1107impl Default for TransactionOptions {
1108 fn default() -> TransactionOptions {
1109 Self::new_pessimistic()
1110 }
1111}
1112
1113impl TransactionOptions {
1114 pub fn new_optimistic() -> TransactionOptions {
1116 TransactionOptions {
1117 kind: TransactionKind::Optimistic,
1118 try_one_pc: false,
1119 async_commit: false,
1120 read_only: false,
1121 retry_options: RetryOptions::default_optimistic(),
1122 check_level: CheckLevel::Panic,
1123 heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
1124 }
1125 }
1126
1127 pub fn new_pessimistic() -> TransactionOptions {
1129 TransactionOptions {
1130 kind: TransactionKind::Pessimistic(Timestamp::from_version(0)),
1131 try_one_pc: false,
1132 async_commit: false,
1133 read_only: false,
1134 retry_options: RetryOptions::default_pessimistic(),
1135 check_level: CheckLevel::Panic,
1136 heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
1137 }
1138 }
1139
1140 #[must_use]
1142 pub fn use_async_commit(mut self) -> TransactionOptions {
1143 self.async_commit = true;
1144 self
1145 }
1146
1147 #[must_use]
1149 pub fn try_one_pc(mut self) -> TransactionOptions {
1150 self.try_one_pc = true;
1151 self
1152 }
1153
1154 #[must_use]
1156 pub fn read_only(mut self) -> TransactionOptions {
1157 self.read_only = true;
1158 self
1159 }
1160
1161 #[must_use]
1163 pub fn no_resolve_locks(mut self) -> TransactionOptions {
1164 self.retry_options.lock_backoff = Backoff::no_backoff();
1165 self
1166 }
1167
1168 #[must_use]
1170 pub fn no_resolve_regions(mut self) -> TransactionOptions {
1171 self.retry_options.region_backoff = Backoff::no_backoff();
1172 self
1173 }
1174
1175 #[must_use]
1177 pub fn retry_options(mut self, options: RetryOptions) -> TransactionOptions {
1178 self.retry_options = options;
1179 self
1180 }
1181
1182 #[must_use]
1184 pub fn drop_check(mut self, level: CheckLevel) -> TransactionOptions {
1185 self.check_level = level;
1186 self
1187 }
1188
1189 fn push_for_update_ts(&mut self, for_update_ts: Timestamp) {
1190 match &mut self.kind {
1191 TransactionKind::Optimistic => unreachable!(),
1192 TransactionKind::Pessimistic(old_for_update_ts) => {
1193 self.kind = TransactionKind::Pessimistic(Timestamp::from_version(std::cmp::max(
1194 old_for_update_ts.version(),
1195 for_update_ts.version(),
1196 )));
1197 }
1198 }
1199 }
1200
1201 #[must_use]
1202 pub fn heartbeat_option(mut self, heartbeat_option: HeartbeatOption) -> TransactionOptions {
1203 self.heartbeat_option = heartbeat_option;
1204 self
1205 }
1206
1207 pub fn is_pessimistic(&self) -> bool {
1209 match self.kind {
1210 TransactionKind::Pessimistic(_) => true,
1211 TransactionKind::Optimistic => false,
1212 }
1213 }
1214}
1215
/// What happens when a transaction is dropped while its status is still `Active`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckLevel {
    /// Panic.
    Panic,
    /// Log a warning.
    Warn,
    /// Do nothing.
    None,
}
1231
1232impl HeartbeatOption {
1233 pub fn is_auto_heartbeat(&self) -> bool {
1234 !matches!(self, HeartbeatOption::NoHeartbeat)
1235 }
1236}
1237
1238#[derive(Clone, Eq, PartialEq, Debug)]
1239pub enum Mutation {
1240 Put(Key, Value),
1241 Delete(Key),
1242}
1243
1244impl Mutation {
1245 pub fn key(&self) -> &Key {
1246 match self {
1247 Mutation::Put(key, _) => key,
1248 Mutation::Delete(key) => key,
1249 }
1250 }
1251}
1252
1253#[allow(clippy::too_many_arguments)]
1261#[derive(new)]
1262struct Committer<PdC: PdClient = PdRpcClient> {
1263 primary_key: Option<Key>,
1264 mutations: Vec<kvrpcpb::Mutation>,
1265 start_version: Timestamp,
1266 rpc: Arc<PdC>,
1267 options: TransactionOptions,
1268 keyspace: Keyspace,
1269 #[new(default)]
1270 undetermined: bool,
1271 write_size: u64,
1272 start_instant: Instant,
1273}
1274
1275impl<PdC: PdClient> Committer<PdC> {
1276 async fn commit(mut self) -> Result<Option<Timestamp>> {
1277 debug!("committing");
1278
1279 let min_commit_ts = self.prewrite().await?;
1280
1281 fail_point!("after-prewrite", |_| {
1282 Err(Error::StringError(
1283 "failpoint: after-prewrite return error".to_owned(),
1284 ))
1285 });
1286
1287 if self.options.try_one_pc {
1289 return Ok(min_commit_ts);
1290 }
1291
1292 let commit_ts = if self.options.async_commit {
1293 min_commit_ts.unwrap()
1295 } else {
1296 match self.commit_primary_with_retry().await {
1297 Ok(commit_ts) => commit_ts,
1298 Err(e) => {
1299 return if self.undetermined {
1300 Err(Error::UndeterminedError(Box::new(e)))
1301 } else {
1302 Err(e)
1303 };
1304 }
1305 }
1306 };
1307 tokio::spawn(self.commit_secondary(commit_ts.clone()).map(|res| {
1308 if let Err(e) = res {
1309 log::warn!("Failed to commit secondary keys: {}", e);
1310 }
1311 }));
1312 Ok(Some(commit_ts))
1313 }
1314
1315 async fn prewrite(&mut self) -> Result<Option<Timestamp>> {
1316 debug!("prewriting");
1317 let primary_lock = self.primary_key.clone().unwrap();
1318 let elapsed = self.start_instant.elapsed().as_millis() as u64;
1319 let lock_ttl = self.calc_txn_lock_ttl();
1320 let mut request = match &self.options.kind {
1321 TransactionKind::Optimistic => new_prewrite_request(
1322 self.mutations.clone(),
1323 primary_lock,
1324 self.start_version.clone(),
1325 lock_ttl + elapsed,
1326 ),
1327 TransactionKind::Pessimistic(for_update_ts) => new_pessimistic_prewrite_request(
1328 self.mutations.clone(),
1329 primary_lock,
1330 self.start_version.clone(),
1331 lock_ttl + elapsed,
1332 for_update_ts.clone(),
1333 ),
1334 };
1335
1336 request.use_async_commit = self.options.async_commit;
1337 request.try_one_pc = self.options.try_one_pc;
1338 request.secondaries = self
1339 .mutations
1340 .iter()
1341 .filter(|m| self.primary_key.as_ref().unwrap() != m.key.as_ref())
1342 .map(|m| m.key.clone())
1343 .collect();
1344 let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
1347 .resolve_lock(
1348 self.start_version.clone(),
1349 self.options.retry_options.lock_backoff.clone(),
1350 self.keyspace,
1351 )
1352 .retry_multi_region(self.options.retry_options.region_backoff.clone())
1353 .merge(CollectError)
1354 .extract_error()
1355 .plan();
1356 let response = plan.execute().await?;
1357
1358 if self.options.try_one_pc && response.len() == 1 {
1359 if response[0].one_pc_commit_ts == 0 {
1360 return Err(Error::OnePcFailure);
1361 }
1362
1363 return Ok(Timestamp::try_from_version(response[0].one_pc_commit_ts));
1364 }
1365
1366 self.options.try_one_pc = false;
1367
1368 let min_commit_ts = response
1369 .iter()
1370 .map(|r| {
1371 assert_eq!(r.one_pc_commit_ts, 0);
1372 r.min_commit_ts
1373 })
1374 .max()
1375 .map(Timestamp::from_version);
1376
1377 Ok(min_commit_ts)
1378 }
1379
1380 async fn commit_primary(&mut self) -> Result<Timestamp> {
1382 debug!("committing primary");
1383 let primary_key = self.primary_key.clone().into_iter();
1384 let commit_version = self.rpc.clone().get_timestamp().await?;
1385 let req = new_commit_request(
1386 primary_key,
1387 self.start_version.clone(),
1388 commit_version.clone(),
1389 );
1390 let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
1391 .resolve_lock(
1392 self.start_version.clone(),
1393 self.options.retry_options.lock_backoff.clone(),
1394 self.keyspace,
1395 )
1396 .retry_multi_region(self.options.retry_options.region_backoff.clone())
1397 .extract_error()
1398 .plan();
1399 plan.execute()
1400 .inspect_err(|e| {
1401 debug!(
1402 "commit primary error: {:?}, start_ts: {}",
1403 e,
1404 self.start_version.version()
1405 );
1406 if let Error::Grpc(_) = e {
1410 self.undetermined = true;
1411 }
1412 })
1413 .await?;
1414
1415 Ok(commit_version)
1416 }
1417
1418 async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
1419 loop {
1420 match self.commit_primary().await {
1421 Ok(commit_version) => return Ok(commit_version),
1422 Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
1423 Some(Error::KeyError(key_err)) => {
1424 if let Some(expired) = key_err.commit_ts_expired {
1425 info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
1427 self.start_version.version());
1428
1429 let primary_key = self.primary_key.as_ref().unwrap();
1430 if primary_key != expired.key.as_ref() {
1431 error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
1432 self.start_version.version(), HexRepr(&expired.key), primary_key);
1433 return Err(Error::StringError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
1434 }
1435
1436 if expired
1439 .min_commit_ts
1440 .saturating_sub(expired.attempted_commit_ts)
1441 > 943718400000
1442 {
1443 let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
1444 expired.min_commit_ts, expired.attempted_commit_ts);
1445 return Err(Error::StringError(msg));
1446 }
1447 continue;
1448 } else {
1449 return Err(Error::KeyError(key_err));
1450 }
1451 }
1452 Some(err) => return Err(err),
1453 None => unreachable!(),
1454 },
1455 Err(err) => return Err(err),
1456 }
1457 }
1458 }
1459
1460 async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
1461 debug!("committing secondary");
1462 let start_version = self.start_version.clone();
1463 let mutations_len = self.mutations.len();
1464 let primary_only = mutations_len == 1;
1465 #[cfg(not(feature = "integration-tests"))]
1466 let mutations = self.mutations.into_iter();
1467
1468 #[cfg(feature = "integration-tests")]
1469 let mutations = self.mutations.into_iter().take({
1470 let fp = || -> Result<usize> {
1473 let mut new_len = mutations_len;
1474 fail_point!("before-commit-secondary", |percent| {
1475 let percent = percent.unwrap().parse::<usize>().unwrap();
1476 new_len = mutations_len * percent / 100;
1477 if new_len == 0 {
1478 Err(Error::StringError(
1479 "failpoint: before-commit-secondary return error".to_owned(),
1480 ))
1481 } else {
1482 debug!(
1483 "failpoint: before-commit-secondary truncate mutation {} -> {}",
1484 mutations_len, new_len
1485 );
1486 Ok(new_len)
1487 }
1488 });
1489 Ok(new_len)
1490 };
1491 fp()?
1492 });
1493
1494 let req = if self.options.async_commit {
1495 let keys = mutations.map(|m| m.key.into());
1496 new_commit_request(keys, start_version.clone(), commit_version)
1497 } else if primary_only {
1498 return Ok(());
1499 } else {
1500 let primary_key = self.primary_key.unwrap();
1501 let keys = mutations
1502 .map(|m| m.key.into())
1503 .filter(|key| &primary_key != key);
1504 new_commit_request(keys, start_version.clone(), commit_version)
1505 };
1506 let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
1507 .resolve_lock(
1508 start_version,
1509 self.options.retry_options.lock_backoff,
1510 self.keyspace,
1511 )
1512 .retry_multi_region(self.options.retry_options.region_backoff)
1513 .extract_error()
1514 .plan();
1515 plan.execute().await?;
1516 Ok(())
1517 }
1518
1519 async fn rollback(self) -> Result<()> {
1520 debug!("rolling back");
1521 if self.options.kind == TransactionKind::Optimistic && self.mutations.is_empty() {
1522 return Ok(());
1523 }
1524 let keys = self
1525 .mutations
1526 .into_iter()
1527 .map(|mutation| mutation.key.into());
1528 let start_version = self.start_version.clone();
1529 match self.options.kind {
1530 TransactionKind::Optimistic => {
1531 let req = new_batch_rollback_request(keys, start_version.clone());
1532 let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
1533 .resolve_lock(
1534 start_version.clone(),
1535 self.options.retry_options.lock_backoff,
1536 self.keyspace,
1537 )
1538 .retry_multi_region(self.options.retry_options.region_backoff)
1539 .extract_error()
1540 .plan();
1541 plan.execute().await?;
1542 }
1543 TransactionKind::Pessimistic(for_update_ts) => {
1544 let req =
1545 new_pessimistic_rollback_request(keys, start_version.clone(), for_update_ts);
1546 let plan = PlanBuilder::new(self.rpc, self.keyspace, req)
1547 .resolve_lock(
1548 start_version.clone(),
1549 self.options.retry_options.lock_backoff,
1550 self.keyspace,
1551 )
1552 .retry_multi_region(self.options.retry_options.region_backoff)
1553 .extract_error()
1554 .plan();
1555 plan.execute().await?;
1556 }
1557 }
1558 Ok(())
1559 }
1560
1561 fn calc_txn_lock_ttl(&mut self) -> u64 {
1562 let mut lock_ttl = DEFAULT_LOCK_TTL;
1563 if self.write_size > TXN_COMMIT_BATCH_SIZE {
1564 let size_mb = self.write_size as f64 / 1024.0 / 1024.0;
1565 lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64;
1566 lock_ttl = lock_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL);
1567 }
1568 lock_ttl
1569 }
1570}
1571
/// State of a `Transaction`, with fixed `u8` discriminants.
///
/// `Transaction::new` stores the initial state as `status as u8` inside an `AtomicU8`, and the
/// `From<u8>` implementation decodes it again, so the numeric values must not change.
///
/// `Debug` is derived so a status can be printed in log messages and assertion failures.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
enum TransactionStatus {
    /// Initial state chosen by `Transaction::new` when `options.read_only` is set.
    ReadOnly = 0,
    /// Initial state chosen by `Transaction::new` when `options.read_only` is not set.
    Active = 1,
    /// NOTE(review): by its name, the commit has finished — confirm where this state is stored.
    Committed = 2,
    /// NOTE(review): by its name, a commit has begun but not finished — confirm against callers.
    StartedCommit = 3,
    /// NOTE(review): by its name, the rollback has finished — confirm where this state is stored.
    Rolledback = 4,
    /// NOTE(review): by its name, a rollback has begun but not finished — confirm against callers.
    StartedRollback = 5,
    /// NOTE(review): by its name, the transaction was dropped — confirm against the `Drop` logic.
    Dropped = 6,
}
1590
1591impl From<u8> for TransactionStatus {
1592 fn from(num: u8) -> Self {
1593 match num {
1594 0 => TransactionStatus::ReadOnly,
1595 1 => TransactionStatus::Active,
1596 2 => TransactionStatus::Committed,
1597 3 => TransactionStatus::StartedCommit,
1598 4 => TransactionStatus::Rolledback,
1599 5 => TransactionStatus::StartedRollback,
1600 6 => TransactionStatus::Dropped,
1601 _ => panic!("Unknown transaction status {}", num),
1602 }
1603 }
1604}
1605
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::io;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::time::Duration;

    use fail::FailScenario;

    use crate::mock::MockKvClient;
    use crate::mock::MockPdClient;
    use crate::proto::kvrpcpb;
    use crate::proto::pdpb::Timestamp;
    use crate::request::Keyspace;
    use crate::transaction::HeartbeatOption;
    use crate::Transaction;
    use crate::TransactionOptions;

    /// Returns a default-constructed `T` boxed as `Box<dyn Any>`, the shape the mock dispatch
    /// hooks below hand back as a response.
    fn default_response<T: Default + Any>() -> Box<dyn Any> {
        Box::<T>::default()
    }

    /// An optimistic transaction with a one-second fixed heartbeat must have sent exactly one
    /// heartbeat by the time its commit finishes, given that the `after-prewrite` failpoint is
    /// configured to sleep for 1500 ms.
    #[rstest::rstest]
    #[case(Keyspace::Disable)]
    #[case(Keyspace::Enable { keyspace_id: 0 })]
    #[tokio::test]
    async fn test_optimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> {
        let scenario = FailScenario::setup();
        fail::cfg("after-prewrite", "sleep(1500)").unwrap();

        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let hook_counter = Arc::clone(&heartbeat_count);
        // The mock answers every request with a default response and counts heartbeats.
        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
            move |req: &dyn Any| {
                if req.is::<kvrpcpb::TxnHeartBeatRequest>() {
                    hook_counter.fetch_add(1, Ordering::SeqCst);
                    return Ok(default_response::<kvrpcpb::TxnHeartBeatResponse>());
                }
                if req.is::<kvrpcpb::PrewriteRequest>() {
                    return Ok(default_response::<kvrpcpb::PrewriteResponse>());
                }
                Ok(default_response::<kvrpcpb::CommitResponse>())
            },
        )));

        let options = TransactionOptions::new_optimistic()
            .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1)));
        let mut txn = Transaction::new(Timestamp::default(), pd_client, options, keyspace);
        txn.put("key1".to_owned(), "foo").await.unwrap();

        // The commit runs on a blocking thread so this task can observe the counter meanwhile.
        let commit_task = tokio::task::spawn_blocking(move || {
            let outcome = futures::executor::block_on(txn.commit());
            assert!(outcome.is_ok())
        });
        // No heartbeat is expected immediately after the commit has been handed off.
        assert_eq!(heartbeat_count.load(Ordering::SeqCst), 0);
        commit_task.await.unwrap();
        assert_eq!(heartbeat_count.load(Ordering::SeqCst), 1);

        scenario.teardown();
        Ok(())
    }

    /// A pessimistic transaction with a one-second fixed heartbeat must have sent no heartbeat
    /// right after `put` and exactly one heartbeat 1500 ms later; it must then commit
    /// successfully.
    #[rstest::rstest]
    #[case(Keyspace::Disable)]
    #[case(Keyspace::Enable { keyspace_id: 0 })]
    #[tokio::test]
    async fn test_pessimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> {
        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let hook_counter = Arc::clone(&heartbeat_count);
        // Same counting mock as the optimistic test, plus a reply for pessimistic lock requests.
        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
            move |req: &dyn Any| {
                if req.is::<kvrpcpb::TxnHeartBeatRequest>() {
                    hook_counter.fetch_add(1, Ordering::SeqCst);
                    return Ok(default_response::<kvrpcpb::TxnHeartBeatResponse>());
                }
                if req.is::<kvrpcpb::PrewriteRequest>() {
                    return Ok(default_response::<kvrpcpb::PrewriteResponse>());
                }
                if req.is::<kvrpcpb::PessimisticLockRequest>() {
                    return Ok(default_response::<kvrpcpb::PessimisticLockResponse>());
                }
                Ok(default_response::<kvrpcpb::CommitResponse>())
            },
        )));

        let options = TransactionOptions::new_pessimistic()
            .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1)));
        let mut txn = Transaction::new(Timestamp::default(), pd_client, options, keyspace);
        txn.put("key1".to_owned(), "foo").await.unwrap();

        assert_eq!(heartbeat_count.load(Ordering::SeqCst), 0);
        tokio::time::sleep(Duration::from_millis(1500)).await;
        assert_eq!(heartbeat_count.load(Ordering::SeqCst), 1);

        let commit_task = tokio::spawn(async move {
            assert!(txn.commit().await.is_ok());
        });
        commit_task.await.unwrap();
        Ok(())
    }
}