1use crate::memdx::dispatcher::Dispatcher;
20use crate::memdx::error::Result;
21use crate::memdx::error::{Error, ServerError, ServerErrorKind};
22use crate::memdx::magic::Magic;
23use crate::memdx::op_auth_saslauto::OpSASLAutoEncoder;
24use crate::memdx::op_auth_saslbyname::OpSASLAuthByNameEncoder;
25use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder;
26use crate::memdx::op_auth_saslscram::OpSASLScramEncoder;
27use crate::memdx::op_bootstrap::OpBootstrapEncoder;
28use crate::memdx::opcode::OpCode;
29use crate::memdx::packet::{RequestPacket, ResponsePacket};
30use crate::memdx::pendingop::StandardPendingOp;
31use crate::memdx::request::{
32 GetClusterConfigRequest, GetErrorMapRequest, HelloRequest, SASLAuthRequest,
33 SASLListMechsRequest, SASLStepRequest, SelectBucketRequest,
34};
35use crate::memdx::response::{
36 GetClusterConfigResponse, GetErrorMapResponse, HelloResponse, SASLAuthResponse,
37 SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
38};
39use crate::memdx::status::Status;
40use byteorder::ByteOrder;
41
42pub struct OpsCore {}
43
44impl OpsCore {
45 pub(crate) fn decode_error_context(
46 resp: &ResponsePacket,
47 kind: ServerErrorKind,
48 ) -> ServerError {
49 let mut base_cause = ServerError::new(kind, resp.op_code, resp.status, resp.opaque);
50
51 if let Some(value) = &resp.value {
52 if resp.status == Status::NotMyVbucket {
53 base_cause = base_cause.with_config(value.to_vec());
54 } else {
55 base_cause = base_cause.with_context(value.to_vec());
56 }
57 }
58
59 base_cause
60 }
61
62 pub(crate) fn decode_error(resp: &ResponsePacket) -> Error {
63 let status = resp.status;
64 let base_error_kind = if status == Status::NotMyVbucket {
65 ServerErrorKind::NotMyVbucket
66 } else if status == Status::TmpFail {
67 ServerErrorKind::TmpFail
68 } else if status == Status::NoBucket {
69 ServerErrorKind::NoBucket
70 } else if status == Status::InvalidArgs {
71 return Error::new_invalid_argument_error(
72 "the server rejected the request because one or more arguments were invalid",
73 None,
74 )
75 .with(Self::decode_error_context(
76 resp,
77 ServerErrorKind::InvalidArgs,
78 ));
79 } else {
80 ServerErrorKind::UnknownStatus { status }
81 };
82
83 Self::decode_error_context(resp, base_error_kind).into()
84 }
85}
86
87impl OpBootstrapEncoder for OpsCore {
88 async fn hello<D>(
89 &self,
90 dispatcher: &D,
91 request: HelloRequest,
92 ) -> Result<StandardPendingOp<HelloResponse>>
93 where
94 D: Dispatcher,
95 {
96 let mut features: Vec<u8> = Vec::new();
97 for feature in request.requested_features {
98 let feature: u16 = feature.into();
99 let bytes = feature.to_be_bytes();
100 features.extend_from_slice(&bytes);
101 }
102
103 let op = dispatcher
104 .dispatch(
105 RequestPacket {
106 magic: Magic::Req,
107 op_code: OpCode::Hello,
108 datatype: 0,
109 vbucket_id: None,
110 cas: None,
111 extras: None,
112 key: None,
113 value: Some(&features),
114 framing_extras: None,
115 opaque: None,
116 },
117 None,
118 )
119 .await?;
120
121 Ok(StandardPendingOp::new(op))
122 }
123
124 async fn get_error_map<D>(
125 &self,
126 dispatcher: &D,
127 request: GetErrorMapRequest,
128 ) -> Result<StandardPendingOp<GetErrorMapResponse>>
129 where
130 D: Dispatcher,
131 {
132 let version = request.version.to_be_bytes();
133
134 let op = dispatcher
135 .dispatch(
136 RequestPacket {
137 magic: Magic::Req,
138 op_code: OpCode::GetErrorMap,
139 datatype: 0,
140 vbucket_id: None,
141 cas: None,
142 extras: None,
143 key: None,
144 value: Some(&version),
145 framing_extras: None,
146 opaque: None,
147 },
148 None,
149 )
150 .await?;
151
152 Ok(StandardPendingOp::new(op))
153 }
154
155 async fn select_bucket<D>(
156 &self,
157 dispatcher: &D,
158 request: SelectBucketRequest,
159 ) -> Result<StandardPendingOp<SelectBucketResponse>>
160 where
161 D: Dispatcher,
162 {
163 let key = request.bucket_name.into_bytes();
164
165 let op = dispatcher
166 .dispatch(
167 RequestPacket {
168 magic: Magic::Req,
169 op_code: OpCode::SelectBucket,
170 datatype: 0,
171 vbucket_id: None,
172 cas: None,
173 extras: None,
174 key: Some(&key),
175 value: None,
176 framing_extras: None,
177 opaque: None,
178 },
179 None,
180 )
181 .await?;
182
183 Ok(StandardPendingOp::new(op))
184 }
185
186 async fn get_cluster_config<D>(
187 &self,
188 dispatcher: &D,
189 request: GetClusterConfigRequest,
190 ) -> Result<StandardPendingOp<GetClusterConfigResponse>>
191 where
192 D: Dispatcher,
193 {
194 let mut extra_buf = [0; 16];
195 let extras = if let Some(known_version) = request.known_version {
196 byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64);
197 byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64);
198
199 Some(&extra_buf[..])
200 } else {
201 None
202 };
203
204 let op = dispatcher
205 .dispatch(
206 RequestPacket {
207 magic: Magic::Req,
208 op_code: OpCode::GetClusterConfig,
209 datatype: 0,
210 vbucket_id: None,
211 cas: None,
212 extras,
213 key: None,
214 value: None,
215 framing_extras: None,
216 opaque: None,
217 },
218 None,
219 )
220 .await?;
221
222 Ok(StandardPendingOp::new(op))
223 }
224}
225
226impl OpSASLPlainEncoder for OpsCore {
227 async fn sasl_auth<D>(
228 &self,
229 dispatcher: &D,
230 request: SASLAuthRequest,
231 ) -> Result<StandardPendingOp<SASLAuthResponse>>
232 where
233 D: Dispatcher,
234 {
235 let mut value = Vec::new();
236 value.extend_from_slice(request.payload.as_slice());
237 let key: Vec<u8> = request.auth_mechanism.into();
238
239 let op = dispatcher
240 .dispatch(
241 RequestPacket {
242 magic: Magic::Req,
243 op_code: OpCode::SASLAuth,
244 datatype: 0,
245 vbucket_id: None,
246 cas: None,
247 extras: None,
248 key: Some(&key),
249 value: Some(&value),
250 framing_extras: None,
251 opaque: None,
252 },
253 None,
254 )
255 .await?;
256
257 Ok(StandardPendingOp::new(op))
258 }
259}
260
261impl OpSASLAuthByNameEncoder for OpsCore {}
262
263impl OpSASLAutoEncoder for OpsCore {
264 async fn sasl_list_mechs<D>(
265 &self,
266 dispatcher: &D,
267 _request: SASLListMechsRequest,
268 ) -> Result<StandardPendingOp<SASLListMechsResponse>>
269 where
270 D: Dispatcher,
271 {
272 let op = dispatcher
273 .dispatch(
274 RequestPacket {
275 magic: Magic::Req,
276 op_code: OpCode::SASLListMechs,
277 datatype: 0,
278 vbucket_id: None,
279 cas: None,
280 extras: None,
281 key: None,
282 value: None,
283 framing_extras: None,
284 opaque: None,
285 },
286 None,
287 )
288 .await?;
289
290 Ok(StandardPendingOp::new(op))
291 }
292}
293
294impl OpSASLScramEncoder for OpsCore {
295 async fn sasl_step<D>(
296 &self,
297 dispatcher: &D,
298 request: SASLStepRequest,
299 ) -> Result<StandardPendingOp<SASLStepResponse>>
300 where
301 D: Dispatcher,
302 {
303 let mut value = Vec::new();
304 value.extend_from_slice(request.payload.as_slice());
305 let key: Vec<u8> = request.auth_mechanism.into();
306
307 let op = dispatcher
308 .dispatch(
309 RequestPacket {
310 magic: Magic::Req,
311 op_code: OpCode::SASLStep,
312 datatype: 0,
313 vbucket_id: None,
314 cas: None,
315 extras: None,
316 key: Some(&key),
317 value: Some(&value),
318 framing_extras: None,
319 opaque: None,
320 },
321 None,
322 )
323 .await?;
324
325 Ok(StandardPendingOp::new(op))
326 }
327}