Skip to main content

kimberlite_storage/
record.rs

1//! Record type for the append-only log.
2//!
3//! Each record contains an offset, record kind, compression kind, optional
4//! link to previous record's hash, and a payload. Records are serialized with
5//! CRC32 checksums for integrity.
6//!
7//! # Security boundary: CRC32 vs hash chain
8//!
9//! - **CRC32** (appended to every record): detects accidental corruption —
10//!   bit-rot, torn writes, hardware errors.  It is **not** tamper-evident;
11//!   an adversary with write access can forge a valid CRC32.
12//!
13//! - **SHA-256 hash chain** (`prev_hash` field): provides tamper evidence.
14//!   Each record cryptographically links to the previous one, forming a chain
15//!   back to genesis.  Use [`Storage::verify_chain`] when tamper detection is
16//!   required (e.g., compliance exports, audit log verification).
17//!
18//! # Record Format
19//!
20//! ```text
21//! [RECORD_START:u32][offset:u64][prev_hash:32B][kind:u8][compression:u8][length:u32][payload:bytes][crc32:u32][RECORD_END:u32]
22//!       4B               8B           32B         1B          1B             4B         variable        4B            4B
23//! ```
24//!
25//! **AUDIT-2026-03 M-8:** Sentinel markers (RECORD_START/END) enable torn write detection.
26//! If RECORD_END is missing during recovery, the record was incompletely written (power loss).
27
28use bytes::{Bytes, BytesMut};
29use kimberlite_crypto::{ChainHash, chain_hash};
30use kimberlite_types::{CompressionKind, Offset, RecordKind};
31
32use crate::StorageError;
33
// **AUDIT-2026-03 M-8: Torn Write Protection**
// Sentinel value that opens every serialized record. The value is 0xBADC0FFE;
// it is written to disk as its four little-endian bytes.
const RECORD_START: u32 = 0xBADC_0FFE;

// Sentinel value that closes every completely written record. The value is
// 0xC0FFEE42; it is written to disk as its four little-endian bytes.
const RECORD_END: u32 = 0xC0FF_EE42;

// Fixed-size header that precedes the payload, spelled out field by field:
// start_sentinel(4) + offset(8) + prev_hash(32) + kind(1) + compression(1) + length(4) = 50 bytes.
const HEADER_SIZE: usize = 4 + 8 + 32 + 1 + 1 + 4;

// Bytes a record occupies beyond its payload:
// header(50) + crc(4) + end_sentinel(4) = 58 bytes.
const RECORD_OVERHEAD: usize = HEADER_SIZE + 4 + 4;
46
47/// A single record in the event log.
48///
49/// Records are the on-disk representation of events. Each record contains
50/// an offset (logical position), a record kind, compression kind, the event
51/// payload, and is serialized with a CRC32 checksum for integrity.
52///
53/// # Record Kinds
54///
55/// - [`RecordKind::Data`]: Normal application data
56/// - [`RecordKind::Checkpoint`]: Periodic verification anchor
57/// - [`RecordKind::Tombstone`]: Logical deletion marker
58#[derive(Debug, Clone, PartialEq, Eq, Hash)]
59pub struct Record {
60    offset: Offset,
61    prev_hash: Option<ChainHash>,
62    kind: RecordKind,
63    compression: CompressionKind,
64    payload: Bytes,
65}
66
67impl Record {
68    /// Creates a new data record with the given offset and payload (uncompressed).
69    pub fn new(offset: Offset, prev_hash: Option<ChainHash>, payload: Bytes) -> Self {
70        Self {
71            offset,
72            prev_hash,
73            kind: RecordKind::Data,
74            compression: CompressionKind::None,
75            payload,
76        }
77    }
78
79    /// Creates a new record with a specific kind (uncompressed).
80    pub fn with_kind(
81        offset: Offset,
82        prev_hash: Option<ChainHash>,
83        kind: RecordKind,
84        payload: Bytes,
85    ) -> Self {
86        Self {
87            offset,
88            prev_hash,
89            kind,
90            compression: CompressionKind::None,
91            payload,
92        }
93    }
94
95    /// Creates a new record with a specific kind and compression.
96    pub fn with_compression(
97        offset: Offset,
98        prev_hash: Option<ChainHash>,
99        kind: RecordKind,
100        compression: CompressionKind,
101        payload: Bytes,
102    ) -> Self {
103        Self {
104            offset,
105            prev_hash,
106            kind,
107            compression,
108            payload,
109        }
110    }
111
112    /// Returns the offset of this record.
113    pub fn offset(&self) -> Offset {
114        self.offset
115    }
116
117    /// Returns the hash of the previous record, if any.
118    pub fn prev_hash(&self) -> Option<ChainHash> {
119        self.prev_hash
120    }
121
122    /// Returns the kind of this record.
123    pub fn kind(&self) -> RecordKind {
124        self.kind
125    }
126
127    /// Returns the compression kind used for the payload.
128    pub fn compression(&self) -> CompressionKind {
129        self.compression
130    }
131
132    /// Returns the payload of this record.
133    pub fn payload(&self) -> &Bytes {
134        &self.payload
135    }
136
137    /// Returns true if this is a checkpoint record.
138    pub fn is_checkpoint(&self) -> bool {
139        self.kind == RecordKind::Checkpoint
140    }
141
142    /// Computes the hash of this record for chain linking.
143    ///
144    /// The hash covers the kind byte and payload to ensure the record kind
145    /// is part of the tamper-evident chain. The hash is computed over the
146    /// *original* (uncompressed) payload.
147    pub fn compute_hash(&self) -> ChainHash {
148        let mut data = Vec::with_capacity(1 + self.payload.len());
149        data.push(self.kind.as_byte());
150        data.extend_from_slice(&self.payload);
151        chain_hash(self.prev_hash.as_ref(), &data)
152    }
153
154    /// Serializes the record to bytes.
155    ///
156    /// Format: `[offset:u64][prev_hash:32B][kind:u8][compression:u8][length:u32][payload][crc32:u32]`
157    ///
158    /// All integers are little-endian.
159    pub fn to_bytes(&self) -> Vec<u8> {
160        let mut buf = Vec::with_capacity(RECORD_OVERHEAD + self.payload.len());
161        self.write_into(&mut buf);
162        buf
163    }
164
165    /// Serializes the record into an existing `BytesMut` buffer (zero-copy path).
166    pub fn to_bytes_into(&self, buf: &mut BytesMut) {
167        buf.reserve(RECORD_OVERHEAD + self.payload.len());
168        let mut tmp = Vec::with_capacity(RECORD_OVERHEAD + self.payload.len());
169        self.write_into(&mut tmp);
170        buf.extend_from_slice(&tmp);
171    }
172
173    /// Internal serialization into a `Vec<u8>`.
174    ///
175    /// **AUDIT-2026-03 M-8:** Includes sentinel markers for torn write detection.
176    fn write_into(&self, buf: &mut Vec<u8>) {
177        // RECORD_START sentinel (4 bytes) - AUDIT-2026-03 M-8
178        buf.extend_from_slice(&RECORD_START.to_le_bytes());
179
180        // offset (8 bytes)
181        buf.extend_from_slice(&self.offset.as_u64().to_le_bytes());
182
183        // prev_hash (32 bytes) - zeros if genesis
184        match &self.prev_hash {
185            Some(hash) => buf.extend_from_slice(hash.as_bytes()),
186            None => buf.extend_from_slice(&[0u8; 32]),
187        }
188
189        // kind (1 byte)
190        buf.push(self.kind.as_byte());
191
192        // compression (1 byte)
193        buf.push(self.compression.as_byte());
194
195        // length (4 bytes)
196        buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
197
198        // payload (variable)
199        buf.extend_from_slice(&self.payload);
200
201        // crc (4 bytes) - checksum of everything from start_sentinel to payload (inclusive)
202        let crc = kimberlite_crypto::crc32(buf);
203        buf.extend_from_slice(&crc.to_le_bytes());
204
205        // Property: CRC32 written must match recomputation over the same buffer
206        kimberlite_properties::always!(
207            kimberlite_crypto::crc32(&buf[..buf.len() - 4]) == crc,
208            "storage.crc32_matches_after_write",
209            "CRC32 checksum matches stored data immediately after write"
210        );
211
212        // RECORD_END sentinel (4 bytes) - AUDIT-2026-03 M-8
213        // If this is missing during recovery, the record was incompletely written (torn write)
214        buf.extend_from_slice(&RECORD_END.to_le_bytes());
215    }
216
217    /// Deserializes a record from bytes.
218    ///
219    /// Returns the parsed record and the number of bytes consumed.
220    /// Uses zero-copy slicing for the payload via [`Bytes::slice`].
221    ///
222    /// **AUDIT-2026-03 M-8:** Detects torn writes via RECORD_END sentinel check.
223    ///
224    /// # Errors
225    ///
226    /// - [`StorageError::UnexpectedEof`] if the data is truncated
227    /// - [`StorageError::CorruptedRecord`] if the CRC doesn't match
228    /// - [`StorageError::TornWrite`] if RECORD_START or RECORD_END sentinel is missing
229    /// - [`StorageError::InvalidRecordKind`] if the kind byte is invalid
230    /// - [`StorageError::InvalidCompressionKind`] if the compression byte is invalid
231    pub fn from_bytes(data: &Bytes) -> Result<(Self, usize), StorageError> {
232        if data.len() < HEADER_SIZE {
233            return Err(StorageError::UnexpectedEof);
234        }
235
236        // **AUDIT-2026-03 M-8: Torn Write Detection**
237        // Check RECORD_START sentinel (bytes 0-3)
238        let start_sentinel = u32::from_le_bytes(
239            data[0..4]
240                .try_into()
241                .expect("slice is exactly 4 bytes after bounds check"),
242        );
243        if start_sentinel != RECORD_START {
244            return Err(StorageError::TornWrite {
245                reason: "missing or corrupted RECORD_START sentinel".to_string(),
246            });
247        }
248
249        // Read offset (bytes 4-11)
250        let offset = Offset::new(u64::from_le_bytes(
251            data[4..12]
252                .try_into()
253                .expect("slice is exactly 8 bytes after bounds check"),
254        ));
255
256        // Read prev_hash (bytes 12-43)
257        let prev_hash_bytes: [u8; 32] = data[12..44]
258            .try_into()
259            .expect("slice is exactly 32 bytes after bounds check");
260        let prev_hash = if prev_hash_bytes == [0u8; 32] {
261            None
262        } else {
263            Some(ChainHash::from_bytes(&prev_hash_bytes))
264        };
265
266        // Read kind (byte 44)
267        let kind = RecordKind::from_byte(data[44]).ok_or(StorageError::InvalidRecordKind {
268            byte: data[44],
269            offset,
270        })?;
271
272        // Read compression (byte 45)
273        let compression =
274            CompressionKind::from_byte(data[45]).ok_or(StorageError::InvalidCompressionKind {
275                byte: data[45],
276                offset,
277            })?;
278
279        // Read length (bytes 46-49)
280        let length = u32::from_le_bytes(
281            data[46..50]
282                .try_into()
283                .expect("slice is exactly 4 bytes after bounds check"),
284        ) as usize;
285
286        // Check we have enough for payload + crc(4) + end_sentinel(4)
287        let total_size = HEADER_SIZE + length + 4 + 4;
288        if data.len() < total_size {
289            return Err(StorageError::UnexpectedEof);
290        }
291
292        // Read payload (bytes 50..50+length) - zero-copy!
293        let payload = data.slice(HEADER_SIZE..HEADER_SIZE + length);
294
295        // Read and verify CRC (bytes 50+length..54+length)
296        let crc_offset = HEADER_SIZE + length;
297        let stored_crc = u32::from_le_bytes(
298            data[crc_offset..crc_offset + 4]
299                .try_into()
300                .expect("slice is exactly 4 bytes after bounds check"),
301        );
302        let computed_crc = kimberlite_crypto::crc32(&data[0..crc_offset]);
303
304        if stored_crc != computed_crc {
305            return Err(StorageError::CorruptedRecord);
306        }
307
308        // Property: CRC32 verified successfully — checksum matches stored data
309        kimberlite_properties::always!(
310            stored_crc == computed_crc,
311            "storage.crc32_verified_on_read",
312            "CRC32 checksum matches stored data after successful deserialization"
313        );
314
315        // **AUDIT-2026-03 M-8: Torn Write Detection**
316        // Check RECORD_END sentinel (bytes 54+length..58+length)
317        let end_sentinel_offset = crc_offset + 4;
318        let end_sentinel = u32::from_le_bytes(
319            data[end_sentinel_offset..end_sentinel_offset + 4]
320                .try_into()
321                .expect("slice is exactly 4 bytes after bounds check"),
322        );
323        if end_sentinel != RECORD_END {
324            return Err(StorageError::TornWrite {
325                reason: format!(
326                    "missing or corrupted RECORD_END sentinel at offset {}: expected {:#010x}, found {:#010x}",
327                    offset.as_u64(),
328                    RECORD_END,
329                    end_sentinel
330                ),
331            });
332        }
333
334        Ok((
335            Record {
336                offset,
337                prev_hash,
338                kind,
339                compression,
340                payload,
341            },
342            total_size,
343        ))
344    }
345}