1use crate::memdx::durability_level::{DurabilityLevel, DurabilityLevelSettings};
20use crate::memdx::error;
21use crate::memdx::error::Error;
22use crate::memdx::ext_frame_code::{ExtReqFrameCode, ExtResFrameCode};
23use bytes::BufMut;
24use std::time::Duration;
25
26pub(crate) fn decode_res_ext_frames(buf: &[u8]) -> error::Result<Option<Duration>> {
27 let mut server_duration_data = None;
28
29 iter_ext_frames(buf, |code, data| {
30 if code == ExtResFrameCode::ServerDuration {
31 server_duration_data = Some(decode_server_duration_ext_frame(data));
32 }
33 })?;
34
35 if let Some(data) = server_duration_data {
36 return Ok(Some(data?));
37 }
38
39 Ok(None)
40}
41
42pub fn decode_ext_frame(buf: &[u8]) -> error::Result<(ExtResFrameCode, &[u8], usize)> {
43 if buf.is_empty() {
44 return Err(Error::new_protocol_error(
45 "empty value buffer when decoding ext frame",
46 ));
47 }
48
49 let mut buf_pos = 0;
50
51 let frame_header = buf[buf_pos];
52 let mut u_frame_code = (frame_header & 0xF0) >> 4;
53 let mut frame_code = ExtResFrameCode::from(u_frame_code as u16);
54 let mut frame_len = frame_header & 0x0F;
55 buf_pos += 1;
56
57 if u_frame_code == 15 {
58 if buf.len() < buf_pos + 1 {
59 return Err(Error::new_protocol_error(
60 "unexpected eof decoding ext frame",
61 ));
62 }
63
64 let frame_code_ext = buf[buf_pos];
65 u_frame_code = 15 + frame_code_ext;
66 frame_code = ExtResFrameCode::from(u_frame_code as u16);
67 buf_pos += 1;
68 }
69
70 if frame_len == 15 {
71 if buf.len() < buf_pos + 1 {
72 return Err(Error::new_protocol_error(
73 "unexpected eof decoding ext frame",
74 ));
75 }
76
77 let frame_len_ext = buf[buf_pos];
78 frame_len = 15 + frame_len_ext;
79 buf_pos += 1;
80 }
81
82 let u_frame_len = frame_len as usize;
83 if buf.len() < buf_pos + u_frame_len {
84 return Err(Error::new_protocol_error(
85 "unexpected eof decoding ext frame",
86 ));
87 }
88
89 let frame_body = &buf[buf_pos..buf_pos + u_frame_len];
90 buf_pos += u_frame_len;
91
92 Ok((frame_code, frame_body, buf_pos))
93}
94
95fn iter_ext_frames(buf: &[u8], mut cb: impl FnMut(ExtResFrameCode, &[u8])) -> error::Result<&[u8]> {
96 if !buf.is_empty() {
97 let (frame_code, frame_body, buf_pos) = decode_ext_frame(buf)?;
98
99 cb(frame_code, frame_body);
100
101 return Ok(&buf[buf_pos..]);
102 }
103
104 Ok(buf)
105}
106
107pub fn append_ext_frame(
108 frame_code: ExtReqFrameCode,
109 frame_body: &[u8],
110 buf: &mut [u8],
111 offset: &mut usize,
112) -> error::Result<()> {
113 let frame_len = frame_body.len();
114
115 if *offset >= buf.len() {
116 return Err(Error::new_invalid_argument_error(
117 "buffer overflow",
118 "ext frame".to_string(),
119 ));
120 }
121
122 buf[*offset] = 0;
123 let hdr_byte_ptr = *offset;
124 *offset += 1;
125 let u_frame_code: u16 = frame_code.into();
126
127 if u_frame_code < 15 {
128 buf[hdr_byte_ptr] |= ((u_frame_code & 0x0f) << 4) as u8;
129 } else {
130 if u_frame_code - 15 >= 15 {
131 return Err(Error::new_invalid_argument_error(
132 "ext frame code too large to encode",
133 "ext frame".to_string(),
134 ));
135 }
136 buf[hdr_byte_ptr] |= 0xF0;
137
138 if *offset + 2 > buf.len() {
139 return Err(Error::new_invalid_argument_error(
140 "buffer overflow",
141 "ext frame".to_string(),
142 ));
143 }
144 buf[*offset..*offset + 2].copy_from_slice(&(u_frame_code.to_be_bytes()));
145 *offset += 2;
146 }
147
148 if frame_len < 15 {
149 buf[hdr_byte_ptr] |= (frame_len as u8) & 0xF;
150 } else {
151 if frame_len - 15 >= 15 {
152 return Err(Error::new_invalid_argument_error(
153 "ext frame len too large to encode",
154 "ext frame".to_string(),
155 ));
156 }
157 buf[hdr_byte_ptr] |= 0x0F;
158 if *offset + 2 > buf.len() {
159 return Err(Error::new_invalid_argument_error(
160 "buffer overflow",
161 "ext frame".to_string(),
162 ));
163 }
164 buf[*offset..*offset + 2].copy_from_slice(&((frame_len - 15) as u16).to_be_bytes());
165 *offset += 2;
166 }
167
168 if frame_len > 0 {
169 if *offset + frame_len > buf.len() {
170 return Err(Error::new_invalid_argument_error(
171 "buffer overflow",
172 "ext frame".to_string(),
173 ));
174 }
175 buf[*offset..*offset + frame_len].copy_from_slice(frame_body);
176 *offset += frame_len;
177 }
178
179 Ok(())
180}
181
/// Writes `collection_id` into the start of `buf` as an unsigned LEB128
/// value and returns the number of bytes written.
///
/// Each output byte carries seven bits of the value, least-significant group
/// first; the high bit is set on every byte except the last. A `u32` needs
/// between one and five bytes.
///
/// # Panics
///
/// Panics if `buf` is too short to hold the encoded value.
pub fn make_uleb128_32(collection_id: u32, buf: &mut [u8]) -> usize {
    let mut remaining = collection_id;
    let mut written = 0;

    loop {
        let low_bits = (remaining & 0x7f) as u8;
        remaining >>= 7;

        if remaining == 0 {
            // Final group: continuation bit stays clear.
            buf[written] = low_bits;
            return written + 1;
        }

        buf[written] = low_bits | 0x80;
        written += 1;
    }
}
201
202pub fn encode_durability_ext_frame(
203 level: DurabilityLevel,
204 timeout: Option<Duration>,
205) -> error::Result<Vec<u8>> {
206 if timeout.is_none() {
207 return Ok(vec![level.into()]);
208 }
209
210 let timeout = timeout.unwrap();
211
212 let mut timeout_millis = timeout.as_millis();
213 if timeout_millis > 65535 {
214 return Err(Error::new_invalid_argument_error(
215 "cannot encode durability timeout greater than 65535 milliseconds",
216 "durability_level_timeout".to_string(),
217 ));
218 }
219
220 if timeout_millis == 0 {
221 timeout_millis = 1;
222 }
223
224 let mut buf = vec![level.into()];
225 buf.put_u8((timeout_millis >> 8) as u8);
226 buf.put_u8(timeout_millis as u8);
227
228 Ok(buf)
229}
230
231pub(crate) fn decode_server_duration_ext_frame(mut data: &[u8]) -> error::Result<Duration> {
232 if data.len() != 2 {
233 return Err(Error::new_protocol_error(
234 "invalid server duration ext frame length",
235 ));
236 }
237
238 let dura_enc = ((data[0] as u32) << 8) | (data[1] as u32);
239 let dura_micros = ((dura_enc as f32).powf(1.74) / 2.0).round();
240
241 Ok(Duration::from_micros(dura_micros as u64))
242}
243
244pub(crate) fn decode_durability_level_ext_frame(
245 data: &mut Vec<u8>,
246) -> error::Result<DurabilityLevelSettings> {
247 if data.len() == 1 {
248 let durability = DurabilityLevel::from(data.remove(0));
249
250 return Ok(DurabilityLevelSettings::new(durability));
251 } else if data.len() == 3 {
252 let durability = DurabilityLevel::from(data.remove(0));
253 let timeout_millis = ((data.remove(0) as u32) << 8) | (data.remove(0) as u32);
254
255 return Ok(DurabilityLevelSettings::new_with_timeout(
256 durability,
257 Duration::from_millis(timeout_millis as u64),
258 ));
259 }
260
261 Err(Error::new_message_error(
262 "invalid durability ext frame length",
263 ))
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memdx::durability_level::DurabilityLevel;
    use std::time::Duration;

    /// Encodes `l` with the optional timeout `d`, checks the bytes against
    /// `expected_bytes`, then decodes them again and checks that the level
    /// matches and the timeout is within 1ms of the requested one.
    fn test_one_durability(
        l: DurabilityLevel,
        d: impl Into<Option<Duration>>,
        expected_bytes: &[u8],
    ) {
        let requested = d.into();

        let encoded = encode_durability_ext_frame(l, requested).expect("encode failed");
        assert_eq!(encoded, expected_bytes);

        // The decoder consumes its input, so hand it a scratch copy.
        let mut scratch = encoded.clone();
        let settings = decode_durability_level_ext_frame(&mut scratch).expect("decode failed");
        assert_eq!(settings.durability_level, l);

        let decoded_timeout = settings.timeout.unwrap_or(Duration::from_millis(0));
        let decoded_millis = decoded_timeout.as_millis() as i64;

        match requested {
            Some(requested) => {
                let diff = (decoded_millis - requested.as_millis() as i64).abs();
                assert!(
                    diff <= 1,
                    "Expected relative difference less than 1ms, got {}",
                    diff
                );
            }
            None => assert_eq!(0, decoded_millis),
        }
    }

    #[test]
    fn test_durability_ext_frame_majority_no_duration() {
        test_one_durability(DurabilityLevel::MAJORITY, None, &[0x01]);
    }

    #[test]
    fn test_durability_ext_frame_majority_persist_active_no_duration() {
        test_one_durability(DurabilityLevel::MAJORITY_AND_PERSIST_ACTIVE, None, &[0x02]);
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_0() {
        // A zero timeout is bumped to 1ms on the wire.
        test_one_durability(
            DurabilityLevel::MAJORITY,
            Duration::from_millis(0),
            &[0x01, 0x00, 0x01],
        );
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_1() {
        test_one_durability(
            DurabilityLevel::MAJORITY,
            Duration::from_millis(1),
            &[0x01, 0x00, 0x01],
        );
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_12201() {
        test_one_durability(
            DurabilityLevel::MAJORITY,
            Duration::from_millis(12201),
            &[0x01, 0x2f, 0xa9],
        );
    }

    #[test]
    fn test_durability_ext_frame_majority_duration_max() {
        test_one_durability(
            DurabilityLevel::MAJORITY,
            Duration::from_millis(65535),
            &[0x01, 0xff, 0xff],
        );
    }

    #[test]
    fn test_append_preserve_expiry() {
        let mut out = [0u8; 128];
        let mut written = 0;
        append_ext_frame(ExtReqFrameCode::PreserveTTL, &[], &mut out, &mut written).unwrap();

        assert_eq!(&out[..written], &[0x50]);
    }

    #[test]
    fn test_append_durability_level_no_timeout() {
        let mut out = [0u8; 128];
        let mut written = 0;
        append_ext_frame(ExtReqFrameCode::Durability, &[0x01], &mut out, &mut written).unwrap();

        assert_eq!(&out[..written], &[0x11, 0x01]);
    }

    #[test]
    fn test_append_durability_level_timeout() {
        let mut out = [0u8; 128];
        let mut written = 0;
        let body = [0x01, 0x00, 0x01];
        append_ext_frame(ExtReqFrameCode::Durability, &body, &mut out, &mut written).unwrap();

        assert_eq!(&out[..written], &[0x13, 0x01, 0x00, 0x01]);
    }
}