1use std::io::{Cursor, Read};
20use std::time::Duration;
21
22use bytes::Bytes;
23
24use crate::memdx::auth_mechanism::AuthMechanism;
25use crate::memdx::client_response::ClientResponse;
26use crate::memdx::error::{
27 Error, ResourceError, ServerError, ServerErrorKind, SubdocError, SubdocErrorKind,
28};
29use crate::memdx::extframe::decode_res_ext_frames;
30use crate::memdx::hello_feature::HelloFeature;
31use crate::memdx::ops_core::OpsCore;
32use crate::memdx::ops_crud::OpsCrud;
33use crate::memdx::status::Status;
34use crate::memdx::subdoc::{SubDocResult, SubdocDocFlag};
35use byteorder::{BigEndian, ReadBytesExt};
36use bytes::Buf;
37
38pub trait TryFromClientResponse: Sized {
39 fn try_from(resp: ClientResponse) -> Result<Self, Error>;
40}
41
42pub trait TraceAttributes {
43 fn server_duration(&self) -> Option<Duration>;
44}
45
46#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
47pub struct HelloResponse {
48 pub enabled_features: Vec<HelloFeature>,
49}
50
51impl TryFromClientResponse for HelloResponse {
52 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
53 let packet = resp.packet();
54 let status = packet.status;
55 if status != Status::Success {
56 return Err(OpsCore::decode_error(&packet));
57 }
58
59 let mut features: Vec<HelloFeature> = Vec::new();
60 if let Some(value) = &packet.value {
61 if value.len() % 2 != 0 {
62 return Err(Error::new_protocol_error("invalid hello features length"));
63 }
64
65 let mut cursor = Cursor::new(value);
66 while let Ok(code) = cursor.read_u16::<BigEndian>() {
67 features.push(HelloFeature::from(code));
68 }
69 }
70 let response = HelloResponse {
71 enabled_features: features,
72 };
73
74 Ok(response)
75 }
76}
77
78#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
79pub struct GetErrorMapResponse {
80 pub error_map: Bytes,
81}
82
83impl TryFromClientResponse for GetErrorMapResponse {
84 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
85 let packet = resp.packet();
86 let status = packet.status;
87 if status != Status::Success {
88 return Err(OpsCore::decode_error(&packet));
89 }
90
91 let value = packet.value.unwrap_or_default();
92 let response = GetErrorMapResponse { error_map: value };
93
94 Ok(response)
95 }
96}
97
98#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
99pub struct SelectBucketResponse {}
100
101impl TryFromClientResponse for SelectBucketResponse {
102 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
103 let packet = resp.packet();
104 let status = packet.status;
105 if status != Status::Success {
106 if status == Status::AccessError || status == Status::KeyNotFound {
107 return Err(ServerError::new(
108 ServerErrorKind::UnknownBucketName,
109 packet.op_code,
110 status,
111 packet.opaque,
112 )
113 .into());
114 }
115 return Err(OpsCore::decode_error(&packet));
116 }
117
118 Ok(SelectBucketResponse {})
119 }
120}
121
122#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
123pub struct SASLAuthResponse {
124 pub needs_more_steps: bool,
125 pub payload: Bytes,
126}
127
128impl TryFromClientResponse for SASLAuthResponse {
129 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
130 let packet = resp.packet();
131 let status = packet.status;
132 if status == Status::AuthContinue {
133 return Ok(SASLAuthResponse {
134 needs_more_steps: true,
135 payload: packet.value.unwrap_or_default(),
136 });
137 }
138
139 if status != Status::Success {
140 return Err(OpsCore::decode_error(&packet));
141 }
142
143 Ok(SASLAuthResponse {
144 needs_more_steps: false,
145 payload: packet.value.unwrap_or_default(),
146 })
147 }
148}
149
150#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
151pub struct SASLStepResponse {
152 pub needs_more_steps: bool,
153 pub payload: Bytes,
154}
155
156impl TryFromClientResponse for SASLStepResponse {
157 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
158 let packet = resp.packet();
159 let status = packet.status;
160 if status != Status::Success {
161 return Err(OpsCore::decode_error(&packet));
162 }
163
164 Ok(SASLStepResponse {
165 needs_more_steps: false,
166 payload: packet.value.unwrap_or_default(),
167 })
168 }
169}
170
171#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
172pub struct SASLListMechsResponse {
173 pub available_mechs: Vec<AuthMechanism>,
174}
175
176impl TryFromClientResponse for SASLListMechsResponse {
177 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
178 let packet = resp.packet();
179 let status = packet.status;
180 if status != Status::Success {
181 if status == Status::KeyNotFound {
182 return Err(ServerError::new(
187 ServerErrorKind::ConfigNotSet,
188 packet.op_code,
189 status,
190 packet.opaque,
191 )
192 .into());
193 }
194 return Err(OpsCore::decode_error(&packet));
195 }
196
197 let value = packet.value.unwrap_or_default();
198 let mechs_list_string = match String::from_utf8(value.to_vec()) {
199 Ok(v) => v,
200 Err(e) => {
201 return Err(Error::new_protocol_error(
202 "failed to parse authentication mechanism list",
203 )
204 .with(e));
205 }
206 };
207 let mechs_list_split = mechs_list_string.split(' ');
208 let mut mechs_list = Vec::new();
209 for item in mechs_list_split {
210 mechs_list.push(AuthMechanism::try_from(item)?);
211 }
212
213 Ok(SASLListMechsResponse {
214 available_mechs: mechs_list,
215 })
216 }
217}
218
219#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
220pub struct GetClusterConfigResponse {
221 pub config: Bytes,
222}
223
224impl TryFromClientResponse for GetClusterConfigResponse {
225 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
226 let packet = resp.packet();
227 let status = packet.status;
228 if status != Status::Success {
229 return Err(OpsCore::decode_error(&packet));
230 }
231
232 Ok(GetClusterConfigResponse {
233 config: packet.value.unwrap_or_default(),
234 })
235 }
236}
237
238#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
239pub struct BootstrapResult {
240 pub hello: Option<HelloResponse>,
241 pub error_map: Option<GetErrorMapResponse>,
242 pub cluster_config: Option<GetClusterConfigResponse>,
243}
244
245#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
246pub struct MutationToken {
247 pub vbuuid: u64,
248 pub seqno: u64,
249}
250
251impl TryFrom<&[u8]> for MutationToken {
252 type Error = Error;
253
254 fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
255 if value.len() != 16 {
256 return Err(Error::new_protocol_error("bad extras length"));
257 }
258
259 let (vbuuid_bytes, seqno_bytes) = value.split_at(size_of::<u64>());
260 let vbuuid = u64::from_be_bytes(vbuuid_bytes.try_into().unwrap());
261 let seqno = u64::from_be_bytes(seqno_bytes.try_into().unwrap());
262
263 Ok(MutationToken { vbuuid, seqno })
264 }
265}
266
267#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
268pub struct SetResponse {
269 pub cas: u64,
270 pub mutation_token: Option<MutationToken>,
271 pub server_duration: Option<Duration>,
272}
273
274impl TryFromClientResponse for SetResponse {
275 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
276 let packet = resp.packet();
277 let status = packet.status;
278
279 let kind = if status == Status::TooBig {
280 Some(ServerErrorKind::TooBig)
281 } else if status == Status::Locked {
282 Some(ServerErrorKind::Locked)
283 } else if status == Status::KeyExists {
284 Some(ServerErrorKind::CasMismatch)
285 } else if status == Status::Success {
286 None
287 } else {
288 return Err(OpsCrud::decode_common_mutation_error(&packet));
289 };
290
291 if let Some(kind) = kind {
292 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
293 }
294
295 let mutation_token = if let Some(extras) = &packet.extras {
296 Some(MutationToken::try_from(extras.as_ref())?)
297 } else {
298 None
299 };
300
301 let server_duration = if let Some(f) = &packet.framing_extras {
302 decode_res_ext_frames(f)?
303 } else {
304 None
305 };
306
307 Ok(SetResponse {
308 cas: packet.cas.unwrap_or_default(),
309 mutation_token,
310 server_duration,
311 })
312 }
313}
314
315impl TraceAttributes for SetResponse {
316 fn server_duration(&self) -> Option<Duration> {
317 self.server_duration
318 }
319}
320
321fn parse_flags(extras: &Option<Bytes>) -> Result<u32, Error> {
322 if let Some(extras) = &extras {
323 if extras.len() != 4 {
324 return Err(Error::new_protocol_error("bad extras length reading flags"));
325 }
326
327 Ok(u32::from_be_bytes(extras[..].try_into().unwrap()))
328 } else {
329 Err(Error::new_protocol_error("no extras in response"))
330 }
331}
332
333#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
334pub struct GetResponse {
335 pub cas: u64,
336 pub flags: u32,
337 pub value: Bytes,
338 pub datatype: u8,
339 pub server_duration: Option<Duration>,
340}
341
342impl TryFromClientResponse for GetResponse {
343 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
344 let packet = resp.packet();
345 let status = packet.status;
346
347 if status == Status::KeyNotFound {
348 return Err(ServerError::new(
349 ServerErrorKind::KeyNotFound,
350 packet.op_code,
351 packet.status,
352 packet.opaque,
353 )
354 .into());
355 } else if status != Status::Success {
356 return Err(OpsCrud::decode_common_error(&packet));
357 }
358
359 let flags = parse_flags(&packet.extras)?;
360
361 let server_duration = if let Some(f) = &packet.framing_extras {
362 decode_res_ext_frames(f)?
363 } else {
364 None
365 };
366
367 let value = packet.value.unwrap_or_default();
368
369 Ok(GetResponse {
370 cas: packet.cas.unwrap_or_default(),
371 flags,
372 value,
373 datatype: packet.datatype,
374 server_duration,
375 })
376 }
377}
378
379impl TraceAttributes for GetResponse {
380 fn server_duration(&self) -> Option<Duration> {
381 self.server_duration
382 }
383}
384
385#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
386pub struct GetMetaResponse {
387 pub cas: u64,
388 pub flags: u32,
389 pub value: Bytes,
390 pub datatype: u8,
391 pub server_duration: Option<Duration>,
392 pub expiry: u32,
393 pub seq_no: u64,
394 pub deleted: bool,
395}
396
397impl TryFromClientResponse for GetMetaResponse {
398 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
399 let packet = resp.packet();
400 let status = packet.status;
401
402 if status == Status::KeyNotFound {
403 return Err(ServerError::new(
404 ServerErrorKind::KeyNotFound,
405 packet.op_code,
406 packet.status,
407 packet.opaque,
408 )
409 .into());
410 } else if status != Status::Success {
411 return Err(OpsCrud::decode_common_error(&packet));
412 }
413
414 let server_duration = if let Some(f) = &packet.framing_extras {
415 decode_res_ext_frames(f)?
416 } else {
417 None
418 };
419
420 let value = packet.value.unwrap_or_default();
421
422 if let Some(extras) = &packet.extras {
423 if extras.len() != 21 {
424 return Err(Error::new_protocol_error("bad extras length"));
425 }
426
427 let mut extras = Cursor::new(extras);
428 let deleted = extras.read_u32::<BigEndian>()?;
429 let flags = extras.read_u32::<BigEndian>()?;
430 let expiry = extras.read_u32::<BigEndian>()?;
431 let seq_no = extras.read_u64::<BigEndian>()?;
432 let datatype = extras.read_u8()?;
433
434 Ok(GetMetaResponse {
435 cas: packet.cas.unwrap_or_default(),
436 flags,
437 value,
438 datatype,
439 server_duration,
440 expiry,
441 seq_no,
442 deleted: deleted != 0,
443 })
444 } else {
445 Err(Error::new_protocol_error("no extras in response"))
446 }
447 }
448}
449
450#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
451pub struct DeleteResponse {
452 pub cas: u64,
453 pub mutation_token: Option<MutationToken>,
454 pub server_duration: Option<Duration>,
455}
456
457impl TryFromClientResponse for DeleteResponse {
458 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
459 let packet = resp.packet();
460 let status = packet.status;
461
462 let kind = if status == Status::KeyNotFound {
463 Some(ServerErrorKind::KeyNotFound)
464 } else if status == Status::Locked {
465 Some(ServerErrorKind::Locked)
466 } else if status == Status::KeyExists {
467 Some(ServerErrorKind::CasMismatch)
468 } else if status == Status::Success {
469 None
470 } else {
471 return Err(OpsCrud::decode_common_mutation_error(&packet));
472 };
473
474 if let Some(kind) = kind {
475 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
476 }
477
478 let mutation_token = if let Some(extras) = &packet.extras {
479 Some(MutationToken::try_from(extras.as_ref())?)
480 } else {
481 None
482 };
483
484 let server_duration = if let Some(f) = &packet.framing_extras {
485 decode_res_ext_frames(f)?
486 } else {
487 None
488 };
489
490 Ok(DeleteResponse {
491 cas: packet.cas.unwrap_or_default(),
492 mutation_token,
493 server_duration,
494 })
495 }
496}
497
498impl TraceAttributes for DeleteResponse {
499 fn server_duration(&self) -> Option<Duration> {
500 self.server_duration
501 }
502}
503
504#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
505pub struct GetAndLockResponse {
506 pub cas: u64,
507 pub flags: u32,
508 pub value: Bytes,
509 pub datatype: u8,
510 pub server_duration: Option<Duration>,
511}
512
513impl TryFromClientResponse for GetAndLockResponse {
514 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
515 let packet = resp.packet();
516 let status = packet.status;
517
518 if status == Status::KeyNotFound {
519 return Err(ServerError::new(
520 ServerErrorKind::KeyNotFound,
521 packet.op_code,
522 packet.status,
523 packet.opaque,
524 )
525 .into());
526 } else if status == Status::Locked {
527 return Err(ServerError::new(
528 ServerErrorKind::Locked,
529 packet.op_code,
530 packet.status,
531 packet.opaque,
532 )
533 .into());
534 } else if status != Status::Success {
535 return Err(OpsCrud::decode_common_error(&packet));
536 }
537
538 let flags = parse_flags(&packet.extras)?;
539
540 let server_duration = if let Some(f) = &packet.framing_extras {
541 decode_res_ext_frames(f)?
542 } else {
543 None
544 };
545
546 let value = packet.value.unwrap_or_default();
547
548 Ok(GetAndLockResponse {
549 cas: packet.cas.unwrap_or_default(),
550 flags,
551 value,
552 datatype: packet.datatype,
553 server_duration,
554 })
555 }
556}
557
558impl TraceAttributes for GetAndLockResponse {
559 fn server_duration(&self) -> Option<Duration> {
560 self.server_duration
561 }
562}
563
564#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
565pub struct GetAndTouchResponse {
566 pub cas: u64,
567 pub flags: u32,
568 pub value: Bytes,
569 pub datatype: u8,
570 pub server_duration: Option<Duration>,
571}
572
573impl TryFromClientResponse for GetAndTouchResponse {
574 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
575 let packet = resp.packet();
576 let status = packet.status;
577
578 if status == Status::KeyNotFound {
579 return Err(ServerError::new(
580 ServerErrorKind::KeyNotFound,
581 packet.op_code,
582 packet.status,
583 packet.opaque,
584 )
585 .into());
586 } else if status == Status::Locked {
587 return Err(ServerError::new(
588 ServerErrorKind::Locked,
589 packet.op_code,
590 packet.status,
591 packet.opaque,
592 )
593 .into());
594 } else if status != Status::Success {
595 return Err(OpsCrud::decode_common_error(&packet));
596 }
597
598 let flags = parse_flags(&packet.extras)?;
599
600 let server_duration = if let Some(f) = &packet.framing_extras {
601 decode_res_ext_frames(f)?
602 } else {
603 None
604 };
605
606 let value = packet.value.unwrap_or_default();
607
608 Ok(GetAndTouchResponse {
609 cas: packet.cas.unwrap_or_default(),
610 flags,
611 value,
612 datatype: packet.datatype,
613 server_duration,
614 })
615 }
616}
617
618impl TraceAttributes for GetAndTouchResponse {
619 fn server_duration(&self) -> Option<Duration> {
620 self.server_duration
621 }
622}
623
624#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
625pub struct UnlockResponse {
626 pub server_duration: Option<Duration>,
627}
628
629impl TryFromClientResponse for UnlockResponse {
630 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
631 let packet = resp.packet();
632 let status = packet.status;
633
634 if status == Status::KeyNotFound {
635 return Err(ServerError::new(
636 ServerErrorKind::KeyNotFound,
637 packet.op_code,
638 packet.status,
639 packet.opaque,
640 )
641 .into());
642 } else if status == Status::Locked {
643 return Err(ServerError::new(
644 ServerErrorKind::CasMismatch,
645 packet.op_code,
646 packet.status,
647 packet.opaque,
648 )
649 .into());
650 } else if status == Status::NotLocked {
651 return Err(ServerError::new(
652 ServerErrorKind::NotLocked,
653 packet.op_code,
654 packet.status,
655 packet.opaque,
656 )
657 .into());
658 } else if status != Status::Success {
659 return Err(OpsCrud::decode_common_error(&packet));
660 }
661
662 let server_duration = if let Some(f) = &packet.framing_extras {
663 decode_res_ext_frames(f)?
664 } else {
665 None
666 };
667
668 Ok(UnlockResponse { server_duration })
669 }
670}
671
672impl TraceAttributes for UnlockResponse {
673 fn server_duration(&self) -> Option<Duration> {
674 self.server_duration
675 }
676}
677
678#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
679pub struct TouchResponse {
680 pub cas: u64,
681 pub server_duration: Option<Duration>,
682}
683
684impl TryFromClientResponse for TouchResponse {
685 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
686 let packet = resp.packet();
687 let status = packet.status;
688
689 if status == Status::KeyNotFound {
690 return Err(ServerError::new(
691 ServerErrorKind::KeyNotFound,
692 packet.op_code,
693 packet.status,
694 packet.opaque,
695 )
696 .into());
697 } else if status == Status::Locked {
698 return Err(ServerError::new(
699 ServerErrorKind::Locked,
700 packet.op_code,
701 packet.status,
702 packet.opaque,
703 )
704 .into());
705 } else if status != Status::Success {
706 return Err(OpsCrud::decode_common_error(&packet));
707 }
708
709 if let Some(extras) = &packet.extras {
710 if !extras.is_empty() {
711 return Err(Error::new_protocol_error("bad extras length"));
712 }
713 }
714
715 let server_duration = if let Some(f) = &packet.framing_extras {
716 decode_res_ext_frames(f)?
717 } else {
718 None
719 };
720
721 Ok(TouchResponse {
722 cas: packet.cas.unwrap_or_default(),
723 server_duration,
724 })
725 }
726}
727
728impl TraceAttributes for TouchResponse {
729 fn server_duration(&self) -> Option<Duration> {
730 self.server_duration
731 }
732}
733
734#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
735pub struct AddResponse {
736 pub cas: u64,
737 pub mutation_token: Option<MutationToken>,
738 pub server_duration: Option<Duration>,
739}
740
741impl TryFromClientResponse for AddResponse {
742 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
743 let packet = resp.packet();
744 let status = packet.status;
745
746 let kind = if status == Status::TooBig {
747 Some(ServerErrorKind::TooBig)
748 } else if status == Status::Locked {
749 Some(ServerErrorKind::Locked)
750 } else if status == Status::KeyExists {
751 Some(ServerErrorKind::KeyExists)
752 } else if status == Status::Success {
753 None
754 } else {
755 return Err(OpsCrud::decode_common_mutation_error(&packet));
756 };
757
758 if let Some(kind) = kind {
759 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
760 }
761
762 let mutation_token = if let Some(extras) = &packet.extras {
763 Some(MutationToken::try_from(extras.as_ref())?)
764 } else {
765 None
766 };
767
768 let server_duration = if let Some(f) = &packet.framing_extras {
769 decode_res_ext_frames(f)?
770 } else {
771 None
772 };
773
774 Ok(AddResponse {
775 cas: packet.cas.unwrap_or_default(),
776 mutation_token,
777 server_duration,
778 })
779 }
780}
781
782impl TraceAttributes for AddResponse {
783 fn server_duration(&self) -> Option<Duration> {
784 self.server_duration
785 }
786}
787
788#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
789pub struct ReplaceResponse {
790 pub cas: u64,
791 pub mutation_token: Option<MutationToken>,
792 pub server_duration: Option<Duration>,
793}
794
795impl TryFromClientResponse for ReplaceResponse {
796 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
797 let packet = resp.packet();
798 let status = packet.status;
799
800 let kind = if status == Status::TooBig {
801 Some(ServerErrorKind::TooBig)
802 } else if status == Status::KeyNotFound {
803 Some(ServerErrorKind::KeyNotFound)
804 } else if status == Status::Locked {
805 Some(ServerErrorKind::Locked)
806 } else if status == Status::KeyExists {
807 Some(ServerErrorKind::CasMismatch)
808 } else if status == Status::Success {
809 None
810 } else {
811 return Err(OpsCrud::decode_common_mutation_error(&packet));
812 };
813
814 if let Some(kind) = kind {
815 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
816 }
817
818 let mutation_token = if let Some(extras) = &packet.extras {
819 Some(MutationToken::try_from(extras.as_ref())?)
820 } else {
821 None
822 };
823
824 let server_duration = if let Some(f) = &packet.framing_extras {
825 decode_res_ext_frames(f)?
826 } else {
827 None
828 };
829
830 Ok(ReplaceResponse {
831 cas: packet.cas.unwrap_or_default(),
832 mutation_token,
833 server_duration,
834 })
835 }
836}
837
838impl TraceAttributes for ReplaceResponse {
839 fn server_duration(&self) -> Option<Duration> {
840 self.server_duration
841 }
842}
843
844#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
845pub struct AppendResponse {
846 pub cas: u64,
847 pub mutation_token: Option<MutationToken>,
848 pub server_duration: Option<Duration>,
849}
850
851impl TryFromClientResponse for AppendResponse {
852 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
853 let cas = resp
854 .response_context()
855 .expect("response did not have a response context")
856 .cas;
857 let packet = resp.packet();
858 let status = packet.status;
859
860 let kind = if status == Status::TooBig {
861 Some(ServerErrorKind::TooBig)
862 } else if status == Status::NotStored {
863 Some(ServerErrorKind::NotStored)
864 } else if status == Status::Locked {
865 Some(ServerErrorKind::Locked)
866 } else if status == Status::KeyExists && cas.is_some() {
867 Some(ServerErrorKind::CasMismatch)
870 } else if status == Status::Success {
871 None
872 } else {
873 return Err(OpsCrud::decode_common_mutation_error(&packet));
874 };
875
876 if let Some(kind) = kind {
877 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
878 }
879
880 let mutation_token = if let Some(extras) = &packet.extras {
881 Some(MutationToken::try_from(extras.as_ref())?)
882 } else {
883 None
884 };
885
886 let server_duration = if let Some(f) = &packet.framing_extras {
887 decode_res_ext_frames(f)?
888 } else {
889 None
890 };
891
892 Ok(AppendResponse {
893 cas: packet.cas.unwrap_or_default(),
894 mutation_token,
895 server_duration,
896 })
897 }
898}
899
900impl TraceAttributes for AppendResponse {
901 fn server_duration(&self) -> Option<Duration> {
902 self.server_duration
903 }
904}
905
906#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
907pub struct PrependResponse {
908 pub cas: u64,
909 pub mutation_token: Option<MutationToken>,
910 pub server_duration: Option<Duration>,
911}
912
913impl TryFromClientResponse for PrependResponse {
914 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
915 let cas = resp
916 .response_context()
917 .expect("response did not have a response context")
918 .cas;
919 let packet = resp.packet();
920 let status = packet.status;
921
922 let kind = if status == Status::TooBig {
923 Some(ServerErrorKind::TooBig)
924 } else if status == Status::NotStored {
925 Some(ServerErrorKind::NotStored)
926 } else if status == Status::Locked {
927 Some(ServerErrorKind::Locked)
928 } else if status == Status::KeyExists && cas.is_some() {
929 Some(ServerErrorKind::CasMismatch)
932 } else if status == Status::Success {
933 None
934 } else {
935 return Err(OpsCrud::decode_common_mutation_error(&packet));
936 };
937
938 if let Some(kind) = kind {
939 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
940 }
941
942 let mutation_token = if let Some(extras) = &packet.extras {
943 Some(MutationToken::try_from(extras.as_ref())?)
944 } else {
945 None
946 };
947
948 let server_duration = if let Some(f) = &packet.framing_extras {
949 decode_res_ext_frames(f)?
950 } else {
951 None
952 };
953
954 Ok(PrependResponse {
955 cas: packet.cas.unwrap_or_default(),
956 mutation_token,
957 server_duration,
958 })
959 }
960}
961
962impl TraceAttributes for PrependResponse {
963 fn server_duration(&self) -> Option<Duration> {
964 self.server_duration
965 }
966}
967
968#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
969pub struct IncrementResponse {
970 pub cas: u64,
971 pub value: u64,
972 pub mutation_token: Option<MutationToken>,
973 pub server_duration: Option<Duration>,
974}
975
976impl TryFromClientResponse for IncrementResponse {
977 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
978 let packet = resp.packet();
979 let status = packet.status;
980
981 let kind = if status == Status::KeyNotFound {
982 Some(ServerErrorKind::KeyNotFound)
983 } else if status == Status::Locked {
984 Some(ServerErrorKind::Locked)
985 } else if status == Status::BadDelta {
986 Some(ServerErrorKind::BadDelta)
987 } else if status == Status::Success {
988 None
989 } else {
990 return Err(OpsCrud::decode_common_mutation_error(&packet));
991 };
992
993 if let Some(kind) = kind {
994 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
995 }
996
997 let value = if let Some(val) = &packet.value {
998 if val.len() != 8 {
999 return Err(Error::new_protocol_error(
1000 "bad counter value length in response",
1001 ));
1002 }
1003
1004 u64::from_be_bytes(val[..].try_into().unwrap())
1005 } else {
1006 0
1007 };
1008
1009 let mutation_token = if let Some(extras) = &packet.extras {
1010 Some(MutationToken::try_from(extras.as_ref())?)
1011 } else {
1012 None
1013 };
1014
1015 let server_duration = if let Some(f) = &packet.framing_extras {
1016 decode_res_ext_frames(f)?
1017 } else {
1018 None
1019 };
1020
1021 Ok(IncrementResponse {
1022 cas: packet.cas.unwrap_or_default(),
1023 value,
1024 mutation_token,
1025 server_duration,
1026 })
1027 }
1028}
1029
1030impl TraceAttributes for IncrementResponse {
1031 fn server_duration(&self) -> Option<Duration> {
1032 self.server_duration
1033 }
1034}
1035
1036#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
1037pub struct DecrementResponse {
1038 pub cas: u64,
1039 pub value: u64,
1040 pub mutation_token: Option<MutationToken>,
1041 pub server_duration: Option<Duration>,
1042}
1043
1044impl TryFromClientResponse for DecrementResponse {
1045 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1046 let packet = resp.packet();
1047 let status = packet.status;
1048
1049 let kind = if status == Status::KeyNotFound {
1050 Some(ServerErrorKind::KeyNotFound)
1051 } else if status == Status::Locked {
1052 Some(ServerErrorKind::Locked)
1053 } else if status == Status::BadDelta {
1054 Some(ServerErrorKind::BadDelta)
1055 } else if status == Status::Success {
1056 None
1057 } else {
1058 return Err(OpsCrud::decode_common_mutation_error(&packet));
1059 };
1060
1061 if let Some(kind) = kind {
1062 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
1063 }
1064
1065 let value = if let Some(val) = &packet.value {
1066 if val.len() != 8 {
1067 return Err(Error::new_protocol_error(
1068 "bad counter value length in response",
1069 ));
1070 }
1071
1072 u64::from_be_bytes(val[..].try_into().unwrap())
1073 } else {
1074 0
1075 };
1076
1077 let mutation_token = if let Some(extras) = &packet.extras {
1078 Some(MutationToken::try_from(extras.as_ref())?)
1079 } else {
1080 None
1081 };
1082
1083 let server_duration = if let Some(f) = &packet.framing_extras {
1084 decode_res_ext_frames(f)?
1085 } else {
1086 None
1087 };
1088
1089 Ok(DecrementResponse {
1090 cas: packet.cas.unwrap_or_default(),
1091 value,
1092 mutation_token,
1093 server_duration,
1094 })
1095 }
1096}
1097
1098impl TraceAttributes for DecrementResponse {
1099 fn server_duration(&self) -> Option<Duration> {
1100 self.server_duration
1101 }
1102}
1103
1104pub struct LookupInResponse {
1105 pub cas: u64,
1106 pub ops: Vec<SubDocResult>,
1107 pub doc_is_deleted: bool,
1108 pub server_duration: Option<Duration>,
1109}
1110
1111impl TryFromClientResponse for LookupInResponse {
1112 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1113 let subdoc_info = resp
1114 .response_context()
1115 .expect("response did not have a response context")
1116 .subdoc_info
1117 .expect("missing subdoc info in response context");
1118 let packet = resp.packet();
1119 let cas = packet.cas;
1120 let status = packet.status;
1121
1122 if status == Status::KeyNotFound {
1123 return Err(ServerError::new(
1124 ServerErrorKind::KeyNotFound,
1125 packet.op_code,
1126 packet.status,
1127 packet.opaque,
1128 )
1129 .into());
1130 } else if status == Status::Locked {
1131 return Err(ServerError::new(
1132 ServerErrorKind::Locked,
1133 packet.op_code,
1134 packet.status,
1135 packet.opaque,
1136 )
1137 .into());
1138 } else if status == Status::SubDocInvalidCombo {
1139 return Err(ServerError::new(
1140 ServerErrorKind::Subdoc {
1141 error: SubdocError::new(SubdocErrorKind::InvalidCombo, None),
1142 },
1143 packet.op_code,
1144 packet.status,
1145 packet.opaque,
1146 )
1147 .into());
1148 } else if status == Status::SubDocInvalidXattrOrder {
1149 return Err(ServerError::new(
1150 ServerErrorKind::Subdoc {
1151 error: SubdocError::new(SubdocErrorKind::InvalidXattrOrder, None),
1152 },
1153 packet.op_code,
1154 packet.status,
1155 packet.opaque,
1156 )
1157 .into());
1158 } else if status == Status::SubDocXattrInvalidKeyCombo {
1159 return Err(ServerError::new(
1160 ServerErrorKind::Subdoc {
1161 error: SubdocError::new(SubdocErrorKind::XattrInvalidKeyCombo, None),
1162 },
1163 packet.op_code,
1164 packet.status,
1165 packet.opaque,
1166 )
1167 .into());
1168 } else if status == Status::SubDocXattrInvalidFlagCombo {
1169 return Err(ServerError::new(
1170 ServerErrorKind::Subdoc {
1171 error: SubdocError::new(SubdocErrorKind::XattrInvalidFlagCombo, None),
1172 },
1173 packet.op_code,
1174 packet.status,
1175 packet.opaque,
1176 )
1177 .into());
1178 }
1179
1180 let mut doc_is_deleted = false;
1181
1182 if status == Status::SubDocSuccessDeleted || status == Status::SubDocMultiPathFailureDeleted
1183 {
1184 doc_is_deleted = true;
1185 } else if status != Status::Success && status != Status::SubDocMultiPathFailure {
1187 return Err(OpsCrud::decode_common_error(&packet));
1188 }
1189
1190 let mut results: Vec<SubDocResult> = Vec::with_capacity(subdoc_info.op_count as usize);
1191 let mut op_index = 0;
1192
1193 let value = packet
1194 .value
1195 .as_ref()
1196 .ok_or_else(|| Error::new_protocol_error("missing value"))?;
1197
1198 let mut cursor = Cursor::new(value);
1199 while cursor.position() < cursor.get_ref().len() as u64 {
1200 if cursor.remaining() < 6 {
1201 return Err(Error::new_protocol_error("bad value length"));
1202 }
1203
1204 let res_status = cursor.read_u16::<BigEndian>()?;
1205 let res_status = Status::from(res_status);
1206 let res_value_len = cursor.read_u32::<BigEndian>()?;
1207
1208 if cursor.remaining() < res_value_len as usize {
1209 return Err(Error::new_protocol_error("bad value length"));
1210 }
1211
1212 let value = if res_value_len > 0 {
1213 let mut tmp_val = vec![0; res_value_len as usize];
1214 cursor.read_exact(&mut tmp_val)?;
1215 Some(tmp_val)
1216 } else {
1217 None
1218 };
1219
1220 let err_kind: Option<SubdocErrorKind> = match res_status {
1221 Status::Success => None,
1222 Status::SubDocDocTooDeep => Some(SubdocErrorKind::DocTooDeep),
1223 Status::SubDocNotJSON => Some(SubdocErrorKind::NotJSON),
1224 Status::SubDocPathNotFound => Some(SubdocErrorKind::PathNotFound),
1225 Status::SubDocPathMismatch => Some(SubdocErrorKind::PathMismatch),
1226 Status::SubDocPathInvalid => Some(SubdocErrorKind::PathInvalid),
1227 Status::SubDocPathTooBig => Some(SubdocErrorKind::PathTooBig),
1228 Status::SubDocXattrUnknownVAttr => Some(SubdocErrorKind::XattrUnknownVAttr),
1229 _ => Some(SubdocErrorKind::UnknownStatus { status: res_status }),
1230 };
1231
1232 let err = err_kind.map(|kind| {
1233 ServerError::new(
1234 ServerErrorKind::Subdoc {
1235 error: SubdocError::new(kind, op_index),
1236 },
1237 packet.op_code,
1238 packet.status,
1239 packet.opaque,
1240 )
1241 .into()
1242 });
1243
1244 results.push(SubDocResult { value, err });
1245 op_index += 1;
1246 }
1247
1248 let server_duration = if let Some(f) = &packet.framing_extras {
1249 decode_res_ext_frames(f)?
1250 } else {
1251 None
1252 };
1253
1254 Ok(LookupInResponse {
1255 cas: cas.unwrap_or_default(),
1256 ops: results,
1257 doc_is_deleted,
1258 server_duration,
1259 })
1260 }
1261}
1262
1263impl TraceAttributes for LookupInResponse {
1264 fn server_duration(&self) -> Option<Duration> {
1265 self.server_duration
1266 }
1267}
1268
1269pub struct MutateInResponse {
1270 pub cas: u64,
1271 pub ops: Vec<SubDocResult>,
1272 pub doc_is_deleted: bool,
1273 pub mutation_token: Option<MutationToken>,
1274 pub server_duration: Option<Duration>,
1275}
1276
1277impl TryFromClientResponse for MutateInResponse {
1278 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1279 let subdoc_info = resp
1280 .response_context()
1281 .expect("response did not have a response context")
1282 .subdoc_info
1283 .expect("missing subdoc info in response context");
1284
1285 let packet = resp.packet();
1286 let cas = packet.cas;
1287 let status = packet.status;
1288
1289 let kind = if status == Status::KeyNotFound {
1290 Some(ServerErrorKind::KeyNotFound)
1291 } else if status == Status::KeyExists && cas.is_some() {
1292 Some(ServerErrorKind::CasMismatch)
1293 } else if status == Status::KeyExists {
1294 Some(ServerErrorKind::KeyExists)
1295 } else if status == Status::Locked {
1296 Some(ServerErrorKind::Locked)
1297 } else if status == Status::TooBig {
1298 Some(ServerErrorKind::TooBig)
1299 } else if status == Status::SubDocInvalidCombo {
1300 Some(ServerErrorKind::Subdoc {
1301 error: SubdocError::new(SubdocErrorKind::InvalidCombo, None),
1302 })
1303 } else if status == Status::SubDocInvalidXattrOrder {
1304 Some(ServerErrorKind::Subdoc {
1305 error: SubdocError::new(SubdocErrorKind::InvalidXattrOrder, None),
1306 })
1307 } else if status == Status::SubDocXattrInvalidKeyCombo {
1308 Some(ServerErrorKind::Subdoc {
1309 error: SubdocError::new(SubdocErrorKind::XattrInvalidKeyCombo, None),
1310 })
1311 } else if status == Status::SubDocXattrInvalidFlagCombo {
1312 Some(ServerErrorKind::Subdoc {
1313 error: SubdocError::new(SubdocErrorKind::XattrInvalidFlagCombo, None),
1314 })
1315 } else if status == Status::SubDocXattrUnknownMacro {
1316 Some(ServerErrorKind::Subdoc {
1317 error: SubdocError::new(SubdocErrorKind::XattrUnknownMacro, None),
1318 })
1319 } else if status == Status::SubDocXattrUnknownVattrMacro {
1320 Some(ServerErrorKind::Subdoc {
1321 error: SubdocError::new(SubdocErrorKind::XattrUnknownVattrMacro, None),
1322 })
1323 } else if status == Status::SubDocXattrCannotModifyVAttr {
1324 Some(ServerErrorKind::Subdoc {
1325 error: SubdocError::new(SubdocErrorKind::XattrCannotModifyVAttr, None),
1326 })
1327 } else if status == Status::SubDocCanOnlyReviveDeletedDocuments {
1328 Some(ServerErrorKind::Subdoc {
1329 error: SubdocError::new(SubdocErrorKind::CanOnlyReviveDeletedDocuments, None),
1330 })
1331 } else if status == Status::SubDocDeletedDocumentCantHaveValue {
1332 Some(ServerErrorKind::Subdoc {
1333 error: SubdocError::new(SubdocErrorKind::DeletedDocumentCantHaveValue, None),
1334 })
1335 } else if status == Status::NotStored {
1336 if subdoc_info.flags.contains(SubdocDocFlag::AddDoc) {
1337 Some(ServerErrorKind::KeyExists)
1338 } else {
1339 Some(ServerErrorKind::NotStored)
1340 }
1341 } else if status == Status::SubDocMultiPathFailure {
1342 if let Some(value) = &packet.value {
1343 if value.len() != 3 {
1344 return Err(Error::new_protocol_error("bad value length"));
1345 }
1346
1347 let mut cursor = Cursor::new(value);
1348 let op_index = cursor.read_u8()?;
1349 let res_status = cursor.read_u16::<BigEndian>()?;
1350
1351 let res_status = Status::from(res_status);
1352
1353 let err_kind: SubdocErrorKind = match res_status {
1354 Status::SubDocDocTooDeep => SubdocErrorKind::DocTooDeep,
1355 Status::SubDocNotJSON => SubdocErrorKind::NotJSON,
1356 Status::SubDocPathNotFound => SubdocErrorKind::PathNotFound,
1357 Status::SubDocPathMismatch => SubdocErrorKind::PathMismatch,
1358 Status::SubDocPathInvalid => SubdocErrorKind::PathInvalid,
1359 Status::SubDocPathTooBig => SubdocErrorKind::PathTooBig,
1360 Status::SubDocPathExists => SubdocErrorKind::PathExists,
1361 Status::SubDocCantInsert => SubdocErrorKind::CantInsert,
1362 Status::SubDocBadRange => SubdocErrorKind::BadRange,
1363 Status::SubDocBadDelta => SubdocErrorKind::BadDelta,
1364 Status::SubDocValueTooDeep => SubdocErrorKind::ValueTooDeep,
1365 _ => SubdocErrorKind::UnknownStatus { status: res_status },
1366 };
1367
1368 Some(ServerErrorKind::Subdoc {
1369 error: SubdocError::new(err_kind, Some(op_index)),
1370 })
1371 } else {
1372 return Err(Error::new_protocol_error("bad value length"));
1373 }
1374 } else {
1375 None
1376 };
1377
1378 if let Some(kind) = kind {
1379 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
1380 }
1381
1382 let mut doc_is_deleted = false;
1383 if status == Status::SubDocSuccessDeleted {
1384 doc_is_deleted = true;
1385 } else if status != Status::Success && status != Status::SubDocMultiPathFailure {
1387 return Err(OpsCrud::decode_common_mutation_error(&packet));
1388 }
1389
1390 let mut results: Vec<SubDocResult> = Vec::with_capacity(subdoc_info.op_count as usize);
1391
1392 if let Some(value) = &packet.value {
1393 let mut cursor = Cursor::new(value);
1394
1395 while cursor.position() < cursor.get_ref().len() as u64 {
1396 if cursor.remaining() < 3 {
1397 return Err(Error::new_protocol_error("bad value length"));
1398 }
1399
1400 let op_index = cursor.read_u8()?;
1401
1402 if op_index > results.len() as u8 {
1403 for _ in results.len() as u8..op_index {
1404 results.push(SubDocResult {
1405 err: None,
1406 value: None,
1407 });
1408 }
1409 }
1410
1411 let op_status = cursor.read_u16::<BigEndian>()?;
1412 let op_status = Status::from(op_status);
1413
1414 if op_status == Status::Success {
1415 let val_length = cursor.read_u32::<BigEndian>()?;
1416
1417 let mut value = vec![0; val_length as usize];
1418 cursor.read_exact(&mut value)?;
1419
1420 results.push(SubDocResult {
1421 err: None,
1422 value: Some(value),
1423 });
1424 } else {
1425 return Err(Error::new_protocol_error(
1426 "subdoc mutatein op illegally provided an error",
1427 ));
1428 }
1429 }
1430 }
1431
1432 if results.len() < subdoc_info.op_count as usize {
1433 for _ in results.len()..subdoc_info.op_count as usize {
1434 results.push(SubDocResult {
1435 err: None,
1436 value: None,
1437 });
1438 }
1439 }
1440
1441 let mutation_token = if let Some(extras) = &packet.extras {
1442 if extras.len() != 16 {
1443 return Err(Error::new_protocol_error("bad extras length"));
1444 }
1445
1446 let (vbuuid_bytes, seqno_bytes) = extras.split_at(size_of::<u64>());
1447 let vbuuid = u64::from_be_bytes(vbuuid_bytes.try_into().unwrap());
1448 let seqno = u64::from_be_bytes(seqno_bytes.try_into().unwrap());
1449
1450 Some(MutationToken { vbuuid, seqno })
1451 } else {
1452 None
1453 };
1454
1455 let server_duration = if let Some(f) = &packet.framing_extras {
1456 decode_res_ext_frames(f)?
1457 } else {
1458 None
1459 };
1460
1461 Ok(MutateInResponse {
1462 cas: cas.unwrap_or_default(),
1463 ops: results,
1464 mutation_token,
1465 doc_is_deleted,
1466 server_duration,
1467 })
1468 }
1469}
1470
1471impl TraceAttributes for MutateInResponse {
1472 fn server_duration(&self) -> Option<Duration> {
1473 self.server_duration
1474 }
1475}
1476
1477#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
1478pub struct GetCollectionIdResponse {
1479 pub manifest_rev: u64,
1480 pub collection_id: u32,
1481}
1482
1483impl TryFromClientResponse for GetCollectionIdResponse {
1484 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1485 let (scope_name, collection_name) = {
1486 let context = resp
1487 .response_context()
1488 .expect("response did not have a response context");
1489 let scope_name = context.scope_name.clone().expect("missing scope name");
1490 let collection_name = context
1491 .collection_name
1492 .clone()
1493 .expect("missing collection name");
1494
1495 (scope_name, collection_name)
1496 };
1497 let packet = resp.packet();
1498 let status = packet.status;
1499
1500 if status == Status::ScopeUnknown {
1501 return Err(ResourceError::new(
1502 ServerError::new(
1503 ServerErrorKind::UnknownScopeName,
1504 packet.op_code,
1505 packet.status,
1506 packet.opaque,
1507 ),
1508 scope_name,
1509 collection_name,
1510 )
1511 .into());
1512 } else if status == Status::CollectionUnknown {
1513 return Err(ResourceError::new(
1514 ServerError::new(
1515 ServerErrorKind::UnknownCollectionName,
1516 packet.op_code,
1517 packet.status,
1518 packet.opaque,
1519 ),
1520 scope_name,
1521 collection_name,
1522 )
1523 .into());
1524 } else if status != Status::Success {
1525 return Err(OpsCore::decode_error(&packet));
1526 }
1527
1528 let extras = if let Some(extras) = &packet.extras {
1529 if extras.len() != 12 {
1530 return Err(Error::new_protocol_error("invalid extras length"));
1531 }
1532 extras
1533 } else {
1534 return Err(Error::new_protocol_error("no extras in response"));
1535 };
1536
1537 let mut extras = Cursor::new(extras);
1538 let manifest_rev = extras.read_u64::<BigEndian>()?;
1539 let collection_id = extras.read_u32::<BigEndian>()?;
1540
1541 Ok(GetCollectionIdResponse {
1542 manifest_rev,
1543 collection_id,
1544 })
1545 }
1546}
1547
1548#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
1549pub struct PingResponse {}
1550
1551impl TryFromClientResponse for PingResponse {
1552 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1553 let packet = resp.packet();
1554 let status = packet.status;
1555
1556 if status != Status::Success {
1557 return Err(OpsCore::decode_error(&packet));
1558 }
1559
1560 Ok(PingResponse {})
1561 }
1562}
1563
1564impl TraceAttributes for PingResponse {
1565 fn server_duration(&self) -> Option<Duration> {
1566 None
1567 }
1568}