tikv_client/request/
mod.rs1use async_trait::async_trait;
4use derive_new::new;
5
6pub use self::keyspace::EncodeKeyspace;
7pub use self::keyspace::KeyMode;
8pub use self::keyspace::Keyspace;
9pub use self::keyspace::TruncateKeyspace;
10pub use self::plan::Collect;
11pub use self::plan::CollectError;
12pub use self::plan::CollectSingle;
13pub use self::plan::CollectWithShard;
14pub use self::plan::DefaultProcessor;
15pub use self::plan::Dispatch;
16pub use self::plan::ExtractError;
17pub use self::plan::Merge;
18pub use self::plan::MergeResponse;
19pub use self::plan::Plan;
20pub use self::plan::Process;
21pub use self::plan::ProcessResponse;
22pub use self::plan::ResolveLock;
23pub use self::plan::ResponseWithShard;
24pub use self::plan::RetryableMultiRegion;
25pub use self::plan_builder::PlanBuilder;
26pub use self::plan_builder::SingleKey;
27pub use self::shard::Batchable;
28pub use self::shard::HasNextBatch;
29pub use self::shard::NextBatch;
30pub use self::shard::RangeRequest;
31pub use self::shard::Shardable;
32use crate::backoff::Backoff;
33use crate::backoff::DEFAULT_REGION_BACKOFF;
34use crate::backoff::OPTIMISTIC_BACKOFF;
35use crate::backoff::PESSIMISTIC_BACKOFF;
36use crate::store::Request;
37use crate::store::{HasKeyErrors, Store};
38use crate::transaction::HasLocks;
39
40mod keyspace;
41pub mod plan;
42mod plan_builder;
43mod shard;
44
45#[async_trait]
47pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
48 type Response: HasKeyErrors + HasLocks + Clone + Send + 'static;
50}
51
52pub trait StoreRequest {
54 fn apply_store(&mut self, store: &Store);
56}
57
58#[derive(Clone, Debug, new, Eq, PartialEq)]
59pub struct RetryOptions {
60 pub region_backoff: Backoff,
62 pub lock_backoff: Backoff,
64}
65
66impl RetryOptions {
67 pub const fn default_optimistic() -> RetryOptions {
68 RetryOptions {
69 region_backoff: DEFAULT_REGION_BACKOFF,
70 lock_backoff: OPTIMISTIC_BACKOFF,
71 }
72 }
73
74 pub const fn default_pessimistic() -> RetryOptions {
75 RetryOptions {
76 region_backoff: DEFAULT_REGION_BACKOFF,
77 lock_backoff: PESSIMISTIC_BACKOFF,
78 }
79 }
80
81 pub const fn none() -> RetryOptions {
82 RetryOptions {
83 region_backoff: Backoff::no_backoff(),
84 lock_backoff: Backoff::no_backoff(),
85 }
86 }
87}
88
#[cfg(test)]
mod test {
    use std::any::Any;
    use std::iter;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::time::Duration;

    use tonic::transport::Channel;

    use super::*;
    use crate::mock::MockKvClient;
    use crate::mock::MockPdClient;
    use crate::proto::kvrpcpb;
    use crate::proto::pdpb::Timestamp;
    use crate::proto::tikvpb::tikv_client::TikvClient;
    use crate::region::RegionWithLeader;
    use crate::store::region_stream_for_keys;
    use crate::store::HasRegionError;
    use crate::transaction::lowering::new_commit_request;
    use crate::Error;
    use crate::Key;
    use crate::Result;

    /// Runs a multi-region retry plan against a response that always reports
    /// a region error and counts how many times the request is sharded.
    #[tokio::test]
    async fn test_region_retry() {
        /// Response stub: never has key errors, always has a region error.
        #[derive(Debug, Clone)]
        struct MockRpcResponse;

        impl HasKeyErrors for MockRpcResponse {
            fn key_errors(&mut self) -> Option<Vec<Error>> {
                None
            }
        }

        impl HasRegionError for MockRpcResponse {
            fn region_error(&mut self) -> Option<crate::proto::errorpb::Error> {
                Some(crate::proto::errorpb::Error::default())
            }
        }

        impl HasLocks for MockRpcResponse {}

        /// Request stub that bumps a shared counter on every `shards` call.
        #[derive(Clone)]
        struct MockKvRequest {
            test_invoking_count: Arc<AtomicUsize>,
        }

        #[async_trait]
        impl Request for MockKvRequest {
            async fn dispatch(&self, _: &TikvClient<Channel>, _: Duration) -> Result<Box<dyn Any>> {
                let response: Box<dyn Any> = Box::new(MockRpcResponse);
                Ok(response)
            }

            fn label(&self) -> &'static str {
                "mock"
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            fn set_leader(&mut self, _: &RegionWithLeader) -> Result<()> {
                Ok(())
            }

            fn set_api_version(&mut self, _: kvrpcpb::ApiVersion) {}
        }

        #[async_trait]
        impl KvRequest for MockKvRequest {
            type Response = MockRpcResponse;
        }

        impl Shardable for MockKvRequest {
            type Shard = Vec<Vec<u8>>;

            fn shards(
                &self,
                pd_client: &Arc<impl crate::pd::PdClient>,
            ) -> futures::stream::BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>>
            {
                // Record the invocation before producing the single-key stream.
                self.test_invoking_count.fetch_add(1, Ordering::SeqCst);
                let keys = iter::once(Key::from("mock_key".to_owned()));
                region_stream_for_keys(keys, pd_client.clone())
            }

            fn apply_shard(&mut self, _shard: Self::Shard) {}

            fn apply_store(&mut self, _store: &crate::store::RegionStore) -> Result<()> {
                Ok(())
            }
        }

        let shard_calls = Arc::new(AtomicUsize::new(0));
        let mock_request = MockKvRequest {
            test_invoking_count: Arc::clone(&shard_calls),
        };

        let kv_client = MockKvClient::with_dispatch_hook(|_: &dyn Any| {
            Ok(Box::new(MockRpcResponse) as Box<dyn Any>)
        });
        let pd_client = Arc::new(MockPdClient::new(kv_client));

        let retry_plan = PlanBuilder::new(pd_client.clone(), Keyspace::Disable, mock_request)
            .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
            .extract_error()
            .plan();
        // The outcome is deliberately ignored; only the call count matters here.
        let _ = retry_plan.execute().await;

        // Expected: the initial attempt plus 3 retries (assuming the last
        // argument of `no_jitter_backoff` is the retry limit).
        assert_eq!(shard_calls.load(Ordering::SeqCst), 4);
    }

    /// A commit response carrying a key error is `Ok` without
    /// `extract_error()` and `Err` once `extract_error()` is in the plan.
    #[tokio::test]
    async fn test_extract_error() {
        let kv_client = MockKvClient::with_dispatch_hook(|_: &dyn Any| {
            let response = kvrpcpb::CommitResponse {
                error: Some(kvrpcpb::KeyError::default()),
                ..Default::default()
            };
            Ok(Box::new(response) as Box<dyn Any>)
        });
        let pd_client = Arc::new(MockPdClient::new(kv_client));

        let key: Key = "key".to_owned().into();
        let commit = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default());

        // Without error extraction the key error stays inside the response.
        let without_extract = PlanBuilder::new(pd_client.clone(), Keyspace::Disable, commit.clone())
            .retry_multi_region(OPTIMISTIC_BACKOFF)
            .plan();
        assert!(without_extract.execute().await.is_ok());

        // With error extraction the same response surfaces as an error.
        let with_extract = PlanBuilder::new(pd_client.clone(), Keyspace::Disable, commit)
            .retry_multi_region(OPTIMISTIC_BACKOFF)
            .extract_error()
            .plan();
        assert!(with_extract.execute().await.is_err());
    }
}