couchbase_core/memdx/
codec.rs1use tokio_util::bytes::{Buf, BufMut, BytesMut};
20use tokio_util::codec::{Decoder, Encoder};
21
22use crate::memdx::error::Error;
23use crate::memdx::magic::Magic;
24use crate::memdx::opcode::OpCode;
25use crate::memdx::packet::{RequestPacket, ResponsePacket};
26use crate::memdx::status::Status;
27
/// Size, in bytes, of the fixed packet header that precedes every packet body.
///
/// The decoder waits until at least this many bytes are buffered before it
/// inspects a frame, and the encoder reserves this much space in addition to
/// the body it is about to write.
pub const HEADER_SIZE: usize = 24;
29
/// Framing codec for key/value packets.
///
/// The type carries no state: its only field is a private `()`, so values
/// are zero-sized and freely copyable. The private field also means that code
/// outside this module obtains an instance through [`Default`] rather than by
/// naming the field.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KeyValueCodec(());
32
33impl Decoder for KeyValueCodec {
34 type Item = ResponsePacket;
35 type Error = Error;
36
37 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38 let buf_len = buf.len();
39
40 if buf_len < HEADER_SIZE {
41 return Ok(None);
42 }
43
44 let total_body_len = match buf[8..12].try_into() {
45 Ok(v) => u32::from_be_bytes(v),
46 Err(e) => {
47 return Err(Error::new_protocol_error("failed to read total body length").with(e));
48 }
49 } as usize;
50
51 if buf_len < (HEADER_SIZE + total_body_len) {
52 buf.reserve(HEADER_SIZE + total_body_len);
53 return Ok(None);
54 }
55
56 let mut slice = buf.split_to(HEADER_SIZE + total_body_len);
57
58 let magic = Magic::try_from(slice.get_u8())?;
60 let flexible = magic.is_extended();
61
62 let opcode = OpCode::try_from(slice.get_u8())?;
64
65 let flexible_extras_len = if flexible {
66 slice.get_u8()
68 } else {
69 0
70 } as usize;
71
72 let key_len = if flexible {
73 slice.get_u8() as u16
75 } else {
76 slice.get_u16()
78 } as usize;
79
80 let extras_len = slice.get_u8() as usize;
82 let datatype = slice.get_u8();
84 let status = Status::from(slice.get_u16());
86
87 let total_body_len = slice.get_u32() as usize;
89 let opaque = slice.get_u32();
91 let cas = slice.get_u64();
93 let body_len = total_body_len - key_len - extras_len - flexible_extras_len;
94
95 let mut packet = ResponsePacket::new(magic, opcode, datatype, status, opaque);
96 packet.cas = Some(cas);
97
98 if flexible_extras_len > 0 {
99 packet.framing_extras = Some(slice.split_to(flexible_extras_len).freeze());
100 }
101
102 if extras_len > 0 {
103 packet.extras = Some(slice.split_to(extras_len).freeze());
104 };
105
106 if key_len > 0 {
107 packet.key = Some(slice.split_to(key_len).freeze());
108 };
109
110 if body_len > 0 {
111 packet.value = Some(slice.freeze());
112 };
113
114 Ok(Some(packet))
115 }
116}
117
118impl Encoder<RequestPacket<'_>> for KeyValueCodec {
119 type Error = Error;
120
121 fn encode(&mut self, item: RequestPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
122 let key = item.key;
123 let extras = item.extras;
124 let framing_extras = item.framing_extras;
125 let body = item.value;
126
127 let key_size = if let Some(k) = &key { k.len() } else { 0 };
128 let extras_size = if let Some(e) = &extras { e.len() } else { 0 };
129 let framing_extras_size = if let Some(e) = framing_extras {
130 e.len()
131 } else {
132 0
133 };
134 let body_size = if let Some(b) = &body { b.len() } else { 0 };
135
136 let total_body_size = key_size + extras_size + framing_extras_size + body_size;
137
138 dst.reserve(HEADER_SIZE + total_body_size);
139
140 dst.put_u8(item.magic.into());
141 dst.put_u8(item.op_code.into());
142 if framing_extras.is_some() {
143 if key_size > u8::MAX as usize {
144 return Err(Error::new_invalid_argument_error(
145 "key size too large",
146 "key".to_string(),
147 ));
148 }
149
150 if framing_extras_size > u8::MAX as usize {
151 return Err(Error::new_invalid_argument_error(
152 "frame extras too large",
153 "frame extras".to_string(),
154 ));
155 }
156
157 dst.put_u8(framing_extras_size as u8);
158 dst.put_u8(key_size as u8);
159 } else {
160 if key_size > u16::MAX as usize {
161 return Err(Error::new_invalid_argument_error(
162 "key size too large",
163 "key".to_string(),
164 ));
165 }
166
167 dst.put_u16(key_size as u16);
168 }
169 dst.put_u8(extras_size as u8);
170 dst.put_u8(item.datatype);
171 dst.put_u16(item.vbucket_id.unwrap_or_default());
172 dst.put_u32(total_body_size as u32);
173 dst.put_u32(item.opaque.unwrap_or_default());
174 dst.put_u64(item.cas.unwrap_or_default());
175
176 if let Some(framing_extras) = framing_extras {
177 dst.extend_from_slice(framing_extras);
178 }
179
180 if let Some(extras) = extras {
181 dst.extend_from_slice(extras);
182 }
183
184 if let Some(key) = key {
185 dst.extend_from_slice(key);
186 }
187
188 if let Some(body) = body {
189 dst.extend_from_slice(body);
190 }
191
192 Ok(())
193 }
194}