Skip to main content

tikv_client/request/
mod.rs

1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3use async_trait::async_trait;
4use derive_new::new;
5
6pub use self::keyspace::EncodeKeyspace;
7pub use self::keyspace::KeyMode;
8pub use self::keyspace::Keyspace;
9pub use self::keyspace::TruncateKeyspace;
10pub use self::plan::Collect;
11pub use self::plan::CollectError;
12pub use self::plan::CollectSingle;
13pub use self::plan::CollectWithShard;
14pub use self::plan::DefaultProcessor;
15pub use self::plan::Dispatch;
16pub use self::plan::ExtractError;
17pub use self::plan::Merge;
18pub use self::plan::MergeResponse;
19pub use self::plan::Plan;
20pub use self::plan::Process;
21pub use self::plan::ProcessResponse;
22pub use self::plan::ResolveLock;
23pub use self::plan::ResponseWithShard;
24pub use self::plan::RetryableMultiRegion;
25pub use self::plan_builder::PlanBuilder;
26pub use self::plan_builder::SingleKey;
27pub use self::shard::Batchable;
28pub use self::shard::HasNextBatch;
29pub use self::shard::NextBatch;
30pub use self::shard::RangeRequest;
31pub use self::shard::Shardable;
32use crate::backoff::Backoff;
33use crate::backoff::DEFAULT_REGION_BACKOFF;
34use crate::backoff::OPTIMISTIC_BACKOFF;
35use crate::backoff::PESSIMISTIC_BACKOFF;
36use crate::store::Request;
37use crate::store::{HasKeyErrors, Store};
38use crate::transaction::HasLocks;
39
40mod keyspace;
41pub mod plan;
42mod plan_builder;
43mod shard;
44
45/// Abstracts any request sent to a TiKV server.
46#[async_trait]
47pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
48    /// The expected response to the request.
49    type Response: HasKeyErrors + HasLocks + Clone + Send + 'static;
50}
51
52/// For requests or plans which are handled at TiKV store (other than region) level.
53pub trait StoreRequest {
54    /// Apply the request to specified TiKV store.
55    fn apply_store(&mut self, store: &Store);
56}
57
58#[derive(Clone, Debug, new, Eq, PartialEq)]
59pub struct RetryOptions {
60    /// How to retry when there is a region error and we need to resolve regions with PD.
61    pub region_backoff: Backoff,
62    /// How to retry when a key is locked.
63    pub lock_backoff: Backoff,
64}
65
66impl RetryOptions {
67    pub const fn default_optimistic() -> RetryOptions {
68        RetryOptions {
69            region_backoff: DEFAULT_REGION_BACKOFF,
70            lock_backoff: OPTIMISTIC_BACKOFF,
71        }
72    }
73
74    pub const fn default_pessimistic() -> RetryOptions {
75        RetryOptions {
76            region_backoff: DEFAULT_REGION_BACKOFF,
77            lock_backoff: PESSIMISTIC_BACKOFF,
78        }
79    }
80
81    pub const fn none() -> RetryOptions {
82        RetryOptions {
83            region_backoff: Backoff::no_backoff(),
84            lock_backoff: Backoff::no_backoff(),
85        }
86    }
87}
88
#[cfg(test)]
mod test {
    use std::any::Any;
    use std::iter;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::time::Duration;

    use tonic::transport::Channel;

    use super::*;
    use crate::mock::MockKvClient;
    use crate::mock::MockPdClient;
    use crate::proto::kvrpcpb;
    use crate::proto::pdpb::Timestamp;
    use crate::proto::tikvpb::tikv_client::TikvClient;
    use crate::region::RegionWithLeader;
    use crate::store::region_stream_for_keys;
    use crate::store::HasRegionError;
    use crate::transaction::lowering::new_commit_request;
    use crate::Error;
    use crate::Key;
    use crate::Result;

    /// Runs a multi-region retry plan against a response that always reports a region error
    /// and checks how many times the request was asked for its shards.
    #[tokio::test]
    async fn test_region_retry() {
        // A response that never carries key errors but always carries a region error.
        #[derive(Debug, Clone)]
        struct MockRpcResponse;

        impl HasRegionError for MockRpcResponse {
            fn region_error(&mut self) -> Option<crate::proto::errorpb::Error> {
                Some(crate::proto::errorpb::Error::default())
            }
        }

        impl HasKeyErrors for MockRpcResponse {
            fn key_errors(&mut self) -> Option<Vec<Error>> {
                None
            }
        }

        impl HasLocks for MockRpcResponse {}

        // A request that counts, through a shared counter, every call to `shards`.
        #[derive(Clone)]
        struct MockKvRequest {
            shards_calls: Arc<AtomicUsize>,
        }

        #[async_trait]
        impl Request for MockKvRequest {
            async fn dispatch(&self, _: &TikvClient<Channel>, _: Duration) -> Result<Box<dyn Any>> {
                Ok(Box::new(MockRpcResponse) as Box<dyn Any>)
            }

            fn label(&self) -> &'static str {
                "mock"
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            fn set_leader(&mut self, _: &RegionWithLeader) -> Result<()> {
                Ok(())
            }

            fn set_api_version(&mut self, _: kvrpcpb::ApiVersion) {}
        }

        #[async_trait]
        impl KvRequest for MockKvRequest {
            type Response = MockRpcResponse;
        }

        impl Shardable for MockKvRequest {
            type Shard = Vec<Vec<u8>>;

            fn shards(
                &self,
                pd_client: &Arc<impl crate::pd::PdClient>,
            ) -> futures::stream::BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
                // Record this invocation before producing the single-key region stream.
                self.shards_calls.fetch_add(1, Ordering::SeqCst);
                let keys = iter::once(Key::from("mock_key".to_owned()));
                region_stream_for_keys(keys, Arc::clone(pd_client))
            }

            fn apply_shard(&mut self, _shard: Self::Shard) {}

            fn apply_store(&mut self, _store: &crate::store::RegionStore) -> Result<()> {
                Ok(())
            }
        }

        let invoking_count = Arc::new(AtomicUsize::new(0));
        let request = MockKvRequest {
            shards_calls: Arc::clone(&invoking_count),
        };

        let kv_client = MockKvClient::with_dispatch_hook(|_: &dyn Any| {
            Ok(Box::new(MockRpcResponse) as Box<dyn Any>)
        });
        let pd_client = Arc::new(MockPdClient::new(kv_client));

        let plan = PlanBuilder::new(Arc::clone(&pd_client), Keyspace::Disable, request)
            .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
            .extract_error()
            .plan();
        // Only the number of `shards` invocations matters here; the outcome is discarded.
        let _ = plan.execute().await;

        // The initial attempt plus the 3 retries.
        let observed_calls = invoking_count.load(Ordering::SeqCst);
        assert_eq!(observed_calls, 4);
    }

    /// Checks that a key error inside a commit response only turns into an `Err` when the plan
    /// includes the `extract_error` step.
    #[tokio::test]
    async fn test_extract_error() {
        let kv_client = MockKvClient::with_dispatch_hook(|_: &dyn Any| {
            let response = kvrpcpb::CommitResponse {
                error: Some(kvrpcpb::KeyError::default()),
                ..Default::default()
            };
            Ok(Box::new(response) as Box<dyn Any>)
        });
        let pd_client = Arc::new(MockPdClient::new(kv_client));

        let key = Key::from("key".to_owned());
        let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default());

        // Without `extract_error` the key error stays inside the response.
        let plain_plan = PlanBuilder::new(Arc::clone(&pd_client), Keyspace::Disable, req.clone())
            .retry_multi_region(OPTIMISTIC_BACKOFF)
            .plan();
        let plain_outcome = plain_plan.execute().await;
        assert!(plain_outcome.is_ok());

        // With `extract_error` the same response is surfaced as an error.
        let extracting_plan = PlanBuilder::new(Arc::clone(&pd_client), Keyspace::Disable, req)
            .retry_multi_region(OPTIMISTIC_BACKOFF)
            .extract_error()
            .plan();
        let extracted_outcome = extracting_plan.execute().await;
        assert!(extracted_outcome.is_err());
    }
}