velesdb_core/storage/
log_payload.rs1use super::traits::PayloadStorage;
28
29use parking_lot::RwLock;
30use rustc_hash::FxHashMap;
31use std::fs::{File, OpenOptions};
32use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
33use std::path::{Path, PathBuf};
34
/// Magic bytes expected at the very start of an index snapshot file
/// (`load_snapshot` rejects any file whose first four bytes differ).
pub(crate) const SNAPSHOT_MAGIC: &[u8; 4] = b"VSNP";

/// Snapshot format version, stored in the single byte that follows the magic.
pub(crate) const SNAPSHOT_VERSION: u8 = 1;

/// Number of log bytes (10 MiB) that must accumulate past the last snapshot
/// position before `should_create_snapshot` returns `true`.
const DEFAULT_SNAPSHOT_THRESHOLD: u64 = 10 * 1024 * 1024;
43
/// Computes a table-driven CRC-32 of `data`.
///
/// Uses the reflected polynomial `0xEDB8_8320`, starts from `0xFFFF_FFFF`
/// and inverts the final register value. The 256-entry lookup table is
/// evaluated at compile time.
#[inline]
#[allow(clippy::cast_possible_truncation)] // table indices are < 256, so `as u32` is lossless
fn crc32_hash(data: &[u8]) -> u32 {
    /// Builds the byte-indexed lookup table (eight shift/xor rounds per entry).
    const fn build_table() -> [u32; 256] {
        let mut table = [0u32; 256];
        let mut n = 0;
        while n < 256 {
            let mut value = n as u32;
            let mut round = 0;
            while round < 8 {
                value = if value & 1 == 0 {
                    value >> 1
                } else {
                    (value >> 1) ^ 0xEDB8_8320
                };
                round += 1;
            }
            table[n] = value;
            n += 1;
        }
        table
    }

    const TABLE: [u32; 256] = build_table();

    let register = data.iter().fold(0xFFFF_FFFF_u32, |acc, &byte| {
        let slot = ((acc ^ u32::from(byte)) & 0xFF) as usize;
        (acc >> 8) ^ TABLE[slot]
    });
    !register
}
77
/// Payload storage backed by an append-only log file (`payloads.log`) plus an
/// in-memory index from payload ID to log offset.
#[allow(clippy::module_name_repetitions)]
pub struct LogPayloadStorage {
    /// Directory that holds `payloads.log` and `payloads.snapshot`.
    path: PathBuf,
    /// Maps a payload ID to the byte offset of that record's 4-byte length
    /// prefix inside the log.
    index: RwLock<FxHashMap<u64, u64>>,
    /// Buffered writer over the log file, opened in append mode.
    wal: RwLock<io::BufWriter<File>>,
    /// Separate read handle on the same log file, used by `retrieve`.
    reader: RwLock<File>,
    /// Log position recorded by the most recent snapshot (0 if none was loaded).
    last_snapshot_wal_pos: RwLock<u64>,
}
95
96impl LogPayloadStorage {
97 pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
106 let path = path.as_ref().to_path_buf();
107 std::fs::create_dir_all(&path)?;
108 let log_path = path.join("payloads.log");
109 let snapshot_path = path.join("payloads.snapshot");
110
111 let writer_file = OpenOptions::new()
113 .create(true)
114 .append(true)
115 .open(&log_path)?;
116 let wal = io::BufWriter::new(writer_file);
117
118 if !log_path.exists() {
121 File::create(&log_path)?;
122 }
123 let reader = File::open(&log_path)?;
124 let wal_len = reader.metadata()?.len();
125
126 let (index, last_snapshot_wal_pos) =
128 if let Ok((snapshot_index, snapshot_wal_pos)) = Self::load_snapshot(&snapshot_path) {
129 let index =
131 Self::replay_wal_from(&log_path, snapshot_index, snapshot_wal_pos, wal_len)?;
132 (index, snapshot_wal_pos)
133 } else {
134 let index = Self::replay_wal_from(&log_path, FxHashMap::default(), 0, wal_len)?;
136 (index, 0)
137 };
138
139 Ok(Self {
140 path,
141 index: RwLock::new(index),
142 wal: RwLock::new(wal),
143 reader: RwLock::new(reader),
144 last_snapshot_wal_pos: RwLock::new(last_snapshot_wal_pos),
145 })
146 }
147
148 fn replay_wal_from(
150 log_path: &Path,
151 mut index: FxHashMap<u64, u64>,
152 start_pos: u64,
153 end_pos: u64,
154 ) -> io::Result<FxHashMap<u64, u64>> {
155 if start_pos >= end_pos {
156 return Ok(index);
157 }
158
159 let file = File::open(log_path)?;
160 let mut reader_buf = BufReader::new(file);
161 reader_buf.seek(SeekFrom::Start(start_pos))?;
162
163 let mut pos = start_pos;
164
165 while pos < end_pos {
166 let mut marker = [0u8; 1];
168 if reader_buf.read_exact(&mut marker).is_err() {
169 break;
170 }
171 pos += 1;
172
173 let mut id_bytes = [0u8; 8];
175 reader_buf.read_exact(&mut id_bytes)?;
176 let id = u64::from_le_bytes(id_bytes);
177 pos += 8;
178
179 if marker[0] == 1 {
180 let len_offset = pos;
182
183 let mut len_bytes = [0u8; 4];
185 reader_buf.read_exact(&mut len_bytes)?;
186 let payload_len = u64::from(u32::from_le_bytes(len_bytes));
187 pos += 4;
188
189 index.insert(id, len_offset);
190
191 let skip = i64::try_from(payload_len)
193 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Payload too large"))?;
194 reader_buf.seek(SeekFrom::Current(skip))?;
195 pos += payload_len;
196 } else if marker[0] == 2 {
197 index.remove(&id);
199 } else {
200 return Err(io::Error::new(io::ErrorKind::InvalidData, "Unknown marker"));
201 }
202 }
203
204 Ok(index)
205 }
206
207 fn load_snapshot(snapshot_path: &Path) -> io::Result<(FxHashMap<u64, u64>, u64)> {
211 if !snapshot_path.exists() {
212 return Err(io::Error::new(io::ErrorKind::NotFound, "No snapshot"));
213 }
214
215 let data = std::fs::read(snapshot_path)?;
216
217 if data.len() < 25 {
219 return Err(io::Error::new(
220 io::ErrorKind::InvalidData,
221 "Snapshot too small",
222 ));
223 }
224
225 if &data[0..4] != SNAPSHOT_MAGIC {
227 return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid magic"));
228 }
229
230 if data[4] != SNAPSHOT_VERSION {
232 return Err(io::Error::new(
233 io::ErrorKind::InvalidData,
234 "Unsupported version",
235 ));
236 }
237
238 let wal_pos = u64::from_le_bytes(
240 data[5..13]
241 .try_into()
242 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid WAL position"))?,
243 );
244
245 let entry_count_u64 = u64::from_le_bytes(
247 data[13..21]
248 .try_into()
249 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid entry count"))?,
250 );
251
252 let max_possible_entries = data.len().saturating_sub(25) / 16; if entry_count_u64 > max_possible_entries as u64 {
257 return Err(io::Error::new(
258 io::ErrorKind::InvalidData,
259 "Entry count exceeds data size",
260 ));
261 }
262
263 #[allow(clippy::cast_possible_truncation)] let entry_count = entry_count_u64 as usize;
265
266 let expected_size = 21 + entry_count * 16 + 4;
269 if data.len() != expected_size {
270 return Err(io::Error::new(io::ErrorKind::InvalidData, "Size mismatch"));
271 }
272
273 let stored_crc = u32::from_le_bytes(
275 data[data.len() - 4..]
276 .try_into()
277 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid CRC"))?,
278 );
279 let computed_crc = crc32_hash(&data[..data.len() - 4]);
280 if stored_crc != computed_crc {
281 return Err(io::Error::new(io::ErrorKind::InvalidData, "CRC mismatch"));
282 }
283
284 let mut index = FxHashMap::default();
286 index.reserve(entry_count);
287
288 let entries_start = 21;
289 for i in 0..entry_count {
290 let offset = entries_start + i * 16;
291 let id = u64::from_le_bytes(
292 data[offset..offset + 8]
293 .try_into()
294 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid entry ID"))?,
295 );
296 let wal_offset =
297 u64::from_le_bytes(data[offset + 8..offset + 16].try_into().map_err(|_| {
298 io::Error::new(io::ErrorKind::InvalidData, "Invalid entry offset")
299 })?);
300 index.insert(id, wal_offset);
301 }
302
303 Ok((index, wal_pos))
304 }
305
306 pub fn create_snapshot(&mut self) -> io::Result<()> {
317 self.wal.write().flush()?;
319
320 let snapshot_path = self.path.join("payloads.snapshot");
321 let index = self.index.read();
322
323 let wal_pos = self.wal.write().get_ref().metadata()?.len();
325
326 let entry_count = index.len();
328 let buf_size = 21 + entry_count * 16 + 4; let mut buf = Vec::with_capacity(buf_size);
330
331 buf.extend_from_slice(SNAPSHOT_MAGIC);
333 buf.push(SNAPSHOT_VERSION);
334 buf.extend_from_slice(&wal_pos.to_le_bytes());
335 buf.extend_from_slice(&(entry_count as u64).to_le_bytes());
336
337 for (&id, &offset) in index.iter() {
339 buf.extend_from_slice(&id.to_le_bytes());
340 buf.extend_from_slice(&offset.to_le_bytes());
341 }
342
343 let crc = crc32_hash(&buf);
345 buf.extend_from_slice(&crc.to_le_bytes());
346
347 let temp_path = self.path.join("payloads.snapshot.tmp");
349 std::fs::write(&temp_path, &buf)?;
350 std::fs::rename(&temp_path, &snapshot_path)?;
351
352 *self.last_snapshot_wal_pos.write() = wal_pos;
354
355 Ok(())
356 }
357
358 #[must_use]
363 pub fn should_create_snapshot(&self) -> bool {
364 let last_pos = *self.last_snapshot_wal_pos.read();
365
366 let current_pos = match self.wal.write().get_ref().metadata() {
368 Ok(m) => m.len(),
369 Err(_) => return false,
370 };
371
372 current_pos.saturating_sub(last_pos) >= DEFAULT_SNAPSHOT_THRESHOLD
373 }
374}
375
376impl PayloadStorage for LogPayloadStorage {
377 fn store(&mut self, id: u64, payload: &serde_json::Value) -> io::Result<()> {
378 let payload_bytes = serde_json::to_vec(payload)
379 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
380
381 let mut wal = self.wal.write();
382 let mut index = self.index.write();
383
384 wal.flush()?;
386 let pos = wal.get_ref().metadata()?.len();
387
388 wal.write_all(&[1u8])?;
393 wal.write_all(&id.to_le_bytes())?;
394 let len_u32 = u32::try_from(payload_bytes.len())
395 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Payload too large"))?;
396 wal.write_all(&len_u32.to_le_bytes())?;
397 wal.write_all(&payload_bytes)?;
398
399 wal.flush()?;
401
402 index.insert(id, pos + 9);
403
404 Ok(())
405 }
406
407 fn retrieve(&self, id: u64) -> io::Result<Option<serde_json::Value>> {
408 let index = self.index.read();
409 let Some(&offset) = index.get(&id) else {
410 return Ok(None);
411 };
412 drop(index);
413
414 let mut reader = self.reader.write(); reader.seek(SeekFrom::Start(offset))?;
416
417 let mut len_bytes = [0u8; 4];
418 reader.read_exact(&mut len_bytes)?;
419 let len = u32::from_le_bytes(len_bytes) as usize;
420
421 let mut payload_bytes = vec![0u8; len];
422 reader.read_exact(&mut payload_bytes)?;
423
424 let payload = serde_json::from_slice(&payload_bytes)
425 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
426
427 Ok(Some(payload))
428 }
429
430 fn delete(&mut self, id: u64) -> io::Result<()> {
431 let mut wal = self.wal.write();
432 let mut index = self.index.write();
433
434 wal.write_all(&[2u8])?;
435 wal.write_all(&id.to_le_bytes())?;
436
437 index.remove(&id);
438
439 Ok(())
440 }
441
442 fn flush(&mut self) -> io::Result<()> {
443 self.wal.write().flush()
444 }
445
446 fn ids(&self) -> Vec<u64> {
447 self.index.read().keys().copied().collect()
448 }
449}