1use std::ops::{Bound, Range};
4
5use serde_derive::{Deserialize, Serialize};
6
7use crate::transaction::Mutation;
8use crate::{proto::kvrpcpb, Key};
9use crate::{BoundRange, KvPair};
10
/// Mode byte written as the first byte of the keyspace prefix for raw keys.
pub const RAW_KEY_PREFIX: u8 = b'r';
/// Mode byte written as the first byte of the keyspace prefix for transactional keys.
pub const TXN_KEY_PREFIX: u8 = b'x';
/// Number of bytes in a keyspace prefix: one mode byte followed by three keyspace-id bytes.
pub const KEYSPACE_PREFIX_LEN: usize = 4;
14
/// Selects whether and how keys are mapped into a keyspace.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum Keyspace {
    /// No keyspace: keys are left untouched and `api_version` reports V1.
    Disable,
    /// Keys are prefixed with a mode byte plus the keyspace id; `api_version` reports V2.
    Enable {
        /// Identifier encoded big-endian into the prefix. Only the low three bytes
        /// end up in the key; the top byte is replaced by the mode byte.
        keyspace_id: u32,
    },
    /// `api_version` reports V2, but the encode/truncate implementations in this
    /// file leave keys unchanged (no prefix is added or removed).
    ApiV2NoPrefix,
}
29
/// Kind of key being encoded; determines the first byte of the keyspace prefix.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum KeyMode {
    /// Raw keys, prefixed with `RAW_KEY_PREFIX`.
    Raw,
    /// Transactional keys, prefixed with `TXN_KEY_PREFIX`.
    Txn,
}
35
36impl Keyspace {
37 pub fn api_version(&self) -> kvrpcpb::ApiVersion {
38 match self {
39 Keyspace::Disable => kvrpcpb::ApiVersion::V1,
40 Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2,
41 Keyspace::ApiV2NoPrefix => kvrpcpb::ApiVersion::V2,
42 }
43 }
44}
45
/// Conversion of a user-facing value into its keyspace-encoded form.
pub trait EncodeKeyspace {
    /// Consumes `self` and returns it with every contained key prefixed for
    /// `keyspace` and `key_mode`. The implementations in this file only add a
    /// prefix for `Keyspace::Enable`; other variants leave keys unchanged.
    fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self;
}
49
/// Conversion of a keyspace-encoded value back into its user-facing form.
pub trait TruncateKeyspace {
    /// Consumes `self` and returns it with the keyspace prefix removed from every
    /// contained key. The implementations in this file only strip bytes for
    /// `Keyspace::Enable`; other variants leave keys unchanged.
    fn truncate_keyspace(self, keyspace: Keyspace) -> Self;
}
53
54impl EncodeKeyspace for Key {
55 fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
56 let Keyspace::Enable { keyspace_id } = keyspace else {
57 return self;
58 };
59 let prefix = keyspace_prefix(keyspace_id, key_mode);
60
61 prepend_bytes(&mut self.0, &prefix);
62
63 self
64 }
65}
66
67impl EncodeKeyspace for KvPair {
68 fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
69 self.0 = self.0.encode_keyspace(keyspace, key_mode);
70 self
71 }
72}
73
74impl EncodeKeyspace for BoundRange {
75 fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
76 self.from = match self.from {
77 Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)),
78 Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)),
79 Bound::Unbounded => Bound::Included(Key::EMPTY.encode_keyspace(keyspace, key_mode)),
80 };
81
82 self.to = match self.to {
83 Bound::Included(key) if !key.is_empty() => {
84 Bound::Included(key.encode_keyspace(keyspace, key_mode))
85 }
86 Bound::Excluded(key) if !key.is_empty() => {
87 Bound::Excluded(key.encode_keyspace(keyspace, key_mode))
88 }
89 _ => match keyspace {
90 Keyspace::Enable { keyspace_id } => Bound::Excluded(Key::EMPTY.encode_keyspace(
91 Keyspace::Enable {
92 keyspace_id: keyspace_id + 1,
93 },
94 key_mode,
95 )),
96 _ => Bound::Excluded(Key::EMPTY),
97 },
98 };
99 self
100 }
101}
102
103impl EncodeKeyspace for Mutation {
104 fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
105 match self {
106 Mutation::Put(key, val) => Mutation::Put(key.encode_keyspace(keyspace, key_mode), val),
107 Mutation::Delete(key) => Mutation::Delete(key.encode_keyspace(keyspace, key_mode)),
108 }
109 }
110}
111
112impl TruncateKeyspace for Key {
113 fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
114 if !matches!(keyspace, Keyspace::Enable { .. }) {
115 return self;
116 }
117
118 pretruncate_bytes::<KEYSPACE_PREFIX_LEN>(&mut self.0);
119
120 self
121 }
122}
123
124impl TruncateKeyspace for KvPair {
125 fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
126 self.0 = self.0.truncate_keyspace(keyspace);
127 self
128 }
129}
130
131impl TruncateKeyspace for Range<Key> {
132 fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
133 self.start = self.start.truncate_keyspace(keyspace);
134 self.end = self.end.truncate_keyspace(keyspace);
135 self
136 }
137}
138
139impl TruncateKeyspace for Vec<Range<Key>> {
140 fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
141 if !matches!(keyspace, Keyspace::Enable { .. }) {
142 return self;
143 }
144 for range in &mut self {
145 take_mut::take(range, |range| range.truncate_keyspace(keyspace));
146 }
147 self
148 }
149}
150
151impl TruncateKeyspace for Vec<KvPair> {
152 fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
153 if !matches!(keyspace, Keyspace::Enable { .. }) {
154 return self;
155 }
156 for pair in &mut self {
157 take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace));
158 }
159 self
160 }
161}
162
163impl TruncateKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
164 fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
165 if !matches!(keyspace, Keyspace::Enable { .. }) {
166 return self;
167 }
168 for lock in &mut self {
169 take_mut::take(&mut lock.key, |key| {
170 Key::from(key).truncate_keyspace(keyspace).into()
171 });
172 take_mut::take(&mut lock.primary_lock, |primary| {
173 Key::from(primary).truncate_keyspace(keyspace).into()
174 });
175 for secondary in lock.secondaries.iter_mut() {
176 take_mut::take(secondary, |secondary| {
177 Key::from(secondary).truncate_keyspace(keyspace).into()
178 });
179 }
180 }
181 self
182 }
183}
184
185impl EncodeKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
186 fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
187 if !matches!(keyspace, Keyspace::Enable { .. }) {
188 return self;
189 }
190 for lock in &mut self {
191 take_mut::take(&mut lock.key, |key| {
192 Key::from(key).encode_keyspace(keyspace, key_mode).into()
193 });
194 take_mut::take(&mut lock.primary_lock, |primary| {
195 Key::from(primary)
196 .encode_keyspace(keyspace, key_mode)
197 .into()
198 });
199 for secondary in lock.secondaries.iter_mut() {
200 take_mut::take(secondary, |secondary| {
201 Key::from(secondary)
202 .encode_keyspace(keyspace, key_mode)
203 .into()
204 });
205 }
206 }
207 self
208 }
209}
210
211fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] {
212 let mut prefix = keyspace_id.to_be_bytes();
213 prefix[0] = match key_mode {
214 KeyMode::Raw => RAW_KEY_PREFIX,
215 KeyMode::Txn => TXN_KEY_PREFIX,
216 };
217 prefix
218}
219
/// Inserts the `N` bytes of `prefix` at the front of `vec`, moving the existing
/// contents `N` positions towards the end.
fn prepend_bytes<const N: usize>(vec: &mut Vec<u8>, prefix: &[u8; N]) {
    let old_len = vec.len();
    // Reserve exactly the extra room needed so the resize below does not
    // over-allocate.
    vec.reserve_exact(N);
    vec.resize(old_len + N, 0);
    // Shift the original bytes right (the ranges may overlap; `copy_within`
    // handles that), then write the prefix into the gap.
    vec.copy_within(..old_len, N);
    vec[..N].copy_from_slice(prefix);
}
228
/// Removes the first `N` bytes of `vec`, moving the remaining bytes to the front.
///
/// # Panics
///
/// Panics if `vec` holds fewer than `N` bytes.
fn pretruncate_bytes<const N: usize>(vec: &mut Vec<u8>) {
    assert!(vec.len() >= N);
    // Dropping the drain iterator removes the range and shifts the tail down.
    vec.drain(..N);
}
236
// Unit tests for keyspace prefix construction, encoding and truncation.
#[cfg(test)]
mod tests {
    use super::*;

    // The prefix is the mode byte followed by the low three bytes of the id,
    // big-endian.
    #[test]
    fn test_keyspace_prefix() {
        let key_mode = KeyMode::Raw;
        assert_eq!(keyspace_prefix(0, key_mode), [b'r', 0, 0, 0]);
        assert_eq!(keyspace_prefix(1, key_mode), [b'r', 0, 0, 1]);
        assert_eq!(keyspace_prefix(0xFFFF, key_mode), [b'r', 0, 0xFF, 0xFF]);

        let key_mode = KeyMode::Txn;
        assert_eq!(keyspace_prefix(0, key_mode), [b'x', 0, 0, 0]);
        assert_eq!(keyspace_prefix(1, key_mode), [b'x', 0, 0, 1]);
        assert_eq!(keyspace_prefix(0xFFFF, key_mode), [b'x', 0, 0xFF, 0xFF]);
    }

    // Encoding with an enabled keyspace prefixes keys, range bounds, mutations
    // and lock fields.
    #[test]
    fn test_encode_version() {
        let keyspace = Keyspace::Enable {
            keyspace_id: 0xDEAD,
        };
        let key_mode = KeyMode::Raw;

        // Plain key.
        let key = Key::from(vec![0xBE, 0xEF]);
        let expected_key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
        assert_eq!(key.encode_keyspace(keyspace, key_mode), expected_key);

        // Range with both bounds set: each bound gets the prefix.
        let bound: BoundRange = (Key::from(vec![0xDE, 0xAD])..Key::from(vec![0xBE, 0xEF])).into();
        let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xDE, 0xAD])
            ..Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]))
            .into();
        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);

        // Fully unbounded range: covers exactly the keyspace, ending at the
        // prefix of the next keyspace id.
        let bound: BoundRange = (..).into();
        let expected_bound: BoundRange =
            (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into();
        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);

        // Empty keys as bounds are expected to encode to the same range as the
        // unbounded case.
        let bound: BoundRange = (Key::from(vec![])..Key::from(vec![])).into();
        let expected_bound: BoundRange =
            (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into();
        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);

        // Same expectation for an inclusive empty end key.
        let bound: BoundRange = (Key::from(vec![])..=Key::from(vec![])).into();
        let expected_bound: BoundRange =
            (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into();
        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);

        // Mutations: only the key changes, the value is preserved.
        let mutation = Mutation::Put(Key::from(vec![0xBE, 0xEF]), vec![4, 5, 6]);
        let expected_mutation = Mutation::Put(
            Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]),
            vec![4, 5, 6],
        );
        assert_eq!(
            mutation.encode_keyspace(keyspace, key_mode),
            expected_mutation
        );

        let mutation = Mutation::Delete(Key::from(vec![0xBE, 0xEF]));
        let expected_mutation = Mutation::Delete(Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]));
        assert_eq!(
            mutation.encode_keyspace(keyspace, key_mode),
            expected_mutation
        );

        // Lock info in transactional mode: key, primary lock and all secondaries
        // get the `x` prefix.
        let key_mode = KeyMode::Txn;
        let lock = crate::proto::kvrpcpb::LockInfo {
            key: vec![b'k', b'1'],
            primary_lock: vec![b'p', b'1'],
            secondaries: vec![vec![b's', b'1'], vec![b's', b'2']],
            ..Default::default()
        };
        let locks = vec![lock].encode_keyspace(keyspace, key_mode);
        assert_eq!(locks.len(), 1);
        assert_eq!(locks[0].key, vec![b'x', 0, 0xDE, 0xAD, b'k', b'1']);
        assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0xDE, 0xAD, b'p', b'1']);
        assert_eq!(
            locks[0].secondaries,
            vec![
                vec![b'x', 0, 0xDE, 0xAD, b's', b'1'],
                vec![b'x', 0, 0xDE, 0xAD, b's', b'2']
            ]
        );
    }

    // Truncation with an enabled keyspace removes the four prefix bytes,
    // whatever the mode byte is.
    #[test]
    fn test_truncate_version() {
        let keyspace = Keyspace::Enable {
            keyspace_id: 0xDEAD,
        };

        // Raw-mode prefix.
        let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
        let expected_key = Key::from(vec![0xBE, 0xEF]);
        assert_eq!(key.truncate_keyspace(keyspace), expected_key);

        // Txn-mode prefix.
        let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
        let expected_key = Key::from(vec![0xBE, 0xEF]);
        assert_eq!(key.truncate_keyspace(keyspace), expected_key);

        // Pair: the value is untouched.
        let pair = KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']);
        let expected_pair = KvPair(Key::from(vec![b'k']), vec![b'v']);
        assert_eq!(pair.truncate_keyspace(keyspace), expected_pair);

        // Single range.
        let range = Range {
            start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
            end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
        };
        let expected_range = Range {
            start: Key::from(vec![b'a']),
            end: Key::from(vec![b'b']),
        };
        assert_eq!(range.truncate_keyspace(keyspace), expected_range);

        // Vector of ranges.
        let ranges = vec![
            Range {
                start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
                end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
            },
            Range {
                start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'c']),
                end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'd']),
            },
        ];
        let expected_ranges = vec![
            Range {
                start: Key::from(vec![b'a']),
                end: Key::from(vec![b'b']),
            },
            Range {
                start: Key::from(vec![b'c']),
                end: Key::from(vec![b'd']),
            },
        ];
        assert_eq!(ranges.truncate_keyspace(keyspace), expected_ranges);

        // Vector of pairs.
        let pairs = vec![
            KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']),
            KvPair(
                Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k', b'2']),
                vec![b'v', b'2'],
            ),
        ];
        let expected_pairs = vec![
            KvPair(Key::from(vec![b'k']), vec![b'v']),
            KvPair(Key::from(vec![b'k', b'2']), vec![b'v', b'2']),
        ];
        assert_eq!(pairs.truncate_keyspace(keyspace), expected_pairs);

        // Lock info: key, primary lock and secondaries are all stripped.
        let lock = crate::proto::kvrpcpb::LockInfo {
            key: vec![b'x', 0, 0xDE, 0xAD, b'k'],
            primary_lock: vec![b'x', 0, 0xDE, 0xAD, b'p'],
            secondaries: vec![vec![b'x', 0, 0xDE, 0xAD, b's']],
            ..Default::default()
        };
        let expected_lock = crate::proto::kvrpcpb::LockInfo {
            key: vec![b'k'],
            primary_lock: vec![b'p'],
            secondaries: vec![vec![b's']],
            ..Default::default()
        };
        assert_eq!(vec![lock].truncate_keyspace(keyspace), vec![expected_lock]);
    }

    // `ApiV2NoPrefix` still reports API V2.
    #[test]
    fn test_apiv2_no_prefix_api_version() {
        assert_eq!(
            Keyspace::ApiV2NoPrefix.api_version(),
            kvrpcpb::ApiVersion::V2
        );
    }

    // With `ApiV2NoPrefix`, encoding must return every value unchanged, even
    // when the input already looks prefixed.
    #[test]
    fn test_apiv2_no_prefix_encode_is_noop() {
        let keyspace = Keyspace::ApiV2NoPrefix;
        let key_mode = KeyMode::Txn;

        let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
        assert_eq!(key.clone().encode_keyspace(keyspace, key_mode), key);

        let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
        assert_eq!(pair.clone().encode_keyspace(keyspace, key_mode), pair);

        let bound: BoundRange =
            (Key::from(vec![b'x', 0, 0, 0, b'a'])..Key::from(vec![b'x', 0, 0, 0, b'b'])).into();
        assert_eq!(bound.clone().encode_keyspace(keyspace, key_mode), bound);

        let mutation = Mutation::Put(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![1, 2, 3]);
        assert_eq!(
            mutation.clone().encode_keyspace(keyspace, key_mode),
            mutation
        );

        let lock = crate::proto::kvrpcpb::LockInfo {
            key: vec![b'x', 0, 0, 0, b'k'],
            primary_lock: vec![b'x', 0, 0, 0, b'p'],
            secondaries: vec![vec![b'x', 0, 0, 0, b's']],
            ..Default::default()
        };
        let locks = vec![lock];
        assert_eq!(locks.clone().encode_keyspace(keyspace, key_mode), locks);

        // `Keyspace::Disable` is likewise a no-op for lock info.
        let lock = crate::proto::kvrpcpb::LockInfo {
            key: vec![b'k', b'1'],
            primary_lock: vec![b'p', b'1'],
            secondaries: vec![vec![b's', b'1']],
            ..Default::default()
        };
        let locks = vec![lock.clone()];
        assert_eq!(
            locks.clone().encode_keyspace(Keyspace::Disable, key_mode),
            locks
        );
    }

    // With `ApiV2NoPrefix`, truncation must return every value unchanged, even
    // when the input starts with bytes that look like a keyspace prefix.
    #[test]
    fn test_apiv2_no_prefix_truncate_is_noop() {
        let keyspace = Keyspace::ApiV2NoPrefix;

        let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
        assert_eq!(key.clone().truncate_keyspace(keyspace), key);

        let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
        assert_eq!(pair.clone().truncate_keyspace(keyspace), pair);

        let range = Range {
            start: Key::from(vec![b'x', 0, 0, 0, b'a']),
            end: Key::from(vec![b'x', 0, 0, 0, b'b']),
        };
        assert_eq!(range.clone().truncate_keyspace(keyspace), range);

        let pairs = vec![pair];
        assert_eq!(pairs.clone().truncate_keyspace(keyspace), pairs);

        let lock = crate::proto::kvrpcpb::LockInfo {
            key: vec![b'x', 0, 0, 0, b'k'],
            primary_lock: vec![b'x', 0, 0, 0, b'p'],
            secondaries: vec![vec![b'x', 0, 0, 0, b's']],
            ..Default::default()
        };
        let locks = vec![lock];
        assert_eq!(locks.clone().truncate_keyspace(keyspace), locks);
    }
}