tikv_client/request/
shard.rs1use std::sync::Arc;
4
5use futures::stream::BoxStream;
6
7use super::plan::PreserveShard;
8use crate::pd::PdClient;
9use crate::region::RegionWithLeader;
10use crate::request::plan::CleanupLocks;
11use crate::request::Dispatch;
12use crate::request::KvRequest;
13use crate::request::Plan;
14use crate::request::ResolveLock;
15use crate::store::RegionStore;
16use crate::store::Request;
17use crate::Result;
18use std::fmt::Debug;
19
20macro_rules! impl_inner_shardable {
21 () => {
22 type Shard = P::Shard;
23
24 fn shards(
25 &self,
26 pd_client: &Arc<impl PdClient>,
27 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
28 self.inner.shards(pd_client)
29 }
30
31 fn apply_shard(&mut self, shard: Self::Shard) {
32 self.inner.apply_shard(shard);
33 }
34
35 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
36 self.inner.apply_store(store)
37 }
38 };
39}
40
41pub trait Shardable {
42 type Shard: Debug + Clone + Send + Sync;
43
44 fn shards(
45 &self,
46 pd_client: &Arc<impl PdClient>,
47 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>;
48
49 fn apply_shard(&mut self, shard: Self::Shard);
50
51 fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
53 where
54 Self: Sized + Clone,
55 {
56 let mut cloned = self.clone();
57 cloned.apply_shard(shard);
58 cloned
59 }
60
61 fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
62}
63
64pub trait Batchable {
65 type Item;
66
67 fn batches(items: Vec<Self::Item>, batch_size: u64) -> Vec<Vec<Self::Item>> {
68 let mut batches: Vec<Vec<Self::Item>> = Vec::new();
69 let mut batch: Vec<Self::Item> = Vec::new();
70 let mut size = 0;
71
72 for item in items {
73 let item_size = Self::item_size(&item);
74 if size + item_size >= batch_size && !batch.is_empty() {
75 batches.push(batch);
76 batch = Vec::new();
77 size = 0;
78 }
79 size += item_size;
80 batch.push(item);
81 }
82 if !batch.is_empty() {
83 batches.push(batch)
84 }
85 batches
86 }
87
88 fn item_size(item: &Self::Item) -> u64;
89}
90
91pub trait HasNextBatch {
94 fn has_next_batch(&self) -> Option<(Vec<u8>, Vec<u8>)>;
95}
96
97pub trait NextBatch {
99 fn next_batch(&mut self, _range: (Vec<u8>, Vec<u8>));
100}
101
102impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
103 type Shard = Req::Shard;
104
105 fn shards(
106 &self,
107 pd_client: &Arc<impl PdClient>,
108 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
109 self.request.shards(pd_client)
110 }
111
112 fn apply_shard(&mut self, shard: Self::Shard) {
113 self.request.apply_shard(shard);
114 }
115
116 fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
117 where
118 Self: Sized + Clone,
119 {
120 Dispatch {
121 request: self.request.clone_then_apply_shard(shard),
122 kv_client: self.kv_client.clone(),
123 }
124 }
125
126 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
127 self.kv_client = Some(store.client.clone());
128 self.request.apply_store(store)
129 }
130}
131
132impl<Req: KvRequest + NextBatch> NextBatch for Dispatch<Req> {
133 fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
134 self.request.next_batch(range);
135 }
136}
137
138impl<P: Plan + Shardable> Shardable for PreserveShard<P> {
139 type Shard = P::Shard;
140
141 fn shards(
142 &self,
143 pd_client: &Arc<impl PdClient>,
144 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
145 self.inner.shards(pd_client)
146 }
147
148 fn apply_shard(&mut self, shard: Self::Shard) {
149 self.shard = Some(shard.clone());
150 self.inner.apply_shard(shard)
151 }
152
153 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
154 self.inner.apply_store(store)
155 }
156}
157
158impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
159 impl_inner_shardable!();
160}
161
162impl<P: Plan + Shardable, PdC: PdClient> Shardable for CleanupLocks<P, PdC> {
163 type Shard = P::Shard;
164
165 fn shards(
166 &self,
167 pd_client: &Arc<impl PdClient>,
168 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
169 self.inner.shards(pd_client)
170 }
171
172 fn apply_shard(&mut self, shard: Self::Shard) {
173 self.inner.apply_shard(shard)
174 }
175
176 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
177 self.store = Some(store.clone());
178 self.inner.apply_store(store)
179 }
180}
181
182#[doc(hidden)]
183#[macro_export]
184macro_rules! shardable_key {
185 ($type_: ty) => {
186 impl Shardable for $type_ {
187 type Shard = Vec<Vec<u8>>;
188
189 fn shards(
190 &self,
191 pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
192 ) -> futures::stream::BoxStream<
193 'static,
194 $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
195 > {
196 $crate::store::region_stream_for_keys(
197 std::iter::once(self.key.clone()),
198 pd_client.clone(),
199 )
200 }
201
202 fn apply_shard(&mut self, mut shard: Self::Shard) {
203 assert!(shard.len() == 1);
204 self.key = shard.pop().unwrap();
205 }
206
207 fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
208 self.set_leader(&store.region_with_leader)
209 }
210 }
211 };
212}
213
214#[doc(hidden)]
215#[macro_export]
216macro_rules! shardable_keys {
217 ($type_: ty) => {
218 impl Shardable for $type_ {
219 type Shard = Vec<Vec<u8>>;
220
221 fn shards(
222 &self,
223 pd_client: &std::sync::Arc<impl $crate::pd::PdClient>,
224 ) -> futures::stream::BoxStream<
225 'static,
226 $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>,
227 > {
228 let mut keys = self.keys.clone();
229 keys.sort();
230 $crate::store::region_stream_for_keys(keys.into_iter(), pd_client.clone())
231 }
232
233 fn apply_shard(&mut self, shard: Self::Shard) {
234 self.keys = shard.into_iter().map(Into::into).collect();
235 }
236
237 fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
238 self.set_leader(&store.region_with_leader)
239 }
240 }
241 };
242}
243
244pub trait RangeRequest: Request {
245 fn is_reverse(&self) -> bool {
246 false
247 }
248}
249
250#[doc(hidden)]
251#[macro_export]
252macro_rules! range_request {
253 ($type_: ty) => {
254 impl RangeRequest for $type_ {}
255 };
256}
257
258#[doc(hidden)]
259#[macro_export]
260macro_rules! reversible_range_request {
261 ($type_: ty) => {
262 impl RangeRequest for $type_ {
263 fn is_reverse(&self) -> bool {
264 self.reverse
265 }
266 }
267 };
268}
269
270#[doc(hidden)]
271#[macro_export]
272macro_rules! shardable_range {
273 ($type_: ty) => {
274 impl Shardable for $type_ {
275 type Shard = (Vec<u8>, Vec<u8>);
276
277 fn shards(
278 &self,
279 pd_client: &Arc<impl $crate::pd::PdClient>,
280 ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::region::RegionWithLeader)>>
281 {
282 let mut start_key = self.start_key.clone().into();
283 let mut end_key = self.end_key.clone().into();
284 if self.is_reverse() {
287 std::mem::swap(&mut start_key, &mut end_key);
288 }
289 $crate::store::region_stream_for_range((start_key, end_key), pd_client.clone())
290 }
291
292 fn apply_shard(&mut self, shard: Self::Shard) {
293 self.start_key = shard.0;
296 self.end_key = shard.1;
297 if self.is_reverse() {
298 std::mem::swap(&mut self.start_key, &mut self.end_key);
299 }
300 }
301
302 fn apply_store(&mut self, store: &$crate::store::RegionStore) -> $crate::Result<()> {
303 self.set_leader(&store.region_with_leader)
304 }
305 }
306 };
307}
308
309#[cfg(test)]
310mod test {
311 use rand::thread_rng;
312 use rand::Rng;
313
314 use super::Batchable;
315
316 #[test]
317 fn test_batches() {
318 let mut rng = thread_rng();
319
320 let items: Vec<_> = (0..3)
321 .map(|_| (0..2).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
322 .collect();
323
324 let batch_size = 5;
325
326 let batches = BatchableTest::batches(items.clone(), batch_size);
327
328 assert_eq!(batches.len(), 2);
329 assert_eq!(batches[0].len(), 2);
330 assert_eq!(batches[1].len(), 1);
331 assert_eq!(batches[0][0], items[0]);
332 assert_eq!(batches[0][1], items[1]);
333 assert_eq!(batches[1][0], items[2]);
334 }
335
336 #[test]
337 fn test_batches_big_item() {
338 let mut rng = thread_rng();
339
340 let items: Vec<_> = (0..3)
341 .map(|_| (0..3).map(|_| rng.gen::<u8>()).collect::<Vec<_>>())
342 .collect();
343
344 let batch_size = 2;
345
346 let batches = BatchableTest::batches(items.clone(), batch_size);
347
348 assert_eq!(batches.len(), 3);
349 for i in 0..items.len() {
350 let batch = &batches[i];
351 assert_eq!(batch.len(), 1);
352 assert_eq!(batch[0], items[i]);
353 }
354 }
355
356 struct BatchableTest;
357
358 impl Batchable for BatchableTest {
359 type Item = Vec<u8>;
360
361 fn item_size(item: &Self::Item) -> u64 {
362 item.len() as u64
363 }
364 }
365}