tikv_client/request/
plan_builder.rs

1// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2
3use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::plan::PreserveShard;
7use crate::backoff::Backoff;
8use crate::pd::PdClient;
9use crate::request::codec::EncodedRequest;
10use crate::request::plan::{CleanupLocks, RetryableAllStores};
11use crate::request::shard::HasNextBatch;
12use crate::request::Dispatch;
13use crate::request::ExtractError;
14use crate::request::KvRequest;
15use crate::request::Merge;
16use crate::request::MergeResponse;
17use crate::request::NextBatch;
18use crate::request::Plan;
19use crate::request::Process;
20use crate::request::ProcessResponse;
21use crate::request::ResolveLock;
22use crate::request::RetryableMultiRegion;
23use crate::request::Shardable;
24use crate::request::{DefaultProcessor, StoreRequest};
25use crate::store::HasKeyErrors;
26use crate::store::HasRegionError;
27use crate::store::HasRegionErrors;
28use crate::store::RegionStore;
29use crate::transaction::HasLocks;
30use crate::transaction::ResolveLocksContext;
31use crate::transaction::ResolveLocksOptions;
32use crate::Result;
33
34/// Builder type for plans (see that module for more).
35pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
36    pd_client: Arc<PdC>,
37    plan: P,
38    phantom: PhantomData<Ph>,
39}
40
/// Used to ensure that a plan has a designated target or targets, a target is
/// a particular TiKV server.
///
/// Implemented only by the zero-sized marker types below, which are used as
/// the phase parameter of `PlanBuilder`.
pub trait PlanBuilderPhase {}

/// Phase marker: the plan has not been given a target yet.
pub struct NoTarget;

/// Phase marker: the plan has a target, so the built plan can be taken out of
/// the builder.
pub struct Targetted;

impl PlanBuilderPhase for NoTarget {}
impl PlanBuilderPhase for Targetted {}
48
49impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
50    pub fn new(pd_client: Arc<PdC>, encoded_request: EncodedRequest<Req>) -> Self {
51        PlanBuilder {
52            pd_client,
53            plan: Dispatch {
54                request: encoded_request.inner,
55                kv_client: None,
56            },
57            phantom: PhantomData,
58        }
59    }
60}
61
62impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
63    /// Return the built plan, note that this can only be called once the plan
64    /// has a target.
65    pub fn plan(self) -> P {
66        self.plan
67    }
68}
69
70impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
71    /// If there is a lock error, then resolve the lock and retry the request.
72    pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
73    where
74        P::Result: HasLocks,
75    {
76        PlanBuilder {
77            pd_client: self.pd_client.clone(),
78            plan: ResolveLock {
79                inner: self.plan,
80                backoff,
81                pd_client: self.pd_client,
82            },
83            phantom: PhantomData,
84        }
85    }
86
87    pub fn cleanup_locks(
88        self,
89        ctx: ResolveLocksContext,
90        options: ResolveLocksOptions,
91        backoff: Backoff,
92    ) -> PlanBuilder<PdC, CleanupLocks<P, PdC>, Ph>
93    where
94        P: Shardable + NextBatch,
95        P::Result: HasLocks + HasNextBatch + HasRegionError + HasKeyErrors,
96    {
97        PlanBuilder {
98            pd_client: self.pd_client.clone(),
99            plan: CleanupLocks {
100                inner: self.plan,
101                ctx,
102                options,
103                store: None,
104                backoff,
105                pd_client: self.pd_client,
106            },
107            phantom: PhantomData,
108        }
109    }
110
111    /// Merge the results of a request. Usually used where a request is sent to multiple regions
112    /// to combine the responses from each region.
113    pub fn merge<In, M: Merge<In>>(self, merge: M) -> PlanBuilder<PdC, MergeResponse<P, In, M>, Ph>
114    where
115        In: Clone + Send + Sync + 'static,
116        P: Plan<Result = Vec<Result<In>>>,
117    {
118        PlanBuilder {
119            pd_client: self.pd_client.clone(),
120            plan: MergeResponse {
121                inner: self.plan,
122                merge,
123                phantom: PhantomData,
124            },
125            phantom: PhantomData,
126        }
127    }
128
129    /// Apply the default processing step to a response (usually only needed if the request is sent
130    /// to a single region because post-porcessing can be incorporated in the merge step for
131    /// multi-region requests).
132    pub fn post_process_default(self) -> PlanBuilder<PdC, ProcessResponse<P, DefaultProcessor>, Ph>
133    where
134        P: Plan,
135        DefaultProcessor: Process<P::Result>,
136    {
137        PlanBuilder {
138            pd_client: self.pd_client.clone(),
139            plan: ProcessResponse {
140                inner: self.plan,
141                processor: DefaultProcessor,
142            },
143            phantom: PhantomData,
144        }
145    }
146}
147
148impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
149where
150    P::Result: HasKeyErrors + HasRegionError,
151{
152    /// Split the request into shards sending a request to the region of each shard.
153    pub fn retry_multi_region(
154        self,
155        backoff: Backoff,
156    ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
157        self.make_retry_multi_region(backoff, false)
158    }
159
160    /// Preserve all results, even some of them are Err.
161    /// To pass all responses to merge, and handle partial successful results correctly.
162    pub fn retry_multi_region_preserve_results(
163        self,
164        backoff: Backoff,
165    ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
166        self.make_retry_multi_region(backoff, true)
167    }
168
169    fn make_retry_multi_region(
170        self,
171        backoff: Backoff,
172        preserve_region_results: bool,
173    ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
174        PlanBuilder {
175            pd_client: self.pd_client.clone(),
176            plan: RetryableMultiRegion {
177                inner: self.plan,
178                pd_client: self.pd_client,
179                backoff,
180                preserve_region_results,
181            },
182            phantom: PhantomData,
183        }
184    }
185}
186
187impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
188    /// Target the request at a single region; caller supplies the store to target.
189    pub async fn single_region_with_store(
190        self,
191        store: RegionStore,
192    ) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
193        set_single_region_store(self.plan, store, self.pd_client)
194    }
195}
196
197impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
198where
199    P::Result: HasKeyErrors + HasRegionError,
200{
201    pub fn all_stores(
202        self,
203        backoff: Backoff,
204    ) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
205        PlanBuilder {
206            pd_client: self.pd_client.clone(),
207            plan: RetryableAllStores {
208                inner: self.plan,
209                pd_client: self.pd_client,
210                backoff,
211            },
212            phantom: PhantomData,
213        }
214    }
215}
216
217impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
218where
219    P::Result: HasKeyErrors,
220{
221    pub fn preserve_shard(self) -> PlanBuilder<PdC, PreserveShard<P>, NoTarget> {
222        PlanBuilder {
223            pd_client: self.pd_client.clone(),
224            plan: PreserveShard {
225                inner: self.plan,
226                shard: None,
227            },
228            phantom: PhantomData,
229        }
230    }
231}
232
233impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
234where
235    P::Result: HasKeyErrors + HasRegionErrors,
236{
237    pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
238        PlanBuilder {
239            pd_client: self.pd_client,
240            plan: ExtractError { inner: self.plan },
241            phantom: self.phantom,
242        }
243    }
244}
245
246fn set_single_region_store<PdC: PdClient, R: KvRequest>(
247    mut plan: Dispatch<R>,
248    store: RegionStore,
249    pd_client: Arc<PdC>,
250) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
251    plan.request
252        .set_context(store.region_with_leader.context()?);
253    plan.kv_client = Some(store.client);
254    Ok(PlanBuilder {
255        plan,
256        pd_client,
257        phantom: PhantomData,
258    })
259}
260
/// Indicates that a request operates on a single key.
pub trait SingleKey {
    /// Returns a reference to the key of this request, as raw bytes.
    // `clippy::ptr_arg` is silenced because the signature uses `&Vec<u8>`
    // instead of `&[u8]`. NOTE(review): the reason a `Vec` reference is needed
    // is not visible in this file — confirm against the implementors before
    // changing it.
    #[allow(clippy::ptr_arg)]
    fn key(&self) -> &Vec<u8>;
}