Skip to main content

tikv_client/raw/
requests.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use super::RawRpcRequest;
4use crate::collect_single;
5use crate::kv::KvPairTTL;
6use crate::pd::PdClient;
7use crate::proto::kvrpcpb;
8use crate::proto::metapb;
9use crate::proto::tikvpb::tikv_client::TikvClient;
10use crate::range_request;
11use crate::region::RegionWithLeader;
12use crate::request::plan::ResponseWithShard;
13use crate::request::CollectSingle;
14use crate::request::DefaultProcessor;
15use crate::request::KvRequest;
16use crate::request::Merge;
17use crate::request::Process;
18use crate::request::RangeRequest;
19use crate::request::Shardable;
20use crate::request::SingleKey;
21use crate::request::{Batchable, Collect};
22use crate::shardable_key;
23use crate::shardable_keys;
24use crate::shardable_range;
25use crate::store::region_stream_for_keys;
26use crate::store::region_stream_for_ranges;
27use crate::store::RegionStore;
28use crate::store::Request;
29use crate::transaction::HasLocks;
30use crate::util::iter::FlatMapOkIterExt;
31use crate::ColumnFamily;
32use crate::Key;
33use crate::KvPair;
34use crate::Result;
35use crate::Value;
36use async_trait::async_trait;
37use futures::stream::BoxStream;
38use futures::{stream, StreamExt};
39use std::any::Any;
40use std::ops::Range;
41use std::sync::Arc;
42use std::time::Duration;
43use tonic::transport::Channel;
44
/// Size budget handed to `Batchable::batches` when a per-region group of
/// raw batch-put / batch-delete items is split into smaller requests.
/// Item sizes are measured in bytes (see the `item_size` impls below).
const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB
46
47pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
48    let mut req = kvrpcpb::RawGetRequest::default();
49    req.key = key;
50    req.maybe_set_cf(cf);
51
52    req
53}
54
// Request-pipeline wiring for `RawGetRequest`: the response type it pairs
// with, followed by the macro-generated impls (presumably key-based sharding
// and single-response collection; see `shardable_key!` / `collect_single!`).
impl KvRequest for kvrpcpb::RawGetRequest {
    type Response = kvrpcpb::RawGetResponse;
}

shardable_key!(kvrpcpb::RawGetRequest);
collect_single!(kvrpcpb::RawGetResponse);

impl SingleKey for kvrpcpb::RawGetRequest {
    /// Returns the key this request reads.
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
67
68impl Process<kvrpcpb::RawGetResponse> for DefaultProcessor {
69    type Out = Option<Value>;
70
71    fn process(&self, input: Result<kvrpcpb::RawGetResponse>) -> Result<Self::Out> {
72        let input = input?;
73        Ok(if input.not_found {
74            None
75        } else {
76            Some(input.value)
77        })
78    }
79}
80
81pub fn new_raw_batch_get_request(
82    keys: Vec<Vec<u8>>,
83    cf: Option<ColumnFamily>,
84) -> kvrpcpb::RawBatchGetRequest {
85    let mut req = kvrpcpb::RawBatchGetRequest::default();
86    req.keys = keys;
87    req.maybe_set_cf(cf);
88
89    req
90}
91
// Request-pipeline wiring for `RawBatchGetRequest`: its response type and the
// macro-generated impl (presumably multi-key sharding; see `shardable_keys!`).
impl KvRequest for kvrpcpb::RawBatchGetRequest {
    type Response = kvrpcpb::RawBatchGetResponse;
}

shardable_keys!(kvrpcpb::RawBatchGetRequest);
97
98impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
99    type Out = Vec<KvPair>;
100
101    fn merge(&self, input: Vec<Result<kvrpcpb::RawBatchGetResponse>>) -> Result<Self::Out> {
102        input
103            .into_iter()
104            .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
105            .collect()
106    }
107}
108
109pub fn new_raw_get_key_ttl_request(
110    key: Vec<u8>,
111    cf: Option<ColumnFamily>,
112) -> kvrpcpb::RawGetKeyTtlRequest {
113    let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
114    req.key = key;
115    req.maybe_set_cf(cf);
116
117    req
118}
119
// Request-pipeline wiring for `RawGetKeyTtlRequest`: its response type and the
// macro-generated impls (presumably key-based sharding and single-response
// collection; see `shardable_key!` / `collect_single!`).
impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
    type Response = kvrpcpb::RawGetKeyTtlResponse;
}

shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
collect_single!(kvrpcpb::RawGetKeyTtlResponse);

impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
    /// Returns the key whose TTL is being queried.
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
132
133impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
134    type Out = Option<u64>;
135
136    fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
137        let input = input?;
138        Ok(if input.not_found {
139            None
140        } else {
141            Some(input.ttl)
142        })
143    }
144}
145
146pub fn new_raw_put_request(
147    key: Vec<u8>,
148    value: Vec<u8>,
149    ttl: u64,
150    cf: Option<ColumnFamily>,
151    atomic: bool,
152) -> kvrpcpb::RawPutRequest {
153    let mut req = kvrpcpb::RawPutRequest::default();
154    req.key = key;
155    req.value = value;
156    req.ttl = ttl;
157    req.maybe_set_cf(cf);
158    req.for_cas = atomic;
159
160    req
161}
162
// Request-pipeline wiring for `RawPutRequest`: its response type and the
// macro-generated impls (presumably key-based sharding and single-response
// collection; see `shardable_key!` / `collect_single!`).
impl KvRequest for kvrpcpb::RawPutRequest {
    type Response = kvrpcpb::RawPutResponse;
}

shardable_key!(kvrpcpb::RawPutRequest);
collect_single!(kvrpcpb::RawPutResponse);
impl SingleKey for kvrpcpb::RawPutRequest {
    /// Returns the key this request writes.
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
174
175pub fn new_raw_batch_put_request(
176    pairs: Vec<kvrpcpb::KvPair>,
177    ttls: Vec<u64>,
178    cf: Option<ColumnFamily>,
179    atomic: bool,
180) -> kvrpcpb::RawBatchPutRequest {
181    let mut req = kvrpcpb::RawBatchPutRequest::default();
182    req.pairs = pairs;
183    req.ttls = ttls;
184    req.maybe_set_cf(cf);
185    req.for_cas = atomic;
186
187    req
188}
189
190impl KvRequest for kvrpcpb::RawBatchPutRequest {
191    type Response = kvrpcpb::RawBatchPutResponse;
192}
193
194impl Batchable for kvrpcpb::RawBatchPutRequest {
195    type Item = (kvrpcpb::KvPair, u64);
196
197    fn item_size(item: &Self::Item) -> u64 {
198        (item.0.key.len() + item.0.value.len()) as u64
199    }
200}
201
202impl Shardable for kvrpcpb::RawBatchPutRequest {
203    type Shard = Vec<(kvrpcpb::KvPair, u64)>;
204
205    fn shards(
206        &self,
207        pd_client: &Arc<impl PdClient>,
208    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
209        let kvs = self.pairs.clone();
210        let ttls = self.ttls.clone();
211        let mut kv_ttl: Vec<KvPairTTL> = kvs
212            .into_iter()
213            .zip(ttls)
214            .map(|(kv, ttl)| KvPairTTL(kv, ttl))
215            .collect();
216        kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
217        region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
218            .flat_map(|result| match result {
219                Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchPutRequest::batches(
220                    keys,
221                    RAW_KV_REQUEST_BATCH_SIZE,
222                ))
223                .map(move |batch| Ok((batch, region.clone())))
224                .boxed(),
225                Err(e) => stream::iter(Err(e)).boxed(),
226            })
227            .boxed()
228    }
229
230    fn apply_shard(&mut self, shard: Self::Shard) {
231        let (pairs, ttls) = shard.into_iter().unzip();
232        self.pairs = pairs;
233        self.ttls = ttls;
234    }
235
236    fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
237    where
238        Self: Sized + Clone,
239    {
240        let mut cloned = Self::default();
241        cloned.context = self.context.clone();
242        cloned.cf = self.cf.clone();
243        cloned.for_cas = self.for_cas;
244        cloned.apply_shard(shard);
245        cloned
246    }
247
248    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
249        self.set_leader(&store.region_with_leader)
250    }
251}
252
253pub fn new_raw_delete_request(
254    key: Vec<u8>,
255    cf: Option<ColumnFamily>,
256    atomic: bool,
257) -> kvrpcpb::RawDeleteRequest {
258    let mut req = kvrpcpb::RawDeleteRequest::default();
259    req.key = key;
260    req.maybe_set_cf(cf);
261    req.for_cas = atomic;
262
263    req
264}
265
// Request-pipeline wiring for `RawDeleteRequest`: its response type and the
// macro-generated impls (presumably key-based sharding and single-response
// collection; see `shardable_key!` / `collect_single!`).
impl KvRequest for kvrpcpb::RawDeleteRequest {
    type Response = kvrpcpb::RawDeleteResponse;
}

shardable_key!(kvrpcpb::RawDeleteRequest);
collect_single!(kvrpcpb::RawDeleteResponse);
impl SingleKey for kvrpcpb::RawDeleteRequest {
    /// Returns the key this request deletes.
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
277
278pub fn new_raw_batch_delete_request(
279    keys: Vec<Vec<u8>>,
280    cf: Option<ColumnFamily>,
281) -> kvrpcpb::RawBatchDeleteRequest {
282    let mut req = kvrpcpb::RawBatchDeleteRequest::default();
283    req.keys = keys;
284    req.maybe_set_cf(cf);
285
286    req
287}
288
/// A raw batch-delete is answered with a `RawBatchDeleteResponse`.
impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
    type Response = kvrpcpb::RawBatchDeleteResponse;
}

/// Batching unit for raw batch-delete: a single key.
impl Batchable for kvrpcpb::RawBatchDeleteRequest {
    type Item = Vec<u8>;

    /// Size of an item is the key's length in bytes.
    fn item_size(item: &Self::Item) -> u64 {
        item.len() as u64
    }
}
300
301impl Shardable for kvrpcpb::RawBatchDeleteRequest {
302    type Shard = Vec<Vec<u8>>;
303
304    fn shards(
305        &self,
306        pd_client: &Arc<impl PdClient>,
307    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
308        let mut keys = self.keys.clone();
309        keys.sort();
310        region_stream_for_keys(keys.into_iter(), pd_client.clone())
311            .flat_map(|result| match result {
312                Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchDeleteRequest::batches(
313                    keys,
314                    RAW_KV_REQUEST_BATCH_SIZE,
315                ))
316                .map(move |batch| Ok((batch, region.clone())))
317                .boxed(),
318                Err(e) => stream::iter(Err(e)).boxed(),
319            })
320            .boxed()
321    }
322
323    fn apply_shard(&mut self, shard: Self::Shard) {
324        self.keys = shard;
325    }
326
327    fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
328    where
329        Self: Sized + Clone,
330    {
331        let mut cloned = Self::default();
332        cloned.context = self.context.clone();
333        cloned.cf = self.cf.clone();
334        cloned.for_cas = self.for_cas;
335        cloned.apply_shard(shard);
336        cloned
337    }
338
339    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
340        self.set_leader(&store.region_with_leader)
341    }
342}
343
344pub fn new_raw_delete_range_request(
345    start_key: Vec<u8>,
346    end_key: Vec<u8>,
347    cf: Option<ColumnFamily>,
348) -> kvrpcpb::RawDeleteRangeRequest {
349    let mut req = kvrpcpb::RawDeleteRangeRequest::default();
350    req.start_key = start_key;
351    req.end_key = end_key;
352    req.maybe_set_cf(cf);
353
354    req
355}
356
// Request-pipeline wiring for `RawDeleteRangeRequest`: its response type and
// the macro-generated range impls (see `range_request!` / `shardable_range!`).
impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
    type Response = kvrpcpb::RawDeleteRangeResponse;
}

range_request!(kvrpcpb::RawDeleteRangeRequest);
shardable_range!(kvrpcpb::RawDeleteRangeRequest);
363
364pub fn new_raw_scan_request(
365    start_key: Vec<u8>,
366    end_key: Vec<u8>,
367    limit: u32,
368    key_only: bool,
369    reverse: bool,
370    cf: Option<ColumnFamily>,
371) -> kvrpcpb::RawScanRequest {
372    let mut req = kvrpcpb::RawScanRequest::default();
373    if !reverse {
374        req.start_key = start_key;
375        req.end_key = end_key;
376    } else {
377        req.start_key = end_key;
378        req.end_key = start_key;
379    }
380    req.limit = limit;
381    req.key_only = key_only;
382    req.reverse = reverse;
383    req.maybe_set_cf(cf);
384
385    req
386}
387
// Request-pipeline wiring for `RawScanRequest`: its response type and the
// macro-generated range impls (see `range_request!` / `shardable_range!`).
impl KvRequest for kvrpcpb::RawScanRequest {
    type Response = kvrpcpb::RawScanResponse;
}

range_request!(kvrpcpb::RawScanRequest);
shardable_range!(kvrpcpb::RawScanRequest);
394
395impl Merge<kvrpcpb::RawScanResponse> for Collect {
396    type Out = Vec<KvPair>;
397
398    fn merge(&self, input: Vec<Result<kvrpcpb::RawScanResponse>>) -> Result<Self::Out> {
399        input
400            .into_iter()
401            .flat_map_ok(|resp| resp.kvs.into_iter().map(Into::into))
402            .collect()
403    }
404}
405
406pub fn new_raw_batch_scan_request(
407    ranges: Vec<kvrpcpb::KeyRange>,
408    each_limit: u32,
409    key_only: bool,
410    cf: Option<ColumnFamily>,
411) -> kvrpcpb::RawBatchScanRequest {
412    let mut req = kvrpcpb::RawBatchScanRequest::default();
413    req.ranges = ranges;
414    req.each_limit = each_limit;
415    req.key_only = key_only;
416    req.maybe_set_cf(cf);
417
418    req
419}
420
/// A raw batch-scan is answered with a `RawBatchScanResponse`.
impl KvRequest for kvrpcpb::RawBatchScanRequest {
    type Response = kvrpcpb::RawBatchScanResponse;
}

/// Shards a raw batch-scan by key range.
impl Shardable for kvrpcpb::RawBatchScanRequest {
    type Shard = Vec<kvrpcpb::KeyRange>;

    /// Delegates to `region_stream_for_ranges` with a copy of the request's
    /// ranges (presumably grouping them per region; see that helper).
    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
    }

    /// Replaces the request's `ranges` with the shard's ranges.
    fn apply_shard(&mut self, shard: Self::Shard) {
        self.ranges = shard;
    }

    /// Points the request at the leader of the store's region.
    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        self.set_leader(&store.region_with_leader)
    }
}
443
444impl Merge<kvrpcpb::RawBatchScanResponse> for Collect {
445    type Out = Vec<KvPair>;
446
447    fn merge(&self, input: Vec<Result<kvrpcpb::RawBatchScanResponse>>) -> Result<Self::Out> {
448        input
449            .into_iter()
450            .flat_map_ok(|resp| resp.kvs.into_iter().map(Into::into))
451            .collect()
452    }
453}
454
455pub fn new_cas_request(
456    key: Vec<u8>,
457    value: Vec<u8>,
458    previous_value: Option<Vec<u8>>,
459    cf: Option<ColumnFamily>,
460) -> kvrpcpb::RawCasRequest {
461    let mut req = kvrpcpb::RawCasRequest::default();
462    req.key = key;
463    req.value = value;
464    match previous_value {
465        Some(v) => req.previous_value = v,
466        None => req.previous_not_exist = true,
467    }
468    req.maybe_set_cf(cf);
469    req
470}
471
// Request-pipeline wiring for `RawCasRequest`: its response type and the
// macro-generated impls (presumably key-based sharding and single-response
// collection; see `shardable_key!` / `collect_single!`).
impl KvRequest for kvrpcpb::RawCasRequest {
    type Response = kvrpcpb::RawCasResponse;
}

shardable_key!(kvrpcpb::RawCasRequest);
collect_single!(kvrpcpb::RawCasResponse);
impl SingleKey for kvrpcpb::RawCasRequest {
    /// Returns the key this compare-and-swap targets.
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
483
484impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
485    type Out = (Option<Value>, bool); // (previous_value, swapped)
486
487    fn process(&self, input: Result<kvrpcpb::RawCasResponse>) -> Result<Self::Out> {
488        let input = input?;
489        if input.previous_not_exist {
490            Ok((None, input.succeed))
491        } else {
492            Ok((Some(input.previous_value), input.succeed))
493        }
494    }
495}
496
/// Shared callback that, given a region and the key ranges assigned to it,
/// returns the bytes stored in the coprocessor request's `data` field (it is
/// invoked from `RawCoprocessorRequest::apply_store`).
type RawCoprocessorRequestDataBuilder =
    Arc<dyn Fn(metapb::Region, Vec<kvrpcpb::KeyRange>) -> Vec<u8> + Send + Sync>;
499
500pub fn new_raw_coprocessor_request(
501    copr_name: String,
502    copr_version_req: String,
503    ranges: Vec<kvrpcpb::KeyRange>,
504    data_builder: RawCoprocessorRequestDataBuilder,
505) -> RawCoprocessorRequest {
506    let mut inner = kvrpcpb::RawCoprocessorRequest::default();
507    inner.copr_name = copr_name;
508    inner.copr_version_req = copr_version_req;
509    inner.ranges = ranges;
510    RawCoprocessorRequest {
511        inner,
512        data_builder,
513    }
514}
515
/// A protobuf raw-coprocessor request bundled with the callback used to fill
/// in its `data` payload once the target region is known.
#[derive(Clone)]
pub struct RawCoprocessorRequest {
    // The protobuf request that is actually dispatched.
    inner: kvrpcpb::RawCoprocessorRequest,
    // Produces `inner.data` from a region and that region's key ranges.
    data_builder: RawCoprocessorRequestDataBuilder,
}
521
/// `Request` implementation that forwards every call to the wrapped protobuf
/// request in `inner`.
#[async_trait]
impl Request for RawCoprocessorRequest {
    /// Dispatches the wrapped request through `client` with the given timeout.
    async fn dispatch(
        &self,
        client: &TikvClient<Channel>,
        timeout: Duration,
    ) -> Result<Box<dyn Any>> {
        self.inner.dispatch(client, timeout).await
    }

    /// Returns the wrapped request's label.
    fn label(&self) -> &'static str {
        self.inner.label()
    }

    /// Exposes the wrapped protobuf request (not the wrapper) as `Any`.
    fn as_any(&self) -> &dyn Any {
        self.inner.as_any()
    }

    /// Sets the leader on the wrapped request.
    fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
        self.inner.set_leader(leader)
    }

    /// Sets the API version on the wrapped request.
    fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
        self.inner.set_api_version(api_version);
    }
}
548
549impl KvRequest for RawCoprocessorRequest {
550    type Response = kvrpcpb::RawCoprocessorResponse;
551}
552
553impl Shardable for RawCoprocessorRequest {
554    type Shard = Vec<kvrpcpb::KeyRange>;
555
556    fn shards(
557        &self,
558        pd_client: &Arc<impl PdClient>,
559    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
560        region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
561    }
562
563    fn apply_shard(&mut self, shard: Self::Shard) {
564        self.inner.ranges = shard;
565    }
566
567    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
568        self.set_leader(&store.region_with_leader)?;
569        self.inner.data = (self.data_builder)(
570            store.region_with_leader.region.clone(),
571            self.inner.ranges.clone(),
572        );
573        Ok(())
574    }
575}
576
577#[allow(clippy::type_complexity)]
578impl
579    Process<Vec<Result<ResponseWithShard<kvrpcpb::RawCoprocessorResponse, Vec<kvrpcpb::KeyRange>>>>>
580    for DefaultProcessor
581{
582    type Out = Vec<(Vec<Range<Key>>, Vec<u8>)>;
583
584    fn process(
585        &self,
586        input: Result<
587            Vec<Result<ResponseWithShard<kvrpcpb::RawCoprocessorResponse, Vec<kvrpcpb::KeyRange>>>>,
588        >,
589    ) -> Result<Self::Out> {
590        input?
591            .into_iter()
592            .map(|shard_resp| {
593                shard_resp.map(|ResponseWithShard(resp, ranges)| {
594                    (
595                        ranges
596                            .into_iter()
597                            .map(|range| range.start_key.into()..range.end_key.into())
598                            .collect(),
599                        resp.data,
600                    )
601                })
602            })
603            .collect::<Result<Vec<_>>>()
604    }
605}
606
// Implements `RawRpcRequest` for `kvrpcpb::$name`; `set_cf` stores the given
// column-family name in the request's `cf` field.
macro_rules! impl_raw_rpc_request {
    ($name: ident) => {
        impl RawRpcRequest for kvrpcpb::$name {
            fn set_cf(&mut self, cf: String) {
                self.cf = cf;
            }
        }
    };
}

// Every raw request type that carries a `cf` field.
impl_raw_rpc_request!(RawGetRequest);
impl_raw_rpc_request!(RawBatchGetRequest);
impl_raw_rpc_request!(RawGetKeyTtlRequest);
impl_raw_rpc_request!(RawPutRequest);
impl_raw_rpc_request!(RawBatchPutRequest);
impl_raw_rpc_request!(RawDeleteRequest);
impl_raw_rpc_request!(RawBatchDeleteRequest);
impl_raw_rpc_request!(RawScanRequest);
impl_raw_rpc_request!(RawBatchScanRequest);
impl_raw_rpc_request!(RawDeleteRangeRequest);
impl_raw_rpc_request!(RawCasRequest);
628
// All raw responses rely on the default `HasLocks` behaviour (the impl bodies
// are empty) — presumably because raw KV operations carry no transactional
// locks; confirm against the trait definition.
impl HasLocks for kvrpcpb::RawGetResponse {}

impl HasLocks for kvrpcpb::RawBatchGetResponse {}

impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}

impl HasLocks for kvrpcpb::RawPutResponse {}

impl HasLocks for kvrpcpb::RawBatchPutResponse {}

impl HasLocks for kvrpcpb::RawDeleteResponse {}

impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}

impl HasLocks for kvrpcpb::RawScanResponse {}

impl HasLocks for kvrpcpb::RawBatchScanResponse {}

impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}

impl HasLocks for kvrpcpb::RawCasResponse {}

impl HasLocks for kvrpcpb::RawCoprocessorResponse {}
652
#[cfg(test)]
mod test {
    use std::any::Any;
    use std::collections::HashMap;
    use std::ops::Deref;
    use std::sync::Mutex;

    use super::*;
    use crate::backoff::DEFAULT_REGION_BACKOFF;
    use crate::mock::MockKvClient;
    use crate::mock::MockPdClient;
    use crate::proto::kvrpcpb;
    use crate::request::Keyspace;
    use crate::request::Plan;

    /// Scans `[1, 50)` against a mock store that answers each request with one
    /// key per byte value between the request's first start/end key bytes,
    /// then checks the number of merged pairs.
    #[rstest::rstest]
    #[case(Keyspace::Disable)]
    #[case(Keyspace::Enable { keyspace_id: 0 })]
    #[tokio::test]
    async fn test_raw_scan(#[case] keyspace: Keyspace) {
        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
            |raw: &dyn Any| {
                let scan_req: &kvrpcpb::RawScanRequest = raw.downcast_ref().unwrap();
                assert!(scan_req.key_only);
                assert_eq!(scan_req.limit, 10);

                let first = scan_req.start_key[0];
                let last_exclusive = scan_req.end_key[0];
                let resp = kvrpcpb::RawScanResponse {
                    kvs: (first..last_exclusive)
                        .map(|byte| kvrpcpb::KvPair {
                            key: vec![byte],
                            ..Default::default()
                        })
                        .collect(),
                    ..Default::default()
                };

                Ok(Box::new(resp) as Box<dyn Any>)
            },
        )));

        let lower: Key = vec![1].into();
        let upper: Key = vec![50].into();
        let request = kvrpcpb::RawScanRequest {
            start_key: lower.into(),
            end_key: upper.into(),
            limit: 10,
            key_only: true,
            ..Default::default()
        };
        let plan = crate::request::PlanBuilder::new(pd_client, keyspace, request)
            .retry_multi_region(DEFAULT_REGION_BACKOFF)
            .merge(Collect)
            .plan();
        let merged = plan.execute().await.unwrap();

        assert_eq!(merged.len(), 49);
        // FIXME test the keys returned.
    }

    /// Sends a batch put whose keys fall into two groups and checks that the
    /// mock store receives exactly those two (pairs, ttls) requests.
    #[tokio::test]
    async fn test_raw_batch_put() -> Result<()> {
        let region1_kvs = vec![KvPair(vec![9].into(), vec![12])];
        let region1_ttls = vec![0];
        let region2_kvs = vec![
            KvPair(vec![11].into(), vec![12]),
            KvPair(b"FFF".to_vec().into(), vec![12]),
        ];
        let region2_ttls = vec![0, 1];

        let expected_map = HashMap::from([
            (region1_kvs.clone(), region1_ttls.clone()),
            (region2_kvs.clone(), region2_ttls.clone()),
        ]);

        let pairs: Vec<kvrpcpb::KvPair> = region1_kvs
            .into_iter()
            .chain(region2_kvs)
            .map(Into::into)
            .collect();
        let ttls = [region1_ttls, region2_ttls].concat();
        let cf = ColumnFamily::Default;

        // Records every (pairs, ttls) combination the mock store is asked to write.
        let actual_map: Arc<Mutex<HashMap<Vec<KvPair>, Vec<u64>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let recorder = actual_map.clone();
        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
            move |raw: &dyn Any| {
                let put_req: &kvrpcpb::RawBatchPutRequest = raw.downcast_ref().unwrap();
                let seen_pairs: Vec<KvPair> =
                    put_req.pairs.iter().cloned().map(Into::into).collect();
                let seen_ttls = put_req.ttls.clone();
                recorder.lock().unwrap().insert(seen_pairs, seen_ttls);
                Ok(Box::new(kvrpcpb::RawBatchPutResponse::default()) as Box<dyn Any>)
            },
        )));

        let batch_put_request =
            new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false);
        let keyspace = Keyspace::Enable { keyspace_id: 0 };
        let plan = crate::request::PlanBuilder::new(pd_client, keyspace, batch_put_request)
            .retry_multi_region(DEFAULT_REGION_BACKOFF)
            .plan();
        // NOTE(review): the execution result is discarded; only the requests
        // observed by the mock store are asserted on.
        let _ = plan.execute().await;
        assert_eq!(actual_map.lock().unwrap().deref(), &expected_map);
        Ok(())
    }
}
763}