Skip to main content

couchbase_core/memdx/
codec.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use tokio_util::bytes::{Buf, BufMut, BytesMut};
20use tokio_util::codec::{Decoder, Encoder};
21
22use crate::memdx::error::Error;
23use crate::memdx::magic::Magic;
24use crate::memdx::opcode::OpCode;
25use crate::memdx::packet::{RequestPacket, ResponsePacket};
26use crate::memdx::status::Status;
27
/// Size in bytes of the fixed packet header.
///
/// The decoder waits until at least this many bytes are buffered before it
/// parses anything, and both the decoder and the encoder add it to the total
/// body length to obtain the size of a complete frame.
28pub const HEADER_SIZE: usize = 24;
29
/// Codec that frames key-value packets on a byte stream.
///
/// It implements `Decoder` (yielding `ResponsePacket`) and `Encoder` (accepting
/// `RequestPacket`). The type holds no data: its single field is a private unit
/// value, so outside this module it is obtained through the derived `Default`
/// (or by copying an existing value).
30#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
31pub struct KeyValueCodec(());
32
33impl Decoder for KeyValueCodec {
34    type Item = ResponsePacket;
35    type Error = Error;
36
37    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38        let buf_len = buf.len();
39
40        if buf_len < HEADER_SIZE {
41            return Ok(None);
42        }
43
44        let total_body_len = match buf[8..12].try_into() {
45            Ok(v) => u32::from_be_bytes(v),
46            Err(e) => {
47                return Err(Error::new_protocol_error("failed to read total body length").with(e));
48            }
49        } as usize;
50
51        if buf_len < (HEADER_SIZE + total_body_len) {
52            buf.reserve(HEADER_SIZE + total_body_len);
53            return Ok(None);
54        }
55
56        let mut slice = buf.split_to(HEADER_SIZE + total_body_len);
57
58        // 0
59        let magic = Magic::try_from(slice.get_u8())?;
60        let flexible = magic.is_extended();
61
62        // 1
63        let opcode = OpCode::try_from(slice.get_u8())?;
64
65        let flexible_extras_len = if flexible {
66            // 2
67            slice.get_u8()
68        } else {
69            0
70        } as usize;
71
72        let key_len = if flexible {
73            // 3
74            slice.get_u8() as u16
75        } else {
76            // 2, 3
77            slice.get_u16()
78        } as usize;
79
80        // 4
81        let extras_len = slice.get_u8() as usize;
82        // 5
83        let datatype = slice.get_u8();
84        // 6, 7
85        let status = Status::from(slice.get_u16());
86
87        // 8, 9
88        let total_body_len = slice.get_u32() as usize;
89        // 10, 11, 12, 13
90        let opaque = slice.get_u32();
91        // 14, 15, 16, 17, 18, 19, 20, 21
92        let cas = slice.get_u64();
93        let body_len = total_body_len - key_len - extras_len - flexible_extras_len;
94
95        let mut packet = ResponsePacket::new(magic, opcode, datatype, status, opaque);
96        packet.cas = Some(cas);
97
98        if flexible_extras_len > 0 {
99            packet.framing_extras = Some(slice.split_to(flexible_extras_len).freeze());
100        }
101
102        if extras_len > 0 {
103            packet.extras = Some(slice.split_to(extras_len).freeze());
104        };
105
106        if key_len > 0 {
107            packet.key = Some(slice.split_to(key_len).freeze());
108        };
109
110        if body_len > 0 {
111            packet.value = Some(slice.freeze());
112        };
113
114        Ok(Some(packet))
115    }
116}
117
118impl Encoder<RequestPacket<'_>> for KeyValueCodec {
119    type Error = Error;
120
121    fn encode(&mut self, item: RequestPacket, dst: &mut BytesMut) -> Result<(), Self::Error> {
122        let key = item.key;
123        let extras = item.extras;
124        let framing_extras = item.framing_extras;
125        let body = item.value;
126
127        let key_size = if let Some(k) = &key { k.len() } else { 0 };
128        let extras_size = if let Some(e) = &extras { e.len() } else { 0 };
129        let framing_extras_size = if let Some(e) = framing_extras {
130            e.len()
131        } else {
132            0
133        };
134        let body_size = if let Some(b) = &body { b.len() } else { 0 };
135
136        let total_body_size = key_size + extras_size + framing_extras_size + body_size;
137
138        dst.reserve(HEADER_SIZE + total_body_size);
139
140        dst.put_u8(item.magic.into());
141        dst.put_u8(item.op_code.into());
142        if framing_extras.is_some() {
143            if key_size > u8::MAX as usize {
144                return Err(Error::new_invalid_argument_error(
145                    "key size too large",
146                    "key".to_string(),
147                ));
148            }
149
150            if framing_extras_size > u8::MAX as usize {
151                return Err(Error::new_invalid_argument_error(
152                    "frame extras too large",
153                    "frame extras".to_string(),
154                ));
155            }
156
157            dst.put_u8(framing_extras_size as u8);
158            dst.put_u8(key_size as u8);
159        } else {
160            if key_size > u16::MAX as usize {
161                return Err(Error::new_invalid_argument_error(
162                    "key size too large",
163                    "key".to_string(),
164                ));
165            }
166
167            dst.put_u16(key_size as u16);
168        }
169        dst.put_u8(extras_size as u8);
170        dst.put_u8(item.datatype);
171        dst.put_u16(item.vbucket_id.unwrap_or_default());
172        dst.put_u32(total_body_size as u32);
173        dst.put_u32(item.opaque.unwrap_or_default());
174        dst.put_u64(item.cas.unwrap_or_default());
175
176        if let Some(framing_extras) = framing_extras {
177            dst.extend_from_slice(framing_extras);
178        }
179
180        if let Some(extras) = extras {
181            dst.extend_from_slice(extras);
182        }
183
184        if let Some(key) = key {
185            dst.extend_from_slice(key);
186        }
187
188        if let Some(body) = body {
189            dst.extend_from_slice(body);
190        }
191
192        Ok(())
193    }
194}