tikv_client/transaction/
lowering.rs1use std::iter::Iterator;
4
5use crate::proto::kvrpcpb;
6use crate::proto::pdpb::Timestamp;
7use crate::timestamp::TimestampExt;
11use crate::transaction::requests;
15use crate::BoundRange;
19use crate::Key;
23
24pub fn new_get_request(key: Key, timestamp: Timestamp) -> kvrpcpb::GetRequest {
25 requests::new_get_request(key.into(), timestamp.version())
26}
27
28pub fn new_batch_get_request(
29 keys: impl Iterator<Item = Key>,
30 timestamp: Timestamp,
31) -> kvrpcpb::BatchGetRequest {
32 requests::new_batch_get_request(keys.map(Into::into).collect(), timestamp.version())
33}
34
35pub fn new_scan_request(
36 range: BoundRange,
37 timestamp: Timestamp,
38 limit: u32,
39 key_only: bool,
40 reverse: bool,
41) -> kvrpcpb::ScanRequest {
42 let (start_key, end_key) = range.into_keys();
43 requests::new_scan_request(
44 start_key.into(),
45 end_key.unwrap_or_default().into(),
46 timestamp.version(),
47 limit,
48 key_only,
49 reverse,
50 )
51}
52
53pub fn new_prewrite_request(
54 mutations: Vec<kvrpcpb::Mutation>,
55 primary_lock: Key,
56 start_version: Timestamp,
57 lock_ttl: u64,
58) -> kvrpcpb::PrewriteRequest {
59 requests::new_prewrite_request(
60 mutations,
61 primary_lock.into(),
62 start_version.version(),
63 lock_ttl,
64 )
65}
66
67pub fn new_pessimistic_prewrite_request(
68 mutations: Vec<kvrpcpb::Mutation>,
69 primary_lock: Key,
70 start_version: Timestamp,
71 lock_ttl: u64,
72 for_update_ts: Timestamp,
73) -> kvrpcpb::PrewriteRequest {
74 requests::new_pessimistic_prewrite_request(
75 mutations,
76 primary_lock.into(),
77 start_version.version(),
78 lock_ttl,
79 for_update_ts.version(),
80 )
81}
82
83pub fn new_commit_request(
84 keys: impl Iterator<Item = Key>,
85 start_version: Timestamp,
86 commit_version: Timestamp,
87) -> kvrpcpb::CommitRequest {
88 requests::new_commit_request(
89 keys.map(Into::into).collect(),
90 start_version.version(),
91 commit_version.version(),
92 )
93}
94
95pub fn new_batch_rollback_request(
96 keys: impl Iterator<Item = Key>,
97 start_version: Timestamp,
98) -> kvrpcpb::BatchRollbackRequest {
99 requests::new_batch_rollback_request(keys.map(Into::into).collect(), start_version.version())
100}
101
102pub fn new_pessimistic_rollback_request(
103 keys: impl Iterator<Item = Key>,
104 start_version: Timestamp,
105 for_update_ts: Timestamp,
106) -> kvrpcpb::PessimisticRollbackRequest {
107 requests::new_pessimistic_rollback_request(
108 keys.map(Into::into).collect(),
109 start_version.version(),
110 for_update_ts.version(),
111 )
112}
113
114pub trait PessimisticLock: Clone {
115 fn key(self) -> Key;
116
117 fn assertion(&self) -> kvrpcpb::Assertion;
118}
119
120impl PessimisticLock for Key {
121 fn key(self) -> Key {
122 self
123 }
124
125 fn assertion(&self) -> kvrpcpb::Assertion {
126 kvrpcpb::Assertion::None
127 }
128}
129
130impl PessimisticLock for (Key, kvrpcpb::Assertion) {
131 fn key(self) -> Key {
132 self.0
133 }
134
135 fn assertion(&self) -> kvrpcpb::Assertion {
136 self.1
137 }
138}
139
140pub fn new_pessimistic_lock_request(
141 locks: impl Iterator<Item = impl PessimisticLock>,
142 primary_lock: Key,
143 start_version: Timestamp,
144 lock_ttl: u64,
145 for_update_ts: Timestamp,
146 need_value: bool,
147) -> kvrpcpb::PessimisticLockRequest {
148 requests::new_pessimistic_lock_request(
149 locks
150 .map(|pl| {
151 let mut mutation = kvrpcpb::Mutation::default();
152 mutation.op = kvrpcpb::Op::PessimisticLock.into();
153 mutation.assertion = pl.assertion().into();
154 mutation.key = pl.key().into();
155 mutation
156 })
157 .collect(),
158 primary_lock.into(),
159 start_version.version(),
160 lock_ttl,
161 for_update_ts.version(),
162 need_value,
163 )
164}
165
166pub fn new_scan_lock_request(
167 range: BoundRange,
168 safepoint: &Timestamp,
169 limit: u32,
170) -> kvrpcpb::ScanLockRequest {
171 let (start_key, end_key) = range.into_keys();
172 requests::new_scan_lock_request(
173 start_key.into(),
174 end_key.unwrap_or_default().into(),
175 safepoint.version(),
176 limit,
177 )
178}
179
180pub fn new_heart_beat_request(
181 start_ts: Timestamp,
182 primary_lock: Key,
183 ttl: u64,
184) -> kvrpcpb::TxnHeartBeatRequest {
185 requests::new_heart_beat_request(start_ts.version(), primary_lock.into(), ttl)
186}
187
188pub fn new_unsafe_destroy_range_request(range: BoundRange) -> kvrpcpb::UnsafeDestroyRangeRequest {
189 let (start_key, end_key) = range.into_keys();
190 requests::new_unsafe_destroy_range_request(start_key.into(), end_key.unwrap_or_default().into())
191}