1use super::RawRpcRequest;
4use crate::collect_single;
5use crate::kv::KvPairTTL;
6use crate::pd::PdClient;
7use crate::proto::kvrpcpb;
8use crate::proto::metapb;
9use crate::proto::tikvpb::tikv_client::TikvClient;
10use crate::range_request;
11use crate::region::RegionWithLeader;
12use crate::request::plan::ResponseWithShard;
13use crate::request::CollectSingle;
14use crate::request::DefaultProcessor;
15use crate::request::KvRequest;
16use crate::request::Merge;
17use crate::request::Process;
18use crate::request::RangeRequest;
19use crate::request::Shardable;
20use crate::request::SingleKey;
21use crate::request::{Batchable, Collect};
22use crate::shardable_key;
23use crate::shardable_keys;
24use crate::shardable_range;
25use crate::store::region_stream_for_keys;
26use crate::store::region_stream_for_ranges;
27use crate::store::RegionStore;
28use crate::store::Request;
29use crate::transaction::HasLocks;
30use crate::util::iter::FlatMapOkIterExt;
31use crate::ColumnFamily;
32use crate::Key;
33use crate::KvPair;
34use crate::Result;
35use crate::Value;
36use async_trait::async_trait;
37use futures::stream::BoxStream;
38use futures::{stream, StreamExt};
39use std::any::Any;
40use std::ops::Range;
41use std::sync::Arc;
42use std::time::Duration;
43use tonic::transport::Channel;
44
45const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
48 let mut req = kvrpcpb::RawGetRequest::default();
49 req.key = key;
50 req.maybe_set_cf(cf);
51
52 req
53}
54
55impl KvRequest for kvrpcpb::RawGetRequest {
56 type Response = kvrpcpb::RawGetResponse;
57}
58
59shardable_key!(kvrpcpb::RawGetRequest);
60collect_single!(kvrpcpb::RawGetResponse);
61
62impl SingleKey for kvrpcpb::RawGetRequest {
63 fn key(&self) -> &Vec<u8> {
64 &self.key
65 }
66}
67
68impl Process<kvrpcpb::RawGetResponse> for DefaultProcessor {
69 type Out = Option<Value>;
70
71 fn process(&self, input: Result<kvrpcpb::RawGetResponse>) -> Result<Self::Out> {
72 let input = input?;
73 Ok(if input.not_found {
74 None
75 } else {
76 Some(input.value)
77 })
78 }
79}
80
81pub fn new_raw_batch_get_request(
82 keys: Vec<Vec<u8>>,
83 cf: Option<ColumnFamily>,
84) -> kvrpcpb::RawBatchGetRequest {
85 let mut req = kvrpcpb::RawBatchGetRequest::default();
86 req.keys = keys;
87 req.maybe_set_cf(cf);
88
89 req
90}
91
92impl KvRequest for kvrpcpb::RawBatchGetRequest {
93 type Response = kvrpcpb::RawBatchGetResponse;
94}
95
96shardable_keys!(kvrpcpb::RawBatchGetRequest);
97
98impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
99 type Out = Vec<KvPair>;
100
101 fn merge(&self, input: Vec<Result<kvrpcpb::RawBatchGetResponse>>) -> Result<Self::Out> {
102 input
103 .into_iter()
104 .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
105 .collect()
106 }
107}
108
109pub fn new_raw_get_key_ttl_request(
110 key: Vec<u8>,
111 cf: Option<ColumnFamily>,
112) -> kvrpcpb::RawGetKeyTtlRequest {
113 let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
114 req.key = key;
115 req.maybe_set_cf(cf);
116
117 req
118}
119
120impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
121 type Response = kvrpcpb::RawGetKeyTtlResponse;
122}
123
124shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
125collect_single!(kvrpcpb::RawGetKeyTtlResponse);
126
127impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
128 fn key(&self) -> &Vec<u8> {
129 &self.key
130 }
131}
132
133impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
134 type Out = Option<u64>;
135
136 fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
137 let input = input?;
138 Ok(if input.not_found {
139 None
140 } else {
141 Some(input.ttl)
142 })
143 }
144}
145
146pub fn new_raw_put_request(
147 key: Vec<u8>,
148 value: Vec<u8>,
149 ttl: u64,
150 cf: Option<ColumnFamily>,
151 atomic: bool,
152) -> kvrpcpb::RawPutRequest {
153 let mut req = kvrpcpb::RawPutRequest::default();
154 req.key = key;
155 req.value = value;
156 req.ttl = ttl;
157 req.maybe_set_cf(cf);
158 req.for_cas = atomic;
159
160 req
161}
162
163impl KvRequest for kvrpcpb::RawPutRequest {
164 type Response = kvrpcpb::RawPutResponse;
165}
166
167shardable_key!(kvrpcpb::RawPutRequest);
168collect_single!(kvrpcpb::RawPutResponse);
169impl SingleKey for kvrpcpb::RawPutRequest {
170 fn key(&self) -> &Vec<u8> {
171 &self.key
172 }
173}
174
175pub fn new_raw_batch_put_request(
176 pairs: Vec<kvrpcpb::KvPair>,
177 ttls: Vec<u64>,
178 cf: Option<ColumnFamily>,
179 atomic: bool,
180) -> kvrpcpb::RawBatchPutRequest {
181 let mut req = kvrpcpb::RawBatchPutRequest::default();
182 req.pairs = pairs;
183 req.ttls = ttls;
184 req.maybe_set_cf(cf);
185 req.for_cas = atomic;
186
187 req
188}
189
190impl KvRequest for kvrpcpb::RawBatchPutRequest {
191 type Response = kvrpcpb::RawBatchPutResponse;
192}
193
194impl Batchable for kvrpcpb::RawBatchPutRequest {
195 type Item = (kvrpcpb::KvPair, u64);
196
197 fn item_size(item: &Self::Item) -> u64 {
198 (item.0.key.len() + item.0.value.len()) as u64
199 }
200}
201
202impl Shardable for kvrpcpb::RawBatchPutRequest {
203 type Shard = Vec<(kvrpcpb::KvPair, u64)>;
204
205 fn shards(
206 &self,
207 pd_client: &Arc<impl PdClient>,
208 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
209 let kvs = self.pairs.clone();
210 let ttls = self.ttls.clone();
211 let mut kv_ttl: Vec<KvPairTTL> = kvs
212 .into_iter()
213 .zip(ttls)
214 .map(|(kv, ttl)| KvPairTTL(kv, ttl))
215 .collect();
216 kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
217 region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
218 .flat_map(|result| match result {
219 Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchPutRequest::batches(
220 keys,
221 RAW_KV_REQUEST_BATCH_SIZE,
222 ))
223 .map(move |batch| Ok((batch, region.clone())))
224 .boxed(),
225 Err(e) => stream::iter(Err(e)).boxed(),
226 })
227 .boxed()
228 }
229
230 fn apply_shard(&mut self, shard: Self::Shard) {
231 let (pairs, ttls) = shard.into_iter().unzip();
232 self.pairs = pairs;
233 self.ttls = ttls;
234 }
235
236 fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
237 where
238 Self: Sized + Clone,
239 {
240 let mut cloned = Self::default();
241 cloned.context = self.context.clone();
242 cloned.cf = self.cf.clone();
243 cloned.for_cas = self.for_cas;
244 cloned.apply_shard(shard);
245 cloned
246 }
247
248 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
249 self.set_leader(&store.region_with_leader)
250 }
251}
252
253pub fn new_raw_delete_request(
254 key: Vec<u8>,
255 cf: Option<ColumnFamily>,
256 atomic: bool,
257) -> kvrpcpb::RawDeleteRequest {
258 let mut req = kvrpcpb::RawDeleteRequest::default();
259 req.key = key;
260 req.maybe_set_cf(cf);
261 req.for_cas = atomic;
262
263 req
264}
265
266impl KvRequest for kvrpcpb::RawDeleteRequest {
267 type Response = kvrpcpb::RawDeleteResponse;
268}
269
270shardable_key!(kvrpcpb::RawDeleteRequest);
271collect_single!(kvrpcpb::RawDeleteResponse);
272impl SingleKey for kvrpcpb::RawDeleteRequest {
273 fn key(&self) -> &Vec<u8> {
274 &self.key
275 }
276}
277
278pub fn new_raw_batch_delete_request(
279 keys: Vec<Vec<u8>>,
280 cf: Option<ColumnFamily>,
281) -> kvrpcpb::RawBatchDeleteRequest {
282 let mut req = kvrpcpb::RawBatchDeleteRequest::default();
283 req.keys = keys;
284 req.maybe_set_cf(cf);
285
286 req
287}
288
289impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
290 type Response = kvrpcpb::RawBatchDeleteResponse;
291}
292
293impl Batchable for kvrpcpb::RawBatchDeleteRequest {
294 type Item = Vec<u8>;
295
296 fn item_size(item: &Self::Item) -> u64 {
297 item.len() as u64
298 }
299}
300
301impl Shardable for kvrpcpb::RawBatchDeleteRequest {
302 type Shard = Vec<Vec<u8>>;
303
304 fn shards(
305 &self,
306 pd_client: &Arc<impl PdClient>,
307 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
308 let mut keys = self.keys.clone();
309 keys.sort();
310 region_stream_for_keys(keys.into_iter(), pd_client.clone())
311 .flat_map(|result| match result {
312 Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchDeleteRequest::batches(
313 keys,
314 RAW_KV_REQUEST_BATCH_SIZE,
315 ))
316 .map(move |batch| Ok((batch, region.clone())))
317 .boxed(),
318 Err(e) => stream::iter(Err(e)).boxed(),
319 })
320 .boxed()
321 }
322
323 fn apply_shard(&mut self, shard: Self::Shard) {
324 self.keys = shard;
325 }
326
327 fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
328 where
329 Self: Sized + Clone,
330 {
331 let mut cloned = Self::default();
332 cloned.context = self.context.clone();
333 cloned.cf = self.cf.clone();
334 cloned.for_cas = self.for_cas;
335 cloned.apply_shard(shard);
336 cloned
337 }
338
339 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
340 self.set_leader(&store.region_with_leader)
341 }
342}
343
344pub fn new_raw_delete_range_request(
345 start_key: Vec<u8>,
346 end_key: Vec<u8>,
347 cf: Option<ColumnFamily>,
348) -> kvrpcpb::RawDeleteRangeRequest {
349 let mut req = kvrpcpb::RawDeleteRangeRequest::default();
350 req.start_key = start_key;
351 req.end_key = end_key;
352 req.maybe_set_cf(cf);
353
354 req
355}
356
357impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
358 type Response = kvrpcpb::RawDeleteRangeResponse;
359}
360
361range_request!(kvrpcpb::RawDeleteRangeRequest);
362shardable_range!(kvrpcpb::RawDeleteRangeRequest);
363
364pub fn new_raw_scan_request(
365 start_key: Vec<u8>,
366 end_key: Vec<u8>,
367 limit: u32,
368 key_only: bool,
369 reverse: bool,
370 cf: Option<ColumnFamily>,
371) -> kvrpcpb::RawScanRequest {
372 let mut req = kvrpcpb::RawScanRequest::default();
373 if !reverse {
374 req.start_key = start_key;
375 req.end_key = end_key;
376 } else {
377 req.start_key = end_key;
378 req.end_key = start_key;
379 }
380 req.limit = limit;
381 req.key_only = key_only;
382 req.reverse = reverse;
383 req.maybe_set_cf(cf);
384
385 req
386}
387
388impl KvRequest for kvrpcpb::RawScanRequest {
389 type Response = kvrpcpb::RawScanResponse;
390}
391
392range_request!(kvrpcpb::RawScanRequest);
393shardable_range!(kvrpcpb::RawScanRequest);
394
395impl Merge<kvrpcpb::RawScanResponse> for Collect {
396 type Out = Vec<KvPair>;
397
398 fn merge(&self, input: Vec<Result<kvrpcpb::RawScanResponse>>) -> Result<Self::Out> {
399 input
400 .into_iter()
401 .flat_map_ok(|resp| resp.kvs.into_iter().map(Into::into))
402 .collect()
403 }
404}
405
406pub fn new_raw_batch_scan_request(
407 ranges: Vec<kvrpcpb::KeyRange>,
408 each_limit: u32,
409 key_only: bool,
410 cf: Option<ColumnFamily>,
411) -> kvrpcpb::RawBatchScanRequest {
412 let mut req = kvrpcpb::RawBatchScanRequest::default();
413 req.ranges = ranges;
414 req.each_limit = each_limit;
415 req.key_only = key_only;
416 req.maybe_set_cf(cf);
417
418 req
419}
420
421impl KvRequest for kvrpcpb::RawBatchScanRequest {
422 type Response = kvrpcpb::RawBatchScanResponse;
423}
424
425impl Shardable for kvrpcpb::RawBatchScanRequest {
426 type Shard = Vec<kvrpcpb::KeyRange>;
427
428 fn shards(
429 &self,
430 pd_client: &Arc<impl PdClient>,
431 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
432 region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
433 }
434
435 fn apply_shard(&mut self, shard: Self::Shard) {
436 self.ranges = shard;
437 }
438
439 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
440 self.set_leader(&store.region_with_leader)
441 }
442}
443
444impl Merge<kvrpcpb::RawBatchScanResponse> for Collect {
445 type Out = Vec<KvPair>;
446
447 fn merge(&self, input: Vec<Result<kvrpcpb::RawBatchScanResponse>>) -> Result<Self::Out> {
448 input
449 .into_iter()
450 .flat_map_ok(|resp| resp.kvs.into_iter().map(Into::into))
451 .collect()
452 }
453}
454
455pub fn new_cas_request(
456 key: Vec<u8>,
457 value: Vec<u8>,
458 previous_value: Option<Vec<u8>>,
459 cf: Option<ColumnFamily>,
460) -> kvrpcpb::RawCasRequest {
461 let mut req = kvrpcpb::RawCasRequest::default();
462 req.key = key;
463 req.value = value;
464 match previous_value {
465 Some(v) => req.previous_value = v,
466 None => req.previous_not_exist = true,
467 }
468 req.maybe_set_cf(cf);
469 req
470}
471
472impl KvRequest for kvrpcpb::RawCasRequest {
473 type Response = kvrpcpb::RawCasResponse;
474}
475
476shardable_key!(kvrpcpb::RawCasRequest);
477collect_single!(kvrpcpb::RawCasResponse);
478impl SingleKey for kvrpcpb::RawCasRequest {
479 fn key(&self) -> &Vec<u8> {
480 &self.key
481 }
482}
483
484impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
485 type Out = (Option<Value>, bool); fn process(&self, input: Result<kvrpcpb::RawCasResponse>) -> Result<Self::Out> {
488 let input = input?;
489 if input.previous_not_exist {
490 Ok((None, input.succeed))
491 } else {
492 Ok((Some(input.previous_value), input.succeed))
493 }
494 }
495}
496
497type RawCoprocessorRequestDataBuilder =
498 Arc<dyn Fn(metapb::Region, Vec<kvrpcpb::KeyRange>) -> Vec<u8> + Send + Sync>;
499
500pub fn new_raw_coprocessor_request(
501 copr_name: String,
502 copr_version_req: String,
503 ranges: Vec<kvrpcpb::KeyRange>,
504 data_builder: RawCoprocessorRequestDataBuilder,
505) -> RawCoprocessorRequest {
506 let mut inner = kvrpcpb::RawCoprocessorRequest::default();
507 inner.copr_name = copr_name;
508 inner.copr_version_req = copr_version_req;
509 inner.ranges = ranges;
510 RawCoprocessorRequest {
511 inner,
512 data_builder,
513 }
514}
515
516#[derive(Clone)]
517pub struct RawCoprocessorRequest {
518 inner: kvrpcpb::RawCoprocessorRequest,
519 data_builder: RawCoprocessorRequestDataBuilder,
520}
521
522#[async_trait]
523impl Request for RawCoprocessorRequest {
524 async fn dispatch(
525 &self,
526 client: &TikvClient<Channel>,
527 timeout: Duration,
528 ) -> Result<Box<dyn Any>> {
529 self.inner.dispatch(client, timeout).await
530 }
531
532 fn label(&self) -> &'static str {
533 self.inner.label()
534 }
535
536 fn as_any(&self) -> &dyn Any {
537 self.inner.as_any()
538 }
539
540 fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
541 self.inner.set_leader(leader)
542 }
543
544 fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
545 self.inner.set_api_version(api_version);
546 }
547}
548
549impl KvRequest for RawCoprocessorRequest {
550 type Response = kvrpcpb::RawCoprocessorResponse;
551}
552
553impl Shardable for RawCoprocessorRequest {
554 type Shard = Vec<kvrpcpb::KeyRange>;
555
556 fn shards(
557 &self,
558 pd_client: &Arc<impl PdClient>,
559 ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
560 region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
561 }
562
563 fn apply_shard(&mut self, shard: Self::Shard) {
564 self.inner.ranges = shard;
565 }
566
567 fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
568 self.set_leader(&store.region_with_leader)?;
569 self.inner.data = (self.data_builder)(
570 store.region_with_leader.region.clone(),
571 self.inner.ranges.clone(),
572 );
573 Ok(())
574 }
575}
576
577#[allow(clippy::type_complexity)]
578impl
579 Process<Vec<Result<ResponseWithShard<kvrpcpb::RawCoprocessorResponse, Vec<kvrpcpb::KeyRange>>>>>
580 for DefaultProcessor
581{
582 type Out = Vec<(Vec<Range<Key>>, Vec<u8>)>;
583
584 fn process(
585 &self,
586 input: Result<
587 Vec<Result<ResponseWithShard<kvrpcpb::RawCoprocessorResponse, Vec<kvrpcpb::KeyRange>>>>,
588 >,
589 ) -> Result<Self::Out> {
590 input?
591 .into_iter()
592 .map(|shard_resp| {
593 shard_resp.map(|ResponseWithShard(resp, ranges)| {
594 (
595 ranges
596 .into_iter()
597 .map(|range| range.start_key.into()..range.end_key.into())
598 .collect(),
599 resp.data,
600 )
601 })
602 })
603 .collect::<Result<Vec<_>>>()
604 }
605}
606
/// Implements `RawRpcRequest` for `kvrpcpb::$request`, with `set_cf` storing
/// the given string in the request's `cf` field.
macro_rules! impl_raw_rpc_request {
    ($request:ident) => {
        impl RawRpcRequest for kvrpcpb::$request {
            fn set_cf(&mut self, cf: String) {
                self.cf = cf;
            }
        }
    };
}
616
617impl_raw_rpc_request!(RawGetRequest);
618impl_raw_rpc_request!(RawBatchGetRequest);
619impl_raw_rpc_request!(RawGetKeyTtlRequest);
620impl_raw_rpc_request!(RawPutRequest);
621impl_raw_rpc_request!(RawBatchPutRequest);
622impl_raw_rpc_request!(RawDeleteRequest);
623impl_raw_rpc_request!(RawBatchDeleteRequest);
624impl_raw_rpc_request!(RawScanRequest);
625impl_raw_rpc_request!(RawBatchScanRequest);
626impl_raw_rpc_request!(RawDeleteRangeRequest);
627impl_raw_rpc_request!(RawCasRequest);
628
629impl HasLocks for kvrpcpb::RawGetResponse {}
630
631impl HasLocks for kvrpcpb::RawBatchGetResponse {}
632
633impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
634
635impl HasLocks for kvrpcpb::RawPutResponse {}
636
637impl HasLocks for kvrpcpb::RawBatchPutResponse {}
638
639impl HasLocks for kvrpcpb::RawDeleteResponse {}
640
641impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
642
643impl HasLocks for kvrpcpb::RawScanResponse {}
644
645impl HasLocks for kvrpcpb::RawBatchScanResponse {}
646
647impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
648
649impl HasLocks for kvrpcpb::RawCasResponse {}
650
651impl HasLocks for kvrpcpb::RawCoprocessorResponse {}
652
#[cfg(test)]
mod test {
    use std::any::Any;
    use std::collections::HashMap;
    use std::ops::Deref;
    use std::sync::Mutex;

    use super::*;
    use crate::backoff::DEFAULT_REGION_BACKOFF;
    use crate::mock::MockKvClient;
    use crate::mock::MockPdClient;
    use crate::proto::kvrpcpb;
    use crate::request::Keyspace;
    use crate::request::Plan;

    /// Scans keys `[1, 50)` through the mock client and expects the merged
    /// result to hold 49 pairs.
    #[rstest::rstest]
    #[case(Keyspace::Disable)]
    #[case(Keyspace::Enable { keyspace_id: 0 })]
    #[tokio::test]
    async fn test_raw_scan(#[case] keyspace: Keyspace) {
        let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
            |req: &dyn Any| {
                let scan_req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap();
                assert!(scan_req.key_only);
                assert_eq!(scan_req.limit, 10);

                // Answer with one key-only pair per byte between the first
                // bytes of the shard's start and end keys.
                let mut resp = kvrpcpb::RawScanResponse::default();
                resp.kvs
                    .extend(
                        (scan_req.start_key[0]..scan_req.end_key[0]).map(|i| kvrpcpb::KvPair {
                            key: vec![i],
                            ..Default::default()
                        }),
                    );

                Ok(Box::new(resp) as Box<dyn Any>)
            },
        )));

        let start: Key = vec![1].into();
        let end: Key = vec![50].into();
        let request = kvrpcpb::RawScanRequest {
            start_key: start.into(),
            end_key: end.into(),
            limit: 10,
            key_only: true,
            ..Default::default()
        };
        let plan = crate::request::PlanBuilder::new(client, keyspace, request)
            .retry_multi_region(DEFAULT_REGION_BACKOFF)
            .merge(Collect)
            .plan();
        let scanned = plan.execute().await.unwrap();

        assert_eq!(scanned.len(), 49);
    }

    /// Sends a batch put and checks that the mock client receives exactly the
    /// expected pairs and TTLs, grouped per dispatched request.
    #[tokio::test]
    async fn test_raw_batch_put() -> Result<()> {
        let region1_kvs = vec![KvPair(vec![9].into(), vec![12])];
        let region1_ttls = vec![0];
        let region2_kvs = vec![
            KvPair(vec![11].into(), vec![12]),
            KvPair(b"FFF".to_vec().into(), vec![12]),
        ];
        let region2_ttls = vec![0, 1];

        let expected_map = HashMap::from([
            (region1_kvs.clone(), region1_ttls.clone()),
            (region2_kvs.clone(), region2_ttls.clone()),
        ]);

        let pairs: Vec<kvrpcpb::KvPair> = region1_kvs
            .into_iter()
            .chain(region2_kvs)
            .map(Into::into)
            .collect();
        let ttls: Vec<u64> = region1_ttls.into_iter().chain(region2_ttls).collect();
        let cf = ColumnFamily::Default;

        // Every dispatched request records its pairs and TTLs here.
        let actual_map: Arc<Mutex<HashMap<Vec<KvPair>, Vec<u64>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let recorder = actual_map.clone();
        let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
            move |req: &dyn Any| {
                let put_req: &kvrpcpb::RawBatchPutRequest = req.downcast_ref().unwrap();
                let sent_pairs = put_req
                    .pairs
                    .iter()
                    .cloned()
                    .map(Into::into)
                    .collect::<Vec<KvPair>>();
                let sent_ttls = put_req.ttls.clone();
                recorder.lock().unwrap().insert(sent_pairs, sent_ttls);
                Ok(Box::new(kvrpcpb::RawBatchPutResponse::default()) as Box<dyn Any>)
            },
        )));

        let batch_put_request =
            new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false);
        let keyspace = Keyspace::Enable { keyspace_id: 0 };
        let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request)
            .retry_multi_region(DEFAULT_REGION_BACKOFF)
            .plan();
        // The execution result is deliberately ignored; only the recorded
        // requests are asserted.
        let _ = plan.execute().await;
        assert_eq!(actual_map.lock().unwrap().deref(), &expected_map);
        Ok(())
    }
}