kimberlite_storage/record.rs
1//! Record type for the append-only log.
2//!
3//! Each record contains an offset, record kind, compression kind, optional
4//! link to previous record's hash, and a payload. Records are serialized with
5//! CRC32 checksums for integrity.
6//!
7//! # Security boundary: CRC32 vs hash chain
8//!
9//! - **CRC32** (appended to every record): detects accidental corruption —
10//! bit-rot, torn writes, hardware errors. It is **not** tamper-evident;
11//! an adversary with write access can forge a valid CRC32.
12//!
13//! - **SHA-256 hash chain** (`prev_hash` field): provides tamper evidence.
14//! Each record cryptographically links to the previous one, forming a chain
15//! back to genesis. Use [`Storage::verify_chain`] when tamper detection is
16//! required (e.g., compliance exports, audit log verification).
17//!
18//! # Record Format
19//!
20//! ```text
//! [RECORD_START:u32][offset:u64][prev_hash:32B][kind:u8][compression:u8][length:u32][payload:bytes][crc32:u32][RECORD_END:u32]
//!  4B                8B          32B            1B       1B              4B          variable       4B         4B
23//! ```
24//!
25//! **AUDIT-2026-03 M-8:** Sentinel markers (RECORD_START/END) enable torn write detection.
26//! If RECORD_END is missing during recovery, the record was incompletely written (power loss).
27
28use bytes::{Bytes, BytesMut};
29use kimberlite_crypto::{ChainHash, chain_hash};
30use kimberlite_types::{CompressionKind, Offset, RecordKind};
31
32use crate::StorageError;
33
// **AUDIT-2026-03 M-8: Torn Write Protection**
// Sentinel that opens every serialized record. It is written with
// `to_le_bytes`, so it appears in the byte stream as FE 0F DC BA.
const RECORD_START: u32 = 0xBADC_0FFE;

// Sentinel that closes every completely written record. It is written with
// `to_le_bytes`, so it appears in the byte stream as 42 EE FF C0.
const RECORD_END: u32 = 0xC0FF_EE42;

// Fixed-size prefix that precedes the payload:
// start_sentinel(4) + offset(8) + prev_hash(32) + kind(1) + compression(1) + length(4) = 50 bytes.
const HEADER_SIZE: usize = 4 + 8 + 32 + 1 + 1 + 4;

// Bytes a record occupies in addition to its payload:
// header(50) + crc(4) + end_sentinel(4) = 58 bytes.
const RECORD_OVERHEAD: usize = HEADER_SIZE + 4 + 4;
46
/// A single record in the event log.
///
/// Records are the on-disk representation of events. Each record contains
/// an offset (logical position), a record kind, compression kind, the event
/// payload, and is serialized with a CRC32 checksum for integrity.
///
/// All fields are private; they are set through the constructors or by
/// [`Record::from_bytes`] and read through the accessor methods.
///
/// # Record Kinds
///
/// - [`RecordKind::Data`]: Normal application data
/// - [`RecordKind::Checkpoint`]: Periodic verification anchor
/// - [`RecordKind::Tombstone`]: Logical deletion marker
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Record {
    /// Logical position of this record in the log.
    offset: Offset,
    /// Chain hash of the preceding record. `None` is serialized as 32 zero
    /// bytes, and 32 zero bytes are read back as `None` (genesis).
    prev_hash: Option<ChainHash>,
    /// Kind byte of the record; also the first byte fed into the chain hash.
    kind: RecordKind,
    /// Compression kind recorded in the header next to the payload length.
    compression: CompressionKind,
    /// Payload bytes; serialization writes them verbatim.
    payload: Bytes,
}
66
67impl Record {
68 /// Creates a new data record with the given offset and payload (uncompressed).
69 pub fn new(offset: Offset, prev_hash: Option<ChainHash>, payload: Bytes) -> Self {
70 Self {
71 offset,
72 prev_hash,
73 kind: RecordKind::Data,
74 compression: CompressionKind::None,
75 payload,
76 }
77 }
78
79 /// Creates a new record with a specific kind (uncompressed).
80 pub fn with_kind(
81 offset: Offset,
82 prev_hash: Option<ChainHash>,
83 kind: RecordKind,
84 payload: Bytes,
85 ) -> Self {
86 Self {
87 offset,
88 prev_hash,
89 kind,
90 compression: CompressionKind::None,
91 payload,
92 }
93 }
94
95 /// Creates a new record with a specific kind and compression.
96 pub fn with_compression(
97 offset: Offset,
98 prev_hash: Option<ChainHash>,
99 kind: RecordKind,
100 compression: CompressionKind,
101 payload: Bytes,
102 ) -> Self {
103 Self {
104 offset,
105 prev_hash,
106 kind,
107 compression,
108 payload,
109 }
110 }
111
112 /// Returns the offset of this record.
113 pub fn offset(&self) -> Offset {
114 self.offset
115 }
116
117 /// Returns the hash of the previous record, if any.
118 pub fn prev_hash(&self) -> Option<ChainHash> {
119 self.prev_hash
120 }
121
122 /// Returns the kind of this record.
123 pub fn kind(&self) -> RecordKind {
124 self.kind
125 }
126
127 /// Returns the compression kind used for the payload.
128 pub fn compression(&self) -> CompressionKind {
129 self.compression
130 }
131
132 /// Returns the payload of this record.
133 pub fn payload(&self) -> &Bytes {
134 &self.payload
135 }
136
137 /// Returns true if this is a checkpoint record.
138 pub fn is_checkpoint(&self) -> bool {
139 self.kind == RecordKind::Checkpoint
140 }
141
142 /// Computes the hash of this record for chain linking.
143 ///
144 /// The hash covers the kind byte and payload to ensure the record kind
145 /// is part of the tamper-evident chain. The hash is computed over the
146 /// *original* (uncompressed) payload.
147 pub fn compute_hash(&self) -> ChainHash {
148 let mut data = Vec::with_capacity(1 + self.payload.len());
149 data.push(self.kind.as_byte());
150 data.extend_from_slice(&self.payload);
151 chain_hash(self.prev_hash.as_ref(), &data)
152 }
153
154 /// Serializes the record to bytes.
155 ///
156 /// Format: `[offset:u64][prev_hash:32B][kind:u8][compression:u8][length:u32][payload][crc32:u32]`
157 ///
158 /// All integers are little-endian.
159 pub fn to_bytes(&self) -> Vec<u8> {
160 let mut buf = Vec::with_capacity(RECORD_OVERHEAD + self.payload.len());
161 self.write_into(&mut buf);
162 buf
163 }
164
165 /// Serializes the record into an existing `BytesMut` buffer (zero-copy path).
166 pub fn to_bytes_into(&self, buf: &mut BytesMut) {
167 buf.reserve(RECORD_OVERHEAD + self.payload.len());
168 let mut tmp = Vec::with_capacity(RECORD_OVERHEAD + self.payload.len());
169 self.write_into(&mut tmp);
170 buf.extend_from_slice(&tmp);
171 }
172
173 /// Internal serialization into a `Vec<u8>`.
174 ///
175 /// **AUDIT-2026-03 M-8:** Includes sentinel markers for torn write detection.
176 fn write_into(&self, buf: &mut Vec<u8>) {
177 // RECORD_START sentinel (4 bytes) - AUDIT-2026-03 M-8
178 buf.extend_from_slice(&RECORD_START.to_le_bytes());
179
180 // offset (8 bytes)
181 buf.extend_from_slice(&self.offset.as_u64().to_le_bytes());
182
183 // prev_hash (32 bytes) - zeros if genesis
184 match &self.prev_hash {
185 Some(hash) => buf.extend_from_slice(hash.as_bytes()),
186 None => buf.extend_from_slice(&[0u8; 32]),
187 }
188
189 // kind (1 byte)
190 buf.push(self.kind.as_byte());
191
192 // compression (1 byte)
193 buf.push(self.compression.as_byte());
194
195 // length (4 bytes)
196 buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
197
198 // payload (variable)
199 buf.extend_from_slice(&self.payload);
200
201 // crc (4 bytes) - checksum of everything from start_sentinel to payload (inclusive)
202 let crc = kimberlite_crypto::crc32(buf);
203 buf.extend_from_slice(&crc.to_le_bytes());
204
205 // Property: CRC32 written must match recomputation over the same buffer
206 kimberlite_properties::always!(
207 kimberlite_crypto::crc32(&buf[..buf.len() - 4]) == crc,
208 "storage.crc32_matches_after_write",
209 "CRC32 checksum matches stored data immediately after write"
210 );
211
212 // RECORD_END sentinel (4 bytes) - AUDIT-2026-03 M-8
213 // If this is missing during recovery, the record was incompletely written (torn write)
214 buf.extend_from_slice(&RECORD_END.to_le_bytes());
215 }
216
217 /// Deserializes a record from bytes.
218 ///
219 /// Returns the parsed record and the number of bytes consumed.
220 /// Uses zero-copy slicing for the payload via [`Bytes::slice`].
221 ///
222 /// **AUDIT-2026-03 M-8:** Detects torn writes via RECORD_END sentinel check.
223 ///
224 /// # Errors
225 ///
226 /// - [`StorageError::UnexpectedEof`] if the data is truncated
227 /// - [`StorageError::CorruptedRecord`] if the CRC doesn't match
228 /// - [`StorageError::TornWrite`] if RECORD_START or RECORD_END sentinel is missing
229 /// - [`StorageError::InvalidRecordKind`] if the kind byte is invalid
230 /// - [`StorageError::InvalidCompressionKind`] if the compression byte is invalid
231 pub fn from_bytes(data: &Bytes) -> Result<(Self, usize), StorageError> {
232 if data.len() < HEADER_SIZE {
233 return Err(StorageError::UnexpectedEof);
234 }
235
236 // **AUDIT-2026-03 M-8: Torn Write Detection**
237 // Check RECORD_START sentinel (bytes 0-3)
238 let start_sentinel = u32::from_le_bytes(
239 data[0..4]
240 .try_into()
241 .expect("slice is exactly 4 bytes after bounds check"),
242 );
243 if start_sentinel != RECORD_START {
244 return Err(StorageError::TornWrite {
245 reason: "missing or corrupted RECORD_START sentinel".to_string(),
246 });
247 }
248
249 // Read offset (bytes 4-11)
250 let offset = Offset::new(u64::from_le_bytes(
251 data[4..12]
252 .try_into()
253 .expect("slice is exactly 8 bytes after bounds check"),
254 ));
255
256 // Read prev_hash (bytes 12-43)
257 let prev_hash_bytes: [u8; 32] = data[12..44]
258 .try_into()
259 .expect("slice is exactly 32 bytes after bounds check");
260 let prev_hash = if prev_hash_bytes == [0u8; 32] {
261 None
262 } else {
263 Some(ChainHash::from_bytes(&prev_hash_bytes))
264 };
265
266 // Read kind (byte 44)
267 let kind = RecordKind::from_byte(data[44]).ok_or(StorageError::InvalidRecordKind {
268 byte: data[44],
269 offset,
270 })?;
271
272 // Read compression (byte 45)
273 let compression =
274 CompressionKind::from_byte(data[45]).ok_or(StorageError::InvalidCompressionKind {
275 byte: data[45],
276 offset,
277 })?;
278
279 // Read length (bytes 46-49)
280 let length = u32::from_le_bytes(
281 data[46..50]
282 .try_into()
283 .expect("slice is exactly 4 bytes after bounds check"),
284 ) as usize;
285
286 // Check we have enough for payload + crc(4) + end_sentinel(4)
287 let total_size = HEADER_SIZE + length + 4 + 4;
288 if data.len() < total_size {
289 return Err(StorageError::UnexpectedEof);
290 }
291
292 // Read payload (bytes 50..50+length) - zero-copy!
293 let payload = data.slice(HEADER_SIZE..HEADER_SIZE + length);
294
295 // Read and verify CRC (bytes 50+length..54+length)
296 let crc_offset = HEADER_SIZE + length;
297 let stored_crc = u32::from_le_bytes(
298 data[crc_offset..crc_offset + 4]
299 .try_into()
300 .expect("slice is exactly 4 bytes after bounds check"),
301 );
302 let computed_crc = kimberlite_crypto::crc32(&data[0..crc_offset]);
303
304 if stored_crc != computed_crc {
305 return Err(StorageError::CorruptedRecord);
306 }
307
308 // Property: CRC32 verified successfully — checksum matches stored data
309 kimberlite_properties::always!(
310 stored_crc == computed_crc,
311 "storage.crc32_verified_on_read",
312 "CRC32 checksum matches stored data after successful deserialization"
313 );
314
315 // **AUDIT-2026-03 M-8: Torn Write Detection**
316 // Check RECORD_END sentinel (bytes 54+length..58+length)
317 let end_sentinel_offset = crc_offset + 4;
318 let end_sentinel = u32::from_le_bytes(
319 data[end_sentinel_offset..end_sentinel_offset + 4]
320 .try_into()
321 .expect("slice is exactly 4 bytes after bounds check"),
322 );
323 if end_sentinel != RECORD_END {
324 return Err(StorageError::TornWrite {
325 reason: format!(
326 "missing or corrupted RECORD_END sentinel at offset {}: expected {:#010x}, found {:#010x}",
327 offset.as_u64(),
328 RECORD_END,
329 end_sentinel
330 ),
331 });
332 }
333
334 Ok((
335 Record {
336 offset,
337 prev_hash,
338 kind,
339 compression,
340 payload,
341 },
342 total_size,
343 ))
344 }
345}