velesdb_core/storage/log_payload.rs
1//! Log-structured payload storage with snapshot support.
2//!
3//! Stores payloads in an append-only log file with an in-memory index.
4//! Supports periodic snapshots for fast cold-start recovery.
5//!
6//! ## WAL Entry Formats
7//!
8//! **CRC32-protected (current, markers 0xC3/0xC4):**
9//! ```text
10//! Store: [0xC3: 1B] [id: 8B LE] [len: 4B LE] [payload: len B] [crc32: 4B LE]
11//! Delete: [0xC4: 1B] [id: 8B LE] [crc32: 4B LE]
12//! ```
13//!
14//! **Legacy (markers 1/2, read-only for backward compatibility):**
15//! ```text
16//! Store: [1: 1B] [id: 8B LE] [len: 4B LE] [payload: len B]
17//! Delete: [2: 1B] [id: 8B LE]
18//! ```
19//!
20//! CRC32 covers all bytes preceding the CRC field. On CRC mismatch during
21//! replay, the corrupted entry is skipped and a warning is logged.
22//!
23//! Snapshot format and I/O are handled by the [`super::snapshot`] module.
24
25use super::log_payload_io::{compute_delete_crc, write_store_record, CRC_DELETE_MARKER};
26use super::snapshot;
27use super::traits::PayloadStorage;
28
29// Re-export snapshot items for backward compatibility with existing imports
30#[allow(unused_imports)] // SNAPSHOT_MAGIC/VERSION used only in test modules
31pub(crate) use snapshot::{crc32_hash, SNAPSHOT_MAGIC, SNAPSHOT_VERSION};
32
33use parking_lot::RwLock;
34use rustc_hash::FxHashMap;
35use std::fs::{File, OpenOptions};
36use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
37use std::path::{Path, PathBuf};
38
/// Controls how payload WAL writes are synced to disk.
///
/// The mode is interpreted by `LogPayloadStorage::sync_wal`, which runs after
/// WAL writes and on explicit flushes:
///
/// - `Fsync` (default): `flush()` + `sync_all()` — full durability, safe against power loss.
/// - `FlushOnly`: `flush()` only — data reaches OS kernel but may be lost on power failure.
/// - `None`: No sync — maximum throughput for bulk imports where data can be re-derived.
///   Written bytes may stay in the in-process write buffer until something else
///   flushes it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
// `non_exhaustive`: code outside this crate must keep a wildcard arm when matching.
#[non_exhaustive]
pub enum DurabilityMode {
    /// Full durability: flush buffer + fsync to disk.
    #[default]
    Fsync,
    /// Flush buffer to OS only (no fsync). Faster but not power-loss safe.
    FlushOnly,
    /// No sync at all. Maximum throughput for bulk imports.
    None,
}
55
/// Log-structured payload storage with snapshot support.
///
/// Stores payloads in an append-only log file (`payloads.log`) with an
/// in-memory index that maps each ID to the position of its record in the log.
/// Supports periodic snapshots of the index so that a cold start only has to
/// load the snapshot and replay the WAL written after it, instead of replaying
/// the whole WAL.
#[allow(clippy::module_name_repetitions)]
pub struct LogPayloadStorage {
    /// Directory path for storage files
    path: PathBuf,
    /// In-memory index: ID -> Offset of length field in WAL
    /// (reads seek to this offset, read a 4-byte LE length, then the payload).
    index: RwLock<FxHashMap<u64, u64>>,
    /// Write-Ahead Log writer (append-only, buffered)
    wal: RwLock<io::BufWriter<File>>,
    /// Independent file handle for reading, protected for seeking
    /// (seeking mutates the handle, so readers take the write lock).
    reader: RwLock<File>,
    /// WAL position at last snapshot (0 = no snapshot)
    last_snapshot_wal_pos: RwLock<u64>,
    /// Durability mode for WAL writes
    durability: DurabilityMode,
    /// Tracked WAL write position (avoids flush+metadata syscall for `DurabilityMode::None`)
    write_offset: RwLock<u64>,
}
77
78use super::wal_entry::WalEntry;
79
80impl LogPayloadStorage {
81 /// Creates a new `LogPayloadStorage` with the default durability mode (`Fsync`).
82 ///
83 /// If a snapshot file exists and is valid, loads from snapshot and replays
84 /// only the WAL delta for fast startup. Otherwise, falls back to full WAL replay.
85 ///
86 /// # Errors
87 ///
88 /// Returns an error if file operations fail.
89 pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
90 Self::new_with_durability(path, DurabilityMode::default())
91 }
92
93 /// Creates a new `LogPayloadStorage` with the specified durability mode.
94 ///
95 /// See [`DurabilityMode`] for available modes and their trade-offs.
96 ///
97 /// # Errors
98 ///
99 /// Returns an error if file operations fail.
100 pub fn new_with_durability<P: AsRef<Path>>(
101 path: P,
102 durability: DurabilityMode,
103 ) -> io::Result<Self> {
104 let path = path.as_ref().to_path_buf();
105 std::fs::create_dir_all(&path)?;
106 let log_path = path.join("payloads.log");
107
108 let wal = Self::open_wal_writer(&log_path)?;
109 let (reader, wal_len) = Self::open_wal_reader(&log_path)?;
110 let (index, last_snapshot_wal_pos) = Self::load_or_replay_index(&path, &log_path, wal_len)?;
111
112 Ok(Self {
113 path,
114 index: RwLock::new(index),
115 wal: RwLock::new(wal),
116 reader: RwLock::new(reader),
117 last_snapshot_wal_pos: RwLock::new(last_snapshot_wal_pos),
118 durability,
119 write_offset: RwLock::new(wal_len),
120 })
121 }
122
123 /// Opens the WAL file for append-mode writing.
124 fn open_wal_writer(log_path: &Path) -> io::Result<io::BufWriter<File>> {
125 let writer_file = OpenOptions::new()
126 .create(true)
127 .append(true)
128 .open(log_path)?;
129 Ok(io::BufWriter::new(writer_file))
130 }
131
132 /// Opens the WAL file for random-access reading, creating it if absent.
133 ///
134 /// Returns the reader handle and the current WAL length in bytes.
135 fn open_wal_reader(log_path: &Path) -> io::Result<(File, u64)> {
136 if !log_path.exists() {
137 File::create(log_path)?;
138 }
139 let reader = File::open(log_path)?;
140 let wal_len = reader.metadata()?.len();
141 Ok((reader, wal_len))
142 }
143
144 /// Loads the payload index, trying a snapshot first, falling back to full WAL replay.
145 ///
146 /// Returns `(index, last_snapshot_wal_position)`.
147 fn load_or_replay_index(
148 dir: &Path,
149 log_path: &Path,
150 wal_len: u64,
151 ) -> io::Result<(FxHashMap<u64, u64>, u64)> {
152 let snapshot_path = dir.join("payloads.snapshot");
153 if let Ok((snapshot_index, snapshot_wal_pos)) = snapshot::load_snapshot(&snapshot_path) {
154 let index = Self::replay_wal_from(log_path, snapshot_index, snapshot_wal_pos, wal_len)?;
155 Ok((index, snapshot_wal_pos))
156 } else {
157 let index = Self::replay_wal_from(log_path, FxHashMap::default(), 0, wal_len)?;
158 Ok((index, 0))
159 }
160 }
161
162 /// Applies the configured durability mode to a WAL writer.
163 fn sync_wal(wal: &mut io::BufWriter<File>, mode: DurabilityMode) -> io::Result<()> {
164 match mode {
165 DurabilityMode::Fsync => {
166 wal.flush()?;
167 wal.get_ref().sync_all()?;
168 }
169 DurabilityMode::FlushOnly => {
170 wal.flush()?;
171 }
172 DurabilityMode::None => {}
173 }
174 Ok(())
175 }
176
177 /// Syncs the WAL according to durability mode, resyncing `write_offset`
178 /// with the actual file length on failure to prevent desync on subsequent
179 /// writes.
180 ///
181 /// RF-2: Shared by `store` and `delete` to eliminate duplicated
182 /// sync-and-resync-offset error handling.
183 fn sync_wal_or_resync(
184 wal: &mut io::BufWriter<File>,
185 mode: DurabilityMode,
186 offset: &mut u64,
187 ) -> io::Result<()> {
188 if let Err(e) = Self::sync_wal(wal, mode) {
189 if let Ok(meta) = wal.get_ref().metadata() {
190 *offset = meta.len();
191 }
192 return Err(e);
193 }
194 Ok(())
195 }
196
197 /// Replays WAL entries from `start_pos` to `end_pos`, updating the index.
198 fn replay_wal_from(
199 log_path: &Path,
200 mut index: FxHashMap<u64, u64>,
201 start_pos: u64,
202 end_pos: u64,
203 ) -> io::Result<FxHashMap<u64, u64>> {
204 if start_pos >= end_pos {
205 return Ok(index);
206 }
207
208 let file = File::open(log_path)?;
209 let mut reader_buf = BufReader::new(file);
210 reader_buf.seek(SeekFrom::Start(start_pos))?;
211
212 let mut pos = start_pos;
213 while pos < end_pos {
214 let Some(entry) = WalEntry::read(&mut reader_buf, pos)? else {
215 break;
216 };
217 pos = entry.apply(&mut index, &mut reader_buf)?;
218 }
219
220 Ok(index)
221 }
222
223 /// Creates a snapshot of the current index state.
224 ///
225 /// The snapshot captures:
226 /// - Current WAL position
227 /// - All index entries (ID -> offset mappings)
228 /// - CRC32 checksum for integrity
229 ///
230 /// # Errors
231 ///
232 /// Returns an error if file operations fail.
233 pub fn create_snapshot(&mut self) -> io::Result<()> {
234 // Flush WAL before snapshotting to ensure data is on disk for the reader
235 {
236 let mut wal = self.wal.write();
237 wal.flush()?;
238 wal.get_ref().sync_all()?;
239 }
240
241 let index = self.index.read();
242 let wal_pos = *self.write_offset.read();
243
244 snapshot::create_snapshot_file(&self.path, &index, wal_pos)?;
245
246 *self.last_snapshot_wal_pos.write() = wal_pos;
247
248 Ok(())
249 }
250
251 /// Returns whether a new snapshot should be created.
252 ///
253 /// Heuristic: Returns true if WAL has grown by more than the default threshold
254 /// bytes since the last snapshot.
255 #[must_use]
256 pub fn should_create_snapshot(&self) -> bool {
257 snapshot::should_create_snapshot(
258 *self.last_snapshot_wal_pos.read(),
259 *self.write_offset.read(),
260 )
261 }
262
263 /// Attempts to create a snapshot if the WAL has grown past the threshold.
264 ///
265 /// Best-effort: on failure the error is logged but not propagated,
266 /// because the WAL write that triggered the check already succeeded.
267 fn maybe_auto_snapshot(&mut self) {
268 if self.should_create_snapshot() {
269 if let Err(e) = self.create_snapshot() {
270 tracing::warn!(
271 error = %e,
272 "Auto-snapshot after WAL growth failed; will retry on next write"
273 );
274 }
275 }
276 }
277
278 /// Stores multiple payloads in a single batch operation.
279 ///
280 /// Optimized for bulk imports: acquires WAL + index + offset locks once,
281 /// writes all records sequentially, and performs a **single** durability
282 /// sync at the end instead of per-point fsync.
283 ///
284 /// # Errors
285 ///
286 /// Returns an error if serialization or WAL write fails. On partial failure,
287 /// entries written before the error are durable (WAL is append-only).
288 pub fn store_batch(&mut self, entries: &[(u64, &serde_json::Value)]) -> io::Result<()> {
289 self.store_batch_inner(entries, true)
290 }
291
292 /// Stores multiple payloads without forcing an fsync at the end.
293 ///
294 /// Identical to [`store_batch`](Self::store_batch) except the final
295 /// `sync_all()` is replaced by a buffer-only `flush()`. WAL entries are
296 /// written and the `BufWriter` is flushed to the OS kernel, but not
297 /// fsynced to disk.
298 ///
299 /// Use this for intermediate batches in a streaming bulk import, where
300 /// only the final batch needs full durability. Call
301 /// [`PayloadStorage::flush()`] after the last batch to force fsync.
302 ///
303 /// # Errors
304 ///
305 /// Returns an error if serialization or WAL write fails.
306 pub fn store_batch_deferred(
307 &mut self,
308 entries: &[(u64, &serde_json::Value)],
309 ) -> io::Result<()> {
310 self.store_batch_inner(entries, false)
311 }
312
313 /// Shared implementation for [`store_batch`] and [`store_batch_deferred`].
314 ///
315 /// When `fsync` is `true`, the configured durability mode is applied.
316 /// When `false`, only a buffer flush is performed (no `sync_all`).
317 fn store_batch_inner(
318 &mut self,
319 entries: &[(u64, &serde_json::Value)],
320 fsync: bool,
321 ) -> io::Result<()> {
322 if entries.is_empty() {
323 return Ok(());
324 }
325
326 {
327 let mut wal = self.wal.write();
328 let mut index = self.index.write();
329 let mut offset = self.write_offset.write();
330 let mut record_buf = Vec::with_capacity(256);
331
332 for &(id, payload) in entries {
333 write_store_record(
334 &mut wal,
335 id,
336 payload,
337 &mut offset,
338 &mut index,
339 &mut record_buf,
340 )?;
341 }
342
343 if fsync {
344 Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
345 } else {
346 // Buffer-only flush: data reaches OS kernel but is not fsynced.
347 // Safe for intermediate batches — caller must fsync after the
348 // final batch.
349 wal.flush()?;
350 }
351 }
352
353 self.maybe_auto_snapshot();
354 Ok(())
355 }
356}
357
358impl PayloadStorage for LogPayloadStorage {
359 fn store(&mut self, id: u64, payload: &serde_json::Value) -> io::Result<()> {
360 // Scoped block: lock guards released before auto-snapshot (which acquires locks).
361 {
362 let mut wal = self.wal.write();
363 let mut index = self.index.write();
364 let mut offset = self.write_offset.write();
365 let mut record_buf = Vec::new();
366
367 write_store_record(
368 &mut wal,
369 id,
370 payload,
371 &mut offset,
372 &mut index,
373 &mut record_buf,
374 )?;
375
376 Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
377 }
378
379 self.maybe_auto_snapshot();
380 Ok(())
381 }
382
383 fn retrieve(&self, id: u64) -> io::Result<Option<serde_json::Value>> {
384 let index = self.index.read();
385 let Some(&offset) = index.get(&id) else {
386 return Ok(None);
387 };
388 drop(index);
389
390 // H-2: Only flush when DurabilityMode::None is configured, because sync_wal()
391 // already flushes the BufWriter after every write in Fsync and FlushOnly modes.
392 // Skipping this avoids acquiring the WAL write lock on every read, which would
393 // serialize all readers behind writers.
394 if self.durability == DurabilityMode::None {
395 self.wal.write().flush()?;
396 }
397
398 let mut reader = self.reader.write(); // Need write lock to seek
399 reader.seek(SeekFrom::Start(offset))?;
400
401 let mut len_bytes = [0u8; 4];
402 reader.read_exact(&mut len_bytes)?;
403 let len = u32::from_le_bytes(len_bytes) as usize;
404
405 let mut payload_bytes = vec![0u8; len];
406 reader.read_exact(&mut payload_bytes)?;
407
408 let payload = serde_json::from_slice(&payload_bytes)
409 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
410
411 Ok(Some(payload))
412 }
413
414 fn delete(&mut self, id: u64) -> io::Result<()> {
415 let crc = compute_delete_crc(id);
416
417 // Scoped block: all lock guards are released before the auto-snapshot
418 // check, which itself acquires locks (see `create_snapshot`).
419 {
420 let mut wal = self.wal.write();
421 let mut index = self.index.write();
422 let mut offset = self.write_offset.write();
423
424 // H-3: Build complete delete record in one buffer to minimize partial-write window.
425 // CRC-protected format: Marker(0xC4) | ID(8) | CRC32(4)
426 let mut record = [0u8; 1 + 8 + 4];
427 record[0] = CRC_DELETE_MARKER;
428 record[1..9].copy_from_slice(&id.to_le_bytes());
429 record[9..13].copy_from_slice(&crc.to_le_bytes());
430 wal.write_all(&record)?;
431
432 // Sync WAL according to durability mode (resync offset on failure).
433 Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
434
435 *offset += 1 + 8 + 4; // Marker(1) + ID(8) + CRC32(4)
436 index.remove(&id);
437 }
438
439 self.maybe_auto_snapshot();
440 Ok(())
441 }
442
443 fn flush(&mut self) -> io::Result<()> {
444 let mut wal = self.wal.write();
445 Self::sync_wal(&mut wal, self.durability)
446 }
447
448 fn ids(&self) -> Vec<u64> {
449 self.index.read().keys().copied().collect()
450 }
451}