Skip to main content

couchbase_core/memdx/
ops_core.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::memdx::dispatcher::Dispatcher;
20use crate::memdx::error::Result;
21use crate::memdx::error::{Error, ServerError, ServerErrorKind};
22use crate::memdx::magic::Magic;
23use crate::memdx::op_auth_saslauto::OpSASLAutoEncoder;
24use crate::memdx::op_auth_saslbyname::OpSASLAuthByNameEncoder;
25use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder;
26use crate::memdx::op_auth_saslscram::OpSASLScramEncoder;
27use crate::memdx::op_bootstrap::OpBootstrapEncoder;
28use crate::memdx::opcode::OpCode;
29use crate::memdx::packet::{RequestPacket, ResponsePacket};
30use crate::memdx::pendingop::StandardPendingOp;
31use crate::memdx::request::{
32    GetClusterConfigRequest, GetErrorMapRequest, HelloRequest, SASLAuthRequest,
33    SASLListMechsRequest, SASLStepRequest, SelectBucketRequest,
34};
35use crate::memdx::response::{
36    GetClusterConfigResponse, GetErrorMapResponse, HelloResponse, SASLAuthResponse,
37    SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
38};
39use crate::memdx::status::Status;
40use byteorder::ByteOrder;
41
42pub struct OpsCore {}
43
44impl OpsCore {
45    pub(crate) fn decode_error_context(
46        resp: &ResponsePacket,
47        kind: ServerErrorKind,
48    ) -> ServerError {
49        let mut base_cause = ServerError::new(kind, resp.op_code, resp.status, resp.opaque);
50
51        if let Some(value) = &resp.value {
52            if resp.status == Status::NotMyVbucket {
53                base_cause = base_cause.with_config(value.to_vec());
54            } else {
55                base_cause = base_cause.with_context(value.to_vec());
56            }
57        }
58
59        base_cause
60    }
61
62    pub(crate) fn decode_error(resp: &ResponsePacket) -> Error {
63        let status = resp.status;
64        let base_error_kind = if status == Status::NotMyVbucket {
65            ServerErrorKind::NotMyVbucket
66        } else if status == Status::TmpFail {
67            ServerErrorKind::TmpFail
68        } else if status == Status::NoBucket {
69            ServerErrorKind::NoBucket
70        } else if status == Status::AuthStale {
71            ServerErrorKind::AuthStale
72        } else if status == Status::InvalidArgs {
73            return Error::new_invalid_argument_error(
74                "the server rejected the request because one or more arguments were invalid",
75                None,
76            )
77            .with(Self::decode_error_context(
78                resp,
79                ServerErrorKind::InvalidArgs,
80            ));
81        } else {
82            ServerErrorKind::UnknownStatus { status }
83        };
84
85        Self::decode_error_context(resp, base_error_kind).into()
86    }
87}
88
89impl OpBootstrapEncoder for OpsCore {
90    async fn hello<D>(
91        &self,
92        dispatcher: &D,
93        request: HelloRequest,
94    ) -> Result<StandardPendingOp<HelloResponse>>
95    where
96        D: Dispatcher,
97    {
98        let mut features: Vec<u8> = Vec::new();
99        for feature in request.requested_features {
100            let feature: u16 = feature.into();
101            let bytes = feature.to_be_bytes();
102            features.extend_from_slice(&bytes);
103        }
104
105        let op = dispatcher
106            .dispatch(
107                RequestPacket {
108                    magic: Magic::Req,
109                    op_code: OpCode::Hello,
110                    datatype: 0,
111                    vbucket_id: None,
112                    cas: None,
113                    extras: None,
114                    key: None,
115                    value: Some(&features),
116                    framing_extras: None,
117                    opaque: None,
118                },
119                false,
120                None,
121            )
122            .await?;
123
124        Ok(StandardPendingOp::new(op))
125    }
126
127    async fn get_error_map<D>(
128        &self,
129        dispatcher: &D,
130        request: GetErrorMapRequest,
131    ) -> Result<StandardPendingOp<GetErrorMapResponse>>
132    where
133        D: Dispatcher,
134    {
135        let version = request.version.to_be_bytes();
136
137        let op = dispatcher
138            .dispatch(
139                RequestPacket {
140                    magic: Magic::Req,
141                    op_code: OpCode::GetErrorMap,
142                    datatype: 0,
143                    vbucket_id: None,
144                    cas: None,
145                    extras: None,
146                    key: None,
147                    value: Some(&version),
148                    framing_extras: None,
149                    opaque: None,
150                },
151                false,
152                None,
153            )
154            .await?;
155
156        Ok(StandardPendingOp::new(op))
157    }
158
159    async fn select_bucket<D>(
160        &self,
161        dispatcher: &D,
162        request: SelectBucketRequest,
163    ) -> Result<StandardPendingOp<SelectBucketResponse>>
164    where
165        D: Dispatcher,
166    {
167        let key = request.bucket_name.into_bytes();
168
169        let op = dispatcher
170            .dispatch(
171                RequestPacket {
172                    magic: Magic::Req,
173                    op_code: OpCode::SelectBucket,
174                    datatype: 0,
175                    vbucket_id: None,
176                    cas: None,
177                    extras: None,
178                    key: Some(&key),
179                    value: None,
180                    framing_extras: None,
181                    opaque: None,
182                },
183                false,
184                None,
185            )
186            .await?;
187
188        Ok(StandardPendingOp::new(op))
189    }
190
191    async fn get_cluster_config<D>(
192        &self,
193        dispatcher: &D,
194        request: GetClusterConfigRequest,
195    ) -> Result<StandardPendingOp<GetClusterConfigResponse>>
196    where
197        D: Dispatcher,
198    {
199        let mut extra_buf = [0; 16];
200        let extras = if let Some(known_version) = request.known_version {
201            byteorder::BigEndian::write_u64(&mut extra_buf[0..8], known_version.rev_epoch as u64);
202            byteorder::BigEndian::write_u64(&mut extra_buf[8..16], known_version.rev_id as u64);
203
204            Some(&extra_buf[..])
205        } else {
206            None
207        };
208
209        let op = dispatcher
210            .dispatch(
211                RequestPacket {
212                    magic: Magic::Req,
213                    op_code: OpCode::GetClusterConfig,
214                    datatype: 0,
215                    vbucket_id: None,
216                    cas: None,
217                    extras,
218                    key: None,
219                    value: None,
220                    framing_extras: None,
221                    opaque: None,
222                },
223                false,
224                None,
225            )
226            .await?;
227
228        Ok(StandardPendingOp::new(op))
229    }
230}
231
232impl OpSASLPlainEncoder for OpsCore {
233    async fn sasl_auth<D>(
234        &self,
235        dispatcher: &D,
236        request: SASLAuthRequest,
237    ) -> Result<StandardPendingOp<SASLAuthResponse>>
238    where
239        D: Dispatcher,
240    {
241        let mut value = Vec::new();
242        value.extend_from_slice(request.payload.as_slice());
243        let key: Vec<u8> = request.auth_mechanism.into();
244
245        let op = dispatcher
246            .dispatch(
247                RequestPacket {
248                    magic: Magic::Req,
249                    op_code: OpCode::SASLAuth,
250                    datatype: 0,
251                    vbucket_id: None,
252                    cas: None,
253                    extras: None,
254                    key: Some(&key),
255                    value: Some(&value),
256                    framing_extras: None,
257                    opaque: None,
258                },
259                false,
260                None,
261            )
262            .await?;
263
264        Ok(StandardPendingOp::new(op))
265    }
266}
267
268impl OpSASLAuthByNameEncoder for OpsCore {}
269
270impl OpSASLAutoEncoder for OpsCore {
271    async fn sasl_list_mechs<D>(
272        &self,
273        dispatcher: &D,
274        _request: SASLListMechsRequest,
275    ) -> Result<StandardPendingOp<SASLListMechsResponse>>
276    where
277        D: Dispatcher,
278    {
279        let op = dispatcher
280            .dispatch(
281                RequestPacket {
282                    magic: Magic::Req,
283                    op_code: OpCode::SASLListMechs,
284                    datatype: 0,
285                    vbucket_id: None,
286                    cas: None,
287                    extras: None,
288                    key: None,
289                    value: None,
290                    framing_extras: None,
291                    opaque: None,
292                },
293                false,
294                None,
295            )
296            .await?;
297
298        Ok(StandardPendingOp::new(op))
299    }
300}
301
302impl OpSASLScramEncoder for OpsCore {
303    async fn sasl_step<D>(
304        &self,
305        dispatcher: &D,
306        request: SASLStepRequest,
307    ) -> Result<StandardPendingOp<SASLStepResponse>>
308    where
309        D: Dispatcher,
310    {
311        let mut value = Vec::new();
312        value.extend_from_slice(request.payload.as_slice());
313        let key: Vec<u8> = request.auth_mechanism.into();
314
315        let op = dispatcher
316            .dispatch(
317                RequestPacket {
318                    magic: Magic::Req,
319                    op_code: OpCode::SASLStep,
320                    datatype: 0,
321                    vbucket_id: None,
322                    cas: None,
323                    extras: None,
324                    key: Some(&key),
325                    value: Some(&value),
326                    framing_extras: None,
327                    opaque: None,
328                },
329                false,
330                None,
331            )
332            .await?;
333
334        Ok(StandardPendingOp::new(op))
335    }
336}