1use crate::memdx::dispatcher::Dispatcher;
20use crate::memdx::error::Result;
21use crate::memdx::error::{Error, ServerError, ServerErrorKind};
22use crate::memdx::magic::Magic;
23use crate::memdx::op_auth_saslauto::OpSASLAutoEncoder;
24use crate::memdx::op_auth_saslbyname::OpSASLAuthByNameEncoder;
25use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder;
26use crate::memdx::op_auth_saslscram::OpSASLScramEncoder;
27use crate::memdx::op_bootstrap::OpBootstrapEncoder;
28use crate::memdx::opcode::OpCode;
29use crate::memdx::packet::{RequestPacket, ResponsePacket};
30use crate::memdx::pendingop::StandardPendingOp;
31use crate::memdx::request::{
32 GetClusterConfigRequest, GetErrorMapRequest, HelloRequest, SASLAuthRequest,
33 SASLListMechsRequest, SASLStepRequest, SelectBucketRequest,
34};
35use crate::memdx::response::{
36 GetClusterConfigResponse, GetErrorMapResponse, HelloResponse, SASLAuthResponse,
37 SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
38};
39use crate::memdx::status::Status;
40use byteorder::ByteOrder;
41
42pub struct OpsCore {}
43
44impl OpsCore {
45 pub(crate) fn decode_error_context(
46 resp: &ResponsePacket,
47 kind: ServerErrorKind,
48 ) -> ServerError {
49 let mut base_cause = ServerError::new(kind, resp.op_code, resp.status, resp.opaque);
50
51 if let Some(value) = &resp.value {
52 if resp.status == Status::NotMyVbucket {
53 base_cause = base_cause.with_config(value.to_vec());
54 } else {
55 base_cause = base_cause.with_context(value.to_vec());
56 }
57 }
58
59 base_cause
60 }
61
62 pub(crate) fn decode_error(resp: &ResponsePacket) -> Error {
63 let status = resp.status;
64 let base_error_kind = if status == Status::NotMyVbucket {
65 ServerErrorKind::NotMyVbucket
66 } else if status == Status::TmpFail {
67 ServerErrorKind::TmpFail
68 } else if status == Status::NoBucket {
69 ServerErrorKind::NoBucket
70 } else if status == Status::AuthStale {
71 ServerErrorKind::AuthStale
72 } else if status == Status::InvalidArgs {
73 return Error::new_invalid_argument_error(
74 "the server rejected the request because one or more arguments were invalid",
75 None,
76 )
77 .with(Self::decode_error_context(
78 resp,
79 ServerErrorKind::InvalidArgs,
80 ));
81 } else {
82 ServerErrorKind::UnknownStatus { status }
83 };
84
85 Self::decode_error_context(resp, base_error_kind).into()
86 }
87}
88
89impl OpBootstrapEncoder for OpsCore {
90 async fn hello<D>(
91 &self,
92 dispatcher: &D,
93 request: HelloRequest,
94 ) -> Result<StandardPendingOp<HelloResponse>>
95 where
96 D: Dispatcher,
97 {
98 let mut features: Vec<u8> = Vec::new();
99 for feature in request.requested_features {
100 let feature: u16 = feature.into();
101 let bytes = feature.to_be_bytes();
102 features.extend_from_slice(&bytes);
103 }
104
105 let op = dispatcher
106 .dispatch(
107 RequestPacket {
108 magic: Magic::Req,
109 op_code: OpCode::Hello,
110 datatype: 0,
111 vbucket_id: None,
112 cas: None,
113 extras: None,
114 key: None,
115 value: Some(&features),
116 framing_extras: None,
117 opaque: None,
118 },
119 false,
120 None,
121 )
122 .await?;
123
124 Ok(StandardPendingOp::new(op))
125 }
126
127 async fn get_error_map<D>(
128 &self,
129 dispatcher: &D,
130 request: GetErrorMapRequest,
131 ) -> Result<StandardPendingOp<GetErrorMapResponse>>
132 where
133 D: Dispatcher,
134 {
135 let version = request.version.to_be_bytes();
136
137 let op = dispatcher
138 .dispatch(
139 RequestPacket {
140 magic: Magic::Req,
141 op_code: OpCode::GetErrorMap,
142 datatype: 0,
143 vbucket_id: None,
144 cas: None,
145 extras: None,
146 key: None,
147 value: Some(&version),
148 framing_extras: None,
149 opaque: None,
150 },
151 false,
152 None,
153 )
154 .await?;
155
156 Ok(StandardPendingOp::new(op))
157 }
158
159 async fn select_bucket<D>(
160 &self,
161 dispatcher: &D,
162 request: SelectBucketRequest,
163 ) -> Result<StandardPendingOp<SelectBucketResponse>>
164 where
165 D: Dispatcher,
166 {
167 let key = request.bucket_name.into_bytes();
168
169 let op = dispatcher
170 .dispatch(
171 RequestPacket {
172 magic: Magic::Req,
173 op_code: OpCode::SelectBucket,
174 datatype: 0,
175 vbucket_id: None,
176 cas: None,
177 extras: None,
178 key: Some(&key),
179 value: None,
180 framing_extras: None,
181 opaque: None,
182 },
183 false,
184 None,
185 )
186 .await?;
187
188 Ok(StandardPendingOp::new(op))
189 }
190
191 async fn get_cluster_config<D>(
192 &self,
193 dispatcher: &D,
194 request: GetClusterConfigRequest,
195 ) -> Result<StandardPendingOp<GetClusterConfigResponse>>
196 where
197 D: Dispatcher,
198 {
199 let mut extra_buf = [0; 16];
200 let extras = if let Some(known_version) = request.known_version {
201 byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64);
202 byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64);
203
204 Some(&extra_buf[..])
205 } else {
206 None
207 };
208
209 let op = dispatcher
210 .dispatch(
211 RequestPacket {
212 magic: Magic::Req,
213 op_code: OpCode::GetClusterConfig,
214 datatype: 0,
215 vbucket_id: None,
216 cas: None,
217 extras,
218 key: None,
219 value: None,
220 framing_extras: None,
221 opaque: None,
222 },
223 false,
224 None,
225 )
226 .await?;
227
228 Ok(StandardPendingOp::new(op))
229 }
230}
231
232impl OpSASLPlainEncoder for OpsCore {
233 async fn sasl_auth<D>(
234 &self,
235 dispatcher: &D,
236 request: SASLAuthRequest,
237 ) -> Result<StandardPendingOp<SASLAuthResponse>>
238 where
239 D: Dispatcher,
240 {
241 let mut value = Vec::new();
242 value.extend_from_slice(request.payload.as_slice());
243 let key: Vec<u8> = request.auth_mechanism.into();
244
245 let op = dispatcher
246 .dispatch(
247 RequestPacket {
248 magic: Magic::Req,
249 op_code: OpCode::SASLAuth,
250 datatype: 0,
251 vbucket_id: None,
252 cas: None,
253 extras: None,
254 key: Some(&key),
255 value: Some(&value),
256 framing_extras: None,
257 opaque: None,
258 },
259 false,
260 None,
261 )
262 .await?;
263
264 Ok(StandardPendingOp::new(op))
265 }
266}
267
268impl OpSASLAuthByNameEncoder for OpsCore {}
269
270impl OpSASLAutoEncoder for OpsCore {
271 async fn sasl_list_mechs<D>(
272 &self,
273 dispatcher: &D,
274 _request: SASLListMechsRequest,
275 ) -> Result<StandardPendingOp<SASLListMechsResponse>>
276 where
277 D: Dispatcher,
278 {
279 let op = dispatcher
280 .dispatch(
281 RequestPacket {
282 magic: Magic::Req,
283 op_code: OpCode::SASLListMechs,
284 datatype: 0,
285 vbucket_id: None,
286 cas: None,
287 extras: None,
288 key: None,
289 value: None,
290 framing_extras: None,
291 opaque: None,
292 },
293 false,
294 None,
295 )
296 .await?;
297
298 Ok(StandardPendingOp::new(op))
299 }
300}
301
302impl OpSASLScramEncoder for OpsCore {
303 async fn sasl_step<D>(
304 &self,
305 dispatcher: &D,
306 request: SASLStepRequest,
307 ) -> Result<StandardPendingOp<SASLStepResponse>>
308 where
309 D: Dispatcher,
310 {
311 let mut value = Vec::new();
312 value.extend_from_slice(request.payload.as_slice());
313 let key: Vec<u8> = request.auth_mechanism.into();
314
315 let op = dispatcher
316 .dispatch(
317 RequestPacket {
318 magic: Magic::Req,
319 op_code: OpCode::SASLStep,
320 datatype: 0,
321 vbucket_id: None,
322 cas: None,
323 extras: None,
324 key: Some(&key),
325 value: Some(&value),
326 framing_extras: None,
327 opaque: None,
328 },
329 false,
330 None,
331 )
332 .await?;
333
334 Ok(StandardPendingOp::new(op))
335 }
336}