tikv_client/raw/client.rs
1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2use core::ops::Range;
3
4use std::str::FromStr;
5use std::sync::Arc;
6
7use log::debug;
8use tokio::time::sleep;
9
10use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
11use crate::common::Error;
12use crate::config::Config;
13use crate::pd::PdClient;
14use crate::pd::PdRpcClient;
15use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
16use crate::proto::metapb;
17use crate::raw::lowering::*;
18use crate::request::CollectSingle;
19use crate::request::EncodeKeyspace;
20use crate::request::KeyMode;
21use crate::request::Keyspace;
22use crate::request::Plan;
23use crate::request::TruncateKeyspace;
24use crate::request::{plan, Collect};
25use crate::store::{HasRegionError, RegionStore};
26use crate::Backoff;
27use crate::BoundRange;
28use crate::ColumnFamily;
29use crate::Error::RegionError;
30use crate::Key;
31use crate::KvPair;
32use crate::Result;
33use crate::Value;
34
35const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
36
37/// The TiKV raw `Client` is used to interact with TiKV using raw requests.
38///
39/// Raw requests don't need a wrapping transaction.
40/// Each request is immediately processed once executed.
41///
42/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
43/// awaited to execute.
44pub struct Client<PdC: PdClient = PdRpcClient> {
45 rpc: Arc<PdC>,
46 cf: Option<ColumnFamily>,
47 backoff: Backoff,
48 /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
49 atomic: bool,
50 keyspace: Keyspace,
51}
52
53impl Clone for Client {
54 fn clone(&self) -> Self {
55 Self {
56 rpc: self.rpc.clone(),
57 cf: self.cf.clone(),
58 backoff: self.backoff.clone(),
59 atomic: self.atomic,
60 keyspace: self.keyspace,
61 }
62 }
63}
64
65impl Client<PdRpcClient> {
66 /// Create a raw [`Client`] and connect to the TiKV cluster.
67 ///
68 /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
69 /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
70 /// (include all endpoints, if possible), this helps avoid having a single point of failure.
71 ///
72 /// # Examples
73 ///
74 /// ```rust,no_run
75 /// # use tikv_client::RawClient;
76 /// # use futures::prelude::*;
77 /// # futures::executor::block_on(async {
78 /// let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
79 /// # });
80 /// ```
81 pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Self> {
82 Self::new_with_config(pd_endpoints, Config::default()).await
83 }
84
85 /// Create a raw [`Client`] with a custom configuration, and connect to the TiKV cluster.
86 ///
87 /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
88 /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
89 /// (include all endpoints, if possible), this helps avoid having a single point of failure.
90 ///
91 /// # Examples
92 ///
93 /// ```rust,no_run
94 /// # use tikv_client::{Config, RawClient};
95 /// # use futures::prelude::*;
96 /// # use std::time::Duration;
97 /// # futures::executor::block_on(async {
98 /// let client = RawClient::new_with_config(
99 /// vec!["192.168.0.100"],
100 /// Config::default().with_timeout(Duration::from_secs(60)),
101 /// )
102 /// .await
103 /// .unwrap();
104 /// # });
105 /// ```
106 pub async fn new_with_config<S: Into<String>>(
107 pd_endpoints: Vec<S>,
108 config: Config,
109 ) -> Result<Self> {
110 let enable_codec = config.keyspace.is_some();
111 let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
112 let rpc =
113 Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?);
114 let keyspace = match config.keyspace {
115 Some(name) => {
116 let keyspace = rpc.load_keyspace(&name).await?;
117 Keyspace::Enable {
118 keyspace_id: keyspace.id,
119 }
120 }
121 None => Keyspace::Disable,
122 };
123 Ok(Client {
124 rpc,
125 cf: None,
126 backoff: DEFAULT_REGION_BACKOFF,
127 atomic: false,
128 keyspace,
129 })
130 }
131
132 /// Create a new client which is a clone of `self`, but which uses an explicit column family for
133 /// all requests.
134 ///
135 /// This function returns a new `Client`; requests created with the new client will use the
136 /// supplied column family. The original `Client` can still be used (without the new
137 /// column family).
138 ///
139 /// By default, raw clients use the `Default` column family.
140 ///
141 /// # Examples
142 ///
143 /// ```rust,no_run
144 /// # use tikv_client::{Config, RawClient, ColumnFamily};
145 /// # use futures::prelude::*;
146 /// # use std::convert::TryInto;
147 /// # futures::executor::block_on(async {
148 /// let client = RawClient::new(vec!["192.168.0.100"])
149 /// .await
150 /// .unwrap()
151 /// .with_cf(ColumnFamily::Write);
152 /// // Fetch a value at "foo" from the Write CF.
153 /// let get_request = client.get("foo".to_owned());
154 /// # });
155 /// ```
156 #[must_use]
157 pub fn with_cf(&self, cf: ColumnFamily) -> Self {
158 Client {
159 rpc: self.rpc.clone(),
160 cf: Some(cf),
161 backoff: self.backoff.clone(),
162 atomic: self.atomic,
163 keyspace: self.keyspace,
164 }
165 }
166
167 /// Set the [`Backoff`] strategy for retrying requests.
168 /// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF).
169 /// See [`Backoff`] for more information.
170 /// # Examples
171 /// ```rust,no_run
172 /// # use tikv_client::{Config, RawClient, ColumnFamily};
173 /// # use tikv_client::backoff::DEFAULT_REGION_BACKOFF;
174 /// # use futures::prelude::*;
175 /// # use std::convert::TryInto;
176 /// # futures::executor::block_on(async {
177 /// let client = RawClient::new(vec!["192.168.0.100"])
178 /// .await
179 /// .unwrap()
180 /// .with_backoff(DEFAULT_REGION_BACKOFF);
181 /// // Fetch a value at "foo" from the Write CF.
182 /// let get_request = client.get("foo".to_owned());
183 /// # });
184 /// ```
185 #[must_use]
186 pub fn with_backoff(&self, backoff: Backoff) -> Self {
187 Client {
188 rpc: self.rpc.clone(),
189 cf: self.cf.clone(),
190 backoff,
191 atomic: self.atomic,
192 keyspace: self.keyspace,
193 }
194 }
195
196 /// Set to use the atomic mode.
197 ///
198 /// The only reason of using atomic mode is the
199 /// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
200 /// the atomicity of CAS, write operations like [`put`](Client::put) or
201 /// [`delete`](Client::delete) in atomic mode are more expensive. Some
202 /// operations are not supported in the mode.
203 #[must_use]
204 pub fn with_atomic_for_cas(&self) -> Self {
205 Client {
206 rpc: self.rpc.clone(),
207 cf: self.cf.clone(),
208 backoff: self.backoff.clone(),
209 atomic: true,
210 keyspace: self.keyspace,
211 }
212 }
213}
214
215impl<PdC: PdClient> Client<PdC> {
216 /// Create a new 'get' request.
217 ///
218 /// Once resolved this request will result in the fetching of the value associated with the
219 /// given key.
220 ///
221 /// Retuning `Ok(None)` indicates the key does not exist in TiKV.
222 ///
223 /// # Examples
224 /// ```rust,no_run
225 /// # use tikv_client::{Value, Config, RawClient};
226 /// # use futures::prelude::*;
227 /// # futures::executor::block_on(async {
228 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
229 /// let key = "TiKV".to_owned();
230 /// let req = client.get(key);
231 /// let result: Option<Value> = req.await.unwrap();
232 /// # });
233 /// ```
234 pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
235 debug!("invoking raw get request");
236 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
237 let request = new_raw_get_request(key, self.cf.clone());
238 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
239 .retry_multi_region(self.backoff.clone())
240 .merge(CollectSingle)
241 .post_process_default()
242 .plan();
243 plan.execute().await
244 }
245
246 /// Create a new 'batch get' request.
247 ///
248 /// Once resolved this request will result in the fetching of the values associated with the
249 /// given keys.
250 ///
251 /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
252 ///
253 /// # Examples
254 /// ```rust,no_run
255 /// # use tikv_client::{KvPair, Config, RawClient};
256 /// # use futures::prelude::*;
257 /// # futures::executor::block_on(async {
258 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
259 /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
260 /// let req = client.batch_get(keys);
261 /// let result: Vec<KvPair> = req.await.unwrap();
262 /// # });
263 /// ```
264 pub async fn batch_get(
265 &self,
266 keys: impl IntoIterator<Item = impl Into<Key>>,
267 ) -> Result<Vec<KvPair>> {
268 debug!("invoking raw batch_get request");
269 let keys = keys
270 .into_iter()
271 .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw));
272 let request = new_raw_batch_get_request(keys, self.cf.clone());
273 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
274 .retry_multi_region(self.backoff.clone())
275 .merge(Collect)
276 .plan();
277 plan.execute().await.map(|r| {
278 r.into_iter()
279 .map(|pair| pair.truncate_keyspace(self.keyspace))
280 .collect()
281 })
282 }
283
284 /// Create a new 'get key ttl' request.
285 ///
286 /// Once resolved this request will result in the fetching of the alive time left for the
287 /// given key.
288 ///
289 /// Retuning `Ok(None)` indicates the key does not exist in TiKV.
290 ///
291 /// # Examples
292 /// # use tikv_client::{Value, Config, RawClient};
293 /// # use futures::prelude::*;
294 /// # futures::executor::block_on(async {
295 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
296 /// let key = "TiKV".to_owned();
297 /// let req = client.get_key_ttl_secs(key);
298 /// let result: Option<Value> = req.await.unwrap();
299 /// # });
300 pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
301 debug!("invoking raw get_key_ttl_secs request");
302 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
303 let request = new_raw_get_key_ttl_request(key, self.cf.clone());
304 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
305 .retry_multi_region(self.backoff.clone())
306 .merge(CollectSingle)
307 .post_process_default()
308 .plan();
309 plan.execute().await
310 }
311
312 /// Create a new 'put' request.
313 ///
314 /// Once resolved this request will result in the setting of the value associated with the given key.
315 ///
316 /// # Examples
317 /// ```rust,no_run
318 /// # use tikv_client::{Key, Value, Config, RawClient};
319 /// # use futures::prelude::*;
320 /// # futures::executor::block_on(async {
321 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
322 /// let key = "TiKV".to_owned();
323 /// let val = "TiKV".to_owned();
324 /// let req = client.put(key, val);
325 /// let result: () = req.await.unwrap();
326 /// # });
327 /// ```
328 pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
329 self.put_with_ttl(key, value, 0).await
330 }
331
332 pub async fn put_with_ttl(
333 &self,
334 key: impl Into<Key>,
335 value: impl Into<Value>,
336 ttl_secs: u64,
337 ) -> Result<()> {
338 debug!("invoking raw put request");
339 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
340 let request =
341 new_raw_put_request(key, value.into(), self.cf.clone(), ttl_secs, self.atomic);
342 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
343 .retry_multi_region(self.backoff.clone())
344 .merge(CollectSingle)
345 .extract_error()
346 .plan();
347 plan.execute().await?;
348 Ok(())
349 }
350
351 /// Create a new 'batch put' request.
352 ///
353 /// Once resolved this request will result in the setting of the values associated with the given keys.
354 ///
355 /// # Examples
356 /// ```rust,no_run
357 /// # use tikv_client::{Result, KvPair, Key, Value, Config, RawClient, IntoOwnedRange};
358 /// # use futures::prelude::*;
359 /// # futures::executor::block_on(async {
360 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
361 /// let kvpair1 = ("PD".to_owned(), "Go".to_owned());
362 /// let kvpair2 = ("TiKV".to_owned(), "Rust".to_owned());
363 /// let iterable = vec![kvpair1, kvpair2];
364 /// let req = client.batch_put(iterable);
365 /// let result: () = req.await.unwrap();
366 /// # });
367 /// ```
368 pub async fn batch_put(
369 &self,
370 pairs: impl IntoIterator<Item = impl Into<KvPair>>,
371 ) -> Result<()> {
372 self.batch_put_with_ttl(pairs, std::iter::repeat(0)).await
373 }
374
375 pub async fn batch_put_with_ttl(
376 &self,
377 pairs: impl IntoIterator<Item = impl Into<KvPair>>,
378 ttls: impl IntoIterator<Item = u64>,
379 ) -> Result<()> {
380 debug!("invoking raw batch_put request");
381 let pairs = pairs
382 .into_iter()
383 .map(|pair| pair.into().encode_keyspace(self.keyspace, KeyMode::Raw));
384 let request =
385 new_raw_batch_put_request(pairs, ttls.into_iter(), self.cf.clone(), self.atomic);
386 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
387 .retry_multi_region(self.backoff.clone())
388 .extract_error()
389 .plan();
390 plan.execute().await?;
391 Ok(())
392 }
393
394 /// Create a new 'delete' request.
395 ///
396 /// Once resolved this request will result in the deletion of the given key.
397 ///
398 /// It does not return an error if the key does not exist in TiKV.
399 ///
400 /// # Examples
401 /// ```rust,no_run
402 /// # use tikv_client::{Key, Config, RawClient};
403 /// # use futures::prelude::*;
404 /// # futures::executor::block_on(async {
405 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
406 /// let key = "TiKV".to_owned();
407 /// let req = client.delete(key);
408 /// let result: () = req.await.unwrap();
409 /// # });
410 /// ```
411 pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
412 debug!("invoking raw delete request");
413 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
414 let request = new_raw_delete_request(key, self.cf.clone(), self.atomic);
415 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
416 .retry_multi_region(self.backoff.clone())
417 .merge(CollectSingle)
418 .extract_error()
419 .plan();
420 plan.execute().await?;
421 Ok(())
422 }
423
424 /// Create a new 'batch delete' request.
425 ///
426 /// Once resolved this request will result in the deletion of the given keys.
427 ///
428 /// It does not return an error if some of the keys do not exist and will delete the others.
429 ///
430 /// # Examples
431 /// ```rust,no_run
432 /// # use tikv_client::{Config, RawClient};
433 /// # use futures::prelude::*;
434 /// # futures::executor::block_on(async {
435 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
436 /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
437 /// let req = client.batch_delete(keys);
438 /// let result: () = req.await.unwrap();
439 /// # });
440 /// ```
441 pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
442 debug!("invoking raw batch_delete request");
443 self.assert_non_atomic()?;
444 let keys = keys
445 .into_iter()
446 .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw));
447 let request = new_raw_batch_delete_request(keys, self.cf.clone());
448 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
449 .retry_multi_region(self.backoff.clone())
450 .extract_error()
451 .plan();
452 plan.execute().await?;
453 Ok(())
454 }
455
456 /// Create a new 'delete range' request.
457 ///
458 /// Once resolved this request will result in the deletion of all keys lying in the given range.
459 ///
460 /// # Examples
461 /// ```rust,no_run
462 /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
463 /// # use futures::prelude::*;
464 /// # futures::executor::block_on(async {
465 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
466 /// let inclusive_range = "TiKV"..="TiDB";
467 /// let req = client.delete_range(inclusive_range.into_owned());
468 /// let result: () = req.await.unwrap();
469 /// # });
470 /// ```
471 pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
472 debug!("invoking raw delete_range request");
473 self.assert_non_atomic()?;
474 let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
475 let request = new_raw_delete_range_request(range, self.cf.clone());
476 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
477 .retry_multi_region(self.backoff.clone())
478 .extract_error()
479 .plan();
480 plan.execute().await?;
481 Ok(())
482 }
483
484 /// Create a new 'scan' request.
485 ///
486 /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
487 ///
488 /// If the number of eligible key-value pairs are greater than `limit`,
489 /// only the first `limit` pairs are returned, ordered by the key.
490 ///
491 ///
492 /// # Examples
493 /// ```rust,no_run
494 /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
495 /// # use futures::prelude::*;
496 /// # futures::executor::block_on(async {
497 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
498 /// let inclusive_range = "TiKV"..="TiDB";
499 /// let req = client.scan(inclusive_range.into_owned(), 2);
500 /// let result: Vec<KvPair> = req.await.unwrap();
501 /// # });
502 /// ```
503 pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
504 debug!("invoking raw scan request");
505 self.scan_inner(range.into(), limit, false, false).await
506 }
507
508 /// Create a new 'scan' request but scans in "reverse" direction.
509 ///
510 /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
511 ///
512 /// If the number of eligible key-value pairs are greater than `limit`,
513 /// only the first `limit` pairs are returned, ordered by the key.
514 ///
515 ///
516 /// Reverse Scan queries continuous kv pairs in range [startKey, endKey),
517 /// from startKey(lowerBound) to endKey(upperBound) in reverse order, up to limit pairs.
518 /// The returned keys are in reversed lexicographical order.
519 /// If you want to include the endKey or exclude the startKey, push a '\0' to the key.
520 /// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
521 /// # Examples
522 /// ```rust,no_run
523 /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
524 /// # use futures::prelude::*;
525 /// # futures::executor::block_on(async {
526 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
527 /// let inclusive_range = "TiKV"..="TiDB";
528 /// let req = client.scan_reverse(inclusive_range.into_owned(), 2);
529 /// let result: Vec<KvPair> = req.await.unwrap();
530 /// # });
531 /// ```
532 pub async fn scan_reverse(
533 &self,
534 range: impl Into<BoundRange>,
535 limit: u32,
536 ) -> Result<Vec<KvPair>> {
537 debug!("invoking raw reverse scan request");
538 self.scan_inner(range.into(), limit, false, true).await
539 }
540
541 /// Create a new 'scan' request that only returns the keys.
542 ///
543 /// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
544 ///
545 /// If the number of eligible keys are greater than `limit`,
546 /// only the first `limit` pairs are returned, ordered by the key.
547 ///
548 ///
549 /// # Examples
550 /// ```rust,no_run
551 /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
552 /// # use futures::prelude::*;
553 /// # futures::executor::block_on(async {
554 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
555 /// let inclusive_range = "TiKV"..="TiDB";
556 /// let req = client.scan_keys(inclusive_range.into_owned(), 2);
557 /// let result: Vec<Key> = req.await.unwrap();
558 /// # });
559 /// ```
560 pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
561 debug!("invoking raw scan_keys request");
562 Ok(self
563 .scan_inner(range, limit, true, false)
564 .await?
565 .into_iter()
566 .map(KvPair::into_key)
567 .collect())
568 }
569
570 /// Create a new 'scan' request that only returns the keys in reverse order.
571 ///
572 /// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
573 ///
574 /// If the number of eligible keys are greater than `limit`,
575 /// only the first `limit` pairs are returned, ordered by the key.
576 ///
577 ///
578 /// # Examples
579 /// ```rust,no_run
580 /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
581 /// # use futures::prelude::*;
582 /// # futures::executor::block_on(async {
583 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
584 /// let inclusive_range = "TiKV"..="TiDB";
585 /// let req = client.scan_keys(inclusive_range.into_owned(), 2);
586 /// let result: Vec<Key> = req.await.unwrap();
587 /// # });
588 /// ```
589 pub async fn scan_keys_reverse(
590 &self,
591 range: impl Into<BoundRange>,
592 limit: u32,
593 ) -> Result<Vec<Key>> {
594 debug!("invoking raw scan_keys request");
595 Ok(self
596 .scan_inner(range, limit, true, true)
597 .await?
598 .into_iter()
599 .map(KvPair::into_key)
600 .collect())
601 }
602
603 /// Create a new 'batch scan' request.
604 ///
605 /// Once resolved this request will result in a set of scanners over the given keys.
606 ///
607 /// **Warning**: This method is experimental. The `each_limit` parameter does not work as expected.
608 /// It does not limit the number of results returned of each range,
609 /// instead it limits the number of results in each region of each range.
610 /// As a result, you may get **more than** `each_limit` key-value pairs for each range.
611 /// But you should not miss any entries.
612 ///
613 /// # Examples
614 /// ```rust,no_run
615 /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
616 /// # use futures::prelude::*;
617 /// # futures::executor::block_on(async {
618 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
619 /// let inclusive_range1 = "TiDB"..="TiKV";
620 /// let inclusive_range2 = "TiKV"..="TiSpark";
621 /// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
622 /// let req = client.batch_scan(iterable, 2);
623 /// let result = req.await;
624 /// # });
625 /// ```
626 pub async fn batch_scan(
627 &self,
628 ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
629 each_limit: u32,
630 ) -> Result<Vec<KvPair>> {
631 debug!("invoking raw batch_scan request");
632 self.batch_scan_inner(ranges, each_limit, false).await
633 }
634
635 /// Create a new 'batch scan' request that only returns the keys.
636 ///
637 /// Once resolved this request will result in a set of scanners over the given keys.
638 ///
639 /// **Warning**: This method is experimental.
640 /// The `each_limit` parameter does not limit the number of results returned of each range,
641 /// instead it limits the number of results in each region of each range.
642 /// As a result, you may get **more than** `each_limit` key-value pairs for each range,
643 /// but you should not miss any entries.
644 ///
645 /// # Examples
646 /// ```rust,no_run
647 /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
648 /// # use futures::prelude::*;
649 /// # futures::executor::block_on(async {
650 /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
651 /// let inclusive_range1 = "TiDB"..="TiKV";
652 /// let inclusive_range2 = "TiKV"..="TiSpark";
653 /// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
654 /// let req = client.batch_scan(iterable, 2);
655 /// let result = req.await;
656 /// # });
657 /// ```
658 pub async fn batch_scan_keys(
659 &self,
660 ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
661 each_limit: u32,
662 ) -> Result<Vec<Key>> {
663 debug!("invoking raw batch_scan_keys request");
664 Ok(self
665 .batch_scan_inner(ranges, each_limit, true)
666 .await?
667 .into_iter()
668 .map(KvPair::into_key)
669 .collect())
670 }
671
672 /// Create a new *atomic* 'compare and set' request.
673 ///
674 /// Once resolved this request will result in an atomic `compare and set'
675 /// operation for the given key.
676 ///
677 /// If the value retrived is equal to `current_value`, `new_value` is
678 /// written.
679 ///
680 /// # Return Value
681 ///
682 /// A tuple is returned if successful: the previous value and whether the
683 /// value is swapped
684 pub async fn compare_and_swap(
685 &self,
686 key: impl Into<Key>,
687 previous_value: impl Into<Option<Value>>,
688 new_value: impl Into<Value>,
689 ) -> Result<(Option<Value>, bool)> {
690 debug!("invoking raw compare_and_swap request");
691 self.assert_atomic()?;
692 let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
693 let req = new_cas_request(
694 key,
695 new_value.into(),
696 previous_value.into(),
697 self.cf.clone(),
698 );
699 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
700 .retry_multi_region(self.backoff.clone())
701 .merge(CollectSingle)
702 .post_process_default()
703 .plan();
704 plan.execute().await
705 }
706
707 pub async fn coprocessor(
708 &self,
709 copr_name: impl Into<String>,
710 copr_version_req: impl Into<String>,
711 ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
712 request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
713 ) -> Result<Vec<(Vec<Range<Key>>, Vec<u8>)>> {
714 let copr_version_req = copr_version_req.into();
715 semver::VersionReq::from_str(&copr_version_req)?;
716 let ranges = ranges
717 .into_iter()
718 .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw));
719 let keyspace = self.keyspace;
720 let request_builder = move |region, ranges: Vec<Range<Key>>| {
721 request_builder(
722 region,
723 ranges
724 .into_iter()
725 .map(|range| range.truncate_keyspace(keyspace))
726 .collect(),
727 )
728 };
729 let req = new_raw_coprocessor_request(
730 copr_name.into(),
731 copr_version_req,
732 ranges,
733 request_builder,
734 );
735 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
736 .preserve_shard()
737 .retry_multi_region(self.backoff.clone())
738 .post_process_default()
739 .plan();
740 Ok(plan
741 .execute()
742 .await?
743 .into_iter()
744 .map(|(ranges, data)| (ranges.truncate_keyspace(keyspace), data))
745 .collect())
746 }
747
748 async fn scan_inner(
749 &self,
750 range: impl Into<BoundRange>,
751 limit: u32,
752 key_only: bool,
753 reverse: bool,
754 ) -> Result<Vec<KvPair>> {
755 if limit > MAX_RAW_KV_SCAN_LIMIT {
756 return Err(Error::MaxScanLimitExceeded {
757 limit,
758 max_limit: MAX_RAW_KV_SCAN_LIMIT,
759 });
760 }
761 let backoff = DEFAULT_STORE_BACKOFF;
762 let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
763 let mut result = Vec::new();
764 let mut current_limit = limit;
765 let (start_key, end_key) = range.clone().into_keys();
766 let mut current_key: Key = start_key;
767
768 while current_limit > 0 {
769 let scan_args = ScanInnerArgs {
770 start_key: current_key.clone(),
771 end_key: end_key.clone(),
772 limit: current_limit,
773 key_only,
774 reverse,
775 backoff: backoff.clone(),
776 };
777 let (res, next_key) = self.retryable_scan(scan_args).await?;
778
779 let mut kvs = res
780 .map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
781 .unwrap_or(Vec::new());
782
783 if !kvs.is_empty() {
784 current_limit -= kvs.len() as u32;
785 result.append(&mut kvs);
786 }
787 if end_key.clone().is_some_and(|ek| ek <= next_key) {
788 break;
789 } else {
790 current_key = next_key;
791 range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
792 }
793 }
794
795 // limit is a soft limit, so we need check the number of results
796 result.truncate(limit as usize);
797
798 // truncate the prefix of keys
799 let result = result.truncate_keyspace(self.keyspace);
800
801 Ok(result)
802 }
803
804 async fn retryable_scan(
805 &self,
806 mut scan_args: ScanInnerArgs,
807 ) -> Result<(Option<RawScanResponse>, Key)> {
808 let start_key = scan_args.start_key;
809 let end_key = scan_args.end_key;
810 loop {
811 let region = self.rpc.clone().region_for_key(&start_key).await?;
812 let store = self.rpc.clone().store_for_id(region.id()).await?;
813 let request = new_raw_scan_request(
814 (start_key.clone(), end_key.clone()).into(),
815 scan_args.limit,
816 scan_args.key_only,
817 scan_args.reverse,
818 self.cf.clone(),
819 );
820 let resp = self.do_store_scan(store.clone(), request.clone()).await;
821 return match resp {
822 Ok(mut r) => {
823 if let Some(err) = r.region_error() {
824 let status =
825 plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
826 .await?;
827 if status {
828 continue;
829 } else if let Some(duration) = scan_args.backoff.next_delay_duration() {
830 sleep(duration).await;
831 continue;
832 } else {
833 return Err(RegionError(Box::new(err)));
834 }
835 }
836 Ok((Some(r), region.end_key()))
837 }
838 Err(err) => Err(err),
839 };
840 }
841 }
842
843 async fn do_store_scan(
844 &self,
845 store: RegionStore,
846 scan_request: RawScanRequest,
847 ) -> Result<RawScanResponse> {
848 crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
849 .single_region_with_store(store.clone())
850 .await?
851 .plan()
852 .execute()
853 .await
854 }
855
856 async fn batch_scan_inner(
857 &self,
858 ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
859 each_limit: u32,
860 key_only: bool,
861 ) -> Result<Vec<KvPair>> {
862 if each_limit > MAX_RAW_KV_SCAN_LIMIT {
863 return Err(Error::MaxScanLimitExceeded {
864 limit: each_limit,
865 max_limit: MAX_RAW_KV_SCAN_LIMIT,
866 });
867 }
868
869 let ranges = ranges
870 .into_iter()
871 .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw));
872
873 let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
874 let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
875 .retry_multi_region(self.backoff.clone())
876 .merge(Collect)
877 .plan();
878 plan.execute().await.map(|r| {
879 r.into_iter()
880 .map(|pair| pair.truncate_keyspace(self.keyspace))
881 .collect()
882 })
883 }
884
885 fn assert_non_atomic(&self) -> Result<()> {
886 if !self.atomic {
887 Ok(())
888 } else {
889 Err(Error::UnsupportedMode)
890 }
891 }
892
893 fn assert_atomic(&self) -> Result<()> {
894 if self.atomic {
895 Ok(())
896 } else {
897 Err(Error::UnsupportedMode)
898 }
899 }
900}
901
902#[derive(Clone)]
903struct ScanInnerArgs {
904 start_key: Key,
905 end_key: Option<Key>,
906 limit: u32,
907 key_only: bool,
908 reverse: bool,
909 backoff: Backoff,
910}
911
912#[cfg(test)]
913mod tests {
914 use std::any::Any;
915 use std::sync::Arc;
916
917 use super::*;
918 use crate::mock::MockKvClient;
919 use crate::mock::MockPdClient;
920 use crate::proto::kvrpcpb;
921 use crate::Result;
922
923 #[tokio::test]
924 async fn test_batch_put_with_ttl() -> Result<()> {
925 let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
926 move |req: &dyn Any| {
927 if req.downcast_ref::<kvrpcpb::RawBatchPutRequest>().is_some() {
928 let resp = kvrpcpb::RawBatchPutResponse {
929 ..Default::default()
930 };
931 Ok(Box::new(resp) as Box<dyn Any>)
932 } else {
933 unreachable!()
934 }
935 },
936 )));
937 let client = Client {
938 rpc: pd_client,
939 cf: Some(ColumnFamily::Default),
940 backoff: DEFAULT_REGION_BACKOFF,
941 atomic: false,
942 keyspace: Keyspace::Enable { keyspace_id: 0 },
943 };
944 let pairs = vec![
945 KvPair(vec![11].into(), vec![12]),
946 KvPair(vec![11].into(), vec![12]),
947 ];
948 let ttls = vec![0, 0];
949 assert!(client.batch_put_with_ttl(pairs, ttls).await.is_ok());
950 Ok(())
951 }
952
953 #[tokio::test]
954 async fn test_raw_coprocessor() -> Result<()> {
955 let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
956 move |req: &dyn Any| {
957 if let Some(req) = req.downcast_ref::<kvrpcpb::RawCoprocessorRequest>() {
958 assert_eq!(req.copr_name, "example");
959 assert_eq!(req.copr_version_req, "0.1.0");
960 let resp = kvrpcpb::RawCoprocessorResponse {
961 data: req.data.clone(),
962 ..Default::default()
963 };
964 Ok(Box::new(resp) as Box<dyn Any>)
965 } else {
966 unreachable!()
967 }
968 },
969 )));
970 let client = Client {
971 rpc: pd_client,
972 cf: Some(ColumnFamily::Default),
973 backoff: DEFAULT_REGION_BACKOFF,
974 atomic: false,
975 keyspace: Keyspace::Enable { keyspace_id: 0 },
976 };
977 let resps = client
978 .coprocessor(
979 "example",
980 "0.1.0",
981 vec![vec![5]..vec![15], vec![20]..vec![]],
982 |region, ranges| format!("{:?}:{:?}", region.id, ranges).into_bytes(),
983 )
984 .await?;
985 let resps: Vec<_> = resps
986 .into_iter()
987 .map(|(ranges, data)| (ranges, String::from_utf8(data).unwrap()))
988 .collect();
989 assert_eq!(
990 resps,
991 vec![(
992 vec![
993 Key::from(vec![5])..Key::from(vec![15]),
994 Key::from(vec![20])..Key::from(vec![])
995 ],
996 "2:[Key(05)..Key(0F), Key(14)..Key()]".to_string(),
997 ),]
998 );
999 Ok(())
1000 }
1001}