couchbase_core/memdx/
ops_core.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::memdx::dispatcher::Dispatcher;
20use crate::memdx::error::Result;
21use crate::memdx::error::{Error, ServerError, ServerErrorKind};
22use crate::memdx::magic::Magic;
23use crate::memdx::op_auth_saslauto::OpSASLAutoEncoder;
24use crate::memdx::op_auth_saslbyname::OpSASLAuthByNameEncoder;
25use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder;
26use crate::memdx::op_auth_saslscram::OpSASLScramEncoder;
27use crate::memdx::op_bootstrap::OpBootstrapEncoder;
28use crate::memdx::opcode::OpCode;
29use crate::memdx::packet::{RequestPacket, ResponsePacket};
30use crate::memdx::pendingop::StandardPendingOp;
31use crate::memdx::request::{
32    GetClusterConfigRequest, GetErrorMapRequest, HelloRequest, SASLAuthRequest,
33    SASLListMechsRequest, SASLStepRequest, SelectBucketRequest,
34};
35use crate::memdx::response::{
36    GetClusterConfigResponse, GetErrorMapResponse, HelloResponse, SASLAuthResponse,
37    SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
38};
39use crate::memdx::status::Status;
40use byteorder::ByteOrder;
41
/// Field-less type on which this file implements the bootstrap and SASL
/// request encoders (`OpBootstrapEncoder`, `OpSASLPlainEncoder`,
/// `OpSASLAuthByNameEncoder`, `OpSASLAutoEncoder`, `OpSASLScramEncoder`) and the
/// shared error-decoding helpers.
pub struct OpsCore {}
43
44impl OpsCore {
45    pub(crate) fn decode_error_context(
46        resp: &ResponsePacket,
47        kind: ServerErrorKind,
48    ) -> ServerError {
49        let mut base_cause = ServerError::new(kind, resp.op_code, resp.status, resp.opaque);
50
51        if let Some(value) = &resp.value {
52            if resp.status == Status::NotMyVbucket {
53                base_cause = base_cause.with_config(value.to_vec());
54            } else {
55                base_cause = base_cause.with_context(value.to_vec());
56            }
57        }
58
59        base_cause
60    }
61
62    pub(crate) fn decode_error(resp: &ResponsePacket) -> Error {
63        let status = resp.status;
64        let base_error_kind = if status == Status::NotMyVbucket {
65            ServerErrorKind::NotMyVbucket
66        } else if status == Status::TmpFail {
67            ServerErrorKind::TmpFail
68        } else if status == Status::NoBucket {
69            ServerErrorKind::NoBucket
70        } else if status == Status::InvalidArgs {
71            return Error::new_invalid_argument_error(
72                "the server rejected the request because one or more arguments were invalid",
73                None,
74            )
75            .with(Self::decode_error_context(
76                resp,
77                ServerErrorKind::InvalidArgs,
78            ));
79        } else {
80            ServerErrorKind::UnknownStatus { status }
81        };
82
83        Self::decode_error_context(resp, base_error_kind).into()
84    }
85}
86
87impl OpBootstrapEncoder for OpsCore {
88    async fn hello<D>(
89        &self,
90        dispatcher: &D,
91        request: HelloRequest,
92    ) -> Result<StandardPendingOp<HelloResponse>>
93    where
94        D: Dispatcher,
95    {
96        let mut features: Vec<u8> = Vec::new();
97        for feature in request.requested_features {
98            let feature: u16 = feature.into();
99            let bytes = feature.to_be_bytes();
100            features.extend_from_slice(&bytes);
101        }
102
103        let op = dispatcher
104            .dispatch(
105                RequestPacket {
106                    magic: Magic::Req,
107                    op_code: OpCode::Hello,
108                    datatype: 0,
109                    vbucket_id: None,
110                    cas: None,
111                    extras: None,
112                    key: None,
113                    value: Some(&features),
114                    framing_extras: None,
115                    opaque: None,
116                },
117                None,
118            )
119            .await?;
120
121        Ok(StandardPendingOp::new(op))
122    }
123
124    async fn get_error_map<D>(
125        &self,
126        dispatcher: &D,
127        request: GetErrorMapRequest,
128    ) -> Result<StandardPendingOp<GetErrorMapResponse>>
129    where
130        D: Dispatcher,
131    {
132        let version = request.version.to_be_bytes();
133
134        let op = dispatcher
135            .dispatch(
136                RequestPacket {
137                    magic: Magic::Req,
138                    op_code: OpCode::GetErrorMap,
139                    datatype: 0,
140                    vbucket_id: None,
141                    cas: None,
142                    extras: None,
143                    key: None,
144                    value: Some(&version),
145                    framing_extras: None,
146                    opaque: None,
147                },
148                None,
149            )
150            .await?;
151
152        Ok(StandardPendingOp::new(op))
153    }
154
155    async fn select_bucket<D>(
156        &self,
157        dispatcher: &D,
158        request: SelectBucketRequest,
159    ) -> Result<StandardPendingOp<SelectBucketResponse>>
160    where
161        D: Dispatcher,
162    {
163        let key = request.bucket_name.into_bytes();
164
165        let op = dispatcher
166            .dispatch(
167                RequestPacket {
168                    magic: Magic::Req,
169                    op_code: OpCode::SelectBucket,
170                    datatype: 0,
171                    vbucket_id: None,
172                    cas: None,
173                    extras: None,
174                    key: Some(&key),
175                    value: None,
176                    framing_extras: None,
177                    opaque: None,
178                },
179                None,
180            )
181            .await?;
182
183        Ok(StandardPendingOp::new(op))
184    }
185
186    async fn get_cluster_config<D>(
187        &self,
188        dispatcher: &D,
189        request: GetClusterConfigRequest,
190    ) -> Result<StandardPendingOp<GetClusterConfigResponse>>
191    where
192        D: Dispatcher,
193    {
194        let mut extra_buf = [0; 16];
195        let extras = if let Some(known_version) = request.known_version {
196            byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64);
197            byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64);
198
199            Some(&extra_buf[..])
200        } else {
201            None
202        };
203
204        let op = dispatcher
205            .dispatch(
206                RequestPacket {
207                    magic: Magic::Req,
208                    op_code: OpCode::GetClusterConfig,
209                    datatype: 0,
210                    vbucket_id: None,
211                    cas: None,
212                    extras,
213                    key: None,
214                    value: None,
215                    framing_extras: None,
216                    opaque: None,
217                },
218                None,
219            )
220            .await?;
221
222        Ok(StandardPendingOp::new(op))
223    }
224}
225
226impl OpSASLPlainEncoder for OpsCore {
227    async fn sasl_auth<D>(
228        &self,
229        dispatcher: &D,
230        request: SASLAuthRequest,
231    ) -> Result<StandardPendingOp<SASLAuthResponse>>
232    where
233        D: Dispatcher,
234    {
235        let mut value = Vec::new();
236        value.extend_from_slice(request.payload.as_slice());
237        let key: Vec<u8> = request.auth_mechanism.into();
238
239        let op = dispatcher
240            .dispatch(
241                RequestPacket {
242                    magic: Magic::Req,
243                    op_code: OpCode::SASLAuth,
244                    datatype: 0,
245                    vbucket_id: None,
246                    cas: None,
247                    extras: None,
248                    key: Some(&key),
249                    value: Some(&value),
250                    framing_extras: None,
251                    opaque: None,
252                },
253                None,
254            )
255            .await?;
256
257        Ok(StandardPendingOp::new(op))
258    }
259}
260
// Nothing is overridden here: `OpsCore` takes whatever provided (default)
// items `OpSASLAuthByNameEncoder` defines, unchanged.
impl OpSASLAuthByNameEncoder for OpsCore {}
262
263impl OpSASLAutoEncoder for OpsCore {
264    async fn sasl_list_mechs<D>(
265        &self,
266        dispatcher: &D,
267        _request: SASLListMechsRequest,
268    ) -> Result<StandardPendingOp<SASLListMechsResponse>>
269    where
270        D: Dispatcher,
271    {
272        let op = dispatcher
273            .dispatch(
274                RequestPacket {
275                    magic: Magic::Req,
276                    op_code: OpCode::SASLListMechs,
277                    datatype: 0,
278                    vbucket_id: None,
279                    cas: None,
280                    extras: None,
281                    key: None,
282                    value: None,
283                    framing_extras: None,
284                    opaque: None,
285                },
286                None,
287            )
288            .await?;
289
290        Ok(StandardPendingOp::new(op))
291    }
292}
293
294impl OpSASLScramEncoder for OpsCore {
295    async fn sasl_step<D>(
296        &self,
297        dispatcher: &D,
298        request: SASLStepRequest,
299    ) -> Result<StandardPendingOp<SASLStepResponse>>
300    where
301        D: Dispatcher,
302    {
303        let mut value = Vec::new();
304        value.extend_from_slice(request.payload.as_slice());
305        let key: Vec<u8> = request.auth_mechanism.into();
306
307        let op = dispatcher
308            .dispatch(
309                RequestPacket {
310                    magic: Magic::Req,
311                    op_code: OpCode::SASLStep,
312                    datatype: 0,
313                    vbucket_id: None,
314                    cas: None,
315                    extras: None,
316                    key: Some(&key),
317                    value: Some(&value),
318                    framing_extras: None,
319                    opaque: None,
320                },
321                None,
322            )
323            .await?;
324
325        Ok(StandardPendingOp::new(op))
326    }
327}