Skip to main content

couchbase_core/memdx/
extframe.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::memdx::durability_level::{DurabilityLevel, DurabilityLevelSettings};
20use crate::memdx::error;
21use crate::memdx::error::Error;
22use crate::memdx::ext_frame_code::{ExtReqFrameCode, ExtResFrameCode};
23use bytes::BufMut;
24use std::time::Duration;
25
26pub(crate) fn decode_res_ext_frames(buf: &[u8]) -> error::Result<Option<Duration>> {
27    let mut server_duration_data = None;
28
29    iter_ext_frames(buf, |code, data| {
30        if code == ExtResFrameCode::ServerDuration {
31            server_duration_data = Some(decode_server_duration_ext_frame(data));
32        }
33    })?;
34
35    if let Some(data) = server_duration_data {
36        return Ok(Some(data?));
37    }
38
39    Ok(None)
40}
41
42pub fn decode_ext_frame(buf: &[u8]) -> error::Result<(ExtResFrameCode, &[u8], usize)> {
43    if buf.is_empty() {
44        return Err(Error::new_protocol_error(
45            "empty value buffer when decoding ext frame",
46        ));
47    }
48
49    let mut buf_pos = 0;
50
51    let frame_header = buf[buf_pos];
52    let mut u_frame_code = (frame_header & 0xF0) >> 4;
53    let mut frame_code = ExtResFrameCode::from(u_frame_code as u16);
54    let mut frame_len = frame_header & 0x0F;
55    buf_pos += 1;
56
57    if u_frame_code == 15 {
58        if buf.len() < buf_pos + 1 {
59            return Err(Error::new_protocol_error(
60                "unexpected eof decoding ext frame",
61            ));
62        }
63
64        let frame_code_ext = buf[buf_pos];
65        u_frame_code = 15 + frame_code_ext;
66        frame_code = ExtResFrameCode::from(u_frame_code as u16);
67        buf_pos += 1;
68    }
69
70    if frame_len == 15 {
71        if buf.len() < buf_pos + 1 {
72            return Err(Error::new_protocol_error(
73                "unexpected eof decoding ext frame",
74            ));
75        }
76
77        let frame_len_ext = buf[buf_pos];
78        frame_len = 15 + frame_len_ext;
79        buf_pos += 1;
80    }
81
82    let u_frame_len = frame_len as usize;
83    if buf.len() < buf_pos + u_frame_len {
84        return Err(Error::new_protocol_error(
85            "unexpected eof decoding ext frame",
86        ));
87    }
88
89    let frame_body = &buf[buf_pos..buf_pos + u_frame_len];
90    buf_pos += u_frame_len;
91
92    Ok((frame_code, frame_body, buf_pos))
93}
94
95fn iter_ext_frames(buf: &[u8], mut cb: impl FnMut(ExtResFrameCode, &[u8])) -> error::Result<&[u8]> {
96    if !buf.is_empty() {
97        let (frame_code, frame_body, buf_pos) = decode_ext_frame(buf)?;
98
99        cb(frame_code, frame_body);
100
101        return Ok(&buf[buf_pos..]);
102    }
103
104    Ok(buf)
105}
106
107pub fn append_ext_frame(
108    frame_code: ExtReqFrameCode,
109    frame_body: &[u8],
110    buf: &mut [u8],
111    offset: &mut usize,
112) -> error::Result<()> {
113    let frame_len = frame_body.len();
114
115    if *offset >= buf.len() {
116        return Err(Error::new_invalid_argument_error(
117            "buffer overflow",
118            "ext frame".to_string(),
119        ));
120    }
121
122    buf[*offset] = 0;
123    let hdr_byte_ptr = *offset;
124    *offset += 1;
125    let u_frame_code: u16 = frame_code.into();
126
127    if u_frame_code < 15 {
128        buf[hdr_byte_ptr] |= ((u_frame_code & 0x0f) << 4) as u8;
129    } else {
130        if u_frame_code - 15 >= 15 {
131            return Err(Error::new_invalid_argument_error(
132                "ext frame code too large to encode",
133                "ext frame".to_string(),
134            ));
135        }
136        buf[hdr_byte_ptr] |= 0xF0;
137
138        if *offset + 2 > buf.len() {
139            return Err(Error::new_invalid_argument_error(
140                "buffer overflow",
141                "ext frame".to_string(),
142            ));
143        }
144        buf[*offset..*offset + 2].copy_from_slice(&(u_frame_code.to_be_bytes()));
145        *offset += 2;
146    }
147
148    if frame_len < 15 {
149        buf[hdr_byte_ptr] |= (frame_len as u8) & 0xF;
150    } else {
151        if frame_len - 15 >= 15 {
152            return Err(Error::new_invalid_argument_error(
153                "ext frame len too large to encode",
154                "ext frame".to_string(),
155            ));
156        }
157        buf[hdr_byte_ptr] |= 0x0F;
158        if *offset + 2 > buf.len() {
159            return Err(Error::new_invalid_argument_error(
160                "buffer overflow",
161                "ext frame".to_string(),
162            ));
163        }
164        buf[*offset..*offset + 2].copy_from_slice(&((frame_len - 15) as u16).to_be_bytes());
165        *offset += 2;
166    }
167
168    if frame_len > 0 {
169        if *offset + frame_len > buf.len() {
170            return Err(Error::new_invalid_argument_error(
171                "buffer overflow",
172                "ext frame".to_string(),
173            ));
174        }
175        buf[*offset..*offset + frame_len].copy_from_slice(frame_body);
176        *offset += frame_len;
177    }
178
179    Ok(())
180}
181
/// Writes `collection_id` into the start of `buf` as an unsigned LEB128
/// varint and returns the number of bytes written.
///
/// Each output byte holds seven bits of the value, least-significant group
/// first; the top bit is set on every byte except the last. A `u32` needs
/// between one and five bytes.
///
/// # Panics
///
/// Panics if `buf` is shorter than the encoded value. Bytes that fit are
/// written before the panic occurs.
pub fn make_uleb128_32(collection_id: u32, buf: &mut [u8]) -> usize {
    let mut remaining = collection_id;
    let mut written = 0;

    // Emit every group that still has higher bits after it, flagged with the
    // continuation bit.
    while remaining >= 0x80 {
        buf[written] = (remaining & 0x7f) as u8 | 0x80;
        remaining >>= 7;
        written += 1;
    }

    // The final group fits in seven bits and carries no continuation bit.
    buf[written] = remaining as u8;
    written + 1
}
201
202pub fn encode_durability_ext_frame(
203    level: DurabilityLevel,
204    timeout: Option<Duration>,
205) -> error::Result<Vec<u8>> {
206    if timeout.is_none() {
207        return Ok(vec![level.into()]);
208    }
209
210    let timeout = timeout.unwrap();
211
212    let mut timeout_millis = timeout.as_millis();
213    if timeout_millis > 65535 {
214        return Err(Error::new_invalid_argument_error(
215            "cannot encode durability timeout greater than 65535 milliseconds",
216            "durability_level_timeout".to_string(),
217        ));
218    }
219
220    if timeout_millis == 0 {
221        timeout_millis = 1;
222    }
223
224    let mut buf = vec![level.into()];
225    buf.put_u8((timeout_millis >> 8) as u8);
226    buf.put_u8(timeout_millis as u8);
227
228    Ok(buf)
229}
230
231pub(crate) fn decode_server_duration_ext_frame(mut data: &[u8]) -> error::Result<Duration> {
232    if data.len() != 2 {
233        return Err(Error::new_protocol_error(
234            "invalid server duration ext frame length",
235        ));
236    }
237
238    let dura_enc = ((data[0] as u32) << 8) | (data[1] as u32);
239    let dura_micros = ((dura_enc as f32).powf(1.74) / 2.0).round();
240
241    Ok(Duration::from_micros(dura_micros as u64))
242}
243
244pub(crate) fn decode_durability_level_ext_frame(
245    data: &mut Vec<u8>,
246) -> error::Result<DurabilityLevelSettings> {
247    if data.len() == 1 {
248        let durability = DurabilityLevel::from(data.remove(0));
249
250        return Ok(DurabilityLevelSettings::new(durability));
251    } else if data.len() == 3 {
252        let durability = DurabilityLevel::from(data.remove(0));
253        let timeout_millis = ((data.remove(0) as u32) << 8) | (data.remove(0) as u32);
254
255        return Ok(DurabilityLevelSettings::new_with_timeout(
256            durability,
257            Duration::from_millis(timeout_millis as u64),
258        ));
259    }
260
261    Err(Error::new_message_error(
262        "invalid durability ext frame length",
263    ))
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memdx::durability_level::DurabilityLevel;
    use std::time::Duration;

    /// Encodes a durability frame, checks the exact bytes, then decodes it
    /// again and checks that the level and the timeout (within 1ms) survive
    /// the round trip.
    fn test_one_durability(
        l: DurabilityLevel,
        d: impl Into<Option<Duration>>,
        expected_bytes: &[u8],
    ) {
        let requested: Option<Duration> = d.into();

        let encoded = encode_durability_ext_frame(l, requested).expect("encode failed");
        assert_eq!(encoded, expected_bytes);

        // The decoder consumes its input, so hand it a buffer of its own.
        let mut scratch = encoded;
        let decoded = decode_durability_level_ext_frame(&mut scratch).expect("decode failed");
        assert_eq!(decoded.durability_level, l);

        let decoded_millis = decoded
            .timeout
            .unwrap_or(Duration::from_millis(0))
            .as_millis() as i64;

        match requested {
            Some(wanted) => {
                let diff = (decoded_millis - wanted.as_millis() as i64).abs();
                assert!(
                    diff <= 1,
                    "Expected relative difference less than 1ms, got {}",
                    diff
                );
            }
            None => assert_eq!(0, decoded_millis),
        }
    }

    /// Appends a single frame to a zeroed 128-byte buffer and returns the
    /// bytes that were written.
    fn append_to_fresh_buffer(code: ExtReqFrameCode, body: &[u8]) -> Vec<u8> {
        let mut buf = [0u8; 128];
        let mut offset = 0;
        append_ext_frame(code, body, &mut buf, &mut offset).unwrap();

        buf[..offset].to_vec()
    }

    #[test]
    fn test_durability_ext_frame_majority_no_duration() {
        test_one_durability(DurabilityLevel::MAJORITY, None, &[0x01]);
    }

    #[test]
    fn test_durability_ext_frame_majority_persist_active_no_duration() {
        test_one_durability(DurabilityLevel::MAJORITY_AND_PERSIST_ACTIVE, None, &[0x02]);
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_0() {
        // A zero timeout is bumped to 1ms by the encoder.
        let timeout = Duration::from_millis(0);
        test_one_durability(DurabilityLevel::MAJORITY, timeout, &[0x01, 0x00, 0x01]);
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_1() {
        let timeout = Duration::from_millis(1);
        test_one_durability(DurabilityLevel::MAJORITY, timeout, &[0x01, 0x00, 0x01]);
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_12201() {
        let timeout = Duration::from_millis(12201);
        test_one_durability(DurabilityLevel::MAJORITY, timeout, &[0x01, 0x2f, 0xa9]);
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_max() {
        let timeout = Duration::from_millis(65535);
        test_one_durability(DurabilityLevel::MAJORITY, timeout, &[0x01, 0xff, 0xff]);
    }

    #[test]
    fn test_append_preserve_expiry() {
        let written = append_to_fresh_buffer(ExtReqFrameCode::PreserveTTL, &[]);

        assert_eq!(written, [0x50]);
    }

    #[test]
    fn test_append_durability_level_no_timeout() {
        let written = append_to_fresh_buffer(ExtReqFrameCode::Durability, &[0x01]);

        assert_eq!(written, [0x11, 0x01]);
    }

    #[test]
    fn test_append_durability_level_timeout() {
        let written = append_to_fresh_buffer(ExtReqFrameCode::Durability, &[0x01, 0x00, 0x01]);

        assert_eq!(written, [0x13, 0x01, 0x00, 0x01]);
    }
}