couchbase_core/memdx/
codec.rs1use tokio_util::bytes::{Buf, BufMut, BytesMut};
20use tokio_util::codec::{Decoder, Encoder};
21
22use crate::memdx::error::Error;
23use crate::memdx::magic::Magic;
24use crate::memdx::opcode::OpCode;
25use crate::memdx::packet::{RequestPacket, ResponsePacket};
26use crate::memdx::status::Status;
27
/// Size in bytes of the fixed packet header that precedes every body.
///
/// Spelled out as the sum of the header fields in wire order:
/// magic (1) + opcode (1) + key length, or framing-extras length and key
/// length for the flexible layout (2) + extras length (1) + datatype (1) +
/// status or vbucket id (2) + total body length (4) + opaque (4) + cas (8).
pub const HEADER_SIZE: usize = 1 + 1 + 2 + 1 + 1 + 2 + 4 + 4 + 8;
29
/// Stateless codec that frames key-value protocol packets.
///
/// It decodes buffered bytes into [`ResponsePacket`]s and encodes
/// [`RequestPacket`]s into an output buffer. The private unit field keeps the
/// type from being built with a tuple-struct literal outside this module; use
/// [`Default`] to obtain an instance.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct KeyValueCodec(());
32
33impl Decoder for KeyValueCodec {
34 type Item = ResponsePacket;
35 type Error = Error;
36
37 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38 let buf_len = buf.len();
39
40 if buf_len < HEADER_SIZE {
41 return Ok(None);
42 }
43
44 let total_body_len = match buf[8..12].try_into() {
45 Ok(v) => u32::from_be_bytes(v),
46 Err(e) => {
47 return Err(Error::new_protocol_error("failed to read total body length").with(e));
48 }
49 } as usize;
50
51 if buf_len < (HEADER_SIZE + total_body_len) {
52 buf.reserve(HEADER_SIZE + total_body_len);
53 return Ok(None);
54 }
55
56 let mut slice = buf.split_to(HEADER_SIZE + total_body_len);
57
58 let magic = Magic::try_from(slice.get_u8())?;
60 let flexible = magic.is_extended();
61
62 let opcode = OpCode::try_from(slice.get_u8())?;
64
65 let flexible_extras_len = if flexible {
66 slice.get_u8()
68 } else {
69 0
70 } as usize;
71
72 let key_len = if flexible {
73 slice.get_u8() as u16
75 } else {
76 slice.get_u16()
78 } as usize;
79
80 let extras_len = slice.get_u8() as usize;
82 let datatype = slice.get_u8();
84 let status = Status::from(slice.get_u16());
86
87 let total_body_len = slice.get_u32() as usize;
89 let opaque = slice.get_u32();
91 let cas = slice.get_u64();
93 let body_len = total_body_len - key_len - extras_len - flexible_extras_len;
94
95 let mut packet = ResponsePacket::new(magic, opcode, datatype, status, opaque);
96 packet.cas = Some(cas);
97
98 let mut payload_pos = 0;
99
100 if flexible_extras_len > 0 {
101 packet.framing_extras =
102 Some(slice[payload_pos..(payload_pos + flexible_extras_len)].to_vec());
103 payload_pos += flexible_extras_len;
104 }
105
106 if extras_len > 0 {
107 packet.extras = Some(slice[payload_pos..(payload_pos + extras_len)].to_vec());
108 payload_pos += extras_len;
109 };
110
111 if key_len > 0 {
112 packet.key = Some(slice[payload_pos..(payload_pos + key_len)].to_vec());
113 payload_pos += key_len;
114 };
115
116 if body_len > 0 {
117 packet.value = Some(slice[payload_pos..].to_vec());
118 };
119
120 Ok(Some(packet))
121 }
122}
123
124impl Encoder<RequestPacket<'_>> for KeyValueCodec {
125 type Error = Error;
126
127 fn encode(&mut self, item: RequestPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
128 let key = item.key;
129 let extras = item.extras;
130 let framing_extras = item.framing_extras;
131 let body = item.value;
132
133 let key_size = if let Some(k) = &key { k.len() } else { 0 };
134 let extras_size = if let Some(e) = &extras { e.len() } else { 0 };
135 let framing_extras_size = if let Some(e) = &framing_extras {
136 e.len()
137 } else {
138 0
139 };
140 let body_size = if let Some(b) = &body { b.len() } else { 0 };
141
142 let total_body_size = key_size + extras_size + framing_extras_size + body_size;
143
144 dst.reserve(HEADER_SIZE + total_body_size);
145
146 dst.put_u8(item.magic.into());
147 dst.put_u8(item.op_code.into());
148 if framing_extras.is_some() {
149 if key_size > u8::MAX as usize {
150 return Err(Error::new_invalid_argument_error(
151 "key size too large",
152 "key".to_string(),
153 ));
154 }
155
156 if framing_extras_size > u8::MAX as usize {
157 return Err(Error::new_invalid_argument_error(
158 "frame extras too large",
159 "frame extras".to_string(),
160 ));
161 }
162
163 dst.put_u8(framing_extras_size as u8);
164 dst.put_u8(key_size as u8);
165 } else {
166 if key_size > u16::MAX as usize {
167 return Err(Error::new_invalid_argument_error(
168 "key size too large",
169 "key".to_string(),
170 ));
171 }
172
173 dst.put_u16(key_size as u16);
174 }
175 dst.put_u8(extras_size as u8);
176 dst.put_u8(item.datatype);
177 dst.put_u16(item.vbucket_id.unwrap_or_default());
178 dst.put_u32(total_body_size as u32);
179 dst.put_u32(item.opaque.unwrap_or_default());
180 dst.put_u64(item.cas.unwrap_or_default());
181
182 if let Some(framing_extras) = framing_extras {
183 dst.extend(framing_extras);
184 }
185
186 if let Some(extras) = extras {
187 dst.extend(extras);
188 }
189
190 if let Some(key) = key {
191 dst.extend(key);
192 }
193
194 if let Some(body) = body {
195 dst.extend(body);
196 }
197
198 Ok(())
199 }
200}