1use bytes::Bytes;
43use std::fmt;
44
45use crate::error::{ClientError, Result};
46
47pub const TLV_HEADER_SIZE: usize = 5;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
52#[repr(u8)]
53pub enum RecordType {
54 Data = 0x01,
56 Tombstone = 0x02,
58 Transaction = 0x03,
60 Checkpoint = 0x04,
62 Schema = 0x05,
64 Compressed = 0x10,
66 UserDefined = 0x80,
68 Unknown = 0xFF,
70}
71
72impl From<u8> for RecordType {
73 fn from(byte: u8) -> Self {
74 match byte {
75 0x01 => RecordType::Data,
76 0x02 => RecordType::Tombstone,
77 0x03 => RecordType::Transaction,
78 0x04 => RecordType::Checkpoint,
79 0x05 => RecordType::Schema,
80 0x10 => RecordType::Compressed,
81 0x80..=0xFE => RecordType::UserDefined,
82 _ => RecordType::Unknown,
83 }
84 }
85}
86
87impl From<RecordType> for u8 {
88 fn from(rt: RecordType) -> u8 {
89 rt as u8
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct Record {
96 pub record_type: RecordType,
98 pub type_byte: u8,
100 pub payload: Bytes,
102 pub offset: usize,
104}
105
106impl Record {
107 pub fn new(record_type: RecordType, type_byte: u8, payload: Bytes, offset: usize) -> Self {
109 Self {
110 record_type,
111 type_byte,
112 payload,
113 offset,
114 }
115 }
116
117 pub fn is_data(&self) -> bool {
119 self.record_type == RecordType::Data
120 }
121
122 pub fn is_tombstone(&self) -> bool {
124 self.record_type == RecordType::Tombstone
125 }
126
127 pub fn total_size(&self) -> usize {
129 TLV_HEADER_SIZE + self.payload.len()
130 }
131
132 pub fn as_str(&self) -> Option<&str> {
134 std::str::from_utf8(&self.payload).ok()
135 }
136
137 pub fn as_bytes(&self) -> &[u8] {
139 &self.payload
140 }
141}
142
143#[derive(Debug, Clone)]
145pub enum RecordParseError {
146 InsufficientHeader {
148 needed: usize,
150 available: usize,
152 },
153 InsufficientPayload {
155 needed: usize,
157 available: usize,
159 },
160 InvalidType(u8),
162 PayloadTooLarge {
164 length: u32,
166 max: u32,
168 },
169}
170
171impl fmt::Display for RecordParseError {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 match self {
174 RecordParseError::InsufficientHeader { needed, available } => {
175 write!(
176 f,
177 "Insufficient data for header: need {} bytes, have {}",
178 needed, available
179 )
180 },
181 RecordParseError::InsufficientPayload { needed, available } => {
182 write!(
183 f,
184 "Insufficient data for payload: need {} bytes, have {}",
185 needed, available
186 )
187 },
188 RecordParseError::InvalidType(t) => {
189 write!(f, "Invalid record type: 0x{:02X}", t)
190 },
191 RecordParseError::PayloadTooLarge { length, max } => {
192 write!(f, "Payload too large: {} bytes (max: {})", length, max)
193 },
194 }
195 }
196}
197
198impl std::error::Error for RecordParseError {}
199
200impl From<RecordParseError> for ClientError {
201 fn from(e: RecordParseError) -> Self {
202 ClientError::ProtocolError(e.to_string())
203 }
204}
205
206#[derive(Debug, Clone)]
208pub struct RecordParserConfig {
209 pub max_payload_size: u32,
211 pub skip_unknown: bool,
213}
214
215impl Default for RecordParserConfig {
216 fn default() -> Self {
217 Self {
218 max_payload_size: 16 * 1024 * 1024, skip_unknown: true,
220 }
221 }
222}
223
224pub struct RecordIterator {
226 data: Bytes,
227 offset: usize,
228 config: RecordParserConfig,
229}
230
231impl RecordIterator {
232 pub fn new(data: Bytes) -> Self {
234 Self {
235 data,
236 offset: 0,
237 config: RecordParserConfig::default(),
238 }
239 }
240
241 pub fn with_config(data: Bytes, config: RecordParserConfig) -> Self {
243 Self {
244 data,
245 offset: 0,
246 config,
247 }
248 }
249
250 pub fn offset(&self) -> usize {
252 self.offset
253 }
254
255 pub fn remaining(&self) -> usize {
257 self.data.len().saturating_sub(self.offset)
258 }
259
260 fn parse_record(&mut self) -> std::result::Result<Option<Record>, RecordParseError> {
262 let remaining = self.remaining();
263
264 if remaining == 0 {
266 return Ok(None);
267 }
268
269 if remaining < TLV_HEADER_SIZE {
270 return Err(RecordParseError::InsufficientHeader {
271 needed: TLV_HEADER_SIZE,
272 available: remaining,
273 });
274 }
275
276 let start_offset = self.offset;
277
278 let type_byte = self.data[self.offset];
280 let record_type = RecordType::from(type_byte);
281
282 let length = u32::from_le_bytes([
284 self.data[self.offset + 1],
285 self.data[self.offset + 2],
286 self.data[self.offset + 3],
287 self.data[self.offset + 4],
288 ]);
289
290 if length > self.config.max_payload_size {
292 return Err(RecordParseError::PayloadTooLarge {
293 length,
294 max: self.config.max_payload_size,
295 });
296 }
297
298 let payload_len = length as usize;
299
300 if remaining < TLV_HEADER_SIZE + payload_len {
302 return Err(RecordParseError::InsufficientPayload {
303 needed: payload_len,
304 available: remaining - TLV_HEADER_SIZE,
305 });
306 }
307
308 let payload_start = self.offset + TLV_HEADER_SIZE;
310 let payload_end = payload_start + payload_len;
311 let payload = self.data.slice(payload_start..payload_end);
312
313 self.offset = payload_end;
315
316 Ok(Some(Record::new(
317 record_type,
318 type_byte,
319 payload,
320 start_offset,
321 )))
322 }
323}
324
325impl Iterator for RecordIterator {
326 type Item = std::result::Result<Record, RecordParseError>;
327
328 fn next(&mut self) -> Option<Self::Item> {
329 match self.parse_record() {
330 Ok(Some(record)) => Some(Ok(record)),
331 Ok(None) => None,
332 Err(e) => Some(Err(e)),
333 }
334 }
335}
336
337pub fn parse_records(data: Bytes) -> Result<Vec<Record>> {
339 let mut records = Vec::new();
340 for result in RecordIterator::new(data) {
341 records.push(result?);
342 }
343 Ok(records)
344}
345
346pub fn parse_record(data: &[u8]) -> std::result::Result<(Record, usize), RecordParseError> {
348 if data.len() < TLV_HEADER_SIZE {
349 return Err(RecordParseError::InsufficientHeader {
350 needed: TLV_HEADER_SIZE,
351 available: data.len(),
352 });
353 }
354
355 let type_byte = data[0];
356 let record_type = RecordType::from(type_byte);
357
358 let length = u32::from_le_bytes([data[1], data[2], data[3], data[4]]);
359 let payload_len = length as usize;
360
361 if data.len() < TLV_HEADER_SIZE + payload_len {
362 return Err(RecordParseError::InsufficientPayload {
363 needed: payload_len,
364 available: data.len() - TLV_HEADER_SIZE,
365 });
366 }
367
368 let payload = Bytes::copy_from_slice(&data[TLV_HEADER_SIZE..TLV_HEADER_SIZE + payload_len]);
369 let total_size = TLV_HEADER_SIZE + payload_len;
370
371 Ok((Record::new(record_type, type_byte, payload, 0), total_size))
372}
373
374pub fn encode_record(record_type: RecordType, payload: &[u8]) -> Bytes {
376 let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
377
378 buf.push(record_type as u8);
380
381 let length = payload.len() as u32;
383 buf.extend_from_slice(&length.to_le_bytes());
384
385 buf.extend_from_slice(payload);
387
388 Bytes::from(buf)
389}
390
391pub fn encode_record_with_type(type_byte: u8, payload: &[u8]) -> Bytes {
393 let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
394
395 buf.push(type_byte);
396 let length = payload.len() as u32;
397 buf.extend_from_slice(&length.to_le_bytes());
398 buf.extend_from_slice(payload);
399
400 Bytes::from(buf)
401}
402
403#[cfg(test)]
404#[allow(clippy::unwrap_used)]
405mod tests {
406 use super::*;
407
408 #[test]
409 fn test_record_type_from_byte() {
410 assert_eq!(RecordType::from(0x01), RecordType::Data);
411 assert_eq!(RecordType::from(0x02), RecordType::Tombstone);
412 assert_eq!(RecordType::from(0x80), RecordType::UserDefined);
413 assert_eq!(RecordType::from(0x00), RecordType::Unknown);
414 }
415
416 #[test]
417 fn test_encode_decode_record() {
418 let payload = b"hello world";
419 let encoded = encode_record(RecordType::Data, payload);
420
421 assert_eq!(encoded.len(), TLV_HEADER_SIZE + payload.len());
422 assert_eq!(encoded[0], 0x01); let (record, size) = parse_record(&encoded).unwrap();
425 assert_eq!(size, encoded.len());
426 assert_eq!(record.record_type, RecordType::Data);
427 assert_eq!(record.payload.as_ref(), payload);
428 }
429
430 #[test]
431 fn test_record_iterator() {
432 let mut data = Vec::new();
434 data.extend_from_slice(&encode_record(RecordType::Data, b"record1"));
435 data.extend_from_slice(&encode_record(RecordType::Data, b"record2"));
436 data.extend_from_slice(&encode_record(RecordType::Tombstone, b""));
437
438 let records: Vec<_> = RecordIterator::new(Bytes::from(data))
439 .collect::<std::result::Result<Vec<_>, _>>()
440 .unwrap();
441
442 assert_eq!(records.len(), 3);
443 assert_eq!(records[0].as_str(), Some("record1"));
444 assert_eq!(records[1].as_str(), Some("record2"));
445 assert!(records[2].is_tombstone());
446 }
447
448 #[test]
449 fn test_insufficient_header() {
450 let data = Bytes::from(vec![0x01, 0x00]); let mut iter = RecordIterator::new(data);
452
453 let result = iter.next();
454 assert!(matches!(
455 result,
456 Some(Err(RecordParseError::InsufficientHeader { .. }))
457 ));
458 }
459
460 #[test]
461 fn test_insufficient_payload() {
462 let mut data = vec![0x01, 0x64, 0x00, 0x00, 0x00]; data.extend_from_slice(b"short"); let mut iter = RecordIterator::new(Bytes::from(data));
468 let result = iter.next();
469 assert!(matches!(
470 result,
471 Some(Err(RecordParseError::InsufficientPayload { .. }))
472 ));
473 }
474
475 #[test]
476 fn test_empty_record() {
477 let encoded = encode_record(RecordType::Tombstone, b"");
478 let (record, _) = parse_record(&encoded).unwrap();
479
480 assert!(record.is_tombstone());
481 assert!(record.payload.is_empty());
482 }
483
484 #[test]
485 fn test_record_offset_tracking() {
486 let mut data = Vec::new();
487 let rec1 = encode_record(RecordType::Data, b"first");
488 let rec2 = encode_record(RecordType::Data, b"second");
489 data.extend_from_slice(&rec1);
490 data.extend_from_slice(&rec2);
491
492 let records: Vec<_> = RecordIterator::new(Bytes::from(data))
493 .collect::<std::result::Result<Vec<_>, _>>()
494 .unwrap();
495
496 assert_eq!(records[0].offset, 0);
497 assert_eq!(records[1].offset, rec1.len());
498 }
499}