Skip to main content

tikv_client/request/
shard.rs

1// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::sync::Arc;
4
5use futures::stream::BoxStream;
6
7use super::plan::PreserveShard;
8use crate::pd::PdClient;
9use crate::region::RegionWithLeader;
10use crate::request::plan::CleanupLocks;
11use crate::request::Dispatch;
12use crate::request::KvRequest;
13use crate::request::Plan;
14use crate::request::ResolveLock;
15use crate::store::RegionStore;
16use crate::store::Request;
17use crate::Result;
18use std::fmt::Debug;
19
// Expands to the associated type and methods of a `Shardable` impl that simply forwards
// everything to `self.inner`.
//
// The surrounding `impl` must have a type parameter named `P` (with `P: Shardable`) and the
// implementing type must expose an `inner` field that implements `Shardable` with the same
// shard type.
macro_rules! impl_inner_shardable {
    () => {
        // The wrapper shards exactly like the plan it wraps.
        type Shard = <P as Shardable>::Shard;

        // Forwarded as-is: the wrapper adds nothing to shard discovery.
        fn shards(
            &self,
            pd_client: &Arc<impl PdClient>,
        ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
            self.inner.shards(pd_client)
        }

        // Forwarded as-is: only the inner plan needs to know about the shard.
        fn apply_shard(&mut self, shard: Self::Shard) {
            self.inner.apply_shard(shard)
        }

        // Forwarded as-is, including any error reported by the inner plan.
        fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
            self.inner.apply_store(store)
        }
    };
}
40
41pub trait Shardable {
42    type Shard: Debug + Clone + Send + Sync;
43
44    fn shards(
45        &self,
46        pd_client: &Arc<impl PdClient>,
47    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>;
48
49    fn apply_shard(&mut self, shard: Self::Shard);
50
51    /// Implementation can skip unnecessary fields clone if fields will be overwritten by `apply_shard`.
52    fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
53    where
54        Self: Sized + Clone,
55    {
56        let mut cloned = self.clone();
57        cloned.apply_shard(shard);
58        cloned
59    }
60
61    fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
62}
63
/// Groups a list of items into consecutive batches bounded by an accumulated size.
pub trait Batchable {
    /// The element type being grouped.
    type Item;

    /// Splits `items`, preserving their order, into consecutive non-empty batches.
    ///
    /// The running size of a batch is the sum of [`Batchable::item_size`] over its items.
    /// The current batch is closed right before an item that would bring that running size
    /// to `batch_size` or beyond, so a batch holding several items always totals strictly
    /// less than `batch_size`. No item is ever dropped: an item whose own size is already
    /// `batch_size` or more is placed in a batch by itself.
    ///
    /// Returns an empty vector when `items` is empty.
    fn batches(items: Vec<Self::Item>, batch_size: u64) -> Vec<Vec<Self::Item>> {
        let mut batches: Vec<Vec<Self::Item>> = Vec::new();
        let mut batch: Vec<Self::Item> = Vec::new();
        let mut size: u64 = 0;

        for item in items {
            let item_size = Self::item_size(&item);
            // Saturate instead of overflowing: a sum that does not fit in `u64` is certainly
            // "too large", so it must close the current batch rather than panic (debug
            // builds) or wrap around to a small value (release builds).
            if size.saturating_add(item_size) >= batch_size && !batch.is_empty() {
                batches.push(std::mem::take(&mut batch));
                size = 0;
            }
            // Cannot overflow: either the batch was just reset (`size == 0`), it was empty
            // to begin with (`size == 0`), or the checked sum above was below `batch_size`.
            size += item_size;
            batch.push(item);
        }
        // Flush whatever is left; skipped for empty input so no empty batch is emitted.
        if !batch.is_empty() {
            batches.push(batch);
        }
        batches
    }

    /// Returns the size that `item` contributes towards a batch's limit.
    fn item_size(item: &Self::Item) -> u64;
}
90
/// Used to iterate within a region for scan requests that have a batch size limit.
///
/// `HasNextBatch` is used to get the next batch according to the previous response.
pub trait HasNextBatch {
    /// Returns the pair of byte strings describing the next batch.
    ///
    /// NOTE(review): presumably `None` means there is nothing further to fetch and the pair
    /// is a key range; confirm against the implementors.
    fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)>;
}
96
/// `NextBatch` is used to change the start key of a request using the result of
/// `has_next_batch`.
pub trait NextBatch {
    /// Moves the request on to the batch described by `_range`.
    fn next_batch(&mut self, _range: (Vec<u8>, Vec<u8>));
}
101
102impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
103    type Shard = Req::Shard;
104
105    fn shards(
106        &self,
107        pd_client: &Arc<impl PdClient>,
108    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
109        self.request.shards(pd_client)
110    }
111
112    fn apply_shard(&mut self, shard: Self::Shard) {
113        self.request.apply_shard(shard);
114    }
115
116    fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
117    where
118        Self: Sized + Clone,
119    {
120        Dispatch {
121            request: self.request.clone_then_apply_shard(shard),
122            kv_client: self.kv_client.clone(),
123        }
124    }
125
126    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
127        self.kv_client = Some(store.client.clone());
128        self.request.apply_store(store)
129    }
130}
131
132impl<Req: KvRequest + NextBatch> NextBatch for Dispatch<Req> {
133    fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
134        self.request.next_batch(range);
135    }
136}
137
138impl<P: Plan + Shardable> Shardable for PreserveShard<P> {
139    type Shard = P::Shard;
140
141    fn shards(
142        &self,
143        pd_client: &Arc<impl PdClient>,
144    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
145        self.inner.shards(pd_client)
146    }
147
148    fn apply_shard(&mut self, shard: Self::Shard) {
149        self.shard = Some(shard.clone());
150        self.inner.apply_shard(shard)
151    }
152
153    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
154        self.inner.apply_store(store)
155    }
156}
157
158impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
159    impl_inner_shardable!();
160}
161
162impl<P: Plan + Shardable, PdC: PdClient> Shardable for CleanupLocks<P, PdC> {
163    type Shard = P::Shard;
164
165    fn shards(
166        &self,
167        pd_client: &Arc<impl PdClient>,
168    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
169        self.inner.shards(pd_client)
170    }
171
172    fn apply_shard(&mut self, shard: Self::Shard) {
173        self.inner.apply_shard(shard)
174    }
175
176    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
177        self.store = Some(store.clone());
178        self.inner.apply_store(store)
179    }
180}
181
// Implements `Shardable` for a request type that addresses a single key through a `key`
// field and provides a `set_leader` method. `Shardable` must be in scope at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! shardable_key {
    ($request: ty) => {
        impl Shardable for $request {
            type Shard = Vec<Vec<u8>>;

            // Looks up the region for the request's one key.
            fn shards(
                &self,
                pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
            ) -> futures::stream::BoxStream<
                'static,
                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
            > {
                let single_key = std::iter::once(self.key.clone());
                $crate::store::region_stream_for_keys(single_key, pd_client.clone())
            }

            // A single-key request must receive exactly one key back; anything else is a
            // bug, hence the assertion.
            fn apply_shard(&mut self, mut shard: Self::Shard) {
                assert!(shard.len() == 1);
                let key = shard.pop().unwrap();
                self.key = key;
            }

            // Points the request at the leader of the store's region.
            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
                let region = &store.region_with_leader;
                self.set_leader(region)
            }
        }
    };
}
213
// Implements `Shardable` for a request type that addresses several keys through a `keys`
// field and provides a `set_leader` method. `Shardable` must be in scope at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! shardable_keys {
    ($request: ty) => {
        impl Shardable for $request {
            type Shard = Vec<Vec<u8>>;

            // Looks up regions for a sorted copy of the keys; the request's own `keys`
            // field is left in its original order.
            fn shards(
                &self,
                pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
            ) -> futures::stream::BoxStream<
                'static,
                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
            > {
                let sorted_keys = {
                    let mut copy = self.keys.clone();
                    copy.sort();
                    copy
                };
                $crate::store::region_stream_for_keys(sorted_keys.into_iter(), pd_client.clone())
            }

            // Replaces the request's keys with the ones in `shard`, converting each into
            // the field's element type.
            fn apply_shard(&mut self, shard: Self::Shard) {
                self.keys = shard.into_iter().map(|key| key.into()).collect();
            }

            // Points the request at the leader of the store's region.
            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
                let region = &store.region_with_leader;
                self.set_leader(region)
            }
        }
    };
}
243
244pub trait RangeRequest: Request {
245    fn is_reverse(&self) -> bool {
246        false
247    }
248}
249
// Marks a request type as a `RangeRequest` that keeps the trait's default `is_reverse`.
// `RangeRequest` must be in scope at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! range_request {
    ($request: ty) => {
        impl RangeRequest for $request {}
    };
}
257
// Marks a request type as a `RangeRequest` whose direction is given by its `reverse` field.
// `RangeRequest` must be in scope at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! reversible_range_request {
    ($request: ty) => {
        impl RangeRequest for $request {
            // The request itself records whether it is reversed.
            fn is_reverse(&self) -> bool {
                self.reverse
            }
        }
    };
}
269
// Implements `Shardable` for a range request type that has `start_key` and `end_key`
// fields and provides `is_reverse` and `set_leader` methods. `Shardable` must be in scope
// at the call site.
//
// `Arc` and `BoxStream` are spelled with their full paths, like in `shardable_key!` and
// `shardable_keys!`, so that the expansion does not depend on the call site importing them.
#[doc(hidden)]
#[macro_export]
macro_rules! shardable_range {
    ($type_: ty) => {
        impl Shardable for $type_ {
            type Shard = (Vec<u8>, Vec<u8>);

            // Looks up the regions covering the request's range.
            fn shards(
                &self,
                pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
            ) -> futures::stream::BoxStream<
                'static,
                $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
            > {
                let mut start_key = self.start_key.clone().into();
                let mut end_key = self.end_key.clone().into();
                // In a reverse range request the range means [end_key, start_key), i.e.
                // end_key <= x < start_key. The two keys therefore have to be swapped
                // before fetching the regions from PD.
                if self.is_reverse() {
                    std::mem::swap(&mut start_key, &mut end_key);
                }
                $crate::store::region_stream_for_range((start_key, end_key), pd_client.clone())
            }

            // Narrows the request to the range in `shard`.
            fn apply_shard(&mut self, shard: Self::Shard) {
                // In a reverse range request the range means [end_key, start_key), i.e.
                // end_key <= x < start_key. The keys obtained from PD therefore have to be
                // swapped when they are assigned to the request.
                self.start_key = shard.0;
                self.end_key = shard.1;
                if self.is_reverse() {
                    std::mem::swap(&mut self.start_key, &mut self.end_key);
                }
            }

            // Points the request at the leader of the store's region.
            fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
                self.set_leader(&store.region_with_leader)
            }
        }
    };
}
308
#[cfg(test)]
mod test {
    use rand::thread_rng;
    use rand::Rng;

    use super::Batchable;

    /// Minimal `Batchable` whose items are byte vectors sized by their length.
    struct BatchableTest;

    impl Batchable for BatchableTest {
        type Item = Vec<u8>;

        fn item_size(item: &Self::Item) -> u64 {
            item.len() as u64
        }
    }

    /// Builds `count` vectors holding `len` random bytes each.
    fn random_items(count: usize, len: usize) -> Vec<Vec<u8>> {
        let mut rng = thread_rng();
        (0..count)
            .map(|_| (0..len).map(|_| rng.gen::<u8>()).collect())
            .collect()
    }

    /// Three 2-byte items with a limit of 5: the first two share a batch, the third spills.
    #[test]
    fn test_batches() {
        let items = random_items(3, 2);

        let batches = BatchableTest::batches(items.clone(), 5);

        assert_eq!(batches.len(), 2);
        let (first, second) = (&batches[0], &batches[1]);
        assert_eq!(first.len(), 2);
        assert_eq!(second.len(), 1);
        assert_eq!(first[0], items[0]);
        assert_eq!(first[1], items[1]);
        assert_eq!(second[0], items[2]);
    }

    /// Items larger than the limit are kept, one per batch, in their original order.
    #[test]
    fn test_batches_big_item() {
        let items = random_items(3, 3);

        let batches = BatchableTest::batches(items.clone(), 2);

        assert_eq!(batches.len(), 3);
        for (batch, item) in batches.iter().zip(&items) {
            assert_eq!(batch.len(), 1);
            assert_eq!(&batch[0], item);
        }
    }
}