couchbase_core/memdx/
codec.rs

/*
 *
 *  * Copyright (c) 2025 Couchbase, Inc.
 *  *
 *  * Licensed under the Apache License, Version 2.0 (the "License");
 *  * you may not use this file except in compliance with the License.
 *  * You may obtain a copy of the License at
 *  *
 *  *    http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */
18
19use tokio_util::bytes::{Buf, BufMut, BytesMut};
20use tokio_util::codec::{Decoder, Encoder};
21
22use crate::memdx::error::Error;
23use crate::memdx::magic::Magic;
24use crate::memdx::opcode::OpCode;
25use crate::memdx::packet::{RequestPacket, ResponsePacket};
26use crate::memdx::status::Status;
27
/// Number of bytes in the fixed-size header that starts every packet.
///
/// The total-body-length field inside that header counts only the bytes that
/// follow it, so a complete frame is `HEADER_SIZE` plus that field's value.
pub const HEADER_SIZE: usize = 24;

/// Codec that turns buffered bytes into `ResponsePacket`s and writes
/// `RequestPacket`s out as bytes.
///
/// The private unit field carries no state; it only prevents construction by
/// tuple-literal from outside this module, so callers obtain a codec through
/// `Default`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KeyValueCodec(());
32
33impl Decoder for KeyValueCodec {
34    type Item = ResponsePacket;
35    type Error = Error;
36
37    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38        let buf_len = buf.len();
39
40        if buf_len < HEADER_SIZE {
41            return Ok(None);
42        }
43
44        let total_body_len = match buf[8..12].try_into() {
45            Ok(v) => u32::from_be_bytes(v),
46            Err(e) => {
47                return Err(Error::new_protocol_error("failed to read total body length").with(e));
48            }
49        } as usize;
50
51        if buf_len < (HEADER_SIZE + total_body_len) {
52            buf.reserve(HEADER_SIZE + total_body_len);
53            return Ok(None);
54        }
55
56        let mut slice = buf.split_to(HEADER_SIZE + total_body_len);
57
58        // 0
59        let magic = Magic::try_from(slice.get_u8())?;
60        let flexible = magic.is_extended();
61
62        // 1
63        let opcode = OpCode::try_from(slice.get_u8())?;
64
65        let flexible_extras_len = if flexible {
66            // 2
67            slice.get_u8()
68        } else {
69            0
70        } as usize;
71
72        let key_len = if flexible {
73            // 3
74            slice.get_u8() as u16
75        } else {
76            // 2, 3
77            slice.get_u16()
78        } as usize;
79
80        // 4
81        let extras_len = slice.get_u8() as usize;
82        // 5
83        let datatype = slice.get_u8();
84        // 6, 7
85        let status = Status::from(slice.get_u16());
86
87        // 8, 9
88        let total_body_len = slice.get_u32() as usize;
89        // 10, 11, 12, 13
90        let opaque = slice.get_u32();
91        // 14, 15, 16, 17, 18, 19, 20, 21
92        let cas = slice.get_u64();
93        let body_len = total_body_len - key_len - extras_len - flexible_extras_len;
94
95        let mut packet = ResponsePacket::new(magic, opcode, datatype, status, opaque);
96        packet.cas = Some(cas);
97
98        let mut payload_pos = 0;
99
100        if flexible_extras_len > 0 {
101            packet.framing_extras =
102                Some(slice[payload_pos..(payload_pos + flexible_extras_len)].to_vec());
103            payload_pos += flexible_extras_len;
104        }
105
106        if extras_len > 0 {
107            packet.extras = Some(slice[payload_pos..(payload_pos + extras_len)].to_vec());
108            payload_pos += extras_len;
109        };
110
111        if key_len > 0 {
112            packet.key = Some(slice[payload_pos..(payload_pos + key_len)].to_vec());
113            payload_pos += key_len;
114        };
115
116        if body_len > 0 {
117            packet.value = Some(slice[payload_pos..].to_vec());
118        };
119
120        Ok(Some(packet))
121    }
122}
123
124impl Encoder<RequestPacket<'_>> for KeyValueCodec {
125    type Error = Error;
126
127    fn encode(&mut self, item: RequestPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
128        let key = item.key;
129        let extras = item.extras;
130        let framing_extras = item.framing_extras;
131        let body = item.value;
132
133        let key_size = if let Some(k) = &key { k.len() } else { 0 };
134        let extras_size = if let Some(e) = &extras { e.len() } else { 0 };
135        let framing_extras_size = if let Some(e) = &framing_extras {
136            e.len()
137        } else {
138            0
139        };
140        let body_size = if let Some(b) = &body { b.len() } else { 0 };
141
142        let total_body_size = key_size + extras_size + framing_extras_size + body_size;
143
144        dst.reserve(HEADER_SIZE + total_body_size);
145
146        dst.put_u8(item.magic.into());
147        dst.put_u8(item.op_code.into());
148        if framing_extras.is_some() {
149            if key_size > u8::MAX as usize {
150                return Err(Error::new_invalid_argument_error(
151                    "key size too large",
152                    "key".to_string(),
153                ));
154            }
155
156            if framing_extras_size > u8::MAX as usize {
157                return Err(Error::new_invalid_argument_error(
158                    "frame extras too large",
159                    "frame extras".to_string(),
160                ));
161            }
162
163            dst.put_u8(framing_extras_size as u8);
164            dst.put_u8(key_size as u8);
165        } else {
166            if key_size > u16::MAX as usize {
167                return Err(Error::new_invalid_argument_error(
168                    "key size too large",
169                    "key".to_string(),
170                ));
171            }
172
173            dst.put_u16(key_size as u16);
174        }
175        dst.put_u8(extras_size as u8);
176        dst.put_u8(item.datatype);
177        dst.put_u16(item.vbucket_id.unwrap_or_default());
178        dst.put_u32(total_body_size as u32);
179        dst.put_u32(item.opaque.unwrap_or_default());
180        dst.put_u64(item.cas.unwrap_or_default());
181
182        if let Some(framing_extras) = framing_extras {
183            dst.extend(framing_extras);
184        }
185
186        if let Some(extras) = extras {
187            dst.extend(extras);
188        }
189
190        if let Some(key) = key {
191            dst.extend(key);
192        }
193
194        if let Some(body) = body {
195            dst.extend(body);
196        }
197
198        Ok(())
199    }
200}