1use std::marker::PhantomData;
4use std::sync::Arc;
5
6use super::plan::PreserveShard;
7use super::Keyspace;
8use crate::backoff::Backoff;
9use crate::pd::PdClient;
10use crate::request::plan::{CleanupLocks, RetryableAllStores};
11use crate::request::shard::HasNextBatch;
12use crate::request::Dispatch;
13use crate::request::ExtractError;
14use crate::request::KvRequest;
15use crate::request::Merge;
16use crate::request::MergeResponse;
17use crate::request::NextBatch;
18use crate::request::Plan;
19use crate::request::Process;
20use crate::request::ProcessResponse;
21use crate::request::ResolveLock;
22use crate::request::RetryableMultiRegion;
23use crate::request::Shardable;
24use crate::request::{DefaultProcessor, StoreRequest};
25use crate::store::HasKeyErrors;
26use crate::store::HasRegionError;
27use crate::store::HasRegionErrors;
28use crate::store::RegionStore;
29use crate::transaction::HasLocks;
30use crate::transaction::ResolveLocksContext;
31use crate::transaction::ResolveLocksOptions;
32use crate::Result;
33use crate::Timestamp;
34
35pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
37 pd_client: Arc<PdC>,
38 plan: P,
39 phantom: PhantomData<Ph>,
40}
41
42pub trait PlanBuilderPhase {}
45pub struct NoTarget;
46impl PlanBuilderPhase for NoTarget {}
47pub struct Targetted;
48impl PlanBuilderPhase for Targetted {}
49
50impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
51 pub fn new(pd_client: Arc<PdC>, keyspace: Keyspace, mut request: Req) -> Self {
52 request.set_api_version(keyspace.api_version());
53 PlanBuilder {
54 pd_client,
55 plan: Dispatch {
56 request,
57 kv_client: None,
58 },
59 phantom: PhantomData,
60 }
61 }
62}
63
64impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {
65 pub fn plan(self) -> P {
68 self.plan
69 }
70}
71
72impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
73 pub fn resolve_lock(
75 self,
76 timestamp: Timestamp,
77 backoff: Backoff,
78 keyspace: Keyspace,
79 ) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
80 where
81 P::Result: HasLocks,
82 {
83 PlanBuilder {
84 pd_client: self.pd_client.clone(),
85 plan: ResolveLock {
86 inner: self.plan,
87 timestamp,
88 backoff,
89 pd_client: self.pd_client,
90 keyspace,
91 },
92 phantom: PhantomData,
93 }
94 }
95
96 pub fn cleanup_locks(
97 self,
98 ctx: ResolveLocksContext,
99 options: ResolveLocksOptions,
100 backoff: Backoff,
101 keyspace: Keyspace,
102 ) -> PlanBuilder<PdC, CleanupLocks<P, PdC>, Ph>
103 where
104 P: Shardable + NextBatch,
105 P::Result: HasLocks + HasNextBatch + HasRegionError + HasKeyErrors,
106 {
107 PlanBuilder {
108 pd_client: self.pd_client.clone(),
109 plan: CleanupLocks {
110 inner: self.plan,
111 ctx,
112 options,
113 store: None,
114 backoff,
115 pd_client: self.pd_client,
116 keyspace,
117 },
118 phantom: PhantomData,
119 }
120 }
121
122 pub fn merge<In, M: Merge<In>>(self, merge: M) -> PlanBuilder<PdC, MergeResponse<P, In, M>, Ph>
125 where
126 In: Clone + Send + Sync + 'static,
127 P: Plan<Result = Vec<Result<In>>>,
128 {
129 PlanBuilder {
130 pd_client: self.pd_client.clone(),
131 plan: MergeResponse {
132 inner: self.plan,
133 merge,
134 phantom: PhantomData,
135 },
136 phantom: PhantomData,
137 }
138 }
139
140 pub fn post_process_default(self) -> PlanBuilder<PdC, ProcessResponse<P, DefaultProcessor>, Ph>
144 where
145 P: Plan,
146 DefaultProcessor: Process<P::Result>,
147 {
148 PlanBuilder {
149 pd_client: self.pd_client.clone(),
150 plan: ProcessResponse {
151 inner: self.plan,
152 processor: DefaultProcessor,
153 },
154 phantom: PhantomData,
155 }
156 }
157}
158
159impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
160where
161 P::Result: HasKeyErrors + HasRegionError,
162{
163 pub fn retry_multi_region(
165 self,
166 backoff: Backoff,
167 ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
168 self.make_retry_multi_region(backoff, false)
169 }
170
171 pub fn retry_multi_region_preserve_results(
174 self,
175 backoff: Backoff,
176 ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
177 self.make_retry_multi_region(backoff, true)
178 }
179
180 fn make_retry_multi_region(
181 self,
182 backoff: Backoff,
183 preserve_region_results: bool,
184 ) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
185 PlanBuilder {
186 pd_client: self.pd_client.clone(),
187 plan: RetryableMultiRegion {
188 inner: self.plan,
189 pd_client: self.pd_client,
190 backoff,
191 preserve_region_results,
192 },
193 phantom: PhantomData,
194 }
195 }
196}
197
198impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
199 pub async fn single_region_with_store(
201 self,
202 store: RegionStore,
203 ) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
204 set_single_region_store(self.plan, store, self.pd_client)
205 }
206}
207
208impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
209where
210 P::Result: HasKeyErrors + HasRegionError,
211{
212 pub fn all_stores(
213 self,
214 backoff: Backoff,
215 ) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
216 PlanBuilder {
217 pd_client: self.pd_client.clone(),
218 plan: RetryableAllStores {
219 inner: self.plan,
220 pd_client: self.pd_client,
221 backoff,
222 },
223 phantom: PhantomData,
224 }
225 }
226}
227
228impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
229where
230 P::Result: HasKeyErrors,
231{
232 pub fn preserve_shard(self) -> PlanBuilder<PdC, PreserveShard<P>, NoTarget> {
233 PlanBuilder {
234 pd_client: self.pd_client.clone(),
235 plan: PreserveShard {
236 inner: self.plan,
237 shard: None,
238 },
239 phantom: PhantomData,
240 }
241 }
242}
243
244impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
245where
246 P::Result: HasKeyErrors + HasRegionErrors,
247{
248 pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
249 PlanBuilder {
250 pd_client: self.pd_client,
251 plan: ExtractError { inner: self.plan },
252 phantom: self.phantom,
253 }
254 }
255}
256
257fn set_single_region_store<PdC: PdClient, R: KvRequest>(
258 mut plan: Dispatch<R>,
259 store: RegionStore,
260 pd_client: Arc<PdC>,
261) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
262 plan.request.set_leader(&store.region_with_leader)?;
263 plan.kv_client = Some(store.client);
264 Ok(PlanBuilder {
265 plan,
266 pd_client,
267 phantom: PhantomData,
268 })
269}
270
271pub trait SingleKey {
273 #[allow(clippy::ptr_arg)]
274 fn key(&self) -> &Vec<u8>;
275}