velesdb_core/storage/
log_payload.rs

1//! Log-structured payload storage with snapshot support.
2//!
3//! Stores payloads in an append-only log file with an in-memory index.
4//! Supports periodic snapshots for fast cold-start recovery.
5//!
6//! # Snapshot System (P0 Optimization)
7//!
8//! Without snapshots, cold start requires replaying the entire WAL (O(N)).
9//! With snapshots, we load the index directly and only replay the delta.
10//!
11//! ## Files
12//!
13//! - `payloads.log` - Append-only WAL (Write-Ahead Log)
14//! - `payloads.snapshot` - Binary snapshot of the index
15//!
16//! ## Snapshot Format
17//!
18//! ```text
19//! [Magic: "VSNP" 4 bytes]
20//! [Version: 1 byte]
21//! [WAL position: 8 bytes]
22//! [Entry count: 8 bytes]
23//! [Entries: (id: u64, offset: u64) × N]
24//! [CRC32: 4 bytes]
25//! ```
26
27use super::traits::PayloadStorage;
28
29use parking_lot::RwLock;
30use rustc_hash::FxHashMap;
31use std::fs::{File, OpenOptions};
32use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
33use std::path::{Path, PathBuf};
34
/// Snapshot file magic bytes ("VSNP"), written at the start of
/// `payloads.snapshot` and validated on load.
pub(crate) const SNAPSHOT_MAGIC: &[u8; 4] = b"VSNP";

/// Current snapshot format version. Loading rejects any other value.
pub(crate) const SNAPSHOT_VERSION: u8 = 1;

/// Default threshold for automatic snapshot creation (10 MB of WAL growth
/// since the last snapshot); consulted by `should_create_snapshot`.
const DEFAULT_SNAPSHOT_THRESHOLD: u64 = 10 * 1024 * 1024;
43
/// Simple CRC32 implementation (IEEE 802.3 polynomial).
///
/// Used for snapshot integrity validation.
#[inline]
#[allow(clippy::cast_possible_truncation)] // Table index always 0-255
fn crc32_hash(data: &[u8]) -> u32 {
    // Lookup table built at compile time: entry `b` holds the CRC remainder
    // of the single byte `b`, computed bit-by-bit with the reflected
    // polynomial 0xEDB8_8320.
    const CRC32_TABLE: [u32; 256] = {
        let mut table = [0u32; 256];
        let mut b = 0;
        while b < 256 {
            let mut rem = b as u32;
            let mut bit = 0;
            while bit < 8 {
                rem = if rem & 1 == 1 {
                    (rem >> 1) ^ 0xEDB8_8320
                } else {
                    rem >> 1
                };
                bit += 1;
            }
            table[b] = rem;
            b += 1;
        }
        table
    };

    // Standard table-driven CRC: start from all-ones, fold in one byte per
    // step via the table, and finish with a bitwise complement.
    !data.iter().fold(0xFFFF_FFFF_u32, |crc, &byte| {
        let idx = ((crc ^ u32::from(byte)) & 0xFF) as usize;
        (crc >> 8) ^ CRC32_TABLE[idx]
    })
}
77
/// Log-structured payload storage with snapshot support.
///
/// Stores payloads in an append-only log file with an in-memory index.
/// Supports periodic snapshots for O(1) cold-start recovery instead of O(N) WAL replay.
#[allow(clippy::module_name_repetitions)]
pub struct LogPayloadStorage {
    /// Directory containing `payloads.log` and `payloads.snapshot`.
    path: PathBuf,
    /// In-memory index: payload ID -> byte offset of the record's 4-byte
    /// length field in the WAL (record start + 9: marker(1) + id(8)).
    index: RwLock<FxHashMap<u64, u64>>,
    /// Write-Ahead Log writer (append-only, buffered).
    wal: RwLock<io::BufWriter<File>>,
    /// Independent file handle for reading; write-locked when seeking so
    /// seek + read stays atomic with respect to other readers.
    reader: RwLock<File>,
    /// WAL byte position captured at the last snapshot (0 = no snapshot).
    last_snapshot_wal_pos: RwLock<u64>,
}
95
96impl LogPayloadStorage {
97    /// Creates a new `LogPayloadStorage` or opens an existing one.
98    ///
99    /// If a snapshot file exists and is valid, loads from snapshot and replays
100    /// only the WAL delta for fast startup. Otherwise, falls back to full WAL replay.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if file operations fail.
105    pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
106        let path = path.as_ref().to_path_buf();
107        std::fs::create_dir_all(&path)?;
108        let log_path = path.join("payloads.log");
109        let snapshot_path = path.join("payloads.snapshot");
110
111        // Open WAL for writing (append)
112        let writer_file = OpenOptions::new()
113            .create(true)
114            .append(true)
115            .open(&log_path)?;
116        let wal = io::BufWriter::new(writer_file);
117
118        // Open reader for random access
119        // Create empty file if it doesn't exist
120        if !log_path.exists() {
121            File::create(&log_path)?;
122        }
123        let reader = File::open(&log_path)?;
124        let wal_len = reader.metadata()?.len();
125
126        // Try to load from snapshot, fall back to full WAL replay
127        let (index, last_snapshot_wal_pos) =
128            if let Ok((snapshot_index, snapshot_wal_pos)) = Self::load_snapshot(&snapshot_path) {
129                // Replay WAL delta (entries after snapshot)
130                let index =
131                    Self::replay_wal_from(&log_path, snapshot_index, snapshot_wal_pos, wal_len)?;
132                (index, snapshot_wal_pos)
133            } else {
134                // No valid snapshot, full WAL replay
135                let index = Self::replay_wal_from(&log_path, FxHashMap::default(), 0, wal_len)?;
136                (index, 0)
137            };
138
139        Ok(Self {
140            path,
141            index: RwLock::new(index),
142            wal: RwLock::new(wal),
143            reader: RwLock::new(reader),
144            last_snapshot_wal_pos: RwLock::new(last_snapshot_wal_pos),
145        })
146    }
147
148    /// Replays WAL entries from `start_pos` to `end_pos`, updating the index.
149    fn replay_wal_from(
150        log_path: &Path,
151        mut index: FxHashMap<u64, u64>,
152        start_pos: u64,
153        end_pos: u64,
154    ) -> io::Result<FxHashMap<u64, u64>> {
155        if start_pos >= end_pos {
156            return Ok(index);
157        }
158
159        let file = File::open(log_path)?;
160        let mut reader_buf = BufReader::new(file);
161        reader_buf.seek(SeekFrom::Start(start_pos))?;
162
163        let mut pos = start_pos;
164
165        while pos < end_pos {
166            // Read marker (1 byte)
167            let mut marker = [0u8; 1];
168            if reader_buf.read_exact(&mut marker).is_err() {
169                break;
170            }
171            pos += 1;
172
173            // Read ID (8 bytes)
174            let mut id_bytes = [0u8; 8];
175            reader_buf.read_exact(&mut id_bytes)?;
176            let id = u64::from_le_bytes(id_bytes);
177            pos += 8;
178
179            if marker[0] == 1 {
180                // Store operation
181                let len_offset = pos;
182
183                // Read Len (4 bytes)
184                let mut len_bytes = [0u8; 4];
185                reader_buf.read_exact(&mut len_bytes)?;
186                let payload_len = u64::from(u32::from_le_bytes(len_bytes));
187                pos += 4;
188
189                index.insert(id, len_offset);
190
191                // Skip payload data
192                let skip = i64::try_from(payload_len)
193                    .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Payload too large"))?;
194                reader_buf.seek(SeekFrom::Current(skip))?;
195                pos += payload_len;
196            } else if marker[0] == 2 {
197                // Delete operation
198                index.remove(&id);
199            } else {
200                return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown marker"));
201            }
202        }
203
204        Ok(index)
205    }
206
207    /// Loads index from snapshot file.
208    ///
209    /// Returns (index, `wal_position`) if successful.
210    fn load_snapshot(snapshot_path: &Path) -> io::Result<(FxHashMap<u64, u64>, u64)> {
211        if !snapshot_path.exists() {
212            return Err(io::Error::new(io::ErrorKind::NotFound, "No snapshot"));
213        }
214
215        let data = std::fs::read(snapshot_path)?;
216
217        // Validate minimum size: magic(4) + version(1) + wal_pos(8) + count(8) + crc(4) = 25
218        if data.len() < 25 {
219            return Err(io::Error::new(
220                io::ErrorKind::InvalidData,
221                "Snapshot too small",
222            ));
223        }
224
225        // Validate magic
226        if &data[0..4] != SNAPSHOT_MAGIC {
227            return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid magic"));
228        }
229
230        // Validate version
231        if data[4] != SNAPSHOT_VERSION {
232            return Err(io::Error::new(
233                io::ErrorKind::InvalidData,
234                "Unsupported version",
235            ));
236        }
237
238        // Read WAL position
239        let wal_pos = u64::from_le_bytes(
240            data[5..13]
241                .try_into()
242                .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid WAL position"))?,
243        );
244
245        // Read entry count
246        let entry_count_u64 = u64::from_le_bytes(
247            data[13..21]
248                .try_into()
249                .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid entry count"))?,
250        );
251
252        // P1 Audit: Validate entry_count BEFORE conversion to prevent DoS via huge values
253        // Max reasonable entry count: data.len() / 16 (minimum entry size)
254        // This check prevents both overflow and OOM attacks
255        let max_possible_entries = data.len().saturating_sub(25) / 16; // header(21) + crc(4) = 25
256        if entry_count_u64 > max_possible_entries as u64 {
257            return Err(io::Error::new(
258                io::ErrorKind::InvalidData,
259                "Entry count exceeds data size",
260            ));
261        }
262
263        #[allow(clippy::cast_possible_truncation)] // Validated above
264        let entry_count = entry_count_u64 as usize;
265
266        // Validate size: header(21) + entries(entry_count * 16) + crc(4)
267        // Safe: entry_count is validated to not cause overflow
268        let expected_size = 21 + entry_count * 16 + 4;
269        if data.len() != expected_size {
270            return Err(io::Error::new(io::ErrorKind::InvalidData, "Size mismatch"));
271        }
272
273        // Validate CRC
274        let stored_crc = u32::from_le_bytes(
275            data[data.len() - 4..]
276                .try_into()
277                .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid CRC"))?,
278        );
279        let computed_crc = crc32_hash(&data[..data.len() - 4]);
280        if stored_crc != computed_crc {
281            return Err(io::Error::new(io::ErrorKind::InvalidData, "CRC mismatch"));
282        }
283
284        // Read entries
285        let mut index = FxHashMap::default();
286        index.reserve(entry_count);
287
288        let entries_start = 21;
289        for i in 0..entry_count {
290            let offset = entries_start + i * 16;
291            let id = u64::from_le_bytes(
292                data[offset..offset + 8]
293                    .try_into()
294                    .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid entry ID"))?,
295            );
296            let wal_offset =
297                u64::from_le_bytes(data[offset + 8..offset + 16].try_into().map_err(|_| {
298                    io::Error::new(io::ErrorKind::InvalidData, "Invalid entry offset")
299                })?);
300            index.insert(id, wal_offset);
301        }
302
303        Ok((index, wal_pos))
304    }
305
306    /// Creates a snapshot of the current index state.
307    ///
308    /// The snapshot captures:
309    /// - Current WAL position
310    /// - All index entries (ID -> offset mappings)
311    /// - CRC32 checksum for integrity
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if file operations fail.
316    pub fn create_snapshot(&mut self) -> io::Result<()> {
317        // Flush WAL first to ensure all writes are on disk
318        self.wal.write().flush()?;
319
320        let snapshot_path = self.path.join("payloads.snapshot");
321        let index = self.index.read();
322
323        // Get current WAL position
324        let wal_pos = self.wal.write().get_ref().metadata()?.len();
325
326        // Calculate buffer size
327        let entry_count = index.len();
328        let buf_size = 21 + entry_count * 16 + 4; // header + entries + crc
329        let mut buf = Vec::with_capacity(buf_size);
330
331        // Write header
332        buf.extend_from_slice(SNAPSHOT_MAGIC);
333        buf.push(SNAPSHOT_VERSION);
334        buf.extend_from_slice(&wal_pos.to_le_bytes());
335        buf.extend_from_slice(&(entry_count as u64).to_le_bytes());
336
337        // Write entries
338        for (&id, &offset) in index.iter() {
339            buf.extend_from_slice(&id.to_le_bytes());
340            buf.extend_from_slice(&offset.to_le_bytes());
341        }
342
343        // Compute and append CRC
344        let crc = crc32_hash(&buf);
345        buf.extend_from_slice(&crc.to_le_bytes());
346
347        // Write atomically via temp file + rename
348        let temp_path = self.path.join("payloads.snapshot.tmp");
349        std::fs::write(&temp_path, &buf)?;
350        std::fs::rename(&temp_path, &snapshot_path)?;
351
352        // Update last snapshot position
353        *self.last_snapshot_wal_pos.write() = wal_pos;
354
355        Ok(())
356    }
357
358    /// Returns whether a new snapshot should be created.
359    ///
360    /// Heuristic: Returns true if WAL has grown by more than `DEFAULT_SNAPSHOT_THRESHOLD`
361    /// bytes since the last snapshot.
362    #[must_use]
363    pub fn should_create_snapshot(&self) -> bool {
364        let last_pos = *self.last_snapshot_wal_pos.read();
365
366        // Get current WAL size
367        let current_pos = match self.wal.write().get_ref().metadata() {
368            Ok(m) => m.len(),
369            Err(_) => return false,
370        };
371
372        current_pos.saturating_sub(last_pos) >= DEFAULT_SNAPSHOT_THRESHOLD
373    }
374}
375
376impl PayloadStorage for LogPayloadStorage {
377    fn store(&mut self, id: u64, payload: &serde_json::Value) -> io::Result<()> {
378        let payload_bytes = serde_json::to_vec(payload)
379            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
380
381        let mut wal = self.wal.write();
382        let mut index = self.index.write();
383
384        // Let's force flush to get accurate position or track it manually.
385        wal.flush()?;
386        let pos = wal.get_ref().metadata()?.len();
387
388        // Op: Store (1) | ID | Len | Data
389        // Pos points to start of record (Marker)
390        // We want index to point to Len (Marker(1) + ID(8) = +9 bytes)
391
392        wal.write_all(&[1u8])?;
393        wal.write_all(&id.to_le_bytes())?;
394        let len_u32 = u32::try_from(payload_bytes.len())
395            .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Payload too large"))?;
396        wal.write_all(&len_u32.to_le_bytes())?;
397        wal.write_all(&payload_bytes)?;
398
399        // Flush to ensure reader sees it
400        wal.flush()?;
401
402        index.insert(id, pos + 9);
403
404        Ok(())
405    }
406
407    fn retrieve(&self, id: u64) -> io::Result<Option<serde_json::Value>> {
408        let index = self.index.read();
409        let Some(&offset) = index.get(&id) else {
410            return Ok(None);
411        };
412        drop(index);
413
414        let mut reader = self.reader.write(); // Need write lock to seek
415        reader.seek(SeekFrom::Start(offset))?;
416
417        let mut len_bytes = [0u8; 4];
418        reader.read_exact(&mut len_bytes)?;
419        let len = u32::from_le_bytes(len_bytes) as usize;
420
421        let mut payload_bytes = vec![0u8; len];
422        reader.read_exact(&mut payload_bytes)?;
423
424        let payload = serde_json::from_slice(&payload_bytes)
425            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
426
427        Ok(Some(payload))
428    }
429
430    fn delete(&mut self, id: u64) -> io::Result<()> {
431        let mut wal = self.wal.write();
432        let mut index = self.index.write();
433
434        wal.write_all(&[2u8])?;
435        wal.write_all(&id.to_le_bytes())?;
436
437        index.remove(&id);
438
439        Ok(())
440    }
441
442    fn flush(&mut self) -> io::Result<()> {
443        self.wal.write().flush()
444    }
445
446    fn ids(&self) -> Vec<u64> {
447        self.index.read().keys().copied().collect()
448    }
449}