Skip to main content

tikv_client/request/
keyspace.rs

1// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::ops::{Bound, Range};
4
5use serde_derive::{Deserialize, Serialize};
6
7use crate::transaction::Mutation;
8use crate::{proto::kvrpcpb, Key};
9use crate::{BoundRange, KvPair};
10
11pub const RAW_KEY_PREFIX: u8 = b'r';
12pub const TXN_KEY_PREFIX: u8 = b'x';
13pub const KEYSPACE_PREFIX_LEN: usize = 4;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16#[non_exhaustive]
17pub enum Keyspace {
18    Disable,
19    Enable {
20        keyspace_id: u32,
21    },
22    /// Use API V2 without adding or removing the API V2 keyspace/key-mode prefix.
23    ///
24    /// This mode is intended for **server-side embedding** use cases (e.g. embedding this client in
25    /// `tikv-server`) where keys are already in API V2 "logical key bytes" form and must be passed
26    /// through unchanged.
27    ApiV2NoPrefix,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
31pub enum KeyMode {
32    Raw,
33    Txn,
34}
35
36impl Keyspace {
37    pub fn api_version(&self) -> kvrpcpb::ApiVersion {
38        match self {
39            Keyspace::Disable => kvrpcpb::ApiVersion::V1,
40            Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2,
41            Keyspace::ApiV2NoPrefix => kvrpcpb::ApiVersion::V2,
42        }
43    }
44}
45
46pub trait EncodeKeyspace {
47    fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self;
48}
49
50pub trait TruncateKeyspace {
51    fn truncate_keyspace(self, keyspace: Keyspace) -> Self;
52}
53
54impl EncodeKeyspace for Key {
55    fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
56        let Keyspace::Enable { keyspace_id } = keyspace else {
57            return self;
58        };
59        let prefix = keyspace_prefix(keyspace_id, key_mode);
60
61        prepend_bytes(&mut self.0, &prefix);
62
63        self
64    }
65}
66
67impl EncodeKeyspace for KvPair {
68    fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
69        self.0 = self.0.encode_keyspace(keyspace, key_mode);
70        self
71    }
72}
73
74impl EncodeKeyspace for BoundRange {
75    fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
76        self.from = match self.from {
77            Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)),
78            Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)),
79            Bound::Unbounded => Bound::Included(Key::EMPTY.encode_keyspace(keyspace, key_mode)),
80        };
81
82        self.to = match self.to {
83            Bound::Included(key) if !key.is_empty() => {
84                Bound::Included(key.encode_keyspace(keyspace, key_mode))
85            }
86            Bound::Excluded(key) if !key.is_empty() => {
87                Bound::Excluded(key.encode_keyspace(keyspace, key_mode))
88            }
89            _ => match keyspace {
90                Keyspace::Enable { keyspace_id } => Bound::Excluded(Key::EMPTY.encode_keyspace(
91                    Keyspace::Enable {
92                        keyspace_id: keyspace_id + 1,
93                    },
94                    key_mode,
95                )),
96                _ => Bound::Excluded(Key::EMPTY),
97            },
98        };
99        self
100    }
101}
102
103impl EncodeKeyspace for Mutation {
104    fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
105        match self {
106            Mutation::Put(key, val) => Mutation::Put(key.encode_keyspace(keyspace, key_mode), val),
107            Mutation::Delete(key) => Mutation::Delete(key.encode_keyspace(keyspace, key_mode)),
108        }
109    }
110}
111
112impl TruncateKeyspace for Key {
113    fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
114        if !matches!(keyspace, Keyspace::Enable { .. }) {
115            return self;
116        }
117
118        pretruncate_bytes::<KEYSPACE_PREFIX_LEN>(&mut self.0);
119
120        self
121    }
122}
123
124impl TruncateKeyspace for KvPair {
125    fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
126        self.0 = self.0.truncate_keyspace(keyspace);
127        self
128    }
129}
130
131impl TruncateKeyspace for Range<Key> {
132    fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
133        self.start = self.start.truncate_keyspace(keyspace);
134        self.end = self.end.truncate_keyspace(keyspace);
135        self
136    }
137}
138
139impl TruncateKeyspace for Vec<Range<Key>> {
140    fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
141        if !matches!(keyspace, Keyspace::Enable { .. }) {
142            return self;
143        }
144        for range in &mut self {
145            take_mut::take(range, |range| range.truncate_keyspace(keyspace));
146        }
147        self
148    }
149}
150
151impl TruncateKeyspace for Vec<KvPair> {
152    fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
153        if !matches!(keyspace, Keyspace::Enable { .. }) {
154            return self;
155        }
156        for pair in &mut self {
157            take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace));
158        }
159        self
160    }
161}
162
163impl TruncateKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
164    fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self {
165        if !matches!(keyspace, Keyspace::Enable { .. }) {
166            return self;
167        }
168        for lock in &mut self {
169            take_mut::take(&mut lock.key, |key| {
170                Key::from(key).truncate_keyspace(keyspace).into()
171            });
172            take_mut::take(&mut lock.primary_lock, |primary| {
173                Key::from(primary).truncate_keyspace(keyspace).into()
174            });
175            for secondary in lock.secondaries.iter_mut() {
176                take_mut::take(secondary, |secondary| {
177                    Key::from(secondary).truncate_keyspace(keyspace).into()
178                });
179            }
180        }
181        self
182    }
183}
184
185impl EncodeKeyspace for Vec<crate::proto::kvrpcpb::LockInfo> {
186    fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self {
187        if !matches!(keyspace, Keyspace::Enable { .. }) {
188            return self;
189        }
190        for lock in &mut self {
191            take_mut::take(&mut lock.key, |key| {
192                Key::from(key).encode_keyspace(keyspace, key_mode).into()
193            });
194            take_mut::take(&mut lock.primary_lock, |primary| {
195                Key::from(primary)
196                    .encode_keyspace(keyspace, key_mode)
197                    .into()
198            });
199            for secondary in lock.secondaries.iter_mut() {
200                take_mut::take(secondary, |secondary| {
201                    Key::from(secondary)
202                        .encode_keyspace(keyspace, key_mode)
203                        .into()
204                });
205            }
206        }
207        self
208    }
209}
210
211fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] {
212    let mut prefix = keyspace_id.to_be_bytes();
213    prefix[0] = match key_mode {
214        KeyMode::Raw => RAW_KEY_PREFIX,
215        KeyMode::Txn => TXN_KEY_PREFIX,
216    };
217    prefix
218}
219
220fn prepend_bytes<const N: usize>(vec: &mut Vec<u8>, prefix: &[u8; N]) {
221    unsafe {
222        vec.reserve_exact(N);
223        std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().add(N), vec.len());
224        std::ptr::copy_nonoverlapping(prefix.as_ptr(), vec.as_mut_ptr(), N);
225        vec.set_len(vec.len() + N);
226    }
227}
228
229fn pretruncate_bytes<const N: usize>(vec: &mut Vec<u8>) {
230    assert!(vec.len() >= N);
231    unsafe {
232        std::ptr::copy(vec.as_ptr().add(N), vec.as_mut_ptr(), vec.len() - N);
233        vec.set_len(vec.len() - N);
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240
241    #[test]
242    fn test_keyspace_prefix() {
243        let key_mode = KeyMode::Raw;
244        assert_eq!(keyspace_prefix(0, key_mode), [b'r', 0, 0, 0]);
245        assert_eq!(keyspace_prefix(1, key_mode), [b'r', 0, 0, 1]);
246        assert_eq!(keyspace_prefix(0xFFFF, key_mode), [b'r', 0, 0xFF, 0xFF]);
247
248        let key_mode = KeyMode::Txn;
249        assert_eq!(keyspace_prefix(0, key_mode), [b'x', 0, 0, 0]);
250        assert_eq!(keyspace_prefix(1, key_mode), [b'x', 0, 0, 1]);
251        assert_eq!(keyspace_prefix(0xFFFF, key_mode), [b'x', 0, 0xFF, 0xFF]);
252    }
253
254    #[test]
255    fn test_encode_version() {
256        let keyspace = Keyspace::Enable {
257            keyspace_id: 0xDEAD,
258        };
259        let key_mode = KeyMode::Raw;
260
261        let key = Key::from(vec![0xBE, 0xEF]);
262        let expected_key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
263        assert_eq!(key.encode_keyspace(keyspace, key_mode), expected_key);
264
265        let bound: BoundRange = (Key::from(vec![0xDE, 0xAD])..Key::from(vec![0xBE, 0xEF])).into();
266        let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xDE, 0xAD])
267            ..Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]))
268            .into();
269        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);
270
271        let bound: BoundRange = (..).into();
272        let expected_bound: BoundRange =
273            (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into();
274        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);
275
276        let bound: BoundRange = (Key::from(vec![])..Key::from(vec![])).into();
277        let expected_bound: BoundRange =
278            (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into();
279        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);
280
281        let bound: BoundRange = (Key::from(vec![])..=Key::from(vec![])).into();
282        let expected_bound: BoundRange =
283            (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into();
284        assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound);
285
286        let mutation = Mutation::Put(Key::from(vec![0xBE, 0xEF]), vec![4, 5, 6]);
287        let expected_mutation = Mutation::Put(
288            Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]),
289            vec![4, 5, 6],
290        );
291        assert_eq!(
292            mutation.encode_keyspace(keyspace, key_mode),
293            expected_mutation
294        );
295
296        let mutation = Mutation::Delete(Key::from(vec![0xBE, 0xEF]));
297        let expected_mutation = Mutation::Delete(Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]));
298        assert_eq!(
299            mutation.encode_keyspace(keyspace, key_mode),
300            expected_mutation
301        );
302
303        let key_mode = KeyMode::Txn;
304        let lock = crate::proto::kvrpcpb::LockInfo {
305            key: vec![b'k', b'1'],
306            primary_lock: vec![b'p', b'1'],
307            secondaries: vec![vec![b's', b'1'], vec![b's', b'2']],
308            ..Default::default()
309        };
310        let locks = vec![lock].encode_keyspace(keyspace, key_mode);
311        assert_eq!(locks.len(), 1);
312        assert_eq!(locks[0].key, vec![b'x', 0, 0xDE, 0xAD, b'k', b'1']);
313        assert_eq!(locks[0].primary_lock, vec![b'x', 0, 0xDE, 0xAD, b'p', b'1']);
314        assert_eq!(
315            locks[0].secondaries,
316            vec![
317                vec![b'x', 0, 0xDE, 0xAD, b's', b'1'],
318                vec![b'x', 0, 0xDE, 0xAD, b's', b'2']
319            ]
320        );
321    }
322
323    #[test]
324    fn test_truncate_version() {
325        let keyspace = Keyspace::Enable {
326            keyspace_id: 0xDEAD,
327        };
328
329        let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
330        let expected_key = Key::from(vec![0xBE, 0xEF]);
331        assert_eq!(key.truncate_keyspace(keyspace), expected_key);
332
333        let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]);
334        let expected_key = Key::from(vec![0xBE, 0xEF]);
335        assert_eq!(key.truncate_keyspace(keyspace), expected_key);
336
337        let pair = KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']);
338        let expected_pair = KvPair(Key::from(vec![b'k']), vec![b'v']);
339        assert_eq!(pair.truncate_keyspace(keyspace), expected_pair);
340
341        let range = Range {
342            start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
343            end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
344        };
345        let expected_range = Range {
346            start: Key::from(vec![b'a']),
347            end: Key::from(vec![b'b']),
348        };
349        assert_eq!(range.truncate_keyspace(keyspace), expected_range);
350
351        let ranges = vec![
352            Range {
353                start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'a']),
354                end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'b']),
355            },
356            Range {
357                start: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'c']),
358                end: Key::from(vec![b'x', 0, 0xDE, 0xAD, b'd']),
359            },
360        ];
361        let expected_ranges = vec![
362            Range {
363                start: Key::from(vec![b'a']),
364                end: Key::from(vec![b'b']),
365            },
366            Range {
367                start: Key::from(vec![b'c']),
368                end: Key::from(vec![b'd']),
369            },
370        ];
371        assert_eq!(ranges.truncate_keyspace(keyspace), expected_ranges);
372
373        let pairs = vec![
374            KvPair(Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k']), vec![b'v']),
375            KvPair(
376                Key::from(vec![b'x', 0, 0xDE, 0xAD, b'k', b'2']),
377                vec![b'v', b'2'],
378            ),
379        ];
380        let expected_pairs = vec![
381            KvPair(Key::from(vec![b'k']), vec![b'v']),
382            KvPair(Key::from(vec![b'k', b'2']), vec![b'v', b'2']),
383        ];
384        assert_eq!(pairs.truncate_keyspace(keyspace), expected_pairs);
385
386        let lock = crate::proto::kvrpcpb::LockInfo {
387            key: vec![b'x', 0, 0xDE, 0xAD, b'k'],
388            primary_lock: vec![b'x', 0, 0xDE, 0xAD, b'p'],
389            secondaries: vec![vec![b'x', 0, 0xDE, 0xAD, b's']],
390            ..Default::default()
391        };
392        let expected_lock = crate::proto::kvrpcpb::LockInfo {
393            key: vec![b'k'],
394            primary_lock: vec![b'p'],
395            secondaries: vec![vec![b's']],
396            ..Default::default()
397        };
398        assert_eq!(vec![lock].truncate_keyspace(keyspace), vec![expected_lock]);
399    }
400
401    #[test]
402    fn test_apiv2_no_prefix_api_version() {
403        assert_eq!(
404            Keyspace::ApiV2NoPrefix.api_version(),
405            kvrpcpb::ApiVersion::V2
406        );
407    }
408
409    #[test]
410    fn test_apiv2_no_prefix_encode_is_noop() {
411        let keyspace = Keyspace::ApiV2NoPrefix;
412        let key_mode = KeyMode::Txn;
413
414        let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
415        assert_eq!(key.clone().encode_keyspace(keyspace, key_mode), key);
416
417        let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
418        assert_eq!(pair.clone().encode_keyspace(keyspace, key_mode), pair);
419
420        let bound: BoundRange =
421            (Key::from(vec![b'x', 0, 0, 0, b'a'])..Key::from(vec![b'x', 0, 0, 0, b'b'])).into();
422        assert_eq!(bound.clone().encode_keyspace(keyspace, key_mode), bound);
423
424        let mutation = Mutation::Put(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![1, 2, 3]);
425        assert_eq!(
426            mutation.clone().encode_keyspace(keyspace, key_mode),
427            mutation
428        );
429
430        let lock = crate::proto::kvrpcpb::LockInfo {
431            key: vec![b'x', 0, 0, 0, b'k'],
432            primary_lock: vec![b'x', 0, 0, 0, b'p'],
433            secondaries: vec![vec![b'x', 0, 0, 0, b's']],
434            ..Default::default()
435        };
436        let locks = vec![lock];
437        assert_eq!(locks.clone().encode_keyspace(keyspace, key_mode), locks);
438
439        let lock = crate::proto::kvrpcpb::LockInfo {
440            key: vec![b'k', b'1'],
441            primary_lock: vec![b'p', b'1'],
442            secondaries: vec![vec![b's', b'1']],
443            ..Default::default()
444        };
445        let locks = vec![lock.clone()];
446        assert_eq!(
447            locks.clone().encode_keyspace(Keyspace::Disable, key_mode),
448            locks
449        );
450    }
451
452    #[test]
453    fn test_apiv2_no_prefix_truncate_is_noop() {
454        let keyspace = Keyspace::ApiV2NoPrefix;
455
456        let key = Key::from(vec![b'x', 0, 0, 0, b'k']);
457        assert_eq!(key.clone().truncate_keyspace(keyspace), key);
458
459        let pair = KvPair(Key::from(vec![b'x', 0, 0, 0, b'k']), vec![b'v']);
460        assert_eq!(pair.clone().truncate_keyspace(keyspace), pair);
461
462        let range = Range {
463            start: Key::from(vec![b'x', 0, 0, 0, b'a']),
464            end: Key::from(vec![b'x', 0, 0, 0, b'b']),
465        };
466        assert_eq!(range.clone().truncate_keyspace(keyspace), range);
467
468        let pairs = vec![pair];
469        assert_eq!(pairs.clone().truncate_keyspace(keyspace), pairs);
470
471        let lock = crate::proto::kvrpcpb::LockInfo {
472            key: vec![b'x', 0, 0, 0, b'k'],
473            primary_lock: vec![b'x', 0, 0, 0, b'p'],
474            secondaries: vec![vec![b'x', 0, 0, 0, b's']],
475            ..Default::default()
476        };
477        let locks = vec![lock];
478        assert_eq!(locks.clone().truncate_keyspace(keyspace), locks);
479    }
480}