couchbase_core/memdx/
response.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::io::{Cursor, Read};
20use std::time::Duration;
21
22use crate::memdx::auth_mechanism::AuthMechanism;
23use crate::memdx::client_response::ClientResponse;
24use crate::memdx::error::{
25    Error, ResourceError, ServerError, ServerErrorKind, SubdocError, SubdocErrorKind,
26};
27use crate::memdx::extframe::decode_res_ext_frames;
28use crate::memdx::hello_feature::HelloFeature;
29use crate::memdx::ops_core::OpsCore;
30use crate::memdx::ops_crud::OpsCrud;
31use crate::memdx::status::Status;
32use crate::memdx::subdoc::{SubDocResult, SubdocDocFlag};
33use byteorder::{BigEndian, ReadBytesExt};
34use tokio_io::Buf;
35
36pub trait TryFromClientResponse: Sized {
37    fn try_from(resp: ClientResponse) -> Result<Self, Error>;
38}
39
40#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
41pub struct HelloResponse {
42    pub enabled_features: Vec<HelloFeature>,
43}
44
45impl TryFromClientResponse for HelloResponse {
46    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
47        let packet = resp.packet();
48        let status = packet.status;
49        if status != Status::Success {
50            return Err(OpsCore::decode_error(&packet));
51        }
52
53        let mut features: Vec<HelloFeature> = Vec::new();
54        if let Some(value) = &packet.value {
55            if value.len() % 2 != 0 {
56                return Err(Error::new_protocol_error("invalid hello features length"));
57            }
58
59            let mut cursor = Cursor::new(value);
60            while let Ok(code) = cursor.read_u16::<BigEndian>() {
61                features.push(HelloFeature::from(code));
62            }
63        }
64        let response = HelloResponse {
65            enabled_features: features,
66        };
67
68        Ok(response)
69    }
70}
71
72#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
73pub struct GetErrorMapResponse {
74    pub error_map: Vec<u8>,
75}
76
77impl TryFromClientResponse for GetErrorMapResponse {
78    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
79        let packet = resp.packet();
80        let status = packet.status;
81        if status != Status::Success {
82            return Err(OpsCore::decode_error(&packet));
83        }
84
85        let value = packet.value.unwrap_or_default();
86        let response = GetErrorMapResponse { error_map: value };
87
88        Ok(response)
89    }
90}
91
92#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
93pub struct SelectBucketResponse {}
94
95impl TryFromClientResponse for SelectBucketResponse {
96    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
97        let packet = resp.packet();
98        let status = packet.status;
99        if status != Status::Success {
100            if status == Status::AccessError || status == Status::KeyNotFound {
101                return Err(ServerError::new(
102                    ServerErrorKind::UnknownBucketName,
103                    packet.op_code,
104                    status,
105                    packet.opaque,
106                )
107                .into());
108            }
109            return Err(OpsCore::decode_error(&packet));
110        }
111
112        Ok(SelectBucketResponse {})
113    }
114}
115
116#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
117pub struct SASLAuthResponse {
118    pub needs_more_steps: bool,
119    pub payload: Vec<u8>,
120}
121
122impl TryFromClientResponse for SASLAuthResponse {
123    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
124        let packet = resp.packet();
125        let status = packet.status;
126        if status == Status::AuthContinue {
127            return Ok(SASLAuthResponse {
128                needs_more_steps: true,
129                payload: packet.value.unwrap_or_default(),
130            });
131        }
132
133        if status != Status::Success {
134            return Err(OpsCore::decode_error(&packet));
135        }
136
137        Ok(SASLAuthResponse {
138            needs_more_steps: false,
139            payload: packet.value.unwrap_or_default(),
140        })
141    }
142}
143
144#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
145pub struct SASLStepResponse {
146    pub needs_more_steps: bool,
147    pub payload: Vec<u8>,
148}
149
150impl TryFromClientResponse for SASLStepResponse {
151    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
152        let packet = resp.packet();
153        let status = packet.status;
154        if status != Status::Success {
155            return Err(OpsCore::decode_error(&packet));
156        }
157
158        Ok(SASLStepResponse {
159            needs_more_steps: false,
160            payload: packet.value.unwrap_or_default(),
161        })
162    }
163}
164
165#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
166pub struct SASLListMechsResponse {
167    pub available_mechs: Vec<AuthMechanism>,
168}
169
170impl TryFromClientResponse for SASLListMechsResponse {
171    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
172        let packet = resp.packet();
173        let status = packet.status;
174        if status != Status::Success {
175            if status == Status::KeyNotFound {
176                // KeyNotFound appears here when the bucket was initialized by ns_server, but
177                // ns_server has not posted a configuration for the bucket to kv_engine yet. We
178                // transform this into a ErrTmpFail as we make the assumption that the
179                // SelectBucket will have failed if this was anything but a transient issue.
180                return Err(ServerError::new(
181                    ServerErrorKind::ConfigNotSet,
182                    packet.op_code,
183                    status,
184                    packet.opaque,
185                )
186                .into());
187            }
188            return Err(OpsCore::decode_error(&packet));
189        }
190
191        let value = packet.value.unwrap_or_default();
192        let mechs_list_string = match String::from_utf8(value) {
193            Ok(v) => v,
194            Err(e) => {
195                return Err(Error::new_protocol_error(
196                    "failed to parse authentication mechanism list",
197                )
198                .with(e));
199            }
200        };
201        let mechs_list_split = mechs_list_string.split(' ');
202        let mut mechs_list = Vec::new();
203        for item in mechs_list_split {
204            mechs_list.push(AuthMechanism::try_from(item)?);
205        }
206
207        Ok(SASLListMechsResponse {
208            available_mechs: mechs_list,
209        })
210    }
211}
212
213#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
214pub struct GetClusterConfigResponse {
215    pub config: Vec<u8>,
216}
217
218impl TryFromClientResponse for GetClusterConfigResponse {
219    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
220        let packet = resp.packet();
221        let status = packet.status;
222        if status != Status::Success {
223            return Err(OpsCore::decode_error(&packet));
224        }
225
226        Ok(GetClusterConfigResponse {
227            config: packet.value.clone().unwrap_or_default(),
228        })
229    }
230}
231
232#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
233pub struct BootstrapResult {
234    pub hello: Option<HelloResponse>,
235    pub error_map: Option<GetErrorMapResponse>,
236    pub cluster_config: Option<GetClusterConfigResponse>,
237}
238
239#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
240pub struct MutationToken {
241    pub vbuuid: u64,
242    pub seqno: u64,
243}
244
245impl TryFrom<&Vec<u8>> for MutationToken {
246    type Error = Error;
247
248    fn try_from(value: &Vec<u8>) -> Result<Self, Self::Error> {
249        if value.len() != 16 {
250            return Err(Error::new_protocol_error("bad extras length"));
251        }
252
253        let (vbuuid_bytes, seqno_bytes) = value.split_at(size_of::<u64>());
254        let vbuuid = u64::from_be_bytes(vbuuid_bytes.try_into().unwrap());
255        let seqno = u64::from_be_bytes(seqno_bytes.try_into().unwrap());
256
257        Ok(MutationToken { vbuuid, seqno })
258    }
259}
260
261#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
262pub struct SetResponse {
263    pub cas: u64,
264    pub mutation_token: Option<MutationToken>,
265    pub server_duration: Option<Duration>,
266}
267
268impl TryFromClientResponse for SetResponse {
269    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
270        let packet = resp.packet();
271        let status = packet.status;
272
273        let kind = if status == Status::TooBig {
274            Some(ServerErrorKind::TooBig)
275        } else if status == Status::Locked {
276            Some(ServerErrorKind::Locked)
277        } else if status == Status::KeyExists {
278            Some(ServerErrorKind::CasMismatch)
279        } else if status == Status::Success {
280            None
281        } else {
282            return Err(OpsCrud::decode_common_mutation_error(&packet));
283        };
284
285        if let Some(kind) = kind {
286            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
287        }
288
289        let mutation_token = if let Some(extras) = &packet.extras {
290            Some(MutationToken::try_from(extras)?)
291        } else {
292            None
293        };
294
295        let server_duration = if let Some(f) = &packet.framing_extras {
296            decode_res_ext_frames(f)?
297        } else {
298            None
299        };
300
301        Ok(SetResponse {
302            cas: packet.cas.unwrap_or_default(),
303            mutation_token,
304            server_duration,
305        })
306    }
307}
308
309fn parse_flags(extras: &Option<Vec<u8>>) -> Result<u32, Error> {
310    if let Some(extras) = &extras {
311        if extras.len() != 4 {
312            return Err(Error::new_protocol_error("bad extras length reading flags"));
313        }
314
315        Ok(u32::from_be_bytes(extras.as_slice().try_into().unwrap()))
316    } else {
317        Err(Error::new_protocol_error("no extras in response"))
318    }
319}
320
321#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
322pub struct GetResponse {
323    pub cas: u64,
324    pub flags: u32,
325    pub value: Vec<u8>,
326    pub datatype: u8,
327    pub server_duration: Option<Duration>,
328}
329
330impl TryFromClientResponse for GetResponse {
331    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
332        let packet = resp.packet();
333        let status = packet.status;
334
335        if status == Status::KeyNotFound {
336            return Err(ServerError::new(
337                ServerErrorKind::KeyNotFound,
338                packet.op_code,
339                packet.status,
340                packet.opaque,
341            )
342            .into());
343        } else if status != Status::Success {
344            return Err(OpsCrud::decode_common_error(&packet));
345        }
346
347        let flags = parse_flags(&packet.extras)?;
348
349        let server_duration = if let Some(f) = &packet.framing_extras {
350            decode_res_ext_frames(f)?
351        } else {
352            None
353        };
354
355        let value = packet.value.unwrap_or_default();
356
357        Ok(GetResponse {
358            cas: packet.cas.unwrap_or_default(),
359            flags,
360            value,
361            datatype: packet.datatype,
362            server_duration,
363        })
364    }
365}
366
367#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
368pub struct GetMetaResponse {
369    pub cas: u64,
370    pub flags: u32,
371    pub value: Vec<u8>,
372    pub datatype: u8,
373    pub server_duration: Option<Duration>,
374    pub expiry: u32,
375    pub seq_no: u64,
376    pub deleted: bool,
377}
378
379impl TryFromClientResponse for GetMetaResponse {
380    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
381        let packet = resp.packet();
382        let status = packet.status;
383
384        if status == Status::KeyNotFound {
385            return Err(ServerError::new(
386                ServerErrorKind::KeyNotFound,
387                packet.op_code,
388                packet.status,
389                packet.opaque,
390            )
391            .into());
392        } else if status != Status::Success {
393            return Err(OpsCrud::decode_common_error(&packet));
394        }
395
396        let server_duration = if let Some(f) = &packet.framing_extras {
397            decode_res_ext_frames(f)?
398        } else {
399            None
400        };
401
402        let value = packet.value.unwrap_or_default();
403
404        if let Some(extras) = &packet.extras {
405            if extras.len() != 21 {
406                return Err(Error::new_protocol_error("bad extras length"));
407            }
408
409            let mut extras = Cursor::new(extras);
410            let deleted = extras.read_u32::<BigEndian>()?;
411            let flags = extras.read_u32::<BigEndian>()?;
412            let expiry = extras.read_u32::<BigEndian>()?;
413            let seq_no = extras.read_u64::<BigEndian>()?;
414            let datatype = extras.read_u8()?;
415
416            Ok(GetMetaResponse {
417                cas: packet.cas.unwrap_or_default(),
418                flags,
419                value,
420                datatype,
421                server_duration,
422                expiry,
423                seq_no,
424                deleted: deleted != 0,
425            })
426        } else {
427            Err(Error::new_protocol_error("no extras in response"))
428        }
429    }
430}
431
432#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
433pub struct DeleteResponse {
434    pub cas: u64,
435    pub mutation_token: Option<MutationToken>,
436    pub server_duration: Option<Duration>,
437}
438
439impl TryFromClientResponse for DeleteResponse {
440    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
441        let packet = resp.packet();
442        let status = packet.status;
443
444        let kind = if status == Status::KeyNotFound {
445            Some(ServerErrorKind::KeyNotFound)
446        } else if status == Status::Locked {
447            Some(ServerErrorKind::Locked)
448        } else if status == Status::KeyExists {
449            Some(ServerErrorKind::CasMismatch)
450        } else if status == Status::Success {
451            None
452        } else {
453            return Err(OpsCrud::decode_common_mutation_error(&packet));
454        };
455
456        if let Some(kind) = kind {
457            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
458        }
459
460        let mutation_token = if let Some(extras) = &packet.extras {
461            Some(MutationToken::try_from(extras)?)
462        } else {
463            None
464        };
465
466        let server_duration = if let Some(f) = &packet.framing_extras {
467            decode_res_ext_frames(f)?
468        } else {
469            None
470        };
471
472        Ok(DeleteResponse {
473            cas: packet.cas.unwrap_or_default(),
474            mutation_token,
475            server_duration,
476        })
477    }
478}
479
480#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
481pub struct GetAndLockResponse {
482    pub cas: u64,
483    pub flags: u32,
484    pub value: Vec<u8>,
485    pub datatype: u8,
486    pub server_duration: Option<Duration>,
487}
488
489impl TryFromClientResponse for GetAndLockResponse {
490    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
491        let packet = resp.packet();
492        let status = packet.status;
493
494        if status == Status::KeyNotFound {
495            return Err(ServerError::new(
496                ServerErrorKind::KeyNotFound,
497                packet.op_code,
498                packet.status,
499                packet.opaque,
500            )
501            .into());
502        } else if status == Status::Locked {
503            return Err(ServerError::new(
504                ServerErrorKind::Locked,
505                packet.op_code,
506                packet.status,
507                packet.opaque,
508            )
509            .into());
510        } else if status != Status::Success {
511            return Err(OpsCrud::decode_common_error(&packet));
512        }
513
514        let flags = parse_flags(&packet.extras)?;
515
516        let server_duration = if let Some(f) = &packet.framing_extras {
517            decode_res_ext_frames(f)?
518        } else {
519            None
520        };
521
522        let value = packet.value.unwrap_or_default();
523
524        Ok(GetAndLockResponse {
525            cas: packet.cas.unwrap_or_default(),
526            flags,
527            value,
528            datatype: packet.datatype,
529            server_duration,
530        })
531    }
532}
533
534#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
535pub struct GetAndTouchResponse {
536    pub cas: u64,
537    pub flags: u32,
538    pub value: Vec<u8>,
539    pub datatype: u8,
540    pub server_duration: Option<Duration>,
541}
542
543impl TryFromClientResponse for GetAndTouchResponse {
544    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
545        let packet = resp.packet();
546        let status = packet.status;
547
548        if status == Status::KeyNotFound {
549            return Err(ServerError::new(
550                ServerErrorKind::KeyNotFound,
551                packet.op_code,
552                packet.status,
553                packet.opaque,
554            )
555            .into());
556        } else if status == Status::Locked {
557            return Err(ServerError::new(
558                ServerErrorKind::Locked,
559                packet.op_code,
560                packet.status,
561                packet.opaque,
562            )
563            .into());
564        } else if status != Status::Success {
565            return Err(OpsCrud::decode_common_error(&packet));
566        }
567
568        let flags = parse_flags(&packet.extras)?;
569
570        let server_duration = if let Some(f) = &packet.framing_extras {
571            decode_res_ext_frames(f)?
572        } else {
573            None
574        };
575
576        let value = packet.value.unwrap_or_default();
577
578        Ok(GetAndTouchResponse {
579            cas: packet.cas.unwrap_or_default(),
580            flags,
581            value,
582            datatype: packet.datatype,
583            server_duration,
584        })
585    }
586}
587
588#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
589pub struct UnlockResponse {
590    pub server_duration: Option<Duration>,
591}
592
593impl TryFromClientResponse for UnlockResponse {
594    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
595        let packet = resp.packet();
596        let status = packet.status;
597
598        if status == Status::KeyNotFound {
599            return Err(ServerError::new(
600                ServerErrorKind::KeyNotFound,
601                packet.op_code,
602                packet.status,
603                packet.opaque,
604            )
605            .into());
606        } else if status == Status::Locked {
607            return Err(ServerError::new(
608                ServerErrorKind::CasMismatch,
609                packet.op_code,
610                packet.status,
611                packet.opaque,
612            )
613            .into());
614        } else if status == Status::NotLocked {
615            return Err(ServerError::new(
616                ServerErrorKind::NotLocked,
617                packet.op_code,
618                packet.status,
619                packet.opaque,
620            )
621            .into());
622        } else if status != Status::Success {
623            return Err(OpsCrud::decode_common_error(&packet));
624        }
625
626        let server_duration = if let Some(f) = &packet.framing_extras {
627            decode_res_ext_frames(f)?
628        } else {
629            None
630        };
631
632        Ok(UnlockResponse { server_duration })
633    }
634}
635
636#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
637pub struct TouchResponse {
638    pub cas: u64,
639    pub server_duration: Option<Duration>,
640}
641
642impl TryFromClientResponse for TouchResponse {
643    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
644        let packet = resp.packet();
645        let status = packet.status;
646
647        if status == Status::KeyNotFound {
648            return Err(ServerError::new(
649                ServerErrorKind::KeyNotFound,
650                packet.op_code,
651                packet.status,
652                packet.opaque,
653            )
654            .into());
655        } else if status == Status::Locked {
656            return Err(ServerError::new(
657                ServerErrorKind::Locked,
658                packet.op_code,
659                packet.status,
660                packet.opaque,
661            )
662            .into());
663        } else if status != Status::Success {
664            return Err(OpsCrud::decode_common_error(&packet));
665        }
666
667        if let Some(extras) = &packet.extras {
668            if !extras.is_empty() {
669                return Err(Error::new_protocol_error("bad extras length"));
670            }
671        }
672
673        let server_duration = if let Some(f) = &packet.framing_extras {
674            decode_res_ext_frames(f)?
675        } else {
676            None
677        };
678
679        Ok(TouchResponse {
680            cas: packet.cas.unwrap_or_default(),
681            server_duration,
682        })
683    }
684}
685
686#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
687pub struct AddResponse {
688    pub cas: u64,
689    pub mutation_token: Option<MutationToken>,
690    pub server_duration: Option<Duration>,
691}
692
693impl TryFromClientResponse for AddResponse {
694    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
695        let packet = resp.packet();
696        let status = packet.status;
697
698        let kind = if status == Status::TooBig {
699            Some(ServerErrorKind::TooBig)
700        } else if status == Status::Locked {
701            Some(ServerErrorKind::Locked)
702        } else if status == Status::KeyExists {
703            Some(ServerErrorKind::KeyExists)
704        } else if status == Status::Success {
705            None
706        } else {
707            return Err(OpsCrud::decode_common_mutation_error(&packet));
708        };
709
710        if let Some(kind) = kind {
711            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
712        }
713
714        let mutation_token = if let Some(extras) = &packet.extras {
715            Some(MutationToken::try_from(extras)?)
716        } else {
717            None
718        };
719
720        let server_duration = if let Some(f) = &packet.framing_extras {
721            decode_res_ext_frames(f)?
722        } else {
723            None
724        };
725
726        Ok(AddResponse {
727            cas: packet.cas.unwrap_or_default(),
728            mutation_token,
729            server_duration,
730        })
731    }
732}
733
734#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
735pub struct ReplaceResponse {
736    pub cas: u64,
737    pub mutation_token: Option<MutationToken>,
738    pub server_duration: Option<Duration>,
739}
740
741impl TryFromClientResponse for ReplaceResponse {
742    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
743        let packet = resp.packet();
744        let status = packet.status;
745
746        let kind = if status == Status::TooBig {
747            Some(ServerErrorKind::TooBig)
748        } else if status == Status::KeyNotFound {
749            Some(ServerErrorKind::KeyNotFound)
750        } else if status == Status::Locked {
751            Some(ServerErrorKind::Locked)
752        } else if status == Status::KeyExists {
753            Some(ServerErrorKind::CasMismatch)
754        } else if status == Status::Success {
755            None
756        } else {
757            return Err(OpsCrud::decode_common_mutation_error(&packet));
758        };
759
760        if let Some(kind) = kind {
761            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
762        }
763
764        let mutation_token = if let Some(extras) = &packet.extras {
765            Some(MutationToken::try_from(extras)?)
766        } else {
767            None
768        };
769
770        let server_duration = if let Some(f) = &packet.framing_extras {
771            decode_res_ext_frames(f)?
772        } else {
773            None
774        };
775
776        Ok(ReplaceResponse {
777            cas: packet.cas.unwrap_or_default(),
778            mutation_token,
779            server_duration,
780        })
781    }
782}
783
784#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
785pub struct AppendResponse {
786    pub cas: u64,
787    pub mutation_token: Option<MutationToken>,
788    pub server_duration: Option<Duration>,
789}
790
791impl TryFromClientResponse for AppendResponse {
792    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
793        let cas = resp.response_context().cas;
794        let packet = resp.packet();
795        let status = packet.status;
796
797        let kind = if status == Status::TooBig {
798            Some(ServerErrorKind::TooBig)
799        } else if status == Status::NotStored {
800            Some(ServerErrorKind::NotStored)
801        } else if status == Status::Locked {
802            Some(ServerErrorKind::Locked)
803        } else if status == Status::KeyExists && cas.is_some() {
804            // KeyExists without a request cas would be an odd error to receive so we don't
805            // handle that case.
806            Some(ServerErrorKind::CasMismatch)
807        } else if status == Status::Success {
808            None
809        } else {
810            return Err(OpsCrud::decode_common_mutation_error(&packet));
811        };
812
813        if let Some(kind) = kind {
814            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
815        }
816
817        let mutation_token = if let Some(extras) = &packet.extras {
818            Some(MutationToken::try_from(extras)?)
819        } else {
820            None
821        };
822
823        let server_duration = if let Some(f) = &packet.framing_extras {
824            decode_res_ext_frames(f)?
825        } else {
826            None
827        };
828
829        Ok(AppendResponse {
830            cas: packet.cas.unwrap_or_default(),
831            mutation_token,
832            server_duration,
833        })
834    }
835}
836
837#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
838pub struct PrependResponse {
839    pub cas: u64,
840    pub mutation_token: Option<MutationToken>,
841    pub server_duration: Option<Duration>,
842}
843
844impl TryFromClientResponse for PrependResponse {
845    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
846        let cas = resp.response_context().cas;
847        let packet = resp.packet();
848        let status = packet.status;
849
850        let kind = if status == Status::TooBig {
851            Some(ServerErrorKind::TooBig)
852        } else if status == Status::NotStored {
853            Some(ServerErrorKind::NotStored)
854        } else if status == Status::Locked {
855            Some(ServerErrorKind::Locked)
856        } else if status == Status::KeyExists && cas.is_some() {
857            // KeyExists without a request cas would be an odd error to receive so we don't
858            // handle that case.
859            Some(ServerErrorKind::CasMismatch)
860        } else if status == Status::Success {
861            None
862        } else {
863            return Err(OpsCrud::decode_common_mutation_error(&packet));
864        };
865
866        if let Some(kind) = kind {
867            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
868        }
869
870        let mutation_token = if let Some(extras) = &packet.extras {
871            Some(MutationToken::try_from(extras)?)
872        } else {
873            None
874        };
875
876        let server_duration = if let Some(f) = &packet.framing_extras {
877            decode_res_ext_frames(f)?
878        } else {
879            None
880        };
881
882        Ok(PrependResponse {
883            cas: packet.cas.unwrap_or_default(),
884            mutation_token,
885            server_duration,
886        })
887    }
888}
889
890#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
891pub struct IncrementResponse {
892    pub cas: u64,
893    pub value: u64,
894    pub mutation_token: Option<MutationToken>,
895    pub server_duration: Option<Duration>,
896}
897
898impl TryFromClientResponse for IncrementResponse {
899    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
900        let packet = resp.packet();
901        let status = packet.status;
902
903        let kind = if status == Status::KeyNotFound {
904            Some(ServerErrorKind::KeyNotFound)
905        } else if status == Status::Locked {
906            Some(ServerErrorKind::Locked)
907        } else if status == Status::BadDelta {
908            Some(ServerErrorKind::BadDelta)
909        } else if status == Status::Success {
910            None
911        } else {
912            return Err(OpsCrud::decode_common_mutation_error(&packet));
913        };
914
915        if let Some(kind) = kind {
916            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
917        }
918
919        let value = if let Some(val) = &packet.value {
920            if val.len() != 8 {
921                return Err(Error::new_protocol_error(
922                    "bad counter value length in response",
923                ));
924            }
925
926            u64::from_be_bytes(val.as_slice().try_into().unwrap())
927        } else {
928            0
929        };
930
931        let mutation_token = if let Some(extras) = &packet.extras {
932            Some(MutationToken::try_from(extras)?)
933        } else {
934            None
935        };
936
937        let server_duration = if let Some(f) = &packet.framing_extras {
938            decode_res_ext_frames(f)?
939        } else {
940            None
941        };
942
943        Ok(IncrementResponse {
944            cas: packet.cas.unwrap_or_default(),
945            value,
946            mutation_token,
947            server_duration,
948        })
949    }
950}
951
952#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
953pub struct DecrementResponse {
954    pub cas: u64,
955    pub value: u64,
956    pub mutation_token: Option<MutationToken>,
957    pub server_duration: Option<Duration>,
958}
959
960impl TryFromClientResponse for DecrementResponse {
961    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
962        let packet = resp.packet();
963        let status = packet.status;
964
965        let kind = if status == Status::KeyNotFound {
966            Some(ServerErrorKind::KeyNotFound)
967        } else if status == Status::Locked {
968            Some(ServerErrorKind::Locked)
969        } else if status == Status::BadDelta {
970            Some(ServerErrorKind::BadDelta)
971        } else if status == Status::Success {
972            None
973        } else {
974            return Err(OpsCrud::decode_common_mutation_error(&packet));
975        };
976
977        if let Some(kind) = kind {
978            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
979        }
980
981        let value = if let Some(val) = &packet.value {
982            if val.len() != 8 {
983                return Err(Error::new_protocol_error(
984                    "bad counter value length in response",
985                ));
986            }
987
988            u64::from_be_bytes(val.as_slice().try_into().unwrap())
989        } else {
990            0
991        };
992
993        let mutation_token = if let Some(extras) = &packet.extras {
994            Some(MutationToken::try_from(extras)?)
995        } else {
996            None
997        };
998
999        let server_duration = if let Some(f) = &packet.framing_extras {
1000            decode_res_ext_frames(f)?
1001        } else {
1002            None
1003        };
1004
1005        Ok(DecrementResponse {
1006            cas: packet.cas.unwrap_or_default(),
1007            value,
1008            mutation_token,
1009            server_duration,
1010        })
1011    }
1012}
1013
/// Decoded result of a sub-document lookup response.
///
/// Built by the `TryFromClientResponse` implementation for this type.
pub struct LookupInResponse {
    /// CAS taken from the response packet; `0` when the packet carries none.
    pub cas: u64,
    /// One entry per result decoded from the response value, in the order
    /// they appear in the payload. Each entry carries either a value or an
    /// error for that individual operation.
    pub ops: Vec<SubDocResult>,
    /// `true` when the response status was `SubDocSuccessDeleted` or
    /// `SubDocMultiPathFailureDeleted`.
    pub doc_is_deleted: bool,
    /// Result of decoding the packet's framing extras; `None` when the packet
    /// has no framing extras.
    pub server_duration: Option<Duration>,
}
1020
1021impl TryFromClientResponse for LookupInResponse {
1022    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1023        let subdoc_info = resp
1024            .response_context()
1025            .subdoc_info
1026            .expect("missing subdoc info in response context");
1027        let packet = resp.packet();
1028        let cas = packet.cas;
1029        let status = packet.status;
1030
1031        if status == Status::KeyNotFound {
1032            return Err(ServerError::new(
1033                ServerErrorKind::KeyNotFound,
1034                packet.op_code,
1035                packet.status,
1036                packet.opaque,
1037            )
1038            .into());
1039        } else if status == Status::Locked {
1040            return Err(ServerError::new(
1041                ServerErrorKind::Locked,
1042                packet.op_code,
1043                packet.status,
1044                packet.opaque,
1045            )
1046            .into());
1047        } else if status == Status::SubDocInvalidCombo {
1048            return Err(ServerError::new(
1049                ServerErrorKind::Subdoc {
1050                    error: SubdocError::new(SubdocErrorKind::InvalidCombo, None),
1051                },
1052                packet.op_code,
1053                packet.status,
1054                packet.opaque,
1055            )
1056            .into());
1057        } else if status == Status::SubDocInvalidXattrOrder {
1058            return Err(ServerError::new(
1059                ServerErrorKind::Subdoc {
1060                    error: SubdocError::new(SubdocErrorKind::InvalidXattrOrder, None),
1061                },
1062                packet.op_code,
1063                packet.status,
1064                packet.opaque,
1065            )
1066            .into());
1067        } else if status == Status::SubDocXattrInvalidKeyCombo {
1068            return Err(ServerError::new(
1069                ServerErrorKind::Subdoc {
1070                    error: SubdocError::new(SubdocErrorKind::XattrInvalidKeyCombo, None),
1071                },
1072                packet.op_code,
1073                packet.status,
1074                packet.opaque,
1075            )
1076            .into());
1077        } else if status == Status::SubDocXattrInvalidFlagCombo {
1078            return Err(ServerError::new(
1079                ServerErrorKind::Subdoc {
1080                    error: SubdocError::new(SubdocErrorKind::XattrInvalidFlagCombo, None),
1081                },
1082                packet.op_code,
1083                packet.status,
1084                packet.opaque,
1085            )
1086            .into());
1087        }
1088
1089        let mut doc_is_deleted = false;
1090
1091        if status == Status::SubDocSuccessDeleted || status == Status::SubDocMultiPathFailureDeleted
1092        {
1093            doc_is_deleted = true;
1094            // still considered a success
1095        } else if status != Status::Success && status != Status::SubDocMultiPathFailure {
1096            return Err(OpsCrud::decode_common_error(&packet));
1097        }
1098
1099        let mut results: Vec<SubDocResult> = Vec::with_capacity(subdoc_info.op_count as usize);
1100        let mut op_index = 0;
1101
1102        let value = packet
1103            .value
1104            .as_ref()
1105            .ok_or_else(|| Error::new_protocol_error("missing value"))?;
1106
1107        let mut cursor = Cursor::new(value);
1108        while cursor.position() < cursor.get_ref().len() as u64 {
1109            if cursor.remaining() < 6 {
1110                return Err(Error::new_protocol_error("bad value length"));
1111            }
1112
1113            let res_status = cursor.read_u16::<BigEndian>()?;
1114            let res_status = Status::from(res_status);
1115            let res_value_len = cursor.read_u32::<BigEndian>()?;
1116
1117            if cursor.remaining() < res_value_len as usize {
1118                return Err(Error::new_protocol_error("bad value length"));
1119            }
1120
1121            let value = if res_value_len > 0 {
1122                let mut tmp_val = vec![0; res_value_len as usize];
1123                cursor.read_exact(&mut tmp_val)?;
1124                Some(tmp_val)
1125            } else {
1126                None
1127            };
1128
1129            let err_kind: Option<SubdocErrorKind> = match res_status {
1130                Status::Success => None,
1131                Status::SubDocDocTooDeep => Some(SubdocErrorKind::DocTooDeep),
1132                Status::SubDocNotJSON => Some(SubdocErrorKind::NotJSON),
1133                Status::SubDocPathNotFound => Some(SubdocErrorKind::PathNotFound),
1134                Status::SubDocPathMismatch => Some(SubdocErrorKind::PathMismatch),
1135                Status::SubDocPathInvalid => Some(SubdocErrorKind::PathInvalid),
1136                Status::SubDocPathTooBig => Some(SubdocErrorKind::PathTooBig),
1137                Status::SubDocXattrUnknownVAttr => Some(SubdocErrorKind::XattrUnknownVAttr),
1138                _ => Some(SubdocErrorKind::UnknownStatus { status: res_status }),
1139            };
1140
1141            let err = err_kind.map(|kind| {
1142                ServerError::new(
1143                    ServerErrorKind::Subdoc {
1144                        error: SubdocError::new(kind, op_index),
1145                    },
1146                    packet.op_code,
1147                    packet.status,
1148                    packet.opaque,
1149                )
1150                .into()
1151            });
1152
1153            results.push(SubDocResult { value, err });
1154            op_index += 1;
1155        }
1156
1157        let server_duration = if let Some(f) = &packet.framing_extras {
1158            decode_res_ext_frames(f)?
1159        } else {
1160            None
1161        };
1162
1163        Ok(LookupInResponse {
1164            cas: cas.unwrap_or_default(),
1165            ops: results,
1166            doc_is_deleted,
1167            server_duration,
1168        })
1169    }
1170}
1171
/// Decoded result of a sub-document mutation response.
///
/// Built by the `TryFromClientResponse` implementation for this type.
pub struct MutateInResponse {
    /// CAS taken from the response packet; `0` when the packet carries none.
    pub cas: u64,
    /// Per-operation results. Operations for which the response carries no
    /// entry are represented by a result with neither value nor error, so
    /// the vector holds at least as many entries as operations were sent.
    pub ops: Vec<SubDocResult>,
    /// `true` when the response status was `SubDocSuccessDeleted`.
    pub doc_is_deleted: bool,
    /// Mutation token (vbucket UUID and sequence number) parsed from the
    /// 16-byte packet extras; `None` when the packet has no extras.
    pub mutation_token: Option<MutationToken>,
    /// Result of decoding the packet's framing extras; `None` when the packet
    /// has no framing extras.
    pub server_duration: Option<Duration>,
}
1179
1180impl TryFromClientResponse for MutateInResponse {
1181    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1182        let subdoc_info = resp
1183            .response_context()
1184            .subdoc_info
1185            .expect("missing subdoc info in response context");
1186
1187        let packet = resp.packet();
1188        let cas = packet.cas;
1189        let status = packet.status;
1190
1191        let kind = if status == Status::KeyNotFound {
1192            Some(ServerErrorKind::KeyNotFound)
1193        } else if status == Status::KeyExists && cas.is_some() {
1194            Some(ServerErrorKind::CasMismatch)
1195        } else if status == Status::KeyExists {
1196            Some(ServerErrorKind::KeyExists)
1197        } else if status == Status::Locked {
1198            Some(ServerErrorKind::Locked)
1199        } else if status == Status::TooBig {
1200            Some(ServerErrorKind::TooBig)
1201        } else if status == Status::SubDocInvalidCombo {
1202            Some(ServerErrorKind::Subdoc {
1203                error: SubdocError::new(SubdocErrorKind::InvalidCombo, None),
1204            })
1205        } else if status == Status::SubDocInvalidXattrOrder {
1206            Some(ServerErrorKind::Subdoc {
1207                error: SubdocError::new(SubdocErrorKind::InvalidXattrOrder, None),
1208            })
1209        } else if status == Status::SubDocXattrInvalidKeyCombo {
1210            Some(ServerErrorKind::Subdoc {
1211                error: SubdocError::new(SubdocErrorKind::XattrInvalidKeyCombo, None),
1212            })
1213        } else if status == Status::SubDocXattrInvalidFlagCombo {
1214            Some(ServerErrorKind::Subdoc {
1215                error: SubdocError::new(SubdocErrorKind::XattrInvalidFlagCombo, None),
1216            })
1217        } else if status == Status::SubDocXattrUnknownMacro {
1218            Some(ServerErrorKind::Subdoc {
1219                error: SubdocError::new(SubdocErrorKind::XattrUnknownMacro, None),
1220            })
1221        } else if status == Status::SubDocXattrUnknownVattrMacro {
1222            Some(ServerErrorKind::Subdoc {
1223                error: SubdocError::new(SubdocErrorKind::XattrUnknownVattrMacro, None),
1224            })
1225        } else if status == Status::SubDocXattrCannotModifyVAttr {
1226            Some(ServerErrorKind::Subdoc {
1227                error: SubdocError::new(SubdocErrorKind::XattrCannotModifyVAttr, None),
1228            })
1229        } else if status == Status::SubDocCanOnlyReviveDeletedDocuments {
1230            Some(ServerErrorKind::Subdoc {
1231                error: SubdocError::new(SubdocErrorKind::CanOnlyReviveDeletedDocuments, None),
1232            })
1233        } else if status == Status::SubDocDeletedDocumentCantHaveValue {
1234            Some(ServerErrorKind::Subdoc {
1235                error: SubdocError::new(SubdocErrorKind::DeletedDocumentCantHaveValue, None),
1236            })
1237        } else if status == Status::NotStored {
1238            if subdoc_info.flags.contains(SubdocDocFlag::AddDoc) {
1239                Some(ServerErrorKind::KeyExists)
1240            } else {
1241                Some(ServerErrorKind::NotStored)
1242            }
1243        } else if status == Status::SubDocMultiPathFailure {
1244            if let Some(value) = &packet.value {
1245                if value.len() != 3 {
1246                    return Err(Error::new_protocol_error("bad value length"));
1247                }
1248
1249                let mut cursor = Cursor::new(value);
1250                let op_index = cursor.read_u8()?;
1251                let res_status = cursor.read_u16::<BigEndian>()?;
1252
1253                let res_status = Status::from(res_status);
1254
1255                let err_kind: SubdocErrorKind = match res_status {
1256                    Status::SubDocDocTooDeep => SubdocErrorKind::DocTooDeep,
1257                    Status::SubDocNotJSON => SubdocErrorKind::NotJSON,
1258                    Status::SubDocPathNotFound => SubdocErrorKind::PathNotFound,
1259                    Status::SubDocPathMismatch => SubdocErrorKind::PathMismatch,
1260                    Status::SubDocPathInvalid => SubdocErrorKind::PathInvalid,
1261                    Status::SubDocPathTooBig => SubdocErrorKind::PathTooBig,
1262                    Status::SubDocPathExists => SubdocErrorKind::PathExists,
1263                    Status::SubDocCantInsert => SubdocErrorKind::CantInsert,
1264                    Status::SubDocBadRange => SubdocErrorKind::BadRange,
1265                    Status::SubDocBadDelta => SubdocErrorKind::BadDelta,
1266                    Status::SubDocValueTooDeep => SubdocErrorKind::ValueTooDeep,
1267                    _ => SubdocErrorKind::UnknownStatus { status: res_status },
1268                };
1269
1270                Some(ServerErrorKind::Subdoc {
1271                    error: SubdocError::new(err_kind, Some(op_index)),
1272                })
1273            } else {
1274                return Err(Error::new_protocol_error("bad value length"));
1275            }
1276        } else {
1277            None
1278        };
1279
1280        if let Some(kind) = kind {
1281            return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
1282        }
1283
1284        let mut doc_is_deleted = false;
1285        if status == Status::SubDocSuccessDeleted {
1286            doc_is_deleted = true;
1287            // still considered a success
1288        } else if status != Status::Success && status != Status::SubDocMultiPathFailure {
1289            return Err(OpsCrud::decode_common_mutation_error(&packet));
1290        }
1291
1292        let mut results: Vec<SubDocResult> = Vec::with_capacity(subdoc_info.op_count as usize);
1293
1294        if let Some(value) = &packet.value {
1295            let mut cursor = Cursor::new(value);
1296
1297            while cursor.position() < cursor.get_ref().len() as u64 {
1298                if cursor.remaining() < 3 {
1299                    return Err(Error::new_protocol_error("bad value length"));
1300                }
1301
1302                let op_index = cursor.read_u8()?;
1303
1304                if op_index > results.len() as u8 {
1305                    for _ in results.len() as u8..op_index {
1306                        results.push(SubDocResult {
1307                            err: None,
1308                            value: None,
1309                        });
1310                    }
1311                }
1312
1313                let op_status = cursor.read_u16::<BigEndian>()?;
1314                let op_status = Status::from(op_status);
1315
1316                if op_status == Status::Success {
1317                    let val_length = cursor.read_u32::<BigEndian>()?;
1318
1319                    let mut value = vec![0; val_length as usize];
1320                    cursor.read_exact(&mut value)?;
1321
1322                    results.push(SubDocResult {
1323                        err: None,
1324                        value: Some(value),
1325                    });
1326                } else {
1327                    return Err(Error::new_protocol_error(
1328                        "subdoc mutatein op illegally provided an error",
1329                    ));
1330                }
1331            }
1332        }
1333
1334        if results.len() < subdoc_info.op_count as usize {
1335            for _ in results.len()..subdoc_info.op_count as usize {
1336                results.push(SubDocResult {
1337                    err: None,
1338                    value: None,
1339                });
1340            }
1341        }
1342
1343        let mutation_token = if let Some(extras) = &packet.extras {
1344            if extras.len() != 16 {
1345                return Err(Error::new_protocol_error("bad extras length"));
1346            }
1347
1348            let (vbuuid_bytes, seqno_bytes) = extras.split_at(size_of::<u64>());
1349            let vbuuid = u64::from_be_bytes(vbuuid_bytes.try_into().unwrap());
1350            let seqno = u64::from_be_bytes(seqno_bytes.try_into().unwrap());
1351
1352            Some(MutationToken { vbuuid, seqno })
1353        } else {
1354            None
1355        };
1356
1357        let server_duration = if let Some(f) = &packet.framing_extras {
1358            decode_res_ext_frames(f)?
1359        } else {
1360            None
1361        };
1362
1363        Ok(MutateInResponse {
1364            cas: cas.unwrap_or_default(),
1365            ops: results,
1366            mutation_token,
1367            doc_is_deleted,
1368            server_duration,
1369        })
1370    }
1371}
1372
/// Decoded result of a successful get-collection-id response.
///
/// Both fields are read from the 12-byte packet extras.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct GetCollectionIdResponse {
    /// Manifest revision: the first 8 bytes of the extras, big-endian.
    pub manifest_rev: u64,
    /// Collection id: the following 4 bytes of the extras, big-endian.
    pub collection_id: u32,
}
1378
1379impl TryFromClientResponse for GetCollectionIdResponse {
1380    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1381        let context = resp.response_context();
1382        let scope_name = context.scope_name.as_ref().expect("missing scope name");
1383        let collection_name = context
1384            .collection_name
1385            .as_ref()
1386            .expect("missing collection name");
1387        let packet = resp.packet();
1388        let status = packet.status;
1389
1390        if status == Status::ScopeUnknown {
1391            return Err(ResourceError::new(
1392                ServerError::new(
1393                    ServerErrorKind::UnknownScopeName,
1394                    packet.op_code,
1395                    packet.status,
1396                    packet.opaque,
1397                ),
1398                scope_name,
1399                collection_name,
1400            )
1401            .into());
1402        } else if status == Status::CollectionUnknown {
1403            return Err(ResourceError::new(
1404                ServerError::new(
1405                    ServerErrorKind::UnknownCollectionName,
1406                    packet.op_code,
1407                    packet.status,
1408                    packet.opaque,
1409                ),
1410                scope_name,
1411                collection_name,
1412            )
1413            .into());
1414        } else if status != Status::Success {
1415            return Err(OpsCore::decode_error(&packet));
1416        }
1417
1418        let extras = if let Some(extras) = &packet.extras {
1419            if extras.len() != 12 {
1420                return Err(Error::new_protocol_error("invalid extras length"));
1421            }
1422            extras
1423        } else {
1424            return Err(Error::new_protocol_error("no extras in response"));
1425        };
1426
1427        let mut extras = Cursor::new(extras);
1428        let manifest_rev = extras.read_u64::<BigEndian>()?;
1429        let collection_id = extras.read_u32::<BigEndian>()?;
1430
1431        Ok(GetCollectionIdResponse {
1432            manifest_rev,
1433            collection_id,
1434        })
1435    }
1436}
1437
/// Decoded result of a successful ping response.
///
/// Carries no data; its existence signals that the response status was
/// `Success`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PingResponse {}
1440
1441impl TryFromClientResponse for PingResponse {
1442    fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1443        let packet = resp.packet();
1444        let status = packet.status;
1445
1446        if status != Status::Success {
1447            return Err(OpsCore::decode_error(&packet));
1448        }
1449
1450        Ok(PingResponse {})
1451    }
1452}