1use std::io::{Cursor, Read};
20use std::time::Duration;
21
22use crate::memdx::auth_mechanism::AuthMechanism;
23use crate::memdx::client_response::ClientResponse;
24use crate::memdx::error::{
25 Error, ResourceError, ServerError, ServerErrorKind, SubdocError, SubdocErrorKind,
26};
27use crate::memdx::extframe::decode_res_ext_frames;
28use crate::memdx::hello_feature::HelloFeature;
29use crate::memdx::ops_core::OpsCore;
30use crate::memdx::ops_crud::OpsCrud;
31use crate::memdx::status::Status;
32use crate::memdx::subdoc::{SubDocResult, SubdocDocFlag};
33use byteorder::{BigEndian, ReadBytesExt};
34use tokio_io::Buf;
35
/// Conversion from a raw [`ClientResponse`] into a typed, per-operation response.
///
/// Implementors inspect the response and either build `Self` or return the
/// [`Error`] explaining why the response could not be accepted.
pub trait TryFromClientResponse: Sized {
    /// Consumes `resp` and attempts to decode it into `Self`.
    fn try_from(resp: ClientResponse) -> Result<Self, Error>;
}
39
40#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
41pub struct HelloResponse {
42 pub enabled_features: Vec<HelloFeature>,
43}
44
45impl TryFromClientResponse for HelloResponse {
46 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
47 let packet = resp.packet();
48 let status = packet.status;
49 if status != Status::Success {
50 return Err(OpsCore::decode_error(&packet));
51 }
52
53 let mut features: Vec<HelloFeature> = Vec::new();
54 if let Some(value) = &packet.value {
55 if value.len() % 2 != 0 {
56 return Err(Error::new_protocol_error("invalid hello features length"));
57 }
58
59 let mut cursor = Cursor::new(value);
60 while let Ok(code) = cursor.read_u16::<BigEndian>() {
61 features.push(HelloFeature::from(code));
62 }
63 }
64 let response = HelloResponse {
65 enabled_features: features,
66 };
67
68 Ok(response)
69 }
70}
71
72#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
73pub struct GetErrorMapResponse {
74 pub error_map: Vec<u8>,
75}
76
77impl TryFromClientResponse for GetErrorMapResponse {
78 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
79 let packet = resp.packet();
80 let status = packet.status;
81 if status != Status::Success {
82 return Err(OpsCore::decode_error(&packet));
83 }
84
85 let value = packet.value.unwrap_or_default();
86 let response = GetErrorMapResponse { error_map: value };
87
88 Ok(response)
89 }
90}
91
92#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
93pub struct SelectBucketResponse {}
94
95impl TryFromClientResponse for SelectBucketResponse {
96 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
97 let packet = resp.packet();
98 let status = packet.status;
99 if status != Status::Success {
100 if status == Status::AccessError || status == Status::KeyNotFound {
101 return Err(ServerError::new(
102 ServerErrorKind::UnknownBucketName,
103 packet.op_code,
104 status,
105 packet.opaque,
106 )
107 .into());
108 }
109 return Err(OpsCore::decode_error(&packet));
110 }
111
112 Ok(SelectBucketResponse {})
113 }
114}
115
116#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
117pub struct SASLAuthResponse {
118 pub needs_more_steps: bool,
119 pub payload: Vec<u8>,
120}
121
122impl TryFromClientResponse for SASLAuthResponse {
123 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
124 let packet = resp.packet();
125 let status = packet.status;
126 if status == Status::AuthContinue {
127 return Ok(SASLAuthResponse {
128 needs_more_steps: true,
129 payload: packet.value.unwrap_or_default(),
130 });
131 }
132
133 if status != Status::Success {
134 return Err(OpsCore::decode_error(&packet));
135 }
136
137 Ok(SASLAuthResponse {
138 needs_more_steps: false,
139 payload: packet.value.unwrap_or_default(),
140 })
141 }
142}
143
144#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
145pub struct SASLStepResponse {
146 pub needs_more_steps: bool,
147 pub payload: Vec<u8>,
148}
149
150impl TryFromClientResponse for SASLStepResponse {
151 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
152 let packet = resp.packet();
153 let status = packet.status;
154 if status != Status::Success {
155 return Err(OpsCore::decode_error(&packet));
156 }
157
158 Ok(SASLStepResponse {
159 needs_more_steps: false,
160 payload: packet.value.unwrap_or_default(),
161 })
162 }
163}
164
165#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
166pub struct SASLListMechsResponse {
167 pub available_mechs: Vec<AuthMechanism>,
168}
169
170impl TryFromClientResponse for SASLListMechsResponse {
171 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
172 let packet = resp.packet();
173 let status = packet.status;
174 if status != Status::Success {
175 if status == Status::KeyNotFound {
176 return Err(ServerError::new(
181 ServerErrorKind::ConfigNotSet,
182 packet.op_code,
183 status,
184 packet.opaque,
185 )
186 .into());
187 }
188 return Err(OpsCore::decode_error(&packet));
189 }
190
191 let value = packet.value.unwrap_or_default();
192 let mechs_list_string = match String::from_utf8(value) {
193 Ok(v) => v,
194 Err(e) => {
195 return Err(Error::new_protocol_error(
196 "failed to parse authentication mechanism list",
197 )
198 .with(e));
199 }
200 };
201 let mechs_list_split = mechs_list_string.split(' ');
202 let mut mechs_list = Vec::new();
203 for item in mechs_list_split {
204 mechs_list.push(AuthMechanism::try_from(item)?);
205 }
206
207 Ok(SASLListMechsResponse {
208 available_mechs: mechs_list,
209 })
210 }
211}
212
213#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
214pub struct GetClusterConfigResponse {
215 pub config: Vec<u8>,
216}
217
218impl TryFromClientResponse for GetClusterConfigResponse {
219 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
220 let packet = resp.packet();
221 let status = packet.status;
222 if status != Status::Success {
223 return Err(OpsCore::decode_error(&packet));
224 }
225
226 Ok(GetClusterConfigResponse {
227 config: packet.value.clone().unwrap_or_default(),
228 })
229 }
230}
231
/// Aggregate of the optional responses collected during bootstrap.
///
/// Each field is `None` when the corresponding response is not available.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BootstrapResult {
    /// HELLO response, if any.
    pub hello: Option<HelloResponse>,
    /// Error map response, if any.
    pub error_map: Option<GetErrorMapResponse>,
    /// Cluster config response, if any.
    pub cluster_config: Option<GetClusterConfigResponse>,
}
238
239#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
240pub struct MutationToken {
241 pub vbuuid: u64,
242 pub seqno: u64,
243}
244
245impl TryFrom<&Vec<u8>> for MutationToken {
246 type Error = Error;
247
248 fn try_from(value: &Vec<u8>) -> Result<Self, Self::Error> {
249 if value.len() != 16 {
250 return Err(Error::new_protocol_error("bad extras length"));
251 }
252
253 let (vbuuid_bytes, seqno_bytes) = value.split_at(size_of::<u64>());
254 let vbuuid = u64::from_be_bytes(vbuuid_bytes.try_into().unwrap());
255 let seqno = u64::from_be_bytes(seqno_bytes.try_into().unwrap());
256
257 Ok(MutationToken { vbuuid, seqno })
258 }
259}
260
261#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
262pub struct SetResponse {
263 pub cas: u64,
264 pub mutation_token: Option<MutationToken>,
265 pub server_duration: Option<Duration>,
266}
267
268impl TryFromClientResponse for SetResponse {
269 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
270 let packet = resp.packet();
271 let status = packet.status;
272
273 let kind = if status == Status::TooBig {
274 Some(ServerErrorKind::TooBig)
275 } else if status == Status::Locked {
276 Some(ServerErrorKind::Locked)
277 } else if status == Status::KeyExists {
278 Some(ServerErrorKind::CasMismatch)
279 } else if status == Status::Success {
280 None
281 } else {
282 return Err(OpsCrud::decode_common_mutation_error(&packet));
283 };
284
285 if let Some(kind) = kind {
286 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
287 }
288
289 let mutation_token = if let Some(extras) = &packet.extras {
290 Some(MutationToken::try_from(extras)?)
291 } else {
292 None
293 };
294
295 let server_duration = if let Some(f) = &packet.framing_extras {
296 decode_res_ext_frames(f)?
297 } else {
298 None
299 };
300
301 Ok(SetResponse {
302 cas: packet.cas.unwrap_or_default(),
303 mutation_token,
304 server_duration,
305 })
306 }
307}
308
309fn parse_flags(extras: &Option<Vec<u8>>) -> Result<u32, Error> {
310 if let Some(extras) = &extras {
311 if extras.len() != 4 {
312 return Err(Error::new_protocol_error("bad extras length reading flags"));
313 }
314
315 Ok(u32::from_be_bytes(extras.as_slice().try_into().unwrap()))
316 } else {
317 Err(Error::new_protocol_error("no extras in response"))
318 }
319}
320
321#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
322pub struct GetResponse {
323 pub cas: u64,
324 pub flags: u32,
325 pub value: Vec<u8>,
326 pub datatype: u8,
327 pub server_duration: Option<Duration>,
328}
329
330impl TryFromClientResponse for GetResponse {
331 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
332 let packet = resp.packet();
333 let status = packet.status;
334
335 if status == Status::KeyNotFound {
336 return Err(ServerError::new(
337 ServerErrorKind::KeyNotFound,
338 packet.op_code,
339 packet.status,
340 packet.opaque,
341 )
342 .into());
343 } else if status != Status::Success {
344 return Err(OpsCrud::decode_common_error(&packet));
345 }
346
347 let flags = parse_flags(&packet.extras)?;
348
349 let server_duration = if let Some(f) = &packet.framing_extras {
350 decode_res_ext_frames(f)?
351 } else {
352 None
353 };
354
355 let value = packet.value.unwrap_or_default();
356
357 Ok(GetResponse {
358 cas: packet.cas.unwrap_or_default(),
359 flags,
360 value,
361 datatype: packet.datatype,
362 server_duration,
363 })
364 }
365}
366
367#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
368pub struct GetMetaResponse {
369 pub cas: u64,
370 pub flags: u32,
371 pub value: Vec<u8>,
372 pub datatype: u8,
373 pub server_duration: Option<Duration>,
374 pub expiry: u32,
375 pub seq_no: u64,
376 pub deleted: bool,
377}
378
379impl TryFromClientResponse for GetMetaResponse {
380 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
381 let packet = resp.packet();
382 let status = packet.status;
383
384 if status == Status::KeyNotFound {
385 return Err(ServerError::new(
386 ServerErrorKind::KeyNotFound,
387 packet.op_code,
388 packet.status,
389 packet.opaque,
390 )
391 .into());
392 } else if status != Status::Success {
393 return Err(OpsCrud::decode_common_error(&packet));
394 }
395
396 let server_duration = if let Some(f) = &packet.framing_extras {
397 decode_res_ext_frames(f)?
398 } else {
399 None
400 };
401
402 let value = packet.value.unwrap_or_default();
403
404 if let Some(extras) = &packet.extras {
405 if extras.len() != 21 {
406 return Err(Error::new_protocol_error("bad extras length"));
407 }
408
409 let mut extras = Cursor::new(extras);
410 let deleted = extras.read_u32::<BigEndian>()?;
411 let flags = extras.read_u32::<BigEndian>()?;
412 let expiry = extras.read_u32::<BigEndian>()?;
413 let seq_no = extras.read_u64::<BigEndian>()?;
414 let datatype = extras.read_u8()?;
415
416 Ok(GetMetaResponse {
417 cas: packet.cas.unwrap_or_default(),
418 flags,
419 value,
420 datatype,
421 server_duration,
422 expiry,
423 seq_no,
424 deleted: deleted != 0,
425 })
426 } else {
427 Err(Error::new_protocol_error("no extras in response"))
428 }
429 }
430}
431
432#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
433pub struct DeleteResponse {
434 pub cas: u64,
435 pub mutation_token: Option<MutationToken>,
436 pub server_duration: Option<Duration>,
437}
438
439impl TryFromClientResponse for DeleteResponse {
440 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
441 let packet = resp.packet();
442 let status = packet.status;
443
444 let kind = if status == Status::KeyNotFound {
445 Some(ServerErrorKind::KeyNotFound)
446 } else if status == Status::Locked {
447 Some(ServerErrorKind::Locked)
448 } else if status == Status::KeyExists {
449 Some(ServerErrorKind::CasMismatch)
450 } else if status == Status::Success {
451 None
452 } else {
453 return Err(OpsCrud::decode_common_mutation_error(&packet));
454 };
455
456 if let Some(kind) = kind {
457 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
458 }
459
460 let mutation_token = if let Some(extras) = &packet.extras {
461 Some(MutationToken::try_from(extras)?)
462 } else {
463 None
464 };
465
466 let server_duration = if let Some(f) = &packet.framing_extras {
467 decode_res_ext_frames(f)?
468 } else {
469 None
470 };
471
472 Ok(DeleteResponse {
473 cas: packet.cas.unwrap_or_default(),
474 mutation_token,
475 server_duration,
476 })
477 }
478}
479
480#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
481pub struct GetAndLockResponse {
482 pub cas: u64,
483 pub flags: u32,
484 pub value: Vec<u8>,
485 pub datatype: u8,
486 pub server_duration: Option<Duration>,
487}
488
489impl TryFromClientResponse for GetAndLockResponse {
490 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
491 let packet = resp.packet();
492 let status = packet.status;
493
494 if status == Status::KeyNotFound {
495 return Err(ServerError::new(
496 ServerErrorKind::KeyNotFound,
497 packet.op_code,
498 packet.status,
499 packet.opaque,
500 )
501 .into());
502 } else if status == Status::Locked {
503 return Err(ServerError::new(
504 ServerErrorKind::Locked,
505 packet.op_code,
506 packet.status,
507 packet.opaque,
508 )
509 .into());
510 } else if status != Status::Success {
511 return Err(OpsCrud::decode_common_error(&packet));
512 }
513
514 let flags = parse_flags(&packet.extras)?;
515
516 let server_duration = if let Some(f) = &packet.framing_extras {
517 decode_res_ext_frames(f)?
518 } else {
519 None
520 };
521
522 let value = packet.value.unwrap_or_default();
523
524 Ok(GetAndLockResponse {
525 cas: packet.cas.unwrap_or_default(),
526 flags,
527 value,
528 datatype: packet.datatype,
529 server_duration,
530 })
531 }
532}
533
534#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
535pub struct GetAndTouchResponse {
536 pub cas: u64,
537 pub flags: u32,
538 pub value: Vec<u8>,
539 pub datatype: u8,
540 pub server_duration: Option<Duration>,
541}
542
543impl TryFromClientResponse for GetAndTouchResponse {
544 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
545 let packet = resp.packet();
546 let status = packet.status;
547
548 if status == Status::KeyNotFound {
549 return Err(ServerError::new(
550 ServerErrorKind::KeyNotFound,
551 packet.op_code,
552 packet.status,
553 packet.opaque,
554 )
555 .into());
556 } else if status == Status::Locked {
557 return Err(ServerError::new(
558 ServerErrorKind::Locked,
559 packet.op_code,
560 packet.status,
561 packet.opaque,
562 )
563 .into());
564 } else if status != Status::Success {
565 return Err(OpsCrud::decode_common_error(&packet));
566 }
567
568 let flags = parse_flags(&packet.extras)?;
569
570 let server_duration = if let Some(f) = &packet.framing_extras {
571 decode_res_ext_frames(f)?
572 } else {
573 None
574 };
575
576 let value = packet.value.unwrap_or_default();
577
578 Ok(GetAndTouchResponse {
579 cas: packet.cas.unwrap_or_default(),
580 flags,
581 value,
582 datatype: packet.datatype,
583 server_duration,
584 })
585 }
586}
587
588#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
589pub struct UnlockResponse {
590 pub server_duration: Option<Duration>,
591}
592
593impl TryFromClientResponse for UnlockResponse {
594 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
595 let packet = resp.packet();
596 let status = packet.status;
597
598 if status == Status::KeyNotFound {
599 return Err(ServerError::new(
600 ServerErrorKind::KeyNotFound,
601 packet.op_code,
602 packet.status,
603 packet.opaque,
604 )
605 .into());
606 } else if status == Status::Locked {
607 return Err(ServerError::new(
608 ServerErrorKind::CasMismatch,
609 packet.op_code,
610 packet.status,
611 packet.opaque,
612 )
613 .into());
614 } else if status == Status::NotLocked {
615 return Err(ServerError::new(
616 ServerErrorKind::NotLocked,
617 packet.op_code,
618 packet.status,
619 packet.opaque,
620 )
621 .into());
622 } else if status != Status::Success {
623 return Err(OpsCrud::decode_common_error(&packet));
624 }
625
626 let server_duration = if let Some(f) = &packet.framing_extras {
627 decode_res_ext_frames(f)?
628 } else {
629 None
630 };
631
632 Ok(UnlockResponse { server_duration })
633 }
634}
635
636#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
637pub struct TouchResponse {
638 pub cas: u64,
639 pub server_duration: Option<Duration>,
640}
641
642impl TryFromClientResponse for TouchResponse {
643 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
644 let packet = resp.packet();
645 let status = packet.status;
646
647 if status == Status::KeyNotFound {
648 return Err(ServerError::new(
649 ServerErrorKind::KeyNotFound,
650 packet.op_code,
651 packet.status,
652 packet.opaque,
653 )
654 .into());
655 } else if status == Status::Locked {
656 return Err(ServerError::new(
657 ServerErrorKind::Locked,
658 packet.op_code,
659 packet.status,
660 packet.opaque,
661 )
662 .into());
663 } else if status != Status::Success {
664 return Err(OpsCrud::decode_common_error(&packet));
665 }
666
667 if let Some(extras) = &packet.extras {
668 if !extras.is_empty() {
669 return Err(Error::new_protocol_error("bad extras length"));
670 }
671 }
672
673 let server_duration = if let Some(f) = &packet.framing_extras {
674 decode_res_ext_frames(f)?
675 } else {
676 None
677 };
678
679 Ok(TouchResponse {
680 cas: packet.cas.unwrap_or_default(),
681 server_duration,
682 })
683 }
684}
685
686#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
687pub struct AddResponse {
688 pub cas: u64,
689 pub mutation_token: Option<MutationToken>,
690 pub server_duration: Option<Duration>,
691}
692
693impl TryFromClientResponse for AddResponse {
694 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
695 let packet = resp.packet();
696 let status = packet.status;
697
698 let kind = if status == Status::TooBig {
699 Some(ServerErrorKind::TooBig)
700 } else if status == Status::Locked {
701 Some(ServerErrorKind::Locked)
702 } else if status == Status::KeyExists {
703 Some(ServerErrorKind::KeyExists)
704 } else if status == Status::Success {
705 None
706 } else {
707 return Err(OpsCrud::decode_common_mutation_error(&packet));
708 };
709
710 if let Some(kind) = kind {
711 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
712 }
713
714 let mutation_token = if let Some(extras) = &packet.extras {
715 Some(MutationToken::try_from(extras)?)
716 } else {
717 None
718 };
719
720 let server_duration = if let Some(f) = &packet.framing_extras {
721 decode_res_ext_frames(f)?
722 } else {
723 None
724 };
725
726 Ok(AddResponse {
727 cas: packet.cas.unwrap_or_default(),
728 mutation_token,
729 server_duration,
730 })
731 }
732}
733
734#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
735pub struct ReplaceResponse {
736 pub cas: u64,
737 pub mutation_token: Option<MutationToken>,
738 pub server_duration: Option<Duration>,
739}
740
741impl TryFromClientResponse for ReplaceResponse {
742 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
743 let packet = resp.packet();
744 let status = packet.status;
745
746 let kind = if status == Status::TooBig {
747 Some(ServerErrorKind::TooBig)
748 } else if status == Status::KeyNotFound {
749 Some(ServerErrorKind::KeyNotFound)
750 } else if status == Status::Locked {
751 Some(ServerErrorKind::Locked)
752 } else if status == Status::KeyExists {
753 Some(ServerErrorKind::CasMismatch)
754 } else if status == Status::Success {
755 None
756 } else {
757 return Err(OpsCrud::decode_common_mutation_error(&packet));
758 };
759
760 if let Some(kind) = kind {
761 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
762 }
763
764 let mutation_token = if let Some(extras) = &packet.extras {
765 Some(MutationToken::try_from(extras)?)
766 } else {
767 None
768 };
769
770 let server_duration = if let Some(f) = &packet.framing_extras {
771 decode_res_ext_frames(f)?
772 } else {
773 None
774 };
775
776 Ok(ReplaceResponse {
777 cas: packet.cas.unwrap_or_default(),
778 mutation_token,
779 server_duration,
780 })
781 }
782}
783
784#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
785pub struct AppendResponse {
786 pub cas: u64,
787 pub mutation_token: Option<MutationToken>,
788 pub server_duration: Option<Duration>,
789}
790
791impl TryFromClientResponse for AppendResponse {
792 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
793 let cas = resp.response_context().cas;
794 let packet = resp.packet();
795 let status = packet.status;
796
797 let kind = if status == Status::TooBig {
798 Some(ServerErrorKind::TooBig)
799 } else if status == Status::NotStored {
800 Some(ServerErrorKind::NotStored)
801 } else if status == Status::Locked {
802 Some(ServerErrorKind::Locked)
803 } else if status == Status::KeyExists && cas.is_some() {
804 Some(ServerErrorKind::CasMismatch)
807 } else if status == Status::Success {
808 None
809 } else {
810 return Err(OpsCrud::decode_common_mutation_error(&packet));
811 };
812
813 if let Some(kind) = kind {
814 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
815 }
816
817 let mutation_token = if let Some(extras) = &packet.extras {
818 Some(MutationToken::try_from(extras)?)
819 } else {
820 None
821 };
822
823 let server_duration = if let Some(f) = &packet.framing_extras {
824 decode_res_ext_frames(f)?
825 } else {
826 None
827 };
828
829 Ok(AppendResponse {
830 cas: packet.cas.unwrap_or_default(),
831 mutation_token,
832 server_duration,
833 })
834 }
835}
836
837#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
838pub struct PrependResponse {
839 pub cas: u64,
840 pub mutation_token: Option<MutationToken>,
841 pub server_duration: Option<Duration>,
842}
843
844impl TryFromClientResponse for PrependResponse {
845 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
846 let cas = resp.response_context().cas;
847 let packet = resp.packet();
848 let status = packet.status;
849
850 let kind = if status == Status::TooBig {
851 Some(ServerErrorKind::TooBig)
852 } else if status == Status::NotStored {
853 Some(ServerErrorKind::NotStored)
854 } else if status == Status::Locked {
855 Some(ServerErrorKind::Locked)
856 } else if status == Status::KeyExists && cas.is_some() {
857 Some(ServerErrorKind::CasMismatch)
860 } else if status == Status::Success {
861 None
862 } else {
863 return Err(OpsCrud::decode_common_mutation_error(&packet));
864 };
865
866 if let Some(kind) = kind {
867 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
868 }
869
870 let mutation_token = if let Some(extras) = &packet.extras {
871 Some(MutationToken::try_from(extras)?)
872 } else {
873 None
874 };
875
876 let server_duration = if let Some(f) = &packet.framing_extras {
877 decode_res_ext_frames(f)?
878 } else {
879 None
880 };
881
882 Ok(PrependResponse {
883 cas: packet.cas.unwrap_or_default(),
884 mutation_token,
885 server_duration,
886 })
887 }
888}
889
890#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
891pub struct IncrementResponse {
892 pub cas: u64,
893 pub value: u64,
894 pub mutation_token: Option<MutationToken>,
895 pub server_duration: Option<Duration>,
896}
897
898impl TryFromClientResponse for IncrementResponse {
899 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
900 let packet = resp.packet();
901 let status = packet.status;
902
903 let kind = if status == Status::KeyNotFound {
904 Some(ServerErrorKind::KeyNotFound)
905 } else if status == Status::Locked {
906 Some(ServerErrorKind::Locked)
907 } else if status == Status::BadDelta {
908 Some(ServerErrorKind::BadDelta)
909 } else if status == Status::Success {
910 None
911 } else {
912 return Err(OpsCrud::decode_common_mutation_error(&packet));
913 };
914
915 if let Some(kind) = kind {
916 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
917 }
918
919 let value = if let Some(val) = &packet.value {
920 if val.len() != 8 {
921 return Err(Error::new_protocol_error(
922 "bad counter value length in response",
923 ));
924 }
925
926 u64::from_be_bytes(val.as_slice().try_into().unwrap())
927 } else {
928 0
929 };
930
931 let mutation_token = if let Some(extras) = &packet.extras {
932 Some(MutationToken::try_from(extras)?)
933 } else {
934 None
935 };
936
937 let server_duration = if let Some(f) = &packet.framing_extras {
938 decode_res_ext_frames(f)?
939 } else {
940 None
941 };
942
943 Ok(IncrementResponse {
944 cas: packet.cas.unwrap_or_default(),
945 value,
946 mutation_token,
947 server_duration,
948 })
949 }
950}
951
952#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
953pub struct DecrementResponse {
954 pub cas: u64,
955 pub value: u64,
956 pub mutation_token: Option<MutationToken>,
957 pub server_duration: Option<Duration>,
958}
959
960impl TryFromClientResponse for DecrementResponse {
961 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
962 let packet = resp.packet();
963 let status = packet.status;
964
965 let kind = if status == Status::KeyNotFound {
966 Some(ServerErrorKind::KeyNotFound)
967 } else if status == Status::Locked {
968 Some(ServerErrorKind::Locked)
969 } else if status == Status::BadDelta {
970 Some(ServerErrorKind::BadDelta)
971 } else if status == Status::Success {
972 None
973 } else {
974 return Err(OpsCrud::decode_common_mutation_error(&packet));
975 };
976
977 if let Some(kind) = kind {
978 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
979 }
980
981 let value = if let Some(val) = &packet.value {
982 if val.len() != 8 {
983 return Err(Error::new_protocol_error(
984 "bad counter value length in response",
985 ));
986 }
987
988 u64::from_be_bytes(val.as_slice().try_into().unwrap())
989 } else {
990 0
991 };
992
993 let mutation_token = if let Some(extras) = &packet.extras {
994 Some(MutationToken::try_from(extras)?)
995 } else {
996 None
997 };
998
999 let server_duration = if let Some(f) = &packet.framing_extras {
1000 decode_res_ext_frames(f)?
1001 } else {
1002 None
1003 };
1004
1005 Ok(DecrementResponse {
1006 cas: packet.cas.unwrap_or_default(),
1007 value,
1008 mutation_token,
1009 server_duration,
1010 })
1011 }
1012}
1013
1014pub struct LookupInResponse {
1015 pub cas: u64,
1016 pub ops: Vec<SubDocResult>,
1017 pub doc_is_deleted: bool,
1018 pub server_duration: Option<Duration>,
1019}
1020
1021impl TryFromClientResponse for LookupInResponse {
1022 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1023 let subdoc_info = resp
1024 .response_context()
1025 .subdoc_info
1026 .expect("missing subdoc info in response context");
1027 let packet = resp.packet();
1028 let cas = packet.cas;
1029 let status = packet.status;
1030
1031 if status == Status::KeyNotFound {
1032 return Err(ServerError::new(
1033 ServerErrorKind::KeyNotFound,
1034 packet.op_code,
1035 packet.status,
1036 packet.opaque,
1037 )
1038 .into());
1039 } else if status == Status::Locked {
1040 return Err(ServerError::new(
1041 ServerErrorKind::Locked,
1042 packet.op_code,
1043 packet.status,
1044 packet.opaque,
1045 )
1046 .into());
1047 } else if status == Status::SubDocInvalidCombo {
1048 return Err(ServerError::new(
1049 ServerErrorKind::Subdoc {
1050 error: SubdocError::new(SubdocErrorKind::InvalidCombo, None),
1051 },
1052 packet.op_code,
1053 packet.status,
1054 packet.opaque,
1055 )
1056 .into());
1057 } else if status == Status::SubDocInvalidXattrOrder {
1058 return Err(ServerError::new(
1059 ServerErrorKind::Subdoc {
1060 error: SubdocError::new(SubdocErrorKind::InvalidXattrOrder, None),
1061 },
1062 packet.op_code,
1063 packet.status,
1064 packet.opaque,
1065 )
1066 .into());
1067 } else if status == Status::SubDocXattrInvalidKeyCombo {
1068 return Err(ServerError::new(
1069 ServerErrorKind::Subdoc {
1070 error: SubdocError::new(SubdocErrorKind::XattrInvalidKeyCombo, None),
1071 },
1072 packet.op_code,
1073 packet.status,
1074 packet.opaque,
1075 )
1076 .into());
1077 } else if status == Status::SubDocXattrInvalidFlagCombo {
1078 return Err(ServerError::new(
1079 ServerErrorKind::Subdoc {
1080 error: SubdocError::new(SubdocErrorKind::XattrInvalidFlagCombo, None),
1081 },
1082 packet.op_code,
1083 packet.status,
1084 packet.opaque,
1085 )
1086 .into());
1087 }
1088
1089 let mut doc_is_deleted = false;
1090
1091 if status == Status::SubDocSuccessDeleted || status == Status::SubDocMultiPathFailureDeleted
1092 {
1093 doc_is_deleted = true;
1094 } else if status != Status::Success && status != Status::SubDocMultiPathFailure {
1096 return Err(OpsCrud::decode_common_error(&packet));
1097 }
1098
1099 let mut results: Vec<SubDocResult> = Vec::with_capacity(subdoc_info.op_count as usize);
1100 let mut op_index = 0;
1101
1102 let value = packet
1103 .value
1104 .as_ref()
1105 .ok_or_else(|| Error::new_protocol_error("missing value"))?;
1106
1107 let mut cursor = Cursor::new(value);
1108 while cursor.position() < cursor.get_ref().len() as u64 {
1109 if cursor.remaining() < 6 {
1110 return Err(Error::new_protocol_error("bad value length"));
1111 }
1112
1113 let res_status = cursor.read_u16::<BigEndian>()?;
1114 let res_status = Status::from(res_status);
1115 let res_value_len = cursor.read_u32::<BigEndian>()?;
1116
1117 if cursor.remaining() < res_value_len as usize {
1118 return Err(Error::new_protocol_error("bad value length"));
1119 }
1120
1121 let value = if res_value_len > 0 {
1122 let mut tmp_val = vec![0; res_value_len as usize];
1123 cursor.read_exact(&mut tmp_val)?;
1124 Some(tmp_val)
1125 } else {
1126 None
1127 };
1128
1129 let err_kind: Option<SubdocErrorKind> = match res_status {
1130 Status::Success => None,
1131 Status::SubDocDocTooDeep => Some(SubdocErrorKind::DocTooDeep),
1132 Status::SubDocNotJSON => Some(SubdocErrorKind::NotJSON),
1133 Status::SubDocPathNotFound => Some(SubdocErrorKind::PathNotFound),
1134 Status::SubDocPathMismatch => Some(SubdocErrorKind::PathMismatch),
1135 Status::SubDocPathInvalid => Some(SubdocErrorKind::PathInvalid),
1136 Status::SubDocPathTooBig => Some(SubdocErrorKind::PathTooBig),
1137 Status::SubDocXattrUnknownVAttr => Some(SubdocErrorKind::XattrUnknownVAttr),
1138 _ => Some(SubdocErrorKind::UnknownStatus { status: res_status }),
1139 };
1140
1141 let err = err_kind.map(|kind| {
1142 ServerError::new(
1143 ServerErrorKind::Subdoc {
1144 error: SubdocError::new(kind, op_index),
1145 },
1146 packet.op_code,
1147 packet.status,
1148 packet.opaque,
1149 )
1150 .into()
1151 });
1152
1153 results.push(SubDocResult { value, err });
1154 op_index += 1;
1155 }
1156
1157 let server_duration = if let Some(f) = &packet.framing_extras {
1158 decode_res_ext_frames(f)?
1159 } else {
1160 None
1161 };
1162
1163 Ok(LookupInResponse {
1164 cas: cas.unwrap_or_default(),
1165 ops: results,
1166 doc_is_deleted,
1167 server_duration,
1168 })
1169 }
1170}
1171
/// Typed result of a sub-document mutation.
pub struct MutateInResponse {
    /// CAS associated with the response.
    pub cas: u64,
    /// Per-operation results.
    pub ops: Vec<SubDocResult>,
    /// Whether the document is flagged as deleted.
    // NOTE(review): presumably derived from the response status, as for
    // lookups — confirm against the decoder.
    pub doc_is_deleted: bool,
    /// Mutation token for the change, when one is available.
    pub mutation_token: Option<MutationToken>,
    /// Server-side duration, when one is available.
    pub server_duration: Option<Duration>,
}
1179
1180impl TryFromClientResponse for MutateInResponse {
1181 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1182 let subdoc_info = resp
1183 .response_context()
1184 .subdoc_info
1185 .expect("missing subdoc info in response context");
1186
1187 let packet = resp.packet();
1188 let cas = packet.cas;
1189 let status = packet.status;
1190
1191 let kind = if status == Status::KeyNotFound {
1192 Some(ServerErrorKind::KeyNotFound)
1193 } else if status == Status::KeyExists && cas.is_some() {
1194 Some(ServerErrorKind::CasMismatch)
1195 } else if status == Status::KeyExists {
1196 Some(ServerErrorKind::KeyExists)
1197 } else if status == Status::Locked {
1198 Some(ServerErrorKind::Locked)
1199 } else if status == Status::TooBig {
1200 Some(ServerErrorKind::TooBig)
1201 } else if status == Status::SubDocInvalidCombo {
1202 Some(ServerErrorKind::Subdoc {
1203 error: SubdocError::new(SubdocErrorKind::InvalidCombo, None),
1204 })
1205 } else if status == Status::SubDocInvalidXattrOrder {
1206 Some(ServerErrorKind::Subdoc {
1207 error: SubdocError::new(SubdocErrorKind::InvalidXattrOrder, None),
1208 })
1209 } else if status == Status::SubDocXattrInvalidKeyCombo {
1210 Some(ServerErrorKind::Subdoc {
1211 error: SubdocError::new(SubdocErrorKind::XattrInvalidKeyCombo, None),
1212 })
1213 } else if status == Status::SubDocXattrInvalidFlagCombo {
1214 Some(ServerErrorKind::Subdoc {
1215 error: SubdocError::new(SubdocErrorKind::XattrInvalidFlagCombo, None),
1216 })
1217 } else if status == Status::SubDocXattrUnknownMacro {
1218 Some(ServerErrorKind::Subdoc {
1219 error: SubdocError::new(SubdocErrorKind::XattrUnknownMacro, None),
1220 })
1221 } else if status == Status::SubDocXattrUnknownVattrMacro {
1222 Some(ServerErrorKind::Subdoc {
1223 error: SubdocError::new(SubdocErrorKind::XattrUnknownVattrMacro, None),
1224 })
1225 } else if status == Status::SubDocXattrCannotModifyVAttr {
1226 Some(ServerErrorKind::Subdoc {
1227 error: SubdocError::new(SubdocErrorKind::XattrCannotModifyVAttr, None),
1228 })
1229 } else if status == Status::SubDocCanOnlyReviveDeletedDocuments {
1230 Some(ServerErrorKind::Subdoc {
1231 error: SubdocError::new(SubdocErrorKind::CanOnlyReviveDeletedDocuments, None),
1232 })
1233 } else if status == Status::SubDocDeletedDocumentCantHaveValue {
1234 Some(ServerErrorKind::Subdoc {
1235 error: SubdocError::new(SubdocErrorKind::DeletedDocumentCantHaveValue, None),
1236 })
1237 } else if status == Status::NotStored {
1238 if subdoc_info.flags.contains(SubdocDocFlag::AddDoc) {
1239 Some(ServerErrorKind::KeyExists)
1240 } else {
1241 Some(ServerErrorKind::NotStored)
1242 }
1243 } else if status == Status::SubDocMultiPathFailure {
1244 if let Some(value) = &packet.value {
1245 if value.len() != 3 {
1246 return Err(Error::new_protocol_error("bad value length"));
1247 }
1248
1249 let mut cursor = Cursor::new(value);
1250 let op_index = cursor.read_u8()?;
1251 let res_status = cursor.read_u16::<BigEndian>()?;
1252
1253 let res_status = Status::from(res_status);
1254
1255 let err_kind: SubdocErrorKind = match res_status {
1256 Status::SubDocDocTooDeep => SubdocErrorKind::DocTooDeep,
1257 Status::SubDocNotJSON => SubdocErrorKind::NotJSON,
1258 Status::SubDocPathNotFound => SubdocErrorKind::PathNotFound,
1259 Status::SubDocPathMismatch => SubdocErrorKind::PathMismatch,
1260 Status::SubDocPathInvalid => SubdocErrorKind::PathInvalid,
1261 Status::SubDocPathTooBig => SubdocErrorKind::PathTooBig,
1262 Status::SubDocPathExists => SubdocErrorKind::PathExists,
1263 Status::SubDocCantInsert => SubdocErrorKind::CantInsert,
1264 Status::SubDocBadRange => SubdocErrorKind::BadRange,
1265 Status::SubDocBadDelta => SubdocErrorKind::BadDelta,
1266 Status::SubDocValueTooDeep => SubdocErrorKind::ValueTooDeep,
1267 _ => SubdocErrorKind::UnknownStatus { status: res_status },
1268 };
1269
1270 Some(ServerErrorKind::Subdoc {
1271 error: SubdocError::new(err_kind, Some(op_index)),
1272 })
1273 } else {
1274 return Err(Error::new_protocol_error("bad value length"));
1275 }
1276 } else {
1277 None
1278 };
1279
1280 if let Some(kind) = kind {
1281 return Err(ServerError::new(kind, packet.op_code, status, packet.opaque).into());
1282 }
1283
1284 let mut doc_is_deleted = false;
1285 if status == Status::SubDocSuccessDeleted {
1286 doc_is_deleted = true;
1287 } else if status != Status::Success && status != Status::SubDocMultiPathFailure {
1289 return Err(OpsCrud::decode_common_mutation_error(&packet));
1290 }
1291
1292 let mut results: Vec<SubDocResult> = Vec::with_capacity(subdoc_info.op_count as usize);
1293
1294 if let Some(value) = &packet.value {
1295 let mut cursor = Cursor::new(value);
1296
1297 while cursor.position() < cursor.get_ref().len() as u64 {
1298 if cursor.remaining() < 3 {
1299 return Err(Error::new_protocol_error("bad value length"));
1300 }
1301
1302 let op_index = cursor.read_u8()?;
1303
1304 if op_index > results.len() as u8 {
1305 for _ in results.len() as u8..op_index {
1306 results.push(SubDocResult {
1307 err: None,
1308 value: None,
1309 });
1310 }
1311 }
1312
1313 let op_status = cursor.read_u16::<BigEndian>()?;
1314 let op_status = Status::from(op_status);
1315
1316 if op_status == Status::Success {
1317 let val_length = cursor.read_u32::<BigEndian>()?;
1318
1319 let mut value = vec![0; val_length as usize];
1320 cursor.read_exact(&mut value)?;
1321
1322 results.push(SubDocResult {
1323 err: None,
1324 value: Some(value),
1325 });
1326 } else {
1327 return Err(Error::new_protocol_error(
1328 "subdoc mutatein op illegally provided an error",
1329 ));
1330 }
1331 }
1332 }
1333
1334 if results.len() < subdoc_info.op_count as usize {
1335 for _ in results.len()..subdoc_info.op_count as usize {
1336 results.push(SubDocResult {
1337 err: None,
1338 value: None,
1339 });
1340 }
1341 }
1342
1343 let mutation_token = if let Some(extras) = &packet.extras {
1344 if extras.len() != 16 {
1345 return Err(Error::new_protocol_error("bad extras length"));
1346 }
1347
1348 let (vbuuid_bytes, seqno_bytes) = extras.split_at(size_of::<u64>());
1349 let vbuuid = u64::from_be_bytes(vbuuid_bytes.try_into().unwrap());
1350 let seqno = u64::from_be_bytes(seqno_bytes.try_into().unwrap());
1351
1352 Some(MutationToken { vbuuid, seqno })
1353 } else {
1354 None
1355 };
1356
1357 let server_duration = if let Some(f) = &packet.framing_extras {
1358 decode_res_ext_frames(f)?
1359 } else {
1360 None
1361 };
1362
1363 Ok(MutateInResponse {
1364 cas: cas.unwrap_or_default(),
1365 ops: results,
1366 mutation_token,
1367 doc_is_deleted,
1368 server_duration,
1369 })
1370 }
1371}
1372
/// Decoded result of a get-collection-id request.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct GetCollectionIdResponse {
    /// Manifest revision, read as a big-endian `u64` from the first 8 bytes
    /// of the response extras.
    pub manifest_rev: u64,
    /// Collection id, read as a big-endian `u32` from the 4 bytes of extras
    /// that follow the manifest revision.
    pub collection_id: u32,
}
1378
1379impl TryFromClientResponse for GetCollectionIdResponse {
1380 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1381 let context = resp.response_context();
1382 let scope_name = context.scope_name.as_ref().expect("missing scope name");
1383 let collection_name = context
1384 .collection_name
1385 .as_ref()
1386 .expect("missing collection name");
1387 let packet = resp.packet();
1388 let status = packet.status;
1389
1390 if status == Status::ScopeUnknown {
1391 return Err(ResourceError::new(
1392 ServerError::new(
1393 ServerErrorKind::UnknownScopeName,
1394 packet.op_code,
1395 packet.status,
1396 packet.opaque,
1397 ),
1398 scope_name,
1399 collection_name,
1400 )
1401 .into());
1402 } else if status == Status::CollectionUnknown {
1403 return Err(ResourceError::new(
1404 ServerError::new(
1405 ServerErrorKind::UnknownCollectionName,
1406 packet.op_code,
1407 packet.status,
1408 packet.opaque,
1409 ),
1410 scope_name,
1411 collection_name,
1412 )
1413 .into());
1414 } else if status != Status::Success {
1415 return Err(OpsCore::decode_error(&packet));
1416 }
1417
1418 let extras = if let Some(extras) = &packet.extras {
1419 if extras.len() != 12 {
1420 return Err(Error::new_protocol_error("invalid extras length"));
1421 }
1422 extras
1423 } else {
1424 return Err(Error::new_protocol_error("no extras in response"));
1425 };
1426
1427 let mut extras = Cursor::new(extras);
1428 let manifest_rev = extras.read_u64::<BigEndian>()?;
1429 let collection_id = extras.read_u32::<BigEndian>()?;
1430
1431 Ok(GetCollectionIdResponse {
1432 manifest_rev,
1433 collection_id,
1434 })
1435 }
1436}
1437
/// Marker returned for a successful ping; it carries no data.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PingResponse {}
1440
1441impl TryFromClientResponse for PingResponse {
1442 fn try_from(resp: ClientResponse) -> Result<Self, Error> {
1443 let packet = resp.packet();
1444 let status = packet.status;
1445
1446 if status != Status::Success {
1447 return Err(OpsCore::decode_error(&packet));
1448 }
1449
1450 Ok(PingResponse {})
1451 }
1452}