rustlite_wal/
record.rs

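// WAL record format and encoding/decoding
//
// Record format (binary):
// [length: u32 LE] [type: u8] [payload bytes] [crc32: u32 LE]
//
// `length` counts the type byte plus the payload bytes (not the length field
// itself, nor the CRC). The CRC32 covers the type byte followed by the payload.
// The payload is the bincode serialization of `RecordPayload`.
//
// Types:
// - PUT (1): key-value insert/update
// - DELETE (2): key deletion
// - BEGIN_TX (3): transaction start marker
// - COMMIT_TX (4): transaction commit marker
// - CHECKPOINT (5): checkpoint marker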
12
13use crc32fast::Hasher;
14use rustlite_core::{Error, Result};
15use serde::{Deserialize, Serialize};
16
17/// WAL record types
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19#[repr(u8)]
20pub enum RecordType {
21    Put = 1,
22    Delete = 2,
23    BeginTx = 3,
24    CommitTx = 4,
25    Checkpoint = 5,
26}
27
28impl TryFrom<u8> for RecordType {
29    type Error = Error;
30
31    fn try_from(value: u8) -> Result<Self> {
32        match value {
33            1 => Ok(RecordType::Put),
34            2 => Ok(RecordType::Delete),
35            3 => Ok(RecordType::BeginTx),
36            4 => Ok(RecordType::CommitTx),
37            5 => Ok(RecordType::Checkpoint),
38            _ => Err(Error::InvalidOperation(format!(
39                "Unknown WAL record type: {}",
40                value
41            ))),
42        }
43    }
44}
45
46/// WAL record payload
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48pub enum RecordPayload {
49    Put { key: Vec<u8>, value: Vec<u8> },
50    Delete { key: Vec<u8> },
51    BeginTx { tx_id: u64 },
52    CommitTx { tx_id: u64 },
53    Checkpoint { sequence: u64 },
54}
55
56/// A WAL record
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct WalRecord {
59    pub record_type: RecordType,
60    pub payload: RecordPayload,
61}
62
63impl WalRecord {
64    /// Create a new WAL record
65    /// This is a convenience method for tests and simple usage
66    pub fn new(record_type: RecordType, key: Vec<u8>, value: Vec<u8>) -> Self {
67        match record_type {
68            RecordType::Put => Self::put(key, value),
69            RecordType::Delete => Self::delete(key),
70            RecordType::BeginTx => Self::begin_tx(0), // Default tx_id
71            RecordType::CommitTx => Self::commit_tx(0),
72            RecordType::Checkpoint => Self::checkpoint(0),
73        }
74    }
75
76    /// Create a PUT record
77    pub fn put(key: Vec<u8>, value: Vec<u8>) -> Self {
78        Self {
79            record_type: RecordType::Put,
80            payload: RecordPayload::Put { key, value },
81        }
82    }
83
84    /// Create a DELETE record
85    pub fn delete(key: Vec<u8>) -> Self {
86        Self {
87            record_type: RecordType::Delete,
88            payload: RecordPayload::Delete { key },
89        }
90    }
91
92    /// Create a BEGIN_TX record
93    pub fn begin_tx(tx_id: u64) -> Self {
94        Self {
95            record_type: RecordType::BeginTx,
96            payload: RecordPayload::BeginTx { tx_id },
97        }
98    }
99
100    /// Create a COMMIT_TX record
101    pub fn commit_tx(tx_id: u64) -> Self {
102        Self {
103            record_type: RecordType::CommitTx,
104            payload: RecordPayload::CommitTx { tx_id },
105        }
106    }
107
108    /// Create a CHECKPOINT record
109    pub fn checkpoint(sequence: u64) -> Self {
110        Self {
111            record_type: RecordType::Checkpoint,
112            payload: RecordPayload::Checkpoint { sequence },
113        }
114    }
115
116    /// Encode record to bytes with framing and CRC
117    /// Format: [length: u32 LE] [type: u8] [payload bytes] [crc32: u32 LE]
118    pub fn encode(&self) -> Result<Vec<u8>> {
119        // Serialize payload
120        let payload_bytes = bincode::serialize(&self.payload)
121            .map_err(|e| Error::Serialization(format!("Failed to serialize payload: {}", e)))?;
122
123        let type_byte = self.record_type as u8;
124
125        // Calculate length (type byte + payload)
126        let content_len = 1 + payload_bytes.len();
127
128        // Calculate CRC over type + payload
129        let mut hasher = Hasher::new();
130        hasher.update(&[type_byte]);
131        hasher.update(&payload_bytes);
132        let crc = hasher.finalize();
133
134        // Build frame: [length][type][payload][crc]
135        let mut frame = Vec::with_capacity(4 + content_len + 4);
136        frame.extend_from_slice(&(content_len as u32).to_le_bytes());
137        frame.push(type_byte);
138        frame.extend_from_slice(&payload_bytes);
139        frame.extend_from_slice(&crc.to_le_bytes());
140
141        Ok(frame)
142    }
143
144    /// Decode record from bytes with validation
145    pub fn decode(data: &[u8]) -> Result<(Self, usize)> {
146        if data.len() < 9 {
147            // Minimum: 4 (length) + 1 (type) + 0 (payload) + 4 (crc)
148            return Err(Error::Serialization("Incomplete record frame".to_string()));
149        }
150
151        // Read length
152        let length = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
153
154        // Check if we have the full record
155        let total_size = 4 + length + 4; // length field + content + crc
156        if data.len() < total_size {
157            return Err(Error::Serialization(format!(
158                "Incomplete record: expected {} bytes, got {}",
159                total_size,
160                data.len()
161            )));
162        }
163
164        // Read type
165        let type_byte = data[4];
166        let record_type = RecordType::try_from(type_byte)?;
167
168        // Read payload
169        let payload_bytes = &data[5..4 + length];
170
171        // Read CRC
172        let crc_offset = 4 + length;
173        let expected_crc = u32::from_le_bytes([
174            data[crc_offset],
175            data[crc_offset + 1],
176            data[crc_offset + 2],
177            data[crc_offset + 3],
178        ]);
179
180        // Validate CRC
181        let mut hasher = Hasher::new();
182        hasher.update(&[type_byte]);
183        hasher.update(payload_bytes);
184        let actual_crc = hasher.finalize();
185
186        if actual_crc != expected_crc {
187            return Err(Error::Storage(format!(
188                "CRC mismatch: expected {}, got {}",
189                expected_crc, actual_crc
190            )));
191        }
192
193        // Deserialize payload
194        let payload: RecordPayload = bincode::deserialize(payload_bytes)
195            .map_err(|e| Error::Serialization(format!("Failed to deserialize payload: {}", e)))?;
196
197        Ok((
198            WalRecord {
199                record_type,
200                payload,
201            },
202            total_size,
203        ))
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Every record type, in wire order.
    const ALL_TYPES: [RecordType; 5] = [
        RecordType::Put,
        RecordType::Delete,
        RecordType::BeginTx,
        RecordType::CommitTx,
        RecordType::Checkpoint,
    ];

    #[test]
    fn test_record_type_conversion() {
        assert_eq!(RecordType::try_from(1).unwrap(), RecordType::Put);
        assert_eq!(RecordType::try_from(2).unwrap(), RecordType::Delete);
        assert!(RecordType::try_from(0).is_err());
        assert!(RecordType::try_from(99).is_err());
    }

    #[test]
    fn test_record_type_round_trips_through_u8() {
        for ty in ALL_TYPES {
            assert_eq!(RecordType::try_from(ty as u8).unwrap(), ty);
        }
    }

    #[test]
    fn test_new_matches_dedicated_constructors() {
        assert_eq!(
            WalRecord::new(RecordType::Put, b"k".to_vec(), b"v".to_vec()),
            WalRecord::put(b"k".to_vec(), b"v".to_vec())
        );
        assert_eq!(
            WalRecord::new(RecordType::Delete, b"k".to_vec(), b"ignored".to_vec()),
            WalRecord::delete(b"k".to_vec())
        );
        assert_eq!(
            WalRecord::new(RecordType::BeginTx, Vec::new(), Vec::new()),
            WalRecord::begin_tx(0)
        );
        assert_eq!(
            WalRecord::new(RecordType::CommitTx, Vec::new(), Vec::new()),
            WalRecord::commit_tx(0)
        );
        assert_eq!(
            WalRecord::new(RecordType::Checkpoint, Vec::new(), Vec::new()),
            WalRecord::checkpoint(0)
        );
    }

    #[test]
    fn test_put_record_encode_decode() {
        let record = WalRecord::put(b"key1".to_vec(), b"value1".to_vec());

        let encoded = record.encode().unwrap();
        assert!(encoded.len() > 9); // Has minimum framing

        let (decoded, size) = WalRecord::decode(&encoded).unwrap();
        assert_eq!(decoded, record);
        assert_eq!(size, encoded.len());
    }

    #[test]
    fn test_empty_put_record_encode_decode() {
        let record = WalRecord::put(Vec::new(), Vec::new());

        let encoded = record.encode().unwrap();
        let (decoded, size) = WalRecord::decode(&encoded).unwrap();
        assert_eq!(decoded, record);
        assert_eq!(size, encoded.len());
    }

    #[test]
    fn test_delete_record_encode_decode() {
        let record = WalRecord::delete(b"key1".to_vec());

        let encoded = record.encode().unwrap();
        let (decoded, _) = WalRecord::decode(&encoded).unwrap();

        assert_eq!(decoded, record);
    }

    #[test]
    fn test_tx_records_encode_decode() {
        let begin = WalRecord::begin_tx(42);
        let commit = WalRecord::commit_tx(42);

        let begin_enc = begin.encode().unwrap();
        let commit_enc = commit.encode().unwrap();

        let (begin_dec, _) = WalRecord::decode(&begin_enc).unwrap();
        let (commit_dec, _) = WalRecord::decode(&commit_enc).unwrap();

        assert_eq!(begin_dec, begin);
        assert_eq!(commit_dec, commit);
    }

    #[test]
    fn test_checkpoint_record() {
        let record = WalRecord::checkpoint(1000);

        let encoded = record.encode().unwrap();
        let (decoded, _) = WalRecord::decode(&encoded).unwrap();

        assert_eq!(decoded, record);
    }

    #[test]
    fn test_decode_consumes_exactly_one_frame() {
        let first = WalRecord::put(b"a".to_vec(), b"1".to_vec());
        let second = WalRecord::commit_tx(7);

        let mut log = first.encode().unwrap();
        let first_len = log.len();
        log.extend_from_slice(&second.encode().unwrap());

        let (decoded_first, consumed) = WalRecord::decode(&log).unwrap();
        assert_eq!(decoded_first, first);
        assert_eq!(consumed, first_len);

        let (decoded_second, consumed_second) = WalRecord::decode(&log[consumed..]).unwrap();
        assert_eq!(decoded_second, second);
        assert_eq!(consumed + consumed_second, log.len());
    }

    #[test]
    fn test_crc_validation() {
        let record = WalRecord::put(b"key".to_vec(), b"value".to_vec());
        let mut encoded = record.encode().unwrap();

        // Flip a byte strictly inside the payload (after the 4-byte length and
        // the 1-byte type, before the 4-byte CRC). The frame stays well-sized,
        // but its checksum no longer matches.
        let payload_range = 5..encoded.len() - 4;
        assert!(!payload_range.is_empty(), "PUT payload should not be empty");
        let mid = payload_range.start + payload_range.len() / 2;
        encoded[mid] ^= 0xFF;

        let result = WalRecord::decode(&encoded);
        assert!(matches!(result, Err(Error::Storage(_))));
    }

    #[test]
    fn test_corrupted_crc_trailer_is_rejected() {
        let mut encoded = WalRecord::delete(b"key".to_vec()).encode().unwrap();
        let last = encoded.len() - 1;
        encoded[last] ^= 0x01;

        assert!(matches!(WalRecord::decode(&encoded), Err(Error::Storage(_))));
    }

    #[test]
    fn test_incomplete_record() {
        let record = WalRecord::put(b"key".to_vec(), b"value".to_vec());
        let encoded = record.encode().unwrap();

        // Shorter than the minimum frame size.
        let result = WalRecord::decode(&encoded[..5]);
        assert!(result.is_err());
    }

    #[test]
    fn test_truncated_frame_is_incomplete() {
        let encoded = WalRecord::put(b"key".to_vec(), b"value".to_vec())
            .encode()
            .unwrap();

        // Long enough to pass the minimum-size check, but missing the last CRC byte.
        let result = WalRecord::decode(&encoded[..encoded.len() - 1]);
        assert!(matches!(result, Err(Error::Serialization(_))));
    }

    #[test]
    fn test_oversized_length_field_is_incomplete() {
        let mut frame = vec![0u8; 16];
        frame[..4].copy_from_slice(&u32::MAX.to_le_bytes());
        frame[4] = RecordType::Put as u8;

        assert!(matches!(
            WalRecord::decode(&frame),
            Err(Error::Serialization(_))
        ));
    }
}