Skip to main content

velesdb_core/storage/
log_payload.rs

1//! Log-structured payload storage with snapshot support.
2//!
3//! Stores payloads in an append-only log file with an in-memory index.
4//! Supports periodic snapshots for fast cold-start recovery.
5//!
6//! ## WAL Entry Formats
7//!
8//! **CRC32-protected (current, markers 0xC3/0xC4):**
9//! ```text
10//! Store:  [0xC3: 1B] [id: 8B LE] [len: 4B LE] [payload: len B] [crc32: 4B LE]
11//! Delete: [0xC4: 1B] [id: 8B LE] [crc32: 4B LE]
12//! ```
13//!
14//! **Legacy (markers 1/2, read-only for backward compatibility):**
15//! ```text
16//! Store:  [1: 1B] [id: 8B LE] [len: 4B LE] [payload: len B]
17//! Delete: [2: 1B] [id: 8B LE]
18//! ```
19//!
20//! CRC32 covers all bytes preceding the CRC field. On CRC mismatch during
21//! replay, the corrupted entry is skipped and a warning is logged.
22//!
23//! Snapshot format and I/O are handled by the [`super::snapshot`] module.
24
25use super::log_payload_io::{compute_delete_crc, write_store_record, CRC_DELETE_MARKER};
26use super::snapshot;
27use super::traits::PayloadStorage;
28
29// Re-export snapshot items for backward compatibility with existing imports
30#[allow(unused_imports)] // SNAPSHOT_MAGIC/VERSION used only in test modules
31pub(crate) use snapshot::{crc32_hash, SNAPSHOT_MAGIC, SNAPSHOT_VERSION};
32
33use parking_lot::RwLock;
34use rustc_hash::FxHashMap;
35use std::fs::{File, OpenOptions};
36use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
37use std::path::{Path, PathBuf};
38
/// Selects what happens to the payload WAL after each write.
///
/// - `Fsync` (default): the write buffer is flushed and `sync_all()` is called
///   on the file. Intended to be safe against power loss.
/// - `FlushOnly`: the write buffer is flushed, but the file is not fsynced.
///   Data reaches the OS kernel and may be lost on power failure.
/// - `None`: nothing is done after the write. Intended for bulk imports where
///   the data can be re-derived.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum DurabilityMode {
    /// Flush the buffer, then fsync the file.
    #[default]
    Fsync,
    /// Flush the buffer to the OS only; no fsync is issued.
    FlushOnly,
    /// Neither flush nor fsync after a write.
    None,
}
55
56/// Log-structured payload storage with snapshot support.
57///
58/// Stores payloads in an append-only log file with an in-memory index.
59/// Supports periodic snapshots for O(1) cold-start recovery instead of O(N) WAL replay.
60#[allow(clippy::module_name_repetitions)]
61pub struct LogPayloadStorage {
62    /// Directory path for storage files
63    path: PathBuf,
64    /// In-memory index: ID -> Offset of length field in WAL
65    index: RwLock<FxHashMap<u64, u64>>,
66    /// Write-Ahead Log writer (append-only)
67    wal: RwLock<io::BufWriter<File>>,
68    /// Independent file handle for reading, protected for seeking
69    reader: RwLock<File>,
70    /// WAL position at last snapshot (0 = no snapshot)
71    last_snapshot_wal_pos: RwLock<u64>,
72    /// Durability mode for WAL writes
73    durability: DurabilityMode,
74    /// Tracked WAL write position (avoids flush+metadata syscall for `DurabilityMode::None`)
75    write_offset: RwLock<u64>,
76}
77
78use super::wal_entry::WalEntry;
79
80impl LogPayloadStorage {
81    /// Creates a new `LogPayloadStorage` with the default durability mode (`Fsync`).
82    ///
83    /// If a snapshot file exists and is valid, loads from snapshot and replays
84    /// only the WAL delta for fast startup. Otherwise, falls back to full WAL replay.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if file operations fail.
89    pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
90        Self::new_with_durability(path, DurabilityMode::default())
91    }
92
93    /// Creates a new `LogPayloadStorage` with the specified durability mode.
94    ///
95    /// See [`DurabilityMode`] for available modes and their trade-offs.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if file operations fail.
100    pub fn new_with_durability<P: AsRef<Path>>(
101        path: P,
102        durability: DurabilityMode,
103    ) -> io::Result<Self> {
104        let path = path.as_ref().to_path_buf();
105        std::fs::create_dir_all(&path)?;
106        let log_path = path.join("payloads.log");
107
108        let wal = Self::open_wal_writer(&log_path)?;
109        let (reader, wal_len) = Self::open_wal_reader(&log_path)?;
110        let (index, last_snapshot_wal_pos) = Self::load_or_replay_index(&path, &log_path, wal_len)?;
111
112        Ok(Self {
113            path,
114            index: RwLock::new(index),
115            wal: RwLock::new(wal),
116            reader: RwLock::new(reader),
117            last_snapshot_wal_pos: RwLock::new(last_snapshot_wal_pos),
118            durability,
119            write_offset: RwLock::new(wal_len),
120        })
121    }
122
123    /// Opens the WAL file for append-mode writing.
124    fn open_wal_writer(log_path: &Path) -> io::Result<io::BufWriter<File>> {
125        let writer_file = OpenOptions::new()
126            .create(true)
127            .append(true)
128            .open(log_path)?;
129        Ok(io::BufWriter::new(writer_file))
130    }
131
132    /// Opens the WAL file for random-access reading, creating it if absent.
133    ///
134    /// Returns the reader handle and the current WAL length in bytes.
135    fn open_wal_reader(log_path: &Path) -> io::Result<(File, u64)> {
136        if !log_path.exists() {
137            File::create(log_path)?;
138        }
139        let reader = File::open(log_path)?;
140        let wal_len = reader.metadata()?.len();
141        Ok((reader, wal_len))
142    }
143
144    /// Loads the payload index, trying a snapshot first, falling back to full WAL replay.
145    ///
146    /// Returns `(index, last_snapshot_wal_position)`.
147    fn load_or_replay_index(
148        dir: &Path,
149        log_path: &Path,
150        wal_len: u64,
151    ) -> io::Result<(FxHashMap<u64, u64>, u64)> {
152        let snapshot_path = dir.join("payloads.snapshot");
153        if let Ok((snapshot_index, snapshot_wal_pos)) = snapshot::load_snapshot(&snapshot_path) {
154            let index = Self::replay_wal_from(log_path, snapshot_index, snapshot_wal_pos, wal_len)?;
155            Ok((index, snapshot_wal_pos))
156        } else {
157            let index = Self::replay_wal_from(log_path, FxHashMap::default(), 0, wal_len)?;
158            Ok((index, 0))
159        }
160    }
161
162    /// Applies the configured durability mode to a WAL writer.
163    fn sync_wal(wal: &mut io::BufWriter<File>, mode: DurabilityMode) -> io::Result<()> {
164        match mode {
165            DurabilityMode::Fsync => {
166                wal.flush()?;
167                wal.get_ref().sync_all()?;
168            }
169            DurabilityMode::FlushOnly => {
170                wal.flush()?;
171            }
172            DurabilityMode::None => {}
173        }
174        Ok(())
175    }
176
177    /// Syncs the WAL according to durability mode, resyncing `write_offset`
178    /// with the actual file length on failure to prevent desync on subsequent
179    /// writes.
180    ///
181    /// RF-2: Shared by `store` and `delete` to eliminate duplicated
182    /// sync-and-resync-offset error handling.
183    fn sync_wal_or_resync(
184        wal: &mut io::BufWriter<File>,
185        mode: DurabilityMode,
186        offset: &mut u64,
187    ) -> io::Result<()> {
188        if let Err(e) = Self::sync_wal(wal, mode) {
189            if let Ok(meta) = wal.get_ref().metadata() {
190                *offset = meta.len();
191            }
192            return Err(e);
193        }
194        Ok(())
195    }
196
197    /// Replays WAL entries from `start_pos` to `end_pos`, updating the index.
198    fn replay_wal_from(
199        log_path: &Path,
200        mut index: FxHashMap<u64, u64>,
201        start_pos: u64,
202        end_pos: u64,
203    ) -> io::Result<FxHashMap<u64, u64>> {
204        if start_pos >= end_pos {
205            return Ok(index);
206        }
207
208        let file = File::open(log_path)?;
209        let mut reader_buf = BufReader::new(file);
210        reader_buf.seek(SeekFrom::Start(start_pos))?;
211
212        let mut pos = start_pos;
213        while pos < end_pos {
214            let Some(entry) = WalEntry::read(&mut reader_buf, pos)? else {
215                break;
216            };
217            pos = entry.apply(&mut index, &mut reader_buf)?;
218        }
219
220        Ok(index)
221    }
222
223    /// Creates a snapshot of the current index state.
224    ///
225    /// The snapshot captures:
226    /// - Current WAL position
227    /// - All index entries (ID -> offset mappings)
228    /// - CRC32 checksum for integrity
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if file operations fail.
233    pub fn create_snapshot(&mut self) -> io::Result<()> {
234        // Flush WAL before snapshotting to ensure data is on disk for the reader
235        {
236            let mut wal = self.wal.write();
237            wal.flush()?;
238            wal.get_ref().sync_all()?;
239        }
240
241        let index = self.index.read();
242        let wal_pos = *self.write_offset.read();
243
244        snapshot::create_snapshot_file(&self.path, &index, wal_pos)?;
245
246        *self.last_snapshot_wal_pos.write() = wal_pos;
247
248        Ok(())
249    }
250
251    /// Returns whether a new snapshot should be created.
252    ///
253    /// Heuristic: Returns true if WAL has grown by more than the default threshold
254    /// bytes since the last snapshot.
255    #[must_use]
256    pub fn should_create_snapshot(&self) -> bool {
257        snapshot::should_create_snapshot(
258            *self.last_snapshot_wal_pos.read(),
259            *self.write_offset.read(),
260        )
261    }
262
263    /// Attempts to create a snapshot if the WAL has grown past the threshold.
264    ///
265    /// Best-effort: on failure the error is logged but not propagated,
266    /// because the WAL write that triggered the check already succeeded.
267    fn maybe_auto_snapshot(&mut self) {
268        if self.should_create_snapshot() {
269            if let Err(e) = self.create_snapshot() {
270                tracing::warn!(
271                    error = %e,
272                    "Auto-snapshot after WAL growth failed; will retry on next write"
273                );
274            }
275        }
276    }
277
278    /// Stores multiple payloads in a single batch operation.
279    ///
280    /// Optimized for bulk imports: acquires WAL + index + offset locks once,
281    /// writes all records sequentially, and performs a **single** durability
282    /// sync at the end instead of per-point fsync.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if serialization or WAL write fails. On partial failure,
287    /// entries written before the error are durable (WAL is append-only).
288    pub fn store_batch(&mut self, entries: &[(u64, &serde_json::Value)]) -> io::Result<()> {
289        self.store_batch_inner(entries, true)
290    }
291
292    /// Stores multiple payloads without forcing an fsync at the end.
293    ///
294    /// Identical to [`store_batch`](Self::store_batch) except the final
295    /// `sync_all()` is replaced by a buffer-only `flush()`. WAL entries are
296    /// written and the `BufWriter` is flushed to the OS kernel, but not
297    /// fsynced to disk.
298    ///
299    /// Use this for intermediate batches in a streaming bulk import, where
300    /// only the final batch needs full durability. Call
301    /// [`PayloadStorage::flush()`] after the last batch to force fsync.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if serialization or WAL write fails.
306    pub fn store_batch_deferred(
307        &mut self,
308        entries: &[(u64, &serde_json::Value)],
309    ) -> io::Result<()> {
310        self.store_batch_inner(entries, false)
311    }
312
313    /// Shared implementation for [`store_batch`] and [`store_batch_deferred`].
314    ///
315    /// When `fsync` is `true`, the configured durability mode is applied.
316    /// When `false`, only a buffer flush is performed (no `sync_all`).
317    fn store_batch_inner(
318        &mut self,
319        entries: &[(u64, &serde_json::Value)],
320        fsync: bool,
321    ) -> io::Result<()> {
322        if entries.is_empty() {
323            return Ok(());
324        }
325
326        {
327            let mut wal = self.wal.write();
328            let mut index = self.index.write();
329            let mut offset = self.write_offset.write();
330            let mut record_buf = Vec::with_capacity(256);
331
332            for &(id, payload) in entries {
333                write_store_record(
334                    &mut wal,
335                    id,
336                    payload,
337                    &mut offset,
338                    &mut index,
339                    &mut record_buf,
340                )?;
341            }
342
343            if fsync {
344                Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
345            } else {
346                // Buffer-only flush: data reaches OS kernel but is not fsynced.
347                // Safe for intermediate batches — caller must fsync after the
348                // final batch.
349                wal.flush()?;
350            }
351        }
352
353        self.maybe_auto_snapshot();
354        Ok(())
355    }
356}
357
358impl PayloadStorage for LogPayloadStorage {
359    fn store(&mut self, id: u64, payload: &serde_json::Value) -> io::Result<()> {
360        // Scoped block: lock guards released before auto-snapshot (which acquires locks).
361        {
362            let mut wal = self.wal.write();
363            let mut index = self.index.write();
364            let mut offset = self.write_offset.write();
365            let mut record_buf = Vec::new();
366
367            write_store_record(
368                &mut wal,
369                id,
370                payload,
371                &mut offset,
372                &mut index,
373                &mut record_buf,
374            )?;
375
376            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
377        }
378
379        self.maybe_auto_snapshot();
380        Ok(())
381    }
382
383    fn retrieve(&self, id: u64) -> io::Result<Option<serde_json::Value>> {
384        let index = self.index.read();
385        let Some(&offset) = index.get(&id) else {
386            return Ok(None);
387        };
388        drop(index);
389
390        // H-2: Only flush when DurabilityMode::None is configured, because sync_wal()
391        // already flushes the BufWriter after every write in Fsync and FlushOnly modes.
392        // Skipping this avoids acquiring the WAL write lock on every read, which would
393        // serialize all readers behind writers.
394        if self.durability == DurabilityMode::None {
395            self.wal.write().flush()?;
396        }
397
398        let mut reader = self.reader.write(); // Need write lock to seek
399        reader.seek(SeekFrom::Start(offset))?;
400
401        let mut len_bytes = [0u8; 4];
402        reader.read_exact(&mut len_bytes)?;
403        let len = u32::from_le_bytes(len_bytes) as usize;
404
405        let mut payload_bytes = vec![0u8; len];
406        reader.read_exact(&mut payload_bytes)?;
407
408        let payload = serde_json::from_slice(&payload_bytes)
409            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
410
411        Ok(Some(payload))
412    }
413
414    fn delete(&mut self, id: u64) -> io::Result<()> {
415        let crc = compute_delete_crc(id);
416
417        // Scoped block: all lock guards are released before the auto-snapshot
418        // check, which itself acquires locks (see `create_snapshot`).
419        {
420            let mut wal = self.wal.write();
421            let mut index = self.index.write();
422            let mut offset = self.write_offset.write();
423
424            // H-3: Build complete delete record in one buffer to minimize partial-write window.
425            // CRC-protected format: Marker(0xC4) | ID(8) | CRC32(4)
426            let mut record = [0u8; 1 + 8 + 4];
427            record[0] = CRC_DELETE_MARKER;
428            record[1..9].copy_from_slice(&id.to_le_bytes());
429            record[9..13].copy_from_slice(&crc.to_le_bytes());
430            wal.write_all(&record)?;
431
432            // Sync WAL according to durability mode (resync offset on failure).
433            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
434
435            *offset += 1 + 8 + 4; // Marker(1) + ID(8) + CRC32(4)
436            index.remove(&id);
437        }
438
439        self.maybe_auto_snapshot();
440        Ok(())
441    }
442
443    fn flush(&mut self) -> io::Result<()> {
444        let mut wal = self.wal.write();
445        Self::sync_wal(&mut wal, self.durability)
446    }
447
448    fn ids(&self) -> Vec<u64> {
449        self.index.read().keys().copied().collect()
450    }
451}