Skip to main content

tikv_client/raw/
client.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2use core::ops::Range;
3
4use std::str::FromStr;
5use std::sync::Arc;
6
7use log::debug;
8use tokio::time::sleep;
9
10use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
11use crate::common::Error;
12use crate::config::Config;
13use crate::pd::PdClient;
14use crate::pd::PdRpcClient;
15use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
16use crate::proto::metapb;
17use crate::raw::lowering::*;
18use crate::request::CollectSingle;
19use crate::request::EncodeKeyspace;
20use crate::request::KeyMode;
21use crate::request::Keyspace;
22use crate::request::Plan;
23use crate::request::TruncateKeyspace;
24use crate::request::{plan, Collect};
25use crate::store::{HasRegionError, RegionStore};
26use crate::Backoff;
27use crate::BoundRange;
28use crate::ColumnFamily;
29use crate::Error::RegionError;
30use crate::Key;
31use crate::KvPair;
32use crate::Result;
33use crate::Value;
34
35const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
36
37/// The TiKV raw `Client` is used to interact with TiKV using raw requests.
38///
39/// Raw requests don't need a wrapping transaction.
40/// Each request is immediately processed once executed.
41///
42/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
43/// awaited to execute.
44pub struct Client<PdC: PdClient = PdRpcClient> {
45    rpc: Arc<PdC>,
46    cf: Option<ColumnFamily>,
47    backoff: Backoff,
48    /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
49    atomic: bool,
50    keyspace: Keyspace,
51}
52
53impl Clone for Client {
54    fn clone(&self) -> Self {
55        Self {
56            rpc: self.rpc.clone(),
57            cf: self.cf.clone(),
58            backoff: self.backoff.clone(),
59            atomic: self.atomic,
60            keyspace: self.keyspace,
61        }
62    }
63}
64
65impl Client<PdRpcClient> {
66    /// Create a raw [`Client`] and connect to the TiKV cluster.
67    ///
68    /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
69    /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
70    /// (include all endpoints, if possible), this helps avoid having a single point of failure.
71    ///
72    /// # Examples
73    ///
74    /// ```rust,no_run
75    /// # use tikv_client::RawClient;
76    /// # use futures::prelude::*;
77    /// # futures::executor::block_on(async {
78    /// let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
79    /// # });
80    /// ```
81    pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Self> {
82        Self::new_with_config(pd_endpoints, Config::default()).await
83    }
84
85    /// Create a raw [`Client`] with a custom configuration, and connect to the TiKV cluster.
86    ///
87    /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
88    /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
89    /// (include all endpoints, if possible), this helps avoid having a single point of failure.
90    ///
91    /// # Examples
92    ///
93    /// ```rust,no_run
94    /// # use tikv_client::{Config, RawClient};
95    /// # use futures::prelude::*;
96    /// # use std::time::Duration;
97    /// # futures::executor::block_on(async {
98    /// let client = RawClient::new_with_config(
99    ///     vec!["192.168.0.100"],
100    ///     Config::default().with_timeout(Duration::from_secs(60)),
101    /// )
102    /// .await
103    /// .unwrap();
104    /// # });
105    /// ```
106    pub async fn new_with_config<S: Into<String>>(
107        pd_endpoints: Vec<S>,
108        config: Config,
109    ) -> Result<Self> {
110        let enable_codec = config.keyspace.is_some();
111        let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
112        let rpc =
113            Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?);
114        let keyspace = match config.keyspace {
115            Some(name) => {
116                let keyspace = rpc.load_keyspace(&name).await?;
117                Keyspace::Enable {
118                    keyspace_id: keyspace.id,
119                }
120            }
121            None => Keyspace::Disable,
122        };
123        Ok(Client {
124            rpc,
125            cf: None,
126            backoff: DEFAULT_REGION_BACKOFF,
127            atomic: false,
128            keyspace,
129        })
130    }
131
132    /// Create a new client which is a clone of `self`, but which uses an explicit column family for
133    /// all requests.
134    ///
135    /// This function returns a new `Client`; requests created with the new client will use the
136    /// supplied column family. The original `Client` can still be used (without the new
137    /// column family).
138    ///
139    /// By default, raw clients use the `Default` column family.
140    ///
141    /// # Examples
142    ///
143    /// ```rust,no_run
144    /// # use tikv_client::{Config, RawClient, ColumnFamily};
145    /// # use futures::prelude::*;
146    /// # use std::convert::TryInto;
147    /// # futures::executor::block_on(async {
148    /// let client = RawClient::new(vec!["192.168.0.100"])
149    ///     .await
150    ///     .unwrap()
151    ///     .with_cf(ColumnFamily::Write);
152    /// // Fetch a value at "foo" from the Write CF.
153    /// let get_request = client.get("foo".to_owned());
154    /// # });
155    /// ```
156    #[must_use]
157    pub fn with_cf(&self, cf: ColumnFamily) -> Self {
158        Client {
159            rpc: self.rpc.clone(),
160            cf: Some(cf),
161            backoff: self.backoff.clone(),
162            atomic: self.atomic,
163            keyspace: self.keyspace,
164        }
165    }
166
167    /// Set the [`Backoff`] strategy for retrying requests.
168    /// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF).
169    /// See [`Backoff`] for more information.
170    /// # Examples
171    /// ```rust,no_run
172    /// # use tikv_client::{Config, RawClient, ColumnFamily};
173    /// # use tikv_client::backoff::DEFAULT_REGION_BACKOFF;
174    /// # use futures::prelude::*;
175    /// # use std::convert::TryInto;
176    /// # futures::executor::block_on(async {
177    /// let client = RawClient::new(vec!["192.168.0.100"])
178    ///     .await
179    ///     .unwrap()
180    ///     .with_backoff(DEFAULT_REGION_BACKOFF);
181    /// // Fetch a value at "foo" from the Write CF.
182    /// let get_request = client.get("foo".to_owned());
183    /// # });
184    /// ```
185    #[must_use]
186    pub fn with_backoff(&self, backoff: Backoff) -> Self {
187        Client {
188            rpc: self.rpc.clone(),
189            cf: self.cf.clone(),
190            backoff,
191            atomic: self.atomic,
192            keyspace: self.keyspace,
193        }
194    }
195
196    /// Set to use the atomic mode.
197    ///
198    /// The only reason of using atomic mode is the
199    /// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
200    /// the atomicity of CAS, write operations like [`put`](Client::put) or
201    /// [`delete`](Client::delete) in atomic mode are more expensive. Some
202    /// operations are not supported in the mode.
203    #[must_use]
204    pub fn with_atomic_for_cas(&self) -> Self {
205        Client {
206            rpc: self.rpc.clone(),
207            cf: self.cf.clone(),
208            backoff: self.backoff.clone(),
209            atomic: true,
210            keyspace: self.keyspace,
211        }
212    }
213}
214
215impl<PdC: PdClient> Client<PdC> {
216    /// Create a new 'get' request.
217    ///
218    /// Once resolved this request will result in the fetching of the value associated with the
219    /// given key.
220    ///
221    /// Retuning `Ok(None)` indicates the key does not exist in TiKV.
222    ///
223    /// # Examples
224    /// ```rust,no_run
225    /// # use tikv_client::{Value, Config, RawClient};
226    /// # use futures::prelude::*;
227    /// # futures::executor::block_on(async {
228    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
229    /// let key = "TiKV".to_owned();
230    /// let req = client.get(key);
231    /// let result: Option<Value> = req.await.unwrap();
232    /// # });
233    /// ```
234    pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
235        debug!("invoking raw get request");
236        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
237        let request = new_raw_get_request(key, self.cf.clone());
238        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
239            .retry_multi_region(self.backoff.clone())
240            .merge(CollectSingle)
241            .post_process_default()
242            .plan();
243        plan.execute().await
244    }
245
246    /// Create a new 'batch get' request.
247    ///
248    /// Once resolved this request will result in the fetching of the values associated with the
249    /// given keys.
250    ///
251    /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
252    ///
253    /// # Examples
254    /// ```rust,no_run
255    /// # use tikv_client::{KvPair, Config, RawClient};
256    /// # use futures::prelude::*;
257    /// # futures::executor::block_on(async {
258    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
259    /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
260    /// let req = client.batch_get(keys);
261    /// let result: Vec<KvPair> = req.await.unwrap();
262    /// # });
263    /// ```
264    pub async fn batch_get(
265        &self,
266        keys: impl IntoIterator<Item = impl Into<Key>>,
267    ) -> Result<Vec<KvPair>> {
268        debug!("invoking raw batch_get request");
269        let keys = keys
270            .into_iter()
271            .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw));
272        let request = new_raw_batch_get_request(keys, self.cf.clone());
273        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
274            .retry_multi_region(self.backoff.clone())
275            .merge(Collect)
276            .plan();
277        plan.execute().await.map(|r| {
278            r.into_iter()
279                .map(|pair| pair.truncate_keyspace(self.keyspace))
280                .collect()
281        })
282    }
283
284    /// Create a new 'get key ttl' request.
285    ///
286    /// Once resolved this request will result in the fetching of the alive time left for the
287    /// given key.
288    ///
289    /// Retuning `Ok(None)` indicates the key does not exist in TiKV.
290    ///
291    /// # Examples
292    /// # use tikv_client::{Value, Config, RawClient};
293    /// # use futures::prelude::*;
294    /// # futures::executor::block_on(async {
295    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
296    /// let key = "TiKV".to_owned();
297    /// let req = client.get_key_ttl_secs(key);
298    /// let result: Option<Value> = req.await.unwrap();
299    /// # });
300    pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
301        debug!("invoking raw get_key_ttl_secs request");
302        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
303        let request = new_raw_get_key_ttl_request(key, self.cf.clone());
304        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
305            .retry_multi_region(self.backoff.clone())
306            .merge(CollectSingle)
307            .post_process_default()
308            .plan();
309        plan.execute().await
310    }
311
312    /// Create a new 'put' request.
313    ///
314    /// Once resolved this request will result in the setting of the value associated with the given key.
315    ///
316    /// # Examples
317    /// ```rust,no_run
318    /// # use tikv_client::{Key, Value, Config, RawClient};
319    /// # use futures::prelude::*;
320    /// # futures::executor::block_on(async {
321    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
322    /// let key = "TiKV".to_owned();
323    /// let val = "TiKV".to_owned();
324    /// let req = client.put(key, val);
325    /// let result: () = req.await.unwrap();
326    /// # });
327    /// ```
328    pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
329        self.put_with_ttl(key, value, 0).await
330    }
331
332    pub async fn put_with_ttl(
333        &self,
334        key: impl Into<Key>,
335        value: impl Into<Value>,
336        ttl_secs: u64,
337    ) -> Result<()> {
338        debug!("invoking raw put request");
339        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
340        let request =
341            new_raw_put_request(key, value.into(), self.cf.clone(), ttl_secs, self.atomic);
342        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
343            .retry_multi_region(self.backoff.clone())
344            .merge(CollectSingle)
345            .extract_error()
346            .plan();
347        plan.execute().await?;
348        Ok(())
349    }
350
351    /// Create a new 'batch put' request.
352    ///
353    /// Once resolved this request will result in the setting of the values associated with the given keys.
354    ///
355    /// # Examples
356    /// ```rust,no_run
357    /// # use tikv_client::{Result, KvPair, Key, Value, Config, RawClient, IntoOwnedRange};
358    /// # use futures::prelude::*;
359    /// # futures::executor::block_on(async {
360    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
361    /// let kvpair1 = ("PD".to_owned(), "Go".to_owned());
362    /// let kvpair2 = ("TiKV".to_owned(), "Rust".to_owned());
363    /// let iterable = vec![kvpair1, kvpair2];
364    /// let req = client.batch_put(iterable);
365    /// let result: () = req.await.unwrap();
366    /// # });
367    /// ```
368    pub async fn batch_put(
369        &self,
370        pairs: impl IntoIterator<Item = impl Into<KvPair>>,
371    ) -> Result<()> {
372        self.batch_put_with_ttl(pairs, std::iter::repeat(0)).await
373    }
374
375    pub async fn batch_put_with_ttl(
376        &self,
377        pairs: impl IntoIterator<Item = impl Into<KvPair>>,
378        ttls: impl IntoIterator<Item = u64>,
379    ) -> Result<()> {
380        debug!("invoking raw batch_put request");
381        let pairs = pairs
382            .into_iter()
383            .map(|pair| pair.into().encode_keyspace(self.keyspace, KeyMode::Raw));
384        let request =
385            new_raw_batch_put_request(pairs, ttls.into_iter(), self.cf.clone(), self.atomic);
386        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
387            .retry_multi_region(self.backoff.clone())
388            .extract_error()
389            .plan();
390        plan.execute().await?;
391        Ok(())
392    }
393
394    /// Create a new 'delete' request.
395    ///
396    /// Once resolved this request will result in the deletion of the given key.
397    ///
398    /// It does not return an error if the key does not exist in TiKV.
399    ///
400    /// # Examples
401    /// ```rust,no_run
402    /// # use tikv_client::{Key, Config, RawClient};
403    /// # use futures::prelude::*;
404    /// # futures::executor::block_on(async {
405    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
406    /// let key = "TiKV".to_owned();
407    /// let req = client.delete(key);
408    /// let result: () = req.await.unwrap();
409    /// # });
410    /// ```
411    pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
412        debug!("invoking raw delete request");
413        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
414        let request = new_raw_delete_request(key, self.cf.clone(), self.atomic);
415        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
416            .retry_multi_region(self.backoff.clone())
417            .merge(CollectSingle)
418            .extract_error()
419            .plan();
420        plan.execute().await?;
421        Ok(())
422    }
423
424    /// Create a new 'batch delete' request.
425    ///
426    /// Once resolved this request will result in the deletion of the given keys.
427    ///
428    /// It does not return an error if some of the keys do not exist and will delete the others.
429    ///
430    /// # Examples
431    /// ```rust,no_run
432    /// # use tikv_client::{Config, RawClient};
433    /// # use futures::prelude::*;
434    /// # futures::executor::block_on(async {
435    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
436    /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
437    /// let req = client.batch_delete(keys);
438    /// let result: () = req.await.unwrap();
439    /// # });
440    /// ```
441    pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
442        debug!("invoking raw batch_delete request");
443        self.assert_non_atomic()?;
444        let keys = keys
445            .into_iter()
446            .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw));
447        let request = new_raw_batch_delete_request(keys, self.cf.clone());
448        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
449            .retry_multi_region(self.backoff.clone())
450            .extract_error()
451            .plan();
452        plan.execute().await?;
453        Ok(())
454    }
455
456    /// Create a new 'delete range' request.
457    ///
458    /// Once resolved this request will result in the deletion of all keys lying in the given range.
459    ///
460    /// # Examples
461    /// ```rust,no_run
462    /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
463    /// # use futures::prelude::*;
464    /// # futures::executor::block_on(async {
465    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
466    /// let inclusive_range = "TiKV"..="TiDB";
467    /// let req = client.delete_range(inclusive_range.into_owned());
468    /// let result: () = req.await.unwrap();
469    /// # });
470    /// ```
471    pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
472        debug!("invoking raw delete_range request");
473        self.assert_non_atomic()?;
474        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
475        let request = new_raw_delete_range_request(range, self.cf.clone());
476        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
477            .retry_multi_region(self.backoff.clone())
478            .extract_error()
479            .plan();
480        plan.execute().await?;
481        Ok(())
482    }
483
484    /// Create a new 'scan' request.
485    ///
486    /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
487    ///
488    /// If the number of eligible key-value pairs are greater than `limit`,
489    /// only the first `limit` pairs are returned, ordered by the key.
490    ///
491    ///
492    /// # Examples
493    /// ```rust,no_run
494    /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
495    /// # use futures::prelude::*;
496    /// # futures::executor::block_on(async {
497    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
498    /// let inclusive_range = "TiKV"..="TiDB";
499    /// let req = client.scan(inclusive_range.into_owned(), 2);
500    /// let result: Vec<KvPair> = req.await.unwrap();
501    /// # });
502    /// ```
503    pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
504        debug!("invoking raw scan request");
505        self.scan_inner(range.into(), limit, false, false).await
506    }
507
508    /// Create a new 'scan' request but scans in "reverse" direction.
509    ///
510    /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range.
511    ///
512    /// If the number of eligible key-value pairs are greater than `limit`,
513    /// only the first `limit` pairs are returned, ordered by the key.
514    ///
515    ///
516    /// Reverse Scan queries continuous kv pairs in range [startKey, endKey),
517    /// from startKey(lowerBound) to endKey(upperBound) in reverse order, up to limit pairs.
518    /// The returned keys are in reversed lexicographical order.
519    /// If you want to include the endKey or exclude the startKey, push a '\0' to the key.
520    /// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
521    /// # Examples
522    /// ```rust,no_run
523    /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
524    /// # use futures::prelude::*;
525    /// # futures::executor::block_on(async {
526    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
527    /// let inclusive_range = "TiKV"..="TiDB";
528    /// let req = client.scan_reverse(inclusive_range.into_owned(), 2);
529    /// let result: Vec<KvPair> = req.await.unwrap();
530    /// # });
531    /// ```
532    pub async fn scan_reverse(
533        &self,
534        range: impl Into<BoundRange>,
535        limit: u32,
536    ) -> Result<Vec<KvPair>> {
537        debug!("invoking raw reverse scan request");
538        self.scan_inner(range.into(), limit, false, true).await
539    }
540
541    /// Create a new 'scan' request that only returns the keys.
542    ///
543    /// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
544    ///
545    /// If the number of eligible keys are greater than `limit`,
546    /// only the first `limit` pairs are returned, ordered by the key.
547    ///
548    ///
549    /// # Examples
550    /// ```rust,no_run
551    /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
552    /// # use futures::prelude::*;
553    /// # futures::executor::block_on(async {
554    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
555    /// let inclusive_range = "TiKV"..="TiDB";
556    /// let req = client.scan_keys(inclusive_range.into_owned(), 2);
557    /// let result: Vec<Key> = req.await.unwrap();
558    /// # });
559    /// ```
560    pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
561        debug!("invoking raw scan_keys request");
562        Ok(self
563            .scan_inner(range, limit, true, false)
564            .await?
565            .into_iter()
566            .map(KvPair::into_key)
567            .collect())
568    }
569
570    /// Create a new 'scan' request that only returns the keys in reverse order.
571    ///
572    /// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
573    ///
574    /// If the number of eligible keys are greater than `limit`,
575    /// only the first `limit` pairs are returned, ordered by the key.
576    ///
577    ///
578    /// # Examples
579    /// ```rust,no_run
580    /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
581    /// # use futures::prelude::*;
582    /// # futures::executor::block_on(async {
583    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
584    /// let inclusive_range = "TiKV"..="TiDB";
585    /// let req = client.scan_keys(inclusive_range.into_owned(), 2);
586    /// let result: Vec<Key> = req.await.unwrap();
587    /// # });
588    /// ```
589    pub async fn scan_keys_reverse(
590        &self,
591        range: impl Into<BoundRange>,
592        limit: u32,
593    ) -> Result<Vec<Key>> {
594        debug!("invoking raw scan_keys request");
595        Ok(self
596            .scan_inner(range, limit, true, true)
597            .await?
598            .into_iter()
599            .map(KvPair::into_key)
600            .collect())
601    }
602
603    /// Create a new 'batch scan' request.
604    ///
605    /// Once resolved this request will result in a set of scanners over the given keys.
606    ///
607    /// **Warning**: This method is experimental. The `each_limit` parameter does not work as expected.
608    /// It does not limit the number of results returned of each range,
609    /// instead it limits the number of results in each region of each range.
610    /// As a result, you may get **more than** `each_limit` key-value pairs for each range.
611    /// But you should not miss any entries.
612    ///
613    /// # Examples
614    /// ```rust,no_run
615    /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
616    /// # use futures::prelude::*;
617    /// # futures::executor::block_on(async {
618    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
619    /// let inclusive_range1 = "TiDB"..="TiKV";
620    /// let inclusive_range2 = "TiKV"..="TiSpark";
621    /// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
622    /// let req = client.batch_scan(iterable, 2);
623    /// let result = req.await;
624    /// # });
625    /// ```
626    pub async fn batch_scan(
627        &self,
628        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
629        each_limit: u32,
630    ) -> Result<Vec<KvPair>> {
631        debug!("invoking raw batch_scan request");
632        self.batch_scan_inner(ranges, each_limit, false).await
633    }
634
635    /// Create a new 'batch scan' request that only returns the keys.
636    ///
637    /// Once resolved this request will result in a set of scanners over the given keys.
638    ///
639    /// **Warning**: This method is experimental.
640    /// The `each_limit` parameter does not limit the number of results returned of each range,
641    /// instead it limits the number of results in each region of each range.
642    /// As a result, you may get **more than** `each_limit` key-value pairs for each range,
643    /// but you should not miss any entries.
644    ///
645    /// # Examples
646    /// ```rust,no_run
647    /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
648    /// # use futures::prelude::*;
649    /// # futures::executor::block_on(async {
650    /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
651    /// let inclusive_range1 = "TiDB"..="TiKV";
652    /// let inclusive_range2 = "TiKV"..="TiSpark";
653    /// let iterable = vec![inclusive_range1.into_owned(), inclusive_range2.into_owned()];
654    /// let req = client.batch_scan(iterable, 2);
655    /// let result = req.await;
656    /// # });
657    /// ```
658    pub async fn batch_scan_keys(
659        &self,
660        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
661        each_limit: u32,
662    ) -> Result<Vec<Key>> {
663        debug!("invoking raw batch_scan_keys request");
664        Ok(self
665            .batch_scan_inner(ranges, each_limit, true)
666            .await?
667            .into_iter()
668            .map(KvPair::into_key)
669            .collect())
670    }
671
672    /// Create a new *atomic* 'compare and set' request.
673    ///
674    /// Once resolved this request will result in an atomic `compare and set'
675    /// operation for the given key.
676    ///
677    /// If the value retrived is equal to `current_value`, `new_value` is
678    /// written.
679    ///
680    /// # Return Value
681    ///
682    /// A tuple is returned if successful: the previous value and whether the
683    /// value is swapped
684    pub async fn compare_and_swap(
685        &self,
686        key: impl Into<Key>,
687        previous_value: impl Into<Option<Value>>,
688        new_value: impl Into<Value>,
689    ) -> Result<(Option<Value>, bool)> {
690        debug!("invoking raw compare_and_swap request");
691        self.assert_atomic()?;
692        let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw);
693        let req = new_cas_request(
694            key,
695            new_value.into(),
696            previous_value.into(),
697            self.cf.clone(),
698        );
699        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
700            .retry_multi_region(self.backoff.clone())
701            .merge(CollectSingle)
702            .post_process_default()
703            .plan();
704        plan.execute().await
705    }
706
707    pub async fn coprocessor(
708        &self,
709        copr_name: impl Into<String>,
710        copr_version_req: impl Into<String>,
711        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
712        request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
713    ) -> Result<Vec<(Vec<Range<Key>>, Vec<u8>)>> {
714        let copr_version_req = copr_version_req.into();
715        semver::VersionReq::from_str(&copr_version_req)?;
716        let ranges = ranges
717            .into_iter()
718            .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw));
719        let keyspace = self.keyspace;
720        let request_builder = move |region, ranges: Vec<Range<Key>>| {
721            request_builder(
722                region,
723                ranges
724                    .into_iter()
725                    .map(|range| range.truncate_keyspace(keyspace))
726                    .collect(),
727            )
728        };
729        let req = new_raw_coprocessor_request(
730            copr_name.into(),
731            copr_version_req,
732            ranges,
733            request_builder,
734        );
735        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req)
736            .preserve_shard()
737            .retry_multi_region(self.backoff.clone())
738            .post_process_default()
739            .plan();
740        Ok(plan
741            .execute()
742            .await?
743            .into_iter()
744            .map(|(ranges, data)| (ranges.truncate_keyspace(keyspace), data))
745            .collect())
746    }
747
748    async fn scan_inner(
749        &self,
750        range: impl Into<BoundRange>,
751        limit: u32,
752        key_only: bool,
753        reverse: bool,
754    ) -> Result<Vec<KvPair>> {
755        if limit > MAX_RAW_KV_SCAN_LIMIT {
756            return Err(Error::MaxScanLimitExceeded {
757                limit,
758                max_limit: MAX_RAW_KV_SCAN_LIMIT,
759            });
760        }
761        let backoff = DEFAULT_STORE_BACKOFF;
762        let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
763        let mut result = Vec::new();
764        let mut current_limit = limit;
765        let (start_key, end_key) = range.clone().into_keys();
766        let mut current_key: Key = start_key;
767
768        while current_limit > 0 {
769            let scan_args = ScanInnerArgs {
770                start_key: current_key.clone(),
771                end_key: end_key.clone(),
772                limit: current_limit,
773                key_only,
774                reverse,
775                backoff: backoff.clone(),
776            };
777            let (res, next_key) = self.retryable_scan(scan_args).await?;
778
779            let mut kvs = res
780                .map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
781                .unwrap_or(Vec::new());
782
783            if !kvs.is_empty() {
784                current_limit -= kvs.len() as u32;
785                result.append(&mut kvs);
786            }
787            if end_key.clone().is_some_and(|ek| ek <= next_key) {
788                break;
789            } else {
790                current_key = next_key;
791                range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
792            }
793        }
794
795        // limit is a soft limit, so we need check the number of results
796        result.truncate(limit as usize);
797
798        // truncate the prefix of keys
799        let result = result.truncate_keyspace(self.keyspace);
800
801        Ok(result)
802    }
803
804    async fn retryable_scan(
805        &self,
806        mut scan_args: ScanInnerArgs,
807    ) -> Result<(Option<RawScanResponse>, Key)> {
808        let start_key = scan_args.start_key;
809        let end_key = scan_args.end_key;
810        loop {
811            let region = self.rpc.clone().region_for_key(&start_key).await?;
812            let store = self.rpc.clone().store_for_id(region.id()).await?;
813            let request = new_raw_scan_request(
814                (start_key.clone(), end_key.clone()).into(),
815                scan_args.limit,
816                scan_args.key_only,
817                scan_args.reverse,
818                self.cf.clone(),
819            );
820            let resp = self.do_store_scan(store.clone(), request.clone()).await;
821            return match resp {
822                Ok(mut r) => {
823                    if let Some(err) = r.region_error() {
824                        let status =
825                            plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
826                                .await?;
827                        if status {
828                            continue;
829                        } else if let Some(duration) = scan_args.backoff.next_delay_duration() {
830                            sleep(duration).await;
831                            continue;
832                        } else {
833                            return Err(RegionError(Box::new(err)));
834                        }
835                    }
836                    Ok((Some(r), region.end_key()))
837                }
838                Err(err) => Err(err),
839            };
840        }
841    }
842
843    async fn do_store_scan(
844        &self,
845        store: RegionStore,
846        scan_request: RawScanRequest,
847    ) -> Result<RawScanResponse> {
848        crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
849            .single_region_with_store(store.clone())
850            .await?
851            .plan()
852            .execute()
853            .await
854    }
855
856    async fn batch_scan_inner(
857        &self,
858        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
859        each_limit: u32,
860        key_only: bool,
861    ) -> Result<Vec<KvPair>> {
862        if each_limit > MAX_RAW_KV_SCAN_LIMIT {
863            return Err(Error::MaxScanLimitExceeded {
864                limit: each_limit,
865                max_limit: MAX_RAW_KV_SCAN_LIMIT,
866            });
867        }
868
869        let ranges = ranges
870            .into_iter()
871            .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw));
872
873        let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
874        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
875            .retry_multi_region(self.backoff.clone())
876            .merge(Collect)
877            .plan();
878        plan.execute().await.map(|r| {
879            r.into_iter()
880                .map(|pair| pair.truncate_keyspace(self.keyspace))
881                .collect()
882        })
883    }
884
885    fn assert_non_atomic(&self) -> Result<()> {
886        if !self.atomic {
887            Ok(())
888        } else {
889            Err(Error::UnsupportedMode)
890        }
891    }
892
893    fn assert_atomic(&self) -> Result<()> {
894        if self.atomic {
895            Ok(())
896        } else {
897            Err(Error::UnsupportedMode)
898        }
899    }
900}
901
902#[derive(Clone)]
903struct ScanInnerArgs {
904    start_key: Key,
905    end_key: Option<Key>,
906    limit: u32,
907    key_only: bool,
908    reverse: bool,
909    backoff: Backoff,
910}
911
912#[cfg(test)]
913mod tests {
914    use std::any::Any;
915    use std::sync::Arc;
916
917    use super::*;
918    use crate::mock::MockKvClient;
919    use crate::mock::MockPdClient;
920    use crate::proto::kvrpcpb;
921    use crate::Result;
922
923    #[tokio::test]
924    async fn test_batch_put_with_ttl() -> Result<()> {
925        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
926            move |req: &dyn Any| {
927                if req.downcast_ref::<kvrpcpb::RawBatchPutRequest>().is_some() {
928                    let resp = kvrpcpb::RawBatchPutResponse {
929                        ..Default::default()
930                    };
931                    Ok(Box::new(resp) as Box<dyn Any>)
932                } else {
933                    unreachable!()
934                }
935            },
936        )));
937        let client = Client {
938            rpc: pd_client,
939            cf: Some(ColumnFamily::Default),
940            backoff: DEFAULT_REGION_BACKOFF,
941            atomic: false,
942            keyspace: Keyspace::Enable { keyspace_id: 0 },
943        };
944        let pairs = vec![
945            KvPair(vec![11].into(), vec![12]),
946            KvPair(vec![11].into(), vec![12]),
947        ];
948        let ttls = vec![0, 0];
949        assert!(client.batch_put_with_ttl(pairs, ttls).await.is_ok());
950        Ok(())
951    }
952
953    #[tokio::test]
954    async fn test_raw_coprocessor() -> Result<()> {
955        let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
956            move |req: &dyn Any| {
957                if let Some(req) = req.downcast_ref::<kvrpcpb::RawCoprocessorRequest>() {
958                    assert_eq!(req.copr_name, "example");
959                    assert_eq!(req.copr_version_req, "0.1.0");
960                    let resp = kvrpcpb::RawCoprocessorResponse {
961                        data: req.data.clone(),
962                        ..Default::default()
963                    };
964                    Ok(Box::new(resp) as Box<dyn Any>)
965                } else {
966                    unreachable!()
967                }
968            },
969        )));
970        let client = Client {
971            rpc: pd_client,
972            cf: Some(ColumnFamily::Default),
973            backoff: DEFAULT_REGION_BACKOFF,
974            atomic: false,
975            keyspace: Keyspace::Enable { keyspace_id: 0 },
976        };
977        let resps = client
978            .coprocessor(
979                "example",
980                "0.1.0",
981                vec![vec![5]..vec![15], vec![20]..vec![]],
982                |region, ranges| format!("{:?}:{:?}", region.id, ranges).into_bytes(),
983            )
984            .await?;
985        let resps: Vec<_> = resps
986            .into_iter()
987            .map(|(ranges, data)| (ranges, String::from_utf8(data).unwrap()))
988            .collect();
989        assert_eq!(
990            resps,
991            vec![(
992                vec![
993                    Key::from(vec![5])..Key::from(vec![15]),
994                    Key::from(vec![20])..Key::from(vec![])
995                ],
996                "2:[Key(05)..Key(0F), Key(14)..Key()]".to_string(),
997            ),]
998        );
999        Ok(())
1000    }
1001}