1use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::plan::PreserveShard;
7use crate::backoff::Backoff;
8use crate::pd::PdClient;
9use crate::request::codec::EncodedRequest;
10use crate::request::plan::{CleanupLocks, RetryableAllStores};
11use crate::request::shard::HasNextBatch;
12use crate::request::Dispatch;
13use crate::request::ExtractError;
14use crate::request::KvRequest;
15use crate::request::Merge;
16use crate::request::MergeResponse;
17use crate::request::NextBatch;
18use crate::request::Plan;
19use crate::request::Process;
20use crate::request::ProcessResponse;
21use crate::request::ResolveLock;
22use crate::request::RetryableMultiRegion;
23use crate::request::Shardable;
24use crate::request::{DefaultProcessor, StoreRequest};
25use crate::store::HasKeyErrors;
26use crate::store::HasRegionError;
27use crate::store::HasRegionErrors;
28use crate::store::RegionStore;
29use crate::transaction::HasLocks;
30use crate::transaction::ResolveLocksContext;
31use crate::transaction::ResolveLocksOptions;
32use crate::Result;
33
34pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
36 pd_client: Arc<PdC>,
37 plan: P,
38 phantom: PhantomData<Ph>,
39}
40
41pub trait PlanBuilderPhase {}
44pub struct NoTarget;
45impl PlanBuilderPhase for NoTarget {}
46pub struct Targetted;
47impl PlanBuilderPhase for Targetted {}
48
49impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
50 pub fn new(pd_client: Arc<PdC>, encoded_request: EncodedRequest<Req>) -> Self {
51 PlanBuilder {
52 pd_client,
53 plan: Dispatch {
54 request: encoded_request.inner,
55 kv_client: None,
56 },
57 phantom: PhantomData,
58 }
59 }
60}
61
62impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
63 pub fn plan(self) -> P {
66 self.plan
67 }
68}
69
70impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
71 pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
73 where
74 P::Result: HasLocks,
75 {
76 PlanBuilder {
77 pd_client: self.pd_client.clone(),
78 plan: ResolveLock {
79 inner: self.plan,
80 backoff,
81 pd_client: self.pd_client,
82 },
83 phantom: PhantomData,
84 }
85 }
86
87 pub fn cleanup_locks(
88 self,
89 ctx: ResolveLocksContext,
90 options: ResolveLocksOptions,
91 backoff: Backoff,
92 ) -> PlanBuilder<PdC, CleanupLocks<P, PdC>, Ph>
93 where
94 P: Shardable + NextBatch,
95 P::Result: HasLocks + HasNextBatch + HasRegionError + HasKeyErrors,
96 {
97 PlanBuilder {
98 pd_client: self.pd_client.clone(),
99 plan: CleanupLocks {
100 inner: self.plan,
101 ctx,
102 options,
103 store: None,
104 backoff,
105 pd_client: self.pd_client,
106 },
107 phantom: PhantomData,
108 }
109 }
110
111 pub fn merge<In, M: Merge<In>>(self, merge: M) -> PlanBuilder<PdC, MergeResponse<P, In, M>, Ph>
114 where
115 In: Clone + Send + Sync + 'static,
116 P: Plan<Result = Vec<Result<In>>>,
117 {
118 PlanBuilder {
119 pd_client: self.pd_client.clone(),
120 plan: MergeResponse {
121 inner: self.plan,
122 merge,
123 phantom: PhantomData,
124 },
125 phantom: PhantomData,
126 }
127 }
128
129 pub fn post_process_default(self) -> PlanBuilder<PdC, ProcessResponse<P, DefaultProcessor>, Ph>
133 where
134 P: Plan,
135 DefaultProcessor: Process<P::Result>,
136 {
137 PlanBuilder {
138 pd_client: self.pd_client.clone(),
139 plan: ProcessResponse {
140 inner: self.plan,
141 processor: DefaultProcessor,
142 },
143 phantom: PhantomData,
144 }
145 }
146}
147
148impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
149where
150 P::Result: HasKeyErrors + HasRegionError,
151{
152 pub fn retry_multi_region(
154 self,
155 backoff: Backoff,
156 ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
157 self.make_retry_multi_region(backoff, false)
158 }
159
160 pub fn retry_multi_region_preserve_results(
163 self,
164 backoff: Backoff,
165 ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
166 self.make_retry_multi_region(backoff, true)
167 }
168
169 fn make_retry_multi_region(
170 self,
171 backoff: Backoff,
172 preserve_region_results: bool,
173 ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
174 PlanBuilder {
175 pd_client: self.pd_client.clone(),
176 plan: RetryableMultiRegion {
177 inner: self.plan,
178 pd_client: self.pd_client,
179 backoff,
180 preserve_region_results,
181 },
182 phantom: PhantomData,
183 }
184 }
185}
186
187impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
188 pub async fn single_region_with_store(
190 self,
191 store: RegionStore,
192 ) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
193 set_single_region_store(self.plan, store, self.pd_client)
194 }
195}
196
197impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
198where
199 P::Result: HasKeyErrors + HasRegionError,
200{
201 pub fn all_stores(
202 self,
203 backoff: Backoff,
204 ) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
205 PlanBuilder {
206 pd_client: self.pd_client.clone(),
207 plan: RetryableAllStores {
208 inner: self.plan,
209 pd_client: self.pd_client,
210 backoff,
211 },
212 phantom: PhantomData,
213 }
214 }
215}
216
217impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
218where
219 P::Result: HasKeyErrors,
220{
221 pub fn preserve_shard(self) -> PlanBuilder<PdC, PreserveShard<P>, NoTarget> {
222 PlanBuilder {
223 pd_client: self.pd_client.clone(),
224 plan: PreserveShard {
225 inner: self.plan,
226 shard: None,
227 },
228 phantom: PhantomData,
229 }
230 }
231}
232
233impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
234where
235 P::Result: HasKeyErrors + HasRegionErrors,
236{
237 pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
238 PlanBuilder {
239 pd_client: self.pd_client,
240 plan: ExtractError { inner: self.plan },
241 phantom: self.phantom,
242 }
243 }
244}
245
246fn set_single_region_store<PdC: PdClient, R: KvRequest>(
247 mut plan: Dispatch<R>,
248 store: RegionStore,
249 pd_client: Arc<PdC>,
250) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
251 plan.request
252 .set_context(store.region_with_leader.context()?);
253 plan.kv_client = Some(store.client);
254 Ok(PlanBuilder {
255 plan,
256 pd_client,
257 phantom: PhantomData,
258 })
259}
260
261pub trait SingleKey {
263 #[allow(clippy::ptr_arg)]
264 fn key(&self) -> &Vec<u8>;
265}