Skip to main content

lnc_client/
record.rs

1//! TLV Record Parsing
2//!
3//! Provides parsing utilities for TLV (Type-Length-Value) encoded records
4//! from LANCE stream data.
5//!
6//! # Record Format
7//!
8//! Each record in a LANCE stream follows the TLV format:
9//!
10//! ```text
11//! +--------+--------+------------------+
12//! | Type   | Length | Value (payload)  |
13//! | 1 byte | 4 bytes| variable         |
14//! +--------+--------+------------------+
15//! ```
16//!
17//! - **Type**: Record type identifier (1 byte)
//! - **Length**: Payload length in bytes (4 bytes, little-endian)
19//! - **Value**: Record payload (variable length)
20//!
21//! # Example
22//!
23//! ```rust,no_run
24//! use lnc_client::{RecordIterator, Record, RecordType};
25//! use bytes::Bytes;
26//!
27//! fn process_records(data: Bytes) {
28//!     for result in RecordIterator::new(data) {
29//!         match result {
30//!             Ok(record) => {
31//!                 println!("Type: {:?}, Length: {}", record.record_type, record.payload.len());
32//!             }
33//!             Err(e) => {
34//!                 eprintln!("Parse error: {}", e);
35//!                 break;
36//!             }
37//!         }
38//!     }
39//! }
40//! ```
41
42use bytes::Bytes;
43use std::fmt;
44
45use crate::error::{ClientError, Result};
46
/// Size in bytes of the fixed header that precedes every TLV payload:
/// one type byte followed by a four-byte length field.
pub const TLV_HEADER_SIZE: usize = 1 + 4;
49
/// Record type identifiers, stored in the first byte of each TLV record.
///
/// Decoding a byte with `RecordType::from` maps the whole `0x80..=0xFE`
/// range to `UserDefined` (the exact byte survives in `Record::type_byte`),
/// and every other byte without a dedicated variant to `Unknown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum RecordType {
    /// Regular data record.
    Data = 0x01,
    /// Tombstone (deletion) marker.
    Tombstone = 0x02,
    /// Transaction boundary marker.
    Transaction = 0x03,
    /// Checkpoint / snapshot marker.
    Checkpoint = 0x04,
    /// Schema change record.
    Schema = 0x05,
    /// Compression marker (following records are compressed).
    Compressed = 0x10,
    /// Application-specific record; encodes as `0x80`.
    UserDefined = 0x80,
    /// Reserved or unrecognised type; encodes as `0xFF`.
    Unknown = 0xFF,
}

impl From<u8> for RecordType {
    fn from(byte: u8) -> Self {
        // The application-reserved range collapses to a single variant.
        if (0x80..=0xFE).contains(&byte) {
            return Self::UserDefined;
        }
        match byte {
            0x01 => Self::Data,
            0x02 => Self::Tombstone,
            0x03 => Self::Transaction,
            0x04 => Self::Checkpoint,
            0x05 => Self::Schema,
            0x10 => Self::Compressed,
            _ => Self::Unknown,
        }
    }
}

impl From<RecordType> for u8 {
    /// Returns the variant's discriminant byte.
    fn from(kind: RecordType) -> u8 {
        kind as u8
    }
}
92
93/// A parsed TLV record
94#[derive(Debug, Clone)]
95pub struct Record {
96    /// Record type
97    pub record_type: RecordType,
98    /// Raw type byte (for user-defined types)
99    pub type_byte: u8,
100    /// Record payload (zero-copy reference to original data)
101    pub payload: Bytes,
102    /// Offset in the original data where this record started
103    pub offset: usize,
104}
105
106impl Record {
107    /// Create a new record
108    pub fn new(record_type: RecordType, type_byte: u8, payload: Bytes, offset: usize) -> Self {
109        Self {
110            record_type,
111            type_byte,
112            payload,
113            offset,
114        }
115    }
116
117    /// Check if this is a data record
118    pub fn is_data(&self) -> bool {
119        self.record_type == RecordType::Data
120    }
121
122    /// Check if this is a tombstone record
123    pub fn is_tombstone(&self) -> bool {
124        self.record_type == RecordType::Tombstone
125    }
126
127    /// Get the total size of this record (header + payload)
128    pub fn total_size(&self) -> usize {
129        TLV_HEADER_SIZE + self.payload.len()
130    }
131
132    /// Get payload as UTF-8 string (if valid)
133    pub fn as_str(&self) -> Option<&str> {
134        std::str::from_utf8(&self.payload).ok()
135    }
136
137    /// Get payload as bytes slice
138    pub fn as_bytes(&self) -> &[u8] {
139        &self.payload
140    }
141}
142
/// Failure while decoding a TLV record.
#[derive(Debug, Clone)]
pub enum RecordParseError {
    /// Fewer bytes remain than a full TLV header needs.
    InsufficientHeader {
        /// Bytes required for the header
        needed: usize,
        /// Bytes actually present
        available: usize,
    },
    /// The header declares more payload bytes than remain after it.
    InsufficientPayload {
        /// Payload bytes declared by the header
        needed: usize,
        /// Bytes present after the header
        available: usize,
    },
    /// A record type byte the parser refuses to accept.
    InvalidType(u8),
    /// The declared payload length exceeds the configured limit.
    PayloadTooLarge {
        /// Length declared by the header
        length: u32,
        /// Configured maximum
        max: u32,
    },
}

impl fmt::Display for RecordParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every payload field is `Copy`, so bind by value straight out of `*self`.
        match *self {
            Self::InsufficientHeader { needed, available } => write!(
                f,
                "Insufficient data for header: need {} bytes, have {}",
                needed, available
            ),
            Self::InsufficientPayload { needed, available } => write!(
                f,
                "Insufficient data for payload: need {} bytes, have {}",
                needed, available
            ),
            Self::InvalidType(type_byte) => {
                write!(f, "Invalid record type: 0x{:02X}", type_byte)
            }
            Self::PayloadTooLarge { length, max } => {
                write!(f, "Payload too large: {} bytes (max: {})", length, max)
            }
        }
    }
}

impl std::error::Error for RecordParseError {}
199
200impl From<RecordParseError> for ClientError {
201    fn from(e: RecordParseError) -> Self {
202        ClientError::ProtocolError(e.to_string())
203    }
204}
205
/// Tunables for record parsing.
#[derive(Debug, Clone)]
pub struct RecordParserConfig {
    /// Largest payload length accepted from a record header; larger
    /// declared lengths are rejected (default: 16MB)
    pub max_payload_size: u32,
    /// Skip unknown record types instead of erroring (default: `true`)
    pub skip_unknown: bool,
}

impl Default for RecordParserConfig {
    fn default() -> Self {
        // 16 MiB expressed as a power of two.
        const DEFAULT_MAX_PAYLOAD: u32 = 1 << 24;
        Self {
            skip_unknown: true,
            max_payload_size: DEFAULT_MAX_PAYLOAD,
        }
    }
}
223
224/// Iterator over TLV records in a byte buffer
225pub struct RecordIterator {
226    data: Bytes,
227    offset: usize,
228    config: RecordParserConfig,
229}
230
231impl RecordIterator {
232    /// Create a new record iterator with default config
233    pub fn new(data: Bytes) -> Self {
234        Self {
235            data,
236            offset: 0,
237            config: RecordParserConfig::default(),
238        }
239    }
240
241    /// Create a new record iterator with custom config
242    pub fn with_config(data: Bytes, config: RecordParserConfig) -> Self {
243        Self {
244            data,
245            offset: 0,
246            config,
247        }
248    }
249
250    /// Get current offset in the data
251    pub fn offset(&self) -> usize {
252        self.offset
253    }
254
255    /// Get remaining bytes
256    pub fn remaining(&self) -> usize {
257        self.data.len().saturating_sub(self.offset)
258    }
259
260    /// Parse a single record at the current position
261    fn parse_record(&mut self) -> std::result::Result<Option<Record>, RecordParseError> {
262        let remaining = self.remaining();
263
264        // Check if we have enough for header
265        if remaining == 0 {
266            return Ok(None);
267        }
268
269        if remaining < TLV_HEADER_SIZE {
270            return Err(RecordParseError::InsufficientHeader {
271                needed: TLV_HEADER_SIZE,
272                available: remaining,
273            });
274        }
275
276        let start_offset = self.offset;
277
278        // Parse type byte
279        let type_byte = self.data[self.offset];
280        let record_type = RecordType::from(type_byte);
281
282        // Parse length (4 bytes little-endian per Architecture §2.2)
283        let length = u32::from_le_bytes([
284            self.data[self.offset + 1],
285            self.data[self.offset + 2],
286            self.data[self.offset + 3],
287            self.data[self.offset + 4],
288        ]);
289
290        // Validate length
291        if length > self.config.max_payload_size {
292            return Err(RecordParseError::PayloadTooLarge {
293                length,
294                max: self.config.max_payload_size,
295            });
296        }
297
298        let payload_len = length as usize;
299
300        // Check if we have enough for payload
301        if remaining < TLV_HEADER_SIZE + payload_len {
302            return Err(RecordParseError::InsufficientPayload {
303                needed: payload_len,
304                available: remaining - TLV_HEADER_SIZE,
305            });
306        }
307
308        // Extract payload (zero-copy slice)
309        let payload_start = self.offset + TLV_HEADER_SIZE;
310        let payload_end = payload_start + payload_len;
311        let payload = self.data.slice(payload_start..payload_end);
312
313        // Advance offset
314        self.offset = payload_end;
315
316        Ok(Some(Record::new(
317            record_type,
318            type_byte,
319            payload,
320            start_offset,
321        )))
322    }
323}
324
325impl Iterator for RecordIterator {
326    type Item = std::result::Result<Record, RecordParseError>;
327
328    fn next(&mut self) -> Option<Self::Item> {
329        match self.parse_record() {
330            Ok(Some(record)) => Some(Ok(record)),
331            Ok(None) => None,
332            Err(e) => Some(Err(e)),
333        }
334    }
335}
336
337/// Parse all records from a byte buffer
338pub fn parse_records(data: Bytes) -> Result<Vec<Record>> {
339    let mut records = Vec::new();
340    for result in RecordIterator::new(data) {
341        records.push(result?);
342    }
343    Ok(records)
344}
345
346/// Parse a single record from a byte buffer
347pub fn parse_record(data: &[u8]) -> std::result::Result<(Record, usize), RecordParseError> {
348    if data.len() < TLV_HEADER_SIZE {
349        return Err(RecordParseError::InsufficientHeader {
350            needed: TLV_HEADER_SIZE,
351            available: data.len(),
352        });
353    }
354
355    let type_byte = data[0];
356    let record_type = RecordType::from(type_byte);
357
358    let length = u32::from_le_bytes([data[1], data[2], data[3], data[4]]);
359    let payload_len = length as usize;
360
361    if data.len() < TLV_HEADER_SIZE + payload_len {
362        return Err(RecordParseError::InsufficientPayload {
363            needed: payload_len,
364            available: data.len() - TLV_HEADER_SIZE,
365        });
366    }
367
368    let payload = Bytes::copy_from_slice(&data[TLV_HEADER_SIZE..TLV_HEADER_SIZE + payload_len]);
369    let total_size = TLV_HEADER_SIZE + payload_len;
370
371    Ok((Record::new(record_type, type_byte, payload, 0), total_size))
372}
373
374/// Encode a record to TLV format
375pub fn encode_record(record_type: RecordType, payload: &[u8]) -> Bytes {
376    let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
377
378    // Type byte
379    buf.push(record_type as u8);
380
381    // Length (4 bytes little-endian per Architecture §2.2)
382    let length = payload.len() as u32;
383    buf.extend_from_slice(&length.to_le_bytes());
384
385    // Payload
386    buf.extend_from_slice(payload);
387
388    Bytes::from(buf)
389}
390
391/// Encode a record with custom type byte
392pub fn encode_record_with_type(type_byte: u8, payload: &[u8]) -> Bytes {
393    let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
394
395    buf.push(type_byte);
396    let length = payload.len() as u32;
397    buf.extend_from_slice(&length.to_le_bytes());
398    buf.extend_from_slice(payload);
399
400    Bytes::from(buf)
401}
402
#[cfg(test)]
#[allow(clippy::unwrap_used)] // unwrap is the idiomatic failure mode inside tests
mod tests {
    use super::*;

    #[test]
    fn test_record_type_from_byte() {
        assert_eq!(RecordType::from(0x01), RecordType::Data);
        assert_eq!(RecordType::from(0x02), RecordType::Tombstone);
        assert_eq!(RecordType::from(0x80), RecordType::UserDefined);
        assert_eq!(RecordType::from(0x00), RecordType::Unknown);
    }

    #[test]
    fn test_record_type_range_bounds() {
        // 0xFE is the last user-defined byte; 0xFF is reserved and decodes as Unknown.
        assert_eq!(RecordType::from(0xFE), RecordType::UserDefined);
        assert_eq!(RecordType::from(0xFF), RecordType::Unknown);
        assert_eq!(RecordType::from(0x10), RecordType::Compressed);
        assert_eq!(RecordType::from(0x06), RecordType::Unknown);
        assert_eq!(u8::from(RecordType::Schema), 0x05);
    }

    #[test]
    fn test_encode_decode_record() {
        let payload = b"hello world";
        let encoded = encode_record(RecordType::Data, payload);

        assert_eq!(encoded.len(), TLV_HEADER_SIZE + payload.len());
        assert_eq!(encoded[0], 0x01); // Data type

        let (record, size) = parse_record(&encoded).unwrap();
        assert_eq!(size, encoded.len());
        assert_eq!(record.record_type, RecordType::Data);
        assert_eq!(record.payload.as_ref(), payload);
    }

    #[test]
    fn test_length_is_little_endian() {
        // 258 = 0x0102 -> little-endian bytes 02 01 00 00
        let encoded = encode_record(RecordType::Data, &[0u8; 258]);
        assert_eq!(encoded[1..5], [0x02u8, 0x01, 0x00, 0x00]);
    }

    #[test]
    fn test_user_defined_type_byte_preserved() {
        let encoded = encode_record_with_type(0x85, b"custom");
        let (record, size) = parse_record(&encoded).unwrap();

        assert_eq!(record.record_type, RecordType::UserDefined);
        assert_eq!(record.type_byte, 0x85);
        assert_eq!(size, TLV_HEADER_SIZE + 6);
        assert_eq!(record.total_size(), size);
    }

    #[test]
    fn test_record_iterator() {
        // Create multiple records
        let mut data = Vec::new();
        data.extend_from_slice(&encode_record(RecordType::Data, b"record1"));
        data.extend_from_slice(&encode_record(RecordType::Data, b"record2"));
        data.extend_from_slice(&encode_record(RecordType::Tombstone, b""));

        let records: Vec<_> = RecordIterator::new(Bytes::from(data))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_str(), Some("record1"));
        assert_eq!(records[1].as_str(), Some("record2"));
        assert!(records[2].is_tombstone());
    }

    #[test]
    fn test_insufficient_header() {
        let data = Bytes::from(vec![0x01, 0x00]); // Only 2 bytes
        let mut iter = RecordIterator::new(data);

        let result = iter.next();
        assert!(matches!(
            result,
            Some(Err(RecordParseError::InsufficientHeader { .. }))
        ));
    }

    #[test]
    fn test_insufficient_payload() {
        // Header says 100 bytes but only 5 provided
        // Length is little-endian: 100 = 0x64 0x00 0x00 0x00
        let mut data = vec![0x01, 0x64, 0x00, 0x00, 0x00]; // type + length (100 LE)
        data.extend_from_slice(b"short"); // Only 5 bytes

        let mut iter = RecordIterator::new(Bytes::from(data));
        let result = iter.next();
        assert!(matches!(
            result,
            Some(Err(RecordParseError::InsufficientPayload { .. }))
        ));
    }

    #[test]
    fn test_payload_too_large_default_limit() {
        // Declared length u32::MAX (LE) exceeds the default 16MB limit.
        let data = Bytes::from(vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF]);
        let mut iter = RecordIterator::new(data);
        assert!(matches!(
            iter.next(),
            Some(Err(RecordParseError::PayloadTooLarge { length, max }))
                if length == u32::MAX && max == 16 * 1024 * 1024
        ));
    }

    #[test]
    fn test_payload_too_large_custom_limit() {
        let config = RecordParserConfig {
            max_payload_size: 4,
            skip_unknown: true,
        };
        let data = encode_record(RecordType::Data, b"12345");
        let mut iter = RecordIterator::with_config(data, config);
        assert!(matches!(
            iter.next(),
            Some(Err(RecordParseError::PayloadTooLarge { length: 5, max: 4 }))
        ));
    }

    #[test]
    fn test_parse_records_propagates_error() {
        let result = parse_records(Bytes::from(vec![0x01, 0x00]));
        assert!(result.is_err());
    }

    #[test]
    fn test_empty_record() {
        let encoded = encode_record(RecordType::Tombstone, b"");
        let (record, _) = parse_record(&encoded).unwrap();

        assert!(record.is_tombstone());
        assert!(record.payload.is_empty());
    }

    #[test]
    fn test_record_offset_tracking() {
        let mut data = Vec::new();
        let rec1 = encode_record(RecordType::Data, b"first");
        let rec2 = encode_record(RecordType::Data, b"second");
        data.extend_from_slice(&rec1);
        data.extend_from_slice(&rec2);

        let records: Vec<_> = RecordIterator::new(Bytes::from(data))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(records[0].offset, 0);
        assert_eq!(records[1].offset, rec1.len());
    }
}