Skip to main content

lnc_client/
record.rs

1//! TLV Record Parsing
2//!
3//! Provides parsing utilities for TLV (Type-Length-Value) encoded records
4//! from LANCE stream data.
5//!
6//! # Record Format
7//!
8//! Each record in a LANCE stream follows the TLV format:
9//!
10//! ```text
11//! +--------+--------+------------------+
12//! | Type   | Length | Value (payload)  |
13//! | 1 byte | 4 bytes| variable         |
14//! +--------+--------+------------------+
15//! ```
16//!
17//! - **Type**: Record type identifier (1 byte)
//! - **Length**: Payload length in bytes (4 bytes, little-endian)
19//! - **Value**: Record payload (variable length)
20//!
21//! # Example
22//!
23//! ```rust,no_run
24//! use lnc_client::{RecordIterator, Record, RecordType};
25//! use bytes::Bytes;
26//!
27//! fn process_records(data: Bytes) {
28//!     for result in RecordIterator::new(data) {
29//!         match result {
30//!             Ok(record) => {
31//!                 println!("Type: {:?}, Length: {}", record.record_type, record.payload.len());
32//!             }
33//!             Err(e) => {
34//!                 eprintln!("Parse error: {}", e);
35//!                 break;
36//!             }
37//!         }
38//!     }
39//! }
40//! ```
41
42use bytes::Bytes;
43use std::fmt;
44
45use crate::error::{ClientError, Result};
46
/// Size in bytes of the fixed TLV header: a 1-byte type tag followed by a
/// 4-byte payload length.
pub const TLV_HEADER_SIZE: usize = std::mem::size_of::<u8>() + std::mem::size_of::<u32>();
49
/// Category of a TLV record, as carried in the first byte of its header.
///
/// The explicit discriminants are the byte values written on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum RecordType {
    /// Ordinary data record.
    Data = 0x01,
    /// Deletion marker (tombstone).
    Tombstone = 0x02,
    /// Marks a transaction boundary.
    Transaction = 0x03,
    /// Marks a checkpoint/snapshot.
    Checkpoint = 0x04,
    /// Describes a schema change.
    Schema = 0x05,
    /// Compression marker (records that follow are compressed).
    Compressed = 0x10,
    /// Application-specific record; stands in for every byte in `0x80..=0xFE`.
    UserDefined = 0x80,
    /// Any byte that is neither a known type nor in the user-defined range.
    Unknown = 0xFF,
}

impl From<u8> for RecordType {
    /// Classifies a raw type byte.
    ///
    /// Bytes in `0x80..=0xFE` all map to [`RecordType::UserDefined`]; bytes
    /// equal to a built-in discriminant map to that variant; everything else
    /// (including `0x00` and `0xFF`) maps to [`RecordType::Unknown`].
    fn from(byte: u8) -> Self {
        // The whole application range collapses onto a single variant.
        if (0x80..=0xFE).contains(&byte) {
            return Self::UserDefined;
        }

        // Built-in types are recognised by comparing against their own
        // discriminants, so the table cannot drift from the enum definition.
        let built_in = [
            Self::Data,
            Self::Tombstone,
            Self::Transaction,
            Self::Checkpoint,
            Self::Schema,
            Self::Compressed,
        ];
        built_in
            .iter()
            .copied()
            .find(|candidate| *candidate as u8 == byte)
            .unwrap_or(Self::Unknown)
    }
}

impl From<RecordType> for u8 {
    /// Returns the variant's wire discriminant (e.g. `0x80` for `UserDefined`).
    fn from(record_type: RecordType) -> u8 {
        record_type as u8
    }
}
92
93/// A parsed TLV record
94#[derive(Debug, Clone)]
95pub struct Record {
96    /// Record type
97    pub record_type: RecordType,
98    /// Raw type byte (for user-defined types)
99    pub type_byte: u8,
100    /// Record payload (zero-copy reference to original data)
101    pub payload: Bytes,
102    /// Offset in the original data where this record started
103    pub offset: usize,
104}
105
106impl Record {
107    /// Create a new record
108    pub fn new(record_type: RecordType, type_byte: u8, payload: Bytes, offset: usize) -> Self {
109        Self {
110            record_type,
111            type_byte,
112            payload,
113            offset,
114        }
115    }
116
117    /// Check if this is a data record
118    pub fn is_data(&self) -> bool {
119        self.record_type == RecordType::Data
120    }
121
122    /// Check if this is a tombstone record
123    pub fn is_tombstone(&self) -> bool {
124        self.record_type == RecordType::Tombstone
125    }
126
127    /// Get the total size of this record (header + payload)
128    pub fn total_size(&self) -> usize {
129        TLV_HEADER_SIZE + self.payload.len()
130    }
131
132    /// Get payload as UTF-8 string (if valid)
133    pub fn as_str(&self) -> Option<&str> {
134        std::str::from_utf8(&self.payload).ok()
135    }
136
137    /// Get payload as bytes slice
138    pub fn as_bytes(&self) -> &[u8] {
139        &self.payload
140    }
141}
142
/// Reasons a TLV record can fail to parse.
#[derive(Debug, Clone)]
pub enum RecordParseError {
    /// Fewer bytes are present than a TLV header requires.
    InsufficientHeader { needed: usize, available: usize },
    /// The header declares more payload bytes than are present.
    InsufficientPayload { needed: usize, available: usize },
    /// The type byte is not an acceptable record type.
    InvalidType(u8),
    /// The declared payload length is above the configured maximum.
    PayloadTooLarge { length: u32, max: u32 },
}

impl fmt::Display for RecordParseError {
    /// Renders a one-line, human-readable description of the failure.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All fields are `Copy`, so matching on `*self` binds them by value.
        match *self {
            Self::InsufficientHeader { needed, available } => write!(
                out,
                "Insufficient data for header: need {} bytes, have {}",
                needed, available
            ),
            Self::InsufficientPayload { needed, available } => write!(
                out,
                "Insufficient data for payload: need {} bytes, have {}",
                needed, available
            ),
            Self::InvalidType(type_byte) => {
                write!(out, "Invalid record type: 0x{:02X}", type_byte)
            }
            Self::PayloadTooLarge { length, max } => {
                write!(out, "Payload too large: {} bytes (max: {})", length, max)
            }
        }
    }
}

impl std::error::Error for RecordParseError {}
184
185impl From<RecordParseError> for ClientError {
186    fn from(e: RecordParseError) -> Self {
187        ClientError::ProtocolError(e.to_string())
188    }
189}
190
/// Limits and policies applied while parsing records.
#[derive(Debug, Clone)]
pub struct RecordParserConfig {
    /// Largest payload length, in bytes, a record header may declare
    /// (default: 16 MiB).
    pub max_payload_size: u32,
    /// Skip unknown record types instead of reporting an error
    /// (default: `true`).
    pub skip_unknown: bool,
}

impl Default for RecordParserConfig {
    /// Returns the default configuration: a 16 MiB payload cap with
    /// `skip_unknown` enabled.
    fn default() -> Self {
        const SIXTEEN_MIB: u32 = 16 << 20;

        RecordParserConfig {
            skip_unknown: true,
            max_payload_size: SIXTEEN_MIB,
        }
    }
}
208
209/// Iterator over TLV records in a byte buffer
210pub struct RecordIterator {
211    data: Bytes,
212    offset: usize,
213    config: RecordParserConfig,
214}
215
216impl RecordIterator {
217    /// Create a new record iterator with default config
218    pub fn new(data: Bytes) -> Self {
219        Self {
220            data,
221            offset: 0,
222            config: RecordParserConfig::default(),
223        }
224    }
225
226    /// Create a new record iterator with custom config
227    pub fn with_config(data: Bytes, config: RecordParserConfig) -> Self {
228        Self {
229            data,
230            offset: 0,
231            config,
232        }
233    }
234
235    /// Get current offset in the data
236    pub fn offset(&self) -> usize {
237        self.offset
238    }
239
240    /// Get remaining bytes
241    pub fn remaining(&self) -> usize {
242        self.data.len().saturating_sub(self.offset)
243    }
244
245    /// Parse a single record at the current position
246    fn parse_record(&mut self) -> std::result::Result<Option<Record>, RecordParseError> {
247        let remaining = self.remaining();
248
249        // Check if we have enough for header
250        if remaining == 0 {
251            return Ok(None);
252        }
253
254        if remaining < TLV_HEADER_SIZE {
255            return Err(RecordParseError::InsufficientHeader {
256                needed: TLV_HEADER_SIZE,
257                available: remaining,
258            });
259        }
260
261        let start_offset = self.offset;
262
263        // Parse type byte
264        let type_byte = self.data[self.offset];
265        let record_type = RecordType::from(type_byte);
266
267        // Parse length (4 bytes little-endian per Architecture §2.2)
268        let length = u32::from_le_bytes([
269            self.data[self.offset + 1],
270            self.data[self.offset + 2],
271            self.data[self.offset + 3],
272            self.data[self.offset + 4],
273        ]);
274
275        // Validate length
276        if length > self.config.max_payload_size {
277            return Err(RecordParseError::PayloadTooLarge {
278                length,
279                max: self.config.max_payload_size,
280            });
281        }
282
283        let payload_len = length as usize;
284
285        // Check if we have enough for payload
286        if remaining < TLV_HEADER_SIZE + payload_len {
287            return Err(RecordParseError::InsufficientPayload {
288                needed: payload_len,
289                available: remaining - TLV_HEADER_SIZE,
290            });
291        }
292
293        // Extract payload (zero-copy slice)
294        let payload_start = self.offset + TLV_HEADER_SIZE;
295        let payload_end = payload_start + payload_len;
296        let payload = self.data.slice(payload_start..payload_end);
297
298        // Advance offset
299        self.offset = payload_end;
300
301        Ok(Some(Record::new(
302            record_type,
303            type_byte,
304            payload,
305            start_offset,
306        )))
307    }
308}
309
310impl Iterator for RecordIterator {
311    type Item = std::result::Result<Record, RecordParseError>;
312
313    fn next(&mut self) -> Option<Self::Item> {
314        match self.parse_record() {
315            Ok(Some(record)) => Some(Ok(record)),
316            Ok(None) => None,
317            Err(e) => Some(Err(e)),
318        }
319    }
320}
321
322/// Parse all records from a byte buffer
323pub fn parse_records(data: Bytes) -> Result<Vec<Record>> {
324    let mut records = Vec::new();
325    for result in RecordIterator::new(data) {
326        records.push(result?);
327    }
328    Ok(records)
329}
330
331/// Parse a single record from a byte buffer
332pub fn parse_record(data: &[u8]) -> std::result::Result<(Record, usize), RecordParseError> {
333    if data.len() < TLV_HEADER_SIZE {
334        return Err(RecordParseError::InsufficientHeader {
335            needed: TLV_HEADER_SIZE,
336            available: data.len(),
337        });
338    }
339
340    let type_byte = data[0];
341    let record_type = RecordType::from(type_byte);
342
343    let length = u32::from_le_bytes([data[1], data[2], data[3], data[4]]);
344    let payload_len = length as usize;
345
346    if data.len() < TLV_HEADER_SIZE + payload_len {
347        return Err(RecordParseError::InsufficientPayload {
348            needed: payload_len,
349            available: data.len() - TLV_HEADER_SIZE,
350        });
351    }
352
353    let payload = Bytes::copy_from_slice(&data[TLV_HEADER_SIZE..TLV_HEADER_SIZE + payload_len]);
354    let total_size = TLV_HEADER_SIZE + payload_len;
355
356    Ok((Record::new(record_type, type_byte, payload, 0), total_size))
357}
358
359/// Encode a record to TLV format
360pub fn encode_record(record_type: RecordType, payload: &[u8]) -> Bytes {
361    let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
362
363    // Type byte
364    buf.push(record_type as u8);
365
366    // Length (4 bytes little-endian per Architecture §2.2)
367    let length = payload.len() as u32;
368    buf.extend_from_slice(&length.to_le_bytes());
369
370    // Payload
371    buf.extend_from_slice(payload);
372
373    Bytes::from(buf)
374}
375
376/// Encode a record with custom type byte
377pub fn encode_record_with_type(type_byte: u8, payload: &[u8]) -> Bytes {
378    let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
379
380    buf.push(type_byte);
381    let length = payload.len() as u32;
382    buf.extend_from_slice(&length.to_le_bytes());
383    buf.extend_from_slice(payload);
384
385    Bytes::from(buf)
386}
387
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Joins already-encoded records into one contiguous buffer.
    fn concat(records: &[Bytes]) -> Bytes {
        let mut joined = Vec::new();
        for encoded in records {
            joined.extend_from_slice(encoded);
        }
        Bytes::from(joined)
    }

    /// Runs the default iterator to completion, panicking on any parse error.
    fn collect_all(data: Bytes) -> Vec<Record> {
        RecordIterator::new(data)
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_record_type_from_byte() {
        let cases = [
            (0x01u8, RecordType::Data),
            (0x02, RecordType::Tombstone),
            (0x80, RecordType::UserDefined),
            (0x00, RecordType::Unknown),
        ];
        for (byte, expected) in cases.iter() {
            assert_eq!(RecordType::from(*byte), *expected);
        }
    }

    #[test]
    fn test_encode_decode_record() {
        let payload = b"hello world";
        let encoded = encode_record(RecordType::Data, payload);

        // Header plus payload, with the Data discriminant in the first byte.
        assert_eq!(encoded.len(), TLV_HEADER_SIZE + payload.len());
        assert_eq!(encoded[0], 0x01);

        let (record, size) = parse_record(&encoded).unwrap();
        assert_eq!(size, encoded.len());
        assert_eq!(record.record_type, RecordType::Data);
        assert_eq!(record.as_bytes(), &payload[..]);
    }

    #[test]
    fn test_record_iterator() {
        // Three records back to back, the last with an empty payload.
        let data = concat(&[
            encode_record(RecordType::Data, b"record1"),
            encode_record(RecordType::Data, b"record2"),
            encode_record(RecordType::Tombstone, b""),
        ]);

        let records = collect_all(data);

        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_str(), Some("record1"));
        assert_eq!(records[1].as_str(), Some("record2"));
        assert!(records[2].is_tombstone());
    }

    #[test]
    fn test_insufficient_header() {
        // Only 2 bytes: not enough for the 5-byte header.
        let mut iter = RecordIterator::new(Bytes::from(vec![0x01, 0x00]));

        assert!(matches!(
            iter.next(),
            Some(Err(RecordParseError::InsufficientHeader { .. }))
        ));
    }

    #[test]
    fn test_insufficient_payload() {
        // Header declares 100 payload bytes (little-endian 0x64 0x00 0x00 0x00)
        // but only 5 follow.
        let mut data = vec![0x01, 0x64, 0x00, 0x00, 0x00];
        data.extend_from_slice(b"short");

        let mut iter = RecordIterator::new(Bytes::from(data));

        assert!(matches!(
            iter.next(),
            Some(Err(RecordParseError::InsufficientPayload { .. }))
        ));
    }

    #[test]
    fn test_empty_record() {
        let encoded = encode_record(RecordType::Tombstone, b"");
        let (record, _) = parse_record(&encoded).unwrap();

        assert!(record.is_tombstone());
        assert!(record.payload.is_empty());
    }

    #[test]
    fn test_record_offset_tracking() {
        let first = encode_record(RecordType::Data, b"first");
        let second = encode_record(RecordType::Data, b"second");
        let first_len = first.len();

        let records = collect_all(concat(&[first, second]));

        // The second record starts right where the first one ends.
        assert_eq!(records[0].offset, 0);
        assert_eq!(records[1].offset, first_len);
    }
}