1use bytes::Bytes;
43use std::fmt;
44
45use crate::error::{ClientError, Result};
46
/// Size in bytes of a TLV record header: one type byte followed by a four-byte
/// little-endian `u32` payload length.
pub const TLV_HEADER_SIZE: usize = 1 + std::mem::size_of::<u32>();
49
/// Kind of a TLV record, as identified by the first byte of its header.
///
/// Each discriminant is the type byte written by the encoder. When converting from a raw
/// byte (the `From<u8>` impl), the whole `0x80..=0xFE` range collapses to
/// [`RecordType::UserDefined`] and every unassigned value becomes [`RecordType::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum RecordType {
    /// Type byte `0x01`.
    Data = 0x01,
    /// Type byte `0x02`.
    Tombstone = 0x02,
    /// Type byte `0x03`.
    Transaction = 0x03,
    /// Type byte `0x04`.
    Checkpoint = 0x04,
    /// Type byte `0x05`.
    Schema = 0x05,
    /// Type byte `0x10`.
    Compressed = 0x10,
    /// Representative value (`0x80`) for the user-defined range `0x80..=0xFE`.
    UserDefined = 0x80,
    /// Fallback (`0xFF`) used for any byte that has no assigned meaning.
    Unknown = 0xFF,
}
71
72impl From<u8> for RecordType {
73 fn from(byte: u8) -> Self {
74 match byte {
75 0x01 => RecordType::Data,
76 0x02 => RecordType::Tombstone,
77 0x03 => RecordType::Transaction,
78 0x04 => RecordType::Checkpoint,
79 0x05 => RecordType::Schema,
80 0x10 => RecordType::Compressed,
81 0x80..=0xFE => RecordType::UserDefined,
82 _ => RecordType::Unknown,
83 }
84 }
85}
86
87impl From<RecordType> for u8 {
88 fn from(rt: RecordType) -> u8 {
89 rt as u8
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct Record {
96 pub record_type: RecordType,
98 pub type_byte: u8,
100 pub payload: Bytes,
102 pub offset: usize,
104}
105
106impl Record {
107 pub fn new(record_type: RecordType, type_byte: u8, payload: Bytes, offset: usize) -> Self {
109 Self {
110 record_type,
111 type_byte,
112 payload,
113 offset,
114 }
115 }
116
117 pub fn is_data(&self) -> bool {
119 self.record_type == RecordType::Data
120 }
121
122 pub fn is_tombstone(&self) -> bool {
124 self.record_type == RecordType::Tombstone
125 }
126
127 pub fn total_size(&self) -> usize {
129 TLV_HEADER_SIZE + self.payload.len()
130 }
131
132 pub fn as_str(&self) -> Option<&str> {
134 std::str::from_utf8(&self.payload).ok()
135 }
136
137 pub fn as_bytes(&self) -> &[u8] {
139 &self.payload
140 }
141}
142
/// Reasons a TLV record could not be decoded.
#[derive(Debug, Clone)]
pub enum RecordParseError {
    /// Fewer bytes are left than a header requires.
    InsufficientHeader {
        /// Bytes a header requires.
        needed: usize,
        /// Bytes actually left in the input.
        available: usize,
    },
    /// The header declares more payload bytes than follow it.
    InsufficientPayload {
        /// Payload length declared by the header.
        needed: usize,
        /// Bytes actually present after the header.
        available: usize,
    },
    /// A type byte was rejected.
    ///
    /// NOTE(review): nothing in this file constructs this variant; confirm whether another
    /// module does or whether it is reserved for future use.
    InvalidType(u8),
    /// The declared payload length exceeds the configured limit.
    PayloadTooLarge {
        /// Payload length declared by the header.
        length: u32,
        /// Limit that was in effect.
        max: u32,
    },
}
155
156impl fmt::Display for RecordParseError {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 match self {
159 RecordParseError::InsufficientHeader { needed, available } => {
160 write!(
161 f,
162 "Insufficient data for header: need {} bytes, have {}",
163 needed, available
164 )
165 },
166 RecordParseError::InsufficientPayload { needed, available } => {
167 write!(
168 f,
169 "Insufficient data for payload: need {} bytes, have {}",
170 needed, available
171 )
172 },
173 RecordParseError::InvalidType(t) => {
174 write!(f, "Invalid record type: 0x{:02X}", t)
175 },
176 RecordParseError::PayloadTooLarge { length, max } => {
177 write!(f, "Payload too large: {} bytes (max: {})", length, max)
178 },
179 }
180 }
181}
182
183impl std::error::Error for RecordParseError {}
184
185impl From<RecordParseError> for ClientError {
186 fn from(e: RecordParseError) -> Self {
187 ClientError::ProtocolError(e.to_string())
188 }
189}
190
/// Tunables for [`RecordIterator`].
#[derive(Debug, Clone)]
pub struct RecordParserConfig {
    /// Largest payload length, in bytes, that a header may declare. A record declaring more
    /// is rejected with `RecordParseError::PayloadTooLarge`.
    pub max_payload_size: u32,
    /// NOTE(review): no parsing code in this file reads this flag; records whose type
    /// classifies as `Unknown` are returned regardless of its value. Confirm the intended
    /// semantics before relying on it.
    pub skip_unknown: bool,
}
199
200impl Default for RecordParserConfig {
201 fn default() -> Self {
202 Self {
203 max_payload_size: 16 * 1024 * 1024, skip_unknown: true,
205 }
206 }
207}
208
209pub struct RecordIterator {
211 data: Bytes,
212 offset: usize,
213 config: RecordParserConfig,
214}
215
216impl RecordIterator {
217 pub fn new(data: Bytes) -> Self {
219 Self {
220 data,
221 offset: 0,
222 config: RecordParserConfig::default(),
223 }
224 }
225
226 pub fn with_config(data: Bytes, config: RecordParserConfig) -> Self {
228 Self {
229 data,
230 offset: 0,
231 config,
232 }
233 }
234
235 pub fn offset(&self) -> usize {
237 self.offset
238 }
239
240 pub fn remaining(&self) -> usize {
242 self.data.len().saturating_sub(self.offset)
243 }
244
245 fn parse_record(&mut self) -> std::result::Result<Option<Record>, RecordParseError> {
247 let remaining = self.remaining();
248
249 if remaining == 0 {
251 return Ok(None);
252 }
253
254 if remaining < TLV_HEADER_SIZE {
255 return Err(RecordParseError::InsufficientHeader {
256 needed: TLV_HEADER_SIZE,
257 available: remaining,
258 });
259 }
260
261 let start_offset = self.offset;
262
263 let type_byte = self.data[self.offset];
265 let record_type = RecordType::from(type_byte);
266
267 let length = u32::from_le_bytes([
269 self.data[self.offset + 1],
270 self.data[self.offset + 2],
271 self.data[self.offset + 3],
272 self.data[self.offset + 4],
273 ]);
274
275 if length > self.config.max_payload_size {
277 return Err(RecordParseError::PayloadTooLarge {
278 length,
279 max: self.config.max_payload_size,
280 });
281 }
282
283 let payload_len = length as usize;
284
285 if remaining < TLV_HEADER_SIZE + payload_len {
287 return Err(RecordParseError::InsufficientPayload {
288 needed: payload_len,
289 available: remaining - TLV_HEADER_SIZE,
290 });
291 }
292
293 let payload_start = self.offset + TLV_HEADER_SIZE;
295 let payload_end = payload_start + payload_len;
296 let payload = self.data.slice(payload_start..payload_end);
297
298 self.offset = payload_end;
300
301 Ok(Some(Record::new(
302 record_type,
303 type_byte,
304 payload,
305 start_offset,
306 )))
307 }
308}
309
310impl Iterator for RecordIterator {
311 type Item = std::result::Result<Record, RecordParseError>;
312
313 fn next(&mut self) -> Option<Self::Item> {
314 match self.parse_record() {
315 Ok(Some(record)) => Some(Ok(record)),
316 Ok(None) => None,
317 Err(e) => Some(Err(e)),
318 }
319 }
320}
321
322pub fn parse_records(data: Bytes) -> Result<Vec<Record>> {
324 let mut records = Vec::new();
325 for result in RecordIterator::new(data) {
326 records.push(result?);
327 }
328 Ok(records)
329}
330
331pub fn parse_record(data: &[u8]) -> std::result::Result<(Record, usize), RecordParseError> {
333 if data.len() < TLV_HEADER_SIZE {
334 return Err(RecordParseError::InsufficientHeader {
335 needed: TLV_HEADER_SIZE,
336 available: data.len(),
337 });
338 }
339
340 let type_byte = data[0];
341 let record_type = RecordType::from(type_byte);
342
343 let length = u32::from_le_bytes([data[1], data[2], data[3], data[4]]);
344 let payload_len = length as usize;
345
346 if data.len() < TLV_HEADER_SIZE + payload_len {
347 return Err(RecordParseError::InsufficientPayload {
348 needed: payload_len,
349 available: data.len() - TLV_HEADER_SIZE,
350 });
351 }
352
353 let payload = Bytes::copy_from_slice(&data[TLV_HEADER_SIZE..TLV_HEADER_SIZE + payload_len]);
354 let total_size = TLV_HEADER_SIZE + payload_len;
355
356 Ok((Record::new(record_type, type_byte, payload, 0), total_size))
357}
358
359pub fn encode_record(record_type: RecordType, payload: &[u8]) -> Bytes {
361 let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
362
363 buf.push(record_type as u8);
365
366 let length = payload.len() as u32;
368 buf.extend_from_slice(&length.to_le_bytes());
369
370 buf.extend_from_slice(payload);
372
373 Bytes::from(buf)
374}
375
376pub fn encode_record_with_type(type_byte: u8, payload: &[u8]) -> Bytes {
378 let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
379
380 buf.push(type_byte);
381 let length = payload.len() as u32;
382 buf.extend_from_slice(&length.to_le_bytes());
383 buf.extend_from_slice(payload);
384
385 Bytes::from(buf)
386}
387
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Joins already-encoded records into one contiguous buffer.
    fn concat(frames: &[Bytes]) -> Bytes {
        let mut buf = Vec::new();
        for frame in frames {
            buf.extend_from_slice(frame);
        }
        Bytes::from(buf)
    }

    /// Runs the iterator to completion, panicking on the first parse error.
    fn collect_all(data: Bytes) -> Vec<Record> {
        RecordIterator::new(data)
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_record_type_from_byte() {
        let cases = [
            (0x01u8, RecordType::Data),
            (0x02, RecordType::Tombstone),
            (0x80, RecordType::UserDefined),
            (0x00, RecordType::Unknown),
        ];

        for &(byte, expected) in cases.iter() {
            assert_eq!(RecordType::from(byte), expected);
        }
    }

    #[test]
    fn test_encode_decode_record() {
        let payload = b"hello world";
        let encoded = encode_record(RecordType::Data, payload);

        assert_eq!(encoded.len(), TLV_HEADER_SIZE + payload.len());
        // The first byte on the wire is the type byte.
        assert_eq!(encoded[0], 0x01);

        let (record, size) = parse_record(&encoded).unwrap();
        assert_eq!(size, encoded.len());
        assert_eq!(record.record_type, RecordType::Data);
        assert_eq!(record.payload.as_ref(), payload);
    }

    #[test]
    fn test_record_iterator() {
        let data = concat(&[
            encode_record(RecordType::Data, b"record1"),
            encode_record(RecordType::Data, b"record2"),
            encode_record(RecordType::Tombstone, b""),
        ]);

        let records = collect_all(data);

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_str(), Some("record1"));
        assert_eq!(records[1].as_str(), Some("record2"));
        assert!(records[2].is_tombstone());
    }

    #[test]
    fn test_insufficient_header() {
        // Two bytes only: shorter than a full header.
        let data = Bytes::from(vec![0x01, 0x00]);
        let mut iter = RecordIterator::new(data);

        let result = iter.next();
        assert!(matches!(
            result,
            Some(Err(RecordParseError::InsufficientHeader { .. }))
        ));
    }

    #[test]
    fn test_insufficient_payload() {
        // The header declares 0x64 (100) payload bytes, but only five follow.
        let mut data = vec![0x01, 0x64, 0x00, 0x00, 0x00];
        data.extend_from_slice(b"short");
        let mut iter = RecordIterator::new(Bytes::from(data));

        let result = iter.next();
        assert!(matches!(
            result,
            Some(Err(RecordParseError::InsufficientPayload { .. }))
        ));
    }

    #[test]
    fn test_empty_record() {
        let encoded = encode_record(RecordType::Tombstone, b"");
        let (record, _) = parse_record(&encoded).unwrap();

        assert!(record.is_tombstone());
        assert!(record.payload.is_empty());
    }

    #[test]
    fn test_record_offset_tracking() {
        let rec1 = encode_record(RecordType::Data, b"first");
        let rec2 = encode_record(RecordType::Data, b"second");
        let first_len = rec1.len();

        let records = collect_all(concat(&[rec1, rec2]));

        assert_eq!(records[0].offset, 0);
        // The second record starts right where the first one ends.
        assert_eq!(records[1].offset, first_len);
    }
}