Skip to main content

tikv_client/request/
plan_builder.rs

1// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::plan::PreserveShard;
7use super::Keyspace;
8use crate::backoff::Backoff;
9use crate::pd::PdClient;
10use crate::request::plan::{CleanupLocks, RetryableAllStores};
11use crate::request::shard::HasNextBatch;
12use crate::request::Dispatch;
13use crate::request::ExtractError;
14use crate::request::KvRequest;
15use crate::request::Merge;
16use crate::request::MergeResponse;
17use crate::request::NextBatch;
18use crate::request::Plan;
19use crate::request::Process;
20use crate::request::ProcessResponse;
21use crate::request::ResolveLock;
22use crate::request::RetryableMultiRegion;
23use crate::request::Shardable;
24use crate::request::{DefaultProcessor, StoreRequest};
25use crate::store::HasKeyErrors;
26use crate::store::HasRegionError;
27use crate::store::HasRegionErrors;
28use crate::store::RegionStore;
29use crate::transaction::HasLocks;
30use crate::transaction::ResolveLocksContext;
31use crate::transaction::ResolveLocksOptions;
32use crate::Result;
33use crate::Timestamp;
34
35/// Builder type for plans (see that module for more).
36pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
37    pd_client: Arc<PdC>,
38    plan: P,
39    phantom: PhantomData<Ph>,
40}
41
/// Used to ensure that a plan has a designated target or targets; a target is
/// a particular TiKV server.
///
/// The trait has no items: it only marks the types usable as the phase
/// parameter of a plan builder.
pub trait PlanBuilderPhase {}

/// Phase marker for a builder whose plan has not been given a target yet.
pub struct NoTarget;

/// Phase marker for a builder whose plan has been given a target.
pub struct Targetted;

impl PlanBuilderPhase for NoTarget {}

impl PlanBuilderPhase for Targetted {}
49
50impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
51    pub fn new(pd_client: Arc<PdC>, keyspace: Keyspace, mut request: Req) -> Self {
52        request.set_api_version(keyspace.api_version());
53        PlanBuilder {
54            pd_client,
55            plan: Dispatch {
56                request,
57                kv_client: None,
58            },
59            phantom: PhantomData,
60        }
61    }
62}
63
64impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
65    /// Return the built plan, note that this can only be called once the plan
66    /// has a target.
67    pub fn plan(self) -> P {
68        self.plan
69    }
70}
71
72impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
73    /// If there is a lock error, then resolve the lock and retry the request.
74    pub fn resolve_lock(
75        self,
76        timestamp: Timestamp,
77        backoff: Backoff,
78        keyspace: Keyspace,
79    ) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
80    where
81        P::Result: HasLocks,
82    {
83        PlanBuilder {
84            pd_client: self.pd_client.clone(),
85            plan: ResolveLock {
86                inner: self.plan,
87                timestamp,
88                backoff,
89                pd_client: self.pd_client,
90                keyspace,
91            },
92            phantom: PhantomData,
93        }
94    }
95
96    pub fn cleanup_locks(
97        self,
98        ctx: ResolveLocksContext,
99        options: ResolveLocksOptions,
100        backoff: Backoff,
101        keyspace: Keyspace,
102    ) -> PlanBuilder<PdC, CleanupLocks<P, PdC>, Ph>
103    where
104        P: Shardable + NextBatch,
105        P::Result: HasLocks + HasNextBatch + HasRegionError + HasKeyErrors,
106    {
107        PlanBuilder {
108            pd_client: self.pd_client.clone(),
109            plan: CleanupLocks {
110                inner: self.plan,
111                ctx,
112                options,
113                store: None,
114                backoff,
115                pd_client: self.pd_client,
116                keyspace,
117            },
118            phantom: PhantomData,
119        }
120    }
121
122    /// Merge the results of a request. Usually used where a request is sent to multiple regions
123    /// to combine the responses from each region.
124    pub fn merge<In, M: Merge<In>>(self, merge: M) -> PlanBuilder<PdC, MergeResponse<P, In, M>, Ph>
125    where
126        In: Clone + Send + Sync + 'static,
127        P: Plan<Result = Vec<Result<In>>>,
128    {
129        PlanBuilder {
130            pd_client: self.pd_client.clone(),
131            plan: MergeResponse {
132                inner: self.plan,
133                merge,
134                phantom: PhantomData,
135            },
136            phantom: PhantomData,
137        }
138    }
139
140    /// Apply the default processing step to a response (usually only needed if the request is sent
141    /// to a single region because post-porcessing can be incorporated in the merge step for
142    /// multi-region requests).
143    pub fn post_process_default(self) -> PlanBuilder<PdC, ProcessResponse<P, DefaultProcessor>, Ph>
144    where
145        P: Plan,
146        DefaultProcessor: Process<P::Result>,
147    {
148        PlanBuilder {
149            pd_client: self.pd_client.clone(),
150            plan: ProcessResponse {
151                inner: self.plan,
152                processor: DefaultProcessor,
153            },
154            phantom: PhantomData,
155        }
156    }
157}
158
159impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
160where
161    P::Result: HasKeyErrors + HasRegionError,
162{
163    /// Split the request into shards sending a request to the region of each shard.
164    pub fn retry_multi_region(
165        self,
166        backoff: Backoff,
167    ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
168        self.make_retry_multi_region(backoff, false)
169    }
170
171    /// Preserve all results, even some of them are Err.
172    /// To pass all responses to merge, and handle partial successful results correctly.
173    pub fn retry_multi_region_preserve_results(
174        self,
175        backoff: Backoff,
176    ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
177        self.make_retry_multi_region(backoff, true)
178    }
179
180    fn make_retry_multi_region(
181        self,
182        backoff: Backoff,
183        preserve_region_results: bool,
184    ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
185        PlanBuilder {
186            pd_client: self.pd_client.clone(),
187            plan: RetryableMultiRegion {
188                inner: self.plan,
189                pd_client: self.pd_client,
190                backoff,
191                preserve_region_results,
192            },
193            phantom: PhantomData,
194        }
195    }
196}
197
198impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
199    /// Target the request at a single region; caller supplies the store to target.
200    pub async fn single_region_with_store(
201        self,
202        store: RegionStore,
203    ) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
204        set_single_region_store(self.plan, store, self.pd_client)
205    }
206}
207
208impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
209where
210    P::Result: HasKeyErrors + HasRegionError,
211{
212    pub fn all_stores(
213        self,
214        backoff: Backoff,
215    ) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
216        PlanBuilder {
217            pd_client: self.pd_client.clone(),
218            plan: RetryableAllStores {
219                inner: self.plan,
220                pd_client: self.pd_client,
221                backoff,
222            },
223            phantom: PhantomData,
224        }
225    }
226}
227
228impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
229where
230    P::Result: HasKeyErrors,
231{
232    pub fn preserve_shard(self) -> PlanBuilder<PdC, PreserveShard<P>, NoTarget> {
233        PlanBuilder {
234            pd_client: self.pd_client.clone(),
235            plan: PreserveShard {
236                inner: self.plan,
237                shard: None,
238            },
239            phantom: PhantomData,
240        }
241    }
242}
243
244impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
245where
246    P::Result: HasKeyErrors + HasRegionErrors,
247{
248    pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
249        PlanBuilder {
250            pd_client: self.pd_client,
251            plan: ExtractError { inner: self.plan },
252            phantom: self.phantom,
253        }
254    }
255}
256
257fn set_single_region_store<PdC: PdClient, R: KvRequest>(
258    mut plan: Dispatch<R>,
259    store: RegionStore,
260    pd_client: Arc<PdC>,
261) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
262    plan.request.set_leader(&store.region_with_leader)?;
263    plan.kv_client = Some(store.client);
264    Ok(PlanBuilder {
265        plan,
266        pd_client,
267        phantom: PhantomData,
268    })
269}
270
/// Indicates that a request operates on a single key.
pub trait SingleKey {
    /// Borrow the key this request operates on.
    // The signature returns `&Vec<u8>` rather than `&[u8]`; the clippy
    // `ptr_arg` lint is silenced here instead of changing the return type.
    #[allow(clippy::ptr_arg)]
    fn key(&self) -> &Vec<u8>;
}